I want to run a code with process parallel to my main code but also want to access its parameters or start/stop the process via command prompt.
my machine is win7 64bit. Something in mind is:
from multiprocessing import Process
class dllapi():
...
def apiloop(params, args):
apiclient = dllapi(**args)
while True:
apiclient.cycle()
params = [....]
def mainloop(args):
p = Process(target = apiloop, args=(params, args, ))
while True:
cmd = input()
if cmd == 'kill':
p.terminate()
if cmd == 'stop':
pass # no idea
if cmd == 'resume':
pass # no idea
if cmd == 'report':
print (params)
I wish to make it simple. I did tried to make apiloop as thread yet input() could freeze the program and stopped apiloop working until i pressed enter...
To share the parameters from apiloop process, i did try queue and pipe, but, seem to me, queue needs .join to wait until apiloop is done and pipe has buffer limit.
(actually i can make apiclient.cycle runs every 1s but i wish to keep apiclient alive)
I wish to know if it's worth to dig deeper with multiprocessing (e.g. will try manager as well...) or there are other approaches which is more suitable for my case. Thanks in advance...
*** UPDATED: 201809170953***
Some progress with manager as below:
from multiprocessing import Process, Manager
class dllapi():
...
class webclientapi():
...
def apiloop(args, cmd, params):
apiclient = dllapi(**args)
status = True
while True:
# command from main
if cmd == 'stop':
status = False
elif cmd == 'start':
status = True
cmd = None
# stop or run
if status == True:
apiclient.cycle()
# update parameters
params['status'] = status
def uploadloop(cmds, params):
uploadclient = webclientapi()
status = True
while True:
# command from main
if cmd == 'stop':
status = False
elif cmd == 'start':
status = True
cmd = None
# stop or run
if status == True:
# upload 'status' from apiclient to somewhere
uploadclient.cycle(params['status'])
def mainloop(args):
manager = Manager()
mpcmds = {}
mpparams = {}
mps = {}
mpcmds ['apiloop'] = manager.Value('u', 'start')
mpparams ['apiloop'] = manager.dict()
mps ['apiloop'] = Process(target = apiloop, args=(args, mpcmds['apiloop'], mpparams['apiloop'])
mpcmds ['uploadloop'] = manager.Value('u', 'start')
# mpparams ['uploadloop'] is directly from mpparams ['apiloop']
mps ['uploadloop'] = Process(target = uploadloop, args=(mpcmds['uploadloop'], mpparams['apiloop'])
for key, mp in mps.items():
mp.daemon = True
mp.start()
while True:
cmd = input().split(' ')
# kill daemon process with exit()
if cmd[0] == 'bye':
exit()
# kill individual process
if cmd[0] == 'kill':
mps[cmd[1]].terminate()
# stop individual process via command
if cmd[0] == 'stop':
mpcmds[cmd[1]] = 'stop'
# stop individual process via command
if cmd[0] == 'start':
mpcmds[cmd[1]] = 'start'
# report individual process info via command
if cmd[0] == 'report':
print (mpparams ['apiloop'])
Hope this'd help someone.
Answer
You're on the right track with your code, but there are some important points and optimizations you can make to better handle process communication and control. Below is a revised version of your example that enhances the way processes interact with each other, and allows for command-line control to start/stop processes and retrieve status information.
Key Adjustments:
- Use
Manager
correctly: You're already usingManager
to share data between processes, but we need to ensure that we're handling the communication between theapiloop
anduploadloop
processes in a way that is efficient and avoids common pitfalls (e.g., deadlock, race conditions). - Process Control with
Value
andEvent
: Instead of just using aValue
for status, useEvent
objects for process synchronization, allowing you to start/stop processes cleanly without continuously checking the status. - Command Input: Your
mainloop
should handle commands asynchronously to avoid blocking the main process, which will allow real-time updates to your processes.
Solution with Manager
, Event
, and Queue
:
from multiprocessing import Process, Manager, Event
import time
class dllapi:
def __init__(self, **args):
# Initialize with args
self.args = args
def cycle(self):
# Simulate cycle action
print(f"Running API cycle with {self.args}")
time.sleep(1) # Simulate work being done
class webclientapi:
def __init__(self):
pass
def cycle(self, status):
# Simulate uploading status
print(f"Uploading status: {status}")
time.sleep(1)
def apiloop(args, cmd_event, params):
apiclient = dllapi(**args)
while True:
if cmd_event.is_set(): # Check if 'start' has been triggered
apiclient.cycle()
params['status'] = 'Running' if cmd_event.is_set() else 'Stopped'
time.sleep(0.5) # Simulate waiting for next cycle
def uploadloop(cmd_event, params):
uploadclient = webclientapi()
while True:
if cmd_event.is_set(): # Check if 'start' has been triggered
uploadclient.cycle(params['status'])
time.sleep(1) # Simulate uploading interval
def mainloop(args):
manager = Manager()
# Shared data
mpcmds = manager.dict()
mpparams = manager.dict()
# Shared Event for start/stop signaling
apiloop_event = Event()
uploadloop_event = Event()
# Initializing parameters
mpparams['apiloop'] = manager.dict({'status': 'Stopped'})
mpcmds['apiloop'] = apiloop_event
mpcmds['uploadloop'] = uploadloop_event
# Create processes
mps = {
'apiloop': Process(target=apiloop, args=(args, apiloop_event, mpparams['apiloop'])),
'uploadloop': Process(target=uploadloop, args=(uploadloop_event, mpparams['apiloop']))
}
for key, mp in mps.items():
mp.daemon = True # Daemonize the processes so they terminate when main exits
mp.start()
while True:
cmd = input("Enter command: ").split(' ')
if cmd[0] == 'bye':
print("Exiting...")
break
elif cmd[0] == 'kill':
if cmd[1] in mps:
mps[cmd[1]].terminate()
print(f"Terminated {cmd[1]}")
elif cmd[0] == 'stop':
if cmd[1] in mpcmds:
mpcmds[cmd[1]].clear() # Stop the corresponding process
print(f"Stopped {cmd[1]}")
elif cmd[0] == 'start':
if cmd[1] in mpcmds:
mpcmds[cmd[1]].set() # Start the corresponding process
print(f"Started {cmd[1]}")
elif cmd[0] == 'report':
if cmd[1] in mpparams:
print(f"Status of {cmd[1]}: {mpparams[cmd[1]]['status']}")
if __name__ == "__main__":
args = {'param1': 'value1', 'param2': 'value2'} # Example arguments for dllapi
mainloop(args)
Explanation of Changes:
-
Event for Process Control:
- We use
Event
objects (apiloop_event
,uploadloop_event
) to manage process control. AnEvent
is a flag that can be set (started) or cleared (stopped). This eliminates the need for constantly checking status flags, and is more efficient for signaling between processes.
- We use
-
Daemon Processes:
- The processes are set as daemon processes using
mp.daemon = True
. This ensures that these processes will terminate automatically when the main program (mainloop
) exits.
- The processes are set as daemon processes using
-
Command Handling:
- Commands like
start
,stop
,kill
, andreport
are parsed from input and control the respective processes. The processes are controlled usingEvent.set()
(start) andEvent.clear()
(stop).
- Commands like
-
Shared Parameters:
- The
params
dictionary is shared across processes usingManager.dict()
, and updated inapiloop
and accessed inuploadloop
. This enables passing data like status or other parameters between processes.
- The
Example Command-Line Interaction:
Enter command: start apiloop
Started apiloop
Enter command: start uploadloop
Started uploadloop
Enter command: report apiloop
Status of apiloop: Running
Enter command: stop apiloop
Stopped apiloop
Enter command: report apiloop
Status of apiloop: Stopped
Enter command: kill uploadloop
Terminated uploadloop
Enter command: bye
Exiting...
Advantages of This Approach:
- Real-Time Control: You can control your processes in real time (start/stop) via the command prompt without blocking the main thread.
- Clean Inter-Process Communication: Using
Event
objects andManager
dictionaries provides a robust way to communicate between processes without running into buffer issues or deadlock. - Daemonization: The processes will automatically exit when the main program exits, preventing orphaned processes.
This approach should work well for your use case and is flexible enough to allow for further process management enhancements in the future.