Class: Tina4::Queue
- Inherits: Object
  - Object
  - Tina4::Queue
- Defined in:
- lib/tina4/queue.rb
Overview
Queue — unified wrapper for queue management operations. Auto-detects backend from TINA4_QUEUE_BACKEND env var.
Usage:
# Auto-detect from env (default: lite/file backend)
queue = Queue.new(topic: "tasks")
# Explicit backend
queue = Queue.new(topic: "tasks", backend: :rabbitmq)
# Or pass a backend instance directly (legacy)
queue = Queue.new(topic: "tasks", backend: my_backend)
Instance Attribute Summary
- #max_retries ⇒ Object (readonly)
  Returns the value of attribute max_retries.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Class Method Summary
- .parse_amqp_url(url) ⇒ Object
- .resolve_backend(name = nil) ⇒ Object
  Resolve the default backend from env vars.
- .resolve_kafka_config ⇒ Object
- .resolve_mongo_config ⇒ Object
- .resolve_rabbitmq_config ⇒ Object
Instance Method Summary
- #backend ⇒ Object
  Get the underlying backend instance.
- #consume(topic = nil, id: nil, poll_interval: 1.0, &block) ⇒ Object
  Consume jobs from a topic using an Enumerator (yield pattern).
- #dead_letters ⇒ Object
  Get dead letter jobs: messages that exceeded max retries.
- #initialize(topic:, backend: nil, max_retries: 3) ⇒ Queue (constructor)
  A new instance of Queue.
- #pop ⇒ Object
  Pop the next available job.
- #pop_by_id(topic, id) ⇒ Object
  Pop a specific job by ID from the queue.
- #produce(topic, payload) ⇒ Object
  Produce a message onto a topic.
- #purge(status) ⇒ Object
  Delete messages by status (completed, failed, dead).
- #push(payload, priority: 0, delay_seconds: 0) ⇒ Object
  Push a job onto the queue.
- #retry_failed ⇒ Object
  Re-queue failed messages (under max_retries) back to pending.
- #size(status: "pending") ⇒ Object
  Get the number of messages by status.
Constructor Details
#initialize(topic:, backend: nil, max_retries: 3) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/tina4/queue.rb', line 88
def initialize(topic:, backend: nil, max_retries: 3)
  @topic = topic
  @max_retries = max_retries
  @backend = resolve_backend_arg(backend)
end
Instance Attribute Details
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
# File 'lib/tina4/queue.rb', line 86
def max_retries
  @max_retries
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/tina4/queue.rb', line 86
def topic
  @topic
end
Class Method Details
.parse_amqp_url(url) ⇒ Object
# File 'lib/tina4/queue.rb', line 299
def self.parse_amqp_url(url)
  config = {}
  url = url.sub("amqp://", "").sub("amqps://", "")

  if url.include?("@")
    creds, rest = url.split("@", 2)
    if creds.include?(":")
      config[:username], config[:password] = creds.split(":", 2)
    else
      config[:username] = creds
    end
  else
    rest = url
  end

  if rest.include?("/")
    hostport, vhost = rest.split("/", 2)
    config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}" if vhost && !vhost.empty?
  else
    hostport = rest
  end

  if hostport.include?(":")
    host, port = hostport.split(":", 2)
    config[:host] = host
    config[:port] = port.to_i
  elsif hostport && !hostport.empty?
    config[:host] = hostport
  end

  config
end
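The splitting order in the listing (scheme, then credentials, then vhost, then host and port) can be tried outside the library with a condensed standalone sketch. parse_amqp_url_sketch and the example URL are hypothetical names used only for illustration; this is not the library method itself.

```ruby
# Standalone sketch of the same splitting steps; not Tina4::Queue.parse_amqp_url.
def parse_amqp_url_sketch(url)
  config = {}
  rest = url.sub(%r{\Aamqps?://}, "")           # 1. drop the scheme

  if rest.include?("@")                         # 2. credentials precede "@"
    creds, rest = rest.split("@", 2)
    user, pass = creds.split(":", 2)
    config[:username] = user
    config[:password] = pass if pass
  end

  hostport, vhost = rest.split("/", 2)          # 3. vhost follows the first "/"
  if vhost && !vhost.empty?
    config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}"
  end

  host, port = hostport.to_s.split(":", 2)      # 4. host and optional port
  config[:host] = host if host && !host.empty?
  config[:port] = port.to_i if port
  config
end

config = parse_amqp_url_sketch("amqp://user:secret@mq.example.com:5672/prod")
# config[:username] is "user", config[:password] is "secret",
# config[:host] is "mq.example.com", config[:port] is 5672, config[:vhost] is "/prod"
```

Keys that do not appear in the URL are simply absent from the hash, which is what lets a caller fill them in afterwards with `||=`.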
.resolve_backend(name = nil) ⇒ Object
Resolve a backend instance by name. When name is nil, the name is read from the TINA4_QUEUE_BACKEND env var (default "file"). Only the env value is downcased and stripped; an explicit name is matched as given, after to_s.
# File 'lib/tina4/queue.rb', line 228
def self.resolve_backend(name = nil)
  chosen = name || ENV.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip

  case chosen.to_s
  when "lite", "file", "default"
    Tina4::QueueBackends::LiteBackend.new
  when "rabbitmq"
    config = resolve_rabbitmq_config
    Tina4::QueueBackends::RabbitmqBackend.new(config)
  when "kafka"
    config = resolve_kafka_config
    Tina4::QueueBackends::KafkaBackend.new(config)
  when "mongodb", "mongo"
    config = resolve_mongo_config
    Tina4::QueueBackends::MongoBackend.new(config)
  else
    raise ArgumentError, "Unknown queue backend: #{chosen.inspect}. Use 'lite', 'rabbitmq', 'kafka', or 'mongodb'."
  end
end
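To see how the name lookup behaves without instantiating real backends, here is a minimal sketch that returns a label instead of a backend object. backend_kind and its env: parameter are hypothetical, introduced only so the example can run without the library or real environment variables.

```ruby
# Hypothetical stand-in: returns a label instead of building a real backend.
def backend_kind(name = nil, env: ENV)
  # As in the listing, only the env value is downcased and stripped.
  chosen = name || env.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip

  case chosen.to_s
  when "lite", "file", "default" then :lite
  when "rabbitmq"                then :rabbitmq
  when "kafka"                   then :kafka
  when "mongodb", "mongo"        then :mongodb
  else
    raise ArgumentError, "Unknown queue backend: #{chosen.inspect}"
  end
end

backend_kind(nil, env: {})                                      # :lite (default "file")
backend_kind(nil, env: { "TINA4_QUEUE_BACKEND" => " Kafka " })  # :kafka
backend_kind(:mongo, env: {})                                   # :mongodb
```

Under this reading of the listing, an explicit name such as "RabbitMQ" would raise ArgumentError, because explicit names are not downcased.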
.resolve_kafka_config ⇒ Object
# File 'lib/tina4/queue.rb', line 271
def self.resolve_kafka_config
  config = {}
  url = ENV["TINA4_QUEUE_URL"]
  if url
    config[:brokers] = url.sub("kafka://", "")
  end
  brokers = ENV["TINA4_KAFKA_BROKERS"]
  config[:brokers] = brokers if brokers
  config[:brokers] ||= "localhost:9092"
  config[:group_id] = ENV.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group")
  config
end
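The listing implies a precedence for the broker list: TINA4_KAFKA_BROKERS, then TINA4_QUEUE_URL, then the "localhost:9092" default. A minimal sketch of that ordering, assuming a plain Hash in place of ENV (kafka_config_sketch and the broker names are hypothetical):

```ruby
# Standalone sketch; env is a plain Hash standing in for ENV.
def kafka_config_sketch(env)
  config = {}
  url = env["TINA4_QUEUE_URL"]
  config[:brokers] = url.sub("kafka://", "") if url   # URL is read first...
  brokers = env["TINA4_KAFKA_BROKERS"]
  config[:brokers] = brokers if brokers               # ...but this variable overrides it
  config[:brokers] ||= "localhost:9092"               # fallback when neither is set
  config[:group_id] = env.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group")
  config
end

both = kafka_config_sketch(
  "TINA4_QUEUE_URL" => "kafka://broker-a:9092",
  "TINA4_KAFKA_BROKERS" => "broker-b:9092"
)
# both[:brokers] is "broker-b:9092"
```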
.resolve_mongo_config ⇒ Object
# File 'lib/tina4/queue.rb', line 284
def self.resolve_mongo_config
  config = {}
  uri = ENV["TINA4_MONGO_URI"]
  config[:uri] = uri if uri
  config[:host] = ENV.fetch("TINA4_MONGO_HOST", "localhost") unless uri
  config[:port] = (ENV["TINA4_MONGO_PORT"] || 27017).to_i unless uri
  username = ENV["TINA4_MONGO_USERNAME"]
  password = ENV["TINA4_MONGO_PASSWORD"]
  config[:username] = username if username
  config[:password] = password if password
  config[:db] = ENV.fetch("TINA4_MONGO_DB", "tina4")
  config[:collection] = ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue")
  config
end
.resolve_rabbitmq_config ⇒ Object
# File 'lib/tina4/queue.rb', line 257
def self.resolve_rabbitmq_config
  config = {}
  url = ENV["TINA4_QUEUE_URL"]
  if url
    config = parse_amqp_url(url)
  end
  config[:host] ||= ENV.fetch("TINA4_RABBITMQ_HOST", "localhost")
  config[:port] ||= (ENV["TINA4_RABBITMQ_PORT"] || 5672).to_i
  config[:username] ||= ENV.fetch("TINA4_RABBITMQ_USERNAME", "guest")
  config[:password] ||= ENV.fetch("TINA4_RABBITMQ_PASSWORD", "guest")
  config[:vhost] ||= ENV.fetch("TINA4_RABBITMQ_VHOST", "/")
  config
end
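Here the precedence runs the other way from the Kafka case: values parsed from TINA4_QUEUE_URL win, and the TINA4_RABBITMQ_* variables only fill keys the URL left out. The sketch below illustrates this under stated assumptions: fill_rabbitmq_defaults is a hypothetical helper, parsed_url stands in for the hash returned by parse_amqp_url, and env is a plain Hash in place of ENV.

```ruby
# Standalone sketch of the ||= fallback chain; not the library method.
def fill_rabbitmq_defaults(parsed_url, env)
  config = parsed_url.dup
  config[:host]     ||= env.fetch("TINA4_RABBITMQ_HOST", "localhost")
  config[:port]     ||= (env["TINA4_RABBITMQ_PORT"] || 5672).to_i
  config[:username] ||= env.fetch("TINA4_RABBITMQ_USERNAME", "guest")
  config[:password] ||= env.fetch("TINA4_RABBITMQ_PASSWORD", "guest")
  config[:vhost]    ||= env.fetch("TINA4_RABBITMQ_VHOST", "/")
  config
end

env = { "TINA4_RABBITMQ_HOST" => "env-host", "TINA4_RABBITMQ_PORT" => "5673" }
config = fill_rabbitmq_defaults({ host: "mq.example.com" }, env)
# config[:host] stays "mq.example.com" (from the URL);
# config[:port] is 5673 (from env); username and password fall back to "guest"
```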
Instance Method Details
#backend ⇒ Object
Get the underlying backend instance.
# File 'lib/tina4/queue.rb', line 223
def backend
  @backend
end
#consume(topic = nil, id: nil, poll_interval: 1.0, &block) ⇒ Object
Consume jobs from a topic, either by yielding each job to a block or, when no block is given, by returning an Enumerator. topic defaults to the queue's own topic.
With poll_interval greater than 0 (default 1.0), the queue is polled continuously: when it is empty, consume sleeps for poll_interval seconds and polls again, so no external while-loop or sleep is needed and the call does not return on its own. With poll_interval set to 0, consume drains the queue once and returns when it is empty. With id:, it fetches that single job without polling.
Usage:
queue.consume("emails") do |job|
  process(job)
end
# Sleep longer between polls when the queue is empty:
queue.consume("emails", poll_interval: 5) { |job| process(job) }
# Consume a specific job by ID:
queue.consume("emails", id: "abc-123") do |job|
  process(job)
end
# Or as an enumerator:
queue.consume("emails").each { |job| process(job) }
# File 'lib/tina4/queue.rb', line 159
def consume(topic = nil, id: nil, poll_interval: 1.0, &block)
  topic ||= @topic

  if id
    # Single job by ID: no polling
    job = pop_by_id(topic, id)
    if job
      block_given? ? yield(job) : (return Enumerator.new { |y| y << job })
    end
    return block_given? ? nil : Enumerator.new { |_| }
  end

  # poll_interval=0 → single-pass drain (returns when empty)
  # poll_interval>0 → long-running poll (sleeps when empty, never returns)
  if block_given?
    loop do
      job = @backend.dequeue(topic)
      if job.nil?
        break if poll_interval <= 0
        sleep(poll_interval)
        next
      end
      yield job
    end
  else
    Enumerator.new do |yielder|
      loop do
        job = @backend.dequeue(topic)
        if job.nil?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yielder << job
      end
    end
  end
end
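The two calling styles and the poll_interval: 0 drain behaviour can be tried against an in-memory stand-in. This is a condensed sketch, not the library code: ArrayBackend and consume_sketch are hypothetical, and the only backend method assumed is dequeue(topic) returning a job or nil.

```ruby
# Hypothetical in-memory backend exposing only dequeue(topic).
class ArrayBackend
  def initialize(jobs)
    @jobs = jobs.dup
  end

  def dequeue(_topic)
    @jobs.shift # nil once the queue is empty
  end
end

# Condensed stand-in for the polling branch of consume.
def consume_sketch(backend, topic, poll_interval: 1.0)
  enum = Enumerator.new do |yielder|
    loop do
      job = backend.dequeue(topic)
      if job.nil?
        break if poll_interval <= 0 # drain mode: stop when empty
        sleep(poll_interval)        # poll mode: wait, then try again
        next
      end
      yielder << job
    end
  end
  block_given? ? enum.each { |job| yield job } : enum
end

# Block form, draining once:
seen = []
consume_sketch(ArrayBackend.new(%w[a b]), "emails", poll_interval: 0) { |job| seen << job }

# Enumerator form, draining once:
rest = consume_sketch(ArrayBackend.new(%w[c]), "emails", poll_interval: 0).to_a
```

With a positive poll_interval the same loop never breaks, so a caller that wants to stop would need to break out of the block or take a bounded number of items from the enumerator.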
#dead_letters ⇒ Object
Get dead letter jobs — messages that exceeded max retries.
# File 'lib/tina4/queue.rb', line 110
def dead_letters
  return [] unless @backend.respond_to?(:dead_letters)
  @backend.dead_letters(@topic, max_retries: @max_retries)
end
#pop ⇒ Object
Pop the next available job. Returns QueueMessage or nil.
# File 'lib/tina4/queue.rb', line 105
def pop
  @backend.dequeue(@topic)
end
#pop_by_id(topic, id) ⇒ Object
Pop a specific job by ID from the queue.
# File 'lib/tina4/queue.rb', line 199
def pop_by_id(topic, id)
  return nil unless @backend.respond_to?(:find_by_id)
  @backend.find_by_id(topic, id)
end
#produce(topic, payload) ⇒ Object
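Produce a message onto a topic. Works like push(), but takes an explicit topic and does not accept priority or delay_seconds.
# File 'lib/tina4/queue.rb', line 129
def produce(topic, payload)
  # The local variable name was lost in extraction; `message` is a placeholder.
  message = QueueMessage.new(topic: topic, payload: payload)
  @backend.enqueue(message)
end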
#purge(status) ⇒ Object
Delete messages by status (completed, failed, dead).
# File 'lib/tina4/queue.rb', line 116
def purge(status)
  return 0 unless @backend.respond_to?(:purge)
  @backend.purge(@topic, status)
end
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue. Returns the QueueMessage.
- priority: higher-priority messages are dequeued first (default 0).
- delay_seconds: delay before the message becomes available (default 0).
# File 'lib/tina4/queue.rb', line 97
def push(payload, priority: 0, delay_seconds: 0)
  available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil
  # The local variable name was lost in extraction; `message` is a placeholder.
  message = QueueMessage.new(topic: @topic, payload: payload, priority: priority, available_at: available_at)
  @backend.enqueue(message)
end
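How delay_seconds turns into an availability time can be shown with a small sketch. MessageSketch is a hypothetical stand-in for QueueMessage carrying only the four fields passed in the listing, and build_message with its now: parameter is an assumption added so the result is deterministic.

```ruby
# Hypothetical stand-in for QueueMessage with only the fields push passes in.
MessageSketch = Struct.new(:topic, :payload, :priority, :available_at, keyword_init: true)

# Mirrors the listing: a positive delay sets available_at, otherwise it stays nil.
def build_message(topic, payload, priority: 0, delay_seconds: 0, now: Time.now)
  available_at = delay_seconds > 0 ? now + delay_seconds : nil
  MessageSketch.new(topic: topic, payload: payload,
                    priority: priority, available_at: available_at)
end

now = Time.at(0)
immediate = build_message("tasks", { "id" => 1 }, now: now)
delayed   = build_message("tasks", { "id" => 2 }, priority: 5, delay_seconds: 30, now: now)
# immediate.available_at is nil; delayed.available_at equals Time.at(30)
```

How a backend interprets a nil available_at, and how it orders by priority, is left to the backend and is not shown in this section.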
#retry_failed ⇒ Object
Re-queue failed messages (under max_retries) back to pending. Returns the number of jobs re-queued.
# File 'lib/tina4/queue.rb', line 123
def retry_failed
  return 0 unless @backend.respond_to?(:retry_failed)
  @backend.retry_failed(@topic, max_retries: @max_retries)
end
#size(status: "pending") ⇒ Object
Get the number of messages by status.
- status: “pending” (default) counts pending messages in the topic queue.
- status: “failed” or “dead” counts messages in the dead_letter directory, or returns 0 if the backend does not implement dead_letter_count.
- Any other status falls back to the pending count.
# File 'lib/tina4/queue.rb', line 207
def size(status: "pending")
  case status.to_s
  when "pending"
    @backend.size(@topic)
  when "failed", "dead"
    if @backend.respond_to?(:dead_letter_count)
      @backend.dead_letter_count(@topic)
    else
      0
    end
  else
    @backend.size(@topic)
  end
end
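The status handling, including the respond_to? guard for optional backend features, can be sketched with two stand-in backends. CountingBackend, MinimalBackend and size_sketch are hypothetical names, and the counts are made-up values.

```ruby
# Hypothetical backend that supports dead-letter counting.
class CountingBackend
  def size(_topic)
    3
  end

  def dead_letter_count(_topic)
    1
  end
end

# Hypothetical backend without dead-letter support.
class MinimalBackend
  def size(_topic)
    3
  end
end

# Mirrors the case statement in the listing ("pending" and unknown statuses
# both end up calling backend.size).
def size_sketch(backend, topic, status: "pending")
  case status.to_s
  when "failed", "dead"
    backend.respond_to?(:dead_letter_count) ? backend.dead_letter_count(topic) : 0
  else
    backend.size(topic)
  end
end

size_sketch(CountingBackend.new, "tasks")                       # pending count
size_sketch(CountingBackend.new, "tasks", status: :dead)        # dead-letter count
size_sketch(MinimalBackend.new,  "tasks", status: "failed")     # 0, feature missing
size_sketch(MinimalBackend.new,  "tasks", status: "completed")  # falls back to pending
```

One consequence worth keeping in mind: an unrecognised status such as "completed" returns the pending count rather than raising or returning 0.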