Class: ActionSubscriber::RouteSet

Inherits:
Object
  • Object
show all
Includes:
Bunny::Subscriber, MarchHare::Subscriber
Defined in:
lib/action_subscriber/route_set.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Bunny::Subscriber

#bunny_consumers, #cancel_consumers!, #setup_subscriptions!, #start_subscribers!

Methods included from Subscriber

#safely_restart_subscriber

Methods included from Logging

initialize_logger, logger, #logger, logger=

Methods included from MarchHare::Subscriber

#cancel_consumers!, #march_hare_consumers, #setup_subscriptions!, #start_subscribers!

Constructor Details

#initialize(routes) ⇒ RouteSet

Returns a new instance of RouteSet.



11
12
13
# File 'lib/action_subscriber/route_set.rb', line 11

# Builds a route set around the given routes.
#
# The argument is stored untouched in @routes; nothing is validated or copied here.
#
# @param routes [Object] collection of route objects (print_subscriptions calls
#   group_by on it, so it is presumably Enumerable — confirm against callers)
def initialize(routes)
  @routes = routes
end

Instance Attribute Details

#routes ⇒ Object (readonly)

Returns the value of attribute routes.



9
10
11
# File 'lib/action_subscriber/route_set.rb', line 9

# Reader for the routes this set was constructed with.
#
# @return [Object] the value currently held in @routes
def routes
  @routes
end

Instance Method Details



15
16
17
# File 'lib/action_subscriber/route_set.rb', line 15

# Asks the middleware object held by the global ActionSubscriber
# configuration to print its own stack.
#
# @return [Object] whatever the middleware's print_middleware_stack returns
def print_middleware_stack
  middleware = ::ActionSubscriber.config.middleware
  middleware.print_middleware_stack
end


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/action_subscriber/route_set.rb', line 19

# Logs a summary of every route, grouped by the subscriber it belongs to.
#
# The middleware stack is printed first. Then, for each subscriber, its name
# is logged followed by one section per route: action, threadpool (with the
# pool's max_length), exchange, queue, routing key and prefetch. When a
# route's acknowledgements setting differs from what the subscriber reports
# via acknowledge_messages?, a WARNING line is logged at error level.
#
# @return [Hash] the routes grouped by subscriber (the result of iterating it)
def print_subscriptions
  print_middleware_stack

  routes_by_subscriber = routes.group_by(&:subscriber)
  routes_by_subscriber.each do |subscriber, subscriber_routes|
    logger.info(subscriber.name)

    subscriber_routes.each do |route|
      # The pool is looked up by the name configured on the route.
      pool = ::ActionSubscriber::ThreadPools.threadpools[route.threadpool_name]

      logger.info("  -- method: #{route.action}")
      logger.info("    --  threadpool: #{route.threadpool_name} (#{pool.max_length} threads)")
      logger.info("    --    exchange: #{route.exchange}")
      logger.info("    --       queue: #{route.queue}")
      logger.info("    -- routing_key: #{route.routing_key}")
      logger.info("    --    prefetch: #{route.prefetch}")

      ack_mismatch = route.acknowledgements != subscriber.acknowledge_messages?
      logger.error("WARNING subscriber has acknowledgements as #{subscriber.acknowledge_messages?} and route has acknowledgements as #{route.acknowledgements}") if ack_mismatch
    end
  end
end


38
39
40
41
42
43
44
# File 'lib/action_subscriber/route_set.rb', line 38

# Logs one short report per registered threadpool: its name, the pool's
# `length` (labelled "available threads") and its `queue_length`
# (labelled "backlog").
#
# @return [Object] the threadpools collection that was iterated
def print_threadpool_stats
  pools = ::ActionSubscriber::ThreadPools.threadpools

  pools.each do |pool_name, pool|
    logger.info("Threadpool #{pool_name}")
    logger.info("  -- available threads: #{pool.length}")
    logger.info("  --           backlog: #{pool.queue_length}")
  end
end

#wait_to_finish_with_timeout(timeout) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/action_subscriber/route_set.rb', line 46

# Waits for every registered threadpool to terminate, giving each one up to
# `timeout` to do so.
#
# One waiter thread is started per pool, so the pools are waited on
# concurrently rather than one after another. Each pool's queue length is
# logged before its waiter starts; a pool whose wait_for_termination returns
# a falsy value gets a FAILED line logged at error level.
#
# @param timeout [Numeric] passed straight to each pool's wait_for_termination
#   (the log message reports it in seconds)
# @return [Array<Thread>] the waiter threads, all already joined
def wait_to_finish_with_timeout(timeout)
  # Collect the waiter threads from map's return value instead of pushing
  # into an external accumulator.
  finisher_threads = ::ActionSubscriber::ThreadPools.threadpools.map do |name, threadpool|
    logger.info "  -- Threadpool #{name} (queued: #{threadpool.queue_length})"

    # Values are handed to the thread as arguments so each waiter works on
    # its own thread-local copies.
    ::Thread.new(threadpool, timeout, name) do |internal_pool, internal_timeout, internal_name|
      completed = internal_pool.wait_for_termination(internal_timeout)

      unless completed
        logger.error "  -- FAILED #{internal_name} did not finish shutting down within #{internal_timeout}sec"
      end
    end
  end

  finisher_threads.each(&:join)
end