Class: Sourced::Message
- Inherits: Plumb::Types::Data
  - Object
  - Plumb::Types::Data
  - Sourced::Message
- Defined in: lib/sourced/message.rb
Overview
Canonical message class, shared by Sourced and Sidereal.
A message has no stream_id or seq — it goes into a flat, globally-ordered log. Supports causation_id / correlation_id for tracing causal chains.
Define message types via Message.define:
    CourseCreated = Sourced::Message.define('course.created') do
      attribute :course_name, String
    end
Subclasses (e.g. Command, Event, Sidereal::Message) all share one **top-level registry rooted at Message**: Registry#[] recurses downward into subclass registries, so Sourced::Message.registry[type] resolves a type registered under any subclass. Resolve from this root to see the whole tree.
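The downward lookup described above can be sketched with plain Ruby. This is a minimal illustration, not the Sourced implementation: TinyRegistry, TinyMessage, TinyCommand, TinyEvent and the children helper are hypothetical names invented for this example, and the real Registry may track subclasses differently.

```ruby
# Hypothetical stand-in for illustration; not Sourced::Message::Registry.
class TinyRegistry
  def initialize(owner)
    @owner = owner
    @types = {}
  end

  def []=(type, klass)
    @types[type] = klass
  end

  # Look in this registry first, then recurse downward into the
  # registries of the owner's subclasses.
  def [](type)
    return @types[type] if @types.key?(type)

    @owner.children.each do |sub|
      found = sub.registry[type]
      return found if found
    end
    nil
  end
end

class TinyMessage
  # Each class in the hierarchy lazily gets its own registry.
  def self.registry
    @registry ||= TinyRegistry.new(self)
  end

  def self.children
    @children ||= []
  end

  # Record direct subclasses so lookups can walk the tree.
  def self.inherited(sub)
    super
    children << sub
  end
end

class TinyCommand < TinyMessage; end
class TinyEvent < TinyMessage; end

CourseCreatedEvent = Class.new(TinyEvent)
TinyEvent.registry['course.created'] = CourseCreatedEvent

root_hit    = TinyMessage.registry['course.created'] # found via TinyEvent
sibling_hit = TinyCommand.registry['course.created'] # not visible from a sibling
```

Resolving from the root finds the type registered under TinyEvent, while the sibling TinyCommand does not see it, which is why the text recommends resolving from the root.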
Defined Under Namespace
Modules: Types
Classes: Payload, Registry
Constant Summary
- VERSION = '0.1.0'
- EMPTY_ARRAY = [].freeze
- UnknownMessageError = Class.new(ArgumentError)
  Raised by .from when a type string isn’t registered.
- PastMessageDateError = Class.new(ArgumentError)
  Raised by #at when a message would be scheduled in the past.
Class Method Summary
- .===(other) ⇒ Object
  Make case/when transparent to a wrapper implementing #to_message.
- .define(type_str) { ... } ⇒ Class
  Define a new message type.
- .from(attrs) ⇒ Message
  Instantiate the correct message subclass from a hash with a :type key.
- .payload_attribute_names ⇒ Array<Symbol>
  Returns the declared payload attribute names for this message class.
- .registry ⇒ Registry
  The message type registry for this class.
Instance Method Summary
- #at(value) ⇒ Object (also: #in)
  Return a copy with created_at set to a future instant.
- #correlate(message) ⇒ Message
  Set causation and correlation IDs on another message, establishing a causal link from this message to message.
- #initialize(attrs = {}) ⇒ Message (constructor)
  A new instance of Message.
- #to_message ⇒ Object
  Identity implementation of the to_message contract; see Message.=== and any wrapper (e.g. Sourced::PositionedMessage#to_message).
- #with_metadata(meta = {}) ⇒ Object
- #with_payload(attrs = {}) ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Message
Returns a new instance of Message.
    # File 'lib/sourced/message.rb', line 167

    def initialize(attrs = {})
      attrs = attrs.merge(payload: {}) unless attrs[:payload]
      super(attrs)
    end
Class Method Details
.===(other) ⇒ Object
Make case/when transparent to a wrapper implementing #to_message. Ruby’s default Module#=== is implemented in C and ignores is_a? overrides, so wrapped messages would otherwise fall through.
    # File 'lib/sourced/message.rb', line 179

    def self.===(other)
      return true if super
      return false unless other.respond_to?(:to_message)

      unwrapped = other.to_message
      !unwrapped.equal?(other) && super(unwrapped)
    end
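To see what this override buys in practice, here is a small self-contained sketch. Note and Positioned are hypothetical stand-ins (Positioned plays the role of a wrapper such as Sourced::PositionedMessage); only the body of .=== mirrors the listing above.

```ruby
# Hypothetical stand-in classes; only the .=== body follows the listing above.
class Note
  def self.===(other)
    return true if super
    return false unless other.respond_to?(:to_message)

    unwrapped = other.to_message
    # If to_message returned the same object, the first check has already
    # failed for it, so there is nothing new to test.
    !unwrapped.equal?(other) && super(unwrapped)
  end

  # Identity implementation of the to_message contract.
  def to_message
    self
  end
end

# A wrapper that carries a message plus extra data.
class Positioned
  attr_reader :position

  def initialize(message, position)
    @message = message
    @position = position
  end

  def to_message
    @message
  end
end

def describe(obj)
  case obj
  when Note then :note
  else :other
  end
end

direct  = describe(Note.new)
wrapped = describe(Positioned.new(Note.new, 3))
plain   = describe('just a string')
```

Without the override, the wrapped case would take the else branch, because Module#=== only checks the wrapper's own class.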
.define(type_str) { ... } ⇒ Class
Define a new message type. Registers it in the registry and optionally defines a typed payload.
    # File 'lib/sourced/message.rb', line 133

    def self.define(type_str, &payload_block)
      type_str.freeze unless type_str.frozen?

      registry[type_str] = Class.new(self) do
        def self.node_name = :data
        define_singleton_method(:type) { type_str }

        attribute :type, Types::Static[type_str]
        if block_given?
          payload_class = Class.new(Payload, &payload_block)
          const_set(:Payload, payload_class)
          attribute :payload, payload_class
          names = payload_class._schema.to_h.keys.map(&:to_sym).freeze
          define_singleton_method(:payload_attribute_names) { names }
        end
      end
    end
.from(attrs) ⇒ Message
Instantiate the correct message subclass from a hash with a :type key.
Resolve from the root (Sourced::Message) to see types registered under any subclass — that’s how a cross-process transport reconstructs both Sourced and Sidereal message types from one registry.
    # File 'lib/sourced/message.rb', line 160

    def self.from(attrs)
      klass = registry[attrs[:type]]
      raise UnknownMessageError, "Unknown message type: #{attrs[:type]}" unless klass

      klass.new(attrs)
    end
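The define/from round trip can be illustrated without the gem. The sketch below is an assumption-laden miniature: MiniMessage, its flat REGISTRY hash, and CourseRenamed are hypothetical, and payload typing via Plumb is left out entirely. It only shows the dispatch-by-type-string idea and the error raised for an unregistered type.

```ruby
# Hypothetical miniature of the define/from pattern; no Plumb types involved.
class MiniMessage
  UnknownMessageError = Class.new(ArgumentError)
  REGISTRY = {} # flat hash standing in for the real registry

  attr_reader :type, :payload

  # Create an anonymous subclass and register it under the type string.
  def self.define(type_str)
    REGISTRY[type_str] = Class.new(self) do
      define_singleton_method(:type) { type_str }
    end
  end

  # Pick the subclass from the :type key, then instantiate it.
  def self.from(attrs)
    klass = REGISTRY[attrs[:type]]
    raise UnknownMessageError, "Unknown message type: #{attrs[:type]}" unless klass

    klass.new(attrs)
  end

  def initialize(attrs = {})
    @type = attrs[:type]
    @payload = attrs[:payload] || {}
  end
end

CourseRenamed = MiniMessage.define('course.renamed')

# A hash such as one decoded from a transport is turned back into the right class.
msg = MiniMessage.from(type: 'course.renamed', payload: { course_name: 'Ruby 101' })
```

A transport that only knows the root class can rebuild any registered subclass this way, as long as the type string survives serialization.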
.payload_attribute_names ⇒ Array<Symbol>
Returns the declared payload attribute names for this message class. Subclasses created via define override this with a cached frozen array.
    # File 'lib/sourced/message.rb', line 256

    def self.payload_attribute_names = EMPTY_ARRAY
Instance Method Details
#at(value) ⇒ Object Also known as: in
Return a copy with created_at set to a future instant. Three accepted forms:
- Time / DateTime / anything that responds to <: used as the absolute target.
- Integer: interpreted as seconds; added to Time.now.
- String: parsed via Fugit.parse_duration as a duration (e.g. ‘5m’, ‘1h30m’, ‘PT5M’) and added to Time.now.
Raises PastMessageDateError when the resolved target is before created_at.
    # File 'lib/sourced/message.rb', line 210

    def at(value)
      target = case value
        when Integer
          Time.now + value
        when String
          parsed = Fugit.parse_duration(value) or
            raise ArgumentError,
              "Message#at: String argument must be an ISO8601 / Fugit duration " \
              "(e.g. '5m', 'PT1H30M'); got #{value.inspect}"
          parsed.add_to_time(Time.now).to_local_time
        else
          value
        end

      if target < created_at
        raise PastMessageDateError, "Message #{type} can't be delayed to a date in the past"
      end

      with(created_at: target)
    end
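The Integer and absolute-time forms can be tried out with a small stand-in. Scheduled and PastDateError below are hypothetical names for this sketch, and the String form is left out because it depends on the Fugit gem.

```ruby
# Hypothetical stand-in covering only the Integer and Time forms of #at.
PastDateError = Class.new(ArgumentError)

Scheduled = Struct.new(:type, :created_at, keyword_init: true) do
  def at(value)
    # Integers are seconds from now; anything else is used as the target.
    target = value.is_a?(Integer) ? Time.now + value : value

    if target < created_at
      raise PastDateError, "Message #{type} can't be delayed to a date in the past"
    end

    # Return a copy; the receiver keeps its original created_at.
    self.class.new(**to_h.merge(created_at: target))
  end
end

now = Time.now
msg = Scheduled.new(type: 'reminder.sent', created_at: now)

in_five_minutes = msg.at(300)        # relative: seconds added to Time.now
in_one_hour     = msg.at(now + 3600) # absolute target

rejected =
  begin
    msg.at(now - 60)
    false
  rescue PastDateError
    true
  end
```

The check compares the target against the message's own created_at rather than the current clock, which matches the raise condition in the listing above.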
#correlate(message) ⇒ Message
Set causation and correlation IDs on another message, establishing a causal link from this message to message. Merges metadata.
    # File 'lib/sourced/message.rb', line 243

    def correlate(message)
      attrs = {
        causation_id: id,
        correlation_id: correlation_id,
        metadata: metadata.merge(message.metadata || Plumb::BLANK_HASH)
      }
      message.with(attrs)
    end
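A short chain shows how the IDs propagate. Msg and its build helper are hypothetical stand-ins, and one point is an assumption made for this sketch rather than something the page states: a freshly built message starts with causation_id and correlation_id equal to its own id.

```ruby
require 'securerandom'

# Hypothetical stand-in. Assumption: a new message's causation_id and
# correlation_id default to its own id (not stated in the documentation).
Msg = Struct.new(:id, :causation_id, :correlation_id, :metadata, keyword_init: true) do
  def self.build(metadata: {})
    id = SecureRandom.uuid
    new(id: id, causation_id: id, correlation_id: id, metadata: metadata)
  end

  # Return a copy of `message` that is caused by self and shares its correlation.
  def correlate(message)
    message.class.new(**message.to_h.merge(
      causation_id: id,
      correlation_id: correlation_id,
      metadata: metadata.merge(message.metadata || {})
    ))
  end
end

command  = Msg.build(metadata: { user: 'ana' })
event    = command.correlate(Msg.build(metadata: { source: 'api' }))
followup = event.correlate(Msg.build)
```

Each message points at its direct cause through causation_id, while correlation_id stays the same along the whole chain, so the originating command can be found from any descendant. On a key conflict the correlated message's own metadata wins, since it is the argument to merge.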
#to_message ⇒ Object
Identity implementation of the to_message contract — see === and any wrapper (e.g. Sourced::PositionedMessage#to_message).
    # File 'lib/sourced/message.rb', line 174

    def to_message = self
#with_metadata(meta = {}) ⇒ Object
    # File 'lib/sourced/message.rb', line 187

    def with_metadata(meta = {})
      return self if meta.empty?

      with(metadata: metadata.merge(meta))
    end
#with_payload(attrs = {}) ⇒ Object
    # File 'lib/sourced/message.rb', line 193

    def with_payload(attrs = {})
      hash = to_h
      (hash[:payload] ||= {}).merge!(attrs)
      self.class.new(hash)
    end