Class: Restate::Server::Context
- Inherits:
- Object
- Object
- Restate::Server::Context
- Includes:
- WorkflowContext, WorkflowSharedContext
- Defined in:
- lib/restate/server/context.rb
Overview
The core execution context for a Restate handler invocation. Implements the progress loop and all context API methods (state, run, sleep, call, send).
Concurrency model:
- The handler runs inside a Fiber managed by Falcon/Async.
- `run` blocks spawn child Async tasks.
- When the progress loop needs input, it dequeues from @input_queue, yielding the Fiber.
- The HTTP input reader (a separate Async task) feeds chunks into @input_queue.
- Output chunks are written directly to the streaming response body.
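As a rough illustration of the input handoff described in the list above, here is a minimal sketch that uses only Ruby's core Fiber. It is not the SDK's progress loop: `simulate_handler`, `input_queue`, and `received` are hypothetical names, and Falcon/Async scheduling is replaced by manual `resume` calls.

```ruby
# Toy model of the input handoff: the handler runs in a Fiber and yields
# whenever the queue is empty; the caller plays the role of the input reader.
def simulate_handler(chunks)
  input_queue = []   # hypothetical stand-in for @input_queue
  received = []      # chunks the handler has consumed so far

  handler_fiber = Fiber.new do
    chunks.length.times do
      # No input available yet: hand control back to the reader.
      Fiber.yield(:needs_input) while input_queue.empty?
      received << input_queue.shift
    end
    :done
  end

  status = handler_fiber.resume        # handler starts, finds no input, yields
  chunks.each do |chunk|
    input_queue << chunk               # reader feeds one chunk
    status = handler_fiber.resume      # handler consumes it, then yields or finishes
  end
  [status, received]
end

status, received = simulate_handler(%w[chunk-1 chunk-2 chunk-3])
puts "#{status}: #{received.inspect}"
```

In the real server the reader is a separate Async task and the scheduler decides when the handler fiber resumes; the sketch only shows why an empty queue suspends the handler rather than blocking the process.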
Defined Under Namespace
Modules: BackgroundPool
Constant Summary
- LOGGER = Logger.new($stdout, progname: 'Restate::Server::Context')
Instance Attribute Summary
-
#invocation ⇒ Object
readonly
Returns the value of attribute invocation.
-
#vm ⇒ Object
readonly
Returns the value of attribute vm.
Instance Method Summary
-
#all(*futures) ⇒ Object
Build a lazy combinator over the given futures and return it as a CombinedFuture.
-
#all_settled(*futures) ⇒ Object
Lazy combinator.
-
#any(*futures) ⇒ Object
Lazy combinator.
-
#awakeable(serde: JsonSerde) ⇒ Object
Creates an awakeable and returns [awakeable_id, DurableFuture].
-
#cancel_invocation(invocation_id) ⇒ Object
Requests cancellation of another invocation by its id.
-
#clear(name) ⇒ Object
Durably removes a single state entry by name.
-
#clear_all ⇒ Object
Durably removes all state entries for this virtual object or workflow.
-
#completed?(handle) ⇒ Boolean
Check if a handle is completed (non-blocking).
-
#enter ⇒ Object
Runs the handler to completion, writing the output (or failure) to the journal.
-
#generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably calls a handler using raw bytes (no serialization).
-
#generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Sends a one-way invocation using raw bytes (no serialization).
-
#get(name, serde: JsonSerde) ⇒ Object
Durably retrieves a state entry by name.
-
#get_async(name, serde: JsonSerde) ⇒ Object
Returns a DurableFuture for a state entry.
-
#initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) ⇒ Context
constructor
A new instance of Context.
-
#key ⇒ Object
Returns the key for this virtual object or workflow invocation.
-
#object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate virtual object, keyed by key.
-
#object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).
-
#on_attempt_finished ⇒ Object
Called by the server when the attempt ends (handler completed, disconnected, or transient error).
-
#peek_promise(name, serde: JsonSerde) ⇒ Object
Peeks at a durable promise value without blocking.
-
#promise(name, serde: JsonSerde) ⇒ Object
Gets a durable promise value, blocking until resolved.
-
#race(*futures) ⇒ Object
Lazy combinator.
-
#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Rejects an awakeable with a terminal failure.
-
#reject_promise(name, message, code: 500) ⇒ Object
Rejects a durable promise with a terminal failure.
-
#reject_signal(invocation_id, name, message, code: 500) ⇒ Object
Send a terminal failure to a named signal on another invocation.
-
#request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
-
#resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolves an awakeable with a success value.
-
#resolve_handle(handle) ⇒ Object
Block until a previously created handle completes.
-
#resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolves a durable promise with a success value.
-
#resolve_signal(invocation_id, name, payload, serde: JsonSerde) ⇒ Object
Send a success value to a named signal on another invocation.
-
#run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Executes a durable side effect.
-
#run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await — executes the durable side effect and returns the result directly.
-
#service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate service and returns a future for its result.
-
#service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate service handler (fire-and-forget).
-
#set(name, value, serde: JsonSerde) ⇒ Object
Durably sets a state entry.
-
#signal(name, serde: JsonSerde) ⇒ Object
Wait for a named signal addressed to this invocation.
-
#sleep(seconds) ⇒ Object
Returns a durable future that completes after the given duration.
-
#state_keys ⇒ Object
Returns the list of all state entry names for this virtual object or workflow.
-
#state_keys_async ⇒ Object
Returns a DurableFuture for the list of all state entry names.
-
#take_completed(handle) ⇒ Object
Take a completed handle’s notification, returning the value.
-
#wait_any(*futures) ⇒ Object
Wait until any of the given futures completes.
-
#wait_any_handle(handles) ⇒ Object
Wait until any of the given handles completes.
-
#wait_combined(future_tree) ⇒ Object
Drive progress over a combinator tree.
-
#workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate workflow, keyed by key.
#workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate workflow handler (fire-and-forget).
Constructor Details
#initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], outbound_middleware: []) ⇒ Context
Returns a new instance of Context.
# File 'lib/restate/server/context.rb', line 27
def initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [],
               outbound_middleware: [])
  @vm = vm
  @handler = handler
  @invocation = invocation
  @send_output = send_output
  @input_queue = input_queue
  @run_coros_to_execute = {}
  @attempt_finished_event = AttemptFinishedEvent.new
  @middleware = middleware
  @outbound_middleware = outbound_middleware
end
Instance Attribute Details
#invocation ⇒ Object (readonly)
Returns the value of attribute invocation.
# File 'lib/restate/server/context.rb', line 25
def invocation
  @invocation
end
#vm ⇒ Object (readonly)
Returns the value of attribute vm.
# File 'lib/restate/server/context.rb', line 25
def vm
  @vm
end
Instance Method Details
#all(*futures) ⇒ Object
Build a lazy combinator over the given futures and return it as a CombinedFuture. Nothing blocks until .await is called, so the combinators compose: Restate.race(Restate.all(a, b), c).await.
Semantics match JS Promise.all — when awaited, returns the values in input order, short-circuiting on the first TerminalError.
# File 'lib/restate/server/context.rb', line 178
def all(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  CombinedFuture.new(self, :all_succeeded_or_first_failed, futures)
end
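The first line of #all means the combinator accepts either a splat of futures or a single Array. A minimal sketch of that unwrapping, isolated from the SDK (`normalize_futures` is a hypothetical helper name, and plain symbols stand in for futures):

```ruby
# Mirrors the first line of #all: a single Array argument is unwrapped, so
# all(a, b) and all([a, b]) see the same list.
def normalize_futures(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  futures
end

p normalize_futures(:a, :b)     # splat form
p normalize_futures(%i[a b])    # single-array form
p normalize_futures(:a)         # a lone non-array argument stays wrapped
```

The same line appears in #all_settled, #any, and #race, so all four combinators share this calling convention.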
#all_settled(*futures) ⇒ Object
Lazy combinator. Awaiting waits for every future to settle and returns an Array of { status: :fulfilled, value: … } or { status: :rejected, reason: TerminalError } entries, in input order. Matches JS Promise.allSettled.
# File 'lib/restate/server/context.rb', line 205
def all_settled(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  CombinedFuture.new(self, :all_completed, futures)
end
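To make the result shape described for #all_settled concrete, here is a small self-contained sketch. It does not use the SDK: lambdas stand in for futures, and `ToyTerminalError` and `toy_all_settled` are hypothetical names chosen for illustration.

```ruby
# Hypothetical stand-in for the SDK's TerminalError.
class ToyTerminalError < StandardError; end

# Settle every thunk and report each outcome in input order.
def toy_all_settled(thunks)
  thunks.map do |thunk|
    { status: :fulfilled, value: thunk.call }
  rescue ToyTerminalError => e
    { status: :rejected, reason: e }
  end
end

results = toy_all_settled([
  -> { 1 },
  -> { raise ToyTerminalError, 'boom' },
  -> { 3 }
])
results.each { |r| puts r[:status] }
```

A failure in the middle does not stop later entries from settling, which is the difference from the short-circuiting behavior documented for #all.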
#any(*futures) ⇒ Object
Lazy combinator. Awaiting returns the value of the first successful future; raises only if every future fails terminally. Matches JS Promise.any.
# File 'lib/restate/server/context.rb', line 194
def any(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  raise ArgumentError, 'any requires at least one future' if futures.empty?

  CombinedFuture.new(self, :first_succeeded_or_all_failed, futures)
end
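The "first success, or fail only when everything failed" rule can be sketched without the SDK. In this toy version, outcomes are given as an Array already sorted by completion order; `toy_any`, `ToyTerminalError`, and the error message for the all-failed case are assumptions, not the SDK's actual behavior.

```ruby
# Hypothetical stand-in for the SDK's TerminalError.
class ToyTerminalError < StandardError; end

# outcomes: Array of [:ok, value] or [:error, message], in completion order.
def toy_any(outcomes)
  raise ArgumentError, 'any requires at least one future' if outcomes.empty?

  winner = outcomes.find { |kind, _| kind == :ok }
  return winner.last if winner

  # Assumption: the toy raises one generic error once every outcome has failed.
  raise ToyTerminalError, 'every future failed'
end

puts toy_any([[:error, 'timeout'], [:ok, 42], [:ok, 7]])
```

Earlier failures are skipped rather than raised, which is what separates this from the #race semantics, where the first future to settle decides the result either way.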
#awakeable(serde: JsonSerde) ⇒ Object
Creates an awakeable and returns [awakeable_id, DurableFuture].
# File 'lib/restate/server/context.rb', line 328
def awakeable(serde: JsonSerde)
  id, handle = @vm.sys_awakeable
  [id, DurableFuture.new(self, handle, serde: serde)]
end
#cancel_invocation(invocation_id) ⇒ Object
Requests cancellation of another invocation by its id.
# File 'lib/restate/server/context.rb', line 399
def cancel_invocation(invocation_id)
  @vm.sys_cancel_invocation(invocation_id)
end
#clear(name) ⇒ Object
Durably removes a single state entry by name.
# File 'lib/restate/server/context.rb', line 114
def clear(name)
  @vm.sys_clear_state(name)
end
#clear_all ⇒ Object
Durably removes all state entries for this virtual object or workflow.
# File 'lib/restate/server/context.rb', line 119
def clear_all
  @vm.sys_clear_all_state
end
#completed?(handle) ⇒ Boolean
Check if a handle is completed (non-blocking).
# File 'lib/restate/server/context.rb', line 155
def completed?(handle)
  @vm.is_completed(handle)
end
#enter ⇒ Object
Runs the handler to completion, writing the output (or failure) to the journal.
# File 'lib/restate/server/context.rb', line 43
def enter
  Thread.current[:restate_context] = self
  Thread.current[:restate_service_kind] = @handler.service_tag.kind
  Thread.current[:restate_handler_kind] = @handler.kind
  in_buffer = @invocation.input_buffer
  out_buffer = Restate.invoke_handler(handler: @handler, ctx: self, in_buffer: in_buffer,
                                      middleware: @middleware)
  @vm.sys_write_output_success(out_buffer)
  @vm.sys_end
rescue TerminalError => e
  failure = Failure.new(code: e.status_code, message: e.message, metadata: e.metadata)
  @vm.sys_write_output_failure(failure)
  @vm.sys_end
rescue SuspendedError, InternalError
  # These are expected internal control flow exceptions; do nothing.
rescue DisconnectedError
  raise
rescue StandardError => e
  # Walk the cause chain for TerminalError or internal exceptions
  cause = e
  handled = false
  while cause
    if cause.is_a?(TerminalError)
      f = Failure.new(code: cause.status_code, message: cause.message, metadata: cause.metadata)
      @vm.sys_write_output_failure(f)
      @vm.sys_end
      handled = true
      break
    elsif cause.is_a?(SuspendedError) || cause.is_a?(InternalError)
      handled = true
      break
    end
    cause = cause.cause
  end
  unless handled
    @vm.notify_error(e.inspect, e.backtrace&.join("\n"))
    raise
  end
ensure
  @run_coros_to_execute.clear
  Thread.current[:restate_context] = nil
  Thread.current[:restate_service_kind] = nil
  Thread.current[:restate_handler_kind] = nil
end
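The StandardError branch of #enter walks Exception#cause so that a terminal error wrapped by user code is still reported as terminal. A minimal sketch of that walk in isolation (`ToyTerminalError` and `find_terminal` are hypothetical names, not part of the SDK):

```ruby
# Hypothetical stand-in for the SDK's TerminalError.
class ToyTerminalError < StandardError; end

# Follow Exception#cause links until a ToyTerminalError is found (or the chain ends).
def find_terminal(error)
  cause = error
  while cause
    return cause if cause.is_a?(ToyTerminalError)

    cause = cause.cause
  end
  nil
end

wrapped =
  begin
    begin
      raise ToyTerminalError, 'payment declined'
    rescue ToyTerminalError
      # Raising inside a rescue block sets #cause on the new exception.
      raise 'handler failed'
    end
  rescue StandardError => e
    e
  end

puts find_terminal(wrapped)&.message
```

Ruby sets #cause automatically when an exception is raised while another is being handled, which is why a plain re-raise with a new message still leaves the original error discoverable.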
#generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably calls a handler using raw bytes (no serialization). Useful for proxying.
# File 'lib/restate/server/context.rb', line 406
def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  with_outbound_middleware(service, handler, headers) do |hdrs|
    call_handle = @vm.sys_call(
      service: service, handler: handler, parameter: arg,
      key: key, idempotency_key: idempotency_key, headers: hdrs
    )
    DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle,
                          output_serde: nil)
  end
end
#generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Sends a one-way invocation using raw bytes (no serialization). Useful for proxying.
# File 'lib/restate/server/context.rb', line 418
def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  delay_ms = delay ? (delay * 1000).to_i : nil
  with_outbound_middleware(service, handler, headers) do |hdrs|
    invocation_id_handle = @vm.sys_send(
      service: service, handler: handler, parameter: arg,
      key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs
    )
    SendHandle.new(self, invocation_id_handle)
  end
end
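The delay argument of the send methods is given in seconds and converted to whole milliseconds. The conversion expression from the listing above can be tried on its own (`delay_to_ms` is a hypothetical wrapper name):

```ruby
# Same expression as in #generic_send: seconds in, whole milliseconds out.
def delay_to_ms(delay)
  delay ? (delay * 1000).to_i : nil
end

p delay_to_ms(nil)    # no delay requested
p delay_to_ms(2)      # Integer seconds
p delay_to_ms(1.5)    # fractional seconds
p delay_to_ms(0.0004) # sub-millisecond delays truncate to zero
```

Because to_i truncates, any delay shorter than one millisecond becomes 0, which is distinct from nil (no delay passed at all).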
#get(name, serde: JsonSerde) ⇒ Object
Durably retrieves a state entry by name. Returns nil if unset.
# File 'lib/restate/server/context.rb', line 98
def get(name, serde: JsonSerde)
  get_async(name, serde: serde).await
end
#get_async(name, serde: JsonSerde) ⇒ Object
Returns a DurableFuture for a state entry. Resolves to nil if unset.
# File 'lib/restate/server/context.rb', line 103
def get_async(name, serde: JsonSerde)
  handle = @vm.sys_get_state(name)
  DurableFuture.new(self, handle, serde: serde)
end
#key ⇒ Object
Returns the key for this virtual object or workflow invocation.
# File 'lib/restate/server/context.rb', line 442
def key
  @invocation.key
end
#object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate virtual object, keyed by key.
# File 'lib/restate/server/context.rb', line 279
def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  out_serde = resolve_serde(output_serde, handler_meta, :output_serde)
  parameter = in_serde.serialize(arg)
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    call_handle = @vm.sys_call(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, idempotency_key: idempotency_key, headers: hdrs
    )
    DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle,
                          output_serde: out_serde)
  end
end
#object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate virtual object handler (fire-and-forget).
# File 'lib/restate/server/context.rb', line 296
def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  parameter = in_serde.serialize(arg)
  delay_ms = delay ? (delay * 1000).to_i : nil
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    invocation_id_handle = @vm.sys_send(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs
    )
    SendHandle.new(self, invocation_id_handle)
  end
end
#on_attempt_finished ⇒ Object
Called by the server when the attempt ends (handler completed, disconnected, or transient error). Signals the attempt_finished_event so that user code and background pool jobs can clean up.
# File 'lib/restate/server/context.rb', line 91
def on_attempt_finished
  @attempt_finished_event.set!
end
#peek_promise(name, serde: JsonSerde) ⇒ Object
Peeks at a durable promise value without blocking. Returns nil if not yet resolved.
# File 'lib/restate/server/context.rb', line 374
def peek_promise(name, serde: JsonSerde)
  handle = @vm.sys_peek_promise(name)
  poll_and_take(handle) do |raw|
    raw.nil? ? nil : serde.deserialize(raw)
  end
end
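The block passed to poll_and_take keeps an unresolved promise as nil instead of handing nil to the serde. A minimal sketch of that guard, using a hypothetical `ToyJsonSerde` built on Ruby's standard json library (the SDK's JsonSerde may differ in detail):

```ruby
require 'json'

# Hypothetical serde with the serialize/deserialize pair the listings rely on.
module ToyJsonSerde
  def self.serialize(value)
    JSON.generate(value)
  end

  def self.deserialize(raw)
    JSON.parse(raw)
  end
end

# Same guard as the block in #peek_promise: nil stays nil, bytes are decoded.
def decode(raw, serde: ToyJsonSerde)
  raw.nil? ? nil : serde.deserialize(raw)
end

p decode(nil)
p decode(ToyJsonSerde.serialize({ 'approved' => true }))
```

Any object that responds to serialize and deserialize in this way could presumably be passed as the serde: keyword, though that is an inference from the listings rather than a documented contract.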
#promise(name, serde: JsonSerde) ⇒ Object
Gets a durable promise value, blocking until resolved.
# File 'lib/restate/server/context.rb', line 366
def promise(name, serde: JsonSerde)
  handle = @vm.sys_get_promise(name)
  poll_and_take(handle) do |raw|
    raw.nil? ? nil : serde.deserialize(raw)
  end
end
#race(*futures) ⇒ Object
Lazy combinator. Awaiting returns the value of the first future to settle, or raises if it settled with a TerminalError. Matches JS Promise.race.
# File 'lib/restate/server/context.rb', line 185
def race(*futures)
  futures = futures.first if futures.length == 1 && futures.first.is_a?(Array)
  raise ArgumentError, 'race requires at least one future' if futures.empty?

  CombinedFuture.new(self, :first_completed, futures)
end
#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Rejects an awakeable with a terminal failure.
# File 'lib/restate/server/context.rb', line 339
def reject_awakeable(awakeable_id, message, code: 500)
  failure = Failure.new(code: code, message: message)
  @vm.sys_complete_awakeable_failure(awakeable_id, failure)
end
#reject_promise(name, message, code: 500) ⇒ Object
Rejects a durable promise with a terminal failure.
# File 'lib/restate/server/context.rb', line 389
def reject_promise(name, message, code: 500)
  failure = Failure.new(code: code, message: message)
  handle = @vm.sys_complete_promise_failure(name, failure)
  poll_and_take(handle)
  nil
end
#reject_signal(invocation_id, name, message, code: 500) ⇒ Object
Send a terminal failure to a named signal on another invocation.
# File 'lib/restate/server/context.rb', line 358
def reject_signal(invocation_id, name, message, code: 500)
  failure = Failure.new(code: code, message: message)
  @vm.sys_complete_signal_failure(invocation_id, name, failure)
end
#request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
# File 'lib/restate/server/context.rb', line 432
def request
  @request ||= Request.new(
    id: @invocation.invocation_id,
    headers: @invocation.headers.to_h,
    body: @invocation.input_buffer,
    attempt_finished_event: @attempt_finished_event
  )
end
#resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolves an awakeable with a success value.
# File 'lib/restate/server/context.rb', line 334
def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  @vm.sys_complete_awakeable_success(awakeable_id, serde.serialize(payload))
end
#resolve_handle(handle) ⇒ Object
Block until a previously created handle completes. Returns the value.
# File 'lib/restate/server/context.rb', line 145
def resolve_handle(handle)
  poll_and_take(handle)
end
#resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolves a durable promise with a success value.
# File 'lib/restate/server/context.rb', line 382
def resolve_promise(name, payload, serde: JsonSerde)
  handle = @vm.sys_complete_promise_success(name, serde.serialize(payload))
  poll_and_take(handle)
  nil
end
#resolve_signal(invocation_id, name, payload, serde: JsonSerde) ⇒ Object
Send a success value to a named signal on another invocation.
# File 'lib/restate/server/context.rb', line 353
def resolve_signal(invocation_id, name, payload, serde: JsonSerde)
  @vm.sys_complete_signal_success(invocation_id, name, serde.serialize(payload))
end
#run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Executes a durable side effect. The block runs at most once; its result is journaled and replayed on retries. Returns a DurableFuture for the result.
Pass background: true to run the block in a real OS Thread, keeping the fiber event loop responsive for other concurrent handlers. Use this for CPU-intensive work.
# File 'lib/restate/server/context.rb', line 218
def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  run = @vm.sys_run(name)
  handle = run.handle

  # Schedule the run closure only if the run wasn't replayed.
  unless run.replayed
    @run_coros_to_execute[handle] =
      if background
        -> { execute_run_threaded(handle, action, serde, retry_policy) }
      else
        -> { execute_run(handle, action, serde, retry_policy) }
      end
  end

  DurableFuture.new(self, handle, serde: serde)
end
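The "journaled and replayed" behavior of #run can be illustrated with a toy in-memory journal. This is only a sketch of the idea: `ToyJournal` is hypothetical, it keys entries by name purely for simplicity, and it models neither retries nor the real journal format.

```ruby
# Toy journal: remembers the result recorded under each name.
class ToyJournal
  def initialize
    @entries = {}
  end

  # Runs the block only when no result is recorded yet; otherwise replays it.
  def run(name)
    return @entries[name] if @entries.key?(name)

    @entries[name] = yield
  end
end

journal = ToyJournal.new
calls = 0
first  = journal.run('charge') { calls += 1; 'receipt-1' }
second = journal.run('charge') { calls += 1; 'receipt-2' } # replayed, block skipped
puts "#{first} #{second} calls=#{calls}"
```

This corresponds to the `unless run.replayed` check in the listing: when the entry already exists, no closure is scheduled and the future resolves from the recorded result.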
#run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await — executes the durable side effect and returns the result directly.
Accepts all the same options as run, including background: true.
# File 'lib/restate/server/context.rb', line 239
def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  run(name, serde: serde, retry_policy: retry_policy, background: background, &action).await
end
#service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate service and returns a future for its result.
# File 'lib/restate/server/context.rb', line 246
def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  out_serde = resolve_serde(output_serde, handler_meta, :output_serde)
  parameter = in_serde.serialize(arg)
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    call_handle = @vm.sys_call(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, idempotency_key: idempotency_key, headers: hdrs
    )
    DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle,
                          output_serde: out_serde)
  end
end
#service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate service handler (fire-and-forget).
# File 'lib/restate/server/context.rb', line 263
def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET)
  svc_name, handler_name, handler_meta = resolve_call_target(service, handler)
  in_serde = resolve_serde(input_serde, handler_meta, :input_serde)
  parameter = in_serde.serialize(arg)
  delay_ms = delay ? (delay * 1000).to_i : nil
  with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs|
    invocation_id_handle = @vm.sys_send(
      service: svc_name, handler: handler_name, parameter: parameter,
      key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs
    )
    SendHandle.new(self, invocation_id_handle)
  end
end
#set(name, value, serde: JsonSerde) ⇒ Object
Durably sets a state entry. The value is serialized via serde.
# File 'lib/restate/server/context.rb', line 109
def set(name, value, serde: JsonSerde)
  @vm.sys_set_state(name, serde.serialize(value))
end
#signal(name, serde: JsonSerde) ⇒ Object
Wait for a named signal addressed to this invocation. Returns a DurableFuture.
# File 'lib/restate/server/context.rb', line 347
def signal(name, serde: JsonSerde)
  handle = @vm.sys_signal(name)
  DurableFuture.new(self, handle, serde: serde)
end
#sleep(seconds) ⇒ Object
Returns a durable future that completes after the given duration. The timer survives handler restarts.
# File 'lib/restate/server/context.rb', line 138
def sleep(seconds)
  millis = (seconds * 1000).to_i
  handle = @vm.sys_sleep(millis)
  DurableFuture.new(self, handle)
end
#state_keys ⇒ Object
Returns the list of all state entry names for this virtual object or workflow.
# File 'lib/restate/server/context.rb', line 124
def state_keys
  state_keys_async.await
end
#state_keys_async ⇒ Object
Returns a DurableFuture for the list of all state entry names.
# File 'lib/restate/server/context.rb', line 129
def state_keys_async
  handle = @vm.sys_get_state_keys
  DurableFuture.new(self, handle)
end
#take_completed(handle) ⇒ Object
Take a completed handle’s notification, returning the value. Raises TerminalError if the handle resolved to a failure.
# File 'lib/restate/server/context.rb', line 161
def take_completed(handle)
  must_take_notification(handle)
end
#wait_any(*futures) ⇒ Object
Wait until any of the given futures completes. Returns [completed, remaining].
# File 'lib/restate/server/context.rb', line 166
def wait_any(*futures)
  handles = futures.map(&:handle)
  wait_any_handle(handles)
  futures.partition(&:completed?)
end
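The [completed, remaining] pair returned by #wait_any comes straight from Array#partition. A small sketch with a hypothetical `ToyFuture` struct that exposes only the completed? predicate:

```ruby
# Hypothetical future with just the one predicate that #wait_any relies on.
ToyFuture = Struct.new(:label, :done) do
  def completed?
    done
  end
end

futures = [ToyFuture.new('a', false), ToyFuture.new('b', true), ToyFuture.new('c', false)]

# partition returns [matching, non_matching], preserving input order in each.
completed, remaining = futures.partition(&:completed?)
puts completed.map(&:label).inspect
puts remaining.map(&:label).inspect
```

Since more than one future may have finished by the time the wait returns, the first element is an Array rather than a single future.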
#wait_any_handle(handles) ⇒ Object
Wait until any of the given handles completes. Does not take notifications.
# File 'lib/restate/server/context.rb', line 150
def wait_any_handle(handles)
  poll_or_cancel(handles) unless handles.any? { |h| @vm.is_completed(h) }
end
#wait_combined(future_tree) ⇒ Object
Drive progress over a combinator tree. Returns when the combinator logically completes (the shared core decides based on the tree shape). future_tree follows the encoding documented in lib/restate/vm.rb#do_await. Public so that CombinedFuture#await can drive it via @ctx.wait_combined.
# File 'lib/restate/server/context.rb', line 450
def wait_combined(future_tree)
  progress_loop { @vm.do_await(future_tree) }
end
#workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably calls a handler on a Restate workflow, keyed by key.
# File 'lib/restate/server/context.rb', line 312
def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers,
              input_serde: input_serde, output_serde: output_serde) # rubocop:disable Layout/HashAlignment
end
#workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Sends a one-way invocation to a Restate workflow handler (fire-and-forget).
# File 'lib/restate/server/context.rb', line 319
def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET)
  object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
              headers: headers, input_serde: input_serde) # rubocop:disable Layout/HashAlignment
end