Class: Async::Background::Queue::Store

Inherits:
Object
  • Object
show all
Includes:
Clock
Defined in:
lib/async/background/queue/store.rb,
lib/async/background/queue/schema.rb

Defined Under Namespace

Classes: SchemaError

Constant Summary collapse

SCHEMA_VERSION =
Schema::VERSION
MIGRATION_BUSY_TIMEOUT_MS =
Schema::MIGRATION_BUSY_TIMEOUT_MS
REQUIRED_INDEXES =
Schema::REQUIRED_INDEXES
SCHEMA =
SQL::CREATE_SCHEMA
CLEANUP_INTERVAL =
300
CLEANUP_AGE =
3600
FAILED_RETENTION_AGE =
7 * 24 * 3600
ERROR_MESSAGE_MAX_LEN =
2_000
EMPTY_ARGS_JSON =
'[]'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path: self.class.default_path, options: {}) ⇒ Store

Returns a new instance of Store.



48
49
50
51
52
53
54
55
# File 'lib/async/background/queue/store.rb', line 48

# Builds a Store that is configured but not yet connected: the database handle
# starts out as nil and the schema is flagged as unchecked.
#
# @param path [Object] database location; falls back to the class-level default_path
# @param options [Hash] raw options, passed through StoreOptions.build
def initialize(path: self.class.default_path, options: {})
  # Connection-related state first: nothing is opened at construction time.
  @db = nil
  @last_cleanup_at = nil
  @schema_checked = false

  @path = path
  @options = StoreOptions.build(options)
  # Keep the pragma SQL produced by the built options frozen.
  @pragma_sql = @options.pragma_sql.freeze
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



28
29
30
# File 'lib/async/background/queue/store.rb', line 28

# Reader for the options object assigned to @options.
#
# @return [Object] the current value of @options
def options = @options

#path ⇒ Object (readonly)

Returns the value of attribute path.



28
29
30
# File 'lib/async/background/queue/store.rb', line 28

# Reader for the database path assigned to @path.
#
# @return [Object] the current value of @path
def path = @path

Class Method Details

.default_path ⇒ Object



46
# File 'lib/async/background/queue/store.rb', line 46

# Database path used when the caller does not supply one.
#
# @return [String] the default database file name
def self.default_path
  'async_background_queue.db'
end

.migrate!(path: default_path, options: {}) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/async/background/queue/store.rb', line 30

# Opens a throwaway Store, runs its migration, and closes it again.
#
# @param path [Object] database location, defaults to default_path
# @param options [Hash] options forwarded to the Store constructor
# @return [Object] SCHEMA_VERSION once the migration has finished
def self.migrate!(path: default_path, options: {})
  temporary_store = new(path: path, options: options)

  begin
    temporary_store.migrate!
  ensure
    # Close the store whether or not the migration raised.
    temporary_store.close
  end

  SCHEMA_VERSION
end

.prepare_dashboard!(path: default_path, options: {}) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/async/background/queue/store.rb', line 38

# Opens a throwaway Store, runs its dashboard preparation, and closes it again.
#
# @param path [Object] database location, defaults to default_path
# @param options [Hash] options forwarded to the Store constructor
# @return [Object] SCHEMA_VERSION once preparation has finished
def self.prepare_dashboard!(path: default_path, options: {})
  temporary_store = new(path: path, options: options)

  begin
    temporary_store.prepare_dashboard!
  ensure
    # Close the store whether or not the preparation raised.
    temporary_store.close
  end

  SCHEMA_VERSION
end

Instance Method Details

#close ⇒ Object



163
164
165
166
167
168
169
170
171
# File 'lib/async/background/queue/store.rb', line 163

# Shuts the connection down; does nothing when the Store is not connected.
#
# Statements are finalized first, then SQL::OPTIMIZE is attempted, then the
# database handle is closed and the connection state is reset.
def close
  return nil unless connected?

  finalize_statements

  begin
    @db.execute(SQL::OPTIMIZE)
  rescue StandardError
    # Best effort only: a failing optimize must not prevent the close below.
    nil
  end

  @db.close
  @db = nil
  @schema_checked = false
end

#complete(job_id, claim_token:, finished_at: realtime_now, duration_ms: nil) ⇒ Object



107
108
109
110
111
# File 'lib/async/background/queue/store.rb', line 107

# Runs the prepared completion statement for a job.
#
# @param job_id [Object] identifier bound to the statement
# @param claim_token [Object] claim token bound to the statement
# @param finished_at [Object] completion timestamp, defaults to realtime_now
# @param duration_ms [Object, nil] optional duration bound to the statement
# @return [Boolean] true when the database reports at least one changed row
def complete(job_id, claim_token:, finished_at: realtime_now, duration_ms: nil)
  ensure_connection

  bindings = [finished_at, duration_ms, job_id, claim_token]
  @complete_stmt.execute(*bindings)

  @db.changes > 0
end

#data_version ⇒ Object



158
159
160
161
# File 'lib/async/background/queue/store.rb', line 158

# Queries the database with SQL::DATA_VERSION.
#
# @return [Integer] the first value of the result, coerced with to_i
def data_version
  ensure_connection

  raw_version = @db.get_first_value(SQL::DATA_VERSION)
  raw_version.to_i
end

#enqueue(class_name, args = EMPTY_ARGS, run_at = nil, options: EMPTY_OPTIONS) ⇒ Object



80
81
82
83
84
85
# File 'lib/async/background/queue/store.rb', line 80

# Inserts a job through the prepared enqueue statement.
#
# @param class_name [Object] job class name bound to the statement
# @param args [Object] job arguments, serialized with dump_args
# @param run_at [Object, nil] scheduled time; the enqueue time is used when nil
# @param options [Object] job options, serialized with dump_options
# @return [Object] the database's last insert row id
def enqueue(class_name, args = EMPTY_ARGS, run_at = nil, options: EMPTY_OPTIONS)
  ensure_connection

  enqueued_at = realtime_now
  # A job without an explicit schedule is scheduled for the enqueue time.
  scheduled_for = run_at || enqueued_at

  @enqueue_stmt.execute(
    class_name,
    dump_args(args),
    dump_options(options),
    enqueued_at,
    scheduled_for
  )
  @db.last_insert_row_id
end

#fail(job_id, claim_token:, error_class: nil, error_message: nil, finished_at: realtime_now, duration_ms: nil) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/async/background/queue/store.rb', line 113

# Runs the prepared failure statement for a job.
#
# @param job_id [Object] identifier bound to the statement
# @param claim_token [Object] claim token bound to the statement
# @param error_class [Object, nil] stored as a string when present
# @param error_message [Object, nil] passed through truncate_message before storing
# @param finished_at [Object] failure timestamp, defaults to realtime_now
# @param duration_ms [Object, nil] optional duration bound to the statement
# @return [Boolean] true when the database reports at least one changed row
def fail(job_id, claim_token:, error_class: nil, error_message: nil, finished_at: realtime_now, duration_ms: nil)
  ensure_connection

  class_label = error_class&.to_s
  stored_message = truncate_message(error_message)
  @fail_stmt.execute(finished_at, duration_ms, class_label, stored_message, job_id, claim_token)

  @db.changes > 0
end

#fetch(worker_id) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/async/background/queue/store.rb', line 87

# Executes the prepared fetch statement inside a transaction, binding a freshly
# generated claim token and the current time.
#
# @param worker_id [Object] worker identifier bound to the statement
# @return [Object, nil] the result of job_from_row for the first returned row,
#   or nil when the statement returned no row
def fetch(worker_id)
  ensure_connection
  claim_token = generate_claim_token
  claimed_at = realtime_now

  claimed_row = transaction do
    with_statement(@fetch_stmt) do |stmt|
      rows = stmt.execute(worker_id, claimed_at, claim_token, claimed_at)
      rows.first
    end
  end

  if claimed_row
    # Cleanup is only considered after a row was actually obtained.
    maybe_cleanup
    job_from_row(claimed_row, claim_token)
  end
end

#mark_started!(job_id, claim_token:, started_at: realtime_now) ⇒ Object



101
102
103
104
105
# File 'lib/async/background/queue/store.rb', line 101

# Runs the prepared "mark started" statement for a job.
#
# @param job_id [Object] identifier bound to the statement
# @param claim_token [Object] claim token bound to the statement
# @param started_at [Object] start timestamp, defaults to realtime_now
# @return [Boolean] true when the database reports at least one changed row
def mark_started!(job_id, claim_token:, started_at: realtime_now)
  ensure_connection

  bindings = [started_at, job_id, claim_token]
  @mark_started_stmt.execute(*bindings)

  @db.changes > 0
end

#migrate! ⇒ Object Also known as: ensure_database!

Raises:

  • (SchemaError) — if the Store is still connected; close it before calling migrate!



57
58
59
60
61
62
63
# File 'lib/async/background/queue/store.rb', line 57

# Migrates the database through with_database and marks the schema as checked.
#
# @raise [SchemaError] when the Store is currently connected
# @return [self]
def migrate!
  if connected?
    raise SchemaError, 'close the Store before calling migrate!'
  end

  with_database do |db|
    migrate_database!(db)
  end

  @schema_checked = true
  self
end

#next_pending_run_at ⇒ Object



153
154
155
156
# File 'lib/async/background/queue/store.rb', line 153

# Executes the prepared "next pending" statement.
#
# @return [Object, nil] the first column of the first row, or nil when the
#   statement returned no row
def next_pending_run_at
  ensure_connection

  with_statement(@next_pending_stmt) do |stmt|
    first_row = stmt.execute.first
    first_row&.first
  end
end

#prepare_dashboard! ⇒ Object

Raises:

  • (SchemaError) — if the Store is still connected; close it before calling prepare_dashboard!



67
68
69
70
71
72
73
# File 'lib/async/background/queue/store.rb', line 67

# Hands a database obtained from with_database to Schema.prepare_dashboard!
# and marks the schema as checked.
#
# @raise [SchemaError] when the Store is currently connected
# @return [self]
def prepare_dashboard!
  if connected?
    raise SchemaError, 'close the Store before calling prepare_dashboard!'
  end

  with_database do |db|
    Schema.prepare_dashboard!(db)
  end

  @schema_checked = true
  self
end

#recover(worker_id) ⇒ Object



147
148
149
150
151
# File 'lib/async/background/queue/store.rb', line 147

# Runs the prepared requeue statement for the given worker.
#
# @param worker_id [Object] worker identifier bound to the statement
# @return [Object] the change count reported by the database afterwards
def recover(worker_id)
  ensure_connection

  @requeue_stmt.execute(worker_id)
  requeued_count = @db.changes
  requeued_count
end

#retry_or_fail(job_id, claim_token:, error_class: nil, error_message: nil, fallback_options: nil, finished_at: realtime_now, duration_ms: nil) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/async/background/queue/store.rb', line 126

def retry_or_fail(
  job_id,
  claim_token:,
  error_class: nil,
  error_message: nil,
  fallback_options: nil,
  finished_at: realtime_now,
  duration_ms: nil
)
  ensure_connection

  transaction do
    stored_options = stored_options_for(job_id, claim_token)
    next unless lease_alive?(job_id, claim_token)

    policy = retry_policy(stored_options, fallback_options)
    policy_retries?(policy) ? retry_job!(job_id, claim_token, policy, error_class, error_message) :
      fail_job!(job_id, claim_token, error_class, error_message, finished_at, duration_ms)
  end
end

#schema_version ⇒ Object



75
76
77
78
# File 'lib/async/background/queue/store.rb', line 75

# Queries the database with SQL::USER_VERSION.
#
# @return [Integer] the first value of the result, coerced with to_i
def schema_version
  ensure_connection

  raw_version = @db.get_first_value(SQL::USER_VERSION)
  raw_version.to_i
end