Class: Ration::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/ration/subscription.rb

Constant Summary collapse

# Symbols accepted for the constructor's `on_overflow:` option; the
# constructor raises ArgumentError for any value not listed here.
OVERFLOW_POLICIES =
%i[close drop_oldest].freeze
# Fallback filter used when the constructor is given no filter (nil or
# another falsy value); it returns true for every event it is called with.
DEFAULT_FILTER =
->(_event) { true }

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max:, filter: nil, on_overflow: :close, logger:) ⇒ Subscription

Returns a new instance of Subscription.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/ration/subscription.rb', line 10

# Builds a subscription backed by a bounded queue.
#
# @param max [Integer] capacity handed straight to SizedQueue.new
# @param filter [#call, nil] event predicate; DEFAULT_FILTER is used when
#   this is nil (or otherwise falsy)
# @param on_overflow [Symbol] must be a member of OVERFLOW_POLICIES
# @param logger [Object] stored as-is; how it is used is not visible in
#   this method
# @raise [ArgumentError] when on_overflow is not in OVERFLOW_POLICIES
def initialize(max:, filter: nil, on_overflow: :close, logger:)
  # Validate before touching any state so a bad policy never yields a
  # half-initialised object.
  known_policy = OVERFLOW_POLICIES.include?(on_overflow)
  raise ArgumentError, "Unknown on_overflow: #{on_overflow.inspect} (expected one of #{OVERFLOW_POLICIES.inspect})" unless known_policy

  @logger      = logger
  @on_overflow = on_overflow
  @filter      = filter || DEFAULT_FILTER
  @queue       = SizedQueue.new(max)
  @id          = SecureRandom.uuid
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



8
9
10
# File 'lib/ration/subscription.rb', line 8

# Reader for the identifier stored in @id.
#
# @return [Object] the current value of @id
def id = @id

Instance Method Details

#close ⇒ Object



43
44
45
# File 'lib/ration/subscription.rb', line 43

# Closes the underlying queue.
#
# @return [Object] whatever @queue.close returns
def close = @queue.close

#closed? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/ration/subscription.rb', line 39

# Reports whether the underlying queue has been closed.
#
# @return [Boolean] the result of @queue.closed?
def closed? = @queue.closed?

#deliver(event) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/ration/subscription.rb', line 47

# Tries to enqueue +event+ without blocking.
#
# Nothing is enqueued when the subscription is already closed or when
# passes_filter? rejects the event. A full queue is handed to
# handle_overflow; a queue closed by another thread between the guard and
# the push is silently ignored.
#
# @param event [Object] the event to enqueue
# @return [Object, nil] nil when the event is skipped or the queue was
#   closed concurrently; otherwise the result of the push or of
#   handle_overflow
def deliver(event)
  return if closed? || !passes_filter?(event)

  begin
    # Second argument true = non-blocking push: raises ThreadError instead
    # of waiting when the queue is full.
    @queue.push(event, true)
  rescue ClosedQueueError
    # Closed concurrently; nothing to do.
    nil
  rescue ThreadError
    handle_overflow(event)
  end
end

#each_event(timeout: nil) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/ration/subscription.rb', line 26

# Yields popped events to the block until the subscription is closed.
#
# Without a block, returns an Enumerator over the same sequence.
#
# NOTE(review): whatever pop returns is yielded as long as the subscription
# is still open, so if pop returns nil on timeout the block receives nil —
# confirm callers expect that.
# NOTE(review): a value popped just as the subscription closes is discarded
# rather than yielded — confirm this is intended.
#
# @param timeout [Numeric, nil] forwarded unchanged to pop on every iteration
# @yieldparam event [Object] the value returned by pop
# @return [self, Enumerator] self after the loop ends, or an Enumerator
#   when no block is given
def each_event(timeout: nil)
  return to_enum(:each_event, timeout: timeout) unless block_given?

  until closed?
    next_event = pop(timeout: timeout)
    # Re-check after the (possibly blocking) pop: the subscription may have
    # been closed while we were waiting.
    return self if closed?

    yield next_event
  end

  self
end

#pop(timeout: nil) ⇒ Object



22
23
24
# File 'lib/ration/subscription.rb', line 22

# Removes and returns the next value from the underlying queue.
#
# @param timeout [Numeric, nil] forwarded unchanged to @queue.pop
# @return [Object, nil] whatever @queue.pop returns for that timeout
def pop(timeout: nil) = @queue.pop(timeout: timeout)