Class: Dynflow::PersistenceAdapters::Sequel
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck
- Defined in:
- lib/dynflow/persistence_adapters/sequel.rb
Constant Summary collapse
- MAX_RETRIES =
10
- RETRY_DELAY =
1
- META_DATA =
{ execution_plan: %w(label state result started_at ended_at real_time execution_time root_plan_step_id class), action: %w(caller_execution_plan_id caller_action_id class plan_step_id run_step_id finalize_step_id), step: %w(state started_at ended_at real_time execution_time action_id progress_done progress_weight class action_class execution_plan_uuid queue), envelope: %w(receiver_id), coordinator_record: %w(id owner_id class), delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen), output_chunk: %w(execution_plan_uuid action_id kind timestamp) }
- SERIALIZABLE_COLUMNS =
{ action: %w(input output), delayed: %w(serialized_args), execution_plan: %w(run_flow finalize_flow execution_history step_ids), step: %w(error children), output_chunk: %w(chunk) }
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Attributes inherited from Abstract
Instance Method Summary collapse
- #abort_if_pending_migrations! ⇒ Object
- #connector_feature! ⇒ Object
- #coordinator_feature! ⇒ Object
- #delete_coordinator_record(class_name, record_id) ⇒ Object
- #delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
- #delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object
- #delete_output_chunks(execution_plan_id, action_id) ⇒ Object
- #filtering_by ⇒ Object
- #find_coordinator_records(options) ⇒ Object
- #find_execution_plan_counts(options = {}) ⇒ Object
- #find_execution_plan_statuses(options) ⇒ Object
- #find_execution_plans(options = {}) ⇒ Object
- #find_old_execution_plans(age) ⇒ Object
- #find_past_delayed_plans(time) ⇒ Object
-
#initialize(config) ⇒ Sequel
constructor
A new instance of Sequel.
- #insert_coordinator_record(value) ⇒ Object
- #load_action(execution_plan_id, action_id) ⇒ Object
- #load_actions(execution_plan_id, action_ids) ⇒ Object
- #load_actions_attributes(execution_plan_id, attributes) ⇒ Object
- #load_delayed_plan(execution_plan_id) ⇒ Object
- #load_execution_plan(execution_plan_id) ⇒ Object
- #load_output_chunks(execution_plan_id, action_id) ⇒ Object
- #load_step(execution_plan_id, step_id) ⇒ Object
- #load_steps(execution_plan_id) ⇒ Object
- #migrate_db ⇒ Object
- #ordering_by ⇒ Object
- #pagination? ⇒ Boolean
- #prune_envelopes(receiver_ids) ⇒ Object
- #prune_undeliverable_envelopes ⇒ Object
- #pull_envelopes(receiver_id) ⇒ Object
- #push_envelope(envelope) ⇒ Object
- #save_action(execution_plan_id, action_id, value) ⇒ Object
- #save_delayed_plan(execution_plan_id, value) ⇒ Object
- #save_envelope(data) ⇒ Object
- #save_execution_plan(execution_plan_id, value) ⇒ Object
- #save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
- #save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object
- #to_hash ⇒ Object
- #transaction(&block) ⇒ Object
- #update_coordinator_record(class_name, record_id, value) ⇒ Object
Methods inherited from Abstract
Constructor Details
#initialize(config) ⇒ Sequel
Returns a new instance of Sequel.
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 50

# Builds the adapter: opens the database and, unless disabled, migrates it.
#
# @param config [Object] connection configuration handed to `initialize_db`.
#   When it is a Hash, `:additional_responsibilities` is removed from the
#   (duplicated) config and merged over the defaults, and `:migrate`
#   (default true) decides whether `migrate_db` runs.
def initialize(config)
  # Work on a copy so deleting keys below never touches the caller's object.
  config = config.dup
  @additional_responsibilities = { coordinator: true, connector: true }
  run_migrations = true

  if config.is_a?(Hash)
    if config.key?(:additional_responsibilities)
      overrides = config.delete(:additional_responsibilities)
      @additional_responsibilities.merge!(overrides)
    end
    run_migrations = config.fetch(:migrate, true)
  end

  @db = initialize_db(config)
  migrate_db if run_migrations
end
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
21 22 23 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 21

# Read-only accessor for the database handle stored in `@db`.
#
# @return [Object] the value assigned to `@db` by the constructor
def db
  @db
end
Instance Method Details
#abort_if_pending_migrations! ⇒ Object
288 289 290 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 288

# Asks Sequel's migrator to verify the schema is current, using the
# adapter class's migrations path and the 'dynflow_schema_info' table.
def abort_if_pending_migrations!
  ::Sequel::Migrator.check_current(
    db,
    self.class.migrations_path,
    table: 'dynflow_schema_info'
  )
end
#connector_feature! ⇒ Object
207 208 209 210 211 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 207

# Guard used by connector-related methods.
#
# @return [nil] when the :connector responsibility is enabled
# @raise [RuntimeError] when the :connector responsibility is disabled
def connector_feature!
  return if @additional_responsibilities[:connector]

  # The message names the setting the constructor actually reads
  # (:additional_responsibilities).
  raise "The sequel persistence adapter connector feature used but not enabled in additional_responsibilities"
end
#coordinator_feature! ⇒ Object
244 245 246 247 248 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 244

# Guard used by coordinator-related methods.
#
# @return [nil] when the :coordinator responsibility is enabled
# @raise [RuntimeError] when the :coordinator responsibility is disabled
def coordinator_feature!
  return if @additional_responsibilities[:coordinator]

  # The message names the setting the constructor actually reads
  # (:additional_responsibilities).
  raise "The sequel persistence adapter coordinator feature used but not enabled in additional_responsibilities"
end
#delete_coordinator_record(class_name, record_id) ⇒ Object
260 261 262 263 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 260

# Deletes the coordinator record(s) matching the given class name and id.
# Requires the coordinator responsibility to be enabled.
def delete_coordinator_record(class_name, record_id)
  coordinator_feature!
  matching = table(:coordinator_record).where(class: class_name, id: record_id)
  matching.delete
end
#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 124

# Deletes delayed plans matching +filters+ in batches.
#
# @param filters passed to `filter` for the :delayed table
# @param batch_size [Integer] number of plans deleted per transaction
# @return the sum of the values returned by each batch's `delete`
def delete_delayed_plans(filters, batch_size = 1000)
  deleted_total = 0
  matching = filter(:delayed, table(:delayed), filters)
  matching.each_slice(batch_size) do |batch|
    plan_uuids = batch.map { |plan| plan.fetch(:execution_plan_uuid) }
    # One transaction per batch.
    @db.transaction do
      deleted_total += table(:delayed).where(execution_plan_uuid: plan_uuids).delete
    end
  end
  deleted_total
end
#delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 91

# Deletes execution plans matching +filters+ together with their delayed
# records, steps, output chunks and actions, batch by batch.
#
# @param filters passed to `filter` for the :execution_plan table
# @param batch_size [Integer] number of plans handled per transaction
# @param backup_dir when truthy, steps, actions and execution plans are
#   passed to `backup_to_csv` before being deleted
# @return the sum of the values returned by deleting the execution plans
def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
  deleted_total = 0
  matching = filter(:execution_plan, table(:execution_plan), filters)

  matching.each_slice(batch_size) do |batch|
    plan_uuids = batch.map { |plan| plan.fetch(:uuid) }

    @db.transaction do
      # Dependent rows go first; the execution plans themselves go last.
      table(:delayed).where(execution_plan_uuid: plan_uuids).delete

      step_rows = table(:step).where(execution_plan_uuid: plan_uuids)
      backup_to_csv(:step, step_rows, backup_dir, 'steps.csv') if backup_dir
      step_rows.delete

      table(:output_chunk).where(execution_plan_uuid: plan_uuids).delete

      action_rows = table(:action).where(execution_plan_uuid: plan_uuids)
      backup_to_csv(:action, action_rows, backup_dir, 'actions.csv') if backup_dir
      action_rows.delete

      plan_rows = table(:execution_plan).where(uuid: plan_uuids)
      backup_to_csv(:execution_plan, plan_rows, backup_dir, 'execution_plans.csv') if backup_dir
      deleted_total += plan_rows.delete
    end
  end

  deleted_total
end
#delete_output_chunks(execution_plan_id, action_id) ⇒ Object
203 204 205 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 203

# Deletes the output chunks stored for one action of one execution plan.
def delete_output_chunks(execution_plan_id, action_id)
  conditions = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  filter(:output_chunk, table(:output_chunk), conditions).delete
end
#filtering_by ⇒ Object
27 28 29 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 27

# @return the META_DATA entry for :execution_plan
def filtering_by
  META_DATA.fetch(:execution_plan)
end
#find_coordinator_records(options) ⇒ Object
265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 265

# Finds coordinator records matching `options[:filters]`.
#
# The special filter key :exclude_owner_id is not passed to `filter`;
# instead, records whose owner_id matches it are excluded from the result.
# Requires the coordinator responsibility to be enabled.
#
# @param options [Hash] may contain a :filters hash
# @return [Array] each matching record passed through `load_data`
def find_coordinator_records(options)
  coordinator_feature!
  # Copy the filters so removing :exclude_owner_id does not touch the
  # caller's hash.
  filters = (options[:filters] || {}).dup
  exclude_owner_id = filters.delete(:exclude_owner_id)
  data_set = filter(:coordinator_record, table(:coordinator_record), filters)
  data_set = data_set.exclude(owner_id: exclude_owner_id) if exclude_owner_id
  data_set.all.map { |record| load_data(record) }
end
#find_execution_plan_counts(options = {}) ⇒ Object
77 78 79 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 77

# Counts execution plans matching `options[:filters]`.
#
# @param options [Hash] may contain :filters, passed to `filter`
# @return the result of `count` on the filtered data set
def find_execution_plan_counts(options = {})
  filter(:execution_plan, table(:execution_plan), options[:filters]).count
end
#find_execution_plan_statuses(options) ⇒ Object
81 82 83 84 85 86 87 88 89 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 81

# Looks up state and result of execution plans matching `options[:filters]`.
#
# @param options [Hash] may contain :filters, passed to `filter`
# @return [Hash] plan uuid => the rest of the selected row (state, result)
def find_execution_plan_statuses(options)
  plans = filter(:execution_plan, table(:execution_plan), options[:filters])
          .select(:uuid, :state, :result)
  plans.each_with_object({}) do |row, statuses|
    # The uuid becomes the key; what remains of the row is the value.
    uuid = row.delete(:uuid)
    statuses[uuid] = row
  end
end
#find_execution_plans(options = {}) ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 66

# Finds execution plans, applying pagination, ordering and filtering.
#
# @param options [Hash] passed to `paginate` and `order`; `options[:filters]`
#   is passed to `filter`. :order_by defaults to :started_at when it is
#   missing or nil.
# @return [Array] each record passed through `load_data` and
#   `execution_plan_column_map`
def find_execution_plans(options = {})
  table_name = :execution_plan
  # Apply the default ordering on a copy so the caller's hash is untouched.
  options = options.merge(order_by: options[:order_by] || :started_at)

  paginated = paginate(table(table_name), options)
  ordered   = order(table_name, paginated, options)
  data_set  = filter(table_name, ordered, options[:filters])

  data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end
#find_old_execution_plans(age) ⇒ Object
135 136 137 138 139 140 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 135

# Finds execution plans in state 'stopped' whose ended_at is at or before
# +age+.
#
# @return [Array] each row passed through `load_data` and
#   `execution_plan_column_map`
def find_old_execution_plans(age)
  table_name = :execution_plan
  condition = ::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')
  rows = table(table_name).where(condition).all
  rows.map { |row| execution_plan_column_map(load_data(row, table_name)) }
end
#find_past_delayed_plans(time) ⇒ Object
142 143 144 145 146 147 148 149 150 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 142

# Finds non-frozen delayed plans whose start_at, or non-null start_before,
# is at or before +time+, ordered by start_at.
#
# @return [Array] each row passed through `load_data`
def find_past_delayed_plans(time)
  table_name = :delayed
  due = ::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)
  rows = table(table_name)
         .where(due)
         .where(frozen: false)
         .order_by(:start_at)
         .all
  rows.map { |row| load_data(row, table_name) }
end
#insert_coordinator_record(value) ⇒ Object
250 251 252 253 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 250

# Saves +value+ as a coordinator record with an empty condition hash.
# Requires the coordinator responsibility to be enabled.
def insert_coordinator_record(value)
  coordinator_feature!
  save(:coordinator_record, {}, value)
end
#load_action(execution_plan_id, action_id) ⇒ Object
175 176 177 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 175

# Loads a single action identified by execution plan uuid and action id.
def load_action(execution_plan_id, action_id)
  load(:action,
       execution_plan_uuid: execution_plan_id,
       id: action_id)
end
#load_actions(execution_plan_id, action_ids) ⇒ Object
179 180 181 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 179

# Loads the actions of an execution plan whose ids match +action_ids+.
def load_actions(execution_plan_id, action_ids)
  conditions = { execution_plan_uuid: execution_plan_id, id: action_ids }
  load_records(:action, conditions)
end
#load_actions_attributes(execution_plan_id, attributes) ⇒ Object
183 184 185 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 183

# Loads the actions of an execution plan, passing +attributes+ on to
# `load_records` as its third argument.
def load_actions_attributes(execution_plan_id, attributes)
  conditions = { execution_plan_uuid: execution_plan_id }
  load_records(:action, conditions, attributes)
end
#load_delayed_plan(execution_plan_id) ⇒ Object
152 153 154 155 156 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 152

# Loads the delayed plan record for an execution plan.
#
# @return the loaded record, or nil when loading raises KeyError
def load_delayed_plan(execution_plan_id)
  begin
    load(:delayed, execution_plan_uuid: execution_plan_id)
  rescue KeyError
    nil
  end
end
#load_execution_plan(execution_plan_id) ⇒ Object
116 117 118 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 116

# Loads an execution plan by uuid and passes it through
# `execution_plan_column_map`.
def load_execution_plan(execution_plan_id)
  plan = load(:execution_plan, uuid: execution_plan_id)
  execution_plan_column_map(plan)
end
#load_output_chunks(execution_plan_id, action_id) ⇒ Object
199 200 201 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 199

# Loads the output chunks of one action, requesting the timestamp, kind
# and chunk attributes from `load_records`.
def load_output_chunks(execution_plan_id, action_id)
  conditions = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  load_records(:output_chunk, conditions, [:timestamp, :kind, :chunk])
end
#load_step(execution_plan_id, step_id) ⇒ Object
162 163 164 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 162

# Loads a single step identified by execution plan uuid and step id.
def load_step(execution_plan_id, step_id)
  load(:step,
       execution_plan_uuid: execution_plan_id,
       id: step_id)
end
#load_steps(execution_plan_id) ⇒ Object
166 167 168 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 166

# Loads all steps belonging to the given execution plan.
def load_steps(execution_plan_id)
  load_records(:step, execution_plan_uuid: execution_plan_id)
end
#migrate_db ⇒ Object
284 285 286 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 284

# Runs Sequel's migrator against the database, using the adapter class's
# migrations path and the 'dynflow_schema_info' table.
def migrate_db
  ::Sequel::Migrator.run(
    db,
    self.class.migrations_path,
    table: 'dynflow_schema_info'
  )
end
#ordering_by ⇒ Object
31 32 33 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 31

# @return the META_DATA entry for :execution_plan
def ordering_by
  META_DATA.fetch(:execution_plan)
end
#pagination? ⇒ Boolean
23 24 25 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 23

# @return [Boolean] always true
def pagination?
  true
end
#prune_envelopes(receiver_ids) ⇒ Object
234 235 236 237 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 234

# Deletes envelopes addressed to any of +receiver_ids+.
# Requires the connector responsibility to be enabled.
def prune_envelopes(receiver_ids)
  connector_feature!
  addressed = table(:envelope).where(receiver_id: receiver_ids)
  addressed.delete
end
#prune_undeliverable_envelopes ⇒ Object
239 240 241 242 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 239

# Deletes envelopes selected by inverting the condition "receiver_id is
# among the coordinator record ids".
# Requires the connector responsibility to be enabled.
def prune_undeliverable_envelopes
  connector_feature!
  deliverable = table(:envelope).where(receiver_id: table(:coordinator_record).select(:id))
  deliverable.invert.delete
end
#pull_envelopes(receiver_id) ⇒ Object
218 219 220 221 222 223 224 225 226 227 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 218

# Reads and then deletes the envelopes addressed to +receiver_id+, inside
# one database transaction.
# Requires the connector responsibility to be enabled.
#
# @return [Array] the fetched rows passed through `load_data`
def pull_envelopes(receiver_id)
  connector_feature!
  db.transaction do
    rows = table(:envelope).where(receiver_id: receiver_id).all
    envelopes = rows.map { |row| load_data(row) }
    # Delete exactly the rows that were read, by id.
    table(:envelope).where(id: rows.map { |row| row[:id] }).delete
    # The block's value becomes the transaction's (and method's) result.
    envelopes
  end
end
#push_envelope(envelope) ⇒ Object
229 230 231 232 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 229

# Inserts +envelope+ into the envelope table after running it through
# `prepare_record`.
# Requires the connector responsibility to be enabled.
def push_envelope(envelope)
  connector_feature!
  record = prepare_record(:envelope, envelope)
  table(:envelope).insert(record)
end
#save_action(execution_plan_id, action_id, value) ⇒ Object
187 188 189 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 187

# Saves +value+ for the action identified by execution plan uuid and
# action id, calling `save` with `with_data: false`.
def save_action(execution_plan_id, action_id, value)
  conditions = { execution_plan_uuid: execution_plan_id, id: action_id }
  save(:action, conditions, value, with_data: false)
end
#save_delayed_plan(execution_plan_id, value) ⇒ Object
158 159 160 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 158

# Saves +value+ as the delayed plan of the given execution plan, calling
# `save` with `with_data: false`.
def save_delayed_plan(execution_plan_id, value)
  conditions = { execution_plan_uuid: execution_plan_id }
  save(:delayed, conditions, value, with_data: false)
end
#save_envelope(data) ⇒ Object
213 214 215 216 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 213

# Saves +data+ as an envelope with an empty condition hash.
# Requires the connector responsibility to be enabled.
def save_envelope(data)
  connector_feature!
  save(:envelope, {}, data)
end
#save_execution_plan(execution_plan_id, value) ⇒ Object
120 121 122 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 120

# Saves +value+ for the execution plan with the given uuid, calling `save`
# with `with_data: false`.
def save_execution_plan(execution_plan_id, value)
  conditions = { uuid: execution_plan_id }
  save(:execution_plan, conditions, value, with_data: false)
end
#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
191 192 193 194 195 196 197 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 191

# Saves each chunk as an output_chunk record tagged with the execution
# plan uuid and action id.
#
# @param execution_plan_id stored as :execution_plan_uuid on each record
# @param action_id stored as :action_id on each record
# @param chunks [Enumerable<Hash>] chunk hashes; they are not modified
# @return the +chunks+ collection (the result of `each`)
def save_output_chunks(execution_plan_id, action_id, chunks)
  chunks.each do |chunk|
    # Build a tagged copy rather than writing the ids into the caller's hash.
    record = chunk.merge(execution_plan_uuid: execution_plan_id, action_id: action_id)
    save :output_chunk, {}, record, with_data: false
  end
end
#save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object
170 171 172 173 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 170

# Saves +value+ for the step identified by execution plan uuid and step id,
# calling `save` with `with_data: false` and forwarding +update_conditions+.
def save_step(execution_plan_id, step_id, value, update_conditions = {})
  conditions = { execution_plan_uuid: execution_plan_id, id: step_id }
  save(:step, conditions, value, with_data: false, update_conditions: update_conditions)
end
#to_hash ⇒ Object
277 278 279 280 281 282 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 277

# Dumps all rows of the execution plan, step, action and envelope tables.
#
# @return [Hash] :execution_plans, :steps, :actions and :envelopes, each
#   mapped to the rows of the corresponding table
def to_hash
  tables_by_key = {
    execution_plans: :execution_plan,
    steps: :step,
    actions: :action,
    envelopes: :envelope
  }
  tables_by_key.each_with_object({}) do |(key, table_name), dump|
    dump[key] = table(table_name).all.to_a
  end
end
#transaction(&block) ⇒ Object
62 63 64 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 62

# Delegates to `db.transaction`, forwarding the given block.
#
# @return whatever `db.transaction` returns
def transaction(&block)
  db.transaction(&block)
end
#update_coordinator_record(class_name, record_id, value) ⇒ Object
255 256 257 258 |
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 255

# Saves +value+ for the coordinator record matching the given class name
# and id.
# Requires the coordinator responsibility to be enabled.
def update_coordinator_record(class_name, record_id, value)
  coordinator_feature!
  conditions = { class: class_name, id: record_id }
  save(:coordinator_record, conditions, value)
end