Class: PostHog::SendWorker Private
- Inherits: Object
  - Object
  - PostHog::SendWorker
- Defined in: lib/posthog/send_worker.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Background worker that batches and sends queued events.
Constant Summary
Constants included from Defaults
Constants included from Utils
Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON
Instance Method Summary
-
#initialize(queue, api_key, options = {}) ⇒ SendWorker
constructor
private
public: Creates a new worker.
-
#is_requesting? ⇒ Boolean
private
public: Check whether we have outstanding requests.
-
#notify ⇒ void
private
Wake the worker when producers enqueue new messages.
-
#request_flush ⇒ void
private
Request the worker to send any pending events without waiting for the configured flush interval.
-
#run ⇒ void
private
Continuously runs the loop to check for new events.
-
#shutdown ⇒ void
private
Methods included from Logging
Methods included from Utils
convert_to_datetime, date_in_iso8601, datetime_in_iso8601, deep_symbolize_keys, formatted_offset, get_by_symbol_or_string_key, is_valid_regex, isoify_dates, isoify_dates!, seconds_to_utc_offset, stringify_keys, symbolize_keys, symbolize_keys!, time_in_iso8601
Constructor Details
#initialize(queue, api_key, options = {}) ⇒ SendWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
public: Creates a new worker.
The worker continuously takes messages off the queue and sends them in requests to the posthog.com API.
# File 'lib/posthog/send_worker.rb', line 30
def initialize(queue, api_key, options = {})
  symbolize_keys! options
  @queue = queue
  @api_key = api_key
  @on_error = options[:on_error] || proc { |status, error| }
  batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
  flush_interval_seconds = options[:flush_interval_seconds] || Defaults::MessageBatch::FLUSH_INTERVAL_SECONDS
  @flush_interval_seconds = flush_interval_seconds.to_f
  @batch = MessageBatch.new(batch_size)
  @lock = Mutex.new
  @state_lock = Mutex.new
  @condition = ConditionVariable.new
  @flush_requested = false
  @shutdown = false
  @pid = Process.pid
  @transport_options = { api_host: options[:host], skip_ssl_verification: options[:skip_ssl_verification] }
  @transport = Transport.new(@transport_options)
end
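The constructor resolves each setting with the `options[:key] || default` pattern after normalizing keys to symbols. The following is a minimal standalone sketch of that pattern, not the library's code: `SketchDefaults`, its values, and `resolve_worker_options` are hypothetical names invented for illustration, and `Hash#transform_keys` stands in for the `symbolize_keys!` helper.

```ruby
# Hypothetical stand-ins for Defaults::MessageBatch; the real values are
# not shown on this page.
module SketchDefaults
  MAX_SIZE = 100
  FLUSH_INTERVAL_SECONDS = 5
end

# Normalizes keys to symbols, then applies fallbacks with `||`,
# mirroring the shape of the constructor above.
def resolve_worker_options(options = {})
  opts = options.transform_keys(&:to_sym)
  interval = opts[:flush_interval_seconds] || SketchDefaults::FLUSH_INTERVAL_SECONDS
  {
    batch_size: opts[:batch_size] || SketchDefaults::MAX_SIZE,
    flush_interval_seconds: interval.to_f,
    on_error: opts[:on_error] || proc { |_status, _error| }
  }
end

resolved = resolve_worker_options('batch_size' => 10)
puts resolved[:batch_size]             # 10
puts resolved[:flush_interval_seconds] # 5.0
```

Because `||` only falls back on `nil` or `false`, an explicit `0` is kept as given rather than replaced by the default.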
Instance Method Details
#is_requesting? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
public: Check whether we have outstanding requests.
TODO: Rename to `requesting?` in a future version.
# File 'lib/posthog/send_worker.rb', line 110
def is_requesting? # rubocop:disable Naming/PredicateName
  ensure_current_process!
  @lock.synchronize { !@batch.empty? }
end
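As the source shows, the predicate reports a non-empty batch while holding `@lock`, so it is true only between batch construction and the post-send clear. A small sketch of that bookkeeping, assuming a plain array in place of `MessageBatch` (the `BatchTracker` class and its methods are hypothetical):

```ruby
# Hypothetical miniature of the worker's batch bookkeeping.
class BatchTracker
  def initialize
    @lock = Mutex.new
    @batch = []
  end

  # Stage a message in the current batch.
  def add(message)
    @lock.synchronize { @batch << message }
  end

  # Drop everything staged, as the worker does after a send attempt.
  def clear
    @lock.synchronize { @batch.clear }
  end

  # True while messages are staged in the current batch.
  def requesting?
    @lock.synchronize { !@batch.empty? }
  end
end

tracker = BatchTracker.new
puts tracker.requesting? # false
tracker.add({ event: 'signup' })
puts tracker.requesting? # true
tracker.clear
puts tracker.requesting? # false
```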
#notify ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Wake the worker when producers enqueue new messages.
# File 'lib/posthog/send_worker.rb', line 89
def notify
  ensure_current_process!
  @state_lock.synchronize { @condition.signal }
end
#request_flush ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Request the worker to send any pending events without waiting for the configured flush interval. Used by Client#flush and shutdown paths.
# File 'lib/posthog/send_worker.rb', line 77
def request_flush
  ensure_current_process!
  @state_lock.synchronize do
    @flush_requested = true
    @condition.broadcast
  end
end
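`notify` and `request_flush` both wake a thread waiting on `@condition`; `request_flush` additionally sets a flag so the request is not lost if nobody is waiting yet. The sketch below illustrates that flag-plus-condition-variable pattern in isolation. The `FlushSignal` class and its `wait_for_flush` method are hypothetical; the worker's actual waiting logic (`wait_for_work`) is not shown on this page and may differ.

```ruby
class FlushSignal
  def initialize
    @state_lock = Mutex.new
    @condition = ConditionVariable.new
    @flush_requested = false
  end

  # Same shape as request_flush above: set the flag, wake all waiters.
  def request_flush
    @state_lock.synchronize do
      @flush_requested = true
      @condition.broadcast
    end
  end

  # Hypothetical waiter: blocks until a flush is requested or the
  # interval elapses. Returns true for a request, false on timeout.
  def wait_for_flush(interval_seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + interval_seconds
    @state_lock.synchronize do
      until @flush_requested
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0

        @condition.wait(@state_lock, remaining)
      end
      @flush_requested = false
      true
    end
  end
end

signal = FlushSignal.new
waiter = Thread.new { signal.wait_for_flush(5) }
signal.request_flush
puts waiter.value # true
```

Checking the flag in a loop around `wait` guards against spurious wakeups and against the request arriving before the waiter has started waiting.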
#run ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Continuously runs the loop to check for new events.
# File 'lib/posthog/send_worker.rb', line 52
def run
  ensure_current_process!
  until shutdown?
    wait_for_work
    break if shutdown?
    next if @queue.empty?
    build_batch
    begin
      send_batch unless @batch.empty?
    ensure
      @lock.synchronize { @batch.clear }
      clear_flush_request_if_idle
    end
  end
ensure
  shutdown_transport
end
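The core of the loop is: build a batch from the queue, attempt to send it, and clear the batch in an `ensure` so a failed send cannot leave stale messages behind. A simplified single-threaded sketch of that step follows. `MiniWorker`, its `drain` method, and the sender block are hypothetical; the real `build_batch` and `send_batch` helpers are not shown on this page.

```ruby
class MiniWorker
  def initialize(queue, batch_size:, &sender)
    @queue = queue
    @batch_size = batch_size
    @sender = sender
    @batch = []
    @lock = Mutex.new
  end

  # Drains the queue in batches; the batch is cleared even if the
  # sender raises, mirroring the begin/ensure in #run.
  def drain
    until @queue.empty?
      build_batch
      begin
        @sender.call(@batch.dup) unless @batch.empty?
      ensure
        @lock.synchronize { @batch.clear }
      end
    end
  end

  private

  # Moves up to batch_size messages from the queue into the batch.
  def build_batch
    @lock.synchronize do
      @batch << @queue.pop(true) while @batch.size < @batch_size && !@queue.empty?
    end
  end
end

queue = Queue.new
5.times { |i| queue << { event: "e#{i}" } }
sent = []
MiniWorker.new(queue, batch_size: 2) { |batch| sent << batch.size }.drain
p sent # [2, 2, 1]
```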
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
# File 'lib/posthog/send_worker.rb', line 96
def shutdown
  ensure_current_process!
  @state_lock.synchronize do
    @shutdown = true
    @flush_requested = true
    @condition.broadcast
  end
end
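`shutdown` sets the stop flag under `@state_lock` and broadcasts so a sleeping worker notices immediately instead of waiting out its interval. A minimal sketch of how such a flag can end a loop running on another thread, under the assumption that the loop re-checks the flag after every wakeup (the `StoppableLoop` class is hypothetical):

```ruby
class StoppableLoop
  def initialize
    @state_lock = Mutex.new
    @condition = ConditionVariable.new
    @shutdown = false
  end

  # Same shape as shutdown above, minus the flush flag.
  def shutdown
    @state_lock.synchronize do
      @shutdown = true
      @condition.broadcast
    end
  end

  def shutdown?
    @state_lock.synchronize { @shutdown }
  end

  # Sleeps on the condition variable until shutdown is requested.
  def run
    until shutdown?
      @state_lock.synchronize do
        @condition.wait(@state_lock, 0.05) unless @shutdown
      end
    end
    :stopped
  end
end

worker = StoppableLoop.new
thread = Thread.new { worker.run }
worker.shutdown
puts thread.value # stopped
```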