Class: JobWorkflow::Semaphore

Inherits:
Object
  • Object
show all
Defined in:
lib/job_workflow/semaphore.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency_key:, concurrency_duration:, concurrency_limit: 1, polling_interval: DEFAULT_POLLING_INTERVAL) ⇒ Semaphore

: (

  concurrency_key: String,
  concurrency_duration: ActiveSupport::Duration,
  ?concurrency_limit: Integer,
  ?polling_interval: Float
) -> void


25
26
27
28
29
30
31
32
33
34
35
# File 'lib/job_workflow/semaphore.rb', line 25

# Builds a semaphore from its concurrency settings. Each argument is stored
# unchanged in the instance variable of the same name; nothing is validated
# here.
#
# @param concurrency_key [String] key stored as @concurrency_key
# @param concurrency_duration [ActiveSupport::Duration] stored as @concurrency_duration
# @param concurrency_limit [Integer] stored as @concurrency_limit (defaults to 1)
# @param polling_interval [Float] stored as @polling_interval
#   (defaults to DEFAULT_POLLING_INTERVAL)
def initialize(concurrency_key:, concurrency_duration:, concurrency_limit: 1,
               polling_interval: DEFAULT_POLLING_INTERVAL)
  @polling_interval = polling_interval
  @concurrency_limit = concurrency_limit
  @concurrency_duration = concurrency_duration
  @concurrency_key = concurrency_key
end

Instance Attribute Details

#concurrency_duration ⇒ Object (readonly)

: ActiveSupport::Duration



10
11
12
# File 'lib/job_workflow/semaphore.rb', line 10

# Reader for @concurrency_duration, the duration value handed to the constructor.
def concurrency_duration = @concurrency_duration

#concurrency_key ⇒ Object (readonly)

: String



8
9
10
# File 'lib/job_workflow/semaphore.rb', line 8

# Reader for @concurrency_key, the key value handed to the constructor.
def concurrency_key = @concurrency_key

#concurrency_limit ⇒ Object (readonly)

: Integer



9
10
11
# File 'lib/job_workflow/semaphore.rb', line 9

# Reader for @concurrency_limit, the limit value handed to the constructor.
def concurrency_limit = @concurrency_limit

Class Method Details

.available? ⇒ Boolean

: () -> bool

Returns:

  • (Boolean)


14
15
16
# File 'lib/job_workflow/semaphore.rb', line 14

# Reports whether semaphores can be used, as answered by the current queue
# adapter's `semaphore_available?`.
#
# @return [Boolean] whatever the adapter reports
def available?
  adapter = QueueAdapter.current
  adapter.semaphore_available?
end

Instance Method Details

#signal ⇒ Object

: () -> bool



51
52
53
54
55
56
57
# File 'lib/job_workflow/semaphore.rb', line 51

# Signals this semaphore through the current queue adapter.
#
# Returns true immediately, touching neither the adapter nor the
# instrumentation, when the class reports semaphores as unavailable.
# Otherwise the adapter's `semaphore_signal` runs first, then the throttle
# release notification is sent, and the adapter's result is returned.
#
# @return [Boolean] true when unavailable, else the adapter's result
def signal
  return true unless self.class.available?

  QueueAdapter.current.semaphore_signal(self).tap do
    Instrumentation.notify_throttle_release(self)
  end
end

#wait ⇒ Object

: () -> bool



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/job_workflow/semaphore.rb', line 38

# Polls the current queue adapter until `semaphore_wait` reports success.
#
# Returns true immediately, without instrumentation, when the class reports
# semaphores as unavailable. Otherwise the polling runs inside
# `Instrumentation.instrument_throttle`, sleeping `polling_interval` between
# failed attempts. There is no attempt limit in this method.
#
# @return [Boolean] true once the adapter call succeeds
def wait
  usable = self.class.available?
  return true unless usable

  Instrumentation.instrument_throttle(self) do
    loop do
      acquired = QueueAdapter.current.semaphore_wait(self)
      # Returns from #wait itself, not just from the block.
      return true if acquired

      sleep(polling_interval)
    end
  end
end

#with ⇒ Object

: [T] () { () -> T } -> T



60
61
62
63
64
65
# File 'lib/job_workflow/semaphore.rb', line 60

# Runs the given block between `wait` and `signal`.
#
# `signal` is guaranteed to run once `wait` has returned, even when the block
# raises. It is deliberately NOT run when `wait` itself raises: in that case
# `wait` never returned, so there is nothing for this caller to release.
#
# @yield the work to perform after `wait` returns
# @return [Object] the block's return value
def with(&)
  wait
  begin
    yield
  ensure
    signal
  end
end