Class: RubyEventStore::Client
- Inherits:
-
Object
- Object
- RubyEventStore::Client
- Defined in:
- lib/ruby_event_store/client.rb
Defined Under Namespace
Classes: Within
Instance Method Summary collapse
-
#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists new event(s) without notifying any subscribed handlers.
-
#delete_stream(stream_name) ⇒ self
Deletes a stream.
-
#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event
Deserialize event which was serialized for async event handlers Read more.
-
#event_in_stream?(event_id, stream_name) ⇒ Boolean
Checks whether event is linked in given stream.
-
#global_position(event_id) ⇒ Integer
Gets position of the event in global stream.
-
#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client
constructor
A new instance of Client.
- #inspect ⇒ Object
-
#link(event_ids, stream_name:, expected_version: :any) ⇒ self
Links already persisted event(s) to a different stream.
-
#metadata ⇒ Hash
Read additional metadata which will be added for published events Read more.
-
#overwrite(events_or_event) ⇒ self
Overwrite existing event(s) with the same ID.
-
#position_in_stream(event_id, stream_name) ⇒ Integer
Gets position of the event in given stream.
-
#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists events and notifies subscribed handlers about them.
-
#read ⇒ Specification
Starts building a query specification for reading events.
- #rescue_from_double_json_serialization! ⇒ Object
-
#streams_of(event_id) ⇒ Array<Stream>
Gets list of streams where event is stored or linked.
-
#subscribe(subscriber = nil, to:, &proc) ⇒ Object
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
-
#subscribe_to_all_events(subscriber = nil, &proc) ⇒ Object
Subscribes a handler (subscriber) that will be invoked for all published events.
-
#subscribers_for(event_class) ⇒ Array<Object, Class>
Get list of handlers subscribed to an event.
-
#with_metadata(metadata_for_block, &block) ⇒ Object
Set additional metadata for all events published within the provided block Read more.
-
#within(&block) ⇒ Within
Use for starting temporary subscriptions.
Constructor Details
#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client
Returns a new instance of Client.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/ruby_event_store/client.rb', line 7 def initialize( repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new ) @repository = repository @mapper = batch_mapper?(mapper) ? mapper : Mappers::BatchMapper.new(mapper) @broker = || Broker.new(subscriptions: subscriptions || Subscriptions.new, dispatcher: dispatcher || SyncScheduler.new) @clock = clock @metadata = Concurrent::ThreadLocalVar.new @correlation_id_generator = correlation_id_generator @event_type_resolver = event_type_resolver if (subscriptions || dispatcher) msg = <<~EOW Passing subscriptions and dispatcher to #{self.class} has been deprecated. Pass it using message_broker argument. For example: event_store = RubyEventStore::Client.new( message_broker: RubyEventStore::Broker.new( subscriptions: RubyEventStore::Subscriptions.new, dispatcher: RubyEventStore::SyncScheduler.new ) ) EOW msg += <<~EOW if Because message_broker has been provided, arguments passed by subscriptions or dispatcher will be ignored. EOW Deprecations.warn(:ruby_client_subscriptions_dispatcher, message: msg) end end |
Instance Method Details
#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists new event(s) without notifying any subscribed handlers
85 86 87 88 89 90 91 92 |
# File 'lib/ruby_event_store/client.rb', line 85 def append(events, stream_name: GLOBAL_STREAM, expected_version: :any) append_records_to_stream( transform((events)), stream_name: stream_name, expected_version: expected_version, ) self end |
#delete_stream(stream_name) ⇒ self
Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.
113 114 115 116 |
# File 'lib/ruby_event_store/client.rb', line 113 def delete_stream(stream_name) @repository.delete_stream(Stream.new(stream_name)) self end |
#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event
Deserialize event which was serialized for async event handlers Read more
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/ruby_event_store/client.rb', line 312 def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 } @mapper.records_to_events( [ SerializedRecord.new( event_type: event_type, event_id: event_id, data: data, metadata: , timestamp: || = [serializer.load()], valid_at: valid_at || , ).deserialize(serializer), ], ).first end |
#event_in_stream?(event_id, stream_name) ⇒ Boolean
Checks whether event is linked in given stream
165 166 167 168 |
# File 'lib/ruby_event_store/client.rb', line 165 def event_in_stream?(event_id, stream_name) stream = Stream.new(stream_name) stream.global? ? @repository.has_event?(event_id) : @repository.event_in_stream?(event_id, stream) end |
#global_position(event_id) ⇒ Integer
Gets position of the event in global stream
The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.
156 157 158 |
# File 'lib/ruby_event_store/client.rb', line 156 def global_position(event_id) @repository.global_position(event_id) end |
#inspect ⇒ Object
371 372 373 |
# File 'lib/ruby_event_store/client.rb', line 371 def inspect "#<#{self.class}:0x#{__id__.to_s(16)}>" end |
#link(event_ids, stream_name:, expected_version: :any) ⇒ self
Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.
101 102 103 104 105 |
# File 'lib/ruby_event_store/client.rb', line 101 def link(event_ids, stream_name:, expected_version: :any) raise ArgumentError, "event_ids must not be nil" if event_ids.nil? || Array(event_ids).any?(&:nil?) @repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version)) self end |
#metadata ⇒ Hash
Read additional metadata which will be added for published events Read more
333 334 335 |
# File 'lib/ruby_event_store/client.rb', line 333 def @metadata.value || EMPTY_HASH end |
#overwrite(events_or_event) ⇒ self
Overwrite existing event(s) with the same ID.
Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. Read more
366 367 368 369 |
# File 'lib/ruby_event_store/client.rb', line 366 def overwrite(events_or_event) @repository.(transform(Array(events_or_event))) self end |
#position_in_stream(event_id, stream_name) ⇒ Integer
Gets position of the event in given stream
The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.
143 144 145 |
# File 'lib/ruby_event_store/client.rb', line 143 def position_in_stream(event_id, stream_name) @repository.position_in_stream(event_id, Stream.new(stream_name)) end |
#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists events and notifies subscribed handlers about them
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ruby_event_store/client.rb', line 60 def publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) enriched_events = (events) records = transform(enriched_events) append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version) enriched_events.zip(records) do |event, record| (correlation_id: event..fetch(:correlation_id), causation_id: event.event_id) do if @broker.public_method(:call).arity == 3 @broker.call(topic || event.event_type, event, record) else warn <<~EOW Message broker shall support topics. Topic WILL BE IGNORED in the current broker. Modify the broker implementation to pass topic as an argument to broker.call method. EOW @broker.call(event, record) end end end self end |
#read ⇒ Specification
Starts building a query specification for reading events. More info.
122 123 124 |
# File 'lib/ruby_event_store/client.rb', line 122 def read Specification.new(SpecificationReader.new(@repository, @mapper)) end |
#rescue_from_double_json_serialization! ⇒ Object
49 50 51 52 |
# File 'lib/ruby_event_store/client.rb', line 49 def rescue_from_double_json_serialization! return unless @repository.respond_to? :rescue_from_double_json_serialization! @repository.rescue_from_double_json_serialization! end |
#streams_of(event_id) ⇒ Array<Stream>
Gets list of streams where event is stored or linked
129 130 131 |
# File 'lib/ruby_event_store/client.rb', line 129 def streams_of(event_id) @repository.streams_of(event_id) end |
#subscribe(subscriber, to:) ⇒ Proc #subscribe(to:, &subscriber) ⇒ Proc
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
182 183 184 185 186 |
# File 'lib/ruby_event_store/client.rb', line 182 def subscribe(subscriber = nil, to:, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc subscriber ||= proc @broker.add_subscription(subscriber, to.map { |event_klass| @event_type_resolver.call(event_klass) }) end |
#subscribe_to_all_events(subscriber) ⇒ Proc #subscribe_to_all_events(&subscriber) ⇒ Proc
Subscribes a handler (subscriber) that will be invoked for all published events
198 199 200 201 |
# File 'lib/ruby_event_store/client.rb', line 198 def subscribe_to_all_events(subscriber = nil, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc @broker.add_global_subscription(subscriber || proc) end |
#subscribers_for(event_class) ⇒ Array<Object, Class>
Get list of handlers subscribed to an event
207 208 209 |
# File 'lib/ruby_event_store/client.rb', line 207 def subscribers_for(event_class) @broker.all_subscriptions_for(@event_type_resolver.call(event_class)) end |
#with_metadata(metadata_for_block, &block) ⇒ Object
Set additional metadata for all events published within the provided block Read more
300 301 302 303 304 305 306 |
# File 'lib/ruby_event_store/client.rb', line 300 def (, &block) = self. = .merge() block.call if block_given? ensure self. = end |