How to send data periodically with asyncio.Protocol subclass

ghz 昨天 ⋅ 3 views

I have asyncio.Protocol subclass

class MyProtocol(Protocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        pass

    def eof_received(self):
        self.exit_future.set_result(True)

    def connection_lost(self, exc):
        self.exit_future.set_result(True)

and network connection created with

while True:
    try:
        exit_future = Future(loop=loop)
        transport, protocol = await loop.create_connection(lambda: MyProtocol(exit_future), host, port)

        await exit_future
        transport.close()
    except:
        pass

Now the question is: how can I send some data on some external event occurs? For instance when asyncio.Queue is not empty (queue.get will not block), what fills that queue is not related to asyncio? What is the most correct way to call transport.write when something happens?

Answer

To send data over the transport when an external event (like a non-empty asyncio.Queue) occurs, you need to be able to write to the transport from another task or callback that monitors the queue. Since the transport.write method needs to be called asynchronously and asyncio.Queue is a coroutine-based API, you can achieve this by running a coroutine that listens for the queue and sends data through the transport when appropriate.

Here's a structured approach to do this:

Steps:

  1. Monitor the Queue: You need a separate coroutine that checks the asyncio.Queue for new items. When the queue gets an item, it will send the data through the transport.

  2. Queue and Protocol Integration: The queue monitoring logic should be part of the event loop, and you should use asyncio.create_task to ensure the queue monitoring runs alongside your main connection handling logic.

  3. Writing to the Transport: When the queue is not empty, you will call transport.write within the event loop.

Code Example:

Let's modify your existing code to handle this scenario:

import asyncio

class MyProtocol(asyncio.Protocol):
    def __init__(self, exit_future, queue):
        self.exit_future = exit_future
        self.queue = queue
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        pass  # Handle received data here if needed

    def eof_received(self):
        self.exit_future.set_result(True)

    def connection_lost(self, exc):
        self.exit_future.set_result(True)

    async def write_from_queue(self):
        while True:
            # Wait until there's something in the queue
            data = await self.queue.get()  # This will not block forever because we call .get() asynchronously
            if data is None:  # We can use `None` as a signal to stop
                break
            if self.transport:
                self.transport.write(data)  # Send data over the connection
                print(f"Sent data: {data}")


async def main(loop, host, port):
    queue = asyncio.Queue()

    while True:
        try:
            exit_future = asyncio.Future(loop=loop)
            # Pass the queue to the protocol constructor
            transport, protocol = await loop.create_connection(lambda: MyProtocol(exit_future, queue), host, port)

            # Start a task to send data from the queue
            asyncio.create_task(protocol.write_from_queue())

            await exit_future  # Wait for the connection to close
            transport.close()

        except Exception as e:
            print(f"Connection error: {e}")
            await asyncio.sleep(1)  # Reattempt connection after a delay if needed

async def external_event(queue):
    """Simulate some external event that puts data in the queue"""
    while True:
        await asyncio.sleep(5)  # Simulating external event interval
        data = b"Hello, World!"  # Example data to send
        print(f"External event: Adding data to queue")
        await queue.put(data)

loop = asyncio.get_event_loop()
host, port = 'localhost', 8888

# Start the external event simulation
loop.create_task(external_event(asyncio.Queue()))

# Start the main connection logic
loop.run_until_complete(main(loop, host, port))

Key Points:

  1. Queue Monitoring:

    • The write_from_queue coroutine in MyProtocol is responsible for waiting for data in the asyncio.Queue. It continuously listens for items using await self.queue.get() and then writes the data to the transport when available.
  2. Queue and Protocol Integration:

    • The queue is passed to the protocol as part of the connection setup.
    • You can simulate an external event (like a user input, file read, or any other event) by putting data into the queue, which will be picked up and sent over the transport.
  3. Handling External Events:

    • The external_event coroutine simulates adding data to the queue every 5 seconds (you can modify this to suit your actual event).
    • Whenever new data is added to the queue, it triggers the write_from_queue method, which sends the data over the network.
  4. Error Handling:

    • The try-except block around the connection ensures that the connection is retried in case of failure, with a short delay (asyncio.sleep(1)) between attempts.

Important Considerations:

  • Concurrency: This solution works well with asyncio’s concurrency model. The queue.get() is non-blocking and will allow your event loop to handle other tasks while waiting for data to be put into the queue.

  • Closing the Connection: If you want to cleanly stop the connection (e.g., when you receive a special message or the queue is empty), you can add a special None or similar value to the queue and break the loop. This ensures the connection will be closed gracefully.

  • Event Loop: Ensure that you are running the event loop properly and managing the connection lifecycle (open/close) as needed.

Further Enhancements:

  • You can also implement some form of timeout for reading from the queue if you need the system to stop waiting after a certain period.
  • If you need to manage more advanced error handling or multiple connections, you can expand this logic with connection retries, connection pooling, or event-based triggers.

This approach effectively decouples the external event source (whatever is adding data to the queue) from the network transport handling, and uses asyncio's event loop to manage concurrent operations in a clean and scalable way.