Class: ConduitSSE::Config

Inherits:
Object
  • Object
Defined in:
lib/conduit_sse/config.rb

Overview

Mutable configuration for Stream.

`Config` is the canonical bag of knobs the stream reads from at construction time. It loads its own defaults, exposes plain accessors so a block can mutate it freely, and validates its invariants on demand.

Two ways to fill it in (both supported by `Stream#initialize`):

# Keyword form
ConduitSSE.new(parser: ->(d) { JSON.parse(d) }, stats: true)

# Block form
ConduitSSE.new do |c|
  c.parser           = ->(d) { JSON.parse(d) }
  c.stats            = true
  c.frame_separator  = "\r\n\r\n"
end

The two can be mixed; kwargs seed the config and the block then mutates whatever it likes on top.
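
The seed-then-mutate order can be sketched with a small stand-in. `SketchConfig` and `build_config` are hypothetical names used only for this illustration; they mimic the order described above and are not the library's own classes.

```ruby
# Hypothetical stand-in for ConduitSSE::Config: kwargs seed plain accessors.
class SketchConfig
  attr_accessor :parser, :stats

  def initialize(parser: nil, stats: false)
    @parser = parser
    @stats  = stats
  end
end

# Hypothetical stand-in for Stream#initialize: build from kwargs, then yield.
def build_config(**opts)
  config = SketchConfig.new(**opts)
  yield config if block_given?
  config
end

# The kwarg sets stats to true; the block runs afterwards and overrides it.
config = build_config(parser: ->(d) { d }, stats: true) do |c|
  c.stats = false
end

puts config.stats # the block's value wins because it runs last
```

Because the block runs after the kwargs have been applied, any key set in both places ends up with the block's value.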

Constant Summary

SETTABLE_KEYS =
%i[
  parser
  chunk_normalizer
  frame_separator
  payload_start
  ping_pattern
  sanitize_pattern
  stats
].freeze
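
A minimal sketch of how a key list like this catches typos, mirroring the array-difference check in the private `validate_keys!` shown in the class source. `unknown_keys` is a hypothetical helper name, and the constant is re-declared here only so the snippet runs on its own.

```ruby
SETTABLE_KEYS = %i[
  parser chunk_normalizer frame_separator payload_start
  ping_pattern sanitize_pattern stats
].freeze

# Hypothetical helper: returns the option keys that are not settable.
def unknown_keys(opts)
  opts.keys - SETTABLE_KEYS
end

p unknown_keys(parser: ->(d) { d }, stats: true) # every key is known
p unknown_keys(parsr: ->(d) { d })               # the misspelled key is reported
```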


Constructor Details

#initialize(**opts) ⇒ Config

Returns a new instance of Config.



# File 'lib/conduit_sse/config.rb', line 62

def initialize(**opts)
  validate_keys!(opts)

  @parser           = opts[:parser]
  @chunk_normalizer = opts[:chunk_normalizer] || Defaults::CHUNK_NORMALIZER
  @sanitize_pattern = opts[:sanitize_pattern] || Defaults::SANITIZE_PATTERN
  @frame_separator  = opts[:frame_separator]  || Defaults::FRAME_SEPARATOR
  @payload_start    = opts[:payload_start]    || Defaults::PAYLOAD_START
  @ping_pattern     = opts[:ping_pattern]     || Defaults::PING_PATTERN
  @stats            = opts.fetch(:stats, false)
end
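
One consequence of the constructor is worth seeing in isolation: the `||` fallbacks treat an explicit `nil` (or `false`) the same as a missing key, while `fetch` falls back only when the key is absent. The sketch below uses a plain hash and a stand-in constant, assuming the default separator is `"\n\n"` as documented for `#frame_separator`; `resolve` is a hypothetical helper.

```ruby
# Stand-in for Defaults::FRAME_SEPARATOR (assumed value).
DEFAULT_FRAME_SEPARATOR = "\n\n"

# Hypothetical helper mirroring two lines of the constructor.
def resolve(opts)
  {
    frame_separator: opts[:frame_separator] || DEFAULT_FRAME_SEPARATOR,
    stats:           opts.fetch(:stats, false)
  }
end

p resolve({})                                   # both defaults apply
p resolve({ frame_separator: nil, stats: nil }) # separator falls back; stats stays nil
```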

Instance Attribute Details

#chunk_normalizer ⇒ #call

Returns a callable that transforms incoming chunks before processing (default: UTF-8 normalize + CRLF→LF).

Returns:

  • (#call)

    Transforms incoming chunks before processing (default: UTF-8 normalize + CRLF→LF).
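
A custom normalizer only needs to respond to `#call`. The sketch below is one possible implementation, assuming the callable receives a single String chunk and returns a String. The library's actual default is not shown on this page, and `LF_NORMALIZER` is a hypothetical name.

```ruby
# Hypothetical normalizer: tag as UTF-8, replace invalid bytes, CRLF -> LF.
LF_NORMALIZER = lambda do |chunk|
  chunk.dup.force_encoding(Encoding::UTF_8).scrub.gsub("\r\n", "\n")
end

p LF_NORMALIZER.call("data: hi\r\n\r\n")
```

Working on a `dup` keeps the caller's chunk untouched, which matters if the same buffer is reused upstream.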



# File 'lib/conduit_sse/config.rb', line 49

class Config
  SETTABLE_KEYS = %i[
    parser
    chunk_normalizer
    frame_separator
    payload_start
    ping_pattern
    sanitize_pattern
    stats
  ].freeze

  attr_accessor(*SETTABLE_KEYS)

  def initialize(**opts)
    validate_keys!(opts)

    @parser           = opts[:parser]
    @chunk_normalizer = opts[:chunk_normalizer] || Defaults::CHUNK_NORMALIZER
    @sanitize_pattern = opts[:sanitize_pattern] || Defaults::SANITIZE_PATTERN
    @frame_separator  = opts[:frame_separator]  || Defaults::FRAME_SEPARATOR
    @payload_start    = opts[:payload_start]    || Defaults::PAYLOAD_START
    @ping_pattern     = opts[:ping_pattern]     || Defaults::PING_PATTERN
    @stats            = opts.fetch(:stats, false)
  end

  # Enforce the one hard invariant: a usable parser must be present.
  # Called by the stream after the configuration block (if any) has run, so
  # callers can supply the parser via either kwarg or block.
  #
  # @raise [ArgumentError] if {#parser} doesn't respond to `:call`.
  def validate!
    return if @parser.respond_to?(:call)

    raise ArgumentError, "parser must be a Proc (respond to #call)"
  end

  # Lock the configuration in: validate, compute derived values, and freeze.
  # After this returns, the Config is immutable and every accessor (including
  # {#data_field}) is safe to read on a hot path.
  #
  # @return [self]
  def finalize!
    validate!
    @data_field = @payload_start.chomp(":")
    freeze
    self
  end

  # The SSE field name derived from {#payload_start} (the trailing ":" is
  # stripped). Computed once by {#finalize!}.
  #
  # @return [String] e.g. `"data"` for the default `payload_start` of `"data:"`.
  attr_reader :data_field

  private

  def validate_keys!(opts)
    unknown = opts.keys - SETTABLE_KEYS
    return if unknown.empty?

    raise ArgumentError, "unknown configuration keys: #{unknown.join(", ")}"
  end
end

#data_field ⇒ String (readonly)

The SSE field name derived from #payload_start (the trailing ":" is stripped). Computed once by #finalize!; it reads as nil before then.

Returns:

  • (String)

    e.g. `"data"` for the default `payload_start` of `"data:"`.



# File 'lib/conduit_sse/config.rb', line 101

def data_field
  @data_field
end
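
`String#chomp` with an argument removes that suffix only when the string actually ends with it, so the derived name depends on the exact shape of `payload_start`. A short sketch, with `derive_field` as a hypothetical helper mirroring the derivation in `#finalize!`:

```ruby
# Hypothetical helper mirroring the derivation in #finalize!.
def derive_field(payload_start)
  payload_start.chomp(":")
end

p derive_field("data:")   # trailing colon removed
p derive_field("data: ")  # ends with a space, so nothing is removed
p derive_field("payload") # no colon, returned unchanged
```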

#frame_separator ⇒ String

Returns the delimiter between frames (default: `"\n\n"`).

Returns:

  • (String)

    Delimiter between frames (default: `"\n\n"`).
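
To illustrate what a frame delimiter does, the sketch below splits a buffered string on the separator. It assumes the buffer has already been normalized to LF line endings. How the stream itself performs the split is not shown on this page, and `split_frames` is a hypothetical helper.

```ruby
# Hypothetical helper: split a buffered string into frames on a separator.
def split_frames(buffer, separator = "\n\n")
  buffer.split(separator)
end

frames = split_frames("data: one\n\ndata: two\n\n")
p frames
```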




#parser ⇒ #call

Required. Returns the callable that receives the joined data field content of one SSE event and returns whatever shape the application wants (e.g. `JSON.parse`, a domain object, the raw string).

Returns:

  • (#call)

    Required. Callable that receives the joined data field content of one SSE event and returns whatever shape the application wants (e.g. `JSON.parse`, a domain object, the raw string).
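
The check in `#validate!` is `respond_to?(:call)`, so despite the wording of its error message, the parser does not have to be a Proc. The sketch below shows three shapes that would pass that check; `Message`, `MessageParser`, and `PARSERS` are hypothetical names used only for illustration.

```ruby
require "json"

# Hypothetical domain object and a parser class that builds it.
Message = Struct.new(:body)

class MessageParser
  def call(data)
    Message.new(data)
  end
end

PARSERS = {
  lambda:        ->(data) { JSON.parse(data) },
  method_object: JSON.method(:parse),
  custom_object: MessageParser.new
}.freeze

PARSERS.each do |name, parser|
  puts "#{name}: #{parser.respond_to?(:call)}"
end
```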




#payload_start ⇒ String

Returns the prefix that identifies the data field; the trailing `":"` is stripped to derive the field name (default: `"data:"`).

Returns:

  • (String)

    Prefix that identifies the data field; the trailing `":"` is stripped to derive the field name (default: `"data:"`).




#ping_pattern ⇒ String

Returns the pattern identifying ping/comment frames (default: `":"`).

Returns:

  • (String)

    Pattern identifying ping/comment frames (default: `":"`).




#sanitize_pattern ⇒ #call

Returns a callable that cleans or validates frame content (default: UTF-8 normalize + strip).

Returns:

  • (#call)

    Cleans or validates frame content (default: UTF-8 normalize + strip).




#stats ⇒ Boolean

When true, the stream maintains a per-stage counter hash exposed via Stream#stats. Disabled by default to avoid any per-event overhead on hot paths.

Returns:

  • (Boolean)

    When true, the stream maintains a per-stage counter hash exposed via Stream#stats. Disabled by default to avoid any per-event overhead on hot paths.




Instance Method Details

#finalize! ⇒ self

Lock the configuration in: validate, compute derived values, and freeze. After this returns, the Config is immutable and every accessor (including #data_field) is safe to read on a hot path.

Returns:

  • (self)


# File 'lib/conduit_sse/config.rb', line 90

def finalize!
  validate!
  @data_field = @payload_start.chomp(":")
  freeze
  self
end
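
The effect of freezing can be sketched with a trimmed stand-in. `FrozenSketch` is a hypothetical class that keeps only two attributes and the same validate, derive, freeze sequence; it is not the library's class.

```ruby
# Hypothetical trimmed stand-in for the class above; only two attributes.
class FrozenSketch
  attr_accessor :parser, :payload_start
  attr_reader :data_field

  def initialize(parser:, payload_start: "data:")
    @parser        = parser
    @payload_start = payload_start
  end

  def finalize!
    raise ArgumentError, "parser must respond to #call" unless @parser.respond_to?(:call)

    @data_field = @payload_start.chomp(":")
    freeze
    self
  end
end

config = FrozenSketch.new(parser: ->(d) { d }).finalize!
puts config.data_field

begin
  config.parser = nil # setters write instance variables, which freeze forbids
rescue FrozenError => e
  puts "rejected: #{e.class}"
end
```

Note that `Object#freeze` is shallow: it prevents reassigning the config's instance variables but does not freeze the objects they reference.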

#validate! ⇒ Object

Enforce the one hard invariant: a usable parser must be present. Called by the stream after the configuration block (if any) has run, so callers can supply the parser via either kwarg or block.

Raises:

  • (ArgumentError)

    if #parser doesn't respond to `:call`.



# File 'lib/conduit_sse/config.rb', line 79

def validate!
  return if @parser.respond_to?(:call)

  raise ArgumentError, "parser must be a Proc (respond to #call)"
end
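
Deferring the check is what lets the parser arrive late. The sketch below uses a hypothetical `ValidateSketch` class with the same `respond_to?(:call)` test to show validation failing before a parser is set and passing afterwards.

```ruby
# Hypothetical stand-in with the same check as #validate!.
class ValidateSketch
  attr_accessor :parser

  def validate!
    return if @parser.respond_to?(:call)

    raise ArgumentError, "parser must respond to #call"
  end
end

config = ValidateSketch.new

begin
  config.validate! # no parser yet
rescue ArgumentError => e
  puts "invalid: #{e.message}"
end

config.parser = ->(data) { data.upcase }
config.validate! # passes now that a callable is set
```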