Class: RubyEventStore::Client
- Inherits:
-
Object
- Object
- RubyEventStore::Client
- Defined in:
- lib/ruby_event_store/client.rb
Defined Under Namespace
Classes: Within
Instance Method Summary collapse
-
#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists new event(s) without notifying any subscribed handlers.
-
#delete_stream(stream_name) ⇒ self
Deletes a stream.
-
#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event
Deserialize event which was serialized for async event handlers Read more.
-
#event_in_stream?(event_id, stream_name) ⇒ Boolean
Checks whether event is linked in given stream.
-
#global_position(event_id) ⇒ Integer
Gets position of the event in global stream.
-
#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client
constructor
A new instance of Client.
- #inspect ⇒ Object
-
#link(event_ids, stream_name:, expected_version: :any) ⇒ self
Links already persisted event(s) to a different stream.
-
#metadata ⇒ Hash
Read additional metadata which will be added for published events Read more.
-
#overwrite(events_or_event) ⇒ self
Overwrite existing event(s) with the same ID.
-
#position_in_stream(event_id, stream_name) ⇒ Integer
Gets position of the event in given stream.
-
#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists events and notifies subscribed handlers about them.
-
#read ⇒ Specification
Starts building a query specification for reading events.
- #rescue_from_double_json_serialization! ⇒ Object
-
#streams_of(event_id) ⇒ Array<Stream>
Gets list of streams where event is stored or linked.
-
#subscribe(subscriber = nil, to:, &proc) ⇒ Object
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
-
#subscribe_to_all_events(subscriber = nil, &proc) ⇒ Object
Subscribes a handler (subscriber) that will be invoked for all published events.
-
#subscribers_for(event_class) ⇒ Array<Object, Class>
Get list of handlers subscribed to an event.
-
#with_metadata(metadata_for_block, &block) ⇒ Object
Set additional metadata for all events published within the provided block Read more.
-
#within(&block) ⇒ Within
Use for starting temporary subscriptions.
Constructor Details
#initialize(repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new) ⇒ Client
Returns a new instance of Client.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/ruby_event_store/client.rb', line 7 def initialize( repository: InMemoryRepository.new, mapper: Mappers::BatchMapper.new, subscriptions: nil, dispatcher: nil, message_broker: nil, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new ) @repository = repository @mapper = batch_mapper?(mapper) ? mapper : Mappers::BatchMapper.new(mapper) @broker = || Broker.new(subscriptions: subscriptions || Subscriptions.new, dispatcher: dispatcher || SyncScheduler.new) @clock = clock @metadata = Concurrent::ThreadLocalVar.new @correlation_id_generator = correlation_id_generator @event_type_resolver = event_type_resolver if (subscriptions || dispatcher) msg = <<~EOW Passing subscriptions and dispatcher to #{self.class} has been deprecated. Pass it using message_broker argument. For example: event_store = RubyEventStore::Client.new( message_broker: RubyEventStore::Broker.new( subscriptions: RubyEventStore::Subscriptions.new, dispatcher: RubyEventStore::SyncScheduler.new ) ) EOW msg += <<~EOW if Because message_broker has been provided, arguments passed by subscriptions or dispatcher will be ignored. EOW Deprecations.warn(:ruby_client_subscriptions_dispatcher, message: msg) end end |
Instance Method Details
#append(events, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists new event(s) without notifying any subscribed handlers
85 86 87 88 89 90 91 92 |
# File 'lib/ruby_event_store/client.rb', line 85 def append(events, stream_name: GLOBAL_STREAM, expected_version: :any) append_records_to_stream( transform((events)), stream_name: stream_name, expected_version: expected_version, ) self end |
#delete_stream(stream_name) ⇒ self
Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.
115 116 117 118 |
# File 'lib/ruby_event_store/client.rb', line 115 def delete_stream(stream_name) @repository.delete_stream(Stream.new(stream_name)) self end |
#deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) ⇒ Event
Deserialize event which was serialized for async event handlers Read more
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/ruby_event_store/client.rb', line 314 def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 } @mapper.records_to_events( [ SerializedRecord.new( event_type: event_type, event_id: event_id, data: data, metadata: , timestamp: || = [serializer.load()], valid_at: valid_at || , ).deserialize(serializer), ], ).first end |
#event_in_stream?(event_id, stream_name) ⇒ Boolean
Checks whether event is linked in given stream
167 168 169 170 |
# File 'lib/ruby_event_store/client.rb', line 167 def event_in_stream?(event_id, stream_name) stream = Stream.new(stream_name) stream.global? ? @repository.has_event?(event_id) : @repository.event_in_stream?(event_id, stream) end |
#global_position(event_id) ⇒ Integer
Gets position of the event in global stream
The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.
158 159 160 |
# File 'lib/ruby_event_store/client.rb', line 158 def global_position(event_id) @repository.global_position(event_id) end |
#inspect ⇒ Object
373 374 375 |
# File 'lib/ruby_event_store/client.rb', line 373 def inspect "#<#{self.class}:0x#{__id__.to_s(16)}>" end |
#link(event_ids, stream_name:, expected_version: :any) ⇒ self
Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.
101 102 103 104 105 106 107 |
# File 'lib/ruby_event_store/client.rb', line 101 def link(event_ids, stream_name:, expected_version: :any) warn <<~EOW if event_ids.nil? || Array(event_ids).any?(&:nil?) Passing `nil` to link is deprecated and will raise ArgumentError in RubyEventStore 3.0. EOW @repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version)) self end |
#metadata ⇒ Hash
Read additional metadata which will be added for published events Read more
335 336 337 |
# File 'lib/ruby_event_store/client.rb', line 335 def @metadata.value || EMPTY_HASH end |
#overwrite(events_or_event) ⇒ self
Overwrite existing event(s) with the same ID.
Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. Read more
368 369 370 371 |
# File 'lib/ruby_event_store/client.rb', line 368 def overwrite(events_or_event) @repository.(transform(Array(events_or_event))) self end |
#position_in_stream(event_id, stream_name) ⇒ Integer
Gets position of the event in given stream
The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.
145 146 147 |
# File 'lib/ruby_event_store/client.rb', line 145 def position_in_stream(event_id, stream_name) @repository.position_in_stream(event_id, Stream.new(stream_name)) end |
#publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) ⇒ self
Persists events and notifies subscribed handlers about them
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ruby_event_store/client.rb', line 60 def publish(events, topic: nil, stream_name: GLOBAL_STREAM, expected_version: :any) enriched_events = (events) records = transform(enriched_events) append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version) enriched_events.zip(records) do |event, record| (correlation_id: event..fetch(:correlation_id), causation_id: event.event_id) do if @broker.public_method(:call).arity == 3 @broker.call(topic || event.event_type, event, record) else warn <<~EOW Message broker shall support topics. Topic WILL BE IGNORED in the current broker. Modify the broker implementation to pass topic as an argument to broker.call method. EOW @broker.call(event, record) end end end self end |
#read ⇒ Specification
Starts building a query specification for reading events. More info.
124 125 126 |
# File 'lib/ruby_event_store/client.rb', line 124 def read Specification.new(SpecificationReader.new(@repository, @mapper)) end |
#rescue_from_double_json_serialization! ⇒ Object
49 50 51 52 |
# File 'lib/ruby_event_store/client.rb', line 49 def rescue_from_double_json_serialization! return unless @repository.respond_to? :rescue_from_double_json_serialization! @repository.rescue_from_double_json_serialization! end |
#streams_of(event_id) ⇒ Array<Stream>
Gets list of streams where event is stored or linked
131 132 133 |
# File 'lib/ruby_event_store/client.rb', line 131 def streams_of(event_id) @repository.streams_of(event_id) end |
#subscribe(subscriber, to:) ⇒ Proc #subscribe(to:, &subscriber) ⇒ Proc
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
184 185 186 187 188 |
# File 'lib/ruby_event_store/client.rb', line 184 def subscribe(subscriber = nil, to:, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc subscriber ||= proc @broker.add_subscription(subscriber, to.map { |event_klass| @event_type_resolver.call(event_klass) }) end |
#subscribe_to_all_events(subscriber) ⇒ Proc #subscribe_to_all_events(&subscriber) ⇒ Proc
Subscribes a handler (subscriber) that will be invoked for all published events
200 201 202 203 |
# File 'lib/ruby_event_store/client.rb', line 200 def subscribe_to_all_events(subscriber = nil, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc @broker.add_global_subscription(subscriber || proc) end |
#subscribers_for(event_class) ⇒ Array<Object, Class>
Get list of handlers subscribed to an event
209 210 211 |
# File 'lib/ruby_event_store/client.rb', line 209 def subscribers_for(event_class) @broker.all_subscriptions_for(@event_type_resolver.call(event_class)) end |
#with_metadata(metadata_for_block, &block) ⇒ Object
Set additional metadata for all events published within the provided block Read more
302 303 304 305 306 307 308 |
# File 'lib/ruby_event_store/client.rb', line 302 def (, &block) = self. = .merge() block.call if block_given? ensure self. = end |