Gunicorn: Queue not working after restarting worker


Problem Statement

After booting the Gunicorn worker processes, I want the workers to still be able to receive data from another process. Currently, I'm trying to use multiprocessing.Queue to achieve this. Specifically, I start a data management process before forking the workers and use two queues to connect it with the workers: one for the workers to request data from the data management process, the other to receive the response. In the post_fork hook, a worker sends a request on the request queue, waits for a response on the response queue, and only then proceeds to serving the application.

This works fine at first. However, when I manually terminate a worker and Gunicorn restarts it, the new worker gets stuck in the post_fork hook and never receives a response from the data management process.

Minimal Example

The following code shows a minimal example (config.py):

import logging
import os
import multiprocessing
logging.basicConfig(level=logging.INFO)

bind = "localhost:8080"
workers = 1


# Data management process: answer each request (the requester's PID)
# with this process's own PID
def s(req_q: multiprocessing.Queue, resp_q: multiprocessing.Queue):
    while True:
        logging.info("Waiting for messages")
        other_pid = req_q.get()
        logging.info("Got a message from %d", other_pid)
        resp_q.put(os.getpid())


# Runs at config-load time, i.e. in the Gunicorn master before any
# worker is forked
m = multiprocessing.Manager()
q1 = m.Queue()
q2 = m.Queue()

proc = multiprocessing.Process(target=s, args=(q1, q2), daemon=True)
proc.start()


def post_fork(server, worker):
    logging.info("Sending request")
    q1.put(os.getpid())
    logging.info("Request sent")
    other_pid = q2.get()
    logging.info("Got response from %d", other_pid)

My application module (app.py) is:

from flask import Flask
app = Flask(__name__)

And I start the server via

$ gunicorn -c config.py app:app
INFO:root:Waiting for messages
[2023-01-31 14:20:46 +0800] [24553] [INFO] Starting gunicorn 20.1.0
[2023-01-31 14:20:46 +0800] [24553] [INFO] Listening at: http://127.0.0.1:8080 (24553)
[2023-01-31 14:20:46 +0800] [24553] [INFO] Using worker: sync
[2023-01-31 14:20:46 +0800] [24580] [INFO] Booting worker with pid: 24580
INFO:root:Sending request
INFO:root:Request sent
INFO:root:Got a message from 24580
INFO:root:Waiting for messages
INFO:root:Got response from 24574

The log shows that the messages were successfully exchanged. Now we stop the worker process and let Gunicorn restart it:

$ kill 24580
[2023-01-31 14:22:40 +0800] [24580] [INFO] Worker exiting (pid: 24580)
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 122, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
[2023-01-31 14:22:40 +0800] [24553] [WARNING] Worker with pid 24574 was terminated due to signal 15
[2023-01-31 14:22:40 +0800] [29497] [INFO] Booting worker with pid: 29497
INFO:root:Sending request
INFO:root:Request sent

Question

Why doesn't s receive the message from the worker after the restart?

Also, why is the 'can only join a child process' AssertionError thrown? Does it have something to do with the problem?

Environment

  • Python: 3.8.0
  • Gunicorn: 20.1.0
  • OS: Ubuntu 18.04

Side Note

I already considered the following alternative designs:

  • Use HTTP/gRPC/... to share the data: The data that I need to share isn't serializable
  • Use threading.Thread instead of multiprocessing.Process for the data management process: The data management process initializes an object that throws an error when it is forked, so I cannot initialize this object within the Gunicorn master process.

Answer

Problem Breakdown

You're encountering two issues in your setup with Gunicorn and the multiprocessing queue:

  1. Worker Stuck in post_fork Method: After a worker is killed and Gunicorn restarts it, the worker never receives a response from the data management process in the post_fork method.

  2. AssertionError: can only join a child process: This error appears when Process.join() is called on a process that is not a child of the calling process; the traceback shows it is raised from multiprocessing's atexit handler running inside the exiting worker.

Why the Worker Gets Stuck

In your setup, the data management process is a long-lived background process that talks to each worker over two queues. When a worker is forked, it sends a message to the data management process (via q1) and waits for a response (via q2). This works the first time, but after the worker is terminated and Gunicorn restarts it, the new worker blocks forever in post_fork because:

  1. The data management process is dead: Your own log shows it. The first response came from PID 24574 (Got response from 24574), so 24574 is the data management process, and the master later reports Worker with pid 24574 was terminated due to signal 15. When the old worker exited, multiprocessing's atexit cleanup ran inside it; since the worker inherited the master's child-process bookkeeping via fork, that cleanup sent SIGTERM (signal 15) to the daemonic data management process.

  2. Nothing serves the queues anymore: After the restart, q1.put(...) still succeeds (the Manager process holding the queues is still alive, which is why Request sent is logged), but no Waiting for messages / Got a message line follows, because s is gone. q2.get() therefore blocks forever; see the diagnostic sketch below.
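To make this failure visible instead of hanging the worker forever, the blocking q2.get() can be given a timeout. A minimal diagnostic sketch, assuming the q1/q2 from the question's config.py (the 5-second timeout is an arbitrary choice):

import queue

def post_fork(server, worker):
    logging.info("Sending request")
    q1.put(os.getpid())
    logging.info("Request sent")
    try:
        # Fail loudly if the data management process never answers
        other_pid = q2.get(timeout=5)
    except queue.Empty:
        server.log.error("No response; is the data management process alive?")
        raise SystemExit(1)
    logging.info("Got response from %d", other_pid)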

Why the AssertionError Occurs

The error message 'can only join a child process' comes from an assertion inside Process.join(), which checks that the process being joined was started by the calling process (self._parent_pid == os.getpid()).

The traceback shows that the join is not issued by your code but by multiprocessing's own atexit handler (util._exit_function) running inside the exiting worker. Because Gunicorn creates workers with a plain fork(), the worker inherits the master's records of the Manager process and the data management process as if they were its own children. At exit, the handler terminates the daemonic "children" (killing the data management process, as described above) and then tries to join them; since their recorded parent PID is the master's, the assertion fires. So yes, the error and the stuck worker are two symptoms of the same cleanup pass.
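The assertion is easy to reproduce outside Gunicorn. A standalone sketch of the same situation: a parent starts a daemonic multiprocessing child, then forks a "worker" that inherits the Process object without being its parent:

import multiprocessing
import os
import time


def sleeper():
    time.sleep(30)


if __name__ == "__main__":
    # The "master" starts a daemonic helper process
    proc = multiprocessing.Process(target=sleeper, daemon=True)
    proc.start()

    pid = os.fork()  # mimic Gunicorn forking a worker
    if pid == 0:
        # The "worker" inherited proc, but proc is not its child:
        # proc's recorded parent PID is the master's, so join() asserts
        try:
            proc.join()
        except AssertionError as exc:
            print("worker:", exc)  # -> can only join a child process
        os._exit(0)

    os.waitpid(pid, 0)
    proc.terminate()
    proc.join()

In the question, this join is issued implicitly by the exit handler rather than by user code, which is why the traceback starts at atexit._run_exitfuncs.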

Solutions

1. Use multiprocessing.set_start_method() to Control How Processes Are Created

Gunicorn's worker model uses the fork() system call, and that part you cannot change. What you can control is how multiprocessing creates its own child processes, via multiprocessing.set_start_method(). By default, multiprocessing uses the "fork" method on Unix-like systems; switching to the "spawn" method may avoid issues caused by state inherited through forking.

You can set the start method near the top of config.py, before any multiprocessing objects are created:

import multiprocessing
multiprocessing.set_start_method('spawn', force=True)

With this, processes created through multiprocessing (the Manager and the data management process) are spawned as fresh interpreters instead of being forked; note that Gunicorn itself still forks its workers with os.fork() regardless of this setting. Spawned processes inherit no multiprocessing bookkeeping, which may prevent some of the cleanup problems when workers are restarted by Gunicorn.
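If you prefer to switch only the data management machinery to "spawn" and leave the global default alone, multiprocessing.get_context() provides a per-use start method. A sketch adapting the question's config.py, with one caveat: under spawn the target function must be importable by the child interpreter, so s would have to live in a regular module (data_service below is a hypothetical name):

import multiprocessing

from data_service import s  # hypothetical module holding the s() loop

ctx = multiprocessing.get_context("spawn")

m = ctx.Manager()
q1 = m.Queue()
q2 = m.Queue()

# Spawned children start as fresh interpreters and inherit none of the
# master's state through fork()
proc = ctx.Process(target=s, args=(q1, q2), daemon=True)
proc.start()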

2. Use Separate Queues for Each Worker

The problem could also arise from sharing a single multiprocessing.Queue among all the worker processes managed by Gunicorn. Each worker can instead get its own communication channel: create a Manager, a fresh pair of queues, and a dedicated data management process per worker.

For example, in the post_fork hook:

def post_fork(server, worker):
    logging.info("Sending request")

    # Create fresh queues for this worker
    m = multiprocessing.Manager()
    req_q = m.Queue()
    resp_q = m.Queue()

    # Start a dedicated data management process for this worker,
    # reusing the s function defined earlier in config.py
    proc = multiprocessing.Process(target=s, args=(req_q, resp_q), daemon=True)
    proc.start()

    # Proceed with the request/response handshake
    req_q.put(os.getpid())
    logging.info("Request sent")
    other_pid = resp_q.get()
    logging.info("Got response from %d", other_pid)

This ensures that each worker gets a fresh set of queues and a dedicated data management process every time it is started or restarted.

3. Use multiprocessing.Pool for Worker Management

Instead of manually wiring up each helper process with your own queues, you can use multiprocessing.Pool. A pool handles process creation, communication, and teardown for you, avoiding many of the issues that arise from manually forking and joining processes.

import os
from multiprocessing import Manager, Pool, Process

def worker_task(req_q, resp_q):
    # Runs in a pool worker: request data and wait for the answer
    req_q.put(os.getpid())
    return resp_q.get()

def post_fork(server, worker):
    logging.info("Sending request")

    # Use a manager to create shared queues
    m = Manager()
    req_q = m.Queue()
    resp_q = m.Queue()

    # Something must serve the request queue, or worker_task blocks
    # forever; reuse the data management function s from above
    Process(target=s, args=(req_q, resp_q), daemon=True).start()

    with Pool(processes=1) as pool:
        result = pool.apply(worker_task, args=(req_q, resp_q))
        logging.info("Got response from %d", result)

This abstracts the complexity of managing processes and queues manually and might prevent issues during worker restarts.

4. Explicitly Reinitialize Communication on Worker Restart

If you're managing your own processes and queues, you may need to handle the worker-restart case explicitly. This might include:

  • Recreating the data management process and queues when a worker restarts.
  • Checking in a server hook whether the data management process is still alive and reinitializing it if not; see the sketch below.
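As one sketch of this idea: Gunicorn's pre_fork(server, worker) hook runs in the master before each worker is forked, so the master can check on the data management process there and revive it if needed. This assumes the module-level q1, q2, s, and proc from the question's config.py, and that the Manager process holding the queues is still alive:

def pre_fork(server, worker):
    # Runs in the master before each fork: if the data management process
    # was torn down when an old worker exited, start a fresh one so the
    # new worker's post_fork handshake has someone to answer it
    global proc
    if not proc.is_alive():
        server.log.warning("Data management process died; restarting it")
        proc = multiprocessing.Process(target=s, args=(q1, q2), daemon=True)
        proc.start()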

Conclusion

  • Use "spawn" method: This will avoid some issues with the forking mechanism of Gunicorn.
  • Use separate queues for each worker: This ensures isolation between workers.
  • Use a pool: If applicable, using multiprocessing.Pool can simplify process management.

These approaches should help resolve the issues of workers being stuck after a restart and the AssertionError caused by process management conflicts.