Class: Schematic::DataStream::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/schematic/datastream/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(api_key:, base_url:, logger:, rules_engine:, cache_ttl: DEFAULT_CACHE_TTL, company_cache: nil, user_cache: nil, flag_cache: nil, redis_client: nil, redis_key_prefix: "schematic:", replicator_mode: false, replicator_health_url: REPLICATOR_HEALTH_URL, replicator_health_interval: REPLICATOR_HEALTH_INTERVAL) ⇒ Client

Returns a new instance of Client.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/schematic/datastream/client.rb', line 20

# Builds a DataStream client. Only state is set up here: @ws_client and
# @health_thread start out nil.
#
# @param api_key stored in @api_key
# @param base_url stored in @base_url
# @param logger stored in @logger
# @param rules_engine stored in @rules_engine
# @param cache_ttl TTL handed to build_cache_provider for the company/user
#   entity caches and both lookup caches; the flag cache uses the larger of
#   MAX_CACHE_TTL and this value
# @param company_cache optional provider used instead of a built one
# @param user_cache optional provider used instead of a built one
# @param flag_cache optional provider used instead of a built one
# @param redis_client forwarded to build_cache_provider
# @param redis_key_prefix forwarded to build_cache_provider
# @param replicator_mode when truthy, the cache arguments are validated
# @param replicator_health_url stored in @replicator_health_url
# @param replicator_health_interval stored in @replicator_health_interval
# @raise [ArgumentError] in replicator mode when neither redis_client nor any
#   explicit cache is given, or when an explicit cache is a
#   Schematic::LocalCache
def initialize(
  api_key:,
  base_url:,
  logger:,
  rules_engine:,
  cache_ttl: DEFAULT_CACHE_TTL,
  company_cache: nil,
  user_cache: nil,
  flag_cache: nil,
  redis_client: nil,
  redis_key_prefix: "schematic:",
  replicator_mode: false,
  replicator_health_url: REPLICATOR_HEALTH_URL,
  replicator_health_interval: REPLICATOR_HEALTH_INTERVAL
)
  # Lifecycle and synchronisation state.
  @mutex = Mutex.new
  @stopped = false
  @cache_version = ""

  # Collaborators and connection settings.
  @api_key = api_key
  @base_url = base_url
  @logger = logger
  @rules_engine = rules_engine
  @cache_ttl = cache_ttl

  # Replicator settings; readiness always starts out false.
  @replicator_mode = replicator_mode
  @replicator_health_url = replicator_health_url
  @replicator_health_interval = replicator_health_interval
  @replicator_ready = false

  if replicator_mode
    supplied_caches = [company_cache, user_cache, flag_cache].compact

    if !redis_client && supplied_caches.none?
      raise ArgumentError,
            "Replicator mode requires a shared cache provider (e.g. Redis). " \
            "Pass redis_client: or provide company_cache:, user_cache:, flag_cache: with a shared cache implementation."
    end

    unless supplied_caches.grep(Schematic::LocalCache).empty?
      raise ArgumentError,
            "Replicator mode requires a shared (distributed) cache provider, but received LocalCache. " \
            "Use a shared cache implementation such as RedisCacheProvider."
    end
  end

  # Internally-created caches are tracked here so their cleanup threads can
  # be stopped later; this must exist before any provider is built.
  @owned_caches = []

  new_provider = ->(ttl) { build_cache_provider(redis_client, redis_key_prefix, ttl) }
  flag_ttl = [MAX_CACHE_TTL, cache_ttl].max

  # Entity caches: an explicitly supplied provider wins over a built one.
  company_entities = company_cache || new_provider.call(cache_ttl)
  user_entities = user_cache || new_provider.call(cache_ttl)
  @flag_cache = flag_cache || new_provider.call(flag_ttl)

  # Lookup caches (key -> ID mappings) are always built, never injected.
  company_ids = new_provider.call(cache_ttl)
  user_ids = new_provider.call(cache_ttl)

  # Entities may be keyed by symbols or by strings.
  read_id = ->(entity) { entity[:id] || entity["id"] }
  read_keys = ->(entity) { entity[:keys] || entity["keys"] || {} }

  @company_cache = ResourceCache.new(
    primary_cache: company_entities,
    lookup_cache: company_ids,
    key_prefix: "company",
    get_id: read_id,
    get_keys: read_keys
  )
  @user_cache = ResourceCache.new(
    primary_cache: user_entities,
    lookup_cache: user_ids,
    key_prefix: "user",
    get_id: read_id,
    get_keys: read_keys
  )

  # Bookkeeping for de-duplicating in-flight requests.
  @pending_mutex = Mutex.new
  @pending_companies = {} # cache_key => [ConditionVariable, ...]
  @pending_users = {}
  @pending_flags = nil

  @ws_client = nil
  @health_thread = nil
end

Instance Method Details

#check_flag(eval_ctx, flag_key) ⇒ Object

Evaluate a flag using cached DataStream data and the local rules engine. Raises DataStream::EvaluationError when the flag cannot be evaluated locally (e.g. flag not in cache, rules engine unavailable), so the caller can fall back to the API path.

Raises:

  • (EvaluationError)


146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/schematic/datastream/client.rb', line 146

# Evaluates +flag_key+ with the local rules engine, using the flag, company
# and user returned by get_flag, get_company and get_user.
#
# @param eval_ctx [Hash, nil] may hold company and/or user key hashes under
#   :company / "company" and :user / "user"; nil is treated as an empty
#   context
# @param flag_key key passed to get_flag and copied into the result
# @return the rules engine result with :flag_key set to +flag_key+
# @raise [EvaluationError] when get_flag returns nothing for +flag_key+, or
#   when the rules engine is missing or reports it is not initialized
def check_flag(eval_ctx, flag_key)
  flag = get_flag(flag_key)
  raise EvaluationError, "flag '#{flag_key}' not found in cache" unless flag

  unless @rules_engine&.initialized?
    @logger.warn("Rules engine not initialized, using default flag value")
    raise EvaluationError, "rules engine not available"
  end

  # A nil context means "no company, no user" rather than a NoMethodError
  # that the caller's EvaluationError fallback would not catch.
  eval_ctx ||= {}
  company_keys = eval_ctx[:company] || eval_ctx["company"]
  user_keys = eval_ctx[:user] || eval_ctx["user"]

  company = nil
  user = nil

  if company_keys&.any?
    company = get_company(company_keys)
    @logger.debug("Company #{company ? "found in cache" : "not found in cache"} for keys: #{company_keys}")
  end

  if user_keys&.any?
    user = get_user(user_keys)
    @logger.debug("User #{user ? "found in cache" : "not found in cache"} for keys: #{user_keys}")
  end

  # Entities may use symbol or string keys, so read the id both ways for the
  # log line instead of only looking at :id.
  id_of = ->(entity) { entity && (entity[:id] || entity["id"]) }

  @logger.debug("Evaluating flag with rules engine: flag=#{flag_key}, company=#{id_of.call(company)}, user=#{id_of.call(user)}")
  result = @rules_engine.check_flag(flag, company, user)
  @logger.debug("Rules engine evaluation result: value=#{result[:value]}, reason=#{result[:reason]}")
  result[:flag_key] = flag_key
  result
end

#close ⇒ Object



133
134
135
136
137
138
139
140
# File 'lib/schematic/datastream/client.rb', line 133

# Shuts the client down: sets @stopped under @mutex, closes the WebSocket
# client if there is one, joins the health thread if there is one, and stops
# every owned cache that responds to #stop.
#
# @return whatever the final @logger.info call returns
def close
  @logger.info("Closing DataStream client")

  @mutex.synchronize do
    @stopped = true
  end

  socket = @ws_client
  socket.close unless socket.nil?

  # join is given a limit of 5 (presumably seconds on a Thread - confirm) so
  # shutdown is not held up indefinitely.
  checker = @health_thread
  checker.join(5) unless checker.nil?

  @owned_caches.each do |cache|
    next unless cache.respond_to?(:stop)

    cache.stop
  end

  @logger.info("DataStream client closed")
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/schematic/datastream/client.rb', line 129

# Reports connection state. In replicator mode this is @replicator_ready;
# otherwise it is the WebSocket client's +connected+ value, or false when
# there is no client or that value is nil/false.
#
# @return [Boolean]
def connected?
  return @replicator_ready if @replicator_mode

  socket = @ws_client
  return false if socket.nil?

  socket.connected || false
end

#start ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/schematic/datastream/client.rb', line 106

# Starts the client: initializes the rules engine (when present), seeds the
# cache version from it, then either starts the replicator health check or
# creates and starts a WebSocket client.
#
# @return the result of start_health_check in replicator mode, otherwise the
#   result of the WebSocket client's #start
def start
  @logger.info("Starting DataStream client")

  engine = @rules_engine
  engine&.initialize!

  # Cache keys include the WASM version reported by the rules engine. In
  # replicator mode the replicator health check response overwrites it later.
  update_cache_version(engine.version_key) if engine&.initialized?

  if @replicator_mode
    @logger.info("Replicator mode enabled - skipping WebSocket connection")
    return start_health_check
  end

  on_message = method(:handle_message)
  on_ready = method(:handle_ready)

  @ws_client = WebSocketClient.new(
    base_url: @base_url,
    api_key: @api_key,
    logger: @logger,
    message_handler: on_message,
    ready_handler: on_ready
  )
  @ws_client.start
end

#update_company_metrics(company_keys, event_name, quantity) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/schematic/datastream/client.rb', line 178

# Adds +quantity+ to every metric of the cached company whose event subtype
# equals +event_name+, then re-caches a modified copy of the company.
#
# The entity returned by the cache is not mutated; the update is applied to
# the result of Merge.deep_copy. Values are written back under the key style
# (symbol or string) the hash already uses, so a string-keyed entity does not
# end up with both "value" and :value (or "metrics" and :metrics), which
# would turn into duplicate member names once serialized as JSON.
#
# @param company_keys keys passed to the company cache's get_by_keys
# @param event_name compared against each metric's event_subtype
# @param quantity amount to add; nil (or false) counts as 1
# @return nil when no company is cached for +company_keys+, otherwise the
#   result of the cache's cache_entity call
def update_company_metrics(company_keys, event_name, quantity)
  cached = @company_cache.get_by_keys(company_keys)
  return unless cached

  # Picks the string form of +name+ only when the value lives there; in every
  # other case (symbol present, or neither present) the symbol is used.
  key_in_use = lambda do |hash, name|
    !hash[name] && hash[name.to_s] ? name.to_s : name
  end

  company = Merge.deep_copy(cached)
  metrics_key = key_in_use.call(company, :metrics)
  metrics = company[metrics_key] || []
  increment = quantity || 1

  metrics.each do |metric|
    subtype = metric[:event_subtype] || metric["event_subtype"]
    next unless subtype == event_name

    value_key = key_in_use.call(metric, :value)
    metric[value_key] = (metric[value_key] || 0) + increment
  end

  company[metrics_key] = metrics
  @company_cache.cache_entity(company, ttl: @cache_ttl)
end