Class: Prosody::AsyncTaskProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/prosody/processor.rb,
lib/prosody/native_stubs.rb

Overview

Internal processor for executing tasks asynchronously. This class is used internally by the native code.

Instance Method Summary collapse

Constructor Details

#initialize(logger = Prosody.logger) ⇒ AsyncTaskProcessor

Returns a new instance of AsyncTaskProcessor.



112
113
114
115
116
117
# File 'lib/prosody/processor.rb', line 112

# Builds a processor that has not been started yet.
#
# @param logger [#debug] sink for diagnostic messages; falls back to
#   Prosody.logger when the caller does not supply one
def initialize(logger = Prosody.logger)
  @logger = logger

  # Commands are handed over through this queue (see #stop and #submit).
  @command_queue = Queue.new

  # Neither exists until #start assigns them.
  @processing_thread, @tracer = nil, nil
end

Instance Method Details

#start ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/prosody/processor.rb', line 124

# Launches the background thread that runs process_commands.
# Calling this while the processor is already running is a no-op.
#
# @return [Thread, nil] the newly created thread, or nil when already running
def start
  return if running?

  @logger.debug("Starting async task processor")

  @processing_thread = Thread.new do
    # The tracer is created here, inside the worker thread, so that the
    # OpenTelemetry context stays on the thread that uses it.
    provider = OpenTelemetry.tracer_provider
    @tracer = provider.tracer("Prosody::AsyncTaskProcessor", Prosody::VERSION)

    process_commands
  end
end

#stop ⇒ Object



143
144
145
146
147
148
# File 'lib/prosody/processor.rb', line 143

# Asks the processor to shut down by enqueueing a Commands::Shutdown.
# Does nothing when the processor is not running.
#
# @return [Queue, nil] the command queue after the push, or nil when not running
def stop
  return unless running?

  @logger.debug("Stopping async task processor")

  shutdown_command = Commands::Shutdown.new
  @command_queue << shutdown_command
end

#submit(task_id, carrier, event_context, callback, &task_block) ⇒ Object



158
159
160
161
162
163
164
# File 'lib/prosody/processor.rb', line 158

# Enqueues a Commands::Execute for the given task and hands back the
# cancellation token that was attached to it.
#
# @param task_id [Object] identifier forwarded to the command unchanged
# @param carrier [Object] forwarded to the command unchanged
# @param event_context [#transform_keys] its keys are converted with
#   to_sym before being passed on; the argument itself is not modified
# @param callback [Object] forwarded to the command unchanged
# @param task_block [Proc, nil] the block given to this call, if any
# @return [CancellationToken] the token stored in the enqueued command
def submit(task_id, carrier, event_context, callback, &task_block)
  CancellationToken.new.tap do |token|
    symbolized_context = event_context.transform_keys(&:to_sym)
    command = Commands::Execute.new(task_id, carrier, symbolized_context, task_block, callback, token)
    @command_queue << command
  end
end