Class: Shoryuken::Manager
- Inherits:
- Object (hierarchy: Object > Shoryuken::Manager)
- Includes:
- Util
- Defined in:
- lib/shoryuken/manager.rb
Overview
Manages message dispatching and processing for a single processing group. Coordinates between the fetcher, polling strategy, and processor.
Constant Summary
- BATCH_LIMIT = 10
Maximum number of messages to fetch in a single batch request.
- MIN_DISPATCH_INTERVAL = 0.1
Minimum interval between dispatch cycles.
Instance Attribute Summary
- #group ⇒ String (readonly)
The processing group name.
Instance Method Summary
- #await_dispatching_in_progress ⇒ void
Waits for any in-progress dispatching to complete.
- #initialize(group, fetcher, polling_strategy, concurrency, executor) ⇒ Manager (constructor)
Initializes a new Manager for a processing group.
- #running? ⇒ Boolean
Checks if the manager is still running.
- #start ⇒ void
Starts the dispatch loop.
- #stop_new_dispatching ⇒ void
Signals the manager to stop dispatching new messages.
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(group, fetcher, polling_strategy, concurrency, executor) ⇒ Manager
Initializes a new Manager for a processing group.
# File 'lib/shoryuken/manager.rb', line 26
def initialize(group, fetcher, polling_strategy, concurrency, executor)
  @group = group
  @fetcher = fetcher
  @polling_strategy = polling_strategy
  @max_processors = concurrency
  @busy_processors = Shoryuken::Helpers::AtomicCounter.new(0)
  @executor = executor
  @running = Shoryuken::Helpers::AtomicBoolean.new(true)
  @stop_new_dispatching = Shoryuken::Helpers::AtomicBoolean.new(false)
  @dispatching_release_signal = ::Queue.new
end
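The constructor pairs a fixed processor limit (@max_processors) with a thread-safe busy counter (@busy_processors). The sketch below illustrates why a synchronized counter is useful when several threads update it at once. SketchCounter, its method names, and the "available" calculation are hypothetical stand-ins written for this example; they are not Shoryuken::Helpers::AtomicCounter or the Manager's actual capacity logic.

```ruby
# Hypothetical Mutex-backed counter; a stand-in for illustration only.
class SketchCounter
  def initialize(value = 0)
    @value = value
    @lock = Mutex.new
  end

  # Each update runs under the lock so concurrent callers cannot interleave.
  def increment
    @lock.synchronize { @value += 1 }
  end

  def decrement
    @lock.synchronize { @value -= 1 }
  end

  def value
    @lock.synchronize { @value }
  end
end

max_processors = 5          # plays the role of the concurrency argument
busy = SketchCounter.new(0) # plays the role of the busy-processor counter

# Three threads each mark one processor as busy.
threads = Array.new(3) { Thread.new { busy.increment } }
threads.each(&:join)

# Assumed capacity rule: free slots are the limit minus the busy count.
available = max_processors - busy.value
```

Under these assumptions, available tells a dispatcher how many more messages it could hand out before reaching the concurrency limit.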
Instance Attribute Details
#group ⇒ String (readonly)
Returns the processing group name.
# File 'lib/shoryuken/manager.rb', line 17
def group
  @group
end
Instance Method Details
#await_dispatching_in_progress ⇒ void
This method returns an undefined value.
Waits for any in-progress dispatching to complete.
# File 'lib/shoryuken/manager.rb', line 56
def await_dispatching_in_progress
  # There might still be a dispatch ongoing, as the response from SQS could take some time.
  # We don't want to stop the process before processing incoming messages,
  # as they would stay "in-flight" for some time on SQS.
  # We use a queue because the dispatch_loop runs on another thread,
  # and this is an efficient way of communicating between threads.
  @dispatching_release_signal.pop
end
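The queue-based wait described in the comments above relies on the fact that Queue#pop blocks the caller until another thread pushes a value. A minimal sketch of that handshake, using only Ruby's built-in Queue and Thread, follows. The variable names and the simulated delay are assumptions made for illustration and do not reproduce the dispatch loop itself.

```ruby
# Stand-in for @dispatching_release_signal.
release_signal = Queue.new
events = []

# Stand-in for the thread running the dispatch loop.
worker = Thread.new do
  # Simulate a dispatch cycle that is still waiting on a slow response.
  sleep 0.05
  events << :dispatch_finished
  # Pushing any value unblocks the thread waiting in #pop.
  release_signal << 1
end

# Blocks the calling thread until the worker pushes onto the queue.
release_signal.pop
events << :shutdown_can_proceed

worker.join
```

Because the push happens only after the simulated dispatch finishes, the waiting thread cannot proceed early, and no polling or sleeping is needed on the waiting side.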
#running? ⇒ Boolean
Checks if the manager is still running.
# File 'lib/shoryuken/manager.rb', line 66
def running?
  @running.true? && @executor.running?
end
#start ⇒ void
This method returns an undefined value.
Starts the dispatch loop.
# File 'lib/shoryuken/manager.rb', line 41
def start
  fire_utilization_update_event
  dispatch_loop
end
#stop_new_dispatching ⇒ void
This method returns an undefined value.
Signals the manager to stop dispatching new messages.
# File 'lib/shoryuken/manager.rb', line 49
def stop_new_dispatching
  @stop_new_dispatching.make_true
end
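Taken together, #stop_new_dispatching and #await_dispatching_in_progress suggest a two-step shutdown: raise a flag, then wait for the loop to acknowledge it. The sketch below shows one way such a sequence could behave. SketchManager, its Mutex-guarded flag, and the loop body are hypothetical and written only for this example; how Shoryuken's dispatch_loop actually observes the flag is not shown in this page.

```ruby
# Hypothetical stand-in for a manager; not the Shoryuken implementation.
class SketchManager
  attr_reader :cycles

  def initialize
    @stop_new_dispatching = false
    @lock = Mutex.new
    @release_signal = Queue.new
    @cycles = 0
  end

  # Runs a simplified dispatch loop on a background thread.
  def start
    @thread = Thread.new do
      until stop_requested?
        @cycles += 1 # one simulated dispatch cycle
        sleep 0.01
      end
      # Tell any waiting caller that no cycle is in progress anymore.
      @release_signal << 1
    end
  end

  # Step 1: ask the loop to stop starting new cycles.
  def stop_new_dispatching
    @lock.synchronize { @stop_new_dispatching = true }
  end

  # Step 2: block until the loop has exited.
  def await_dispatching_in_progress
    @release_signal.pop
  end

  private

  def stop_requested?
    @lock.synchronize { @stop_new_dispatching }
  end
end

manager = SketchManager.new
manager.start
sleep 0.05
manager.stop_new_dispatching
manager.await_dispatching_in_progress
cycles_at_shutdown = manager.cycles
sleep 0.05 # give a misbehaving loop time to run again
```

In this sketch the cycle count stops changing once await_dispatching_in_progress returns, which is the property a caller needs before shutting the process down.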