Class: TgVizor::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/tgvizor/client.rb

Overview

TgVizor analytics client.

Buffers events in memory, flushes them in the background to the ingestion API, and falls back to a JSONL disk queue if the API is unreachable.

Examples:

vizor = TgVizor::Client.new(api_key: ENV["TGVIZOR_API_KEY"])
vizor.track("purchase", user_id: 123, properties: { amount: 9.99 })
vizor.identify(123, username: "john")

begin
  do_work
rescue => e
  vizor.capture_error(e, user_id: 123, command: "/buy")
  raise
end

vizor.shutdown! # also auto-called via at_exit

Constant Summary collapse

DEFAULT_ENDPOINT =
"https://ingest.tgvizor.com"
DEFAULT_FLUSH_INTERVAL =

seconds

5
DEFAULT_MAX_QUEUE_SIZE =
10_000
DEFAULT_BATCH_SIZE =
500
DEFAULT_MAX_RETRIES =
5

Instance Method Summary collapse

Constructor Details

#initialize(api_key:, endpoint: DEFAULT_ENDPOINT, flush_interval: DEFAULT_FLUSH_INTERVAL, max_queue_size: DEFAULT_MAX_QUEUE_SIZE, batch_size: DEFAULT_BATCH_SIZE, max_retries: DEFAULT_MAX_RETRIES, persist_queue: true, auto_shutdown: true, debug: false, logger: nil) ⇒ Client

Returns a new instance of Client.

Raises:

  • (ArgumentError)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/tgvizor/client.rb', line 33

def initialize(
  api_key:,
  endpoint:       DEFAULT_ENDPOINT,
  flush_interval: DEFAULT_FLUSH_INTERVAL,
  max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
  batch_size:     DEFAULT_BATCH_SIZE,
  max_retries:    DEFAULT_MAX_RETRIES,
  persist_queue:  true,
  auto_shutdown:  true,
  debug:          false,
  logger:         nil
)
  raise ArgumentError, "TgVizor: api_key is required" if api_key.nil? || api_key.to_s.empty?

  @api_key        = api_key
  @endpoint       = endpoint
  @flush_interval = flush_interval
  @batch_size     = batch_size
  @debug          = debug
  @logger         = logger

  @queue      = EventQueue.new(max_queue_size)
  @transport  = Transport.new(
    endpoint:    endpoint,
    api_key:     api_key,
    max_retries: max_retries,
    debug:       debug,
    logger:      logger,
  )
  @disk_queue = persist_queue ? DiskQueue.new : nil

  @shutdown    = false
  @state_mutex = Mutex.new
  @flush_mutex = Mutex.new
  @flush_thread = nil

  start_flush_loop
  install_at_exit_hook if auto_shutdown
end

Instance Method Details

#capture_error(error, user_id: nil, command: nil, extra: nil) ⇒ Object

Capture an exception. Includes class name, message, fingerprint and the first stack frame so the dashboard can group it.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/tgvizor/client.rb', line 94

def capture_error(error, user_id: nil, command: nil, extra: nil)
  stack         = error.backtrace || []
  first_frame   = stack.first.to_s
  fingerprint   = build_fingerprint(error.class.name.to_s, error.message.to_s, first_frame)

  properties = {
    type:        error.class.name,
    message:     error.message,
    stack:       stack.first(50).join("\n"),
    fingerprint: fingerprint,
  }
  properties[:context] = { user_id: user_id, command: command, extra: extra }.compact unless user_id.nil? && command.nil? && extra.nil?

  enqueue(event: "$error", userId: user_id, properties: properties)
end

#flush!Object

Drain the in-memory + disk queues to the ingestion API. Safe to call at any time. Skips silently if there’s nothing to send.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/tgvizor/client.rb', line 112

def flush!
  @flush_mutex.synchronize do
    # Pull anything previously persisted to disk back into memory first.
    # `any?` is an O(1) stat — `size` would re-read the whole JSONL file.
    if @disk_queue && @disk_queue.any?
      @disk_queue.drain_all.each { |e| @queue.push(e) }
    end

    while @queue.any?
      batch = @queue.drain(@batch_size)
      break if batch.empty?

      result = @transport.send_batch(batch)

      unless result.ok?
        if result.retryable && @disk_queue
          batch.each { |e| @disk_queue.append(e) }
        end
        debug_log("flush failed: #{result.message}")
        break
      end
    end
  end
end

#identify(user_id, traits = {}) ⇒ Object

Identify a user with optional traits (profile enrichment).



84
85
86
87
88
89
90
# File 'lib/tgvizor/client.rb', line 84

def identify(user_id, traits = {})
  enqueue(
    event:      "$identify",
    userId:     user_id,
    properties: traits,
  )
end

#queue_sizeInteger

Returns events currently buffered in memory.

Returns:

  • (Integer)

    events currently buffered in memory



154
155
156
# File 'lib/tgvizor/client.rb', line 154

def queue_size
  @queue.size
end

#shutdown!Object

Stop the background flush thread and drain remaining events. Idempotent.



138
139
140
141
142
143
144
145
146
147
# File 'lib/tgvizor/client.rb', line 138

def shutdown!
  @state_mutex.synchronize do
    return if @shutdown

    @shutdown = true
  end

  stop_flush_loop
  flush!
end

#shutdown?Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/tgvizor/client.rb', line 149

def shutdown?
  @state_mutex.synchronize { @shutdown }
end

#track(event_name, user_id: nil, properties: nil, timestamp: nil) ⇒ Object

Track a custom event.



74
75
76
77
78
79
80
81
# File 'lib/tgvizor/client.rb', line 74

def track(event_name, user_id: nil, properties: nil, timestamp: nil)
  enqueue(
    event:      event_name,
    userId:     user_id,
    properties: properties,
    timestamp:  timestamp,
  )
end