Class: Ration::Subscription
- Inherits:
-
Object
- Object
- Ration::Subscription
- Defined in:
- lib/ration/subscription.rb
Constant Summary collapse
- OVERFLOW_POLICIES =
%i[close drop_oldest].freeze
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #deliver(event) ⇒ Object
- #each_event(timeout: nil) ⇒ Object
-
#initialize(max:, filter: nil, on_overflow: :close, logger:) ⇒ Subscription
constructor
A new instance of Subscription.
- #pop(timeout: nil) ⇒ Object
Constructor Details
#initialize(max:, filter: nil, on_overflow: :close, logger:) ⇒ Subscription
Returns a new instance of Subscription.
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/ration/subscription.rb', line 9 def initialize(max:, filter: nil, on_overflow: :close, logger:) unless OVERFLOW_POLICIES.include?(on_overflow) raise ArgumentError, "Unknown on_overflow: #{on_overflow.inspect} (expected one of #{OVERFLOW_POLICIES.inspect})" end @id = SecureRandom.uuid @queue = SizedQueue.new(max) @filter = filter @on_overflow = on_overflow @logger = logger end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
7 8 9 |
# File 'lib/ration/subscription.rb', line 7 def id @id end |
Instance Method Details
#close ⇒ Object
42 43 44 |
# File 'lib/ration/subscription.rb', line 42 def close @queue.close end |
#closed? ⇒ Boolean
38 39 40 |
# File 'lib/ration/subscription.rb', line 38 def closed? @queue.closed? end |
#deliver(event) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/ration/subscription.rb', line 46 def deliver(event) return if closed? return unless passes_filter?(event) begin @queue.push(event, true) rescue ThreadError handle_overflow(event) rescue ClosedQueueError # closed concurrently; nothing to do end end |
#each_event(timeout: nil) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/ration/subscription.rb', line 25 def each_event(timeout: nil) return enum_for(:each_event, timeout: timeout) unless block_given? until closed? event = pop(timeout: timeout) break if closed? yield event end self end |
#pop(timeout: nil) ⇒ Object
21 22 23 |
# File 'lib/ration/subscription.rb', line 21 def pop(timeout: nil) @queue.pop(timeout: timeout) end |