Class: BBK::App::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/app/dispatcher.rb,
lib/bbk/app/dispatcher/route.rb,
lib/bbk/app/dispatcher/result.rb,
lib/bbk/app/dispatcher/message.rb,
lib/bbk/app/dispatcher/message_stream.rb,
lib/bbk/app/dispatcher/queue_stream_strategy.rb
more...

Defined Under Namespace

Classes: Message, MessageStream, QueueStreamStrategy, Result, Route

Constant Summary collapse

ANSWER_DOMAIN =
'answer'.freeze
DEFAULT_PROTOCOL =
'default'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy) ⇒ Dispatcher

Returns a new instance of Dispatcher.

[View source]

40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bbk/app/dispatcher.rb', line 40

# Builds a dispatcher that starts with empty consumer, publisher and
# middleware lists and with force_quit switched off.
#
# @param observer stored unchanged and exposed through #observer
# @param pool_size first argument later handed to the pool factory in #run (defaults to 3)
# @param logger base logger; wrapped in ActiveSupport::TaggedLogging unless it already responds to #tagged
# @param pool_factory callable invoked in #run to build the pool
# @param stream_strategy class instantiated in #run
def initialize(observer, pool_size: 3, logger: BBK::App.logger, pool_factory: SimplePoolFactory, stream_strategy: QueueStreamStrategy)
  @observer = observer
  @pool_size = pool_size

  # #run relies on logger.tagged, so make sure the underlying logger supports it.
  taggable = if logger.respond_to?(:tagged)
               logger
             else
               ActiveSupport::TaggedLogging.new(logger)
             end
  @logger = BBK::Utils::ProxyLogger.new(taggable, tags: 'Dispatcher')

  @consumers = []
  @publishers = []
  @middlewares = []

  @pool_factory = pool_factory
  @stream_strategy_class = stream_strategy
  @force_quit = false
end

Instance Attribute Details

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.


35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Returns the list of registered consumers: the array created empty in the
# constructor and appended to by #register_consumer.
def consumers
  @consumers
end

#force_quit ⇒ Object

Returns the value of attribute force_quit.


34
35
36
# File 'lib/bbk/app/dispatcher.rb', line 34

# Returns the force_quit flag (false after construction). When it is truthy,
# #process closes the dispatcher after a message fails with an error.
def force_quit
  @force_quit
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.


35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Returns the dispatcher's logger: the BBK::Utils::ProxyLogger built in the
# constructor with the 'Dispatcher' tag.
def logger
  @logger
end

#middlewares ⇒ Object (readonly)

Returns the value of attribute middlewares.


35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Returns the list of registered middlewares: the array created empty in the
# constructor and appended to by #register_middleware.
def middlewares
  @middlewares
end

#observer ⇒ Object (readonly)

Returns the value of attribute observer.


35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Returns the observer object that was passed as the first constructor argument.
def observer
  @observer
end

#pool_size ⇒ Object

Returns the value of attribute pool_size.


34
35
36
# File 'lib/bbk/app/dispatcher.rb', line 34

# Returns the current pool size (the constructor default is 3). #run hands
# this value to the pool factory as its first argument.
def pool_size
  @pool_size
end

#publishers ⇒ Object (readonly)

Returns the value of attribute publishers.


35
36
37
# File 'lib/bbk/app/dispatcher.rb', line 35

# Returns the list of registered publishers: the array created empty in the
# constructor and appended to by #register_publisher.
def publishers
  @publishers
end

Instance Method Details

#close(_timeout = 5) ⇒ Object

Stops the dispatcher and waits for termination. To stop the dispatcher you need to:

  1. stop the consumers

  2. stop accepting new messages - @stream.close

  3. wait until everything in the queue has been processed, or until the timeout expires

  4. stop the threads

  5. stop the publishers

[View source]

92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/bbk/app/dispatcher.rb', line 92

# Stops the dispatcher and waits for termination.
#
# Shutdown order:
#   1. stop every consumer;
#   2. stop the stream strategy (if #run created one), passing it the timeout;
#   3. close every consumer;
#   4. close every publisher.
#
# A failure while stopping/closing an individual consumer or publisher is
# logged and does not prevent the remaining ones from being handled. Errors
# raised by the stream strategy itself are not rescued here.
#
# @param _timeout value forwarded to the stream strategy's #stop (defaults to
#   5). Previously this argument was ignored and 5 was always used. The
#   leading underscore is kept only so the published signature stays the same.
def close(_timeout = 5)
  ActiveSupport::Notifications.instrument 'dispatcher.close', dispatcher: self

  # Invokes +action+ on +target+, logging (instead of propagating) any error
  # so that one broken endpoint cannot block the rest of the shutdown.
  shutdown = lambda do |label, target, action|
    begin
      target.public_send(action)
    rescue StandardError => e
      logger.error "#{label} #{target} #{action} error: #{e}"
      logger.debug e.backtrace
    end
  end

  consumers.each { |cons| shutdown.call('Consumer', cons, :stop) }

  # @stream_strategy is only set once #run has been called.
  @stream_strategy&.stop(_timeout)

  consumers.each { |cons| shutdown.call('Consumer', cons, :close) }
  publishers.each { |pub| shutdown.call('Publisher', pub, :close) }
end

#execute_message(message) ⇒ Object

[View source]

124
125
126
127
128
# File 'lib/bbk/app/dispatcher.rb', line 124

# Runs the message through the processing stack and keeps only the entries
# that are BBK::App::Dispatcher::Result instances.
#
# @param message the message handed to the processing stack
# @return the Result entries produced for the message
def execute_message(message)
  produced = build_processing_stack.call(message)
  produced.select { |entry| entry.is_a?(BBK::App::Dispatcher::Result) }
end

#process(message) ⇒ Object

Processes one message and sends any resulting messages.

[View source]

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/bbk/app/dispatcher.rb', line 131

# Processes a single message and sends whatever results it produced.
#
# If the message reports itself as nacked after execution, nothing is sent.
# Any StandardError raised along the way is logged, reported through the
# 'dispatcher.exception' notification, and the message is nacked with that
# error; the dispatcher is then closed when force_quit is set.
#
# @param message the incoming message; must provide #headers and #nack
def process(message)
  outcomes = execute_message(message)
  rejected = message.respond_to?(:nacked?) && message.nacked?

  if rejected
    logger.debug "Ignore sending results: message(#{message.headers[:message_id]}) nacked in processor"
    nil
  else
    logger.debug "There are #{outcomes.count} results to send from #{message.headers[:message_id]}..."
    ActiveSupport::Notifications.instrument('dispatcher.process', msg: message) do
      send_results(message, outcomes).value
    end
  end
rescue StandardError => error
  logger.error "Failed processing message: #{error.inspect}"
  # This event is deprecated; `dispatcher.process` should be used instead.
  ActiveSupport::Notifications.instrument 'dispatcher.exception', msg: message, exception: error
  message.nack(error: error)
  close if force_quit
end

#register_consumer(consumer) ⇒ Object

[View source]

53
54
55
# File 'lib/bbk/app/dispatcher.rb', line 53

# Adds a consumer to the end of the dispatcher's consumer list.
#
# @param consumer the consumer to register
# @return the consumer list after the addition
def register_consumer(consumer)
  consumers.push(consumer)
end

#register_middleware(middleware) ⇒ Object

[View source]

63
64
65
# File 'lib/bbk/app/dispatcher.rb', line 63

# Adds a middleware to the end of the dispatcher's middleware list.
#
# @param middleware the middleware to register
# @return the middleware list after the addition
def register_middleware(middleware)
  middlewares.push(middleware)
end

#register_publisher(publisher) ⇒ Object

[View source]

57
58
59
60
61
# File 'lib/bbk/app/dispatcher.rb', line 57

# Adds a publisher to the dispatcher's publisher list.
#
# A publisher whose protocols include DEFAULT_PROTOCOL is rejected.
#
# @param publisher object responding to #protocols
# @return the publisher list after the addition
# @raise [RuntimeError] when the publisher lists DEFAULT_PROTOCOL among its protocols
def register_publisher(publisher)
  if publisher.protocols.include?(DEFAULT_PROTOCOL)
    raise "Publisher support #{DEFAULT_PROTOCOL}"
  end

  publishers << publisher
end

#run ⇒ Object

Runs all consumers and blocks while messages are being processed.

[View source]

68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/bbk/app/dispatcher.rb', line 68

# Builds the pool and the stream strategy, then hands the registered
# consumers to the strategy and processes every message it yields.
#
# Each message is processed inside a logger tag carrying its message id.
# A StandardError that escapes #process is logged at fatal level (message
# first, then backtrace) and is not propagated.
#
# @return whatever the stream strategy's #run returns
def run
  @pool = @pool_factory.call(@pool_size, 10)
  @stream_strategy = @stream_strategy_class.new(@pool, logger: logger)
  ActiveSupport::Notifications.instrument 'dispatcher.run', dispatcher: self

  @stream_strategy.run(consumers) do |incoming|
    begin
      logger.tagged(incoming.headers[:message_id]) { process incoming }
    rescue StandardError => error
      prefix = "E[#{@stream_strategy_class}]"
      logger.fatal "#{prefix}: #{error}"
      logger.fatal "#{prefix}: #{error.backtrace.join("\n")}"
    end
  end
end