Class: Philiprehberger::Debounce::Batcher

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/debounce/batcher.rb

Overview

Buffer items and flush in size-or-time-triggered batches.

Items are pushed via #<< or #push. The block is invoked with the buffered array when either:

  • the buffer has accumulated ‘size` items, or

  • ‘max_wait` seconds have elapsed since the first buffered item.

The block always receives a non-empty Array. Manual #flush forces an immediate invocation of any pending items; #cancel discards them without invoking the block.

Instance Method Summary collapse

Constructor Details

#initialize(size:, max_wait:, on_error: nil, &block) ⇒ Batcher

Returns a new instance of Batcher.

Raises:

  • (ArgumentError)


17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/philiprehberger/debounce/batcher.rb', line 17

def initialize(size:, max_wait:, on_error: nil, &block)
  raise ArgumentError, 'Block is required' unless block
  raise ArgumentError, 'Size must be a positive Integer' unless size.is_a?(Integer) && size.positive?
  raise ArgumentError, 'max_wait must be a positive Numeric' unless max_wait.is_a?(Numeric) && max_wait.positive?

  @size = size
  @max_wait = max_wait
  @block = block
  @on_error = on_error
  @queue = []
  @mutex = Mutex.new
  @timer_thread = nil
  @generation = 0
end

Instance Method Details

#<<(item) ⇒ self

Add an item to the batch. Triggers a size-flush when full, otherwise starts a max_wait timer on the first buffered item.

Parameters:

  • item (Object)

    the item to buffer

Returns:

  • (self)


37
38
39
# File 'lib/philiprehberger/debounce/batcher.rb', line 37

def <<(item)
  push(item)
end

#cancelvoid

This method returns an undefined value.

Discard pending items without invoking the block.



74
75
76
77
78
79
# File 'lib/philiprehberger/debounce/batcher.rb', line 74

def cancel
  @mutex.synchronize do
    @queue.clear
    @generation += 1
  end
end

#flushvoid

This method returns an undefined value.

Force an immediate flush of pending items.



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/philiprehberger/debounce/batcher.rb', line 60

def flush
  items = nil
  @mutex.synchronize do
    return if @queue.empty?

    items = @queue.dup
    @queue.clear
    @generation += 1
  end
  invoke_block(items)
end

#pendingInteger

Number of items currently buffered.

Returns:

  • (Integer)


83
84
85
# File 'lib/philiprehberger/debounce/batcher.rb', line 83

def pending
  @mutex.synchronize { @queue.length }
end

#pending_itemsArray

Snapshot of the current buffered items.

Returns:

  • (Array)


89
90
91
# File 'lib/philiprehberger/debounce/batcher.rb', line 89

def pending_items
  @mutex.synchronize { @queue.dup }
end

#push(item) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/philiprehberger/debounce/batcher.rb', line 41

def push(item)
  items = nil
  @mutex.synchronize do
    @queue << item
    if @queue.length >= @size
      items = @queue.dup
      @queue.clear
      @generation += 1
    elsif @queue.length == 1
      @generation += 1
      schedule_flush(@generation)
    end
  end
  invoke_block(items) if items
  self
end