Class: Upkeep::Subscriptions::AsyncDurableWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/upkeep/subscriptions/async_durable_writer.rb

Defined Under Namespace

Classes: Job

Constant Summary collapse

DEFAULT_BATCH_SIZE =
100
DEFAULT_FLUSH_INTERVAL =
1.0

Instance Method Summary collapse

Constructor Details

#initialize(batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL, &persist_batch) ⇒ AsyncDurableWriter

Returns a new instance of AsyncDurableWriter.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 10

# Builds the writer's state and starts its background worker thread.
#
# @param batch_size [Integer] stored in @batch_size (defaults to DEFAULT_BATCH_SIZE)
# @param flush_interval [Numeric] stored in @flush_interval (defaults to DEFAULT_FLUSH_INTERVAL)
# @param persist_batch [Proc] block stored in @persist_batch; presumably invoked
#   by the worker loop to persist a batch of jobs (the loop is not shown here)
def initialize(batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL, &persist_batch)
  # Caller-supplied configuration.
  @persist_batch = persist_batch
  @batch_size = batch_size
  @flush_interval = flush_interval

  # One lock guarding all mutable state, plus the two conditions waited on under it.
  @mutex = Mutex.new
  @available = ConditionVariable.new
  @drained = ConditionVariable.new

  # Mutable bookkeeping shared between callers and the worker.
  @queue = []
  @errors = []
  @inflight_ids = Hash.new(0)
  @pending = 0
  @flush_now = false
  @closed = false

  # Start the worker last so every instance variable above is already assigned
  # when the thread begins running.
  @worker = Thread.new { work_loop }
  @worker.name = "upkeep-durable-writer" if @worker.respond_to?(:name=)
end

Instance Method Details

#cancel(ids) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 37

# Cancels outstanding writes for the given subscription ids.
#
# Jobs for those ids that are still sitting in the queue are removed under the
# lock and @pending is reduced accordingly. The call then blocks until no write
# for any of the returned ids is in flight.
#
# An id is only treated as "cancelled before persistence" when it was found in
# the queue AND has no in-flight write. An id that is both queued and in flight
# is reported back and waited for, because the in-flight write may still land.
#
# @param ids [Object, Array<Object>, nil] a single id or a list of ids (coerced with Array())
# @return [Array<Object>] the requested ids that were not (solely) cancelled from
#   the queue, i.e. ids whose write may already have been persisted
def cancel(ids)
  ids = Array(ids)
  return [] if ids.empty?

  requested_ids = ids.to_h { |id| [id, true] }

  @mutex.synchronize do
    queued_ids = {}
    removed = 0
    @queue.delete_if do |job|
      id = job.subscription.id
      next false unless requested_ids.key?(id)

      queued_ids[id] = true
      removed += 1
      true
    end
    @pending -= removed
    # Removing queued jobs may have emptied the backlog; wake anyone waiting on that.
    @drained.broadcast if @pending.zero?

    inflight = ->(id) { @inflight_ids.fetch(id, 0).positive? }

    # Must be computed before waiting: once the wait below finishes, nothing is
    # in flight any more and the distinction would be lost.
    persisted_ids = ids.reject { |id| queued_ids[id] && !inflight.call(id) }

    # NOTE(review): relies on whoever decrements @inflight_ids (presumably the
    # worker loop, not visible here) signalling @drained afterwards.
    @drained.wait(@mutex) while persisted_ids.any?(&inflight)
    persisted_ids
  end
end

#drain(raise_errors: true) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 63

# Requests an immediate flush and blocks until @pending reaches zero.
#
# @param raise_errors [Boolean] when true, the collected errors are cleared and
#   the first one (if any) is raised; when false, errors are left in place and a
#   snapshot of them is returned
# @return [Array<Exception>] the collected errors; in the non-raising mode this
#   is a copy, so the caller never holds a reference to the internal array that
#   is only meant to be touched under the lock
# @raise [Exception] the first collected error when raise_errors is true
def drain(raise_errors: true)
  errors = @mutex.synchronize do
    @flush_now = true
    @available.broadcast
    @drained.wait(@mutex) while @pending.positive?

    if raise_errors
      # Hand the current array to the caller and start a fresh one.
      collected = @errors
      @errors = []
      collected
    else
      # Errors stay recorded; return a snapshot rather than the live array.
      @errors.dup
    end
  end

  raise errors.first if raise_errors && errors.any?

  errors
end

#enqueue(subscription, entries:, operation:) ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 27

# Appends a write job for +subscription+ to the queue and signals @available.
#
# @param subscription [Object] first argument handed to Job.new
# @param entries [Object] second argument handed to Job.new
# @param operation [Object] third argument handed to Job.new
# @raise [IOError] if the writer has already been closed
def enqueue(subscription, entries:, operation:)
  @mutex.synchronize do
    if @closed
      raise IOError, "Upkeep durable writer is closed"
    end

    job = Job.new(subscription, entries, operation)
    @queue.push(job)
    @pending += 1
    # Wake one thread waiting for work (presumably the worker loop).
    @available.signal
  end
end

#shutdown ⇒ Object



78
79
80
81
82
83
84
85
# File 'lib/upkeep/subscriptions/async_durable_writer.rb', line 78

# Stops the writer: drains outstanding work without raising, marks the writer
# closed, wakes any thread waiting on @available, and joins the worker thread.
#
# @return [Thread] the result of joining the worker thread
def shutdown
  # Errors are deliberately not raised here; drain's return value is ignored.
  drain(raise_errors: false)

  @mutex.synchronize do
    @closed = true
    # Wake waiters so they can observe the closed flag.
    @available.broadcast
  end

  @worker.join
end