Class: Igniter::Store::ContractableReceiptSink

Inherits:
Object
  • Object
show all
Defined in:
lib/igniter/store/contractable_receipt_sink.rb

Overview

Durable sink for contractable observation/event receipts emitted by igniter-embed’s contractable runner.

Implements the record_observation / record_event store adapter protocol so it can be passed directly as the ‘store:` option to any contractable.

Idempotency policy:

record_observation — keyed by observation_id; same id overwrites the
  current fact and creates a causation chain entry. Safe to retry.
record_event — append-only history; retries produce duplicate entries.
  Callers should deduplicate at the source if needed.

Constant Summary collapse

REQUIRED_OBSERVATION_FIELDS =
%i[observation_id receipt_kind].freeze
REQUIRED_EVENT_FIELDS =
%i[event_id receipt_kind observation_id].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store: nil, client: nil, observations_store: :contractable_observations, events_store: :contractable_events, producer: { type: :embed, name: :contractable_receipt_sink }) ⇒ ContractableReceiptSink

Returns a new instance of ContractableReceiptSink.

Raises:

  • (ArgumentError)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 22

def initialize(
  store: nil,
  client: nil,
  observations_store: :contractable_observations,
  events_store: :contractable_events,
  producer: { type: :embed, name: :contractable_receipt_sink }
)
  raise ArgumentError, "ContractableReceiptSink requires store: or client:" unless store || client

  @store = store
  @client = client
  @observations_store = observations_store.to_sym
  @events_store = events_store.to_sym
  @producer = producer
  register_descriptors
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



20
21
22
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20

def client
  @client
end

#events_storeObject (readonly)

Returns the value of attribute events_store.



20
21
22
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20

def events_store
  @events_store
end

#observations_storeObject (readonly)

Returns the value of attribute observations_store.



20
21
22
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20

def observations_store
  @observations_store
end

#producerObject (readonly)

Returns the value of attribute producer.



20
21
22
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20

def producer
  @producer
end

#storeObject (readonly)

Returns the value of attribute store.



20
21
22
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 20

def store
  @store
end

Instance Method Details

#error_events(limit: nil) ⇒ Object



80
81
82
83
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 80

def error_events(limit: nil)
  results = history_facts(store: events_store).map { |f| fact_value(f) }.select { |r| r[:severity] == :error }
  limit ? results.take(limit) : results
end

#events_for(observation_id) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 63

def events_for(observation_id)
  history_partition_values(
    store: events_store,
    partition_key: :observation_id,
    partition_value: observation_id.to_s
  )
end

#observation(observation_id) ⇒ Object



59
60
61
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 59

def observation(observation_id)
  normalize_read_result(target.read(store: observations_store, key: observation_id.to_s))
end

#observations(status: nil, limit: nil) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 71

def observations(status: nil, limit: nil)
  all_facts = history_facts(store: observations_store)
  by_key = {}
  all_facts.each { |f| by_key[fact_key(f)] = f }
  results = by_key.values.sort_by { |f| fact_transaction_time(f) }.map { |f| fact_value(f) }
  results = results.select { |r| r[:status] == status } if status
  limit ? results.take(limit) : results
end

#record_event(receipt) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 49

def record_event(receipt)
  validate_receipt!(receipt, REQUIRED_EVENT_FIELDS, :contractable_event)
  target.append(
    history: events_store,
    event: receipt,
    partition_key: :observation_id,
    producer: producer
  )
end

#record_observation(receipt) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/igniter/store/contractable_receipt_sink.rb', line 39

def record_observation(receipt)
  validate_receipt!(receipt, REQUIRED_OBSERVATION_FIELDS, :contractable_observation)
  target.write(
    store: observations_store,
    key: receipt[:observation_id].to_s,
    value: receipt,
    producer: producer
  )
end