Module: Restate

Defined in:
lib/restate.rb,
lib/restate/vm.rb,
lib/restate/serde.rb,
lib/restate/client.rb,
lib/restate/config.rb,
lib/restate/errors.rb,
lib/restate/context.rb,
lib/restate/handler.rb,
lib/restate/railtie.rb,
lib/restate/service.rb,
lib/restate/testing.rb,
lib/restate/version.rb,
lib/restate/endpoint.rb,
lib/restate/workflow.rb,
lib/restate/discovery.rb,
lib/restate/service_dsl.rb,
lib/restate/introspection.rb,
lib/restate/service_proxy.rb,
lib/restate/durable_future.rb,
lib/restate/server/context.rb,
lib/restate/server/handler.rb,
lib/restate/virtual_object.rb,
lib/restate/middleware/deadlock_detection.rb

Overview

typed: false frozen_string_literal: true

Defined Under Namespace

Modules: BytesSerde, Context, Discovery, JsonSerde, Middleware, ObjectContext, ObjectSharedContext, Serde, Server, ServiceDSL, Sys, Testing, WorkflowContext, WorkflowSharedContext Classes: AttemptFinishedEvent, Client, CombinedFuture, Config, DisconnectedError, DoProgressAnyCompleted, DoProgressCancelSignalReceived, DoProgressExecuteRun, DoProgressReadFromInput, DoWaitPendingRun, DryStructSerde, DurableCallFuture, DurableFuture, Endpoint, Failure, Handler, HandlerIO, InternalError, Invocation, NotReady, Railtie, Request, RunRetryConfig, RunRetryPolicy, SendHandle, Service, ServiceTag, Suspended, SuspendedError, TerminalError, TimeoutError, TypeSerde, VMWrapper, VirtualObject, Workflow

Constant Summary collapse

NOT_READY =
NotReady.new.freeze
SUSPENDED =
Suspended.new.freeze
CANCEL_HANDLE =
Internal::CANCEL_NOTIFICATION_HANDLE
DO_PROGRESS_ANY_COMPLETED =
DoProgressAnyCompleted.new.freeze
DO_PROGRESS_READ_FROM_INPUT =
DoProgressReadFromInput.new.freeze
DO_PROGRESS_CANCEL_SIGNAL_RECEIVED =
DoProgressCancelSignalReceived.new.freeze
DO_WAIT_PENDING_RUN =
DoWaitPendingRun.new.freeze
NOT_SET =

Sentinel value to distinguish “caller didn’t pass serde” from an explicit value.

Object.new.freeze
PRIMITIVE_SCHEMAS =

Maps Ruby primitive types to JSON Schema snippets for discovery.

{
  String => { 'type' => 'string' }.freeze,
  Integer => { 'type' => 'integer' }.freeze,
  Float => { 'type' => 'number' }.freeze,
  TrueClass => { 'type' => 'boolean' }.freeze,
  FalseClass => { 'type' => 'boolean' }.freeze,
  Array => { 'type' => 'array' }.freeze,
  Hash => { 'type' => 'object' }.freeze,
  NilClass => { 'type' => 'null' }.freeze
}.freeze
VERSION =
'1.0.0'

Class Method Summary collapse

Class Method Details

.all(*futures) ⇒ Object

Wait for every future to complete and return their values in input order. Short-circuits on the first TerminalError. Accepts either splat futures or a single Array. Semantics match JS Promise.all.



299
300
301
# File 'lib/restate.rb', line 299

def all(*futures)
  fetch_context!.all(*futures)
end

.all_settled(*futures) ⇒ Object

Wait for every future to settle. Returns an Array of outcome descriptors (+:fulfilled, value: …+ or {status: :rejected, reason: …}), in input order. Semantics match JS Promise.allSettled.



320
321
322
# File 'lib/restate.rb', line 320

def all_settled(*futures)
  fetch_context!.all_settled(*futures)
end

.any(*futures) ⇒ Object

Wait for the first successful future and return its value. Raises only if every future failed terminally. Accepts splat or single Array. Semantics match JS Promise.any.



313
314
315
# File 'lib/restate.rb', line 313

def any(*futures)
  fetch_context!.any(*futures)
end

.arel_to_sql(arel) ⇒ String

Convert an Arel AST to a SQL string without requiring an AR connection. Uses a standalone visitor that emits ANSI SQL (DataFusion-compatible).

Parameters:

  • arel (Arel::SelectManager)

    an Arel query

Returns:

  • (String)

    SQL string



105
106
107
108
# File 'lib/restate/introspection.rb', line 105

def arel_to_sql(arel)
  collector = Arel::Collectors::SQLString.new
  DataFusionVisitor.new.accept(arel.ast, collector).value
end

.awakeable(serde: JsonSerde) ⇒ Object

Create an awakeable for external callbacks. Returns [awakeable_id, DurableFuture].



236
237
238
# File 'lib/restate.rb', line 236

def awakeable(serde: JsonSerde)
  fetch_context!.awakeable(serde: serde)
end

.cancel_invocation(invocation_id) ⇒ Object

Request cancellation of another invocation.



339
340
341
# File 'lib/restate.rb', line 339

def cancel_invocation(invocation_id)
  fetch_context!.cancel_invocation(invocation_id)
end

.clear(name) ⇒ Object

Durably remove a single state entry.



151
152
153
# File 'lib/restate.rb', line 151

def clear(name)
  fetch_context!.clear(name)
end

.clear_allObject

Durably remove all state entries.



156
157
158
# File 'lib/restate.rb', line 156

def clear_all
  fetch_context!.clear_all
end

.clientObject

Returns a pre-configured Client using the global config. Creates a new Client on each call (stateless — safe to discard).

Examples:

Restate.client.service(Greeter).greet("World")
Restate.client.resolve_awakeable(id, payload)
Restate.client.create_deployment("http://localhost:9080")


76
77
78
79
80
81
# File 'lib/restate.rb', line 76

def client
  cfg = config
  Client.new(ingress_url: cfg.ingress_url, admin_url: cfg.admin_url,
             ingress_headers: resolve_headers(cfg.ingress_headers),
             admin_headers: resolve_headers(cfg.admin_headers))
end

.configObject

Returns the global configuration. Creates a default one on first access.



64
65
66
67
# File 'lib/restate.rb', line 64

def config
  @config = nil unless defined?(@config)
  @config ||= Config.new
end

.configure {|config| ... } ⇒ Object

Configure the SDK globally. Settings are used by Restate.client.

Examples:

Restate.configure do |c|
  c.ingress_url = "http://localhost:8080"
  c.admin_url   = "http://localhost:9070"
end

Yields:



59
60
61
# File 'lib/restate.rb', line 59

def configure(&)
  yield config
end

.endpoint(*services, protocol: nil, identity_keys: nil) ⇒ Endpoint

Create an endpoint, optionally binding services. Returns an Endpoint that can be further configured before calling .app.

Parameters:

  • services (Array<Class>)

    service classes or instances to bind

Returns:



41
42
43
44
45
46
47
48
# File 'lib/restate.rb', line 41

def endpoint(*services, protocol: nil, identity_keys: nil)
  ep = Endpoint.new
  ep.streaming_protocol if protocol == 'bidi'
  ep.request_response_protocol if protocol == 'request_response'
  services.each { |s| ep.bind(s) }
  identity_keys&.each { |k| ep.identity_key(k) }
  ep
end

.generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object

Durably call a handler using raw bytes (no serialization).



222
223
224
225
# File 'lib/restate.rb', line 222

def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_call(service, handler, arg, key: key,
                                                     idempotency_key: idempotency_key, headers: headers)
end

.generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object

Fire-and-forget send using raw bytes (no serialization).



228
229
230
231
# File 'lib/restate.rb', line 228

def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_send(service, handler, arg, key: key, delay: delay,
                                                     idempotency_key: idempotency_key, headers: headers)
end

.get(name, serde: JsonSerde) ⇒ Object

Durably retrieve a state entry. Returns nil if unset.



136
137
138
# File 'lib/restate.rb', line 136

def get(name, serde: JsonSerde)
  fetch_context!.get(name, serde: serde)
end

.get_async(name, serde: JsonSerde) ⇒ Object

Durably retrieve a state entry, returning a DurableFuture instead of blocking.



141
142
143
# File 'lib/restate.rb', line 141

def get_async(name, serde: JsonSerde)
  fetch_context!.get_async(name, serde: serde)
end

.invoke_handler(handler:, ctx:, in_buffer:, middleware: []) ⇒ Object

Invoke a handler with the context and raw input bytes. The context is passed as the first argument to every handler. Middleware (if any) wraps the handler call. Returns raw output bytes.



36
37
38
39
40
41
42
43
# File 'lib/restate/handler.rb', line 36

def invoke_handler(handler:, ctx:, in_buffer:, middleware: [])
  out_arg = if middleware.empty?
              invoke_handler_direct(handler, in_buffer)
            else
              invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
            end
  handler.handler_io.output_serde.serialize(out_arg)
end

.invoke_handler_direct(handler, in_buffer) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/restate/handler.rb', line 45

def invoke_handler_direct(handler, in_buffer)
  if handler.arity == 1
    begin
      in_arg = handler.handler_io.input_serde.deserialize(in_buffer)
    rescue StandardError => e
      Kernel.raise TerminalError, "Unable to parse input argument: #{e.message}"
    end
    handler.callable.call(in_arg)
  else
    handler.callable.call
  end
end

.invoke_handler_with_middleware(handler, ctx, in_buffer, middleware) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/restate/handler.rb', line 58

def invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
  call_handler = Kernel.proc { invoke_handler_direct(handler, in_buffer) }
  chain = middleware.reverse.reduce(call_handler) do |nxt, mw|
    Kernel.proc { mw.call(handler, ctx, &nxt) }
  end
  chain.call
end

.keyObject

Returns the key for this virtual object or workflow invocation.



332
333
334
# File 'lib/restate.rb', line 332

def key
  fetch_context!.key
end

.object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably call a handler on a Restate virtual object.



189
190
191
192
193
194
# File 'lib/restate.rb', line 189

def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_call(service, handler, key, arg, idempotency_key: idempotency_key,
                                              headers: headers, input_serde: input_serde, output_serde: output_serde)
end

.object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Fire-and-forget send to a Restate virtual object handler.



197
198
199
200
201
202
# File 'lib/restate.rb', line 197

def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil,
                headers: nil, input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                                              headers: headers, input_serde: input_serde)
end

.peek_promise(name, serde: JsonSerde) ⇒ Object

Peek at a durable promise without blocking. Returns nil if not yet resolved.



275
276
277
# File 'lib/restate.rb', line 275

def peek_promise(name, serde: JsonSerde)
  fetch_context!.peek_promise(name, serde: serde)
end

.promise(name, serde: JsonSerde) ⇒ Object

Get a durable promise value, blocking until resolved.



270
271
272
# File 'lib/restate.rb', line 270

def promise(name, serde: JsonSerde)
  fetch_context!.promise(name, serde: serde)
end

.query(arel_or_sql) ⇒ Array<Hash>

Execute an Arel query or raw SQL string against the Restate admin introspection API. Returns an array of row hashes.

Examples:

With Arel

i = Restate::Sys::Invocation
Restate.query(i.project(Arel.star).take(10))

With raw SQL

Restate.query("SELECT id, status FROM sys_invocation LIMIT 10")

Parameters:

  • arel_or_sql (Arel::SelectManager, String)

    an Arel query or raw SQL

Returns:

  • (Array<Hash>)

    rows returned by Restate



122
123
124
125
# File 'lib/restate/introspection.rb', line 122

def query(arel_or_sql)
  sql = arel_or_sql.respond_to?(:ast) ? arel_to_sql(arel_or_sql) : arel_or_sql.to_s
  client.execute_query(sql)
end

.race(*futures) ⇒ Object

Wait for the first future to settle and return its value. Raises if the winning future failed. Accepts either splat futures or a single Array. Semantics match JS Promise.race.



306
307
308
# File 'lib/restate.rb', line 306

def race(*futures)
  fetch_context!.race(*futures)
end

.reject_awakeable(awakeable_id, message, code: 500) ⇒ Object

Reject an awakeable with a terminal failure.



246
247
248
# File 'lib/restate.rb', line 246

def reject_awakeable(awakeable_id, message, code: 500)
  fetch_context!.reject_awakeable(awakeable_id, message, code: code)
end

.reject_promise(name, message, code: 500) ⇒ Object

Reject a durable promise with a terminal failure.



285
286
287
# File 'lib/restate.rb', line 285

def reject_promise(name, message, code: 500)
  fetch_context!.reject_promise(name, message, code: code)
end

.reject_signal(invocation_id, name, message, code: 500) ⇒ Object

Send a terminal failure to a named signal on another invocation.



263
264
265
# File 'lib/restate.rb', line 263

def reject_signal(invocation_id, name, message, code: 500)
  fetch_context!.reject_signal(invocation_id, name, message, code: code)
end

.requestObject

Returns metadata about the current invocation (id, headers, raw body).



327
328
329
# File 'lib/restate.rb', line 327

def request
  fetch_context!.request
end

.resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object

Resolve an awakeable with a success value.



241
242
243
# File 'lib/restate.rb', line 241

def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  fetch_context!.resolve_awakeable(awakeable_id, payload, serde: serde)
end

.resolve_promise(name, payload, serde: JsonSerde) ⇒ Object

Resolve a durable promise with a value.



280
281
282
# File 'lib/restate.rb', line 280

def resolve_promise(name, payload, serde: JsonSerde)
  fetch_context!.resolve_promise(name, payload, serde: serde)
end

.resolve_signal(invocation_id, name, payload, serde: JsonSerde) ⇒ Object

Send a success value to a named signal on another invocation.



258
259
260
# File 'lib/restate.rb', line 258

def resolve_signal(invocation_id, name, payload, serde: JsonSerde)
  fetch_context!.resolve_signal(invocation_id, name, payload, serde: serde)
end

.run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Execute a durable side effect. The block runs at most once; the result is journaled and replayed on retries. Returns a DurableFuture.



119
120
121
# File 'lib/restate.rb', line 119

def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end

.run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Convenience shortcut for run(…).await. Returns the result directly.



124
125
126
# File 'lib/restate.rb', line 124

def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run_sync(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end

.service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably call a handler on a Restate service.



173
174
175
176
177
178
# File 'lib/restate.rb', line 173

def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_call(service, handler, arg, key: key, idempotency_key: idempotency_key,
                                          headers: headers, input_serde: input_serde, output_serde: output_serde)
end

.service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Fire-and-forget send to a Restate service handler.



181
182
183
184
185
186
# File 'lib/restate.rb', line 181

def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil,
                 headers: nil, input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_send(service, handler, arg, key: key, delay: delay, idempotency_key: idempotency_key,
                                          headers: headers, input_serde: input_serde)
end

.set(name, value, serde: JsonSerde) ⇒ Object

Durably set a state entry.



146
147
148
# File 'lib/restate.rb', line 146

def set(name, value, serde: JsonSerde)
  fetch_context!.set(name, value, serde: serde)
end

.signal(name, serde: JsonSerde) ⇒ Object

Wait for a named signal addressed to this invocation. Returns a DurableFuture.



253
254
255
# File 'lib/restate.rb', line 253

def signal(name, serde: JsonSerde)
  fetch_context!.signal(name, serde: serde)
end

.sleep(seconds) ⇒ Object

Durable timer that survives handler restarts.



129
130
131
# File 'lib/restate.rb', line 129

def sleep(seconds)
  fetch_context!.sleep(seconds)
end

.state_keysObject

List all state entry names.



161
162
163
# File 'lib/restate.rb', line 161

def state_keys
  fetch_context!.state_keys
end

.state_keys_asyncObject

List all state entry names, returning a DurableFuture.



166
167
168
# File 'lib/restate.rb', line 166

def state_keys_async
  fetch_context!.state_keys_async
end

.wait_any(*futures) ⇒ Object

Wait until any of the given futures completes. Returns [completed, remaining].



292
293
294
# File 'lib/restate.rb', line 292

def wait_any(*futures)
  fetch_context!.wait_any(*futures)
end

.workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably call a handler on a Restate workflow.



205
206
207
208
209
210
211
# File 'lib/restate.rb', line 205

def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_call(service, handler, key, arg,
                    idempotency_key: idempotency_key, headers: headers,
                    input_serde: input_serde, output_serde: output_serde)
end

.workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Fire-and-forget send to a Restate workflow handler.



214
215
216
217
218
219
# File 'lib/restate.rb', line 214

def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil,
                  headers: nil, input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                                                headers: headers, input_serde: input_serde)
end