Class: PostHog::SendWorker Private

Inherits:
Object
  • Object
show all
Includes:
Defaults, Logging, Utils
Defined in:
lib/posthog/send_worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Background worker that batches and sends queued events.

Constant Summary

Constants included from Defaults

Defaults::MAX_HASH_SIZE

Constants included from Utils

Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON

Instance Method Summary collapse

Methods included from Logging

included, #logger

Methods included from Utils

convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601, uid

Constructor Details

#initialize(queue, api_key, options = {}) ⇒ SendWorker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

public: Creates a new worker

The worker continuously takes messages off the queue and makes requests to the posthog.com api

Parameters:

  • queue (Queue)

    Queue synchronized between client and worker.

  • api_key (String)

    Project API key.

  • options (Hash) (defaults to: {})

    Worker options.

Options Hash (options):

  • :batch_size (Integer)

    How many items to send in a batch.

  • :on_error (Proc)

    Callback invoked as ‘on_error.call(status, error)`.

  • :host (String)

    PostHog API host URL.

  • :skip_ssl_verification (Boolean)

    Disable SSL certificate verification.



29
30
31
32
33
34
35
36
37
38
# File 'lib/posthog/send_worker.rb', line 29

def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification]
end

Instance Method Details

#is_requesting?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

public: Check whether we have outstanding requests.

TODO: Rename to ‘requesting?` in future version

Returns:

  • (Boolean)

    Whether the worker has outstanding requests.



71
72
73
# File 'lib/posthog/send_worker.rb', line 71

def is_requesting? # rubocop:disable Naming/PredicateName
  @lock.synchronize { !@batch.empty? }
end

#runvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Continuously runs the loop to check for new events.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/posthog/send_worker.rb', line 43

def run
  until Thread.current[:should_exit]
    return if @queue.empty?

    @lock.synchronize do
      consume_message_from_queue! until @batch.full? || @queue.empty?
    end

    unless @batch.empty?
      res = @transport.send @api_key, @batch
      @on_error.call(res.status, res.error) unless res.status == 200
    end

    @lock.synchronize { @batch.clear }
  end
ensure
  @transport.shutdown
end

#shutdownvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.



63
64
65
# File 'lib/posthog/send_worker.rb', line 63

def shutdown
  @transport.shutdown
end