Class: Daytona::EventSubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/daytona/common/event_subscriber.rb

Overview

Manages a Socket.IO connection and dispatches events to per-resource handlers. Generic — works for sandboxes, volumes, snapshots, runners, etc.

Constant Summary collapse

DISCONNECT_DELAY =

Subscribe to specific events for a resource.

Yields:

  • (event_name, data)

    Called with raw event name and data hash.

Returns:

  • (Proc)

    Unsubscribe function.

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_url:, token:, organization_id: nil) ⇒ EventSubscriber

Returns a new instance of EventSubscriber.

Parameters:

  • api_url (String)
  • token (String)
  • organization_id (String, nil) (defaults to: nil)


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/daytona/common/event_subscriber.rb', line 15

def initialize(api_url:, token:, organization_id: nil)
  @api_url = api_url
  @token = token
  @organization_id = organization_id
  @client = nil
  @connected = false
  @failed = false
  @fail_error = nil
  @listeners = {}
  @registered_events = Set.new
  @mutex = Mutex.new
  @disconnect_timer = nil
  @last_event_at = Time.now
  @reconnecting = false
  @close_requested = false
  @max_reconnects = 10
end

Instance Attribute Details

#fail_errorString? (readonly)

Returns:

  • (String, nil)


126
127
128
# File 'lib/daytona/common/event_subscriber.rb', line 126

def fail_error
  @fail_error
end

Instance Method Details

#connectvoid

This method returns an undefined value.

Establish the Socket.IO connection.

Raises:

  • (StandardError)

    on connection failure



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/daytona/common/event_subscriber.rb', line 51

def connect
  return if @connected

  # Close any existing stale connection before creating a fresh one
  @client&.close rescue nil # rubocop:disable Style/RescueModifier

  @client = SocketIOClient.new(
    api_url: @api_url,
    token: @token,
    organization_id: @organization_id,
    on_event: method(:handle_event),
    on_disconnect: method(:handle_disconnect)
  )

  @close_requested = false
  @client.connect
  @connected = true
  @failed = false
  @fail_error = nil
rescue StandardError => e
  @failed = true
  @fail_error = "WebSocket connection failed: #{e.message}"
  raise
end

#connected?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/daytona/common/event_subscriber.rb', line 116

def connected?
  @connected
end

#disconnectObject

Disconnect and clean up.



129
130
131
132
133
134
135
# File 'lib/daytona/common/event_subscriber.rb', line 129

def disconnect
  @close_requested = true
  @client&.close
  @connected = false
  @mutex.synchronize { @listeners.clear }
  @registered_events.clear
end

#ensure_connectedvoid

This method returns an undefined value.

Idempotent: ensure a connection attempt is in progress or already established. Non-blocking. Starts a background Thread to connect if not already connected and no attempt is currently running.



37
38
39
40
41
42
43
44
45
46
# File 'lib/daytona/common/event_subscriber.rb', line 37

def ensure_connected
  return if @connected
  return if @connect_thread&.alive?

  @connect_thread = Thread.new do
    connect
  rescue StandardError
    # Callers check connected? when they need it
  end
end

#failed?Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/daytona/common/event_subscriber.rb', line 121

def failed?
  @failed
end

#subscribe(resource_id, events:, &handler) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/daytona/common/event_subscriber.rb', line 83

def subscribe(resource_id, events:, &handler)
  # Cancel any pending delayed disconnect
  @disconnect_timer&.kill
  @disconnect_timer = nil

  # Register any new events with the Socket.IO client (idempotent)
  register_events(events)

  @mutex.synchronize do
    @listeners[resource_id] ||= []
    @listeners[resource_id] << handler
  end

  lambda {
    should_schedule = false
    @mutex.synchronize do
      @listeners[resource_id]&.delete(handler)
      @listeners.delete(resource_id) if @listeners[resource_id] && @listeners[resource_id].empty?
      should_schedule = @listeners.empty?
    end

    # Schedule delayed disconnect when no resources are listening anymore
    if should_schedule
      @disconnect_timer = Thread.new do
        sleep(DISCONNECT_DELAY)
        empty = @mutex.synchronize { @listeners.empty? }
        disconnect if empty
      end
    end
  }
end