Class: ConduitSSE::State

Inherits:
Object
  • Object
show all
Defined in:
lib/conduit_sse/state.rb

Overview

Runtime, per-stream mutable state.

Where Config answers *what to do* (parser, separators, patterns), ‘State` answers *where we are*: how much data is buffered, the last event id/retry seen, accumulated counters when stats are enabled, etc.

Mutated continuously while the stream is processing data; lives for the lifetime of one Stream instance.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(stats_enabled:) ⇒ State

Returns a new instance of State.



27
28
29
30
31
32
33
34
35
# File 'lib/conduit_sse/state.rb', line 27

def initialize(stats_enabled:)
  @buffer          = +""
  @callbacks       = Callbacks.new
  @last_event_id   = nil
  @retry_ms        = nil
  @last_event_type = nil
  @stats           = stats_enabled ? Hash.new(0) : nil
  @total_fields    = stats_enabled ? 0 : nil
end

Instance Attribute Details

#bufferString (readonly)

The raw input buffer. Mutated via ‘<<`, `index`, `slice!`.

Returns:

  • (String)


21
22
23
# File 'lib/conduit_sse/state.rb', line 21

def buffer
  @buffer
end

#callbacksCallbacks (readonly)

Registered user callbacks for every pipeline stage.

Returns:



25
26
27
# File 'lib/conduit_sse/state.rb', line 25

def callbacks
  @callbacks
end

#last_event_idObject

SSE spec state — last id and retry values observed on the wire. Mutated as ‘id:` / `retry:` fields are parsed.



17
18
19
# File 'lib/conduit_sse/state.rb', line 17

def last_event_id
  @last_event_id
end

#last_event_typeObject

SSE spec state — last id and retry values observed on the wire. Mutated as ‘id:` / `retry:` fields are parsed.



17
18
19
# File 'lib/conduit_sse/state.rb', line 17

def last_event_type
  @last_event_type
end

#retry_msObject

SSE spec state — last id and retry values observed on the wire. Mutated as ‘id:` / `retry:` fields are parsed.



17
18
19
# File 'lib/conduit_sse/state.rb', line 17

def retry_ms
  @retry_ms
end

Instance Method Details

#add_fields(count) ⇒ Object

Accumulate parsed fields into the rolling total used to compute ‘:avg_fields_per_frame`. No-op when stats are disabled.



56
57
58
# File 'lib/conduit_sse/state.rb', line 56

def add_fields(count)
  @total_fields += count if @stats
end

#buffer_sizeInteger

Current buffer size in bytes.

Returns:

  • (Integer)


39
40
41
# File 'lib/conduit_sse/state.rb', line 39

def buffer_size
  @buffer.bytesize
end

#increment_stat(key) ⇒ Object

Increment one stats counter by 1. No-op when stats are disabled.



50
51
52
# File 'lib/conduit_sse/state.rb', line 50

def increment_stat(key)
  @stats[key] += 1 if @stats
end

#stats_enabled?Boolean

Whether per-stage counter tracking is active for this stream.

Returns:

  • (Boolean)


45
46
47
# File 'lib/conduit_sse/state.rb', line 45

def stats_enabled?
  !@stats.nil?
end

#stats_snapshotHash{Symbol => Integer, Float}?

Snapshot of the counters plus the derived ‘:avg_fields_per_frame`. Returns `nil` when stats tracking is disabled, so callers can branch on the return value without a separate `#stats_enabled?` check.

Returns:

  • (Hash{Symbol => Integer, Float}, nil)


65
66
67
68
69
70
71
# File 'lib/conduit_sse/state.rb', line 65

def stats_snapshot
  return nil unless @stats

  snapshot = @stats.dup
  snapshot[:avg_fields_per_frame] = avg_fields_per_frame
  snapshot
end