Class: BBK::App::Dispatcher
- Inherits:
-
Object
- Object
- BBK::App::Dispatcher
- Defined in:
- lib/bbk/app/dispatcher.rb,
lib/bbk/app/dispatcher/route.rb,
lib/bbk/app/dispatcher/result.rb,
lib/bbk/app/dispatcher/message.rb,
lib/bbk/app/dispatcher/message_stream.rb,
lib/bbk/app/dispatcher/queue_stream_strategy.rb
Defined Under Namespace
Classes: Message, MessageStream, QueueStreamStrategy, Result, Route
Constant Summary collapse
- ANSWER_DOMAIN =
'answer'.freeze
- DEFAULT_PROTOCOL =
'default'.freeze
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
-
#force_quit ⇒ Object
Returns the value of attribute force_quit.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#middlewares ⇒ Object
readonly
Returns the value of attribute middlewares.
-
#observer ⇒ Object
readonly
Returns the value of attribute observer.
-
#pool_size ⇒ Object
Returns the value of attribute pool_size.
-
#publishers ⇒ Object
readonly
Returns the value of attribute publishers.
Instance Method Summary collapse
-
#close(_timeout = 5) ⇒ Object
stop dispatcher and wait for termination Чтоб остановить диспетчер надо: 1.
- #execute_message(message) ⇒ Object
-
#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#process(message) ⇒ Object
process one message and sending existed results messages.
- #register_consumer(consumer) ⇒ Object
- #register_middleware(middleware) ⇒ Object
- #register_publisher(publisher) ⇒ Object
-
#run ⇒ Object
Run all consumers and blocks on message processing.
Constructor Details
#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher
Returns a new instance of Dispatcher.
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/bbk/app/dispatcher.rb', line 40 def initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) @observer = observer @pool_size = pool_size logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: 'Dispatcher') @consumers = [] @publishers = [] @middlewares = [] @pool_factory = pool_factory @stream_strategy_class = stream_strategy @force_quit = false end |
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def consumers @consumers end |
#force_quit ⇒ Object
Returns the value of attribute force_quit.
34 35 36 |
# File 'lib/bbk/app/dispatcher.rb', line 34 def force_quit @force_quit end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def logger @logger end |
#middlewares ⇒ Object (readonly)
Returns the value of attribute middlewares.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def middlewares @middlewares end |
#observer ⇒ Object (readonly)
Returns the value of attribute observer.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def observer @observer end |
#pool_size ⇒ Object
Returns the value of attribute pool_size.
34 35 36 |
# File 'lib/bbk/app/dispatcher.rb', line 34 def pool_size @pool_size end |
#publishers ⇒ Object (readonly)
Returns the value of attribute publishers.
35 36 37 |
# File 'lib/bbk/app/dispatcher.rb', line 35 def publishers @publishers end |
Instance Method Details
#close(_timeout = 5) ⇒ Object
stop dispatcher and wait for termination Чтоб остановить диспетчер надо:
-
остановить консьюмеры
-
остановить прием новых сообщений - @stream.close
-
дождаться обработки всего в очереди или таймаут
-
остановить потоки
-
остановить паблишеры
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/bbk/app/dispatcher.rb', line 91 def close(_timeout = 5) ActiveSupport::Notifications.instrument 'dispatcher.close', dispatcher: self consumers.each do |cons| begin cons.stop rescue StandardError => e logger.error "Consumer #{cons} stop error: #{e}" logger.debug e.backtrace end end @stream_strategy&.stop(5) consumers.each do |cons| begin cons.close rescue StandardError => e logger.error "Consumer #{cons} close error: #{e}" logger.debug e.backtrace end end publishers.each do |pub| begin pub.close rescue StandardError => e logger.error "Publisher #{pub} close error: #{e}" logger.debug e.backtrace end end end |
#execute_message(message) ⇒ Object
123 124 125 126 127 |
# File 'lib/bbk/app/dispatcher.rb', line 123 def () build_processing_stack.call().select do |r| r.is_a?(BBK::App::Dispatcher::Result) end end |
#process(message) ⇒ Object
process one message and sending existed results messages
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/bbk/app/dispatcher.rb', line 130 def process() results = () logger.debug "There are #{results.count} results to send from #{.headers[:message_id]}..." send_results(, results).value rescue StandardError => e logger.error "Failed processing message: #{e.inspect}" ActiveSupport::Notifications.instrument 'dispatcher.exception', msg: , exception: e .nack(error: e) close if force_quit end |
#register_consumer(consumer) ⇒ Object
53 54 55 |
# File 'lib/bbk/app/dispatcher.rb', line 53 def register_consumer(consumer) consumers << consumer end |
#register_middleware(middleware) ⇒ Object
62 63 64 |
# File 'lib/bbk/app/dispatcher.rb', line 62 def register_middleware(middleware) middlewares << middleware end |
#register_publisher(publisher) ⇒ Object
57 58 59 60 |
# File 'lib/bbk/app/dispatcher.rb', line 57 def register_publisher(publisher) raise "Publisher support #{DEFAULT_PROTOCOL}" if publisher.protocols.include?(DEFAULT_PROTOCOL) publishers << publisher end |
#run ⇒ Object
Run all consumers and blocks on message processing
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/bbk/app/dispatcher.rb', line 67 def run @pool = @pool_factory.call(@pool_size, 10) @stream_strategy = @stream_strategy_class.new(@pool, logger: logger) ActiveSupport::Notifications.instrument 'dispatcher.run', dispatcher: self @stream_strategy.run(consumers) do |msg| begin logger.tagged(msg.headers[:message_id]) do process msg end rescue StandardError => e logger.fatal "E[#{@stream_strategy_class}]: #{e}" logger.fatal "E[#{@stream_strategy_class}]: #{e.backtrace.join("\n")}" end end end |