Class: Async::Background::Queue::Store

Inherits:
Object
  • Object
show all
Includes:
Clock
Defined in:
lib/async/background/queue/store.rb

Constant Summary collapse

# SQL script passed to execute_batch by #ensure_database!.
# It selects incremental auto-vacuum, creates the `jobs` table if it is
# missing, and adds a partial index over (run_at, id) that only covers rows
# whose status is 'pending'.
SCHEMA =
<<~SQL
  PRAGMA auto_vacuum = INCREMENTAL;
  CREATE TABLE IF NOT EXISTS jobs (
    id         INTEGER PRIMARY KEY,
    class_name TEXT    NOT NULL,
    args       TEXT    NOT NULL DEFAULT '[]',
    options    TEXT,
    status     TEXT    NOT NULL DEFAULT 'pending',
    created_at REAL    NOT NULL,
    run_at     REAL    NOT NULL,
    locked_by  INTEGER,
    locked_at  REAL
  );
  CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(run_at, id) WHERE status = 'pending';
SQL
# Value substituted into `PRAGMA mmap_size` when the store is built with
# mmap enabled: 256 MiB expressed in bytes.
MMAP_SIZE =
256 * 1024 * 1024
# Frozen lambda that renders the per-connection PRAGMA script.
# The only variable part is the mmap size; every other setting is fixed
# (WAL journal, NORMAL synchronous, in-memory temp store, 5000 ms busy
# timeout, 64 MiB journal size limit).
#
# @param mmap_size [Integer] value interpolated into `PRAGMA mmap_size`
# @return [String] the SQL script, one PRAGMA per line
PRAGMAS =
lambda do |mmap_size|
  <<~SQL
    PRAGMA journal_mode       = WAL;
    PRAGMA synchronous        = NORMAL;
    PRAGMA mmap_size          = #{mmap_size};
    PRAGMA cache_size         = -16000;
    PRAGMA temp_store         = MEMORY;
    PRAGMA busy_timeout       = 5000;
    PRAGMA journal_size_limit = 67108864;
  SQL
end.freeze
# Cleanup tuning value. It is not referenced by the methods shown on this
# page; presumably the minimum number of seconds between cleanup passes
# (TODO confirm against maybe_cleanup).
CLEANUP_INTERVAL =
300
# Cleanup tuning value. Presumably the age in seconds after which finished
# jobs become eligible for removal (TODO confirm against maybe_cleanup).
CLEANUP_AGE =
3600

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path: self.class.default_path, mmap: true) ⇒ Store

Returns a new instance of Store.



46
47
48
49
50
51
52
# File 'lib/async/background/queue/store.rb', line 46

# Prepares a store for the SQLite file at +path+ without opening it; the
# connection handle starts out nil.
#
# @param path [String] database file location, defaulting to .default_path
# @param mmap [Boolean] when truthy the pragma script is rendered with
#   MMAP_SIZE, otherwise with 0
def initialize(path: self.class.default_path, mmap: true)
  mmap_bytes = mmap ? MMAP_SIZE : 0

  @path            = path
  # Render the pragma script once so later connections can reuse it.
  @pragma_sql      = PRAGMAS.call(mmap_bytes).freeze
  @db              = nil
  @schema_checked  = false
  @last_cleanup_at = nil
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



44
45
46
# File 'lib/async/background/queue/store.rb', line 44

# Reader for the database file location handed to #initialize.
#
# @return [Object] the stored +path+ value (a String when the default is used)
def path
  @path
end

Class Method Details

.default_path ⇒ Object



125
126
127
# File 'lib/async/background/queue/store.rb', line 125

# File name used when no explicit +path+ is given to the constructor.
# It is a bare relative name, so it resolves against the process's
# current working directory.
#
# @return [String]
def self.default_path
  'async_background_queue.db'
end

Instance Method Details

#close ⇒ Object



116
117
118
119
120
121
122
123
# File 'lib/async/background/queue/store.rb', line 116

# Shuts the connection down: finalizes the prepared statements, gives
# SQLite a chance to run `PRAGMA optimize`, closes the handle and clears
# @db. Does nothing when there is no connection or it is already closed.
#
# The optimize pass is best-effort: database errors raised by it are
# ignored so they cannot prevent the handle from being closed. Only
# SQLite3::Exception is swallowed; unrelated errors (for example a
# NoMethodError caused by a programming mistake) still propagate.
#
# @return [nil]
def close
  return unless @db && !@db.closed?

  finalize_statements
  begin
    @db.execute("PRAGMA optimize")
  rescue SQLite3::Exception
    # Deliberately ignored: optimize is advisory and must not block close.
  end
  @db.close
  @db = nil
end

#complete(job_id) ⇒ Object



82
83
84
85
# File 'lib/async/background/queue/store.rb', line 82

# Executes the prepared @complete_stmt for the given job after making sure
# a connection exists. The statement's SQL is not shown here; presumably it
# marks the job as finished (TODO confirm).
#
# @param job_id [Integer] bound as the statement's only parameter
# @return [Object] whatever the statement's #execute returns
def complete(job_id)
  ensure_connection
  statement = @complete_stmt
  statement.execute(job_id)
end

#enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object



64
65
66
67
68
69
# File 'lib/async/background/queue/store.rb', line 64

# Inserts a job row through the prepared @enqueue_stmt and returns its id.
#
# @param class_name [String] stored as given
# @param args [Array] serialized with JSON.generate
# @param run_at [Numeric, nil] scheduled time; falls back to the enqueue
#   timestamp when nil
# @param options [Hash] serialized with dump_options
# @return [Integer] the connection's last_insert_row_id after the insert
def enqueue(class_name, args = [], run_at = nil, options: {})
  ensure_connection

  created_at   = realtime_now
  scheduled_at = run_at || created_at
  payload      = JSON.generate(args)
  encoded_opts = dump_options(options)

  @enqueue_stmt.execute(class_name, payload, encoded_opts, created_at, scheduled_at)
  @db.last_insert_row_id
end

#ensure_database! ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/async/background/queue/store.rb', line 54

# Creates or upgrades the database file at @path using a short-lived
# connection: applies the connection configuration, runs SCHEMA, truncates
# the WAL with a checkpoint, then closes the handle.
#
# The temporary handle is closed in an +ensure+ so a failure in any setup
# step does not leak an open connection. @schema_checked is only set once
# every step has succeeded.
#
# @return [true]
# @raise whatever the configuration, schema or checkpoint step raises
def ensure_database!
  require_sqlite3
  db = SQLite3::Database.new(@path)
  begin
    configure_database(db)
    db.execute_batch(SCHEMA)
    db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  ensure
    db.close
  end
  @schema_checked = true
end

#fail(job_id) ⇒ Object



87
88
89
90
# File 'lib/async/background/queue/store.rb', line 87

# Executes the prepared @fail_stmt for the given job after making sure a
# connection exists. The statement's SQL is not shown here; presumably it
# marks the job as failed (TODO confirm).
#
# @param job_id [Integer] bound as the statement's only parameter
# @return [Object] whatever the statement's #execute returns
def fail(job_id)
  ensure_connection
  statement = @fail_stmt
  statement.execute(job_id)
end

#fetch(worker_id) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/async/background/queue/store.rb', line 71

# Runs the prepared @fetch_stmt inside a transaction and returns the first
# resulting row as a job hash, or nil when the statement yields no row.
# The statement is bound with (worker_id, now, now); presumably it locks
# the next due job for this worker (TODO confirm against the SQL).
#
# maybe_cleanup is invoked only when a row was obtained.
#
# @param worker_id [Integer] identifier bound into the statement
# @return [Hash, nil] { id:, class_name:, args:, options: } with +args+
#   parsed from JSON and +options+ decoded by load_options
def fetch(worker_id)
  ensure_connection
  timestamp = realtime_now

  claimed = transaction do
    with_stmt(@fetch_stmt) do |stmt|
      stmt.execute(worker_id, timestamp, timestamp).first
    end
  end
  return unless claimed

  maybe_cleanup

  {
    id:         claimed[0],
    class_name: claimed[1],
    args:       JSON.parse(claimed[2]),
    options:    load_options(claimed[3])
  }
end

#recover(worker_id) ⇒ Object



110
111
112
113
114
# File 'lib/async/background/queue/store.rb', line 110

# Executes the prepared @requeue_stmt for the given worker and reports how
# many rows the connection says were changed. The statement's SQL is not
# shown here; presumably it releases jobs still locked by that worker
# (TODO confirm).
#
# @param worker_id [Integer] bound as the statement's only parameter
# @return [Integer] @db.changes read right after the statement ran
def recover(worker_id)
  ensure_connection
  requeue = @requeue_stmt
  requeue.execute(worker_id)
  changed_rows = @db.changes
  changed_rows
end

#retry_or_fail(job_id, fallback_options: nil) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/async/background/queue/store.rb', line 92

# Decides, inside one transaction, whether a job gets another attempt.
#
# The retry policy comes from the options read through @retry_state_stmt;
# when those are empty it is built from +fallback_options+ instead. If the
# policy allows retries and the next attempt number does not exceed its
# limit, the job is rescheduled via @retry_stmt with the attempt counter
# advanced and a run time of now plus the policy's retry delay. Otherwise
# @fail_stmt is executed.
#
# @param job_id [Integer] job to reschedule or fail
# @param fallback_options [Object, nil] passed to normalize_options when
#   the job has no stored options
# @return [Symbol] :retried or :failed (assuming #transaction returns the
#   block's value — confirm against its definition)
def retry_or_fail(job_id, fallback_options: nil)
  ensure_connection

  transaction do
    persisted = with_stmt(@retry_state_stmt) do |stmt|
      load_options(stmt.execute(job_id).first&.first)
    end

    policy =
      if persisted.empty?
        normalize_options(fallback_options)
      else
        Job::Options.new(**persisted)
      end

    # No policy, retries disabled, or attempts exhausted: give up.
    unless policy&.retry? && policy.next_attempt <= policy.retry
      @fail_stmt.execute(job_id)
      next :failed
    end

    bumped   = policy.with_attempt(policy.next_attempt)
    retry_at = realtime_now + bumped.next_retry_delay(bumped.attempt)
    @retry_stmt.execute(retry_at, dump_options(bumped.to_h.compact), job_id)
    :retried
  end
end