Class: Mycel::RPC::Peer

Inherits:
Object
  • Object
show all
Defined in:
lib/mycel.rb

Instance Method Summary collapse

Constructor Details

#initialize(hub, config = {}) ⇒ Peer

Returns a new instance of Peer.



1115
1116
1117
1118
1119
1120
1121
1122
# File 'lib/mycel.rb', line 1115

# Builds a peer bound to +hub+.
#
# Stores the hub and the raw +config+ hash, starts with empty method and
# service registries, and derives the default response timeout from
# +config[:timeout]+ (30 when that entry is nil/false or missing).
# Session handlers are installed last, after all state is in place.
#
# @param hub    the hub object this peer works against
# @param config [Hash] options; only +:timeout+ is read here
def initialize(hub, config = {})
  @hub              = hub
  @config           = config
  @method_registry  = {}
  @service_registry = {}
  @response_timeout = config[:timeout] || 30
  setup_session_handlers
end

Instance Method Details

#broadcast_method(method_name, *params, session_ids: nil, timeout: 10) ⇒ Object



1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
# File 'lib/mycel.rb', line 1223

# Calls +method_name+ on several sessions, one after another, and collects
# a per-session report instead of raising.
#
# @param method_name name forwarded to #call
# @param params      positional arguments forwarded to #call
# @param session_ids optional collection of session ids; when nil, every key
#   of +@hub.sessions+ is targeted
# @param timeout     per-call timeout forwarded to #call (default 10)
# @return [Array<Hash>] one hash per session:
#   +{ session_id:, status: 'success', result: }+ or
#   +{ session_id:, status: 'error', error: <message> }+
def broadcast_method(method_name, *params, session_ids: nil, timeout: 10)
  targets = session_ids || @hub.sessions.keys

  # Runs a single call and converts any StandardError into an error entry.
  attempt = lambda do |sid|
    begin
      outcome = call(sid, method_name, *params, timeout: timeout)
      { session_id: sid, status: 'success', result: outcome }
    rescue => failure
      { session_id: sid, status: 'error', error: failure.message }
    end
  end

  targets.map { |sid| attempt.call(sid) }
end

#call(session_id, method_name, *params, timeout: nil) ⇒ Object



1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
# File 'lib/mycel.rb', line 1137

# Sends a request for +method_name+ over the session identified by
# +session_id+ and blocks until a response or abort arrives, or until the
# timeout elapses.
#
# The timeout bounds only the wait for the response. The response is
# processed by handle_response outside the timeout, so the timeout interrupt
# cannot fire in the middle of response processing, and a ::Timeout::Error
# raised by handle_response is not mistaken for a call timeout.
#
# @param session_id  identifier resolved through get_session
# @param method_name [String, Symbol] transmitted as a String
# @param params      positional arguments sent under the "params" key
# @param timeout     [Numeric, nil] seconds to wait; defaults to @response_timeout
# @return whatever handle_response returns for the received payload
# @raise [TimeoutError] when nothing arrives within +timeout+ seconds
def call(session_id, method_name, *params, timeout: nil)
  timeout ||= @response_timeout

  payload = {
    "method"    => method_name.to_s,
    "params"    => params,
    "timestamp" => Time.now.to_f
  }

  session = get_session(session_id)
  cmd = Mycel::Channel::Command.new(session)

  response_queue = Queue.new
  cmd.on(:response) { |p| response_queue.enq(p) }
  # An abort is turned into an error-shaped payload so it goes through the
  # same handle_response path as a real response.
  cmd.on(:abort) {
    response_queue.enq({
      "status" => "error",
      "error"  => { "class" => "ConnectionError", "message" => "Method call was aborted" }
    })
  }

  cmd.request(payload)

  response =
    begin
      Timeout.timeout(timeout) { response_queue.deq }
    rescue ::Timeout::Error
      raise TimeoutError, "Method call timed out after #{timeout} seconds"
    end

  handle_response(response)
end

#call_async(session_id, method_name, *params, timeout: nil, &callback) ⇒ Object



1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
# File 'lib/mycel.rb', line 1170

# Non-blocking variant of a remote call. Sends the request and returns the
# command object immediately. The outcome is delivered exactly once to
# +callback+ as +(error, result)+:
#
# * response -> +(nil, handle_response(payload))+, or +(exception, nil)+ if
#   handle_response raises a StandardError
# * abort    -> +(ConnectionError, nil)+
# * timeout  -> +(TimeoutError, nil)+
#
# @param session_id  identifier resolved through get_session
# @param method_name [String, Symbol] transmitted as a String
# @param params      positional arguments sent under the "params" key
# @param timeout     [Numeric, nil] seconds before the timeout outcome fires;
#   defaults to @response_timeout
# @yield [error, result] invoked once; optional
# @return [Mycel::Channel::Command] the command used for the request
def call_async(session_id, method_name, *params, timeout: nil, &callback)
  timeout ||= @response_timeout

  payload = {
    "method"    => method_name.to_s,
    "params"    => params,
    "timestamp" => Time.now.to_f
  }

  session = get_session(session_id)
  cmd = Mycel::Channel::Command.new(session)

  # Single-shot finaliser shared by response / abort / timeout. The
  # condition variable lets the timeout thread wake up the moment the
  # call resolves, so a fast call doesn't strand a thread sleeping for
  # the full timeout window.
  finalised = false
  finalise_lock = Monitor.new
  finalise_cond = finalise_lock.new_cond
  finalise = ->(err, res) {
    fired = false
    finalise_lock.synchronize {
      unless finalised
        finalised = true
        fired = true
        finalise_cond.signal
      end
    }
    # The callback runs outside the lock so it cannot block other finalisers.
    callback&.call(err, res) if fired
  }

  cmd.on(:response) { |p|
    begin
      finalise.call(nil, handle_response(p))
    rescue => e
      finalise.call(e, nil)
    end
  }
  cmd.on(:abort) {
    finalise.call(ConnectionError.new("Method call was aborted"), nil)
  }

  # Monotonic deadline: unaffected by wall-clock adjustments. A nil timeout
  # keeps the "wait until resolved" behaviour.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

  # NOTE(review): the timer starts before cmd.request; if request raises, the
  # timer still runs and later reports a timeout to the callback — confirm
  # that this is intended.
  Thread.new {
    finalise_lock.synchronize {
      # A condition-variable wait may return early without the call having
      # resolved, so re-check the flag and the remaining time on each wakeup
      # instead of treating the first return as a timeout.
      until finalised
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        break if remaining && remaining <= 0
        finalise_cond.wait(remaining)
      end
    }
    # No-op when the call has already been finalised.
    finalise.call(TimeoutError.new("Method call timed out after #{timeout} seconds"), nil)
  }

  cmd.request(payload)
  cmd
end

#register_method(name, &block) ⇒ Object

Method / service names are stored and transmitted as Strings so the wire form (always a String after JSON.parse) lines up with the local registry. Symbol arguments are normalised here so callers can write whichever feels natural — `register_method(:add)` and `call(sid, :add)` work the same as their String forms.



1129
1130
1131
# File 'lib/mycel.rb', line 1129

# Registers +block+ as the handler for the method called +name+.
#
# The name is converted with #to_s before being used as the registry key, so
# a Symbol and its String form address the same entry. A later registration
# under the same name replaces the earlier one.
#
# @param name [String, Symbol] method name
# @return the stored block
def register_method(name, &block)
  key = name.to_s
  @method_registry.store(key, block)
end

#register_service(name, service_instance) ⇒ Object



1133
1134
1135
# File 'lib/mycel.rb', line 1133

# Registers +service_instance+ under the service called +name+.
#
# The name is converted with #to_s before being used as the registry key, so
# a Symbol and its String form address the same entry. A later registration
# under the same name replaces the earlier one.
#
# @param name             [String, Symbol] service name
# @param service_instance the object to store
# @return the stored +service_instance+
def register_service(name, service_instance)
  key = name.to_s
  @service_registry.store(key, service_instance)
end