0%

wicked-processes

Example 1

from multiprocessing import Manager, Process

def timeout_query_by_func_name(params, settings, function_name):
    """Run ``function_name`` in a child process and enforce a timeout.

    The callable is started as ``function_name(params, settings, ns)``, where
    ``ns`` is a ``Manager`` namespace whose ``my_df`` attribute starts out as
    ``None``; the worker is expected to store its result there.

    Args:
        params: Passed through unchanged as the worker's first argument.
        settings: Mapping passed through as the worker's second argument. An
            optional ``'timeout'`` entry gives the limit in seconds (converted
            with ``int``); without it the limit is 600 seconds.
        function_name: The callable to run (despite the name, not a string).

    Returns:
        Whatever the worker stored in ``ns.my_df``, or ``None`` if it stored
        nothing.

    Raises:
        Exception: If the worker is still alive when the timeout expires. The
            worker is terminated before the exception is raised.
    """
    m = Manager()
    try:
        ns = m.Namespace()
        ns.my_df = None
        p = Process(target=function_name, args=(params, settings, ns))
        p.start()
        timeout = int(settings['timeout']) if 'timeout' in settings else 600
        p.join(timeout)
        if p.is_alive():
            # Still running after the deadline: kill it, then reap it.
            p.terminate()
            p.join()
            # Not every callable has __name__ (e.g. functools.partial).
            name = getattr(function_name, '__name__', repr(function_name))
            raise Exception('Time out when calling ' + name)
        # Read the result before the manager (which owns it) is shut down.
        return ns.my_df
    finally:
        # Stop the manager's server process deterministically on every path.
        m.shutdown()

Example 2

def workon_finishing():
    """Start one monitor process per product run and wait for all of them.

    Iterates the ``product_runs`` mapping (defined outside this function) and
    launches ``run_id_status_monitor(product, run_id)`` in its own process for
    every entry, then blocks until each of those processes has exited.
    """
    ps = []
    # .items() works on Python 2 and 3; .iteritems() exists on Python 2 only.
    for product, run_id in product_runs.items():
        p = Process(target=run_id_status_monitor, args=(product, run_id))
        p.start()
        ps.append(p)

    print("Start monitoring..... this can take up to a few hours")
    # Join only after every monitor has been started, so they run concurrently.
    for p in ps:
        p.join()
    print("Stopped monitoring. This run is closed.")
Mind the COMMA

If you see something like...

TypeError: 'somekindof' object is not iterable

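just change args=(something) to args=(something,). Without the trailing comma, (something) is only a parenthesized expression, not a one-element tuple, and args must be a tuple.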

About 'shared' objects: the trials below are actually not recommended, especially for Process, because they add quite a lot of overhead. Threads are fine with a plain shared list such as dfs. I don't know why, but using list references is very addictive in Python.

from multiprocessing import Process, Manager
import pandas as pd
import threading

def f1(name_space):
    """Store a two-row DataFrame (a=[1, 2], b=[3, 4]) in ``name_space.df``."""
    name_space.df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})

def f2(name_space):
    """Store a two-row DataFrame (a=[1, 4], b=[2, 3]) in ``name_space.df``."""
    name_space.df = pd.DataFrame({'a': [1, 4], 'b': [2, 3]})

def f3(items, i):
    """Store a one-column DataFrame (c=[2, 5]) at index ``i`` of ``items``."""
    items[i] = pd.DataFrame({'c': [2, 5]})

if __name__ == "__main__":
    # Demo: compare how results come back from child processes (through
    # Manager namespaces, and through a plain list) and from threads (through
    # the same plain list).
    m = Manager()
    ns_1 = m.Namespace()
    ns_2 = m.Namespace()
    ns_3 = m.Namespace()

    ns_1.df = None
    ns_2.df = None
    ns_3.df = None

    # One writer each for ns_1 and ns_2.
    p1 = Process(target=f1, args=(ns_1,))
    p1.start()
    p2 = Process(target=f2, args=(ns_2,))
    p2.start()
    # Two processes write to the same namespace; whichever assigns last wins.
    p3 = Process(target=f1, args=(ns_3,))
    p3.start()
    p4 = Process(target=f2, args=(ns_3,))
    p4.start()

    dfs = [None] * 9
    # A plain list handed to a Process: the assignment happens in the child,
    # so the parent's dfs[1] is expected to stay None. Named p5 (not p3) so
    # that the earlier p3 process can still be joined below.
    p5 = Process(target=f3, args=(dfs, 1,))
    p5.start()
    # Threads share the parent's memory, so their writes to dfs are visible.
    t1 = threading.Thread(target=f3, args=(dfs, 2,))
    t1.start()
    ts = []
    for i in [4, 6, 7]:
        t = threading.Thread(target=f3, args=(dfs, i,))
        t.start()
        ts.append(t)

    # Wait for every worker before reading any result.
    for p in (p1, p2, p3, p4, p5):
        p.join()
    t1.join()
    for t in ts:
        t.join()

    print(ns_1.df)
    print(ns_2.df)
    print(ns_3.df)
    print(dfs[1])
    print(dfs[2])
    print(dfs[4])
    print(dfs[6])
    print(dfs[7])
</code></pre>
The outputs are:
 
   a  b
0  1  3
1  2  4
   a  b
0  1  2
1  4  3
   a  b
0  1  2
1  4  3
None
   c
0  2
1  5
   c
0  2
1  5
   c
0  2
1  5
   c
0  2
1  5
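But only a Process can be killed with the nice terminate() function once join(timeout) has expired (a Thread also has join(timeout), but it cannot be terminated), so please use Process together with a Manager when you need a TIMEOUT. That's the whole point.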
1
from multiprocessing import Manager, Process


def timeout_query_by_func_name(params, settings, function_name):
    """Run ``function_name`` in a child process and enforce a timeout.

    The callable is started as ``function_name(params, settings, ns)``, where
    ``ns`` is a ``Manager`` namespace whose ``my_df`` attribute starts out as
    ``None``; the worker is expected to store its result there.

    Args:
        params: Passed through unchanged as the worker's first argument.
        settings: Mapping passed through as the worker's second argument. An
            optional ``'timeout'`` entry gives the limit in seconds (converted
            with ``int``); without it the limit is 600 seconds.
        function_name: The callable to run (despite the name, not a string).

    Returns:
        Whatever the worker stored in ``ns.my_df``, or ``None`` if it stored
        nothing.

    Raises:
        Exception: If the worker is still alive when the timeout expires. The
            worker is terminated before the exception is raised.
    """
    m = Manager()
    try:
        ns = m.Namespace()
        ns.my_df = None
        p = Process(target=function_name, args=(params, settings, ns))
        p.start()
        timeout = int(settings['timeout']) if 'timeout' in settings else 600
        p.join(timeout)
        if p.is_alive():
            # Still running after the deadline: kill it, then reap it.
            p.terminate()
            p.join()
            # Not every callable has __name__ (e.g. functools.partial).
            name = getattr(function_name, '__name__', repr(function_name))
            raise Exception('Time out when calling ' + name)
        # Read the result before the manager (which owns it) is shut down.
        return ns.my_df
    finally:
        # Stop the manager's server process deterministically on every path.
        m.shutdown()