Class: Ration::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/ration/subscription.rb

Constant Summary collapse

OVERFLOW_POLICIES =
%i[close drop_oldest].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max:, filter: nil, on_overflow: :close, logger:) ⇒ Subscription

Returns a new instance of Subscription.



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/ration/subscription.rb', line 9

def initialize(max:, filter: nil, on_overflow: :close, logger:)
  unless OVERFLOW_POLICIES.include?(on_overflow)
    raise ArgumentError, "Unknown on_overflow: #{on_overflow.inspect} (expected one of #{OVERFLOW_POLICIES.inspect})"
  end

  @id          = SecureRandom.uuid
  @queue       = SizedQueue.new(max)
  @filter      = filter
  @on_overflow = on_overflow
  @logger      = logger
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/ration/subscription.rb', line 7

def id
  @id
end

Instance Method Details

#closeObject



42
43
44
# File 'lib/ration/subscription.rb', line 42

def close
  @queue.close
end

#closed?Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/ration/subscription.rb', line 38

def closed?
  @queue.closed?
end

#deliver(event) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ration/subscription.rb', line 46

def deliver(event)
  return if closed?
  return unless passes_filter?(event)

  begin
    @queue.push(event, true)
  rescue ThreadError
    handle_overflow(event)
  rescue ClosedQueueError
    # closed concurrently; nothing to do
  end
end

#each_event(timeout: nil) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/ration/subscription.rb', line 25

def each_event(timeout: nil)
  return enum_for(:each_event, timeout: timeout) unless block_given?

  until closed?
    event = pop(timeout: timeout)
    break if closed?

    yield event
  end

  self
end

#pop(timeout: nil) ⇒ Object



21
22
23
# File 'lib/ration/subscription.rb', line 21

def pop(timeout: nil)
  @queue.pop(timeout: timeout)
end