Class: Restate::Server::Context

Inherits:
Object
Includes:
WorkflowContext, WorkflowSharedContext
Defined in:
lib/restate/server/context.rb

Overview

The core execution context for a Restate handler invocation. Implements the progress loop and all context API methods (state, run, sleep, call, send).

Concurrency model:

- The handler runs inside a Fiber managed by Falcon/Async.
- `run` blocks spawn child Async tasks.
- When the progress loop needs input, it dequeues from @input_queue, yielding the Fiber.
- The HTTP input reader (a separate Async task) feeds chunks into @input_queue.
- Output chunks are written directly to the streaming response body.
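The input path described above can be pictured with a small stand-in. This is only a sketch: it uses plain Ruby threads and a stdlib Queue instead of Async tasks and Fibers, and the `:eof` marker and chunk names are hypothetical, not part of the Restate protocol.

```ruby
# Stand-in for the input path: a reader feeds chunks into a queue and a
# progress loop dequeues them. Threads replace Async tasks here (assumption).
input_queue = Queue.new

reader = Thread.new do
  %w[chunk-1 chunk-2 chunk-3].each { |chunk| input_queue << chunk }
  input_queue << :eof # hypothetical end-of-input marker
end

received = []
loop do
  chunk = input_queue.pop # blocks until the reader provides input
  break if chunk == :eof

  received << chunk
end
reader.join
```

In the real context the dequeue yields the Fiber rather than blocking a thread, so other handlers can keep running while this one waits.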

Defined Under Namespace

Modules: BackgroundPool

Constant Summary

LOGGER =
Logger.new($stdout, progname: 'Restate::Server::Context')

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) ⇒ Context

Returns a new instance of Context.



# File 'lib/restate/server/context.rb', line 27

def initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [],
               outbound_middleware: [])
  @vm = vm
  @handler = handler
  @invocation = invocation
  @send_output = send_output
  @input_queue = input_queue
  @run_coros_to_execute = {}
  @attempt_finished_event = AttemptFinishedEvent.new
  @middleware = middleware
  @outbound_middleware = outbound_middleware
end

Instance Attribute Details

#invocation ⇒ Object (readonly)

Returns the value of attribute invocation.



# File 'lib/restate/server/context.rb', line 25

def invocation
  @invocation
end

#vm ⇒ Object (readonly)

Returns the value of attribute vm.



# File 'lib/restate/server/context.rb', line 25

def vm
  @vm
end

Instance Method Details

#all(*futures) ⇒ Object

Build a lazy combinator over the given futures and return it as a CombinedFuture. Nothing blocks until .await is called, so the combinators compose: Restate.race(Restate.all(a, b), c).await.

Semantics match JS Promise.all — when awaited, returns the values in input order, short-circuiting on the first TerminalError.



# File 'lib/restate/server/context.rb', line 178

def all(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  CombinedFuture.new(self, :all_succeeded_or_first_failed, futures)
end
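The first line of the method lets callers pass futures either as separate arguments or as one Array. A minimal sketch of that normalization follows; `normalize_futures` is a hypothetical helper introduced only for illustration, and symbols stand in for real futures.

```ruby
# Hypothetical helper mirroring the first line of #all (the same line also
# appears in #any, #race and #all_settled).
def normalize_futures(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  futures
end

splat_form = normalize_futures(:a, :b)    # separate arguments
array_form = normalize_futures(%i[a b])   # one Array argument is unwrapped
single     = normalize_futures(:a)        # a lone non-Array argument is kept
```

Both call styles end up with the same list, so the combinator receives a flat collection either way.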

#all_settled(*futures) ⇒ Object

Lazy combinator. Awaiting waits for every future to settle and returns an Array of { status: :fulfilled, value: … } or { status: :rejected, reason: TerminalError } entries, in input order. Matches JS Promise.allSettled.



# File 'lib/restate/server/context.rb', line 205

def all_settled(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  CombinedFuture.new(self, :all_completed, futures)
end
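As a rough illustration of consuming the awaited result, the sketch below splits entries by status. The sample array is hand-written in the shape described above, and RuntimeError stands in for TerminalError; none of the values come from a real invocation.

```ruby
# Hand-written sample in the documented shape; the values are made up.
settled = [
  { status: :fulfilled, value: 1 },
  { status: :rejected, reason: RuntimeError.new('boom') }, # stand-in for TerminalError
  { status: :fulfilled, value: 3 }
]

# Separate successes from failures while preserving input order.
fulfilled, rejected = settled.partition { |entry| entry[:status] == :fulfilled }
values = fulfilled.map { |entry| entry[:value] }
errors = rejected.map { |entry| entry[:reason].message }
```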

#any(*futures) ⇒ Object

Lazy combinator. Awaiting returns the value of the first successful future; raises only if every future fails terminally. Matches JS Promise.any.

Raises:

  • (ArgumentError)


# File 'lib/restate/server/context.rb', line 194

def any(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  raise ArgumentError, 'any requires at least one future' if futures.empty?

  CombinedFuture.new(self, :first_succeeded_or_all_failed, futures)
end

#awakeable(serde: JsonSerde) ⇒ Object

Creates an awakeable and returns [awakeable_id, DurableFuture].



# File 'lib/restate/server/context.rb', line 328

def awakeable(serde: JsonSerde)
  id, handle = @vm.sys_awakeable
  [id, DurableFuture.new(self, handle, serde: serde)]
end

#cancel_invocation(invocation_id) ⇒ Object

Requests cancellation of another invocation by its id.



# File 'lib/restate/server/context.rb', line 399

def cancel_invocation(invocation_id)
  @vm.sys_cancel_invocation(invocation_id)
end

#clear(name) ⇒ Object

Durably removes a single state entry by name.



# File 'lib/restate/server/context.rb', line 114

def clear(name)
  @vm.sys_clear_state(name)
end

#clear_all ⇒ Object

Durably removes all state entries for this virtual object or workflow.



# File 'lib/restate/server/context.rb', line 119

def clear_all
  @vm.sys_clear_all_state
end

#completed?(handle) ⇒ Boolean

Check if a handle is completed (non-blocking).

Returns:

  • (Boolean)


# File 'lib/restate/server/context.rb', line 155

def completed?(handle)
  @vm.is_completed(handle)
end

#enter ⇒ Object

Runs the handler to completion, writing the output (or failure) to the journal.



# File 'lib/restate/server/context.rb', line 43

def enter
  Thread.current[:restate_context] = self
  Thread.current[:restate_service_kind] = @handler.service_tag.kind
  Thread.current[:restate_handler_kind] = @handler.kind
  in_buffer = @invocation.input_buffer
  out_buffer = Restate.invoke_handler(handler: @handler, ctx: self, in_buffer: in_buffer,
                                      middleware: @middleware)
  @vm.sys_write_output_success(out_buffer)
  @vm.sys_end
rescue TerminalError => e
  failure = Failure.new(code: e.status_code, message: e.message, metadata: e.)
  @vm.sys_write_output_failure(failure)
  @vm.sys_end
rescue SuspendedError, InternalError
  # These are expected internal control flow exceptions; do nothing.
rescue DisconnectedError
  raise
rescue StandardError => e
  # Walk the cause chain for TerminalError or internal exceptions
  cause = e
  handled = false
  while cause
    if cause.is_a?(TerminalError)
      f = Failure.new(code: cause.status_code, message: cause.message, metadata: cause.)
      @vm.sys_write_output_failure(f)
      @vm.sys_end
      handled = true
      break
    elsif cause.is_a?(SuspendedError) || cause.is_a?(InternalError)
      handled = true
      break
    end
    cause = cause.cause
  end
  unless handled
    @vm.notify_error(e.inspect, e.backtrace&.join("\n"))
    raise
  end
ensure
  @run_coros_to_execute.clear
  Thread.current[:restate_context] = nil
  Thread.current[:restate_service_kind] = nil
  Thread.current[:restate_handler_kind] = nil
end
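The StandardError branch walks the exception's cause chain, so a TerminalError wrapped by another exception is still reported as a terminal failure. The sketch below isolates that walk; `SketchTerminalError` and `find_terminal` are hypothetical stand-ins, not names from the library.

```ruby
# Stand-in for Restate's TerminalError; only the class identity matters here.
class SketchTerminalError < StandardError; end

# Returns the first exception in the cause chain that is a SketchTerminalError,
# or nil if none is found.
def find_terminal(error)
  cause = error
  while cause
    return cause if cause.is_a?(SketchTerminalError)

    cause = cause.cause
  end
  nil
end

wrapped =
  begin
    begin
      raise SketchTerminalError, 'payment declined'
    rescue SketchTerminalError
      raise ArgumentError, 'wrapper' # Ruby records the rescued error as #cause
    end
  rescue ArgumentError => e
    e
  end

found   = find_terminal(wrapped)
missing = find_terminal(RuntimeError.new('plain'))
```

When nothing terminal is found, the method above notifies the VM and re-raises, which is the path that lets the attempt be retried.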

#generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object

Durably calls a handler using raw bytes (no serialization). Useful for proxying.



# File 'lib/restate/server/context.rb', line 406

def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  with_outbound_middleware(service, handler, headers) do |hdrs|
    call_handle = @vm.sys_call(
      service: service, handler: handler, parameter: arg,
      key: key, idempotency_key: idempotency_key, headers: hdrs
    )
    DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle,
                          output_serde: nil)
  end
end

#generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object

Sends a one-way invocation using raw bytes (no serialization). Useful for proxying.



# File 'lib/restate/server/context.rb', line 418

def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  delay_ms = delay ? (delay * 1000).to_i : nil
  with_outbound_middleware(service, handler, headers) do |hdrs|
    invocation_id_handle = @vm.sys_send(
      service: service, handler: handler, parameter: arg,
      key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs
    )
    SendHandle.new(self, invocation_id_handle)
  end
end
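The delay argument is given in seconds and converted to whole milliseconds before it reaches the VM. A small sketch of that conversion, using a hypothetical `delay_to_millis` helper that wraps the same expression:

```ruby
# Same expression as in the send methods, extracted into a hypothetical helper.
def delay_to_millis(delay)
  delay ? (delay * 1000).to_i : nil
end

whole      = delay_to_millis(2)       # Integer seconds
fractional = delay_to_millis(1.5)     # Float seconds
tiny       = delay_to_millis(0.0004)  # below one millisecond, truncated by to_i
none       = delay_to_millis(nil)     # no delay requested
```

Because `to_i` truncates, sub-millisecond delays collapse to zero rather than rounding up.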

#get(name, serde: JsonSerde) ⇒ Object

Durably retrieves a state entry by name. Returns nil if unset.



# File 'lib/restate/server/context.rb', line 98

def get(name, serde: JsonSerde)
  get_async(name, serde: serde).await
end

#get_async(name, serde: JsonSerde) ⇒ Object

Returns a DurableFuture for a state entry. Resolves to nil if unset.



# File 'lib/restate/server/context.rb', line 103

def get_async(name, serde: JsonSerde)
  handle = @vm.sys_get_state(name)
  DurableFuture.new(self, handle, serde: serde)
end

#key ⇒ Object

Returns the key for this virtual object or workflow invocation.



# File 'lib/restate/server/context.rb', line 442

def key
  @invocation.key
end

#object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably calls a handler on a Restate virtual object, keyed by key.



# File 'lib/restate/server/context.rb', line 279

def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  out_serde = resolve_serde(output_serde, handler_meta, :output_serde)
  parameter = in_serde.serialize(arg)
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    call_handle = @vm.sys_call(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, idempotency_key: idempotency_key, headers: hdrs
    )
    DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle,
                          output_serde: out_serde)
  end
end

#object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).



# File 'lib/restate/server/context.rb', line 296

def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  parameter = in_serde.serialize(arg)
  delay_ms = delay ? (delay * 1000).to_i : nil
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    invocation_id_handle = @vm.sys_send(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs
    )
    SendHandle.new(self, invocation_id_handle)
  end
end

#on_attempt_finished ⇒ Object

Called by the server when the attempt ends (handler completed, disconnected, or transient error). Signals the attempt_finished_event so that user code and background pool jobs can clean up.



# File 'lib/restate/server/context.rb', line 91

def on_attempt_finished
  @attempt_finished_event.set!
end

#peek_promise(name, serde: JsonSerde) ⇒ Object

Peeks at a durable promise value without blocking. Returns nil if not yet resolved.



# File 'lib/restate/server/context.rb', line 374

def peek_promise(name, serde: JsonSerde)
  handle = @vm.sys_peek_promise(name)
  poll_and_take(handle) do |raw|
    raw.nil? ? nil : serde.deserialize(raw)
  end
end
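The block passed to poll_and_take only deserializes when a raw value is present, which is how an unresolved promise surfaces as nil. A minimal sketch of that guard, assuming a serde with `serialize`/`deserialize` methods; `SketchJsonSerde` is a hypothetical stand-in built on the stdlib JSON module, not the library's JsonSerde.

```ruby
require 'json'

# Hypothetical serde exposing the serialize/deserialize pair used above.
module SketchJsonSerde
  def self.serialize(value)
    JSON.generate(value)
  end

  def self.deserialize(raw)
    JSON.parse(raw)
  end
end

# Mirrors the block passed to poll_and_take: nil passes through untouched.
decode = ->(raw) { raw.nil? ? nil : SketchJsonSerde.deserialize(raw) }

unresolved = decode.call(nil)
resolved   = decode.call(SketchJsonSerde.serialize({ 'ok' => true }))
```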

#promise(name, serde: JsonSerde) ⇒ Object

Gets a durable promise value, blocking until resolved.



# File 'lib/restate/server/context.rb', line 366

def promise(name, serde: JsonSerde)
  handle = @vm.sys_get_promise(name)
  poll_and_take(handle) do |raw|
    raw.nil? ? nil : serde.deserialize(raw)
  end
end

#race(*futures) ⇒ Object

Lazy combinator. Awaiting returns the value of the first future to settle, or raises if it settled with a TerminalError. Matches JS Promise.race.

Raises:

  • (ArgumentError)


# File 'lib/restate/server/context.rb', line 185

def race(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  raise ArgumentError, 'race requires at least one future' if futures.empty?

  CombinedFuture.new(self, :first_completed, futures)
end

#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object

Rejects an awakeable with a terminal failure.



# File 'lib/restate/server/context.rb', line 339

def reject_awakeable(awakeable_id, message, code: 500)
  failure = Failure.new(code: code, message: message)
  @vm.sys_complete_awakeable_failure(awakeable_id, failure)
end

#reject_promise(name, message, code: 500) ⇒ Object

Rejects a durable promise with a terminal failure.



# File 'lib/restate/server/context.rb', line 389

def reject_promise(name, message, code: 500)
  failure = Failure.new(code: code, message: message)
  handle = @vm.sys_complete_promise_failure(name, failure)
  poll_and_take(handle)
  nil
end

#reject_signal(invocation_id, name, message, code: 500) ⇒ Object

Send a terminal failure to a named signal on another invocation.



# File 'lib/restate/server/context.rb', line 358

def reject_signal(invocation_id, name, message, code: 500)
  failure = Failure.new(code: code, message: message)
  @vm.sys_complete_signal_failure(invocation_id, name, failure)
end

#request ⇒ Object

Returns metadata about the current invocation (id, headers, raw body).



# File 'lib/restate/server/context.rb', line 432

def request
  @request ||= Request.new(
    id: @invocation.invocation_id,
    headers: @invocation.headers.to_h,
    body: @invocation.input_buffer,
    attempt_finished_event: @attempt_finished_event
  )
end

#resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object

Resolves an awakeable with a success value.



# File 'lib/restate/server/context.rb', line 334

def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  @vm.sys_complete_awakeable_success(awakeable_id, serde.serialize(payload))
end

#resolve_handle(handle) ⇒ Object

Block until a previously created handle completes. Returns the value.



# File 'lib/restate/server/context.rb', line 145

def resolve_handle(handle)
  poll_and_take(handle)
end

#resolve_promise(name, payload, serde: JsonSerde) ⇒ Object

Resolves a durable promise with a success value.



# File 'lib/restate/server/context.rb', line 382

def resolve_promise(name, payload, serde: JsonSerde)
  handle = @vm.sys_complete_promise_success(name, serde.serialize(payload))
  poll_and_take(handle)
  nil
end

#resolve_signal(invocation_id, name, payload, serde: JsonSerde) ⇒ Object

Send a success value to a named signal on another invocation.



# File 'lib/restate/server/context.rb', line 353

def resolve_signal(invocation_id, name, payload, serde: JsonSerde)
  @vm.sys_complete_signal_success(invocation_id, name, serde.serialize(payload))
end

#run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Executes a durable side effect. Once the block completes, its result is journaled; on replay the journaled result is reused and the block is not scheduled again. Returns a DurableFuture for the result.

Pass background: true to run the block in a real OS Thread, keeping the fiber event loop responsive for other concurrent handlers. Use this for CPU-intensive work.



# File 'lib/restate/server/context.rb', line 218

def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  run = @vm.sys_run(name)
  handle = run.handle

  # Schedule the run closure only if the run wasn't replayed.
  unless run.replayed
    @run_coros_to_execute[handle] =
      if background
        -> { execute_run_threaded(handle, action, serde, retry_policy) }
      else
        -> { execute_run(handle, action, serde, retry_policy) }
      end
  end

  DurableFuture.new(self, handle, serde: serde)
end
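The replay behavior can be modeled with a toy journal. This is a sketch under loud assumptions: `SketchJournal` is a plain Hash-backed class invented for illustration, it is not the Restate VM, and it ignores retries, serialization and futures.

```ruby
# Toy journal: NOT the Restate VM, just a Hash keyed by step name.
class SketchJournal
  attr_reader :executions

  def initialize
    @entries = {}
    @executions = 0
  end

  # Runs the block only when no result is journaled for +name+.
  def run(name)
    return @entries[name] if @entries.key?(name)

    @executions += 1
    @entries[name] = yield
  end
end

journal = SketchJournal.new
first  = journal.run('charge') { 'receipt-42' }
replay = journal.run('charge') { 'receipt-99' } # block is skipped on replay
```

The second call returns the recorded value without invoking its block, which is the property that makes side effects safe to place inside a handler that may be re-executed.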

#run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Convenience shortcut for run(…).await — executes the durable side effect and returns the result directly.

Accepts all the same options as run, including background: true.



# File 'lib/restate/server/context.rb', line 239

def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  run(name, serde: serde, retry_policy: retry_policy, background: background, &action).await
end

#service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably calls a handler on a Restate service and returns a future for its result.



# File 'lib/restate/server/context.rb', line 246

def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  out_serde = resolve_serde(output_serde, handler_meta, :output_serde)
  parameter = in_serde.serialize(arg)
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    call_handle = @vm.sys_call(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, idempotency_key: idempotency_key, headers: hdrs
    )
    DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle,
                          output_serde: out_serde)
  end
end

#service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Sends a one-way invocation to a Restate service handler (fire-and-forget).



# File 'lib/restate/server/context.rb', line 263

def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  parameter = in_serde.serialize(arg)
  delay_ms = delay ? (delay * 1000).to_i : nil
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    invocation_id_handle = @vm.sys_send(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs
    )
    SendHandle.new(self, invocation_id_handle)
  end
end

#set(name, value, serde: JsonSerde) ⇒ Object

Durably sets a state entry. The value is serialized via serde.



# File 'lib/restate/server/context.rb', line 109

def set(name, value, serde: JsonSerde)
  @vm.sys_set_state(name, serde.serialize(value))
end
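To make the serialize-on-write, deserialize-on-read round trip concrete, here is an in-memory stand-in. `SketchState` is hypothetical and uses the stdlib JSON module in place of a serde; the real context journals each operation through the VM instead of touching a local Hash.

```ruby
require 'json'

# In-memory stand-in for keyed state; the real context journals these operations.
class SketchState
  def initialize
    @store = {}
  end

  # Values are stored in serialized form, as #set does via serde.serialize.
  def set(name, value)
    @store[name] = JSON.generate(value)
  end

  # Returns nil when the entry is unset, matching the documented #get behavior.
  def get(name)
    raw = @store[name]
    raw.nil? ? nil : JSON.parse(raw)
  end

  def clear(name)
    @store.delete(name)
  end
end

state = SketchState.new
state.set('cart', { 'items' => 3 })
before = state.get('cart')
state.clear('cart')
after = state.get('cart')
```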

#signal(name, serde: JsonSerde) ⇒ Object

Wait for a named signal addressed to this invocation. Returns a DurableFuture.



# File 'lib/restate/server/context.rb', line 347

def signal(name, serde: JsonSerde)
  handle = @vm.sys_signal(name)
  DurableFuture.new(self, handle, serde: serde)
end

#sleep(seconds) ⇒ Object

Returns a durable future that completes after the given duration. The timer survives handler restarts.



# File 'lib/restate/server/context.rb', line 138

def sleep(seconds)
  millis = (seconds * 1000).to_i
  handle = @vm.sys_sleep(millis)
  DurableFuture.new(self, handle)
end

#state_keys ⇒ Object

Returns the list of all state entry names for this virtual object or workflow.



# File 'lib/restate/server/context.rb', line 124

def state_keys
  state_keys_async.await
end

#state_keys_async ⇒ Object

Returns a DurableFuture for the list of all state entry names.



# File 'lib/restate/server/context.rb', line 129

def state_keys_async
  handle = @vm.sys_get_state_keys
  DurableFuture.new(self, handle)
end

#take_completed(handle) ⇒ Object

Take a completed handle’s notification, returning the value. Raises TerminalError if the handle resolved to a failure.



# File 'lib/restate/server/context.rb', line 161

def take_completed(handle)
  must_take_notification(handle)
end

#wait_any(*futures) ⇒ Object

Wait until any of the given futures completes. Returns [completed, remaining].



# File 'lib/restate/server/context.rb', line 166

def wait_any(*futures)
  handles = futures.map(&:handle)
  wait_any_handle(handles)
  futures.partition(&:completed?)
end
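The returned pair comes from Enumerable#partition, which keeps input order within each group. A small sketch with a stand-in future type; `SketchFuture` is hypothetical and carries only a name and a completion flag.

```ruby
# Minimal stand-in for a future: just a name and a completion flag.
SketchFuture = Struct.new(:name, :done) do
  def completed?
    done
  end
end

futures = [
  SketchFuture.new('timer', false),
  SketchFuture.new('call', true),
  SketchFuture.new('signal', false)
]

# Same final step as #wait_any: completed futures first, the rest second.
completed, remaining = futures.partition(&:completed?)
```

More than one future may land in the completed group if several finished before the wait returned.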

#wait_any_handle(handles) ⇒ Object

Wait until any of the given handles completes. Does not take notifications.



# File 'lib/restate/server/context.rb', line 150

def wait_any_handle(handles)
  poll_or_cancel(handles) unless handles.any? { |h| @vm.is_completed(h) }
end

#wait_combined(future_tree) ⇒ Object

Drive progress over a combinator tree. Returns when the combinator logically completes; the shared core decides this based on the tree shape. future_tree follows the encoding documented in lib/restate/vm.rb#do_await. This method is public so that CombinedFuture#await can drive it via @ctx.wait_combined.



# File 'lib/restate/server/context.rb', line 450

def wait_combined(future_tree)
  progress_loop { @vm.do_await(future_tree) }
end

#workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably calls a handler on a Restate workflow, keyed by key.



# File 'lib/restate/server/context.rb', line 312

def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers,
              input_serde: input_serde, output_serde: output_serde) # rubocop:disable Layout/HashAlignment
end

#workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Sends a one-way invocation to a Restate workflow handler (fire-and-forget).



# File 'lib/restate/server/context.rb', line 319

def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET)
  object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key, headers: headers,
              input_serde: input_serde) # rubocop:disable Layout/HashAlignment
end