Module: Async::Background::Queue::Schema
- Defined in:
- lib/async/background/queue/schema.rb
Constant Summary collapse
- VERSION =
1- MIGRATION_BUSY_TIMEOUT_MS =
30_000- CORE_INDEXES =
%w[idx_jobs_pending].freeze
- DASHBOARD_INDEXES =
%w[ idx_jobs_done_finished_at idx_jobs_failed_finished_at idx_jobs_executing_started_at idx_jobs_claimed_locked_at ].freeze
- REQUIRED_INDEXES =
CORE_INDEXES
Class Method Summary collapse
- .add_column_unless_exists!(db, name, sql_type) ⇒ Object
- .add_lifecycle_columns!(db) ⇒ Object
- .backfill_finished_at!(db) ⇒ Object
- .core_indexes_current?(db) ⇒ Boolean
- .create_current_schema!(db) ⇒ Object
- .create_dashboard_indexes!(db) ⇒ Object
- .current?(db) ⇒ Boolean
- .dashboard_indexes_current?(db) ⇒ Boolean
- .enable_incremental_vacuum!(db) ⇒ Object
- .ensure_pending_index!(db) ⇒ Object
- .immediate_transaction(db) ⇒ Object
- .index_names(db) ⇒ Object
- .jobs_table?(db) ⇒ Boolean
- .migrate!(db) ⇒ Object
- .prepare_dashboard!(db) ⇒ Object
- .reject_future_version!(db) ⇒ Object
- .set_version!(db, value) ⇒ Object
- .table_columns(db) ⇒ Object
- .upgrade!(db) ⇒ Object
- .upgrade_existing_database!(db) ⇒ Object
- .version(db) ⇒ Object
- .with_migration_timeout(db) ⇒ Object
Class Method Details
.add_column_unless_exists!(db, name, sql_type) ⇒ Object
116 117 118 119 120 |
# File 'lib/async/background/queue/schema.rb', line 116 def add_column_unless_exists!(db, name, sql_type) return if table_columns(db).include?(name) db.execute(SQL.add_column(name, sql_type)) end |
.add_lifecycle_columns!(db) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/async/background/queue/schema.rb', line 80 def add_lifecycle_columns!(db) { 'claim_token' => 'TEXT', 'started_at' => 'REAL', 'finished_at' => 'REAL', 'duration_ms' => 'INTEGER', 'last_error_class' => 'TEXT', 'last_error_message' => 'TEXT' }.each { |name, type| add_column_unless_exists!(db, name, type) } end |
.backfill_finished_at!(db) ⇒ Object
91 92 93 |
# File 'lib/async/background/queue/schema.rb', line 91 def backfill_finished_at!(db) db.execute(SQL::BACKFILL_FINISHED_AT) end |
.core_indexes_current?(db) ⇒ Boolean
104 105 106 |
# File 'lib/async/background/queue/schema.rb', line 104 def core_indexes_current?(db) (CORE_INDEXES - index_names(db)).empty? end |
.create_current_schema!(db) ⇒ Object
67 68 69 70 |
# File 'lib/async/background/queue/schema.rb', line 67 def create_current_schema!(db) db.execute_batch(SQL::CREATE_SCHEMA) set_version!(db, VERSION) end |
.create_dashboard_indexes!(db) ⇒ Object
100 101 102 |
# File 'lib/async/background/queue/schema.rb', line 100 def create_dashboard_indexes!(db) SQL::CREATE_DASHBOARD_INDEXES.each { |statement| db.execute(statement) } end |
.current?(db) ⇒ Boolean
51 52 53 |
# File 'lib/async/background/queue/schema.rb', line 51 def current?(db) jobs_table?(db) && version(db) == VERSION && core_indexes_current?(db) end |
.dashboard_indexes_current?(db) ⇒ Boolean
55 56 57 |
# File 'lib/async/background/queue/schema.rb', line 55 def dashboard_indexes_current?(db) (DASHBOARD_INDEXES - index_names(db)).empty? end |
.enable_incremental_vacuum!(db) ⇒ Object
137 138 139 |
# File 'lib/async/background/queue/schema.rb', line 137 def enable_incremental_vacuum!(db) db.execute(SQL::AUTO_VACUUM_INCREMENTAL) end |
.ensure_pending_index!(db) ⇒ Object
95 96 97 98 |
# File 'lib/async/background/queue/schema.rb', line 95 def ensure_pending_index!(db) db.execute(SQL::DROP_LEGACY_PENDING_INDEX) db.execute(SQL::CREATE_PENDING_INDEX) end |
.immediate_transaction(db) ⇒ Object
153 154 155 156 157 158 159 160 161 |
# File 'lib/async/background/queue/schema.rb', line 153 def immediate_transaction(db) db.execute(SQL::BEGIN_IMMEDIATE) result = yield db.execute(SQL::COMMIT) result rescue StandardError db.execute(SQL::ROLLBACK) rescue nil raise end |
.index_names(db) ⇒ Object
112 113 114 |
# File 'lib/async/background/queue/schema.rb', line 112 def index_names(db) db.execute(SQL::JOB_INDEX_NAMES).map(&:first) end |
.jobs_table?(db) ⇒ Boolean
108 109 110 |
# File 'lib/async/background/queue/schema.rb', line 108 def jobs_table?(db) !db.get_first_value(SQL::JOBS_TABLE_EXISTS).nil? end |
.migrate!(db) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/async/background/queue/schema.rb', line 26 def migrate!(db) reject_future_version!(db) return if current?(db) enable_incremental_vacuum!(db) unless jobs_table?(db) with_migration_timeout(db) do immediate_transaction(db) do reject_future_version!(db) upgrade!(db) unless current?(db) end end end |
.prepare_dashboard!(db) ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/async/background/queue/schema.rb', line 40 def prepare_dashboard!(db) migrate!(db) return if dashboard_indexes_current?(db) with_migration_timeout(db) do immediate_transaction(db) do create_dashboard_indexes!(db) unless dashboard_indexes_current?(db) end end end |
.reject_future_version!(db) ⇒ Object
126 127 128 129 130 131 |
# File 'lib/async/background/queue/schema.rb', line 126 def reject_future_version!(db) return unless version(db) > VERSION raise Store::SchemaError, "queue database schema #{version(db)} is newer than supported schema #{VERSION}" end |
.set_version!(db, value) ⇒ Object
133 134 135 |
# File 'lib/async/background/queue/schema.rb', line 133 def set_version!(db, value) db.execute(SQL.user_version(value)) end |
.table_columns(db) ⇒ Object
122 123 124 |
# File 'lib/async/background/queue/schema.rb', line 122 def table_columns(db) db.execute(SQL::TABLE_INFO).map { |row| row[1] } end |
.upgrade!(db) ⇒ Object
63 64 65 |
# File 'lib/async/background/queue/schema.rb', line 63 def upgrade!(db) jobs_table?(db) ? upgrade_existing_database!(db) : create_current_schema!(db) end |
.upgrade_existing_database!(db) ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/async/background/queue/schema.rb', line 72 def upgrade_existing_database!(db) add_column_unless_exists!(db, 'options', 'TEXT') add_lifecycle_columns!(db) backfill_finished_at!(db) ensure_pending_index!(db) set_version!(db, VERSION) end |
.version(db) ⇒ Object
59 60 61 |
# File 'lib/async/background/queue/schema.rb', line 59 def version(db) db.get_first_value(SQL::USER_VERSION).to_i end |
.with_migration_timeout(db) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/async/background/queue/schema.rb', line 141 def with_migration_timeout(db) original_timeout = db.get_first_value(SQL::BUSY_TIMEOUT).to_i db.execute(SQL.busy_timeout(MIGRATION_BUSY_TIMEOUT_MS)) yield ensure begin db.execute(SQL.busy_timeout(original_timeout)) if defined?(original_timeout) rescue StandardError # Restoring a connection option must not hide the migration exception. end end |