Class: Hatchet::Clients::Grpc::EventClient

Inherits:
Object
  • Object
show all
Defined in:
lib/hatchet/clients/grpc/event_client.rb

Overview

gRPC client for the Hatchet Events service.

Handles pushing events, bulk events, logs, and stream events via gRPC. Uses the generated EventsService::Stub from events.proto on the shared channel.

Examples:

event_client = EventClient.new(config: hatchet_config, channel: channel)
event_client.push(key: "user:create", payload: { "name" => "Alice" })

Constant Summary collapse

MAX_LOG_MESSAGE_LENGTH =
10_000

Instance Method Summary collapse

Constructor Details

#initialize(config:, channel:) ⇒ EventClient

Returns a new instance of EventClient.

Parameters:

  • config (Hatchet::Config)

    The Hatchet configuration

  • channel (GRPC::Core::Channel)

    Shared gRPC channel



22
23
24
25
26
27
# File 'lib/hatchet/clients/grpc/event_client.rb', line 22

# Build an event client on top of an already-created gRPC channel.
#
# No stub is created here: @stub starts out as nil.
#
# @param config [Hatchet::Config] the Hatchet configuration; its logger is kept as @logger
# @param channel [GRPC::Core::Channel] shared gRPC channel
def initialize(config:, channel:)
  @channel = channel
  @config = config
  @logger = @config.logger
  @stub = nil
end

Instance Method Details

#bulk_push(events, namespace: nil) ⇒ Events

Push multiple events via gRPC.

Parameters:

  • events (Array<Hash>)

    Array of event hashes with :key, :payload, :additional_metadata, :priority, :scope

  • namespace (String, nil) (defaults to: nil)

    Optional namespace override applied to all events

Returns:

  • (Events)

    Bulk push response



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/hatchet/clients/grpc/event_client.rb', line 75

# Push several events in one BulkPushEventRequest.
#
# Every event in the batch carries the same timestamp, taken once from
# Time.now before the events are converted.
#
# @param events [Array<Hash>] event hashes read with symbol keys :key,
#   :payload, :additional_metadata, :priority and :scope
# @param namespace [String, nil] namespace override passed to
#   @config.apply_namespace for every event key
# @return [Events] whatever @stub.bulk_push returns
def bulk_push(events, namespace: nil)
  # Defined outside this listing; presumably makes sure @stub is usable — TODO confirm.
  ensure_connected!

  now = Time.now
  timestamp = Google::Protobuf::Timestamp.new(
    seconds: now.to_i,
    nanos: now.nsec,
  )

  items = events.map do |e|
    request_args = {
      key: @config.apply_namespace(e[:key], namespace_override: namespace),
      # A missing/nil payload is serialized as an empty JSON object.
      payload: JSON.generate(e[:payload] || {}),
      event_timestamp: timestamp,
    }

    # Metadata that is already a String is sent untouched; anything else is
    # serialized to JSON first.
    if e[:additional_metadata]
      request_args[:additional_metadata] = if e[:additional_metadata].is_a?(String)
                                             e[:additional_metadata]
                                           else
                                             JSON.generate(e[:additional_metadata])
                                           end
    end

    # Optional fields are only set when the caller supplied a truthy value.
    request_args[:priority] = e[:priority] if e[:priority]
    request_args[:scope] = e[:scope] if e[:scope]

    ::PushEventRequest.new(**request_args)
  end

  request = ::BulkPushEventRequest.new(events: items)
  # NOTE(review): the method name after `@config.` is missing from this
  # listing (likely lost when the page was extracted). It should be the config
  # accessor that supplies the gRPC call metadata — confirm against
  # lib/hatchet/clients/grpc/event_client.rb before relying on this snippet.
  @stub.bulk_push(request, metadata: @config.)
end

#close ⇒ Object

Close the connection.



161
162
163
# File 'lib/hatchet/clients/grpc/event_client.rb', line 161

# Drop this client's reference to the gRPC stub.
#
# Only @stub is reset to nil here; the channel handed to the constructor is
# not touched by this method.
#
# @return [nil]
def close
  @stub = nil
end

#push(key:, payload:, additional_metadata: nil, priority: nil, scope: nil, namespace: nil) ⇒ Event

Push an event via gRPC.

Parameters:

  • key (String)

    Event key (will be namespaced)

  • payload (Hash)

    Event payload

  • additional_metadata (Hash, nil) (defaults to: nil)

    Additional metadata

  • priority (Integer, nil) (defaults to: nil)

    Event priority

  • scope (String, nil) (defaults to: nil)

    Event scope

  • namespace (String, nil) (defaults to: nil)

    Optional namespace override

Returns:

  • (Event)

    Push response



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/hatchet/clients/grpc/event_client.rb', line 38

def push(key:, payload:, additional_metadata: nil, priority: nil, scope: nil, namespace: nil)
  ensure_connected!

  now = Time.now
  timestamp = Google::Protobuf::Timestamp.new(
    seconds: now.to_i,
    nanos: now.nsec,
  )

  namespaced_key = @config.apply_namespace(key, namespace_override: namespace)

  request_args = {
    key: namespaced_key,
    payload: JSON.generate(payload),
    event_timestamp: timestamp,
  }

  if 
    request_args[:additional_metadata] = if .is_a?(String)
                                           
                                         else
                                           JSON.generate()
                                         end
  end

  request_args[:priority] = priority if priority
  request_args[:scope] = scope if scope

  request = ::PushEventRequest.new(**request_args)
  @stub.push(request, metadata: @config.)
end

#put_log(step_run_id:, message:) ⇒ PutLogResponse

Put a log message for a task run.

Parameters:

  • step_run_id (String)

    The task run external ID

  • message (String)

    Log message (truncated to 10K chars)

Returns:

  • (PutLogResponse)



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/hatchet/clients/grpc/event_client.rb', line 114

# Send one log line for a task run.
#
# @param step_run_id [String] sent as the request's task_run_external_id
# @param message [String] log text; only the first MAX_LOG_MESSAGE_LENGTH
#   characters are sent
# @return [PutLogResponse] whatever @stub.put_log returns
def put_log(step_run_id:, message:)
  # Defined outside this listing; presumably makes sure @stub is usable — TODO confirm.
  ensure_connected!

  now = Time.now
  timestamp = Google::Protobuf::Timestamp.new(
    seconds: now.to_i,
    nanos: now.nsec,
  )

  # Messages at or under the limit are passed through unchanged; longer ones
  # are cut to exactly MAX_LOG_MESSAGE_LENGTH characters.
  truncated_message = message.length > MAX_LOG_MESSAGE_LENGTH ? message[0...MAX_LOG_MESSAGE_LENGTH] : message

  request = ::PutLogRequest.new(
    task_run_external_id: step_run_id,
    created_at: timestamp,
    message: truncated_message,
  )

  # NOTE(review): the method name after `@config.` is missing from this
  # listing (likely lost when the page was extracted). It should be the config
  # accessor that supplies the gRPC call metadata — confirm against
  # lib/hatchet/clients/grpc/event_client.rb before relying on this snippet.
  @stub.put_log(request, metadata: @config.)
end

#put_stream(step_run_id:, data:) ⇒ PutStreamEventResponse

Put a stream event for real-time streaming.

Parameters:

  • step_run_id (String)

    The task run external ID

  • data (String)

    Stream data chunk (sent as bytes)

Returns:

  • (PutStreamEventResponse)



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/hatchet/clients/grpc/event_client.rb', line 139

# Send one chunk of stream data for a task run.
#
# @param step_run_id [String] sent as the request's task_run_external_id
# @param data [String, #to_s] chunk to send; non-String values are converted
#   with to_s before being turned into bytes
# @return [PutStreamEventResponse] whatever @stub.put_stream_event returns
def put_stream(step_run_id:, data:)
  # Defined outside this listing; presumably makes sure @stub is usable — TODO confirm.
  ensure_connected!

  now = Time.now
  timestamp = Google::Protobuf::Timestamp.new(
    seconds: now.to_i,
    nanos: now.nsec,
  )

  # The message field in PutStreamEventRequest is bytes
  # String#b returns a binary (ASCII-8BIT) copy, so the caller's string is not modified.
  message_bytes = data.is_a?(String) ? data.b : data.to_s.b

  request = ::PutStreamEventRequest.new(
    task_run_external_id: step_run_id,
    created_at: timestamp,
    message: message_bytes,
  )

  # NOTE(review): the method name after `@config.` is missing from this
  # listing (likely lost when the page was extracted). It should be the config
  # accessor that supplies the gRPC call metadata — confirm against
  # lib/hatchet/clients/grpc/event_client.rb before relying on this snippet.
  @stub.put_stream_event(request, metadata: @config.)
end