Class: Polyrun::Queue::FileStore

Inherits:
Object
  • Object
show all
Defined in:
lib/polyrun/queue/file_store.rb,
lib/polyrun/queue/file_store_pending.rb

Overview

File-backed queue (spec_queue.md): queue.json, pending/*.json chunks, done.jsonl, leases.json (OS flock).

Constant Summary collapse

CHUNK_SIZE = 500

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(root) ⇒ FileStore

Returns a new instance of FileStore.



13
14
15
# File 'lib/polyrun/queue/file_store.rb', line 13

# Builds a store rooted at the given directory.
#
# @param root [String] directory for the queue files; kept as an absolute
#   path (File.expand_path resolves relative segments and "~")
def initialize(root)
  absolute_root = File.expand_path(root)
  @root = absolute_root
end

Instance Attribute Details

#root ⇒ Object (readonly)

Returns the value of attribute root.



11
12
13
# File 'lib/polyrun/queue/file_store.rb', line 11

# Reader for the store's root directory.
#
# @return [String, nil] the value held in @root
def root = @root

Instance Method Details

#ack!(lease_id:, worker_id:) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/polyrun/queue/file_store.rb', line 57

# Acknowledges a lease: counts its paths as done and removes the lease.
#
# Every read and validation step (lease lookup, ownership check, metadata
# load, done-count arithmetic) runs before the first write. If any of them
# raises, nothing has been written yet, so the lease is still recorded and
# the ack can be retried instead of silently losing the leased paths.
#
# @param lease_id [String] lease identifier, as returned by #claim!
# @param worker_id [#to_s] worker acknowledging the lease; compared as a string
# @return [true]
# @raise [Polyrun::Error] if the lease is unknown or held by another worker
def ack!(lease_id:, worker_id:)
  with_lock do
    leases = read_leases
    lease = leases[lease_id]
    raise Polyrun::Error, "unknown lease: #{lease_id}" unless lease
    raise Polyrun::Error, "lease worker mismatch" if lease["worker_id"].to_s != worker_id.to_s

    paths = lease["paths"] || []

    # Load and validate metadata BEFORE mutating anything: a missing or
    # corrupt done_count must not leave the lease already deleted.
    meta = load_meta!
    meta["done_count"] = Integer(meta["done_count"]) + paths.size

    # Writes, in the same order as before: leases, done lines, metadata, ledger.
    leases.delete(lease_id)
    write_leases!(leases)
    append_done_lines!(paths)
    write_meta!(meta)
    append_ledger("ACK" => lease_id, "worker_id" => worker_id.to_s, "paths" => paths)
  end
  true
end

#claim!(worker_id:, batch_size:) ⇒ Object

Raises:

  • (Polyrun::Error) — if batch_size is less than 1



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/polyrun/queue/file_store.rb', line 30

# Leases a batch of pending paths to a worker.
#
# Under the store lock: takes up to +batch_size+ paths via
# take_pending_batch!, records a lease keyed by a fresh UUID, persists the
# metadata and the leases, and appends a CLAIM entry to the ledger.
#
# @param worker_id [#to_s] identifier of the claiming worker
# @param batch_size [Integer, String] requested batch size; coerced with Integer()
# @return [Hash] {"lease_id" => String, "paths" => <claimed batch>}
# @raise [Polyrun::Error] if batch_size is less than 1
def claim!(worker_id:, batch_size:)
  limit = Integer(batch_size)
  raise Polyrun::Error, "batch_size must be >= 1" unless limit >= 1

  new_lease_id = SecureRandom.uuid
  # Declared outside the block so the result hash below can see the batch.
  claimed = []

  with_lock do
    meta = load_meta!
    claimed = take_pending_batch!(meta, limit)

    lease_table = read_leases
    owner = worker_id.to_s
    lease_table[new_lease_id] = {
      "worker_id" => owner,
      "paths" => claimed,
      "claimed_at" => Time.now.utc.iso8601
    }

    write_meta!(meta)
    write_leases!(lease_table)
    append_ledger(
      "CLAIM" => new_lease_id,
      "worker_id" => owner,
      "paths" => claimed,
      "pending_remaining" => meta["pending_count"]
    )
  end

  {"lease_id" => new_lease_id, "paths" => claimed}
end

#init!(items) ⇒ Object

Raises:

  • (Polyrun::Error) — if the queue file already exists



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/polyrun/queue/file_store.rb', line 17

# Creates a new queue under the root directory.
#
# Ensures the root directory exists, refuses to continue if the queue file is
# already there, then writes the pending chunks, the queue metadata (as JSON)
# and an empty ledger, in that order.
#
# @param items [Enumerable<#to_s>] entries to enqueue; each is stringified
# @return [true]
# @raise [Polyrun::Error] if the queue file already exists
def init!(items)
  FileUtils.mkdir_p(@root)
  if File.file?(queue_path)
    raise Polyrun::Error, "queue already exists: #{queue_path}"
  end

  entries = items.map { |item| item.to_s }
  header = base_meta(entries.size)

  FileUtils.mkdir_p(pending_dir)
  write_pending_chunks!(entries, header)
  atomic_write(queue_path, JSON.generate(header))
  atomic_write(ledger_path, "")

  true
end

#status ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/polyrun/queue/file_store.rb', line 80

# Reports queue counters while holding the store lock.
#
# @return [Hash] "pending" and "done" counts (coerced with Integer()) and
#   "leases", the number of entries returned by read_leases
def status
  with_lock do
    counters = load_meta!
    pending_total = Integer(counters["pending_count"])
    done_total = Integer(counters["done_count"])
    lease_total = read_leases.keys.length

    {"pending" => pending_total, "done" => done_total, "leases" => lease_total}
  end
end