Class: PostHog::SendWorker Private

Inherits:
Object
  • Object
Includes:
Defaults, Logging, Utils
Defined in:
lib/posthog/send_worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Background worker that batches and sends queued events.

Constant Summary

Constants included from Defaults

Defaults::MAX_HASH_SIZE

Constants included from Utils

Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON

Instance Method Summary

Methods included from Logging

included, #logger

Methods included from Utils

convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601, uid

Constructor Details

#initialize(queue, api_key, options = {}) ⇒ SendWorker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

public: Creates a new worker

The worker continuously takes messages off the queue and sends them in batches to the PostHog API.

Parameters:

  • queue (Queue)

    Queue synchronized between client and worker.

  • api_key (String)

    Project API key.

  • options (Hash) (defaults to: {})

    Worker options.

Options Hash (options):

  • :batch_size (Integer)

    How many items to send in a batch.

  • :on_error (Proc)

    Callback invoked as `on_error.call(status, error)`.

  • :host (String)

    PostHog API host URL.

  • :skip_ssl_verification (Boolean)

    Disable SSL certificate verification.



# File 'lib/posthog/send_worker.rb', line 29

def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @shutdown_mutex = Mutex.new
  @shutdown = false
  @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification]
end
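
The option handling above can be sketched in isolation. This is a minimal illustration, not the gem's code: `resolve_worker_options` is a hypothetical helper, and the fallback batch size of 100 is an assumed stand-in for `Defaults::MessageBatch::MAX_SIZE`, whose value is not shown on this page.

```ruby
# Hypothetical helper mirroring how the constructor reads its options.
# The default of 100 is an assumption, not a value taken from the gem.
def resolve_worker_options(options = {}, default_batch_size = 100)
  # Accept string or symbol keys, as symbolize_keys! does in the constructor.
  opts = options.transform_keys(&:to_sym)
  {
    batch_size: opts[:batch_size] || default_batch_size,
    # A missing callback falls back to a no-op proc, so callers never nil-check.
    on_error: opts[:on_error] || proc { |_status, _error| },
    host: opts[:host],
    skip_ssl_verification: opts[:skip_ssl_verification]
  }
end

failures = []
resolved = resolve_worker_options(
  { 'batch_size' => 50, 'on_error' => proc { |status, error| failures << [status, error] } }
)
resolved[:on_error].call(500, 'Internal Server Error')

defaults = resolve_worker_options
defaults[:on_error].call(500, 'ignored') # no-op, nothing is recorded
```

Defaulting `on_error` to an empty proc keeps the error path unconditional: the worker can always call the callback without checking whether one was supplied.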

Instance Method Details

#is_requesting? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

public: Check whether we have outstanding requests.

TODO: Rename to `requesting?` in future version

Returns:

  • (Boolean)

    Whether the worker has outstanding requests.



# File 'lib/posthog/send_worker.rb', line 79

def is_requesting? # rubocop:disable Naming/PredicateName
  @lock.synchronize { !@batch.empty? }
end

#run ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Runs the send loop: drains queued events into batches and sends each batch, until the queue is empty or the worker is shut down.



# File 'lib/posthog/send_worker.rb', line 45

def run
  until shutdown?
    return if @queue.empty?

    @lock.synchronize do
      consume_message_from_queue! until @batch.full? || @queue.empty?
    end

    begin
      unless @batch.empty?
        res = @transport.send @api_key, @batch
        handle_error(res.status, res.error) unless res.status == 200
      end
    ensure
      @lock.synchronize { @batch.clear }
    end
  end
ensure
  # Worker threads exit when the queue is drained and are restarted for the
  # next burst of events. Close the persistent connection on each exit and
  # let Transport reconnect lazily when a future worker sends another batch.
  @transport.shutdown
end
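
The drain-and-send loop above can be tried without the gem using a small stand-in. Everything in this sketch is hypothetical: `SketchWorker` is not the real class, the batch is a plain Array rather than `MessageBatch`, and the transport is any callable that returns `[status, error]` instead of the response object that `Transport#send` returns in the source.

```ruby
# Illustrative stand-in for the batching loop; all names are hypothetical.
class SketchWorker
  attr_reader :errors

  def initialize(queue, batch_size, transport)
    @queue = queue
    @batch_size = batch_size
    @transport = transport # callable: batch -> [status, error]
    @batch = []
    @lock = Mutex.new
    @errors = []
  end

  def run
    until @queue.empty?
      # Fill the batch under the lock until it is full or the queue is drained.
      @lock.synchronize do
        @batch << @queue.pop until @batch.size >= @batch_size || @queue.empty?
      end

      begin
        unless @batch.empty?
          status, error = @transport.call(@batch.dup)
          @errors << [status, error] unless status == 200
        end
      ensure
        # The batch is cleared whether or not the send succeeded.
        @lock.synchronize { @batch.clear }
      end
    end
  end
end

queue = Queue.new
5.times { |i| queue << { event: "e#{i}" } }

sent = []
# Fake transport: records each batch and fails any batch of size 1.
transport = lambda do |batch|
  sent << batch
  batch.size == 1 ? [500, 'server error'] : [200, nil]
end

worker = SketchWorker.new(queue, 2, transport)
worker.run
```

With five queued events and a batch size of 2, the fake transport receives batches of 2, 2, and 1. Because the batch is cleared in `ensure`, this loop does not re-queue a batch whose send failed; it only reports the failure.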

#shutdown ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Sets the shutdown flag so the run loop stops, then shuts down the transport.



# File 'lib/posthog/send_worker.rb', line 70

def shutdown
  @shutdown_mutex.synchronize { @shutdown = true }
  @transport.shutdown
end
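
The flag-based stop can be sketched as follows. `StoppableLoop` is a hypothetical class, and the `shutdown?` reader is an assumption: `#run` calls `shutdown?` in the source above, but its body is not shown on this page. The transport shutdown step is omitted.

```ruby
# Hypothetical illustration of a mutex-guarded shutdown flag.
class StoppableLoop
  def initialize
    @shutdown_mutex = Mutex.new
    @shutdown = false
  end

  # Called from another thread to ask the loop to stop.
  def shutdown
    @shutdown_mutex.synchronize { @shutdown = true }
  end

  # Assumed reader; reads the flag under the same mutex that guards writes.
  def shutdown?
    @shutdown_mutex.synchronize { @shutdown }
  end

  def run
    sleep 0.001 until shutdown?
  end
end

stoppable = StoppableLoop.new
thread = Thread.new { stoppable.run }
sleep 0.01
stoppable.shutdown
# Thread#join returns nil on timeout, so a non-nil result means the loop exited.
finished = !thread.join(1).nil?
```

A dedicated mutex for the flag, separate from the one guarding the batch, means a shutdown request never waits on an in-progress batch fill.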