Class: PostHog::SendWorker Private
- Inherits:
-
Object
- Object
- PostHog::SendWorker
- Defined in:
- lib/posthog/send_worker.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Background worker that batches and sends queued events.
Constant Summary
Constants included from Defaults
Constants included from Utils
Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON
Instance Method Summary collapse
-
#initialize(queue, api_key, options = {}) ⇒ SendWorker
constructor
private
public: Creates a new worker.
-
#is_requesting? ⇒ Boolean
private
public: Check whether we have outstanding requests.
-
#notify ⇒ void
private
Wake the worker when producers enqueue new messages.
-
#request_flush ⇒ void
private
Request the worker to send any pending events without waiting for the configured flush interval.
-
#run ⇒ void
private
Continuously runs the loop to check for new events.
- #shutdown ⇒ void private
Methods included from Logging
Methods included from Utils
convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601
Constructor Details
#initialize(queue, api_key, options = {}) ⇒ SendWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
public: Creates a new worker
The worker continuously takes messages off the queue and makes requests to the posthog.com api
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/posthog/send_worker.rb', line 31 def initialize(queue, api_key, = {}) symbolize_keys! @queue = queue @api_key = api_key @on_error = [:on_error] || proc { |status, error| } batch_size = [:batch_size] || Defaults::MessageBatch::MAX_SIZE flush_interval_seconds = [:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS @flush_interval_seconds = flush_interval_seconds.to_f @batch = MessageBatch.new(batch_size) @lock = Mutex.new @state_lock = Mutex.new @condition = ConditionVariable.new @flush_requested = false @shutdown = false @pid = Process.pid @transport_options = { api_host: [:host], skip_ssl_verification: [:skip_ssl_verification], compress_request: [:compress_request] } @transport_options[:retries] = [:max_retries].to_i + 1 if .key?(:max_retries) @transport = Transport.new(@transport_options) end |
Instance Method Details
#is_requesting? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
public: Check whether we have outstanding requests.
TODO: Rename to requesting? in future version
116 117 118 119 120 |
# File 'lib/posthog/send_worker.rb', line 116 def is_requesting? # rubocop:disable Naming/PredicateName ensure_current_process! @lock.synchronize { !@batch.empty? } end |
#notify ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Wake the worker when producers enqueue new messages.
95 96 97 98 99 |
# File 'lib/posthog/send_worker.rb', line 95 def notify ensure_current_process! @state_lock.synchronize { @condition.signal } end |
#request_flush ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Request the worker to send any pending events without waiting for the configured flush interval. Used by Client#flush and shutdown paths.
83 84 85 86 87 88 89 90 |
# File 'lib/posthog/send_worker.rb', line 83 def request_flush ensure_current_process! @state_lock.synchronize do @flush_requested = true @condition.broadcast end end |
#run ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Continuously runs the loop to check for new events.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/posthog/send_worker.rb', line 58 def run ensure_current_process! until shutdown? wait_for_work break if shutdown? next if @queue.empty? build_batch begin send_batch unless @batch.empty? ensure @lock.synchronize { @batch.clear } clear_flush_request_if_idle end end ensure shutdown_transport end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
102 103 104 105 106 107 108 109 110 |
# File 'lib/posthog/send_worker.rb', line 102 def shutdown ensure_current_process! @state_lock.synchronize do @shutdown = true @flush_requested = true @condition.broadcast end end |