Class: Mycel::Channel::Session

Inherits:
Object
Includes:
Mycel::Callbacks
Defined in:
lib/mycel.rb

Overview

A multiplexed bidirectional session over a STREAM IO (TCP socket, UNIX socket, etc.). Stream-orientation is intentional: Framing supplies record boundaries on top of an unbounded byte stream.

Datagram transports (UDP) need a different abstraction because each datagram is itself the message boundary — a future Mycel::Channel::DatagramSession would mirror this class’s public API but bypass Framing and read whole datagrams via recvfrom. That is deliberately not implemented here; it is a future extension point, not a missing feature.
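
To make the stream-versus-datagram distinction concrete, here is a minimal sketch of how record boundaries can be layered on a byte stream. It assumes a 4-byte big-endian length prefix; `SketchFraming` is a hypothetical stand-in, and the actual wire format of Mycel::Framing is not shown on this page and may differ.

```ruby
require "stringio"

# Hypothetical framing helper, NOT Mycel::Framing's real wire format.
# Each record is a 4-byte big-endian length followed by that many payload bytes.
module SketchFraming
  def self.write(io, payload)
    bytes = payload.b
    io.write([bytes.bytesize].pack("N") + bytes)
  end

  def self.read(io)
    header = io.read(4)
    raise EOFError, "stream ended before a frame header" if header.nil? || header.bytesize < 4

    length = header.unpack1("N")
    body = io.read(length)
    raise EOFError, "stream ended mid-frame" if body.nil? || body.bytesize < length

    body
  end
end

# Two records written back to back come out as two separate records,
# even though the underlying stream has no boundaries of its own.
stream = StringIO.new("".b)
SketchFraming.write(stream, "hello")
SketchFraming.write(stream, "world!")
stream.rewind
first = SketchFraming.read(stream)
second = SketchFraming.read(stream)
```

A datagram transport would not need the length prefix at all, since each received datagram already is one record.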

Constant Summary

DIRECTIONS =
[DIR_COMMAND, DIR_JOB].freeze

Instance Attribute Summary

Instance Method Summary

Methods included from Mycel::Callbacks

#__mycel_callback_handlers__, #callback, #on

Constructor Details

#initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Session

Returns a new instance of Session.



# File 'lib/mycel.rb', line 643

def initialize(io, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil)
  unless codec.respond_to?(:encode) && codec.respond_to?(:decode)
    raise ArgumentError, "codec must respond to :encode and :decode"
  end
  if executor && !executor.respond_to?(:call)
    raise ArgumentError, "executor must respond to :call"
  end

  @session_id = nil
  @io = io
  @codec = codec
  @executor = executor
  @io_write_lock = Monitor.new
  @io_read_lock  = Monitor.new

  @id_pool = DIRECTIONS.to_h { |d| [d, Hash.new] }
  @id_lock = DIRECTIONS.to_h { |d| [d, Monitor.new] }

  @max_concurrent_jobs = max_concurrent_jobs
  @job_slot_lock = Monitor.new
  @job_slot_cond = @job_slot_lock.new_cond
  @active_job_count = 0

  @thread = nil
  @thread_lock = Monitor.new

  @closed = false
  @close_finished = false
  @close_lock = Monitor.new
  @close_cond = @close_lock.new_cond
end
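
The constructor only duck-types the codec, so any object with `encode` and `decode` passes the check. The sketch below shows one possible custom codec built on the standard `json` library. `SketchCodec` and `valid_codec?` are hypothetical names, and the "object in, String out" contract is an assumption inferred from how `#send` passes `@codec.encode(msg)` to the framing layer.

```ruby
require "json"

# Hypothetical codec, not part of Mycel. The String-in/String-out contract
# is an assumption; only the :encode/:decode duck type comes from the source.
module SketchCodec
  def self.encode(obj)
    ::JSON.generate(obj)
  end

  def self.decode(bytes)
    ::JSON.parse(bytes)
  end
end

# Same predicate the constructor applies before accepting a codec.
def valid_codec?(codec)
  codec.respond_to?(:encode) && codec.respond_to?(:decode)
end

message = { "type" => "request", "id" => "abc" }
round_trip = SketchCodec.decode(SketchCodec.encode(message))
```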

Instance Attribute Details

#session_id ⇒ Object

`session_id` is the Hub-assigned identifier. The Hub sets it after construction; the Session does not generate it, so the Hub remains the sole authority for namespacing IDs across server and client pools. It is `nil` until the Hub assigns it. Code reading `Mycel.current_session_id` from inside an RPC handler will see the assigned value because handler dispatch happens after registration.

Parameters (of #initialize):

  • io (IO-like)

    framed bidirectional stream

  • codec (#encode, #decode)

    wire serializer

  • max_concurrent_jobs (Integer, nil)

    Backpressure cap: at most N inbound Jobs may be in flight on this session at once. main_loop blocks reading the next frame when the limit is reached, which propagates pressure into the TCP receive buffer and ultimately stalls the producer. nil = unlimited (legacy behaviour).

  • executor (#call, nil)

    Strategy for spawning user :request handlers. The callable receives a block and is responsible for arranging that block to execute. The default (nil) starts a fresh Thread per request, which is fine for moderate load and bounded by max_concurrent_jobs. Pass a Mycel::ThreadPool (or any callable that implements the same protocol) to switch to a fixed-size worker pool.
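
As an illustration of the executor protocol, the following is a minimal fixed-size worker pool. `SketchPool` is hypothetical, and the real Mycel::ThreadPool interface may differ. The only contract taken from the text above is that the object responds to `call` and receives a block.

```ruby
# Hypothetical fixed-size pool; not Mycel::ThreadPool.
# Contract assumed from the docs: responds to :call and receives a block.
class SketchPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained,
        # which ends the worker loop.
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end

  # Enqueue the block; one of the workers will run it.
  def call(&block)
    @queue << block
  end

  # Stop accepting work and wait for queued tasks to finish.
  def shutdown
    @queue.close
    @workers.each(&:join)
  end
end

pool = SketchPool.new(2)
results = Queue.new
5.times { |i| pool.call { results << i * i } }
pool.shutdown
squares = Array.new(results.size) { results.pop }.sort
```

Under these assumptions, such an object could be passed as `executor:` so that at most two handlers run at once. A production pool would also need to decide what happens when a task raises, which this sketch leaves out.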



# File 'lib/mycel.rb', line 641

def session_id
  @session_id
end

Instance Method Details

#active_job_count ⇒ Object

Number of inbound Jobs currently in flight (registered in the JOB pool but not yet responded/aborted). Useful for instrumentation.



# File 'lib/mycel.rb', line 677

def active_job_count
  @job_slot_lock.synchronize { @active_job_count }
end

#attempt_close ⇒ Object

Returns true if this caller did the close work itself, false if someone else got there first. Called from both the public close entry point and from main_loop’s rescue (where the false return means “external close has it; just exit and let it finish”).



# File 'lib/mycel.rb', line 755

def attempt_close
  am_winner = @close_lock.synchronize {
    if @closed
      false
    else
      @closed = true
      true
    end
  }
  return false unless am_winner

  close_body
  @close_lock.synchronize {
    @close_finished = true
    @close_cond.broadcast
  }
  true
end

#close ⇒ Object

Idempotent shutdown using a winner/loser model.

close is called from two distinct paths that may interleave:

* EXTERNAL: an outside thread invokes session.close. To unblock
  main_loop's pending Framing.read it closes @io, which raises
  IOError / SystemCallError inside main_loop and triggers the
  INTERNAL path below.

* INTERNAL: main_loop's outer rescue catches the IO error
  (whether triggered by the EXTERNAL path or by an unsolicited
  peer hangup) and invokes attempt_close on the same thread
  that runs main_loop.

The two paths can race. attempt_close uses an atomic gate so exactly one caller becomes the WINNER and runs the cleanup body (close_body); everyone else is a LOSER.

External callers that lose the race must still observe a fully closed session before returning, so they wait on @close_cond until the winner signals completion. main_loop’s rescue, however, MUST NOT wait. If it did and the winner were external, the winner would block in @thread.join (waiting for main_loop to exit) while main_loop blocked on @close_cond (waiting for the winner to signal): a deadlock. The rescue therefore just returns and lets main_loop exit, which lets the winner’s @thread.join unblock and finish.

callback(:close) fires from the winner, exactly once, after main_loop has fully exited and @thread is cleared. Hub’s registered :close handler can safely unregister the session at that point.



# File 'lib/mycel.rb', line 744

def close
  return if attempt_close
  @close_lock.synchronize {
    @close_cond.wait_until { @close_finished }
  }
end
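
The winner/loser model can be tried in isolation. The sketch below is a stand-in with illustrative names (`OnceCloser`, `cleanup_runs`), not Mycel's code. It reproduces only the gate and the wait, with a short `sleep` standing in for the real cleanup work.

```ruby
require "monitor"

# Stand-in for the close gate; names are illustrative, not Mycel's.
class OnceCloser
  attr_reader :cleanup_runs

  def initialize
    @lock = Monitor.new
    @done = @lock.new_cond
    @closed = false
    @finished = false
    @cleanup_runs = 0
  end

  # Returns true only for the single caller that performs the cleanup.
  def attempt_close
    winner = @lock.synchronize do
      next false if @closed

      @closed = true
    end
    return false unless winner

    sleep 0.05 # simulated cleanup work, done outside the lock
    @cleanup_runs += 1 # only the winner reaches this line
    @lock.synchronize do
      @finished = true
      @done.broadcast
    end
    true
  end

  # Losers block until the winner has signalled completion.
  def close
    return :winner if attempt_close

    @lock.synchronize { @done.wait_until { @finished } }
    :loser
  end
end

closer = OnceCloser.new
outcomes = Array.new(4) { Thread.new { closer.close } }.map(&:value)
```

Four concurrent callers all return only after the cleanup has finished, and the cleanup runs once. A caller that must not wait, as main_loop's rescue must not, would call `attempt_close` directly and ignore a false result.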

#close_body ⇒ Object

The actual cleanup. Order matters:

1. Close @io so any pending Framing.read errors out (this is
   what makes main_loop notice an EXTERNAL close).
2. Zero the backpressure counter and broadcast — unsticks
   main_loop if it was parked inside acquire_job_slot.
3. Join main_loop (if we are not it) so we know the receive
   thread has fully unwound before we declare the session
   closed.
4. Clear @thread and fire :close so observers (Hub) can
   unregister.

Re. `@io.close rescue IOError`:

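Before Ruby 2.3, IO#close on an already-closed stream raised IOError
("closed stream"). Core IO#close has since returned nil instead, but
other IO-like objects passed as io may still raise. An IOError can
surface around close here because: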

  a) main_loop's outer rescue path closes @io defensively, and
     the EXTERNAL caller may then find @io already gone.
  b) An end-user's :request handler (running on its own thread)
     races with close. If it tries job.respond after we close
     @io, Session#send will see the closed stream and raise.

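The rescue swallows that benign duplicate-close. Because it is a
rescue modifier, it catches any StandardError (not only IOError); the
trailing IOError is just the discarded fallback value. A consequence
that callers should be aware of: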

  ┃ Job#respond, Job#abort, and Job#progress called AFTER the
  ┃ session has closed will raise IOError out of Session#send.
  ┃ Peer wraps user-method invocations and routes the failure
  ┃ to its own respond path; for direct Channel users, expect
  ┃ to rescue IOError or check session liveness yourself.

We don't go further (e.g. queueing late respond payloads) on
purpose: once the session is closed, the wire is gone, and
pretending otherwise just defers failure.


# File 'lib/mycel.rb', line 808

def close_body
  @io.close rescue IOError
  @job_slot_lock.synchronize {
    @active_job_count = 0
    @job_slot_cond.broadcast
  }
  @thread.join if @thread && @thread != Thread.current
  @thread_lock.synchronize {
    @thread = nil
  }
  callback(:close)
end

#closed? ⇒ Boolean

Returns true once close has begun on this session, false otherwise. Use this from inside a Job handler to bail out before calling respond / abort / progress, since those operations raise IOError out of Session#send when the underlying IO is gone.

Note this returns true as soon as the close winner flips the gate, i.e. potentially BEFORE callback(:close) has fired. That’s the correct semantics for “should I attempt further sends?” — once close has started, the wire is on its way out and any send is a losing race.

Returns:

  • (Boolean)


# File 'lib/mycel.rb', line 691

def closed?
  @close_lock.synchronize { @closed }
end
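
A handler-side guard along the lines described above might look like the following sketch. `FakeSession`, `send_frame`, and `respond_if_open` are hypothetical stand-ins so the example runs without Mycel. Only the `closed?` check and the IOError raised on a late send are taken from the text.

```ruby
# Stand-in session so the sketch runs without Mycel. Only closed? and the
# IOError-on-late-send behaviour mirror what the docs describe.
class FakeSession
  attr_reader :sent

  def initialize
    @closed = false
    @sent = []
  end

  def closed?
    @closed
  end

  def close
    @closed = true
  end

  def send_frame(msg)
    raise IOError, "closed stream" if @closed

    @sent << msg
  end
end

# Hypothetical helper: true if the payload was written, false if the
# session was already gone.
def respond_if_open(session, payload)
  return false if session.closed?

  session.send_frame(payload)
  true
rescue IOError
  # closed? can flip between the check and the write, so the rescue
  # is still needed even with the guard.
  false
end

session = FakeSession.new
before_close = respond_if_open(session, "a")
session.close
after_close = respond_if_open(session, "b")
```

The `closed?` check avoids most wasted work, but it cannot remove the race on its own, which is why the rescue stays.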

#generate_and_register_out_id(command_obj) ⇒ Object

Raises:

  • (ArgumentError) if command_obj is not a Command


# File 'lib/mycel.rb', line 821

def generate_and_register_out_id(command_obj)
  raise(ArgumentError) unless command_obj.is_a?(Command)
  id = nil
  @id_lock[DIR_COMMAND].synchronize {
    1.times {
      id = SecureRandom.hex
      redo if @id_pool[DIR_COMMAND].has_key?(id)
      self.register_id(DIR_COMMAND, id, command_obj)
    }
  }
  id
end
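
The `1.times { ... redo ... }` construct above is a retry loop: it draws random IDs until one is not already in the pool. The same idea can be sketched with a plain `loop`. `fresh_id` and its injectable `generator:` parameter are hypothetical, added here only so the collision branch can be exercised deterministically.

```ruby
require "securerandom"

# Illustrative rewrite of the retry loop, not Mycel's actual code.
# The generator is injectable so a collision can be forced in a test.
def fresh_id(pool, generator: -> { SecureRandom.hex })
  loop do
    id = generator.call
    return id unless pool.key?(id)
  end
end

pool = { "aa" => :taken }
candidates = %w[aa aa bb]
forced = fresh_id(pool, generator: -> { candidates.shift })
random = fresh_id(pool)
```

In the source, the check and the registration happen under the same `@id_lock[DIR_COMMAND]` monitor, so no other thread can claim the ID in between. This sketch omits the lock, so a caller would need to hold one around both steps.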

#register_id(direction, id, obj) ⇒ Object

Raises:

  • (ArgumentError) if obj is not a Context


# File 'lib/mycel.rb', line 834

def register_id(direction, id, obj)
  raise(ArgumentError) unless obj.is_a?(Context)
  @id_lock[direction].synchronize {
    @id_pool[direction][id] = obj
  }
end

#send(msg) ⇒ Object



# File 'lib/mycel.rb', line 850

def send(msg)
  @io_write_lock.synchronize {
    Mycel::Framing.write(@io, @codec.encode(msg))
  }
end

#spawn_handler(&block) ⇒ Object

Hand a block off to the configured executor (default: a fresh Thread). Used by Job#start to dispatch the user :request handler off main_loop.



# File 'lib/mycel.rb', line 698

def spawn_handler(&block)
  if @executor
    @executor.call(&block)
  else
    Thread.new(&block)
  end
end
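
Because the executor is just a callable that takes a block, a lambda is enough. The sketch below shows two hypothetical executors and a `dispatch` helper that mirrors the shape of `spawn_handler` for illustration; none of these names come from Mycel.

```ruby
# Hypothetical executors; each satisfies "responds to :call, receives a block".
inline_executor = ->(&block) { block.call }             # runs on the caller's thread
threaded_executor = ->(&block) { Thread.new(&block) }   # same as the default path

# Mirrors the dispatch shape of spawn_handler, for illustration only.
def dispatch(executor, &block)
  executor ? executor.call(&block) : Thread.new(&block)
end

log = []
dispatch(inline_executor) { log << :inline }
dispatch(threaded_executor) { log << :threaded }.join
dispatch(nil) { log << :default }.join
```

An inline executor is probably only suitable for tests: in a real session it would run the handler on whichever thread calls `spawn_handler`, so a slow handler would stall frame reading.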

#start ⇒ Object



# File 'lib/mycel.rb', line 706

def start(...)
  @thread_lock.synchronize {
    @thread ||= Thread.new { main_loop(...) }
  }
end

#unregister_id(direction, id) ⇒ Object



# File 'lib/mycel.rb', line 841

def unregister_id(direction, id)
  removed = @id_lock[direction].synchronize {
    @id_pool[direction].delete(id)
  }
  # When an inbound Job leaves the pool (via respond/abort), free up
  # one backpressure slot.
  release_job_slot if removed && direction == DIR_JOB
end