Class: Mycel::Channel::Hub

Inherits:
Object
  • Object
show all
Includes:
Mycel::Callbacks
Defined in:
lib/mycel.rb

Overview

Hub keeps server-role and client-role sessions in separate hashes so the role information has a single source of truth (no duplicated tracking in higher layers).

Instance Method Summary collapse

Methods included from Mycel::Callbacks

#__mycel_callback_handlers__, #callback, #on

Constructor Details

#initialize(server_connector_class: Mycel::Transport::AutoServer, client_connector_class: Mycel::Transport::Client, server_session_class: Session, client_session_class: Session, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Hub

Returns a new instance of Hub.



1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
# File 'lib/mycel.rb', line 1032

# Builds a hub with empty session tables and no server connector.
#
# All collaborators are injected as keyword arguments and merely stored here,
# except the client connector, which is instantiated immediately (with no
# arguments) from +client_connector_class+.
#
# @param server_connector_class class instantiated by #open with the arguments given to #open
# @param client_connector_class class instantiated once, here, to serve #connect
# @param server_session_class   class wrapping sockets accepted by the server connector
# @param client_session_class   class wrapping sockets returned by the client connector
# @param codec                  forwarded as +codec:+ to every session constructor
# @param max_concurrent_jobs    forwarded as +max_concurrent_jobs:+ to every session constructor
# @param executor               forwarded as +executor:+ to every session constructor
def initialize(server_connector_class: Mycel::Transport::AutoServer,
               client_connector_class: Mycel::Transport::Client,
               server_session_class:   Session,
               client_session_class:   Session,
               codec:                  Mycel::Codec::JSON,
               max_concurrent_jobs:    nil,
               executor:               nil)
  # Classes used to build connectors and sessions on demand.
  @server_connector_class = server_connector_class
  @server_session_class   = server_session_class
  @client_connector_class = client_connector_class
  @client_session_class   = client_session_class

  # Options handed unchanged to each session constructor.
  @codec               = codec
  @executor            = executor
  @max_concurrent_jobs = max_concurrent_jobs

  # Session tables keyed by session id; one lock guards both tables.
  @sessions_lock   = Monitor.new
  @server_sessions = {}
  @client_sessions = {}

  # The server connector only exists between #open and #close; it has its
  # own lock, separate from the session tables.
  @server_connector_lock = Monitor.new
  @server_connector      = nil
  @client_connector      = @client_connector_class.new
end

Instance Method Details

#client_sessions ⇒ Object



1122
1123
1124
# File 'lib/mycel.rb', line 1122

# Returns a copy of the client-role session table.
#
# The copy is taken while holding the sessions lock. It is a shallow +dup+:
# the returned Hash is a separate object, but its values are the same session
# objects held by the hub.
def client_sessions
  @sessions_lock.synchronize do
    @client_sessions.dup
  end
end

#close ⇒ Object



1103
1104
1105
1106
1107
1108
# File 'lib/mycel.rb', line 1103

# Closes the current server connector, if there is one, and forgets it.
#
# Runs under the server-connector lock. Calling it when no connector is
# installed is a no-op. Only the connector is touched here; the session
# tables are left alone.
#
# @return [nil]
def close
  @server_connector_lock.synchronize do
    connector = @server_connector
    connector.close unless connector.nil?
    @server_connector = nil
  end
end

#connect ⇒ Object



1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
# File 'lib/mycel.rb', line 1056

# Opens an outgoing connection and registers it as a client-role session.
#
# All arguments are forwarded untouched to the client connector's +connect+.
# The resulting socket is wrapped in +@client_session_class+. Then, under the
# sessions lock, the session gets a fresh id, is stored in the client table,
# is announced via +callback(:client_session, ...)+ followed by
# +callback(:session, ...)+, has its close handler installed, and is started.
#
# @return whatever the session's +start+ returns
def connect(...)
  io = @client_connector.connect(...)
  session = @client_session_class.new(
    io,
    codec: @codec,
    max_concurrent_jobs: @max_concurrent_jobs,
    executor: @executor
  )

  @sessions_lock.synchronize do
    session_id = generate_id
    session.session_id = session_id
    @client_sessions[session_id] = session

    # Role-specific callback first, then the generic one.
    %i[client_session session].each do |event|
      callback(event, session_id, session)
    end

    # The close handler is installed before the session starts running.
    install_close_handler(session_id, session)
    session.start
  end
end

#get_session(id) ⇒ Object



1110
1111
1112
# File 'lib/mycel.rb', line 1110

# Looks up a session by id in either role table.
#
# The server table is consulted first; the client table is the fallback.
# Both lookups happen under the sessions lock.
#
# @param id the session id to look up
# @return the matching session, or nil when neither table has the id
def get_session(id)
  @sessions_lock.synchronize do
    server_side = @server_sessions[id]
    server_side || @client_sessions[id]
  end
end

#open ⇒ Object

Hub#open is **not idempotent**. Calling open() while a server connector already exists closes the previous one (tearing down its accept loop and listening socket) before installing the new one. This is deliberate: open(host: 'a') followed by open(host: 'b') must rebind, and the only sensible interpretation of a repeated open() is "start over with these arguments". If you want a guarded variant, check @server_connector yourself before calling, or wrap close + open in a method of your own that expresses your intent. Existing accepted sessions on the previous connector are not torn down here — they remain in @server_sessions until they close on their own (or the caller invokes Hub#shutdown).



1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
# File 'lib/mycel.rb', line 1081

# Starts a server connector built from the given arguments.
#
# Not idempotent: if a connector is already installed it is closed first,
# then a new one is created with the forwarded arguments. The session tables
# are not touched when the old connector is closed.
#
# For every socket delivered through the connector's +:connect_this+ event a
# server-role session is created and, under the sessions lock, given a fresh
# id, stored in the server table, announced via
# +callback(:server_session, ...)+ then +callback(:session, ...)+, given a
# close handler, and started.
#
# The whole operation runs under the server-connector lock. The connector is
# recorded in +@server_connector+ only after its +open+ has returned.
#
# @return the newly installed connector
def open(...)
  @server_connector_lock.synchronize do
    close if @server_connector

    listener = @server_connector_class.new(...)

    # The handler is registered before the connector is opened.
    listener.on(:connect_this) do |socket|
      session = @server_session_class.new(
        socket,
        codec: @codec,
        max_concurrent_jobs: @max_concurrent_jobs,
        executor: @executor
      )

      @sessions_lock.synchronize do
        session_id = generate_id
        session.session_id = session_id
        @server_sessions[session_id] = session

        # Role-specific callback first, then the generic one.
        %i[server_session session].each do |event|
          callback(event, session_id, session)
        end

        install_close_handler(session_id, session)
        session.start
      end
    end

    listener.open
    @server_connector = listener
  end
end

#server_sessions ⇒ Object



1118
1119
1120
# File 'lib/mycel.rb', line 1118

# Returns a copy of the server-role session table.
#
# The copy is taken while holding the sessions lock. It is a shallow +dup+:
# the returned Hash is a separate object, but its values are the same session
# objects held by the hub.
def server_sessions
  @sessions_lock.synchronize do
    @server_sessions.dup
  end
end

#sessions ⇒ Object



1114
1115
1116
# File 'lib/mycel.rb', line 1114

# Returns one new Hash containing every session of both roles.
#
# Built with Hash#merge under the sessions lock, so neither internal table is
# modified. If an id were present in both tables, the client entry would be
# the one kept.
def sessions
  @sessions_lock.synchronize do
    servers = @server_sessions
    clients = @client_sessions
    servers.merge(clients)
  end
end

#shutdown ⇒ Object



1126
1127
1128
1129
# File 'lib/mycel.rb', line 1126

# Stops the server connector and then closes every known session.
#
# #close runs first. The sessions closed afterwards are those in the Hash
# returned by #sessions, i.e. both roles.
#
# @return the Hash returned by #sessions (each_value returns its receiver)
def shutdown
  close
  sessions.each_value { |session| session.close }
end