PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

SQLAlchemy-Set-Mitgliedschaft für sehr große Sets

In solch einem extremen Fall ist es besser, zuerst darüber nachzudenken, was die empfohlene SQL-Lösung ist, und diese dann in SQLAlchemy zu implementieren – notfalls sogar mit reinem SQL. Eine solche Lösung besteht darin, eine temporäre Tabelle für key_set zu erstellen Daten und zum Auffüllen.

Um so etwas wie Ihr Setup zu testen, habe ich das folgende Modell erstellt

class Table(Base):
    __tablename__ = 'mytable'
    my_key = Column(Integer, primary_key=True)

und füllte es mit 20.000.000 Zeilen:

In [1]: engine.execute("""
   ...:     insert into mytable
   ...:     select generate_series(1, 20000001)
   ...:     """)

Ich habe auch einige Helfer zum Testen verschiedener Kombinationen von temporären Tabellen, Auffüllen und Abfragen erstellt. Beachten Sie, dass die Abfragen die Core-Tabelle verwenden, um das ORM und seine Maschinerie zu umgehen – der Beitrag zum Timing wäre sowieso konstant:

# testdb is just your usual SQLAlchemy imports, and some
# preconfigured engine options.
from testdb import *
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Executable, ClauseElement
from io import StringIO
from itertools import product

class Table(Base):
    __tablename__ = "mytable"
    my_key = Column(Integer, primary_key=True)

def with_session(f):
    def wrapper(*a, **kw):
        session = Session(bind=engine)
        try:
            return f(session, *a, **kw)

        finally:
            session.close()
    return wrapper

def all(_, query):
    return query.all()

def explain(analyze=False):
    def cont(session, query):
        results = session.execute(Explain(query.statement, analyze))
        return [l for l, in results]

    return cont

class Explain(Executable, ClauseElement):
    def __init__(self, stmt, analyze=False):
        self.stmt = stmt
        self.analyze = analyze

@compiles(Explain)
def visit_explain(element, compiler, **kw):
    stmt = "EXPLAIN "

    if element.analyze:
        stmt += "ANALYZE "

    stmt += compiler.process(element.stmt, **kw)
    return stmt

def create_tmp_tbl_w_insert(session, key_set, unique=False):
    session.execute("CREATE TEMPORARY TABLE x (k INTEGER NOT NULL)")
    x = table("x", column("k"))
    session.execute(x.insert().values([(k,) for k in key_set]))

    if unique:
        session.execute("CREATE UNIQUE INDEX ON x (k)")

    session.execute("ANALYZE x")
    return x

def create_tmp_tbl_w_copy(session, key_set, unique=False):
    session.execute("CREATE TEMPORARY TABLE x (k INTEGER NOT NULL)")
    # This assumes that the string representation of the Python values
    # is a valid representation for Postgresql as well. If this is not
    # the case, `cur.mogrify()` should be used.
    file = StringIO("".join([f"{k}\n" for k in key_set]))
    # HACK ALERT, get the DB-API connection object
    with session.connection().connection.connection.cursor() as cur:
        cur.copy_from(file, "x")

    if unique:
        session.execute("CREATE UNIQUE INDEX ON x (k)")

    session.execute("ANALYZE x")
    return table("x", column("k"))

tmp_tbl_factories = {
    "insert": create_tmp_tbl_w_insert,
    "insert (uniq)": lambda session, key_set: create_tmp_tbl_w_insert(session, key_set, unique=True),
    "copy": create_tmp_tbl_w_copy,
    "copy (uniq)": lambda session, key_set: create_tmp_tbl_w_copy(session, key_set, unique=True),
}

query_factories = {
    "in": lambda session, _, x: session.query(Table.__table__).
        filter(Table.my_key.in_(x.select().as_scalar())),
    "exists": lambda session, _, x: session.query(Table.__table__).
        filter(exists().where(x.c.k == Table.my_key)),
    "join": lambda session, _, x: session.query(Table.__table__).
        join(x, x.c.k == Table.my_key)
}

tests = {
    "test in": (
        lambda _s, _ks: None,
        lambda session, key_set, _: session.query(Table.__table__).
            filter(Table.my_key.in_(key_set))
    ),
    "test in expanding": (
        lambda _s, _kw: None,
        lambda session, key_set, _: session.query(Table.__table__).
            filter(Table.my_key.in_(bindparam('key_set', key_set, expanding=True)))
    ),
    **{
        f"test {ql} w/ {tl}": (tf, qf)
        for (tl, tf), (ql, qf)
        in product(tmp_tbl_factories.items(), query_factories.items())
    }
}

@with_session
def run_test(session, key_set, tmp_tbl_factory, query_factory, *, cont=all):
    x = tmp_tbl_factory(session, key_set)
    return cont(session, query_factory(session, key_set, x))

Für kleine Schlüsselsätze einfach IN Abfrage, die Sie haben, ist ungefähr so ​​schnell wie die anderen, aber mit einem key_set von 100.000 beginnen die aufwändigeren Lösungen zu gewinnen:

In [10]: for test, steps in tests.items():
    ...:     print(f"{test:<28}", end=" ")
    ...:     %timeit -r2 -n2 run_test(range(100000), *steps)
    ...:     
test in                      2.21 s ± 7.31 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in expanding            630 ms ± 929 µs per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ insert            1.83 s ± 3.73 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ insert        1.83 s ± 3.99 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ insert          1.86 s ± 3.76 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ insert (uniq)     1.87 s ± 6.67 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ insert (uniq) 1.84 s ± 125 µs per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ insert (uniq)   1.85 s ± 2.8 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ copy              246 ms ± 1.18 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ copy          243 ms ± 2.31 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ copy            258 ms ± 3.05 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ copy (uniq)       261 ms ± 1.39 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ copy (uniq)   267 ms ± 8.24 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ copy (uniq)     264 ms ± 1.16 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)

Erhöhen des key_set bis 1.000.000:

In [11]: for test, steps in tests.items():
    ...:     print(f"{test:<28}", end=" ")
    ...:     %timeit -r2 -n1 run_test(range(1000000), *steps)
    ...:     
test in                      23.8 s ± 158 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in expanding            6.96 s ± 3.02 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ insert            19.6 s ± 79.3 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ insert        20.1 s ± 114 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ insert          19.5 s ± 7.93 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ insert (uniq)     19.5 s ± 45.4 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ insert (uniq) 19.6 s ± 73.6 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ insert (uniq)   20 s ± 57.5 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ copy              2.53 s ± 49.9 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ copy          2.56 s ± 1.96 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ copy            2.61 s ± 26.8 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ copy (uniq)       2.63 s ± 3.79 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ copy (uniq)   2.61 s ± 916 µs per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ copy (uniq)     2.6 s ± 5.31 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)

Schlüsselsatz von 10.000.000, COPY nur Lösungen, da die anderen mein gesamtes RAM auffraßen und vor dem Töten Swaps durchliefen, was darauf hindeutete, dass sie auf dieser Maschine niemals fertig werden würden:

In [12]: for test, steps in tests.items():
    ...:     if "copy" in test:
    ...:         print(f"{test:<28}", end=" ")
    ...:         %timeit -r1 -n1 run_test(range(10000000), *steps)
    ...:     
test in w/ copy              28.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test exists w/ copy          29.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test join w/ copy            29.7 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test in w/ copy (uniq)       28.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test exists w/ copy (uniq)   27.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test join w/ copy (uniq)     28.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Für kleine Schlüsselsätze (~100.000 oder weniger) spielt es also keine Rolle, was Sie verwenden, obwohl Sie bindparam erweitern ist ein klarer Zeitgewinn im Vergleich zur Benutzerfreundlichkeit, aber für viel größere Sätze sollten Sie die Verwendung einer temporären Tabelle und COPY in Betracht ziehen .

Es ist bemerkenswert, dass die Abfragepläne für große Mengen identisch sind, wenn der eindeutige Index verwendet wird:

In [13]: print(*run_test(range(10000000),
    ...:                 tmp_tbl_factories["copy (uniq)"],
    ...:                 query_factories["in"],
    ...:                 cont=explain()), sep="\n")
Merge Join  (cost=45.44..760102.11 rows=9999977 width=4)
  Merge Cond: (mytable.my_key = x.k)
  ->  Index Only Scan using mytable_pkey on mytable  (cost=0.44..607856.88 rows=20000096 width=4)
  ->  Index Only Scan using x_k_idx on x  (cost=0.43..303939.09 rows=9999977 width=4)

In [14]: print(*run_test(range(10000000),
    ...:                 tmp_tbl_factories["copy (uniq)"],
    ...:                 query_factories["exists"],
    ...:                 cont=explain()), sep="\n")
Merge Join  (cost=44.29..760123.36 rows=9999977 width=4)
  Merge Cond: (mytable.my_key = x.k)
  ->  Index Only Scan using mytable_pkey on mytable  (cost=0.44..607856.88 rows=20000096 width=4)
  ->  Index Only Scan using x_k_idx on x  (cost=0.43..303939.09 rows=9999977 width=4)

In [15]: print(*run_test(range(10000000),
    ...:                 tmp_tbl_factories["copy (uniq)"],
    ...:                 query_factories["join"],
    ...:                 cont=explain()), sep="\n")
Merge Join  (cost=39.06..760113.29 rows=9999977 width=4)
  Merge Cond: (mytable.my_key = x.k)
  ->  Index Only Scan using mytable_pkey on mytable  (cost=0.44..607856.88 rows=20000096 width=4)
  ->  Index Only Scan using x_k_idx on x  (cost=0.43..303939.09 rows=9999977 width=4)

Da die Testtabellen irgendwie künstlich sind, können nur Index-Scans verwendet werden.

Zum Schluss noch die Zeitangaben für die "Fußgänger"-Methode zum groben Vergleich:

In [3]: for ksl in [100000, 1000000]:
   ...:     %time [session.query(Table).get(k) for k in range(ksl)]
   ...:     session.rollback()
   ...:     
CPU times: user 1min, sys: 1.76 s, total: 1min 1s
Wall time: 1min 13s
CPU times: user 9min 48s, sys: 17.3 s, total: 10min 5s
Wall time: 12min 1s

Das Problem ist die Verwendung von Query.get() schließt zwangsläufig das ORM ein, während dies bei den ursprünglichen Vergleichen nicht der Fall war. Dennoch sollte klar sein, dass die separaten Roundtrips zur Datenbank teuer sind, selbst wenn eine lokale Datenbank verwendet wird.