Class: RubyEventStore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/client.rb

Defined Under Namespace

Classes: Within

Instance Method Summary collapse

Constructor Details

#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/ruby_event_store/client.rb', line 7

def initialize(
  repository: InMemoryRepository.new,
  mapper: Mappers::BatchMapper.new,
  subscriptions: nil,
  dispatcher: nil,
  message_broker: nil,
  clock: default_clock,
  correlation_id_generator: default_correlation_id_generator,
  event_type_resolver: EventTypeResolver.new
)
  @repository = repository
  @mapper = batch_mapper?(mapper) ? mapper : Mappers::BatchMapper.new(mapper)
  @broker =
    message_broker ||
      Broker.new(subscriptions: subscriptions || Subscriptions.new, dispatcher: dispatcher || SyncScheduler.new)
  @clock = clock
  @metadata = Concurrent::ThreadLocalVar.new
  @correlation_id_generator = correlation_id_generator
  @event_type_resolver = event_type_resolver

  if (subscriptions || dispatcher)
    msg = <<~EOW
      Passing subscriptions and dispatcher to #{self.class} has been deprecated.

      Pass it using message_broker argument. For example:

      event_store = RubyEventStore::Client.new(
        message_broker: RubyEventStore::Broker.new(
          subscriptions: RubyEventStore::Subscriptions.new,
          dispatcher: RubyEventStore::SyncScheduler.new
        )
      )
    EOW
    msg += <<~EOW if message_broker

      Because message_broker has been provided,
      arguments passed by subscriptions or dispatcher will be ignored.
    EOW
    Deprecations.warn(:ruby_client_subscriptions_dispatcher, message: msg)
  end
end

Instance Method Details

#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists new event(s) without notifying any subscribed handlers

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. Read more

Returns:

  • (self)


85
86
87
88
89
90
91
92
# File 'lib/ruby_event_store/client.rb', line 85

def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
  append_records_to_stream(
    transform((events)),
    stream_name: stream_name,
    expected_version: expected_version,
  )
  self
end

#delete_stream(stream_name) ⇒ self

Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.

Parameters:

  • stream_name (String)

    name of the stream to be cleared.

Returns:

  • (self)


115
116
117
118
# File 'lib/ruby_event_store/client.rb', line 115

def delete_stream(stream_name)
  @repository.delete_stream(Stream.new(stream_name))
  self
end

#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event

Deserialize event which was serialized for async event handlers Read more

Returns:

  • (Event)

    deserialized event



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/ruby_event_store/client.rb', line 314

def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
  extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

  @mapper.records_to_events(
    [
      SerializedRecord.new(
        event_type: event_type,
        event_id: event_id,
        data: data,
        metadata: ,
        timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load()],
        valid_at: valid_at || timestamp_,
      ).deserialize(serializer),
    ],
  ).first
end

#event_in_stream?(event_id, stream_name) ⇒ Boolean

Checks whether event is linked in given stream

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Boolean)

    true if event is linked to given stream, false otherwise



167
168
169
170
# File 'lib/ruby_event_store/client.rb', line 167

def event_in_stream?(event_id, stream_name)
  stream = Stream.new(stream_name)
  stream.global? ? @repository.has_event?(event_id) : @repository.event_in_stream?(event_id, stream)
end

#global_position(event_id) ⇒ Integer

Gets position of the event in global stream

The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.

Parameters:

  • event_id (String)

Returns:

  • (Integer)

    nonnegno ative integer position of event in global stream

Raises:



158
159
160
# File 'lib/ruby_event_store/client.rb', line 158

def global_position(event_id)
  @repository.global_position(event_id)
end

#inspectObject



373
374
375
# File 'lib/ruby_event_store/client.rb', line 373

def inspect
  "#<#{self.class}:0x#{__id__.to_s(16)}>"
end

Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.

Parameters:

  • event_ids (String, Array<String>)

    ids of events

  • stream_name (String)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. Read more

Returns:

  • (self)


101
102
103
104
105
106
107
# File 'lib/ruby_event_store/client.rb', line 101

def link(event_ids, stream_name:, expected_version: :any)
  warn <<~EOW if event_ids.nil? || Array(event_ids).any?(&:nil?)
      Passing `nil` to link is deprecated and will raise ArgumentError in RubyEventStore 3.0.
    EOW
  @repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
  self
end

#metadataHash

Read additional metadata which will be added for published events Read more

Returns:

  • (Hash)


335
336
337
# File 'lib/ruby_event_store/client.rb', line 335

def 
  @metadata.value || EMPTY_HASH
end

#overwrite(events_or_event) ⇒ self

Overwrite existing event(s) with the same ID.

Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. Read more

Examples:

Add data and metadata to existing events


events = event_store.read.limit(10).to_a
events.each do |ev|
  ev.data[:tenant_id] = 1
  ev.[:server_id] = "eu-west-2"
end
event_store.overwrite(events)

Change event type


events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
  NewType.new(
    event_id: ev.event_id,
    data: ev.data,
    metadata: ev.,
  )
end
event_store.overwrite(events)

Parameters:

  • events (Array<Event>, Event)

    event(s) to serialize and overwrite again

Returns:

  • (self)


368
369
370
371
# File 'lib/ruby_event_store/client.rb', line 368

def overwrite(events_or_event)
  @repository.update_messages(transform(Array(events_or_event)))
  self
end

#position_in_stream(event_id, stream_name) ⇒ Integer

Gets position of the event in given stream

The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.

Parameters:

  • event_id (String)
  • stream_name (String)

Returns:

  • (Integer)

    nonnegative integer position of event in stream

Raises:



145
146
147
# File 'lib/ruby_event_store/client.rb', line 145

def position_in_stream(event_id, stream_name)
  @repository.position_in_stream(event_id, Stream.new(stream_name))
end

#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self

Persists events and notifies subscribed handlers about them

Parameters:

  • events (Array<Event>, Event)

    event(s)

  • stream_name (String) (defaults to: GLOBAL_STREAM)

    name of the stream for persisting events.

  • expected_version (:any, :auto, :none, Integer) (defaults to: :any)

    controls optimistic locking strategy. Read more

Returns:

  • (self)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ruby_event_store/client.rb', line 60

def publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any)
  enriched_events = (events)
  records = transform(enriched_events)
  append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
  enriched_events.zip(records) do |event, record|
    (correlation_id: event..fetch(:correlation_id), causation_id: event.event_id) do
      if @broker.public_method(:call).arity == 3
        @broker.call(topic || event.event_type, event, record)
      else
        warn <<~EOW
          Message broker shall support topics.
          Topic WILL BE IGNORED in the current broker.
          Modify the broker implementation to pass topic as an argument to broker.call method.
        EOW
        @broker.call(event, record)
      end
    end
  end
  self
end

#readSpecification

Starts building a query specification for reading events. More info.

Returns:



124
125
126
# File 'lib/ruby_event_store/client.rb', line 124

def read
  Specification.new(SpecificationReader.new(@repository, @mapper))
end

#rescue_from_double_json_serialization!Object



49
50
51
52
# File 'lib/ruby_event_store/client.rb', line 49

def rescue_from_double_json_serialization!
  return unless @repository.respond_to? :rescue_from_double_json_serialization!
  @repository.rescue_from_double_json_serialization!
end

#streams_of(event_id) ⇒ Array<Stream>

Gets list of streams where event is stored or linked

Returns:

  • (Array<Stream>)

    where event is stored or linked



131
132
133
# File 'lib/ruby_event_store/client.rb', line 131

def streams_of(event_id)
  @repository.streams_of(event_id)
end

#subscribe(subscriber, to:) ⇒ Proc #subscribe(to:, &subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for published events of provided type.

Overloads:

  • #subscribe(subscriber, to:) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>)

      types of events to subscribe

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe(to:, &subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • to (Array<Class>)

      types of events to subscribe

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)


184
185
186
187
188
# File 'lib/ruby_event_store/client.rb', line 184

def subscribe(subscriber = nil, to:, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  subscriber ||= proc
  @broker.add_subscription(subscriber, to.map { |event_klass| @event_type_resolver.call(event_klass) })
end

#subscribe_to_all_events(subscriber) ⇒ Proc #subscribe_to_all_events(&subscriber) ⇒ Proc

Subscribes a handler (subscriber) that will be invoked for all published events

Overloads:

  • #subscribe_to_all_events(subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Object, Class)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

  • #subscribe_to_all_events(&subscriber) ⇒ Proc

    Returns - unsubscribe proc. Call to unsubscribe.

    Parameters:

    • subscriber (Proc)

      handler

    Returns:

    • (Proc)
      • unsubscribe proc. Call to unsubscribe.

    Raises:

Raises:

  • (ArgumentError)


200
201
202
203
# File 'lib/ruby_event_store/client.rb', line 200

def subscribe_to_all_events(subscriber = nil, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  @broker.add_global_subscription(subscriber || proc)
end

#subscribers_for(event_class) ⇒ Array<Object, Class>

Get list of handlers subscribed to an event

Parameters:

  • to (Class, String)

    type of events to get list of sybscribed handlers

Returns:

  • (Array<Object, Class>)


209
210
211
# File 'lib/ruby_event_store/client.rb', line 209

def subscribers_for(event_class)
  @broker.all_subscriptions_for(@event_type_resolver.call(event_class))
end

#with_metadata(metadata_for_block, &block) ⇒ Object

Set additional metadata for all events published within the provided block Read more

Parameters:

  • metadata (Hash)

    metadata to set for events

  • block (Proc)

    block of code during which the metadata will be added

Returns:

  • (Object)

    last value returned by the provided block



302
303
304
305
306
307
308
# File 'lib/ruby_event_store/client.rb', line 302

def (, &block)
   = 
  self. = .merge()
  block.call if block_given?
ensure
  self. = 
end

#within(&block) ⇒ Within

Use for starting temporary subscriptions. Read more

Parameters:

  • block (Proc)

    block of code during which the temporary subscriptions will be active

Returns:

  • (Within)

    builder object which collects temporary subscriptions

Raises:

  • (ArgumentError)


291
292
293
294
# File 'lib/ruby_event_store/client.rb', line 291

def within(&block)
  raise ArgumentError if block.nil?
  Within.new(block, @broker, @event_type_resolver)
end