Class: Funes::EventStream

Inherits:
Object
  • Object
Defined in:
app/event_streams/funes/event_stream.rb

Overview

EventStream manages the append-only sequence of events for a specific entity. Each stream is identified by an `idx` (entity identifier) and provides methods for appending events and configuring how projections are triggered.

EventStreams implement a three-tier consistency model:

  • **Consistency Projection:** Validates business rules before persisting the event. If invalid, the event is rejected.

  • **Transactional Projections:** Execute synchronously in the same database transaction as the event.

  • **Async Projections:** Execute asynchronously via ActiveJob after the event is committed.

## Bitemporal Queries

EventStreams support two independent temporal dimensions:

  • **Record history** (`as_of`): Filters by `created_at`, answering “what did the system know at time T?” Set via `projected_with(projection, as_of: time)`.

  • **Actual history** (`at`): Filters by `occurred_at`, answering “what had actually happened by time T?” Set via `projected_with(projection, at: time)`.

When both are used together, `as_of` determines which events are visible (filtered in Ruby by `created_at`), and `at` further narrows which of those events are projected (Ruby-level filter on `occurred_at`).
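The two-stage filtering described above can be sketched in plain Ruby. This is only an illustration: the `Event` struct and the `visible_events` helper are hypothetical, and the bodies of `filter_by_record_time` and `filter_by_actual_time` are assumptions based on the description, not the gem's private implementation.

```ruby
# Plain-Ruby stand-in for a persisted event; not the Funes event class.
Event = Struct.new(:name, :created_at, :occurred_at)

# Record history: keep events the system had recorded by `as_of`.
def filter_by_record_time(events, as_of)
  events.select { |event| event.created_at <= as_of }
end

# Actual history: keep events that had actually occurred by `at`.
def filter_by_actual_time(events, at)
  events.select { |event| event.occurred_at <= at }
end

# Hypothetical helper: apply the record-time filter first, then the actual-time filter.
def visible_events(events, as_of: nil, at: nil)
  source = as_of ? filter_by_record_time(events, as_of) : events
  at ? filter_by_actual_time(source, at) : source
end

events = [
  Event.new("hired",  Time.utc(2025, 1, 10), Time.utc(2025, 1, 10)),
  # Recorded on Mar 5 but effective Feb 15 (a retroactive raise).
  Event.new("raised", Time.utc(2025, 3, 5),  Time.utc(2025, 2, 15))
]

actual_only = visible_events(events, at: Time.utc(2025, 2, 20)).map(&:name)
bitemporal  = visible_events(events, as_of: Time.utc(2025, 3, 1), at: Time.utc(2025, 2, 20)).map(&:name)
```

In this sketch `actual_only` contains both events, while `bitemporal` contains only `"hired"`, because the retroactive raise had not yet been recorded on Mar 1.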

## Actual Time Attribute

Streams can declare an `actual_time_attribute` to automatically extract the actual time from an event attribute. When configured, every event must have the attribute with a non-nil value. The explicit `at:` on `append` takes precedence; if both are present and differ, a `ConflictingActualTimeError` is raised.

## Concurrency Control

EventStreams use optimistic concurrency control with version numbers. Each event gets an incrementing version number with a unique constraint on `(idx, version)`, preventing race conditions when multiple processes append to the same stream simultaneously.
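As a rough illustration of why a unique `(idx, version)` constraint catches concurrent appends, the sketch below imitates the constraint with an in-memory `Set`. `InMemoryEventLog` and `VersionConflict` are hypothetical names for this example; in the real system the database enforces the constraint.

```ruby
require "set"

# Hypothetical in-memory stand-in for the events table and its unique index.
class InMemoryEventLog
  VersionConflict = Class.new(StandardError)

  def initialize
    @rows = []
    @unique_keys = Set.new
  end

  # Next version as seen by a writer that has just read the stream.
  def next_version(idx)
    @rows.count { |row| row[:idx] == idx } + 1
  end

  # The insert fails if another writer already claimed (idx, version).
  def insert(idx:, version:, payload:)
    unless @unique_keys.add?([idx, version])
      raise VersionConflict, "version #{version} already taken for #{idx}"
    end
    @rows << { idx: idx, version: version, payload: payload }
  end
end

log = InMemoryEventLog.new

# Two writers read the same next version before either of them inserts.
version_a = log.next_version("order-123")
version_b = log.next_version("order-123")

log.insert(idx: "order-123", version: version_a, payload: "placed")
begin
  log.insert(idx: "order-123", version: version_b, payload: "cancelled")
  conflict = false
rescue InMemoryEventLog::VersionConflict
  conflict = true # the losing writer has to re-read the stream and retry
end
```

The second writer loses the race and learns about it from the failed insert rather than from a lock taken up front, which is the essence of the optimistic approach.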

Examples:

Define an event stream with projections

class OrderEventStream < Funes::EventStream
  consistency_projection OrderValidationProjection
  add_transactional_projection OrderSnapshotProjection
  add_async_projection OrderReportProjection, queue: :reports
end

Define a stream with actual time extraction

class SalaryEventStream < Funes::EventStream
  actual_time_attribute :effective_date
end

Append events to a stream

stream = OrderEventStream.for("order-123")
event = stream.append(Order::Placed.new(total: 99.99))

if event.valid?
  puts "Event persisted with version #{event.version}"
else
  puts "Event rejected: #{event.errors.full_messages}"
end

Append a retroactive event with explicit actual time

stream.append(Salary::Raised.new(amount: 6500), at: Time.new(2025, 2, 15))

Actual history query - what had actually happened by a point in time

stream = SalaryEventStream.for("sally-123")
stream.projected_with(SalaryProjection, at: Time.new(2025, 2, 20))

Full bitemporal query - combining both dimensions

stream = SalaryEventStream.for("sally-123")
stream.projected_with(SalaryProjection, as_of: Time.new(2025, 3, 1), at: Time.new(2025, 2, 20))

Instance Attribute Details

#idx ⇒ Object (readonly)

Returns the value of attribute idx.



# File 'app/event_streams/funes/event_stream.rb', line 188

def idx
  @idx
end

Class Method Details

.actual_time_attribute(attribute_name = nil) ⇒ void

This method returns an undefined value.

Configures the event attribute used as a source for the actual time (`occurred_at`).

When set, every event appended to this stream must have the specified attribute. Its value is used as the fallback for `occurred_at` when `at:` is not explicitly passed to `append`.

Examples:

class SalaryEventStream < Funes::EventStream
  actual_time_attribute :effective_date
end

Parameters:

  • attribute_name (Symbol) (defaults to: nil)

    The event attribute name to read actual time from.



# File 'app/event_streams/funes/event_stream.rb', line 166

def actual_time_attribute(attribute_name = nil)
  if attribute_name
    @actual_time_attribute = attribute_name
  else
    @actual_time_attribute
  end
end
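
The listing above writes the setting when given an argument and reads it back when called without one. A minimal sketch of that pattern, using hypothetical classes rather than `Funes::EventStream` itself, shows that the value lives in a class-level instance variable, so in this sketch a sibling subclass that never calls the macro reads `nil`:

```ruby
# Hypothetical base class that mirrors the reader/writer macro shown above.
class SketchStreamBase
  class << self
    def actual_time_attribute(attribute_name = nil)
      if attribute_name
        @actual_time_attribute = attribute_name # writer: called in the class body
      else
        @actual_time_attribute                  # reader: called with no argument
      end
    end
  end
end

class SketchSalaryStream < SketchStreamBase
  actual_time_attribute :effective_date
end

class SketchOrderStream < SketchStreamBase
end

SketchSalaryStream.actual_time_attribute # => :effective_date
SketchOrderStream.actual_time_attribute  # => nil
```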

.add_async_projection(projection, temporal_context: :last_event_time, **options) ⇒ void

This method returns an undefined value.

Register an async projection that executes in a background job after the event is committed.

Async projections are scheduled via ActiveJob after the event transaction commits. You can pass any ActiveJob options (queue, wait, wait_until, priority, etc.) to control job scheduling.

The `temporal_context` parameter controls the temporal reference passed to the projection job. Its resolved value becomes the `at:` argument received by interpretation blocks. Note that this is independent of the `at:` argument of `EventStream#append`: that value sets the event’s `occurred_at` (business time) and does not flow through to async projections.

  • `:last_event_time` (default) - Uses the transaction time (`created_at`) of the last event, i.e. when it was recorded in the database, not when the business event occurred (`occurred_at`)

  • `:job_time` - Uses `Time.current` when the job executes

  • Proc/Lambda - Custom logic that receives the last event and returns a Time object
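
The three strategies listed above could be resolved along the following lines. `resolve_temporal_context` and `LastEvent` are hypothetical names used only for this sketch, and `Time.now` stands in for `Time.current` so the example runs without ActiveSupport; the gem's actual job code may differ.

```ruby
# Hypothetical resolver; the name and signature are illustrative only.
def resolve_temporal_context(temporal_context, last_event, now: Time.now)
  case temporal_context
  when :last_event_time then last_event.created_at          # record time of the last event
  when :job_time        then now                            # time at which the job runs
  when Proc             then temporal_context.call(last_event) # custom logic
  else raise ArgumentError, "unsupported temporal_context: #{temporal_context.inspect}"
  end
end

# Stand-in for the last event of the stream.
LastEvent = Struct.new(:created_at, :occurred_at)
last_event  = LastEvent.new(Time.utc(2025, 3, 5, 14, 30), Time.utc(2025, 2, 15))
job_started = Time.utc(2025, 3, 5, 14, 31)

# Plain-Ruby replacement for `created_at.beginning_of_day`.
start_of_day = lambda do |event|
  Time.utc(event.created_at.year, event.created_at.month, event.created_at.day)
end

by_default = resolve_temporal_context(:last_event_time, last_event)
by_job     = resolve_temporal_context(:job_time, last_event, now: job_started)
by_proc    = resolve_temporal_context(start_of_day, last_event)
```

Note that none of the three results equals the event's `occurred_at` (Feb 15), which matches the statement that business time does not flow through to async projections.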

Examples:

Schedule with custom queue

class OrderEventStream < Funes::EventStream
  add_async_projection OrderReportProjection, queue: :reports
end

Schedule with delay

class OrderEventStream < Funes::EventStream
  add_async_projection AnalyticsProjection, wait: 5.minutes
end

Use job execution time instead of event time

class OrderEventStream < Funes::EventStream
  add_async_projection RealtimeProjection, temporal_context: :job_time
end

Custom temporal_context logic with proc

class OrderEventStream < Funes::EventStream
  add_async_projection EndOfDayProjection, temporal_context: ->(last_event) { last_event.created_at.beginning_of_day }
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class to execute asynchronously.

  • temporal_context (Symbol, Proc) (defaults to: :last_event_time)

    Strategy for determining the temporal reference (:last_event_time, :job_time, or Proc).

  • options (Hash)

    ActiveJob options for scheduling (queue, wait, wait_until, priority, etc.).



# File 'app/event_streams/funes/event_stream.rb', line 148

def add_async_projection(projection, temporal_context: :last_event_time, **options)
  @async_projections ||= []
  @async_projections << { class: projection, temporal_context: temporal_context, options: options }
end

.add_transactional_projection(projection) ⇒ void

This method returns an undefined value.

Register a transactional projection that executes synchronously in the same database transaction.

Transactional projections run after the event is persisted but within the same database transaction. If a transactional projection fails with a database error, the transaction is rolled back, the event is marked as not persisted (`persisted?` returns false), and the exception propagates to the caller. This fail-loud behavior ensures that bugs in projections (such as constraint violations) are immediately visible rather than silently hidden.
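
The rollback-and-propagate behavior can be pictured with a small in-memory imitation of a transaction. `TinyStore` and `ProjectionFailed` are hypothetical; a real stream relies on the database transaction and on errors such as `ActiveRecord::StatementInvalid`.

```ruby
# Hypothetical in-memory store; it only imitates rollback-on-exception.
class TinyStore
  attr_reader :rows

  def initialize
    @rows = []
  end

  # Restores the previous rows and re-raises if the block fails.
  def transaction
    snapshot = @rows.dup
    yield
  rescue StandardError
    @rows = snapshot
    raise
  end
end

class ProjectionFailed < StandardError; end

store = TinyStore.new
persisted = nil
failure_message = nil

begin
  store.transaction do
    store.rows << { idx: "order-123", version: 1 }           # the event insert
    raise ProjectionFailed, "snapshot violates a constraint" # the projection step
  end
  persisted = true
rescue ProjectionFailed => error
  persisted = false               # the event insert was undone as well
  failure_message = error.message # the caller still sees the original error
end
```

After the rescue, `store.rows` is empty again and `persisted` is false, mirroring the fail-loud contract described above.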

Examples:

class OrderEventStream < Funes::EventStream
  add_transactional_projection OrderSnapshotProjection
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class to execute transactionally.

Raises:

  • (ActiveRecord::StatementInvalid)

    if the projection fails with a database error. The event will have `persisted?` returning false, allowing safe rescue in the host application.



# File 'app/event_streams/funes/event_stream.rb', line 105

def add_transactional_projection(projection)
  @transactional_projections ||= []
  @transactional_projections << projection
end

.consistency_projection(projection) ⇒ void

This method returns an undefined value.

Register a consistency projection that validates business rules before persisting events.

The consistency projection runs before the event is saved. If the resulting state is invalid, the event is rejected and not persisted to the database.

Examples:

class InventoryEventStream < Funes::EventStream
  consistency_projection InventoryValidationProjection
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class that will validate the state.



# File 'app/event_streams/funes/event_stream.rb', line 84

def consistency_projection(projection)
  @consistency_projection = projection
end

.for(idx) ⇒ Funes::EventStream

Create a new EventStream instance for the given entity identifier.

Examples:

stream = OrderEventStream.for("order-123")

Parameters:

  • idx (String)

    The entity identifier.

Returns:

  • (Funes::EventStream)

    A new stream instance for the given entity identifier.



# File 'app/event_streams/funes/event_stream.rb', line 181

def for(idx)
  new(idx)
end

Instance Method Details

#append(new_event, at: nil) ⇒ Funes::Event

Append a new event to the stream.

This method validates the event, resolves the actual time (`occurred_at`), runs the consistency projection (if configured), persists the event with an incremented version number, and triggers transactional and async projections.

The `occurred_at` value is resolved via a fallback chain:

  1. Explicit `at:` parameter on this method

  2. The event’s `actual_time_attribute` value (if configured on the stream)

  3. The same `Time.current` used for `created_at`

`Date` values are coerced to `Time` via `beginning_of_day`.
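
The fallback chain above might look roughly like this in plain Ruby. Everything here is an assumption made for illustration: `resolve_occurred_at` and `coerce_to_time` are hypothetical helpers, the two error classes are local stand-ins for the Funes errors, the event is a plain Hash, and the exact point at which the real gem raises may differ.

```ruby
require "date"

# Local stand-ins for the Funes error classes named in this document.
class ConflictingActualTimeError < StandardError; end
class MissingActualTimeAttributeError < StandardError; end

# Plain-Ruby approximation of ActiveSupport's Date#beginning_of_day.
def coerce_to_time(value)
  value.instance_of?(Date) ? Time.new(value.year, value.month, value.day) : value
end

# `event` is a plain Hash; `now` stands in for the Time.current shared with created_at.
def resolve_occurred_at(event, at: nil, actual_time_attribute: nil, now: Time.now)
  explicit = coerce_to_time(at)
  from_attribute = nil

  if actual_time_attribute
    raw = event[actual_time_attribute]
    raise MissingActualTimeAttributeError, "#{actual_time_attribute} is required" if raw.nil?
    from_attribute = coerce_to_time(raw)
  end

  if explicit && from_attribute && explicit != from_attribute
    raise ConflictingActualTimeError, "at: and #{actual_time_attribute} disagree"
  end

  explicit || from_attribute || now
end

raise_event = { amount: 6500, effective_date: Date.new(2025, 2, 15) }
recorded_at = Time.new(2025, 3, 5, 9, 0, 0)

from_explicit  = resolve_occurred_at({ amount: 6500 }, at: Time.new(2025, 2, 15), now: recorded_at)
from_attribute = resolve_occurred_at(raise_event, actual_time_attribute: :effective_date, now: recorded_at)
from_default   = resolve_occurred_at({ amount: 6500 }, now: recorded_at)
```

In this sketch the first two calls resolve to Feb 15 at midnight local time, and the third falls back to `recorded_at`.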

Examples:

Successful append

event = stream.append(Order::Placed.new(total: 99.99))
if event.valid?
  puts "Event persisted with version #{event.version}"
end

Append with explicit actual time

event = stream.append(Salary::Raised.new(amount: 6500), at: Time.new(2025, 2, 15))

Handling validation failure

event = stream.append(InvalidEvent.new)
unless event.valid?
  puts "Event rejected: #{event.errors.full_messages}"
end

Handling concurrency conflict

event = stream.append(SomeEvent.new)
if event.errors[:base].present?
  # Race condition detected, retry logic here
end

Parameters:

  • new_event (Funes::Event)

    The event to append to the stream.

  • at (Time, Date, nil) (defaults to: nil)

    The actual time when the event occurred. When provided, this overrides the event’s `actual_time_attribute` value. When nil, falls back to the attribute or `Time.current`.

Returns:

  • (Funes::Event)

    The event object (check `valid?` to see if it was persisted).

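Raises:

  • (Funes::ConflictingActualTimeError)

    When both `at:` and the event’s `actual_time_attribute` value are present and differ.

  • (Funes::MissingActualTimeAttributeError)

    When the stream declares an `actual_time_attribute` and the event does not provide a non-nil value for it.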



# File 'app/event_streams/funes/event_stream.rb', line 232

def append(new_event, at: nil)
  do_append(new_event, at: at, raise_on_failure: false)
end

#append!(new_event, at: nil) ⇒ Funes::Event

Append a new event to the stream, raising on any failure.

Behaves like #append, but raises `ActiveRecord::RecordInvalid` whenever the event cannot be persisted, mirroring the relationship between `ActiveRecord::Base#save` and `#save!`. Because a real exception is raised, any enclosing `ActiveRecord::Base.transaction` block rolls back, which is what you want when you need host-managed transactional control around `append`.

The failed event is always queryable after the rescue: `event.persisted?` returns false and `event.errors` is populated (for event-level failures such as validation, consistency rejection, or version conflict).

Failure modes:

  • Event’s own validation fails → raises `ActiveRecord::RecordInvalid` with the event as `record`.

  • Consistency projection rejects the event → raises `ActiveRecord::RecordInvalid` with the event as `record` (state / interpretation errors transferred to the event).

  • Version conflict (race condition on insert) → raises `ActiveRecord::RecordInvalid` with the event as `record` and a race-condition message on `event.errors`.

  • A transactional projection’s persistence fails → the original `ActiveRecord::StatementInvalid` / `ActiveRecord::RecordInvalid` is re-raised untouched (its `record` is the projection’s materialization model, not the event). `event.persisted?` is still false after the rescue.

Async projections are only enqueued when `append!` returns successfully. When `append!` is nested inside a user-opened `ActiveRecord::Base.transaction` that later rolls back, Rails’ `enqueue_after_transaction_commit` discards the deferred enqueue so no job runs.
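
The `save` / `save!`-style relationship between `append` and `append!` can be sketched with a shared private method and a `raise_on_failure` flag, as the source listings for both methods suggest. The classes below (`SketchEvent`, `SketchStream`, `AppendFailed`) are hypothetical plain-Ruby stand-ins, and the single validation rule is invented purely for the example.

```ruby
# Hypothetical error that carries the failed record, loosely like ActiveRecord::RecordInvalid.
class AppendFailed < StandardError
  attr_reader :record

  def initialize(record)
    @record = record
    super("Append failed: #{record.errors.join(', ')}")
  end
end

# Stand-in event with just enough state for the sketch.
SketchEvent = Struct.new(:total, :errors, :persisted) do
  def persisted?
    persisted == true
  end
end

class SketchStream
  def append(event)
    do_append(event, raise_on_failure: false)
  end

  def append!(event)
    do_append(event, raise_on_failure: true)
  end

  private

  # Shared path: the flag decides whether a failure is returned or raised.
  def do_append(event, raise_on_failure:)
    event.errors << "total must be positive" unless event.total.positive?
    event.persisted = event.errors.empty?
    raise AppendFailed.new(event) if raise_on_failure && !event.persisted
    event
  end
end

stream = SketchStream.new
accepted = stream.append(SketchEvent.new(99.99, [], nil))
rejected = stream.append(SketchEvent.new(0, [], nil))

failed = SketchEvent.new(-1, [], nil)
begin
  stream.append!(failed)
  raised = false
rescue AppendFailed => error
  raised = true
  rescued_record = error.record # same object as `failed`, still inspectable
end
```

In the sketch, the rejected event comes back from `append` with errors attached, while `append!` raises and the caller inspects the same event object after the rescue.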

Examples:

Host-managed transaction with a sibling AR update

event = SomeEvent.new(some: "value")
begin
  ActiveRecord::Base.transaction do
    some_model.update!(some: "value")
    SomeEventStream.for(stream_id).append!(event)
  end
rescue ActiveRecord::RecordInvalid
  event.persisted?  # => false
  event.errors.any? # => true
end

Two appends in a single transaction — either both commit or neither does

event_1 = SomeEvent.new(some: "value")
event_2 = OtherEvent.new(some: "value")
begin
  ActiveRecord::Base.transaction do
    SomeEventStream.for(stream_id).append!(event_1)
    OtherEventStream.for(other_stream_id).append!(event_2)
  end
rescue ActiveRecord::RecordInvalid
  event_1.persisted? # => false
  event_2.persisted? # => false
end

Parameters:

  • new_event (Funes::Event)

    The event to append to the stream.

  • at (Time, Date, nil) (defaults to: nil)

    See #append.

Returns:

  • (Funes::Event)

    The persisted event.

Raises:

  • (ActiveRecord::RecordInvalid)

    When the event cannot be persisted. `e.record` is the failed event (for event-level failures) or a projection materialization model (for projection validation failures).

  • (ActiveRecord::StatementInvalid)

    When a transactional projection fails with a database constraint violation.

  • (Funes::ConflictingActualTimeError)

    See #append.

  • (Funes::MissingActualTimeAttributeError)

    See #append.



# File 'app/event_streams/funes/event_stream.rb', line 297

def append!(new_event, at: nil)
  do_append(new_event, at: at, raise_on_failure: true)
end

#events ⇒ Array<Funes::Event>

Get all events in the stream as event instances.

Returns both previously persisted events (up to the `as_of` timestamp) and any new events appended in this session, sorted by `occurred_at` ascending.

Examples:

stream = OrderEventStream.for("order-123")
stream.events.each do |event|
  puts "#{event.class.name} at #{event.created_at}"
end

Returns:

  • (Array<Funes::Event>)

    Array of event instances ordered by `occurred_at`.



# File 'app/event_streams/funes/event_stream.rb', line 320

def events
  entries = previous_events.to_a + @instance_new_events
  entries.sort_by!(&:occurred_at) if @instance_new_events.any?
  entries.map(&:to_klass_instance)
end

#projected_with(projection_class, as_of: nil, at: nil) ⇒ Object

Projects the stream’s events using the given projection class.

Delegates to the projection’s `process_events` class method, passing the filtered events and the `at:` reference. When `as_of:` is provided, it overrides the stream’s record-time boundary, filtering events in Ruby by `created_at`. When `at:` is provided, the remaining events are further filtered to include only those where `occurred_at <= at` before projection.

Mirrors `ActiveRecord::Base.find`: if there are no events to replay (either because the stream has no events at all or because `as_of:` / `at:` filtering leaves none in scope), it raises `ActiveRecord::RecordNotFound`. In a Rails controller, this lets the host app’s middleware render a 404 automatically, without any rescue wiring.

Examples:

Project current state

stream = OrderEventStream.for("order-123")
snapshot = stream.projected_with(OrderSummaryProjection)
snapshot.total # => 150.0

Project with actual-time filter

stream = SalaryEventStream.for("sally-123")
snapshot = stream.projected_with(SalaryProjection, at: Time.new(2025, 2, 20))
snapshot.salary # => only reflects events that actually occurred by Feb 20

Full bitemporal query combining both dimensions in a single call

stream = SalaryEventStream.for("sally-123")
snapshot = stream.projected_with(SalaryProjection,
                                 as_of: Time.new(2025, 3, 1),
                                 at: Time.new(2025, 2, 20))

Automatic 404 in a Rails controller

class OrdersController < ApplicationController
  def show
    stream = OrderEventStream.for(params[:id])
    @order = stream.projected_with(OrderSummaryProjection)
  end
end
# If no events exist for params[:id], Rails renders its 404 page automatically.

Parameters:

  • projection_class (Class<Funes::Projection>)

    The projection class to use.

  • as_of (Time, nil) (defaults to: nil)

    Optional record-time override. When provided, only events with `created_at <= as_of` are considered, overriding the stream’s own `@as_of`.

  • at (Time, nil) (defaults to: nil)

    Optional actual-time reference. When provided, only events with `occurred_at <= at` are included in the projection.

Returns:

  • (Object)

    The materialized state as defined by the projection’s materialization model.

Raises:

  • (ActiveRecord::RecordNotFound)

    when the filtered event list is empty, either because the stream has no events or because `as_of:` / `at:` excludes them all.



# File 'app/event_streams/funes/event_stream.rb', line 371

def projected_with(projection_class, as_of: nil, at: nil)
  source_events = as_of ? filter_by_record_time(events, as_of) : events
  target_events = at ? filter_by_actual_time(source_events, at) : source_events

  if target_events.empty?
    materialization_model = projection_class.instance_variable_get(:@materialization_model)
    model_name = materialization_model&.name
    Rails.logger.info(
      "[Funes] projected_with found no events for " \
      "#{self.class.name} idx=#{idx.inspect} " \
      "projection=#{projection_class.name} " \
      "as_of=#{as_of.inspect} at=#{at.inspect}"
    )
    raise ActiveRecord::RecordNotFound.new(
      "Couldn't find #{model_name} for #{self.class.name} #{idx.inspect}",
      model_name,
      "idx",
      idx
    )
  end

  projection_class.process_events(target_events, at: at)
end

#to_param ⇒ String

Returns the parameter representation of the event stream for use in URLs.

This method follows the ActiveRecord convention for URL generation, allowing EventStream instances to be used directly with Rails URL helpers like `url_for` or named route helpers.

Examples:

Using with Rails URL helpers

stream = OrderEventStream.for("order-123")
url_for(stream) # => uses "order-123" as the :id parameter

In path helpers

stream = OrderEventStream.for("order-123")
order_path(stream) # => "/orders/order-123"

Returns:

  • (String)

    The entity identifier (`idx`) used as the URL parameter.



# File 'app/event_streams/funes/event_stream.rb', line 409

def to_param
  idx
end