Class: Restate::ServerContext

Inherits:
Object
  • Object
show all
Includes:
WorkflowContext, WorkflowSharedContext
Defined in:
lib/restate/server_context.rb

Overview

The core execution context for a Restate handler invocation. Implements the progress loop and all context API methods (state, run, sleep, call, send).

Concurrency model:

- The handler runs inside a Fiber managed by Falcon/Async.
- `run` blocks spawn child Async tasks.
- When the progress loop needs input, it dequeues from @input_queue, yielding the Fiber.
- The HTTP input reader (a separate Async task) feeds chunks into @input_queue.
- Output chunks are written directly to the streaming response body.

Defined Under Namespace

Modules: BackgroundPool

Constant Summary collapse

# Logger for this class: writes to standard output and tags every entry with
# the class name as its program name.
LOGGER = Logger.new($stdout, progname: 'Restate::ServerContext')

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) ⇒ ServerContext

Returns a new instance of ServerContext.



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/restate/server_context.rb', line 26

# Builds the execution context for a single handler invocation.
#
# @param vm [Object] VM that every +sys_*+ operation of this context is delegated to
# @param handler [Object] handler that #enter invokes
# @param invocation [Object] invocation data; other methods read its input buffer, headers, id and key
# @param send_output [Object] stored as-is (NOTE(review): presumably the output writer - confirm)
# @param input_queue [Object] stored as-is; the class overview says the progress loop dequeues from it
# @param middleware [Array] passed to Restate.invoke_handler by #enter
# @param outbound_middleware [Array] stored as-is (NOTE(review): presumably for outgoing calls - confirm)
def initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [],
               outbound_middleware: [])
  # Per-attempt bookkeeping, created fresh for every context.
  @attempt_finished_event = AttemptFinishedEvent.new
  @run_coros_to_execute = {}

  # Collaborators handed in by the caller.
  @vm = vm
  @invocation = invocation
  @handler = handler
  @input_queue = input_queue
  @send_output = send_output
  @outbound_middleware = outbound_middleware
  @middleware = middleware
end

Instance Attribute Details

#invocation ⇒ Object (readonly)

Returns the value of attribute invocation.



24
25
26
# File 'lib/restate/server_context.rb', line 24

# Read-only accessor for the invocation this context was built for.
#
# @return [Object] the value held in @invocation
def invocation
  @invocation
end

#vm ⇒ Object (readonly)

Returns the value of attribute vm.



24
25
26
# File 'lib/restate/server_context.rb', line 24

# Read-only accessor for the VM that this context delegates its +sys_*+
# operations to.
#
# @return [Object] the value held in @vm
def vm
  @vm
end

Instance Method Details

#awakeable(serde: JsonSerde) ⇒ Object

Creates an awakeable and returns [awakeable_id, DurableFuture].



285
286
287
288
# File 'lib/restate/server_context.rb', line 285

# Creates an awakeable.
#
# @param serde [Object] serde handed to the returned future (defaults to JsonSerde)
# @return [Array] a two-element array: the awakeable id and a DurableFuture
#   wrapping the handle returned by the VM
def awakeable(serde: JsonSerde)
  awakeable_id, notification_handle = @vm.sys_awakeable
  future = DurableFuture.new(self, notification_handle, serde: serde)
  [awakeable_id, future]
end

#cancel_invocation(invocation_id) ⇒ Object

Requests cancellation of another invocation by its id.



337
338
339
# File 'lib/restate/server_context.rb', line 337

# Requests cancellation of another invocation by its id.
#
# Delegates directly to the VM's +sys_cancel_invocation+.
#
# @param invocation_id [Object] id of the invocation to cancel
# @return [Object] whatever the VM call returns
def cancel_invocation(invocation_id)
  @vm.sys_cancel_invocation(invocation_id)
end

#clear(name) ⇒ Object

Durably removes a single state entry by name.



113
114
115
# File 'lib/restate/server_context.rb', line 113

# Durably removes a single state entry by name.
#
# Delegates directly to the VM's +sys_clear_state+.
#
# @param name [Object] name of the state entry to remove
# @return [Object] whatever the VM call returns
def clear(name)
  @vm.sys_clear_state(name)
end

#clear_all ⇒ Object

Durably removes all state entries for this virtual object or workflow.



118
119
120
# File 'lib/restate/server_context.rb', line 118

# Durably removes all state entries for this virtual object or workflow.
#
# Delegates directly to the VM's +sys_clear_all_state+.
#
# @return [Object] whatever the VM call returns
def clear_all
  @vm.sys_clear_all_state
end

#completed?(handle) ⇒ Boolean

Check if a handle is completed (non-blocking).

Returns:

  • (Boolean)


154
155
156
# File 'lib/restate/server_context.rb', line 154

# Checks whether a handle is completed, without blocking.
#
# Delegates directly to the VM's +is_completed+.
#
# @param handle [Object] a handle previously returned by the VM
# @return [Boolean] the VM's answer for this handle
def completed?(handle)
  @vm.is_completed(handle)
end

#enter ⇒ Object

Runs the handler to completion, writing the output (or failure) to the journal.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/restate/server_context.rb', line 42

# Runs the handler to completion and records the outcome through the VM.
#
# While the handler runs, this context and the service/handler kinds are
# published through Thread.current. On every exit path they are reset and any
# pending run blocks are dropped.
#
# Outcomes:
# - success: the output buffer is written and the invocation is ended.
# - TerminalError (raised directly or found in the cause chain): a Failure built
#   from its status code and message is written and the invocation is ended.
# - SuspendedError / InternalError (directly or in the cause chain): swallowed,
#   because they are expected internal control flow.
# - DisconnectedError: re-raised untouched.
# - any other StandardError: reported via +notify_error+, then re-raised.
def enter
  Thread.current[:restate_context] = self
  Thread.current[:restate_service_kind] = @handler.service_tag.kind
  Thread.current[:restate_handler_kind] = @handler.kind
  result = Restate.invoke_handler(handler: @handler, ctx: self,
                                  in_buffer: @invocation.input_buffer, middleware: @middleware)
  @vm.sys_write_output_success(result)
  @vm.sys_end
rescue TerminalError => e
  @vm.sys_write_output_failure(Failure.new(code: e.status_code, message: e.message))
  @vm.sys_end
rescue SuspendedError, InternalError
  # Expected internal control flow: nothing to report.
rescue DisconnectedError
  raise
rescue StandardError => e
  # Follow the cause chain until it runs out or reaches an error we recognise.
  recognised = [TerminalError, SuspendedError, InternalError]
  link = e
  link = link.cause while link && recognised.none? { |klass| link.is_a?(klass) }

  case link
  when TerminalError
    @vm.sys_write_output_failure(Failure.new(code: link.status_code, message: link.message))
    @vm.sys_end
    nil
  when SuspendedError, InternalError
    nil
  else
    # Nothing recognisable in the chain: report the outer error and propagate it.
    @vm.notify_error(e.inspect, e.backtrace&.join("\n"))
    raise
  end
ensure
  @run_coros_to_execute.clear
  %i[restate_context restate_service_kind restate_handler_kind].each { |slot| Thread.current[slot] = nil }
end

#generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object

Durably calls a handler using raw bytes (no serialization). Useful for proxying.



344
345
346
347
348
349
350
351
352
353
# File 'lib/restate/server_context.rb', line 344

# Durably calls a handler using raw bytes (no serialization). Useful for proxying.
#
# The argument is passed to the VM untouched and the returned future is built
# with a nil output serde.
#
# @param service [Object] target service name
# @param handler [Object] target handler name
# @param arg [Object] raw parameter, forwarded as-is
# @param key [Object, nil] optional key forwarded to the VM
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] headers passed through the outbound middleware first
# @return [Object] the value of the +with_outbound_middleware+ block, which
#   builds a DurableCallFuture
def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  with_outbound_middleware(service, handler, headers) do |final_headers|
    call_args = { service: service, handler: handler, parameter: arg,
                  key: key, idempotency_key: idempotency_key, headers: final_headers }
    pending = @vm.sys_call(**call_args)
    DurableCallFuture.new(self, pending.result_handle, pending.invocation_id_handle, output_serde: nil)
  end
end

#generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object

Sends a one-way invocation using raw bytes (no serialization). Useful for proxying.



356
357
358
359
360
361
362
363
364
365
# File 'lib/restate/server_context.rb', line 356

# Sends a one-way invocation using raw bytes (no serialization). Useful for proxying.
#
# @param service [Object] target service name
# @param handler [Object] target handler name
# @param arg [Object] raw parameter, forwarded as-is
# @param key [Object, nil] optional key forwarded to the VM
# @param delay [Numeric, nil] optional delay; multiplied by 1000 and truncated
#   to an integer number of milliseconds before being handed to the VM
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] headers passed through the outbound middleware first
# @return [Object] the value of the +with_outbound_middleware+ block, which
#   builds a SendHandle
def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  # Stays nil when no delay was requested.
  delay_ms = (delay * 1000).to_i if delay
  with_outbound_middleware(service, handler, headers) do |final_headers|
    send_args = { service: service, handler: handler, parameter: arg, key: key,
                  delay: delay_ms, idempotency_key: idempotency_key, headers: final_headers }
    SendHandle.new(self, @vm.sys_send(**send_args))
  end
end

#get(name, serde: JsonSerde) ⇒ Object

Durably retrieves a state entry by name. Returns nil if unset.



97
98
99
# File 'lib/restate/server_context.rb', line 97

# Durably retrieves a state entry by name, waiting for the value.
#
# @param name [Object] name of the state entry
# @param serde [Object] serde forwarded to #get_async (defaults to JsonSerde)
# @return [Object, nil] the awaited result of #get_async; nil if the entry is unset
def get(name, serde: JsonSerde)
  pending = get_async(name, serde: serde)
  pending.await
end

#get_async(name, serde: JsonSerde) ⇒ Object

Returns a DurableFuture for a state entry. Resolves to nil if unset.



102
103
104
105
# File 'lib/restate/server_context.rb', line 102

# Starts reading a state entry without waiting for it.
#
# @param name [Object] name of the state entry
# @param serde [Object] serde handed to the returned future (defaults to JsonSerde)
# @return [DurableFuture] future wrapping the handle returned by the VM's
#   +sys_get_state+; it resolves to nil if the entry is unset
def get_async(name, serde: JsonSerde)
  DurableFuture.new(self, @vm.sys_get_state(name), serde: serde)
end

#key ⇒ Object

Returns the key for this virtual object or workflow invocation.



380
381
382
# File 'lib/restate/server_context.rb', line 380

# Returns the key for this virtual object or workflow invocation.
#
# @return [Object] the +key+ of the underlying invocation
def key
  @invocation.key
end

#object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably calls a handler on a Restate virtual object, keyed by key.



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/restate/server_context.rb', line 236

# Durably calls a handler on a Restate virtual object, keyed by +key+.
#
# The call target and both serdes are resolved first, and the argument is
# serialized before the outbound middleware runs.
#
# @param service [Object] service reference, resolved via +resolve_call_target+
# @param handler [Object] handler reference, resolved via +resolve_call_target+
# @param key [Object] key of the virtual object
# @param arg [Object] argument, serialized with the resolved input serde
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] headers passed through the outbound middleware first
# @param input_serde [Object] serde override for the argument; NOT_SET defers to +resolve_serde+
# @param output_serde [Object] serde override for the result; NOT_SET defers to +resolve_serde+
# @return [Object] the value of the +with_outbound_middleware+ block, which
#   builds a DurableCallFuture
def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  target_service, target_handler, meta = resolve_call_target(service, handler)
  encoder = resolve_serde(input_serde, meta, :input_serde)
  decoder = resolve_serde(output_serde, meta, :output_serde)
  payload = encoder.serialize(arg)
  with_outbound_middleware(target_service, target_handler, headers) do |final_headers|
    call_args = { service: target_service, handler: target_handler, parameter: payload,
                  key: key, idempotency_key: idempotency_key, headers: final_headers }
    pending = @vm.sys_call(**call_args)
    DurableCallFuture.new(self, pending.result_handle, pending.invocation_id_handle, output_serde: decoder)
  end
end

#object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).



253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/restate/server_context.rb', line 253

# Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).
#
# @param service [Object] service reference, resolved via +resolve_call_target+
# @param handler [Object] handler reference, resolved via +resolve_call_target+
# @param key [Object] key of the virtual object
# @param arg [Object] argument, serialized with the resolved input serde
# @param delay [Numeric, nil] optional delay; multiplied by 1000 and truncated
#   to an integer number of milliseconds before being handed to the VM
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] headers passed through the outbound middleware first
# @param input_serde [Object] serde override for the argument; NOT_SET defers to +resolve_serde+
# @return [Object] the value of the +with_outbound_middleware+ block, which
#   builds a SendHandle
def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET)
  target_service, target_handler, meta = resolve_call_target(service, handler)
  payload = resolve_serde(input_serde, meta, :input_serde).serialize(arg)
  # Stays nil when no delay was requested.
  delay_ms = (delay * 1000).to_i if delay
  with_outbound_middleware(target_service, target_handler, headers) do |final_headers|
    send_args = { service: target_service, handler: target_handler, parameter: payload, key: key,
                  delay: delay_ms, idempotency_key: idempotency_key, headers: final_headers }
    SendHandle.new(self, @vm.sys_send(**send_args))
  end
end

#on_attempt_finished ⇒ Object

Called by the server when the attempt ends (handler completed, disconnected, or transient error). Signals the attempt_finished_event so that user code and background pool jobs can clean up.



90
91
92
# File 'lib/restate/server_context.rb', line 90

# Called by the server when the attempt ends (handler completed, disconnected,
# or transient error).
#
# Sets the attempt-finished event so that user code and background pool jobs
# can clean up.
#
# @return [Object] whatever the event's +set!+ returns
def on_attempt_finished
  @attempt_finished_event.set!
end

#peek_promise(name, serde: JsonSerde) ⇒ Object

Peeks at a durable promise value without blocking. Returns nil if not yet resolved.



312
313
314
315
316
317
# File 'lib/restate/server_context.rb', line 312

# Peeks at a durable promise value without blocking on its resolution.
#
# @param name [Object] name of the durable promise
# @param serde [Object] serde used to deserialize a non-nil raw value (defaults to JsonSerde)
# @return [Object, nil] the value of +poll_and_take+ for the peek handle; the
#   block yields nil for a nil raw value (promise not yet resolved) and the
#   deserialized value otherwise
def peek_promise(name, serde: JsonSerde)
  poll_and_take(@vm.sys_peek_promise(name)) { |raw| serde.deserialize(raw) unless raw.nil? }
end

#promise(name, serde: JsonSerde) ⇒ Object

Gets a durable promise value, blocking until resolved.



304
305
306
307
308
309
# File 'lib/restate/server_context.rb', line 304

# Gets a durable promise value, blocking until it is resolved.
#
# @param name [Object] name of the durable promise
# @param serde [Object] serde used to deserialize a non-nil raw value (defaults to JsonSerde)
# @return [Object, nil] the value of +poll_and_take+ for the promise handle;
#   the block yields nil for a nil raw value and the deserialized value otherwise
def promise(name, serde: JsonSerde)
  promise_handle = @vm.sys_get_promise(name)
  poll_and_take(promise_handle) do |raw|
    next if raw.nil?

    serde.deserialize(raw)
  end
end

#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object

Rejects an awakeable with a terminal failure.



296
297
298
299
# File 'lib/restate/server_context.rb', line 296

# Rejects an awakeable with a terminal failure.
#
# @param awakeable_id [Object] id of the awakeable to reject
# @param message [Object] failure message
# @param code [Integer] failure code (defaults to 500)
# @return [Object] whatever the VM's +sys_complete_awakeable_failure+ returns
def reject_awakeable(awakeable_id, message, code: 500)
  @vm.sys_complete_awakeable_failure(awakeable_id, Failure.new(code: code, message: message))
end

#reject_promise(name, message, code: 500) ⇒ Object

Rejects a durable promise with a terminal failure.



327
328
329
330
331
332
# File 'lib/restate/server_context.rb', line 327

# Rejects a durable promise with a terminal failure.
#
# The handle returned by the VM is passed to +poll_and_take+ before returning.
#
# @param name [Object] name of the durable promise
# @param message [Object] failure message
# @param code [Integer] failure code (defaults to 500)
# @return [nil]
def reject_promise(name, message, code: 500)
  rejection = Failure.new(code: code, message: message)
  poll_and_take(@vm.sys_complete_promise_failure(name, rejection))
  nil
end

#request ⇒ Object

Returns metadata about the current invocation (id, headers, raw body).



370
371
372
373
374
375
376
377
# File 'lib/restate/server_context.rb', line 370

# Returns metadata about the current invocation (id, headers, raw body).
#
# The Request is built on first use and cached in @request, so later calls
# return the same object.
#
# @return [Request] carries the invocation id, the headers converted with
#   +to_h+, the input buffer, and the attempt-finished event
def request
  return @request if @request

  source = @invocation
  @request = Request.new(id: source.invocation_id, headers: source.headers.to_h,
                         body: source.input_buffer, attempt_finished_event: @attempt_finished_event)
end

#resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object

Resolves an awakeable with a success value.



291
292
293
# File 'lib/restate/server_context.rb', line 291

# Resolves an awakeable with a success value.
#
# @param awakeable_id [Object] id of the awakeable to resolve
# @param payload [Object] value to deliver; serialized with +serde+ first
# @param serde [Object] serde used for the payload (defaults to JsonSerde)
# @return [Object] whatever the VM's +sys_complete_awakeable_success+ returns
def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  encoded = serde.serialize(payload)
  @vm.sys_complete_awakeable_success(awakeable_id, encoded)
end

#resolve_handle(handle) ⇒ Object

Block until a previously created handle completes. Returns the value.



144
145
146
# File 'lib/restate/server_context.rb', line 144

# Blocks until a previously created handle completes and returns its value.
#
# Delegates directly to +poll_and_take+.
#
# @param handle [Object] a handle previously returned by the VM
# @return [Object] whatever +poll_and_take+ returns for the handle
def resolve_handle(handle)
  poll_and_take(handle)
end

#resolve_promise(name, payload, serde: JsonSerde) ⇒ Object

Resolves a durable promise with a success value.



320
321
322
323
324
# File 'lib/restate/server_context.rb', line 320

# Resolves a durable promise with a success value.
#
# The handle returned by the VM is passed to +poll_and_take+ before returning.
#
# @param name [Object] name of the durable promise
# @param payload [Object] value to deliver; serialized with +serde+ first
# @param serde [Object] serde used for the payload (defaults to JsonSerde)
# @return [nil]
def resolve_promise(name, payload, serde: JsonSerde)
  encoded = serde.serialize(payload)
  poll_and_take(@vm.sys_complete_promise_success(name, encoded))
  nil
end

#run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Executes a durable side effect. The block runs at most once; its result is journaled and replayed on retries. Returns a DurableFuture for the result.

Pass background: true to run the block in a real OS Thread, keeping the fiber event loop responsive for other concurrent handlers. Use this for CPU-intensive work.



179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/restate/server_context.rb', line 179

# Registers a durable side effect and returns a future for its result.
#
# The block is not run here. A zero-argument lambda that will execute it is
# stored in @run_coros_to_execute under the handle returned by +sys_run+.
# With +background: true+ that lambda calls +execute_run_threaded+ (the class
# docs describe this as running the block in a real OS Thread); otherwise it
# calls +execute_run+.
#
# @param name [Object] name of the run entry, passed to +sys_run+
# @param serde [Object] serde for the result (defaults to JsonSerde)
# @param retry_policy [Object, nil] forwarded unchanged to the executor
# @param background [Boolean] selects the threaded executor when truthy
# @yield the side effect to execute
# @return [DurableFuture] future bound to the run handle
def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  handle = @vm.sys_run(name)

  # Pick the executor now; the call itself is deferred inside the lambda.
  # __send__ is used so a context-level override of +send+ cannot interfere.
  executor = background ? :execute_run_threaded : :execute_run
  @run_coros_to_execute[handle] = -> { __send__(executor, handle, action, serde, retry_policy) }

  DurableFuture.new(self, handle, serde: serde)
end

#run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Convenience shortcut for run(…).await — executes the durable side effect and returns the result directly.

Accepts all the same options as run, including background: true.



196
197
198
# File 'lib/restate/server_context.rb', line 196

# Convenience shortcut for +run(...).await+: executes the durable side effect
# and returns the result directly.
#
# Accepts the same options as #run and forwards them, together with the block,
# unchanged.
#
# @param name [Object] name of the run entry
# @param serde [Object] serde for the result (defaults to JsonSerde)
# @param retry_policy [Object, nil] forwarded to #run
# @param background [Boolean] forwarded to #run
# @yield the side effect to execute
# @return [Object] the awaited result of the future returned by #run
def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  future = run(name, serde: serde, retry_policy: retry_policy, background: background, &action)
  future.await
end

#service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably calls a handler on a Restate service and returns a future for its result.



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/restate/server_context.rb', line 203

# Durably calls a handler on a Restate service and returns a future for its result.
#
# The call target and both serdes are resolved first, and the argument is
# serialized before the outbound middleware runs.
#
# @param service [Object] service reference, resolved via +resolve_call_target+
# @param handler [Object] handler reference, resolved via +resolve_call_target+
# @param arg [Object] argument, serialized with the resolved input serde
# @param key [Object, nil] optional key forwarded to the VM
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] headers passed through the outbound middleware first
# @param input_serde [Object] serde override for the argument; NOT_SET defers to +resolve_serde+
# @param output_serde [Object] serde override for the result; NOT_SET defers to +resolve_serde+
# @return [Object] the value of the +with_outbound_middleware+ block, which
#   builds a DurableCallFuture
def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  target_service, target_handler, meta = resolve_call_target(service, handler)
  encoder = resolve_serde(input_serde, meta, :input_serde)
  decoder = resolve_serde(output_serde, meta, :output_serde)
  payload = encoder.serialize(arg)
  with_outbound_middleware(target_service, target_handler, headers) do |final_headers|
    call_args = { service: target_service, handler: target_handler, parameter: payload,
                  key: key, idempotency_key: idempotency_key, headers: final_headers }
    pending = @vm.sys_call(**call_args)
    DurableCallFuture.new(self, pending.result_handle, pending.invocation_id_handle, output_serde: decoder)
  end
end

#service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Sends a one-way invocation to a Restate service handler (fire-and-forget).



220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/restate/server_context.rb', line 220

# Sends a one-way invocation to a Restate service handler (fire-and-forget).
#
# @param service [Object] service reference, resolved via +resolve_call_target+
# @param handler [Object] handler reference, resolved via +resolve_call_target+
# @param arg [Object] argument, serialized with the resolved input serde
# @param key [Object, nil] optional key forwarded to the VM
# @param delay [Numeric, nil] optional delay; multiplied by 1000 and truncated
#   to an integer number of milliseconds before being handed to the VM
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] headers passed through the outbound middleware first
# @param input_serde [Object] serde override for the argument; NOT_SET defers to +resolve_serde+
# @return [Object] the value of the +with_outbound_middleware+ block, which
#   builds a SendHandle
def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET)
  target_service, target_handler, meta = resolve_call_target(service, handler)
  payload = resolve_serde(input_serde, meta, :input_serde).serialize(arg)
  # Stays nil when no delay was requested.
  delay_ms = (delay * 1000).to_i if delay
  with_outbound_middleware(target_service, target_handler, headers) do |final_headers|
    send_args = { service: target_service, handler: target_handler, parameter: payload, key: key,
                  delay: delay_ms, idempotency_key: idempotency_key, headers: final_headers }
    SendHandle.new(self, @vm.sys_send(**send_args))
  end
end

#set(name, value, serde: JsonSerde) ⇒ Object

Durably sets a state entry. The value is serialized via serde.



108
109
110
# File 'lib/restate/server_context.rb', line 108

# Durably sets a state entry.
#
# @param name [Object] name of the state entry
# @param value [Object] value to store; serialized with +serde+ first
# @param serde [Object] serde used for the value (defaults to JsonSerde)
# @return [Object] whatever the VM's +sys_set_state+ returns
def set(name, value, serde: JsonSerde)
  encoded = serde.serialize(value)
  @vm.sys_set_state(name, encoded)
end

#sleep(seconds) ⇒ Object

Returns a durable future that completes after the given duration. The timer survives handler restarts.



137
138
139
140
141
# File 'lib/restate/server_context.rb', line 137

# Returns a durable future that completes after the given duration. The timer
# survives handler restarts.
#
# @param seconds [Numeric] duration in seconds; converted to milliseconds and
#   truncated to an integer before being handed to the VM
# @return [DurableFuture] future bound to the timer handle
def sleep(seconds)
  DurableFuture.new(self, @vm.sys_sleep((seconds * 1000).to_i))
end

#state_keys ⇒ Object

Returns the list of all state entry names for this virtual object or workflow.



123
124
125
# File 'lib/restate/server_context.rb', line 123

# Returns the list of all state entry names for this virtual object or workflow.
#
# @return [Object] the awaited result of #state_keys_async
def state_keys
  pending = state_keys_async
  pending.await
end

#state_keys_async ⇒ Object

Returns a DurableFuture for the list of all state entry names.



128
129
130
131
# File 'lib/restate/server_context.rb', line 128

# Starts listing all state entry names without waiting for the result.
#
# @return [DurableFuture] future wrapping the handle returned by the VM's
#   +sys_get_state_keys+
def state_keys_async
  DurableFuture.new(self, @vm.sys_get_state_keys)
end

#take_completed(handle) ⇒ Object

Take a completed handle’s notification, returning the value. Raises TerminalError if the handle resolved to a failure.



160
161
162
# File 'lib/restate/server_context.rb', line 160

# Takes a completed handle's notification and returns its value.
#
# Delegates directly to +must_take_notification+.
#
# @param handle [Object] a handle that has already completed
# @return [Object] whatever +must_take_notification+ returns for the handle
# @raise [TerminalError] if the handle resolved to a failure (per the method's
#   documentation; the raise happens in +must_take_notification+, which is not
#   visible here)
def take_completed(handle)
  must_take_notification(handle)
end

#wait_any(*futures) ⇒ Object

Wait until any of the given futures completes. Returns [completed, remaining].



165
166
167
168
169
# File 'lib/restate/server_context.rb', line 165

# Waits until any of the given futures completes.
#
# @param futures [Array] futures responding to +handle+ and +completed?+
# @return [Array(Array, Array)] the futures split into [completed, remaining],
#   as reported by each future's +completed?+ after the wait
def wait_any(*futures)
  wait_any_handle(futures.map(&:handle))
  completed, remaining = futures.partition(&:completed?)
  [completed, remaining]
end

#wait_any_handle(handles) ⇒ Object

Wait until any of the given handles completes. Does not take notifications.



149
150
151
# File 'lib/restate/server_context.rb', line 149

# Waits until any of the given handles completes. Does not take notifications.
#
# If the VM already reports one of the handles as completed, nothing is polled.
#
# @param handles [Array] handles previously returned by the VM
# @return [Object, nil] nil when a handle was already completed, otherwise
#   whatever +poll_or_cancel+ returns
def wait_any_handle(handles)
  return if handles.any? { |candidate| @vm.is_completed(candidate) }

  poll_or_cancel(handles)
end

#workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably calls a handler on a Restate workflow, keyed by key.



269
270
271
272
273
# File 'lib/restate/server_context.rb', line 269

# Durably calls a handler on a Restate workflow, keyed by +key+.
#
# Forwards every argument unchanged to #object_call.
#
# @param service [Object] service reference
# @param handler [Object] handler reference
# @param key [Object] key of the workflow
# @param arg [Object] argument for the handler
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] optional headers
# @param input_serde [Object] serde override for the argument (defaults to NOT_SET)
# @param output_serde [Object] serde override for the result (defaults to NOT_SET)
# @return [Object] whatever #object_call returns
def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  forwarded = { idempotency_key: idempotency_key, headers: headers,
                input_serde: input_serde, output_serde: output_serde }
  object_call(service, handler, key, arg, **forwarded)
end

#workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Sends a one-way invocation to a Restate workflow handler (fire-and-forget).



276
277
278
279
280
# File 'lib/restate/server_context.rb', line 276

# Sends a one-way invocation to a Restate workflow handler (fire-and-forget).
#
# Forwards every argument unchanged to #object_send.
#
# @param service [Object] service reference
# @param handler [Object] handler reference
# @param key [Object] key of the workflow
# @param arg [Object] argument for the handler
# @param delay [Numeric, nil] optional delay, forwarded as given
# @param idempotency_key [Object, nil] optional idempotency key
# @param headers [Object, nil] optional headers
# @param input_serde [Object] serde override for the argument (defaults to NOT_SET)
# @return [Object] whatever #object_send returns
def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET)
  forwarded = { delay: delay, idempotency_key: idempotency_key, headers: headers, input_serde: input_serde }
  object_send(service, handler, key, arg, **forwarded)
end