Class: Postburner::Job Abstract

Inherits:
ApplicationRecord show all
Includes:
Callbacks, Commands, Execution, Insertion, Logging, Properties, Statistics
Defined in:
app/models/postburner/job.rb

Overview

This class is abstract.

Subclass and implement #perform to define job behavior

TODO:

Add ‘cancelled_at` field to block job execution if present (See opensource/postburner#16)

Note:

Job won’t run unless queued_at is set and is prior to execution time

Note:

Subclasses must implement the #perform method

Base class for all Postburner background jobs using Single Table Inheritance (STI).

Postburner::Job provides a PostgreSQL-backed job queue system built on Beanstalkd. Every job is stored as an ActiveRecord model, enabling database queries, foreign key relationships, and full audit trails.

Examples:

Creating and queueing a job

class ProcessDonation < Postburner::Job
  queue 'critical'
  priority 0

  def perform(args)
    donation = Donation.find(args['donation_id'])
    # ... process donation ...
    log "Processed donation #{donation.id}"
  end
end

job = ProcessDonation.create!(args: { 'donation_id' => 123 })
job.queue!(delay: 1.hour)

With callbacks

class SendEmail < Postburner::Job
  before_enqueue :validate_recipient
  after_processed :track_delivery

  def perform(args)
    UserMailer.welcome(args['email']).deliver_now
  end

  private

  def validate_recipient
    raise "Invalid email" unless args['email'].include?('@')
  end

  def track_delivery
    Analytics.track('email_sent', email: args['email'])
  end
end

Direct Known Subclasses

Mailer, OrphanedJob, TrackedJob

Defined Under Namespace

Classes: AlreadyProcessed, BadFormat, MalformedResponse, OrphanedJobError, PrematurePerform

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Statistics

#elapsed_ms, #intended_at, #stats

Methods included from Execution

#perform!

Methods included from Insertion

#queue!, #requeue!, #will_insert?

Methods included from Commands

#delete!, #extend!, #kick!, #remove!

Methods included from Logging

#log, #log!, #log_exception, #log_exception!

Methods included from Properties

#priority, #queue_name, #retry_delay_for_attempt, #should_retry?, #ttr, #tube_name

Instance Attribute Details

#argsHash (readonly)

JSONB arguments passed to perform

Returns:

  • (Hash)

    the current value of args



72
73
74
# File 'app/models/postburner/job.rb', line 72

def args
  @args
end

#attempt_countInteger (readonly)

Number of execution attempts

Returns:

  • (Integer)

    the current value of attempt_count



72
73
74
# File 'app/models/postburner/job.rb', line 72

def attempt_count
  @attempt_count
end

#attempting_atTime (readonly)

Time first attempt started

Returns:

  • (Time)

    the current value of attempting_at



72
73
74
# File 'app/models/postburner/job.rb', line 72

def attempting_at
  @attempting_at
end

#attemptsArray<Time> (readonly)

Array of attempt timestamps

Returns:

  • (Array<Time>)

    the current value of attempts



72
73
74
# File 'app/models/postburner/job.rb', line 72

def attempts
  @attempts
end

#bkidInteger (readonly)

Beanstalkd job ID (nil in test mode)

Returns:

  • (Integer)

    the current value of bkid



72
73
74
# File 'app/models/postburner/job.rb', line 72

def bkid
  @bkid
end

#durationFloat (readonly)

Milliseconds between processing_at and processed_at

Returns:

  • (Float)

    the current value of duration



72
73
74
# File 'app/models/postburner/job.rb', line 72

def duration
  @duration
end

#errataArray<Hash> (readonly)

Array of error details with timestamps

Returns:

  • (Array<Hash>)

    the current value of errata



72
73
74
# File 'app/models/postburner/job.rb', line 72

def errata
  @errata
end

#error_countInteger (readonly)

Number of errors encountered

Returns:

  • (Integer)

    the current value of error_count



72
73
74
# File 'app/models/postburner/job.rb', line 72

def error_count
  @error_count
end

#idInteger (readonly)

Primary key

Returns:

  • (Integer)

    the current value of id



72
73
74
# File 'app/models/postburner/job.rb', line 72

def id
  @id
end

#lagFloat (readonly)

Milliseconds between intended_at and attempting_at

Returns:

  • (Float)

    the current value of lag



72
73
74
# File 'app/models/postburner/job.rb', line 72

def lag
  @lag
end

#log_countInteger (readonly)

Number of log entries

Returns:

  • (Integer)

    the current value of log_count



72
73
74
# File 'app/models/postburner/job.rb', line 72

def log_count
  @log_count
end

#logsArray<Hash> (readonly)

Array of log entries with timestamps

Returns:

  • (Array<Hash>)

    the current value of logs



72
73
74
# File 'app/models/postburner/job.rb', line 72

def logs
  @logs
end

#processed_atTime (readonly)

Time job completed successfully

Returns:

  • (Time)

    the current value of processed_at



72
73
74
# File 'app/models/postburner/job.rb', line 72

def processed_at
  @processed_at
end

#processing_atTime (readonly)

Time current/last processing started

Returns:

  • (Time)

    the current value of processing_at



72
73
74
# File 'app/models/postburner/job.rb', line 72

def processing_at
  @processing_at
end

#queued_atTime (readonly)

Time job was queued

Returns:

  • (Time)

    the current value of queued_at



72
73
74
# File 'app/models/postburner/job.rb', line 72

def queued_at
  @queued_at
end

#removed_atTime (readonly)

Time job was removed from queue

Returns:

  • (Time)

    the current value of removed_at



72
73
74
# File 'app/models/postburner/job.rb', line 72

def removed_at
  @removed_at
end

#run_atTime (readonly)

Scheduled execution time (nil for immediate)

Returns:

  • (Time)

    the current value of run_at



72
73
74
# File 'app/models/postburner/job.rb', line 72

def run_at
  @run_at
end

#sidString (readonly)

Unique UUID identifier

Returns:

  • (String)

    the current value of sid



72
73
74
# File 'app/models/postburner/job.rb', line 72

def sid
  @sid
end

#typeString (readonly)

STI type column for job subclass

Returns:

  • (String)

    the current value of type



72
73
74
# File 'app/models/postburner/job.rb', line 72

def type
  @type
end

Class Method Details

.find_sti_class(type_name) ⇒ Class

Resolves an STI type name to a class, falling back to OrphanedJob when the original class no longer exists. This tolerates rows whose ‘type` column references a deleted or renamed job class, preventing ActiveRecord::SubclassNotFound from crashing callers (e.g. the admin queued-jobs page).

Parameters:

  • type_name (String)

    The value stored in the ‘type` column

Returns:

  • (Class)

    The resolved class, or OrphanedJob if missing



100
101
102
103
104
# File 'app/models/postburner/job.rb', line 100

def self.find_sti_class(type_name)
  super
rescue ActiveRecord::SubclassNotFound
  Postburner::OrphanedJob
end

Instance Method Details

#bkBeaneater::Job?

Returns the Beanstalkd job object for direct queue operations.

Provides access to the underlying Beaneater job object for advanced Beanstalkd operations. Caches the job object in an instance variable.

Examples:

Direct Beanstalkd operations

bk_job = job.bk
bk_job.stats           # Get job statistics
bk_job.bury            # Bury the job
bk_job.release(pri: 0) # Release with priority

Returns:

  • (Beaneater::Job, nil)

    Beanstalkd job object or nil if no bkid

See Also:



200
201
202
203
204
205
# File 'app/models/postburner/job.rb', line 200

def bk
  return @__job if @__job
  return unless self.bkid.present?

  @__job = Postburner.connection.beanstalk.jobs.find(self.bkid)
end

#bk!Beaneater::Job?

Returns the Beanstalkd job object with cache invalidation.

Same as #bk but clears the cached instance variable first, forcing a fresh lookup from Beanstalkd. Use when job state may have changed.

Examples:

job.bk!  # Forces fresh lookup

Returns:

  • (Beaneater::Job, nil)

    Beanstalkd job object or nil if no bkid

See Also:



233
234
235
236
# File 'app/models/postburner/job.rb', line 233

def bk!
  @__job = nil
  self.bk
end

#bk=(job) ⇒ Object

Sets the Beanstalkd job object directly.

Used by the worker to pass the reserved Beanstalkd job to the model, ensuring that operations like Commands#extend!, Commands#delete!, and Commands#kick! use the actual reserved job rather than a fresh lookup via peek.

Parameters:

  • job (Beaneater::Job)

    Reserved Beanstalkd job object

See Also:



217
218
219
# File 'app/models/postburner/job.rb', line 217

def bk=(job)
  @__job = job
end

#destroyself

Note:

This is a standard Rails method with Beanstalkd integration via callback

Destroys the job record and removes it from Beanstalkd queue.

Uses before_destroy callback to call Commands#delete! which removes the job from Beanstalkd before destroying the ActiveRecord model.

Returns:

  • (self)

    the destroyed job object

Raises:

  • (Beaneater::NotConnected)

    if Beanstalkd connection fails

See Also:



# File 'app/models/postburner/job.rb', line 158

#destroy!self

Note:

This is a standard Rails method with Beanstalkd integration via callback

Destroys the job record and removes it from Beanstalkd queue, raising on failure.

Uses before_destroy callback to call Commands#delete! which removes the job from Beanstalkd before destroying the ActiveRecord model.

Returns:

  • (self)

    the destroyed job object

Raises:

  • (ActiveRecord::RecordNotDestroyed)

    if validations prevent destruction

  • (Beaneater::NotConnected)

    if Beanstalkd connection fails

See Also:



# File 'app/models/postburner/job.rb', line 170

#orphaned?Boolean

Returns whether this job instance is an orphaned placeholder.

Always false on the base class. Overridden to return true in OrphanedJob.

Returns:

  • (Boolean)


113
# File 'app/models/postburner/job.rb', line 113

def orphaned? = false

#perform(args = nil) ⇒ void

This method is abstract.

Subclasses must implement this method

Note:

Use Logging#log or Logging#log! within perform to add entries to the job’s audit trail

Note:

Exceptions will be caught, logged to errata, and re-raised

Note:

Args are always accessible via self.args regardless of method signature

This method returns an undefined value.

Abstract method to be implemented by subclasses.

This method contains the actual work logic for the job. It is called by Execution#perform! within the processing callbacks. Any exceptions raised will be logged and re-raised, causing the job to fail.

Examples:

With args parameter

class ProcessPayment < Postburner::Job
  def perform(args)
    payment = Payment.find(args['payment_id'])
    payment.process!
    log "Payment #{payment.id} processed successfully"
  end
end

Without args parameter (access via self.args)

class CleanupJob < Postburner::Job
  def perform
    log "Cleaning up #{self.args['table']}"
    # self.args is always available
  end
end

Parameters:

  • args (Hash) (defaults to: nil)

    Job arguments from the args JSONB column (optional)

Raises:

  • (NotImplementedError)

    if subclass does not implement this method

See Also:



154
155
156
# File 'app/models/postburner/job.rb', line 154

def perform(args=nil)
  raise NotImplementedError, "Subclasses must implement the perform method"
end