Implementing Twisted style local multiple deferred callbacks in Celery
I am quite new to using Celery and was wondering how TWSITED type multiple deferred callbacks can be implemented in Celery
MY TWISTED CODE uses perspective broker and is as follows. I have a Handler (server) which handles some events and returns the result. The Dispatcher (Client) prints the result returned using a deferred callback.
Handler.py (Server)
from twisted.application import service, internet
from twisted.internet import reactor, task
from twisted.spread import pb
from Dispatcher import Event
from Dispatcher import CopyEvent
class ReceiverEvent(pb.RemoteCopy, Event):
pass
pb.setUnjellyableForClass(CopyEvent, ReceiverEvent)
class Handler(pb.Root):
def remote_eventEnqueue(self, pond):
d = task.deferLater(reactor,5,handle_event,sender=self)
return d
def handle_event(sender):
print "Do Something"
return "did something"
if __name__ == '__main__':
h=Handler()
reactor.listenTCP(8739, pb.PBServerFactory(h))
reactor.run()
Now the Dispatcher.py (Client)
from twisted.spread import pb, jelly
from twisted.python import log
from twisted.internet import reactor
from Event import Event
class CopyEvent(Event, pb.Copyable):
pass
class Dispatcher:
def __init__(self, event):
self.event = event
def dispatch_event(self, remote):
d = remote.callRemote("eventEnqueue", self.event)
d.addCallback(self.printMessage)
def printMessage(self, text):
print text
def main():
from Handler import CopyEvent
event = CopyEvent()
d = Dispatcher(event)
factory = pb.PBClientFactory()
reactor.connectTCP("localhost", 8739, factory)
deferred = factory.getRootObject()
deferred.addCallback(d.dispatch_event)
reactor.run()
if __name__ == '__main__':
main()
I tried implementing this in Celery.
Handler.py (Server)
from celery import Celery
app=Celery('tasks',backend='amqp',broker='amqp://guest@localhost//')
@app.task
def handle_event():
print "Do Something"
return "did something"
Dispatcher.py (Client)
from Handler import handle_event
from datetime import datetime
def print_message(text):
print text
t=handle_event.apply_async(countdown=10,link=print_message.s('Done')) ##HOWTO?
My exact question is how can one implement deferred callbacks TWISTED style on local functions like print_message in Celery. When handle_Event method is finished it returns result on which I would like to have another callback method (print_message) which is LOCAL
Any other possible Design workflow to do this in Celery?
Answer
In Celery, you can implement a style similar to Twisted's deferred callbacks using Celery's built-in functionality for chaining tasks. When one task completes, you can link another task to it by using the link
argument, as you've done in your example. However, to clarify and make sure you're handling local callbacks in the way you intend, I'll break it down a bit.
The link
parameter is used to attach a callback function to a task, so once the original task is completed, the linked callback will be executed. In your case, print_message
should execute after handle_event
is finished.
Here's a revised version of your Dispatcher.py
to handle this:
Dispatcher.py (Client)
from Handler import handle_event
from datetime import datetime
# Callback function to be called after the main task
def print_message(text):
print(f"Callback received: {text}")
# Trigger the handle_event task with a 10-second delay
t = handle_event.apply_async(countdown=10, link=print_message.s('Done'))
Key points:
apply_async(countdown=10)
will delay the execution of the task by 10 seconds.link=print_message.s('Done')
linksprint_message
to be executed afterhandle_event
completes. The.s()
is used to create a signature, allowing you to pass parameters to theprint_message
callback.- The
print_message
function will get called with the result ofhandle_event
(which, in your example, will be"did something"
).
In Celery, you can chain multiple tasks in this way, even with local functions, as you are doing with print_message
. This is quite similar to Twisted’s deferred callback mechanism. If you need to pass results between tasks or have more complex logic, you can also use task chaining (with .chain()
) or groups (.group()
) for parallel execution.