Implementing Twisted style local multiple deferred callbacks in

ghz 昨天 ⋅ 2 views

Implementing Twisted style local multiple deferred callbacks in Celery

I am quite new to using Celery and was wondering how TWSITED type multiple deferred callbacks can be implemented in Celery

MY TWISTED CODE uses perspective broker and is as follows. I have a Handler (server) which handles some events and returns the result. The Dispatcher (Client) prints the result returned using a deferred callback.

Handler.py (Server)

from twisted.application import service, internet
from twisted.internet import reactor, task
from twisted.spread import pb
from Dispatcher import Event
from Dispatcher import CopyEvent

class ReceiverEvent(pb.RemoteCopy, Event):
    pass
pb.setUnjellyableForClass(CopyEvent, ReceiverEvent)


class Handler(pb.Root):

def remote_eventEnqueue(self, pond):
    d = task.deferLater(reactor,5,handle_event,sender=self)
    return d

def handle_event(sender):
    print "Do Something"
    return "did something"

if __name__ == '__main__':
    h=Handler()
    reactor.listenTCP(8739, pb.PBServerFactory(h))
    reactor.run()

Now the Dispatcher.py (Client)

from twisted.spread import pb, jelly
from twisted.python import log
from twisted.internet import reactor
from Event import Event

class CopyEvent(Event, pb.Copyable):
    pass

class Dispatcher:
    def __init__(self, event):
        self.event = event

    def dispatch_event(self, remote):
        d = remote.callRemote("eventEnqueue", self.event)   
        d.addCallback(self.printMessage)

    def printMessage(self, text):
        print text

def main():
    from Handler import CopyEvent
    event = CopyEvent()
    d = Dispatcher(event)
    factory = pb.PBClientFactory()
    reactor.connectTCP("localhost", 8739, factory)
    deferred = factory.getRootObject()
    deferred.addCallback(d.dispatch_event)
    reactor.run()

if __name__ == '__main__':
    main()

I tried implementing this in Celery.

Handler.py (Server)

from celery import Celery

app=Celery('tasks',backend='amqp',broker='amqp://guest@localhost//')

@app.task

def handle_event():
     print "Do Something"
     return "did something"

Dispatcher.py (Client)

from Handler import handle_event
from datetime import datetime

def print_message(text):
    print text


t=handle_event.apply_async(countdown=10,link=print_message.s('Done'))  ##HOWTO?

My exact question is how can one implement deferred callbacks TWISTED style on local functions like print_message in Celery. When handle_Event method is finished it returns result on which I would like to have another callback method (print_message) which is LOCAL

Any other possible Design workflow to do this in Celery?

Answer

In Celery, you can implement a style similar to Twisted's deferred callbacks using Celery's built-in functionality for chaining tasks. When one task completes, you can link another task to it by using the link argument, as you've done in your example. However, to clarify and make sure you're handling local callbacks in the way you intend, I'll break it down a bit.

The link parameter is used to attach a callback function to a task, so once the original task is completed, the linked callback will be executed. In your case, print_message should execute after handle_event is finished.

Here's a revised version of your Dispatcher.py to handle this:

Dispatcher.py (Client)

from Handler import handle_event
from datetime import datetime

# Callback function to be called after the main task
def print_message(text):
    print(f"Callback received: {text}")

# Trigger the handle_event task with a 10-second delay
t = handle_event.apply_async(countdown=10, link=print_message.s('Done'))

Key points:

  1. apply_async(countdown=10) will delay the execution of the task by 10 seconds.
  2. link=print_message.s('Done') links print_message to be executed after handle_event completes. The .s() is used to create a signature, allowing you to pass parameters to the print_message callback.
  3. The print_message function will get called with the result of handle_event (which, in your example, will be "did something").

In Celery, you can chain multiple tasks in this way, even with local functions, as you are doing with print_message. This is quite similar to Twisted’s deferred callback mechanism. If you need to pass results between tasks or have more complex logic, you can also use task chaining (with .chain()) or groups (.group()) for parallel execution.