Class: Dynflow::PersistenceAdapters::Sequel
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck
- Defined in:
- lib/dynflow/persistence_adapters/sequel.rb
Constant Summary collapse
- MAX_RETRIES =
10- RETRY_DELAY =
1- META_DATA =
{ execution_plan: %w(label state result started_at ended_at real_time execution_time root_plan_step_id class), action: %w(caller_execution_plan_id caller_action_id class plan_step_id run_step_id finalize_step_id), step: %w(state started_at ended_at real_time execution_time action_id progress_done progress_weight class action_class execution_plan_uuid queue), envelope: %w(receiver_id), coordinator_record: %w(id owner_id class), delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen), output_chunk: %w(execution_plan_uuid action_id kind timestamp), execution_plan_dependency: %w(execution_plan_uuid blocked_by_uuid) }
- SERIALIZABLE_COLUMNS =
{ action: %w(input output), delayed: %w(serialized_args), execution_plan: %w(run_flow finalize_flow execution_history step_ids), step: %w(error children), output_chunk: %w(chunk) }
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Attributes inherited from Abstract
Class Method Summary collapse
Instance Method Summary collapse
- #abort_if_pending_migrations! ⇒ Object
- #chain_execution_plan(first, second) ⇒ Object
- #connector_feature! ⇒ Object
- #coordinator_feature! ⇒ Object
- #delete_coordinator_record(class_name, record_id) ⇒ Object
- #delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
- #delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object
- #delete_output_chunks(execution_plan_id, action_id) ⇒ Object
- #filtering_by ⇒ Object
- #find_blocked_execution_plans(execution_plan_id) ⇒ Object
- #find_coordinator_records(options) ⇒ Object
- #find_execution_plan_counts(options = {}) ⇒ Object
- #find_execution_plan_counts_after(timestamp, options = {}) ⇒ Object
- #find_execution_plan_dependencies(execution_plan_id) ⇒ Object
- #find_execution_plan_statuses(options) ⇒ Object
- #find_execution_plans(options = {}) ⇒ Object
- #find_old_execution_plans(age) ⇒ Object
- #find_ready_delayed_plans(time) ⇒ Object
-
#initialize(config) ⇒ Sequel
constructor
A new instance of Sequel.
- #insert_coordinator_record(value) ⇒ Object
- #load_action(execution_plan_id, action_id) ⇒ Object
- #load_actions(execution_plan_id, action_ids) ⇒ Object
- #load_actions_attributes(execution_plan_id, attributes) ⇒ Object
- #load_delayed_plan(execution_plan_id) ⇒ Object
- #load_execution_plan(execution_plan_id) ⇒ Object
- #load_output_chunks(execution_plan_id, action_id) ⇒ Object
- #load_step(execution_plan_id, step_id) ⇒ Object
- #load_steps(execution_plan_id) ⇒ Object
- #migrate_db ⇒ Object
- #ordering_by ⇒ Object
- #pagination? ⇒ Boolean
- #prune_envelopes(receiver_ids) ⇒ Object
- #prune_undeliverable_envelopes ⇒ Object
- #pull_envelopes(receiver_id) ⇒ Object
- #push_envelope(envelope) ⇒ Object
- #save_action(execution_plan_id, action_id, value) ⇒ Object
- #save_delayed_plan(execution_plan_id, value) ⇒ Object
- #save_envelope(data) ⇒ Object
- #save_execution_plan(execution_plan_id, value) ⇒ Object
- #save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
- #save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object
- #to_hash ⇒ Object
- #transaction(&block) ⇒ Object
- #update_coordinator_record(class_name, record_id, value) ⇒ Object
Methods inherited from Abstract
Constructor Details
#initialize(config) ⇒ Sequel
Returns a new instance of Sequel.
51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 51 def initialize(config) migrate = true config = config.dup @additional_responsibilities = { coordinator: true, connector: true } if config.is_a?(Hash) @additional_responsibilities.merge!(config.delete(:additional_responsibilities)) if config.key?(:additional_responsibilities) migrate = config.fetch(:migrate, true) end @db = initialize_db config migrate_db if migrate end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
21 22 23 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 21 def db @db end |
Class Method Details
.migrations_path ⇒ Object
358 359 360 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 358 def self.migrations_path File.('../sequel_migrations', __FILE__) end |
Instance Method Details
#abort_if_pending_migrations! ⇒ Object
334 335 336 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 334 def abort_if_pending_migrations! ::Sequel::Migrator.check_current(db, self.class.migrations_path, table: 'dynflow_schema_info') end |
#chain_execution_plan(first, second) ⇒ Object
198 199 200 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 198 def chain_execution_plan(first, second) save :execution_plan_dependency, {}, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false end |
#connector_feature! ⇒ Object
249 250 251 252 253 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 249 def connector_feature! unless @additional_responsibilities[:connector] raise "The sequel persistence adapter connector feature used but not enabled in additional_features" end end |
#coordinator_feature! ⇒ Object
288 289 290 291 292 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 288 def coordinator_feature! unless @additional_responsibilities[:coordinator] raise "The sequel persistence adapter coordinator feature used but not enabled in additional_features" end end |
#delete_coordinator_record(class_name, record_id) ⇒ Object
304 305 306 307 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 304 def delete_coordinator_record(class_name, record_id) coordinator_feature! with_retry { table(:coordinator_record).where(class: class_name, id: record_id).delete } end |
#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 134 def delete_delayed_plans(filters, batch_size = 1000) count = 0 with_retry do filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans| uuids = plans.map { |p| p.fetch(:execution_plan_uuid) } @db.transaction do count += table(:delayed).where(execution_plan_uuid: uuids).delete end end end count end |
#delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 99 def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) count = 0 with_retry do filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans| uuids = plans.map { |p| p.fetch(:uuid) } @db.transaction do table(:delayed).where(execution_plan_uuid: uuids).delete steps = table(:step).where(execution_plan_uuid: uuids) backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir steps.delete output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete actions = table(:action).where(execution_plan_uuid: uuids) backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir actions.delete execution_plans = table(:execution_plan).where(uuid: uuids) backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir count += execution_plans.delete end end return count end end |
#delete_output_chunks(execution_plan_id, action_id) ⇒ Object
243 244 245 246 247 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 243 def delete_output_chunks(execution_plan_id, action_id) with_retry do filter(:output_chunk, table(:output_chunk), { execution_plan_uuid: execution_plan_id, action_id: action_id }).delete end end |
#filtering_by ⇒ Object
27 28 29 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 27 def filtering_by META_DATA.fetch :execution_plan end |
#find_blocked_execution_plans(execution_plan_id) ⇒ Object
163 164 165 166 167 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 163 def find_blocked_execution_plans(execution_plan_id) table(:execution_plan_dependency) .where(blocked_by_uuid: execution_plan_id) .select_map(:execution_plan_uuid) end |
#find_coordinator_records(options) ⇒ Object
309 310 311 312 313 314 315 316 317 318 319 320 321 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 309 def find_coordinator_records() coordinator_feature! = .dup filters = ([:filters] || {}).dup exclude_owner_id = filters.delete(:exclude_owner_id) data_set = filter(:coordinator_record, table(:coordinator_record), filters) if exclude_owner_id data_set = data_set.exclude(:owner_id => exclude_owner_id) end with_retry do data_set.all.map { |record| load_data(record) } end end |
#find_execution_plan_counts(options = {}) ⇒ Object
79 80 81 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 79 def find_execution_plan_counts( = {}) with_retry { filter(:execution_plan, table(:execution_plan), [:filters]).count } end |
#find_execution_plan_counts_after(timestamp, options = {}) ⇒ Object
83 84 85 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 83 def find_execution_plan_counts_after(, = {}) with_retry { filter(:execution_plan, table(:execution_plan), [:filters]).filter(::Sequel.lit('ended_at >= ?', )).count } end |
#find_execution_plan_dependencies(execution_plan_id) ⇒ Object
157 158 159 160 161 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 157 def find_execution_plan_dependencies(execution_plan_id) table(:execution_plan_dependency) .where(execution_plan_uuid: execution_plan_id) .select_map(:blocked_by_uuid) end |
#find_execution_plan_statuses(options) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 87 def find_execution_plan_statuses() plans = with_retry do filter(:execution_plan, table(:execution_plan), [:filters]) .select(:uuid, :state, :result) end plans.each_with_object({}) do |current, acc| uuid = current.delete(:uuid) acc[uuid] = current end end |
#find_execution_plans(options = {}) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 67 def find_execution_plans( = {}) table_name = :execution_plan [:order_by] ||= :started_at data_set = filter(table_name, order(table_name, paginate(table(table_name), ), ), [:filters]) records = with_retry { data_set.all } records.map { |record| execution_plan_column_map(load_data(record, table_name)) } end |
#find_old_execution_plans(age) ⇒ Object
147 148 149 150 151 152 153 154 155 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 147 def find_old_execution_plans(age) table_name = :execution_plan records = with_retry do table(table_name) .where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')) .all end records.map { |plan| execution_plan_column_map(load_data plan, table_name) } end |
#find_ready_delayed_plans(time) ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 169 def find_ready_delayed_plans(time) table_name = :delayed # Subquery to find delayed plans that have at least one non-stopped dependency plans_with_unfinished_deps = table(:execution_plan_dependency) .join(TABLES[:execution_plan], uuid: :blocked_by_uuid) .where(::Sequel.~(state: 'stopped')) .select(:execution_plan_uuid) records = with_retry do table(table_name) .where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time)) .where(:frozen => false) .exclude(execution_plan_uuid: plans_with_unfinished_deps) .order_by(:start_at) .all end records.map { |plan| load_data(plan, table_name) } end |
#insert_coordinator_record(value) ⇒ Object
294 295 296 297 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 294 def insert_coordinator_record(value) coordinator_feature! save :coordinator_record, {}, value end |
#load_action(execution_plan_id, action_id) ⇒ Object
215 216 217 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 215 def load_action(execution_plan_id, action_id) load :action, execution_plan_uuid: execution_plan_id, id: action_id end |
#load_actions(execution_plan_id, action_ids) ⇒ Object
219 220 221 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 219 def load_actions(execution_plan_id, action_ids) load_records :action, { execution_plan_uuid: execution_plan_id, id: action_ids } end |
#load_actions_attributes(execution_plan_id, attributes) ⇒ Object
223 224 225 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 223 def load_actions_attributes(execution_plan_id, attributes) load_records :action, { execution_plan_uuid: execution_plan_id }, attributes end |
#load_delayed_plan(execution_plan_id) ⇒ Object
188 189 190 191 192 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 188 def load_delayed_plan(execution_plan_id) load :delayed, execution_plan_uuid: execution_plan_id rescue KeyError return nil end |
#load_execution_plan(execution_plan_id) ⇒ Object
126 127 128 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 126 def load_execution_plan(execution_plan_id) execution_plan_column_map(load :execution_plan, uuid: execution_plan_id) end |
#load_output_chunks(execution_plan_id, action_id) ⇒ Object
239 240 241 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 239 def load_output_chunks(execution_plan_id, action_id) load_records :output_chunk, { execution_plan_uuid: execution_plan_id, action_id: action_id }, [:timestamp, :kind, :chunk] end |
#load_step(execution_plan_id, step_id) ⇒ Object
202 203 204 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 202 def load_step(execution_plan_id, step_id) load :step, execution_plan_uuid: execution_plan_id, id: step_id end |
#load_steps(execution_plan_id) ⇒ Object
206 207 208 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 206 def load_steps(execution_plan_id) load_records :step, execution_plan_uuid: execution_plan_id end |
#migrate_db ⇒ Object
330 331 332 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 330 def migrate_db ::Sequel::Migrator.run(db, self.class.migrations_path, table: 'dynflow_schema_info') end |
#ordering_by ⇒ Object
31 32 33 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 31 def ordering_by META_DATA.fetch :execution_plan end |
#pagination? ⇒ Boolean
23 24 25 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 23 def pagination? true end |
#prune_envelopes(receiver_ids) ⇒ Object
278 279 280 281 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 278 def prune_envelopes(receiver_ids) connector_feature! with_retry { table(:envelope).where(receiver_id: receiver_ids).delete } end |
#prune_undeliverable_envelopes ⇒ Object
283 284 285 286 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 283 def prune_undeliverable_envelopes connector_feature! with_retry { table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete } end |
#pull_envelopes(receiver_id) ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 260 def pull_envelopes(receiver_id) connector_feature! with_retry do db.transaction do data_set = table(:envelope).where(receiver_id: receiver_id).all envelopes = data_set.map { |record| load_data(record) } table(:envelope).where(id: data_set.map { |d| d[:id] }).delete return envelopes end end end |
#push_envelope(envelope) ⇒ Object
273 274 275 276 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 273 def push_envelope(envelope) connector_feature! with_retry { table(:envelope).insert(prepare_record(:envelope, envelope)) } end |
#save_action(execution_plan_id, action_id, value) ⇒ Object
227 228 229 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 227 def save_action(execution_plan_id, action_id, value) save :action, { execution_plan_uuid: execution_plan_id, id: action_id }, value, with_data: false end |
#save_delayed_plan(execution_plan_id, value) ⇒ Object
194 195 196 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 194 def save_delayed_plan(execution_plan_id, value) save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false end |
#save_envelope(data) ⇒ Object
255 256 257 258 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 255 def save_envelope(data) connector_feature! save :envelope, {}, data end |
#save_execution_plan(execution_plan_id, value) ⇒ Object
130 131 132 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 130 def save_execution_plan(execution_plan_id, value) save :execution_plan, { uuid: execution_plan_id }, value, with_data: false end |
#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
231 232 233 234 235 236 237 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 231 def save_output_chunks(execution_plan_id, action_id, chunks) chunks.each do |chunk| chunk[:execution_plan_uuid] = execution_plan_id chunk[:action_id] = action_id save :output_chunk, {}, chunk, with_data: false end end |
#save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object
210 211 212 213 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 210 def save_step(execution_plan_id, step_id, value, update_conditions = {}) save :step, { execution_plan_uuid: execution_plan_id, id: step_id }, value, with_data: false, update_conditions: update_conditions end |
#to_hash ⇒ Object
323 324 325 326 327 328 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 323 def to_hash { execution_plans: table(:execution_plan).all.to_a, steps: table(:step).all.to_a, actions: table(:action).all.to_a, envelopes: table(:envelope).all.to_a } end |
#transaction(&block) ⇒ Object
63 64 65 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 63 def transaction(&block) db.transaction(&block) end |
#update_coordinator_record(class_name, record_id, value) ⇒ Object
299 300 301 302 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 299 def update_coordinator_record(class_name, record_id, value) coordinator_feature! save :coordinator_record, { class: class_name, :id => record_id }, value end |