PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

Verbindungsprobleme mit SQLAlchemy und mehreren Prozessen

Zitieren "Wie verwende ich Engines / Verbindungen / Sitzungen mit Python-Multiprocessing oder os.fork ()?" mit zusätzlicher Betonung:

Das SQLAlchemy-Engine-Objekt verweist auf einen Verbindungspool vorhandener Datenbankverbindungen. Wenn dieses Objekt also in einen untergeordneten Prozess repliziert wird, ist das Ziel, sicherzustellen, dass keine Datenbankverbindungen übertragen werden .

und

Für den Fall, dass eine transaktionsaktive Sitzung oder Verbindung geteilt wird, gibt es jedoch keine automatische Lösung dafür; Eine Anwendung muss sicherstellen, dass ein neuer untergeordneter Prozess nur neue Verbindungsobjekte und Transaktionen sowie ORM-Sitzungsobjekte initiiert.

Das Problem rührt von dem gegabelten untergeordneten Prozess her, der die Live-globale session erbt , die an einer Verbindung festhält . Beim Ziel ruft init auf , überschreibt es die globalen Verweise auf engine und session , wodurch ihre Refcounts im Kind auf 0 verringert werden, wodurch sie zum Abschließen gezwungen werden. Wenn Sie zum Beispiel auf die eine oder andere Weise einen weiteren Verweis auf die geerbte Sitzung im Kind erstellen, verhindern Sie, dass sie bereinigt wird – aber tun Sie das nicht. Nach main angeschlossen ist und zum normalen Geschäftsbetrieb zurückkehrt, versucht es, die jetzt möglicherweise abgeschlossene – oder anderweitig nicht synchrone – Verbindung zu nutzen. Warum dies erst nach einigen Iterationen einen Fehler verursacht, bin ich mir nicht sicher.

Die einzige Möglichkeit, mit dieser Situation umzugehen, indem Sie Globals so verwenden, wie Sie es tun, ist

  1. Alle Sitzungen schließen
  2. Rufen Sie engine.dispose() auf

vor dem Gabeln. Dadurch wird verhindert, dass Verbindungen zum Kind undicht werden. Zum Beispiel:

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()

Ihr zweites Beispiel löst keine Finalisierung im untergeordneten Element aus und scheint daher nur zu funktionieren, obwohl es möglicherweise genauso fehlerhaft ist wie das erste, da es immer noch eine Kopie der Sitzung und ihrer Verbindung erbt, die lokal in main .