Class: Funes::EventStream

Inherits:
Object
Defined in:
app/event_streams/funes/event_stream.rb

Overview

EventStream manages the append-only sequence of events for a specific entity. Each stream is identified by an `idx` (entity identifier) and provides methods for appending events and configuring how projections are triggered.

EventStreams implement a three-tier consistency model:

  • **Consistency Projection:** Validates business rules before persisting the event. If invalid, the event is rejected.

  • **Transactional Projections:** Execute synchronously in the same database transaction as the event.

  • **Async Projections:** Execute asynchronously via ActiveJob after the event is committed.

## Bitemporal Queries

EventStreams support two independent temporal dimensions:

  • **Record history** (`as_of`): Filters by `created_at`: “what did the system know at time T?” Set via `projected_with(projection, as_of: time)`.

  • **Actual history** (`at`): Filters by `occurred_at`: “what had actually happened by time T?” Set via `projected_with(projection, at: time)`.

When both are used together, `as_of` determines which events are visible (filtered in Ruby by `created_at`), and `at` further narrows which of those events are projected (Ruby-level filter on `occurred_at`).
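The two filters can be illustrated with a minimal plain-Ruby sketch. `SketchEvent`, `SKETCH_EVENTS`, and `bitemporal_names` are hypothetical names used only for illustration, and the bodies of `filter_by_record_time` and `filter_by_actual_time` are assumptions based on the description above, not the library's actual code.

```ruby
# Plain-Ruby stand-in for a persisted event; not the Funes event class.
SketchEvent = Struct.new(:name, :occurred_at, :created_at)

# Record history: keep events the system had recorded by `as_of`.
def filter_by_record_time(events, as_of)
  events.select { |event| event.created_at <= as_of }
end

# Actual history: keep events that had actually happened by `at`.
def filter_by_actual_time(events, at)
  events.select { |event| event.occurred_at <= at }
end

SKETCH_EVENTS = [
  SketchEvent.new("hired",  Time.utc(2025, 1, 10), Time.utc(2025, 1, 10)),
  # Retroactive raise: effective Feb 15, but only recorded on Mar 5.
  SketchEvent.new("raised", Time.utc(2025, 2, 15), Time.utc(2025, 3, 5)),
  SketchEvent.new("bonus",  Time.utc(2025, 2, 25), Time.utc(2025, 2, 25))
].freeze

# Applies the record-time filter first, then the actual-time filter.
def bitemporal_names(events, as_of: nil, at: nil)
  visible = as_of ? filter_by_record_time(events, as_of) : events
  target  = at ? filter_by_actual_time(visible, at) : visible
  target.map(&:name)
end
```

With this data, querying `at: Time.utc(2025, 2, 20)` alone includes the retroactive raise, while adding `as_of: Time.utc(2025, 3, 1)` hides it because it had not yet been recorded on that date.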

## Actual Time Attribute

Streams can declare an `actual_time_attribute` to automatically extract the actual time from an event attribute. When configured, every event must have the attribute with a non-nil value. The explicit `at:` on `append` takes precedence; if both are present and differ, a `ConflictingActualTimeError` is raised.

## Concurrency Control

EventStreams use optimistic concurrency control with version numbers. Each event gets an incrementing version number, and a unique constraint on `(idx, version)` ensures that when multiple processes append to the same stream simultaneously, only one insert per version succeeds. The losing append is rolled back and reported as an error on the event's `:base` attribute.
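As a rough illustration of why the unique constraint is enough, the following plain-Ruby sketch replaces the database with an in-memory set. `SketchEventTable` and its methods are hypothetical stand-ins, not part of Funes or ActiveRecord.

```ruby
require "set"

# In-memory stand-in for an events table with a unique index on (idx, version).
class SketchEventTable
  class NotUnique < StandardError; end

  def initialize
    @keys = Set.new
  end

  # Set#add? returns nil when the key already exists, which mimics a
  # unique-constraint violation on insert.
  def insert!(idx, version)
    raise NotUnique, "duplicate (#{idx}, #{version})" unless @keys.add?([idx, version])
    version
  end

  def latest_version(idx)
    @keys.select { |key_idx, _| key_idx == idx }.map { |_, version| version }.max || 0
  end
end

table = SketchEventTable.new

# Two writers read the same latest version before either one inserts.
version_seen_by_a = table.latest_version("order-123") + 1
version_seen_by_b = table.latest_version("order-123") + 1

table.insert!("order-123", version_seen_by_a) # first writer wins

begin
  table.insert!("order-123", version_seen_by_b)
  conflict = false
rescue SketchEventTable::NotUnique
  conflict = true # second writer has to re-read the stream and retry
end
```

Neither writer takes a lock up front. The conflict only surfaces at insert time, which is what makes the scheme optimistic.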

Examples:

Define an event stream with projections

class OrderEventStream < Funes::EventStream
  consistency_projection OrderValidationProjection
  add_transactional_projection OrderSnapshotProjection
  add_async_projection OrderReportProjection, queue: :reports
end

Define a stream with actual time extraction

class SalaryEventStream < Funes::EventStream
  actual_time_attribute :effective_date
end

Append events to a stream

stream = OrderEventStream.for("order-123")
event = stream.append(Order::Placed.new(total: 99.99))

if event.valid?
  puts "Event persisted with version #{event.version}"
else
  puts "Event rejected: #{event.errors.full_messages}"
end

Append a retroactive event with explicit actual time

stream.append(Salary::Raised.new(amount: 6500), at: Time.new(2025, 2, 15))

Actual history query - what had actually happened by a point in time

stream = SalaryEventStream.for("sally-123")
stream.projected_with(SalaryProjection, at: Time.new(2025, 2, 20))

Full bitemporal query - combining both dimensions

stream = SalaryEventStream.for("sally-123")
stream.projected_with(SalaryProjection, as_of: Time.new(2025, 3, 1), at: Time.new(2025, 2, 20))

Instance Attribute Details

#idx ⇒ Object (readonly)

Returns the value of attribute idx.



# File 'app/event_streams/funes/event_stream.rb', line 188

def idx
  @idx
end

Class Method Details

.actual_time_attribute(attribute_name = nil) ⇒ void

This method returns an undefined value.

Configures the event attribute used as a source for the actual time (`occurred_at`).

When set, every event appended to this stream must have the specified attribute. Its value is used as the fallback for `occurred_at` when `at:` is not explicitly passed to `append`. Called without an argument, the method acts as a reader and returns the currently configured attribute name.
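The combined setter and reader behaviour can be sketched in plain Ruby. The `Sketch*` classes below are hypothetical stand-ins rather than Funes classes; the method body mirrors the source listing for this method.

```ruby
# Minimal stand-in for the class-level macro; not the Funes base class.
class SketchStream
  def self.actual_time_attribute(attribute_name = nil)
    if attribute_name
      @actual_time_attribute = attribute_name # setter form, used in the class body
    else
      @actual_time_attribute                  # reader form, returns nil if unset
    end
  end
end

class SketchSalaryStream < SketchStream
  actual_time_attribute :effective_date
end

class SketchOrderStream < SketchStream
end

configured   = SketchSalaryStream.actual_time_attribute
unconfigured = SketchOrderStream.actual_time_attribute
```

Because the value lives in a class-level instance variable, each stream class in this sketch keeps its own setting, and a class that never calls the macro reads back `nil`.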

Examples:

class SalaryEventStream < Funes::EventStream
  actual_time_attribute :effective_date
end

Parameters:

  • attribute_name (Symbol) (defaults to: nil)

    The event attribute name to read actual time from.



# File 'app/event_streams/funes/event_stream.rb', line 166

def actual_time_attribute(attribute_name = nil)
  if attribute_name
    @actual_time_attribute = attribute_name
  else
    @actual_time_attribute
  end
end

.add_async_projection(projection, temporal_context: :last_event_time, **options) ⇒ void

This method returns an undefined value.

Register an async projection that executes in a background job after the event is committed.

Async projections are scheduled via ActiveJob after the event transaction commits. You can pass any ActiveJob options (queue, wait, wait_until, priority, etc.) to control job scheduling.

The `temporal_context` parameter controls the temporal reference passed to the projection job. Its resolved value becomes the `at:` argument received by interpretation blocks. Note that this is independent of the `at:` argument of `EventStream#append`: that value sets the event’s `occurred_at` (business time) and does not flow through to async projections.

  • `:last_event_time` (default) - Uses the transaction time (`created_at`) of the last event, i.e. when it was recorded in the database, not when the business event occurred (`occurred_at`)

  • `:job_time` - Uses `Time.current` when the job executes

  • Proc/Lambda - Custom logic that receives the last event and returns a Time object
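The three strategies above can be sketched as a small resolver. `resolve_temporal_context` and `SketchLastEvent` are hypothetical names, and the sketch uses plain Ruby `Time.now` where the library is described as using `Time.current`; how Funes resolves the value internally is not shown in this page.

```ruby
# Stand-in for the last event in the stream; only created_at matters here.
SketchLastEvent = Struct.new(:created_at)

# Hypothetical resolver: maps a temporal_context setting to a Time.
def resolve_temporal_context(temporal_context, last_event, now: Time.now)
  case temporal_context
  when :last_event_time then last_event.created_at
  when :job_time        then now
  when Proc             then temporal_context.call(last_event)
  else raise ArgumentError, "unsupported temporal_context: #{temporal_context.inspect}"
  end
end

last_event = SketchLastEvent.new(Time.utc(2025, 2, 15, 14, 30))
job_clock  = Time.utc(2025, 2, 15, 14, 35)

# Custom strategy: truncate the record time to the start of its UTC day.
start_of_day = lambda do |event|
  recorded = event.created_at
  Time.utc(recorded.year, recorded.month, recorded.day)
end

from_event = resolve_temporal_context(:last_event_time, last_event)
from_job   = resolve_temporal_context(:job_time, last_event, now: job_clock)
from_proc  = resolve_temporal_context(start_of_day, last_event)
```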

Examples:

Schedule with custom queue

class OrderEventStream < Funes::EventStream
  add_async_projection OrderReportProjection, queue: :reports
end

Schedule with delay

class OrderEventStream < Funes::EventStream
  add_async_projection AnalyticsProjection, wait: 5.minutes
end

Use job execution time instead of event time

class OrderEventStream < Funes::EventStream
  add_async_projection RealtimeProjection, temporal_context: :job_time
end

Custom temporal_context logic with proc

class OrderEventStream < Funes::EventStream
  add_async_projection EndOfDayProjection, temporal_context: ->(last_event) { last_event.created_at.beginning_of_day }
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class to execute asynchronously.

  • temporal_context (Symbol, Proc) (defaults to: :last_event_time)

    Strategy for determining the temporal reference (:last_event_time, :job_time, or Proc).

  • options (Hash)

    ActiveJob options for scheduling (queue, wait, wait_until, priority, etc.).



# File 'app/event_streams/funes/event_stream.rb', line 148

def add_async_projection(projection, temporal_context: :last_event_time, **options)
  @async_projections ||= []
  @async_projections << { class: projection, temporal_context: temporal_context, options: options }
end

.add_transactional_projection(projection) ⇒ void

This method returns an undefined value.

Register a transactional projection that executes synchronously in the same database transaction.

Transactional projections run after the event is persisted but within the same database transaction. If a transactional projection fails with a database error, the transaction is rolled back, the event is marked as not persisted (`persisted?` returns false), and the exception propagates to the caller. This fail-loud behavior ensures that bugs in projections (such as constraint violations) are immediately visible rather than silently hidden.

Examples:

class OrderEventStream < Funes::EventStream
  add_transactional_projection OrderSnapshotProjection
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class to execute transactionally.

Raises:

  • (ActiveRecord::StatementInvalid)

    if the projection fails with a database error. The event will have `persisted?` returning false, allowing safe rescue in the host application.



# File 'app/event_streams/funes/event_stream.rb', line 105

def add_transactional_projection(projection)
  @transactional_projections ||= []
  @transactional_projections << projection
end

.consistency_projection(projection) ⇒ void

This method returns an undefined value.

Register a consistency projection that validates business rules before persisting events.

The consistency projection runs before the event is saved. If the resulting state is invalid, the event is rejected and not persisted to the database.
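The validate-before-persist idea can be sketched without the library. In the example below, `SketchMovement`, `projected_quantity`, and `append_if_consistent` are hypothetical, and the "stock never goes negative" rule is an assumed business rule chosen only for illustration.

```ruby
# Stand-in event: a stock movement with a signed quantity change.
SketchMovement = Struct.new(:delta)

# Stand-in for a validation projection: folds movements into a quantity.
def projected_quantity(events)
  events.sum(&:delta)
end

# Appends only when the state that would result is still valid
# (here: stock never drops below zero). Returns true when accepted.
def append_if_consistent(stream, new_event)
  return false if projected_quantity(stream + [new_event]).negative?

  stream << new_event
  true
end

stream   = [SketchMovement.new(10)]
accepted = append_if_consistent(stream, SketchMovement.new(-4)) # would leave 6
rejected = append_if_consistent(stream, SketchMovement.new(-7)) # would leave -1
```

The key point is that the rule is checked against the state as it would be after the new event, and a rejected event never reaches the stream.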

Examples:

class InventoryEventStream < Funes::EventStream
  consistency_projection InventoryValidationProjection
end

Parameters:

  • projection (Class<Funes::Projection>)

    The projection class that will validate the state.



# File 'app/event_streams/funes/event_stream.rb', line 84

def consistency_projection(projection)
  @consistency_projection = projection
end

.for(idx) ⇒ Funes::EventStream

Create a new EventStream instance for the given entity identifier.

Examples:

stream = OrderEventStream.for("order-123")

Parameters:

  • idx (String)

    The entity identifier.

Returns:

  • (Funes::EventStream)

    A new stream instance for the given entity identifier.



# File 'app/event_streams/funes/event_stream.rb', line 181

def for(idx)
  new(idx)
end

Instance Method Details

#append(new_event, at: nil) ⇒ Funes::Event

Append a new event to the stream.

This method validates the event, resolves the actual time (`occurred_at`), runs the consistency projection (if configured), persists the event with an incremented version number, and triggers transactional and async projections.

The `occurred_at` value is resolved via a fallback chain:

  1. The explicit `at:` parameter on this method

  2. The event’s `actual_time_attribute` value (if configured on the stream)

  3. The same `Time.current` value used for `created_at`

`Date` values are coerced to `Time` via `beginning_of_day`.
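The fallback chain can be sketched as follows. `resolve_occurred_at` is a hypothetical helper that takes the event as a plain Hash; it uses `Time.now` and a manual midnight conversion in place of `Time.current` and `beginning_of_day`, and it deliberately leaves out the conflict check between `at:` and the attribute value.

```ruby
require "date"

# Hypothetical helper mirroring the documented fallback chain:
# explicit `at:` first, then the configured attribute, then the current time.
def resolve_occurred_at(event, at: nil, attribute: nil, now: Time.now)
  candidate = at || (attribute && event[attribute]) || now

  # Plain Date values become local midnight of that day.
  if candidate.instance_of?(Date)
    Time.new(candidate.year, candidate.month, candidate.day)
  else
    candidate
  end
end

raise_event = { amount: 6500, effective_date: Date.new(2025, 2, 15) }
clock       = Time.utc(2025, 3, 1, 9, 0)

explicit       = resolve_occurred_at(raise_event, at: Time.utc(2025, 2, 10), attribute: :effective_date, now: clock)
from_attribute = resolve_occurred_at(raise_event, attribute: :effective_date, now: clock)
from_clock     = resolve_occurred_at(raise_event, now: clock)
```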

Examples:

Successful append

event = stream.append(Order::Placed.new(total: 99.99))
if event.valid?
  puts "Event persisted with version #{event.version}"
end

Append with explicit actual time

event = stream.append(Salary::Raised.new(amount: 6500), at: Time.new(2025, 2, 15))

Handling validation failure

event = stream.append(InvalidEvent.new)
unless event.valid?
  puts "Event rejected: #{event.errors.full_messages}"
end

Handling concurrency conflict

event = stream.append(SomeEvent.new)
if event.errors[:base].present?
  # Race condition detected, retry logic here
end

Parameters:

  • new_event (Funes::Event)

    The event to append to the stream.

  • at (Time, Date, nil) (defaults to: nil)

    The actual time when the event occurred. When provided, this overrides the event’s `actual_time_attribute` value. When nil, falls back to the attribute or `Time.current`.

Returns:

  • (Funes::Event)

    The event object (check `valid?` to see if it was persisted).

Raises:



# File 'app/event_streams/funes/event_stream.rb', line 232

def append(new_event, at: nil)
  return new_event unless new_event.valid?

  occurred_at = resolve_proper_occurred_at(new_event, at)

  if consistency_projection.present?
    materialization = compute_projection_with_new_event(consistency_projection, new_event, occurred_at)
    transfer_interpretation_errors(new_event)
    return new_event if materialization.invalid? || new_event.invalid?
  end

  ActiveRecord::Base.transaction do
    begin
      @instance_new_events << new_event.persist!(@idx, incremented_version, at: occurred_at)
      run_transactional_projections
    rescue ActiveRecord::RecordNotUnique
      new_event._event_entry = nil
      new_event.errors.add(:base, I18n.t("funes.events.racing_condition_on_insert"))
      raise ActiveRecord::Rollback
    rescue ActiveRecord::StatementInvalid, ActiveRecord::RecordInvalid => e
      new_event._event_entry = nil
      raise e
    end
  end

  schedule_async_projections unless new_event.errors.any?

  new_event
end

#events ⇒ Array<Funes::Event>

Get all events in the stream as event instances.

Returns both previously persisted events (up to the stream's `as_of` timestamp) and any new events appended in this session, sorted by `occurred_at` ascending.
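The effect of the ordering on a retroactive append can be sketched in plain Ruby. `SketchEntry` and the sample data are hypothetical; the merge and sort steps follow the source listing for this method.

```ruby
# Stand-in for an event entry; only the fields needed for ordering.
SketchEntry = Struct.new(:name, :occurred_at)

previously_persisted = [
  SketchEntry.new("hired",  Time.utc(2025, 1, 10)),
  SketchEntry.new("raised", Time.utc(2025, 3, 1))
]

# Appended in this session, but it actually happened between the two above.
appended_now = [SketchEntry.new("relocated", Time.utc(2025, 2, 1))]

entries = previously_persisted + appended_now
# Re-sort only when something new was appended.
entries.sort_by!(&:occurred_at) if appended_now.any?

ordered_names = entries.map(&:name)
```

Here the newly appended entry ends up in the middle of the list, since ordering follows when events occurred rather than when they were appended.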

Examples:

stream = OrderEventStream.for("order-123")
stream.events.each do |event|
  puts "#{event.class.name} at #{event.created_at}"
end

Returns:

  • (Array<Funes::Event>)

    Array of event instances ordered by `occurred_at`.



# File 'app/event_streams/funes/event_stream.rb', line 281

def events
  entries = previous_events.to_a + @instance_new_events
  entries.sort_by!(&:occurred_at) if @instance_new_events.any?
  entries.map(&:to_klass_instance)
end

#projected_with(projection_class, as_of: nil, at: nil) ⇒ Object

Projects the stream’s events using the given projection class.

Delegates to the projection’s `process_events` class method, passing the selected events and the `at` reference. When `as_of:` is provided, it overrides the stream’s record-time boundary and the events are first filtered in Ruby by `created_at`. When `at:` is provided, the remaining events are then filtered to those where `occurred_at <= at` before projection.

Examples:

Project current state

stream = OrderEventStream.for("order-123")
snapshot = stream.projected_with(OrderSummaryProjection)
snapshot.total # => 150.0

Project with actual-time filter

stream = SalaryEventStream.for("sally-123")
snapshot = stream.projected_with(SalaryProjection, at: Time.new(2025, 2, 20))
snapshot.salary # => only reflects events that actually occurred by Feb 20

Full bitemporal query combining both dimensions in a single call

stream = SalaryEventStream.for("sally-123")
snapshot = stream.projected_with(SalaryProjection,
                                 as_of: Time.new(2025, 3, 1),
                                 at: Time.new(2025, 2, 20))

Parameters:

  • projection_class (Class<Funes::Projection>)

    The projection class to use.

  • as_of (Time, nil) (defaults to: nil)

    Optional record-time override. When provided, only events with `created_at <= as_of` are considered, overriding the stream’s own `@as_of`.

  • at (Time, nil) (defaults to: nil)

    Optional actual-time reference. When provided, only events with `occurred_at <= at` are included in the projection.

Returns:

  • (Object)

    The materialized state as defined by the projection’s materialization model.



# File 'app/event_streams/funes/event_stream.rb', line 316

def projected_with(projection_class, as_of: nil, at: nil)
  source_events = as_of ? filter_by_record_time(events, as_of) : events
  target_events = at ? filter_by_actual_time(source_events, at) : source_events
  projection_class.process_events(target_events, at: at)
end

#to_param ⇒ String

Returns the parameter representation of the event stream for use in URLs.

This method follows the ActiveRecord convention for URL generation, allowing EventStream instances to be used directly with Rails URL helpers like `url_for` or named route helpers.

Examples:

Using with Rails URL helpers

stream = OrderEventStream.for("order-123")
url_for(stream) # => uses "order-123" as the :id parameter

In path helpers

stream = OrderEventStream.for("order-123")
order_path(stream) # => "/orders/order-123"

Returns:

  • (String)

    The entity identifier (`idx`) used as the URL parameter.



# File 'app/event_streams/funes/event_stream.rb', line 336

def to_param
  idx
end