Class: Dynflow::PersistenceAdapters::Sequel
- Includes:
 - Algebrick::Matching, Algebrick::TypeCheck
 
- Defined in:
 - lib/dynflow/persistence_adapters/sequel.rb
 
Constant Summary collapse
- MAX_RETRIES =
 10
- RETRY_DELAY =
 1
- META_DATA =
 { execution_plan: %w(label state result started_at ended_at real_time execution_time root_plan_step_id class), action: %w(caller_execution_plan_id caller_action_id class plan_step_id run_step_id finalize_step_id), step: %w(state started_at ended_at real_time execution_time action_id progress_done progress_weight class action_class execution_plan_uuid queue), envelope: %w(receiver_id), coordinator_record: %w(id owner_id class), delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen), output_chunk: %w(execution_plan_uuid action_id kind timestamp) }
- SERIALIZABLE_COLUMNS =
 { action: %w(input output), delayed: %w(serialized_args), execution_plan: %w(run_flow finalize_flow execution_history step_ids), step: %w(error children), output_chunk: %w(chunk) }
Instance Attribute Summary collapse
- 
  
    
      #db  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    
Returns the value of attribute db.
 
Attributes inherited from Abstract
Instance Method Summary collapse
- #abort_if_pending_migrations! ⇒ Object
 - #connector_feature! ⇒ Object
 - #coordinator_feature! ⇒ Object
 - #delete_coordinator_record(class_name, record_id) ⇒ Object
 - #delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
 - #delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object
 - #delete_output_chunks(execution_plan_id, action_id) ⇒ Object
 - #filtering_by ⇒ Object
 - #find_coordinator_records(options) ⇒ Object
 - #find_execution_plan_counts(options = {}) ⇒ Object
 - #find_execution_plan_statuses(options) ⇒ Object
 - #find_execution_plans(options = {}) ⇒ Object
 - #find_old_execution_plans(age) ⇒ Object
 - #find_past_delayed_plans(time) ⇒ Object
 - 
  
    
      #initialize(config)  ⇒ Sequel 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    
A new instance of Sequel.
 - #insert_coordinator_record(value) ⇒ Object
 - #load_action(execution_plan_id, action_id) ⇒ Object
 - #load_actions(execution_plan_id, action_ids) ⇒ Object
 - #load_actions_attributes(execution_plan_id, attributes) ⇒ Object
 - #load_delayed_plan(execution_plan_id) ⇒ Object
 - #load_execution_plan(execution_plan_id) ⇒ Object
 - #load_output_chunks(execution_plan_id, action_id) ⇒ Object
 - #load_step(execution_plan_id, step_id) ⇒ Object
 - #load_steps(execution_plan_id) ⇒ Object
 - #migrate_db ⇒ Object
 - #ordering_by ⇒ Object
 - #pagination? ⇒ Boolean
 - #prune_envelopes(receiver_ids) ⇒ Object
 - #prune_undeliverable_envelopes ⇒ Object
 - #pull_envelopes(receiver_id) ⇒ Object
 - #push_envelope(envelope) ⇒ Object
 - #save_action(execution_plan_id, action_id, value) ⇒ Object
 - #save_delayed_plan(execution_plan_id, value) ⇒ Object
 - #save_envelope(data) ⇒ Object
 - #save_execution_plan(execution_plan_id, value) ⇒ Object
 - #save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
 - #save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object
 - #to_hash ⇒ Object
 - #transaction(&block) ⇒ Object
 - #update_coordinator_record(class_name, record_id, value) ⇒ Object
 
Methods inherited from Abstract
Constructor Details
#initialize(config) ⇒ Sequel
Returns a new instance of Sequel.
      50 51 52 53 54 55 56 57 58 59 60  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 50
      #
      # Sets up the adapter from +config+.
      #
      # +config+ is duplicated before use, so the caller's object is not
      # modified. When it is a Hash, two keys are interpreted here:
      #   :additional_responsibilities - removed from the copy and merged over
      #                                  the defaults (coordinator and connector
      #                                  both enabled)
      #   :migrate                     - read (not removed); defaults to true
      # The remaining configuration is handed to +initialize_db+, and the
      # migrations are run unless migration was switched off.
      def initialize(config)
        config = config.dup
        @additional_responsibilities = { coordinator: true, connector: true }
        run_migrations = true

        if config.is_a?(Hash)
          if config.key?(:additional_responsibilities)
            overrides = config.delete(:additional_responsibilities)
            @additional_responsibilities.merge!(overrides)
          end
          run_migrations = config.fetch(:migrate, true)
        end

        @db = initialize_db(config)
        migrate_db if run_migrations
      end
  
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
      21 22 23  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 21
      #
      # Read-only accessor for the database handle stored in @db by the
      # constructor.
      def db
        @db
      end
  
Instance Method Details
#abort_if_pending_migrations! ⇒ Object
      288 289 290  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 288
      #
      # Asks Sequel's migrator to check that the schema recorded in the
      # 'dynflow_schema_info' table is current with respect to the migrations
      # found under the class-level +migrations_path+.
      def abort_if_pending_migrations!
        connection = db
        migrations_dir = self.class.migrations_path
        ::Sequel::Migrator.check_current(connection, migrations_dir, table: 'dynflow_schema_info')
      end
  
#connector_feature! ⇒ Object
      207 208 209 210 211  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 207
      #
      # Guard used by the connector-related methods.
      #
      # Returns nil when the :connector responsibility is enabled.
      # Raises RuntimeError otherwise. The message names the
      # +additional_responsibilities+ option, which is the key the constructor
      # actually reads.
      def connector_feature!
        return if @additional_responsibilities[:connector]

        raise "The sequel persistence adapter connector feature used but not enabled in additional_responsibilities"
      end
  
#coordinator_feature! ⇒ Object
      244 245 246 247 248  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 244
      #
      # Guard used by the coordinator-related methods.
      #
      # Returns nil when the :coordinator responsibility is enabled.
      # Raises RuntimeError otherwise. The message names the
      # +additional_responsibilities+ option, which is the key the constructor
      # actually reads.
      def coordinator_feature!
        return if @additional_responsibilities[:coordinator]

        raise "The sequel persistence adapter coordinator feature used but not enabled in additional_responsibilities"
      end
  
#delete_coordinator_record(class_name, record_id) ⇒ Object
      260 261 262 263  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 260
      #
      # Deletes the coordinator record(s) matching both +class_name+ and
      # +record_id+. Requires the coordinator feature to be enabled.
      # Returns the result of the dataset's +delete+ call.
      def delete_coordinator_record(class_name, record_id)
        coordinator_feature!
        matching = table(:coordinator_record).where(class: class_name, id: record_id)
        matching.delete
      end
  
#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
      124 125 126 127 128 129 130 131 132 133  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 124
      #
      # Deletes the delayed plans selected by +filters+, working through them
      # in slices of +batch_size+. Each slice is deleted inside its own
      # database transaction.
      #
      # Returns the accumulated sum of the per-slice +delete+ results.
      def delete_delayed_plans(filters, batch_size = 1000)
        removed = 0
        filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |batch|
          uuids = batch.map { |plan| plan.fetch(:execution_plan_uuid) }
          @db.transaction { removed += table(:delayed).where(execution_plan_uuid: uuids).delete }
        end
        removed
      end
  
#delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object
      91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 91
      #
      # Deletes the execution plans selected by +filters+ together with their
      # delayed records, steps, output chunks and actions, in slices of
      # +batch_size+. Each slice is handled inside one database transaction.
      #
      # When +backup_dir+ is given, steps, actions and execution plans are
      # passed to +backup_to_csv+ right before they are deleted.
      #
      # Returns the accumulated sum of the execution-plan +delete+ results.
      def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
        removed = 0
        filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |batch|
          uuids = batch.map { |plan| plan.fetch(:uuid) }
          @db.transaction do
            # Dependent rows go first; the execution plans themselves go last.
            table(:delayed).where(execution_plan_uuid: uuids).delete

            step_rows = table(:step).where(execution_plan_uuid: uuids)
            backup_to_csv(:step, step_rows, backup_dir, 'steps.csv') if backup_dir
            step_rows.delete

            table(:output_chunk).where(execution_plan_uuid: uuids).delete

            action_rows = table(:action).where(execution_plan_uuid: uuids)
            backup_to_csv(:action, action_rows, backup_dir, 'actions.csv') if backup_dir
            action_rows.delete

            plan_rows = table(:execution_plan).where(uuid: uuids)
            backup_to_csv(:execution_plan, plan_rows, backup_dir, 'execution_plans.csv') if backup_dir
            removed += plan_rows.delete
          end
        end
        removed
      end
  
#delete_output_chunks(execution_plan_id, action_id) ⇒ Object
      203 204 205  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 203
      #
      # Deletes the output chunks stored for the given execution plan and
      # action. Returns the result of the dataset's +delete+ call.
      def delete_output_chunks(execution_plan_id, action_id)
        conditions = { execution_plan_uuid: execution_plan_id, action_id: action_id }
        filter(:output_chunk, table(:output_chunk), conditions).delete
      end
  
#filtering_by ⇒ Object
      27 28 29  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 27
      #
      # Returns the META_DATA entry for :execution_plan, i.e. the list of
      # execution plan column names.
      def filtering_by
        META_DATA.fetch(:execution_plan)
      end
  
#find_coordinator_records(options) ⇒ Object
      265 266 267 268 269 270 271 272 273 274 275  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 265
      #
      # Finds coordinator records. Requires the coordinator feature.
      #
      # +options+ is expected to respond to +dup+ and +[]+; only its :filters
      # entry is read here. Both +options+ and the filters are duplicated, so
      # the caller's objects are not modified. The special filter key
      # :exclude_owner_id is removed from the filters and applied as an
      # exclusion on the owner_id column instead.
      #
      # Returns an array with each matching row passed through +load_data+.
      def find_coordinator_records(options)
        coordinator_feature!
        options = options.dup
        filters = (options[:filters] || {}).dup
        exclude_owner_id = filters.delete(:exclude_owner_id)
        data_set = filter(:coordinator_record, table(:coordinator_record), filters)
        data_set = data_set.exclude(:owner_id => exclude_owner_id) if exclude_owner_id
        data_set.all.map { |record| load_data(record) }
      end
  
#find_execution_plan_counts(options = {}) ⇒ Object
      77 78 79  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 77
      #
      # Counts the execution plans matching options[:filters].
      # Returns the result of +count+ on the filtered dataset.
      def find_execution_plan_counts(options = {})
        filter(:execution_plan, table(:execution_plan), options[:filters]).count
      end
  
#find_execution_plan_statuses(options) ⇒ Object
      81 82 83 84 85 86 87 88 89  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 81
      #
      # Loads only the uuid, state and result columns of the execution plans
      # matching options[:filters].
      #
      # Returns a Hash keyed by plan uuid. Each value is the fetched row with
      # its :uuid entry removed, i.e. the remaining :state and :result.
      def find_execution_plan_statuses(options)
        plans = filter(:execution_plan, table(:execution_plan), options[:filters])
                  .select(:uuid, :state, :result)

        plans.each_with_object({}) do |current, acc|
          # The row itself is reused as the value once :uuid is taken out.
          uuid = current.delete(:uuid)
          acc[uuid] = current
        end
      end
  
#find_execution_plans(options = {}) ⇒ Object
      66 67 68 69 70 71 72 73 74 75  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 66
      #
      # Finds execution plans, applying pagination, ordering and filtering
      # taken from +options+ (via the +paginate+, +order+ and +filter+
      # helpers).
      #
      # NOTE: when options[:order_by] is unset, :started_at is written into
      # the hash that was passed in, so the caller's +options+ is modified.
      #
      # Returns an array with each row passed through +load_data+ and then
      # +execution_plan_column_map+.
      def find_execution_plans(options = {})
        table_name = :execution_plan
        options[:order_by] ||= :started_at
        data_set = filter(table_name,
                          order(table_name,
                                paginate(table(table_name), options),
                                options),
                          options[:filters])
        data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
      end
  
#find_old_execution_plans(age) ⇒ Object
      135 136 137 138 139 140  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 135
      #
      # Finds execution plans whose state is 'stopped' and whose ended_at is
      # less than or equal to +age+.
      #
      # Returns an array with each row passed through +load_data+ and then
      # +execution_plan_column_map+.
      def find_old_execution_plans(age)
        table_name = :execution_plan
        stopped_by_then = ::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')
        rows = table(table_name).where(stopped_by_then).all
        rows.map do |plan|
          execution_plan_column_map(load_data(plan, table_name))
        end
      end
  
#find_past_delayed_plans(time) ⇒ Object
      142 143 144 145 146 147 148 149 150  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 142
      #
      # Finds non-frozen delayed plans that are due at +time+: either their
      # start_at is at or before +time+, or they have a start_before that is
      # at or before +time+. Results are ordered by start_at.
      #
      # Returns an array with each row passed through +load_data+.
      def find_past_delayed_plans(time)
        table_name = :delayed
        due = ::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)
        rows = table(table_name)
                 .where(due)
                 .where(:frozen => false)
                 .order_by(:start_at)
                 .all
        rows.map { |plan| load_data(plan, table_name) }
      end
  
#insert_coordinator_record(value) ⇒ Object
      250 251 252 253  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 250
      #
      # Stores +value+ as a coordinator record via +save+ with an empty
      # condition hash. Requires the coordinator feature.
      def insert_coordinator_record(value)
        coordinator_feature!
        save(:coordinator_record, {}, value)
      end
  
#load_action(execution_plan_id, action_id) ⇒ Object
      175 176 177  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 175
      #
      # Loads a single action, identified by its execution plan uuid and its
      # id, through the +load+ helper.
      def load_action(execution_plan_id, action_id)
        load(:action, execution_plan_uuid: execution_plan_id, id: action_id)
      end
  
#load_actions(execution_plan_id, action_ids) ⇒ Object
      179 180 181  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 179
      #
      # Loads the actions of one execution plan whose id matches +action_ids+,
      # through the +load_records+ helper.
      def load_actions(execution_plan_id, action_ids)
        conditions = { execution_plan_uuid: execution_plan_id, id: action_ids }
        load_records(:action, conditions)
      end
  
#load_actions_attributes(execution_plan_id, attributes) ⇒ Object
      183 184 185  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 183
      #
      # Loads the actions of one execution plan through +load_records+,
      # forwarding +attributes+ as its third argument.
      def load_actions_attributes(execution_plan_id, attributes)
        conditions = { execution_plan_uuid: execution_plan_id }
        load_records(:action, conditions, attributes)
      end
  
#load_delayed_plan(execution_plan_id) ⇒ Object
      152 153 154 155 156  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 152
      #
      # Loads the delayed plan record for the given execution plan.
      # Returns nil when +load+ raises KeyError.
      def load_delayed_plan(execution_plan_id)
        begin
          load(:delayed, execution_plan_uuid: execution_plan_id)
        rescue KeyError
          nil
        end
      end
  
#load_execution_plan(execution_plan_id) ⇒ Object
      116 117 118  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 116
      #
      # Loads the execution plan with the given uuid and passes it through
      # +execution_plan_column_map+.
      def load_execution_plan(execution_plan_id)
        loaded = load(:execution_plan, uuid: execution_plan_id)
        execution_plan_column_map(loaded)
      end
  
#load_output_chunks(execution_plan_id, action_id) ⇒ Object
      199 200 201  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 199
      #
      # Loads the output chunks stored for the given execution plan and
      # action, requesting the :timestamp, :kind and :chunk attributes from
      # +load_records+.
      def load_output_chunks(execution_plan_id, action_id)
        conditions = { execution_plan_uuid: execution_plan_id, action_id: action_id }
        load_records(:output_chunk, conditions, [:timestamp, :kind, :chunk])
      end
  
#load_step(execution_plan_id, step_id) ⇒ Object
      162 163 164  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 162
      #
      # Loads a single step, identified by its execution plan uuid and its
      # id, through the +load+ helper.
      def load_step(execution_plan_id, step_id)
        load(:step, execution_plan_uuid: execution_plan_id, id: step_id)
      end
  
#load_steps(execution_plan_id) ⇒ Object
      166 167 168  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 166
      #
      # Loads all steps belonging to the given execution plan through the
      # +load_records+ helper.
      def load_steps(execution_plan_id)
        load_records(:step, execution_plan_uuid: execution_plan_id)
      end
  
#migrate_db ⇒ Object
      284 285 286  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 284
      #
      # Runs Sequel's migrator against the database using the migrations
      # under the class-level +migrations_path+; the schema version is tracked
      # in the 'dynflow_schema_info' table.
      def migrate_db
        connection = db
        migrations_dir = self.class.migrations_path
        ::Sequel::Migrator.run(connection, migrations_dir, table: 'dynflow_schema_info')
      end
  
#ordering_by ⇒ Object
      31 32 33  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 31
      #
      # Returns the META_DATA entry for :execution_plan, i.e. the list of
      # execution plan column names.
      def ordering_by
        META_DATA.fetch(:execution_plan)
      end
  
#pagination? ⇒ Boolean
      23 24 25  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 23
      #
      # Always returns true.
      def pagination?
        true
      end
  
#prune_envelopes(receiver_ids) ⇒ Object
      234 235 236 237  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 234
      #
      # Deletes the envelopes addressed to any of +receiver_ids+. Requires the
      # connector feature. Returns the result of the dataset's +delete+ call.
      def prune_envelopes(receiver_ids)
        connector_feature!
        addressed = table(:envelope).where(receiver_id: receiver_ids)
        addressed.delete
      end
  
#prune_undeliverable_envelopes ⇒ Object
      239 240 241 242  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 239
      #
      # Deletes the envelopes whose receiver_id is not among the ids in the
      # coordinator record table. It selects envelopes whose receiver has a
      # coordinator record, inverts that condition, and deletes the result.
      # Requires the connector feature.
      def prune_undeliverable_envelopes
        connector_feature!
        envelopes = table(:envelope)
        known_receiver_ids = table(:coordinator_record).select(:id)
        deliverable = envelopes.where(receiver_id: known_receiver_ids)
        deliverable.invert.delete
      end
  
#pull_envelopes(receiver_id) ⇒ Object
      218 219 220 221 222 223 224 225 226 227  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 218
      #
      # Fetches the envelopes addressed to +receiver_id+ and deletes exactly
      # the fetched rows (by id), all within one database transaction.
      # Requires the connector feature.
      #
      # Returns the fetched rows, each passed through +load_data+.
      def pull_envelopes(receiver_id)
        connector_feature!
        db.transaction do
          rows = table(:envelope).where(receiver_id: receiver_id).all
          pulled = rows.map { |row| load_data(row) }
          # Delete by the ids just read rather than by receiver_id.
          row_ids = rows.map { |row| row[:id] }
          table(:envelope).where(id: row_ids).delete
          pulled
        end
      end
  
#push_envelope(envelope) ⇒ Object
      229 230 231 232  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 229
      #
      # Inserts +envelope+ into the envelope table after converting it with
      # +prepare_record+. Requires the connector feature.
      def push_envelope(envelope)
        connector_feature!
        envelopes = table(:envelope)
        envelopes.insert(prepare_record(:envelope, envelope))
      end
  
#save_action(execution_plan_id, action_id, value) ⇒ Object
      187 188 189  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 187
      #
      # Persists +value+ for the action identified by its execution plan uuid
      # and id, delegating to +save+ with +with_data: false+.
      def save_action(execution_plan_id, action_id, value)
        save(:action,
             { execution_plan_uuid: execution_plan_id, id: action_id },
             value,
             with_data: false)
      end
  
#save_delayed_plan(execution_plan_id, value) ⇒ Object
      158 159 160  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 158
      #
      # Persists +value+ as the delayed plan record of the given execution
      # plan, delegating to +save+ with +with_data: false+.
      def save_delayed_plan(execution_plan_id, value)
        save(:delayed,
             { execution_plan_uuid: execution_plan_id },
             value,
             with_data: false)
      end
  
#save_envelope(data) ⇒ Object
      213 214 215 216  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 213
      #
      # Stores +data+ as an envelope via +save+ with an empty condition hash.
      # Requires the connector feature.
      def save_envelope(data)
        connector_feature!
        save(:envelope, {}, data)
      end
  
#save_execution_plan(execution_plan_id, value) ⇒ Object
      120 121 122  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 120
      #
      # Persists +value+ for the execution plan with the given uuid,
      # delegating to +save+ with +with_data: false+.
      def save_execution_plan(execution_plan_id, value)
        save(:execution_plan,
             { uuid: execution_plan_id },
             value,
             with_data: false)
      end
  
#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
      191 192 193 194 195 196 197  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 191
      #
      # Saves each entry of +chunks+ as an output chunk row.
      #
      # Every chunk is modified in place: its :execution_plan_uuid and
      # :action_id entries are set before it is handed to +save+.
      # Returns the result of +chunks.each+.
      def save_output_chunks(execution_plan_id, action_id, chunks)
        chunks.each do |record|
          record[:execution_plan_uuid] = execution_plan_id
          record[:action_id] = action_id
          save(:output_chunk, {}, record, with_data: false)
        end
      end
  
#save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object
      170 171 172 173  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 170
      #
      # Persists +value+ for the step identified by its execution plan uuid
      # and id. +update_conditions+ is forwarded to +save+ unchanged, together
      # with +with_data: false+.
      def save_step(execution_plan_id, step_id, value, update_conditions = {})
        save(:step,
             { execution_plan_uuid: execution_plan_id, id: step_id },
             value,
             with_data: false,
             update_conditions: update_conditions)
      end
  
#to_hash ⇒ Object
      277 278 279 280 281 282  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 277
      #
      # Dumps the full contents of the execution plan, step, action and
      # envelope tables.
      #
      # Returns a Hash with the keys :execution_plans, :steps, :actions and
      # :envelopes, each holding an array of all rows of that table.
      def to_hash
        tables_by_key = {
          execution_plans: :execution_plan,
          steps: :step,
          actions: :action,
          envelopes: :envelope
        }
        tables_by_key.transform_values { |table_name| table(table_name).all.to_a }
      end
  
#transaction(&block) ⇒ Object
      62 63 64  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 62
      #
      # Runs the given block inside a database transaction by forwarding it to
      # +db.transaction+. Returns whatever that call returns.
      def transaction(&block)
        connection = db
        connection.transaction(&block)
      end
  
#update_coordinator_record(class_name, record_id, value) ⇒ Object
      255 256 257 258  | 
    
      # File 'lib/dynflow/persistence_adapters/sequel.rb', line 255
      #
      # Persists +value+ for the coordinator record identified by
      # +class_name+ and +record_id+, delegating to +save+. Requires the
      # coordinator feature.
      def update_coordinator_record(class_name, record_id, value)
        coordinator_feature!
        conditions = { class: class_name, :id => record_id }
        save(:coordinator_record, conditions, value)
      end