Class: Sourced::Message

Inherits:
Plumb::Types::Data
  • Object
show all
Defined in:
lib/sourced/message.rb

Overview

Canonical message class, shared by Sourced and Sidereal.

A message has no stream_id or seq — it goes into a flat, globally-ordered log. Supports causation_id / correlation_id for tracing causal chains.

Define message types via Message.define:

CourseCreated = Sourced::Message.define('course.created') do
  attribute :course_name, String
end

Subclasses (e.g. Command, Event, Sidereal::Message) all share one **top-level registry rooted at Message**: Registry#[] recurses downward into subclass registries, so Sourced::Message.registry[type] resolves a type registered under any subclass. Resolve from this root to see the whole tree.

Direct Known Subclasses

Command, Event

Defined Under Namespace

Modules: Types Classes: Payload, Registry

Constant Summary collapse

VERSION =
'0.1.0'
EMPTY_ARRAY =
[].freeze
UnknownMessageError =

Raised by from when a type string isn’t registered.

Class.new(ArgumentError)
PastMessageDateError =

Raised by #at when a message would be scheduled in the past.

Class.new(ArgumentError)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Message

Returns a new instance of Message.



167
168
169
170
# File 'lib/sourced/message.rb', line 167

def initialize(attrs = {})
  attrs = attrs.merge(payload: {}) unless attrs[:payload]
  super(attrs)
end

Class Method Details

.===(other) ⇒ Object

Make case/when transparent to a wrapper implementing #to_message. Ruby’s default Module#=== is implemented in C and ignores is_a? overrides, so wrapped messages would otherwise fall through.



179
180
181
182
183
184
185
# File 'lib/sourced/message.rb', line 179

def self.===(other)
  return true if super
  return false unless other.respond_to?(:to_message)

  unwrapped = other.to_message
  !unwrapped.equal?(other) && super(unwrapped)
end

.define(type_str) { ... } ⇒ Class

Define a new message type. Registers it in the registry and optionally defines a typed payload.

Examples:

UserJoined = Sourced::Message.define('user.joined') do
  attribute :course_name, String
  attribute :user_id, String
end

Parameters:

  • type_str (String)

    unique message type identifier (e.g. ‘course.created’)

Yields:

  • optional block to define payload attributes via attribute DSL

Returns:

  • (Class)

    the new message subclass



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/sourced/message.rb', line 133

def self.define(type_str, &payload_block)
  type_str.freeze unless type_str.frozen?

  registry[type_str] = Class.new(self) do
    def self.node_name = :data
    define_singleton_method(:type) { type_str }

    attribute :type, Types::Static[type_str]
    if block_given?
      payload_class = Class.new(Payload, &payload_block)
      const_set(:Payload, payload_class)
      attribute :payload, payload_class
      names = payload_class._schema.to_h.keys.map(&:to_sym).freeze
      define_singleton_method(:payload_attribute_names) { names }
    end
  end
end

.from(attrs) ⇒ Message

Instantiate the correct message subclass from a hash with a :type key.

Resolve from the root (Sourced::Message) to see types registered under any subclass — that’s how a cross-process transport reconstructs both Sourced and Sidereal message types from one registry.

Parameters:

  • attrs (Hash)

    must include :type matching a registered type string

Returns:

  • (Message)

    instance of the appropriate subclass

Raises:



160
161
162
163
164
165
# File 'lib/sourced/message.rb', line 160

def self.from(attrs)
  klass = registry[attrs[:type]]
  raise UnknownMessageError, "Unknown message type: #{attrs[:type]}" unless klass

  klass.new(attrs)
end

.payload_attribute_namesArray<Symbol>

Returns the declared payload attribute names for this message class. Subclasses created via define override this with a cached frozen array.

Returns:

  • (Array<Symbol>)

    attribute names (e.g. [:course_name, :user_id])



256
# File 'lib/sourced/message.rb', line 256

def self.payload_attribute_names = EMPTY_ARRAY

.registryRegistry

Returns the message type registry for this class.

Returns:

  • (Registry)

    the message type registry for this class



107
108
109
# File 'lib/sourced/message.rb', line 107

def self.registry
  @registry ||= Registry.new(self)
end

Instance Method Details

#at(value) ⇒ Object Also known as: in

Return a copy with created_at set to a future instant. Three accepted forms:

  • Time / DateTime / anything with < — used as the absolute target.

  • Integer — interpreted as seconds; added to Time.now.

  • String — parsed via Fugit.parse_duration as a duration (e.g. ‘5m’, ‘1h30m’, ‘PT5M’) and added to Time.now.

Raises PastMessageDateError when the resolved target is before created_at.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/sourced/message.rb', line 210

def at(value)
  target = case value
           when Integer
             Time.now + value
           when String
             parsed = Fugit.parse_duration(value) or
               raise ArgumentError,
                     "Message#at: String argument must be an ISO8601 / Fugit duration " \
                     "(e.g. '5m', 'PT1H30M'); got #{value.inspect}"
             parsed.add_to_time(Time.now).to_local_time
           else
             value
           end

  if target < created_at
    raise PastMessageDateError, "Message #{type} can't be delayed to a date in the past"
  end

  with(created_at: target)
end

#correlate(message) ⇒ Message

Set causation and correlation IDs on another message, establishing a causal link from this message to message. Merges metadata.

Examples:

caused = source_event.correlate(SomeCommand.new(payload: { ... }))
caused.causation_id  # => source_event.id
caused.correlation_id # => source_event.correlation_id

Parameters:

  • message (Message)

    the message to correlate

Returns:

  • (Message)

    a copy of message with causation/correlation set



243
244
245
246
247
248
249
250
# File 'lib/sourced/message.rb', line 243

def correlate(message)
  attrs = {
    causation_id: id,
    correlation_id: correlation_id,
    metadata: .merge(message. || Plumb::BLANK_HASH)
  }
  message.with(attrs)
end

#to_messageObject

Identity implementation of the to_message contract — see === and any wrapper (e.g. Sourced::PositionedMessage#to_message).



174
# File 'lib/sourced/message.rb', line 174

def to_message = self

#with_metadata(meta = {}) ⇒ Object



187
188
189
190
191
# File 'lib/sourced/message.rb', line 187

def (meta = {})
  return self if meta.empty?

  with(metadata: .merge(meta))
end

#with_payload(attrs = {}) ⇒ Object



193
194
195
196
197
# File 'lib/sourced/message.rb', line 193

def with_payload(attrs = {})
  hash = to_h
  (hash[:payload] ||= {}).merge!(attrs)
  self.class.new(hash)
end