Class: Legion::Transport::InProcess::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/legion/transport/in_process.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, name, _opts = {}) ⇒ Queue

Returns a new instance of Queue.



104
105
106
107
108
# File 'lib/legion/transport/in_process.rb', line 104

# Builds a queue tied to +channel+ and identified by +name+.
#
# @param channel [Object] stored as-is and exposed through the channel reader
# @param name [Object] stored as-is and exposed through the name reader
# @param _opts [Hash] accepted for signature compatibility; never read here
def initialize(channel, name, _opts = {})
  @channel, @name = channel, name
  # Starts with no bindings; entries are appended by #bind.
  @bindings = []
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



102
103
104
# File 'lib/legion/transport/in_process.rb', line 102

# Reader for the channel object handed to the constructor.
#
# @return [Object] the value stored in @channel
def channel
  @channel
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



102
103
104
# File 'lib/legion/transport/in_process.rb', line 102

# Reader for the queue name handed to the constructor.
#
# @return [Object] the value stored in @name
def name
  @name
end

Instance Method Details

#acknowledge ⇒ Object



142
# File 'lib/legion/transport/in_process.rb', line 142

# No-op acknowledgement: accepts any positional arguments, ignores them,
# and returns nil.
#
# @return [nil]
def acknowledge(*)
  nil
end

#bind(exchange, routing_key: '#', **) ⇒ Object



110
111
112
113
# File 'lib/legion/transport/in_process.rb', line 110

# Records a binding between this queue and +exchange+.
#
# @param exchange [Object] stored untouched in the binding entry
# @param routing_key [String] key stored with the binding (defaults to '#')
# @return [self] the receiver, so calls can be chained
#
# Any additional keyword arguments are accepted and discarded.
def bind(exchange, routing_key: '#', **)
  binding_entry = { exchange: exchange, routing_key: routing_key }
  @bindings.push(binding_entry)
  self
end

#delete ⇒ Object



146
147
148
# File 'lib/legion/transport/in_process.rb', line 146

# Reports deletion as successful without doing any work in this method.
#
# @return [true] always
def delete
  true
end

#reject ⇒ Object



144
# File 'lib/legion/transport/in_process.rb', line 144

# No-op rejection: accepts any positional arguments, ignores them,
# and returns nil.
#
# @return [nil]
def reject(*)
  nil
end

#subscribe(manual_ack: true, _block: false, consumer_tag: nil, _on_cancellation: nil, **, &callback) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/legion/transport/in_process.rb', line 115

# Registers +callback+ with Legion::Transport::Local for every routing key
# bound to this queue and returns a Consumer describing the subscription.
#
# Routing keys come from the recorded bindings; nil or empty keys are
# skipped. When no usable key remains, the queue name itself is used as the
# single subscription key.
#
# @param manual_ack [Boolean] its negation is passed to Consumer.new as the
#   fourth argument (presumably a "no ack" flag; confirm against Consumer)
# @param _block [Boolean] accepted for signature compatibility; unused
# @param consumer_tag [String, nil] tag for the consumer; a random UUID is
#   generated when nil
# @param _on_cancellation [Object, nil] accepted for signature compatibility;
#   unused
# @yield [delivery_info, properties, payload] invoked once per payload
#   handed over by Legion::Transport::Local
# @return [Consumer] the consumer created for this subscription
#
# Any additional keyword arguments are accepted and discarded.
def subscribe(manual_ack: true, _block: false, consumer_tag: nil, _on_cancellation: nil, **, &callback)
  tag = consumer_tag || SecureRandom.uuid
  consumer = Consumer.new(@channel, self, tag, !manual_ack, false)

  keys = @bindings.map { |b| b[:routing_key] }.reject { |k| k.nil? || k.empty? }
  keys = [@name] if keys.empty?

  keys.each do |rk|
    Legion::Transport::Local.subscribe(rk) do |payload|
      # A fresh delivery tag is generated for every payload.
      delivery_info = DeliveryInfo.new(
        delivery_tag: SecureRandom.uuid,
        routing_key:  rk,
        exchange:     @name
      )
      # Fixed JSON content type, no headers, timestamped at delivery time.
      properties = MessageProperties.new(
        content_type:     'application/json',
        headers:          {},
        timestamp:        ::Time.now,
        content_encoding: nil
      )
      callback.call(delivery_info, properties, payload)
    end
  end

  consumer
end