Class: Cosmo::Processor
- Inherits:
-
Object
- Object
- Cosmo::Processor
- Defined in:
- lib/cosmo/processor.rb
Overview
rubocop:disable Metrics/ClassLength
Direct Known Subclasses
Constant Summary collapse
- STREAM_PAUSED_RECHECK_TTL =
Seconds a stream’s paused state is cached before re-checking (override via COSMO_STREAM_PAUSED_RECHECK_TTL)
5.0- STREAMS_PAUSED_IDLE_SLEEP =
Seconds to sleep when every stream is paused, preventing a tight CPU spin (override via COSMO_STREAMS_PAUSED_IDLE_SLEEP)
1.0- STREAM_EMPTY_BACKOFF_MAX =
Max seconds to sleep between empty fetches (override via COSMO_STREAM_EMPTY_BACKOFF_MAX)
5.0
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(pool, running, options) ⇒ Processor
constructor
A new instance of Processor.
- #run ⇒ Object
- #stop(timeout = ) ⇒ Object
Constructor Details
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
13 14 15 |
# File 'lib/cosmo/processor.rb', line 13 def consumers @consumers end |
Class Method Details
.run ⇒ Object
9 10 11 |
# File 'lib/cosmo/processor.rb', line 9 def self.run(...) new(...).tap(&:run) end |
Instance Method Details
#run ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/cosmo/processor.rb', line 24 def run setup return unless @consumers.any? @running.make_true run_loop end |
#stop(timeout = ) ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/cosmo/processor.rb', line 32 def stop(timeout = Config[:timeout]) @running.make_false @pool.shutdown @consumers.each { |(s, _)| s.unsubscribe rescue nil } @pool.wait_for_termination(timeout) @threads.compact.each { _1.join(timeout) || _1.kill } @consumers.clear @threads.clear end |