Module: Async::Background::Queue::Schema

Defined in:
lib/async/background/queue/schema.rb

Constant Summary collapse

VERSION =
1
MIGRATION_BUSY_TIMEOUT_MS =
30_000
CORE_INDEXES =
%w[idx_jobs_pending].freeze
DASHBOARD_INDEXES =
%w[idx_jobs_done_finished_at idx_jobs_failed_finished_at idx_jobs_running].freeze
REQUIRED_INDEXES =
CORE_INDEXES

Class Method Summary collapse

Class Method Details

.add_column_unless_exists!(db, name, sql_type) ⇒ Object



111
112
113
114
115
# File 'lib/async/background/queue/schema.rb', line 111

def add_column_unless_exists!(db, name, sql_type)
  return if table_columns(db).include?(name)

  db.execute(SQL.add_column(name, sql_type))
end

.add_lifecycle_columns!(db) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/async/background/queue/schema.rb', line 75

def add_lifecycle_columns!(db)
  {
    'claim_token' => 'TEXT',
    'started_at' => 'REAL',
    'finished_at' => 'REAL',
    'duration_ms' => 'INTEGER',
    'last_error_class' => 'TEXT',
    'last_error_message' => 'TEXT'
  }.each { |name, type| add_column_unless_exists!(db, name, type) }
end

.backfill_finished_at!(db) ⇒ Object



86
87
88
# File 'lib/async/background/queue/schema.rb', line 86

def backfill_finished_at!(db)
  db.execute(SQL::BACKFILL_FINISHED_AT)
end

.core_indexes_current?(db) ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/async/background/queue/schema.rb', line 99

def core_indexes_current?(db)
  (CORE_INDEXES - index_names(db)).empty?
end

.create_current_schema!(db) ⇒ Object



62
63
64
65
# File 'lib/async/background/queue/schema.rb', line 62

def create_current_schema!(db)
  db.execute_batch(SQL::CREATE_SCHEMA)
  set_version!(db, VERSION)
end

.create_dashboard_indexes!(db) ⇒ Object



95
96
97
# File 'lib/async/background/queue/schema.rb', line 95

def create_dashboard_indexes!(db)
  SQL::CREATE_DASHBOARD_INDEXES.each { |statement| db.execute(statement) }
end

.current?(db) ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/async/background/queue/schema.rb', line 46

def current?(db)
  jobs_table?(db) && version(db) == VERSION && core_indexes_current?(db)
end

.dashboard_indexes_current?(db) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/async/background/queue/schema.rb', line 50

def dashboard_indexes_current?(db)
  (DASHBOARD_INDEXES - index_names(db)).empty?
end

.enable_incremental_vacuum!(db) ⇒ Object



132
133
134
# File 'lib/async/background/queue/schema.rb', line 132

def enable_incremental_vacuum!(db)
  db.execute(SQL::AUTO_VACUUM_INCREMENTAL)
end

.ensure_pending_index!(db) ⇒ Object



90
91
92
93
# File 'lib/async/background/queue/schema.rb', line 90

def ensure_pending_index!(db)
  db.execute(SQL::DROP_LEGACY_PENDING_INDEX)
  db.execute(SQL::CREATE_PENDING_INDEX)
end

.immediate_transaction(db) ⇒ Object



148
149
150
151
152
153
154
155
156
# File 'lib/async/background/queue/schema.rb', line 148

def immediate_transaction(db)
  db.execute(SQL::BEGIN_IMMEDIATE)
  result = yield
  db.execute(SQL::COMMIT)
  result
rescue StandardError
  db.execute(SQL::ROLLBACK) rescue nil
  raise
end

.index_names(db) ⇒ Object



107
108
109
# File 'lib/async/background/queue/schema.rb', line 107

def index_names(db)
  db.execute(SQL::JOB_INDEX_NAMES).map(&:first)
end

.jobs_table?(db) ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/async/background/queue/schema.rb', line 103

def jobs_table?(db)
  !db.get_first_value(SQL::JOBS_TABLE_EXISTS).nil?
end

.migrate!(db) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/async/background/queue/schema.rb', line 21

def migrate!(db)
  reject_future_version!(db)
  return if current?(db)

  enable_incremental_vacuum!(db) unless jobs_table?(db)

  with_migration_timeout(db) do
    immediate_transaction(db) do
      reject_future_version!(db)
      upgrade!(db) unless current?(db)
    end
  end
end

.prepare_dashboard!(db) ⇒ Object



35
36
37
38
39
40
41
42
43
44
# File 'lib/async/background/queue/schema.rb', line 35

def prepare_dashboard!(db)
  migrate!(db)
  return if dashboard_indexes_current?(db)

  with_migration_timeout(db) do
    immediate_transaction(db) do
      create_dashboard_indexes!(db) unless dashboard_indexes_current?(db)
    end
  end
end

.reject_future_version!(db) ⇒ Object

Raises:



121
122
123
124
125
126
# File 'lib/async/background/queue/schema.rb', line 121

def reject_future_version!(db)
  return unless version(db) > VERSION

  raise Store::SchemaError,
        "queue database schema #{version(db)} is newer than supported schema #{VERSION}"
end

.set_version!(db, value) ⇒ Object



128
129
130
# File 'lib/async/background/queue/schema.rb', line 128

def set_version!(db, value)
  db.execute(SQL.user_version(value))
end

.table_columns(db) ⇒ Object



117
118
119
# File 'lib/async/background/queue/schema.rb', line 117

def table_columns(db)
  db.execute(SQL::TABLE_INFO).map { |row| row[1] }
end

.upgrade!(db) ⇒ Object



58
59
60
# File 'lib/async/background/queue/schema.rb', line 58

def upgrade!(db)
  jobs_table?(db) ? upgrade_existing_database!(db) : create_current_schema!(db)
end

.upgrade_existing_database!(db) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/async/background/queue/schema.rb', line 67

def upgrade_existing_database!(db)
  add_column_unless_exists!(db, 'options', 'TEXT')
  add_lifecycle_columns!(db)
  backfill_finished_at!(db)
  ensure_pending_index!(db)
  set_version!(db, VERSION)
end

.version(db) ⇒ Object



54
55
56
# File 'lib/async/background/queue/schema.rb', line 54

def version(db)
  db.get_first_value(SQL::USER_VERSION).to_i
end

.with_migration_timeout(db) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/async/background/queue/schema.rb', line 136

def with_migration_timeout(db)
  original_timeout = db.get_first_value(SQL::BUSY_TIMEOUT).to_i
  db.execute(SQL.busy_timeout(MIGRATION_BUSY_TIMEOUT_MS))
  yield
ensure
  begin
    db.execute(SQL.busy_timeout(original_timeout)) if defined?(original_timeout)
  rescue StandardError
    # Restoring a connection option must not hide the migration exception.
  end
end