Class: Ration::Subscription
- Inherits:
-
Object
- Object
- Ration::Subscription
- Defined in:
- lib/ration/subscription.rb
Constant Summary collapse
- OVERFLOW_POLICIES =
%i[close drop_oldest].freeze
- DEFAULT_FILTER =
->(_event) { true }
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #deliver(event) ⇒ Object
- #each_event(timeout: nil) ⇒ Object
-
#initialize(max:, filter: nil, on_overflow: :close, logger:) ⇒ Subscription
constructor
A new instance of Subscription.
- #pop(timeout: nil) ⇒ Object
Constructor Details
#initialize(max:, filter: nil, on_overflow: :close, logger:) ⇒ Subscription
Returns a new instance of Subscription.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/ration/subscription.rb', line 10
#
# Builds a subscription backed by a bounded queue.
#
# @param max [Integer] capacity handed to SizedQueue.new
# @param filter [#call, nil] stored as the event filter; DEFAULT_FILTER is
#   used when nothing (nil) is given
# @param on_overflow [Symbol] must be one of OVERFLOW_POLICIES
# @param logger [Object] stored as-is; its use is not visible in this method
# @raise [ArgumentError] when on_overflow is not a known policy
def initialize(max:, filter: nil, on_overflow: :close, logger:)
  # Reject unknown policies up front, before any state is assigned.
  unless OVERFLOW_POLICIES.include?(on_overflow)
    raise ArgumentError,
          "Unknown on_overflow: #{on_overflow.inspect} (expected one of #{OVERFLOW_POLICIES.inspect})"
  end

  @id = SecureRandom.uuid
  @queue = SizedQueue.new(max)
  @logger = logger
  @on_overflow = on_overflow
  @filter = filter || DEFAULT_FILTER
end
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
8 9 10 |
# File 'lib/ration/subscription.rb', line 8
#
# Reader for the identifier assigned at construction time.
#
# @return [Object] the current value of @id
def id
  @id
end
Instance Method Details
#close ⇒ Object
43 44 45 |
# File 'lib/ration/subscription.rb', line 43
#
# Closes the underlying queue. The call is forwarded unchanged, so the
# return value is whatever the queue's #close returns.
#
# @return [Object] result of @queue.close
def close
  @queue.close
end
#closed? ⇒ Boolean
39 40 41 |
# File 'lib/ration/subscription.rb', line 39
#
# Reports whether the underlying queue has been closed.
#
# @return [Boolean] the queue's own closed? answer
def closed?
  @queue.closed?
end
#deliver(event) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/ration/subscription.rb', line 47
#
# Offers an event to this subscription without ever blocking the caller.
#
# Nothing happens when the subscription is already closed or the event
# does not pass the filter. Otherwise a non-blocking push is attempted;
# a full queue is handed to handle_overflow.
#
# @param event [Object] the event to enqueue
# @return [Object, nil] nil when skipped or closed mid-push; otherwise the
#   result of the push or of handle_overflow
def deliver(event)
  return if closed? || !passes_filter?(event)

  begin
    # Second argument requests a non-blocking push: a full queue raises
    # ThreadError instead of waiting for space.
    @queue.push(event, true)
  rescue ClosedQueueError
    # The queue was closed between the guard above and the push.
    nil
  rescue ThreadError
    handle_overflow(event)
  end
end
#each_event(timeout: nil) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/ration/subscription.rb', line 26
#
# Yields events from the subscription until it is closed and every
# buffered event has been handed to the block.
#
# Events that were already queued when the subscription got closed are
# still yielded; iteration ends only once #pop returns nil while the
# subscription is closed. A consumer that wants to stop immediately should
# `break` out of the block.
#
# When #pop returns nil while the subscription is still open (for example
# because the timeout elapsed), nil is yielded to the block.
#
# @param timeout [Numeric, nil] forwarded to #pop for every wait
# @yieldparam event [Object, nil] the next event, or nil as described above
# @return [self] when a block is given
# @return [Enumerator] when no block is given
def each_event(timeout: nil)
  return enum_for(:each_event, timeout: timeout) unless block_given?

  # A plain `while` is used instead of Kernel#loop, which would rescue a
  # StopIteration raised by the caller's block.
  while true
    event = pop(timeout: timeout)
    # nil from a closed subscription means "closed and fully drained";
    # a non-nil event is always delivered, even if a close just happened.
    break if event.nil? && closed?

    yield event
  end
  self
end
#pop(timeout: nil) ⇒ Object
22 23 24 |
# File 'lib/ration/subscription.rb', line 22
#
# Takes the next event off the underlying queue.
#
# @param timeout [Numeric, nil] forwarded to the queue's pop as `timeout:`;
#   nil means no timeout is requested
# @return [Object, nil] whatever the queue's pop returns
def pop(timeout: nil)
  @queue.pop(timeout: timeout)
end