Module: OMQ::CLI::RactorHelpers
- Defined in:
- lib/omq/cli/ractor_helpers.rb
Overview
Shared Ractor infrastructure for parallel worker modes.
Constant Summary collapse
- SHUTDOWN =
Sentinel value sent through ports to signal consumer threads to exit. Port#close does not unblock a waiting #receive, so we must send an explicit shutdown marker.
:__omq_shutdown__
Class Method Summary collapse
-
.preresolve_tcp(endpoints) ⇒ Object
Resolves TCP hostnames to IP addresses so Ractors don’t touch Resolv::DefaultResolver (which is not shareable).
-
.start_log_consumer ⇒ Object
Starts a Ractor::Port and a consumer thread that drains log messages to stderr sequentially.
-
.start_output_consumer ⇒ Object
Starts a Ractor::Port and a consumer thread that drains formatted output to stdout sequentially.
-
.stop_consumer(port, thread) ⇒ Object
Sends the shutdown sentinel and joins the consumer thread.
Class Method Details
.preresolve_tcp(endpoints) ⇒ Object
Resolves TCP hostnames to IP addresses so Ractors don’t touch Resolv::DefaultResolver (which is not shareable).
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/omq/cli/ractor_helpers.rb', line 16 def self.preresolve_tcp(endpoints) endpoints.flat_map do |ep| url = ep.url if url.start_with?("tcp://") host, port = OMQ::Transport::TCP.parse_endpoint(url) Addrinfo.getaddrinfo(host, port, nil, :STREAM).map do |addr| ip = addr.ip_address ip = "[#{ip}]" if ip.include?(":") Endpoint.new("tcp://#{ip}:#{addr.ip_port}", ep.bind?) end else ep end end end |
.start_log_consumer ⇒ Object
Starts a Ractor::Port and a consumer thread that drains log messages to stderr sequentially. Returns [port, thread]. Send SHUTDOWN through the port to stop the consumer.
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/omq/cli/ractor_helpers.rb', line 37 def self.start_log_consumer port = Ractor::Port.new thread = Thread.new(port) do |p| loop do msg = p.receive break if msg.equal?(SHUTDOWN) $stderr.write("#{msg}\n") rescue Ractor::ClosedError break end end [port, thread] end |
.start_output_consumer ⇒ Object
Starts a Ractor::Port and a consumer thread that drains formatted output to stdout sequentially. Returns [port, thread]. Send SHUTDOWN through the port to stop the consumer.
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/omq/cli/ractor_helpers.rb', line 56 def self.start_output_consumer port = Ractor::Port.new thread = Thread.new(port) do |p| loop do msg = p.receive break if msg.equal?(SHUTDOWN) $stdout.write(msg) rescue Ractor::ClosedError break end end [port, thread] end |
.stop_consumer(port, thread) ⇒ Object
Sends the shutdown sentinel and joins the consumer thread.
73 74 75 76 77 78 |
# File 'lib/omq/cli/ractor_helpers.rb', line 73 def self.stop_consumer(port, thread) port.send(SHUTDOWN) thread.join rescue Ractor::ClosedError thread.join(1) end |