Module: Conductor::Worker::WorkerMixin

Defined in:
lib/conductor/worker/worker.rb

Overview

Mixin module for class-based workers. Include this in your worker class to use the worker_task DSL.

Examples:

class MyWorker
  include Conductor::Worker::WorkerMixin

  worker_task 'my_task', poll_interval: 200, thread_count: 5

  def execute(task)
    { result: 'success' }
  end
end

Defined Under Namespace

Modules: ClassMethods

Class Method Summary

Instance Method Summary

Class Method Details

.included(base) ⇒ Object



# File 'lib/conductor/worker/worker.rb', line 229

def self.included(base)
  base.extend(ClassMethods)
  base.class_eval do
    attr_accessor :poll_interval, :thread_count, :domain, :worker_id,
                  :poll_timeout, :register_task_def, :overwrite_task_def,
                  :strict_schema, :paused, :isolation, :executor
  end
end
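
The hook above follows the common Ruby pattern of extending the including class with class-level methods and then defining instance accessors on it. A minimal sketch of that pattern, using a stand-in module rather than the real SDK (SketchMixin, register_name, and registered_name are hypothetical names chosen for illustration):

```ruby
# Stand-in for the pattern only; SketchMixin is NOT part of Conductor.
module SketchMixin
  module ClassMethods
    # Hypothetical class-level DSL method (assumption, for illustration).
    def register_name(name)
      @registered_name = name
    end

    # Hypothetical reader for the value stored by register_name.
    def registered_name
      @registered_name
    end
  end

  def self.included(base)
    # Class-level methods become available on the including class.
    base.extend(ClassMethods)
    # Instance-level accessors are defined on the including class.
    base.class_eval do
      attr_accessor :poll_interval, :thread_count
    end
  end
end

class SketchWorker
  include SketchMixin
  register_name 'my_task'
end

worker = SketchWorker.new
worker.poll_interval = 200
```

Because the accessors are defined with attr_accessor inside class_eval, each worker instance carries its own poll_interval and thread_count, while anything defined in ClassMethods is shared at the class level.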

Instance Method Details

#execute(task) ⇒ TaskResult, ...

Executes the task. The including worker class must override this method; the default implementation raises NotImplementedError.

Parameters:

  • task (Task)

    The task to execute

Returns:

  • (TaskResult, Hash, Boolean, nil)

    Execution result

Raises:

  • (NotImplementedError)


# File 'lib/conductor/worker/worker.rb', line 278

def execute(task)
  raise NotImplementedError, 'Worker must implement #execute method'
end
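
A short sketch of what overriding looks like, assuming a stand-in module that copies the default #execute shown above. SketchExecute, IncompleteWorker, and GreetingWorker are hypothetical names, and a plain Hash stands in for the Task object:

```ruby
# Stand-in that reproduces the default #execute; not the real SDK module.
module SketchExecute
  def execute(task)
    raise NotImplementedError, 'Worker must implement #execute method'
  end
end

# Forgets to override #execute, so the default raises.
class IncompleteWorker
  include SketchExecute
end

# Overrides #execute and returns a Hash, one of the documented return types.
class GreetingWorker
  include SketchExecute

  def execute(task)
    { greeting: "Hello, #{task[:name]}" }
  end
end

outcome =
  begin
    IncompleteWorker.new.execute({})
  rescue NotImplementedError => e
    # NotImplementedError descends from ScriptError, so it has to be
    # rescued by name; a bare `rescue` (StandardError) would not catch it.
    e.message
  end

result = GreetingWorker.new.execute({ name: 'Ada' })
```

The method defined in the class takes precedence over the one from the included module, which is why GreetingWorker never reaches the raising default.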

#get_input(task, key, default = nil) ⇒ Object

Convenience method to get a specific input value

Parameters:

  • task (Task)

    The task

  • key (String, Symbol)

    The input key

  • default (Object) (defaults to: nil)

    Default value returned if the key is not found, or if its value is nil or false

Returns:

  • (Object)

    The input value



# File 'lib/conductor/worker/worker.rb', line 300

def get_input(task, key, default = nil)
  get_input_data(task)[key.to_s] || default
end
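
The lookup above stringifies the key and falls back with `||`, which has two consequences worth seeing in action: symbol and string keys are interchangeable for callers, and a stored false (or nil) is replaced by the default. The sketch below copies the two helper bodies into a stand-in module; SketchInputs, InputReader, and the Struct-based SketchTask are hypothetical, and the input hash is assumed to use string keys:

```ruby
# Hypothetical stand-in for a Task; only #input_data is needed here.
SketchTask = Struct.new(:input_data)

# Copies of the two helper bodies shown in this page.
module SketchInputs
  def get_input_data(task)
    task.input_data || {}
  end

  def get_input(task, key, default = nil)
    get_input_data(task)[key.to_s] || default
  end
end

class InputReader
  include SketchInputs
end

reader = InputReader.new
task = SketchTask.new({ 'retries' => 3, 'verbose' => false })

retries = reader.get_input(task, :retries, 1)      # symbol key is converted to 'retries'
verbose = reader.get_input(task, 'verbose', true)  # stored false is replaced by the default
missing = reader.get_input(task, :timeout, 30)     # absent key falls back to the default
no_data = reader.get_input(SketchTask.new(nil), :retries, 1) # nil input_data is treated as {}
```

When a legitimate false value must be distinguished from a missing key, one option is to read the Hash from get_input_data and call Hash#fetch or Hash#key? on it directly.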

#get_input_data(task) ⇒ Hash

Convenience method to get task input data

Parameters:

  • task (Task)

    The task

Returns:

  • (Hash)

    Input data



# File 'lib/conductor/worker/worker.rb', line 291

def get_input_data(task)
  task.input_data || {}
end
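
One detail of the body above is that it returns the task's own Hash rather than a copy, and only substitutes a fresh empty Hash when input_data is nil. A small sketch under the assumption that a Struct (the hypothetical DataTask) can stand in for Task:

```ruby
# Hypothetical stand-in for a Task; only #input_data is needed here.
DataTask = Struct.new(:input_data)

# Same body as the documented method, defined at top level for the sketch.
def get_input_data(task)
  task.input_data || {}
end

with_inputs = DataTask.new({ 'order_id' => 42 })
without_inputs = DataTask.new(nil)

# The returned Hash is the very object stored on the task, not a duplicate.
same_object = get_input_data(with_inputs).equal?(with_inputs.input_data)

# With no input data, an empty Hash is returned so lookups never hit nil.
fallback = get_input_data(without_inputs)

# Hash#fetch raises KeyError on a missing key, unlike [] which returns nil.
order_id = get_input_data(with_inputs).fetch('order_id')
```

Since the Hash is shared, mutating the return value also mutates the task's input data; call dup on it first if a private copy is needed.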

#polling_interval_seconds ⇒ Float

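Returns the polling interval in seconds, converted from the class-level poll_interval in milliseconds (falling back to Worker::DEFAULTS[:poll_interval] when it is unset)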

Returns:

  • (Float)


# File 'lib/conductor/worker/worker.rb', line 284

def polling_interval_seconds
  (self.class.poll_interval || Worker::DEFAULTS[:poll_interval]) / 1000.0
end
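
To illustrate the millisecond-to-second conversion and the fallback, here is a sketch built on a stand-in module. SketchPolling, FastWorker, and DefaultWorker are hypothetical, the class-level poll_interval accessor is a simplification of however the real DSL stores it, and the default of 100 ms is an assumed value, not necessarily what Worker::DEFAULTS contains:

```ruby
# Stand-in module; not the real SDK.
module SketchPolling
  # Assumed default in milliseconds (hypothetical value for this sketch).
  DEFAULTS = { poll_interval: 100 }.freeze

  module ClassMethods
    # Simplified class-level storage for the interval, in milliseconds.
    attr_accessor :poll_interval
  end

  def self.included(base)
    base.extend(ClassMethods)
  end

  # Same expression as the documented method, reading the class-level value.
  def polling_interval_seconds
    (self.class.poll_interval || DEFAULTS[:poll_interval]) / 1000.0
  end
end

class FastWorker
  include SketchPolling
  self.poll_interval = 200 # milliseconds
end

class DefaultWorker
  include SketchPolling    # no interval set, so the default applies
end

fast = FastWorker.new.polling_interval_seconds       # 200 / 1000.0
default = DefaultWorker.new.polling_interval_seconds # 100 / 1000.0
```

Dividing by the Float literal 1000.0 rather than the Integer 1000 is what keeps sub-second intervals from truncating to zero.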