Class: Upkeep::Subscriptions::AsyncDurableWriter
- Inherits:
-
Object
- Object
- Upkeep::Subscriptions::AsyncDurableWriter
- Defined in:
- lib/upkeep/subscriptions/async_durable_writer.rb
Defined Under Namespace
Classes: Job
Constant Summary collapse
- DEFAULT_BATCH_SIZE =
100- DEFAULT_FLUSH_INTERVAL =
1.0
Instance Method Summary collapse
- #cancel(ids) ⇒ Object
- #drain(raise_errors: true) ⇒ Object
- #enqueue(subscription, entries:, operation:) ⇒ Object
-
#initialize(batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL, &persist_batch) ⇒ AsyncDurableWriter
constructor
A new instance of AsyncDurableWriter.
- #shutdown ⇒ Object
Constructor Details
#initialize(batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL, &persist_batch) ⇒ AsyncDurableWriter
Returns a new instance of AsyncDurableWriter.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 10 def initialize(batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL, &persist_batch) @batch_size = batch_size @flush_interval = flush_interval @persist_batch = persist_batch @mutex = Mutex.new @available = ConditionVariable.new @drained = ConditionVariable.new @queue = [] @pending = 0 @inflight_ids = Hash.new(0) @closed = false @flush_now = false @errors = [] @worker = Thread.new { work_loop } @worker.name = "upkeep-durable-writer" if @worker.respond_to?(:name=) end |
Instance Method Details
#cancel(ids) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 37 def cancel(ids) ids = Array(ids) return [] if ids.empty? requested_ids = ids.to_h { |id| [id, true] } @mutex.synchronize do queued_ids = {} removed = 0 @queue.delete_if do |job| id = job.subscription.id requested_ids.key?(id).tap do |matched| if matched queued_ids[id] = true removed += 1 end end end @pending -= removed @drained.broadcast if @pending.zero? persisted_ids = ids.reject { |id| queued_ids[id] } @drained.wait(@mutex) while persisted_ids.any? { |id| @inflight_ids.fetch(id, 0).positive? } persisted_ids end end |
#drain(raise_errors: true) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 63 def drain(raise_errors: true) errors = @mutex.synchronize do @flush_now = true @available.broadcast @drained.wait(@mutex) while @pending.positive? drained_errors = @errors @errors = [] if raise_errors drained_errors end raise errors.first if raise_errors && errors.any? errors end |
#enqueue(subscription, entries:, operation:) ⇒ Object
27 28 29 30 31 32 33 34 35 |
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 27 def enqueue(subscription, entries:, operation:) @mutex.synchronize do raise IOError, "Upkeep durable writer is closed" if @closed @queue << Job.new(subscription, entries, operation) @pending += 1 @available.signal end end |
#shutdown ⇒ Object
78 79 80 81 82 83 84 85 |
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 78 def shutdown drain(raise_errors: false) @mutex.synchronize do @closed = true @available.broadcast end @worker.join end |