Module: Restate

Defined in:
lib/restate.rb,
lib/restate/vm.rb,
lib/restate/serde.rb,
lib/restate/client.rb,
lib/restate/config.rb,
lib/restate/errors.rb,
lib/restate/server.rb,
lib/restate/context.rb,
lib/restate/handler.rb,
lib/restate/railtie.rb,
lib/restate/service.rb,
lib/restate/testing.rb,
lib/restate/version.rb,
lib/restate/endpoint.rb,
lib/restate/workflow.rb,
lib/restate/discovery.rb,
lib/restate/service_dsl.rb,
lib/restate/introspection.rb,
lib/restate/service_proxy.rb,
lib/restate/durable_future.rb,
lib/restate/server_context.rb,
lib/restate/virtual_object.rb

Overview

typed: false frozen_string_literal: true

Defined Under Namespace

Modules: BytesSerde, Context, Discovery, JsonSerde, ObjectContext, ObjectSharedContext, Serde, ServiceDSL, Sys, Testing, WorkflowContext, WorkflowSharedContext Classes: AttemptFinishedEvent, Client, Config, DisconnectedError, DoProgressAnyCompleted, DoProgressCancelSignalReceived, DoProgressExecuteRun, DoProgressReadFromInput, DoWaitPendingRun, DryStructSerde, DurableCallFuture, DurableFuture, Endpoint, Failure, Handler, HandlerIO, InternalError, Invocation, NotReady, Railtie, Request, RunRetryConfig, RunRetryPolicy, SendHandle, Server, ServerContext, Service, ServiceTag, Suspended, SuspendedError, TerminalError, TypeSerde, VMWrapper, VirtualObject, Workflow

Constant Summary collapse

NOT_READY =
NotReady.new.freeze
SUSPENDED =
Suspended.new.freeze
CANCEL_HANDLE =
Internal::CANCEL_NOTIFICATION_HANDLE
DO_PROGRESS_ANY_COMPLETED =
DoProgressAnyCompleted.new.freeze
DO_PROGRESS_READ_FROM_INPUT =
DoProgressReadFromInput.new.freeze
DO_PROGRESS_CANCEL_SIGNAL_RECEIVED =
DoProgressCancelSignalReceived.new.freeze
DO_WAIT_PENDING_RUN =
DoWaitPendingRun.new.freeze
NOT_SET =

Sentinel value to distinguish “caller didn’t pass serde” from an explicit value.

Object.new.freeze
PRIMITIVE_SCHEMAS =

Maps Ruby primitive types to JSON Schema snippets for discovery.

{
  String => { 'type' => 'string' }.freeze,
  Integer => { 'type' => 'integer' }.freeze,
  Float => { 'type' => 'number' }.freeze,
  TrueClass => { 'type' => 'boolean' }.freeze,
  FalseClass => { 'type' => 'boolean' }.freeze,
  Array => { 'type' => 'array' }.freeze,
  Hash => { 'type' => 'object' }.freeze,
  NilClass => { 'type' => 'null' }.freeze
}.freeze
VERSION =
'0.10.0'

Class Method Summary collapse

Class Method Details

.arel_to_sql(arel) ⇒ String

Convert an Arel AST to a SQL string without requiring an AR connection. Uses a standalone visitor that emits ANSI SQL (DataFusion-compatible).

Parameters:

  • arel (Arel::SelectManager)

    an Arel query

Returns:

  • (String)

    SQL string



105
106
107
108
# File 'lib/restate/introspection.rb', line 105

def arel_to_sql(arel)
  collector = Arel::Collectors::SQLString.new
  DataFusionVisitor.new.accept(arel.ast, collector).value
end

.awakeable(serde: JsonSerde) ⇒ Object

Create an awakeable for external callbacks. Returns [awakeable_id, DurableFuture].



230
231
232
# File 'lib/restate.rb', line 230

def awakeable(serde: JsonSerde)
  fetch_context!.awakeable(serde: serde)
end

.cancel_invocation(invocation_id) ⇒ Object

Request cancellation of another invocation.



288
289
290
# File 'lib/restate.rb', line 288

def cancel_invocation(invocation_id)
  fetch_context!.cancel_invocation(invocation_id)
end

.clear(name) ⇒ Object

Durably remove a single state entry.



145
146
147
# File 'lib/restate.rb', line 145

def clear(name)
  fetch_context!.clear(name)
end

.clear_allObject

Durably remove all state entries.



150
151
152
# File 'lib/restate.rb', line 150

def clear_all
  fetch_context!.clear_all
end

.clientObject

Returns a pre-configured Client using the global config. Creates a new Client on each call (stateless — safe to discard).

Examples:

Restate.client.service(Greeter).greet("World")
Restate.client.resolve_awakeable(id, payload)
Restate.client.create_deployment("http://localhost:9080")


76
77
78
79
80
# File 'lib/restate.rb', line 76

def client
  cfg = config
  Client.new(ingress_url: cfg.ingress_url, admin_url: cfg.admin_url,
             ingress_headers: cfg.ingress_headers, admin_headers: cfg.admin_headers)
end

.configObject

Returns the global configuration. Creates a default one on first access.



64
65
66
67
# File 'lib/restate.rb', line 64

def config
  @config = nil unless defined?(@config)
  @config ||= Config.new
end

.configure {|config| ... } ⇒ Object

Configure the SDK globally. Settings are used by Restate.client.

Examples:

Restate.configure do |c|
  c.ingress_url = "http://localhost:8080"
  c.admin_url   = "http://localhost:9070"
end

Yields:



59
60
61
# File 'lib/restate.rb', line 59

def configure(&)
  yield config
end

.endpoint(*services, protocol: nil, identity_keys: nil) ⇒ Endpoint

Create an endpoint, optionally binding services. Returns an Endpoint that can be further configured before calling .app.

Parameters:

  • services (Array<Class>)

    service classes or instances to bind

Returns:



41
42
43
44
45
46
47
48
# File 'lib/restate.rb', line 41

def endpoint(*services, protocol: nil, identity_keys: nil)
  ep = Endpoint.new
  ep.streaming_protocol if protocol == 'bidi'
  ep.request_response_protocol if protocol == 'request_response'
  services.each { |s| ep.bind(s) }
  identity_keys&.each { |k| ep.identity_key(k) }
  ep
end

.generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) ⇒ Object

Durably call a handler using raw bytes (no serialization).



216
217
218
219
# File 'lib/restate.rb', line 216

def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_call(service, handler, arg, key: key,
                                                     idempotency_key: idempotency_key, headers: headers)
end

.generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) ⇒ Object

Fire-and-forget send using raw bytes (no serialization).



222
223
224
225
# File 'lib/restate.rb', line 222

def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil)
  fetch_context!.generic_send(service, handler, arg, key: key, delay: delay,
                                                     idempotency_key: idempotency_key, headers: headers)
end

.get(name, serde: JsonSerde) ⇒ Object

Durably retrieve a state entry. Returns nil if unset.



130
131
132
# File 'lib/restate.rb', line 130

def get(name, serde: JsonSerde)
  fetch_context!.get(name, serde: serde)
end

.get_async(name, serde: JsonSerde) ⇒ Object

Durably retrieve a state entry, returning a DurableFuture instead of blocking.



135
136
137
# File 'lib/restate.rb', line 135

def get_async(name, serde: JsonSerde)
  fetch_context!.get_async(name, serde: serde)
end

.invoke_handler(handler:, ctx:, in_buffer:, middleware: []) ⇒ Object

Invoke a handler with the context and raw input bytes. The context is passed as the first argument to every handler. Middleware (if any) wraps the handler call. Returns raw output bytes.



36
37
38
39
40
41
42
43
# File 'lib/restate/handler.rb', line 36

def invoke_handler(handler:, ctx:, in_buffer:, middleware: [])
  out_arg = if middleware.empty?
              invoke_handler_direct(handler, in_buffer)
            else
              invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
            end
  handler.handler_io.output_serde.serialize(out_arg)
end

.invoke_handler_direct(handler, in_buffer) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/restate/handler.rb', line 45

def invoke_handler_direct(handler, in_buffer)
  if handler.arity == 1
    begin
      in_arg = handler.handler_io.input_serde.deserialize(in_buffer)
    rescue StandardError => e
      Kernel.raise TerminalError, "Unable to parse input argument: #{e.message}"
    end
    handler.callable.call(in_arg)
  else
    handler.callable.call
  end
end

.invoke_handler_with_middleware(handler, ctx, in_buffer, middleware) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/restate/handler.rb', line 58

def invoke_handler_with_middleware(handler, ctx, in_buffer, middleware)
  call_handler = Kernel.proc { invoke_handler_direct(handler, in_buffer) }
  chain = middleware.reverse.reduce(call_handler) do |nxt, mw|
    Kernel.proc { mw.call(handler, ctx, &nxt) }
  end
  chain.call
end

.keyObject

Returns the key for this virtual object or workflow invocation.



281
282
283
# File 'lib/restate.rb', line 281

def key
  fetch_context!.key
end

.object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably call a handler on a Restate virtual object.



183
184
185
186
187
188
# File 'lib/restate.rb', line 183

def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_call(service, handler, key, arg, idempotency_key: idempotency_key,
                                              headers: headers, input_serde: input_serde, output_serde: output_serde)
end

.object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Fire-and-forget send to a Restate virtual object handler.



191
192
193
194
195
196
# File 'lib/restate.rb', line 191

def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil,
                headers: nil, input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                                              headers: headers, input_serde: input_serde)
end

.peek_promise(name, serde: JsonSerde) ⇒ Object

Peek at a durable promise without blocking. Returns nil if not yet resolved.



252
253
254
# File 'lib/restate.rb', line 252

def peek_promise(name, serde: JsonSerde)
  fetch_context!.peek_promise(name, serde: serde)
end

.promise(name, serde: JsonSerde) ⇒ Object

Get a durable promise value, blocking until resolved.



247
248
249
# File 'lib/restate.rb', line 247

def promise(name, serde: JsonSerde)
  fetch_context!.promise(name, serde: serde)
end

.query(arel_or_sql) ⇒ Array<Hash>

Execute an Arel query or raw SQL string against the Restate admin introspection API. Returns an array of row hashes.

Examples:

With Arel

i = Restate::Sys::Invocation
Restate.query(i.project(Arel.star).take(10))

With raw SQL

Restate.query("SELECT id, status FROM sys_invocation LIMIT 10")

Parameters:

  • arel_or_sql (Arel::SelectManager, String)

    an Arel query or raw SQL

Returns:

  • (Array<Hash>)

    rows returned by Restate



122
123
124
125
# File 'lib/restate/introspection.rb', line 122

def query(arel_or_sql)
  sql = arel_or_sql.respond_to?(:ast) ? arel_to_sql(arel_or_sql) : arel_or_sql.to_s
  client.execute_query(sql)
end

.reject_awakeable(awakeable_id, message, code: 500) ⇒ Object

Reject an awakeable with a terminal failure.



240
241
242
# File 'lib/restate.rb', line 240

def reject_awakeable(awakeable_id, message, code: 500)
  fetch_context!.reject_awakeable(awakeable_id, message, code: code)
end

.reject_promise(name, message, code: 500) ⇒ Object

Reject a durable promise with a terminal failure.



262
263
264
# File 'lib/restate.rb', line 262

def reject_promise(name, message, code: 500)
  fetch_context!.reject_promise(name, message, code: code)
end

.requestObject

Returns metadata about the current invocation (id, headers, raw body).



276
277
278
# File 'lib/restate.rb', line 276

def request
  fetch_context!.request
end

.resolve_awakeable(awakeable_id, payload, serde: JsonSerde) ⇒ Object

Resolve an awakeable with a success value.



235
236
237
# File 'lib/restate.rb', line 235

def resolve_awakeable(awakeable_id, payload, serde: JsonSerde)
  fetch_context!.resolve_awakeable(awakeable_id, payload, serde: serde)
end

.resolve_promise(name, payload, serde: JsonSerde) ⇒ Object

Resolve a durable promise with a value.



257
258
259
# File 'lib/restate.rb', line 257

def resolve_promise(name, payload, serde: JsonSerde)
  fetch_context!.resolve_promise(name, payload, serde: serde)
end

.run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Execute a durable side effect. The block runs at most once; the result is journaled and replayed on retries. Returns a DurableFuture.



113
114
115
# File 'lib/restate.rb', line 113

def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end

.run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) ⇒ Object

Convenience shortcut for run(…).await. Returns the result directly.



118
119
120
# File 'lib/restate.rb', line 118

def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action)
  fetch_context!.run_sync(name, serde: serde, retry_policy: retry_policy, background: background, &action)
end

.service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably call a handler on a Restate service.



167
168
169
170
171
172
# File 'lib/restate.rb', line 167

def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil,
                 input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_call(service, handler, arg, key: key, idempotency_key: idempotency_key,
                                          headers: headers, input_serde: input_serde, output_serde: output_serde)
end

.service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Fire-and-forget send to a Restate service handler.



175
176
177
178
179
180
# File 'lib/restate.rb', line 175

def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil,
                 headers: nil, input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.service_send(service, handler, arg, key: key, delay: delay, idempotency_key: idempotency_key,
                                          headers: headers, input_serde: input_serde)
end

.set(name, value, serde: JsonSerde) ⇒ Object

Durably set a state entry.



140
141
142
# File 'lib/restate.rb', line 140

def set(name, value, serde: JsonSerde)
  fetch_context!.set(name, value, serde: serde)
end

.sleep(seconds) ⇒ Object

Durable timer that survives handler restarts.



123
124
125
# File 'lib/restate.rb', line 123

def sleep(seconds)
  fetch_context!.sleep(seconds)
end

.state_keysObject

List all state entry names.



155
156
157
# File 'lib/restate.rb', line 155

def state_keys
  fetch_context!.state_keys
end

.state_keys_asyncObject

List all state entry names, returning a DurableFuture.



160
161
162
# File 'lib/restate.rb', line 160

def state_keys_async
  fetch_context!.state_keys_async
end

.wait_any(*futures) ⇒ Object

Wait until any of the given futures completes. Returns [completed, remaining].



269
270
271
# File 'lib/restate.rb', line 269

def wait_any(*futures)
  fetch_context!.wait_any(*futures)
end

.workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, input_serde: NOT_SET, output_serde: NOT_SET) ⇒ Object

Durably call a handler on a Restate workflow.



199
200
201
202
203
204
205
# File 'lib/restate.rb', line 199

def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil,
                  input_serde: NOT_SET, output_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_call(service, handler, key, arg,
                    idempotency_key: idempotency_key, headers: headers,
                    input_serde: input_serde, output_serde: output_serde)
end

.workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, input_serde: NOT_SET) ⇒ Object

Fire-and-forget send to a Restate workflow handler.



208
209
210
211
212
213
# File 'lib/restate.rb', line 208

def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil,
                  headers: nil, input_serde: NOT_SET)
  ctx = fetch_context!
  ctx.workflow_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key,
                                                headers: headers, input_serde: input_serde)
end