Class: Parse::Agent::MCPSubscriptions::LocalNotifier
- Inherits:
- Object
- Defined in:
- lib/parse/agent/mcp_subscriptions.rb
Overview
In-process Notifier. Routes a published notification straight to the listening-stream callback registered under the same session id. This is the single-process implementation; the contract it satisfies is the seam a clustered (e.g. Redis pub/sub) implementation slots into.
Contract (any Notifier must honor):
- register(session_id) { |notification_hash| ... }: install the delivery callback for a listening stream. Replacing an existing registration is allowed (last writer wins).
- unregister(session_id): remove it. Idempotent.
- publish(session_id, notification_hash): deliver to the registered callback if one exists; a no-op (dropped) when no listener is attached. Returns whether a listener received it.
Thread-safety: publish may run on a LiveQuery dispatcher thread while
register/unregister run on Rack I/O threads, so all three guard the
listener table with a mutex. The delivery callback itself is invoked
outside the lock so a slow consumer can't block registry mutation.
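The contract above can be walked through end to end. The following is a minimal sketch, not the library's own code path: SketchNotifier is a hypothetical stand-in that copies the method bodies listed on this page so the example runs without loading the gem, and the session id and payload shape are made up for illustration.

```ruby
# Hypothetical stand-in mirroring the LocalNotifier source shown on this page.
class SketchNotifier
  def initialize
    @listeners = {}
    @mutex = Mutex.new
  end

  def register(session_id, &callback)
    return if session_id.nil? || callback.nil?
    @mutex.synchronize { @listeners[session_id] = callback }
  end

  def unregister(session_id)
    return if session_id.nil?
    @mutex.synchronize { @listeners.delete(session_id) }
  end

  def publish(session_id, notification_hash)
    # Look the callback up under the lock, then invoke it after releasing it.
    callback = @mutex.synchronize { @listeners[session_id] }
    return false unless callback
    callback.call(notification_hash)
    true
  end

  def listener?(session_id)
    @mutex.synchronize { @listeners.key?(session_id) }
  end
end

# Exercises each clause of the contract and reports what happened.
def contract_walkthrough(notifier)
  received = []
  payload = { "method" => "example/changed" } # illustrative payload only

  notifier.register("session-1") { |n| received << [:first, n] }
  notifier.register("session-1") { |n| received << [:second, n] } # last writer wins

  delivered = notifier.publish("session-1", payload)
  dropped = notifier.publish("no-such-session", payload) # no listener attached

  notifier.unregister("session-1")
  notifier.unregister("session-1") # idempotent: the second call is a no-op

  {
    delivered: delivered,
    dropped: dropped,
    received: received,
    after_unregister: notifier.publish("session-1", payload),
    still_listening: notifier.listener?("session-1")
  }
end

p contract_walkthrough(SketchNotifier.new)
```

Because the walkthrough only calls register, unregister, publish, and listener?, the same method could in principle be pointed at any other object that claims to honor the contract, such as a clustered implementation.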
Instance Method Summary
- #initialize ⇒ LocalNotifier (constructor)
  A new instance of LocalNotifier.
- #listener?(session_id) ⇒ Boolean
  Whether a listening stream is attached.
- #publish(session_id, notification_hash) ⇒ Boolean
  True if a listener received the notification.
- #register(session_id) {|notification_hash| ... } ⇒ Object
- #unregister(session_id) ⇒ Object
Constructor Details
#initialize ⇒ LocalNotifier
Returns a new instance of LocalNotifier.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 185
    def initialize
      @listeners = {}
      @mutex = Mutex.new
    end
Instance Method Details
#listener?(session_id) ⇒ Boolean
Returns whether a listening stream is attached.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 210
    def listener?(session_id)
      @mutex.synchronize { @listeners.key?(session_id) }
    end
#publish(session_id, notification_hash) ⇒ Boolean
Returns true if a listener received the notification.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 202
    def publish(session_id, notification_hash)
      callback = @mutex.synchronize { @listeners[session_id] }
      return false unless callback
      callback.call(notification_hash)
      true
    end
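One practical consequence of invoking the callback after the lock is released is that a callback may call back into the notifier, for example to unregister itself after a single delivery. Ruby's Mutex is not reentrant, so the same callback would raise ThreadError if delivery happened while the lock was held. The sketch below illustrates this under stated assumptions: SketchNotifier is a hypothetical stand-in copying the method bodies from this page, and LockedDeliveryNotifier is an invented counter-example that is not part of the library.

```ruby
# Hypothetical stand-in mirroring the register/unregister/publish source above.
class SketchNotifier
  def initialize
    @listeners = {}
    @mutex = Mutex.new
  end

  def register(session_id, &callback)
    return if session_id.nil? || callback.nil?
    @mutex.synchronize { @listeners[session_id] = callback }
  end

  def unregister(session_id)
    return if session_id.nil?
    @mutex.synchronize { @listeners.delete(session_id) }
  end

  def publish(session_id, notification_hash)
    callback = @mutex.synchronize { @listeners[session_id] }
    return false unless callback
    callback.call(notification_hash) # runs with the lock already released
    true
  end
end

# Invented counter-example: delivers while still holding the lock.
class LockedDeliveryNotifier < SketchNotifier
  def publish(session_id, notification_hash)
    @mutex.synchronize do
      callback = @listeners[session_id]
      return false unless callback
      callback.call(notification_hash) # re-entering the notifier here fails
      true
    end
  end
end

# Registers a callback that unregisters itself on first delivery,
# then publishes twice and reports the outcome.
def one_shot_delivery(notifier)
  seen = []
  notifier.register("s1") do |n|
    seen << n
    notifier.unregister("s1")
  end
  first = notifier.publish("s1", { "seq" => 1 })
  second = notifier.publish("s1", { "seq" => 2 })
  { first: first, second: second, seen: seen }
rescue ThreadError => e
  { error: e.class }
end

p one_shot_delivery(SketchNotifier.new)
p one_shot_delivery(LockedDeliveryNotifier.new)
```

With the lookup-then-call shape, the first publish is delivered and the second is dropped because the listener removed itself; the locked variant fails on the recursive lock attempt instead.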
#register(session_id) {|notification_hash| ... } ⇒ Object
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 191
    def register(session_id, &callback)
      return if session_id.nil? || callback.nil?
      @mutex.synchronize { @listeners[session_id] = callback }
    end
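The guard clause in register means a nil session id or a missing block is silently ignored rather than raising. A small sketch of what that looks like from the caller's side, assuming a hypothetical RegisterSketch class that copies only the initialize, register, and listener? bodies from this page:

```ruby
# Hypothetical trimmed stand-in: only the methods needed to show the guards.
class RegisterSketch
  def initialize
    @listeners = {}
    @mutex = Mutex.new
  end

  def register(session_id, &callback)
    return if session_id.nil? || callback.nil?
    @mutex.synchronize { @listeners[session_id] = callback }
  end

  def listener?(session_id)
    @mutex.synchronize { @listeners.key?(session_id) }
  end
end

# Calls register in its three shapes and reports what each one did.
def register_guard_results
  notifier = RegisterSketch.new
  no_block = notifier.register("s1")          # no block given: ignored
  nil_id = notifier.register(nil) { |_n| }    # nil session id: ignored
  installed = notifier.register("s2") { |_n| } # normal registration

  {
    no_block_return: no_block,
    nil_id_return: nil_id,
    s1_listening: notifier.listener?("s1"),
    nil_listening: notifier.listener?(nil),
    returns_callback: installed.is_a?(Proc),
    s2_listening: notifier.listener?("s2")
  }
end

p register_guard_results
```

Since an ignored call returns nil and a successful one returns the stored callback, a caller that needs to confirm the registration took effect can check the return value or ask listener? afterwards.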
#unregister(session_id) ⇒ Object
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 196
    def unregister(session_id)
      return if session_id.nil?
      @mutex.synchronize { @listeners.delete(session_id) }
    end