Module: Postburner::Insertion

Extended by:
ActiveSupport::Concern
Included in:
Job
Defined in:
app/concerns/postburner/insertion.rb

Overview

Concern providing queue insertion methods for Postburner jobs.

Handles enqueuing jobs to Beanstalkd (or test strategies), including scheduling, re-queueing, and the internal insertion mechanics via after_save_commit callbacks.

Examples:

Basic queueing

class MyJob < Postburner::Job
  def perform(args)
    # ... work ...
  end
end

job = MyJob.create!(args: { foo: 'bar' })
job.queue!

Scheduled queueing

job.queue!(delay: 1.hour)
job.queue!(at: 2.days.from_now)

Instance Method Details

#queue!(options = {}) ⇒ true

Enqueues the job to Beanstalkd for processing.

Sets queued_at timestamp and optionally run_at for scheduled execution. Triggers enqueue callbacks and inserts job into Beanstalkd via after_save_commit hook. In test mode, executes immediately instead of queueing to Beanstalkd.

Examples:

Queue immediately

job.queue!

Queue with delay

job.queue!(delay: 1.hour)

Queue at specific time

job.queue!(at: Time.zone.now + 2.days)
job.queue!(at: Time.zone.parse('2025-01-15 09:00:00'))
job.queue!(at: '2025-01-15 09:00:00'.in_time_zone)
job.queue!(at: Time.parse('2025-01-15 09:00:00 EST'))

Queue with priority and TTR

job.queue!(priority: 0, ttr: 600)

Queue to specific queue

job.queue!(queue: 'critical', delay: 30.minutes)

Parameters:

  • options (Hash) (defaults to: {})

    Queue options

Options Hash (options):

  • :at (Time, ActiveSupport::TimeWithZone)

    Absolute time to run the job

  • :delay (Integer, ActiveSupport::Duration)

    Seconds to delay execution

  • :priority (Integer)

    Priority (0-4294967295, lower = higher priority, 0 = highest); sets the instance attribute

  • :pri (Integer)

    Beanstalkd priority (pass-through, for backwards compatibility)

  • :ttr (Integer)

    Time-to-run in seconds (1-4294967295, 0 is silently changed to 1)

  • :queue (String)

    Queue name override
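The relationship between :at, :delay, and run_at can be illustrated with a small plain-Ruby sketch. The helper name schedule_for is hypothetical and is not part of Postburner; it only mirrors the arithmetic visible in the #queue! source, using Time instead of ActiveSupport helpers so it runs without Rails.

```ruby
# Hypothetical helper (not part of Postburner) mirroring how #queue!
# derives run_at and the Beanstalkd delay from :at and :delay.
def schedule_for(now:, at: nil, delay: nil)
  if at
    # An explicit :delay wins; otherwise derive it from the absolute time.
    delay ||= at.to_i - now.to_i
    { run_at: at, delay: delay }
  elsif delay
    { run_at: now + delay, delay: delay }
  else
    { run_at: nil, delay: nil }
  end
end

now = Time.utc(2025, 1, 15, 8, 0, 0)

at_result    = schedule_for(now: now, at: Time.utc(2025, 1, 15, 9, 0, 0))
delay_result = schedule_for(now: now, delay: 90)

puts at_result[:delay]     # seconds between now and the requested time
puts delay_result[:run_at] # now shifted by the delay
```

Under this reading, an :at value in the past produces a negative delay, which matches the "add error handling" note in the source; callers may want to validate the time before queueing.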

Returns:

  • (true)

    on success (including if already queued)

Raises:

  • (ActiveRecord::RecordInvalid)

    if job is not valid

  • (AlreadyProcessed)

    if job was already processed
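The return and raise behavior above depends on the order of the guard clauses. The sketch below models that order with a hypothetical stand-in (GuardSketch and its local AlreadyProcessed class are illustrations, not Postburner classes), and it omits the validity check, which needs ActiveRecord.

```ruby
# Hypothetical stand-ins; they only model the guard clauses of #queue!.
class AlreadyProcessed < StandardError; end

GuardSketch = Struct.new(:queued_at, :bkid, :processed_at, keyword_init: true) do
  def queue!
    # Already queued: return true without touching anything.
    return true if queued_at && bkid
    # Already processed: refuse to queue again.
    raise AlreadyProcessed, "Processed at #{processed_at}" if processed_at

    self.queued_at = Time.now
    true
  end
end

queued    = GuardSketch.new(queued_at: Time.now, bkid: 42)
processed = GuardSketch.new(processed_at: Time.now)

queued_result = queued.queue!

processed_error =
  begin
    processed.queue!
    nil
  rescue AlreadyProcessed => e
    e
  end
```

Because the already-queued check comes first, calling queue! twice on the same job is a no-op rather than an error in this model.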

# File 'app/concerns/postburner/insertion.rb', line 71

def queue!(options={})
  return true if self.queued_at.present? && self.bkid.present?
  raise ActiveRecord::RecordInvalid, "Can't queue unless valid." unless self.valid?
  raise AlreadyProcessed, "Processed at #{self.processed_at}" if self.processed_at

  # Extract and set instance-level overrides
  self.priority = options.delete(:priority) if options.key?(:priority)
  self.ttr = options.delete(:ttr) if options.key?(:ttr)
  self.queue_name = options.delete(:queue) if options.key?(:queue)

  at = options.delete(:at)
  now = Time.current

  self.queued_at = now
  self.run_at = case
                when at.present?
                  # this is rudimentary, add error handling
                  options[:delay] ||= at.to_i - now.to_i
                  at
                when options[:delay].present?
                  now + options[:delay].seconds
                end

  @_insert_options = options

  run_callbacks :enqueue do
    self.save!
  end

  true
end

#requeue!(options = {}) ⇒ true

Re-queues an existing job by removing it from Beanstalkd and queueing again.

Calls #delete! to remove from Beanstalkd, resets queuing metadata, then calls #queue! with new options.
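Resetting the queuing metadata matters because #queue! returns early when both queued_at and bkid are set. The following is a minimal sketch of that interaction, assuming a hypothetical RequeueSketch class whose delete! and queue! only record state changes instead of talking to Beanstalkd.

```ruby
# Hypothetical stand-in (not a Postburner class); no Beanstalkd involved.
class RequeueSketch
  attr_accessor :bkid, :queued_at
  attr_reader :deleted_ids

  def initialize
    @deleted_ids = []
    @next_id = 0
  end

  def queue!(_options = {})
    return true if queued_at && bkid # same early return as #queue!

    self.queued_at = Time.now
    self.bkid = (@next_id += 1)      # pretend Beanstalkd assigned an id
    true
  end

  def delete!
    @deleted_ids << bkid if bkid     # pretend the Beanstalkd job was removed
  end

  def requeue!(options = {})
    delete!
    # Without this reset, queue! would hit its early return and do nothing.
    self.bkid, self.queued_at = nil, nil
    queue!(options)
  end
end

job = RequeueSketch.new
job.queue!
first_id = job.bkid
job.requeue!(delay: 300)
```

In this model the job ends up with a fresh id after requeue!, and the old id is recorded as deleted.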

Examples:

Requeue with different delay

job.requeue!(delay: 5.minutes)

Parameters:

  • options (Hash) (defaults to: {})

    Queue options (same as #queue!)

Options Hash (options):

  • :at (Time, ActiveSupport::TimeWithZone)

    Absolute time to run the job

  • :delay (Integer, ActiveSupport::Duration)

    Seconds to delay execution

  • :priority (Integer)

    Priority (0-4294967295, lower = higher priority)

  • :ttr (Integer)

    Time-to-run in seconds (1-4294967295)

  • :queue (String)

    Queue name override

Returns:

  • (true)

    on success

Raises:

  • (ActiveRecord::RecordInvalid)

    if job is not valid

  • (Beaneater::NotConnected)

    if Beanstalkd connection fails

# File 'app/concerns/postburner/insertion.rb', line 126

def requeue!(options={})
  self.delete!
  self.bkid, self.queued_at = nil, nil

  self.queue! options
end

#will_insert? ⇒ Boolean

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Checks if job is flagged for insertion into Beanstalkd.

Set internally by #queue! to trigger insertion via after_save_commit hook.

Returns:

  • (Boolean)

    true if job will be inserted on save
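The flag-then-insert pattern can be sketched in plain Ruby. InsertFlagSketch is a hypothetical class: its save! stands in for ActiveRecord's save! followed by the after_save_commit hook, and clearing the flag after insertion is an assumption of this sketch, not something the source above confirms.

```ruby
# Hypothetical plain-Ruby model of "flag now, insert after commit".
class InsertFlagSketch
  attr_reader :inserted_with

  def queue!(options = {})
    @_insert_options = options # flag the job for insertion
    save!
    true
  end

  def will_insert?
    @_insert_options.is_a?(Hash)
  end

  private

  # Stands in for save! plus the after_save_commit callback.
  def save!
    insert_if_queued
  end

  def insert_if_queued
    return unless will_insert?

    @inserted_with = @_insert_options # pretend this went to Beanstalkd
    @_insert_options = nil            # assumption: clear the flag afterwards
  end
end

job = InsertFlagSketch.new
flag_before = job.will_insert?
job.queue!(delay: 30)
flag_after = job.will_insert?
```

Keeping the options in an instance variable means an ordinary save, one not started by queue!, leaves will_insert? false and skips the insertion step.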

# File 'app/concerns/postburner/insertion.rb', line 140

def will_insert?
  @_insert_options.is_a? Hash
end