Module: Restate
- Defined in:
- lib/restate.rb,
lib/restate/vm.rb,
lib/restate/serde.rb,
lib/restate/client.rb,
lib/restate/config.rb,
lib/restate/errors.rb,
lib/restate/context.rb,
lib/restate/handler.rb,
lib/restate/railtie.rb,
lib/restate/service.rb,
lib/restate/testing.rb,
lib/restate/version.rb,
lib/restate/endpoint.rb,
lib/restate/workflow.rb,
lib/restate/discovery.rb,
lib/restate/service_dsl.rb,
lib/restate/introspection.rb,
lib/restate/service_proxy.rb,
lib/restate/durable_future.rb,
lib/restate/server/context.rb,
lib/restate/server/handler.rb,
lib/restate/virtual_object.rb,
lib/restate/middleware/deadlock_detection.rb
Defined Under Namespace
Modules: BytesSerde, Context, Discovery, JsonSerde, Middleware, ObjectContext, ObjectSharedContext, Serde, Server, ServiceDSL, Sys, Testing, WorkflowContext, WorkflowSharedContext
Classes: AttemptFinishedEvent, Client, CombinedFuture, Config, DisconnectedError, DoProgressAnyCompleted, DoProgressCancelSignalReceived, DoProgressExecuteRun, DoProgressReadFromInput, DoWaitPendingRun, DryStructSerde, DurableCallFuture, DurableFuture, Endpoint, Failure, Handler, HandlerIO, InternalError, Invocation, NotReady, Railtie, Request, RunRetryConfig, RunRetryPolicy, SendHandle, Service, ServiceTag, Suspended, SuspendedError, TerminalError, TimeoutError, TypeSerde, VMWrapper, VirtualObject, Workflow
Constant Summary
- NOT_READY =
NotReady.new.freeze
- SUSPENDED =
Suspended.new.freeze
- CANCEL_HANDLE =
Internal::CANCEL_NOTIFICATION_HANDLE
- DO_PROGRESS_ANY_COMPLETED =
DoProgressAnyCompleted.new.freeze
- DO_PROGRESS_READ_FROM_INPUT =
DoProgressReadFromInput.new.freeze
- DO_PROGRESS_CANCEL_SIGNAL_RECEIVED =
DoProgressCancelSignalReceived.new.freeze
- DO_WAIT_PENDING_RUN =
DoWaitPendingRun.new.freeze
- NOT_SET =
Sentinel value to distinguish “caller didn’t pass serde” from an explicit value.
Object.new.freeze
- PRIMITIVE_SCHEMAS =
Maps Ruby primitive types to JSON Schema snippets for discovery.
{
  String => { 'type' => 'string' }.freeze,
  Integer => { 'type' => 'integer' }.freeze,
  Float => { 'type' => 'number' }.freeze,
  TrueClass => { 'type' => 'boolean' }.freeze,
  FalseClass => { 'type' => 'boolean' }.freeze,
  Array => { 'type' => 'array' }.freeze,
  Hash => { 'type' => 'object' }.freeze,
  NilClass => { 'type' => 'null' }.freeze
}.freeze
- VERSION =
'1.0.0'
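The NOT_SET sentinel listed above exists because nil can itself be a meaningful argument. A minimal sketch of the pattern follows. `NOT_SET_DEMO` and `pick_serde` are hypothetical stand-ins, not SDK names, and how the SDK actually resolves the fallback is an assumption here.

```ruby
# Stand-in for Restate::NOT_SET: a unique frozen object no caller passes by accident.
NOT_SET_DEMO = Object.new.freeze

# Hypothetical helper: use the default only when the argument was omitted.
# An explicit nil is preserved, which a plain `serde = nil` default could not detect.
def pick_serde(default, serde = NOT_SET_DEMO)
  serde.equal?(NOT_SET_DEMO) ? default : serde
end

omitted  = pick_serde(:json_serde)          # caller passed nothing
explicit = pick_serde(:json_serde, nil)     # caller explicitly passed nil
custom   = pick_serde(:json_serde, :bytes)  # caller passed a specific value
```

Comparing with `equal?` checks object identity, so no user-supplied value can be mistaken for the sentinel.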
Class Method Summary
-
.all(*futures) ⇒ Object
Wait for every future to complete and return their values in input order.
-
.all_settled(*futures) ⇒ Object
Wait for every future to settle.
-
.any(*futures) ⇒ Object
Wait for the first successful future and return its value.
-
.arel_to_sql(arel) ⇒ String
Convert an Arel AST to a SQL string without requiring an AR connection.
-
.awakeable(serde: JsonSerde) ⇒ Object
Create an awakeable for external callbacks.
-
.cancel_invocation(invocation_id) ⇒ Object
Request cancellation of another invocation.
-
.clear(name) ⇒ Object
Durably remove a single state entry.
-
.clear_all ⇒ Object
Durably remove all state entries.
-
.client ⇒ Object
Returns a pre-configured Client using the global config.
-
.config ⇒ Object
Returns the global configuration.
-
.configure {|config| ... } ⇒ Object
Configure the SDK globally.
-
.endpoint(*services, protocol: nil, identity_keys: nil) ⇒ Endpoint
Create an endpoint, optionally binding services.
-
.generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably call a handler using raw bytes (no serialization).
-
.generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Fire-and-forget send using raw bytes (no serialization).
-
.get(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry.
-
.get_async(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry, returning a DurableFuture instead of blocking.
-
.invoke_handler(handler:, ctx:, in_buffer:, middleware: []) ⇒ Object
Invoke a handler with the context and raw input bytes.
- .invoke_handler_direct(handler, in_buffer) ⇒ Object
- .invoke_handler_with_middleware(handler, ctx, in_buffer, middleware) ⇒ Object
-
.key ⇒ Object
Returns the key for this virtual object or workflow invocation.
-
.object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate virtual object.
-
.object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate virtual object handler.
-
.peek_promise(name, serde: JsonSerde) ⇒ Object
Peek at a durable promise without blocking.
-
.promise(name, serde: JsonSerde) ⇒ Object
Get a durable promise value, blocking until resolved.
-
.query(arel_or_sql) ⇒ Array<Hash>
Execute an Arel query or raw SQL string against the Restate admin introspection API.
-
.race(*futures) ⇒ Object
Wait for the first future to settle and return its value.
-
.reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Reject an awakeable with a terminal failure.
-
.reject_promise(name, message, code: 500) ⇒ Object
Reject a durable promise with a terminal failure.
-
.reject_signal(invocation_id, name, message, code: 500) ⇒ Object
Send a terminal failure to a named signal on another invocation.
-
.request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
-
.resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolve an awakeable with a success value.
-
.resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolve a durable promise with a value.
-
.resolve_signal(invocation_id, name, payload, serde: JsonSerde) ⇒ Object
Send a success value to a named signal on another invocation.
-
.run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Execute a durable side effect.
-
.run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await.
-
.service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate service.
-
.service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate service handler.
-
.set(name, value, serde: JsonSerde) ⇒ Object
Durably set a state entry.
-
.signal(name, serde: JsonSerde) ⇒ Object
Wait for a named signal addressed to this invocation.
-
.sleep(seconds) ⇒ Object
Durable timer that survives handler restarts.
-
.state_keys ⇒ Object
List all state entry names.
-
.state_keys_async ⇒ Object
List all state entry names, returning a DurableFuture.
-
.wait_any(*futures) ⇒ Object
Wait until any of the given futures completes.
-
.workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate workflow.
-
.workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate workflow handler.
Class Method Details
.all(*futures) ⇒ Object
Wait for every future to complete and return their values in input order. Short-circuits on the first TerminalError. Accepts either splat futures or a single Array. Semantics match JS Promise.all.
# File 'lib/restate.rb', line 299
def all(*futures)
  fetch_context!.all(*futures)
end
.all_settled(*futures) ⇒ Object
Wait for every future to settle. Returns an Array of outcome descriptors ({status: :fulfilled, value: …} or {status: :rejected, reason: …}), in input order. Semantics match JS Promise.allSettled.
# File 'lib/restate.rb', line 320
def all_settled(*futures)
  fetch_context!.all_settled(*futures)
end
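Consuming the outcome descriptors could look roughly like the sketch below. The sample array is hypothetical data, not SDK output. It assumes fulfilled entries carry `:status` and `:value` keys mirroring the rejected shape, and that a reason can be treated as an opaque value.

```ruby
# Hypothetical result, shaped like the descriptors all_settled is documented to return.
outcomes = [
  { status: :fulfilled, value: 10 },
  { status: :rejected,  reason: 'payment declined' },
  { status: :fulfilled, value: 32 }
]

# Split successes from failures; input order is kept within each group.
fulfilled, rejected = outcomes.partition { |o| o[:status] == :fulfilled }

values  = fulfilled.map { |o| o[:value] }
reasons = rejected.map { |o| o[:reason] }
```

Because every future is reported, one failure does not hide the results of the others, unlike the short-circuiting behaviour described for `.all`.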
.any(*futures) ⇒ Object
Wait for the first successful future and return its value. Raises only if every future failed terminally. Accepts splat or single Array. Semantics match JS Promise.any.
# File 'lib/restate.rb', line 313
def any(*futures)
  fetch_context!.any(*futures)
end
.arel_to_sql(arel) ⇒ String
Convert an Arel AST to a SQL string without requiring an AR connection. Uses a standalone visitor that emits ANSI SQL (DataFusion-compatible).
# File 'lib/restate/introspection.rb', line 105
def arel_to_sql(arel)
  collector = Arel::Collectors::SQLString.new
  DataFusionVisitor.new.accept(arel.ast, collector).value
end
.awakeable(serde: JsonSerde) ⇒ Object
Create an awakeable for external callbacks. Returns [awakeable_id, DurableFuture].
# File 'lib/restate.rb', line 236
def awakeable(serde: JsonSerde)
  fetch_context!.awakeable(serde: serde)
end
.cancel_invocation(invocation_id) ⇒ Object
Request cancellation of another invocation.
# File 'lib/restate.rb', line 339
def cancel_invocation(invocation_id)
  fetch_context!.cancel_invocation(invocation_id)
end
.clear(name) ⇒ Object
Durably remove a single state entry.
# File 'lib/restate.rb', line 151
def clear(name)
  fetch_context!.clear(name)
end
.clear_all ⇒ Object
Durably remove all state entries.
# File 'lib/restate.rb', line 156
def clear_all
  fetch_context!.clear_all
end
.client ⇒ Object
Returns a pre-configured Client using the global config. Creates a new Client on each call; the client is stateless, so it is safe to discard.
# File 'lib/restate.rb', line 76
def client
  cfg = config
  Client.new(ingress_url: cfg.ingress_url, admin_url: cfg.admin_url,
             ingress_headers: resolve_headers(cfg.ingress_headers),
             admin_headers: resolve_headers(cfg.admin_headers))
end
.config ⇒ Object
Returns the global configuration. Creates a default one on first access.
# File 'lib/restate.rb', line 64
def config
  @config = nil unless defined?(@config)
  @config ||= Config.new
end
.configure {|config| ... } ⇒ Object
Configure the SDK globally. Settings are used by Restate.client.
# File 'lib/restate.rb', line 59
def configure(&)
  yield config
end
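The pairing of a memoized `.config` with a block-yielding `.configure` can be illustrated with a stand-in module. `DemoSDK` and `DemoConfig` are hypothetical; only the attribute names `ingress_url` and `admin_url` come from the `.client` listing, and the URLs are example values.

```ruby
# Stand-in module mirroring the memoized-config pattern; not the real SDK.
module DemoSDK
  # Hypothetical settings object with two of the attributes read by Restate.client.
  DemoConfig = Struct.new(:ingress_url, :admin_url)

  class << self
    # Lazily create the configuration on first access and reuse it afterwards.
    def config
      @config ||= DemoConfig.new
    end

    # Yield the shared configuration so callers can mutate it in a block.
    def configure
      yield config
    end
  end
end

DemoSDK.configure do |c|
  c.ingress_url = 'http://localhost:8080' # example value, an assumption
  c.admin_url   = 'http://localhost:9070' # example value, an assumption
end
```

Since `configure` yields the same object that `config` returns, settings applied in the block are visible to every later reader of the global configuration.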
.endpoint(*services, protocol: nil, identity_keys: nil) ⇒ Endpoint
Create an endpoint, optionally binding services. Returns an Endpoint that can be further configured before calling .app.
# File 'lib/restate.rb', line 41
def endpoint(*services, protocol: nil, identity_keys: nil)
  ep = Endpoint.new
  ep.streaming_protocol if protocol == 'bidi'
  ep.request_response_protocol if protocol == 'request_response'
  services.each { |s| ep.bind(s) }
  identity_keys&.each { |k| ep.identity_key(k) }
  ep
end
.generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably call a handler using raw bytes (no serialization).
# File 'lib/restate.rb', line 222
def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_call(service, handler, arg, key: key,
                              idempotency_key: idempotency_key, headers: headers)
end
.generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Fire-and-forget send using raw bytes (no serialization).
# File 'lib/restate.rb', line 228
def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_send(service, handler, arg, key: key, delay: delay,
                              idempotency_key: idempotency_key, headers: headers)
end
.get(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry. Returns nil if unset.
# File 'lib/restate.rb', line 136
def get(name, serde: JsonSerde)
  fetch_context!.get(name, serde: serde)
end
.get_async(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry, returning a DurableFuture instead of blocking.
# File 'lib/restate.rb', line 141
def get_async(name, serde: JsonSerde)
  fetch_context!.get_async(name, serde: serde)
end
.invoke_handler(handler:, ctx:, in_buffer:, middleware: []) ⇒ Object
Invoke a handler with the context and raw input bytes. The context is passed as the first argument to every handler. Middleware (if any) wraps the handler call. Returns raw output bytes.
# File 'lib/restate/handler.rb', line 36
def invoke_handler(handler:, ctx:, in_buffer:, middleware: [])
  out_arg = if middleware.empty?
              invoke_handler_direct(handler, in_buffer)
            else
              invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
            end
  handler.handler_io.output_serde.serialize(out_arg)
end
.invoke_handler_direct(handler, in_buffer) ⇒ Object
# File 'lib/restate/handler.rb', line 45
def invoke_handler_direct(handler, in_buffer)
  if handler.arity == 1
    begin
      in_arg = handler.handler_io.input_serde.deserialize(in_buffer)
    rescue StandardError => e
      Kernel.raise TerminalError, "Unable to parse input argument: #{e.message}"
    end
    handler.callable.call(in_arg)
  else
    handler.callable.call
  end
end
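The arity-based dispatch in invoke_handler_direct can be tried in isolation with stand-ins. In this sketch `DemoHandler`, `DemoTerminalError` and `dispatch` are hypothetical, and `JSON.parse` stands in for the handler's input serde, which is an assumption about the default.

```ruby
require 'json'

# Stand-ins for illustration only; the real Handler and TerminalError live in the SDK.
class DemoTerminalError < StandardError; end
DemoHandler = Struct.new(:arity, :callable)

# Deserialize the input only when the handler declares one argument;
# a parse failure is re-raised as the stand-in terminal error.
def dispatch(handler, in_buffer)
  return handler.callable.call unless handler.arity == 1

  begin
    in_arg = JSON.parse(in_buffer)
  rescue StandardError => e
    raise DemoTerminalError, "Unable to parse input argument: #{e.message}"
  end
  handler.callable.call(in_arg)
end

greet = DemoHandler.new(1, ->(input) { "hello #{input['name']}" })
ping  = DemoHandler.new(0, -> { 'pong' })

greeting = dispatch(greet, '{"name":"Ada"}')
pong     = dispatch(ping, '')
```

Zero-argument handlers never touch the input buffer, so an empty or malformed body only matters for handlers that declare a parameter.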
.invoke_handler_with_middleware(handler, ctx, in_buffer, middleware) ⇒ Object
# File 'lib/restate/handler.rb', line 58
def invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
  call_handler = Kernel.proc { invoke_handler_direct(handler, in_buffer) }
  chain = middleware.reverse.reduce(call_handler) do |nxt, mw|
    Kernel.proc { mw.call(handler, ctx, &nxt) }
  end
  chain.call
end
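The chain-building step in invoke_handler_with_middleware relies on reversing the middleware list before folding it, so that the first middleware ends up outermost. A self-contained sketch of that ordering follows. `LoggingMiddleware` and `run_chain` are hypothetical names, and the middleware signature `call(handler, ctx) { ... }` is inferred from the listing.

```ruby
# Stand-in middleware: records when it runs, then yields to the next link.
class LoggingMiddleware
  def initialize(label, log)
    @label = label
    @log = log
  end

  def call(_handler, _ctx)
    @log << "#{@label}:before"
    result = yield
    @log << "#{@label}:after"
    result
  end
end

# Fold the reversed list so the first middleware wraps all later ones.
def run_chain(handler, ctx, middleware, &innermost)
  chain = middleware.reverse.reduce(innermost) do |nxt, mw|
    proc { mw.call(handler, ctx, &nxt) }
  end
  chain.call
end

log = []
stack = [LoggingMiddleware.new('outer', log), LoggingMiddleware.new('inner', log)]
result = run_chain(:demo_handler, :demo_ctx, stack) do
  log << 'handler'
  42
end
```

Each middleware returns whatever its block returns, so the handler's result propagates unchanged back through the chain.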
.key ⇒ Object
Returns the key for this virtual object or workflow invocation.
# File 'lib/restate.rb', line 332
def key
  fetch_context!.key
end
.object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate virtual object.
# File 'lib/restate.rb', line 189
def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers,
                  input_serde: input_serde, output_serde: output_serde)
end
.object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate virtual object handler.
# File 'lib/restate.rb', line 197
def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                  headers: headers, input_serde: input_serde)
end
.peek_promise(name, serde: JsonSerde) ⇒ Object
Peek at a durable promise without blocking. Returns nil if not yet resolved.
# File 'lib/restate.rb', line 275
def peek_promise(name, serde: JsonSerde)
  fetch_context!.peek_promise(name, serde: serde)
end
.promise(name, serde: JsonSerde) ⇒ Object
Get a durable promise value, blocking until resolved.
# File 'lib/restate.rb', line 270
def promise(name, serde: JsonSerde)
  fetch_context!.promise(name, serde: serde)
end
.query(arel_or_sql) ⇒ Array<Hash>
Execute an Arel query or raw SQL string against the Restate admin introspection API. Returns an array of row hashes.
# File 'lib/restate/introspection.rb', line 122
def query(arel_or_sql)
  sql = arel_or_sql.respond_to?(:ast) ? arel_to_sql(arel_or_sql) : arel_or_sql.to_s
  client.execute_query(sql)
end
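The input handling in `.query` is duck-typed: anything that responds to `ast` is compiled, and everything else is stringified. The sketch below isolates that branch. `DemoArelQuery`, `demo_to_sql` and `normalize_sql` are hypothetical stand-ins, and no real Arel or admin API is involved.

```ruby
# Stand-in for an Arel query object: anything that responds to #ast.
class DemoArelQuery
  def ast
    :demo_ast
  end
end

# Hypothetical converter standing in for Restate.arel_to_sql.
def demo_to_sql(query)
  "-- compiled from #{query.ast}"
end

# Duck-typed dispatch: compile objects that expose #ast, stringify everything else.
def normalize_sql(arel_or_sql)
  arel_or_sql.respond_to?(:ast) ? demo_to_sql(arel_or_sql) : arel_or_sql.to_s
end

from_arel   = normalize_sql(DemoArelQuery.new)
from_string = normalize_sql('SELECT 1')
```

Checking `respond_to?(:ast)` rather than a class means the real method does not need Arel loaded when callers only pass SQL strings.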
.race(*futures) ⇒ Object
Wait for the first future to settle and return its value. Raises if the winning future failed. Accepts either splat futures or a single Array. Semantics match JS Promise.race.
# File 'lib/restate.rb', line 306
def race(*futures)
  fetch_context!.race(*futures)
end
.reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Reject an awakeable with a terminal failure.
# File 'lib/restate.rb', line 246
def reject_awakeable(awakeable_id, message, code: 500)
  fetch_context!.reject_awakeable(awakeable_id, message, code: code)
end
.reject_promise(name, message, code: 500) ⇒ Object
Reject a durable promise with a terminal failure.
# File 'lib/restate.rb', line 285
def reject_promise(name, message, code: 500)
  fetch_context!.reject_promise(name, message, code: code)
end
.reject_signal(invocation_id, name, message, code: 500) ⇒ Object
Send a terminal failure to a named signal on another invocation.
# File 'lib/restate.rb', line 263
def reject_signal(invocation_id, name, message, code: 500)
  fetch_context!.reject_signal(invocation_id, name, message, code: code)
end
.request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
# File 'lib/restate.rb', line 327
def request
  fetch_context!.request
end
.resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolve an awakeable with a success value.
# File 'lib/restate.rb', line 241
def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  fetch_context!.resolve_awakeable(awakeable_id, payload, serde: serde)
end
.resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolve a durable promise with a value.
# File 'lib/restate.rb', line 280
def resolve_promise(name, payload, serde: JsonSerde)
  fetch_context!.resolve_promise(name, payload, serde: serde)
end
.resolve_signal(invocation_id, name, payload, serde: JsonSerde) ⇒ Object
Send a success value to a named signal on another invocation.
# File 'lib/restate.rb', line 258
def resolve_signal(invocation_id, name, payload, serde: JsonSerde)
  fetch_context!.resolve_signal(invocation_id, name, payload, serde: serde)
end
.run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Execute a durable side effect. The block runs at most once; the result is journaled and replayed on retries. Returns a DurableFuture.
# File 'lib/restate.rb', line 119
def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end
.run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await. Returns the result directly.
# File 'lib/restate.rb', line 124
def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run_sync(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end
.service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate service.
# File 'lib/restate.rb', line 173
def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_call(service, handler, arg, key: key, idempotency_key: idempotency_key, headers: headers,
                   input_serde: input_serde, output_serde: output_serde)
end
.service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate service handler.
# File 'lib/restate.rb', line 181
def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_send(service, handler, arg, key: key, delay: delay, idempotency_key: idempotency_key,
                   headers: headers, input_serde: input_serde)
end
.set(name, value, serde: JsonSerde) ⇒ Object
Durably set a state entry.
# File 'lib/restate.rb', line 146
def set(name, value, serde: JsonSerde)
  fetch_context!.set(name, value, serde: serde)
end
.signal(name, serde: JsonSerde) ⇒ Object
Wait for a named signal addressed to this invocation. Returns a DurableFuture.
# File 'lib/restate.rb', line 253
def signal(name, serde: JsonSerde)
  fetch_context!.signal(name, serde: serde)
end
.sleep(seconds) ⇒ Object
Durable timer that survives handler restarts.
# File 'lib/restate.rb', line 129
def sleep(seconds)
  fetch_context!.sleep(seconds)
end
.state_keys ⇒ Object
List all state entry names.
# File 'lib/restate.rb', line 161
def state_keys
  fetch_context!.state_keys
end
.state_keys_async ⇒ Object
List all state entry names, returning a DurableFuture.
# File 'lib/restate.rb', line 166
def state_keys_async
  fetch_context!.state_keys_async
end
.wait_any(*futures) ⇒ Object
Wait until any of the given futures completes. Returns [completed, remaining].
# File 'lib/restate.rb', line 292
def wait_any(*futures)
  fetch_context!.wait_any(*futures)
end
.workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate workflow.
# File 'lib/restate.rb', line 205
def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_call(service, handler, key, arg,
                    idempotency_key: idempotency_key, headers: headers,
                    input_serde: input_serde, output_serde: output_serde)
end
.workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate workflow handler.
# File 'lib/restate.rb', line 214
def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                    headers: headers, input_serde: input_serde)
end