Class: PostHog::SendWorker Private

Inherits:
Object
  • Object
Includes:
Defaults, Logging, Utils
Defined in:
lib/posthog/send_worker.rb

Overview

This class is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Background worker that batches and sends queued events.

Constant Summary

Constants included from Defaults

Defaults::MAX_HASH_SIZE

Constants included from Utils

Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON

Instance Method Summary

Methods included from Logging

included, #logger

Methods included from Utils

convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601

Constructor Details

#initialize(queue, api_key, options = {}) ⇒ SendWorker

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Creates a new worker.

The worker continuously takes messages off the queue and sends them in requests to the PostHog API.

Parameters:

  • queue (Queue)

    Queue synchronized between client and worker.

  • api_key (String)

    Project API key.

  • options (Hash) (defaults to: {})

    Worker options.

Options Hash (options):

  • :batch_size (Integer)

    How many items to send in a batch.

  • :flush_interval_seconds (Numeric)

    Maximum seconds to wait for a batch to fill before sending.

  • :on_error (Proc)

    Callback invoked as on_error.call(status, error).

  • :host (String)

    PostHog API host URL.

  • :skip_ssl_verification (Boolean)

    Disable SSL certificate verification.

  • :compress_request (Boolean)

    Set to false to disable gzip compression of batch request bodies.

  • :max_retries (Integer)

    Optional. When given, the transport's :retries option is set to max_retries.to_i + 1.



# File 'lib/posthog/send_worker.rb', line 31

def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  flush_interval_seconds = options[:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS
  @flush_interval_seconds = flush_interval_seconds.to_f
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @state_lock = Mutex.new
  @condition = ConditionVariable.new
  @flush_requested = false
  @shutdown = false
  @pid = Process.pid
  @transport_options = {
    api_host: options[:host],
    skip_ssl_verification: options[:skip_ssl_verification],
    compress_request: options[:compress_request]
  }
  @transport_options[:retries] = options[:max_retries].to_i + 1 if options.key?(:max_retries)
  @transport = Transport.new(@transport_options)
end
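
The option handling in the constructor can be tried in isolation. The following is a minimal sketch, not the gem's code: `resolve_worker_settings` is a hypothetical helper, the two `ASSUMED_*` constants are placeholder values standing in for `Defaults::MessageBatch::MAX_SIZE` and `Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS` (whose real values are not shown on this page), and `transform_keys(&:to_sym)` stands in for `symbolize_keys!`.

```ruby
# Placeholder values (assumptions); the real defaults live in PostHog::Defaults.
ASSUMED_MAX_BATCH_SIZE = 100
ASSUMED_FLUSH_INTERVAL_SECONDS = 5

# Hypothetical helper mirroring the constructor's option handling:
# keys are symbolized, missing values fall back to defaults, and
# :retries is only set when :max_retries was supplied.
def resolve_worker_settings(options = {})
  opts = options.transform_keys(&:to_sym)

  settings = {
    batch_size: opts[:batch_size] || ASSUMED_MAX_BATCH_SIZE,
    flush_interval_seconds: (opts[:flush_interval_seconds] || ASSUMED_FLUSH_INTERVAL_SECONDS).to_f,
    on_error: opts[:on_error] || proc { |_status, _error| },
    transport: {
      api_host: opts[:host],
      skip_ssl_verification: opts[:skip_ssl_verification],
      compress_request: opts[:compress_request]
    }
  }
  settings[:transport][:retries] = opts[:max_retries].to_i + 1 if opts.key?(:max_retries)
  settings
end

settings = resolve_worker_settings('batch_size' => 50, 'max_retries' => 2)
settings[:batch_size]          # 50
settings[:transport][:retries] # 3
```

Note that `:retries` is absent from the transport options unless `:max_retries` is passed, so the transport's own default applies in that case.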

Instance Method Details

#is_requesting? ⇒ Boolean

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Checks whether the worker has outstanding requests.

TODO: Rename to requesting? in future version

Returns:

  • (Boolean)

    Whether the worker has outstanding requests.



# File 'lib/posthog/send_worker.rb', line 116

def is_requesting? # rubocop:disable Naming/PredicateName
  ensure_current_process!

  @lock.synchronize { !@batch.empty? }
end

#notify ⇒ void

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

This method returns an undefined value.

Wake the worker when producers enqueue new messages.



# File 'lib/posthog/send_worker.rb', line 95

def notify
  ensure_current_process!

  @state_lock.synchronize { @condition.signal }
end
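
The wake-up pattern used by `notify` can be illustrated with plain Ruby primitives. This is a minimal sketch under stated assumptions: the waiting side is hypothetical (the worker's own `wait_for_work` is not shown on this page), and the 5-second timeout merely stands in for a flush interval.

```ruby
queue = Queue.new
state_lock = Mutex.new
condition = ConditionVariable.new
woke_up = false

# Consumer: sleeps on the condition variable while there is nothing to do.
waiter = Thread.new do
  state_lock.synchronize do
    # Re-check the predicate in a loop to guard against spurious wakeups.
    condition.wait(state_lock, 5) while queue.empty?
    woke_up = true
  end
end

# Producer: enqueue first, then signal while holding the same lock,
# which is the shape of the notify method above.
queue << { event: 'signup' }
state_lock.synchronize { condition.signal }

waiter.join
```

Signalling under the lock means the consumer is either already waiting or has not yet checked the queue, so the wake-up cannot be lost between the check and the wait.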

#request_flush ⇒ void

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

This method returns an undefined value.

Request the worker to send any pending events without waiting for the configured flush interval. Used by Client#flush and shutdown paths.



# File 'lib/posthog/send_worker.rb', line 83

def request_flush
  ensure_current_process!

  @state_lock.synchronize do
    @flush_requested = true
    @condition.broadcast
  end
end
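
How a flush request can cut a timed wait short is sketched below. `FlushGate` and `wait_for_flush` are hypothetical names introduced for illustration; the worker's actual waiting logic is not shown on this page, so treat this as one possible shape rather than the gem's implementation.

```ruby
# Hypothetical illustration of a flag plus condition variable.
class FlushGate
  def initialize
    @state_lock = Mutex.new
    @condition = ConditionVariable.new
    @flush_requested = false
  end

  # Same shape as request_flush above: set the flag, wake all waiters.
  def request_flush
    @state_lock.synchronize do
      @flush_requested = true
      @condition.broadcast
    end
  end

  # Blocks until a flush is requested or timeout_seconds elapse.
  # Returns true when a flush was requested, false on timeout.
  def wait_for_flush(timeout_seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
    @state_lock.synchronize do
      until @flush_requested
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0

        @condition.wait(@state_lock, remaining)
      end
      @flush_requested = false
      true
    end
  end
end

gate = FlushGate.new
gate.request_flush
gate.wait_for_flush(5) # returns true immediately because the flag is already set
```

Because the flag is stored, a request made before the waiter starts waiting is still observed; the condition variable only shortens the sleep.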

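#run ⇒ void

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

This method returns an undefined value.

Runs the worker loop until shutdown is requested. Each iteration waits for work, builds a batch from the queue, sends it if it is non-empty, and then clears the batch. The transport is shut down when the loop exits.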



# File 'lib/posthog/send_worker.rb', line 58

def run
  ensure_current_process!

  until shutdown?
    wait_for_work
    break if shutdown?
    next if @queue.empty?

    build_batch

    begin
      send_batch unless @batch.empty?
    ensure
      @lock.synchronize { @batch.clear }
      clear_flush_request_if_idle
    end
  end
ensure
  shutdown_transport
end
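
The build-then-send-then-clear cycle of the loop can be sketched with a plain `Queue` and an array. `drain_batch` is a hypothetical stand-in for `build_batch` (whose body is not shown on this page), and appending to `sent` stands in for `send_batch`; no network request is made.

```ruby
# Hypothetical stand-in for build_batch: take up to max_size items
# off the queue without blocking.
def drain_batch(queue, max_size)
  batch = []
  while batch.size < max_size
    begin
      batch << queue.pop(true) # non-blocking pop raises ThreadError when empty
    rescue ThreadError
      break
    end
  end
  batch
end

queue = Queue.new
5.times { |i| queue << { event: "event_#{i}" } }

sent = []
until queue.empty?
  batch = drain_batch(queue, 2)
  begin
    sent << batch.dup unless batch.empty? # stand-in for send_batch
  ensure
    batch.clear # the batch is always cleared, even if sending raised
  end
end

sent.map(&:size) # [2, 2, 1]
```

Clearing the batch in `ensure` keeps a failed send from leaving stale events behind for the next iteration, which is the same reason the loop above clears `@batch` there.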

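#shutdown ⇒ void

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

This method returns an undefined value.

Signals the worker to stop: sets the shutdown flag, marks a flush as requested, and wakes all threads waiting on the worker's condition variable.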



# File 'lib/posthog/send_worker.rb', line 102

def shutdown
  ensure_current_process!

  @state_lock.synchronize do
    @shutdown = true
    @flush_requested = true
    @condition.broadcast
  end
end
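
A shutdown handshake between a caller and a worker thread can be sketched as follows. `MiniWorker` is a hypothetical class written for this example only. Unlike the `run` loop documented above, it drains the queue before exiting; that is a choice made for the sketch, not a statement about how the gem behaves.

```ruby
# Hypothetical miniature worker illustrating the shutdown flag and broadcast.
class MiniWorker
  attr_reader :sent

  def initialize(queue)
    @queue = queue
    @state_lock = Mutex.new
    @condition = ConditionVariable.new
    @shutdown = false
    @sent = []
  end

  def run
    loop do
      stop = @state_lock.synchronize do
        # Sleep until there is work or shutdown has been requested.
        @condition.wait(@state_lock, 1) while @queue.empty? && !@shutdown
        @shutdown
      end
      # Only this thread pops, so the non-blocking pop cannot race.
      @sent << @queue.pop(true) until @queue.empty?
      break if stop
    end
  end

  # Same shape as shutdown above: set the flag under the lock, wake waiters.
  def shutdown
    @state_lock.synchronize do
      @shutdown = true
      @condition.broadcast
    end
  end
end

queue = Queue.new
worker = MiniWorker.new(queue)
thread = Thread.new { worker.run }

3.times { |i| queue << i }
worker.shutdown
thread.join(5)
worker.sent # [0, 1, 2]
```

Setting the flag and broadcasting under the same lock the worker waits on ensures a sleeping worker notices the shutdown promptly instead of waiting out its timeout.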