Python multiprocessing, read input from child process

ghz 9 days ago ⋅ 21 views

I have a multiprocessing system, with a main process and two child ones.

One of the child processes (say, C1) sends messages to the other child (C2) through a Queue. C2 analyzes the messages from C1 and, when certain conditions occur, it requires some input from the user.

So, the situation looks like this:

main.py

import multiprocessing as mp

from child_process_01 import Child_Process_01
from child_process_02 import Child_Process_02

# The guard is required with "spawn": the child re-imports this module
if __name__ == "__main__":
    mp.set_start_method("spawn")

    c1_c2_q = mp.Queue(maxsize=1)

    c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
    c1.daemon = True
    c1.start()

    c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q,))
    c2.daemon = True
    c2.start()

Child_Process_01

message = ...  # produced somehow
c1_c2_q.put(message)

Child_Process_02

import queue

## infinite loop, keep checking messages and ask for user input under certain conditions

while True:
    try:
        message = c1_c2_q.get_nowait()
    except queue.Empty:
        message = None

    ## when message arrives
    if message:
        if message == something:
            usr_input = input('In this situation, I need user input')

The code doesn't work as it is, since the multiprocessing module closes the standard input of all processes it creates, as I found out in many answers here. A good suggestion seemed to be to duplicate the standard input in the main process and pass it to the child, as explained here or here, so I tried it:

main.py

import os
import sys

newstdin = os.fdopen(os.dup(sys.stdin.fileno()))

c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, newstdin))
c2.daemon = True
c2.start()

Child_Process_02

import queue
import sys

def Child_Process_02(c1_c2_q, newstdin):
    sys.stdin = newstdin
    while True:
        try:
            message = c1_c2_q.get_nowait()
        except queue.Empty:
            message = None

        ## when message arrives
        if message:
            if message == something:
                usr_input = input('In this situation, I need user input')

but this doesn't work either, because I can't pass the newstdin object created in the main process to the child process as an argument. I get the error:

TypeError: cannot pickle '_io.TextIOWrapper' object

Moreover, a few comments on similar questions discouraged reading input from child processes in general, but I can't imagine how to do it differently. Any suggestions?

Answer

There are two separate problems here. First, with the spawn start method every argument passed to Process is pickled, and file objects such as sys.stdin are not picklable, hence the TypeError. The same restriction applies to anything sent through a Queue or a Pipe. Second, multiprocessing closes sys.stdin in every child it starts and replaces it with a handle on os.devnull, so calling input() in a child fails with EOFError instead of waiting for the user.
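The pickling half of the problem can be reproduced without starting any process. This is a minimal sketch, assuming CPython 3; pickling_error is a hypothetical helper written for this illustration, not part of any library:

```python
import io
import pickle


def pickling_error(obj):
    """Return the exception raised while pickling obj, or None if it pickles."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # TypeError for text streams; other objects may raise PicklingError
        return exc
    return None


# An in-memory text stream of the type sys.stdin usually has
text_stream = io.TextIOWrapper(io.BytesIO())

print(type(pickling_error(text_stream)).__name__)  # TypeError
print(pickling_error("a plain string"))            # None
```

Plain data such as strings crosses the process boundary without trouble, which is why the approaches below send prompts and answers rather than file objects.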

To solve this, here are a few possible approaches:

1. Use Pipe for communication with the main process for user input

Instead of sending sys.stdin to the child process, you can have the child process request input from the main process via a Pipe. The main process will manage user input and pass it to the child process when needed.

main.py:

import multiprocessing as mp
import queue

def Child_Process_01(c1_c2_q):
    # This simulates some processing in C1
    message = "need_user_input"
    c1_c2_q.put(message)

def Child_Process_02(c1_c2_q, input_pipe):
    while True:
        try:
            # Wait briefly for a message instead of spinning on get_nowait()
            message = c1_c2_q.get(timeout=0.1)
        except queue.Empty:
            message = None

        # Process the message and request user input
        if message:
            if message == "need_user_input":
                input_pipe.send('Requesting user input')
                usr_input = input_pipe.recv()  # Block until the main process replies
                print(f"Received input: {usr_input}")

if __name__ == "__main__":
    # Setup the Queue for communication
    c1_c2_q = mp.Queue(maxsize=1)

    # Setup the Pipe for user input
    input_pipe_parent, input_pipe_child = mp.Pipe()

    # Start the child processes
    c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
    c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, input_pipe_child))

    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    # Main process handles user input
    while True:
        if input_pipe_parent.poll(0.1):  # Wait up to 0.1 s for a request from the child
            request = input_pipe_parent.recv()  # Consume the request, otherwise poll() stays True
            print(f"Child requested input: {request}")
            user_input = input("Enter user input: ")
            input_pipe_parent.send(user_input)  # Send the input to child process

Explanation:

  1. Pipe Setup: A two-way Pipe is created between the main process and Child_Process_02. The child uses its end (input_pipe_child) to send a request and then receive the answer; the main process uses the other end (input_pipe_parent) to detect the request and send back what the user typed.
  2. Child Process: Child_Process_02 checks for messages from c1_c2_q. When a message requiring user input arrives, it signals the main process via the pipe and blocks until the main process replies.
  3. Main Process: The main process watches for a request from the child with input_pipe_parent.poll(). Once a request is detected, it prompts the user for input and sends it to the child process via the pipe.
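The request/response exchange described in these steps can be isolated into two small helpers. This is a sketch under assumptions: serve_one_request and ask_user are hypothetical names, and read_line is a parameter (defaulting to input) so the exchange can be tried without a terminal. Only Pipe(), poll(), recv() and send() come from multiprocessing.

```python
import multiprocessing as mp


def serve_one_request(conn, read_line=input, timeout=0.1):
    """Main-process side: answer at most one pending input request on conn.

    Returns True if a request was served, False if none arrived within timeout.
    """
    if not conn.poll(timeout):
        return False
    prompt = conn.recv()          # consume the request so poll() stops reporting it
    conn.send(read_line(prompt))  # read_line defaults to input(), called in the main process
    return True


def ask_user(conn, prompt):
    """Child side: send a prompt and block until the main process answers."""
    conn.send(prompt)
    return conn.recv()


if __name__ == "__main__":
    # Both ends used in one process, only to show the message flow
    parent_end, child_end = mp.Pipe()
    child_end.send("Enter user input: ")
    serve_one_request(parent_end, read_line=lambda prompt: "demo answer")
    print(child_end.recv())  # demo answer
```

In a real program the child would call ask_user(input_pipe_child, ...) and the main loop would call serve_one_request(input_pipe_parent) repeatedly. Reading the request with recv() matters: a request left unread keeps poll() returning True.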

2. Use Queue for passing user input requests

Another option is to let the child process request user input through queues, and have the main process handle the request. Use a dedicated pair of queues (one for requests, one for responses) rather than reusing c1_c2_q: otherwise the main process and C2 would compete for the same messages. The child must not call input() itself, because its standard input is not available.

main.py:

import multiprocessing as mp
import queue

def Child_Process_01(c1_c2_q):
    # Simulating a message from C1
    message = "need_user_input"
    c1_c2_q.put(message)

def Child_Process_02(c1_c2_q, request_q, response_q):
    while True:
        try:
            # Wait briefly for a message instead of spinning on get_nowait()
            message = c1_c2_q.get(timeout=0.1)
        except queue.Empty:
            message = None

        if message:
            if message == "need_user_input":
                print("Child process needs user input.")
                request_q.put('Please provide input: ')  # Ask the main process to prompt the user
                usr_input = response_q.get()  # Block until the main process replies
                print(f"Child received input: {usr_input}")

if __name__ == "__main__":
    c1_c2_q = mp.Queue(maxsize=1)
    request_q = mp.Queue()   # C2 -> main process: prompts
    response_q = mp.Queue()  # main process -> C2: answers

    c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
    c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, request_q, response_q))

    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    # Main process collects user input and sends it back to C2
    while True:
        prompt = request_q.get()  # Block until C2 requests input
        user_input = input(prompt)
        response_q.put(user_input)
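One way to keep input() in the main process with queues is a pair of them, one carrying prompts and one carrying answers. The loop below is a sketch under assumptions: handle_requests, request_q and response_q are hypothetical names, and read_line is injectable so the loop can be exercised without a terminal. It relies only on get(timeout=...) and put(), which multiprocessing.Queue and queue.Queue both provide.

```python
import queue


def handle_requests(request_q, response_q, read_line=input, timeout=0.1):
    """Serve input requests until none arrives within timeout.

    Each item on request_q is a prompt string; the matching answer is put on
    response_q. Returns the number of requests served.
    """
    served = 0
    while True:
        try:
            prompt = request_q.get(timeout=timeout)
        except queue.Empty:  # multiprocessing.Queue raises the same exception
            return served
        response_q.put(read_line(prompt))
        served += 1


if __name__ == "__main__":
    # Plain in-process queues, only to show the flow of prompts and answers
    requests, responses = queue.Queue(), queue.Queue()
    requests.put("Please provide input: ")
    handle_requests(requests, responses, read_line=lambda prompt: "demo answer")
    print(responses.get_nowait())  # demo answer
```

Because answers come back in the order the prompts were received, this simple form suits a single requesting child; several children would need their own response queues or a tag on each request.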

3. Use Threading with the main process managing user input

In some cases, you might run the user input handling in a separate thread in the main process, while the child processes handle their tasks asynchronously. input() is then only ever called in the main process, where standard input is still open, but the thread still has to forward what it reads to the child, for example through a Queue.

main.py:

import multiprocessing as mp
import threading
import time

def user_input_thread(input_q):
    # Runs in the main process, where standard input is still open
    while True:
        usr_input = input("Enter user input: ")
        input_q.put(usr_input)  # Forward the input to Child_Process_02

def Child_Process_01(c1_c2_q):
    time.sleep(2)
    message = "need_user_input"
    c1_c2_q.put(message)

def Child_Process_02(c1_c2_q, input_q):
    while True:
        message = c1_c2_q.get()
        if message == "need_user_input":
            print("Child Process 2 requires input")
            usr_input = input_q.get()  # Block until the input thread forwards a line
            print(f"User input received: {usr_input}")

if __name__ == "__main__":
    c1_c2_q = mp.Queue()
    input_q = mp.Queue()  # input thread (main process) -> Child_Process_02

    c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
    c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, input_q))

    c1.start()
    c2.start()

    # Start the user input thread in the main process, after the children exist
    input_thread = threading.Thread(target=user_input_thread, args=(input_q,), daemon=True)
    input_thread.start()

    c1.join()
    c2.join()  # Child_Process_02 loops forever, so stop the program with Ctrl+C
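The forwarding thread can also be written so that it stops cleanly. This is a sketch under assumptions: forward_lines, start_forwarder and the "quit" stop word are hypothetical choices for illustration, and read_line is passed in so the thread can be driven by something other than the keyboard.

```python
import queue
import threading


def forward_lines(read_line, out_q, stop_word="quit"):
    """Call read_line() repeatedly and put each line on out_q until stop_word or EOF."""
    while True:
        try:
            line = read_line()
        except EOFError:  # input() raises this when standard input is closed
            break
        if line == stop_word:
            break
        out_q.put(line)


def start_forwarder(read_line, out_q):
    """Run forward_lines in a daemon thread and return the thread."""
    thread = threading.Thread(target=forward_lines, args=(read_line, out_q), daemon=True)
    thread.start()
    return thread


if __name__ == "__main__":
    # Scripted lines stand in for the keyboard in this demonstration
    scripted = iter(["first", "second", "quit"])
    lines_q = queue.Queue()
    start_forwarder(lambda: next(scripted), lines_q).join()
    print(lines_q.get_nowait(), lines_q.get_nowait())  # first second
```

In the multiprocessing setup, out_q would be a multiprocessing.Queue shared with the child and read_line would be something like lambda: input("Enter user input: ").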

Conclusion:

  • Pipe-based communication is probably the most direct solution. The main process handles user input and passes it to the child process when needed.
  • Using Queues for input requests and responses works as well; keep them separate from c1_c2_q so that input traffic does not compete with the messages C1 sends to C2.
  • Threading for user input management is another solution, but it can complicate your code further when working with multiple processes.

These approaches avoid passing sys.stdin or other file handles to the child, which fails because file objects are not picklable.