Oracle
 sql >> Datenbank >  >> RDS >> Oracle

Gibt es eine Möglichkeit, FORALL zu verwenden, um Daten aus einem Array einzufügen?

Ich interessiere mich wirklich dafür, was schneller wäre, also habe ich einige Möglichkeiten getestet, sie zu vergleichen:

  • einfaches executemany ohne Tricks.
  • das gleiche mit APPEND_VALUES Hinweis innerhalb der Anweisung.
  • union all Ansatz, den Sie in einer anderen Frage versucht haben. Dies sollte langsamer sein als oben, da es ein wirklich sehr großes erzeugt -Anweisung (die möglicherweise mehr Netzwerk als die Daten selbst erfordern kann). Es sollte dann auf der DB-Seite geparst werden, was ebenfalls viel Zeit in Anspruch nimmt und alle Vorteile vernachlässigt (ganz zu schweigen von der potenziellen Größenbeschränkung). Dann habe ich executemany ausgeführt Habe es zum Testen mit Chunks gemacht, um keine einzige Anweisung für 100.000 Datensätze zu erstellen. Ich habe keine Verkettung von Werten innerhalb der Anweisung verwendet, weil ich sie sicher halten wollte.
  • insert all . Die gleichen Nachteile, aber keine Gewerkschaften. Vergleichen Sie es mit union Version.
  • Serialisierung der Daten in JSON und Deserialisierung auf DB-Seite mit json_table . Potenziell gute Leistung mit einer einzigen kurzen Anweisung und einer einzigen Datenübertragung mit wenig Overhead von JSON.
  • Ihr vorgeschlagener FORALL in der PL/SQL-Wrapper-Prozedur. Sollte dasselbe sein wie executemany seit macht dasselbe, aber auf der Datenbankseite. Overhead der Transformation der Daten in die Sammlung.
  • Dasselbe FORALL , aber mit einem spaltenorientierten Ansatz zum Übergeben der Daten:Übergeben Sie einfache Listen von Spaltenwerten anstelle von komplexen Typen. Sollte viel schneller sein als FORALL mit Sammlung, da die Daten nicht in den Typ der Sammlung serialisiert werden müssen.

Ich habe Oracle Autonomous Database in Oracle Cloud mit einem kostenlosen Konto verwendet. Jede Methode wurde 10 Mal in einer Schleife mit demselben Eingabedatensatz von 100.000 Datensätzen ausgeführt, die Tabelle wurde vor jedem Test neu erstellt. Das ist das Ergebnis, das ich habe. Vorbereitungs- und Ausführungszeiten sind hier die Datenumwandlung am clientseitigen Ende des DB-Aufrufs selbst.

>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method:  exec_many.
    Duration, avg: 2.3083874 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
    Duration, avg: 2.6031369 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method:  union_all.
    Duration, avg: 27.9444233 s
    Preparation time, avg: 0.0408773 s
    Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
    Duration, avg: 70.6442494 s
    Preparation time, avg: 0.0289269 s
    Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
    Duration, avg: 10.4648237 s
    Preparation time, avg: 9.7907693 s
    Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method:     forall.
    Duration, avg: 5.5622837 s
    Preparation time, avg: 1.8972456000000002 s
    Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
    Duration, avg: 2.6702698000000002 s
    Preparation time, avg: 0.055710800000000005 s
    Execution time, avg: 2.6105702 s
>>> 

Der schnellste Weg ist einfach executemany , nicht so sehr überraschen. Interessant ist hier das APPEND_VALUES verbessert die Abfrage nicht und benötigt im Durchschnitt mehr Zeit, daher muss dies genauer untersucht werden.

Über FORALL :Wie erwartet dauert das individuelle Array für jede Spalte weniger Zeit, da keine Datenvorbereitung dafür erforderlich ist. Es ist mehr oder weniger vergleichbar mit executemany , aber ich denke, dass der PL/SQL-Overhead hier eine Rolle spielt.

Ein weiterer interessanter Teil für mich ist JSON:Die meiste Zeit wurde damit verbracht, LOB in die Datenbank und die Serialisierung zu schreiben, aber die Abfrage selbst war sehr schnell. Vielleicht kann der Schreibvorgang mit chuncsize oder auf andere Weise verbessert werden, um LOB-Daten in die select-Anweisung zu übergeben, aber nach meinem Code ist es alles andere als ein sehr einfacher und unkomplizierter Ansatz mit executemany .

Es gibt auch mögliche Ansätze ohne Python, die sollten schneller sein als native Tools für externe Daten, aber ich habe sie nicht getestet:

Unten ist der Code, den ich zum Testen verwendet habe.

import cx_Oracle as db
import os, random, json
import datetime as dt


class PerfTest:
  
  def __init__(self, size):
    self._con = db.connect(
      os.environ["ora_cloud_usr"],
      os.environ["ora_cloud_pwd"],
      "test_low",
      encoding="UTF-8"
    )
    self._cur = self._con.cursor()
    self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
  
  def __del__(self):
    if self._con:
      self._con.rollback()
      self._con.close()
 
#Create objets
  def setup(self):
    try:
      self._cur.execute("drop table rand")
      #print("table dropped")
    except:
      pass
  
    self._cur.execute("""create table rand(
      id int,
      str varchar2(100),
      val number
    )""")
    
    self._cur.execute("""create or replace package pkg_test as
  type ts_test is record (
    id rand.id%type,
    str rand.str%type,
    val rand.val%type
  );
  type tt_test is table of ts_test index by pls_integer;
  
  type tt_ids is table of rand.id%type index by pls_integer;
  type tt_strs is table of rand.str%type index by pls_integer;
  type tt_vals is table of rand.val%type index by pls_integer;
  
  procedure write_data(p_data in tt_test);
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  );

end;""")
    self._cur.execute("""create or replace package body pkg_test as
  procedure write_data(p_data in tt_test)
  as
  begin
    forall i in indices of p_data
      insert into rand(id, str, val)
      values (p_data(i).id, p_data(i).str, p_data(i).val)
    ;
    
    commit;

  end;
  
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  ) as
  begin
    forall i in indices of p_ids
      insert into rand(id, str, val)
      values (p_ids(i), p_strs(i), p_vals(i))
    ;
    
    commit;
    
  end;

end;
""")

 
  def build_union(self, size):
      return """insert into rand(id, str, val)
    select id, str, val from rand where 1 = 0 union all
    """ + """ union all """.join(
      ["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )
 
 
  def build_insert_all(self, size):
      return """
      """.join(
      ["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )


#Test case with executemany
  def exec_many(self):
    start = dt.datetime.now()
    self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)
 
 
#The same as above but with prepared statement (no parsing)
  def exec_many_append(self):
    start = dt.datetime.now()
    self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)


#Union All approach (chunked). Should have large parse time
  def union_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = self.build_union(size)
    
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute unions
    start_exec = dt.datetime.now()
    self._cur.executemany(new_stmt, new_inp)
    dur_exec = dt.datetime.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_stmt = self.build_union(remainder)
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(new_stmt, new_inp)
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)


#The same as union all, but with no need to union something
  def insert_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = """insert all
    {}
    select * from dual"""
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute
    start_exec = dt.datetime.now()
    self._cur.executemany(
      new_stmt.format(self.build_insert_all(size)),
      new_inp
    )
    dur_exec = dt.datetime.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(
        new_stmt.format(self.build_insert_all(remainder)),
        new_inp
      )
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)

    
#Serialize at server side and do deserialization at DB side
  def json_table(self):
    start_prepare = dt.datetime.now()
    new_inp = json.dumps([
      { "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
    ])
    
    lob_var = self._con.createlob(db.DB_TYPE_CLOB)
    lob_var.write(new_inp)
    
    start_exec = dt.datetime.now()
    self._cur.execute("""
    insert into rand(id, str, val)
    select id, str, val
    from json_table(
      to_clob(:json), '$[*]'
      columns
        id int,
        str varchar2(100),
        val number
    )
    """, json=lob_var)
    dur_exec = dt.datetime.now() - start_exec
    
    self._con.commit()
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL
  def forall(self):
    start_prepare = dt.datetime.now()
    collection_type = self._con.gettype("PKG_TEST.TT_TEST")
    record_type = self._con.gettype("PKG_TEST.TS_TEST")
    
    def recBuilder(x):
      rec = record_type.newobject()
      rec.ID = x[0]
      rec.STR = x[1]
      rec.VAL = x[2]
      
      return rec

    inp_collection = collection_type.newobject([
      recBuilder(i) for i in self.inp
    ])
    
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data", [inp_collection])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL and plain collections
  def forall_columnar(self):
    start_prepare = dt.datetime.now()
    ids, strs, vals = map(list, zip(*self.inp))
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)

  
#Run test
  def run(self, method, iterations, *args):
    #Cleanup schema
    self.setup()

    start = dt.datetime.now()
    runtime = []
    for i in range(iterations):
      single_run = getattr(self, method)(*args)
      runtime.append(single_run)
    
    dur = dt.datetime.now() - start
    dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
    dur_exec_total = sum([i.total_seconds() for _, i in runtime])
    
    print("""Method: {meth}.
    Duration, avg: {run_dur} s
    Preparation time, avg: {prep} s
    Execution time, avg: {ex} s""".format(
      inp_s=len(self.inp),
      meth=method,
      run_dur=dur.total_seconds() / iterations,
      prep=dur_prep_total / iterations,
      ex=dur_exec_total / iterations
    ))