Module: Async::Background::Queue::Schema
- Defined in:
- lib/async/background/queue/schema.rb
Constant Summary collapse
- VERSION =
1- MIGRATION_BUSY_TIMEOUT_MS =
30_000- CORE_INDEXES =
%w[idx_jobs_pending].freeze
- DASHBOARD_INDEXES =
%w[idx_jobs_done_finished_at idx_jobs_failed_finished_at idx_jobs_running].freeze
- REQUIRED_INDEXES =
CORE_INDEXES
Class Method Summary collapse
- .add_column_unless_exists!(db, name, sql_type) ⇒ Object
- .add_lifecycle_columns!(db) ⇒ Object
- .backfill_finished_at!(db) ⇒ Object
- .core_indexes_current?(db) ⇒ Boolean
- .create_current_schema!(db) ⇒ Object
- .create_dashboard_indexes!(db) ⇒ Object
- .current?(db) ⇒ Boolean
- .dashboard_indexes_current?(db) ⇒ Boolean
- .enable_incremental_vacuum!(db) ⇒ Object
- .ensure_pending_index!(db) ⇒ Object
- .immediate_transaction(db) ⇒ Object
- .index_names(db) ⇒ Object
- .jobs_table?(db) ⇒ Boolean
- .migrate!(db) ⇒ Object
- .prepare_dashboard!(db) ⇒ Object
- .reject_future_version!(db) ⇒ Object
- .set_version!(db, value) ⇒ Object
- .table_columns(db) ⇒ Object
- .upgrade!(db) ⇒ Object
- .upgrade_existing_database!(db) ⇒ Object
- .version(db) ⇒ Object
- .with_migration_timeout(db) ⇒ Object
Class Method Details
.add_column_unless_exists!(db, name, sql_type) ⇒ Object
111 112 113 114 115 |
# File 'lib/async/background/queue/schema.rb', line 111 def add_column_unless_exists!(db, name, sql_type) return if table_columns(db).include?(name) db.execute(SQL.add_column(name, sql_type)) end |
.add_lifecycle_columns!(db) ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/async/background/queue/schema.rb', line 75 def add_lifecycle_columns!(db) { 'claim_token' => 'TEXT', 'started_at' => 'REAL', 'finished_at' => 'REAL', 'duration_ms' => 'INTEGER', 'last_error_class' => 'TEXT', 'last_error_message' => 'TEXT' }.each { |name, type| add_column_unless_exists!(db, name, type) } end |
.backfill_finished_at!(db) ⇒ Object
86 87 88 |
# File 'lib/async/background/queue/schema.rb', line 86 def backfill_finished_at!(db) db.execute(SQL::BACKFILL_FINISHED_AT) end |
.core_indexes_current?(db) ⇒ Boolean
99 100 101 |
# File 'lib/async/background/queue/schema.rb', line 99 def core_indexes_current?(db) (CORE_INDEXES - index_names(db)).empty? end |
.create_current_schema!(db) ⇒ Object
62 63 64 65 |
# File 'lib/async/background/queue/schema.rb', line 62 def create_current_schema!(db) db.execute_batch(SQL::CREATE_SCHEMA) set_version!(db, VERSION) end |
.create_dashboard_indexes!(db) ⇒ Object
95 96 97 |
# File 'lib/async/background/queue/schema.rb', line 95 def create_dashboard_indexes!(db) SQL::CREATE_DASHBOARD_INDEXES.each { |statement| db.execute(statement) } end |
.current?(db) ⇒ Boolean
46 47 48 |
# File 'lib/async/background/queue/schema.rb', line 46 def current?(db) jobs_table?(db) && version(db) == VERSION && core_indexes_current?(db) end |
.dashboard_indexes_current?(db) ⇒ Boolean
50 51 52 |
# File 'lib/async/background/queue/schema.rb', line 50 def dashboard_indexes_current?(db) (DASHBOARD_INDEXES - index_names(db)).empty? end |
.enable_incremental_vacuum!(db) ⇒ Object
132 133 134 |
# File 'lib/async/background/queue/schema.rb', line 132 def enable_incremental_vacuum!(db) db.execute(SQL::AUTO_VACUUM_INCREMENTAL) end |
.ensure_pending_index!(db) ⇒ Object
90 91 92 93 |
# File 'lib/async/background/queue/schema.rb', line 90 def ensure_pending_index!(db) db.execute(SQL::DROP_LEGACY_PENDING_INDEX) db.execute(SQL::CREATE_PENDING_INDEX) end |
.immediate_transaction(db) ⇒ Object
148 149 150 151 152 153 154 155 156 |
# File 'lib/async/background/queue/schema.rb', line 148 def immediate_transaction(db) db.execute(SQL::BEGIN_IMMEDIATE) result = yield db.execute(SQL::COMMIT) result rescue StandardError db.execute(SQL::ROLLBACK) rescue nil raise end |
.index_names(db) ⇒ Object
107 108 109 |
# File 'lib/async/background/queue/schema.rb', line 107 def index_names(db) db.execute(SQL::JOB_INDEX_NAMES).map(&:first) end |
.jobs_table?(db) ⇒ Boolean
103 104 105 |
# File 'lib/async/background/queue/schema.rb', line 103 def jobs_table?(db) !db.get_first_value(SQL::JOBS_TABLE_EXISTS).nil? end |
.migrate!(db) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/async/background/queue/schema.rb', line 21 def migrate!(db) reject_future_version!(db) return if current?(db) enable_incremental_vacuum!(db) unless jobs_table?(db) with_migration_timeout(db) do immediate_transaction(db) do reject_future_version!(db) upgrade!(db) unless current?(db) end end end |
.prepare_dashboard!(db) ⇒ Object
35 36 37 38 39 40 41 42 43 44 |
# File 'lib/async/background/queue/schema.rb', line 35 def prepare_dashboard!(db) migrate!(db) return if dashboard_indexes_current?(db) with_migration_timeout(db) do immediate_transaction(db) do create_dashboard_indexes!(db) unless dashboard_indexes_current?(db) end end end |
.reject_future_version!(db) ⇒ Object
121 122 123 124 125 126 |
# File 'lib/async/background/queue/schema.rb', line 121 def reject_future_version!(db) return unless version(db) > VERSION raise Store::SchemaError, "queue database schema #{version(db)} is newer than supported schema #{VERSION}" end |
.set_version!(db, value) ⇒ Object
128 129 130 |
# File 'lib/async/background/queue/schema.rb', line 128 def set_version!(db, value) db.execute(SQL.user_version(value)) end |
.table_columns(db) ⇒ Object
117 118 119 |
# File 'lib/async/background/queue/schema.rb', line 117 def table_columns(db) db.execute(SQL::TABLE_INFO).map { |row| row[1] } end |
.upgrade!(db) ⇒ Object
58 59 60 |
# File 'lib/async/background/queue/schema.rb', line 58 def upgrade!(db) jobs_table?(db) ? upgrade_existing_database!(db) : create_current_schema!(db) end |
.upgrade_existing_database!(db) ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/async/background/queue/schema.rb', line 67 def upgrade_existing_database!(db) add_column_unless_exists!(db, 'options', 'TEXT') add_lifecycle_columns!(db) backfill_finished_at!(db) ensure_pending_index!(db) set_version!(db, VERSION) end |
.version(db) ⇒ Object
54 55 56 |
# File 'lib/async/background/queue/schema.rb', line 54 def version(db) db.get_first_value(SQL::USER_VERSION).to_i end |
.with_migration_timeout(db) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/async/background/queue/schema.rb', line 136 def with_migration_timeout(db) original_timeout = db.get_first_value(SQL::BUSY_TIMEOUT).to_i db.execute(SQL.busy_timeout(MIGRATION_BUSY_TIMEOUT_MS)) yield ensure begin db.execute(SQL.busy_timeout(original_timeout)) if defined?(original_timeout) rescue StandardError # Restoring a connection option must not hide the migration exception. end end |