Class: Polyrun::Queue::FileStore
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Polyrun::Queue::FileStore
- Defined in:
- lib/polyrun/queue/file_store.rb,
lib/polyrun/queue/file_store_pending.rb
Overview
File-backed queue (see spec_queue.md). It is stored as queue.json, pending/*.json chunks, done.jsonl, and leases.json, and uses an OS flock for locking.
Constant Summary collapse
- CHUNK_SIZE = 500
Instance Attribute Summary collapse
-
#root ⇒ Object
readonly
Returns the value of attribute root.
Instance Method Summary collapse
- #ack!(lease_id:, worker_id:) ⇒ Object
- #claim!(worker_id:, batch_size:) ⇒ Object
- #init!(items) ⇒ Object
-
#initialize(root) ⇒ FileStore
constructor
A new instance of FileStore.
- #status ⇒ Object
Constructor Details
Instance Attribute Details
#root ⇒ Object (readonly)
Returns the value of attribute root.
11 12 13 |
# Reader for the @root instance variable; the listing body is just `@root`.
# @return [Object] the current value of @root
# File 'lib/polyrun/queue/file_store.rb', line 11 def root @root end |
Instance Method Details
#ack!(lease_id:, worker_id:) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# Acknowledges a lease. Everything except the final `true` runs inside with_lock:
#   1. looks up lease_id in read_leases and raises if it is absent;
#   2. compares the lease's "worker_id" with worker_id (both sides via to_s) and raises on mismatch;
#   3. deletes the lease and persists the remaining leases with write_leases!;
#   4. takes the lease's "paths" (an empty array when the key is nil), adds paths.size to a
#      "done_count" entry, and passes the paths to append_done_lines!;
#   5. appends a ledger entry with "ACK" => lease_id, the worker id as a string, and the paths.
#
# @param lease_id [Object] key of the lease inside the hash returned by read_leases
# @param worker_id [#to_s] worker expected to own the lease
# @return [true]
# @raise [Polyrun::Error] "unknown lease: <lease_id>" when no such lease exists
# @raise [Polyrun::Error] "lease worker mismatch" when the lease belongs to a different worker
#
# NOTE(review): this listing lost identifiers during extraction (the bare `= ["done_count"]`
# and the empty `()` call). Presumably a metadata hash is read before the "done_count" update
# and written back after append_done_lines! — confirm against lib/polyrun/queue/file_store.rb.
# File 'lib/polyrun/queue/file_store.rb', line 57 def ack!(lease_id:, worker_id:) with_lock do leases = read_leases lease = leases[lease_id] raise Polyrun::Error, "unknown lease: #{lease_id}" unless lease if lease["worker_id"].to_s != worker_id.to_s raise Polyrun::Error, "lease worker mismatch" end leases.delete(lease_id) write_leases!(leases) paths = lease["paths"] || [] = ["done_count"] = Integer(["done_count"]) + paths.size append_done_lines!(paths) () append_ledger("ACK" => lease_id, "worker_id" => worker_id.to_s, "paths" => paths) end true end |
#claim!(worker_id:, batch_size:) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# Claims a batch of pending paths for a worker and records it as a lease.
#
# batch_size is coerced with Integer() and validated before the lock is taken. A fresh
# lease id comes from SecureRandom.uuid. Inside with_lock the batch is obtained from
# take_pending_batch!, a lease hash ("worker_id" as a string, "paths", "claimed_at" as
# Time.now.utc.iso8601) is stored under the new id and persisted with write_leases!, and a
# "CLAIM" ledger entry is appended that also carries "pending_remaining".
#
# @param worker_id [#to_s] identifier stored on the lease
# @param batch_size [Object] anything Integer() accepts; must convert to a value >= 1
# @return [Hash] {"lease_id" => lease_id, "paths" => batch}
# @raise [Polyrun::Error] "batch_size must be >= 1" when the converted value is below 1
#
# NOTE(review): the lease is written unconditionally. If take_pending_batch! can return an
# empty batch, a lease with no paths is still recorded — confirm whether that is intended.
# NOTE(review): this listing lost identifiers during extraction (the first argument of
# take_pending_batch!, the bare `=` and `()`, and the receiver of ["pending_count"]).
# Presumably a metadata hash is read, passed in, and written back — confirm against
# lib/polyrun/queue/file_store.rb.
# File 'lib/polyrun/queue/file_store.rb', line 30 def claim!(worker_id:, batch_size:) batch_size = Integer(batch_size) raise Polyrun::Error, "batch_size must be >= 1" if batch_size < 1 lease_id = SecureRandom.uuid batch = [] with_lock do = batch = take_pending_batch!(, batch_size) leases = read_leases leases[lease_id] = { "worker_id" => worker_id.to_s, "paths" => batch, "claimed_at" => Time.now.utc.iso8601 } () write_leases!(leases) append_ledger( "CLAIM" => lease_id, "worker_id" => worker_id.to_s, "paths" => batch, "pending_remaining" => ["pending_count"] ) end {"lease_id" => lease_id, "paths" => batch} end |
#init!(items) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 |
# Creates a new queue from a list of items.
#
# Creates @root if needed, refuses to continue when a file already exists at queue_path,
# converts every item with to_s, creates pending_dir, writes the items through
# write_pending_chunks!, then writes JSON to queue_path and an empty string to ledger_path,
# both via atomic_write.
#
# @param items [#map] collection of items; each element is converted with to_s
# @return [true]
# @raise [Polyrun::Error] "queue already exists: <queue_path>" when queue_path is an existing file
#
# NOTE(review): no with_lock is visible in this listing, unlike ack!, claim! and status —
# confirm whether concurrent init! calls need to be guarded.
# NOTE(review): this listing lost identifiers during extraction (`= (items.size)`, the second
# argument of write_pending_chunks!, and the argument of JSON.generate). Presumably a metadata
# hash is built from items.size and serialized to queue_path — confirm against
# lib/polyrun/queue/file_store.rb.
# File 'lib/polyrun/queue/file_store.rb', line 17 def init!(items) FileUtils.mkdir_p(@root) raise Polyrun::Error, "queue already exists: #{queue_path}" if File.file?(queue_path) items = items.map(&:to_s) = (items.size) FileUtils.mkdir_p(pending_dir) write_pending_chunks!(items, ) atomic_write(queue_path, JSON.generate()) atomic_write(ledger_path, "") true end |
#status ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# Builds a summary of the queue inside with_lock: "pending" and "done" are Integer()
# conversions of the "pending_count" and "done_count" entries, and "leases" is the number
# of keys in the hash returned by read_leases.
#
# @return [Object] whatever with_lock returns; presumably the summary hash, if with_lock
#   returns its block's value — TODO confirm
#
# NOTE(review): this listing lost identifiers during extraction (the bare `=` after `do` and
# the receiver of ["pending_count"] / ["done_count"]). Presumably a metadata hash is read
# first — confirm against lib/polyrun/queue/file_store.rb.
# File 'lib/polyrun/queue/file_store.rb', line 80 def status with_lock do = { "pending" => Integer(["pending_count"]), "done" => Integer(["done_count"]), "leases" => read_leases.keys.size } end end |