Class: PostHog::SendWorker Private
- Inherits: Object
  - Object
  - PostHog::SendWorker
- Defined in: lib/posthog/send_worker.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Background worker that batches and sends queued events.
Constant Summary
Constants included from Defaults
Constants included from Utils
Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON
Instance Method Summary
- #initialize(queue, api_key, options = {}) ⇒ SendWorker (constructor, private)
  public: Creates a new worker.
- #is_requesting? ⇒ Boolean (private)
  public: Check whether we have outstanding requests.
- #run ⇒ void (private)
  Continuously runs the loop to check for new events.
- #shutdown ⇒ void (private)
Methods included from Logging
Methods included from Utils
convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601, uid
Constructor Details
#initialize(queue, api_key, options = {}) ⇒ SendWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
public: Creates a new worker
The worker continuously takes messages off the queue and makes requests to the posthog.com API.
# File 'lib/posthog/send_worker.rb', line 29
def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @shutdown_mutex = Mutex.new
  @shutdown = false
  @transport = Transport.new api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification]
end
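The option handling in the constructor (symbolize the keys, then fall back to defaults) can be illustrated with a small standalone sketch. This is not the library's code: `normalize_worker_options` and `DEFAULT_BATCH_SIZE` are hypothetical names, and the value 100 is only an assumed stand-in for `Defaults::MessageBatch::MAX_SIZE`, whose real value is not shown on this page.

```ruby
# Hypothetical stand-in for Defaults::MessageBatch::MAX_SIZE (real value not shown here).
DEFAULT_BATCH_SIZE = 100

# Illustrative helper, not part of posthog-ruby: mirrors how the constructor
# symbolizes option keys and falls back to defaults for missing entries.
def normalize_worker_options(options = {})
  opts = options.transform_keys(&:to_sym)
  {
    on_error: opts[:on_error] || proc { |_status, _error| },
    batch_size: opts[:batch_size] || DEFAULT_BATCH_SIZE,
    host: opts[:host],
    skip_ssl_verification: opts[:skip_ssl_verification]
  }
end

# String and symbol keys are treated alike after normalization.
settings = normalize_worker_options('batch_size' => 10, host: 'https://example.invalid')
```

The default `on_error` is a no-op proc, so callers that pass no handler can still have errors reported to it without a nil check.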
Instance Method Details
#is_requesting? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
public: Check whether we have outstanding requests.
TODO: Rename to `requesting?` in future version
# File 'lib/posthog/send_worker.rb', line 79
def is_requesting? # rubocop:disable Naming/PredicateName
  @lock.synchronize { !@batch.empty? }
end
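The predicate above reads the batch under the same lock that guards writes to it. A minimal sketch of that pattern follows, assuming a plain Array in place of `MessageBatch`; the class `TinyBatchHolder` and its methods are hypothetical and exist only for illustration.

```ruby
# Illustrative stand-in, not the library's class: a batch guarded by a Mutex.
class TinyBatchHolder
  def initialize
    @lock = Mutex.new
    @batch = []
  end

  def add(message)
    @lock.synchronize { @batch << message }
  end

  def clear
    @lock.synchronize { @batch.clear }
  end

  # Same shape as is_requesting?: a non-empty batch means work is outstanding.
  def requesting?
    @lock.synchronize { !@batch.empty? }
  end
end

holder = TinyBatchHolder.new
before = holder.requesting?
holder.add(event: 'signup')
during = holder.requesting?
holder.clear
after = holder.requesting?
```

Taking the lock for a read may look redundant, but it keeps the check from observing the batch halfway through a fill or a clear on another thread.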
#run ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Continuously runs the loop to check for new events.
# File 'lib/posthog/send_worker.rb', line 45
def run
  until shutdown?
    return if @queue.empty?

    @lock.synchronize do
      … until @batch.full? || @queue.empty?
    end

    begin
      unless @batch.empty?
        res = @transport.send @api_key, @batch
        handle_error(res.status, res.error) unless res.status == 200
      end
    ensure
      @lock.synchronize { @batch.clear }
    end
  end
ensure
  # Worker threads exit when the queue is drained and are restarted for the
  # next burst of events. Close the persistent connection on each exit and
  # let Transport reconnect lazily when a future worker sends another batch.
  @transport.shutdown
end
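The overall shape of the loop (fill a capped batch from the queue, send it, report non-200 responses, always clear the batch) can be sketched without the library. Everything here is a stand-in: `Response`, `RecordingTransport`, `deliver`, and `drain` are hypothetical names, and popping one message at a time into an Array is an assumption about how the batch is filled.

```ruby
# Illustrative stand-ins; none of these names come from posthog-ruby.
Response = Struct.new(:status, :error)

class RecordingTransport
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Records a copy of each batch instead of making an HTTP request.
  def deliver(batch)
    @sent << batch.dup
    Response.new(200, nil)
  end
end

def drain(queue, transport, max_batch_size, on_error)
  batch = []
  until queue.empty?
    # Assumed fill step: move messages until the batch is full or the queue is empty.
    batch << queue.pop until batch.size >= max_batch_size || queue.empty?
    begin
      res = transport.deliver(batch) unless batch.empty?
      on_error.call(res.status, res.error) if res && res.status != 200
    ensure
      batch.clear # cleared even if delivery raises
    end
  end
end

queue = Queue.new
5.times { |i| queue << { event: "e#{i}" } }
transport = RecordingTransport.new
errors = []
drain(queue, transport, 2, ->(status, error) { errors << [status, error] })
```

Clearing the batch in `ensure` means a failed delivery drops that batch rather than retrying it; whether the real worker retries elsewhere is not stated on this page.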
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
# File 'lib/posthog/send_worker.rb', line 70
def shutdown
  @shutdown_mutex.synchronize { @shutdown = true }
  @transport.shutdown
end
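`#shutdown` sets a flag under a mutex, and `#run` polls `shutdown?` at the top of each iteration. The body of `shutdown?` is not shown on this page, so the sketch below assumes it reads the flag under the same mutex. `StoppableLoop` is a hypothetical class, and the short `sleep` stands in for real work.

```ruby
# Illustrative stand-in, not the library's class.
class StoppableLoop
  def initialize
    @shutdown_mutex = Mutex.new
    @shutdown = false
  end

  # Request a stop; the loop notices on its next iteration.
  def shutdown
    @shutdown_mutex.synchronize { @shutdown = true }
  end

  # Assumed implementation: read the flag under the same mutex that guards writes.
  def shutdown?
    @shutdown_mutex.synchronize { @shutdown }
  end

  def run
    sleep 0.001 until shutdown?
  end
end

worker = StoppableLoop.new
thread = Thread.new { worker.run }
sleep 0.01
worker.shutdown
# Thread#join returns nil on timeout, so a non-nil result means the loop exited.
finished = !thread.join(1).nil?
```

This is cooperative cancellation: the loop is never interrupted mid-iteration, so a batch that is already being sent completes before the worker exits.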