Class: TIMEx::Strategies::Wakeup

Inherits:
Base
  • Object
Defined in:
lib/timex/strategies/wakeup.rb

Overview

Note:

Accessing #read_io or #write_io before #arm, on an instance constructed without a deadline, creates the pipe but does not install a timer. The read end then blocks until #cancel! is called. For a timed wakeup, pass a deadline to #initialize or call #arm.
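The blocking behaviour in this note follows from how pipes work in general. The sketch below uses a raw IO.pipe from core Ruby rather than TIMEx itself, and readable_within? is a hypothetical helper name. It shows that a pipe with no timer and no writer activity never becomes readable on its own.

```ruby
# Plain-Ruby illustration; no TIMEx calls are made here.
# Returns true if `io` becomes readable within `seconds`, false otherwise.
def readable_within?(io, seconds)
  !IO.select([io], nil, nil, seconds).nil?
end

read_io, write_io = IO.pipe

# Nothing has been written and no timer exists, so the wait times out.
before = readable_within?(read_io, 0.05)

# A single byte on the write end is what "wakes" the read end.
write_io.write("!")
after = readable_within?(read_io, 0.05)

read_io.close
write_io.close
```

Here before is false and after is true, which is why an unarmed instance needs either #arm or an explicit #cancel! to release a blocked reader.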

Pipe-based wakeup primitive: blocked I/O on #read_io unblocks when the deadline fires (or #cancel! is called), and the CancellationToken transitions to #fired?. Use to wake threads blocked in IO.select on resources without native deadlines.
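The pattern described here is commonly known as the self-pipe trick. The following is a minimal sketch in plain Ruby under stated assumptions, not the TIMEx implementation: wait_readable_or_wakeup and its parameters are hypothetical names, and a raw IO.pipe stands in for #read_io.

```ruby
# Stand-in for the pattern, using only core Ruby (no TIMEx calls).
# Waits until `resource` is readable or `seconds` elapse, whichever is first.
def wait_readable_or_wakeup(resource, seconds)
  wake_r, wake_w = IO.pipe
  timer = Thread.new do
    sleep(seconds)
    # One byte is enough to make wake_r readable and unblock IO.select.
    wake_w.write_nonblock("!", exception: false)
  end

  ready, = IO.select([resource, wake_r])
  ready.include?(resource) ? :ready : :timeout
ensure
  # Stop the timer before closing the pipe so it cannot write to a closed IO.
  timer&.kill
  timer&.join
  [wake_r, wake_w].each { |io| io.close if io && !io.closed? }
end

# A pipe that nobody writes to plays the role of a resource with no native deadline.
idle_r, idle_w = IO.pipe
result = wait_readable_or_wakeup(idle_r, 0.05)
idle_r.close
idle_w.close
```

Including the wakeup pipe in the same IO.select call as the real resource is what lets a single blocking call observe either event.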

Each instance is single-use: the pipe is created lazily and closed in ensure after #run. Construct a fresh instance per operation.

Instance Attribute Summary

Instance Method Summary

Methods inherited from Base

call, #call

Methods included from NamedComponent

included

Constructor Details

#initialize(deadline = nil) ⇒ Wakeup

Returns a new instance of Wakeup.

Parameters:

  • deadline (Deadline, Numeric, Time, nil) (defaults to: nil)

    when given, calls #arm immediately



# File 'lib/timex/strategies/wakeup.rb', line 24

def initialize(deadline = nil)
  super()
  @token = CancellationToken.new
  @read_io = nil
  @write_io = nil
  @timer = nil
  @closed = false
  @io_mutex = Mutex.new
  arm(deadline) if deadline
end

Instance Attribute Details

#token ⇒ Object (readonly)

Returns the value of attribute token.



# File 'lib/timex/strategies/wakeup.rb', line 21

def token
  @token
end

Instance Method Details

#arm(deadline) ⇒ void

This method returns an undefined value.

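Arms a background timer thread that fires with reason :timeout (the same effect as calling #cancel! with :timeout) when the deadline elapses. Returns without creating the pipe or a timer when the coerced deadline is infinite.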

Parameters:

  • deadline (Deadline, Numeric, Time, nil)

Raises:

  • (TIMEx::Error)

    if the instance has already been closed (see #closed?)


# File 'lib/timex/strategies/wakeup.rb', line 58

def arm(deadline)
  raise TIMEx::Error, "Wakeup is single-use; construct a fresh instance" if closed?

  deadline = Deadline.coerce(deadline)
  return if deadline.infinite?

  ensure_pipe
  @timer = Thread.new do
    remaining = deadline.remaining
    ::Kernel.sleep(remaining) if remaining.positive?
    fire(reason: :timeout)
  end
end
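
The timer logic above (skip infinite deadlines, sleep only for a positive remainder, then fire) can be tried in isolation. This is a sketch under assumptions, not TIMEx code: arm_timer is a hypothetical helper, and an absolute reading of the monotonic clock stands in for the Deadline object.

```ruby
# Hypothetical helper, not part of TIMEx. `monotonic_deadline` is an absolute
# reading of the monotonic clock, or Float::INFINITY for "no deadline".
def arm_timer(monotonic_deadline, &on_timeout)
  return nil if monotonic_deadline.infinite? # nothing to arm

  Thread.new do
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    remaining = monotonic_deadline - now
    sleep(remaining) if remaining.positive? # an expired deadline fires at once
    on_timeout.call(:timeout)
  end
end

reasons = Queue.new
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 0.05
arm_timer(deadline) { |reason| reasons << reason }
fired_with = reasons.pop # blocks until the timer thread fires
```

Computing the remaining time inside the thread, rather than before starting it, keeps the sleep accurate even if the thread is scheduled late.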

#cancel!(reason: :user) ⇒ Boolean?

Cancels observers and wakes blocked readers.

Parameters:

  • reason (defaults to: :user)

    reason forwarded to the internal fire sequence

Returns:

  • (Boolean, nil)

    result of the internal fire sequence



# File 'lib/timex/strategies/wakeup.rb', line 76

def cancel!(reason: :user)
  fire(reason:)
end
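
To illustrate how an explicit cancel can release a reader blocked on the pipe, here is a plain-Ruby stand-in. MiniWakeup is a hypothetical class, not TIMEx::Strategies::Wakeup, and the internal fire sequence is not shown on this page, so the return convention used here (true for the first call, false afterwards) is an assumption.

```ruby
# Hypothetical stand-in built on core Ruby only.
class MiniWakeup
  attr_reader :read_io, :reason

  def initialize
    @read_io, @write_io = IO.pipe
    @mutex = Mutex.new
    @reason = nil
  end

  # Records the first reason and makes read_io readable.
  # Assumed convention: true for the call that fired, false for later calls.
  def cancel!(reason: :user)
    @mutex.synchronize do
      return false if @reason

      @reason = reason
      @write_io.write_nonblock("!", exception: false)
      true
    end
  end

  def fired?
    @mutex.synchronize { !@reason.nil? }
  end
end

wakeup = MiniWakeup.new
reader = Thread.new do
  IO.select([wakeup.read_io]) # blocks until a byte is written
  wakeup.reason
end
wakeup.cancel!(reason: :shutdown)
woken_with = reader.value # :shutdown
```

Guarding the write with a "fire once" check keeps repeated cancellations from filling the pipe with redundant bytes.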

#close ⇒ void

This method returns an undefined value.

Idempotently closes pipe ends and stops the timer thread.



# File 'lib/timex/strategies/wakeup.rb', line 88

def close
  @timer&.kill
  @io_mutex.synchronize do
    return if @closed

    @read_io.close if @read_io && !@read_io.closed?
    @write_io.close if @write_io && !@write_io.closed?
    @closed = true
  end
end

#closed? ⇒ Boolean

Returns true after #close.

Returns:

  • (Boolean)

    true after #close



# File 'lib/timex/strategies/wakeup.rb', line 48

def closed?
  @io_mutex.synchronize { @closed }
end

#fired? ⇒ Boolean

Returns whether #cancel! / timeout has fired.

Returns:

  • (Boolean)

    whether #cancel! / timeout has fired



# File 'lib/timex/strategies/wakeup.rb', line 81

def fired?
  @token.cancelled?
end

#read_io ⇒ ::IO

Returns readable end of the wakeup pipe (creates the pipe lazily).

Returns:

  • (::IO)

    readable end of the wakeup pipe (creates the pipe lazily)



# File 'lib/timex/strategies/wakeup.rb', line 36

def read_io
  ensure_pipe
  @read_io
end
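
The ensure_pipe helper called above is not shown on this page. As a rough illustration of lazy pipe creation, the sketch below shows one plausible shape in plain Ruby. LazyPipe and allocated? are hypothetical names, and the mutex-guarded check is an assumption rather than the documented TIMEx behaviour.

```ruby
# Hypothetical stand-in; TIMEx's own ensure_pipe is not shown on this page.
class LazyPipe
  def initialize
    @mutex = Mutex.new
    @read_io = nil
    @write_io = nil
  end

  def read_io
    ensure_pipe
    @read_io
  end

  def write_io
    ensure_pipe
    @write_io
  end

  # True once the underlying pipe has been created.
  def allocated?
    @mutex.synchronize { !@read_io.nil? }
  end

  private

  # Creates both ends at most once, even if several threads race here.
  def ensure_pipe
    @mutex.synchronize do
      @read_io, @write_io = IO.pipe if @read_io.nil?
    end
  end
end

pipe = LazyPipe.new
untouched = pipe.allocated?          # no file descriptors used yet
first = pipe.read_io                 # first access creates the pipe
same_end = pipe.read_io.equal?(first) # later accesses reuse the same IO
```

Deferring IO.pipe until first access means an instance that is never read from does not consume file descriptors.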

#write_io ⇒ ::IO

Returns writable end used internally to signal readiness.

Returns:

  • (::IO)

    writable end used internally to signal readiness



# File 'lib/timex/strategies/wakeup.rb', line 42

def write_io
  ensure_pipe
  @write_io
end