Module: Async::Background::Queue::Schema

Defined in:
lib/async/background/queue/schema.rb

Constant Summary collapse

# Schema version this code supports. Compared with the database's stored
# version in current? and reject_future_version!, and stamped through
# set_version!(db, VERSION) after a create or upgrade.
VERSION =
1
# Busy timeout that with_migration_timeout applies while its block runs
# (milliseconds, per the _MS suffix).
MIGRATION_BUSY_TIMEOUT_MS =
30_000
# Index names that core_indexes_current? (and therefore current?) requires.
CORE_INDEXES =
%w[idx_jobs_pending].freeze
# Index names that dashboard_indexes_current? requires; prepare_dashboard!
# runs create_dashboard_indexes! when any of them is missing.
DASHBOARD_INDEXES =
%w[
  idx_jobs_done_finished_at
  idx_jobs_failed_finished_at
  idx_jobs_executing_started_at
  idx_jobs_claimed_locked_at
].freeze
# Refers to the same array object as CORE_INDEXES.
REQUIRED_INDEXES =
CORE_INDEXES

Class Method Summary collapse

Class Method Details

.add_column_unless_exists!(db, name, sql_type) ⇒ Object



116
117
118
119
120
# File 'lib/async/background/queue/schema.rb', line 116

# Adds a column through SQL.add_column unless table_columns already lists it.
#
# @param db [Object] connection passed to table_columns and used for +execute+
# @param name [String] column name to look for and, when missing, to add
# @param sql_type [String] column type handed to SQL.add_column
# @return [Object, nil] the +db.execute+ result, or nil when the column exists
def add_column_unless_exists!(db, name, sql_type)
  existing_columns = table_columns(db)
  db.execute(SQL.add_column(name, sql_type)) unless existing_columns.include?(name)
end

.add_lifecycle_columns!(db) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/async/background/queue/schema.rb', line 80

# Ensures each lifecycle column exists by calling add_column_unless_exists!
# once per column, in the order the columns are listed below.
#
# @param db [Object] connection forwarded to add_column_unless_exists!
# @return [Hash{String => String}] the column-name => SQL-type mapping
def add_lifecycle_columns!(db)
  lifecycle_columns = {
    'claim_token' => 'TEXT',
    'started_at' => 'REAL',
    'finished_at' => 'REAL',
    'duration_ms' => 'INTEGER',
    'last_error_class' => 'TEXT',
    'last_error_message' => 'TEXT'
  }

  lifecycle_columns.each_pair do |column, sql_type|
    add_column_unless_exists!(db, column, sql_type)
  end
end

.backfill_finished_at!(db) ⇒ Object



91
92
93
# File 'lib/async/background/queue/schema.rb', line 91

# Executes the SQL::BACKFILL_FINISHED_AT statement on the connection.
#
# @param db [Object] connection responding to +execute+
# @return [Object] whatever +db.execute+ returns
def backfill_finished_at!(db)
  backfill_sql = SQL::BACKFILL_FINISHED_AT
  db.execute(backfill_sql)
end

.core_indexes_current?(db) ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/async/background/queue/schema.rb', line 104

# Reports whether every name in CORE_INDEXES appears in index_names(db).
#
# @param db [Object] connection forwarded to index_names
# @return [Boolean] true when no core index is missing
def core_indexes_current?(db)
  present = index_names(db)
  CORE_INDEXES.all? { |index| present.include?(index) }
end

.create_current_schema!(db) ⇒ Object



67
68
69
70
# File 'lib/async/background/queue/schema.rb', line 67

# Creates the schema from scratch: runs SQL::CREATE_SCHEMA as a batch, then
# records VERSION through set_version!.
#
# @param db [Object] connection responding to +execute_batch+
# @return [Object] whatever set_version! returns
def create_current_schema!(db)
  schema_sql = SQL::CREATE_SCHEMA
  db.execute_batch(schema_sql)

  set_version!(db, VERSION)
end

.create_dashboard_indexes!(db) ⇒ Object



100
101
102
# File 'lib/async/background/queue/schema.rb', line 100

# Executes every statement in SQL::CREATE_DASHBOARD_INDEXES, in order.
#
# @param db [Object] connection responding to +execute+
# @return [Object] SQL::CREATE_DASHBOARD_INDEXES itself (the +each+ receiver)
def create_dashboard_indexes!(db)
  SQL::CREATE_DASHBOARD_INDEXES.each do |index_sql|
    db.execute(index_sql)
  end
end

.current?(db) ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/async/background/queue/schema.rb', line 51

# Reports whether the database needs no migration: the jobs table exists,
# the stored version equals VERSION, and all core indexes are present.
# Checks run in that order and stop at the first failure.
#
# @param db [Object] connection forwarded to the individual checks
# @return [Boolean]
def current?(db)
  return false unless jobs_table?(db)
  return false unless version(db) == VERSION

  core_indexes_current?(db)
end

.dashboard_indexes_current?(db) ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
# File 'lib/async/background/queue/schema.rb', line 55

# Reports whether every name in DASHBOARD_INDEXES appears in index_names(db).
#
# @param db [Object] connection forwarded to index_names
# @return [Boolean] true when no dashboard index is missing
def dashboard_indexes_current?(db)
  present = index_names(db)
  DASHBOARD_INDEXES.all? { |index| present.include?(index) }
end

.enable_incremental_vacuum!(db) ⇒ Object



137
138
139
# File 'lib/async/background/queue/schema.rb', line 137

# Executes the SQL::AUTO_VACUUM_INCREMENTAL statement on the connection.
#
# @param db [Object] connection responding to +execute+
# @return [Object] whatever +db.execute+ returns
def enable_incremental_vacuum!(db)
  vacuum_sql = SQL::AUTO_VACUUM_INCREMENTAL
  db.execute(vacuum_sql)
end

.ensure_pending_index!(db) ⇒ Object



95
96
97
98
# File 'lib/async/background/queue/schema.rb', line 95

# Executes SQL::DROP_LEGACY_PENDING_INDEX and then SQL::CREATE_PENDING_INDEX,
# in that order.
#
# @param db [Object] connection responding to +execute+
# @return [Object] the result of executing SQL::CREATE_PENDING_INDEX
def ensure_pending_index!(db)
  [SQL::DROP_LEGACY_PENDING_INDEX, SQL::CREATE_PENDING_INDEX]
    .map { |statement| db.execute(statement) }
    .last
end

.immediate_transaction(db) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/async/background/queue/schema.rb', line 153

# Runs the block between SQL::BEGIN_IMMEDIATE and SQL::COMMIT.
#
# If the block or the commit does not complete for any reason - a
# StandardError, any other exception, or a non-local exit such as +throw+ or
# +break+ - SQL::ROLLBACK is attempted so the connection is not left inside
# an open transaction. The original exception (if any) keeps propagating.
#
# A failure of SQL::BEGIN_IMMEDIATE itself propagates without a rollback:
# no transaction was opened here, so there is nothing of ours to undo.
#
# @param db [Object] connection responding to +execute+
# @yield the work to perform inside the transaction
# @return [Object] the block's return value, once the commit has succeeded
def immediate_transaction(db)
  db.execute(SQL::BEGIN_IMMEDIATE)
  committed = false

  begin
    result = yield
    db.execute(SQL::COMMIT)
    committed = true
    result
  ensure
    unless committed
      begin
        db.execute(SQL::ROLLBACK)
      rescue StandardError
        # Best effort: a failed rollback must not replace the error (or
        # non-local exit) that is already unwinding through this method.
      end
    end
  end
end

.index_names(db) ⇒ Object



112
113
114
# File 'lib/async/background/queue/schema.rb', line 112

# Runs SQL::JOB_INDEX_NAMES and collects the first element of every row.
#
# @param db [Object] connection responding to +execute+
# @return [Array] first element of each returned row (presumably index
#   names, going by the query's name - confirm against SQL::JOB_INDEX_NAMES)
def index_names(db)
  rows = db.execute(SQL::JOB_INDEX_NAMES)
  rows.map { |row| row.first }
end

.jobs_table?(db) ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/async/background/queue/schema.rb', line 108

# Reports whether SQL::JOBS_TABLE_EXISTS yields a non-nil first value.
#
# @param db [Object] connection responding to +get_first_value+
# @return [Boolean] false only when the query's first value is nil
def jobs_table?(db)
  lookup = db.get_first_value(SQL::JOBS_TABLE_EXISTS)
  lookup.nil? ? false : true
end

.migrate!(db) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/async/background/queue/schema.rb', line 26

# Brings the database to the current schema, doing nothing when current?
# already holds.
#
# The version guard and the current? check run twice: once up front, and
# again inside the immediate transaction before upgrade! is called
# (presumably so that a migration completed elsewhere in the meantime is
# not repeated - confirm).
#
# @param db [Object] connection forwarded to every helper
# @return [Object, nil] nil when already current; otherwise the value
#   returned through with_migration_timeout
# @raise [Store::SchemaError] via reject_future_version!
def migrate!(db)
  reject_future_version!(db)

  unless current?(db)
    # Runs before, and outside of, the transaction, and only when there is
    # no jobs table yet.
    # NOTE(review): presumably the vacuum mode has to be chosen before the
    # first table is created - confirm.
    enable_incremental_vacuum!(db) if !jobs_table?(db)

    with_migration_timeout(db) do
      immediate_transaction(db) do
        reject_future_version!(db)
        upgrade!(db) unless current?(db)
      end
    end
  end
end

.prepare_dashboard!(db) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/async/background/queue/schema.rb', line 40

# Runs migrate!, then creates the dashboard indexes if any is missing.
#
# dashboard_indexes_current? is checked once before taking the immediate
# transaction and once more inside it, so index creation is skipped when
# the indexes are already there by then.
#
# @param db [Object] connection forwarded to every helper
# @return [Object, nil] nil when nothing had to be created; otherwise the
#   value returned through with_migration_timeout
def prepare_dashboard!(db)
  migrate!(db)

  unless dashboard_indexes_current?(db)
    with_migration_timeout(db) do
      immediate_transaction(db) do
        create_dashboard_indexes!(db) unless dashboard_indexes_current?(db)
      end
    end
  end
end

.reject_future_version!(db) ⇒ Object

Raises:

  • (Store::SchemaError)


126
127
128
129
130
131
# File 'lib/async/background/queue/schema.rb', line 126

# Raises when the database's stored schema version is greater than VERSION.
#
# The version is read once and that same value is used for both the
# comparison and the error message, so the message always reports the value
# that failed the check.
#
# @param db [Object] connection forwarded to version
# @return [nil] when the stored version is not newer than VERSION
# @raise [Store::SchemaError] when the stored version exceeds VERSION
def reject_future_version!(db)
  found_version = version(db)
  return unless found_version > VERSION

  raise Store::SchemaError,
        "queue database schema #{found_version} is newer than supported schema #{VERSION}"
end

.set_version!(db, value) ⇒ Object



133
134
135
# File 'lib/async/background/queue/schema.rb', line 133

# Executes the statement that SQL.user_version builds for +value+.
#
# @param db [Object] connection responding to +execute+
# @param value [Integer] version passed to SQL.user_version
# @return [Object] whatever +db.execute+ returns
def set_version!(db, value)
  version_sql = SQL.user_version(value)
  db.execute(version_sql)
end

.table_columns(db) ⇒ Object



122
123
124
# File 'lib/async/background/queue/schema.rb', line 122

# Runs SQL::TABLE_INFO and collects element 1 of every returned row.
#
# @param db [Object] connection responding to +execute+
# @return [Array] element 1 of each row (presumably the column name -
#   confirm against the row layout SQL::TABLE_INFO produces)
def table_columns(db)
  info_rows = db.execute(SQL::TABLE_INFO)
  info_rows.map { |column_info| column_info[1] }
end

.upgrade!(db) ⇒ Object



63
64
65
# File 'lib/async/background/queue/schema.rb', line 63

def upgrade!(db)
  jobs_table?(db) ? upgrade_existing_database!(db) : create_current_schema!(db)
end

.upgrade_existing_database!(db) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/async/background/queue/schema.rb', line 72

# Upgrades a database in place by running the steps below in a fixed order.
# The version is written last, after every other step has returned, so an
# exception raised by an earlier step leaves the stored version untouched
# by this method.
#
# @param db [Object] connection handed to each step
# @return [Object] whatever set_version! returns
def upgrade_existing_database!(db)
  add_column_unless_exists!(db, 'options', 'TEXT')
  # The lifecycle columns (finished_at among them) are added before the
  # backfill statement runs.
  add_lifecycle_columns!(db)
  backfill_finished_at!(db)
  ensure_pending_index!(db)
  set_version!(db, VERSION)
end

.version(db) ⇒ Object



59
60
61
# File 'lib/async/background/queue/schema.rb', line 59

# Reads the first value of SQL::USER_VERSION and coerces it with +to_i+
# (so a nil result becomes 0).
#
# @param db [Object] connection responding to +get_first_value+
# @return [Integer] the stored version
def version(db)
  raw_version = db.get_first_value(SQL::USER_VERSION)
  raw_version.to_i
end

.with_migration_timeout(db) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/async/background/queue/schema.rb', line 141

# Runs the block with the connection's busy timeout set to
# MIGRATION_BUSY_TIMEOUT_MS, then restores the previous value.
#
# The previous value is read through SQL::BUSY_TIMEOUT before anything is
# changed. It is restored whether the block returns or raises, and a failure
# while restoring is swallowed so it cannot replace the block's own error.
# If the initial read itself raises, nothing was changed, so no restore is
# attempted.
#
# @param db [Object] connection responding to +get_first_value+ and +execute+
# @yield the work to perform under the longer timeout
# @return [Object] the block's return value
def with_migration_timeout(db)
  original_timeout = db.get_first_value(SQL::BUSY_TIMEOUT).to_i
  db.execute(SQL.busy_timeout(MIGRATION_BUSY_TIMEOUT_MS))
  yield
ensure
  # original_timeout is nil when the read above raised before the assignment
  # completed. `defined?` cannot detect that: a local that appears in an
  # assignment anywhere earlier in the method always counts as defined.
  unless original_timeout.nil?
    begin
      db.execute(SQL.busy_timeout(original_timeout))
    rescue StandardError
      # Restoring a connection option must not hide the migration exception.
    end
  end
end