Class: GoodJob::CapsuleTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/capsule_tracker.rb

Overview

CapsuleTracker save a record in the database and periodically refreshes it. The intention is to create a heartbeat that can be used to determine whether a capsule/process is still active and use that to lock (or unlock) jobs.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(executor: Concurrent.global_io_executor) ⇒ CapsuleTracker

Returns a new instance of CapsuleTracker.

Parameters:

  • executor (Concurrent::AbstractExecutorService) (defaults to: Concurrent.global_io_executor)

    The executor to use for refreshing the process record.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/good_job/capsule_tracker.rb', line 26

def initialize(executor: Concurrent.global_io_executor)
  @executor = executor
  @mutex = Mutex.new
  @locks = 0
  @advisory_locked_connection = nil
  @record_id = SecureRandom.uuid
  @record = nil
  @refresh_task = nil

  # AS::ForkTracker is only present on Rails v6.1+.
  # Fall back to PID checking if ForkTracker is not available
  if defined?(ActiveSupport::ForkTracker)
    ActiveSupport::ForkTracker.after_fork { reset }
    @forktracker = true
  else
    @ruby_pid = ::Process.pid
    @forktracker = false
  end

  self.class.instances << self
end

Class Attribute Details

.instancesArray<GoodJob::CapsuleTracker>? (readonly)

List of all instantiated CapsuleTrackers in the current process.

Returns:



23
# File 'lib/good_job/capsule_tracker.rb', line 23

cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Attribute Details

#advisory_locksInteger (readonly)

Number of tracked job executions with advisory locks.

Returns:

  • (Integer)


17
18
19
# File 'lib/good_job/capsule_tracker.rb', line 17

def advisory_locks
  @advisory_locks
end

#locksObject (readonly)

Number of tracked job executions.



13
14
15
# File 'lib/good_job/capsule_tracker.rb', line 13

def locks
  @locks
end

#recordGoodJob::Process? (readonly)

The database record used for tracking.

Returns:



10
11
12
# File 'lib/good_job/capsule_tracker.rb', line 10

def record
  @record
end

Instance Method Details

#advisory_locked?Boolean

Tests whether an active advisory lock has been taken on the record.

Returns:

  • (Boolean)


155
156
157
# File 'lib/good_job/capsule_tracker.rb', line 155

def advisory_locked?
  @advisory_locked_connection&.weakref_alive? && @advisory_locked_connection&.active?
end

#id_for_lockString?

The UUID to use for locking. May be nil if the process is not registered or is unusable/expired. If UUID has not yet been persisted to the database, this method will make a query to insert or update it.

Returns:

  • (String, nil)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/good_job/capsule_tracker.rb', line 51

def id_for_lock
  value = nil
  synchronize do
    next if @locks.zero?

    reset_on_fork
    if @record
      @record.refresh_if_stale
    else
      @record = GoodJob::Process.find_or_create_record(id: @record_id)
      create_refresh_task
    end
    value = @record&.id
  end
  value
end

#process_idString

The expected UUID of the process for use in inspection. Use #id_for_lock if using this as a lock key.

Returns:

  • (String)


71
72
73
# File 'lib/good_job/capsule_tracker.rb', line 71

def process_id
  @record_id
end

#register(with_advisory_lock: false) {|void| ... } ⇒ void

This method returns an undefined value.

Registers the current process around a job execution site. register is expected to be called multiple times in a process, but should be advisory locked only once (in a single thread).

Parameters:

  • with_advisory_lock (Boolean) (defaults to: false)

    Whether the lock strategy should us an advisory lock; the connection must be retained to support advisory locks.

Yields:

  • (void)

    If a block is given, the process will be unregistered after the block completes.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/good_job/capsule_tracker.rb', line 80

def register(with_advisory_lock: false)
  synchronize do
    if with_advisory_lock
      if @record
        if !advisory_locked? || !advisory_locked_connection?
          @record.class.transaction do
            @record.advisory_lock!
            @record.update(lock_type: :advisory)
          end
          @advisory_locked_connection = WeakRef.new(@record.class.connection)
        end
      else
        @record = GoodJob::Process.find_or_create_record(id: @record_id, with_advisory_lock: true)
        @advisory_locked_connection = WeakRef.new(@record.class.connection)
        create_refresh_task
      end
    end

    @locks += 1
  end
  return unless block_given?

  begin
    yield
  ensure
    unregister(with_advisory_lock: with_advisory_lock)
  end
end

#renew(silent: false) ⇒ void

This method returns an undefined value.

Refreshes the process record in the database.

Parameters:

  • silent (Boolean) (defaults to: false)

    Whether to silence logging.



145
146
147
148
149
150
151
# File 'lib/good_job/capsule_tracker.rb', line 145

def renew(silent: false)
  synchronize do
    GoodJob::Process.with_logger_silenced(silent: silent) do
      @record&.refresh_if_stale(cleanup: true)
    end
  end
end

#unregister(with_advisory_lock: false) ⇒ void

This method returns an undefined value.

Unregisters the current process from the database.

Parameters:

  • with_advisory_lock (Boolean) (defaults to: false)

    Whether the lock strategy should unlock an advisory lock; the connection must be able to support advisory locks.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/good_job/capsule_tracker.rb', line 112

def unregister(with_advisory_lock: false)
  synchronize do
    if @locks.zero?
      return
    elsif @locks == 1
      if @record
        if with_advisory_lock && advisory_locked? && advisory_locked_connection?
          @record.class.transaction do
            @record.advisory_unlock
            @record.destroy
          end
          @advisory_locked_connection = nil
        else
          @record.destroy
        end
        @record = nil
      end
      cancel_refresh_task
    elsif with_advisory_lock && advisory_locked? && advisory_locked_connection?
      @record.class.transaction do
        @record.advisory_unlock
        @record.update(lock_type: nil)
      end
      @advisory_locked_connection = nil
    end

    @locks -= 1 unless @locks.zero?
  end
end