Class: Arcp::Runtime::SubscriptionManager
- Inherits:
-
Object
- Object
- Arcp::Runtime::SubscriptionManager
- Defined in:
- lib/arcp/runtime/subscription_manager.rb
Overview
Tracks per-job subscribers across sessions. The submitter's session is registered first; additional subscribers attach via `job.subscribe` and receive a fan-out of every `job.event` and the terminating `job.result` / `job.error`.
Instance Method Summary collapse
- #attach(job_id, principal_id, session_id, queue) ⇒ Object
- #clear(job_id) ⇒ Object
- #detach(job_id, session_id) ⇒ Object
- #fanout(job_id, envelope) ⇒ Object
-
#initialize ⇒ SubscriptionManager
constructor
A new instance of SubscriptionManager.
- #owner_of(job_id) ⇒ Object
- #register_owner(job_id, principal_id, session_id, queue) ⇒ Object
Constructor Details
#initialize ⇒ SubscriptionManager
Returns a new instance of SubscriptionManager.
10 11 12 13 14 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 10
# Sets up an empty manager: no owners, no subscribers, and a fresh lock.
def initialize
  # job_id => principal_id (submitter)
  @owners = {}
  # job_id => [[session_id, principal_id, queue], ...]
  # The default block gives every previously unseen job_id its own empty list.
  @subs = Hash.new { |table, job_id| table[job_id] = [] }
  # Lock taken by the other methods of this class around table access.
  @mutex = Mutex.new
end
Instance Method Details
#attach(job_id, principal_id, session_id, queue) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 23
# Adds a subscriber entry for a job on behalf of +principal_id+.
#
# The request is accepted only when the job has a recorded owner and that
# owner equals +principal_id+.
#
# @param job_id [Object] key of the job to observe
# @param principal_id [Object] principal requesting the subscription
# @param session_id [Object] session the subscription belongs to
# @param queue [Object] stored as-is in the subscriber entry; #fanout later
#   calls +enqueue+ on it
# @return [Array] the job's subscriber list after the new entry is appended
# @raise [Arcp::Errors::PermissionDenied] if the job has no recorded owner or
#   the owner differs from +principal_id+
def attach(job_id, principal_id, session_id, queue)
  @mutex.synchronize do
    # key? is required: a plain == comparison would let a nil principal match
    # the nil returned for a job that was never registered (or was cleared),
    # and the append below would then auto-create a list for that job.
    authorized = @owners.key?(job_id) && @owners[job_id] == principal_id
    unless authorized
      raise Arcp::Errors::PermissionDenied.new(
        "principal not authorized to observe #{job_id}",
        details: { 'job_id' => job_id }
      )
    end

    @subs[job_id] << [session_id, principal_id, queue]
  end
end
#clear(job_id) ⇒ Object
49 50 51 52 53 54 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 49
# Forgets a job entirely: drops its subscriber list and its owner record.
#
# @param job_id [Object] key of the job to forget
# @return [Object, nil] the owner that was recorded for the job, or nil if
#   none was recorded
def clear(job_id)
  @mutex.synchronize do
    # Delete from the subscriber table first, then the owner table; the owner
    # table's result is the value handed back to the caller.
    _dropped_subscribers, previous_owner =
      [@subs, @owners].map { |registry| registry.delete(job_id) }
    previous_owner
  end
end
#detach(job_id, session_id) ⇒ Object
36 37 38 39 40 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 36
# Removes every subscriber entry of +session_id+ from a job.
#
# @param job_id [Object] key of the job to detach from
# @param session_id [Object] session whose entries are removed
# @return [Array, nil] the job's subscriber list if at least one entry was
#   removed; nil if nothing was removed or the job has no subscriber list
def detach(job_id, session_id)
  @mutex.synchronize do
    # fetch bypasses the Hash default block, so detaching from an unknown or
    # already-cleared job no longer inserts an empty list that would never be
    # cleaned up.
    subscribers = @subs.fetch(job_id, nil)
    subscribers&.reject! { |sid, _principal, _queue| sid == session_id }
  end
end
#fanout(job_id, envelope) ⇒ Object
42 43 44 45 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 42
# Delivers +envelope+ to the queue of every current subscriber of a job.
#
# @param job_id [Object] key of the job whose subscribers receive the envelope
# @param envelope [Object] passed unchanged to each queue's +enqueue+
# @return [Array] the snapshot of subscriber entries that were iterated
def fanout(job_id, envelope)
  # Snapshot under the lock and enqueue outside it, so the lock is not held
  # while calling into the queues. fetch bypasses the Hash default block: a
  # fan-out for an unknown or already-cleared job must not re-create an empty
  # subscriber list.
  targets = @mutex.synchronize { @subs.fetch(job_id, []).dup }
  targets.each { |_session, _principal, queue| queue.enqueue(envelope) }
end
#owner_of(job_id) ⇒ Object
47 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 47
# Looks up the principal recorded as the owner of a job.
#
# @param job_id [Object] key of the job
# @return [Object, nil] the recorded owner, or nil when none is recorded
def owner_of(job_id)
  @mutex.synchronize do
    @owners[job_id]
  end
end
#register_owner(job_id, principal_id, session_id, queue) ⇒ Object
16 17 18 19 20 21 |
# File 'lib/arcp/runtime/subscription_manager.rb', line 16
# Records +principal_id+ as the owner of a job and adds the given session as
# a subscriber of that job.
#
# @param job_id [Object] key of the job being registered
# @param principal_id [Object] principal stored as the job's owner
# @param session_id [Object] session stored in the subscriber entry
# @param queue [Object] stored as-is in the subscriber entry
# @return [Array] the job's subscriber list after the entry is appended
def register_owner(job_id, principal_id, session_id, queue)
  entry = [session_id, principal_id, queue]
  @mutex.synchronize do
    @owners.store(job_id, principal_id)
    @subs[job_id].push(entry)
  end
end