Class: Igniter::Store::ContractableReceiptSink
- Inherits: Object
  - Object
  - Igniter::Store::ContractableReceiptSink
- Defined in: lib/igniter/store/contractable_receipt_sink.rb
Overview
Durable sink for contractable observation/event receipts emitted by igniter-embed’s contractable runner.
Implements the record_observation / record_event store adapter protocol, so an instance can be passed directly as the `store:` option to any contractable.
Idempotency policy:
- record_observation: keyed by observation_id; writing the same id again overwrites the current fact and creates a causation chain entry. Safe to retry.
- record_event: append-only history; retries produce duplicate entries. Callers should deduplicate at the source if needed.
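The difference between the two policies can be sketched with a small in-memory stand-in. InMemoryTarget and its methods are hypothetical and only mimic the keyed-write versus append-only behavior described above; they are not the real backing store, and the receipt_kind values are illustrative.

```ruby
# Hypothetical in-memory stand-in for the sink's backing store (not part of igniter).
class InMemoryTarget
  attr_reader :facts, :history

  def initialize
    @facts = Hash.new { |h, k| h[k] = [] } # key => every version written (chain)
    @history = []                          # append-only event log
  end

  # Keyed write: the newest entry is the current fact; older ones stay in the chain.
  def write(key:, value:)
    @facts[key] << value
  end

  # Append-only: nothing is deduplicated.
  def append(event:)
    @history << event
  end

  def current(key)
    @facts[key].last
  end
end

target = InMemoryTarget.new
observation = { observation_id: "obs-1", receipt_kind: :contractable_observation, status: :ok }
event = { event_id: "evt-1", receipt_kind: :contractable_event, observation_id: "obs-1" }

# Retrying an observation write still leaves one current fact for the key.
2.times { target.write(key: observation[:observation_id], value: observation) }
# Retrying an event append leaves two history entries.
2.times { target.append(event: event) }

puts target.facts.keys.length     # 1
puts target.facts["obs-1"].length # 2
puts target.history.length        # 2

# Source-side deduplication: keep the first event seen per event_id.
unique_events = target.history.uniq { |e| e[:event_id] }
puts unique_events.length         # 1
```

Deduplicating on event_id is one possible choice; a caller could equally avoid re-sending an event it has already recorded.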
Constant Summary
- REQUIRED_OBSERVATION_FIELDS = %i[observation_id receipt_kind].freeze
- REQUIRED_EVENT_FIELDS = %i[event_id receipt_kind observation_id].freeze
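As a rough illustration of what these lists imply for callers, the sketch below checks a receipt hash against them before it is recorded. The missing_fields helper is hypothetical; the sink's own validate_receipt! is private and not shown here, so its exact behavior (for example, how it treats nil values) is an assumption.

```ruby
# Field lists copied from the constants above, held in locals for this sketch.
required_observation_fields = %i[observation_id receipt_kind]
required_event_fields = %i[event_id receipt_kind observation_id]

# Hypothetical helper: returns the required fields that are absent or nil.
def missing_fields(receipt, required)
  required.select { |field| receipt[field].nil? }
end

complete_observation = { observation_id: "obs-1", receipt_kind: :contractable_observation }
incomplete_event = { event_id: "evt-1" }

puts missing_fields(complete_observation, required_observation_fields).inspect # []
puts missing_fields(incomplete_event, required_event_fields).inspect           # [:receipt_kind, :observation_id]
```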
Instance Attribute Summary
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
- #events_store ⇒ Object (readonly)
  Returns the value of attribute events_store.
- #observations_store ⇒ Object (readonly)
  Returns the value of attribute observations_store.
- #producer ⇒ Object (readonly)
  Returns the value of attribute producer.
- #store ⇒ Object (readonly)
  Returns the value of attribute store.
Instance Method Summary
- #error_events(limit: nil) ⇒ Object
- #events_for(observation_id) ⇒ Object
- #initialize(store: nil, client: nil, observations_store: :contractable_observations, events_store: :contractable_events, producer: { type: :embed, name: :contractable_receipt_sink }) ⇒ ContractableReceiptSink (constructor)
  A new instance of ContractableReceiptSink.
- #observation(observation_id) ⇒ Object
- #observations(status: nil, limit: nil) ⇒ Object
- #record_event(receipt) ⇒ Object
- #record_observation(receipt) ⇒ Object
Constructor Details
#initialize(store: nil, client: nil, observations_store: :contractable_observations, events_store: :contractable_events, producer: { type: :embed, name: :contractable_receipt_sink }) ⇒ ContractableReceiptSink
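Returns a new instance of ContractableReceiptSink. Raises ArgumentError unless store: or client: is given. The observations_store and events_store names are converted to symbols.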
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 22
def initialize(
  store: nil,
  client: nil,
  observations_store: :contractable_observations,
  events_store: :contractable_events,
  producer: { type: :embed, name: :contractable_receipt_sink }
)
  raise ArgumentError, "ContractableReceiptSink requires store: or client:" unless store || client

  @store = store
  @client = client
  @observations_store = observations_store.to_sym
  @events_store = events_store.to_sym
  @producer = producer
  register_descriptors
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20
def client
  @client
end
#events_store ⇒ Object (readonly)
Returns the value of attribute events_store.
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20
def events_store
  @events_store
end
#observations_store ⇒ Object (readonly)
Returns the value of attribute observations_store.
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20
def observations_store
  @observations_store
end
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20
def producer
  @producer
end
#store ⇒ Object (readonly)
Returns the value of attribute store.
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20
def store
  @store
end
Instance Method Details
#error_events(limit: nil) ⇒ Object
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 80
def error_events(limit: nil)
  results = history_facts(store: events_store).map { |f| fact_value(f) }.select { |r| r[:severity] == :error }
  limit ? results.take(limit) : results
end
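The filter in #error_events compares the severity field against the symbol :error, so a receipt whose severity arrives as the string "error" would not match. The sketch below reproduces only the filter and limit steps on plain hashes; the sample events and the filter_error_events name are hypothetical, and whether stored values keep symbol severities depends on the backing store.

```ruby
# Hypothetical event values, as they might look after fact_value unwrapping.
events = [
  { event_id: "evt-1", severity: :info },
  { event_id: "evt-2", severity: :error },
  { event_id: "evt-3", severity: "error" }, # string, not symbol
  { event_id: "evt-4", severity: :error }
]

# Same filter-then-limit shape as #error_events, applied to an in-memory array.
def filter_error_events(events, limit: nil)
  results = events.select { |r| r[:severity] == :error }
  limit ? results.take(limit) : results
end

all_errors = filter_error_events(events)
first_error = filter_error_events(events, limit: 1)

puts all_errors.map { |e| e[:event_id] }.inspect  # ["evt-2", "evt-4"]
puts first_error.map { |e| e[:event_id] }.inspect # ["evt-2"]
```

Because take counts from the front of the history, a limit returns the earliest matching events rather than the most recent ones.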
#events_for(observation_id) ⇒ Object
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 63
def events_for(observation_id)
  history_partition_values(
    store: events_store,
    partition_key: :observation_id,
    partition_value: observation_id.to_s
  )
end
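Since #events_for converts the id with to_s before the lookup, a symbol and a string id should select the same partition. A minimal sketch of that idea follows, assuming a partition lookup behaves like a string-equality filter; events_in_partition is a hypothetical stand-in for the private history_partition_values helper, whose real implementation is not shown.

```ruby
# Hypothetical event receipts; only observation_id matters for partitioning.
events = [
  { event_id: "evt-1", observation_id: "obs-1" },
  { event_id: "evt-2", observation_id: "obs-2" },
  { event_id: "evt-3", observation_id: "obs-1" }
]

# Stand-in for the partition lookup (an assumption): match ids as strings.
def events_in_partition(events, observation_id)
  wanted = observation_id.to_s
  events.select { |e| e[:observation_id].to_s == wanted }
end

by_string = events_in_partition(events, "obs-1")
by_symbol = events_in_partition(events, :"obs-1")

puts by_string.map { |e| e[:event_id] }.inspect # ["evt-1", "evt-3"]
puts by_string == by_symbol                     # true
```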
#observation(observation_id) ⇒ Object
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 59
def observation(observation_id)
  normalize_read_result(target.read(store: observations_store, key: observation_id.to_s))
end
#observations(status: nil, limit: nil) ⇒ Object
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 71
def observations(status: nil, limit: nil)
  all_facts = history_facts(store: observations_store)
  by_key = {}
  all_facts.each { |f| by_key[fact_key(f)] = f }
  results = by_key.values.sort_by { |f| fact_transaction_time(f) }.map { |f| fact_value(f) }
  results = results.select { |r| r[:status] == status } if status
  limit ? results.take(limit) : results
end
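The listing above reduces the full fact history to one value per key, orders by transaction time, then applies the status filter and the limit. The sketch below walks through the same steps on plain hashes. The fact shape (:key, :time, :value) and the latest_observations name are hypothetical, since the real accessors (fact_key, fact_value, fact_transaction_time) are private, and it assumes history_facts returns facts in write order.

```ruby
# Hypothetical fact history: obs-1 was written twice, obs-2 once.
facts = [
  { key: "obs-1", time: 1, value: { observation_id: "obs-1", status: :pending } },
  { key: "obs-2", time: 2, value: { observation_id: "obs-2", status: :ok } },
  { key: "obs-1", time: 3, value: { observation_id: "obs-1", status: :ok } }
]

def latest_observations(facts, status: nil, limit: nil)
  # Last write per key wins: later facts overwrite earlier hash entries.
  by_key = {}
  facts.each { |f| by_key[f[:key]] = f }

  # Order the surviving facts by transaction time, then unwrap their values.
  results = by_key.values.sort_by { |f| f[:time] }.map { |f| f[:value] }
  results = results.select { |r| r[:status] == status } if status
  limit ? results.take(limit) : results
end

everything = latest_observations(facts)
pending = latest_observations(facts, status: :pending)
first_ok = latest_observations(facts, status: :ok, limit: 1)

puts everything.map { |r| r[:observation_id] }.inspect # ["obs-2", "obs-1"]
puts pending.inspect                                   # []
puts first_ok.map { |r| r[:observation_id] }.inspect   # ["obs-2"]
```

Two consequences are visible here: the superseded :pending version of obs-1 no longer matches a status filter, and limit returns the entries with the oldest latest-write time first.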
#record_event(receipt) ⇒ Object
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 49
def record_event(receipt)
  validate_receipt!(receipt, REQUIRED_EVENT_FIELDS, :contractable_event)
  target.append(
    history: events_store,
    event: receipt,
    partition_key: :observation_id,
    producer: producer
  )
end
#record_observation(receipt) ⇒ Object
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 39
def record_observation(receipt)
  validate_receipt!(receipt, REQUIRED_OBSERVATION_FIELDS, :contractable_observation)
  target.write(
    store: observations_store,
    key: receipt[:observation_id].to_s,
    value: receipt,
    producer: producer
  )
end