Class: Async::Background::Queue::Store

Inherits:
Object
  • Object
show all
Includes:
Clock
Defined in:
lib/async/background/queue/store.rb

Constant Summary collapse

# DDL executed by #ensure_database! through execute_batch: requests incremental
# auto-vacuum, creates the jobs table when it is missing, and adds a partial
# index over (run_at, id) limited to rows whose status is 'pending'.
# NOTE(review): SQLite only applies an auto_vacuum change before the first
# table exists — confirm it still takes effect after PRAGMAS has been run.
SCHEMA =
<<~SQL
  PRAGMA auto_vacuum = INCREMENTAL;
  CREATE TABLE IF NOT EXISTS jobs (
    id         INTEGER PRIMARY KEY,
    class_name TEXT    NOT NULL,
    args       TEXT    NOT NULL DEFAULT '[]',
    options    TEXT,
    status     TEXT    NOT NULL DEFAULT 'pending',
    created_at REAL    NOT NULL,
    run_at     REAL    NOT NULL,
    locked_by  INTEGER,
    locked_at  REAL
  );
  CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(run_at, id) WHERE status = 'pending';
SQL
# Value interpolated into `PRAGMA mmap_size` when memory mapping is enabled
# (256 MiB).
MMAP_SIZE = 256 * 1024 * 1024
# Frozen lambda that renders the connection pragma batch. The only variable
# part is the mmap_size value; every other setting is fixed text.
PRAGMAS = lambda do |mmap_size|
  <<~SQL
    PRAGMA journal_mode       = WAL;
    PRAGMA synchronous        = NORMAL;
    PRAGMA mmap_size          = #{mmap_size};
    PRAGMA cache_size         = -16000;
    PRAGMA temp_store         = MEMORY;
    PRAGMA busy_timeout       = 5000;
    PRAGMA journal_size_limit = 67108864;
  SQL
end.freeze
# Cleanup cadence. The code that reads this constant is not visible in this
# section; presumably a number of seconds (5 minutes) — TODO confirm.
CLEANUP_INTERVAL = 5 * 60
# Age threshold used by cleanup. The code that reads this constant is not
# visible in this section; presumably a number of seconds (1 hour) — TODO confirm.
CLEANUP_AGE = 60 * 60

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path: self.class.default_path, mmap: true) ⇒ Store

Returns a new instance of Store.



46
47
48
49
50
51
52
53
# File 'lib/async/background/queue/store.rb', line 46

# Records the database location and pre-renders the pragma batch. No
# connection is opened here: @db starts out nil.
#
# @param path [String] database file path; defaults to the class-level default_path
# @param mmap [Boolean] when falsy the mmap_size pragma is rendered as 0,
#   otherwise MMAP_SIZE is used
def initialize(path: self.class.default_path, mmap: true)
  mmap_bytes = mmap ? MMAP_SIZE : 0

  @path = path
  @mmap = mmap
  @pragma_sql = PRAGMAS.call(mmap_bytes).freeze

  @db = nil
  @schema_checked = false
  @last_cleanup_at = nil
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



44
45
46
# File 'lib/async/background/queue/store.rb', line 44

# Read-only accessor for the database path assigned in #initialize.
#
# @return the value stored in @path
def path
  @path
end

Class Method Details

.default_path ⇒ Object



121
122
123
# File 'lib/async/background/queue/store.rb', line 121

# Database file name used when #initialize is not given an explicit path.
# The name is relative, so it resolves against the process working directory.
#
# @return [String]
def self.default_path
  'async_background_queue.db'
end

Instance Method Details

#close ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/async/background/queue/store.rb', line 112

# Shuts the connection down. Does nothing when there is no connection or it is
# already closed.
#
# @return [nil]
def close
  return if !@db || @db.closed?

  finalize_statements

  # Best effort only: a failing optimize pass must not prevent the close.
  begin
    @db.execute("PRAGMA optimize")
  rescue StandardError
    nil
  end

  @db.close
  @db = nil
end

#complete(job_id) ⇒ Object



96
97
98
99
# File 'lib/async/background/queue/store.rb', line 96

# Executes the prepared completion statement for one job.
#
# @param job_id id bound as the statement's only parameter
# @return whatever the statement's #execute returns
def complete(job_id)
  ensure_connection

  # Read the statement only after the connection has been ensured.
  statement = @complete_stmt
  statement.execute(job_id)
end

#enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/async/background/queue/store.rb', line 66

# Inserts one job through the prepared enqueue statement and returns its row id.
#
# A single timestamp is taken per call, so a job enqueued without an explicit
# run_at gets identical creation and run times (two separate clock reads would
# make the creation time later than the run time).
#
# Values are bound in this order: class_name, args JSON, options JSON (or nil),
# creation timestamp, run_at.
#
# @param class_name [String] job class name
# @param args [Array] job arguments, serialised with JSON.generate
# @param run_at [Float, nil] earliest run time; nil means "now"
# @param options [Hash, nil] serialised with JSON.generate; nil or empty is bound as nil
# @return the value of @db.last_insert_row_id after the insert
def enqueue(class_name, args = [], run_at = nil, options: {})
  ensure_connection
  now = realtime_now
  run_at ||= now
  options_json = options.nil? || options.empty? ? nil : JSON.generate(options)
  @enqueue_stmt.execute(class_name, JSON.generate(args), options_json, now, run_at)
  @db.last_insert_row_id
end

#ensure_database! ⇒ Object



55
56
57
58
59
60
61
62
63
64
# File 'lib/async/background/queue/store.rb', line 55

# Opens a short-lived connection to @path, applies the pragma batch and SCHEMA,
# truncates the WAL, and closes the connection again.
#
# The handle is closed in an ensure clause so a failing pragma or schema
# statement cannot leak an open database. @schema_checked is only set once
# every step has succeeded.
#
# @return [true]
# @raise whatever the SQLite3 calls raise; the temporary handle is closed first
def ensure_database!
  require_sqlite3
  db = SQLite3::Database.new(@path)

  begin
    # Set the busy timeout first so the statements below wait on a locked file.
    db.execute('PRAGMA busy_timeout = 5000')
    db.execute_batch(@pragma_sql)
    db.execute_batch(SCHEMA)
    db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  ensure
    db.close
  end

  @schema_checked = true
end

#fail(job_id) ⇒ Object



101
102
103
104
# File 'lib/async/background/queue/store.rb', line 101

# Executes the prepared failure statement for one job.
# NOTE(review): the name shadows Kernel#fail on instances of this class, so a
# bare `fail "message"` inside the class would call this method instead.
#
# @param job_id id bound as the statement's only parameter
# @return whatever the statement's #execute returns
def fail(job_id)
  ensure_connection

  # Read the statement only after the connection has been ensured.
  statement = @fail_stmt
  statement.execute(job_id)
end

#fetch(worker_id) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/async/background/queue/store.rb', line 74

# Fetches at most one job for the given worker inside a BEGIN IMMEDIATE
# transaction.
#
# The SQL behind @fetch_stmt is not visible here; presumably it marks the
# selected job as locked by worker_id — TODO confirm.
#
# @param worker_id bound as the first parameter of the fetch statement
# @return [Hash, nil] { id:, class_name:, args:, options: } built from the
#   first result row, or nil when the statement produced no row
# @raise re-raises any StandardError after attempting a ROLLBACK
def fetch(worker_id)
  ensure_connection
  now = realtime_now
  @db.execute("BEGIN IMMEDIATE")

  begin
    # The same timestamp is bound to both time parameters of the statement.
    results = @fetch_stmt.execute(worker_id, now, now)
    row = results.first
  ensure
    # Always reset the prepared statement; a failing reset is ignored.
    @fetch_stmt.reset! rescue nil
  end

  @db.execute("COMMIT")
  return unless row

  # Cleanup is only considered on calls that actually returned a row.
  maybe_cleanup
  # row[3] is the options JSON (nil when absent); keys are parsed as symbols.
  options = row[3] ? JSON.parse(row[3], symbolize_names: true) : {}
  { id: row[0], class_name: row[1], args: JSON.parse(row[2]), options: options }
rescue
  # Also reached for errors raised after COMMIT (e.g. JSON parsing); any error
  # from the ROLLBACK itself is swallowed so the original exception surfaces.
  @db.execute("ROLLBACK") rescue nil
  raise
end

#recover(worker_id) ⇒ Object



106
107
108
109
110
# File 'lib/async/background/queue/store.rb', line 106

# Executes the prepared requeue statement for a worker and reports how many
# rows the connection says were changed.
#
# @param worker_id bound as the statement's only parameter
# @return the value of @db.changes read right after the statement ran
def recover(worker_id)
  ensure_connection
  @requeue_stmt.execute(worker_id)

  requeued = @db.changes
  requeued
end