Class: JRPC::SharedClient::Ticket

Inherits:
Object
  • Object
show all
Defined in:
lib/jrpc/shared_client/ticket.rb

Overview

One in-flight request or notification together with its result future.

The result is backed by a write-once Concurrent::Promises.resolvable_future. That single primitive replaces the old hand-rolled Mutex+ConditionVariable state machine and gives us three things for free:

* idempotent resolution  - fulfill/reject never raise on a second call,
  so the transport loop and the close path can both resolve a ticket
  without racing (fixes the old :cancelled -> :done overwrite).
* a caller-side timeout   - #wait takes a timeout, so a caller is no
  longer at the mercy of the loop thread to be woken (fixes the
  "loop dies, callers hang forever" hole).
* fiber cooperation       - the future's wait bottoms out in a
  scheduler-aware ConditionVariable, so blocking a fiber under Falcon /
  rage-rb yields to the reactor instead of stalling the thread.

‘cancelled` is a separate monotonic flag (the caller gave up); it is deliberately NOT part of the future, so cancellation can never race or overwrite a real result.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, payload:, thread:, expires_at: nil) ⇒ Ticket

Returns a new instance of Ticket.



29
30
31
32
33
34
35
36
# File 'lib/jrpc/shared_client/ticket.rb', line 29

def initialize(id:, payload:, thread:, expires_at: nil)
  @id = id
  @payload = payload
  @thread = thread
  @expires_at = expires_at
  @future = Concurrent::Promises.resolvable_future
  @cancelled = Concurrent::AtomicBoolean.new(false)
end

Instance Attribute Details

#expires_atObject (readonly)

Returns the value of attribute expires_at.



27
28
29
# File 'lib/jrpc/shared_client/ticket.rb', line 27

def expires_at
  @expires_at
end

#idObject (readonly)

Returns the value of attribute id.



27
28
29
# File 'lib/jrpc/shared_client/ticket.rb', line 27

def id
  @id
end

#payloadObject (readonly)

Returns the value of attribute payload.



27
28
29
# File 'lib/jrpc/shared_client/ticket.rb', line 27

def payload
  @payload
end

#threadObject (readonly)

Returns the value of attribute thread.



27
28
29
# File 'lib/jrpc/shared_client/ticket.rb', line 27

def thread
  @thread
end

Instance Method Details

#alive?Boolean

fire_and_forget tickets (thread: nil) are never “alive”.

Returns:

  • (Boolean)


79
# File 'lib/jrpc/shared_client/ticket.rb', line 79

def alive? = @thread.nil? ? false : @thread.alive?

#cancelObject



53
# File 'lib/jrpc/shared_client/ticket.rb', line 53

def cancel = @cancelled.make_true

#cancelled?Boolean

Returns:

  • (Boolean)


54
# File 'lib/jrpc/shared_client/ticket.rb', line 54

def cancelled? = @cancelled.true?

#errorObject



51
# File 'lib/jrpc/shared_client/ticket.rb', line 51

def error = @future.rejected? ? @future.reason : nil

#expired?(now) ⇒ Boolean

Returns:

  • (Boolean)


81
# File 'lib/jrpc/shared_client/ticket.rb', line 81

def expired?(now) = @expires_at ? now >= @expires_at : false

#fulfill(value) ⇒ Object

— worker (transport loop / close) side —————————— All idempotent: the first resolution wins, later ones are no-ops.



59
# File 'lib/jrpc/shared_client/ticket.rb', line 59

def fulfill(value) = @future.fulfill(value, false)

#fulfilled?Boolean

Returns:

  • (Boolean)


48
# File 'lib/jrpc/shared_client/ticket.rb', line 48

def fulfilled? = @future.fulfilled?

#reject(err) ⇒ Object



60
# File 'lib/jrpc/shared_client/ticket.rb', line 60

def reject(err) = @future.reject(err, false)

#rejected?Boolean

Returns:

  • (Boolean)


49
# File 'lib/jrpc/shared_client/ticket.rb', line 49

def rejected? = @future.rejected?

#resolved?Boolean

Returns:

  • (Boolean)


47
# File 'lib/jrpc/shared_client/ticket.rb', line 47

def resolved? = @future.resolved?

#resultObject



50
# File 'lib/jrpc/shared_client/ticket.rb', line 50

def result = @future.fulfilled? ? @future.value : nil

#signal_done(result:) ⇒ Object

Resolution aliases used by Registry/OutboundQueue and the transport loop.



63
# File 'lib/jrpc/shared_client/ticket.rb', line 63

def signal_done(result:) = fulfill(result)

#signal_error(err) ⇒ Object



64
65
# File 'lib/jrpc/shared_client/ticket.rb', line 64

def signal_error(err) = reject(err)
# blocking notification: caller waits only for the send

#signal_sentObject

blocking notification: caller waits only for the send



66
# File 'lib/jrpc/shared_client/ticket.rb', line 66

def signal_sent = fulfill(nil)

#stateObject

Coarse state view kept for Registry, which skips already-settled tickets.



69
70
71
72
73
74
# File 'lib/jrpc/shared_client/ticket.rb', line 69

def state
  return :cancelled if cancelled?
  return :done if resolved?

  :pending
end

#wait(timeout = nil) ⇒ Object

Block until resolved or ‘timeout` seconds elapse (nil waits forever). Cooperates with the fiber scheduler. Returns self; inspect afterwards.



42
43
44
45
# File 'lib/jrpc/shared_client/ticket.rb', line 42

def wait(timeout = nil)
  @future.wait(timeout)
  self
end