## Problem Statement

After booting the Gunicorn worker processes, I want the workers to still be able to receive data from another process. Currently, I'm trying to use `multiprocessing.Queue` to achieve this. Specifically, I start a data management process before forking the workers and use two queues to connect it with the workers: one for the workers to request data from the data management process, the other to receive the response. In the `post_fork` hook, a worker sends a request on the request queue, receives a response on the response queue, and only then proceeds to serving the application.

This works fine at first. However, when I manually terminate a worker and Gunicorn restarts it, the new worker gets stuck in the `post_fork` hook and never receives a response from the data management process.
## Minimal Example

The following code shows a minimal example (`config.py`):
```python
import logging
import os
import multiprocessing

logging.basicConfig(level=logging.INFO)

bind = "localhost:8080"
workers = 1


def s(req_q: multiprocessing.Queue, resp_q: multiprocessing.Queue):
    while True:
        logging.info("Waiting for messages")
        other_pid = req_q.get()
        logging.info("Got a message from %d", other_pid)
        resp_q.put(os.getpid())


m = multiprocessing.Manager()
q1 = m.Queue()
q2 = m.Queue()
proc = multiprocessing.Process(target=s, args=(q1, q2), daemon=True)
proc.start()


def post_fork(server, worker):
    logging.info("Sending request")
    q1.put(os.getpid())
    logging.info("Request sent")
    other_pid = q2.get()
    logging.info("Got response from %d", other_pid)
```
My application module (`app.py`) is:
```python
from flask import Flask

app = Flask(__name__)
```
And I start the server via

```
$ gunicorn -c config.py app:app
INFO:root:Waiting for messages
[2023-01-31 14:20:46 +0800] [24553] [INFO] Starting gunicorn 20.1.0
[2023-01-31 14:20:46 +0800] [24553] [INFO] Listening at: http://127.0.0.1:8080 (24553)
[2023-01-31 14:20:46 +0800] [24553] [INFO] Using worker: sync
[2023-01-31 14:20:46 +0800] [24580] [INFO] Booting worker with pid: 24580
INFO:root:Sending request
INFO:root:Request sent
INFO:root:Got a message from 24580
INFO:root:Waiting for messages
INFO:root:Got response from 24574
```
The log shows that the messages were successfully exchanged. Now, we'll stop the worker process and let gunicorn restart it:
```
$ kill 24580
[2023-01-31 14:22:40 +0800] [24580] [INFO] Worker exiting (pid: 24580)
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 122, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
[2023-01-31 14:22:40 +0800] [24553] [WARNING] Worker with pid 24574 was terminated due to signal 15
[2023-01-31 14:22:40 +0800] [29497] [INFO] Booting worker with pid: 29497
INFO:root:Sending request
INFO:root:Request sent
```
## Question

Why doesn't `s` receive the message from the worker after the restart?

Besides, why is this 'can only join a child process' error thrown? Does it have something to do with the problem?
## Environment

- Python: 3.8.0
- Gunicorn: 20.1.0
- OS: Ubuntu 18.04
## Side Note

I already considered the following alternative designs:

- Use HTTP/gRPC/... to share the data: the data that I need to share isn't serializable.
- Use `threading.Thread` instead of `multiprocessing.Process` for the data management process: the data management process initializes an object that throws an error when it is forked, so I cannot initialize this object within the Gunicorn master process.
## Answer

### Problem Breakdown

You're encountering two issues in your setup with Gunicorn and the multiprocessing queues:

1. **Worker stuck in the `post_fork` hook**: after a worker is killed and Gunicorn restarts it, the new worker never receives a response from the data management process in the `post_fork` hook.
2. **`AssertionError: can only join a child process`**: this error is raised when `multiprocessing` tries to join a process that is not a child of the current process, which results from how Gunicorn's worker lifecycle interacts with the `multiprocessing` state inherited at fork time.
### Why the Worker Gets Stuck

In your setup, the data management process is a background process that talks to each worker over two queues. When a worker is forked, it sends a message to the data management process (via `q1`) and waits for a response (via `q2`). This works the first time, but after a worker is terminated and Gunicorn restarts it, the new worker blocks forever in `post_fork` because:

1. **The data management process is gone**: look closely at the log after `kill 24580`. The line `Worker with pid 24574 was terminated due to signal 15` refers to pid 24574, which is the data management process (the earlier `Got response from 24574` line shows its pid), not a Gunicorn worker. The dying worker's exit handlers took it down, as explained below.
2. **Nobody answers the queue**: the restarted worker's `Request sent` log line shows that `q1.put(...)` still succeeds, so the Manager process holding the queues is still reachable. But there is no longer any process running `s` to read the request and reply, so `q2.get()` blocks indefinitely.
### Why the `AssertionError` Occurs

The message 'can only join a child process' comes from `multiprocessing`'s exit handler. On interpreter shutdown, `multiprocessing` runs an `atexit` hook (`util._exit_function`, visible in your traceback) that terminates and then joins every daemonic process it knows about.

Because Gunicorn workers are created with `fork()`, each worker inherits the master's `multiprocessing` bookkeeping, including the `Process` object for the data management process. When you `kill` the worker, its exit handler finds that inherited entry, sends the data management process SIGTERM (the `signal 15` in the log), and then tries to `join()` it. `Process.join()` asserts that the process being joined is a child of the current process; in the worker it isn't (it's a child of the master), hence the `AssertionError`. The two symptoms therefore share one root cause: the dying worker's cleanup both kills your data management process and trips the assertion.
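This can be reproduced outside Gunicorn in a few lines. The sketch below (a minimal illustration, not part of the original setup) starts a daemonic helper process and then calls `os.fork()`, mimicking how a Gunicorn worker inherits the master's `multiprocessing` state:

```python
import multiprocessing
import os
import time


def sleeper():
    # Stand-in for the data management process.
    time.sleep(30)


# The "master" creates a daemonic helper process.
p = multiprocessing.Process(target=sleeper, daemon=True)
p.start()

r, w = os.pipe()
pid = os.fork()
if pid == 0:
    # The fork()ed copy plays the role of a Gunicorn worker: it inherits
    # the Process object `p`, but `p` is a child of the *original*
    # process, so join() fails with the same AssertionError as in the
    # question's traceback.
    os.close(r)
    try:
        p.join(timeout=0)
        os.write(w, b"no error")
    except AssertionError as e:
        os.write(w, str(e).encode())
    os._exit(0)  # skip atexit handlers in the child

os.close(w)
os.waitpid(pid, 0)
error_from_child = os.read(r, 1024).decode()
print(error_from_child)  # -> can only join a child process
p.terminate()
p.join()
```

The same check fires from the worker's `atexit` hook in your setup, which is why the error surfaces only when a worker exits.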
## Solutions

### 1. Use `multiprocessing.set_start_method()` to Control How Processes Are Created

Gunicorn's worker model uses the `fork()` system call, and by default `multiprocessing` also uses the "fork" start method on Unix-like systems. You can switch your own `multiprocessing` processes to the "spawn" method via `multiprocessing.set_start_method()` (the current method can be queried with `multiprocessing.get_start_method()`). A spawned process starts from a fresh interpreter instead of a forked copy, which avoids problems caused by inherited process state.
You can set the start method at the top of `config.py`, before any `multiprocessing` objects are created:

```python
import multiprocessing

multiprocessing.set_start_method('spawn', force=True)
```

Note that this only affects processes created through `multiprocessing` (such as the data management process); Gunicorn still forks its workers regardless of this setting. It can nevertheless help when workers are restarted, because less `multiprocessing` state is carried across forks.
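An alternative to forcing the method globally is a dedicated context. The fragment below is a sketch of how `config.py` could use a spawn context just for the data management process (`s`, `q1`, `q2` refer to the names from the minimal example); keep in mind that under "spawn" the target function must live in an importable module:

```python
import multiprocessing

# A spawn context confines the start-method change to processes created
# from it; Gunicorn's forking of workers is unaffected.
ctx = multiprocessing.get_context("spawn")
print(ctx.get_start_method())  # -> spawn

# In config.py, the manager and the data management process would then
# be created from the context instead of the module-level functions:
# m = ctx.Manager()
# q1 = m.Queue()
# q2 = m.Queue()
# proc = ctx.Process(target=s, args=(q1, q2), daemon=True)
# proc.start()
```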
### 2. Use Separate Queues for Each Worker

The problem can also stem from sharing a single `multiprocessing.Queue` (and the process objects behind it) across the worker processes managed by Gunicorn. Instead, give each worker its own communication channel: use `multiprocessing.Manager` inside the worker to create fresh queues, and start a dedicated data management process per worker.

For example, in the `post_fork` hook:
```python
def post_fork(server, worker):
    logging.info("Sending request")

    # Create new queues for this worker
    m = multiprocessing.Manager()
    req_q = m.Queue()
    resp_q = m.Queue()

    # Start a new data management process with fresh queues
    proc = multiprocessing.Process(target=s, args=(req_q, resp_q), daemon=True)
    proc.start()

    # Proceed with communication
    req_q.put(os.getpid())
    logging.info("Request sent")
    other_pid = resp_q.get()
    logging.info("Got response from %d", other_pid)
```
This ensures that each worker gets a fresh set of queues and a fresh data management process after it forks, so no `multiprocessing` state crosses the fork boundary. Note that it also means one data management process per worker, which only fits if the managed data can be initialized per worker.
### 3. Use `multiprocessing.Pool` for Worker Management

Instead of wiring up each helper process and its queues by hand, you can use a `multiprocessing.Pool`. A `Pool` takes care of creating, reusing, and tearing down its processes, which avoids many of the bookkeeping mistakes that arise from forking and managing processes manually.
```python
import logging
import os
from multiprocessing import Manager, Pool


def data_task(req_q, resp_q):
    # Runs in a pool process and plays the data management role:
    # wait for one request, then answer with this process's pid.
    other_pid = req_q.get()
    resp_q.put(os.getpid())
    return other_pid


def post_fork(server, worker):
    logging.info("Sending request")

    # Use a manager so the queue proxies can be passed to the pool process
    m = Manager()
    req_q = m.Queue()
    resp_q = m.Queue()

    with Pool(processes=1) as pool:
        async_result = pool.apply_async(data_task, args=(req_q, resp_q))
        req_q.put(os.getpid())
        other_pid = resp_q.get()
        async_result.get()  # surface any error raised in the pool process
    logging.info("Got response from %d", other_pid)
```

This abstracts away the manual process and queue management and might prevent some of the issues that appear during worker restarts.
### 4. Explicitly Reinitialize Communication on Worker Restart

If you keep managing your own processes and queues, you need to handle worker restarts explicitly. This might include:

- Recreating the data management process and/or the queues whenever a worker boots.
- Ensuring the state is properly reinitialized, e.g. with a setup function or a check in the `post_fork` hook that detects that a worker has been restarted.
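One way to make per-worker reinitialization robust is to stop passing queue objects across `fork()` entirely and have every worker open a fresh connection instead. The sketch below is an illustration, not the original setup: the `QueueManager` class, the `authkey`, and the port choice are made-up assumptions. It hosts the queues behind a `multiprocessing.managers.BaseManager` server, so a freshly booted (or restarted) worker can simply `connect()` in `post_fork`:

```python
import queue
from multiprocessing.managers import BaseManager

# The queues live in the manager server process, not in any worker.
request_q = queue.Queue()
response_q = queue.Queue()


class QueueManager(BaseManager):
    pass


# Lambdas work here under the default "fork" start method on Unix;
# under "spawn", use module-level functions instead.
QueueManager.register("get_req_q", callable=lambda: request_q)
QueueManager.register("get_resp_q", callable=lambda: response_q)

# Server side (would run before Gunicorn forks any workers);
# port 0 lets the OS pick a free port for this demo.
server_mgr = QueueManager(address=("127.0.0.1", 0), authkey=b"secret")
server_mgr.start()

# Client side (what each worker would do in post_fork, even after a
# restart): open a brand-new connection, so nothing is inherited.
client_mgr = QueueManager(address=server_mgr.address, authkey=b"secret")
client_mgr.connect()
client_mgr.get_req_q().put("hello")
echoed = client_mgr.get_req_q().get()
print(echoed)  # -> hello

server_mgr.shutdown()
```

Because the connection is established after the fork, a restarted worker is indistinguishable from a freshly booted one, which is exactly the property the original queue-inheriting design lacks.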
## Conclusion

- **Use the "spawn" start method**: this avoids some of the issues caused by carrying `multiprocessing` state across Gunicorn's forks.
- **Use separate queues for each worker**: this ensures isolation between workers.
- **Use a pool**: where applicable, `multiprocessing.Pool` can simplify process management.

These approaches should help resolve both the worker getting stuck after a restart and the `AssertionError` caused by joining a process that isn't a child of the current one.