PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

Jobqueue als SQL-Tabelle mit mehreren Consumern (PostgreSQL)

Ich verwende Postgres auch für eine FIFO-Warteschlange. Ich habe ursprünglich ACCESS EXCLUSIVE verwendet, das bei hoher Parallelität korrekte Ergebnisse liefert, aber den unglücklichen Effekt hat, dass es sich gegenseitig mit pg_dump ausschließt, das während seiner Ausführung eine ACCESS SHARE-Sperre erwirbt. Dies führt dazu, dass meine Funktion next() für eine sehr lange Zeit gesperrt wird (die Dauer von pg_dump). Dies war nicht akzeptabel, da wir ein 24x7-Geschäft sind und die Kunden die tote Zeit in der Warteschlange mitten in der Nacht nicht mochten.

Ich dachte, es muss eine weniger restriktive Sperre geben, die immer noch gleichzeitig sicher ist und nicht sperrt, während pg_dump ausgeführt wird. Meine Suche führte mich zu diesem SO-Beitrag.

Dann habe ich etwas recherchiert.

Die folgenden Modi sind ausreichend für eine FIFO-Warteschlangen-NEXT()-Funktion, die den Status eines Jobs von in die Warteschlange gestellt aktualisiert zum Laufen ohne Nebenläufigkeit fehlschlagen und auch nicht gegen pg_dump blockieren:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Abfrage:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Das Ergebnis sieht folgendermaßen aus:

UPDATE 1
 job_id
--------
     98
(1 row)

Hier ist ein Shell-Skript, das alle unterschiedlichen Sperrmodi bei hoher Parallelität (30) testet.

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Code ist auch hier, wenn Sie ihn bearbeiten möchten:https://gist.github.com/1083936

Ich aktualisiere meine Anwendung, um den EXCLUSIVE-Modus zu verwenden, da dies der restriktivste Modus ist, der a) korrekt ist und b) nicht mit pg_dump kollidiert. Ich habe mich für die restriktivste Option entschieden, da es am wenigsten riskant erscheint, die App von ACCESS EXCLUSIVE zu ändern, ohne ein Super-Experte für Postgres-Sperren zu sein.

Ich fühle mich ziemlich wohl mit meinem Prüfstand und mit den allgemeinen Ideen hinter der Antwort. Ich hoffe, dass das Teilen dieses Beitrags hilft, dieses Problem für andere zu lösen.