Class: Funes::EventStream
- Inherits:
-
Object
- Object
- Funes::EventStream
- Defined in:
- app/event_streams/funes/event_stream.rb
Overview
EventStream manages the append-only sequence of events for a specific entity. Each stream is identified by an ‘idx` (entity identifier) and provides methods for appending events and configuring how projections are triggered.
EventStreams implement a three-tier consistency model:
-
**Consistency Projection:** Validates business rules before persisting the event. If invalid, the event is rejected.
-
**Transactional Projections:** Execute synchronously in the same database transaction as the event.
-
**Async Projections:** Execute asynchronously via ActiveJob after the event is committed.
## Bitemporal Queries
EventStreams support two independent temporal dimensions:
-
**Record history** (‘as_of`): Filters by `created_at` — “what did the system know at time T?” Set via `projected_with(projection, as_of: time)`.
-
**Actual history** (‘at`): Filters by `occurred_at` — “what had actually happened by time T?” Set via `projected_with(projection, at: time)`.
When both are used together, ‘as_of` determines which events are visible (filtered in Ruby by `created_at`), and `at` further narrows which of those events are projected (Ruby-level filter on `occurred_at`).
## Actual Time Attribute
Streams can declare an ‘actual_time_attribute` to automatically extract the actual time from an event attribute. When configured, every event must have the attribute with a non-nil value. The explicit `at:` on `append` takes precedence; if both are present and differ, a ConflictingActualTimeError is raised.
## Concurrency Control
EventStreams use optimistic concurrency control with version numbers. Each event gets an incrementing version number with a unique constraint on ‘(idx, version)`, preventing race conditions when multiple processes append to the same stream simultaneously.
Instance Attribute Summary collapse
-
#idx ⇒ Object
readonly
Returns the value of attribute idx.
Class Method Summary collapse
-
.actual_time_attribute(attribute_name = nil) ⇒ void
Configures the event attribute used as a source for the actual time (‘occurred_at`).
-
.add_async_projection(projection, temporal_context: :last_event_time, **options) ⇒ void
Register an async projection that executes in a background job after the event is committed.
-
.add_transactional_projection(projection) ⇒ void
Register a transactional projection that executes synchronously in the same database transaction.
-
.consistency_projection(projection) ⇒ void
Register a consistency projection that validates business rules before persisting events.
-
.for(idx) ⇒ Funes::EventStream
Create a new EventStream instance for the given entity identifier.
Instance Method Summary collapse
-
#append(new_event, at: nil) ⇒ Funes::Event
Append a new event to the stream.
-
#events ⇒ Array<Funes::Event>
Get all events in the stream as event instances.
-
#projected_with(projection_class, as_of: nil, at: nil) ⇒ Object
Projects the stream’s events using the given projection class.
-
#to_param ⇒ String
Returns the parameter representation of the event stream for use in URLs.
Instance Attribute Details
#idx ⇒ Object (readonly)
Returns the value of attribute idx.
188 189 190 |
# File 'app/event_streams/funes/event_stream.rb', line 188 def idx @idx end |
Class Method Details
.actual_time_attribute(attribute_name = nil) ⇒ void
This method returns an undefined value.
Configures the event attribute used as a source for the actual time (‘occurred_at`).
When set, every event appended to this stream must have the specified attribute. Its value is used as the fallback for ‘occurred_at` when `at:` is not explicitly passed to `append`.
166 167 168 169 170 171 172 |
# File 'app/event_streams/funes/event_stream.rb', line 166 def actual_time_attribute(attribute_name = nil) if attribute_name @actual_time_attribute = attribute_name else @actual_time_attribute end end |
.add_async_projection(projection, temporal_context: :last_event_time, **options) ⇒ void
This method returns an undefined value.
Register an async projection that executes in a background job after the event is committed.
Async projections are scheduled via ActiveJob after the event transaction commits. You can pass any ActiveJob options (queue, wait, wait_until, priority, etc.) to control job scheduling.
The ‘temporal_context` parameter controls the temporal reference passed to the projection job. Its resolved value becomes the `at:` argument received by interpretation blocks. Note that this is independent from the `at:` argument of `EventStream#append` — that value sets the event’s ‘occurred_at` (business time) and does not flow through to async projections.
-
‘:last_event_time` (default) - Uses the transaction time (`created_at`) of the last event, i.e. when it was recorded in the database, not when the business event occurred (`occurred_at`)
-
‘:job_time` - Uses Time.current when the job executes
-
Proc/Lambda - Custom logic that receives the last event and returns a Time object
148 149 150 151 |
# File 'app/event_streams/funes/event_stream.rb', line 148 def add_async_projection(projection, temporal_context: :last_event_time, **) @async_projections ||= [] @async_projections << { class: projection, temporal_context: temporal_context, options: } end |
.add_transactional_projection(projection) ⇒ void
This method returns an undefined value.
Register a transactional projection that executes synchronously in the same database transaction.
Transactional projections run after the event is persisted but within the same database transaction. If a transactional projection fails with a database error, the transaction is rolled back, the event is marked as not persisted (‘persisted?` returns false), and the exception propagates to the caller. This fail-loud behavior ensures that bugs in projections (such as constraint violations) are immediately visible rather than silently hidden.
105 106 107 108 |
# File 'app/event_streams/funes/event_stream.rb', line 105 def add_transactional_projection(projection) @transactional_projections ||= [] @transactional_projections << projection end |
.consistency_projection(projection) ⇒ void
This method returns an undefined value.
Register a consistency projection that validates business rules before persisting events.
The consistency projection runs before the event is saved. If the resulting state is invalid, the event is rejected and not persisted to the database.
84 85 86 |
# File 'app/event_streams/funes/event_stream.rb', line 84 def consistency_projection(projection) @consistency_projection = projection end |
.for(idx) ⇒ Funes::EventStream
Create a new EventStream instance for the given entity identifier.
181 182 183 |
# File 'app/event_streams/funes/event_stream.rb', line 181 def for(idx) new(idx) end |
Instance Method Details
#append(new_event, at: nil) ⇒ Funes::Event
Append a new event to the stream.
This method validates the event, resolves the actual time (‘occurred_at`), runs the consistency projection (if configured), persists the event with an incremented version number, and triggers transactional and async projections.
The ‘occurred_at` value is resolved via a fallback chain:
-
Explicit ‘at:` parameter on this method
-
The event’s ‘actual_time_attribute` value (if configured on the stream)
-
Same ‘Time.current` used for `created_at`
‘Date` values are coerced to `Time` via `beginning_of_day`.
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'app/event_streams/funes/event_stream.rb', line 232 def append(new_event, at: nil) return new_event unless new_event.valid? occurred_at = resolve_proper_occurred_at(new_event, at) if consistency_projection.present? materialization = compute_projection_with_new_event(consistency_projection, new_event, occurred_at) transfer_interpretation_errors(new_event) return new_event if materialization.invalid? || new_event.invalid? end ActiveRecord::Base.transaction do begin @instance_new_events << new_event.persist!(@idx, incremented_version, at: occurred_at) run_transactional_projections rescue ActiveRecord::RecordNotUnique new_event._event_entry = nil new_event.errors.add(:base, I18n.t("funes.events.racing_condition_on_insert")) raise ActiveRecord::Rollback rescue ActiveRecord::StatementInvalid, ActiveRecord::RecordInvalid => e new_event._event_entry = nil raise e end end schedule_async_projections unless new_event.errors.any? new_event end |
#events ⇒ Array<Funes::Event>
Get all events in the stream as event instances.
Returns both previously persisted events (up to ‘as_of` timestamp) and any new events appended in this session, sorted by `occurred_at` ascending.
281 282 283 284 285 |
# File 'app/event_streams/funes/event_stream.rb', line 281 def events entries = previous_events.to_a + @instance_new_events entries.sort_by!(&:occurred_at) if @instance_new_events.any? entries.map(&:to_klass_instance) end |
#projected_with(projection_class, as_of: nil, at: nil) ⇒ Object
Projects the stream’s events using the given projection class.
Delegates to the projection’s ‘process_events` class method, passing the stream’s events and ‘as_of` timestamp. When `at:` is provided, events are filtered to only include those where `occurred_at <= at` before projection. When `as_of:` is provided, it overrides the stream’s record-time boundary, filtering events in Ruby by ‘created_at`.
316 317 318 319 320 |
# File 'app/event_streams/funes/event_stream.rb', line 316 def projected_with(projection_class, as_of: nil, at: nil) source_events = as_of ? filter_by_record_time(events, as_of) : events target_events = at ? filter_by_actual_time(source_events, at) : source_events projection_class.process_events(target_events, at: at) end |
#to_param ⇒ String
Returns the parameter representation of the event stream for use in URLs.
This method follows the ActiveRecord convention for URL generation, allowing EventStream instances to be used directly with Rails URL helpers like ‘url_for` or named route helpers.
336 337 338 |
# File 'app/event_streams/funes/event_stream.rb', line 336 def to_param idx end |