Module: Restate
- Defined in:
- lib/restate.rb,
lib/restate/vm.rb,
lib/restate/serde.rb,
lib/restate/client.rb,
lib/restate/config.rb,
lib/restate/errors.rb,
lib/restate/server.rb,
lib/restate/context.rb,
lib/restate/handler.rb,
lib/restate/railtie.rb,
lib/restate/service.rb,
lib/restate/testing.rb,
lib/restate/version.rb,
lib/restate/endpoint.rb,
lib/restate/workflow.rb,
lib/restate/discovery.rb,
lib/restate/service_dsl.rb,
lib/restate/introspection.rb,
lib/restate/service_proxy.rb,
lib/restate/durable_future.rb,
lib/restate/server_context.rb,
lib/restate/virtual_object.rb
Defined Under Namespace
Modules: BytesSerde, Context, Discovery, JsonSerde, ObjectContext, ObjectSharedContext, Serde, ServiceDSL, Sys, Testing, WorkflowContext, WorkflowSharedContext
Classes: AttemptFinishedEvent, Client, Config, DisconnectedError, DoProgressAnyCompleted, DoProgressCancelSignalReceived, DoProgressExecuteRun, DoProgressReadFromInput, DoWaitPendingRun, DryStructSerde, DurableCallFuture, DurableFuture, Endpoint, Failure, Handler, HandlerIO, InternalError, Invocation, NotReady, Railtie, Request, RunRetryConfig, RunRetryPolicy, SendHandle, Server, ServerContext, Service, ServiceTag, Suspended, SuspendedError, TerminalError, TypeSerde, VMWrapper, VirtualObject, Workflow
Constant Summary
- NOT_READY =
NotReady.new.freeze
- SUSPENDED =
Suspended.new.freeze
- CANCEL_HANDLE =
Internal::CANCEL_NOTIFICATION_HANDLE
- DO_PROGRESS_ANY_COMPLETED =
DoProgressAnyCompleted.new.freeze
- DO_PROGRESS_READ_FROM_INPUT =
DoProgressReadFromInput.new.freeze
- DO_PROGRESS_CANCEL_SIGNAL_RECEIVED =
DoProgressCancelSignalReceived.new.freeze
- DO_WAIT_PENDING_RUN =
DoWaitPendingRun.new.freeze
- NOT_SET =
Sentinel value to distinguish “caller didn’t pass serde” from an explicit value.
Object.new.freeze
- PRIMITIVE_SCHEMAS =
Maps Ruby primitive types to JSON Schema snippets for discovery.
{ String => { 'type' => 'string' }.freeze, Integer => { 'type' => 'integer' }.freeze, Float => { 'type' => 'number' }.freeze, TrueClass => { 'type' => 'boolean' }.freeze, FalseClass => { 'type' => 'boolean' }.freeze, Array => { 'type' => 'array' }.freeze, Hash => { 'type' => 'object' }.freeze, NilClass => { 'type' => 'null' }.freeze }.freeze
- VERSION =
'0.10.0'
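As a rough illustration of how a lookup table such as PRIMITIVE_SCHEMAS can be consulted, the sketch below copies the hash from the listing above into a standalone script and resolves a Ruby class to its JSON Schema snippet. The `schema_for` helper and its empty-hash fallback are assumptions made for this example; the listing does not show how the SDK itself reads the table.

```ruby
# Copy of the PRIMITIVE_SCHEMAS table shown above, so the script runs without the gem.
PRIMITIVE_SCHEMAS = {
  String => { 'type' => 'string' }.freeze,
  Integer => { 'type' => 'integer' }.freeze,
  Float => { 'type' => 'number' }.freeze,
  TrueClass => { 'type' => 'boolean' }.freeze,
  FalseClass => { 'type' => 'boolean' }.freeze,
  Array => { 'type' => 'array' }.freeze,
  Hash => { 'type' => 'object' }.freeze,
  NilClass => { 'type' => 'null' }.freeze
}.freeze

# Hypothetical helper (not part of the SDK): look up the schema for a class,
# falling back to an empty schema for types the table does not know about.
def schema_for(type)
  PRIMITIVE_SCHEMAS.fetch(type, {})
end

puts schema_for(Integer).inspect
puts schema_for(Symbol).inspect
```

Note that both TrueClass and FalseClass map to the same 'boolean' snippet, since Ruby has no single Boolean class.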
Class Method Summary
-
.arel_to_sql(arel) ⇒ String
Convert an Arel AST to a SQL string without requiring an AR connection.
-
.awakeable(serde: JsonSerde) ⇒ Object
Create an awakeable for external callbacks.
-
.cancel_invocation(invocation_id) ⇒ Object
Request cancellation of another invocation.
-
.clear(name) ⇒ Object
Durably remove a single state entry.
-
.clear_all ⇒ Object
Durably remove all state entries.
-
.client ⇒ Object
Returns a pre-configured Client using the global config.
-
.config ⇒ Object
Returns the global configuration.
-
.configure {|config| ... } ⇒ Object
Configure the SDK globally.
-
.endpoint(*services, protocol: nil, identity_keys: nil) ⇒ Endpoint
Create an endpoint, optionally binding services.
-
.generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably call a handler using raw bytes (no serialization).
-
.generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Fire-and-forget send using raw bytes (no serialization).
-
.get(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry.
-
.get_async(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry, returning a DurableFuture instead of blocking.
-
.invoke_handler(handler:, ctx:, in_buffer:, middleware: []) ⇒ Object
Invoke a handler with the context and raw input bytes.
- .invoke_handler_direct(handler, in_buffer) ⇒ Object
- .invoke_handler_with_middleware(handler, ctx, in_buffer, middleware) ⇒ Object
-
.key ⇒ Object
Returns the key for this virtual object or workflow invocation.
-
.object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate virtual object.
-
.object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate virtual object handler.
-
.peek_promise(name, serde: JsonSerde) ⇒ Object
Peek at a durable promise without blocking.
-
.promise(name, serde: JsonSerde) ⇒ Object
Get a durable promise value, blocking until resolved.
-
.query(arel_or_sql) ⇒ Array<Hash>
Execute an Arel query or raw SQL string against the Restate admin introspection API.
-
.reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Reject an awakeable with a terminal failure.
-
.reject_promise(name, message, code: 500) ⇒ Object
Reject a durable promise with a terminal failure.
-
.request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
-
.resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolve an awakeable with a success value.
-
.resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolve a durable promise with a value.
-
.run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Execute a durable side effect.
-
.run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await.
-
.service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate service.
-
.service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate service handler.
-
.set(name, value, serde: JsonSerde) ⇒ Object
Durably set a state entry.
-
.sleep(seconds) ⇒ Object
Durable timer that survives handler restarts.
-
.state_keys ⇒ Object
List all state entry names.
-
.state_keys_async ⇒ Object
List all state entry names, returning a DurableFuture.
-
.wait_any(*futures) ⇒ Object
Wait until any of the given futures completes.
-
.workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate workflow.
-
.workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate workflow handler.
Class Method Details
.arel_to_sql(arel) ⇒ String
Convert an Arel AST to a SQL string without requiring an AR connection. Uses a standalone visitor that emits ANSI SQL (DataFusion-compatible).
# File 'lib/restate/introspection.rb', line 105
def arel_to_sql(arel)
  collector = Arel::Collectors::SQLString.new
  DataFusionVisitor.new.accept(arel.ast, collector).value
end
.awakeable(serde: JsonSerde) ⇒ Object
Create an awakeable for external callbacks. Returns [awakeable_id, DurableFuture].
# File 'lib/restate.rb', line 230
def awakeable(serde: JsonSerde)
  fetch_context!.awakeable(serde: serde)
end
.cancel_invocation(invocation_id) ⇒ Object
Request cancellation of another invocation.
# File 'lib/restate.rb', line 288
def cancel_invocation(invocation_id)
  fetch_context!.cancel_invocation(invocation_id)
end
.clear(name) ⇒ Object
Durably remove a single state entry.
# File 'lib/restate.rb', line 145
def clear(name)
  fetch_context!.clear(name)
end
.clear_all ⇒ Object
Durably remove all state entries.
# File 'lib/restate.rb', line 150
def clear_all
  fetch_context!.clear_all
end
.client ⇒ Object
Returns a pre-configured Client using the global config. Creates a new Client on each call (stateless — safe to discard).
# File 'lib/restate.rb', line 76
def client
  cfg = config
  Client.new(ingress_url: cfg.ingress_url, admin_url: cfg.admin_url,
             ingress_headers: cfg.ingress_headers, admin_headers: cfg.admin_headers)
end
.config ⇒ Object
Returns the global configuration. Creates a default one on first access.
# File 'lib/restate.rb', line 64
def config
  @config = nil unless defined?(@config)
  @config ||= Config.new
end
.configure {|config| ... } ⇒ Object
Configure the SDK globally. Settings are used by Restate.client.
# File 'lib/restate.rb', line 59
def configure(&)
  yield config
end
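The interplay between .configure and .config can be sketched with a stand-in module. `DemoSDK`, its two-field `Config` struct and the placeholder URLs are assumptions made for this example so that it runs without the gem; only the lazy-memoize-then-yield shape is taken from the listings above.

```ruby
# Stand-in for the SDK module; DemoSDK and its Config fields are assumptions.
module DemoSDK
  Config = Struct.new(:ingress_url, :admin_url)

  class << self
    # Lazily create the configuration on first access and reuse it afterwards.
    def config
      @config ||= Config.new('http://localhost:8080', 'http://localhost:9070')
    end

    # Yield the shared configuration so callers can mutate it in a block.
    def configure
      yield config
    end
  end
end

DemoSDK.configure do |c|
  c.ingress_url = 'http://restate.internal:8080' # placeholder URL
end

puts DemoSDK.config.ingress_url
puts DemoSDK.config.admin_url
```

Because the block receives the same object that later calls to config return, settings changed inside configure are visible to anything that reads the configuration afterwards.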
.endpoint(*services, protocol: nil, identity_keys: nil) ⇒ Endpoint
Create an endpoint, optionally binding services. Returns an Endpoint that can be further configured before calling .app.
# File 'lib/restate.rb', line 41
def endpoint(*services, protocol: nil, identity_keys: nil)
  ep = Endpoint.new
  ep.streaming_protocol if protocol == 'bidi'
  ep.request_response_protocol if protocol == 'request_response'
  services.each { |s| ep.bind(s) }
  identity_keys&.each { |k| ep.identity_key(k) }
  ep
end
.generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object
Durably call a handler using raw bytes (no serialization).
# File 'lib/restate.rb', line 216
def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_call(service, handler, arg, key: key, idempotency_key: idempotency_key,
                              headers: headers)
end
.generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object
Fire-and-forget send using raw bytes (no serialization).
# File 'lib/restate.rb', line 222
def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_send(service, handler, arg, key: key, delay: delay,
                              idempotency_key: idempotency_key, headers: headers)
end
.get(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry. Returns nil if unset.
# File 'lib/restate.rb', line 130
def get(name, serde: JsonSerde)
  fetch_context!.get(name, serde: serde)
end
.get_async(name, serde: JsonSerde) ⇒ Object
Durably retrieve a state entry, returning a DurableFuture instead of blocking.
# File 'lib/restate.rb', line 135
def get_async(name, serde: JsonSerde)
  fetch_context!.get_async(name, serde: serde)
end
.invoke_handler(handler:, ctx:, in_buffer:, middleware: []) ⇒ Object
Invoke a handler with the context and raw input bytes. The context is passed as the first argument to every handler. Middleware (if any) wraps the handler call. Returns raw output bytes.
# File 'lib/restate/handler.rb', line 36
def invoke_handler(handler:, ctx:, in_buffer:, middleware: [])
  out_arg = if middleware.empty?
              invoke_handler_direct(handler, in_buffer)
            else
              invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
            end
  handler.handler_io.output_serde.serialize(out_arg)
end
.invoke_handler_direct(handler, in_buffer) ⇒ Object
# File 'lib/restate/handler.rb', line 45
def invoke_handler_direct(handler, in_buffer)
  if handler.arity == 1
    begin
      in_arg = handler.handler_io.input_serde.deserialize(in_buffer)
    rescue StandardError => e
      Kernel.raise TerminalError, "Unable to parse input argument: #{e.message}"
    end
    handler.callable.call(in_arg)
  else
    handler.callable.call
  end
end
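The arity-based dispatch in invoke_handler_direct can be tried in isolation with stand-ins. `DemoHandler`, `DemoTerminalError` and `demo_invoke` are hypothetical names for this sketch, and JSON parsing stands in for whatever input serde a real handler declares.

```ruby
require 'json'

# Stand-ins for this sketch; the real Handler and TerminalError live in the SDK.
class DemoTerminalError < StandardError; end
DemoHandler = Struct.new(:arity, :callable)

# Parse the input only when the handler takes an argument; a parse failure is
# converted into a terminal (non-retryable) error, as in the listing above.
def demo_invoke(handler, in_buffer)
  return handler.callable.call unless handler.arity == 1

  begin
    in_arg = JSON.parse(in_buffer)
  rescue StandardError => e
    raise DemoTerminalError, "Unable to parse input argument: #{e.message}"
  end
  handler.callable.call(in_arg)
end

greet = DemoHandler.new(1, ->(input) { "Hello, #{input['name']}" })
ping  = DemoHandler.new(0, -> { 'pong' })

puts demo_invoke(greet, '{"name":"Ada"}')
puts demo_invoke(ping, '')
```

A zero-arity handler never touches the input buffer, so malformed input cannot fail it; only handlers that accept an argument can raise the parse error.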
.invoke_handler_with_middleware(handler, ctx, in_buffer, middleware) ⇒ Object
# File 'lib/restate/handler.rb', line 58
def invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
  call_handler = Kernel.proc { invoke_handler_direct(handler, in_buffer) }
  chain = middleware.reverse.reduce(call_handler) do |nxt, mw|
    Kernel.proc { mw.call(handler, ctx, &nxt) }
  end
  chain.call
end
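The reverse.reduce idiom above builds the chain from the inside out, so the first middleware in the array ends up outermost. A minimal standalone sketch of that ordering follows; `logging_middleware` and `build_chain` are hypothetical helpers, and the symbols passed as handler and context are placeholders.

```ruby
# Records the order in which each layer runs.
LOG = []

# Hypothetical middleware: responds to call(handler, ctx) and invokes the
# block it is given to continue down the chain.
def logging_middleware(label)
  lambda do |_handler, _ctx, &nxt|
    LOG << "#{label}:before"
    result = nxt.call
    LOG << "#{label}:after"
    result
  end
end

# Same composition as in the listing: wrap the innermost call with each
# middleware, starting from the last one so the first ends up outermost.
def build_chain(handler, ctx, middleware, &innermost)
  middleware.reverse.reduce(innermost) do |nxt, mw|
    proc { mw.call(handler, ctx, &nxt) }
  end
end

middleware = [logging_middleware('outer'), logging_middleware('inner')]
chain = build_chain(:demo_handler, :demo_ctx, middleware) do
  LOG << 'handler'
  'result'
end

value = chain.call
puts LOG.join(' -> ')
puts value
```

Running this logs outer:before, inner:before, handler, inner:after, outer:after in that order, and the handler's return value passes back out through every layer unchanged.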
.key ⇒ Object
Returns the key for this virtual object or workflow invocation.
# File 'lib/restate.rb', line 281
def key
  fetch_context!.key
end
.object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate virtual object.
# File 'lib/restate.rb', line 183
def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers,
                  input_serde: input_serde, output_serde: output_serde)
end
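The NOT_SET defaults on input_serde and output_serde follow the usual sentinel-object pattern: a unique frozen object lets the callee tell "argument omitted" apart from an explicit nil. The sketch below shows the pattern on its own. `resolve_serde` and `DEFAULT_SERDE` are hypothetical, and falling back to a default when the sentinel is seen is an assumption, since the listing does not show what the context does with NOT_SET.

```ruby
# Sentinel with a unique identity, defined the same way as in the constant summary.
NOT_SET = Object.new.freeze

# Placeholder for whatever serde a handler would otherwise use (assumption).
DEFAULT_SERDE = :json

# Hypothetical helper: decide which serde a call should use.
def resolve_serde(serde = NOT_SET)
  return DEFAULT_SERDE if serde.equal?(NOT_SET) # caller passed nothing

  serde # explicit value, which may legitimately be nil
end

puts resolve_serde.inspect
puts resolve_serde(nil).inspect
puts resolve_serde(:bytes).inspect
```

Using nil as the default instead would make it impossible to pass nil on purpose, which is the case the sentinel exists to cover.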
.object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate virtual object handler.
# File 'lib/restate.rb', line 191
def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                  headers: headers, input_serde: input_serde)
end
.peek_promise(name, serde: JsonSerde) ⇒ Object
Peek at a durable promise without blocking. Returns nil if not yet resolved.
# File 'lib/restate.rb', line 252
def peek_promise(name, serde: JsonSerde)
  fetch_context!.peek_promise(name, serde: serde)
end
.promise(name, serde: JsonSerde) ⇒ Object
Get a durable promise value, blocking until resolved.
# File 'lib/restate.rb', line 247
def promise(name, serde: JsonSerde)
  fetch_context!.promise(name, serde: serde)
end
.query(arel_or_sql) ⇒ Array<Hash>
Execute an Arel query or raw SQL string against the Restate admin introspection API. Returns an array of row hashes.
# File 'lib/restate/introspection.rb', line 122
def query(arel_or_sql)
  sql = arel_or_sql.respond_to?(:ast) ? arel_to_sql(arel_or_sql) : arel_or_sql.to_s
  client.execute_query(sql)
end
.reject_awakeable(awakeable_id, message, code: 500) ⇒ Object
Reject an awakeable with a terminal failure.
# File 'lib/restate.rb', line 240
def reject_awakeable(awakeable_id, message, code: 500)
  fetch_context!.reject_awakeable(awakeable_id, message, code: code)
end
.reject_promise(name, message, code: 500) ⇒ Object
Reject a durable promise with a terminal failure.
# File 'lib/restate.rb', line 262
def reject_promise(name, message, code: 500)
  fetch_context!.reject_promise(name, message, code: code)
end
.request ⇒ Object
Returns metadata about the current invocation (id, headers, raw body).
# File 'lib/restate.rb', line 276
def request
  fetch_context!.request
end
.resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object
Resolve an awakeable with a success value.
# File 'lib/restate.rb', line 235
def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  fetch_context!.resolve_awakeable(awakeable_id, payload, serde: serde)
end
.resolve_promise(name, payload, serde: JsonSerde) ⇒ Object
Resolve a durable promise with a value.
# File 'lib/restate.rb', line 257
def resolve_promise(name, payload, serde: JsonSerde)
  fetch_context!.resolve_promise(name, payload, serde: serde)
end
.run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Execute a durable side effect. The block runs at most once; the result is journaled and replayed on retries. Returns a DurableFuture.
# File 'lib/restate.rb', line 113
def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end
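The "journaled and replayed" behaviour described for .run can be pictured with a toy in-memory journal. `ToyJournal` is purely illustrative and is not how the SDK stores results (the real journal is durable and lives outside the process); the sketch only shows why a block whose result is already recorded is not executed again on a later attempt.

```ruby
# Toy journal standing in for Restate's durable log (assumption for this sketch).
class ToyJournal
  def initialize
    @entries = {}
  end

  # Run the block only if no result is recorded under this name;
  # otherwise return the recorded result without executing the block.
  def run(name)
    return @entries[name] if @entries.key?(name)

    @entries[name] = yield
  end
end

journal = ToyJournal.new
charges = 0

# Simulate two attempts of the same handler against the same journal.
2.times do
  receipt = journal.run('charge-card') do
    charges += 1
    "receipt-#{charges}"
  end
  puts receipt
end

puts charges
```

Both attempts print the same receipt and the counter ends at 1, because the second attempt replays the recorded value instead of charging again.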
.run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object
Convenience shortcut for run(…).await. Returns the result directly.
# File 'lib/restate.rb', line 118
def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run_sync(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end
.service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate service.
# File 'lib/restate.rb', line 167
def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_call(service, handler, arg, key: key, idempotency_key: idempotency_key, headers: headers,
                   input_serde: input_serde, output_serde: output_serde)
end
.service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate service handler.
# File 'lib/restate.rb', line 175
def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_send(service, handler, arg, key: key, delay: delay, idempotency_key: idempotency_key,
                   headers: headers, input_serde: input_serde)
end
.set(name, value, serde: JsonSerde) ⇒ Object
Durably set a state entry.
# File 'lib/restate.rb', line 140
def set(name, value, serde: JsonSerde)
  fetch_context!.set(name, value, serde: serde)
end
.sleep(seconds) ⇒ Object
Durable timer that survives handler restarts.
# File 'lib/restate.rb', line 123
def sleep(seconds)
  fetch_context!.sleep(seconds)
end
.state_keys ⇒ Object
List all state entry names.
# File 'lib/restate.rb', line 155
def state_keys
  fetch_context!.state_keys
end
.state_keys_async ⇒ Object
List all state entry names, returning a DurableFuture.
# File 'lib/restate.rb', line 160
def state_keys_async
  fetch_context!.state_keys_async
end
.wait_any(*futures) ⇒ Object
Wait until any of the given futures completes. Returns [completed, remaining].
# File 'lib/restate.rb', line 269
def wait_any(*futures)
  fetch_context!.wait_any(*futures)
end
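To make the [completed, remaining] return shape concrete, here is a toy imitation that does not block. `ToyFuture`, its `completed?` predicate and `toy_wait_any` are all hypothetical stand-ins. The real method suspends until at least one future completes, and whether `completed` is a list or a single future is an assumption here.

```ruby
# Stand-in future for this sketch; the SDK's DurableFuture is not reproduced here.
ToyFuture = Struct.new(:name, :done) do
  def completed?
    done
  end
end

# Hypothetical, non-blocking imitation of the return shape: split the given
# futures into those already completed and those still pending.
def toy_wait_any(*futures)
  futures.partition(&:completed?)
end

timer = ToyFuture.new('timer', true)
call  = ToyFuture.new('call', false)

# Destructure the pair the same way a caller of wait_any would.
completed, remaining = toy_wait_any(timer, call)
puts completed.map(&:name).inspect
puts remaining.map(&:name).inspect
```

A typical use of this shape is racing a call against a timer: handle whatever is in the first element, then keep waiting on (or cancel) what is left in the second.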
.workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object
Durably call a handler on a Restate workflow.
# File 'lib/restate.rb', line 199
def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_call(service, handler, key, arg,
                    idempotency_key: idempotency_key, headers: headers,
                    input_serde: input_serde, output_serde: output_serde)
end
.workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object
Fire-and-forget send to a Restate workflow handler.
# File 'lib/restate.rb', line 208
def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                    headers: headers, input_serde: input_serde)
end