Class: Prosody::AsyncTaskProcessor
- Inherits: Object
- Defined in: lib/prosody/processor.rb, lib/prosody/native_stubs.rb
Overview
Internal processor for executing tasks asynchronously. This class is used internally by the native code.
Instance Method Summary
- #initialize(logger = Prosody.logger) ⇒ AsyncTaskProcessor (constructor)
  A new instance of AsyncTaskProcessor.
- #start ⇒ Object
- #stop ⇒ Object
- #submit(task_id, carrier, callback, &block) ⇒ Object
Constructor Details
#initialize(logger = Prosody.logger) ⇒ AsyncTaskProcessor
Returns a new instance of AsyncTaskProcessor.
# File 'lib/prosody/processor.rb', line 112
def initialize(logger = Prosody.logger)
  @logger = logger
  @command_queue = Queue.new
  @processing_thread = nil
  @tracer = nil
end
Instance Method Details
#start ⇒ Object
# File 'lib/prosody/processor.rb', line 124
def start
  return if running?

  @logger.debug("Starting async task processor")
  @processing_thread = Thread.new do
    # Initialize the tracer in the processing thread to keep
    # OpenTelemetry context within the same thread
    @tracer = OpenTelemetry.tracer_provider.tracer(
      "Prosody::AsyncTaskProcessor",
      Prosody::VERSION
    )
    process_commands
  end
end
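The start method guards against double starts and does its setup inside the worker thread. The following is a minimal sketch of that pattern, not Prosody's implementation. `SketchProcessor`, `enqueue`, `join`, and the `:shutdown` sentinel are hypothetical names, and the body of `running?` is an assumption, since the source calls it without showing its definition.

```ruby
# Hypothetical stand-in for illustration; not Prosody's actual class.
class SketchProcessor
  attr_reader :processing_thread

  def initialize
    @command_queue = Queue.new
    @processing_thread = nil
  end

  # Assumed definition: the real running? is not shown in the source.
  def running?
    !@processing_thread.nil? && @processing_thread.alive?
  end

  def start
    return if running? # a second call is a no-op

    @processing_thread = Thread.new do
      # Per-thread setup (for example, creating a tracer) would go here,
      # so that it happens on the same thread that runs the commands.
      loop do
        command = @command_queue.pop # blocks until a command arrives
        break if command.equal?(:shutdown)

        command.call
      end
    end
  end

  def enqueue(command)
    @command_queue.push(command)
  end

  def join
    @processing_thread&.join
  end
end

processor = SketchProcessor.new
processor.start
first_thread = processor.processing_thread
processor.start # ignored: the worker is already running

seen_threads = Queue.new
processor.enqueue(-> { seen_threads << Thread.current })
processor.enqueue(:shutdown)
processor.join
```

In this sketch every command runs on the single worker thread created by the first `start` call, which is the property that makes thread-local setup inside the thread body safe.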
#stop ⇒ Object
# File 'lib/prosody/processor.rb', line 143
def stop
  return unless running?

  @logger.debug("Stopping async task processor")
  @command_queue.push(Commands::Shutdown.new)
end
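Note that stop only enqueues a shutdown command and returns; it does not join the thread. The sketch below illustrates what that implies for a FIFO queue, assuming the worker handles commands one at a time in order. The `SketchCommands` module and the worker loop are hypothetical; Prosody's real `Commands` module and `process_commands` method are not shown in the source and may behave differently.

```ruby
# Hypothetical command objects for illustration only.
module SketchCommands
  Shutdown = Class.new
  Execute = Struct.new(:task_id, :block)
end

queue = Queue.new
completed = []

worker = Thread.new do
  loop do
    case (command = queue.pop)
    when SketchCommands::Shutdown
      break # leave the loop; anything still queued is not processed
    when SketchCommands::Execute
      command.block.call
      completed << command.task_id
    end
  end
end

queue.push(SketchCommands::Execute.new("a", -> {}))
queue.push(SketchCommands::Execute.new("b", -> {}))
queue.push(SketchCommands::Shutdown.new)            # the equivalent of calling stop
queue.push(SketchCommands::Execute.new("c", -> {})) # submitted after shutdown

worker.join
```

Under these assumptions, work submitted before the shutdown command still completes, while work submitted afterwards stays in the queue unprocessed.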
#submit(task_id, carrier, callback, &block) ⇒ Object
# File 'lib/prosody/processor.rb', line 158
def submit(task_id, carrier, event_context, callback, &task_block)
  token = CancellationToken.new
  @command_queue.push(
    Commands::Execute.new(task_id, carrier, event_context.transform_keys(&:to_sym), task_block, callback, token)
  )
  token
end
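Two details of submit are easy to miss: the event context keys are converted to symbols before queuing, and the token handed back to the caller is the same object stored in the queued command. The sketch below shows both under stated assumptions. `SketchToken`, `SketchExecute`, and `sketch_submit` are hypothetical; the real `CancellationToken` interface, the callback's arguments, and the shape of the event context are not shown in the source.

```ruby
# Hypothetical token; the real CancellationToken's methods are an assumption.
class SketchToken
  def initialize
    @cancelled = false
    @mutex = Mutex.new
  end

  def cancel
    @mutex.synchronize { @cancelled = true }
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end
end

# Hypothetical command carrying the same six fields as the source's Execute.
SketchExecute = Struct.new(:task_id, :carrier, :event_context, :block, :callback, :token)

def sketch_submit(queue, task_id, carrier, event_context, callback, &task_block)
  token = SketchToken.new
  queue.push(
    SketchExecute.new(task_id, carrier, event_context.transform_keys(&:to_sym), task_block, callback, token)
  )
  token # the caller can use this to request cancellation later
end

queue = Queue.new
callback = ->(_success, _result) {} # assumed callback shape
token = sketch_submit(queue, "task-1", {}, { "topic" => "orders", "partition" => 0 }, callback) { :done }

command = queue.pop
context = command.event_context
```

Because the caller and the queued command share one token, a cancellation requested by the caller is visible to whatever code eventually runs the command.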