Class: Mycel::Channel::Hub
Overview
Hub keeps server-role and client-role sessions in separate hashes so the role information has a single source of truth (no duplicated tracking in higher layers).
Instance Method Summary
collapse
#__mycel_callback_handlers__, #callback, #on
Constructor Details
#initialize(server_connector_class: Mycel::Transport::AutoServer, client_connector_class: Mycel::Transport::Client, server_session_class: Session, client_session_class: Session, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Hub
Returns a new instance of Hub.
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
|
# File 'lib/mycel.rb', line 985
# Builds a hub with empty session tables and no server connector yet.
#
# @param server_connector_class [Class] instantiated by #open with the
#   arguments given to #open
# @param client_connector_class [Class] instantiated once, here, with no
#   arguments; its instance performs every #connect
# @param server_session_class [Class] wraps sockets accepted by the server
#   connector
# @param client_session_class [Class] wraps sockets returned by the client
#   connector
# @param codec [Object] passed as `codec:` to every session created
# @param max_concurrent_jobs [Object, nil] passed as `max_concurrent_jobs:`
#   to every session created
# @param executor [Object, nil] passed as `executor:` to every session created
def initialize(server_connector_class: Mycel::Transport::AutoServer,
               client_connector_class: Mycel::Transport::Client,
               server_session_class: Session,
               client_session_class: Session,
               codec: Mycel::Codec::JSON,
               max_concurrent_jobs: nil,
               executor: nil)
  # Collaborator classes, kept so sessions/connectors can be built later.
  @server_connector_class = server_connector_class
  @client_connector_class = client_connector_class
  @server_session_class   = server_session_class
  @client_session_class   = client_session_class

  # Settings handed unchanged to each new session.
  @codec               = codec
  @max_concurrent_jobs = max_concurrent_jobs
  @executor            = executor

  # One table per role, both guarded by the same monitor.
  @server_sessions = {}
  @client_sessions = {}
  @sessions_lock   = Monitor.new

  # The client connector exists for the hub's whole life; the server
  # connector only exists between #open and #close.
  @client_connector      = client_connector_class.new
  @server_connector      = nil
  @server_connector_lock = Monitor.new
end
|
Instance Method Details
#client_sessions ⇒ Object
1073
1074
1075
|
# File 'lib/mycel.rb', line 1073
# Snapshot of the client-role sessions.
#
# The copy is made while holding the sessions monitor, and it is a shallow
# `dup`: adding or removing keys on the result does not touch the hub's table.
#
# @return [Hash] id => session
def client_sessions
  @sessions_lock.synchronize do
    snapshot = @client_sessions.dup
    snapshot
  end
end
|
#close ⇒ Object
1054
1055
1056
1057
1058
1059
|
# File 'lib/mycel.rb', line 1054
# Closes the current server connector, if there is one, and forgets it.
#
# Runs under the server-connector monitor. Sessions are not touched here.
#
# @return [nil]
def close
  @server_connector_lock.synchronize do
    @server_connector.close unless @server_connector.nil?
    @server_connector = nil
  end
end
|
#connect ⇒ Object
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
|
# File 'lib/mycel.rb', line 1009
# Opens an outgoing connection and registers it as a client-role session.
#
# All arguments are forwarded unchanged to the client connector's #connect.
# The resulting socket is wrapped in the client session class, and then,
# while holding the sessions monitor, the session is given a fresh id,
# stored, announced through the :client_session and :session callbacks,
# given its close handler, and started.
#
# @return [Object] whatever the session's #start returns
def connect(...)
  socket = @client_connector.connect(...)
  session = @client_session_class.new(
    socket,
    codec: @codec,
    max_concurrent_jobs: @max_concurrent_jobs,
    executor: @executor
  )

  @sessions_lock.synchronize do
    session_id = generate_id
    @client_sessions[session_id] = session
    # Role-specific event first, then the generic one.
    %i[client_session session].each do |event|
      callback(event, session_id, session)
    end
    install_close_handler(session_id, session)
    session.start
  end
end
|
#get_session(id) ⇒ Object
1061
1062
1063
|
# File 'lib/mycel.rb', line 1061
# Looks a session up by id in either role table.
#
# The server table is consulted first; the client table is used when the
# server table yields nil or false for that id.
#
# @param id [Object] session id
# @return [Object, nil] the session, or nil when neither table has it
def get_session(id)
  @sessions_lock.synchronize do
    server_hit = @server_sessions[id]
    server_hit || @client_sessions[id]
  end
end
|
#open ⇒ Object
Hub#open is **not idempotent**. Calling open() while a server connector already exists closes the previous one (tearing down its accept loop and listening socket) before installing the new one. This is deliberate: open(host: ‘a’) followed by open(host: ‘b’) must rebind, and the only sensible interpretation of a repeated open() is “start over with these arguments”. If you want a guarded variant, check @server_connector yourself before calling, or wrap close+open behind your own intent. Existing accepted sessions on the previous connector are not torn down here — they remain in @server_sessions until they close on their own (or the caller invokes Hub#shutdown).
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
|
# File 'lib/mycel.rb', line 1033
# Starts a server connector, replacing any existing one.
#
# Under the server-connector monitor: an existing connector is closed via
# #close, a new connector is built from the forwarded arguments, a
# :connect_this handler is attached, the connector is opened, and only then
# is it stored as the current connector.
#
# The :connect_this handler wraps each socket in the server session class
# and, while holding the sessions monitor, assigns a fresh id, stores the
# session, fires the :server_session and :session callbacks, installs the
# close handler and starts the session.
#
# @return [Object] the newly opened connector
def open(...)
  @server_connector_lock.synchronize do
    # A repeated open means "start over": drop the previous connector first.
    close if @server_connector

    listener = @server_connector_class.new(...)
    listener.on(:connect_this) do |socket|
      accepted = @server_session_class.new(
        socket,
        codec: @codec,
        max_concurrent_jobs: @max_concurrent_jobs,
        executor: @executor
      )

      @sessions_lock.synchronize do
        session_id = generate_id
        @server_sessions[session_id] = accepted
        callback(:server_session, session_id, accepted)
        callback(:session, session_id, accepted)
        install_close_handler(session_id, accepted)
        accepted.start
      end
    end

    listener.open
    @server_connector = listener
  end
end
|
#server_sessions ⇒ Object
1069
1070
1071
|
# File 'lib/mycel.rb', line 1069
# Snapshot of the server-role sessions.
#
# The copy is made while holding the sessions monitor, and it is a shallow
# `dup`: adding or removing keys on the result does not touch the hub's table.
#
# @return [Hash] id => session
def server_sessions
  @sessions_lock.synchronize do
    snapshot = @server_sessions.dup
    snapshot
  end
end
|
#sessions ⇒ Object
1065
1066
1067
|
# File 'lib/mycel.rb', line 1065
# Snapshot of every session, regardless of role.
#
# Built under the sessions monitor as a new hash: server-role entries
# merged with client-role entries (a client entry wins if an id appears in
# both tables).
#
# @return [Hash] id => session
def sessions
  @sessions_lock.synchronize do
    server_side = @server_sessions
    server_side.merge(@client_sessions)
  end
end
|
#shutdown ⇒ Object
1077
1078
1079
1080
|
# File 'lib/mycel.rb', line 1077
# Closes the server connector (via #close) and then every session currently
# registered in either role table.
#
# Teardown is exhaustive: a failure while closing the connector or one
# session does not stop the remaining sessions from being closed. The first
# StandardError encountered is re-raised once every close has been attempted.
#
# @return [Hash] the id => session snapshot that was asked to close
# @raise [StandardError] the first error raised by #close or a session's #close
def shutdown
  first_failure = nil

  begin
    close
  rescue StandardError => e
    first_failure = e
  end

  # Work on a snapshot so session closes run outside the sessions monitor.
  snapshot = sessions
  snapshot.each_value do |session|
    session.close
  rescue StandardError => e
    first_failure ||= e
  end

  raise first_failure if first_failure

  snapshot
end
|