Class: Julewire::Core::Destinations::TailSampling

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/core/destinations/tail_sampling.rb

Constant Summary collapse

DEFAULT_MAX_EXECUTIONS =
1024
DEFAULT_MAX_RECORDS_PER_EXECUTION =
1000
DEFAULT_SAMPLE_RATE =
0.1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(destination:, **options) ⇒ TailSampling

Returns a new instance of TailSampling.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 39

# Builds a TailSampling instance around +destination+.
#
# Option keys are checked against OPTION_KEYS first; each setting is then
# validated in the order below, and finally +initialize_state+ is run.
# Any validator that raises aborts construction.
#
# @param destination the destination to wrap; stored as returned by
#   Registry.validate!
# @param options [Hash] settings; the ones read here are name, sample_rate,
#   slow_ms, max_executions, max_records_per_execution, decider, on_drop
#   and on_failure
def initialize(destination:, **options)
  Validation.validate_options!(options, OPTION_KEYS, name: :tail_sampling)
  opts = tail_options(options)

  @destination = Registry.validate!(destination)
  @name = Destinations.normalize_name(opts.name)

  @sample_rate = opts.sample_rate
  # The return value is discarded; presumably this call exists only so an
  # invalid sample rate raises during construction — TODO confirm.
  Processing::Sampling.threshold_for(opts.sample_rate)

  @slow_ms = validate_slow_ms(opts.slow_ms)
  @max_executions =
    Validation.validate_integer_limit!(opts.max_executions, name: :max_executions, positive: true)
  @max_records_per_execution =
    Validation.validate_integer_limit!(
      opts.max_records_per_execution, name: :max_records_per_execution, positive: true
    )

  # All three optional callbacks are validated before any of them is stored.
  %i[decider on_drop on_failure].each do |key|
    Validation.validate_callable!(opts.public_send(key), name: key, allow_nil: true)
  end
  @decider, @on_drop, @on_failure = opts.decider, opts.on_drop, opts.on_failure

  initialize_state
end

Instance Attribute Details

#name ⇒ Object (readonly)



37
38
39
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 37

# Reader for the +@name+ instance variable.
#
# @return [Object] the current value of +@name+
def name = @name

Instance Method Details

#after_fork! ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 84

# Re-runs +initialize_buffer+ under the mutex, then forwards +after_fork!+
# to the wrapped destination when it responds to that message.
#
# A StandardError raised by either step is handed to +record_failure+ with
# +phase: :after_fork+ instead of propagating.
#
# @return [self] in every case, including after a rescued failure
def after_fork!
  begin
    @mutex.synchronize { initialize_buffer }
    @destination.after_fork! if @destination.respond_to?(:after_fork!)
  rescue StandardError => error
    record_failure(error, nil, phase: :after_fork)
  end
  self
end

#close(timeout: nil) ⇒ Object



80
81
82
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 80

# Delegates to +drain_and_lifecycle+ with the +:close+ operation.
#
# @param timeout [Object, nil] forwarded unchanged as the +timeout:+ keyword
# @return [Object] whatever +drain_and_lifecycle+ returns
def close(timeout: nil) = drain_and_lifecycle(:close, timeout: timeout)

#emit(record) ⇒ Object



66
67
68
69
70
71
72
73
74
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 66

# Passes +record+ to +accept_record+ under the mutex, then acts on the result
# outside the lock: every non-nil entry of +losses+ goes to +notify_drop+ and
# every entry of +records+ goes to +emit_target+.
#
# A StandardError raised anywhere in that sequence is handed to
# +record_failure+ together with the original record instead of propagating.
#
# @param record [Object] the record to submit
# @return [nil] always
def emit(record)
  begin
    outcome = @mutex.synchronize { accept_record(record) }
    outcome.losses.compact.each { |loss| notify_drop(loss) }
    outcome.records.each { |ready| emit_target(ready) }
  rescue StandardError => error
    record_failure(error, record)
  end
  nil
end

#flush(timeout: nil) ⇒ Object



76
77
78
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 76

# Delegates to +drain_and_lifecycle+ with the +:flush+ operation.
#
# @param timeout [Object, nil] forwarded unchanged as the +timeout:+ keyword
# @return [Object] whatever +drain_and_lifecycle+ returns
def flush(timeout: nil) = drain_and_lifecycle(:flush, timeout: timeout)

#health ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 95

# Produces a health snapshot via +@health.snapshot+.
#
# The snapshot is given the number of entries in +@buffers+ (read under the
# mutex), the results of +destination_health+ and +health_status+, and the
# configured limits and sample rate.
#
# @return [Object] whatever +@health.snapshot+ returns
def health
  # Hash values are evaluated top to bottom, so the buffer count is taken
  # first, then destination_health, and health_status last.
  fields = {
    buffered_executions: @mutex.synchronize { @buffers.length },
    destination: destination_health,
    max_executions: @max_executions,
    max_records_per_execution: @max_records_per_execution,
    sample_rate: @sample_rate,
    slow_ms: @slow_ms,
    status: health_status
  }
  @health.snapshot(**fields)
end

#resource_identity ⇒ Object



93
# File 'lib/julewire/core/destinations/tail_sampling.rb', line 93

# Returns the receiver itself.
#
# @return [self]
def resource_identity
  self
end