Class: Legion::Transport::InProcess::Queue
- Inherits:
-
Object
- Object
- Legion::Transport::InProcess::Queue
- Defined in:
- lib/legion/transport/in_process.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #acknowledge ⇒ Object
- #bind(exchange, routing_key: '#') ⇒ Object
- #delete ⇒ Object
-
#initialize(channel, name, _opts = {}) ⇒ Queue
constructor
A new instance of Queue.
- #reject ⇒ Object
- #subscribe(manual_ack: true, _block: false, consumer_tag: nil, _on_cancellation: nil, &callback) ⇒ Object
Constructor Details
#initialize(channel, name, _opts = {}) ⇒ Queue
Returns a new instance of Queue.
104 105 106 107 108 |
# File 'lib/legion/transport/in_process.rb', line 104 def initialize(channel, name, _opts = {}) @channel = channel @name = name @bindings = [] end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
102 103 104 |
# File 'lib/legion/transport/in_process.rb', line 102 def channel @channel end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
102 103 104 |
# File 'lib/legion/transport/in_process.rb', line 102 def name @name end |
Instance Method Details
#acknowledge ⇒ Object
142 |
# File 'lib/legion/transport/in_process.rb', line 142 def acknowledge(*); end |
#bind(exchange, routing_key: '#') ⇒ Object
110 111 112 113 |
# File 'lib/legion/transport/in_process.rb', line 110 def bind(exchange, routing_key: '#', **) @bindings << { exchange: exchange, routing_key: routing_key } self end |
#delete ⇒ Object
146 147 148 |
# File 'lib/legion/transport/in_process.rb', line 146 def delete true end |
#reject ⇒ Object
144 |
# File 'lib/legion/transport/in_process.rb', line 144 def reject(*); end |
#subscribe(manual_ack: true, _block: false, consumer_tag: nil, _on_cancellation: nil, &callback) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/legion/transport/in_process.rb', line 115 def subscribe(manual_ack: true, _block: false, consumer_tag: nil, _on_cancellation: nil, **, &callback) tag = consumer_tag || SecureRandom.uuid consumer = Consumer.new(@channel, self, tag, !manual_ack, false) keys = @bindings.map { |b| b[:routing_key] }.reject { |k| k.nil? || k.empty? } keys = [@name] if keys.empty? keys.each do |rk| Legion::Transport::Local.subscribe(rk) do |payload| delivery_info = DeliveryInfo.new( delivery_tag: SecureRandom.uuid, routing_key: rk, exchange: @name ) = MessageProperties.new( content_type: 'application/json', headers: {}, timestamp: ::Time.now, content_encoding: nil ) callback.call(delivery_info, , payload) end end consumer end |