Example 1
from multiprocessing import Manager, Process
def timeout_query_by_func_name(params, settings, function_name):
    """Run a callable in a child process and give up after a timeout.

    Args:
        params: Passed through unchanged as the first argument of the callable.
        settings: Dict passed through as the second argument. Its optional
            ``'timeout'`` entry (anything ``int()`` accepts) is the number of
            seconds to wait; 600 is used when the key is absent.
        function_name: The callable itself (despite the name). It is invoked
            as ``function_name(params, settings, ns)`` and is expected to
            store its result in ``ns.my_df``.

    Returns:
        Whatever the callable stored in ``ns.my_df``; ``None`` if it stored
        nothing.

    Raises:
        TimeoutError: The child was still running after the timeout and has
            been terminated. ``TimeoutError`` is a subclass of ``Exception``,
            so existing ``except Exception`` handlers keep working.
    """
    # Parse the timeout before anything is started so that a bad value cannot
    # leave an orphaned child process behind.
    timeout = int(settings.get('timeout', 600))

    # The context manager shuts the manager's server process down on every
    # exit path, including the timeout error below.
    with Manager() as m:
        ns = m.Namespace()
        ns.my_df = None
        p = Process(target=function_name, args=(params, settings, ns))
        p.start()
        p.join(timeout)
        if p.is_alive():
            p.terminate()
            p.join()  # reap the terminated child
            name = getattr(function_name, '__name__', repr(function_name))
            raise TimeoutError('Time out when calling ' + name)
        # Read the result while the manager is still alive.
        return ns.my_df
Example 2
def workon_finishing():
    """Start one monitor process per product run and wait for all of them.

    Reads ``product_runs`` and ``run_id_status_monitor`` from the enclosing
    module; presumably ``product_runs`` maps a product to its run id (it is
    defined outside this snippet, so confirm against its definition).
    """
    ps = []
    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for product, run_id in product_runs.items():
        p = Process(target=run_id_status_monitor, args=(product, run_id))
        p.start()
        ps.append(p)
    print("Start monitoring..... this can take up to a few hours")
    # Block until every monitor process has exited.
    for p in ps:
        p.join()
    print("Stopped monitoring. This run is closed.")
Mind the COMMA
If you see something like...
TypeError: 'somekindof' object is not iterable
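just change args=(something)
to args=(something,) because the trailing comma is what makes a one-element tuple; (something) on its own is only a parenthesized value.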
About 'shared' objects: the experiments below are actually not recommended, especially with Process, because sharing objects between processes adds quite a lot of overhead. Threads are fine with the plain dfs list. I don't know why, but using list references is very addictive in Python.
from multiprocessing import Process, Manager
import pandas as pd
import threading
def f1(name_space):
    """Store a 2x2 demo DataFrame (columns 'a' and 'b') in ``name_space.df``."""
    name_space.df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
def f2(name_space):
    """Store a second 2x2 demo DataFrame (columns 'a' and 'b') in ``name_space.df``."""
    name_space.df = pd.DataFrame({'a': [1, 4], 'b': [2, 3]})
def f3(items, i):
    """Overwrite ``items[i]`` in place with a one-column demo DataFrame."""
    items[i] = pd.DataFrame({'c': [2, 5]})
if __name__ == "__main__":
    # Demo: compare Manager namespaces (shared between processes) with a
    # plain list handed to a Process and to Threads.
    # The context manager shuts the manager's server process down at the end.
    with Manager() as m:
        ns_1 = m.Namespace()
        ns_2 = m.Namespace()
        ns_3 = m.Namespace()
        ns_1.df = None
        ns_2.df = None
        ns_3.df = None

        p1 = Process(target=f1, args=(ns_1,))
        p1.start()
        p2 = Process(target=f2, args=(ns_2,))
        p2.start()
        # p3 and p4 write to the same namespace; the last assignment wins.
        p3 = Process(target=f1, args=(ns_3,))
        p3.start()
        p4 = Process(target=f2, args=(ns_3,))
        p4.start()

        dfs = [None] * 9
        # A plain list is not shared with a child process: the child fills
        # its own copy, so dfs[1] stays None in this (parent) process.
        # Named p5 so the earlier p3 handle is not lost and can be joined.
        p5 = Process(target=f3, args=(dfs, 1))
        p5.start()
        # Threads live in this process, so they do update dfs in place.
        t1 = threading.Thread(target=f3, args=(dfs, 2))
        t1.start()
        ts = []
        for i in [4, 6, 7]:
            t = threading.Thread(target=f3, args=(dfs, i))
            t.start()
            ts.append(t)

        for p in (p1, p2, p3, p4, p5):
            p.join()
        t1.join()
        for t in ts:
            t.join()

        # Read the namespaces while the manager is still running.
        print(ns_1.df)
        print(ns_2.df)
        print(ns_3.df)
        print(dfs[1])
        print(dfs[2])
        print(dfs[4])
        print(dfs[6])
        print(dfs[7])
The outputs are:
a b
0 1 3
1 2 4
a b
0 1 2
1 4 3
a b
0 1 2
1 4 3
None
c
0 2
1 5
c
0 2
1 5
c
0 2
1 5
c
0 2
1 5
However, only a Process can be forcibly stopped with the nice terminate() function once join(timeout) has expired (a Thread cannot be killed), so please use Process together with a Manager in case you need a TIMEOUT. That's the whole point.
1
from multiprocessing import Manager, Process


def timeout_query_by_func_name(params, settings, function_name):
    """Run a callable in a child process and give up after a timeout.

    Args:
        params: Passed through unchanged as the first argument of the callable.
        settings: Dict passed through as the second argument. Its optional
            ``'timeout'`` entry (anything ``int()`` accepts) is the number of
            seconds to wait; 600 is used when the key is absent.
        function_name: The callable itself (despite the name). It is invoked
            as ``function_name(params, settings, ns)`` and is expected to
            store its result in ``ns.my_df``.

    Returns:
        Whatever the callable stored in ``ns.my_df``; ``None`` if it stored
        nothing.

    Raises:
        TimeoutError: The child was still running after the timeout and has
            been terminated. ``TimeoutError`` is a subclass of ``Exception``,
            so existing ``except Exception`` handlers keep working.
    """
    # Parse the timeout before anything is started so that a bad value cannot
    # leave an orphaned child process behind.
    timeout = int(settings.get('timeout', 600))

    # The context manager shuts the manager's server process down on every
    # exit path, including the timeout error below.
    with Manager() as m:
        ns = m.Namespace()
        ns.my_df = None
        p = Process(target=function_name, args=(params, settings, ns))
        p.start()
        p.join(timeout)
        if p.is_alive():
            p.terminate()
            p.join()  # reap the terminated child
            name = getattr(function_name, '__name__', repr(function_name))
            raise TimeoutError('Time out when calling ' + name)
        # Read the result while the manager is still alive.
        return ns.my_df