Class: Postburner::Tube

Inherits:
Object
Defined in:
lib/postburner/tube.rb

Overview

Note:

Beanstalkd is not designed for job enumeration or searching. It is optimized for fast job reservation, not inspection. This class iterates through job IDs sequentially, which is inherently inefficient. Use it only for debugging and development, not for production workloads or high-volume queue introspection.

A wrapper around Beaneater tubes for inspecting Beanstalkd queues.

Provides methods to enumerate tubes and paginate through jobs, primarily used by the Postburner web UI for job inspection.

Examples:

List all tubes

tubes = Postburner::Tube.all
tubes.each { |tube| puts tube.peek_ids }

Paginate through jobs in a tube

tube = Postburner::Tube.all.first
first_page = tube.jobs(20)
second_page = tube.jobs(20, after: first_page.last.id)

Constructor Details

#initialize(tube) ⇒ Tube

Initialize a new Tube wrapper.

Parameters:

  • tube (Beaneater::Tube)

    The underlying Beaneater tube object



# File 'lib/postburner/tube.rb', line 27

def initialize(tube)
  @tube = tube
end

Class Method Details

.all ⇒ Array<Postburner::Tube>

Get all tubes as Postburner::Tube instances.

Examples:

Postburner::Tube.all
# => [#<Postburner::Tube>, #<Postburner::Tube>, ...]

Returns:

  • (Array<Postburner::Tube>)

    One wrapper for each tube reported by the Beanstalkd connection


# File 'lib/postburner/tube.rb', line 38

def self.all
  Postburner.connection.tubes.all.map { |tube| self.new(tube) }
end

.peek_ids ⇒ Array<Integer>

Get all peeked job IDs across all known tubes.

Useful for finding the minimum known job ID when starting pagination.

Examples:

Postburner::Tube.peek_ids
# => [1, 5, 12, 47, 89, 102]

Returns:

  • (Array<Integer>)

    Sorted array of job IDs from all tubes



# File 'lib/postburner/tube.rb', line 51

def self.peek_ids
  self.all.map(&:peek_ids).flatten.sort
end
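
As a rough sketch of how the aggregated IDs could seed pagination, the following mirrors the flatten-and-sort step shown above. `FakeTube` is a hypothetical stand-in (not part of Postburner) so the example runs without a Beanstalkd server.

```ruby
# FakeTube is a hypothetical stand-in for Postburner::Tube; it only
# exposes the per-tube peek_ids that the class-level method aggregates.
FakeTube = Struct.new(:name, :peek_ids)

tubes = [
  FakeTube.new("default", [12, 5]),
  FakeTube.new("mailers", []),       # an empty tube contributes nothing
  FakeTube.new("critical", [47, 1])
]

# Same shape as Tube.peek_ids: collect per-tube IDs, flatten, sort.
all_ids = tubes.map(&:peek_ids).flatten.sort

# Lowest known ID; nil when no tube has a peekable job.
start_id = all_ids.min

puts all_ids.inspect   # => [1, 5, 12, 47]
puts start_id.inspect  # => 1
```

Note that `min` returns `nil` on an empty array, which is why the caller in `#jobs` coerces the result with `to_i`.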

Instance Method Details

#jobs(count = 20, limit: 1000, after: nil) ⇒ Array<Beaneater::Job>

Get a paginated array of jobs from this tube.

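Retrieves jobs by starting from the lowest known job ID and scanning IDs sequentially until the requested count is reached or the scan range set by limit is exhausted. Because the scan stops at the range boundary, the result may contain fewer than count jobs. Supports cursor-based pagination via the after parameter.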

Examples:

Get first page of jobs

tube.jobs(20)
# => [#<Beaneater::Job id=1>, #<Beaneater::Job id=5>, ...]

Access job properties

jobs = tube.jobs(5)
jobs.first.id      # => 1
jobs.first.body    # => "{\"job_class\":\"MyJob\",...}"
jobs.first.stats   # => {"tube"=>"default", "state"=>"ready", ...}

Paginate through jobs

page1 = tube.jobs(20)
page2 = tube.jobs(20, after: page1.last.id)
# => [#<Beaneater::Job id=25>, #<Beaneater::Job id=26>, ...]

Parameters:

  • count (Integer) (defaults to: 20)

    Maximum number of jobs to return

  • limit (Integer) (defaults to: 1000)

    Size of the job ID range to scan, counted from the starting ID

  • after (Integer, nil) (defaults to: nil)

    Return only jobs with an ID greater than this one; pass the last ID of the previous page to paginate

Returns:

  • (Array<Beaneater::Job>)

    Jobs from this tube
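
One way to walk every page is to loop until a page comes back empty, which also avoids calling `.id` on `nil` when a page has no jobs. This is a minimal sketch, not Postburner code: `FakeJob`, `FakePagedTube`, and `each_page` are hypothetical names, and the fake tube only imitates the documented `jobs(count, after:)` signature.

```ruby
# Hypothetical stand-ins so the sketch runs without Beanstalkd.
FakeJob = Struct.new(:id)

class FakePagedTube
  def initialize(ids)
    @ids = ids.sort
  end

  # Imitates the documented signature: up to `count` jobs with id > after.
  def jobs(count = 20, after: nil)
    candidates = after ? @ids.select { |id| id > after.to_i } : @ids
    candidates.first(count).map { |id| FakeJob.new(id) }
  end
end

# Collects the job IDs of every page, stopping at the first empty page.
def each_page(tube, per_page)
  pages = []
  cursor = nil
  loop do
    page = tube.jobs(per_page, after: cursor)
    break if page.empty?     # nothing left, and guards page.last.id
    pages << page.map(&:id)
    cursor = page.last.id    # cursor for the next call
  end
  pages
end

pages = each_page(FakePagedTube.new([1, 5, 12, 47, 89]), 2)
puts pages.inspect  # => [[1, 5], [12, 47], [89]]
```

With the real class, an empty page can also mean that the scan range was exhausted before any matching job was found, so an empty result does not prove the tube is empty.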



# File 'lib/postburner/tube.rb', line 94

def jobs(count=20, limit: 1000, after: nil)
  # Note: beaneater transforms hyphenated beanstalkd stats to underscores
  stats = @tube.stats
  tube_name = stats.name

  jobs = Array.new

  min_known = (
    peek_ids.any? ? self.peek_ids.min : self.class.peek_ids.min
  ).to_i
  min = after ? after.to_i + 1 : min_known
  max = min + limit

  for i in min..max
    job = @tube.client.jobs.find(i)
    if job
      job_stats = job.stats
      jobs << job if job_stats.tube == tube_name
    end
    break if jobs.length >= count
  end

  return jobs
end
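
To illustrate how the limit parameter bounds the scan, here is a small simulation of the loop above. The `store` hash and `scan_window` method are hypothetical; the hash stands in for Beanstalkd by mapping job IDs to tube names.

```ruby
# Hypothetical in-memory stand-in for Beanstalkd: job ID => tube name.
store = { 1 => "default", 2 => "mailers", 3 => "default", 1500 => "default" }

# Mirrors the scan in #jobs: walk IDs from min to min + limit, keep the
# ones that belong to tube_name, and stop once count IDs are collected.
def scan_window(store, tube_name, count:, limit:, min:)
  found = []
  (min..(min + limit)).each do |id|
    found << id if store[id] == tube_name
    break if found.length >= count
  end
  found
end

narrow = scan_window(store, "default", count: 3, limit: 1000, min: 1)
wide   = scan_window(store, "default", count: 3, limit: 2000, min: 1)

puts narrow.inspect  # => [1, 3]
puts wide.inspect    # => [1, 3, 1500]
```

With sparse job IDs, the default range can end before count jobs are found; raising limit widens the scan at the cost of one lookup per ID.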

#peek_ids ⇒ Array<Integer>

Get peeked job IDs from this tube.

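Peeks at the buried, ready, and delayed states to find known job IDs. Each peek yields at most one job, so the result holds at most three IDs, in the order the states are peeked rather than sorted.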

Examples:

tube = Postburner::Tube.all.first
ids = tube.peek_ids  # => [1, 5, 12]

Returns:

  • (Array<Integer>)

    Job IDs visible via peek operations



# File 'lib/postburner/tube.rb', line 64

def peek_ids
  [ :buried, :ready, :delayed ].map { |type| @tube.peek(type) }.
    reject(&:nil?).map(&:id).map(&:to_i)
end