Class: Parse::Agent::MCPSubscriptions::LocalNotifier

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/agent/mcp_subscriptions.rb

Overview

In-process Notifier. Routes a published notification straight to the listening-stream callback registered under the same session id. This is the single-process implementation; the contract it satisfies is the seam a clustered (e.g. Redis pub/sub) implementation slots into.

Contract (any Notifier must honor):

  • register(session_id) { |notification_hash| ... } — install the delivery callback for a listening stream. Replacing an existing registration is allowed (last writer wins).
  • unregister(session_id) — remove it. Idempotent.
  • publish(session_id, notification_hash) — deliver to the registered callback if one exists; a no-op (dropped) when no listener is attached. Returns whether a listener received it.

Thread-safety: publish may run on a LiveQuery dispatcher thread while register/unregister run on Rack I/O threads, so all three guard the listener table with a mutex. The delivery callback itself is invoked outside the lock so a slow consumer can't block registry mutation.

Instance Method Summary collapse

Constructor Details

#initializeLocalNotifier

Returns a new instance of LocalNotifier.



185
186
187
188
# File 'lib/parse/agent/mcp_subscriptions.rb', line 185

def initialize
  @listeners = {}
  @mutex     = Mutex.new
end

Instance Method Details

#listener?(session_id) ⇒ Boolean

Returns whether a listening stream is attached.

Returns:

  • (Boolean)

    whether a listening stream is attached.



210
211
212
# File 'lib/parse/agent/mcp_subscriptions.rb', line 210

def listener?(session_id)
  @mutex.synchronize { @listeners.key?(session_id) }
end

#publish(session_id, notification_hash) ⇒ Boolean

Returns true if a listener received the notification.

Returns:

  • (Boolean)

    true if a listener received the notification.



202
203
204
205
206
207
# File 'lib/parse/agent/mcp_subscriptions.rb', line 202

def publish(session_id, notification_hash)
  callback = @mutex.synchronize { @listeners[session_id] }
  return false unless callback
  callback.call(notification_hash)
  true
end

#register(session_id) {|notification_hash| ... } ⇒ Object

Yield Parameters:

  • notification_hash (Hash)

    the JSON-RPC notification.



191
192
193
194
# File 'lib/parse/agent/mcp_subscriptions.rb', line 191

def register(session_id, &callback)
  return if session_id.nil? || callback.nil?
  @mutex.synchronize { @listeners[session_id] = callback }
end

#unregister(session_id) ⇒ Object



196
197
198
199
# File 'lib/parse/agent/mcp_subscriptions.rb', line 196

def unregister(session_id)
  return if session_id.nil?
  @mutex.synchronize { @listeners.delete(session_id) }
end