Class: PostHog::SendWorker Private

Inherits:
Object
  • Object
show all
Includes:
Defaults, Logging, Utils
Defined in:
lib/posthog/send_worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Background worker that batches and sends queued events.

Constant Summary

Constants included from Defaults

Defaults::MAX_HASH_SIZE

Constants included from Utils

Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON

Instance Method Summary collapse

Methods included from Logging

included, #logger

Methods included from Utils

convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601

Constructor Details

#initialize(queue, api_key, options = {}) ⇒ SendWorker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

public: Creates a new worker

The worker continuously takes messages off the queue and makes requests to the posthog.com api

Parameters:

  • queue (Queue)

    Queue synchronized between client and worker.

  • api_key (String)

    Project API key.

  • options (Hash) (defaults to: {})

    Worker options.

Options Hash (options):

  • :batch_size (Integer)

    How many items to send in a batch.

  • :flush_interval_seconds (Numeric)

    Maximum seconds to wait for a batch to fill before sending.

  • :on_error (Proc)

    Callback invoked as ‘on_error.call(status, error)`.

  • :host (String)

    PostHog API host URL.

  • :skip_ssl_verification (Boolean)

    Disable SSL certificate verification.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/posthog/send_worker.rb', line 30

def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  flush_interval_seconds = options[:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS
  @flush_interval_seconds = flush_interval_seconds.to_f
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @state_lock = Mutex.new
  @condition = ConditionVariable.new
  @flush_requested = false
  @shutdown = false
  @pid = Process.pid
  @transport_options = { api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] }
  @transport = Transport.new(@transport_options)
end

Instance Method Details

#is_requesting?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

public: Check whether we have outstanding requests.

TODO: Rename to ‘requesting?` in future version

Returns:

  • (Boolean)

    Whether the worker has outstanding requests.



110
111
112
113
114
# File 'lib/posthog/send_worker.rb', line 110

def is_requesting? # rubocop:disable Naming/PredicateName
  ensure_current_process!

  @lock.synchronize { !@batch.empty? }
end

#notifyvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Wake the worker when producers enqueue new messages.



89
90
91
92
93
# File 'lib/posthog/send_worker.rb', line 89

def notify
  ensure_current_process!

  @state_lock.synchronize { @condition.signal }
end

#request_flushvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Request the worker to send any pending events without waiting for the configured flush interval. Used by Client#flush and shutdown paths.



77
78
79
80
81
82
83
84
# File 'lib/posthog/send_worker.rb', line 77

def request_flush
  ensure_current_process!

  @state_lock.synchronize do
    @flush_requested = true
    @condition.broadcast
  end
end

#runvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Continuously runs the loop to check for new events.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/posthog/send_worker.rb', line 52

def run
  ensure_current_process!

  until shutdown?
    wait_for_work
    break if shutdown?
    next if @queue.empty?

    build_batch

    begin
      send_batch unless @batch.empty?
    ensure
      @lock.synchronize { @batch.clear }
      clear_flush_request_if_idle
    end
  end
ensure
  shutdown_transport
end

#shutdownvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.



96
97
98
99
100
101
102
103
104
# File 'lib/posthog/send_worker.rb', line 96

def shutdown
  ensure_current_process!

  @state_lock.synchronize do
    @shutdown = true
    @flush_requested = true
    @condition.broadcast
  end
end