Class: Takagi::EventBus::Future

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/event_bus/future.rb

Overview

Future for sync/async request-reply pattern (pure Ruby) Uses ConditionVariable for efficient waiting

Examples:

Blocking wait

future = Future.new
Thread.new { future.set_value(42) }
value = future.value(timeout: 1.0) # => 42

Non-blocking check

future = Future.new
future.completed? # => false
future.set_value(42)
future.completed? # => true

Error propagation

future = Future.new
future.set_error(StandardError.new("Failed"))
future.value # raises StandardError

Instance Method Summary collapse

Constructor Details

#initializeFuture

Initialize a new Future



27
28
29
30
31
32
33
# File 'lib/takagi/event_bus/future.rb', line 27

def initialize
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @value = nil
  @completed = false
  @error = nil
end

Instance Method Details

#completed?Boolean

Check if Future is completed

Returns:

  • (Boolean)


96
97
98
# File 'lib/takagi/event_bus/future.rb', line 96

def completed?
  @mutex.synchronize { @completed }
end

#errorException?

Get the error if present

Returns:

  • (Exception, nil)


114
115
116
# File 'lib/takagi/event_bus/future.rb', line 114

def error
  @mutex.synchronize { @error }
end

#error?Boolean

Check if Future completed with error

Returns:

  • (Boolean)


102
103
104
# File 'lib/takagi/event_bus/future.rb', line 102

def error?
  @mutex.synchronize { @completed && !@error.nil? }
end

#set_error(error) ⇒ Object

Set the error state of the Future

Parameters:

  • error (Exception)

    The error to set

Raises:

  • (RuntimeError)

    If already completed



51
52
53
54
55
56
57
58
59
# File 'lib/takagi/event_bus/future.rb', line 51

def set_error(error)
  @mutex.synchronize do
    raise 'Future already completed' if @completed

    @error = error
    @completed = true
    @condition.broadcast
  end
end

#set_value(value) ⇒ Object

Set the successful value of the Future

Parameters:

  • value (Object)

    The value to set

Raises:

  • (RuntimeError)

    If already completed



38
39
40
41
42
43
44
45
46
# File 'lib/takagi/event_bus/future.rb', line 38

def set_value(value)
  @mutex.synchronize do
    raise 'Future already completed' if @completed

    @value = value
    @completed = true
    @condition.broadcast
  end
end

#success?Boolean

Check if Future completed successfully

Returns:

  • (Boolean)


108
109
110
# File 'lib/takagi/event_bus/future.rb', line 108

def success?
  @mutex.synchronize { @completed && @error.nil? }
end

#try_valueObject?

Try to get value without blocking

Returns:

  • (Object, nil)

    Value if completed, nil otherwise



120
121
122
123
124
125
126
127
# File 'lib/takagi/event_bus/future.rb', line 120

def try_value
  @mutex.synchronize do
    return nil unless @completed
    raise @error if @error

    @value
  end
end

#value(timeout: nil) ⇒ Object

Get the value of the Future (blocking)

Parameters:

  • timeout (Float, nil) (defaults to: nil)

    Timeout in seconds (nil = wait forever)

Returns:

  • (Object)

    The value

Raises:

  • (Timeout::Error)

    If timeout expires before completion

  • (Exception)

    The error if Future completed with error



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/takagi/event_bus/future.rb', line 66

def value(timeout: nil) # rubocop:disable Metrics/PerceivedComplexity
  @mutex.synchronize do
    unless @completed
      if timeout
        # Wait with timeout
        deadline = Time.now + timeout
        while !@completed && Time.now < deadline
          remaining = deadline - Time.now
          break if remaining <= 0

          @condition.wait(@mutex, remaining)
        end

        raise Timeout::Error, "Future timed out after #{timeout}s" unless @completed
      else
        # Wait indefinitely
        @condition.wait(@mutex) until @completed
      end
    end

    # Raise error if set
    raise @error if @error

    # Return value
    @value
  end
end

#wait(timeout: nil) ⇒ Boolean

Wait for completion without returning value

Parameters:

  • timeout (Float, nil) (defaults to: nil)

    Timeout in seconds

Returns:

  • (Boolean)

    True if completed, false if timeout



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/takagi/event_bus/future.rb', line 132

def wait(timeout: nil)
  @mutex.synchronize do
    return true if @completed

    if timeout
      deadline = Time.now + timeout
      while !@completed && Time.now < deadline
        remaining = deadline - Time.now
        break if remaining <= 0

        @condition.wait(@mutex, remaining)
      end
    else
      @condition.wait(@mutex) until @completed
    end

    @completed
  end
end