Class: GetFluxly::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/getfluxly/client.rb

Overview

Synchronous GetFluxly client.

Batches events with retry, jitter, and X-Idempotency-Key. The background flusher thread drains the queue every `flush_interval` seconds. When `register_atexit` is true, an `at_exit` hook runs a final flush.

Constant Summary collapse

DEFAULT_API_HOST =
"https://api.getfluxly.com"
EVENTS_BATCH_PATH =
"/v1/events/batch"
ALIAS_PATH =
"/v1/identify/alias"

Instance Method Summary collapse

Constructor Details

#initialize(token:, api_host: DEFAULT_API_HOST, flush_at: 20, flush_interval: 5.0, max_retries: 2, timeout: 5.0, max_queue_size: 1000, register_atexit: true) ⇒ Client

Returns a new instance of Client.

Raises:



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/getfluxly/client.rb', line 14

# Builds a client authenticated with +token+.
#
# @param token [String] API token; must be neither nil nor empty
# @param api_host [String] base URL stored on the instance and forwarded to Http.new
# @param flush_at [Integer] stored on the instance (passed to the queue's drain in #flush)
# @param flush_interval [Numeric] stored on the instance; not otherwise used by this constructor
# @param max_retries [Integer] forwarded to Http.new
# @param timeout [Numeric] forwarded to Http.new
# @param max_queue_size [Integer] forwarded to Batch.new as +max_size+
# @param register_atexit [Boolean] when truthy, installs an at_exit hook that calls #shutdown
# @raise [GetFluxly::Error] with code "validation_error" when +token+ is nil or empty
def initialize(
  token:,
  api_host: DEFAULT_API_HOST,
  flush_at: 20,
  flush_interval: 5.0,
  max_retries: 2,
  timeout: 5.0,
  max_queue_size: 1000,
  register_atexit: true
)
  if token.nil? || token.empty?
    raise GetFluxly::Error.new("token is required", code: "validation_error")
  end

  @token, @api_host = token, api_host
  @flush_at, @flush_interval = flush_at, flush_interval

  # Shutdown bookkeeping: the flag is only read/written under the mutex (see #shutdown).
  @shutdown_mutex = Mutex.new
  @shutdown = false

  @batch = Batch.new(max_size: max_queue_size)
  @http = Http.new(token: token, api_host: api_host, timeout: timeout, max_retries: max_retries)

  if register_atexit
    at_exit do
      begin
        shutdown
      rescue StandardError
        # Deliberately swallowed so interpreter shutdown stays clean.
        nil
      end
    end
  end
end

Instance Method Details

#alias(user_id:, anonymous_id: nil, previous_id: nil, request_id: nil) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/getfluxly/client.rb', line 80

# Sends an alias request for +user_id+ straight to ALIAS_PATH (this call is
# not queued; it posts immediately through the HTTP layer).
#
# @param user_id [String] the id to alias to; always sent
# @param anonymous_id [String, nil] sent as "anonymous_id" only when truthy
# @param previous_id [String, nil] sent as "previous_id" only when truthy
# @param request_id [String, nil] forwarded to the HTTP layer unchanged
# @return [Object] the non-empty value found under the response's "alias" key
# @raise [GetFluxly::Error] code "invalid_response" (retryable) when the
#   response is not a Hash, or its "alias" entry is missing, nil, or {}
# @raise whatever Identity.require_alias! or the HTTP layer raise
def alias(user_id:, anonymous_id: nil, previous_id: nil, request_id: nil)
  Identity.require_alias!(user_id: user_id, anonymous_id: anonymous_id, previous_id: previous_id)

  body = { "user_id" => user_id }
  body["anonymous_id"] = anonymous_id if anonymous_id
  body["previous_id"] = previous_id if previous_id

  response = @http.post(
    ALIAS_PATH,
    body,
    idempotency_key: Http.generate_idempotency_key,
    request_id: request_id
  )

  # Only index into the response once it is known to be a Hash. Indexing a
  # nil/Array body would raise NoMethodError/TypeError, and String#[] would
  # return the substring "alias" for any text body that happens to contain it.
  parsed = response.is_a?(Hash) ? response : {}
  alias_obj = parsed["alias"]
  return alias_obj unless alias_obj.nil? || alias_obj == {}

  raise GetFluxly::Error.new(
    "alias response did not include alias data",
    code: "invalid_response",
    retryable: true,
    details: parsed
  )
end

#flush ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/getfluxly/client.rb', line 107

# Drains the queue in chunks of +@flush_at+ events and posts each chunk to
# EVENTS_BATCH_PATH until the queue reports an empty chunk.
#
# @return [Object] Batch.empty_flush_result combined (via +) with one
#   Batch::FlushResult per chunk that was posted
# @raise [GetFluxly::Error] re-raised from the HTTP layer; when the error is
#   retryable the drained chunk is put back at the front of the queue first,
#   otherwise the chunk is not requeued
def flush
  result = Batch.empty_flush_result
  loop do
    batch = @batch.drain(@flush_at)
    break if batch.empty?

    begin
      response = @http.post(
        EVENTS_BATCH_PATH,
        { "events" => batch },
        idempotency_key: Http.generate_idempotency_key
      )
    rescue GetFluxly::Error => e
      @batch.requeue_front(batch) if e.retryable
      raise
    end

    # A successful post may come back without a Hash body (e.g. nil). Treat
    # that like a Hash with no counters so the existing fallbacks apply,
    # instead of failing with NoMethodError after the chunk was already sent.
    summary = response.is_a?(Hash) ? response : {}

    result += Batch::FlushResult.new(
      (summary["accepted"] || batch.size).to_i,
      (summary["rejected"] || 0).to_i,
      1,
      Array(summary["errors"])
    )
  end
  result
end

#identify(anonymous_id: nil, external_id: nil, user_id: nil, traits: nil) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/getfluxly/client.rb', line 64

# Queues a "$identify" event carrying whichever identifiers and traits were given.
#
# The three ids are first handed to Identity.require_one_id!; any error it
# raises propagates. Arguments that are nil or false are left out of the payload.
#
# @param anonymous_id [Object, nil] stored under "anonymous_id" when truthy
# @param external_id [Object, nil] stored under "external_id" when truthy
# @param user_id [Object, nil] stored under "user_id" when truthy
# @param traits [Object, nil] stored under "traits" when truthy
# @return [Object] whatever +enqueue+ returns
def identify(anonymous_id: nil, external_id: nil, user_id: nil, traits: nil)
  Identity.require_one_id!(anonymous_id: anonymous_id, external_id: external_id, user_id: user_id)

  # Keep only the fields the caller actually supplied, in a fixed key order.
  supplied = {
    "anonymous_id" => anonymous_id,
    "external_id" => external_id,
    "user_id" => user_id,
    "traits" => traits
  }.select { |_field, value| value }

  record = { "event" => "$identify" }.merge(supplied)
  enqueue(record)
end

#shutdown ⇒ Object Also known as: close



134
135
136
137
138
139
140
141
# File 'lib/getfluxly/client.rb', line 134

# Marks the client as shut down and performs one last flush.
#
# The shutdown flag is checked and set while holding +@shutdown_mutex+; the
# flush itself runs after the mutex has been released. Calls made after the
# flag is set do not flush again.
#
# @return [Object] the result of #flush on the first call,
#   Batch.empty_flush_result on later calls
def shutdown
  already_closed = @shutdown_mutex.synchronize do
    previously = @shutdown
    @shutdown = true unless previously
    previously
  end

  if already_closed
    Batch.empty_flush_result
  else
    flush
  end
end

#track(event, anonymous_id: nil, external_id: nil, user_id: nil, properties: nil, timestamp: nil, context: nil) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/getfluxly/client.rb', line 45

# Queues a single event named +event+.
#
# The three ids are first handed to Identity.require_one_id!; any error it
# raises propagates. +event+ is always written to the payload; every other
# argument is included only when it is truthy (nil and false are left out).
#
# @param event [Object] stored under "event" as given
# @param anonymous_id [Object, nil] stored under "anonymous_id" when truthy
# @param external_id [Object, nil] stored under "external_id" when truthy
# @param user_id [Object, nil] stored under "user_id" when truthy
# @param properties [Object, nil] stored under "properties" when truthy
# @param timestamp [Object, nil] stored under "timestamp" when truthy
# @param context [Object, nil] stored under "context" when truthy
# @return [Object] whatever +enqueue+ returns
def track(event, anonymous_id: nil, external_id: nil, user_id: nil,
          properties: nil, timestamp: nil, context: nil)
  Identity.require_one_id!(anonymous_id: anonymous_id, external_id: external_id, user_id: user_id)

  # Keep only the fields the caller actually supplied, in a fixed key order.
  supplied = {
    "anonymous_id" => anonymous_id,
    "external_id" => external_id,
    "user_id" => user_id,
    "properties" => properties,
    "timestamp" => timestamp,
    "context" => context
  }.select { |_field, value| value }

  record = { "event" => event }.merge(supplied)
  enqueue(record)
end