Module: OMQ::CLI::RactorHelpers

Defined in:
lib/omq/cli/ractor_helpers.rb

Overview

Shared Ractor infrastructure for parallel worker modes.

Constant Summary

SHUTDOWN = :__omq_shutdown__

Sentinel value sent through ports to signal consumer threads to exit. Port#close does not unblock a waiting #receive, so an explicit shutdown marker must be sent.
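
The sentinel pattern can be illustrated without Ractors. The following is a minimal sketch, not the module's code: it assumes a plain Queue as a stand-in for Ractor::Port and a StringIO as a stand-in for stderr, so that it runs on Ruby versions without Ractor::Port. The identity check with equal? works because a symbol literal always refers to the same object.

```ruby
require "stringio"

SHUTDOWN = :__omq_shutdown__

queue = Queue.new     # assumption: stand-in for Ractor::Port
sink  = StringIO.new  # assumption: stand-in for $stderr

# Consumer thread: writes each message on its own line until the sentinel arrives.
consumer = Thread.new do
  loop do
    msg = queue.pop
    break if msg.equal?(SHUTDOWN)
    sink.write("#{msg}\n")
  end
end

queue << "worker 1 ready"
queue << "worker 2 ready"
queue << SHUTDOWN
queue << "sent after shutdown" # never consumed: the loop has already exited
consumer.join

print sink.string
```

Anything sent after the sentinel is left unread, so producers should finish before the sentinel is sent.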

Class Method Details

.preresolve_tcp(endpoints) ⇒ Object

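Resolves TCP hostnames to IP addresses so Ractors don’t touch Resolv::DefaultResolver (which is not shareable). Each tcp:// endpoint is expanded into one endpoint per resolved address, with IPv6 addresses wrapped in brackets and the original bind flag preserved; non-TCP endpoints are returned unchanged.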



# File 'lib/omq/cli/ractor_helpers.rb', line 16

def self.preresolve_tcp(endpoints)
  endpoints.flat_map do |ep|
    url = ep.url
    if url.start_with?("tcp://")
      host, port = OMQ::Transport::TCP.parse_endpoint(url)
      Addrinfo.getaddrinfo(host, port, nil, :STREAM).map do |addr|
        ip = addr.ip_address
        ip = "[#{ip}]" if ip.include?(":")
        Endpoint.new("tcp://#{ip}:#{addr.ip_port}", ep.bind?)
      end
    else
      ep
    end
  end
end
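
The expansion step can be tried in isolation. The sketch below is illustrative only: the Endpoint struct, the resolve_tcp helper, and the naive host/port split are hypothetical stand-ins for the CLI's own Endpoint class and OMQ::Transport::TCP.parse_endpoint, which are not shown on this page. Numeric addresses are used so that no DNS lookup is needed.

```ruby
require "socket"

# Hypothetical stand-in for the CLI's endpoint type.
Endpoint = Struct.new(:url, :bind_flag) do
  def bind?
    bind_flag
  end
end

# Hypothetical helper: expands one endpoint into one endpoint per resolved address.
def resolve_tcp(ep)
  return [ep] unless ep.url.start_with?("tcp://")

  # Naive split on the last colon; assumes a well-formed tcp://host:port URL.
  host, _, port = ep.url.delete_prefix("tcp://").rpartition(":")
  host = host.delete_prefix("[").delete_suffix("]")

  Addrinfo.getaddrinfo(host, port.to_i, nil, :STREAM).map do |addr|
    ip = addr.ip_address
    ip = "[#{ip}]" if ip.include?(":") # IPv6 literals need brackets in a URL
    Endpoint.new("tcp://#{ip}:#{addr.ip_port}", ep.bind?)
  end
end

v4  = resolve_tcp(Endpoint.new("tcp://127.0.0.1:5555", true))
v6  = resolve_tcp(Endpoint.new("tcp://[::1]:5556", false))
ipc = resolve_tcp(Endpoint.new("ipc:///tmp/omq.sock", true))

puts (v4 + v6 + ipc).map(&:url).uniq
```

A hostname that resolves to several addresses yields several endpoints, which is why the original method uses flat_map rather than map.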

.start_log_consumer ⇒ Object

Starts a Ractor::Port and a consumer thread that drains log messages to stderr sequentially, writing each message followed by a newline. Returns [port, thread]. Send SHUTDOWN through the port to stop the consumer.



# File 'lib/omq/cli/ractor_helpers.rb', line 37

def self.start_log_consumer
  port = Ractor::Port.new
  thread = Thread.new(port) do |p|
    loop do
      msg = p.receive
      break if msg.equal?(SHUTDOWN)
      $stderr.write("#{msg}\n")
    rescue Ractor::ClosedError
      break
    end
  end
  [port, thread]
end

.start_output_consumer ⇒ Object

Starts a Ractor::Port and a consumer thread that drains formatted output to stdout sequentially, writing each message as-is with no newline appended. Returns [port, thread]. Send SHUTDOWN through the port to stop the consumer.



# File 'lib/omq/cli/ractor_helpers.rb', line 56

def self.start_output_consumer
  port = Ractor::Port.new
  thread = Thread.new(port) do |p|
    loop do
      msg = p.receive
      break if msg.equal?(SHUTDOWN)
      $stdout.write(msg)
    rescue Ractor::ClosedError
      break
    end
  end
  [port, thread]
end

.stop_consumer(port, thread) ⇒ Object

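Sends the shutdown sentinel and joins the consumer thread. If the port is already closed, the sentinel cannot be delivered, so the method waits at most one second for the thread instead of joining indefinitely.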



# File 'lib/omq/cli/ractor_helpers.rb', line 73

def self.stop_consumer(port, thread)
  port.send(SHUTDOWN)
  thread.join
rescue Ractor::ClosedError
  thread.join(1)
end
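
The bounded-join fallback can be sketched with a plain Queue, which raises ClosedQueueError when pushed to after close, much as the port raises Ractor::ClosedError here. This is an analogy under stated assumptions rather than the module's behaviour: stop_queue_consumer and its timeout parameter are hypothetical, and Queue differs from Ractor::Port in that closing a Queue does wake a blocked pop.

```ruby
SHUTDOWN = :__omq_shutdown__

# Hypothetical analogue of stop_consumer, with Queue standing in for Ractor::Port.
def stop_queue_consumer(queue, thread, timeout = 1)
  queue.push(SHUTDOWN)
  thread.join          # returns the thread once it has finished
rescue ClosedQueueError
  thread.join(timeout) # returns nil if the thread is still running at the deadline
end

# Normal case: the sentinel is delivered and the consumer exits.
open_queue = Queue.new
worker = Thread.new { loop { break if open_queue.pop.equal?(SHUTDOWN) } }
clean = stop_queue_consumer(open_queue, worker)

# Closed channel: the push raises, so the join is bounded instead of blocking forever.
closed_queue = Queue.new
closed_queue.close
stuck = Thread.new { sleep } # a consumer that never exits on its own
timed_out = stop_queue_consumer(closed_queue, stuck, 0.1)
stuck.kill

puts clean.equal?(worker)
puts timed_out.inspect
```

The bounded join trades a guaranteed clean exit for a guarantee that shutdown itself cannot hang.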