Class: Daytona::EventSubscriber
- Inherits:
-
Object
- Object
- Daytona::EventSubscriber
- Defined in:
- lib/daytona/common/event_subscriber.rb
Overview
Manages a Socket.IO connection and dispatches events to per-resource handlers. Generic — works for sandboxes, volumes, snapshots, runners, etc.
Constant Summary collapse
- DISCONNECT_DELAY =
Subscribe to specific events for a resource.
30
Instance Attribute Summary collapse
- #fail_error ⇒ String? readonly
Instance Method Summary collapse
-
#connect ⇒ void
Establish the Socket.IO connection.
- #connected? ⇒ Boolean
-
#disconnect ⇒ Object
Disconnect and clean up.
-
#ensure_connected ⇒ void
Idempotent: ensure a connection attempt is in progress or already established.
- #failed? ⇒ Boolean
-
#initialize(api_url:, token:, organization_id: nil) ⇒ EventSubscriber
constructor
A new instance of EventSubscriber.
- #subscribe(resource_id, events:, &handler) ⇒ Object
Constructor Details
#initialize(api_url:, token:, organization_id: nil) ⇒ EventSubscriber
Returns a new instance of EventSubscriber.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/daytona/common/event_subscriber.rb', line 15 def initialize(api_url:, token:, organization_id: nil) @api_url = api_url @token = token @organization_id = organization_id @client = nil @connected = false @failed = false @fail_error = nil @listeners = {} @registered_events = Set.new @mutex = Mutex.new @disconnect_timer = nil @last_event_at = Time.now @reconnecting = false @close_requested = false @max_reconnects = 10 end |
Instance Attribute Details
#fail_error ⇒ String? (readonly)
126 127 128 |
# File 'lib/daytona/common/event_subscriber.rb', line 126 def fail_error @fail_error end |
Instance Method Details
#connect ⇒ void
This method returns an undefined value.
Establish the Socket.IO connection.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/daytona/common/event_subscriber.rb', line 51 def connect return if @connected # Close any existing stale connection before creating a fresh one @client&.close rescue nil # rubocop:disable Style/RescueModifier @client = SocketIOClient.new( api_url: @api_url, token: @token, organization_id: @organization_id, on_event: method(:handle_event), on_disconnect: method(:handle_disconnect) ) @close_requested = false @client.connect @connected = true @failed = false @fail_error = nil rescue StandardError => e @failed = true @fail_error = "WebSocket connection failed: #{e.}" raise end |
#connected? ⇒ Boolean
116 117 118 |
# File 'lib/daytona/common/event_subscriber.rb', line 116 def connected? @connected end |
#disconnect ⇒ Object
Disconnect and clean up.
129 130 131 132 133 134 135 |
# File 'lib/daytona/common/event_subscriber.rb', line 129 def disconnect @close_requested = true @client&.close @connected = false @mutex.synchronize { @listeners.clear } @registered_events.clear end |
#ensure_connected ⇒ void
This method returns an undefined value.
Idempotent: ensure a connection attempt is in progress or already established. Non-blocking. Starts a background Thread to connect if not already connected and no attempt is currently running.
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/daytona/common/event_subscriber.rb', line 37 def ensure_connected return if @connected return if @connect_thread&.alive? @connect_thread = Thread.new do connect rescue StandardError # Callers check connected? when they need it end end |
#failed? ⇒ Boolean
121 122 123 |
# File 'lib/daytona/common/event_subscriber.rb', line 121 def failed? @failed end |
#subscribe(resource_id, events:, &handler) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/daytona/common/event_subscriber.rb', line 83 def subscribe(resource_id, events:, &handler) # Cancel any pending delayed disconnect @disconnect_timer&.kill @disconnect_timer = nil # Register any new events with the Socket.IO client (idempotent) register_events(events) @mutex.synchronize do @listeners[resource_id] ||= [] @listeners[resource_id] << handler end lambda { should_schedule = false @mutex.synchronize do @listeners[resource_id]&.delete(handler) @listeners.delete(resource_id) if @listeners[resource_id] && @listeners[resource_id].empty? should_schedule = @listeners.empty? end # Schedule delayed disconnect when no resources are listening anymore if should_schedule @disconnect_timer = Thread.new do sleep(DISCONNECT_DELAY) empty = @mutex.synchronize { @listeners.empty? } disconnect if empty end end } end |