Class: Dynflow::Coordinator

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/coordinator.rb

Defined Under Namespace

Classes: AutoExecuteLock, ClientWorld, DelayedExecutorLock, DuplicateRecordError, ExecutionInhibitionLock, ExecutionLock, ExecutionPlanCleanerLock, ExecutorWorld, Lock, LockByWorld, LockError, PlanningLock, Record, SingletonActionLock, WorldInvalidationLock, WorldRecord

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(coordinator_adapter) ⇒ Coordinator

Returns a new instance of Coordinator.



337
338
339
# File 'lib/dynflow/coordinator.rb', line 337

def initialize(coordinator_adapter)
  @adapter = coordinator_adapter
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.



335
336
337
# File 'lib/dynflow/coordinator.rb', line 335

def adapter
  @adapter
end

Instance Method Details

#acquire(lock, &block) ⇒ Object



341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/dynflow/coordinator.rb', line 341

def acquire(lock, &block)
  Type! lock, Lock
  lock.validate!
  adapter.create_record(lock)
  if block
    begin
      block.call
    # We are looking for ::Sidekiq::Shutdown, but that may not be defined. We rely on it being a subclass of Interrupt
    # We don't really want to rescue it, but we need to bind it somehow so that we can check it in ensure
    rescue Interrupt => e
      raise e
    ensure
      release(lock) if !(defined?(::Sidekiq) && e.is_a?(::Sidekiq::Shutdown)) || lock.unlock_on_shutdown?
    end
  end
rescue DuplicateRecordError => e
  raise LockError.new(e.record)
end

#clean_orphaned_locksObject



423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
# File 'lib/dynflow/coordinator.rb', line 423

def clean_orphaned_locks
  cleanup_classes = [LockByWorld, SingletonActionLock]
  ret = []
  cleanup_classes.each do |cleanup_class|
    valid_owner_ids = cleanup_class.valid_owner_ids(self)
    valid_classes = cleanup_class.valid_classes.map(&:name)
    orphaned_locks = find_locks(class: valid_classes, exclude_owner_id: valid_owner_ids)
    # reloading the valid owner ids to avoid race conditions
    valid_owner_ids = cleanup_class.valid_owner_ids(self)
    orphaned_locks.each do |lock|
      unless valid_owner_ids.include?(lock.owner_id)
        release(lock)
        ret << lock
      end
    end
  end
  return ret
end

#create_record(record) ⇒ Object



375
376
377
378
# File 'lib/dynflow/coordinator.rb', line 375

def create_record(record)
  Type! record, Record
  adapter.create_record(record)
end

#deactivate_world(world) ⇒ Object



417
418
419
420
421
# File 'lib/dynflow/coordinator.rb', line 417

def deactivate_world(world)
  Type! world, Coordinator::ExecutorWorld
  world.active = false
  update_record(world)
end

#delete_record(record) ⇒ Object



385
386
387
388
# File 'lib/dynflow/coordinator.rb', line 385

def delete_record(record)
  Type! record, Record
  adapter.delete_record(record)
end

#delete_world(world, on_termination = false) ⇒ Object



411
412
413
414
415
# File 'lib/dynflow/coordinator.rb', line 411

def delete_world(world, on_termination = false)
  Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld
  release_by_owner("world:#{world.id}", on_termination)
  delete_record(world)
end

#find_locks(filter_options) ⇒ Object



369
370
371
372
373
# File 'lib/dynflow/coordinator.rb', line 369

def find_locks(filter_options)
  adapter.find_records(filter_options).map do |lock_data|
    Lock.from_hash(lock_data)
  end
end

#find_records(filter) ⇒ Object



390
391
392
393
394
# File 'lib/dynflow/coordinator.rb', line 390

def find_records(filter)
  adapter.find_records(filter).map do |record_data|
    Record.from_hash(record_data)
  end
end

#find_worlds(active_executor_only = false, filters = {}) ⇒ Object



396
397
398
399
400
401
402
403
404
# File 'lib/dynflow/coordinator.rb', line 396

def find_worlds(active_executor_only = false, filters = {})
  ret = find_records(filters.merge(class: Coordinator::ExecutorWorld.name))
  if active_executor_only
    ret = ret.select(&:active?)
  else
    ret.concat(find_records(filters.merge(class: Coordinator::ClientWorld.name)))
  end
  ret
end

#register_world(world) ⇒ Object



406
407
408
409
# File 'lib/dynflow/coordinator.rb', line 406

def register_world(world)
  Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld
  create_record(world)
end

#release(lock) ⇒ Object



360
361
362
363
# File 'lib/dynflow/coordinator.rb', line 360

def release(lock)
  Type! lock, Lock
  adapter.delete_record(lock)
end

#release_by_owner(owner_id, on_termination = false) ⇒ Object



365
366
367
# File 'lib/dynflow/coordinator.rb', line 365

def release_by_owner(owner_id, on_termination = false)
  find_locks(owner_id: owner_id).map { |lock| release(lock) if !on_termination || lock.unlock_on_shutdown? }
end

#update_record(record) ⇒ Object



380
381
382
383
# File 'lib/dynflow/coordinator.rb', line 380

def update_record(record)
  Type! record, Record
  adapter.update_record(record)
end