Class: Restate::Client

Inherits:
Object
Defined in:
lib/restate/client.rb

Overview

HTTP client for invoking Restate services and managing the Restate runtime from outside it.

Examples:

Via global config (recommended)

Restate.configure do |c|
  c.ingress_url = "http://localhost:8080"
  c.admin_url = "http://localhost:9070"
end
client = Restate.client
result = client.service(Greeter).greet("World")

Standalone

client = Restate::Client.new(ingress_url: "http://localhost:8080",
                             admin_url: "http://localhost:9070")

Service invocation

client.service("Greeter").greet("World")
client.object("Counter", "my-key").add(5)
client.workflow("UserSignup", "user42").run("user@example.com")

Awakeables and admin operations

client.resolve_awakeable(awakeable_id, "result")
client.reject_awakeable(awakeable_id, "failed")
client.cancel_invocation(invocation_id)
client.create_deployment("http://localhost:9080")

Constructor Details

#initialize(ingress_url: 'http://localhost:8080', admin_url: 'http://localhost:9070', ingress_headers: {}, admin_headers: {}) ⇒ Client

Returns a new instance of Client.



# File 'lib/restate/client.rb', line 34

def initialize(ingress_url: 'http://localhost:8080', admin_url: 'http://localhost:9070',
               ingress_headers: {}, admin_headers: {})
  @ingress_url = ingress_url.chomp('/')
  @admin_url = admin_url.chomp('/')
  @ingress_headers = ingress_headers
  @admin_headers = admin_headers
end
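
The constructor strips a single trailing slash from each base URL so that request paths such as /query can be appended directly. A minimal sketch of that normalization, using only String#chomp; normalize_base_url and admin_query_url are hypothetical helper names for illustration and are not part of Restate::Client:

```ruby
# Hypothetical helper mirroring the constructor's chomp('/') call.
# String#chomp with an argument removes that suffix at most once.
def normalize_base_url(url)
  url.chomp('/')
end

# Hypothetical helper: joins a normalized admin base URL with the /query
# path, the same way execute_query builds its request URI.
def admin_query_url(admin_url)
  "#{normalize_base_url(admin_url)}/query"
end

puts admin_query_url('http://localhost:9070/')      # trailing slash dropped
puts admin_query_url('http://localhost:9070')       # already normalized
puts normalize_base_url('http://localhost:9070//')  # only one slash removed
```

Because only one slash is removed, a base URL ending in two slashes would still leave a trailing slash after normalization.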

Instance Method Details

#cancel_invocation(invocation_id) ⇒ Object

Cancel a running invocation.



# File 'lib/restate/client.rb', line 75

def cancel_invocation(invocation_id)
  post_admin("/restate/invocations/#{invocation_id}/cancel", nil)
end

#execute_query(sql) ⇒ Array<Hash>

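Execute a SQL query against Restate’s introspection API (DataFusion). The admin API exposes system tables (sys_invocation, sys_journal, state, etc.) that can be queried with standard SQL. Raises a RuntimeError if the admin API responds with a non-success status, and returns an empty array when the response body is empty or contains no rows.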

Examples:

client.execute_query("SELECT id, status FROM sys_invocation LIMIT 10")

Parameters:

  • sql (String)

    a SQL query string

Returns:

  • (Array<Hash>)

    rows returned by the query



# File 'lib/restate/client.rb', line 95

def execute_query(sql) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
  uri = URI("#{@admin_url}/query")
  request = Net::HTTP::Post.new(uri)
  request['Content-Type'] = 'application/json'
  request['Accept'] = 'application/json'
  @admin_headers.each { |k, v| request[k] = v }
  request.body = JSON.generate({ query: sql })
  response = Net::HTTP.start(uri.hostname, uri.port, # steep:ignore ArgumentTypeMismatch
                             use_ssl: uri.scheme == 'https',
                             open_timeout: 5,
                             read_timeout: 30) { |http| http.request(request) }
  Kernel.raise "Restate query error: #{response.code} #{response.body}" unless response.is_a?(Net::HTTPSuccess)
  body = response.body
  body && !body.empty? ? (JSON.parse(body)['rows'] || []) : []
end
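
Rows come back from JSON.parse, so each row is a Hash with String keys. The sketch below reproduces the body handling from the last two lines of execute_query and shows one way to post-process the rows. rows_from_body and count_by_status are hypothetical helpers, and the sample body, including its status values, is made up for illustration:

```ruby
require 'json'

# Hypothetical helper reproducing the body handling in execute_query:
# a nil or empty body, or a body without a "rows" key, yields [].
def rows_from_body(body)
  return [] if body.nil? || body.empty?

  JSON.parse(body)['rows'] || []
end

# Hypothetical helper: counts rows per value of the "status" column.
# JSON.parse produces String keys, so use row['status'], not row[:status].
def count_by_status(rows)
  rows.map { |row| row['status'] }.tally
end

# Made-up response body for illustration only; real status values may differ.
sample_body = '{"rows":[{"id":"inv_1","status":"running"},' \
              '{"id":"inv_2","status":"running"},' \
              '{"id":"inv_3","status":"suspended"}]}'

p count_by_status(rows_from_body(sample_body))
p rows_from_body('')
```

With a real client, the same post-processing would apply to the array returned by execute_query.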

#kill_invocation(invocation_id) ⇒ Object

Kill a running invocation (immediate termination, no cleanup).



# File 'lib/restate/client.rb', line 80

def kill_invocation(invocation_id)
  post_admin("/restate/invocations/#{invocation_id}/kill", nil)
end

#object(service, key) ⇒ Object

Returns a proxy for calling a keyed virtual object.



# File 'lib/restate/client.rb', line 50

def object(service, key)
  ClientServiceProxy.new(@ingress_url, resolve_name(service), key, @ingress_headers)
end

#reject_awakeable(awakeable_id, message, code: 500) ⇒ Object

Reject an awakeable from outside the Restate runtime.



# File 'lib/restate/client.rb', line 67

def reject_awakeable(awakeable_id, message, code: 500)
  post_ingress("/restate/awakeables/#{awakeable_id}/reject",
               { 'message' => message, 'code' => code })
end
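
reject_awakeable passes a Hash with the String keys 'message' and 'code' to post_ingress, with code defaulting to 500. A minimal sketch of that payload follows; rejection_payload is a hypothetical helper, and serializing the Hash with JSON.generate is an assumption, since post_ingress is not shown on this page:

```ruby
require 'json'

# Hypothetical helper building the same Hash that reject_awakeable
# passes to post_ingress; code defaults to 500 as in the signature.
def rejection_payload(message, code: 500)
  { 'message' => message, 'code' => code }
end

# Assumption: the Hash is serialized as JSON before being sent.
puts JSON.generate(rejection_payload('failed'))
puts JSON.generate(rejection_payload('not found', code: 404))
```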

#resolve_awakeable(awakeable_id, payload) ⇒ Object

Resolve an awakeable from outside the Restate runtime.



# File 'lib/restate/client.rb', line 62

def resolve_awakeable(awakeable_id, payload)
  post_ingress("/restate/awakeables/#{awakeable_id}/resolve", payload)
end

#service(service) ⇒ Object

Returns a proxy for calling a stateless service.



# File 'lib/restate/client.rb', line 45

def service(service)
  ClientServiceProxy.new(@ingress_url, resolve_name(service), nil, @ingress_headers)
end

#workflow(service, key) ⇒ Object

Returns a proxy for calling a workflow.



# File 'lib/restate/client.rb', line 55

def workflow(service, key)
  ClientServiceProxy.new(@ingress_url, resolve_name(service), key, @ingress_headers)
end
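
#service passes nil as the key, while #object and #workflow pass the caller's key to ClientServiceProxy. How the proxy turns that into a request is not shown on this page. The sketch below assumes Restate's HTTP ingress path convention of /Service/handler for stateless services and /Service/key/handler for keyed targets; ingress_path is a hypothetical helper, not part of the library:

```ruby
# Hypothetical helper; the path layout is an assumption based on Restate's
# HTTP ingress convention, not on ClientServiceProxy's actual code.
def ingress_path(service_name, handler, key = nil)
  # compact drops the nil key used for stateless services
  segments = [service_name, key, handler].compact
  "/#{segments.join('/')}"
end

puts ingress_path('Greeter', 'greet')             # stateless service, no key
puts ingress_path('Counter', 'add', 'my-key')     # keyed virtual object
puts ingress_path('UserSignup', 'run', 'user42')  # workflow keyed by its ID
```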