Class: Takagi::EventBus::Future
- Inherits: Object
- Hierarchy: Object, Takagi::EventBus::Future
- Defined in: lib/takagi/event_bus/future.rb
Overview
A pure-Ruby Future for the sync/async request-reply pattern. It uses a Mutex and a ConditionVariable so that waiting threads block efficiently instead of polling.
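The request-reply flow can be sketched as follows. To stay runnable without the gem, the sketch uses a condensed stand-in class (`SketchFuture`, a hypothetical name) that mirrors the behavior of the source listed on this page; with Takagi loaded, `Takagi::EventBus::Future.new` would presumably take its place. The variable names and values are illustrative assumptions.

```ruby
require 'timeout'

# Condensed stand-in (hypothetical) mirroring the source on this page.
class SketchFuture
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @completed = false
    @value = nil
    @error = nil
  end

  def set_value(value)
    complete { @value = value }
  end

  def set_error(error)
    complete { @error = error }
  end

  def value(timeout: nil)
    @mutex.synchronize do
      deadline = timeout && Time.now + timeout
      until @completed
        remaining = deadline && deadline - Time.now
        raise Timeout::Error, "Future timed out after #{timeout}s" if remaining && remaining <= 0

        @condition.wait(@mutex, remaining)
      end
      raise @error if @error

      @value
    end
  end

  private

  # Single-assignment guard shared by set_value and set_error.
  def complete
    @mutex.synchronize do
      raise 'Future already completed' if @completed

      yield
      @completed = true
      @condition.broadcast
    end
  end
end

# Reply path: a worker thread completes the Future, the caller blocks on it.
reply = SketchFuture.new
worker = Thread.new { reply.set_value(21 * 2) }
result = reply.value(timeout: 1)
worker.join

# Error path: the stored exception is re-raised in the caller.
failed = SketchFuture.new
failed.set_error(ArgumentError.new('bad request'))
error_message =
  begin
    failed.value
  rescue ArgumentError => e
    e.message
  end

# Timeout path: nobody completes the Future, so value gives up.
timed_out =
  begin
    SketchFuture.new.value(timeout: 0.05)
    false
  rescue Timeout::Error
    true
  end
```

The three paths correspond to the three outcomes a caller of #value has to handle: a returned value, a re-raised error, and a Timeout::Error.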
Instance Method Summary
- #completed? ⇒ Boolean
  Check if Future is completed.
- #error ⇒ Exception?
  Get the error if present.
- #error? ⇒ Boolean
  Check if Future completed with error.
- #initialize ⇒ Future (constructor)
  Initialize a new Future.
- #set_error(error) ⇒ Object
  Set the error state of the Future.
- #set_value(value) ⇒ Object
  Set the successful value of the Future.
- #success? ⇒ Boolean
  Check if Future completed successfully.
- #try_value ⇒ Object?
  Try to get value without blocking.
- #value(timeout: nil) ⇒ Object
  Get the value of the Future (blocking).
- #wait(timeout: nil) ⇒ Boolean
  Wait for completion without returning value.
Constructor Details
#initialize ⇒ Future
Initialize a new Future
    # File 'lib/takagi/event_bus/future.rb', line 27

    def initialize
      @mutex = Mutex.new
      @condition = ConditionVariable.new
      @value = nil
      @completed = false
      @error = nil
    end
Instance Method Details
#completed? ⇒ Boolean
Check if Future is completed
    # File 'lib/takagi/event_bus/future.rb', line 96

    def completed?
      @mutex.synchronize { @completed }
    end
#error ⇒ Exception?
Get the error if present
    # File 'lib/takagi/event_bus/future.rb', line 114

    def error
      @mutex.synchronize { @error }
    end
#error? ⇒ Boolean
Check if Future completed with error
    # File 'lib/takagi/event_bus/future.rb', line 102

    def error?
      @mutex.synchronize { @completed && !@error.nil? }
    end
#set_error(error) ⇒ Object
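Set the error state of the Future and wake all waiting threads. Raises RuntimeError ('Future already completed') if the Future has already been completed.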
    # File 'lib/takagi/event_bus/future.rb', line 51

    def set_error(error)
      @mutex.synchronize do
        raise 'Future already completed' if @completed

        @error = error
        @completed = true
        @condition.broadcast
      end
    end
#set_value(value) ⇒ Object
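Set the successful value of the Future and wake all waiting threads. Raises RuntimeError ('Future already completed') if the Future has already been completed.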
    # File 'lib/takagi/event_bus/future.rb', line 38

    def set_value(value)
      @mutex.synchronize do
        raise 'Future already completed' if @completed

        @value = value
        @completed = true
        @condition.broadcast
      end
    end
#success? ⇒ Boolean
Check if Future completed successfully
    # File 'lib/takagi/event_bus/future.rb', line 108

    def success?
      @mutex.synchronize { @completed && @error.nil? }
    end
#try_value ⇒ Object?
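Try to get the value without blocking. Returns nil if the Future is not yet completed, and raises the stored error if it completed with one. Because a Future can also complete with a nil value, a nil result is ambiguous on its own; check #completed? to tell the two cases apart.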
    # File 'lib/takagi/event_bus/future.rb', line 120

    def try_value
      @mutex.synchronize do
        return nil unless @completed
        raise @error if @error

        @value
      end
    end
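Since #try_value returns nil while the Future is pending, a completed Future holding nil looks the same to the caller. The sketch below illustrates this with a minimal stand-in (`PollableFuture`, a hypothetical name) that copies only the methods involved from the source on this page; it is an illustration under that assumption, not the library class itself.

```ruby
# Minimal stand-in (hypothetical) with only the methods this sketch needs.
class PollableFuture
  def initialize
    @mutex = Mutex.new
    @completed = false
    @value = nil
    @error = nil
  end

  def set_value(value)
    @mutex.synchronize do
      raise 'Future already completed' if @completed

      @value = value
      @completed = true
    end
  end

  def completed?
    @mutex.synchronize { @completed }
  end

  def try_value
    @mutex.synchronize do
      return nil unless @completed
      raise @error if @error

      @value
    end
  end
end

pending = PollableFuture.new
done_with_nil = PollableFuture.new
done_with_nil.set_value(nil)

# Both calls return nil, so the return value alone cannot tell them apart.
pending_result = pending.try_value
nil_result = done_with_nil.try_value

# Checking completed? disambiguates the two cases.
pending_done = pending.completed?
nil_done = done_with_nil.completed?
```

A Future never goes back to pending once completed, so calling #completed? before #try_value is safe against races in this direction.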
#value(timeout: nil) ⇒ Object
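Get the value of the Future, blocking until it completes. With a timeout (in seconds), raises Timeout::Error if the Future is still incomplete when the timeout elapses; with timeout: nil it waits indefinitely. If the Future completed with an error, that error is raised instead of returning a value.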
    # File 'lib/takagi/event_bus/future.rb', line 66

    def value(timeout: nil) # rubocop:disable Metrics/PerceivedComplexity
      @mutex.synchronize do
        unless @completed
          if timeout
            # Wait with timeout
            deadline = Time.now + timeout
            while !@completed && Time.now < deadline
              remaining = deadline - Time.now
              break if remaining <= 0

              @condition.wait(@mutex, remaining)
            end

            raise Timeout::Error, "Future timed out after #{timeout}s" unless @completed
          else
            # Wait indefinitely
            @condition.wait(@mutex) until @completed
          end
        end

        # Raise error if set
        raise @error if @error

        # Return value
        @value
      end
    end
#wait(timeout: nil) ⇒ Boolean
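Wait for completion without returning the value. Returns true if the Future completed and false if the timeout (in seconds) elapsed first; with timeout: nil it waits indefinitely. Unlike #value, it never raises the stored error or Timeout::Error.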
    # File 'lib/takagi/event_bus/future.rb', line 132

    def wait(timeout: nil)
      @mutex.synchronize do
        return true if @completed

        if timeout
          deadline = Time.now + timeout
          while !@completed && Time.now < deadline
            remaining = deadline - Time.now
            break if remaining <= 0

            @condition.wait(@mutex, remaining)
          end
        else
          @condition.wait(@mutex) until @completed
        end

        @completed
      end
    end
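The deadline loop shared by #value and #wait can be isolated as a small helper. The sketch below is a variation rather than the library's code: `wait_until` is a hypothetical name, and it reads the monotonic clock via `Process.clock_gettime` instead of `Time.now`, which may be preferable when the wall clock can be adjusted during the wait.

```ruby
# Hypothetical helper: waits until the block returns true or the timeout
# (in seconds) elapses. Must be called while holding `mutex`.
# Returns the final result of the predicate block.
def wait_until(mutex, condition, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0

    # wait may return early (signal or spurious wakeup), so the loop
    # re-checks the predicate and recomputes the remaining time.
    condition.wait(mutex, remaining)
  end
  yield
end

mutex = Mutex.new
condition = ConditionVariable.new
done = false

# Nothing sets `done`, so the wait gives up after roughly 50 ms.
timed_out_result = mutex.synchronize { wait_until(mutex, condition, 0.05) { done } }

# A second thread flips the flag and wakes the waiter.
signaller = Thread.new do
  sleep 0.01
  mutex.synchronize do
    done = true
    condition.broadcast
  end
end
completed_result = mutex.synchronize { wait_until(mutex, condition, 1) { done } }
signaller.join
```

Re-checking the predicate after every wakeup is what makes the loop correct even if the waiter is woken before the flag is set or after the deadline has passed.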