Class: RubyEventStore::Client
- Inherits: Object
- Defined in: lib/ruby_event_store/client.rb
Defined Under Namespace
Classes: Within
Instance Method Summary
-
#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists new event(s) without notifying any subscribed handlers.
-
#delete_stream(stream_name) ⇒ self
Deletes a stream.
-
#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event
Deserializes an event that was serialized for async event handlers.
-
#event_in_stream?(event_id, stream_name) ⇒ Boolean
Checks whether event is linked in given stream.
-
#global_position(event_id) ⇒ Integer
Gets position of the event in global stream.
-
#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client
constructor
A new instance of Client.
- #inspect ⇒ Object
-
#link(event_ids, stream_name:, expected_version: :any) ⇒ self
Links already persisted event(s) to a different stream.
-
#metadata ⇒ Hash
Reads the additional metadata that will be added to published events.
-
#overwrite(events_or_event) ⇒ self
Overwrite existing event(s) with the same ID.
-
#position_in_stream(event_id, stream_name) ⇒ Integer
Gets position of the event in given stream.
-
#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists events and notifies subscribed handlers about them.
-
#read ⇒ Specification
Starts building a query specification for reading events.
- #rescue_from_double_json_serialization! ⇒ Object
-
#streams_of(event_id) ⇒ Array<Stream>
Gets list of streams where event is stored or linked.
-
#subscribe(subscriber = nil, to:, &proc) ⇒ Object
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
-
#subscribe_to_all_events(subscriber = nil, &proc) ⇒ Object
Subscribes a handler (subscriber) that will be invoked for all published events.
-
#subscribers_for(event_class) ⇒ Array<Object, Class>
Get list of handlers subscribed to an event.
-
#with_metadata(metadata_for_block, &block) ⇒ Object
Sets additional metadata for all events published within the provided block.
-
#within(&block) ⇒ Within
Use for starting temporary subscriptions.
Constructor Details
#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client
Returns a new instance of Client.
# File 'lib/ruby_event_store/client.rb', line 7
def initialize(
  repository: InMemoryRepository.new,
  mapper: Mappers::BatchMapper.new,
  subscriptions: nil,
  dispatcher: nil,
  message_broker: nil,
  clock: default_clock,
  correlation_id_generator: default_correlation_id_generator,
  event_type_resolver: EventTypeResolver.new
)
  @repository = repository
  @mapper = batch_mapper?(mapper) ? mapper : Mappers::BatchMapper.new(mapper)
  @broker =
    message_broker ||
      Broker.new(subscriptions: subscriptions || Subscriptions.new, dispatcher: dispatcher || SyncScheduler.new)
  @clock = clock
  @metadata = Concurrent::ThreadLocalVar.new
  @correlation_id_generator = correlation_id_generator
  @event_type_resolver = event_type_resolver

  if (subscriptions || dispatcher)
    warn <<~EOW
      Passing subscriptions and dispatcher to #{self.class} has been deprecated.

      Pass it using message_broker argument. For example:

      event_store = RubyEventStore::Client.new(
        message_broker: RubyEventStore::Broker.new(
          subscriptions: RubyEventStore::Subscriptions.new,
          dispatcher: RubyEventStore::SyncScheduler.new
        )
      )
    EOW
    warn <<~EOW if (message_broker)
      Because message_broker has been provided,
      arguments passed by subscriptions or dispatcher will be ignored.
    EOW
  end
end
Instance Method Details
#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists new event(s) without notifying any subscribed handlers
# File 'lib/ruby_event_store/client.rb', line 84 def append(events, stream_name: GLOBAL_STREAM, expected_version: :any) append_records_to_stream( transform((events)), stream_name: stream_name, expected_version: expected_version, ) self end |
#delete_stream(stream_name) ⇒ self
Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.
# File 'lib/ruby_event_store/client.rb', line 116
def delete_stream(stream_name)
  @repository.delete_stream(Stream.new(stream_name))
  self
end
#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event
Deserializes an event that was serialized for async event handlers.
# File 'lib/ruby_event_store/client.rb', line 315 def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 } @mapper.records_to_events( [ SerializedRecord.new( event_type: event_type, event_id: event_id, data: data, metadata: , timestamp: || = [serializer.load()], valid_at: valid_at || , ).deserialize(serializer), ], ).first end |
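The lambda at the top of this method falls back to the "timestamp" entry of the metadata when the hash has no `:timestamp` key. The following is a minimal sketch of that expression alone, using only the standard library. The constant name `EXTRACT_ISO8601` and the sample hashes are assumptions made for illustration, not part of the library.

```ruby
require "time" # provides Time.parse and Time#iso8601

# Hypothetical name; the lambda body is copied from the source shown above.
EXTRACT_ISO8601 = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }

# Metadata that still holds a Time object under a symbol key.
FROM_SYMBOL_KEY = EXTRACT_ISO8601.call({ timestamp: Time.utc(2024, 1, 2, 3, 4, 5) })

# Metadata that passed through a string-keyed serializer such as JSON.
FROM_STRING_KEY = EXTRACT_ISO8601.call({ "timestamp" => "2024-01-02T03:04:05Z" })
```

If neither key is present, `Hash#fetch` raises `KeyError`, so in this sketch a record without any timestamp fails loudly rather than producing `nil`.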
#event_in_stream?(event_id, stream_name) ⇒ Boolean
Checks whether event is linked in given stream
# File 'lib/ruby_event_store/client.rb', line 168
def event_in_stream?(event_id, stream_name)
  stream = Stream.new(stream_name)
  stream.global? ? @repository.has_event?(event_id) : @repository.event_in_stream?(event_id, stream)
end
#global_position(event_id) ⇒ Integer
Gets position of the event in global stream
The position is always nonnegative. Global positions may have gaps: there may be an event at position 40 but no event at position 39.
# File 'lib/ruby_event_store/client.rb', line 159
def global_position(event_id)
  @repository.global_position(event_id)
end
#inspect ⇒ Object
# File 'lib/ruby_event_store/client.rb', line 374
def inspect
  "#<#{self.class}:0x#{__id__.to_s(16)}>"
end
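Ruby's default `inspect` prints every instance variable, so overriding it as above keeps console output down to the class name and object id. The sketch below applies the same format string to a hypothetical `Handle` class that is not part of the library; the reason the Client does this is not stated in the source, so treat the motivation as an assumption.

```ruby
# Hypothetical class holding a large internal value.
class Handle
  def initialize
    @payload = "x" * 10_000
  end

  # Same format string as Client#inspect: class name plus object id in hex.
  def inspect
    "#<#{self.class}:0x#{__id__.to_s(16)}>"
  end
end
```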
#link(event_ids, stream_name:, expected_version: :any) ⇒ self
Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.
# File 'lib/ruby_event_store/client.rb', line 100
def link(event_ids, stream_name:, expected_version: :any)
  if event_ids.nil? || Array(event_ids).any?(&:nil?)
    warn <<~EOW
      Passing `nil` to link is deprecated and will raise ArgumentError in RubyEventStore 3.0.
    EOW
  end
  @repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
  self
end
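The guard in `#link` checks `event_ids.nil?` separately because `Array(nil)` is an empty array, which would otherwise pass the `any?(&:nil?)` test unnoticed. A small sketch of the condition on its own follows; the helper name `nil_event_ids?` is hypothetical.

```ruby
# Hypothetical helper reproducing the guard condition from #link.
def nil_event_ids?(event_ids)
  event_ids.nil? || Array(event_ids).any?(&:nil?)
end

# Array() wraps a single id, leaves arrays alone, and turns nil into [].
WRAPPED_SINGLE = Array("id-1")
WRAPPED_NIL = Array(nil)
```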
#metadata ⇒ Hash
Reads the additional metadata that will be added to published events.
# File 'lib/ruby_event_store/client.rb', line 336
def metadata
  @metadata.value || EMPTY_HASH
end
#overwrite(events_or_event) ⇒ self
Overwrite existing event(s) with the same ID.
Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in.
# File 'lib/ruby_event_store/client.rb', line 369 def overwrite(events_or_event) @repository.(transform(Array(events_or_event))) self end |
#position_in_stream(event_id, stream_name) ⇒ Integer
Gets position of the event in given stream
The position is always nonnegative. Returns nil if the event has no specific position in the stream. Raises an error if the event is not present in the stream.
# File 'lib/ruby_event_store/client.rb', line 146
def position_in_stream(event_id, stream_name)
  @repository.position_in_stream(event_id, Stream.new(stream_name))
end
#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists events and notifies subscribed handlers about them
# File 'lib/ruby_event_store/client.rb', line 59 def publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) enriched_events = (events) records = transform(enriched_events) append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version) enriched_events.zip(records) do |event, record| (correlation_id: event..fetch(:correlation_id), causation_id: event.event_id) do if @broker.public_method(:call).arity == 3 @broker.call(topic || event.event_type, event, record) else warn <<~EOW Message broker shall support topics. Topic WILL BE IGNORED in the current broker. Modify the broker implementation to pass topic as an argument to broker.call method. EOW @broker.call(event, record) end end end self end |
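`#publish` decides how to invoke the broker by inspecting the arity of its public `call` method. The sketch below isolates that check with two hypothetical broker classes and a hypothetical `dispatch` helper; the deprecation warning from the real method is left out.

```ruby
# Hypothetical broker whose #call accepts a topic.
class TopicAwareBroker
  def call(topic, event, record)
    [:with_topic, topic, event, record]
  end
end

# Hypothetical broker with the older two-argument #call.
class LegacyBroker
  def call(event, record)
    [:without_topic, event, record]
  end
end

# Hypothetical helper mirroring the arity check in #publish.
def dispatch(broker, topic, event, record)
  if broker.public_method(:call).arity == 3
    broker.call(topic, event, record)
  else
    broker.call(event, record) # topic is dropped for two-argument brokers
  end
end
```

Note that `Method#arity` is negative for methods with optional arguments, so under this check a broker declaring `call(topic, event, record = nil)` would not count as topic-aware.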
#read ⇒ Specification
Starts building a query specification for reading events.
# File 'lib/ruby_event_store/client.rb', line 125
def read
  Specification.new(SpecificationReader.new(@repository, @mapper))
end
#rescue_from_double_json_serialization! ⇒ Object
# File 'lib/ruby_event_store/client.rb', line 48
def rescue_from_double_json_serialization!
  return unless @repository.respond_to? :rescue_from_double_json_serialization!
  @repository.rescue_from_double_json_serialization!
end
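This method forwards to the repository only when the repository implements the same method, and otherwise returns `nil`. A minimal sketch of that optional-delegation pattern follows; both repository classes and the `enable_rescue` helper are hypothetical.

```ruby
# Hypothetical repository without the optional method.
class PlainRepository
end

# Hypothetical repository that supports the optional method.
class JsonAwareRepository
  attr_reader :enabled

  def rescue_from_double_json_serialization!
    @enabled = true
  end
end

# Hypothetical helper mirroring the guard: a no-op for repositories that lack the method.
def enable_rescue(repository)
  return unless repository.respond_to?(:rescue_from_double_json_serialization!)
  repository.rescue_from_double_json_serialization!
end
```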
#streams_of(event_id) ⇒ Array<Stream>
Gets list of streams where event is stored or linked
# File 'lib/ruby_event_store/client.rb', line 132
def streams_of(event_id)
  @repository.streams_of(event_id)
end
#subscribe(subscriber, to:) ⇒ Proc
#subscribe(to:, &subscriber) ⇒ Proc
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
# File 'lib/ruby_event_store/client.rb', line 185
def subscribe(subscriber = nil, to:, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  subscriber ||= proc
  @broker.add_subscription(subscriber, to.map { |event_klass| @event_type_resolver.call(event_klass) })
end
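The two overloads share one implementation: the handler comes either from the first positional argument or from the block, and supplying both is rejected. The sketch below shows only that argument handling; `pick_subscriber` is a hypothetical helper, and the broker call is omitted.

```ruby
# Hypothetical stand-in for the argument handling in #subscribe.
def pick_subscriber(subscriber = nil, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  subscriber || proc
end

# A handler passed positionally is returned unchanged.
POSITIONAL = pick_subscriber(:handler)

# A handler passed as a block is returned as a Proc.
FROM_BLOCK = pick_subscriber { |event| "handled #{event}" }
```

When neither form is given, this sketch returns `nil`; the source above does not show what the broker does with a `nil` subscriber.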
#subscribe_to_all_events(subscriber) ⇒ Proc
#subscribe_to_all_events(&subscriber) ⇒ Proc
Subscribes a handler (subscriber) that will be invoked for all published events
# File 'lib/ruby_event_store/client.rb', line 201
def subscribe_to_all_events(subscriber = nil, &proc)
  raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
  @broker.add_global_subscription(subscriber || proc)
end
#subscribers_for(event_class) ⇒ Array<Object, Class>
Get list of handlers subscribed to an event
# File 'lib/ruby_event_store/client.rb', line 210
def subscribers_for(event_class)
  @broker.all_subscriptions_for(@event_type_resolver.call(event_class))
end
#with_metadata(metadata_for_block, &block) ⇒ Object
Sets additional metadata for all events published within the provided block.
# File 'lib/ruby_event_store/client.rb', line 303 def (, &block) = self. = .merge() block.call if block_given? ensure self. = end |
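The method follows a save, merge, restore pattern: it remembers the current metadata, merges the block's metadata on top, runs the block, and restores the earlier value in `ensure`. The sketch below is a simplified stand-in and not the library's code. The class `MetadataScope` is hypothetical, and it stores state with `Thread#[]` (fiber-local storage from the standard library) where the Client uses `Concurrent::ThreadLocalVar`.

```ruby
# Hypothetical, simplified stand-in for the Client's metadata handling.
class MetadataScope
  EMPTY = {}.freeze
  KEY = :metadata_scope_sketch # assumed storage key for this sketch

  # Current metadata, or an empty hash outside any with_metadata block.
  def metadata
    Thread.current[KEY] || EMPTY
  end

  # Merges extra into the current metadata for the duration of the block.
  def with_metadata(extra)
    previous = metadata
    Thread.current[KEY] = previous.merge(extra)
    yield if block_given?
  ensure
    # Runs on normal exit and on exceptions, so nesting unwinds correctly.
    Thread.current[KEY] = previous
  end
end
```

Because the restore happens in `ensure`, nested calls accumulate metadata on the way in and shed it on the way out, even when the block raises.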