Class: Pulsar::Internal::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/connection.rb

Overview

Owns a broker socket, request correlation, and reader thread.

Constant Summary collapse

PROTOCOL_VERSION =
21

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport:, operation_timeout:, client_version:) ⇒ Connection

Returns a new instance of Connection.



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/pulsar/internal/connection.rb', line 20

# Builds a not-yet-connected Connection around the given transport.
#
# Only instance state is set up here; no command is written and no reader
# thread is started until #connect runs.
#
# @param transport [Object] transport object; #close calls `close` on it
# @param operation_timeout [Object] default timeout forwarded to reads,
#   requests and sends (presumably seconds — confirm against the transport)
# @param client_version [Object] stored as-is for later use
def initialize(transport:, operation_timeout:, client_version:)
  # Collaborators and settings handed in by the caller.
  @transport, @operation_timeout, @client_version = transport, operation_timeout, client_version

  # Lifecycle flags, flipped by #connect and #close.
  @connected, @closed = false, false

  # Two separate locks: one for shared bookkeeping, one for writes.
  @state_mutex, @write_mutex = Mutex.new, Mutex.new

  # Request counter; #next_request_id increments before returning.
  @request_id = 0

  # Bookkeeping tables, each starting out as its own empty Hash.
  @pending_requests, @pending_sends, @consumers = {}, {}, {}
end

Instance Attribute Details

#max_message_size ⇒ Object (readonly)

Returns the value of attribute max_message_size.



9
10
11
# File 'lib/pulsar/internal/connection.rb', line 9

# Reader for the `max_message_size` value taken from the broker's CONNECTED
# response.
#
# In the code visible here the value is assigned only by #connect;
# #initialize does not set it, so it reads as nil until a handshake succeeds.
#
# @return [Object, nil] presumably an Integer byte count — confirm against
#   the protocol definition
def max_message_size
  @max_message_size
end

#protocol_version ⇒ Object (readonly)

Returns the value of attribute protocol_version.



9
10
11
# File 'lib/pulsar/internal/connection.rb', line 9

# Reader for the `protocol_version` value taken from the broker's CONNECTED
# response.
#
# In the code visible here the value is assigned only by #connect;
# #initialize does not set it, so it reads as nil until a handshake succeeds.
#
# @return [Object, nil] presumably an Integer — confirm against the protocol
#   definition
def protocol_version
  @protocol_version
end

#server_version ⇒ Object (readonly)

Returns the value of attribute server_version.



9
10
11
# File 'lib/pulsar/internal/connection.rb', line 9

# Reader for the `server_version` value taken from the broker's CONNECTED
# response.
#
# In the code visible here the value is assigned only by #connect;
# #initialize does not set it, so it reads as nil until a handshake succeeds.
#
# @return [Object, nil] presumably a String — confirm against the protocol
#   definition
def server_version
  @server_version
end

Class Method Details

.connect(host:, port:, connection_timeout:, operation_timeout:, client_version:) ⇒ Object



11
12
13
14
15
16
17
18
# File 'lib/pulsar/internal/connection.rb', line 11

# Opens a transport to the broker, wraps it in a Connection and runs the
# handshake before handing the instance back.
#
# @param host [Object] forwarded to TcpTransport.connect
# @param port [Object] forwarded to TcpTransport.connect
# @param connection_timeout [Object] forwarded to TcpTransport.connect
# @param operation_timeout [Object] forwarded to the constructor
# @param client_version [Object] forwarded to the constructor
# @return [Connection] the new instance, after its #connect has been called
def self.connect(host:, port:, connection_timeout:, operation_timeout:, client_version:)
  socket = TcpTransport.connect(host: host, port: port, connection_timeout: connection_timeout)
  connection = new(transport: socket, operation_timeout: operation_timeout, client_version: client_version)

  # Handshake; its return value is ignored and the instance itself is returned.
  connection.connect
  connection
end

Instance Method Details

#close ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/pulsar/internal/connection.rb', line 54

# Shuts the connection down. Calling it again once closed does nothing.
#
# Sequence: mark the connection closed and disconnected, close the
# transport, reject everything still pending with a ClosedError, then wait
# for the reader thread to finish.
#
# @return [nil]
def close
  unless closed?
    @closed = true
    @connected = false
    @transport.close
    reject_pending(ClosedError.new('connection is closed'))

    # Skip the join when close runs on the reader thread itself: a thread
    # joining itself raises ThreadError.
    reader = @reader_thread
    reader.join if reader && Thread.current != reader
  end

  nil
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/pulsar/internal/connection.rb', line 65

# Reports whether #close has run on this connection.
#
# Returns the raw @closed flag: #initialize sets it to false and #close sets
# it to true; nothing visible here ever resets it.
#
# @return [Boolean]
def closed?
  @closed
end

#connect ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/pulsar/internal/connection.rb', line 34

# Performs the connect handshake with the broker.
#
# Writes the connect command, reads one command back and requires it to be
# of type :CONNECTED. On success it records the server version, protocol
# version and max message size from the response, marks the connection as
# connected and starts the reader thread.
#
# If an Error is raised at any point, the connection is closed before the
# error is re-raised.
#
# @return [self]
# @raise [ProtocolError] when the first response is not a CONNECTED command
def connect
  begin
    write_connect_command
    reply = read_command(timeout: @operation_timeout)
    unless reply.type == :CONNECTED
      raise ProtocolError, "expected CONNECTED response, got #{reply.type}"
    end

    handshake = reply.connected
    @server_version = handshake.server_version
    @protocol_version = handshake.protocol_version
    @max_message_size = handshake.max_message_size
    @connected = true
    start_reader_thread
    self
  rescue Error
    # Do not leave a half-initialised connection behind.
    close
    raise
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/pulsar/internal/connection.rb', line 50

# Reports whether the connection is connected and has not been closed.
#
# @connected is set to true by #connect after a CONNECTED response and reset
# to false by #close; the extra closed? check means a closed connection
# never reports as connected.
#
# @return [Boolean]
def connected?
  @connected && !closed?
end

#next_request_id ⇒ Object



69
70
71
72
73
# File 'lib/pulsar/internal/connection.rb', line 69

# Hands out the next request id.
#
# The counter is incremented while holding @state_mutex and the incremented
# value is returned, so with the counter starting at 0 the first id is 1.
#
# @return [Integer]
def next_request_id
  @state_mutex.synchronize { @request_id += 1 }
end

#read_frame(timeout: @operation_timeout) ⇒ Object



119
120
121
122
123
# File 'lib/pulsar/internal/connection.rb', line 119

# Reads one decoded frame from the connection.
#
# Runs the ensure_connected! check first, then delegates to
# read_decoded_frame. Both helpers are defined outside this view;
# ensure_connected! presumably raises when the connection is not usable —
# confirm.
#
# @param timeout [Object] passed through to read_decoded_frame; defaults to
#   the connection's operation timeout
# @return [Object] whatever read_decoded_frame returns
def read_frame(timeout: @operation_timeout)
  ensure_connected!

  read_decoded_frame(timeout: timeout)
end

#register_consumer(consumer_id, consumer) ⇒ Object



109
110
111
112
# File 'lib/pulsar/internal/connection.rb', line 109

# Records +consumer+ under +consumer_id+ in the consumer table, replacing
# any entry already stored for that id. The write happens under
# @state_mutex.
#
# @param consumer_id [Object] key for the consumer table
# @param consumer [Object] consumer to associate with the id
# @return [nil]
def register_consumer(consumer_id, consumer)
  @state_mutex.synchronize do
    @consumers.store(consumer_id, consumer)
  end

  nil
end

#request(command, timeout: @operation_timeout) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pulsar/internal/connection.rb', line 75

# Sends +command+ and blocks until its promise is resolved or the wait ends.
#
# A fresh promise is registered under the command's request id before the
# frame is written. The registration is always removed afterwards, whether
# the write or the wait succeeds or raises.
#
# @param command [Object] command to encode and send
# @param timeout [Object] passed to the promise's wait; defaults to the
#   connection's operation timeout
# @return [Object] whatever the promise's wait returns
def request(command, timeout: @operation_timeout)
  ensure_connected!

  reply = Promise.new
  id = request_id_for(command)
  add_pending_request(id, reply)

  begin
    frame = FrameCodec.encode_command(command)
    write_frame(frame)
    reply.wait(timeout: timeout)
  ensure
    # Never leave a stale entry behind, even on write failure or timeout.
    remove_pending_request(id)
  end
end

#send_message(command, metadata, payload, timeout: @operation_timeout) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/pulsar/internal/connection.rb', line 89

# Sends a message frame and blocks until its promise is resolved or the wait
# ends.
#
# A fresh promise is registered under the pair
# [producer_id, sequence_id] read from the command's 'send' field before the
# frame is written. The registration is always removed afterwards, whether
# the write or the wait succeeds or raises.
#
# @param command [Object] command whose 'send' field exposes producer_id
#   and sequence_id
# @param metadata [Object] message metadata, encoded into the frame
# @param payload [Object] message payload, encoded into the frame
# @param timeout [Object] passed to the promise's wait; defaults to the
#   connection's operation timeout
# @return [Object] whatever the promise's wait returns
def send_message(command, metadata, payload, timeout: @operation_timeout)
  ensure_connected!
  promise = Promise.new
  send_key = [command['send'].producer_id, command['send'].sequence_id]
  add_pending_send(send_key, promise)

  begin
    write_frame(FrameCodec.encode_message(command, metadata, payload))
    promise.wait(timeout: timeout)
  ensure
    # Never leave a stale entry behind, even on write failure or timeout.
    remove_pending_send(send_key)
  end
end

#unregister_consumer(consumer_id) ⇒ Object



114
115
116
117
# File 'lib/pulsar/internal/connection.rb', line 114

# Drops the consumer table entry for +consumer_id+, if there is one. The
# removal happens under @state_mutex; the removed consumer is not returned.
#
# @param consumer_id [Object] key previously passed to #register_consumer
# @return [nil]
def unregister_consumer(consumer_id)
  @state_mutex.synchronize do
    @consumers.delete(consumer_id)
  end

  nil
end

#write_command(command) ⇒ Object



103
104
105
106
107
# File 'lib/pulsar/internal/connection.rb', line 103

# Encodes +command+ and writes it to the connection without registering
# anything to wait on.
#
# Unlike #request, no promise is created here, so this method itself does
# not wait for a reply. ensure_connected! and write_frame are defined
# outside this view; ensure_connected! presumably raises when the connection
# is not usable — confirm.
#
# @param command [Object] command passed to FrameCodec.encode_command
# @return [Object] whatever write_frame returns
def write_command(command)
  ensure_connected!

  write_frame(FrameCodec.encode_command(command))
end