PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

Hängen Sie das Python-Skript mit SQLAlchemy und Multiprocessing ein

Ich glaube der TypeError kommt von multiprocessing 's get .

Ich habe den gesamten DB-Code aus Ihrem Skript entfernt. Schau dir das mal an:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Verwenden von r.wait gibt das erwartete Ergebnis zurück, aber mit r.get löst TypeError aus . Wie in Python-Dokumentation beschrieben , verwenden Sie r.wait nach einem map_async .

Bearbeiten :Ich muss meine vorherige Antwort ändern. Ich glaube jetzt an den TypeError kommt von SQLAlchemy. Ich habe mein Skript geändert, um den Fehler zu reproduzieren.

Bearbeiten 2 :Es sieht so aus, als ob das Problem darin besteht, dass multiprocessing.pool funktioniert nicht gut, wenn ein Worker eine Ausnahme auslöst, deren Konstruktor einen Parameter erfordert (siehe auch hier ).

Ich habe mein Skript geändert, um dies hervorzuheben.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Da Ihr Code in Ihrem Fall eine SQLAlchemy-Ausnahme auslöst, besteht die einzige Lösung, die mir einfällt, darin, alle Ausnahmen in do abzufangen Funktion und erneut eine normale Exception auslösen stattdessen. Etwa so:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Bearbeiten 3 :Es scheint also ein Fehler in Python zu sein , aber richtige Ausnahmen in SQLAlchemy würden Abhilfe schaffen:Daher habe ich das Problem mit SQLAlchemy angesprochen , auch.

Als Workaround des Problems finde ich die Lösung am Ende von Edit 2 tun würde (Callbacks in try-außer und re-raise einschließen).