Class: Restate::ServerContext
- Inherits:
-
Object
- Object
- Restate::ServerContext
- Includes:
- WorkflowContext, WorkflowSharedContext
- Defined in:
- lib/restate/server_context.rb
Overview
The core execution context for a Restate handler invocation. Implements the progress loop and all context API methods (state, run, sleep, call, send).
Concurrency model:
- The handler runs inside a Fiber managed by Falcon/Async.
- `run` blocks spawn child Async tasks.
- When the progress loop needs input, it dequeues from @input_queue, yielding the Fiber.
- The HTTP input reader (a separate Async task) feeds chunks into @input_queue.
- Output chunks are written directly to the streaming response body.
Defined Under Namespace
Modules: BackgroundPool
Constant Summary collapse
- LOGGER =
Logger.new($stdout, progname: 'Restate::ServerContext')
Instance Attribute Summary collapse
-
#invocation ⇒ Object
readonly
Returns the value of attribute invocation.
-
#vm ⇒ Object
readonly
Returns the value of attribute vm.
Instance Method Summary collapse
-
#awakeable(serde: JsonSerde) ⇒ Object
Creates an awakeable and returns [awakeable_id, DurableFuture].
-
#cancel_invocation(invocation_id) ⇒ Object
Requests cancellation of another invocation by its id.
-
#clear(name) ⇒ Object
Durably removes a single state entry by name.
-
#clear_all ⇒ Object
Durably removes all state entries for this virtual object or workflow.
-
#completed?(handle) ⇒ Boolean
Check if a handle is completed (non-blocking).
-
#enter ⇒ Object
Runs the handler to completion, writing the output (or failure) to the journal.
-
#generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably calls a handler using raw bytes (no serialization).
-
#generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Sends a one-way invocation using raw bytes (no serialization).
-
#get(name, serde: JsonSerde) ⇒ Object
Durably retrieves a state entry by name.
-
#get_async(name, serde: JsonSerde) ⇒ Object
Returns a DurableFuture for a state entry.
-
#initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) ⇒ ServerContext
constructor
A new instance of ServerContext.
-
#key ⇒ Object
Returns the key for this virtual object or workflow invocation.
-
#object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate virtual object, keyed by
key. -
#object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).
-
#on_attempt_finished ⇒ Object
Called by the server when the attempt ends (handler completed, disconnected, or transient error).
-
#peek_promise(name, serde: JsonSerde) ⇒ Object
Peeks at a durable promise value without blocking.
-
#promise(name, serde: JsonSerde) ⇒ Object
Gets a durable promise value, blocking until resolved.
-
#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Rejects an awakeable with a terminal failure.
-
#reject_promise(name, message, code: 500) ⇒ Object
Rejects a durable promise with a terminal failure.
-
#request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
-
#resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolves an awakeable with a success value.
-
#resolve_handle(handle) ⇒ Object
Block until a previously created handle completes.
-
#resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolves a durable promise with a success value.
-
#run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Executes a durable side effect.
-
#run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await — executes the durable side effect and returns the result directly.
-
#service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate service and returns a future for its result.
-
#service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate service handler (fire-and-forget).
-
#set(name, value, serde: JsonSerde) ⇒ Object
Durably sets a state entry.
-
#sleep(seconds) ⇒ Object
Returns a durable future that completes after the given duration.
-
#state_keys ⇒ Object
Returns the list of all state entry names for this virtual object or workflow.
-
#state_keys_async ⇒ Object
Returns a DurableFuture for the list of all state entry names.
-
#take_completed(handle) ⇒ Object
Take a completed handle’s notification, returning the value.
-
#wait_any(*futures) ⇒ Object
Wait until any of the given futures completes.
-
#wait_any_handle(handles) ⇒ Object
Wait until any of the given handles completes.
-
#workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate workflow, keyed by
key. -
#workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate workflow handler (fire-and-forget).
Constructor Details
#initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) ⇒ ServerContext
Returns a new instance of ServerContext.
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/restate/server_context.rb', line 26 def initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) @vm = vm @handler = handler @invocation = invocation @send_output = send_output @input_queue = input_queue @run_coros_to_execute = {} @attempt_finished_event = AttemptFinishedEvent.new @middleware = middleware @outbound_middleware = outbound_middleware end |
Instance Attribute Details
#invocation ⇒ Object (readonly)
Returns the value of attribute invocation.
24 25 26 |
# File 'lib/restate/server_context.rb', line 24 def invocation @invocation end |
#vm ⇒ Object (readonly)
Returns the value of attribute vm.
24 25 26 |
# File 'lib/restate/server_context.rb', line 24 def vm @vm end |
Instance Method Details
#awakeable(serde: JsonSerde) ⇒ Object
Creates an awakeable and returns [awakeable_id, DurableFuture].
285 286 287 288 |
# File 'lib/restate/server_context.rb', line 285 def awakeable(serde: JsonSerde) id, handle = @vm.sys_awakeable [id, DurableFuture.new(self, handle, serde: serde)] end |
#cancel_invocation(invocation_id) ⇒ Object
Requests cancellation of another invocation by its id.
337 338 339 |
# File 'lib/restate/server_context.rb', line 337 def cancel_invocation(invocation_id) @vm.sys_cancel_invocation(invocation_id) end |
#clear(name) ⇒ Object
Durably removes a single state entry by name.
113 114 115 |
# File 'lib/restate/server_context.rb', line 113 def clear(name) @vm.sys_clear_state(name) end |
#clear_all ⇒ Object
Durably removes all state entries for this virtual object or workflow.
118 119 120 |
# File 'lib/restate/server_context.rb', line 118 def clear_all @vm.sys_clear_all_state end |
#completed?(handle) ⇒ Boolean
Check if a handle is completed (non-blocking).
154 155 156 |
# File 'lib/restate/server_context.rb', line 154 def completed?(handle) @vm.is_completed(handle) end |
#enter ⇒ Object
Runs the handler to completion, writing the output (or failure) to the journal.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/restate/server_context.rb', line 42 def enter Thread.current[:restate_context] = self Thread.current[:restate_service_kind] = @handler.service_tag.kind Thread.current[:restate_handler_kind] = @handler.kind in_buffer = @invocation.input_buffer out_buffer = Restate.invoke_handler(handler: @handler, ctx: self, in_buffer: in_buffer, middleware: @middleware) @vm.sys_write_output_success(out_buffer) @vm.sys_end rescue TerminalError => e failure = Failure.new(code: e.status_code, message: e.) @vm.sys_write_output_failure(failure) @vm.sys_end rescue SuspendedError, InternalError # These are expected internal control flow exceptions; do nothing. rescue DisconnectedError raise rescue StandardError => e # Walk the cause chain for TerminalError or internal exceptions cause = e handled = false while cause if cause.is_a?(TerminalError) f = Failure.new(code: cause.status_code, message: cause.) @vm.sys_write_output_failure(f) @vm.sys_end handled = true break elsif cause.is_a?(SuspendedError) || cause.is_a?(InternalError) handled = true break end cause = cause.cause end unless handled @vm.notify_error(e.inspect, e.backtrace&.join("\n")) raise end ensure @run_coros_to_execute.clear Thread.current[:restate_context] = nil Thread.current[:restate_service_kind] = nil Thread.current[:restate_handler_kind] = nil end |
#generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably calls a handler using raw bytes (no serialization). Useful for proxying.
344 345 346 347 348 349 350 351 352 353 |
# File 'lib/restate/server_context.rb', line 344 def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) with_outbound_middleware(service, handler, headers) do |hdrs| call_handle = @vm.sys_call( service: service, handler: handler, parameter: arg, key: key, idempotency_key: idempotency_key, headers: hdrs ) DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, output_serde: nil) end end |
#generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Sends a one-way invocation using raw bytes (no serialization). Useful for proxying.
356 357 358 359 360 361 362 363 364 365 |
# File 'lib/restate/server_context.rb', line 356 def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) delay_ms = delay ? (delay * 1000).to_i : nil with_outbound_middleware(service, handler, headers) do |hdrs| invocation_id_handle = @vm.sys_send( service: service, handler: handler, parameter: arg, key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs ) SendHandle.new(self, invocation_id_handle) end end |
#get(name, serde: JsonSerde) ⇒ Object
Durably retrieves a state entry by name. Returns nil if unset.
97 98 99 |
# File 'lib/restate/server_context.rb', line 97 def get(name, serde: JsonSerde) get_async(name, serde: serde).await end |
#get_async(name, serde: JsonSerde) ⇒ Object
Returns a DurableFuture for a state entry. Resolves to nil if unset.
102 103 104 105 |
# File 'lib/restate/server_context.rb', line 102 def get_async(name, serde: JsonSerde) handle = @vm.sys_get_state(name) DurableFuture.new(self, handle, serde: serde) end |
#key ⇒ Object
Returns the key for this virtual object or workflow invocation.
380 381 382 |
# File 'lib/restate/server_context.rb', line 380 def key @invocation.key end |
#object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate virtual object, keyed by key.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/restate/server_context.rb', line 236 def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) svc_name, handler_name, = resolve_call_target(service, handler) in_serde = resolve_serde(input_serde, , :input_serde) out_serde = resolve_serde(output_serde, , :output_serde) parameter = in_serde.serialize(arg) with_outbound_middleware(svc_name, handler_name, headers) do |hdrs| call_handle = @vm.sys_call( service: svc_name, handler: handler_name, parameter: parameter, key: key, idempotency_key: idempotency_key, headers: hdrs ) DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, output_serde: out_serde) end end |
#object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).
253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/restate/server_context.rb', line 253 def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) svc_name, handler_name, = resolve_call_target(service, handler) in_serde = resolve_serde(input_serde, , :input_serde) parameter = in_serde.serialize(arg) delay_ms = delay ? (delay * 1000).to_i : nil with_outbound_middleware(svc_name, handler_name, headers) do |hdrs| invocation_id_handle = @vm.sys_send( service: svc_name, handler: handler_name, parameter: parameter, key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs ) SendHandle.new(self, invocation_id_handle) end end |
#on_attempt_finished ⇒ Object
Called by the server when the attempt ends (handler completed, disconnected, or transient error). Signals the attempt_finished_event so that user code and background pool jobs can clean up.
90 91 92 |
# File 'lib/restate/server_context.rb', line 90 def on_attempt_finished @attempt_finished_event.set! end |
#peek_promise(name, serde: JsonSerde) ⇒ Object
Peeks at a durable promise value without blocking. Returns nil if not yet resolved.
312 313 314 315 316 317 |
# File 'lib/restate/server_context.rb', line 312 def peek_promise(name, serde: JsonSerde) handle = @vm.sys_peek_promise(name) poll_and_take(handle) do |raw| raw.nil? ? nil : serde.deserialize(raw) end end |
#promise(name, serde: JsonSerde) ⇒ Object
Gets a durable promise value, blocking until resolved.
304 305 306 307 308 309 |
# File 'lib/restate/server_context.rb', line 304 def promise(name, serde: JsonSerde) handle = @vm.sys_get_promise(name) poll_and_take(handle) do |raw| raw.nil? ? nil : serde.deserialize(raw) end end |
#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Rejects an awakeable with a terminal failure.
296 297 298 299 |
# File 'lib/restate/server_context.rb', line 296 def reject_awakeable(awakeable_id, , code: 500) failure = Failure.new(code: code, message: ) @vm.sys_complete_awakeable_failure(awakeable_id, failure) end |
#reject_promise(name, message, code: 500) ⇒ Object
Rejects a durable promise with a terminal failure.
327 328 329 330 331 332 |
# File 'lib/restate/server_context.rb', line 327 def reject_promise(name, , code: 500) failure = Failure.new(code: code, message: ) handle = @vm.sys_complete_promise_failure(name, failure) poll_and_take(handle) nil end |
#request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
370 371 372 373 374 375 376 377 |
# File 'lib/restate/server_context.rb', line 370 def request @request ||= Request.new( id: @invocation.invocation_id, headers: @invocation.headers.to_h, body: @invocation.input_buffer, attempt_finished_event: @attempt_finished_event ) end |
#resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolves an awakeable with a success value.
291 292 293 |
# File 'lib/restate/server_context.rb', line 291 def resolve_awakeable(awakeable_id, payload, serde: JsonSerde) @vm.sys_complete_awakeable_success(awakeable_id, serde.serialize(payload)) end |
#resolve_handle(handle) ⇒ Object
Block until a previously created handle completes. Returns the value.
144 145 146 |
# File 'lib/restate/server_context.rb', line 144 def resolve_handle(handle) poll_and_take(handle) end |
#resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolves a durable promise with a success value.
320 321 322 323 324 |
# File 'lib/restate/server_context.rb', line 320 def resolve_promise(name, payload, serde: JsonSerde) handle = @vm.sys_complete_promise_success(name, serde.serialize(payload)) poll_and_take(handle) nil end |
#run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Executes a durable side effect. The block runs at most once; its result is journaled and replayed on retries. Returns a DurableFuture for the result.
Pass background: true to run the block in a real OS Thread, keeping the fiber event loop responsive for other concurrent handlers. Use this for CPU-intensive work.
179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/restate/server_context.rb', line 179 def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) handle = @vm.sys_run(name) @run_coros_to_execute[handle] = if background -> { execute_run_threaded(handle, action, serde, retry_policy) } else -> { execute_run(handle, action, serde, retry_policy) } end DurableFuture.new(self, handle, serde: serde) end |
#run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await — executes the durable side effect and returns the result directly.
Accepts all the same options as run, including background: true.
196 197 198 |
# File 'lib/restate/server_context.rb', line 196 def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) run(name, serde: serde, retry_policy: retry_policy, background: background, &action).await end |
#service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate service and returns a future for its result.
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/restate/server_context.rb', line 203 def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) svc_name, handler_name, = resolve_call_target(service, handler) in_serde = resolve_serde(input_serde, , :input_serde) out_serde = resolve_serde(output_serde, , :output_serde) parameter = in_serde.serialize(arg) with_outbound_middleware(svc_name, handler_name, headers) do |hdrs| call_handle = @vm.sys_call( service: svc_name, handler: handler_name, parameter: parameter, key: key, idempotency_key: idempotency_key, headers: hdrs ) DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, output_serde: out_serde) end end |
#service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate service handler (fire-and-forget).
220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/restate/server_context.rb', line 220 def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) svc_name, handler_name, = resolve_call_target(service, handler) in_serde = resolve_serde(input_serde, , :input_serde) parameter = in_serde.serialize(arg) delay_ms = delay ? (delay * 1000).to_i : nil with_outbound_middleware(svc_name, handler_name, headers) do |hdrs| invocation_id_handle = @vm.sys_send( service: svc_name, handler: handler_name, parameter: parameter, key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs ) SendHandle.new(self, invocation_id_handle) end end |
#set(name, value, serde: JsonSerde) ⇒ Object
Durably sets a state entry. The value is serialized via serde.
108 109 110 |
# File 'lib/restate/server_context.rb', line 108 def set(name, value, serde: JsonSerde) @vm.sys_set_state(name, serde.serialize(value)) end |
#sleep(seconds) ⇒ Object
Returns a durable future that completes after the given duration. The timer survives handler restarts.
137 138 139 140 141 |
# File 'lib/restate/server_context.rb', line 137 def sleep(seconds) millis = (seconds * 1000).to_i handle = @vm.sys_sleep(millis) DurableFuture.new(self, handle) end |
#state_keys ⇒ Object
Returns the list of all state entry names for this virtual object or workflow.
123 124 125 |
# File 'lib/restate/server_context.rb', line 123 def state_keys state_keys_async.await end |
#state_keys_async ⇒ Object
Returns a DurableFuture for the list of all state entry names.
128 129 130 131 |
# File 'lib/restate/server_context.rb', line 128 def state_keys_async handle = @vm.sys_get_state_keys DurableFuture.new(self, handle) end |
#take_completed(handle) ⇒ Object
Take a completed handle’s notification, returning the value. Raises TerminalError if the handle resolved to a failure.
160 161 162 |
# File 'lib/restate/server_context.rb', line 160 def take_completed(handle) must_take_notification(handle) end |
#wait_any(*futures) ⇒ Object
Wait until any of the given futures completes. Returns [completed, remaining].
165 166 167 168 169 |
# File 'lib/restate/server_context.rb', line 165 def wait_any(*futures) handles = futures.map(&:handle) wait_any_handle(handles) futures.partition(&:completed?) end |
#wait_any_handle(handles) ⇒ Object
Wait until any of the given handles completes. Does not take notifications.
149 150 151 |
# File 'lib/restate/server_context.rb', line 149 def wait_any_handle(handles) poll_or_cancel(handles) unless handles.any? { |h| @vm.is_completed(h) } end |
#workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate workflow, keyed by key.
269 270 271 272 273 |
# File 'lib/restate/server_context.rb', line 269 def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers, input_serde: input_serde, output_serde: output_serde) # rubocop:disable Layout/HashAlignment end |
#workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate workflow handler (fire-and-forget).
276 277 278 279 280 |
# File 'lib/restate/server_context.rb', line 276 def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key, headers: headers, input_serde: input_serde) # rubocop:disable Layout/HashAlignment end |