Class: Arcp::Runtime::SubscriptionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/arcp/runtime/subscription_manager.rb

Overview

Tracks per-job subscribers across sessions. Submitter session is registered first; additional subscribers attach via ‘job.subscribe` and receive a fan-out of every `job.event` and the terminating `job.result` / `job.error`.

Instance Method Summary collapse

Constructor Details

#initializeSubscriptionManager

Returns a new instance of SubscriptionManager.



10
11
12
13
14
# File 'lib/arcp/runtime/subscription_manager.rb', line 10

def initialize
  @subs = Hash.new { |h, k| h[k] = [] } # job_id => [[session_id, principal_id, queue], …]
  @owners = {}                          # job_id => principal_id (submitter)
  @mutex = Mutex.new
end

Instance Method Details

#attach(job_id, principal_id, session_id, queue) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/arcp/runtime/subscription_manager.rb', line 23

def attach(job_id, principal_id, session_id, queue)
  @mutex.synchronize do
    unless @owners[job_id] == principal_id
      raise Arcp::Errors::PermissionDenied.new(
        "principal not authorized to observe #{job_id}",
        details: { 'job_id' => job_id }
      )
    end

    @subs[job_id] << [session_id, principal_id, queue]
  end
end

#clear(job_id) ⇒ Object



49
50
51
52
53
54
# File 'lib/arcp/runtime/subscription_manager.rb', line 49

def clear(job_id)
  @mutex.synchronize do
    @subs.delete(job_id)
    @owners.delete(job_id)
  end
end

#detach(job_id, session_id) ⇒ Object



36
37
38
39
40
# File 'lib/arcp/runtime/subscription_manager.rb', line 36

def detach(job_id, session_id)
  @mutex.synchronize do
    @subs[job_id].reject! { |s, _, _| s == session_id }
  end
end

#fanout(job_id, envelope) ⇒ Object



42
43
44
45
# File 'lib/arcp/runtime/subscription_manager.rb', line 42

def fanout(job_id, envelope)
  targets = @mutex.synchronize { @subs[job_id].dup }
  targets.each { |_s, _p, q| q.enqueue(envelope) }
end

#owner_of(job_id) ⇒ Object



47
# File 'lib/arcp/runtime/subscription_manager.rb', line 47

def owner_of(job_id) = @mutex.synchronize { @owners[job_id] }

#register_owner(job_id, principal_id, session_id, queue) ⇒ Object



16
17
18
19
20
21
# File 'lib/arcp/runtime/subscription_manager.rb', line 16

def register_owner(job_id, principal_id, session_id, queue)
  @mutex.synchronize do
    @owners[job_id] = principal_id
    @subs[job_id] << [session_id, principal_id, queue]
  end
end