Um eine einfache Warteschlange in Redis zu realisieren, die zum erneuten Senden abgestürzter Jobs verwendet werden kann, würde ich Folgendes versuchen:
- 1 Liste "up_for_grabs"
- 1 Liste "wird_gearbeitet_am"
- automatisch ablaufende Sperren
Ein Arbeiter, der versucht, einen Job zu ergattern, würde etwa so vorgehen:
timeout = 3600
#wrap this in a transaction so our cleanup wont kill the task
#Move the job away from the queue so nobody else tries to claim it
job = RPOPLPUSH(up_for_grabs, being_worked_on)
#Set a lock and expire it, the value tells us when that job will time out. This can be arbitrary though
SETEX('lock:' + job, Time.now + timeout, timeout)
#our application logic
do_work(job)
#Remove the finished item from the queue.
LREM being_worked_on -1 job
#Delete the item's lock. If it crashes here, the expire will take care of it
DEL('lock:' + job)
Und hin und wieder könnten wir einfach unsere Liste nehmen und überprüfen, ob alle darin enthaltenen Jobs tatsächlich gesperrt sind. Wenn wir Jobs finden, die KEINE Sperre haben, bedeutet dies, dass sie abgelaufen sind und unser Worker wahrscheinlich abgestürzt ist In diesem Fall würden wir erneut einreichen.
Dies wäre der Pseudo-Code dafür:
loop do
items = LRANGE(being_worked_on, 0, -1)
items.each do |job|
if !(EXISTS("lock:" + job))
puts "We found a job that didn't have a lock, resubmitting"
LREM being_worked_on -1 job
LPUSH(up_for_grabs, job)
end
end
sleep 60
end