Class: Plushie::Test::SessionPool

Inherits:
Object
  • Object
show all
Defined in:
lib/plushie/test/session_pool.rb

Overview

Shared renderer process for concurrent test sessions.

Owns a single +plushie --mock --max-sessions N+ process and multiplexes messages from multiple test sessions over it. Each session gets a unique session ID (e.g. "test_1", "test_2"); responses are demuxed by the +session+ field and forwarded to the owning queue.

See the Elixir SDK's SessionPool for reference.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil) ⇒ SessionPool

Returns a new instance of SessionPool.

Parameters:

  • mode (:mock, :headless, :windowed) (defaults to: :mock)

    renderer mode

  • format (:msgpack, :json) (defaults to: :msgpack)

    wire format

  • max_sessions (Integer) (defaults to: 8)

    max concurrent sessions

  • binary (String, nil) (defaults to: nil)

    path to renderer binary



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/plushie/test/session_pool.rb', line 25

def initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil)
  @mode = mode
  @format = format
  @max_sessions = max_sessions
  @binary = binary
  @connection = nil
  @sessions = {}  # session_id -> Thread::Queue
  @stashes = {}   # session_id -> Array
  @counter = 0
  @mutex = Mutex.new
  @started = false
end

Instance Attribute Details

#format:msgpack, :json (readonly)

Returns:

  • (:msgpack, :json)


19
20
21
# File 'lib/plushie/test/session_pool.rb', line 19

def format
  @format
end

Instance Method Details

#read_message(session_id, timeout: 10) ⇒ Object

Read the next message for a session (blocking).

Parameters:

  • session_id (String)
  • timeout (Numeric) (defaults to: 10)

    max wait time in seconds

Returns:

  • (Object)

    the decoded message



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/plushie/test/session_pool.rb', line 113

def read_message(session_id, timeout: 10)
  stashed, queue = @mutex.synchronize do
    queue = @sessions[session_id]
    raise "Unknown session: #{session_id}" unless queue

    stash = @stashes.fetch(session_id)
    [stash.empty? ? nil : stash.shift, queue]
  end
  return stashed unless stashed.nil?

  msg = nil
  Timeout.timeout(timeout) { msg = queue.pop }
  msg
end

#registerString

Register a new session. Returns a unique session ID.

Returns:

  • (String)

    session ID

Raises:

  • (RuntimeError)

    if pool is full



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/plushie/test/session_pool.rb', line 55

def register
  @mutex.synchronize do
    if @sessions.size >= @max_sessions
      raise "Session pool full (#{@max_sessions} sessions). " \
        "Increase max_sessions or check for leaked sessions."
    end
    @counter += 1
    session_id = "test_#{@counter}"
    @sessions[session_id] = BoundedQueue.new(BoundedQueue::SESSION_CAPACITY)
    @stashes[session_id] = []
    session_id
  end
end

#send_and_wait(msg, session_id, response_type, timeout: 10) ⇒ Hash

Send a message and wait for a specific response type.

Parameters:

  • msg (Hash)
  • session_id (String)
  • response_type (Symbol)

    expected response type

  • timeout (Numeric) (defaults to: 10)

    max wait time in seconds

Returns:

  • (Hash)

    the response



103
104
105
106
# File 'lib/plushie/test/session_pool.rb', line 103

def send_and_wait(msg, session_id, response_type, timeout: 10)
  send_message(msg, session_id)
  wait_for_response(session_id, response_type, timeout: timeout)
end

#send_message(msg, session_id) ⇒ Object

Send a message for a session (fire-and-forget). Injects the session field automatically.

Parameters:

  • msg (Hash)

    message to send

  • session_id (String)


91
92
93
94
# File 'lib/plushie/test/session_pool.rb', line 91

def send_message(msg, session_id)
  encoded = Protocol::Encode.encode(msg.merge(session: session_id), @format)
  @connection.send_encoded(encoded)
end

#startObject

Start the renderer process.



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/plushie/test/session_pool.rb', line 39

def start
  @connection = Connection.spawn(
    format: @format,
    binary: @binary,
    mode: @mode,
    max_sessions: @max_sessions,
    settings: {},
    on_message: method(:dispatch_message)
  )
  @started = true
end

#started?Boolean

Returns true if the pool is started.

Returns:

  • (Boolean)

    true if the pool is started



139
140
141
# File 'lib/plushie/test/session_pool.rb', line 139

def started?
  @started
end

#stopObject

Stop the pool and close the renderer.



129
130
131
132
133
134
135
136
# File 'lib/plushie/test/session_pool.rb', line 129

def stop
  @connection&.close
  @mutex.synchronize do
    @sessions.clear
    @stashes.clear
  end
  @started = false
end

#unregister(session_id) ⇒ Object

Unregister a session. Sends Reset to the renderer.

Parameters:

  • session_id (String)


72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/plushie/test/session_pool.rb', line 72

def unregister(session_id)
  send_message({type: "reset", id: "reset_#{session_id}"}, session_id)
  begin
    wait_for_response(session_id, :reset_response, timeout: 5)
    wait_for_response(session_id, :session_closed, timeout: 5)
  rescue Timeout::Error
    # Timeout on reset or session_closed is not fatal
  end
  @mutex.synchronize do
    @sessions.delete(session_id)
    @stashes.delete(session_id)
  end
end