Class: Shoryuken::Manager

Inherits:
Object
Includes:
Util
Defined in:
lib/shoryuken/manager.rb

Overview

Manages message dispatching and processing for a single processing group. Coordinates between the fetcher, polling strategy, and processor.

Constant Summary

BATCH_LIMIT = 10

Maximum number of messages to fetch in a single batch request

MIN_DISPATCH_INTERVAL = 0.1

Minimum interval between dispatch cycles
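
As a rough illustration of how a cap like BATCH_LIMIT could be applied, the sketch below clamps a requested batch size to the limit (SQS itself accepts at most 10 messages per receive call). The `batch_size_for` helper and its `free_processors` argument are hypothetical names used only for this example; the manager's actual dispatch code is not shown on this page and may differ.

```ruby
# Illustrative sketch only: batch_size_for is a hypothetical helper,
# not a method of Shoryuken::Manager.
BATCH_LIMIT = 10

def batch_size_for(free_processors)
  # Never ask for more than BATCH_LIMIT messages, and never fewer than zero.
  free_processors.clamp(0, BATCH_LIMIT)
end

small  = batch_size_for(3)   # fewer free processors than the limit
capped = batch_size_for(25)  # more free processors than the limit
none   = batch_size_for(-1)  # defensive case: nothing available
```

Clamping on the caller's side keeps the request within the limit regardless of how much concurrency is configured.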

Instance Attribute Summary

Instance Method Summary

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(group, fetcher, polling_strategy, concurrency, executor) ⇒ Manager

Initializes a new Manager for a processing group

Parameters:

  • group (String)

    the processing group name

  • fetcher (Shoryuken::Fetcher)

    the message fetcher

  • polling_strategy (Shoryuken::Polling::BaseStrategy)

    the polling strategy

  • concurrency (Integer)

    the maximum number of concurrent processors

  • executor (Concurrent::ExecutorService)

    the executor for async operations



# File 'lib/shoryuken/manager.rb', line 26

def initialize(group, fetcher, polling_strategy, concurrency, executor)
  @group                      = group
  @fetcher                    = fetcher
  @polling_strategy           = polling_strategy
  @max_processors             = concurrency
  @busy_processors            = Shoryuken::Helpers::AtomicCounter.new(0)
  @executor                   = executor
  @running                    = Shoryuken::Helpers::AtomicBoolean.new(true)
  @stop_new_dispatching       = Shoryuken::Helpers::AtomicBoolean.new(false)
  @dispatching_release_signal = ::Queue.new
end

Instance Attribute Details

#group ⇒ String (readonly)

Returns the processing group name.

Returns:

  • (String)

    the processing group name



# File 'lib/shoryuken/manager.rb', line 17

def group
  @group
end

Instance Method Details

#await_dispatching_in_progress ⇒ void

This method returns an undefined value.

Waits for any in-progress dispatching to complete



# File 'lib/shoryuken/manager.rb', line 56

def await_dispatching_in_progress
  # A dispatch may still be in progress, because the response from SQS can take some time.
  # We don't want to stop the process before handling incoming messages, as they would stay "in-flight" on SQS for some time.
  # We use a queue because the dispatch_loop runs on another thread, and a queue is an efficient way to communicate between threads.
  @dispatching_release_signal.pop
end
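
The queue-based handoff described in the comments above can be sketched with Ruby's standard `Queue` alone. This is a minimal illustration, not the manager's actual dispatch loop: the `dispatcher` thread, the `events` array, and the simulated delay are assumptions made for the example.

```ruby
# Minimal sketch of a cross-thread release signal using the stdlib Queue.
# The dispatcher thread is a stand-in; it is not Shoryuken's dispatch_loop.
release_signal = Queue.new
events = []

dispatcher = Thread.new do
  sleep 0.05                    # simulate waiting on a slow response
  events << :dispatch_finished
  release_signal << 1           # tell the waiting thread it may proceed
end

release_signal.pop              # blocks until the dispatcher pushes a value
events << :shutdown_continues
dispatcher.join
```

Because `Queue#pop` blocks while the queue is empty, the waiting thread needs no polling or explicit locking; it resumes only after the other thread has pushed its signal.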

#running? ⇒ Boolean

Checks if the manager is still running

Returns:

  • (Boolean)

    true if the manager is running



# File 'lib/shoryuken/manager.rb', line 66

def running?
  @running.true? && @executor.running?
end

#start ⇒ void

This method returns an undefined value.

Starts the dispatch loop



# File 'lib/shoryuken/manager.rb', line 41

def start
  fire_utilization_update_event
  dispatch_loop
end

#stop_new_dispatching ⇒ void

This method returns an undefined value.

Signals the manager to stop dispatching new messages



# File 'lib/shoryuken/manager.rb', line 49

def stop_new_dispatching
  @stop_new_dispatching.make_true
end
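
One way to picture how a stop flag like this might interact with a running loop is sketched below. It assumes the loop checks the flag on each cycle and pushes a release signal once it exits; that behavior is an assumption, since the dispatch loop itself is not shown on this page. The `Flag` class is a hypothetical mutex-based stand-in for an atomic boolean and is not Shoryuken's helper class.

```ruby
# Hypothetical stand-in for an atomic boolean (not Shoryuken's helper).
class Flag
  def initialize(value = false)
    @value = value
    @lock = Mutex.new
  end

  def make_true
    @lock.synchronize { @value = true }
  end

  def true?
    @lock.synchronize { @value }
  end
end

stop_flag = Flag.new
release_signal = Queue.new
cycles = 0

# Stand-in loop: keeps "dispatching" until the flag is set, then signals.
loop_thread = Thread.new do
  until stop_flag.true?
    cycles += 1
    sleep 0.01
  end
  release_signal << 1
end

sleep 0.05
stop_flag.make_true   # request that no new cycles start
release_signal.pop    # wait for the loop to acknowledge and exit
loop_thread.join
cycles_at_stop = cycles
sleep 0.03            # give a misbehaving loop a chance to keep running
```

Setting the flag only requests a stop; waiting on the signal is what confirms the loop has actually finished its current cycle.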