Class: Tina4::Queue
- Inherits:
-
Object
- Object
- Tina4::Queue
- Defined in:
- lib/tina4/queue.rb
Overview
Queue — unified wrapper for queue management operations. Auto-detects backend from TINA4_QUEUE_BACKEND env var.
Usage:
# Auto-detect from env (default: lite/file backend)
queue = Queue.new(topic: "tasks")
# Explicit backend
queue = Queue.new(topic: "tasks", backend: :rabbitmq)
# Or pass a backend instance directly (legacy)
queue = Queue.new(topic: "tasks", backend: my_backend)
Instance Attribute Summary
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#retry_backoff ⇒ Object
readonly
Returns the value of attribute retry_backoff.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Class Method Summary
- .parse_amqp_url(url) ⇒ Object
-
.resolve_backend(name = nil, max_retries: 3, retry_backoff: 0) ⇒ Object
Resolve the default backend from env vars.
- .resolve_kafka_config ⇒ Object
- .resolve_mongo_config ⇒ Object
- .resolve_rabbitmq_config ⇒ Object
Instance Method Summary
-
#backend ⇒ Object
Get the underlying backend instance.
-
#clear ⇒ Object
Clear all pending jobs from this queue’s topic.
-
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) ⇒ Object
Consume jobs from a topic using an Enumerator (yield pattern).
-
#dead_letters(max_retries: nil) ⇒ Object
Get dead letter jobs — messages that exceeded max retries.
-
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
-
#get_topic ⇒ Object
Get the topic name this queue was constructed with.
-
#initialize(topic:, backend: nil, max_retries: 3, retry_backoff: 0) ⇒ Queue
constructor
A new instance of Queue.
-
#pop ⇒ Object
Pop the next available job.
-
#pop_batch(count) ⇒ Object
Pop up to count jobs at once.
-
#pop_by_id(topic = nil, id) ⇒ Object
Pop a specific job by ID from the queue.
-
#process(topic: nil, max_jobs: nil, batch_size: 1, &handler) ⇒ Object
Consume all available jobs and pass each to handler, then stop.
-
#produce(topic, payload, priority: 0, delay_seconds: 0) ⇒ Object
Produce a message onto a topic.
-
#purge(status, max_retries: nil) ⇒ Object
Delete messages by status (completed, failed, dead).
-
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue.
-
#retry(job_id = nil, delay_seconds: 0) ⇒ Object
Retry a specific failed job by ID, or all dead-letter jobs if no id given.
-
#retry_failed(max_retries: nil) ⇒ Object
Re-queue failed messages (under max_retries) back to pending.
-
#size(status: "pending") ⇒ Object
Get the number of messages by status.
Constructor Details
#initialize(topic:, backend: nil, max_retries: 3, retry_backoff: 0) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/tina4/queue.rb', line 22
def initialize(topic:, backend: nil, max_retries: 3, retry_backoff: 0)
  @topic = topic
  @max_retries = max_retries
  # Seconds to wait before a failed job is re-attempted (lite backend).
  # Default 0 = retry on the very next pop/consume iteration.
  @retry_backoff = retry_backoff
  @backend = resolve_backend_arg(backend)
end
Instance Attribute Details
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
# File 'lib/tina4/queue.rb', line 20
def max_retries
  @max_retries
end
#retry_backoff ⇒ Object (readonly)
Returns the value of attribute retry_backoff.
# File 'lib/tina4/queue.rb', line 20
def retry_backoff
  @retry_backoff
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/tina4/queue.rb', line 20
def topic
  @topic
end
Class Method Details
.parse_amqp_url(url) ⇒ Object
# File 'lib/tina4/queue.rb', line 353
def self.parse_amqp_url(url)
  config = {}
  url = url.sub("amqp://", "").sub("amqps://", "")

  if url.include?("@")
    creds, rest = url.split("@", 2)
    if creds.include?(":")
      config[:username], config[:password] = creds.split(":", 2)
    else
      config[:username] = creds
    end
  else
    rest = url
  end

  if rest.include?("/")
    hostport, vhost = rest.split("/", 2)
    config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}" if vhost && !vhost.empty?
  else
    hostport = rest
  end

  if hostport.include?(":")
    host, port = hostport.split(":", 2)
    config[:host] = host
    config[:port] = port.to_i
  elsif hostport && !hostport.empty?
    config[:host] = hostport
  end

  config
end
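To make the parser's output concrete, the sketch below copies the same parsing steps into a standalone method and runs it on two sample URLs. The module name AmqpUrlDemo and both URLs are made up for illustration; the logic is intended to mirror the listing above, not to replace it.

```ruby
# AmqpUrlDemo is a hypothetical name; the body repeats the steps of parse_amqp_url.
module AmqpUrlDemo
  def self.parse(url)
    config = {}
    url = url.sub("amqp://", "").sub("amqps://", "")

    # Optional "user:pass@" prefix
    if url.include?("@")
      creds, rest = url.split("@", 2)
      if creds.include?(":")
        config[:username], config[:password] = creds.split(":", 2)
      else
        config[:username] = creds
      end
    else
      rest = url
    end

    # Optional "/vhost" suffix, normalised to start with "/"
    if rest.include?("/")
      hostport, vhost = rest.split("/", 2)
      config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}" if vhost && !vhost.empty?
    else
      hostport = rest
    end

    # "host:port" or bare "host"
    if hostport.include?(":")
      host, port = hostport.split(":", 2)
      config[:host] = host
      config[:port] = port.to_i
    elsif hostport && !hostport.empty?
      config[:host] = hostport
    end

    config
  end
end

# Full URL: username "user", password "secret", host "broker.example",
# port 5673 (as an Integer) and vhost "/orders".
full = AmqpUrlDemo.parse("amqp://user:secret@broker.example:5673/orders")

# Bare host: only :host is set, so the caller has to supply the other defaults.
bare = AmqpUrlDemo.parse("amqps://broker.example")
```

Keys that the URL does not mention are simply absent from the result, which is what lets resolve_rabbitmq_config fill them in afterwards.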
.resolve_backend(name = nil, max_retries: 3, retry_backoff: 0) ⇒ Object
Resolve the default backend from env vars.
# File 'lib/tina4/queue.rb', line 270
def self.resolve_backend(name = nil, max_retries: 3, retry_backoff: 0)
  chosen = name || ENV.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip

  case chosen.to_s
  when "lite", "file", "default"
    Tina4::QueueBackends::LiteBackend.new(max_retries: max_retries, retry_backoff: retry_backoff)
  when "rabbitmq"
    config = resolve_rabbitmq_config
    Tina4::QueueBackends::RabbitmqBackend.new(config)
  when "kafka"
    config = resolve_kafka_config
    Tina4::QueueBackends::KafkaBackend.new(config)
  when "mongodb", "mongo"
    config = resolve_mongo_config
    Tina4::QueueBackends::MongoBackend.new(config)
  else
    raise ArgumentError, "Unknown queue backend: #{chosen.inspect}. Use 'lite', 'rabbitmq', 'kafka', or 'mongodb'."
  end
end
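The dispatch above has one subtlety that is easy to miss: downcase.strip binds to the ENV.fetch call, so only the environment value is normalised, not an explicitly passed name. The sketch below is a simplified stand-in that illustrates this. The method pick_backend and its env: parameter are hypothetical, and it returns symbols rather than real backend instances.

```ruby
# Hypothetical stand-in for the dispatch in resolve_backend.
# `env` is any Hash-like object; ENV itself works too.
def pick_backend(name = nil, env: ENV)
  chosen = name || env.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip

  case chosen.to_s
  when "lite", "file", "default" then :lite
  when "rabbitmq"                then :rabbitmq
  when "kafka"                   then :kafka
  when "mongodb", "mongo"        then :mongo
  else raise ArgumentError, "Unknown queue backend: #{chosen.inspect}"
  end
end

pick_backend(env: {})                                      # no setting: lite backend
pick_backend(env: { "TINA4_QUEUE_BACKEND" => " Kafka " })  # env value is downcased and stripped
pick_backend(:rabbitmq, env: {})                           # symbols match through to_s

begin
  pick_backend("RabbitMQ", env: {})                        # explicit names are compared as given
rescue ArgumentError => e
  warn e.message
end
```

Under this reading, callers who pass a backend name explicitly should pass it in lowercase.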
.resolve_kafka_config ⇒ Object
# File 'lib/tina4/queue.rb', line 325
def self.resolve_kafka_config
  config = {}
  url = ENV["TINA4_QUEUE_URL"]
  if url
    config[:brokers] = url.sub("kafka://", "")
  end
  brokers = ENV["TINA4_KAFKA_BROKERS"]
  config[:brokers] = brokers if brokers
  config[:brokers] ||= "localhost:9092"
  config[:group_id] = ENV.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group")
  config
end
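Reading the listing above, the broker list is resolved in three steps: TINA4_QUEUE_URL is applied first, TINA4_KAFKA_BROKERS overrides it when set, and localhost:9092 is the fallback. The sketch below reproduces that order with a hypothetical helper, kafka_config, that takes the environment as a plain Hash so it can be exercised without touching real environment variables. The broker host names are invented.

```ruby
# Hypothetical helper mirroring resolve_kafka_config; `env` replaces ENV.
def kafka_config(env)
  config = {}
  url = env["TINA4_QUEUE_URL"]
  config[:brokers] = url.sub("kafka://", "") if url
  brokers = env["TINA4_KAFKA_BROKERS"]
  config[:brokers] = brokers if brokers           # dedicated variable wins over the URL
  config[:brokers] ||= "localhost:9092"           # fallback when neither is set
  config[:group_id] = env.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group")
  config
end

defaults = kafka_config({})
from_url = kafka_config("TINA4_QUEUE_URL" => "kafka://k1.example:9092")
override = kafka_config("TINA4_QUEUE_URL" => "kafka://k1.example:9092",
                        "TINA4_KAFKA_BROKERS" => "k2.example:9092,k3.example:9092")
```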
.resolve_mongo_config ⇒ Object
# File 'lib/tina4/queue.rb', line 338
def self.resolve_mongo_config
  config = {}
  uri = ENV["TINA4_MONGO_URI"]
  config[:uri] = uri if uri
  config[:host] = ENV.fetch("TINA4_MONGO_HOST", "localhost") unless uri
  config[:port] = (ENV["TINA4_MONGO_PORT"] || 27017).to_i unless uri
  username = ENV["TINA4_MONGO_USERNAME"]
  password = ENV["TINA4_MONGO_PASSWORD"]
  config[:username] = username if username
  config[:password] = password if password
  config[:db] = ENV.fetch("TINA4_MONGO_DB", "tina4")
  config[:collection] = ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue")
  config
end
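In the listing above, a connection URI and the host/port pair are mutually exclusive: when TINA4_MONGO_URI is set, :host and :port are not added to the config at all. A minimal sketch of that rule follows, using a hypothetical mongo_config helper that reads from a Hash instead of ENV. The URI value is an invented example.

```ruby
# Hypothetical helper mirroring resolve_mongo_config; `env` replaces ENV.
def mongo_config(env)
  config = {}
  uri = env["TINA4_MONGO_URI"]
  config[:uri] = uri if uri
  # host and port are only filled in when no URI was supplied
  config[:host] = env.fetch("TINA4_MONGO_HOST", "localhost") unless uri
  config[:port] = (env["TINA4_MONGO_PORT"] || 27017).to_i unless uri
  config[:username] = env["TINA4_MONGO_USERNAME"] if env["TINA4_MONGO_USERNAME"]
  config[:password] = env["TINA4_MONGO_PASSWORD"] if env["TINA4_MONGO_PASSWORD"]
  config[:db] = env.fetch("TINA4_MONGO_DB", "tina4")
  config[:collection] = env.fetch("TINA4_MONGO_COLLECTION", "tina4_queue")
  config
end

with_uri  = mongo_config("TINA4_MONGO_URI" => "mongodb://db.example:27017")
with_port = mongo_config("TINA4_MONGO_PORT" => "27018")
```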
.resolve_rabbitmq_config ⇒ Object
# File 'lib/tina4/queue.rb', line 311
def self.resolve_rabbitmq_config
  config = {}
  url = ENV["TINA4_QUEUE_URL"]
  if url
    config = parse_amqp_url(url)
  end
  config[:host] ||= ENV.fetch("TINA4_RABBITMQ_HOST", "localhost")
  config[:port] ||= (ENV["TINA4_RABBITMQ_PORT"] || 5672).to_i
  config[:username] ||= ENV.fetch("TINA4_RABBITMQ_USERNAME", "guest")
  config[:password] ||= ENV.fetch("TINA4_RABBITMQ_PASSWORD", "guest")
  config[:vhost] ||= ENV.fetch("TINA4_RABBITMQ_VHOST", "/")
  config
end
Instance Method Details
#backend ⇒ Object
Get the underlying backend instance.
# File 'lib/tina4/queue.rb', line 265
def backend
  @backend
end
#clear ⇒ Object
Clear all pending jobs from this queue’s topic. Returns count removed.
# File 'lib/tina4/queue.rb', line 65
def clear # -> int
  return 0 unless @backend.respond_to?(:clear)
  @backend.clear(@topic)
end
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) ⇒ Object
Consume jobs from a topic using an Enumerator (yield pattern).
Usage:
queue.consume("emails") do |job|
process(job)
end
# Consume a specific job by ID:
queue.consume("emails", id: "abc-123") do |job|
process(job)
end
# Or as an enumerator:
queue.consume("emails").each { |job| process(job) }
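With a positive poll_interval, consume runs as a long-running poller: when the queue is empty it sleeps for poll_interval seconds and polls again, so no external while-loop or sleep is needed. With poll_interval: 0 it makes a single pass and returns once the queue is empty. Setting iterations above 0 stops after that many jobs. With a block and batch_size above 1, the block receives an array of jobs per call; in that mode the source calls pop_batch, which reads from the queue's own topic rather than the topic argument.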
queue.consume("emails") { |job| process(job) }
queue.consume("emails", poll_interval: 5) { |job| process(job) }
queue.consume("emails", id: "abc-123") { |job| process(job) }
# File 'lib/tina4/queue.rb', line 135
def consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block)
  topic ||= @topic

  if id
    # Single job by ID — no polling
    job = pop_by_id(topic, id)
    if job
      block_given? ? yield(job) : (return Enumerator.new { |y| y << job })
    end
    return block_given? ? nil : Enumerator.new { |_| }
  end

  # poll_interval=0 → single-pass drain (returns when empty)
  # poll_interval>0 → long-running poll (sleeps when empty, never returns)
  # iterations>0 → stop after consuming N jobs
  if block_given?
    consumed = 0
    if batch_size > 1
      loop do
        jobs = pop_batch(batch_size)
        if jobs.empty?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yield jobs
        consumed += jobs.length
        break if iterations > 0 && consumed >= iterations
      end
    else
      loop do
        job = attach(@backend.dequeue(topic))
        if job.nil?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yield job
        consumed += 1
        break if iterations > 0 && consumed >= iterations
      end
    end
  else
    Enumerator.new do |yielder|
      consumed = 0
      loop do
        job = attach(@backend.dequeue(topic))
        if job.nil?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yielder << job
        consumed += 1
        break if iterations > 0 && consumed >= iterations
      end
    end
  end
end
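The two stop conditions in the loop above (an empty queue with poll_interval: 0, and the iterations cap) can be tried out without a real backend. The sketch below uses two hypothetical stand-ins, ArrayBackend and MiniQueue, that reproduce only the single-job, block-given branch of consume. It leaves out batching, ID lookup, the Enumerator form and the attach step.

```ruby
# ArrayBackend and MiniQueue are hypothetical stand-ins, not Tina4 classes.
class ArrayBackend
  def initialize(jobs)
    @jobs = jobs.dup
  end

  # Returns the next job, or nil when nothing is left.
  def dequeue(_topic)
    @jobs.shift
  end
end

class MiniQueue
  def initialize(backend)
    @backend = backend
  end

  # Same control flow as the single-job branch of consume.
  def consume(topic, poll_interval: 1.0, iterations: 0)
    consumed = 0
    loop do
      job = @backend.dequeue(topic)
      if job.nil?
        break if poll_interval <= 0   # single-pass drain: stop when empty
        sleep(poll_interval)          # long-running mode: wait and poll again
        next
      end
      yield job
      consumed += 1
      break if iterations > 0 && consumed >= iterations
    end
  end
end

# poll_interval: 0 drains everything, then returns.
drained = []
MiniQueue.new(ArrayBackend.new(%w[a b c]))
         .consume("emails", poll_interval: 0) { |job| drained << job }

# iterations: 2 stops after two jobs even though a third is waiting.
first_two = []
MiniQueue.new(ArrayBackend.new(%w[a b c]))
         .consume("emails", poll_interval: 0, iterations: 2) { |job| first_two << job }
```

With the default poll_interval of 1.0 and iterations: 0, the same loop would not return on its own, which matches the "never returns" note in the source comments.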
#dead_letters(max_retries: nil) ⇒ Object
Get dead letter jobs — messages that exceeded max retries. Pass max_retries to override the queue’s default.
# File 'lib/tina4/queue.rb', line 85
def dead_letters(max_retries: nil) # -> list[dict]
  return [] unless @backend.respond_to?(:dead_letters)
  @backend.dead_letters(@topic, max_retries: max_retries || @max_retries)
end
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
# File 'lib/tina4/queue.rb', line 71
def failed # -> list[dict]
  return [] unless @backend.respond_to?(:failed)
  @backend.failed(@topic, max_retries: @max_retries)
end
#get_topic ⇒ Object
Get the topic name this queue was constructed with.
# File 'lib/tina4/queue.rb', line 222
def get_topic
  @topic
end
#pop ⇒ Object
Pop the next available job. Returns Job or nil.
# File 'lib/tina4/queue.rb', line 42
def pop # -> Job|None
  attach(@backend.dequeue(@topic))
end
#pop_batch(count) ⇒ Object
Pop up to count jobs at once. Returns a partial batch if fewer available.
# File 'lib/tina4/queue.rb', line 47
def pop_batch(count)
  jobs = if @backend.respond_to?(:dequeue_batch)
    @backend.dequeue_batch(@topic, count)
  else
    collected = []
    count.times do
      job = @backend.dequeue(@topic)
      break if job.nil?
      collected << job
    end
    collected
  end
  jobs.each { |job| attach(job) }
  jobs
end
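The listing above prefers a native dequeue_batch when the backend offers one and otherwise falls back to repeated single dequeues, stopping early when the queue runs dry. The sketch below shows both paths returning the same partial batch. SingleOnlyBackend, BatchBackend and pop_batch_from are hypothetical names, and the attach step is omitted.

```ruby
# Hypothetical backend that only supports single dequeues.
class SingleOnlyBackend
  def initialize(jobs)
    @jobs = jobs.dup
  end

  def dequeue(_topic)
    @jobs.shift
  end
end

# Hypothetical backend that additionally offers a native batch call.
class BatchBackend < SingleOnlyBackend
  def dequeue_batch(_topic, count)
    @jobs.shift(count)
  end
end

# Same branching as pop_batch, with the backend passed in explicitly.
def pop_batch_from(backend, topic, count)
  if backend.respond_to?(:dequeue_batch)
    backend.dequeue_batch(topic, count)
  else
    collected = []
    count.times do
      job = backend.dequeue(topic)
      break if job.nil?          # fewer jobs than requested: return what we have
      collected << job
    end
    collected
  end
end

fallback = pop_batch_from(SingleOnlyBackend.new(%w[a b]), "tasks", 5)
native   = pop_batch_from(BatchBackend.new(%w[a b]), "tasks", 5)
```

Both calls ask for five jobs and get the two that exist, which is the "partial batch" behaviour described above.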
#pop_by_id(topic = nil, id) ⇒ Object
Pop a specific job by ID from the queue. Searches the pending queue for the given topic (defaults to this queue’s topic). Returns the matching Job (claimed/removed from the queue) or nil.
# File 'lib/tina4/queue.rb', line 198
def pop_by_id(topic = nil, id)
  return nil unless @backend.respond_to?(:find_by_id)
  attach(@backend.find_by_id(topic || @topic, id))
end
#process(topic: nil, max_jobs: nil, batch_size: 1, &handler) ⇒ Object
Consume all available jobs and pass each to handler, then stop.
Simpler alternative to consume() for drain-and-exit use cases.
queue.process { |job| handle(job); job.complete }
queue.process(topic: "emails", max_jobs: 10) { |job| ... }
# File 'lib/tina4/queue.rb', line 233
def process(topic: nil, max_jobs: nil, batch_size: 1, &handler)
  raise ArgumentError, "block required" unless block_given?

  drain_topic = topic || @topic
  processed = 0

  loop do
    break if max_jobs && processed >= max_jobs

    if batch_size > 1
      remaining = max_jobs ? [batch_size, max_jobs - processed].min : batch_size
      jobs = @backend.respond_to?(:dequeue_batch) ? @backend.dequeue_batch(drain_topic, remaining) : (1..remaining).map { @backend.dequeue(drain_topic) }.compact
      break if jobs.empty?
      begin
        handler.call(jobs)
      rescue => e
        jobs.each { |job| job.fail(e.message) }
      end
      processed += jobs.length
    else
      job = @backend.dequeue(drain_topic)
      break if job.nil?
      begin
        handler.call(job)
      rescue => e
        job.fail(e.message)
      end
      processed += 1
    end
  end
end
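One consequence of the rescue blocks above is that a raising handler does not abort the drain: the job is marked failed with the exception text and the loop moves on, with the failed job still counting toward max_jobs. The sketch below illustrates that with hypothetical stand-ins (DemoJob, drain). Unlike process, drain returns the processed count, purely so the example can inspect it.

```ruby
# DemoJob is a hypothetical stand-in for a queue job; fail just records the reason.
DemoJob = Struct.new(:payload, :error) do
  def fail(reason)
    self.error = reason
  end
end

# Simplified single-job drain loop modelled on process (no batching, no backend).
def drain(jobs, max_jobs: nil)
  processed = 0
  until jobs.empty?
    break if max_jobs && processed >= max_jobs
    job = jobs.shift
    begin
      yield job
    rescue => e
      job.fail(e.message)   # handler errors mark the job failed instead of propagating
    end
    processed += 1
  end
  processed
end

pending = [DemoJob.new("ok"), DemoJob.new("boom"), DemoJob.new("later")]
seen = pending.dup
count = drain(pending, max_jobs: 2) do |job|
  raise "handler failed" if job.payload == "boom"
end
# Two jobs were processed: the first has no error, the second carries
# "handler failed", and the third is still pending.
```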
#produce(topic, payload, priority: 0, delay_seconds: 0) ⇒ Object
Produce a message onto an explicit topic. Behaves like push(), but takes the topic as its first argument instead of using the queue's own topic.
# File 'lib/tina4/queue.rb', line 104
def produce(topic, payload, priority: 0, delay_seconds: 0)
  available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil
  message = Job.new(topic: topic, payload: payload, priority: priority,
                    available_at: available_at, queue: self)
  @backend.enqueue(message)
end
#purge(status, max_retries: nil) ⇒ Object
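Delete messages by status (completed, failed, dead). Returns 0 if the backend does not support purging. The max_retries keyword is accepted but, in the source shown here, is not forwarded to the backend.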
# File 'lib/tina4/queue.rb', line 91
def purge(status, max_retries: nil) # -> int
  return 0 unless @backend.respond_to?(:purge)
  @backend.purge(@topic, status)
end
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue. Returns the Job. priority: higher-priority messages are dequeued first (default 0). delay_seconds: delay before the message becomes available (default 0).
# File 'lib/tina4/queue.rb', line 34
def push(payload, priority: 0, delay_seconds: 0)
  available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil
  message = Job.new(topic: @topic, payload: payload, priority: priority,
                    available_at: available_at, queue: self)
  @backend.enqueue(message)
end
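The delay handling in push comes down to one expression: a positive delay_seconds becomes an absolute timestamp, and anything else leaves available_at as nil, which presumably means "available immediately" to the backend. A minimal sketch follows; the helper name available_at_for and its now: parameter are hypothetical, added so the arithmetic can be checked against a fixed clock.

```ruby
# Hypothetical helper isolating the available_at computation from push/produce.
def available_at_for(delay_seconds, now: Time.now)
  delay_seconds > 0 ? now + delay_seconds : nil
end

epoch = Time.at(0)
immediate = available_at_for(0, now: epoch)    # nil: no delay requested
delayed   = available_at_for(30, now: epoch)   # 30 seconds after `now`
```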
#retry(job_id = nil, delay_seconds: 0) ⇒ Object
Retry a specific failed job by ID, or all dead-letter jobs if no id given. Returns true if re-queued.
# File 'lib/tina4/queue.rb', line 78
def retry(job_id = nil, delay_seconds: 0) # -> bool
  return false unless @backend.respond_to?(:retry_job)
  @backend.retry_job(@topic, job_id: job_id, delay_seconds: delay_seconds)
end
#retry_failed(max_retries: nil) ⇒ Object
Re-queue failed messages (under max_retries) back to pending. Returns the number of jobs re-queued.
# File 'lib/tina4/queue.rb', line 98
def retry_failed(max_retries: nil) # -> int
  return 0 unless @backend.respond_to?(:retry_failed)
  @backend.retry_failed(@topic, max_retries: max_retries || @max_retries)
end
#size(status: "pending") ⇒ Object
Get the number of messages by status. status: “pending” (default) counts pending messages in the topic queue. status: “failed” or “dead” counts messages in the dead_letter directory.
# File 'lib/tina4/queue.rb', line 206
def size(status: "pending")
  case status.to_s
  when "pending"
    @backend.size(@topic)
  when "failed", "dead"
    if @backend.respond_to?(:dead_letter_count)
      @backend.dead_letter_count(@topic)
    else
      0
    end
  else
    @backend.size(@topic)
  end
end
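The status mapping above has two behaviours worth seeing side by side: "failed" and "dead" return 0 when the backend has no dead_letter_count, and any unrecognised status falls through to the pending count. The sketch below demonstrates both with hypothetical stand-in backends (PlainBackend, DeadLetterBackend) and a free function, count_for, that takes the backend explicitly.

```ruby
# Hypothetical backend without dead-letter support.
class PlainBackend
  def initialize(pending)
    @pending = pending
  end

  def size(_topic)
    @pending
  end
end

# Hypothetical backend that also tracks dead letters.
class DeadLetterBackend < PlainBackend
  def initialize(pending, dead)
    super(pending)
    @dead = dead
  end

  def dead_letter_count(_topic)
    @dead
  end
end

# Same branching as Queue#size, with backend and topic passed in.
def count_for(backend, topic, status)
  case status.to_s
  when "pending"
    backend.size(topic)
  when "failed", "dead"
    backend.respond_to?(:dead_letter_count) ? backend.dead_letter_count(topic) : 0
  else
    backend.size(topic)   # unknown statuses are treated like "pending"
  end
end

plain = PlainBackend.new(3)
rich  = DeadLetterBackend.new(3, 2)

count_for(plain, "tasks", "dead")       # backend cannot count dead letters
count_for(rich, "tasks", :failed)       # symbols work through to_s
count_for(rich, "tasks", "completed")   # not a recognised status here
```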