Class: Dynflow::Coordinator
  
  
  
  
  
    - Inherits:
      Object
        - Object
          - Dynflow::Coordinator
      
     
  
  
  
  
  
  
  
      - Includes:
 
      - Algebrick::TypeCheck
 
  
  
  
  
  
  
    - Defined in:
 
    - lib/dynflow/coordinator.rb
 
  
  
 
Defined Under Namespace
  
    
  
    
      Classes: AutoExecuteLock, ClientWorld, DelayedExecutorLock, DuplicateRecordError, ExecutionInhibitionLock, ExecutionLock, ExecutionPlanCleanerLock, ExecutorWorld, Lock, LockByWorld, LockError, PlanningLock, Record, SingletonActionLock, WorldInvalidationLock, WorldRecord
    
  
  Instance Attribute Summary collapse
  
  
    
      Instance Method Summary
      collapse
    
    
  
  
  Constructor Details
  
    
  
  
    #initialize(coordinator_adapter)  ⇒ Coordinator 
  
  
  
  
    
Returns a new instance of Coordinator.
   
 
  
  
    
      
337
338
339 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 337
# Builds a coordinator around the given adapter. The adapter is stored in
# @adapter; the record and lock methods of this class forward to it.
#
# @param coordinator_adapter [Object] adapter object kept in @adapter
def initialize(coordinator_adapter)
  @adapter = coordinator_adapter
end 
     | 
  
 
  
 
  
    Instance Attribute Details
    
      
      
      
  
  
    #adapter  ⇒ Object  
  
  
  
  
    
Returns the value of attribute adapter.
   
 
  
  
    
      
335
336
337 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 335
# Reader for the adapter stored in @adapter.
#
# @return [Object] the value of @adapter
def adapter
  @adapter
end 
     | 
  
 
    
   
  
    Instance Method Details
    
      
  
  
    #acquire(lock, &block)  ⇒ Object 
  
  
  
  
    
      
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 341
# Acquires +lock+ by creating its record through the adapter. When a block
# is given, the block is run afterwards and the lock is (normally) released
# once the block finishes or raises. Without a block the record is created
# and left in place; nothing is released here.
#
# @param lock [Lock] lock to acquire; type-checked and validated first
# @yield optional block executed after the lock record has been created
# @return [Object, nil] the block's result, or nil when no block is given
# @raise [LockError] when DuplicateRecordError is raised anywhere in the
#   method body (the rescue covers the block call as well as create_record)
def acquire(lock, &block)
  Type! lock, Lock
  lock.validate!
  adapter.create_record(lock)
  if block
    begin
      block.call
    # This rescue does not handle anything: it re-raises immediately. Its
    # only effect is to bind the exception to `e` so the ensure clause below
    # can inspect it.
    # NOTE(review): matching ::Sidekiq::Shutdown through `rescue Interrupt`
    # presumably relies on Sidekiq::Shutdown being an Interrupt subclass -
    # confirm for the Sidekiq version in use.
            rescue Interrupt => e
      raise e
    ensure
      # The lock is kept only when all three hold: ::Sidekiq is defined, the
      # bound exception is a ::Sidekiq::Shutdown, and the lock reports
      # unlock_on_shutdown? as false. In every other case (normal return,
      # any other exception, `e` still nil) the lock is released.
      release(lock) if !(defined?(::Sidekiq) && e.is_a?(::Sidekiq::Shutdown)) || lock.unlock_on_shutdown?
    end
  end
rescue DuplicateRecordError => e
  # Translate the duplicate-record failure into a LockError carrying the
  # conflicting record.
  raise LockError.new(e.record)
end
     | 
  
 
    
      
  
  
    #clean_orphaned_locks  ⇒ Object 
  
  
  
  
    
      
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 423
# Releases locks of the LockByWorld and SingletonActionLock families whose
# owner id is not among the owner ids the lock class reports as valid.
#
# @return [Array] the locks that were released, in the order they were released
def clean_orphaned_locks
  [LockByWorld, SingletonActionLock].each_with_object([]) do |lock_class, released|
    known_owner_ids = lock_class.valid_owner_ids(self)
    class_names = lock_class.valid_classes.map(&:name)
    candidates = find_locks(class: class_names, exclude_owner_id: known_owner_ids)

    # The owner ids are deliberately fetched a second time after the lookup.
    # NOTE(review): presumably so that owners which appeared while the
    # lookup was running are not treated as orphaned - confirm.
    known_owner_ids = lock_class.valid_owner_ids(self)

    candidates.each do |candidate|
      next if known_owner_ids.include?(candidate.owner_id)

      release(candidate)
      released << candidate
    end
  end
end
     | 
  
 
    
      
  
  
    #create_record(record)  ⇒ Object 
  
  
  
  
    
      
375
376
377
378 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 375
# Type-checks +record+ and hands it to the adapter for creation.
#
# @param record [Record] record to create
# @return [Object] whatever adapter.create_record returns
def create_record(record)
  Type!(record, Record)
  adapter.create_record(record)
end
     | 
  
 
    
      
  
  
    #deactivate_world(world)  ⇒ Object 
  
  
  
  
    
      
417
418
419
420
421 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 417
# Marks an executor world as inactive and writes the change back through
# update_record.
#
# @param world [Coordinator::ExecutorWorld] world to deactivate
# @return [Object] whatever update_record returns
def deactivate_world(world)
  Type!(world, Coordinator::ExecutorWorld)
  world.active = false
  update_record(world)
end
     | 
  
 
    
      
  
  
    #delete_record(record)  ⇒ Object 
  
  
  
  
    
      
385
386
387
388 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 385
# Type-checks +record+ and asks the adapter to delete it.
#
# @param record [Record] record to delete
# @return [Object] whatever adapter.delete_record returns
def delete_record(record)
  Type!(record, Record)
  adapter.delete_record(record)
end
     | 
  
 
    
      
  
  
    #delete_world(world, on_termination = false)  ⇒ Object 
  
  
  
  
    
      
411
412
413
414
415 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 411
# Releases the locks owned by +world+ (owner id "world:<id>") and then
# deletes the world's record.
#
# @param world [Coordinator::ClientWorld, Coordinator::ExecutorWorld] world to remove
# @param on_termination [Boolean] passed through to release_by_owner
# @return [Object] whatever delete_record returns
def delete_world(world, on_termination = false)
  Type!(world, Coordinator::ClientWorld, Coordinator::ExecutorWorld)
  world_owner_id = "world:#{world.id}"
  release_by_owner(world_owner_id, on_termination)
  delete_record(world)
end
     | 
  
 
    
      
  
  
    #find_locks(filter_options)  ⇒ Object 
  
  
  
  
    
      
369
370
371
372
373 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 369
# Looks up records through the adapter and builds a lock from each result
# via Lock.from_hash.
#
# @param filter_options [Hash] filter passed unchanged to adapter.find_records
# @return [Array] one Lock.from_hash result per record returned by the adapter
def find_locks(filter_options)
  raw_locks = adapter.find_records(filter_options)
  raw_locks.map { |attributes| Lock.from_hash(attributes) }
end
     | 
  
 
    
      
  
  
    #find_records(filter)  ⇒ Object 
  
  
  
  
    
      
390
391
392
393
394 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 390
# Looks up records through the adapter and builds a record object from each
# result via Record.from_hash.
#
# @param filter [Hash] filter passed unchanged to adapter.find_records
# @return [Array] one Record.from_hash result per record returned by the adapter
def find_records(filter)
  raw_records = adapter.find_records(filter)
  raw_records.map { |attributes| Record.from_hash(attributes) }
end
     | 
  
 
    
      
  
  
    #find_worlds(active_executor_only = false, filters = {})  ⇒ Object 
  
  
  
  
    
      
396
397
398
399
400
401
402
403
404 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 396
# Finds world records. Executor worlds are always looked up; client worlds
# are appended only when +active_executor_only+ is falsy.
#
# @param active_executor_only [Boolean] when truthy, return only executor
#   worlds that respond truthy to active?
# @param filters [Hash] extra filter entries merged with the class filter
# @return [Array] the matching world records
def find_worlds(active_executor_only = false, filters = {})
  executors = find_records(filters.merge(class: Coordinator::ExecutorWorld.name))
  return executors.select(&:active?) if active_executor_only

  clients = find_records(filters.merge(class: Coordinator::ClientWorld.name))
  executors.concat(clients)
end
     | 
  
 
    
      
  
  
    #register_world(world)  ⇒ Object 
  
  
  
 
    
      
  
  
    #release(lock)  ⇒ Object 
  
  
  
  
    
      
360
361
362
363 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 360
# Releases +lock+ by deleting its record through the adapter.
#
# @param lock [Lock] lock to release
# @return [Object] whatever adapter.delete_record returns
def release(lock)
  Type!(lock, Lock)
  adapter.delete_record(lock)
end
     | 
  
 
    
      
  
  
    #release_by_owner(owner_id, on_termination = false)  ⇒ Object 
  
  
  
  
    
      
365
366
367 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 365
# Releases the locks held by +owner_id+. When +on_termination+ is truthy,
# locks whose unlock_on_shutdown? is falsy are left in place.
#
# @param owner_id [Object] owner id used as the find_locks filter
# @param on_termination [Boolean] restrict release to unlock_on_shutdown? locks
# @return [Array] one entry per lock found: the result of release, or nil
#   for a lock that was skipped
def release_by_owner(owner_id, on_termination = false)
  find_locks(owner_id: owner_id).map do |owned_lock|
    next if on_termination && !owned_lock.unlock_on_shutdown?

    release(owned_lock)
  end
end
     | 
  
 
    
      
  
  
    #update_record(record)  ⇒ Object 
  
  
  
  
    
      
380
381
382
383 
     | 
    
      # File 'lib/dynflow/coordinator.rb', line 380
# Type-checks +record+ and asks the adapter to persist its current state.
#
# @param record [Record] record to update
# @return [Object] whatever adapter.update_record returns
def update_record(record)
  Type!(record, Record)
  adapter.update_record(record)
end
     |