Ich glaube der TypeError
kommt von multiprocessing
's get
.
Ich habe den gesamten DB-Code aus Ihrem Skript entfernt. Schau dir das mal an:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Verwenden von r.wait
gibt das erwartete Ergebnis zurück, aber mit r.get
löst TypeError
aus . Wie in Python-Dokumentation
beschrieben , verwenden Sie r.wait
nach einem map_async
.
Bearbeiten :Ich muss meine vorherige Antwort ändern. Ich glaube jetzt an den TypeError
kommt von SQLAlchemy. Ich habe mein Skript geändert, um den Fehler zu reproduzieren.
Bearbeiten 2 :Es sieht so aus, als ob das Problem darin besteht, dass multiprocessing.pool
funktioniert nicht gut, wenn ein Worker eine Ausnahme auslöst, deren Konstruktor einen Parameter erfordert (siehe auch hier
).
Ich habe mein Skript geändert, um dies hervorzuheben.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Da Ihr Code in Ihrem Fall eine SQLAlchemy-Ausnahme auslöst, besteht die einzige Lösung, die mir einfällt, darin, alle Ausnahmen in do
abzufangen Funktion und erneut eine normale Exception
auslösen stattdessen. Etwa so:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Bearbeiten 3 :Es scheint also ein Fehler in Python zu sein , aber richtige Ausnahmen in SQLAlchemy würden Abhilfe schaffen:Daher habe ich das Problem mit SQLAlchemy angesprochen , auch.
Als Workaround des Problems finde ich die Lösung am Ende von Edit 2 tun würde (Callbacks in try-außer und re-raise einschließen).