Class: Dynflow::Persistence

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck, Debug::Persistence
Defined in:
lib/dynflow/persistence.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(world, persistence_adapter, options = {}) ⇒ Persistence

Returns a new instance of Persistence.



11
12
13
14
15
16
17
# File 'lib/dynflow/persistence.rb', line 11

# Wires the persistence facade to a world and a storage adapter.
#
# The world is registered with the adapter before any option is read.
#
# @param world the world this persistence object works for; kept in +@world+
#   and handed to the adapter's +register_world+
# @param persistence_adapter the backend object all other methods delegate to
# @param options [Hash] optional settings:
#   +:backup_deleted_plans+ (defaults to +false+) and
#   +:backup_dir+ (defaults to './backup')
def initialize(world, persistence_adapter, options = {})
  @world = world
  @adapter = persistence_adapter
  persistence_adapter.register_world(world)
  @backup_deleted_plans = options.fetch(:backup_deleted_plans) { false }
  @backup_dir = options.fetch(:backup_dir) { './backup' }
end

Instance Attribute Details

#adapter ⇒ Object (readonly)

Returns the value of attribute adapter.



9
10
11
# File 'lib/dynflow/persistence.rb', line 9

# Read-only accessor for the persistence adapter stored in +@adapter+.
#
# @return the object held in +@adapter+
def adapter
  @adapter
end

Instance Method Details

#current_backup_dir ⇒ Object



81
82
83
# File 'lib/dynflow/persistence.rb', line 81

# Directory that backups should go to today, or nil when backups are disabled.
#
# @return [String, nil] +@backup_dir+ joined with today's date formatted as
#   YYYYMMDD when +@backup_deleted_plans+ is truthy; otherwise nil
def current_backup_dir
  return unless @backup_deleted_plans

  todays_folder = Date.today.strftime('%Y%m%d')
  File.join(@backup_dir, todays_folder)
end

#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object



106
107
108
# File 'lib/dynflow/persistence.rb', line 106

# Asks the adapter to delete the delayed plans selected by +filters+.
#
# @param filters selection criteria, forwarded to the adapter unchanged
# @param batch_size [Integer] forwarded to the adapter unchanged (default 1000);
#   presumably the number of records handled per batch — confirm in the adapter
# @return whatever the adapter's +delete_delayed_plans+ returns
def delete_delayed_plans(filters, batch_size = 1000)
  adapter.delete_delayed_plans(filters, batch_size)
end

#delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) ⇒ Object



76
77
78
79
# File 'lib/dynflow/persistence.rb', line 76

# Asks the adapter to delete the execution plans selected by +filters+.
#
# @param filters selection criteria, forwarded to the adapter unchanged
# @param batch_size [Integer] forwarded to the adapter unchanged (default 1000)
# @param enforce_backup_dir [String, nil] backup directory to use for this call;
#   when falsy, the value of +current_backup_dir+ is used instead (which may be nil)
# @return whatever the adapter's +delete_execution_plans+ returns
def delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil)
  target_dir = enforce_backup_dir
  target_dir ||= current_backup_dir
  adapter.delete_execution_plans(filters, batch_size, target_dir)
end

#delete_output_chunks(execution_plan_id, action_id) ⇒ Object



58
59
60
# File 'lib/dynflow/persistence.rb', line 58

# Asks the adapter to delete the output chunks stored for one action.
#
# @param execution_plan_id identifier of the execution plan, forwarded unchanged
# @param action_id identifier of the action, forwarded unchanged
# @return whatever the adapter's +delete_output_chunks+ returns
def delete_output_chunks(execution_plan_id, action_id)
  adapter.delete_output_chunks(execution_plan_id, action_id)
end

#find_execution_plan_counts(options) ⇒ Object



72
73
74
# File 'lib/dynflow/persistence.rb', line 72

# Forwards an execution-plan count query to the adapter.
#
# @param options query options, passed to the adapter unchanged
# @return whatever the adapter's +find_execution_plan_counts+ returns
def find_execution_plan_counts(options)
  adapter.find_execution_plan_counts(options)
end

#find_execution_plan_statuses(options) ⇒ Object



68
69
70
# File 'lib/dynflow/persistence.rb', line 68

# Forwards an execution-plan status query to the adapter.
#
# @param options query options, passed to the adapter unchanged
# @return whatever the adapter's +find_execution_plan_statuses+ returns
def find_execution_plan_statuses(options)
  adapter.find_execution_plan_statuses(options)
end

#find_execution_plans(options) ⇒ Object



62
63
64
65
66
# File 'lib/dynflow/persistence.rb', line 62

# Queries the adapter for execution plans and rebuilds each result as an
# ExecutionPlan bound to this persistence's world.
#
# @param options query options, passed to the adapter unchanged
# @return [Array] one +ExecutionPlan.new_from_hash+ result per record the
#   adapter returned, in the same order
def find_execution_plans(options)
  plan_hashes = adapter.find_execution_plans(options)
  plan_hashes.map { |plan_hash| ExecutionPlan.new_from_hash(plan_hash, @world) }
end

#find_old_execution_plans(age) ⇒ Object



94
95
96
97
98
# File 'lib/dynflow/persistence.rb', line 94

# Asks the adapter for old execution plans and rebuilds each result as an
# ExecutionPlan bound to this persistence's world.
#
# @param age threshold forwarded to the adapter unchanged; its exact meaning
#   is defined by the adapter
# @return [Array] one +ExecutionPlan.new_from_hash+ result per record returned
def find_old_execution_plans(age)
  old_plan_hashes = adapter.find_old_execution_plans(age)
  old_plan_hashes.map { |plan_hash| ExecutionPlan.new_from_hash(plan_hash, @world) }
end

#find_past_delayed_plans(time) ⇒ Object



100
101
102
103
104
# File 'lib/dynflow/persistence.rb', line 100

# Asks the adapter for delayed plans relative to +time+ and rebuilds each
# result as a DelayedPlan bound to this persistence's world.
#
# @param time point in time forwarded to the adapter unchanged
# @return [Array] one +DelayedPlan.new_from_hash+ result per record returned
def find_past_delayed_plans(time)
  delayed_hashes = adapter.find_past_delayed_plans(time)
  delayed_hashes.map { |delayed_hash| DelayedPlan.new_from_hash(@world, delayed_hash) }
end

#load_action(step) ⇒ Object



19
20
21
22
23
24
# File 'lib/dynflow/persistence.rb', line 19

# Loads the stored action a step belongs to and rebuilds it for that step.
#
# The hash returned by the adapter is updated in place with the step and the
# step's phase before it is handed to +Action.from_hash+.
#
# @param step object providing +execution_plan_id+, +action_id+, +phase+ and +world+
# @return the result of +Action.from_hash+ for the step's world
def load_action(step)
  stored = adapter.load_action(step.execution_plan_id, step.action_id)
  attributes = stored.update(step: step, phase: step.phase)
  Action.from_hash(attributes, step.world)
end

#load_action_for_presentation(execution_plan, action_id, step = nil) ⇒ Object



33
34
35
36
37
38
# File 'lib/dynflow/persistence.rb', line 33

# Loads a stored action in the presentation phase and runs the world's
# +:present+ middleware on it.
#
# The hash returned by the adapter is updated in place with the Present phase,
# the execution plan and the optional step before the action is rebuilt.
#
# @param execution_plan object providing +id+; also stored in the attributes
# @param action_id identifier of the action to load
# @param step optional step stored in the attributes (nil by default)
# @return the action built by +Action.from_hash+
def load_action_for_presentation(execution_plan, action_id, step = nil)
  attributes = adapter.load_action(execution_plan.id, action_id)
  attributes = attributes.update(phase: Action::Present, execution_plan: execution_plan, step: step)
  present_action = Action.from_hash(attributes, @world)
  # The middleware stack is invoked with an empty block; only its side effects matter here.
  @world.middleware.execute(:present, present_action) {}
  present_action
end

#load_actions(execution_plan, action_ids) ⇒ Object



26
27
28
29
30
31
# File 'lib/dynflow/persistence.rb', line 26

# Loads several stored actions of one execution plan in the presentation phase.
#
# Each hash returned by the adapter is merged (not mutated) with the Present
# phase and the execution plan before the action is rebuilt.
#
# @param execution_plan object providing +id+; also merged into every attribute hash
# @param action_ids identifiers forwarded to the adapter unchanged
# @return [Array] one +Action.from_hash+ result per record returned
def load_actions(execution_plan, action_ids)
  adapter.load_actions(execution_plan.id, action_ids).map do |action_hash|
    presentable = action_hash.merge(phase: Action::Present, execution_plan: execution_plan)
    Action.from_hash(presentable, @world)
  end
end

#load_actions_attributes(execution_plan_id, attributes) ⇒ Object



40
41
42
# File 'lib/dynflow/persistence.rb', line 40

# Loads selected attributes of an execution plan's actions, dropping empty results.
#
# @param execution_plan_id identifier forwarded to the adapter unchanged
# @param attributes attribute selection forwarded to the adapter unchanged
# @return the adapter's results without the entries that respond true to +empty?+
def load_actions_attributes(execution_plan_id, attributes)
  loaded = adapter.load_actions_attributes(execution_plan_id, attributes)
  loaded.reject(&:empty?)
end

#load_delayed_plan(execution_plan_id) ⇒ Object



121
122
123
124
125
# File 'lib/dynflow/persistence.rb', line 121

# Loads the delayed plan stored for an execution plan, if there is one.
#
# @param execution_plan_id identifier forwarded to the adapter unchanged
# @return the result of +DelayedPlan.new_from_hash+, or nil when the adapter
#   returns a falsy value
def load_delayed_plan(execution_plan_id)
  delayed_hash = adapter.load_delayed_plan(execution_plan_id)
  DelayedPlan.new_from_hash(@world, delayed_hash) if delayed_hash
end

#load_execution_plan(id) ⇒ Object



85
86
87
88
# File 'lib/dynflow/persistence.rb', line 85

# Loads one stored execution plan and rebuilds it for this persistence's world.
#
# @param id identifier forwarded to the adapter unchanged
# @return the result of +ExecutionPlan.new_from_hash+
def load_execution_plan(id)
  ExecutionPlan.new_from_hash(adapter.load_execution_plan(id), @world)
end

#load_output_chunks(execution_plan_id, action_id) ⇒ Object



54
55
56
# File 'lib/dynflow/persistence.rb', line 54

# Asks the adapter for the output chunks stored for one action.
#
# @param execution_plan_id identifier of the execution plan, forwarded unchanged
# @param action_id identifier of the action, forwarded unchanged
# @return whatever the adapter's +load_output_chunks+ returns
def load_output_chunks(execution_plan_id, action_id)
  adapter.load_output_chunks(execution_plan_id, action_id)
end

#load_step(execution_plan_id, step_id, world) ⇒ Object



127
128
129
130
# File 'lib/dynflow/persistence.rb', line 127

# Loads one stored step and rebuilds it via the abstract step class.
#
# Unlike most loaders here, the world is supplied by the caller rather than
# taken from +@world+.
#
# @param execution_plan_id identifier of the execution plan
# @param step_id identifier of the step within that plan
# @param world world passed on to +from_hash+
# @return the result of +ExecutionPlan::Steps::Abstract.from_hash+
def load_step(execution_plan_id, step_id, world)
  ExecutionPlan::Steps::Abstract.from_hash(
    adapter.load_step(execution_plan_id, step_id),
    execution_plan_id,
    world
  )
end

#load_steps(execution_plan_id, world) ⇒ Object



132
133
134
135
136
# File 'lib/dynflow/persistence.rb', line 132

# Loads every stored step of an execution plan and rebuilds each one via the
# abstract step class.
#
# @param execution_plan_id identifier of the execution plan
# @param world world passed on to +from_hash+ for every step
# @return [Array] one +ExecutionPlan::Steps::Abstract.from_hash+ result per
#   record returned by the adapter
def load_steps(execution_plan_id, world)
  step_hashes = adapter.load_steps(execution_plan_id)
  step_hashes.map { |raw_step| ExecutionPlan::Steps::Abstract.from_hash(raw_step, execution_plan_id, world) }
end

#prune_envelopes(receiver_ids) ⇒ Object



155
156
157
# File 'lib/dynflow/persistence.rb', line 155

# Asks the adapter to prune envelopes for the given receivers.
#
# @param receiver_ids receiver identifiers, forwarded to the adapter unchanged
# @return whatever the adapter's +prune_envelopes+ returns
def prune_envelopes(receiver_ids)
  adapter.prune_envelopes(receiver_ids)
end

#prune_undeliverable_envelopes ⇒ Object



159
160
161
# File 'lib/dynflow/persistence.rb', line 159

# Asks the adapter to prune envelopes it considers undeliverable.
#
# @return whatever the adapter's +prune_undeliverable_envelopes+ returns
def prune_undeliverable_envelopes
  adapter.prune_undeliverable_envelopes
end

#pull_envelopes(world_id) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/dynflow/persistence.rb', line 147

# Pulls the serialized envelopes addressed to a world and deserializes them.
#
# Every deserialized object is type-checked against Dispatcher::Envelope
# before it is returned.
#
# @param world_id identifier forwarded to the adapter unchanged
# @return [Array] the deserialized envelopes, in the order the adapter returned them
def pull_envelopes(world_id)
  adapter.pull_envelopes(world_id).map do |data|
    Dynflow.serializer.load(data).tap do |envelope|
      Type! envelope, Dispatcher::Envelope
    end
  end
end

#push_envelope(envelope) ⇒ Object



142
143
144
145
# File 'lib/dynflow/persistence.rb', line 142

# Type-checks an envelope, serializes it and hands it to the adapter.
#
# @param envelope object checked against Dispatcher::Envelope before serialization
# @return whatever the adapter's +push_envelope+ returns
def push_envelope(envelope)
  Type! envelope, Dispatcher::Envelope
  serialized = Dynflow.serializer.dump(envelope)
  adapter.push_envelope(serialized)
end

#save_action(execution_plan_id, action) ⇒ Object



44
45
46
# File 'lib/dynflow/persistence.rb', line 44

# Stores an action's hash representation under its execution plan.
#
# @param execution_plan_id identifier of the owning execution plan
# @param action object providing +id+ and +to_hash+
# @return whatever the adapter's +save_action+ returns
def save_action(execution_plan_id, action)
  action_id = action.id
  serialized = action.to_hash
  adapter.save_action(execution_plan_id, action_id, serialized)
end

#save_delayed_plan(delayed_plan) ⇒ Object



110
111
112
# File 'lib/dynflow/persistence.rb', line 110

# Stores a delayed plan's hash representation, keyed by its execution plan UUID.
#
# @param delayed_plan object providing +execution_plan_uuid+ and +to_hash+
# @return whatever the adapter's +save_delayed_plan+ returns
def save_delayed_plan(delayed_plan)
  plan_uuid = delayed_plan.execution_plan_uuid
  serialized = delayed_plan.to_hash
  adapter.save_delayed_plan(plan_uuid, serialized)
end

#save_execution_plan(execution_plan) ⇒ Object



90
91
92
# File 'lib/dynflow/persistence.rb', line 90

# Stores an execution plan's hash representation under its id.
#
# @param execution_plan object providing +id+ and +to_hash+
# @return whatever the adapter's +save_execution_plan+ returns
def save_execution_plan(execution_plan)
  plan_id = execution_plan.id
  serialized = execution_plan.to_hash
  adapter.save_execution_plan(plan_id, serialized)
end

#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object



48
49
50
51
52
# File 'lib/dynflow/persistence.rb', line 48

# Stores output chunks for one action, skipping the adapter call when there
# is nothing to store.
#
# @param execution_plan_id identifier of the execution plan, forwarded unchanged
# @param action_id identifier of the action, forwarded unchanged
# @param chunks collection responding to +empty?+
# @return nil when +chunks+ is empty; otherwise whatever the adapter's
#   +save_output_chunks+ returns
def save_output_chunks(execution_plan_id, action_id, chunks)
  adapter.save_output_chunks(execution_plan_id, action_id, chunks) unless chunks.empty?
end

#save_step(step, conditions = {}) ⇒ Object



138
139
140
# File 'lib/dynflow/persistence.rb', line 138

# Stores a step's hash representation under its execution plan and step id.
#
# @param step object providing +execution_plan_id+, +id+ and +to_hash+
# @param conditions [Hash] forwarded to the adapter unchanged (empty by default);
#   how they are interpreted is up to the adapter
# @return whatever the adapter's +save_step+ returns
def save_step(step, conditions = {})
  plan_id = step.execution_plan_id
  serialized = step.to_hash
  adapter.save_step(plan_id, step.id, serialized, conditions)
end

#set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil) ⇒ Object



114
115
116
117
118
119
# File 'lib/dynflow/persistence.rb', line 114

# Changes the frozen flag (and optionally the start time) of a stored delayed
# plan and saves it back.
#
# +load_delayed_plan+ returns nil when no delayed plan is stored for the id;
# in that case nothing is modified or saved and nil is returned, instead of
# failing with NoMethodError on nil.
#
# @param execution_plan_id identifier of the execution plan whose delayed plan is updated
# @param frozen value assigned to the plan's +frozen+ attribute (default true)
# @param new_start_at when truthy, assigned to the plan's +start_at+; otherwise
#   the existing start time is left untouched
# @return nil when no delayed plan exists; otherwise whatever +save_delayed_plan+ returns
def set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil)
  plan = load_delayed_plan(execution_plan_id)
  return unless plan

  plan.frozen = frozen
  plan.start_at = new_start_at if new_start_at
  save_delayed_plan(plan)
end