Class: Freeswitch::ESL::Connection::CommandDispatcher

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/freeswitch/esl/connection/command_dispatcher.rb

Overview

Coordinates command execution: manages socket write, command queueing, and response routing.

Threading model:

* A *reader thread* reads messages from the socket and delivers them
  to waiting {CommandRequest} objects via FIFO queue ordering.
* Command writes and enqueues are atomic under a single mutex so that
  command order on wire matches queue order.
* Responses are matched to commands by simple FIFO order.
* Events (text/event-json) are forwarded to {EventDispatcher}.
* Server-initiated messages (auth/request) invoke callbacks.

Constant Summary collapse

MSG_TERMINATOR =
"\n\n"

Instance Method Summary collapse

Methods included from Logger

default_logger, #logger

Constructor Details

#initialize(socket, event_dispatcher, debug: false) ⇒ CommandDispatcher

Returns a new instance of CommandDispatcher.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 25

def initialize(socket, event_dispatcher, debug: false)
  @socket = socket
  @event_dispatcher = event_dispatcher
  @write_mutex = Mutex.new
  @cond = ConditionVariable.new
  @queue = Queue.new
  @on_disconnect = nil
  @reader_thread = nil
  @closed = false
  @auth_request_received = false
  @debug = debug
  @disconnect_error = nil
  @read_buffer = String.new
end

Instance Method Details

#closed?Boolean

Check if the dispatcher is closed.

Returns:

  • (Boolean)


41
42
43
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 41

def closed?
  @closed
end

#execute_command(command, timeout: nil) ⇒ CommandRequest

Send a raw command on ESL socket and return CommandRequest.

Parameters:

  • command (String)

    the ESL command to send

  • timeout (Numeric, nil) (defaults to: nil)

    seconds to wait for response

Returns:

Raises:



123
124
125
126
127
128
129
130
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 123

def execute_command(command, timeout: nil)
  raise DisconnectedError, "Connection is closed" if @closed

  cmd = CommandRequest.new(command, timeout)
  @write_mutex.synchronize { execute(cmd) }

  cmd
end

#on_disconnect(&block) ⇒ Object

Register a callback invoked after the dispatcher transitions to the disconnected state and pending commands have been notified.



47
48
49
50
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 47

def on_disconnect(&block)
  @on_disconnect = block
  self
end

#pending_commands_countObject

Return the number of commands currently waiting for a response.



133
134
135
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 133

def pending_commands_count
  @queue.size
end

#startObject

Start the socket reader thread and the periodic socket health check. Safe to call more than once: subsequent calls are ignored while the current reader thread reference is still present.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 55

def start
  return if @reader_thread

  @reader_thread = Thread.new do
    loop do
      break if closed?

      msg = read_message
      unless msg
        disconnect!(DisconnectedError.new("Connection closed by remote host"))
        break
      end

      route_message(msg)
    rescue IOError, Errno::ECONNRESET, Errno::ENOTCONN => e
      disconnect!(DisconnectedError.new(e.message))
      break
    end
  end
  @reader_thread.name = "esl-reader"
  @reader_thread.abort_on_exception = false
  @healthcheck_timer = Ztimer.every(1000) { check_socket_health }
end

#stopObject

Stop the dispatcher, cancel the healthcheck timer and broadcast to any waiters blocked on wait_for_auth_request.

Socket close failures during teardown are ignored because shutdown is already in progress and there is no recovery path.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 98

def stop
  return if closed?

  logger.debug "Stopping command dispatcher and closing socket"
  @closed = true
  @healthcheck_timer&.cancel!

  begin
    @socket.close
  rescue StandardError
    nil
  end
  @reader_thread&.join(0.1) if @reader_thread != Thread.current
  @queue.close
  @cond.broadcast # unblock wait_for_auth_request if still waiting
  logger.debug "Command dispatcher stopped"
end

#wait_for_auth_request(timeout: nil) ⇒ Object

Wait for the server-initiated auth/request message from FreeSWITCH. Raises TimeoutError if the message does not arrive within timeout seconds, or re-raises the disconnect error if the socket closes first.

Raises:

  • (@disconnect_error)


82
83
84
85
86
87
88
89
90
91
# File 'lib/freeswitch/esl/connection/command_dispatcher.rb', line 82

def wait_for_auth_request(timeout: nil)
  @write_mutex.synchronize do
    return if @auth_request_received

    @cond.wait(@write_mutex, timeout)
  end

  raise @disconnect_error if @closed && @disconnect_error
  raise TimeoutError, "Timed out waiting for auth/request" unless @auth_request_received
end