Class: Mycel::Channel::Session
- Inherits:
- Object (hierarchy: Object → Mycel::Channel::Session)
- Includes:
- Mycel::Callbacks
- Defined in:
- lib/mycel.rb
Overview
A multiplexed bidirectional session over a STREAM IO (TCP socket, UNIX socket, etc.). Stream-orientation is intentional: Framing supplies record boundaries on top of an unbounded byte stream.
Datagram transports (UDP) need a different abstraction because each datagram is itself the message boundary — a future Mycel::Channel::DatagramSession would mirror this class’s public API but bypass Framing and read whole datagrams via recvfrom. That is deliberately not implemented here; it is a future extension point, not a missing feature.
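To make the point about record boundaries concrete, here is a minimal sketch of length-prefixed framing over a byte stream. The wire format of Mycel::Framing is not shown on this page, so the 4-byte big-endian prefix and the names write_frame / read_frame are assumptions made purely for illustration.

```ruby
require "stringio"

# Hypothetical helpers, not Mycel::Framing. Assumed format: each record is
# a 4-byte big-endian length followed by that many payload bytes.
def write_frame(io, payload)
  bytes = payload.b
  io.write([bytes.bytesize].pack("N") + bytes)
end

def read_frame(io)
  header = io.read(4)
  return nil if header.nil?                       # clean EOF between frames
  raise EOFError, "truncated frame header" if header.bytesize < 4
  length = header.unpack1("N")
  body = io.read(length)
  raise EOFError, "truncated frame body" if body.nil? || body.bytesize < length
  body
end

# A StringIO stands in for the socket: one unbounded run of bytes.
stream = StringIO.new(String.new)
write_frame(stream, "hello")
write_frame(stream, "world!")
stream.rewind
frames = [read_frame(stream), read_frame(stream), read_frame(stream)]
```

The reader recovers the two records exactly as written even though the stream itself carries no boundaries. A datagram transport would not need the length header, because each datagram already delimits one message.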
Constant Summary
- DIRECTIONS = [DIR_COMMAND, DIR_JOB].freeze
Instance Method Summary
- #active_job_count ⇒ Object
  Number of inbound Jobs currently in flight (registered in the JOB pool but not yet responded/aborted).
- #attempt_close ⇒ Object
  Returns true if this caller did the close work itself, false if someone else got there first.
- #close ⇒ Object
  Idempotent shutdown using a winner/loser model.
- #close_body ⇒ Object
  The actual cleanup.
- #closed? ⇒ Boolean
  Returns true once close has begun on this session, false otherwise.
- #generate_and_register_out_id(command_obj) ⇒ Object
- #initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Session (constructor)
  A new instance of Session.
- #register_id(direction, id, obj) ⇒ Object
- #send(msg) ⇒ Object
- #spawn_handler(&block) ⇒ Object
  Hand a block off to the configured executor (default: a fresh Thread).
- #start ⇒ Object
- #unregister_id(direction, id) ⇒ Object
Methods included from Mycel::Callbacks
#__mycel_callback_handlers__, #callback, #on
Constructor Details
#initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Session
Returns a new instance of Session.
    # File 'lib/mycel.rb', line 597
    def initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil)
      unless codec.respond_to?(:encode) && codec.respond_to?(:decode)
        raise ArgumentError, "codec must respond to :encode and :decode"
      end
      if executor && !executor.respond_to?(:call)
        raise ArgumentError, "executor must respond to :call"
      end

      @io = io
      @codec = codec
      @executor = executor
      @io_write_lock = Monitor.new
      @io_read_lock = Monitor.new
      @id_pool = DIRECTIONS.to_h { |d| [d, Hash.new] }
      @id_lock = DIRECTIONS.to_h { |d| [d, Monitor.new] }
      @max_concurrent_jobs = max_concurrent_jobs
      @job_slot_lock = Monitor.new
      @job_slot_cond = @job_slot_lock.new_cond
      @active_job_count = 0
      @thread = nil
      @thread_lock = Monitor.new
      @closed = false
      @close_finished = false
      @close_lock = Monitor.new
      @close_cond = @close_lock.new_cond
    end
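The constructor only duck-type-checks its collaborators, so any object with the right methods is accepted. The sketch below shows one possible codec and executor. MarshalCodec and INLINE_EXECUTOR are hypothetical names, and the assumption that encode returns a String and decode reverses it is inferred from Session#send rather than confirmed by this page.

```ruby
# Hypothetical codec: any object responding to :encode and :decode passes
# the constructor's check. Assumed contract: encode returns a String of
# bytes and decode turns those bytes back into the message.
module MarshalCodec
  def self.encode(msg)
    Marshal.dump(msg)
  end

  def self.decode(bytes)
    Marshal.load(bytes)
  end
end

# Hypothetical executor: anything responding to :call that accepts a block.
# This one runs the handler inline on the calling thread.
INLINE_EXECUTOR = ->(&handler) { handler.call }

# The same checks the constructor performs:
codec_ok    = MarshalCodec.respond_to?(:encode) && MarshalCodec.respond_to?(:decode)
executor_ok = INLINE_EXECUTOR.respond_to?(:call)
round_trip  = MarshalCodec.decode(MarshalCodec.encode({ "op" => "ping", "n" => 1 }))

# With mycel loaded, these would be passed as:
#   Mycel::Channel::Session.new(io, codec: MarshalCodec, executor: INLINE_EXECUTOR)
```

Two caveats apply to this sketch. Marshal.load should not be used on data from an untrusted peer, and an inline executor runs the handler on whichever thread calls spawn_handler, which would stall message reception for the duration of the handler.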
Instance Method Details
#active_job_count ⇒ Object
Number of inbound Jobs currently in flight (registered in the JOB pool but not yet responded/aborted). Useful for instrumentation.
    # File 'lib/mycel.rb', line 630
    def active_job_count
      @job_slot_lock.synchronize { @active_job_count }
    end
#attempt_close ⇒ Object
Returns true if this caller did the close work itself, false if someone else got there first. Called from both the public close entry point and from main_loop’s rescue (where the false return means “external close has it; just exit and let it finish”).
    # File 'lib/mycel.rb', line 708
    def attempt_close
      am_winner = @close_lock.synchronize {
        if @closed
          false
        else
          @closed = true
          true
        end
      }
      return false unless am_winner

      close_body

      @close_lock.synchronize {
        @close_finished = true
        @close_cond.broadcast
      }
      true
    end
#close ⇒ Object
Idempotent shutdown using a winner/loser model.
close is called from two distinct paths that may interleave:
* EXTERNAL: an outside thread invokes session.close. To unblock
main_loop's pending Framing.read it closes @io, which raises
IOError / SystemCallError inside main_loop and triggers the
INTERNAL path below.
* INTERNAL: main_loop's outer rescue catches the IO error
(whether triggered by the EXTERNAL path or by an unsolicited
peer hangup) and invokes attempt_close on the same thread
that runs main_loop.
The two paths can race. attempt_close uses an atomic gate so exactly one caller becomes the WINNER and runs the cleanup body (close_body); everyone else is a LOSER.
External callers that lose the race must still observe a fully closed session before returning, so they wait on @close_cond until the winner signals completion. main_loop’s rescue, however, MUST NOT wait. If it did and the winner were external, the winner would block in @thread.join (waiting for main_loop to exit) while main_loop blocked on @close_cond (waiting for the winner to signal): a deadlock. The rescue therefore just returns and lets main_loop exit, which unblocks the winner’s @thread.join so the winner can finish.
callback(:close) fires from the winner, exactly once, after main_loop has fully exited and @thread is cleared. Hub’s registered :close handler can safely unregister the session at that point.
    # File 'lib/mycel.rb', line 697
    def close
      return if attempt_close
      @close_lock.synchronize {
        @close_cond.wait_until { @close_finished }
      }
    end
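The winner/loser model can be exercised in isolation. The following is a standalone sketch, not Session itself: GatedResource is a hypothetical class that copies only the gate and the condition variable, with a short sleep standing in for close_body.

```ruby
require "monitor"

# Standalone illustration of the winner/loser gate; not Session itself.
class GatedResource
  attr_reader :cleanup_runs

  def initialize
    @closed = false
    @close_finished = false
    @close_lock = Monitor.new
    @close_cond = @close_lock.new_cond
    @cleanup_runs = 0
  end

  def attempt_close
    # Atomic test-and-set: the first caller flips the gate and wins.
    am_winner = @close_lock.synchronize do
      next false if @closed
      @closed = true
    end
    return false unless am_winner

    sleep 0.01            # stand-in for the real cleanup work
    @cleanup_runs += 1    # only the winner ever reaches this line
    @close_lock.synchronize do
      @close_finished = true
      @close_cond.broadcast
    end
    true
  end

  def close
    return if attempt_close
    # Losers wait until the winner has finished the cleanup.
    @close_lock.synchronize { @close_cond.wait_until { @close_finished } }
  end

  def close_finished?
    @close_lock.synchronize { @close_finished }
  end
end

resource = GatedResource.new
finished_on_return = Queue.new
threads = Array.new(8) do
  Thread.new do
    resource.close
    finished_on_return << resource.close_finished?
  end
end
threads.each(&:join)
observations = Array.new(8) { finished_on_return.pop }
```

Eight threads race into close, the cleanup runs once, and every caller sees a finished close when its own call returns. The sketch has no equivalent of main_loop, so it does not reproduce the rule that the internal path must return without waiting.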
#close_body ⇒ Object
The actual cleanup. Order matters:
1. Close @io so any pending Framing.read errors out (this is
what makes main_loop notice an EXTERNAL close).
2. Zero the backpressure counter and broadcast — unsticks
main_loop if it was parked inside acquire_job_slot.
3. Join main_loop (if we are not it) so we know the receive
thread has fully unwound before we declare the session
closed.
4. Clear @thread and fire :close so observers (Hub) can
unregister.
Re. `@io.close rescue IOError`:
On Ruby versions before 2.3, IO#close on an already-closed stream
raises IOError ("closed stream"); newer versions return nil instead.
A duplicate close can happen here because:
a) main_loop's outer rescue path closes @io defensively, and
the EXTERNAL caller may then find @io already gone.
b) An end-user's :request handler (running on its own thread)
races with close. If it tries job.respond after we close
@io, Session#send will see the closed stream and raise.
The rescue swallows that benign duplicate-close (being a modifier
rescue, it also swallows any other StandardError raised by close).
A consequence that callers should be aware of:
┃ Job#respond, Job#abort, and Job#progress called AFTER the
┃ session has closed will raise IOError out of Session#send.
┃ Peer wraps user-method invocations and routes the failure
┃ to its own respond path; for direct Channel users, expect
┃ to rescue IOError or check session liveness yourself.
We don't go further (e.g. queueing late respond payloads) on
purpose: once the session is closed, the wire is gone, and
pretending otherwise just defers failure.
    # File 'lib/mycel.rb', line 761
    def close_body
      @io.close rescue IOError

      @job_slot_lock.synchronize {
        @active_job_count = 0
        @job_slot_cond.broadcast
      }

      @thread.join if @thread && @thread != Thread.current
      @thread_lock.synchronize { @thread = nil }
      callback(:close)
    end
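Two Ruby behaviours underlie the notes above, and both can be checked without Mycel. This sketch uses a plain pipe as a stand-in for the session's IO; nothing in it is Mycel API.

```ruby
# `expr rescue Const` is a modifier rescue: it rescues any StandardError
# and evaluates to Const. It does not filter by exception class.
result = (raise ArgumentError, "not an IOError") rescue IOError

# Writing to a closed stream raises IOError, which is the failure a late
# respond / abort / progress would surface through a send on a closed IO.
reader, writer = IO.pipe
writer.close
late_write_error =
  begin
    writer.write("late response")
    nil
  rescue IOError => e
    e
  end
reader.close
```

The first statement shows that the constant after a modifier rescue is the fallback value, not a class filter. The second shows the error direct Channel users should expect to rescue after close.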
#closed? ⇒ Boolean
Returns true once close has begun on this session, false otherwise. Use this from inside a Job handler to bail out before calling respond / abort / progress, since those operations raise IOError out of Session#send when the underlying IO is gone.
Note this returns true as soon as the close winner flips the gate, i.e. potentially BEFORE callback(:close) has fired. That’s the correct semantics for “should I attempt further sends?” — once close has started, the wire is on its way out and any send is a losing race.
    # File 'lib/mycel.rb', line 644
    def closed?
      @close_lock.synchronize { @closed }
    end
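A handler can combine the closed? check with a rescue, since the session may still close between the check and the write. The helper below is a sketch under assumptions: respond_if_open is a hypothetical name, Job#respond is assumed to take a single payload argument, and FakeSession / FakeJob are stand-ins so the example runs without mycel loaded.

```ruby
# Hypothetical helper; `session` needs #closed? and `job` needs #respond,
# whose signature is assumed here to take a single payload.
def respond_if_open(session, job, payload)
  return false if session.closed?
  job.respond(payload)
  true
rescue IOError
  # closed? was false a moment ago, but the session closed before the write.
  false
end

# Stand-ins so the sketch runs without mycel loaded.
FakeSession = Struct.new(:closed) do
  def closed?
    closed
  end
end

class FakeJob
  attr_reader :sent

  def initialize(fail_with: nil)
    @fail_with = fail_with
    @sent = []
  end

  def respond(payload)
    raise @fail_with if @fail_with
    @sent << payload
  end
end

open_job    = FakeJob.new
racing_job  = FakeJob.new(fail_with: IOError.new("closed stream"))
sent_open   = respond_if_open(FakeSession.new(false), open_job, "done")
sent_closed = respond_if_open(FakeSession.new(true), FakeJob.new, "done")
sent_racing = respond_if_open(FakeSession.new(false), racing_job, "done")
```

The check avoids pointless work in the common case, and the rescue covers the window in which the gate flips after closed? has already returned false.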
#generate_and_register_out_id(command_obj) ⇒ Object
    # File 'lib/mycel.rb', line 774
    def generate_and_register_out_id(command_obj)
      raise(ArgumentError) unless command_obj.is_a?(Command)
      id = nil
      @id_lock[DIR_COMMAND].synchronize {
        1.times {
          id = SecureRandom.hex
          redo if @id_pool[DIR_COMMAND].has_key?(id)
          self.register_id(DIR_COMMAND, id, command_obj)
        }
      }
      id
    end
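The method draws a random id, retries on collision, and registers the object under the winning id. The same idea can be written with an explicit loop, as in this standalone sketch. register_fresh_id and the plain Hash pool are hypothetical, and the sketch is single-threaded, whereas the method above does the check and the insert under @id_lock.

```ruby
require "securerandom"

# Hypothetical standalone version of the collision-retry loop.
def register_fresh_id(pool, obj)
  loop do
    id = SecureRandom.hex          # 16 random bytes as 32 hex characters
    next if pool.key?(id)          # collision: draw again
    pool[id] = obj
    return id
  end
end

pool = {}
ids = Array.new(100) { |i| register_fresh_id(pool, "command-#{i}") }
```

With 128 bits of randomness per id a collision is extremely unlikely, so the retry branch is a safety net rather than a hot path.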
#register_id(direction, id, obj) ⇒ Object
    # File 'lib/mycel.rb', line 787
    def register_id(direction, id, obj)
      raise(ArgumentError) unless obj.is_a?(Context)
      @id_lock[direction].synchronize {
        @id_pool[direction][id] = obj
      }
    end
#send(msg) ⇒ Object
    # File 'lib/mycel.rb', line 803
    def send(msg)
      @io_write_lock.synchronize {
        Mycel::Framing.write(@io, @codec.encode(msg))
      }
    end
#spawn_handler(&block) ⇒ Object
Hand a block off to the configured executor (default: a fresh Thread). Used by Job#start to dispatch the user :request handler off main_loop.
    # File 'lib/mycel.rb', line 651
    def spawn_handler(&block)
      if @executor
        @executor.call(&block)
      else
        Thread.new(&block)
      end
    end
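Because the executor is invoked as call(&block), a small thread pool is enough to replace the thread-per-handler default. The class below is a sketch, not part of Mycel: PoolExecutor and its shutdown method are hypothetical names.

```ruby
# Hypothetical fixed-size pool; not part of Mycel.
class PoolExecutor
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        while (task = @queue.pop)   # nil is the shutdown signal
          begin
            task.call
          rescue StandardError => e
            warn "handler failed: #{e.class}: #{e.message}"
          end
        end
      end
    end
  end

  # Matches how spawn_handler invokes the executor: call(&block).
  def call(&block)
    @queue << block
  end

  # Enqueue one stop signal per worker, then wait for them to drain.
  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
  end
end

executor = PoolExecutor.new(2)
results = Queue.new
5.times { |i| executor.call { results << i * i } }
executor.shutdown
squares = Array.new(5) { results.pop }.sort
```

A bounded pool caps the number of handler threads, at the cost of queueing handlers when all workers are busy. Whether that interacts well with max_concurrent_jobs depends on how the two limits are sized relative to each other.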
#start ⇒ Object
    # File 'lib/mycel.rb', line 659
    def start(...)
      @thread_lock.synchronize {
        @thread ||= Thread.new { main_loop(...) }
      }
    end
#unregister_id(direction, id) ⇒ Object
    # File 'lib/mycel.rb', line 794
    def unregister_id(direction, id)
      removed = @id_lock[direction].synchronize {
        @id_pool[direction].delete(id)
      }
      # When an inbound Job leaves the pool (via respond/abort), free up
      # one backpressure slot.
      release_job_slot if removed && direction == DIR_JOB
    end
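The private acquire_job_slot and release_job_slot methods are not shown on this page. The sketch below illustrates how a counting gate of this kind could work with a Monitor and a condition variable. JobSlots and all of its method names are hypothetical, and the guard against decrementing below zero is an assumption prompted by close_body zeroing the counter.

```ruby
require "monitor"

# Hypothetical counting gate in the spirit of acquire_job_slot /
# release_job_slot; the real private methods are not shown on this page.
class JobSlots
  def initialize(max)
    @max = max                       # nil means unlimited
    @lock = Monitor.new
    @cond = @lock.new_cond
    @active = 0
  end

  # Block while the pool is full, then take one slot.
  def acquire
    @lock.synchronize do
      @cond.wait_while { @max && @active >= @max }
      @active += 1
    end
  end

  # Give one slot back and wake any waiter.
  def release
    @lock.synchronize do
      @active -= 1 if @active > 0    # assumed guard, see lead-in
      @cond.broadcast
    end
  end

  def active
    @lock.synchronize { @active }
  end
end

slots = JobSlots.new(2)
slots.acquire
slots.acquire
waiter = Thread.new { slots.acquire; :acquired }
sleep 0.05
blocked_while_full = waiter.alive?
slots.release
waiter_result = waiter.value
```

The third acquire blocks until a slot is released, which is the backpressure effect: the receive loop stops taking new Jobs until an earlier one responds or aborts.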