Class: Mycel::Channel::Session
- Inherits: Object
  - Object
  - Mycel::Channel::Session
- Includes: Mycel::Callbacks
- Defined in: lib/mycel.rb
Overview
A multiplexed bidirectional session over a STREAM IO (TCP socket, UNIX socket, etc.). Stream-orientation is intentional: Framing supplies record boundaries on top of an unbounded byte stream.
Datagram transports (UDP) need a different abstraction because each datagram is itself the message boundary — a future Mycel::Channel::DatagramSession would mirror this class’s public API but bypass Framing and read whole datagrams via recvfrom. That is deliberately not implemented here; it is a future extension point, not a missing feature.
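To make "Framing supplies record boundaries on top of an unbounded byte stream" concrete, here is a minimal sketch of length-prefixed framing. The wire format of Mycel::Framing is not shown on this page, so the 4-byte big-endian length prefix and the module name SketchFraming are assumptions chosen purely for illustration.

```ruby
require "stringio"

# Hypothetical framing helper; NOT Mycel::Framing's actual wire format.
module SketchFraming
  # Write one record: a 4-byte big-endian length, then the payload bytes.
  def self.write(io, payload)
    bytes = payload.b
    io.write([bytes.bytesize].pack("N") + bytes)
  end

  # Read one record, or return nil on a clean end of stream.
  def self.read(io)
    header = io.read(4)
    return nil if header.nil?
    raise EOFError, "truncated frame header" if header.bytesize < 4

    length = header.unpack1("N")
    body = io.read(length)
    raise EOFError, "truncated frame body" if body.nil? || body.bytesize < length
    body
  end
end

# A StringIO stands in for the TCP or UNIX socket.
stream = StringIO.new("".b)
SketchFraming.write(stream, "first")
SketchFraming.write(stream, "second record")
stream.rewind

frames = []
while (frame = SketchFraming.read(stream))
  frames << frame
end
```

Both records come back intact even though the stream itself is just a run of bytes with no built-in boundaries. A datagram transport would not need this layer, which is the distinction the paragraph above draws.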
Constant Summary
- DIRECTIONS = [DIR_COMMAND, DIR_JOB].freeze
Instance Attribute Summary
- #session_id ⇒ Object
  `session_id` is the Hub-assigned identifier and is set by the Hub after construction (the Session itself does not generate it, so that the Hub remains the sole authority for namespacing IDs across server and client pools).
Instance Method Summary
- #active_job_count ⇒ Object
  Number of inbound Jobs currently in flight (registered in the JOB pool but not yet responded/aborted).
- #attempt_close ⇒ Object
  Returns true if this caller did the close work itself, false if someone else got there first.
- #close ⇒ Object
  Idempotent shutdown using a winner/loser model.
- #close_body ⇒ Object
  The actual cleanup.
- #closed? ⇒ Boolean
  Returns true once close has begun on this session, false otherwise.
- #generate_and_register_out_id(command_obj) ⇒ Object
- #initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Session (constructor)
  A new instance of Session.
- #register_id(direction, id, obj) ⇒ Object
- #send(msg) ⇒ Object
- #spawn_handler(&block) ⇒ Object
  Hand a block off to the configured executor (default: a fresh Thread).
- #start ⇒ Object
- #unregister_id(direction, id) ⇒ Object
Methods included from Mycel::Callbacks
#__mycel_callback_handlers__, #callback, #on
Constructor Details
#initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Session
Returns a new instance of Session.
# File 'lib/mycel.rb', line 643
def initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil)
  unless codec.respond_to?(:encode) && codec.respond_to?(:decode)
    raise ArgumentError, "codec must respond to :encode and :decode"
  end
  if executor && !executor.respond_to?(:call)
    raise ArgumentError, "executor must respond to :call"
  end

  @session_id = nil
  @io = io
  @codec = codec
  @executor = executor

  @io_write_lock = Monitor.new
  @io_read_lock = Monitor.new
  @id_pool = DIRECTIONS.to_h { |d| [d, Hash.new] }
  @id_lock = DIRECTIONS.to_h { |d| [d, Monitor.new] }

  @max_concurrent_jobs = max_concurrent_jobs
  @job_slot_lock = Monitor.new
  @job_slot_cond = @job_slot_lock.new_cond
  @active_job_count = 0

  @thread = nil
  @thread_lock = Monitor.new
  @closed = false
  @close_finished = false
  @close_lock = Monitor.new
  @close_cond = @close_lock.new_cond
end
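The constructor accepts any codec that responds to :encode and :decode. As a rough illustration of that duck-typed contract, the sketch below defines a hypothetical MarshalCodec and a helper, codec_like?, that repeats the constructor's predicate so the example runs without loading the library. Neither name exists in Mycel, and the shape of the messages a real codec receives is not documented on this page.

```ruby
# Hypothetical codec; any object with #encode and #decode would pass the
# constructor's check. Marshal is only appropriate between trusted peers.
module MarshalCodec
  def self.encode(obj)
    Marshal.dump(obj)
  end

  def self.decode(bytes)
    Marshal.load(bytes)
  end
end

# Same predicate the constructor applies before accepting a codec.
def codec_like?(codec)
  codec.respond_to?(:encode) && codec.respond_to?(:decode)
end

round_trip = MarshalCodec.decode(MarshalCodec.encode({ "op" => "ping", "n" => 1 }))
accepted   = codec_like?(MarshalCodec)   # has both methods
rejected   = codec_like?(Object.new)     # has neither
```

With the library loaded, such an object would presumably be passed as the codec: keyword argument; an object that fails the predicate makes the constructor raise ArgumentError.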
Instance Attribute Details
#session_id ⇒ Object
`session_id` is the Hub-assigned identifier and is set by the Hub after construction (the Session itself does not generate it, so that the Hub remains the sole authority for namespacing IDs across server and client pools). It is `nil` until the Hub assigns it. Code reading `Mycel.current_session_id` from inside an RPC handler will see the assigned value because handler dispatch happens after registration.
# File 'lib/mycel.rb', line 641
def session_id
  @session_id
end
Instance Method Details
#active_job_count ⇒ Object
Number of inbound Jobs currently in flight (registered in the JOB pool but not yet responded/aborted). Useful for instrumentation.
# File 'lib/mycel.rb', line 677
def active_job_count
  @job_slot_lock.synchronize { @active_job_count }
end
#attempt_close ⇒ Object
Returns true if this caller did the close work itself, false if someone else got there first. Called from both the public close entry point and from main_loop’s rescue (where the false return means “external close has it; just exit and let it finish”).
# File 'lib/mycel.rb', line 755
def attempt_close
  am_winner = @close_lock.synchronize {
    if @closed
      false
    else
      @closed = true
      true
    end
  }
  return false unless am_winner

  close_body
  @close_lock.synchronize {
    @close_finished = true
    @close_cond.broadcast
  }
  true
end
#close ⇒ Object
Idempotent shutdown using a winner/loser model.
close is called from two distinct paths that may interleave:
* EXTERNAL: an outside thread invokes session.close. To unblock
main_loop's pending Framing.read it closes @io, which raises
IOError / SystemCallError inside main_loop and triggers the
INTERNAL path below.
* INTERNAL: main_loop's outer rescue catches the IO error
(whether triggered by the EXTERNAL path or by an unsolicited
peer hangup) and invokes attempt_close on the same thread
that runs main_loop.
The two paths can race. attempt_close uses an atomic gate so exactly one caller becomes the WINNER and runs the cleanup body (close_body); everyone else is a LOSER.
External callers that lose the race must still observe a fully closed session before returning, so they wait on @close_cond until the winner signals completion. main_loop's rescue, however, MUST NOT wait. If it did and the winner were external, the winner would block in @thread.join (waiting for main_loop to exit) while main_loop blocked on @close_cond (waiting for the winner to signal): a deadlock. The rescue therefore just returns and lets main_loop exit, which lets the winner's @thread.join unblock and finish.
callback(:close) fires from the winner, exactly once, after main_loop has fully exited and @thread is cleared. Hub’s registered :close handler can safely unregister the session at that point.
# File 'lib/mycel.rb', line 744
def close
  return if attempt_close
  @close_lock.synchronize {
    @close_cond.wait_until { @close_finished }
  }
end
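The winner/loser protocol can be tried in isolation. The sketch below is a stand-in, not the Session class: CloseGate, cleanup_runs, and the sleep that simulates slow cleanup are all invented for the example. It shows eight threads racing into close, exactly one of them running the cleanup, and every caller returning only after cleanup has finished.

```ruby
require "monitor"

# Stand-in for the close protocol only; not the Session class itself.
class CloseGate
  attr_reader :cleanup_runs

  def initialize
    @lock = Monitor.new
    @cond = @lock.new_cond
    @closed = false
    @finished = false
    @cleanup_runs = 0
  end

  # Returns true for the single winner, false for every other caller.
  def attempt_close
    winner = @lock.synchronize do
      next false if @closed
      @closed = true
      true
    end
    return false unless winner

    sleep 0.05           # simulate slow cleanup (the role of close_body)
    @cleanup_runs += 1   # only the winner ever reaches this line
    @lock.synchronize do
      @finished = true
      @cond.broadcast
    end
    true
  end

  # Losers block here until the winner has finished cleanup.
  def close
    return if attempt_close
    @lock.synchronize { @cond.wait_until { @finished } }
  end

  def finished?
    @lock.synchronize { @finished }
  end
end

gate = CloseGate.new
seen_finished = Queue.new
threads = 8.times.map do
  Thread.new do
    gate.close
    seen_finished << gate.finished?   # what each caller observes on return
  end
end
threads.each(&:join)
observations = Array.new(8) { seen_finished.pop }
```

A caller that must not wait, such as main_loop's rescue in the description above, would call attempt_close directly and ignore a false return instead of going through close.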
#close_body ⇒ Object
The actual cleanup. Order matters:
1. Close @io so any pending Framing.read errors out (this is
what makes main_loop notice an EXTERNAL close).
2. Zero the backpressure counter and broadcast — unsticks
main_loop if it was parked inside acquire_job_slot.
3. Join main_loop (if we are not it) so we know the receive
thread has fully unwound before we declare the session
closed.
4. Clear @thread and fire :close so observers (Hub) can
unregister.
Re. `@io.close rescue IOError`:
In Ruby versions before 2.3, IO#close on an already-closed stream raises
IOError ("closed stream"); from Ruby 2.3 onward a repeated close on a
plain IO is a no-op. A duplicate close can happen here because:
a) main_loop's outer rescue path closes @io defensively, and
the EXTERNAL caller may then find @io already gone.
b) An end-user's :request handler (running on its own thread)
races with close. If it tries job.respond after we close
@io, Session#send will see the closed stream and raise.
The rescue swallows that benign duplicate-close. A consequence
that callers should be aware of:
┃ Job#respond, Job#abort, and Job#progress called AFTER the
┃ session has closed will raise IOError out of Session#send.
┃ Peer wraps user-method invocations and routes the failure
┃ to its own respond path; for direct Channel users, expect
┃ to rescue IOError or check session liveness yourself.
We don't go further (e.g. queueing late respond payloads) on
purpose: once the session is closed, the wire is gone, and
pretending otherwise just defers failure.
# File 'lib/mycel.rb', line 808
def close_body
  @io.close rescue IOError

  @job_slot_lock.synchronize {
    @active_job_count = 0
    @job_slot_cond.broadcast
  }

  @thread.join if @thread && @thread != Thread.current
  @thread_lock.synchronize { @thread = nil }
  callback(:close)
end
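One Ruby detail is worth keeping in mind when reading `@io.close rescue IOError`: the rescue modifier always rescues StandardError, and the expression after it is the fallback value, not an exception filter. The sketch below demonstrates that behavior with invented helpers (close_with_modifier, close_with_filter, BrokenIO) and shows a begin/rescue form that is limited to IOError. It illustrates language semantics only and is not a proposed change to the library.

```ruby
# Modifier form: rescues ANY StandardError. `IOError` is merely the value
# the expression evaluates to when something was rescued.
def close_with_modifier(io)
  io.close rescue IOError
end

# Clause form: only IOError (and its subclasses) is swallowed.
def close_with_filter(io)
  io.close
rescue IOError
  nil
end

# Hypothetical IO stand-in whose close fails with a non-IO error.
class BrokenIO
  def close
    raise ArgumentError, "not an IO problem"
  end
end

modifier_result = close_with_modifier(BrokenIO.new)   # error is swallowed

filter_result =
  begin
    close_with_filter(BrokenIO.new)
  rescue ArgumentError => e
    e.class                                           # error propagates
  end

# On Ruby 2.3 and later, closing a plain IO twice does not raise.
reader, writer = IO.pipe
writer.close
second_close = writer.close
reader.close
```

In practice the modifier form in close_body therefore tolerates any StandardError raised by the underlying IO object's close, which may be the intent for IO-like objects that are not plain IO instances.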
#closed? ⇒ Boolean
Returns true once close has begun on this session, false otherwise. Use this from inside a Job handler to bail out before calling respond / abort / progress, since those operations raise IOError out of Session#send when the underlying IO is gone.
Note this returns true as soon as the close winner flips the gate, i.e. potentially BEFORE callback(:close) has fired. That’s the correct semantics for “should I attempt further sends?” — once close has started, the wire is on its way out and any send is a losing race.
# File 'lib/mycel.rb', line 691
def closed?
  @close_lock.synchronize { @closed }
end
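For direct Channel users, a guard along the following lines combines the closed? check with a rescue, since the session can still close between the check and the write. Everything here is a stand-in: FakeSession, RacySession, and respond_if_open are hypothetical names, and the pipe merely reproduces the "write to a closed stream raises IOError" failure mode described above.

```ruby
# Hypothetical stand-in that mimics only #closed? and #send's failure mode.
class FakeSession
  def initialize
    @reader, @writer = IO.pipe
  end

  def closed?
    @writer.closed?
  end

  def close
    @writer.close
    @reader.close
  end

  def send(msg)
    @writer.write(msg)   # raises IOError once the stream is closed
  end
end

# Simulates losing the race: the liveness check passes, the write fails.
class RacySession < FakeSession
  def closed?
    false
  end
end

# Returns :sent, :skipped, or :lost instead of letting IOError escape.
def respond_if_open(session, payload)
  return :skipped if session.closed?
  session.send(payload)
  :sent
rescue IOError
  :lost   # the session closed between the check and the write
end

session = FakeSession.new
before = respond_if_open(session, "result")
session.close
after = respond_if_open(session, "late result")

racy = RacySession.new
racy.close
raced = respond_if_open(racy, "late result")
```

The check avoids pointless work in the common case, while the rescue covers the window that no check can close.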
#generate_and_register_out_id(command_obj) ⇒ Object
# File 'lib/mycel.rb', line 821
def generate_and_register_out_id(command_obj)
  raise(ArgumentError) unless command_obj.is_a?(Command)
  id = nil
  @id_lock[DIR_COMMAND].synchronize {
    1.times {
      id = SecureRandom.hex
      redo if @id_pool[DIR_COMMAND].has_key?(id)
      self.register_id(DIR_COMMAND, id, command_obj)
    }
  }
  id
end
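The listing above draws a random ID, retries on collision, and registers the ID while still holding the lock, so no other thread can claim the same ID in between. The sketch below shows the same idea with a plain while loop in place of the `1.times { ... redo }` idiom. IdRegistry and its injectable generator are assumptions made so the collision path can be exercised deterministically; they are not part of Mycel.

```ruby
require "monitor"
require "securerandom"

# Hypothetical registry illustrating "generate and register under one lock".
class IdRegistry
  def initialize(generator: -> { SecureRandom.hex })
    @lock = Monitor.new
    @pool = {}
    @generator = generator
  end

  # Draw IDs until an unused one turns up, then claim it before unlocking.
  def register(obj)
    @lock.synchronize do
      id = @generator.call
      id = @generator.call while @pool.key?(id)
      @pool[id] = obj
      id
    end
  end

  def size
    @lock.synchronize { @pool.size }
  end
end

# Deterministic generator that collides once, to exercise the retry.
candidates = %w[aa aa bb]
registry = IdRegistry.new(generator: -> { candidates.shift })
first  = registry.register(:cmd1)   # takes "aa"
second = registry.register(:cmd2)   # "aa" collides, retry takes "bb"

# Default generator: SecureRandom.hex yields 32 hexadecimal characters.
random_id = IdRegistry.new.register(:cmd3)
```

With 128 random bits per ID a collision is extremely unlikely, so the retry is a safety net and not a hot path.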
#register_id(direction, id, obj) ⇒ Object
# File 'lib/mycel.rb', line 834
def register_id(direction, id, obj)
  raise(ArgumentError) unless obj.is_a?(Context)
  @id_lock[direction].synchronize {
    @id_pool[direction][id] = obj
  }
end
#send(msg) ⇒ Object
# File 'lib/mycel.rb', line 850
def send(msg)
  @io_write_lock.synchronize {
    Mycel::Framing.write(@io, @codec.encode(msg))
  }
end
#spawn_handler(&block) ⇒ Object
Hand a block off to the configured executor (default: a fresh Thread). Used by Job#start to dispatch the user :request handler off main_loop.
# File 'lib/mycel.rb', line 698
def spawn_handler(&block)
  if @executor
    @executor.call(&block)
  else
    Thread.new(&block)
  end
end
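Because spawn_handler invokes the executor as `@executor.call(&block)`, a custom executor receives the work as a block, not as a positional argument. The sketch below shows one possible executor, a small fixed-size thread pool. TinyPool and its shutdown method are hypothetical and exist only for this example.

```ruby
# Hypothetical fixed-size pool. Anything that responds to #call and accepts
# a block matches the calling convention used by spawn_handler.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # A nil task is the shutdown sentinel for this worker.
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end

  # Enqueue the block for a worker thread; returns immediately.
  def call(&block)
    @queue << block
    nil
  end

  # Push one sentinel per worker, then wait for the queue to drain.
  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
  end
end

pool = TinyPool.new(2)
results = Queue.new
5.times { |i| pool.call { results << i * i } }
pool.shutdown
squares = Array.new(results.size) { results.pop }.sort
```

Such an object would presumably be passed as the executor: keyword argument of the constructor. Unlike the default path, which starts one Thread per handler, a pool bounds the number of handler threads.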
#start ⇒ Object
# File 'lib/mycel.rb', line 706
def start(...)
  @thread_lock.synchronize {
    @thread ||= Thread.new { main_loop(...) }
  }
end
#unregister_id(direction, id) ⇒ Object
# File 'lib/mycel.rb', line 841
def unregister_id(direction, id)
  removed = @id_lock[direction].synchronize {
    @id_pool[direction].delete(id)
  }
  # When an inbound Job leaves the pool (via respond/abort), free up
  # one backpressure slot.
  release_job_slot if removed && direction == DIR_JOB
end
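The source of acquire_job_slot and release_job_slot is not shown on this page. As a rough sketch of how a Monitor-based backpressure counter of this kind could behave, the JobSlots class below blocks a second acquire until the first slot is released. The class, its method names, and the nil-means-unlimited rule are assumptions, not the library's implementation.

```ruby
require "monitor"

# Hypothetical slot counter; not the library's acquire/release code.
class JobSlots
  def initialize(max)
    @max = max            # assumption: nil means "no limit"
    @lock = Monitor.new
    @cond = @lock.new_cond
    @active = 0
  end

  # Block while the pool is full, then take a slot.
  def acquire
    @lock.synchronize do
      @cond.wait_while { @max && @active >= @max }
      @active += 1
    end
  end

  # Give a slot back and wake one waiter.
  def release
    @lock.synchronize do
      @active -= 1 if @active > 0
      @cond.signal
    end
  end

  # Zero the counter and wake everyone, as close_body does on shutdown.
  def reset
    @lock.synchronize do
      @active = 0
      @cond.broadcast
    end
  end

  def active
    @lock.synchronize { @active }
  end
end

slots = JobSlots.new(1)
events = Queue.new

slots.acquire                        # first job takes the only slot
waiter = Thread.new do
  slots.acquire                      # parks until a slot is released
  events << :second_admitted
end

sleep 0.05                           # give the waiter time to park
events << :first_released
slots.release                        # the role unregister_id plays for DIR_JOB
waiter.join
order = [events.pop, events.pop]
```

The second job cannot be admitted before the release, so the recorded order is fixed regardless of thread scheduling.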