Class: Mycel::RPC::Peer

Inherits:
Object
  • Object
show all
Defined in:
lib/mycel.rb

Instance Method Summary collapse

Constructor Details

#initialize(hub, config = {}) ⇒ Peer

Returns a new instance of Peer.



1164
1165
1166
1167
1168
1169
1170
1171
# File 'lib/mycel.rb', line 1164

# Builds a peer bound to +hub+ with empty method and service registries.
#
# @param hub the hub object this peer keeps a reference to
# @param config [Hash] options; +:timeout+ supplies the default response
#   timeout in seconds (30 when absent or nil)
def initialize(hub, config = {})
  @hub, @config = hub, config
  @method_registry = {}
  @service_registry = {}
  # Default wait used by calls that do not pass their own timeout.
  @response_timeout = config[:timeout] || 30
  setup_session_handlers
end

Instance Method Details

#broadcast_method(method_name, *params, session_ids: nil, timeout: 10) ⇒ Object



1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
# File 'lib/mycel.rb', line 1272

# Invokes +method_name+ on several sessions, one after another, and collects
# a per-session outcome instead of raising.
#
# @param method_name name forwarded to #call
# @param params positional arguments forwarded to #call
# @param session_ids [Enumerable, nil] sessions to target; when nil every
#   key of @hub.sessions is used
# @param timeout [Numeric] per-call timeout forwarded to #call
# @return [Array<Hash>] one Hash per session: +:session_id+, +:status+
#   ('success' or 'error') and either +:result+ or +:error+ (the message)
def broadcast_method(method_name, *params, session_ids: nil, timeout: 10)
  targets = session_ids || @hub.sessions.keys
  targets.map do |sid|
    begin
      value = call(sid, method_name, *params, timeout: timeout)
      { session_id: sid, status: 'success', result: value }
    rescue => failure
      # A failing session must not stop the remaining ones.
      { session_id: sid, status: 'error', error: failure.message }
    end
  end
end

#call(session_id, method_name, *params, timeout: nil) ⇒ Object



1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
# File 'lib/mycel.rb', line 1186

# Sends a method-call request over the session and blocks until a response
# or an abort arrives, or the timeout elapses.
#
# @param session_id identifier passed to get_session
# @param method_name [#to_s] transmitted as a String under "method"
# @param params positional arguments transmitted under "params"
# @param timeout [Numeric, nil] seconds to wait; falls back to
#   @response_timeout when not given
# @return whatever handle_response returns for the received payload
# @raise [TimeoutError] when nothing is received within +timeout+ seconds
def call(session_id, method_name, *params, timeout: nil)
  timeout = @response_timeout unless timeout

  request_body = {
    "method"    => method_name.to_s,
    "params"    => params,
    "timestamp" => Time.now.to_f
  }

  command = Mycel::Channel::Command.new(get_session(session_id))

  # Both handlers feed the same queue so the caller blocks on one thing only.
  inbox = Queue.new
  command.on(:response) { |reply| inbox << reply }
  command.on(:abort) do
    # An abort is surfaced as a synthetic error response.
    inbox << {
      "status" => "error",
      "error"  => { "class" => "ConnectionError", "message" => "Method call was aborted" }
    }
  end

  command.request(request_body)

  begin
    Timeout.timeout(timeout) { handle_response(inbox.pop) }
  rescue ::Timeout::Error
    raise TimeoutError, "Method call timed out after #{timeout} seconds"
  end
end

#call_async(session_id, method_name, *params, timeout: nil, &callback) ⇒ Object



1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
# File 'lib/mycel.rb', line 1219

# Sends a method-call request and returns immediately; the outcome is
# delivered to +callback+ at most once as +(error, result)+.
#
# Exactly one of three events resolves the call, whichever happens first:
# a response (passed through handle_response; a StandardError it raises is
# delivered as +error+), an abort (ConnectionError), or the timeout
# (TimeoutError, delivered from a background thread).
#
# @param session_id identifier passed to get_session
# @param method_name [#to_s] transmitted as a String under "method"
# @param params positional arguments transmitted under "params"
# @param timeout [Numeric, nil] seconds before the call is reported as
#   timed out; falls back to @response_timeout when not given
# @yieldparam err [Exception, nil] the failure, or nil on success
# @yieldparam res the value returned by handle_response, or nil on failure
# @return the Mycel::Channel::Command carrying the request
def call_async(session_id, method_name, *params, timeout: nil, &callback)
  timeout ||= @response_timeout

  payload = {
    "method"    => method_name.to_s,
    "params"    => params,
    "timestamp" => Time.now.to_f
  }

  session = get_session(session_id)
  cmd = Mycel::Channel::Command.new(session)

  # Single-shot finaliser shared by response / abort / timeout. The
  # ConditionVariable lets the timeout thread wake up the moment the
  # call resolves, so a fast call doesn't strand a thread sleeping for
  # the full timeout window.
  finalised = false
  finalise_lock = Monitor.new
  finalise_cond = finalise_lock.new_cond
  finalise = ->(err, res) {
    fired = false
    finalise_lock.synchronize {
      unless finalised
        finalised = true
        fired = true
        finalise_cond.signal
      end
    }
    # Run the user callback outside the lock so it cannot block finalisation.
    callback&.call(err, res) if fired
  }

  cmd.on(:response) { |p|
    begin
      finalise.call(nil, handle_response(p))
    rescue => e
      finalise.call(e, nil)
    end
  }
  cmd.on(:abort) {
    finalise.call(ConnectionError.new("Method call was aborted"), nil)
  }

  Thread.new {
    monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    # A nil timeout means "no deadline": wait until the call resolves.
    deadline = timeout && (monotonic_now.call + timeout)
    finalise_lock.synchronize {
      # A condition wait may return before the signal or the timeout
      # (spurious wakeup, Thread#wakeup), so re-check the predicate and the
      # remaining time instead of treating any return as a timeout.
      until finalised
        remaining = deadline && (deadline - monotonic_now.call)
        break if remaining && remaining <= 0
        finalise_cond.wait(remaining)
      end
    }
    # No-op when a response or abort already finalised the call.
    finalise.call(TimeoutError.new("Method call timed out after #{timeout} seconds"), nil)
  }

  cmd.request(payload)
  cmd
end

#register_method(name, &block) ⇒ Object

Method / service names are stored and transmitted as Strings so the wire form (always String after JSON.parse) lines up with the local registry. Symbol arguments are normalised here so callers can write whichever feels natural — `register_method(:add)` and `call(sid, :add)` work the same as their String forms.



1178
1179
1180
# File 'lib/mycel.rb', line 1178

# Registers +block+ as the handler for +name+, replacing any handler already
# stored under the same name.
#
# @param name [#to_s] method name; stored under its String form, so a
#   Symbol and the equivalent String address the same entry
# @return the stored block (nil when no block was given)
def register_method(name, &block)
  key = name.to_s
  @method_registry.store(key, block)
end

#register_service(name, service_instance) ⇒ Object



1182
1183
1184
# File 'lib/mycel.rb', line 1182

# Registers +service_instance+ under +name+, replacing any service already
# stored under the same name.
#
# @param name [#to_s] service name; stored under its String form
# @param service_instance the object to store
# @return the stored +service_instance+
def register_service(name, service_instance)
  key = name.to_s
  @service_registry.store(key, service_instance)
end