Class: Karafka::Pro::ScheduledMessages::Tracker

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/pro/scheduled_messages/tracker.rb

Overview

Tracks basic state and metrics about schedules to be dispatched

It provides accurate today dispatch taken from daily buffer and estimates for future days

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeTracker

Initializes the tracker with empty statistics



47
48
49
50
51
52
53
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 47

def initialize
  @daily = Hash.new { |h, k| h[k] = 0 }
  @started_at = Time.now.to_i
  @offsets = { low: -1, high: -1 }
  @state = "fresh"
  @reloads = 0
end

Instance Attribute Details

#reloads=(value) ⇒ Object (writeonly)

Sets the attribute reloads

Parameters:

  • value

    the value to set the attribute reloads to.



41
42
43
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 41

def reloads=(value)
  @reloads = value
end

#started_atInteger (readonly)

Returns time epoch when this tracker was started.

Returns:

  • (Integer)

    time epoch when this tracker was started



44
45
46
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 44

def started_at
  @started_at
end

#stateString

Returns current state.

Returns:

  • (String)

    current state



39
40
41
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 39

def state
  @state
end

Instance Method Details

#future(message) ⇒ Object

Tracks future message dispatch

It is only relevant for future days as for today we use accurate metrics from the daily buffer

Parameters:

  • message (Karafka::Messages::Message)

    schedule message. Should not be a tombstone message. Tombstone messages cancellations are not tracked because it would drastically increase complexity. For given day we use the accurate counter and for future days we use estimates.



81
82
83
84
85
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 81

def future(message)
  epoch = message.headers["schedule_target_epoch"]

  @daily[epoch_to_date(epoch)] += 1
end

#offsets(message) ⇒ Object

Tracks offsets of visited messages

Parameters:



58
59
60
61
62
63
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 58

def offsets(message)
  message_offset = message.offset

  @offsets[:low] = message_offset if @offsets[:low].negative?
  @offsets[:high] = message.offset
end

#to_hHash

Returns hash with details that we want to expose.

Returns:

  • (Hash)

    hash with details that we want to expose



88
89
90
91
92
93
94
95
96
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 88

def to_h
  {
    state: @state,
    offsets: @offsets,
    daily: @daily,
    started_at: @started_at,
    reloads: @reloads
  }.freeze
end

#today=(sum) ⇒ Object

Accurate (because coming from daily buffer) number of things to schedule daily

Parameters:

  • sum (Integer)


68
69
70
# File 'lib/karafka/pro/scheduled_messages/tracker.rb', line 68

def today=(sum)
  @daily[epoch_to_date(@started_at)] = sum
end