Class: Dynflow::PersistenceAdapters::Sequel

Inherits:
Abstract
  • Object
show all
Includes:
Algebrick::Matching, Algebrick::TypeCheck
Defined in:
lib/dynflow/persistence_adapters/sequel.rb

Constant Summary collapse

# NOTE(review): presumably the upper bound on retry attempts; the code that
# reads this constant is not visible in this listing - confirm.
MAX_RETRIES =
10
# NOTE(review): presumably the pause between retries (unit not shown here,
# likely seconds) - confirm against the retry code.
RETRY_DELAY =
1
# Column names grouped by table key. The :execution_plan list is what
# #filtering_by and #ordering_by return.
META_DATA =
{ execution_plan:      %w(label state result started_at ended_at real_time execution_time root_plan_step_id class),
action:              %w(caller_execution_plan_id caller_action_id class plan_step_id run_step_id finalize_step_id),
step:                %w(state started_at ended_at real_time execution_time action_id progress_done progress_weight
                        class action_class execution_plan_uuid queue),
envelope:            %w(receiver_id),
coordinator_record:  %w(id owner_id class),
delayed:             %w(execution_plan_uuid start_at start_before args_serializer frozen),
output_chunk:        %w(execution_plan_uuid action_id kind timestamp) }
# Further column names grouped by table key.
# NOTE(review): the name suggests these columns hold serialized payloads;
# the (de)serialization code is not visible here - confirm.
SERIALIZABLE_COLUMNS =
{ action:  %w(input output),
delayed: %w(serialized_args),
execution_plan: %w(run_flow finalize_flow execution_history step_ids),
step:    %w(error children),
output_chunk: %w(chunk) }

Instance Attribute Summary collapse

Attributes inherited from Abstract

#logger

Instance Method Summary collapse

Methods inherited from Abstract

#log, #register_world

Constructor Details

#initialize(config) ⇒ Sequel

Returns a new instance of Sequel.



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 50

# Sets up the adapter: resolves the feature switches, opens the database via
# +initialize_db+ and, unless switched off, runs +migrate_db+.
#
# @param config [Hash, Object] connection settings. Only when it is a Hash:
#   +:additional_responsibilities+ is merged over the defaults
#   (coordinator and connector both true) and removed from the copy that is
#   handed to +initialize_db+; +:migrate+ (default true) decides whether
#   migrations run.
def initialize(config)
  settings = config.dup # the delete below must not touch the caller's object
  @additional_responsibilities = { coordinator: true, connector: true }
  run_migrations = true

  if settings.is_a?(Hash)
    if settings.key?(:additional_responsibilities)
      @additional_responsibilities.merge!(settings.delete(:additional_responsibilities))
    end
    run_migrations = settings.fetch(:migrate, true)
  end

  @db = initialize_db(settings)
  migrate_db if run_migrations
end

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



21
22
23
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 21

# Reader for the database handle that the constructor stores in @db.
#
# @return [Object] the value assigned to @db
def db
  @db
end

Instance Method Details

#abort_if_pending_migrations! ⇒ Object



288
289
290
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 288

# Asks Sequel's migrator to verify the schema against the adapter's
# migrations directory, tracked in the 'dynflow_schema_info' table.
#
# @return [Object] whatever +Sequel::Migrator.check_current+ returns
def abort_if_pending_migrations!
  database = db
  migrations_dir = self.class.migrations_path
  ::Sequel::Migrator.check_current(database, migrations_dir, table: 'dynflow_schema_info')
end

#connector_feature! ⇒ Object



207
208
209
210
211
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 207

# Guard used by the connector operations: fails unless the :connector
# responsibility is enabled for this adapter.
#
# @raise [RuntimeError] when @additional_responsibilities[:connector] is falsy
# @return [nil] when the feature is enabled
def connector_feature!
  return if @additional_responsibilities[:connector]

  # The switch is configured through the :additional_responsibilities key,
  # so the message names that key.
  raise "The sequel persistence adapter connector feature used but not enabled in additional_responsibilities"
end

#coordinator_feature! ⇒ Object



244
245
246
247
248
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 244

# Guard used by the coordinator operations: fails unless the :coordinator
# responsibility is enabled for this adapter.
#
# @raise [RuntimeError] when @additional_responsibilities[:coordinator] is falsy
# @return [nil] when the feature is enabled
def coordinator_feature!
  return if @additional_responsibilities[:coordinator]

  # The switch is configured through the :additional_responsibilities key,
  # so the message names that key.
  raise "The sequel persistence adapter coordinator feature used but not enabled in additional_responsibilities"
end

#delete_coordinator_record(class_name, record_id) ⇒ Object



260
261
262
263
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 260

# Deletes the coordinator record(s) matching the given class name and id.
# Requires the coordinator feature.
#
# @param class_name [Object] value matched against the +class+ column
# @param record_id [Object] value matched against the +id+ column
# @return [Object] result of the dataset's +delete+ call
def delete_coordinator_record(class_name, record_id)
  coordinator_feature!
  matching = table(:coordinator_record).where(class: class_name, id: record_id)
  matching.delete
end

#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 124

# Deletes delayed plans selected by +filters+, working through the matches in
# slices of +batch_size+; every slice is removed inside its own transaction.
#
# @param filters [Object] passed to +filter+ for the :delayed table
# @param batch_size [Integer] number of plans handled per transaction
# @return [Integer] sum of the values returned by the per-batch deletes
def delete_delayed_plans(filters, batch_size = 1000)
  deleted = 0
  matching = filter(:delayed, table(:delayed), filters)
  matching.each_slice(batch_size) do |batch|
    plan_uuids = batch.map { |plan| plan.fetch(:execution_plan_uuid) }
    @db.transaction do
      removed = table(:delayed).where(execution_plan_uuid: plan_uuids).delete
      deleted += removed
    end
  end
  deleted
end

#delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 91

# Deletes execution plans selected by +filters+ together with their dependent
# rows, in slices of +batch_size+. Each slice is handled in one transaction:
# delayed, step, output_chunk and action rows go first, the plans last.
# When +backup_dir+ is given, step, action and plan rows are handed to
# +backup_to_csv+ just before they are deleted.
#
# @param filters [Object] passed to +filter+ for the :execution_plan table
# @param batch_size [Integer] number of plans handled per transaction
# @param backup_dir [Object, nil] backup target forwarded to +backup_to_csv+;
#   nil skips the backup
# @return [Integer] sum of the values returned by the execution plan deletes
def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
  # [table, column holding the plan uuid, backup file name (nil = no backup)],
  # listed in deletion order; the execution plans themselves must come last.
  cleanup_steps = [
    [:delayed,        :execution_plan_uuid, nil],
    [:step,           :execution_plan_uuid, 'steps.csv'],
    [:output_chunk,   :execution_plan_uuid, nil],
    [:action,         :execution_plan_uuid, 'actions.csv'],
    [:execution_plan, :uuid,                'execution_plans.csv']
  ]

  deleted = 0
  matching = filter(:execution_plan, table(:execution_plan), filters)
  matching.each_slice(batch_size) do |batch|
    plan_uuids = batch.map { |plan| plan.fetch(:uuid) }
    @db.transaction do
      removed = 0
      cleanup_steps.each do |name, uuid_column, backup_file|
        rows = table(name).where(uuid_column => plan_uuids)
        backup_to_csv(name, rows, backup_dir, backup_file) if backup_dir && backup_file
        removed = rows.delete
      end
      # only the last delete (the execution plans) contributes to the total
      deleted += removed
    end
  end
  deleted
end

#delete_output_chunks(execution_plan_id, action_id) ⇒ Object



203
204
205
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 203

# Deletes the output chunks stored for one action of one execution plan.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param action_id [Object] matched against +action_id+
# @return [Object] result of the filtered dataset's +delete+ call
def delete_output_chunks(execution_plan_id, action_id)
  conditions = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  filter(:output_chunk, table(:output_chunk), conditions).delete
end

#filtering_by ⇒ Object



27
28
29
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 27

# Column names that execution plans can be filtered by.
#
# @return [Array<String>] the :execution_plan entry of META_DATA
# @raise [KeyError] if META_DATA has no :execution_plan entry
def filtering_by
  META_DATA.fetch(:execution_plan)
end

#find_coordinator_records(options) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 265

# Finds coordinator records matching +options[:filters]+. The special filter
# key +:exclude_owner_id+ is taken out of the filters and applied as an
# exclusion on +owner_id+ instead. Requires the coordinator feature.
#
# @param options [Hash] may carry +:filters+; neither it nor the filters hash
#   is modified (the filters are copied before the key is removed)
# @return [Array] the matching rows, each passed through +load_data+
def find_coordinator_records(options)
  coordinator_feature!
  criteria = (options[:filters] || {}).dup
  excluded_owner = criteria.delete(:exclude_owner_id)

  records = filter(:coordinator_record, table(:coordinator_record), criteria)
  records = records.exclude(owner_id: excluded_owner) if excluded_owner
  records.all.map { |row| load_data(row) }
end

#find_execution_plan_counts(options = {}) ⇒ Object



77
78
79
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 77

# Counts the execution plans matching +options[:filters]+.
#
# @param options [Hash] may carry +:filters+
# @return [Object] result of +count+ on the filtered dataset
def find_execution_plan_counts(options = {})
  plans = filter(:execution_plan, table(:execution_plan), options[:filters])
  plans.count
end

#find_execution_plan_statuses(options) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 81

# Builds a uuid => status lookup for the execution plans matching
# +options[:filters]+. Only the uuid, state and result columns are selected;
# the uuid is removed from each row and used as the key.
#
# @param options [Hash] may carry +:filters+
# @return [Hash] uuid mapped to the remaining row (state and result)
def find_execution_plan_statuses(options)
  rows = filter(:execution_plan, table(:execution_plan), options[:filters])
  rows = rows.select(:uuid, :state, :result)

  statuses = {}
  rows.each do |row|
    statuses[row.delete(:uuid)] = row
  end
  statuses
end

#find_execution_plans(options = {}) ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 66

# Finds execution plans: the :execution_plan table is paginated, ordered and
# filtered according to +options+, then every row is passed through
# +load_data+ and +execution_plan_column_map+.
#
# @param options [Hash] forwarded to +paginate+ and +order+; +:filters+ is
#   forwarded to +filter+; +:order_by+ defaults to +:started_at+. The hash
#   passed in is left untouched (it may even be frozen).
# @return [Array] the mapped execution plan records
def find_execution_plans(options = {})
  table_name = :execution_plan
  # Apply the default ordering on a copy so the caller's hash is never modified.
  options = options.dup
  options[:order_by] ||= :started_at

  paginated = paginate(table(table_name), options)
  ordered = order(table_name, paginated, options)
  data_set = filter(table_name, ordered, options[:filters])
  data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end

#find_old_execution_plans(age) ⇒ Object



135
136
137
138
139
140
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 135

# Finds execution plans in the 'stopped' state whose +ended_at+ is at or
# before +age+.
#
# @param age [Object] value compared against the +ended_at+ column
# @return [Array] rows passed through +load_data+ and
#   +execution_plan_column_map+
def find_old_execution_plans(age)
  plans = table(:execution_plan)
  stopped_before = ::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')
  rows = plans.where(stopped_before).all
  rows.map { |row| execution_plan_column_map(load_data(row, :execution_plan)) }
end

#find_past_delayed_plans(time) ⇒ Object



142
143
144
145
146
147
148
149
150
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 142

# Finds non-frozen delayed plans that are due at +time+: either their
# +start_at+ has been reached or their +start_before+ is set and has been
# reached. Results are ordered by +start_at+.
#
# @param time [Object] value compared against +start_at+ / +start_before+
# @return [Array] the matching rows, each passed through +load_data+
def find_past_delayed_plans(time)
  delayed = table(:delayed)
  due = ::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)
  rows = delayed.where(due).where(frozen: false).order_by(:start_at).all
  rows.map { |row| load_data(row, :delayed) }
end

#insert_coordinator_record(value) ⇒ Object



250
251
252
253
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 250

# Stores a new coordinator record by calling +save+ with an empty condition.
# Requires the coordinator feature.
#
# @param value [Object] the record handed to +save+
# @return [Object] whatever +save+ returns
def insert_coordinator_record(value)
  coordinator_feature!
  save(:coordinator_record, {}, value)
end

#load_action(execution_plan_id, action_id) ⇒ Object



175
176
177
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 175

# Loads one action of an execution plan through +load+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param action_id [Object] matched against +id+
# @return [Object] whatever +load+ returns
def load_action(execution_plan_id, action_id)
  load(:action, execution_plan_uuid: execution_plan_id, id: action_id)
end

#load_actions(execution_plan_id, action_ids) ⇒ Object



179
180
181
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 179

# Loads several actions of an execution plan through +load_records+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param action_ids [Object] matched against +id+
# @return [Object] whatever +load_records+ returns
def load_actions(execution_plan_id, action_ids)
  condition = { execution_plan_uuid: execution_plan_id, id: action_ids }
  load_records(:action, condition)
end

#load_actions_attributes(execution_plan_id, attributes) ⇒ Object



183
184
185
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 183

# Loads all actions of an execution plan through +load_records+, forwarding
# +attributes+ as its third argument.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param attributes [Object] forwarded to +load_records+ unchanged
# @return [Object] whatever +load_records+ returns
def load_actions_attributes(execution_plan_id, attributes)
  condition = { execution_plan_uuid: execution_plan_id }
  load_records(:action, condition, attributes)
end

#load_delayed_plan(execution_plan_id) ⇒ Object



152
153
154
155
156
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 152

# Loads the delayed plan belonging to an execution plan.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @return [Object, nil] whatever +load+ returns, or nil when +load+ raises
#   KeyError
def load_delayed_plan(execution_plan_id)
  begin
    load(:delayed, execution_plan_uuid: execution_plan_id)
  rescue KeyError
    nil
  end
end

#load_execution_plan(execution_plan_id) ⇒ Object



116
117
118
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 116

# Loads one execution plan by uuid and passes it through
# +execution_plan_column_map+.
#
# @param execution_plan_id [Object] matched against +uuid+
# @return [Object] whatever +execution_plan_column_map+ returns
def load_execution_plan(execution_plan_id)
  plan = load(:execution_plan, uuid: execution_plan_id)
  execution_plan_column_map(plan)
end

#load_output_chunks(execution_plan_id, action_id) ⇒ Object



199
200
201
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 199

# Loads the output chunks of one action through +load_records+, requesting
# the timestamp, kind and chunk attributes.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param action_id [Object] matched against +action_id+
# @return [Object] whatever +load_records+ returns
def load_output_chunks(execution_plan_id, action_id)
  condition = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  load_records(:output_chunk, condition, %i[timestamp kind chunk])
end

#load_step(execution_plan_id, step_id) ⇒ Object



162
163
164
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 162

# Loads one step of an execution plan through +load+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param step_id [Object] matched against +id+
# @return [Object] whatever +load+ returns
def load_step(execution_plan_id, step_id)
  load(:step, execution_plan_uuid: execution_plan_id, id: step_id)
end

#load_steps(execution_plan_id) ⇒ Object



166
167
168
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 166

# Loads all steps of an execution plan through +load_records+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @return [Object] whatever +load_records+ returns
def load_steps(execution_plan_id)
  load_records(:step, execution_plan_uuid: execution_plan_id)
end

#migrate_db ⇒ Object



284
285
286
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 284

# Runs Sequel's migrator against the adapter's migrations directory, tracking
# the schema version in the 'dynflow_schema_info' table.
#
# @return [Object] whatever +Sequel::Migrator.run+ returns
def migrate_db
  database = db
  migrations_dir = self.class.migrations_path
  ::Sequel::Migrator.run(database, migrations_dir, table: 'dynflow_schema_info')
end

#ordering_by ⇒ Object



31
32
33
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 31

# Column names that execution plans can be ordered by.
#
# @return [Array<String>] the :execution_plan entry of META_DATA
# @raise [KeyError] if META_DATA has no :execution_plan entry
def ordering_by
  META_DATA.fetch(:execution_plan)
end

#pagination? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 23

# Capability flag: this adapter always answers true.
# NOTE(review): presumably tells callers that paginated queries are
# supported - confirm against the callers.
#
# @return [Boolean] always true
def pagination?
  true
end

#prune_envelopes(receiver_ids) ⇒ Object



234
235
236
237
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 234

# Deletes the envelopes addressed to the given receivers. Requires the
# connector feature.
#
# @param receiver_ids [Object] value(s) matched against +receiver_id+
# @return [Object] result of the dataset's +delete+ call
def prune_envelopes(receiver_ids)
  connector_feature!
  addressed = table(:envelope).where(receiver_id: receiver_ids)
  addressed.delete
end

#prune_undeliverable_envelopes ⇒ Object



239
240
241
242
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 239

# Deletes envelopes whose +receiver_id+ is not among the ids of the
# coordinator records (the matching condition is built, then inverted).
# Requires the connector feature.
#
# @return [Object] result of the dataset's +delete+ call
def prune_undeliverable_envelopes
  connector_feature!
  envelopes = table(:envelope)
  known_receivers = table(:coordinator_record).select(:id)
  envelopes.where(receiver_id: known_receivers).invert.delete
end

#pull_envelopes(receiver_id) ⇒ Object



218
219
220
221
222
223
224
225
226
227
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 218

# Fetches and removes all envelopes addressed to +receiver_id+ inside one
# database transaction. Requires the connector feature.
#
# @param receiver_id [Object] matched against +receiver_id+
# @return [Array] the fetched rows, each passed through +load_data+
def pull_envelopes(receiver_id)
  connector_feature!
  pulled = nil
  db.transaction do
    rows = table(:envelope).where(receiver_id: receiver_id).all
    pulled = rows.map { |row| load_data(row) }

    # delete exactly the rows that were read, by id
    row_ids = rows.map { |row| row[:id] }
    table(:envelope).where(id: row_ids).delete
  end
  pulled
end

#push_envelope(envelope) ⇒ Object



229
230
231
232
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 229

# Inserts an envelope into the :envelope table after converting it with
# +prepare_record+. Requires the connector feature.
#
# @param envelope [Object] the envelope handed to +prepare_record+
# @return [Object] result of the dataset's +insert+ call
def push_envelope(envelope)
  connector_feature!
  envelopes = table(:envelope)
  envelopes.insert(prepare_record(:envelope, envelope))
end

#save_action(execution_plan_id, action_id, value) ⇒ Object



187
188
189
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 187

# Saves one action of an execution plan through +save+, with
# +with_data: false+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param action_id [Object] matched against +id+
# @param value [Object] the record handed to +save+
# @return [Object] whatever +save+ returns
def save_action(execution_plan_id, action_id, value)
  condition = { execution_plan_uuid: execution_plan_id, id: action_id }
  save(:action, condition, value, with_data: false)
end

#save_delayed_plan(execution_plan_id, value) ⇒ Object



158
159
160
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 158

# Saves the delayed plan of an execution plan through +save+, with
# +with_data: false+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param value [Object] the record handed to +save+
# @return [Object] whatever +save+ returns
def save_delayed_plan(execution_plan_id, value)
  condition = { execution_plan_uuid: execution_plan_id }
  save(:delayed, condition, value, with_data: false)
end

#save_envelope(data) ⇒ Object



213
214
215
216
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 213

# Stores an envelope by calling +save+ with an empty condition. Requires the
# connector feature.
#
# @param data [Object] the record handed to +save+
# @return [Object] whatever +save+ returns
def save_envelope(data)
  connector_feature!
  save(:envelope, {}, data)
end

#save_execution_plan(execution_plan_id, value) ⇒ Object



120
121
122
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 120

# Saves an execution plan through +save+, keyed by its uuid, with
# +with_data: false+.
#
# @param execution_plan_id [Object] matched against +uuid+
# @param value [Object] the record handed to +save+
# @return [Object] whatever +save+ returns
def save_execution_plan(execution_plan_id, value)
  condition = { uuid: execution_plan_id }
  save(:execution_plan, condition, value, with_data: false)
end

#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object



191
192
193
194
195
196
197
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 191

# Saves every chunk through +save+ after stamping it with the owning
# execution plan and action. The chunk hashes passed in are modified in place
# (the two keys are written into them).
#
# @param execution_plan_id [Object] stored under +:execution_plan_uuid+
# @param action_id [Object] stored under +:action_id+
# @param chunks [Enumerable] chunk records supporting +[]=+
# @return [Object] result of +chunks.each+
def save_output_chunks(execution_plan_id, action_id, chunks)
  ownership = { execution_plan_uuid: execution_plan_id, action_id: action_id }
  chunks.each do |chunk|
    ownership.each { |column, value| chunk[column] = value }
    save(:output_chunk, {}, chunk, with_data: false)
  end
end

#save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object



170
171
172
173
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 170

# Saves one step of an execution plan through +save+, with
# +with_data: false+ and the given +update_conditions+.
#
# @param execution_plan_id [Object] matched against +execution_plan_uuid+
# @param step_id [Object] matched against +id+
# @param value [Object] the record handed to +save+
# @param update_conditions [Hash] forwarded to +save+ as +update_conditions:+
# @return [Object] whatever +save+ returns
def save_step(execution_plan_id, step_id, value, update_conditions = {})
  condition = { execution_plan_uuid: execution_plan_id, id: step_id }
  save(:step, condition, value, with_data: false, update_conditions: update_conditions)
end

#to_hash ⇒ Object



277
278
279
280
281
282
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 277

# Dumps the execution plan, step, action and envelope tables.
#
# @return [Hash] +:execution_plans+, +:steps+, +:actions+ and +:envelopes+,
#   each mapped to the array of all rows of the corresponding table
def to_hash
  dumped_tables = {
    execution_plans: :execution_plan,
    steps:           :step,
    actions:         :action,
    envelopes:       :envelope
  }

  dump = {}
  dumped_tables.each do |key, name|
    dump[key] = table(name).all.to_a
  end
  dump
end

#transaction(&block) ⇒ Object



62
63
64
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 62

# Hands the given block to +db.transaction+ unchanged.
#
# @yield the work passed on to the database's transaction method
# @return [Object] whatever +db.transaction+ returns
def transaction(&block)
  db.transaction(&block)
end

#update_coordinator_record(class_name, record_id, value) ⇒ Object



255
256
257
258
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 255

# Saves a coordinator record through +save+, identified by class name and
# id. Requires the coordinator feature.
#
# @param class_name [Object] matched against the +class+ column
# @param record_id [Object] matched against the +id+ column
# @param value [Object] the record handed to +save+
# @return [Object] whatever +save+ returns
def update_coordinator_record(class_name, record_id, value)
  coordinator_feature!
  condition = { class: class_name, id: record_id }
  save(:coordinator_record, condition, value)
end