Class: Mycel::Channel::Hub
Overview
Hub keeps server-role and client-role sessions in separate hashes so the role information has a single source of truth (no duplicated tracking in higher layers).
Instance Method Summary
collapse
#__mycel_callback_handlers__, #callback, #on
Constructor Details
#initialize(server_connector_class: Mycel::Transport::AutoServer, client_connector_class: Mycel::Transport::Client, server_session_class: Session, client_session_class: Session, codec: Mycel::Codec::JSON, max_concurrent_jobs: nil, executor: nil) ⇒ Hub
Returns a new instance of Hub.
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
|
# File 'lib/mycel.rb', line 1032
# Builds a hub with pluggable connector, session and codec implementations.
#
# @param server_connector_class class instantiated by #open to accept connections
# @param client_connector_class class instantiated once, here, for outgoing connections
# @param server_session_class class wrapping each accepted socket
# @param client_session_class class wrapping each outgoing socket
# @param codec handed to every session constructor as +codec:+
# @param max_concurrent_jobs handed to every session constructor unchanged
# @param executor handed to every session constructor unchanged
def initialize(server_connector_class: Mycel::Transport::AutoServer,
               client_connector_class: Mycel::Transport::Client,
               server_session_class: Session,
               client_session_class: Session,
               codec: Mycel::Codec::JSON,
               max_concurrent_jobs: nil,
               executor: nil)
  # Collaborator classes, kept so sessions/connectors can be built later.
  @server_connector_class = server_connector_class
  @client_connector_class = client_connector_class
  @server_session_class   = server_session_class
  @client_session_class   = client_session_class

  # Options forwarded verbatim to each new session.
  @codec               = codec
  @max_concurrent_jobs = max_concurrent_jobs
  @executor            = executor

  # Server-role and client-role sessions live in separate hashes, both
  # guarded by the same monitor.
  @server_sessions = {}
  @client_sessions = {}
  @sessions_lock   = Monitor.new

  # The client connector exists from the start; the server connector stays
  # nil until #open installs one.
  @client_connector      = @client_connector_class.new
  @server_connector      = nil
  @server_connector_lock = Monitor.new
end
|
Instance Method Details
#client_sessions ⇒ Object
1122
1123
1124
|
# File 'lib/mycel.rb', line 1122
# Returns a shallow copy of the client-role session table, taken while
# holding the sessions lock, so callers can iterate it without touching the
# hub's own hash.
def client_sessions
  @sessions_lock.synchronize do
    @client_sessions.dup
  end
end
|
#close ⇒ Object
1103
1104
1105
1106
1107
1108
|
# File 'lib/mycel.rb', line 1103
# Closes the current server connector, if there is one, and forgets it.
# Runs under the server-connector lock. Sessions are not touched here.
# Returns nil.
def close
  @server_connector_lock.synchronize do
    @server_connector.close if @server_connector
    @server_connector = nil
  end
end
|
#connect ⇒ Object
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
|
# File 'lib/mycel.rb', line 1056
# Opens an outgoing connection and registers it as a client-role session.
#
# All arguments are forwarded unchanged to the client connector's #connect.
# The returned socket is wrapped in the client session class; then, while
# holding the sessions lock, the session receives a fresh id, is stored in
# @client_sessions, is announced through the :client_session and :session
# callbacks (in that order), gets its close handler installed and is started.
#
# Returns whatever the session's #start returns.
def connect(...)
  connection = @client_connector.connect(...)
  client_session = @client_session_class.new(
    connection,
    codec: @codec,
    max_concurrent_jobs: @max_concurrent_jobs,
    executor: @executor
  )

  @sessions_lock.synchronize do
    session_id = generate_id
    client_session.session_id = session_id
    @client_sessions[session_id] = client_session
    %i[client_session session].each do |event|
      callback(event, session_id, client_session)
    end
    install_close_handler(session_id, client_session)
    client_session.start
  end
end
|
#get_session(id) ⇒ Object
1110
1111
1112
|
# File 'lib/mycel.rb', line 1110
# Looks a session up by id under the sessions lock. Server-role sessions are
# consulted first, then client-role sessions. Returns nil when neither table
# has the id.
def get_session(id)
  @sessions_lock.synchronize do
    found = @server_sessions[id]
    found ||= @client_sessions[id]
    found
  end
end
|
#open ⇒ Object
Hub#open is **not idempotent**. Calling open() while a server connector already exists closes the previous one (tearing down its accept loop and listening socket) before installing the new one. This is deliberate: open(host: 'a') followed by open(host: 'b') must rebind, and the only sensible interpretation of a repeated open() is “start over with these arguments”. If you want a guarded variant, check @server_connector yourself before calling, or wrap close+open behind your own intent. Existing accepted sessions on the previous connector are not torn down here — they remain in @server_sessions until they close on their own (or the caller invokes Hub#shutdown).
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
|
# File 'lib/mycel.rb', line 1081
# Starts (or restarts) the listening side of the hub.
#
# All arguments are forwarded unchanged to the server connector class's
# constructor. When a server connector is already installed it is closed
# first, so a repeated call replaces it rather than being a no-op. Every
# socket delivered through the connector's :connect_this event is wrapped in
# a server-role session which, under the sessions lock, receives a fresh id,
# is stored in @server_sessions, is announced through the :server_session and
# :session callbacks (in that order), gets its close handler installed and is
# started.
#
# The whole method runs under the server-connector lock.
# Returns the newly installed server connector.
def open(...)
  @server_connector_lock.synchronize do
    close if @server_connector

    listener = @server_connector_class.new(...)
    listener.on(:connect_this) do |socket|
      accepted = @server_session_class.new(
        socket,
        codec: @codec,
        max_concurrent_jobs: @max_concurrent_jobs,
        executor: @executor
      )
      @sessions_lock.synchronize do
        session_id = generate_id
        accepted.session_id = session_id
        @server_sessions[session_id] = accepted
        %i[server_session session].each do |event|
          callback(event, session_id, accepted)
        end
        install_close_handler(session_id, accepted)
        accepted.start
      end
    end

    listener.open
    @server_connector = listener
  end
end
|
#server_sessions ⇒ Object
1118
1119
1120
|
# File 'lib/mycel.rb', line 1118
# Returns a shallow copy of the server-role session table, taken while
# holding the sessions lock, so callers can iterate it without touching the
# hub's own hash.
def server_sessions
  @sessions_lock.synchronize do
    @server_sessions.dup
  end
end
|
#sessions ⇒ Object
1114
1115
1116
|
# File 'lib/mycel.rb', line 1114
# Returns a new hash holding every session, server-role entries first and
# client-role entries after them, built under the sessions lock. If the same
# id appears in both tables the client-role entry wins.
def sessions
  @sessions_lock.synchronize do
    combined = @server_sessions.dup
    combined.update(@client_sessions)
  end
end
|
#shutdown ⇒ Object
1126
1127
1128
1129
|
# File 'lib/mycel.rb', line 1126
# Closes the server connector and then every known session.
#
# Shutdown is exhaustive: an error raised while closing the server connector
# or any single session no longer prevents the remaining sessions from being
# closed. The first StandardError encountered is remembered and re-raised
# once every close has been attempted, so callers still see the failure.
#
# Returns the snapshot hash of sessions that were closed.
# Raises the first StandardError raised by any of the close calls.
def shutdown
  first_error = nil

  begin
    close
  rescue StandardError => e
    first_error = e
  end

  # Work on the snapshot so sessions removing themselves while closing do
  # not disturb the iteration.
  snapshot = sessions
  snapshot.each_value do |session|
    session.close
  rescue StandardError => e
    first_error ||= e
  end

  raise first_error if first_error

  snapshot
end
|