PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

Verwenden Sie die binäre Tabelle COPY FROM mit psycopg2

Hier ist das binäre Äquivalent von COPY FROM für Python 3:

from io import BytesIO
from struct import pack
import psycopg2

# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]

# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))

# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      ( 4,   4,   2,   4,   8 )))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database

# File trailer
cpy.write(pack('!h', -1))

# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)

# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()

Aktualisieren

Ich habe den obigen Ansatz zum Schreiben der Dateien für COPY umgeschrieben. Meine Daten in Python befinden sich in NumPy-Arrays, daher ist es sinnvoll, diese zu verwenden. Hier sind einige Beispiel-data mit mit 1M Zeilen, 7 Spalten:

import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt

In meiner Datenbank habe ich zwei Tabellen, die so aussehen:

CREATE TABLE num_data_binary
(
  id integer PRIMARY KEY,
  node integer NOT NULL,
  ts smallint NOT NULL,
  s0 real,
  s1 real,
  s2 real,
  s3 real,
  s4 real,
  s5 real,
  s6 real
) WITH (OIDS=FALSE);

und eine weitere ähnliche Tabelle namens num_data_text .

Hier sind einige einfache Hilfsfunktionen zum Vorbereiten der Daten für COPY (sowohl Text- als auch Binärformate), indem die Informationen im NumPy-Record-Array verwendet werden:

def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        cpy.write('\t'.join([repr(x) for x in row]) + '\n')
    return(cpy)

def prepare_binary(dat):
    pgcopy_dtype = [('num_fields','>i2')]
    for field, dtype in dat.dtype.descr:
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        pgcopy[field + '_length'] = dat.dtype[i].alignment
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    cpy.write(pgcopy.tostring())  # all rows
    cpy.write(pack('!h', -1))  # file trailer
    return(cpy)

So verwende ich die Hilfsfunktionen, um die beiden COPY-Formatmethoden zu bewerten:

def time_pgcopy(dat, table, binary):
    print('Processing copy object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transfering to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return

time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)

Hier ist die Ausgabe der letzten beiden time_pgcopy Befehle:

Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
        Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
        Total time: 0:00:24.622095

Sowohl die Schritte NumPy → Datei als auch Datei → Datenbank sind mit dem binären Ansatz viel schneller. Der offensichtliche Unterschied besteht darin, wie Python die COPY-Datei vorbereitet, was für Text sehr langsam ist. Im Allgemeinen lädt das Binärformat in 2/3 der Zeit in die Datenbank als das Textformat für dieses Schema.

Zuletzt habe ich die Werte in beiden Tabellen innerhalb der Datenbank verglichen, um zu sehen, ob die Zahlen unterschiedlich waren. Etwa 1,46 % der Zeilen haben unterschiedliche Werte für die Spalte s0 , und dieser Anteil steigt auf 6,17 % für s6 (wahrscheinlich im Zusammenhang mit der Zufallsmethode, die ich verwendet habe). Die absoluten Unterschiede ungleich Null zwischen allen 70M 32-Bit-Float-Werten liegen zwischen 9,3132257e-010 und 7,6293945e-006. Diese kleinen Unterschiede zwischen der Text- und der Binärlademethode sind auf den Genauigkeitsverlust der Float → Text → Float-Konvertierungen zurückzuführen, die für die Textformatmethode erforderlich sind.