Class: ActivePublisher::Async::InMemoryAdapter::ConsumerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/active_publisher/async/in_memory_adapter/consumer_thread.rb

Constant Summary collapse

# Exception classes grouped by failure category. All three lists are frozen so
# that no caller can mutate these shared constants at runtime.
# NOTE(review): presumably used as rescue lists by the consumer loop — the
# call sites are not shown here; confirm.

# Bunny error signalling that the channel has already been closed.
CHANNEL_CLOSED_ERRORS = [::Bunny::ChannelAlreadyClosed].freeze

# Connection-level, timeout and I/O failures.
NETWORK_ERRORS = [
  ::Bunny::NetworkFailure,
  ::Bunny::TCPConnectionFailed,
  ::Bunny::ConnectionTimeout,
  ::Timeout::Error,
  ::IOError
].freeze

# Bunny error signalling a failed precondition.
PRECONDITION_ERRORS = [::Bunny::PreconditionFailed].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(listen_queue) ⇒ ConsumerThread

Returns a new instance of ConsumerThread.



27
28
29
30
31
32
33
34
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 27

# Builds a consumer around +listen_queue+, records its initial bookkeeping
# values and then starts it.
#
# @param listen_queue [#size] queue this consumer reads from; only +size+ is
#   called on it here
def initialize(listen_queue)
  @queue = listen_queue

  # Batch limit comes from the global ActivePublisher configuration.
  @flush_max = ::ActivePublisher.configuration.messages_per_batch

  # Snapshot of the queue depth at construction time.
  @sampled_queue_size = queue.size

  update_last_tick_at
  # NOTE(review): start_thread is defined elsewhere; presumably it spawns the
  # worker stored in @thread — confirm.
  start_thread
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



5
6
7
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 5

# Read-only accessor for the +@channel+ instance variable.
#
# NOTE(review): the constructor shown on this page does not assign +@channel+;
# presumably it is set when the worker thread opens its channel — confirm.
#
# @return [Object, nil] the current value of +@channel+
def channel
  @channel
end

#flush_max ⇒ Object (readonly)

Returns the value of attribute flush_max.



5
6
7
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 5

# Read-only accessor for the +@flush_max+ instance variable.
#
# The constructor sets it from
# +::ActivePublisher.configuration.messages_per_batch+.
#
# @return [Object] the current value of +@flush_max+
def flush_max
  @flush_max
end

#last_tick_at ⇒ Object (readonly)

Returns the value of attribute last_tick_at.



5
6
7
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 5

# Read-only accessor for the +@last_tick_at+ instance variable.
#
# NOTE(review): presumably maintained by +update_last_tick_at+, which the
# constructor calls but which is not shown on this page — confirm.
#
# @return [Object, nil] the current value of +@last_tick_at+
def last_tick_at
  @last_tick_at
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



5
6
7
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 5

# Read-only accessor for the +@queue+ instance variable.
#
# The constructor stores its +listen_queue+ argument here.
#
# @return [Object] the current value of +@queue+
def queue
  @queue
end

#sampled_queue_size ⇒ Object (readonly)

Returns the value of attribute sampled_queue_size.



5
6
7
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 5

# Read-only accessor for the +@sampled_queue_size+ instance variable.
#
# The constructor initialises it to +queue.size+.
# NOTE(review): whether it is re-sampled later is not visible on this page.
#
# @return [Object] the current value of +@sampled_queue_size+
def sampled_queue_size
  @sampled_queue_size
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



5
6
7
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 5

# Read-only accessor for the +@thread+ instance variable.
#
# +kill+ resets this to +nil+, so callers must be prepared for a nil result.
#
# @return [Object, nil] the current value of +@thread+
def thread
  @thread
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 36

# Reports whether the worker stored in +@thread+ is still running.
#
# +@thread+ is read exactly once into a local, so a concurrent +kill+ that
# resets it to +nil+ cannot cause a +NoMethodError+ between the presence
# check and the +alive?+ call. The result is always a strict boolean
# (+false+, never +nil+, when there is no thread).
#
# @return [Boolean] +true+ if a thread is present and alive, +false+ otherwise
def alive?
  worker = @thread
  worker ? worker.alive? : false
end

#kill ⇒ Object



40
41
42
43
# File 'lib/active_publisher/async/in_memory_adapter/consumer_thread.rb', line 40

# Terminates the worker held in +@thread+, if there is one, and clears the
# reference. Calling it again when no thread is present is a no-op.
#
# @return [nil]
def kill
  worker = @thread
  worker.kill if worker
  @thread = nil
end