Class: Tina4::Queue
- Inherits: Object (hierarchy: Object, Tina4::Queue)
- Defined in: lib/tina4/queue.rb
Overview
Queue — unified wrapper for queue management operations. Auto-detects backend from TINA4_QUEUE_BACKEND env var.
Usage:
# Auto-detect from env (default: lite/file backend)
queue = Queue.new(topic: "tasks")
# Explicit backend
queue = Queue.new(topic: "tasks", backend: :rabbitmq)
# Or pass a backend instance directly (legacy)
queue = Queue.new(topic: "tasks", backend: my_backend)
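The third form hands Queue an object that acts as the backend. Judging from the method listings on this page, Queue calls enqueue(job), dequeue(topic) and size(topic) unconditionally and guards the optional calls (clear, dequeue_batch, find_by_id and so on) with respond_to?. The following is a minimal sketch of such an object, not the library's own backend. MemoryBackend and SketchJob are hypothetical names, and the assumption that a job responds to topic is not confirmed by this page.

```ruby
# Hypothetical stand-in for Tina4's Job; only the fields this sketch needs.
SketchJob = Struct.new(:topic, :payload, keyword_init: true)

# Hypothetical in-memory backend: one FIFO array per topic.
class MemoryBackend
  def initialize
    @topics = Hash.new { |hash, key| hash[key] = [] }
  end

  # Store the job under its topic (assumes the job responds to #topic).
  def enqueue(job)
    @topics[job.topic] << job
    job
  end

  # Return the oldest job for the topic, or nil when the topic is empty.
  def dequeue(topic)
    @topics[topic].shift
  end

  # Number of jobs waiting on the topic.
  def size(topic)
    @topics[topic].length
  end
end

backend = MemoryBackend.new
backend.enqueue(SketchJob.new(topic: "tasks", payload: { id: 1 }))
backend.enqueue(SketchJob.new(topic: "tasks", payload: { id: 2 }))
first = backend.dequeue("tasks")

# With the gem loaded, the instance would be passed as in the usage above:
#   queue = Tina4::Queue.new(topic: "tasks", backend: backend)
```

Because the optional methods are guarded, a backend this small would make clear, failed, dead_letters, purge and retry return their empty defaults (0, [] or false).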
Instance Attribute Summary
- #max_retries ⇒ Object (readonly): Returns the value of attribute max_retries.
- #topic ⇒ Object (readonly): Returns the value of attribute topic.
Class Method Summary
- .parse_amqp_url(url) ⇒ Object
- .resolve_backend(name = nil) ⇒ Object: Resolve the default backend from env vars.
- .resolve_kafka_config ⇒ Object
- .resolve_mongo_config ⇒ Object
- .resolve_rabbitmq_config ⇒ Object
Instance Method Summary
- #backend ⇒ Object: Get the underlying backend instance.
- #clear ⇒ Object: Clear all pending jobs from this queue's topic.
- #consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) ⇒ Object: Consume jobs from a topic using an Enumerator (yield pattern).
- #dead_letters(max_retries: nil) ⇒ Object: Get dead letter jobs, that is, messages that exceeded max retries.
- #failed ⇒ Object: Get jobs that failed but are still eligible for retry (under max_retries).
- #get_topic ⇒ Object: Get the topic name this queue was constructed with.
- #initialize(topic:, backend: nil, max_retries: 3) ⇒ Queue (constructor): A new instance of Queue.
- #pop ⇒ Object: Pop the next available job.
- #pop_batch(count) ⇒ Object: Pop up to count jobs at once.
- #pop_by_id(id) ⇒ Object: Pop a specific job by ID from the queue.
- #process(topic: nil, max_jobs: nil, batch_size: 1, &handler) ⇒ Object: Consume all available jobs and pass each to handler, then stop.
- #produce(topic, payload, priority: 0, delay_seconds: 0) ⇒ Object: Produce a message onto a topic.
- #purge(status, max_retries: nil) ⇒ Object: Delete messages by status (completed, failed, dead).
- #push(payload, priority: 0, delay_seconds: 0) ⇒ Object: Push a job onto the queue.
- #retry(job_id = nil, delay_seconds: 0) ⇒ Object: Retry a specific failed job by ID, or all dead-letter jobs if no id given.
- #retry_failed(max_retries: nil) ⇒ Object: Re-queue failed messages (under max_retries) back to pending.
- #size(status: "pending") ⇒ Object: Get the number of messages by status.
Constructor Details
#initialize(topic:, backend: nil, max_retries: 3) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/tina4/queue.rb', line 22
def initialize(topic:, backend: nil, max_retries: 3)
  @topic = topic
  @max_retries = max_retries
  @backend = resolve_backend_arg(backend)
end
Instance Attribute Details
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
# File 'lib/tina4/queue.rb', line 20
def max_retries
  @max_retries
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/tina4/queue.rb', line 20
def topic
  @topic
end
Class Method Details
.parse_amqp_url(url) ⇒ Object
# File 'lib/tina4/queue.rb', line 333
def self.parse_amqp_url(url)
  config = {}
  url = url.sub("amqp://", "").sub("amqps://", "")

  if url.include?("@")
    creds, rest = url.split("@", 2)
    if creds.include?(":")
      config[:username], config[:password] = creds.split(":", 2)
    else
      config[:username] = creds
    end
  else
    rest = url
  end

  if rest.include?("/")
    hostport, vhost = rest.split("/", 2)
    config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}" if vhost && !vhost.empty?
  else
    hostport = rest
  end

  if hostport.include?(":")
    host, port = hostport.split(":", 2)
    config[:host] = host
    config[:port] = port.to_i
  elsif hostport && !hostport.empty?
    config[:host] = hostport
  end

  config
end
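To make the shape of the returned hash concrete, here is a sketch that builds the same keys with URI from Ruby's standard library instead of manual string splitting. It is a different technique from the listing, shown only as a cross-check. amqp_config_with_uri and the sample URL are hypothetical. Tracing the listing by hand for this URL gives the same five values, including the vhost with its leading slash.

```ruby
require "uri"

# Hypothetical helper: same config keys as the listing, parsed with URI.
def amqp_config_with_uri(url)
  uri = URI.parse(url)
  config = {}
  config[:username] = uri.user if uri.user
  config[:password] = uri.password if uri.password
  config[:host] = uri.host if uri.host
  config[:port] = uri.port if uri.port
  # URI keeps the leading slash on the path, matching the "/vhost" form.
  config[:vhost] = uri.path unless uri.path.to_s.empty?
  config
end

config = amqp_config_with_uri("amqp://app:secret@rabbit.local:5673/staging")
```

One difference to be aware of: for a URL that ends in a bare "/", this sketch sets vhost to "/", whereas the listing leaves vhost unset and lets the caller apply its default.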
.resolve_backend(name = nil) ⇒ Object
Resolve a backend by name. When name is nil, the name is read from the TINA4_QUEUE_BACKEND env var (default "file"). Accepted names are lite, file, default, rabbitmq, kafka, mongodb and mongo; anything else raises ArgumentError. Only the env var value is downcased and stripped, so an explicit name must already be lowercase.
# File 'lib/tina4/queue.rb', line 262
def self.resolve_backend(name = nil)
  chosen = name || ENV.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip

  case chosen.to_s
  when "lite", "file", "default"
    Tina4::QueueBackends::LiteBackend.new
  when "rabbitmq"
    config = resolve_rabbitmq_config
    Tina4::QueueBackends::RabbitmqBackend.new(config)
  when "kafka"
    config = resolve_kafka_config
    Tina4::QueueBackends::KafkaBackend.new(config)
  when "mongodb", "mongo"
    config = resolve_mongo_config
    Tina4::QueueBackends::MongoBackend.new(config)
  else
    raise ArgumentError, "Unknown queue backend: #{chosen.inspect}. Use 'lite', 'rabbitmq', 'kafka', or 'mongodb'."
  end
end
.resolve_kafka_config ⇒ Object
# File 'lib/tina4/queue.rb', line 305
def self.resolve_kafka_config
  config = {}
  url = ENV["TINA4_QUEUE_URL"]
  if url
    config[:brokers] = url.sub("kafka://", "")
  end
  brokers = ENV["TINA4_KAFKA_BROKERS"]
  config[:brokers] = brokers if brokers
  config[:brokers] ||= "localhost:9092"
  config[:group_id] = ENV.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group")
  config
end
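The order of the assignments in the listing determines which setting wins: TINA4_KAFKA_BROKERS overrides a broker list taken from TINA4_QUEUE_URL, and "localhost:9092" is used only when neither is set. The sketch below mirrors that precedence with the environment passed in as a plain Hash so it can be run in isolation. kafka_brokers and the broker names are hypothetical.

```ruby
# Hypothetical helper mirroring the broker precedence in the listing.
def kafka_brokers(env)
  # Start from the generic queue URL, with the scheme stripped.
  brokers = env["TINA4_QUEUE_URL"]&.sub("kafka://", "")
  # The Kafka-specific variable overrides the URL when present.
  brokers = env["TINA4_KAFKA_BROKERS"] if env["TINA4_KAFKA_BROKERS"]
  # Fall back to the local default only when nothing was configured.
  brokers || "localhost:9092"
end

from_url = kafka_brokers({ "TINA4_QUEUE_URL" => "kafka://broker-a:9092" })
overridden = kafka_brokers({
  "TINA4_QUEUE_URL" => "kafka://broker-a:9092",
  "TINA4_KAFKA_BROKERS" => "broker-b:9092"
})
fallback = kafka_brokers({})
```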
.resolve_mongo_config ⇒ Object
# File 'lib/tina4/queue.rb', line 318
def self.resolve_mongo_config
  config = {}
  uri = ENV["TINA4_MONGO_URI"]
  config[:uri] = uri if uri
  config[:host] = ENV.fetch("TINA4_MONGO_HOST", "localhost") unless uri
  config[:port] = (ENV["TINA4_MONGO_PORT"] || 27017).to_i unless uri
  username = ENV["TINA4_MONGO_USERNAME"]
  password = ENV["TINA4_MONGO_PASSWORD"]
  config[:username] = username if username
  config[:password] = password if password
  config[:db] = ENV.fetch("TINA4_MONGO_DB", "tina4")
  config[:collection] = ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue")
  config
end
.resolve_rabbitmq_config ⇒ Object
# File 'lib/tina4/queue.rb', line 291
def self.resolve_rabbitmq_config
  config = {}
  url = ENV["TINA4_QUEUE_URL"]
  if url
    config = parse_amqp_url(url)
  end
  config[:host] ||= ENV.fetch("TINA4_RABBITMQ_HOST", "localhost")
  config[:port] ||= (ENV["TINA4_RABBITMQ_PORT"] || 5672).to_i
  config[:username] ||= ENV.fetch("TINA4_RABBITMQ_USERNAME", "guest")
  config[:password] ||= ENV.fetch("TINA4_RABBITMQ_PASSWORD", "guest")
  config[:vhost] ||= ENV.fetch("TINA4_RABBITMQ_VHOST", "/")
  config
end
Instance Method Details
#backend ⇒ Object
Get the underlying backend instance.
# File 'lib/tina4/queue.rb', line 257
def backend
  @backend
end
#clear ⇒ Object
Clear all pending jobs from this queue’s topic. Returns count removed.
# File 'lib/tina4/queue.rb', line 59
def clear # -> int
  return 0 unless @backend.respond_to?(:clear)
  @backend.clear(@topic)
end
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block) ⇒ Object
Consume jobs from a topic, either by yielding each job to a block or, when no block is given, by returning an Enumerator. topic defaults to the topic this queue was constructed with.
The queue is polled continuously. When it is empty, consume sleeps for poll_interval seconds before polling again, so no external while-loop or sleep is needed. With poll_interval: 0 it makes a single pass and returns once the queue is empty. With iterations greater than 0 it stops after that many jobs. In block form, batch_size greater than 1 yields arrays of jobs instead of single jobs.
Usage:
queue.consume("emails") { |job| process(job) }
queue.consume("emails", poll_interval: 5) { |job| process(job) }
# Consume a specific job by ID (no polling):
queue.consume("emails", id: "abc-123") { |job| process(job) }
# Or as an enumerator:
queue.consume("emails").each { |job| process(job) }
# File 'lib/tina4/queue.rb', line 129
def consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, batch_size: 1, &block)
  topic ||= @topic

  if id
    # Single job by ID, no polling
    job = pop_by_id(topic, id)
    if job
      block_given? ? yield(job) : (return Enumerator.new { |y| y << job })
    end
    return block_given? ? nil : Enumerator.new { |_| }
  end

  # poll_interval=0 → single-pass drain (returns when empty)
  # poll_interval>0 → long-running poll (sleeps when empty, never returns)
  # iterations>0 → stop after consuming N jobs

  if block_given?
    consumed = 0
    if batch_size > 1
      loop do
        jobs = pop_batch(batch_size)
        if jobs.empty?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yield jobs
        consumed += jobs.length
        break if iterations > 0 && consumed >= iterations
      end
    else
      loop do
        job = @backend.dequeue(topic)
        if job.nil?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yield job
        consumed += 1
        break if iterations > 0 && consumed >= iterations
      end
    end
  else
    Enumerator.new do |yielder|
      consumed = 0
      loop do
        job = @backend.dequeue(topic)
        if job.nil?
          break if poll_interval <= 0
          sleep(poll_interval)
          next
        end
        yielder << job
        consumed += 1
        break if iterations > 0 && consumed >= iterations
      end
    end
  end
end
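The two stopping conditions in the listing (poll_interval of 0 and a positive iterations) can be tried without a real queue. The sketch below reproduces only the Enumerator branch against a stand-in backend. ArrayBackend and poll are hypothetical names and not part of Tina4.

```ruby
# Hypothetical stand-in backend: dequeue returns the next item or nil.
class ArrayBackend
  def initialize(items)
    @items = items.dup
  end

  def dequeue(_topic)
    @items.shift
  end
end

# Same control flow as the Enumerator branch of the listing.
def poll(backend, topic, poll_interval: 1.0, iterations: 0)
  Enumerator.new do |yielder|
    consumed = 0
    loop do
      job = backend.dequeue(topic)
      if job.nil?
        break if poll_interval <= 0 # single-pass drain: stop when empty
        sleep(poll_interval)        # long-running poll: wait, then retry
        next
      end
      yielder << job
      consumed += 1
      break if iterations > 0 && consumed >= iterations
    end
  end
end

# poll_interval: 0 drains whatever is queued and then stops.
drained = poll(ArrayBackend.new(%w[a b c]), "emails", poll_interval: 0).to_a
# iterations: 2 stops after two jobs, before the queue ever runs empty.
first_two = poll(ArrayBackend.new(%w[a b c]), "emails", iterations: 2).to_a
```

With the default poll_interval and no iterations limit, calling to_a on the enumerator would never return, since an empty queue only causes a sleep.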
#dead_letters(max_retries: nil) ⇒ Object
Get dead letter jobs — messages that exceeded max retries. Pass max_retries to override the queue’s default.
# File 'lib/tina4/queue.rb', line 79
def dead_letters(max_retries: nil) # -> list[dict]
  return [] unless @backend.respond_to?(:dead_letters)
  @backend.dead_letters(@topic, max_retries: max_retries || @max_retries)
end
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
# File 'lib/tina4/queue.rb', line 65
def failed # -> list[dict]
  return [] unless @backend.respond_to?(:failed)
  @backend.failed(@topic, max_retries: @max_retries)
end
#get_topic ⇒ Object
Get the topic name this queue was constructed with.
# File 'lib/tina4/queue.rb', line 214
def get_topic
  @topic
end
#pop ⇒ Object
Pop the next available job. Returns Job or nil.
# File 'lib/tina4/queue.rb', line 39
def pop # -> Job|None
  @backend.dequeue(@topic)
end
#pop_batch(count) ⇒ Object
Pop up to count jobs at once. Returns a partial batch if fewer available.
# File 'lib/tina4/queue.rb', line 44
def pop_batch(count)
  if @backend.respond_to?(:dequeue_batch)
    @backend.dequeue_batch(@topic, count)
  else
    jobs = []
    count.times do
      job = @backend.dequeue(@topic)
      break if job.nil?
      jobs << job
    end
    jobs
  end
end
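The partial-batch behaviour comes from the fallback branch, which stops at the first nil. A small sketch of that branch against a backend without dequeue_batch follows. OneAtATimeBackend and pop_batch_fallback are hypothetical names used only for illustration.

```ruby
# Hypothetical backend that offers dequeue but no dequeue_batch.
class OneAtATimeBackend
  def initialize(items)
    @items = items.dup
  end

  def dequeue(_topic)
    @items.shift
  end
end

# Same loop as the fallback branch of the listing.
def pop_batch_fallback(backend, topic, count)
  jobs = []
  count.times do
    job = backend.dequeue(topic)
    break if job.nil? # queue ran dry: return what was collected so far
    jobs << job
  end
  jobs
end

backend = OneAtATimeBackend.new(%w[a b c])
full = pop_batch_fallback(backend, "tasks", 2)
partial = pop_batch_fallback(backend, "tasks", 2)
empty = pop_batch_fallback(backend, "tasks", 2)
```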
#pop_by_id(id) ⇒ Object
Pop a specific job by ID from the queue. Returns nil when the backend does not implement find_by_id.
# File 'lib/tina4/queue.rb', line 190
def pop_by_id(id)
  return nil unless @backend.respond_to?(:find_by_id)
  @backend.find_by_id(@topic, id)
end
#process(topic: nil, max_jobs: nil, batch_size: 1, &handler) ⇒ Object
Consume all available jobs and pass each to handler, then stop.
Simpler alternative to consume() for drain-and-exit use cases. A block is required; without one, ArgumentError is raised. max_jobs caps the number of jobs processed. With batch_size greater than 1 the handler receives an array of jobs. If the handler raises, the job (or every job in the batch) is marked failed and draining continues.
queue.process { |job| handle(job); job.complete }
queue.process(topic: "emails", max_jobs: 10) { |job| ... }
# File 'lib/tina4/queue.rb', line 225
def process(topic: nil, max_jobs: nil, batch_size: 1, &handler)
  raise ArgumentError, "block required" unless block_given?
  drain_topic = topic || @topic
  processed = 0

  loop do
    break if max_jobs && processed >= max_jobs

    if batch_size > 1
      remaining = max_jobs ? [batch_size, max_jobs - processed].min : batch_size
      jobs = @backend.respond_to?(:dequeue_batch) ?
        @backend.dequeue_batch(drain_topic, remaining) :
        (1..remaining).map { @backend.dequeue(drain_topic) }.compact
      break if jobs.empty?
      begin
        handler.call(jobs)
      rescue => e
        jobs.each { |job| job.fail(e.message) }
      end
      processed += jobs.length
    else
      job = @backend.dequeue(drain_topic)
      break if job.nil?
      begin
        handler.call(job)
      rescue => e
        job.fail(e.message)
      end
      processed += 1
    end
  end
end
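A consequence of the rescue inside the loop is that one failing job does not stop the drain: the error is recorded on that job and the next job is still handled. The sketch below isolates the single-job branch to show this. RecordingJob and drain are hypothetical, and passing the exception message to fail is an assumption about the garbled call in the listing.

```ruby
# Hypothetical job stand-in that records the failure reason.
class RecordingJob
  attr_reader :payload, :error

  def initialize(payload)
    @payload = payload
    @error = nil
  end

  def fail(reason)
    @error = reason
  end
end

# Hypothetical helper: the single-job branch of the drain loop.
def drain(jobs)
  processed = 0
  until jobs.empty?
    job = jobs.shift
    begin
      yield job
    rescue => e
      job.fail(e.message) # assumption: the reason is the exception message
    end
    processed += 1 # failed jobs still count as processed
  end
  processed
end

all_jobs = [RecordingJob.new(1), RecordingJob.new("two"), RecordingJob.new(3)]
total = 0
# Integer("two") raises ArgumentError, so the second job fails.
processed = drain(all_jobs.dup) { |job| total += Integer(job.payload) }
```

Note that a failed job still counts toward the processed total, which is how max_jobs would be consumed in the real method as well.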
#produce(topic, payload, priority: 0, delay_seconds: 0) ⇒ Object
Produce a message onto an explicit topic. Behaves like push(), but enqueues on the given topic rather than the topic this queue was constructed with.
# File 'lib/tina4/queue.rb', line 98
def produce(topic, payload, priority: 0, delay_seconds: 0)
  available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil
  message = Job.new(topic: topic, payload: payload, priority: priority,
                    available_at: available_at, queue: self)
  @backend.enqueue(message)
end
#purge(status, max_retries: nil) ⇒ Object
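Delete messages by status (completed, failed, dead). Returns 0 when the backend does not implement purge. In the listing below, max_retries is accepted but not forwarded to the backend.
# File 'lib/tina4/queue.rb', line 85
def purge(status, max_retries: nil) # -> int
  return 0 unless @backend.respond_to?(:purge)
  @backend.purge(@topic, status)
end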
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue. Returns the Job.
- priority: higher-priority messages are dequeued first (default 0).
- delay_seconds: delay before the message becomes available (default 0).
# File 'lib/tina4/queue.rb', line 31
def push(payload, priority: 0, delay_seconds: 0)
  available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil
  message = Job.new(topic: @topic, payload: payload, priority: priority,
                    available_at: available_at, queue: self)
  @backend.enqueue(message)
end
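The delay is turned into an absolute timestamp before the job reaches the backend, and a delay of 0 leaves the timestamp unset. A minimal sketch of that one expression, with the clock injected so the result is deterministic, might look like this. available_at_for and its now parameter are hypothetical.

```ruby
# Hypothetical helper: the available_at expression from the listing,
# with the current time passed in instead of read from the clock.
def available_at_for(delay_seconds, now: Time.now)
  delay_seconds > 0 ? now + delay_seconds : nil
end

immediate = available_at_for(0)
later = available_at_for(30, now: Time.at(0))
```

How a backend interprets a nil or future available_at is not shown on this page.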
#retry(job_id = nil, delay_seconds: 0) ⇒ Object
Retry a specific failed job by ID, or all dead-letter jobs if no id given. Returns true if re-queued.
# File 'lib/tina4/queue.rb', line 72
def retry(job_id = nil, delay_seconds: 0) # -> bool
  return false unless @backend.respond_to?(:retry_job)
  @backend.retry_job(@topic, job_id: job_id, delay_seconds: delay_seconds)
end
#retry_failed(max_retries: nil) ⇒ Object
Re-queue failed messages (under max_retries) back to pending. Returns the number of jobs re-queued.
# File 'lib/tina4/queue.rb', line 92
def retry_failed(max_retries: nil) # -> int
  return 0 unless @backend.respond_to?(:retry_failed)
  @backend.retry_failed(@topic, max_retries: max_retries || @max_retries)
end
#size(status: "pending") ⇒ Object
Get the number of messages by status.
- status: "pending" (default) counts pending messages in the topic queue.
- status: "failed" or "dead" counts messages in the dead_letter directory, or returns 0 when the backend does not implement dead_letter_count.
- Any other status falls back to the pending count.
# File 'lib/tina4/queue.rb', line 198
def size(status: "pending")
  case status.to_s
  when "pending"
    @backend.size(@topic)
  when "failed", "dead"
    if @backend.respond_to?(:dead_letter_count)
      @backend.dead_letter_count(@topic)
    else
      0
    end
  else
    @backend.size(@topic)
  end
end