Class: Plushie::Test::SessionPool
- Inherits: Object
- Defined in: lib/plushie/test/session_pool.rb
Overview
Shared renderer process for concurrent test sessions.
Owns a single +plushie --mock --max-sessions N+ process and multiplexes messages from multiple test sessions over it. Each session gets a unique session ID (e.g. "test_1", "test_2"); responses are demuxed by the +session+ field and forwarded to the owning queue.
See the Elixir SDK's SessionPool for reference.
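The demultiplexing described above can be sketched in a few lines. This is a minimal illustration, not the library's implementation: `DemuxSketch`, `add_session`, and `dispatch` are hypothetical names, a plain `Thread::Queue` stands in for the pool's `BoundedQueue`, and messages are assumed to be hashes carrying a `:session` key.

```ruby
# Hypothetical sketch: route incoming messages to per-session queues
# based on their :session field. Not the actual SessionPool code.
class DemuxSketch
  def initialize
    @queues = {}          # session_id -> Thread::Queue
    @mutex = Mutex.new
  end

  # Create a queue for a session and return it.
  def add_session(session_id)
    @mutex.synchronize { @queues[session_id] = Thread::Queue.new }
  end

  # Forward a message to the queue that owns it. Messages for unknown
  # sessions are dropped (an assumption made for this sketch).
  def dispatch(msg)
    queue = @mutex.synchronize { @queues[msg[:session]] }
    return false unless queue

    queue << msg
    true
  end
end

demux = DemuxSketch.new
q1 = demux.add_session("test_1")
q2 = demux.add_session("test_2")

# Messages arrive interleaved on the shared connection...
demux.dispatch({session: "test_2", type: "hello"})
demux.dispatch({session: "test_1", type: "tree"})
# ...and each one lands only in the queue of the session that owns it.
```

Looking up the queue under the mutex but pushing outside it keeps the critical section short, which matters when one reader thread serves many sessions.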
Instance Attribute Summary
- #format ⇒ :msgpack, :json (readonly)
Instance Method Summary
- #initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil) ⇒ SessionPool (constructor)
  A new instance of SessionPool.
- #read_message(session_id, timeout: 10) ⇒ Object
  Read the next message for a session (blocking).
- #register ⇒ String
  Register a new session.
- #send_and_wait(msg, session_id, response_type, timeout: 10) ⇒ Hash
  Send a message and wait for a specific response type.
- #send_message(msg, session_id) ⇒ Object
  Send a message for a session (fire-and-forget).
- #start ⇒ Object
  Start the renderer process.
- #started? ⇒ Boolean
  True if the pool is started.
- #stop ⇒ Object
  Stop the pool and close the renderer.
- #unregister(session_id) ⇒ Object
  Unregister a session.
Constructor Details
#initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil) ⇒ SessionPool
Returns a new instance of SessionPool.
    # File 'lib/plushie/test/session_pool.rb', line 25
    def initialize(mode: :mock, format: :msgpack, max_sessions: 8, binary: nil)
      @mode = mode
      @format = format
      @max_sessions = max_sessions
      @binary = binary
      @connection = nil
      @sessions = {} # session_id -> Thread::Queue
      @stashes = {} # session_id -> Array
      @counter = 0
      @mutex = Mutex.new
      @started = false
    end
Instance Attribute Details
#format ⇒ :msgpack, :json (readonly)
    # File 'lib/plushie/test/session_pool.rb', line 19
    def format
      @format
    end
Instance Method Details
#read_message(session_id, timeout: 10) ⇒ Object
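Read the next message for a session (blocking). Returns a stashed message first if one is waiting; otherwise blocks on the session's queue for up to timeout seconds and raises Timeout::Error if nothing arrives. Raises a RuntimeError for an unknown session ID.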
    # File 'lib/plushie/test/session_pool.rb', line 113
    def read_message(session_id, timeout: 10)
      stashed, queue = @mutex.synchronize do
        queue = @sessions[session_id]
        raise "Unknown session: #{session_id}" unless queue
        stash = @stashes.fetch(session_id)
        [stash.empty? ? nil : stash.shift, queue]
      end

      return stashed unless stashed.nil?

      msg = nil
      Timeout.timeout(timeout) { msg = queue.pop }
      msg
    end
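The read order in the listing (stash first, then a blocking pop with a deadline) can be tried in isolation. The sketch below is a stand-alone approximation: `read_next` is a hypothetical name, and a plain `Thread::Queue` replaces the pool's per-session `BoundedQueue`.

```ruby
require "timeout"

# Hypothetical stand-alone version of the read order used by read_message:
# drain the stash first, then block on the queue with a deadline.
def read_next(stash, queue, timeout: 10)
  return stash.shift unless stash.empty?

  Timeout.timeout(timeout) { queue.pop }
end

stash = [{type: "stashed"}]
queue = Thread::Queue.new
queue << {type: "queued"}

first = read_next(stash, queue)    # comes from the stash
second = read_next(stash, queue)   # stash is empty now, so this pops the queue

# With nothing left to read, the call raises Timeout::Error after the deadline.
timed_out = begin
  read_next(stash, queue, timeout: 0.05)
  false
rescue Timeout::Error
  true
end
```

Callers that poll for optional messages would need to rescue `Timeout::Error` in the same way, since an empty queue is reported by exception rather than by a `nil` return.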
#register ⇒ String
Register a new session. Returns a unique session ID.
    # File 'lib/plushie/test/session_pool.rb', line 55
    def register
      @mutex.synchronize do
        if @sessions.size >= @max_sessions
          raise "Session pool full (#{@max_sessions} sessions). " \
            "Increase max_sessions or check for leaked sessions."
        end
        @counter += 1
        session_id = "test_#{@counter}"
        @sessions[session_id] = BoundedQueue.new(BoundedQueue::SESSION_CAPACITY)
        @stashes[session_id] = []
        session_id
      end
    end
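One consequence of the listing is that session IDs are never reused: the counter only grows, while the capacity check looks at the live session table. The miniature below reproduces just that bookkeeping so the behavior can be observed without a renderer; `IdAllocatorSketch` and `release` are hypothetical names, and an empty array stands in for the session queue.

```ruby
# Hypothetical miniature of the ID allocation in register: a counter that
# only grows, plus a capacity check against the live session table.
class IdAllocatorSketch
  def initialize(max_sessions:)
    @max_sessions = max_sessions
    @sessions = {}
    @counter = 0
    @mutex = Mutex.new
  end

  def register
    @mutex.synchronize do
      raise "Session pool full (#{@max_sessions} sessions)." if @sessions.size >= @max_sessions

      @counter += 1
      session_id = "test_#{@counter}"
      @sessions[session_id] = []   # placeholder for the session queue
      session_id
    end
  end

  # Stand-in for the table cleanup done at the end of unregister.
  def release(session_id)
    @mutex.synchronize { @sessions.delete(session_id) }
  end
end

alloc = IdAllocatorSketch.new(max_sessions: 2)
a = alloc.register   # "test_1"
b = alloc.register   # "test_2"

# A third registration exceeds max_sessions and raises.
full = begin
  alloc.register
  false
rescue RuntimeError
  true
end

alloc.release(a)
c = alloc.register   # "test_3": the freed slot gets a fresh ID, not "test_1"
```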
#send_and_wait(msg, session_id, response_type, timeout: 10) ⇒ Hash
Send a message and wait for a specific response type.
    # File 'lib/plushie/test/session_pool.rb', line 103
    def send_and_wait(msg, session_id, response_type, timeout: 10)
      send_message(msg, session_id)
      wait_for_response(session_id, response_type, timeout: timeout)
    end
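The `wait_for_response` helper called here is not shown on this page. One plausible shape, given the per-session stash created in `register`, is a loop that pops messages until the wanted type arrives and sets the others aside for later reads. The sketch below is an assumption, not the library's code: `wait_for_type` is a hypothetical name, and messages are assumed to be hashes whose `:type` is a symbol.

```ruby
require "timeout"

# Hypothetical sketch of a wait loop; the real wait_for_response is not shown
# in this page. Messages are assumed to be hashes whose :type is a symbol.
# Non-matching messages are stashed so that later reads still see them.
def wait_for_type(queue, stash, response_type, timeout: 10)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    raise Timeout::Error, "no #{response_type} within #{timeout}s" if remaining <= 0

    msg = Timeout.timeout(remaining) { queue.pop }
    return msg if msg[:type] == response_type

    stash << msg
  end
end

queue = Thread::Queue.new
stash = []
queue << {type: :event, id: "click"}
queue << {type: :query_response, id: "q1"}

# The event arrives first but is not what we are waiting for, so it is stashed.
reply = wait_for_type(queue, stash, :query_response, timeout: 1)
```

Tracking a single deadline across iterations keeps the total wait bounded by `timeout` even when many unrelated messages arrive first.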
#send_message(msg, session_id) ⇒ Object
Send a message for a session (fire-and-forget). Injects the session field automatically.
    # File 'lib/plushie/test/session_pool.rb', line 91
    def send_message(msg, session_id)
      encoded = Protocol::Encode.encode(msg.merge(session: session_id), @format)
      @connection.send_encoded(encoded)
    end
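The session injection relies on plain `Hash#merge` semantics, which can be checked on their own. The message contents below are made up for illustration; how `Protocol::Encode` treats string keys is not covered on this page, so the last example only shows what `merge` itself produces.

```ruby
# Hash#merge returns a new hash, so the caller's message is left untouched,
# and the session_id passed in wins over any :session key already present.
msg = {type: "snapshot", id: "s1", session: "stale"}
tagged = msg.merge(session: "test_3")

# With string keys, merge adds a second, symbol-keyed entry instead of
# overwriting, so symbol-keyed messages are the safer choice here.
string_keyed = {"type" => "snapshot", "session" => "stale"}.merge(session: "test_3")
```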
#start ⇒ Object
Start the renderer process.
    # File 'lib/plushie/test/session_pool.rb', line 39
    def start
      @connection = Connection.spawn(
        format: @format,
        binary: @binary,
        mode: @mode,
        max_sessions: @max_sessions,
        settings: {},
        on_message: method(:dispatch_message)
      )
      @started = true
    end
#started? ⇒ Boolean
Returns true if the pool is started.
    # File 'lib/plushie/test/session_pool.rb', line 139
    def started?
      @started
    end
#stop ⇒ Object
Stop the pool and close the renderer.
    # File 'lib/plushie/test/session_pool.rb', line 129
    def stop
      @connection&.close
      @mutex.synchronize do
        @sessions.clear
        @stashes.clear
      end
      @started = false
    end
#unregister(session_id) ⇒ Object
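Unregister a session. Sends a reset message to the renderer, waits up to 5 seconds each for the reset_response and session_closed replies (a timeout on either is not fatal), then removes the session's queue and stash.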
    # File 'lib/plushie/test/session_pool.rb', line 72
    def unregister(session_id)
      send_message({type: "reset", id: "reset_#{session_id}"}, session_id)
      begin
        wait_for_response(session_id, :reset_response, timeout: 5)
        wait_for_response(session_id, :session_closed, timeout: 5)
      rescue Timeout::Error
        # Timeout on reset or session_closed is not fatal
      end
      @mutex.synchronize do
        @sessions.delete(session_id)
        @stashes.delete(session_id)
      end
    end
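Because the pool has a fixed number of slots and `register` raises once they are used up, a test that fails before calling `unregister` leaks a slot. A small wrapper with `ensure` is one way to guard against that. The helper `with_session` is hypothetical and not part of the class; `FakePool` is a stand-in so the sketch runs without a renderer binary.

```ruby
# Hypothetical helper: pair register with unregister so that a failing test
# cannot leak a slot. Works with any object that responds to register and
# unregister(session_id), such as a started SessionPool.
def with_session(pool)
  session_id = pool.register
  begin
    yield session_id
  ensure
    pool.unregister(session_id)
  end
end

# Stand-in pool that only tracks which session IDs are live.
class FakePool
  attr_reader :live

  def initialize
    @live = []
    @counter = 0
  end

  def register
    @counter += 1
    "test_#{@counter}".tap { |id| @live << id }
  end

  def unregister(session_id)
    @live.delete(session_id)
  end
end

pool = FakePool.new
seen = nil
begin
  with_session(pool) do |id|
    seen = id
    raise "test failed"
  end
rescue RuntimeError
  # The failure propagates, but the session was still unregistered.
end
```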