Class: Schematic::DataStream::Client
- Inherits:
-
Object
- Object
- Schematic::DataStream::Client
- Defined in:
- lib/schematic/datastream/client.rb
Instance Method Summary collapse
-
#check_flag(eval_ctx, flag_key) ⇒ Object
Evaluate a flag using cached DataStream data and the local rules engine.
- #close ⇒ Object
- #connected? ⇒ Boolean
-
#initialize(api_key:, base_url:, logger:, rules_engine:, cache_ttl: DEFAULT_CACHE_TTL, company_cache: nil, user_cache: nil, flag_cache: nil, redis_client: nil, redis_key_prefix: "schematic:", replicator_mode: false, replicator_health_url: REPLICATOR_HEALTH_URL, replicator_health_interval: REPLICATOR_HEALTH_INTERVAL) ⇒ Client
constructor
A new instance of Client.
- #start ⇒ Object
- #update_company_metrics(company_keys, event_name, quantity) ⇒ Object
Constructor Details
#initialize(api_key:, base_url:, logger:, rules_engine:, cache_ttl: DEFAULT_CACHE_TTL, company_cache: nil, user_cache: nil, flag_cache: nil, redis_client: nil, redis_key_prefix: "schematic:", replicator_mode: false, replicator_health_url: REPLICATOR_HEALTH_URL, replicator_health_interval: REPLICATOR_HEALTH_INTERVAL) ⇒ Client
Returns a new instance of Client.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/schematic/datastream/client.rb', line 20 def initialize( api_key:, base_url:, logger:, rules_engine:, cache_ttl: DEFAULT_CACHE_TTL, company_cache: nil, user_cache: nil, flag_cache: nil, redis_client: nil, redis_key_prefix: "schematic:", replicator_mode: false, replicator_health_url: REPLICATOR_HEALTH_URL, replicator_health_interval: REPLICATOR_HEALTH_INTERVAL ) @api_key = api_key @base_url = base_url @logger = logger @rules_engine = rules_engine @cache_ttl = cache_ttl @replicator_mode = replicator_mode @replicator_health_url = replicator_health_url @replicator_health_interval = replicator_health_interval @replicator_ready = false @cache_version = "" @mutex = Mutex.new @stopped = false if @replicator_mode has_shared_cache = redis_client || [company_cache, user_cache, flag_cache].compact.any? unless has_shared_cache raise ArgumentError, "Replicator mode requires a shared cache provider (e.g. Redis). " \ "Pass redis_client: or provide company_cache:, user_cache:, flag_cache: with a shared cache implementation." end explicit_caches = [company_cache, user_cache, flag_cache].compact if explicit_caches.grep(Schematic::LocalCache).any? raise ArgumentError, "Replicator mode requires a shared (distributed) cache provider, but received LocalCache. " \ "Use a shared cache implementation such as RedisCacheProvider." end end # Track internally-created caches so we can stop their cleanup threads @owned_caches = [] # Entity caches — priority: custom provider > Redis client > LocalCache flag_ttl = [MAX_CACHE_TTL, cache_ttl].max company_primary = company_cache || build_cache_provider(redis_client, redis_key_prefix, cache_ttl) user_primary = user_cache || build_cache_provider(redis_client, redis_key_prefix, cache_ttl) flag_primary = flag_cache || build_cache_provider(redis_client, redis_key_prefix, flag_ttl) # Lookup caches (key→ID mappings) — use Redis when available so # replicator instances share lookup state, otherwise LocalCache. company_lookup = build_cache_provider(redis_client, redis_key_prefix, cache_ttl) user_lookup = build_cache_provider(redis_client, redis_key_prefix, cache_ttl) @company_cache = ResourceCache.new( primary_cache: company_primary, lookup_cache: company_lookup, key_prefix: "company", get_id: ->(c) { c[:id] || c["id"] }, get_keys: ->(c) { c[:keys] || c["keys"] || {} } ) @user_cache = ResourceCache.new( primary_cache: user_primary, lookup_cache: user_lookup, key_prefix: "user", get_id: ->(u) { u[:id] || u["id"] }, get_keys: ->(u) { u[:keys] || u["keys"] || {} } ) @flag_cache = flag_primary # Pending requests for deduplication @pending_companies = {} # cache_key => [ConditionVariable, ...] @pending_users = {} @pending_flags = nil @pending_mutex = Mutex.new @ws_client = nil @health_thread = nil end |
Instance Method Details
#check_flag(eval_ctx, flag_key) ⇒ Object
Evaluate a flag using cached DataStream data and the local rules engine. Raises DataStream::EvaluationError when the flag cannot be evaluated locally (e.g. flag not in cache, rules engine unavailable), so the caller can fall back to the API path.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/schematic/datastream/client.rb', line 146 def check_flag(eval_ctx, flag_key) flag = get_flag(flag_key) raise EvaluationError, "flag '#{flag_key}' not found in cache" unless flag unless @rules_engine&.initialized? @logger.warn("Rules engine not initialized, using default flag value") raise EvaluationError, "rules engine not available" end company = nil user = nil company_keys = eval_ctx[:company] || eval_ctx["company"] user_keys = eval_ctx[:user] || eval_ctx["user"] if company_keys&.any? company = get_company(company_keys) @logger.debug("Company #{company ? "found in cache" : "not found in cache"} for keys: #{company_keys}") end if user_keys&.any? user = get_user(user_keys) @logger.debug("User #{user ? "found in cache" : "not found in cache"} for keys: #{user_keys}") end @logger.debug("Evaluating flag with rules engine: flag=#{flag_key}, company=#{company&.dig(:id)}, user=#{user&.dig(:id)}") result = @rules_engine.check_flag(flag, company, user) @logger.debug("Rules engine evaluation result: value=#{result[:value]}, reason=#{result[:reason]}") result[:flag_key] = flag_key result end |
#close ⇒ Object
133 134 135 136 137 138 139 140 |
# File 'lib/schematic/datastream/client.rb', line 133 def close @logger.info("Closing DataStream client") @mutex.synchronize { @stopped = true } @ws_client&.close @health_thread&.join(5) @owned_caches.each { |c| c.stop if c.respond_to?(:stop) } @logger.info("DataStream client closed") end |
#connected? ⇒ Boolean
129 130 131 |
# File 'lib/schematic/datastream/client.rb', line 129 def connected? @replicator_mode ? @replicator_ready : (@ws_client&.connected || false) end |
#start ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/schematic/datastream/client.rb', line 106 def start @logger.info("Starting DataStream client") @rules_engine&.initialize! # Set cache version from rules engine so cache keys include the WASM version. # In replicator mode this is overwritten by the replicator health check response. update_cache_version(@rules_engine.version_key) if @rules_engine&.initialized? if @replicator_mode @logger.info("Replicator mode enabled - skipping WebSocket connection") start_health_check else @ws_client = WebSocketClient.new( base_url: @base_url, api_key: @api_key, logger: @logger, message_handler: method(:handle_message), ready_handler: method(:handle_ready) ) @ws_client.start end end |
#update_company_metrics(company_keys, event_name, quantity) ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/schematic/datastream/client.rb', line 178 def update_company_metrics(company_keys, event_name, quantity) company = @company_cache.get_by_keys(company_keys) return unless company company = Merge.deep_copy(company) metrics = company[:metrics] || company["metrics"] || [] metrics.each do |metric| subtype = metric[:event_subtype] || metric["event_subtype"] next unless subtype == event_name current = metric[:value] || metric["value"] || 0 metric[:value] = current + (quantity || 1) end company[:metrics] = metrics @company_cache.cache_entity(company, ttl: @cache_ttl) end |