Class: Tina4::Queue
- Inherits:
-
Object
- Object
- Tina4::Queue
- Defined in:
- lib/tina4/queue.rb
Overview
Queue — unified wrapper for queue management operations. Auto-detects backend from TINA4_QUEUE_BACKEND env var.
Usage:
# Auto-detect from env (default: lite/file backend)
queue = Queue.new(topic: "tasks")
# Explicit backend
queue = Queue.new(topic: "tasks", backend: :rabbitmq)
# Or pass a backend instance directly (legacy)
queue = Queue.new(topic: "tasks", backend: my_backend)
Instance Attribute Summary collapse
-
#max_retries ⇒ Object
readonly
Returns the value of attribute max_retries.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Class Method Summary collapse
- .parse_amqp_url(url) ⇒ Object
-
.resolve_backend(name = nil) ⇒ Object
Resolve the default backend from env vars.
- .resolve_kafka_config ⇒ Object
- .resolve_mongo_config ⇒ Object
- .resolve_rabbitmq_config ⇒ Object
Instance Method Summary collapse
-
#backend ⇒ Object
Get the underlying backend instance.
-
#clear ⇒ Object
Clear all pending jobs from this queue’s topic.
-
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, &block) ⇒ Object
Consume jobs from a topic using an Enumerator (yield pattern).
-
#dead_letters ⇒ Object
Get dead letter jobs — messages that exceeded max retries.
-
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
-
#initialize(topic:, backend: nil, max_retries: 3) ⇒ Queue
constructor
A new instance of Queue.
-
#pop ⇒ Object
Pop the next available job.
-
#pop_by_id(id) ⇒ Object
Pop a specific job by ID from the queue.
-
#produce(topic, payload, priority = 0) ⇒ Object
Produce a message onto a topic.
-
#purge(status) ⇒ Object
Delete messages by status (completed, failed, dead).
-
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue.
-
#retry(delay_seconds = 0) ⇒ Object
Retry failed jobs on this queue’s topic.
-
#retry_failed ⇒ Object
Re-queue failed messages (under max_retries) back to pending.
-
#size(status: "pending") ⇒ Object
Get the number of messages by status.
Constructor Details
#initialize(topic:, backend: nil, max_retries: 3) ⇒ Queue
Returns a new instance of Queue.
22 23 24 25 26 |
# File 'lib/tina4/queue.rb', line 22 def initialize(topic:, backend: nil, max_retries: 3) @topic = topic @max_retries = max_retries @backend = resolve_backend_arg(backend) end |
Instance Attribute Details
#max_retries ⇒ Object (readonly)
Returns the value of attribute max_retries.
20 21 22 |
# File 'lib/tina4/queue.rb', line 20 def max_retries @max_retries end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
20 21 22 |
# File 'lib/tina4/queue.rb', line 20 def topic @topic end |
Class Method Details
.parse_amqp_url(url) ⇒ Object
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'lib/tina4/queue.rb', line 258 def self.parse_amqp_url(url) config = {} url = url.sub("amqp://", "").sub("amqps://", "") if url.include?("@") creds, rest = url.split("@", 2) if creds.include?(":") config[:username], config[:password] = creds.split(":", 2) else config[:username] = creds end else rest = url end if rest.include?("/") hostport, vhost = rest.split("/", 2) config[:vhost] = vhost.start_with?("/") ? vhost : "/#{vhost}" if vhost && !vhost.empty? else hostport = rest end if hostport.include?(":") host, port = hostport.split(":", 2) config[:host] = host config[:port] = port.to_i elsif hostport && !hostport.empty? config[:host] = hostport end config end |
.resolve_backend(name = nil) ⇒ Object
Resolve the default backend from env vars.
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/tina4/queue.rb', line 187 def self.resolve_backend(name = nil) chosen = name || ENV.fetch("TINA4_QUEUE_BACKEND", "file").downcase.strip case chosen.to_s when "lite", "file", "default" Tina4::QueueBackends::LiteBackend.new when "rabbitmq" config = resolve_rabbitmq_config Tina4::QueueBackends::RabbitmqBackend.new(config) when "kafka" config = resolve_kafka_config Tina4::QueueBackends::KafkaBackend.new(config) when "mongodb", "mongo" config = resolve_mongo_config Tina4::QueueBackends::MongoBackend.new(config) else raise ArgumentError, "Unknown queue backend: #{chosen.inspect}. Use 'lite', 'rabbitmq', 'kafka', or 'mongodb'." end end |
.resolve_kafka_config ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/tina4/queue.rb', line 230 def self.resolve_kafka_config config = {} url = ENV["TINA4_QUEUE_URL"] if url config[:brokers] = url.sub("kafka://", "") end brokers = ENV["TINA4_KAFKA_BROKERS"] config[:brokers] = brokers if brokers config[:brokers] ||= "localhost:9092" config[:group_id] = ENV.fetch("TINA4_KAFKA_GROUP_ID", "tina4_consumer_group") config end |
.resolve_mongo_config ⇒ Object
243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/tina4/queue.rb', line 243 def self.resolve_mongo_config config = {} uri = ENV["TINA4_MONGO_URI"] config[:uri] = uri if uri config[:host] = ENV.fetch("TINA4_MONGO_HOST", "localhost") unless uri config[:port] = (ENV["TINA4_MONGO_PORT"] || 27017).to_i unless uri username = ENV["TINA4_MONGO_USERNAME"] password = ENV["TINA4_MONGO_PASSWORD"] config[:username] = username if username config[:password] = password if password config[:db] = ENV.fetch("TINA4_MONGO_DB", "tina4") config[:collection] = ENV.fetch("TINA4_MONGO_COLLECTION", "tina4_queue") config end |
.resolve_rabbitmq_config ⇒ Object
216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/tina4/queue.rb', line 216 def self.resolve_rabbitmq_config config = {} url = ENV["TINA4_QUEUE_URL"] if url config = parse_amqp_url(url) end config[:host] ||= ENV.fetch("TINA4_RABBITMQ_HOST", "localhost") config[:port] ||= (ENV["TINA4_RABBITMQ_PORT"] || 5672).to_i config[:username] ||= ENV.fetch("TINA4_RABBITMQ_USERNAME", "guest") config[:password] ||= ENV.fetch("TINA4_RABBITMQ_PASSWORD", "guest") config[:vhost] ||= ENV.fetch("TINA4_RABBITMQ_VHOST", "/") config end |
Instance Method Details
#backend ⇒ Object
Get the underlying backend instance.
182 183 184 |
# File 'lib/tina4/queue.rb', line 182 def backend @backend end |
#clear ⇒ Object
Clear all pending jobs from this queue’s topic. Returns count removed.
44 45 46 47 |
# File 'lib/tina4/queue.rb', line 44 def clear # -> int return 0 unless @backend.respond_to?(:clear) @backend.clear(@topic) end |
#consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, &block) ⇒ Object
Consume jobs from a topic using an Enumerator (yield pattern).
Usage:
queue.consume("emails") do |job|
process(job)
end
# Consume a specific job by ID:
queue.consume("emails", id: "abc-123") do |job|
process(job)
end
# Or as an enumerator:
queue.consume("emails").each { |job| process(job) }
Consume jobs from a topic using a long-running generator.
Polls the queue continuously. When empty, sleeps for poll_interval seconds before polling again. No external while-loop or sleep needed.
queue.consume("emails") { |job| process(job) }
queue.consume("emails", poll_interval: 5) { |job| process(job) }
queue.consume("emails", id: "abc-123") { |job| process(job) }
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/tina4/queue.rb', line 111 def consume(topic = nil, id: nil, poll_interval: 1.0, iterations: 0, &block) topic ||= @topic if id # Single job by ID — no polling job = pop_by_id(topic, id) if job block_given? ? yield(job) : (return Enumerator.new { |y| y << job }) end return block_given? ? nil : Enumerator.new { |_| } end # poll_interval=0 → single-pass drain (returns when empty) # poll_interval>0 → long-running poll (sleeps when empty, never returns) # iterations>0 → stop after consuming N jobs if block_given? consumed = 0 loop do job = @backend.dequeue(topic) if job.nil? break if poll_interval <= 0 sleep(poll_interval) next end yield job consumed += 1 break if iterations > 0 && consumed >= iterations end else Enumerator.new do |yielder| consumed = 0 loop do job = @backend.dequeue(topic) if job.nil? break if poll_interval <= 0 sleep(poll_interval) next end yielder << job consumed += 1 break if iterations > 0 && consumed >= iterations end end end end |
#dead_letters ⇒ Object
Get dead letter jobs — messages that exceeded max retries.
62 63 64 65 |
# File 'lib/tina4/queue.rb', line 62 def dead_letters # -> list[dict] return [] unless @backend.respond_to?(:dead_letters) @backend.dead_letters(@topic, max_retries: @max_retries) end |
#failed ⇒ Object
Get jobs that failed but are still eligible for retry (under max_retries).
50 51 52 53 |
# File 'lib/tina4/queue.rb', line 50 def failed # -> list[dict] return [] unless @backend.respond_to?(:failed) @backend.failed(@topic, max_retries: @max_retries) end |
#pop ⇒ Object
Pop the next available job. Returns Job or nil.
39 40 41 |
# File 'lib/tina4/queue.rb', line 39 def pop # -> Job|None @backend.dequeue(@topic) end |
#pop_by_id(id) ⇒ Object
Pop a specific job by ID from the queue.
158 159 160 161 |
# File 'lib/tina4/queue.rb', line 158 def pop_by_id(id) return nil unless @backend.respond_to?(:find_by_id) @backend.find_by_id(@topic, id) end |
#produce(topic, payload, priority = 0) ⇒ Object
Produce a message onto a topic. Convenience wrapper around push().
81 82 83 84 85 |
# File 'lib/tina4/queue.rb', line 81 def produce(topic, payload, priority = 0) = Job.new(topic: topic, payload: payload, priority: priority) @backend.enqueue() end |
#purge(status) ⇒ Object
Delete messages by status (completed, failed, dead).
68 69 70 71 |
# File 'lib/tina4/queue.rb', line 68 def purge(status) # -> int return 0 unless @backend.respond_to?(:purge) @backend.purge(@topic, status) end |
#push(payload, priority: 0, delay_seconds: 0) ⇒ Object
Push a job onto the queue. Returns the Job. priority: higher-priority messages are dequeued first (default 0). delay_seconds: delay before the message becomes available (default 0).
31 32 33 34 35 36 |
# File 'lib/tina4/queue.rb', line 31 def push(payload, priority: 0, delay_seconds: 0) available_at = delay_seconds > 0 ? Time.now + delay_seconds : nil = Job.new(topic: @topic, payload: payload, priority: priority, available_at: available_at) @backend.enqueue() end |
#retry(delay_seconds = 0) ⇒ Object
Retry failed jobs on this queue’s topic. Returns true if re-queued.
56 57 58 59 |
# File 'lib/tina4/queue.rb', line 56 def retry(delay_seconds = 0) # -> bool return false unless @backend.respond_to?(:retry_job) @backend.retry_job(@topic, delay_seconds: delay_seconds) end |
#retry_failed ⇒ Object
Re-queue failed messages (under max_retries) back to pending. Returns the number of jobs re-queued.
75 76 77 78 |
# File 'lib/tina4/queue.rb', line 75 def retry_failed # -> int return 0 unless @backend.respond_to?(:retry_failed) @backend.retry_failed(@topic, max_retries: @max_retries) end |
#size(status: "pending") ⇒ Object
Get the number of messages by status. status: “pending” (default) counts pending messages in the topic queue. status: “failed” or “dead” counts messages in the dead_letter directory.
166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/tina4/queue.rb', line 166 def size(status: "pending") case status.to_s when "pending" @backend.size(@topic) when "failed", "dead" if @backend.respond_to?(:dead_letter_count) @backend.dead_letter_count(@topic) else 0 end else @backend.size(@topic) end end |