Class: Dynflow::Coordinator
- Inherits:
-
Object
- Object
- Dynflow::Coordinator
show all
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/coordinator.rb
Defined Under Namespace
Classes: AutoExecuteLock, ClientWorld, DelayedExecutorLock, DuplicateRecordError, ExecutionInhibitionLock, ExecutionLock, ExecutionPlanCleanerLock, ExecutorWorld, Lock, LockByWorld, LockError, PlanningLock, Record, SingletonActionLock, WorldInvalidationLock, WorldRecord
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(coordinator_adapter) ⇒ Coordinator
Returns a new instance of Coordinator.
337
338
339
|
# File 'lib/dynflow/coordinator.rb', line 337
def initialize(coordinator_adapter)
@adapter = coordinator_adapter
end
|
Instance Attribute Details
#adapter ⇒ Object
Returns the value of attribute adapter.
335
336
337
|
# File 'lib/dynflow/coordinator.rb', line 335
def adapter
@adapter
end
|
Instance Method Details
#acquire(lock, &block) ⇒ Object
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
# File 'lib/dynflow/coordinator.rb', line 341
def acquire(lock, &block)
Type! lock, Lock
lock.validate!
adapter.create_record(lock)
if block
begin
block.call
rescue Interrupt => e
raise e
ensure
release(lock) if !(defined?(::Sidekiq) && e.is_a?(::Sidekiq::Shutdown)) || lock.unlock_on_shutdown?
end
end
rescue DuplicateRecordError => e
raise LockError.new(e.record)
end
|
#clean_orphaned_locks ⇒ Object
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
|
# File 'lib/dynflow/coordinator.rb', line 423
def clean_orphaned_locks
cleanup_classes = [LockByWorld, SingletonActionLock]
ret = []
cleanup_classes.each do |cleanup_class|
valid_owner_ids = cleanup_class.valid_owner_ids(self)
valid_classes = cleanup_class.valid_classes.map(&:name)
orphaned_locks = find_locks(class: valid_classes, exclude_owner_id: valid_owner_ids)
valid_owner_ids = cleanup_class.valid_owner_ids(self)
orphaned_locks.each do |lock|
unless valid_owner_ids.include?(lock.owner_id)
release(lock)
ret << lock
end
end
end
return ret
end
|
#create_record(record) ⇒ Object
375
376
377
378
|
# File 'lib/dynflow/coordinator.rb', line 375
def create_record(record)
Type! record, Record
adapter.create_record(record)
end
|
#deactivate_world(world) ⇒ Object
417
418
419
420
421
|
# File 'lib/dynflow/coordinator.rb', line 417
def deactivate_world(world)
Type! world, Coordinator::ExecutorWorld
world.active = false
update_record(world)
end
|
#delete_record(record) ⇒ Object
385
386
387
388
|
# File 'lib/dynflow/coordinator.rb', line 385
def delete_record(record)
Type! record, Record
adapter.delete_record(record)
end
|
#delete_world(world, on_termination = false) ⇒ Object
411
412
413
414
415
|
# File 'lib/dynflow/coordinator.rb', line 411
def delete_world(world, on_termination = false)
Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld
release_by_owner("world:#{world.id}", on_termination)
delete_record(world)
end
|
#find_locks(filter_options) ⇒ Object
369
370
371
372
373
|
# File 'lib/dynflow/coordinator.rb', line 369
def find_locks(filter_options)
adapter.find_records(filter_options).map do |lock_data|
Lock.from_hash(lock_data)
end
end
|
#find_records(filter) ⇒ Object
390
391
392
393
394
|
# File 'lib/dynflow/coordinator.rb', line 390
def find_records(filter)
adapter.find_records(filter).map do |record_data|
Record.from_hash(record_data)
end
end
|
#find_worlds(active_executor_only = false, filters = {}) ⇒ Object
396
397
398
399
400
401
402
403
404
|
# File 'lib/dynflow/coordinator.rb', line 396
def find_worlds(active_executor_only = false, filters = {})
ret = find_records(filters.merge(class: Coordinator::ExecutorWorld.name))
if active_executor_only
ret = ret.select(&:active?)
else
ret.concat(find_records(filters.merge(class: Coordinator::ClientWorld.name)))
end
ret
end
|
#register_world(world) ⇒ Object
#release(lock) ⇒ Object
360
361
362
363
|
# File 'lib/dynflow/coordinator.rb', line 360
def release(lock)
Type! lock, Lock
adapter.delete_record(lock)
end
|
#release_by_owner(owner_id, on_termination = false) ⇒ Object
365
366
367
|
# File 'lib/dynflow/coordinator.rb', line 365
def release_by_owner(owner_id, on_termination = false)
find_locks(owner_id: owner_id).map { |lock| release(lock) if !on_termination || lock.unlock_on_shutdown? }
end
|
#update_record(record) ⇒ Object
380
381
382
383
|
# File 'lib/dynflow/coordinator.rb', line 380
def update_record(record)
Type! record, Record
adapter.update_record(record)
end
|