Class: Dynflow::PersistenceAdapters::Sequel

Inherits:
Abstract
  • Object
show all
Includes:
Algebrick::Matching, Algebrick::TypeCheck
Defined in:
lib/dynflow/persistence_adapters/sequel.rb

Constant Summary collapse

MAX_RETRIES =
10
RETRY_DELAY =
1
META_DATA =
{ execution_plan:      %w(label state result started_at ended_at real_time execution_time root_plan_step_id class),
action:              %w(caller_execution_plan_id caller_action_id class plan_step_id run_step_id finalize_step_id),
step:                %w(state started_at ended_at real_time execution_time action_id progress_done progress_weight
                        class action_class execution_plan_uuid queue),
envelope:            %w(receiver_id),
coordinator_record:  %w(id owner_id class),
delayed:             %w(execution_plan_uuid start_at start_before args_serializer frozen),
output_chunk:        %w(execution_plan_uuid action_id kind timestamp),
execution_plan_dependency: %w(execution_plan_uuid blocked_by_uuid) }
SERIALIZABLE_COLUMNS =
{ action:  %w(input output),
delayed: %w(serialized_args),
execution_plan: %w(run_flow finalize_flow execution_history step_ids),
step:    %w(error children),
output_chunk: %w(chunk) }

Instance Attribute Summary collapse

Attributes inherited from Abstract

#logger

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Abstract

#log, #register_world

Constructor Details

#initialize(config) ⇒ Sequel

Returns a new instance of Sequel.



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 51

def initialize(config)
  migrate = true
  config = config.dup
  @additional_responsibilities = { coordinator: true, connector: true }
  if config.is_a?(Hash)
    @additional_responsibilities.merge!(config.delete(:additional_responsibilities)) if config.key?(:additional_responsibilities)
    migrate = config.fetch(:migrate, true)
  end
  @db = initialize_db config
  migrate_db if migrate
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



21
22
23
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 21

def db
  @db
end

Class Method Details

.migrations_pathObject



358
359
360
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 358

def self.migrations_path
  File.expand_path('../sequel_migrations', __FILE__)
end

Instance Method Details

#abort_if_pending_migrations!Object



334
335
336
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 334

def abort_if_pending_migrations!
  ::Sequel::Migrator.check_current(db, self.class.migrations_path, table: 'dynflow_schema_info')
end

#chain_execution_plan(first, second) ⇒ Object



198
199
200
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 198

def chain_execution_plan(first, second)
  save :execution_plan_dependency, {}, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false
end

#connector_feature!Object



249
250
251
252
253
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 249

def connector_feature!
  unless @additional_responsibilities[:connector]
    raise "The sequel persistence adapter connector feature used but not enabled in additional_features"
  end
end

#coordinator_feature!Object



288
289
290
291
292
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 288

def coordinator_feature!
  unless @additional_responsibilities[:coordinator]
    raise "The sequel persistence adapter coordinator feature used but not enabled in additional_features"
  end
end

#delete_coordinator_record(class_name, record_id) ⇒ Object



304
305
306
307
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 304

def delete_coordinator_record(class_name, record_id)
  coordinator_feature!
  with_retry { table(:coordinator_record).where(class: class_name, id: record_id).delete }
end

#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 134

def delete_delayed_plans(filters, batch_size = 1000)
  count = 0
  with_retry do
    filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
      uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
      @db.transaction do
        count += table(:delayed).where(execution_plan_uuid: uuids).delete
      end
    end
  end
  count
end

#delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 99

def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
  count = 0
  with_retry do
    filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
      uuids = plans.map { |p| p.fetch(:uuid) }
      @db.transaction do
        table(:delayed).where(execution_plan_uuid: uuids).delete

        steps = table(:step).where(execution_plan_uuid: uuids)
        backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
        steps.delete

        output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete

        actions = table(:action).where(execution_plan_uuid: uuids)
        backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
        actions.delete

        execution_plans = table(:execution_plan).where(uuid: uuids)
        backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
        count += execution_plans.delete
      end
    end
    return count
  end
end

#delete_output_chunks(execution_plan_id, action_id) ⇒ Object



243
244
245
246
247
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 243

def delete_output_chunks(execution_plan_id, action_id)
  with_retry do
    filter(:output_chunk, table(:output_chunk), { execution_plan_uuid: execution_plan_id, action_id: action_id }).delete
  end
end

#filtering_byObject



27
28
29
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 27

def filtering_by
  META_DATA.fetch :execution_plan
end

#find_blocked_execution_plans(execution_plan_id) ⇒ Object



163
164
165
166
167
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 163

def find_blocked_execution_plans(execution_plan_id)
  table(:execution_plan_dependency)
    .where(blocked_by_uuid: execution_plan_id)
    .select_map(:execution_plan_uuid)
end

#find_coordinator_records(options) ⇒ Object



309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 309

def find_coordinator_records(options)
  coordinator_feature!
  options = options.dup
  filters = (options[:filters] || {}).dup
  exclude_owner_id = filters.delete(:exclude_owner_id)
  data_set = filter(:coordinator_record, table(:coordinator_record), filters)
  if exclude_owner_id
    data_set = data_set.exclude(:owner_id => exclude_owner_id)
  end
  with_retry do
    data_set.all.map { |record| load_data(record) }
  end
end

#find_execution_plan_counts(options = {}) ⇒ Object



79
80
81
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 79

def find_execution_plan_counts(options = {})
  with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).count }
end

#find_execution_plan_counts_after(timestamp, options = {}) ⇒ Object



83
84
85
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 83

def find_execution_plan_counts_after(timestamp, options = {})
  with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).filter(::Sequel.lit('ended_at >= ?', timestamp)).count }
end

#find_execution_plan_dependencies(execution_plan_id) ⇒ Object



157
158
159
160
161
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 157

def find_execution_plan_dependencies(execution_plan_id)
  table(:execution_plan_dependency)
    .where(execution_plan_uuid: execution_plan_id)
    .select_map(:blocked_by_uuid)
end

#find_execution_plan_statuses(options) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 87

def find_execution_plan_statuses(options)
  plans = with_retry do
    filter(:execution_plan, table(:execution_plan), options[:filters])
      .select(:uuid, :state, :result)
  end

  plans.each_with_object({}) do |current, acc|
    uuid = current.delete(:uuid)
    acc[uuid] = current
  end
end

#find_execution_plans(options = {}) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 67

def find_execution_plans(options = {})
  table_name = :execution_plan
  options[:order_by] ||= :started_at
  data_set = filter(table_name,
    order(table_name,
      paginate(table(table_name), options),
      options),
    options[:filters])
  records = with_retry { data_set.all }
  records.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end

#find_old_execution_plans(age) ⇒ Object



147
148
149
150
151
152
153
154
155
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 147

def find_old_execution_plans(age)
  table_name = :execution_plan
  records = with_retry do
    table(table_name)
      .where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
      .all
  end
  records.map { |plan| execution_plan_column_map(load_data plan, table_name) }
end

#find_ready_delayed_plans(time) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 169

def find_ready_delayed_plans(time)
  table_name = :delayed
  # Subquery to find delayed plans that have at least one non-stopped dependency
  plans_with_unfinished_deps = table(:execution_plan_dependency)
                               .join(TABLES[:execution_plan], uuid: :blocked_by_uuid)
                               .where(::Sequel.~(state: 'stopped'))
                               .select(:execution_plan_uuid)

  records = with_retry do
    table(table_name)
      .where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time))
      .where(:frozen => false)
      .exclude(execution_plan_uuid: plans_with_unfinished_deps)
      .order_by(:start_at)
      .all
  end
  records.map { |plan| load_data(plan, table_name) }
end

#insert_coordinator_record(value) ⇒ Object



294
295
296
297
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 294

def insert_coordinator_record(value)
  coordinator_feature!
  save :coordinator_record, {}, value
end

#load_action(execution_plan_id, action_id) ⇒ Object



215
216
217
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 215

def load_action(execution_plan_id, action_id)
  load :action, execution_plan_uuid: execution_plan_id, id: action_id
end

#load_actions(execution_plan_id, action_ids) ⇒ Object



219
220
221
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 219

def load_actions(execution_plan_id, action_ids)
  load_records :action, { execution_plan_uuid: execution_plan_id, id: action_ids }
end

#load_actions_attributes(execution_plan_id, attributes) ⇒ Object



223
224
225
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 223

def load_actions_attributes(execution_plan_id, attributes)
  load_records :action, { execution_plan_uuid: execution_plan_id }, attributes
end

#load_delayed_plan(execution_plan_id) ⇒ Object



188
189
190
191
192
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 188

def load_delayed_plan(execution_plan_id)
  load :delayed, execution_plan_uuid: execution_plan_id
rescue KeyError
  return nil
end

#load_execution_plan(execution_plan_id) ⇒ Object



126
127
128
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 126

def load_execution_plan(execution_plan_id)
  execution_plan_column_map(load :execution_plan, uuid: execution_plan_id)
end

#load_output_chunks(execution_plan_id, action_id) ⇒ Object



239
240
241
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 239

def load_output_chunks(execution_plan_id, action_id)
  load_records :output_chunk, { execution_plan_uuid: execution_plan_id, action_id: action_id }, [:timestamp, :kind, :chunk]
end

#load_step(execution_plan_id, step_id) ⇒ Object



202
203
204
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 202

def load_step(execution_plan_id, step_id)
  load :step, execution_plan_uuid: execution_plan_id, id: step_id
end

#load_steps(execution_plan_id) ⇒ Object



206
207
208
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 206

def load_steps(execution_plan_id)
  load_records :step, execution_plan_uuid: execution_plan_id
end

#migrate_dbObject



330
331
332
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 330

def migrate_db
  ::Sequel::Migrator.run(db, self.class.migrations_path, table: 'dynflow_schema_info')
end

#ordering_byObject



31
32
33
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 31

def ordering_by
  META_DATA.fetch :execution_plan
end

#pagination?Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 23

def pagination?
  true
end

#prune_envelopes(receiver_ids) ⇒ Object



278
279
280
281
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 278

def prune_envelopes(receiver_ids)
  connector_feature!
  with_retry { table(:envelope).where(receiver_id: receiver_ids).delete }
end

#prune_undeliverable_envelopesObject



283
284
285
286
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 283

def prune_undeliverable_envelopes
  connector_feature!
  with_retry { table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete }
end

#pull_envelopes(receiver_id) ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 260

def pull_envelopes(receiver_id)
  connector_feature!
  with_retry do
    db.transaction do
      data_set = table(:envelope).where(receiver_id: receiver_id).all
      envelopes = data_set.map { |record| load_data(record) }

      table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
      return envelopes
    end
  end
end

#push_envelope(envelope) ⇒ Object



273
274
275
276
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 273

def push_envelope(envelope)
  connector_feature!
  with_retry { table(:envelope).insert(prepare_record(:envelope, envelope)) }
end

#save_action(execution_plan_id, action_id, value) ⇒ Object



227
228
229
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 227

def save_action(execution_plan_id, action_id, value)
  save :action, { execution_plan_uuid: execution_plan_id, id: action_id }, value, with_data: false
end

#save_delayed_plan(execution_plan_id, value) ⇒ Object



194
195
196
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 194

def save_delayed_plan(execution_plan_id, value)
  save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false
end

#save_envelope(data) ⇒ Object



255
256
257
258
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 255

def save_envelope(data)
  connector_feature!
  save :envelope, {}, data
end

#save_execution_plan(execution_plan_id, value) ⇒ Object



130
131
132
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 130

def save_execution_plan(execution_plan_id, value)
  save :execution_plan, { uuid: execution_plan_id }, value, with_data: false
end

#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object



231
232
233
234
235
236
237
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 231

def save_output_chunks(execution_plan_id, action_id, chunks)
  chunks.each do |chunk|
    chunk[:execution_plan_uuid] = execution_plan_id
    chunk[:action_id] = action_id
    save :output_chunk, {}, chunk, with_data: false
  end
end

#save_step(execution_plan_id, step_id, value, update_conditions = {}) ⇒ Object



210
211
212
213
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 210

def save_step(execution_plan_id, step_id, value, update_conditions = {})
  save :step, { execution_plan_uuid: execution_plan_id, id: step_id }, value,
    with_data: false, update_conditions: update_conditions
end

#to_hashObject



323
324
325
326
327
328
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 323

def to_hash
  { execution_plans:      table(:execution_plan).all.to_a,
    steps:                table(:step).all.to_a,
    actions:              table(:action).all.to_a,
    envelopes:            table(:envelope).all.to_a }
end

#transaction(&block) ⇒ Object



63
64
65
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 63

def transaction(&block)
  db.transaction(&block)
end

#update_coordinator_record(class_name, record_id, value) ⇒ Object



299
300
301
302
# File 'lib/dynflow/persistence_adapters/sequel.rb', line 299

def update_coordinator_record(class_name, record_id, value)
  coordinator_feature!
  save :coordinator_record, { class: class_name, :id => record_id }, value
end