Class: JRPC::SharedClient::Ticket
- Inherits: Object
  - Object
  - JRPC::SharedClient::Ticket
- Defined in: lib/jrpc/shared_client/ticket.rb
Overview
One in-flight request or notification together with its result future.
The result is backed by a write-once Concurrent::Promises.resolvable_future. That single primitive replaces the old hand-rolled Mutex+ConditionVariable state machine and gives us three things for free:
* idempotent resolution - fulfill/reject never raise on a second call,
so the transport loop and the close path can both resolve a ticket
without racing (fixes the old :cancelled -> :done overwrite).
* a caller-side timeout - #wait takes a timeout, so a caller is no
longer at the mercy of the loop thread to be woken (fixes the
"loop dies, callers hang forever" hole).
* fiber cooperation - the future's wait bottoms out in a
scheduler-aware ConditionVariable, so blocking a fiber under Falcon /
rage-rb yields to the reactor instead of stalling the thread.
`cancelled` is a separate monotonic flag (the caller gave up); it is deliberately NOT part of the future, so cancellation can never race with or overwrite a real result.
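The first-resolution-wins rule and the separate cancellation flag can be illustrated with a small stand-in. This is a sketch only: `SketchTicket` is a hypothetical class, and it deliberately uses a plain standard-library Mutex and ConditionVariable instead of the Concurrent::Promises.resolvable_future that the real Ticket is built on, so that it runs without the concurrent-ruby gem. It shows the observable behaviour, not the actual implementation.

```ruby
# Hypothetical stand-in for illustration; NOT the real Ticket implementation.
class SketchTicket
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @outcome = nil      # [:fulfilled, value] or [:rejected, error] once settled
    @cancelled = false
  end

  # First resolution wins; a later call returns false instead of raising.
  def fulfill(value) = settle(:fulfilled, value)
  def reject(err) = settle(:rejected, err)

  # Cancellation is a separate one-way flag and never touches the outcome.
  def cancel = @lock.synchronize { @cancelled = true }
  def cancelled? = @lock.synchronize { @cancelled }

  def resolved? = @lock.synchronize { !@outcome.nil? }

  def result
    @lock.synchronize { @outcome && @outcome.first == :fulfilled ? @outcome.last : nil }
  end

  def error
    @lock.synchronize { @outcome && @outcome.first == :rejected ? @outcome.last : nil }
  end

  # Block until settled or until `timeout` seconds pass (nil waits forever).
  def wait(timeout = nil)
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
    @lock.synchronize do
      until @outcome
        remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
        break if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
    end
    self
  end

  private

  def settle(kind, payload)
    @lock.synchronize do
      return false if @outcome   # already settled: no-op
      @outcome = [kind, payload]
      @cond.broadcast
      true
    end
  end
end

ticket = SketchTicket.new
worker = Thread.new { ticket.fulfill(42) }               # transport loop resolves first
ticket.wait(1)
worker.join
late = ticket.reject(RuntimeError.new("client closed"))  # close path arrives late
ticket.cancel                                            # caller gives up afterwards
```

After this sequence the stand-in still holds the first value, the late rejection was ignored, and the cancellation flag is set without having changed the result.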
Instance Attribute Summary
- #expires_at ⇒ Object (readonly)
  Returns the value of attribute expires_at.
- #id ⇒ Object (readonly)
  Returns the value of attribute id.
- #payload ⇒ Object (readonly)
  Returns the value of attribute payload.
- #thread ⇒ Object (readonly)
  Returns the value of attribute thread.
Instance Method Summary
- #alive? ⇒ Boolean
  fire_and_forget tickets (thread: nil) are never “alive”.
- #cancel ⇒ Object
- #cancelled? ⇒ Boolean
- #error ⇒ Object
- #expired?(now) ⇒ Boolean
- #fulfill(value) ⇒ Object
  Worker (transport loop / close) side. All idempotent: the first resolution wins, later ones are no-ops.
- #fulfilled? ⇒ Boolean
- #initialize(id:, payload:, thread:, expires_at: nil) ⇒ Ticket (constructor)
  A new instance of Ticket.
- #reject(err) ⇒ Object
- #rejected? ⇒ Boolean
- #resolved? ⇒ Boolean
- #result ⇒ Object
- #signal_done(result:) ⇒ Object
  Resolution aliases used by Registry/OutboundQueue and the transport loop.
- #signal_error(err) ⇒ Object
- #signal_sent ⇒ Object
  Blocking notification: the caller waits only for the send.
- #state ⇒ Object
  Coarse state view kept for Registry, which skips already-settled tickets.
- #wait(timeout = nil) ⇒ Object
  Block until resolved or `timeout` seconds elapse (nil waits forever).
Constructor Details
#initialize(id:, payload:, thread:, expires_at: nil) ⇒ Ticket
Returns a new instance of Ticket.
    # File 'lib/jrpc/shared_client/ticket.rb', line 29
    def initialize(id:, payload:, thread:, expires_at: nil)
      @id = id
      @payload = payload
      @thread = thread
      @expires_at = expires_at
      @future = Concurrent::Promises.resolvable_future
      @cancelled = Concurrent::AtomicBoolean.new(false)
    end
Instance Attribute Details
#expires_at ⇒ Object (readonly)
Returns the value of attribute expires_at.
    # File 'lib/jrpc/shared_client/ticket.rb', line 27
    def expires_at
      @expires_at
    end
#id ⇒ Object (readonly)
Returns the value of attribute id.
    # File 'lib/jrpc/shared_client/ticket.rb', line 27
    def id
      @id
    end
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
    # File 'lib/jrpc/shared_client/ticket.rb', line 27
    def payload
      @payload
    end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
    # File 'lib/jrpc/shared_client/ticket.rb', line 27
    def thread
      @thread
    end
Instance Method Details
#alive? ⇒ Boolean
fire_and_forget tickets (thread: nil) are never “alive”.
    # File 'lib/jrpc/shared_client/ticket.rb', line 79
    def alive? = @thread.nil? ? false : @thread.alive?
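The liveness rule can be tried in isolation. In this sketch `owner_alive?` is a hypothetical free function that repeats the one-line check from #alive?; it is not part of the class.

```ruby
# Hypothetical helper mirroring the #alive? check on a bare thread reference.
def owner_alive?(thread) = thread.nil? ? false : thread.alive?

finished = Thread.new { :done }
finished.join   # the thread has exited once join returns

fire_and_forget = owner_alive?(nil)             # no owning thread recorded
running_caller  = owner_alive?(Thread.current)  # the calling thread is running
exited_caller   = owner_alive?(finished)        # the owner has already exited
```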
#cancel ⇒ Object
    # File 'lib/jrpc/shared_client/ticket.rb', line 53
    def cancel = @cancelled.make_true
#cancelled? ⇒ Boolean
    # File 'lib/jrpc/shared_client/ticket.rb', line 54
    def cancelled? = @cancelled.true?
#error ⇒ Object
    # File 'lib/jrpc/shared_client/ticket.rb', line 51
    def error = @future.rejected? ? @future.reason : nil
#expired?(now) ⇒ Boolean
    # File 'lib/jrpc/shared_client/ticket.rb', line 81
    def expired?(now) = @expires_at ? now >= @expires_at : false
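A short sketch of how a deadline might be computed and checked with this comparison. The source does not say which clock `now` and `expires_at` come from; the monotonic clock and the 5-second budget below are assumptions, and both helper names are hypothetical.

```ruby
# Assumption: deadlines are measured on the monotonic clock, in seconds.
def monotonic_now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Same comparison as Ticket#expired?: no deadline means the ticket never expires.
def expired?(expires_at, now) = expires_at ? now >= expires_at : false

now = monotonic_now
deadline = now + 5.0   # hypothetical 5-second budget

no_deadline   = expired?(nil, now)             # false: expires_at is nil
still_fresh   = expired?(deadline, now)        # false: deadline not reached
at_deadline   = expired?(deadline, deadline)   # true: the comparison is >=
past_deadline = expired?(deadline, now + 6.0)  # true
```

Because the comparison is `>=`, a ticket counts as expired at the exact instant of its deadline.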
#fulfill(value) ⇒ Object
Worker (transport loop / close) side. All idempotent: the first resolution wins, later ones are no-ops.
    # File 'lib/jrpc/shared_client/ticket.rb', line 59
    def fulfill(value) = @future.fulfill(value, false)
#fulfilled? ⇒ Boolean
    # File 'lib/jrpc/shared_client/ticket.rb', line 48
    def fulfilled? = @future.fulfilled?
#reject(err) ⇒ Object
    # File 'lib/jrpc/shared_client/ticket.rb', line 60
    def reject(err) = @future.reject(err, false)
#rejected? ⇒ Boolean
    # File 'lib/jrpc/shared_client/ticket.rb', line 49
    def rejected? = @future.rejected?
#resolved? ⇒ Boolean
    # File 'lib/jrpc/shared_client/ticket.rb', line 47
    def resolved? = @future.resolved?
#result ⇒ Object
    # File 'lib/jrpc/shared_client/ticket.rb', line 50
    def result = @future.fulfilled? ? @future.value : nil
#signal_done(result:) ⇒ Object
Resolution aliases used by Registry/OutboundQueue and the transport loop.
    # File 'lib/jrpc/shared_client/ticket.rb', line 63
    def signal_done(result:) = fulfill(result)
#signal_error(err) ⇒ Object
    # File 'lib/jrpc/shared_client/ticket.rb', line 64
    def signal_error(err) = reject(err)
#signal_sent ⇒ Object
Blocking notification: the caller waits only for the send.
    # File 'lib/jrpc/shared_client/ticket.rb', line 66
    def signal_sent = fulfill(nil)
#state ⇒ Object
Coarse state view kept for Registry, which skips already-settled tickets.
    # File 'lib/jrpc/shared_client/ticket.rb', line 69
    def state
      return :cancelled if cancelled?
      return :done if resolved?
      :pending
    end
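The order of the two checks matters: a ticket that is both cancelled and resolved reports :cancelled. The sketch below restates that precedence as a hypothetical free function, `coarse_state`, taking the two flags as plain booleans.

```ruby
# Hypothetical restatement of the #state precedence with plain booleans.
def coarse_state(cancelled:, resolved:)
  return :cancelled if cancelled   # checked first, so it wins over :done
  return :done if resolved
  :pending
end

states = {
  untouched:          coarse_state(cancelled: false, resolved: false),
  settled:            coarse_state(cancelled: false, resolved: true),
  abandoned:          coarse_state(cancelled: true,  resolved: false),
  settled_then_given: coarse_state(cancelled: true,  resolved: true)
}
```

Since #state is only a coarse view, callers that need the actual outcome of a cancelled ticket should still consult #resolved?, #result and #error.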
#wait(timeout = nil) ⇒ Object
Block until resolved or `timeout` seconds elapse (nil waits forever). Cooperates with the fiber scheduler. Returns self; inspect the ticket afterwards (#resolved?, #result, #error).
    # File 'lib/jrpc/shared_client/ticket.rb', line 42
    def wait(timeout = nil)
      @future.wait(timeout)
      self
    end
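Because #wait returns self whether or not the ticket settled, the caller has to inspect the ticket afterwards. One possible caller-side pattern is sketched below. `await_ticket` is a hypothetical helper, and `FakeTicket` is a minimal stand-in exposing only the documented reader methods so the sketch runs without the real class or the concurrent-ruby gem.

```ruby
# Hypothetical helper: wait on any ticket-like object, then inspect it.
def await_ticket(ticket, timeout)
  ticket.wait(timeout)
  unless ticket.resolved?
    ticket.cancel                 # record that the caller gave up
    return [:timeout, nil]
  end
  return [:error, ticket.error] if ticket.rejected?
  [:ok, ticket.result]
end

# Minimal fake used only to exercise the helper; NOT the real Ticket.
FakeTicket = Struct.new(:outcome, :cancelled) do
  def wait(_timeout = nil)
    self                          # like the documented #wait, returns self
  end

  def resolved?
    !outcome.nil?
  end

  def rejected?
    resolved? && outcome.first == :rejected
  end

  def error
    rejected? ? outcome.last : nil
  end

  def result
    resolved? && !rejected? ? outcome.last : nil
  end

  def cancel
    self.cancelled = true
  end
end

failure = IOError.new("connection closed")
pending = FakeTicket.new(nil)

ok_case      = await_ticket(FakeTicket.new([:fulfilled, "pong"]), 1)
error_case   = await_ticket(FakeTicket.new([:rejected, failure]), 1)
timeout_case = await_ticket(pending, 0.01)
```

Checking #resolved? and #rejected? before reading #result matters because #result is nil for a pending ticket, for a rejected ticket, and for a notification settled through #signal_sent, which fulfills with nil.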