Class: JRPC::SharedClient
- Inherits:
-
Object
- Object
- JRPC::SharedClient
- Defined in:
- lib/jrpc/shared_client.rb,
lib/jrpc/shared_client/ticket.rb,
lib/jrpc/shared_client/registry.rb,
lib/jrpc/shared_client/outbound_queue.rb,
lib/jrpc/shared_client/transport_loop.rb
Defined Under Namespace
Classes: OutboundQueue, Registry, Ticket, TransportLoop
Constant Summary collapse
- WAIT_GRACE =
caller-side backstop beyond the loop-enforced ttl
1.0
Instance Attribute Summary collapse
-
#server ⇒ Object
readonly
Returns the value of attribute server.
Instance Method Summary collapse
- #close(timeout: 5) ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(server, **options) ⇒ SharedClient
constructor
A new instance of SharedClient.
- #notification(method, params = nil, ttl: @default_ttl, fire_and_forget: false) ⇒ Object
- #request(method, params = nil, ttl: @default_ttl) ⇒ Object
Constructor Details
#initialize(server, **options) ⇒ SharedClient
Returns a new instance of SharedClient.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/jrpc/shared_client.rb', line 7 def initialize(server, **) @server = server @write_timeout = .fetch(:write_timeout, 5) @default_ttl = .fetch(:default_ttl, 30) @reap_timeout = .fetch(:reap_timeout, nil) @max_queue_size = .fetch(:max_queue_size, 10_000) @logger = .fetch(:logger, nil) # Single monotonic clock source shared with the transport loop. The # :clock option is an internal test seam (deterministic TTL specs); # callers should not need it. @clock = .fetch(:clock, nil) || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) } if @write_timeout && @default_ttl && @write_timeout >= @default_ttl raise ArgumentError, "write_timeout (#{@write_timeout}) must be < default_ttl (#{@default_ttl})" end @transport = .fetch(:transport) do Transport.build(server, **) end @id_gen = .fetch(:id_gen) do IdGenerator.new(prefix: .fetch(:id_prefix, nil), thread_safe: true) end @registry = SharedClient::Registry.new @outbound_queue = SharedClient::OutboundQueue.new(capacity: @max_queue_size) @wake_pipe_reader, @wake_pipe_writer = IO.pipe @lifecycle_mutex = Mutex.new # :running -> :closing -> :closed (user-initiated close) # :running -> :dead (transport thread crashed) @status = :running shutdown_check = -> { @lifecycle_mutex.synchronize { @status == :closing } } @transport_loop = SharedClient::TransportLoop.new( transport: @transport, registry: @registry, outbound_queue: @outbound_queue, wake_pipe_reader: @wake_pipe_reader, write_timeout: @write_timeout, reap_timeout: @reap_timeout, logger: @logger, shutdown_check: shutdown_check, clock: @clock ) @transport_thread = Thread.new do @transport_loop.run do |err| @lifecycle_mutex.synchronize { @status = :dead } drain_all(err) end end @transport_thread.abort_on_exception = false end |
Instance Attribute Details
#server ⇒ Object (readonly)
Returns the value of attribute server.
5 6 7 |
# File 'lib/jrpc/shared_client.rb', line 5 def server @server end |
Instance Method Details
#close(timeout: 5) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/jrpc/shared_client.rb', line 91 def close(timeout: 5) @lifecycle_mutex.synchronize do return true if @status == :closed @status = :closing end wake_transport joined = @transport_thread.join(timeout) unless joined begin @transport.close rescue StandardError nil end @transport_thread.join(1.0) @transport_thread.kill if @transport_thread.alive? @transport_thread.join # Forced kill: the loop never reached its graceful drain, so fail # whatever was still in flight. (A graceful exit drained itself; a # crash drained via on_crash. reject is idempotent in every case.) drain_all(Errors::ConnectionError.new('client force-closed')) end # Safe now that the transport thread is guaranteed dead. begin @wake_pipe_writer.close rescue StandardError nil end begin @wake_pipe_reader.close rescue StandardError nil end @lifecycle_mutex.synchronize { @status = :closed } !joined.nil? end |
#closed? ⇒ Boolean
133 134 135 |
# File 'lib/jrpc/shared_client.rb', line 133 def closed? @lifecycle_mutex.synchronize { @status == :closed } end |
#notification(method, params = nil, ttl: @default_ttl, fire_and_forget: false) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/jrpc/shared_client.rb', line 76 def notification(method, params = nil, ttl: @default_ttl, fire_and_forget: false) ticket = Ticket.new( id: nil, payload: Message.dump(Message.build_notification(method, params)), thread: fire_and_forget ? nil : Thread.current, expires_at: ttl ? clock_now + ttl : nil ) enqueue!(ticket) return nil if fire_and_forget await(ticket, ttl) nil end |
#request(method, params = nil, ttl: @default_ttl) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/jrpc/shared_client.rb', line 63 def request(method, params = nil, ttl: @default_ttl) id = @id_gen.next ticket = Ticket.new( id: id, payload: Message.dump(Message.build_request(method, params, id)), thread: Thread.current, expires_at: ttl ? clock_now + ttl : nil ) enqueue!(ticket) await(ticket, ttl) end |