Class: Pulsar::Internal::Connection
- Inherits:
-
Object
- Object
- Pulsar::Internal::Connection
- Defined in:
- lib/pulsar/internal/connection.rb
Overview
Owns a broker socket, request correlation, and reader thread.
Constant Summary collapse
- PROTOCOL_VERSION =
21
Instance Attribute Summary collapse
-
#max_message_size ⇒ Object
readonly
Returns the value of attribute max_message_size.
-
#protocol_version ⇒ Object
readonly
Returns the value of attribute protocol_version.
-
#server_version ⇒ Object
readonly
Returns the value of attribute server_version.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #connect ⇒ Object
- #connected? ⇒ Boolean
-
#initialize(transport:, operation_timeout:, client_version:) ⇒ Connection
constructor
A new instance of Connection.
- #next_request_id ⇒ Object
- #read_frame(timeout: @operation_timeout) ⇒ Object
- #register_consumer(consumer_id, consumer) ⇒ Object
- #request(command, timeout: @operation_timeout) ⇒ Object
- #send_message(command, metadata, payload, timeout: @operation_timeout) ⇒ Object
- #unregister_consumer(consumer_id) ⇒ Object
- #write_command(command) ⇒ Object
Constructor Details
#initialize(transport:, operation_timeout:, client_version:) ⇒ Connection
Returns a new instance of Connection.
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/pulsar/internal/connection.rb', line 20 def initialize(transport:, operation_timeout:, client_version:) @transport = transport @operation_timeout = operation_timeout @client_version = client_version @connected = false @closed = false @state_mutex = Mutex.new @write_mutex = Mutex.new @request_id = 0 @pending_requests = {} @pending_sends = {} @consumers = {} end |
Instance Attribute Details
#max_message_size ⇒ Object (readonly)
Returns the value of attribute max_message_size.
9 10 11 |
# File 'lib/pulsar/internal/connection.rb', line 9 def @max_message_size end |
#protocol_version ⇒ Object (readonly)
Returns the value of attribute protocol_version.
9 10 11 |
# File 'lib/pulsar/internal/connection.rb', line 9 def protocol_version @protocol_version end |
#server_version ⇒ Object (readonly)
Returns the value of attribute server_version.
9 10 11 |
# File 'lib/pulsar/internal/connection.rb', line 9 def server_version @server_version end |
Class Method Details
.connect(host:, port:, connection_timeout:, operation_timeout:, client_version:) ⇒ Object
11 12 13 14 15 16 17 18 |
# File 'lib/pulsar/internal/connection.rb', line 11 def self.connect(host:, port:, connection_timeout:, operation_timeout:, client_version:) transport = TcpTransport.connect(host: host, port: port, connection_timeout: connection_timeout) new( transport: transport, operation_timeout: operation_timeout, client_version: client_version ).tap(&:connect) end |
Instance Method Details
#close ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/pulsar/internal/connection.rb', line 54 def close return nil if closed? @closed = true @connected = false @transport.close reject_pending(ClosedError.new('connection is closed')) @reader_thread&.join unless Thread.current == @reader_thread nil end |
#closed? ⇒ Boolean
65 66 67 |
# File 'lib/pulsar/internal/connection.rb', line 65 def closed? @closed end |
#connect ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/pulsar/internal/connection.rb', line 34 def connect write_connect_command command = read_command(timeout: @operation_timeout) raise ProtocolError, "expected CONNECTED response, got #{command.type}" unless command.type == :CONNECTED @server_version = command.connected.server_version @protocol_version = command.connected.protocol_version @max_message_size = command.connected. @connected = true start_reader_thread self rescue Error close raise end |
#connected? ⇒ Boolean
50 51 52 |
# File 'lib/pulsar/internal/connection.rb', line 50 def connected? @connected && !closed? end |
#next_request_id ⇒ Object
69 70 71 72 73 |
# File 'lib/pulsar/internal/connection.rb', line 69 def next_request_id @state_mutex.synchronize do @request_id += 1 end end |
#read_frame(timeout: @operation_timeout) ⇒ Object
119 120 121 122 123 |
# File 'lib/pulsar/internal/connection.rb', line 119 def read_frame(timeout: @operation_timeout) ensure_connected! read_decoded_frame(timeout: timeout) end |
#register_consumer(consumer_id, consumer) ⇒ Object
109 110 111 112 |
# File 'lib/pulsar/internal/connection.rb', line 109 def register_consumer(consumer_id, consumer) @state_mutex.synchronize { @consumers[consumer_id] = consumer } nil end |
#request(command, timeout: @operation_timeout) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/pulsar/internal/connection.rb', line 75 def request(command, timeout: @operation_timeout) ensure_connected! promise = Promise.new request_id = request_id_for(command) add_pending_request(request_id, promise) begin write_frame(FrameCodec.encode_command(command)) promise.wait(timeout: timeout) ensure remove_pending_request(request_id) end end |
#send_message(command, metadata, payload, timeout: @operation_timeout) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/pulsar/internal/connection.rb', line 89 def (command, , payload, timeout: @operation_timeout) ensure_connected! promise = Promise.new send_key = [command['send'].producer_id, command['send'].sequence_id] add_pending_send(send_key, promise) begin write_frame(FrameCodec.(command, , payload)) promise.wait(timeout: timeout) ensure remove_pending_send(send_key) end end |
#unregister_consumer(consumer_id) ⇒ Object
114 115 116 117 |
# File 'lib/pulsar/internal/connection.rb', line 114 def unregister_consumer(consumer_id) @state_mutex.synchronize { @consumers.delete(consumer_id) } nil end |
#write_command(command) ⇒ Object
103 104 105 106 107 |
# File 'lib/pulsar/internal/connection.rb', line 103 def write_command(command) ensure_connected! write_frame(FrameCodec.encode_command(command)) end |