Class: Tina4::Queue
- Inherits:
-
Object
- Object
- Tina4::Queue
- Defined in:
- lib/tina4/queue.rb
Overview
Queue — unified wrapper for queue management operations. Auto-detects backend from TINA4_QUEUE_BACKEND env var.
Usage:
# Auto-detect from env (default: lite/file backend)
queue = Queue.new(topic: "tasks")
# Explicit backend
queue = Queue.new(topic: "tasks", backend: :rabbitmq)
# Or pass a backend instance directly (legacy)
queue = Queue.new(topic: "tasks", backend: my_backend)
Instance Attribute Summary collapse
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#retry_backoff ⇒ Object
readonly
Returns the value of attribute retry_backoff.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
-
#visibility_timeout ⇒ Object
readonly
Returns the value of attribute visibility_timeout.
Class Method Summary collapse
-
.default_visibility_timeout ⇒ Object
Reservation/visibility timeout in seconds, from env (default 300 = 5 min).
- .parse_amqp_url(url) ⇒ Object
-
.resolve_backend(name = nil, max_retries: 3, retry_backoff: 0, visibility_timeout: nil) ⇒ Object
Resolve the default backend from env vars.
- .resolve_kafka_config ⇒ Object
- .resolve_mongo_config ⇒ Object
- .resolve_rabbitmq_config ⇒ Object
Instance Method Summary collapse
-
#backend ⇒ Object
Get the underlying backend instance.
-
#clear ⇒ Object
Clear all pending jobs from this queue’s topic.
-
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) ⇒ Object
Consume jobs from a topic using an Enumerator (yield pattern).
-
#dead_letters(max_retries: nil) ⇒ Object
Get dead letter jobs — messages that exceeded max retries.
-
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
-
#get_topic ⇒ Object
Get the topic name this queue was constructed with.
-
#initialize(topic:, backend: nil, max_retries: 3, retry_backoff: 0, visibility_timeout: nil) ⇒ Queue
constructor
A new instance of Queue.
-
#pop ⇒ Object
Pop the next available job.
-
#pop_batch(count) ⇒ Object
Pop up to count jobs at once.
-
#pop_by_id(topic = nil, id) ⇒ Object
Pop a specific job by ID from the queue.
-
#process(topic: nil, max_jobs: nil, batch_size: 1, &handler) ⇒ Object
Consume all available jobs and pass each to handler, then stop.
-
#produce(topic, payload, priority: 0, delay_seconds: 0) ⇒ Object
Produce a message onto a topic.
-
#purge(status, max_retries: nil) ⇒ Object
Delete messages by status (completed, failed, dead).
-
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue.
-
#retry(job_id = nil, delay_seconds: 0) ⇒ Object
Retry a specific failed job by ID, or all dead-letter jobs if no id given.
-
#retry_failed(max_retries: nil) ⇒ Object
Re-queue failed messages (under max_retries) back to pending.
-
#size(status: "pending") ⇒ Object
Get the number of messages by status.
Constructor Details
#initialize(topic:, backend: nil, max_retries: 3, retry_backoff: 0, visibility_timeout: nil) ⇒ Queue
Returns a new instance of Queue.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/tina4/queue.rb', line 22 def initialize(topic:, backend: nil, max_retries: 3, retry_backoff: 0, visibility_timeout: nil) @topic = topic @max_retries = max_retries # Seconds to wait before a failed job is re-attempted (lite backend). # Default 0 = retry on the very next pop/consume iteration. @retry_backoff = retry_backoff # Reservation/visibility timeout (seconds). A popped job is reserved for # this long; if the consumer dies before complete()/fail() the next pop() # reclaims it (at-least-once delivery). Falls back to # TINA4_QUEUE_VISIBILITY_TIMEOUT, else 300 (5 min). <= 0 disables reclaim. # RabbitMQ/Kafka ignore it — the broker owns redelivery. @visibility_timeout = visibility_timeout.nil? ? self.class.default_visibility_timeout : visibility_timeout.to_f @backend = resolve_backend_arg(backend) end |
Instance Attribute Details
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
20 21 22 |
# File 'lib/tina4/queue.rb', line 20 def max_retries @max_retries end |
#retry_backoff ⇒ Object (readonly)
Returns the value of attribute retry_backoff.
20 21 22 |
# File 'lib/tina4/queue.rb', line 20 def retry_backoff @retry_backoff end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
20 21 22 |
# File 'lib/tina4/queue.rb', line 20 def topic @topic end |
#visibility_timeout ⇒ Object (readonly)
Returns the value of attribute visibility_timeout.
20 21 22 |
# File 'lib/tina4/queue.rb', line 20 def visibility_timeout @visibility_timeout end |
Class Method Details
.default_visibility_timeout ⇒ Object
Reservation/visibility timeout in seconds, from env (default 300 = 5 min).
39 40 41 42 43 |
# File 'lib/tina4/queue.rb', line 39 def self.default_visibility_timeout Float(ENV.fetch("TINA4_QUEUE_VISIBILITY_TIMEOUT", "300")) rescue ArgumentError, TypeError 300.0 end |
.parse_amqp_url(url) ⇒ Object
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'lib/tina4/queue.rb', line 379 def self.parse_amqp_url(url) config = {} url = url.sub("amqp://", "").sub("amqps://", "") if url.include?("@") creds, rest = url.split("@", 2) if creds.include?(":") config[:username], config[:password] = creds.split(":", 2) else config[:username] = creds end else rest = url end if rest.include?("/") hostport, vhost = rest.split("/", 2) config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}" if vhost && !vhost.empty? else hostport = rest end if hostport.include?(":") host, port = hostport.split(":", 2) config[:host] = host config[:port] = port.to_i elsif hostport && !hostport.empty? config[:host] = hostport end config end |
.resolve_backend(name = nil, max_retries: 3, retry_backoff: 0, visibility_timeout: nil) ⇒ Object
Resolve the default backend from env vars.
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/tina4/queue.rb', line 286 def self.resolve_backend(name = nil, max_retries: 3, retry_backoff: 0, visibility_timeout: nil) chosen = name || ENV.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip vt = visibility_timeout.nil? ? default_visibility_timeout : visibility_timeout case chosen.to_s when "lite", "file", "default" Tina4::QueueBackends::LiteBackend.new( max_retries: max_retries, retry_backoff: retry_backoff, visibility_timeout: vt ) when "rabbitmq" # Broker manages visibility/redelivery (unacked messages requeue on # channel close) — the framework timeout is accepted but not used. config = resolve_rabbitmq_config Tina4::QueueBackends::RabbitmqBackend.new(config) when "kafka" # Consumer-group offsets manage redelivery — framework timeout N/A. config = resolve_kafka_config Tina4::QueueBackends::KafkaBackend.new(config) when "mongodb", "mongo" config = resolve_mongo_config config[:visibility_timeout] = vt config[:max_retries] = max_retries Tina4::QueueBackends::MongoBackend.new(config) else raise ArgumentError, "Unknown queue backend: #{chosen.inspect}. Use 'lite', 'rabbitmq', 'kafka', or 'mongodb'." end end |
.resolve_kafka_config ⇒ Object
351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/tina4/queue.rb', line 351 def self.resolve_kafka_config config = {} url = ENV["TINA4_QUEUE_URL"] if url config[:brokers] = url.sub("kafka://", "") end brokers = ENV["TINA4_KAFKA_BROKERS"] config[:brokers] = brokers if brokers config[:brokers] ||= "localhost:9092" config[:group_id] = ENV.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group") config end |
.resolve_mongo_config ⇒ Object
364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/tina4/queue.rb', line 364 def self.resolve_mongo_config config = {} uri = ENV["TINA4_MONGO_URI"] config[:uri] = uri if uri config[:host] = ENV.fetch("TINA4_MONGO_HOST", "localhost") unless uri config[:port] = (ENV["TINA4_MONGO_PORT"] || 27017).to_i unless uri username = ENV["TINA4_MONGO_USERNAME"] password = ENV["TINA4_MONGO_PASSWORD"] config[:username] = username if username config[:password] = password if password config[:db] = ENV.fetch("TINA4_MONGO_DB", "tina4") config[:collection] = ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue") config end |
.resolve_rabbitmq_config ⇒ Object
337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/tina4/queue.rb', line 337 def self.resolve_rabbitmq_config config = {} url = ENV["TINA4_QUEUE_URL"] if url config = parse_amqp_url(url) end config[:host] ||= ENV.fetch("TINA4_RABBITMQ_HOST", "localhost") config[:port] ||= (ENV["TINA4_RABBITMQ_PORT"] || 5672).to_i config[:username] ||= ENV.fetch("TINA4_RABBITMQ_USERNAME", "guest") config[:password] ||= ENV.fetch("TINA4_RABBITMQ_PASSWORD", "guest") config[:vhost] ||= ENV.fetch("TINA4_RABBITMQ_VHOST", "/") config end |
Instance Method Details
#backend ⇒ Object
Get the underlying backend instance.
281 282 283 |
# File 'lib/tina4/queue.rb', line 281 def backend @backend end |
#clear ⇒ Object
Clear all pending jobs from this queue’s topic. Returns count removed.
79 80 81 82 |
# File 'lib/tina4/queue.rb', line 79 def clear # -> int return 0 unless @backend.respond_to?(:clear) @backend.clear(@topic) end |
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) ⇒ Object
Consume jobs from a topic using an Enumerator (yield pattern).
Usage:
queue.consume("emails") do |job|
process(job)
end
# Consume a specific job by ID:
queue.consume("emails", id: "abc-123") do |job|
process(job)
end
# Or as an enumerator:
queue.consume("emails").each { |job| process(job) }
Consume jobs from a topic using a long-running generator.
Polls the queue continuously. When empty, sleeps for poll_interval seconds before polling again. No external while-loop or sleep needed.
queue.consume("emails") { |job| process(job) }
queue.consume("emails", poll_interval: 5) { |job| process(job) }
queue.consume("emails", id: "abc-123") { |job| process(job) }
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/tina4/queue.rb', line 149 def consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) topic ||= @topic if id # Single job by ID — no polling job = pop_by_id(topic, id) if job block_given? ? yield(job) : (return Enumerator.new { |y| y << job }) end return block_given? ? nil : Enumerator.new { |_| } end # poll_interval=0 → single-pass drain (returns when empty) # poll_interval>0 → long-running poll (sleeps when empty, never returns) # iterations>0 → stop after consuming N jobs if block_given? consumed = 0 if batch_size > 1 loop do jobs = pop_batch(batch_size) if jobs.empty? break if poll_interval <= 0 sleep(poll_interval) next end yield jobs consumed += jobs.length break if iterations > 0 && consumed >= iterations end else loop do job = attach(@backend.dequeue(topic)) if job.nil? break if poll_interval <= 0 sleep(poll_interval) next end yield job consumed += 1 break if iterations > 0 && consumed >= iterations end end else Enumerator.new do |yielder| consumed = 0 loop do job = attach(@backend.dequeue(topic)) if job.nil? break if poll_interval <= 0 sleep(poll_interval) next end yielder << job consumed += 1 break if iterations > 0 && consumed >= iterations end end end end |
#dead_letters(max_retries: nil) ⇒ Object
Get dead letter jobs — messages that exceeded max retries. Pass max_retries to override the queue’s default.
99 100 101 102 |
# File 'lib/tina4/queue.rb', line 99 def dead_letters(max_retries: nil) # -> list[dict] return [] unless @backend.respond_to?(:dead_letters) @backend.dead_letters(@topic, max_retries: max_retries || @max_retries) end |
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
85 86 87 88 |
# File 'lib/tina4/queue.rb', line 85 def failed # -> list[dict] return [] unless @backend.respond_to?(:failed) @backend.failed(@topic, max_retries: @max_retries) end |
#get_topic ⇒ Object
Get the topic name this queue was constructed with.
238 239 240 |
# File 'lib/tina4/queue.rb', line 238 def get_topic @topic end |
#pop ⇒ Object
Pop the next available job. Returns Job or nil.
56 57 58 |
# File 'lib/tina4/queue.rb', line 56 def pop # -> Job|None attach(@backend.dequeue(@topic)) end |
#pop_batch(count) ⇒ Object
Pop up to count jobs at once. Returns a partial batch if fewer available.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/tina4/queue.rb', line 61 def pop_batch(count) jobs = if @backend.respond_to?(:dequeue_batch) @backend.dequeue_batch(@topic, count) else collected = [] count.times do job = @backend.dequeue(@topic) break if job.nil? collected << job end collected end jobs.each { |job| attach(job) } jobs end |
#pop_by_id(topic = nil, id) ⇒ Object
Pop a specific job by ID from the queue. Searches the pending queue for the given topic (defaults to this queue’s topic). Returns the matching Job (claimed/removed from the queue) or nil.
212 213 214 215 |
# File 'lib/tina4/queue.rb', line 212 def pop_by_id(topic = nil, id) return nil unless @backend.respond_to?(:find_by_id) attach(@backend.find_by_id(topic || @topic, id)) end |
#process(topic: nil, max_jobs: nil, batch_size: 1, &handler) ⇒ Object
Consume all available jobs and pass each to handler, then stop.
Simpler alternative to consume() for drain-and-exit use cases.
queue.process { |job| handle(job); job.complete }
queue.process(topic: "emails", max_jobs: 10) { |job| ... }
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/tina4/queue.rb', line 249 def process(topic: nil, max_jobs: nil, batch_size: 1, &handler) raise ArgumentError, "block required" unless block_given? drain_topic = topic || @topic processed = 0 loop do break if max_jobs && processed >= max_jobs if batch_size > 1 remaining = max_jobs ? [batch_size, max_jobs - processed].min : batch_size jobs = @backend.respond_to?(:dequeue_batch) ? @backend.dequeue_batch(drain_topic, remaining) : (1..remaining).map { @backend.dequeue(drain_topic) }.compact break if jobs.empty? begin handler.call(jobs) rescue => e jobs.each { |job| job.fail(e.) } end processed += jobs.length else job = @backend.dequeue(drain_topic) break if job.nil? begin handler.call(job) rescue => e job.fail(e.) end processed += 1 end end end |
#produce(topic, payload, priority: 0, delay_seconds: 0) ⇒ Object
Produce a message onto a topic. Convenience wrapper around push().
118 119 120 121 122 123 |
# File 'lib/tina4/queue.rb', line 118 def produce(topic, payload, priority: 0, delay_seconds: 0) available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil = Job.new(topic: topic, payload: payload, priority: priority, available_at: available_at, queue: self) @backend.enqueue() end |
#purge(status, max_retries: nil) ⇒ Object
Delete messages by status (completed, failed, dead).
105 106 107 108 |
# File 'lib/tina4/queue.rb', line 105 def purge(status, max_retries: nil) # -> int return 0 unless @backend.respond_to?(:purge) @backend.purge(@topic, status) end |
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue. Returns the Job. priority: higher-priority messages are dequeued first (default 0). delay_seconds: delay before the message becomes available (default 0).
48 49 50 51 52 53 |
# File 'lib/tina4/queue.rb', line 48 def push(payload, priority: 0, delay_seconds: 0) available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil = Job.new(topic: @topic, payload: payload, priority: priority, available_at: available_at, queue: self) @backend.enqueue() end |
#retry(job_id = nil, delay_seconds: 0) ⇒ Object
Retry a specific failed job by ID, or all dead-letter jobs if no id given. Returns true if re-queued.
92 93 94 95 |
# File 'lib/tina4/queue.rb', line 92 def retry(job_id = nil, delay_seconds: 0) # -> bool return false unless @backend.respond_to?(:retry_job) @backend.retry_job(@topic, job_id: job_id, delay_seconds: delay_seconds) end |
#retry_failed(max_retries: nil) ⇒ Object
Re-queue failed messages (under max_retries) back to pending. Returns the number of jobs re-queued.
112 113 114 115 |
# File 'lib/tina4/queue.rb', line 112 def retry_failed(max_retries: nil) # -> int return 0 unless @backend.respond_to?(:retry_failed) @backend.retry_failed(@topic, max_retries: max_retries || @max_retries) end |
#size(status: "pending") ⇒ Object
Get the number of messages by status. status: “pending” (default) counts pending messages in the topic queue. status: “failed” or “dead” counts messages in the dead_letter directory.
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/tina4/queue.rb', line 220 def size(status: "pending") case status.to_s when "pending" @backend.size(@topic) when "reserved" @backend.respond_to?(:reserved_count) ? @backend.reserved_count(@topic) : 0 when "failed", "dead" if @backend.respond_to?(:dead_letter_count) @backend.dead_letter_count(@topic) else 0 end else @backend.size(@topic) end end |