Class: Dynflow::Persistence
- Inherits:
-
Object
- Object
- Dynflow::Persistence
- Includes:
- Algebrick::TypeCheck, Debug::Persistence
- Defined in:
- lib/dynflow/persistence.rb
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
Instance Method Summary collapse
- #current_backup_dir ⇒ Object
- #delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
- #delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) ⇒ Object
- #delete_output_chunks(execution_plan_id, action_id) ⇒ Object
- #find_execution_plan_counts(options) ⇒ Object
- #find_execution_plan_statuses(options) ⇒ Object
- #find_execution_plans(options) ⇒ Object
- #find_old_execution_plans(age) ⇒ Object
- #find_past_delayed_plans(time) ⇒ Object
-
#initialize(world, persistence_adapter, options = {}) ⇒ Persistence
constructor
A new instance of Persistence.
- #load_action(step) ⇒ Object
- #load_action_for_presentation(execution_plan, action_id, step = nil) ⇒ Object
- #load_actions(execution_plan, action_ids) ⇒ Object
- #load_actions_attributes(execution_plan_id, attributes) ⇒ Object
- #load_delayed_plan(execution_plan_id) ⇒ Object
- #load_execution_plan(id) ⇒ Object
- #load_output_chunks(execution_plan_id, action_id) ⇒ Object
- #load_step(execution_plan_id, step_id, world) ⇒ Object
- #load_steps(execution_plan_id, world) ⇒ Object
- #prune_envelopes(receiver_ids) ⇒ Object
- #prune_undeliverable_envelopes ⇒ Object
- #pull_envelopes(world_id) ⇒ Object
- #push_envelope(envelope) ⇒ Object
- #save_action(execution_plan_id, action) ⇒ Object
- #save_delayed_plan(delayed_plan) ⇒ Object
- #save_execution_plan(execution_plan) ⇒ Object
- #save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
- #save_step(step, conditions = {}) ⇒ Object
- #set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil) ⇒ Object
Constructor Details
#initialize(world, persistence_adapter, options = {}) ⇒ Persistence
Returns a new instance of Persistence.
11 12 13 14 15 16 17 |
# File 'lib/dynflow/persistence.rb', line 11
#
# Stores the world and the adapter, registers the world with the adapter and
# reads the backup settings out of +options+.
#
# @param world object handed to +persistence_adapter.register_world+ and kept
#   in @world for later deserialization calls
# @param persistence_adapter object that must respond to +register_world+
# @param options [Hash] recognised keys:
#   +:backup_deleted_plans+ (defaults to false) and
#   +:backup_dir+ (defaults to './backup')
def initialize(world, persistence_adapter, options = {})
  @world = world
  @adapter = persistence_adapter
  @adapter.register_world(world)
  @backup_deleted_plans = options.fetch(:backup_deleted_plans, false)
  @backup_dir = options.fetch(:backup_dir, './backup')
end
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
9 10 11 |
# File 'lib/dynflow/persistence.rb', line 9
#
# Read-only accessor for the persistence adapter kept in @adapter.
#
# @return the value of @adapter
def adapter
  @adapter
end
Instance Method Details
#current_backup_dir ⇒ Object
81 82 83 |
# File 'lib/dynflow/persistence.rb', line 81
#
# Directory used for backups made today, or nil when @backup_deleted_plans
# is falsy.
#
# @return [String, nil] @backup_dir joined with today's date formatted as
#   YYYYMMDD, or nil when backups are disabled
def current_backup_dir
  return nil unless @backup_deleted_plans

  File.join(@backup_dir, Date.today.strftime('%Y%m%d'))
end
#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
106 107 108 |
# File 'lib/dynflow/persistence.rb', line 106
#
# Forwards the deletion of delayed plans to the adapter.
#
# @param filters passed through to the adapter unchanged
# @param batch_size passed through to the adapter (defaults to 1000)
# @return whatever +adapter.delete_delayed_plans+ returns
def delete_delayed_plans(filters, batch_size = 1000)
  adapter.delete_delayed_plans(filters, batch_size)
end
#delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) ⇒ Object
76 77 78 79 |
# File 'lib/dynflow/persistence.rb', line 76
#
# Forwards the deletion of execution plans to the adapter together with a
# backup directory.
#
# @param filters passed through to the adapter unchanged
# @param batch_size passed through to the adapter (defaults to 1000)
# @param enforce_backup_dir when truthy it is used as the backup directory;
#   otherwise +current_backup_dir+ is used (which may itself be nil)
# @return whatever +adapter.delete_execution_plans+ returns
def delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil)
  adapter.delete_execution_plans(filters, batch_size, enforce_backup_dir || current_backup_dir)
end
#delete_output_chunks(execution_plan_id, action_id) ⇒ Object
58 59 60 |
# File 'lib/dynflow/persistence.rb', line 58
#
# Forwards the deletion of an action's output chunks to the adapter.
#
# @param execution_plan_id passed through to the adapter
# @param action_id passed through to the adapter
# @return whatever +adapter.delete_output_chunks+ returns
def delete_output_chunks(execution_plan_id, action_id)
  adapter.delete_output_chunks(execution_plan_id, action_id)
end
#find_execution_plan_counts(options) ⇒ Object
72 73 74 |
# File 'lib/dynflow/persistence.rb', line 72
#
# Forwards an execution-plan count query to the adapter.
#
# @param options query options, handed to the adapter unchanged
# @return whatever +adapter.find_execution_plan_counts+ returns
def find_execution_plan_counts(options)
  adapter.find_execution_plan_counts(options)
end
#find_execution_plan_statuses(options) ⇒ Object
68 69 70 |
# File 'lib/dynflow/persistence.rb', line 68
#
# Forwards an execution-plan status query to the adapter.
#
# @param options query options, handed to the adapter unchanged
# @return whatever +adapter.find_execution_plan_statuses+ returns
def find_execution_plan_statuses(options)
  adapter.find_execution_plan_statuses(options)
end
#find_execution_plans(options) ⇒ Object
62 63 64 65 66 |
# File 'lib/dynflow/persistence.rb', line 62
#
# Queries the adapter for execution plans and builds an ExecutionPlan from
# each returned hash.
#
# @param options query options, handed to the adapter unchanged
# @return [Array] one +ExecutionPlan.new_from_hash(hash, @world)+ result per
#   hash returned by the adapter
def find_execution_plans(options)
  adapter.find_execution_plans(options).map do |execution_plan_hash|
    ExecutionPlan.new_from_hash(execution_plan_hash, @world)
  end
end
#find_old_execution_plans(age) ⇒ Object
94 95 96 97 98 |
# File 'lib/dynflow/persistence.rb', line 94
#
# Asks the adapter for old execution plans and builds an ExecutionPlan from
# each returned hash.
#
# @param age passed through to +adapter.find_old_execution_plans+
# @return [Array] results of +ExecutionPlan.new_from_hash(hash, @world)+
def find_old_execution_plans(age)
  old_plan_hashes = adapter.find_old_execution_plans(age)
  old_plan_hashes.map { |plan_hash| ExecutionPlan.new_from_hash(plan_hash, @world) }
end
#find_past_delayed_plans(time) ⇒ Object
100 101 102 103 104 |
# File 'lib/dynflow/persistence.rb', line 100
#
# Asks the adapter for past delayed plans and builds a DelayedPlan from each
# returned hash.
#
# @param time passed through to +adapter.find_past_delayed_plans+
# @return [Array] results of +DelayedPlan.new_from_hash(@world, hash)+
def find_past_delayed_plans(time)
  delayed_plan_hashes = adapter.find_past_delayed_plans(time)
  delayed_plan_hashes.map { |plan_hash| DelayedPlan.new_from_hash(@world, plan_hash) }
end
#load_action(step) ⇒ Object
19 20 21 22 23 24 |
# File 'lib/dynflow/persistence.rb', line 19
#
# Loads the stored attributes of the action a step belongs to and builds the
# Action from them.
#
# @param step object providing +execution_plan_id+, +action_id+, +phase+
#   and +world+
# @return the result of +Action.from_hash+
def load_action(step)
  stored = adapter.load_action(step.execution_plan_id, step.action_id)
  # update mutates the hash returned by the adapter, adding the step and phase
  attributes = stored.update(step: step, phase: step.phase)
  Action.from_hash(attributes, step.world)
end
#load_action_for_presentation(execution_plan, action_id, step = nil) ⇒ Object
33 34 35 36 37 38 |
# File 'lib/dynflow/persistence.rb', line 33
#
# Loads an action in the Present phase and runs the world's :present
# middleware on it before returning it.
#
# @param execution_plan object providing +id+; also stored in the attributes
# @param action_id passed through to +adapter.load_action+
# @param step stored in the attributes under :step (defaults to nil)
# @return the action built by +Action.from_hash+
def load_action_for_presentation(execution_plan, action_id, step = nil)
  attributes = adapter.load_action(execution_plan.id, action_id)
  presentation_attributes = attributes.update(phase: Action::Present,
                                              execution_plan: execution_plan,
                                              step: step)
  present_action = Action.from_hash(presentation_attributes, @world)
  # the middleware is run with an empty block
  @world.middleware.execute(:present, present_action) {}
  present_action
end
#load_actions(execution_plan, action_ids) ⇒ Object
26 27 28 29 30 31 |
# File 'lib/dynflow/persistence.rb', line 26
#
# Loads several actions of one execution plan in the Present phase.
#
# @param execution_plan object providing +id+; also merged into every
#   action's attributes
# @param action_ids passed through to +adapter.load_actions+
# @return [Array] results of +Action.from_hash+
def load_actions(execution_plan, action_ids)
  action_hashes = adapter.load_actions(execution_plan.id, action_ids)
  action_hashes.map do |action_hash|
    # merge leaves the hash returned by the adapter untouched
    presented = action_hash.merge(phase: Action::Present, execution_plan: execution_plan)
    Action.from_hash(presented, @world)
  end
end
#load_actions_attributes(execution_plan_id, attributes) ⇒ Object
40 41 42 |
# File 'lib/dynflow/persistence.rb', line 40
#
# Loads the requested attributes of a plan's actions and drops empty results.
#
# @param execution_plan_id passed through to the adapter
# @param attributes passed through to the adapter
# @return the adapter's results without the entries that answer true to
#   +empty?+
def load_actions_attributes(execution_plan_id, attributes)
  loaded = adapter.load_actions_attributes(execution_plan_id, attributes)
  loaded.reject { |action_attributes| action_attributes.empty? }
end
#load_delayed_plan(execution_plan_id) ⇒ Object
121 122 123 124 125 |
# File 'lib/dynflow/persistence.rb', line 121
#
# Loads the delayed plan stored for an execution plan.
#
# @param execution_plan_id passed through to +adapter.load_delayed_plan+
# @return the result of +DelayedPlan.new_from_hash+, or nil when the adapter
#   returns a falsy value
def load_delayed_plan(execution_plan_id)
  plan_hash = adapter.load_delayed_plan(execution_plan_id)
  DelayedPlan.new_from_hash(@world, plan_hash) if plan_hash
end
#load_execution_plan(id) ⇒ Object
85 86 87 88 |
# File 'lib/dynflow/persistence.rb', line 85
#
# Loads one execution plan through the adapter and builds an ExecutionPlan.
#
# @param id passed through to +adapter.load_execution_plan+
# @return the result of +ExecutionPlan.new_from_hash+
def load_execution_plan(id)
  plan_hash = adapter.load_execution_plan(id)
  ExecutionPlan.new_from_hash(plan_hash, @world)
end
#load_output_chunks(execution_plan_id, action_id) ⇒ Object
54 55 56 |
# File 'lib/dynflow/persistence.rb', line 54
#
# Forwards the loading of an action's output chunks to the adapter.
#
# @param execution_plan_id passed through to the adapter
# @param action_id passed through to the adapter
# @return whatever +adapter.load_output_chunks+ returns
def load_output_chunks(execution_plan_id, action_id)
  adapter.load_output_chunks(execution_plan_id, action_id)
end
#load_step(execution_plan_id, step_id, world) ⇒ Object
127 128 129 130 |
# File 'lib/dynflow/persistence.rb', line 127
#
# Loads a single step through the adapter and builds the step object.
#
# @param execution_plan_id passed to the adapter and to +from_hash+
# @param step_id passed through to +adapter.load_step+
# @param world handed to +from_hash+ (the caller's world, not @world)
# @return the result of +ExecutionPlan::Steps::Abstract.from_hash+
def load_step(execution_plan_id, step_id, world)
  stored_step = adapter.load_step(execution_plan_id, step_id)
  ExecutionPlan::Steps::Abstract.from_hash(stored_step, execution_plan_id, world)
end
#load_steps(execution_plan_id, world) ⇒ Object
132 133 134 135 136 |
# File 'lib/dynflow/persistence.rb', line 132
#
# Loads every step of an execution plan and builds a step object from each.
#
# @param execution_plan_id passed to the adapter and to +from_hash+
# @param world handed to +from_hash+ (the caller's world, not @world)
# @return [Array] results of +ExecutionPlan::Steps::Abstract.from_hash+
def load_steps(execution_plan_id, world)
  stored_steps = adapter.load_steps(execution_plan_id)
  stored_steps.map do |stored_step|
    ExecutionPlan::Steps::Abstract.from_hash(stored_step, execution_plan_id, world)
  end
end
#prune_envelopes(receiver_ids) ⇒ Object
155 156 157 |
# File 'lib/dynflow/persistence.rb', line 155
#
# Forwards the pruning of envelopes for the given receivers to the adapter.
#
# @param receiver_ids passed through to the adapter
# @return whatever +adapter.prune_envelopes+ returns
def prune_envelopes(receiver_ids)
  adapter.prune_envelopes(receiver_ids)
end
#prune_undeliverable_envelopes ⇒ Object
159 160 161 |
# File 'lib/dynflow/persistence.rb', line 159
#
# Forwards the pruning of undeliverable envelopes to the adapter.
#
# @return whatever +adapter.prune_undeliverable_envelopes+ returns
def prune_undeliverable_envelopes
  adapter.prune_undeliverable_envelopes
end
#pull_envelopes(world_id) ⇒ Object
147 148 149 150 151 152 153 |
# File 'lib/dynflow/persistence.rb', line 147
#
# Pulls the serialized envelopes for a world from the adapter, deserializes
# each with +Dynflow.serializer+ and type-checks the result.
#
# @param world_id passed through to +adapter.pull_envelopes+
# @return [Array] the deserialized envelopes
def pull_envelopes(world_id)
  serialized_envelopes = adapter.pull_envelopes(world_id)
  serialized_envelopes.map do |serialized|
    Dynflow.serializer.load(serialized).tap do |envelope|
      Type!(envelope, Dispatcher::Envelope)
    end
  end
end
#push_envelope(envelope) ⇒ Object
142 143 144 145 |
# File 'lib/dynflow/persistence.rb', line 142
#
# Type-checks an envelope, serializes it with +Dynflow.serializer+ and hands
# the serialized form to the adapter.
#
# @param envelope checked against Dispatcher::Envelope before serialization
# @return whatever +adapter.push_envelope+ returns
def push_envelope(envelope)
  Type!(envelope, Dispatcher::Envelope)
  serialized = Dynflow.serializer.dump(envelope)
  adapter.push_envelope(serialized)
end
#save_action(execution_plan_id, action) ⇒ Object
44 45 46 |
# File 'lib/dynflow/persistence.rb', line 44
#
# Saves an action through the adapter, keyed by plan id and action id.
#
# @param execution_plan_id passed through to the adapter
# @param action object providing +id+ and +to_hash+
# @return whatever +adapter.save_action+ returns
def save_action(execution_plan_id, action)
  adapter.save_action(execution_plan_id, action.id, action.to_hash)
end
#save_delayed_plan(delayed_plan) ⇒ Object
110 111 112 |
# File 'lib/dynflow/persistence.rb', line 110
#
# Saves a delayed plan through the adapter, keyed by its execution plan uuid.
#
# @param delayed_plan object providing +execution_plan_uuid+ and +to_hash+
# @return whatever +adapter.save_delayed_plan+ returns
def save_delayed_plan(delayed_plan)
  adapter.save_delayed_plan(delayed_plan.execution_plan_uuid, delayed_plan.to_hash)
end
#save_execution_plan(execution_plan) ⇒ Object
90 91 92 |
# File 'lib/dynflow/persistence.rb', line 90
#
# Saves an execution plan through the adapter, keyed by its id.
#
# @param execution_plan object providing +id+ and +to_hash+
# @return whatever +adapter.save_execution_plan+ returns
def save_execution_plan(execution_plan)
  adapter.save_execution_plan(execution_plan.id, execution_plan.to_hash)
end
#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
48 49 50 51 52 |
# File 'lib/dynflow/persistence.rb', line 48
#
# Saves an action's output chunks through the adapter; the adapter is not
# called when +chunks+ is empty.
#
# @param execution_plan_id passed through to the adapter
# @param action_id passed through to the adapter
# @param chunks collection answering +empty?+
# @return nil when +chunks+ is empty, otherwise whatever
#   +adapter.save_output_chunks+ returns
def save_output_chunks(execution_plan_id, action_id, chunks)
  adapter.save_output_chunks(execution_plan_id, action_id, chunks) unless chunks.empty?
end
#save_step(step, conditions = {}) ⇒ Object
138 139 140 |
# File 'lib/dynflow/persistence.rb', line 138
#
# Saves a step through the adapter, keyed by plan id and step id.
#
# @param step object providing +execution_plan_id+, +id+ and +to_hash+
# @param conditions passed through to the adapter (defaults to an empty hash)
# @return whatever +adapter.save_step+ returns
def save_step(step, conditions = {})
  adapter.save_step(step.execution_plan_id, step.id, step.to_hash, conditions)
end
#set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil) ⇒ Object
114 115 116 117 118 119 |
# File 'lib/dynflow/persistence.rb', line 114
#
# Loads a delayed plan, sets its frozen flag (and optionally its start time)
# and saves it again.
#
# @param execution_plan_id id handed to +load_delayed_plan+
# @param frozen value assigned to the plan's +frozen+ attribute
#   (defaults to true)
# @param new_start_at when truthy, assigned to the plan's +start_at+;
#   otherwise +start_at+ is left untouched
# @return the result of +save_delayed_plan+, or nil when no delayed plan
#   exists for +execution_plan_id+
def set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil)
  plan = load_delayed_plan(execution_plan_id)
  # load_delayed_plan returns nil for an unknown id; there is nothing to
  # update in that case, so do not call setters on nil
  return nil unless plan

  plan.frozen = frozen
  plan.start_at = new_start_at if new_start_at
  save_delayed_plan(plan)
end