Class: ClaudeAgentSDK::TranscriptMirrorBatcher

Inherits:
Object
Defined in:
lib/claude_agent_sdk/transcript_mirror_batcher.rb

Overview

Batching layer between `transcript_mirror` stdout frames and a SessionStore.

The CLI subprocess emits `{"type":"transcript_mirror","filePath":…,"entries":[…]}` frames interleaved with normal SDK messages. The Query read loop peels these off and hands them to #enqueue, which accumulates them and flushes to SessionStore#append either when a `result` message arrives (explicit #flush) or when the pending buffer exceeds its entry-count or byte-size threshold (eager background flush). This keeps adapter latency off the hot path during model streaming.
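The threshold bookkeeping described above can be sketched in plain Ruby. `PendingBuffer` is a hypothetical stand-in, not part of the SDK: it mirrors the accounting visible in #enqueue (empty frames dropped, one `JSON.generate` per frame, strict `>` comparisons against both limits) but leaves out the reactor, the store, and the file-path mapping.

```ruby
require "json"

# Hypothetical sketch: decides *when* an eager flush is due.
class PendingBuffer
  attr_reader :entry_count, :byte_count

  def initialize(max_entries:, max_bytes:)
    @max_entries = max_entries
    @max_bytes = max_bytes
    reset
  end

  # Buffers one frame; returns true when either threshold is exceeded,
  # i.e. when the caller should schedule an eager flush.
  def add(file_path, entries)
    return false if entries.empty? # an empty frame mirrors nothing

    @frames << { file_path: file_path, entries: entries }
    @entry_count += entries.length
    @byte_count += JSON.generate(entries).bytesize # one generate per frame
    @entry_count > @max_entries || @byte_count > @max_bytes
  end

  # Hands back every buffered frame (in arrival order) and starts fresh.
  def take
    frames = @frames
    reset
    frames
  end

  private

  def reset
    @frames = []
    @entry_count = 0
    @byte_count = 0
  end
end

buf = PendingBuffer.new(max_entries: 2, max_bytes: 1 << 20)
buf.add("a.jsonl", [{ "n" => 1 }])               # => false (1 entry)
buf.add("a.jsonl", [{ "n" => 2 }, { "n" => 3 }]) # => true (3 > 2)
buf.take.length                                  # => 2 frames
```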

Adapter failures are retried (MIRROR_APPEND_MAX_ATTEMPTS attempts in total) with a short backoff; timeouts are not retried, since the in-flight call may still land. Failures never raise: the local-disk transcript is already durable, so the session continues unaffected. Instead they are reported via the on_error callback, which surfaces them as a MirrorErrorMessage. Adapters should deduplicate entries by their identifier when one is present, since a retried batch may overlap a prior write.
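A hedged sketch of this retry policy, assuming the adapter call raises on failure and that a timeout surfaces as a distinct exception. `AppendTimeout`, `append_with_retry`, and `FlakyStore` are hypothetical; per the note below, the real batcher abandons a worker thread after send_timeout rather than catching an exception. The constants mirror MIRROR_APPEND_MAX_ATTEMPTS and MIRROR_APPEND_BACKOFF_S, and `sleeper` is injectable only so the delays can be observed.

```ruby
# Hypothetical error standing in for "the append exceeded send_timeout".
class AppendTimeout < StandardError; end

MAX_ATTEMPTS = 3
BACKOFF_S = [0.2, 0.8].freeze # MAX_ATTEMPTS - 1 delays

# Calls store.append(key, entries) up to MAX_ATTEMPTS times. A timeout is
# reported immediately (the in-flight write may still land). Does not raise
# as long as on_error does not; returns true on success, false otherwise.
def append_with_retry(store, key, entries, on_error:, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    store.append(key, entries)
    true
  rescue AppendTimeout => e
    on_error.call(key, "append timed out: #{e.message}")
    false
  rescue StandardError => e
    if attempt < MAX_ATTEMPTS
      sleeper.call(BACKOFF_S[attempt - 1])
      retry # re-runs the begin block, incrementing attempt
    end
    on_error.call(key, "append failed after #{attempt} attempts: #{e.message}")
    false
  end
end

# Test double: fails the first `failures` calls, then succeeds.
class FlakyStore
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def append(_key, _entries)
    @calls += 1
    raise IOError, "transient" if @calls <= @failures
  end
end

delays = []
errors = []
ok = append_with_retry(FlakyStore.new(2), "k", [],
                       on_error: ->(_k, msg) { errors << msg },
                       sleeper: ->(s) { delays << s })
# ok is true, delays is [0.2, 0.8], errors is empty
```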

The semaphore serializes appends, but a #send that exceeds send_timeout is abandoned (its worker thread keeps running) and the next drain proceeds, so two #append calls for the SAME key can briefly overlap. SessionStore#append must be thread-safe per key (see that method’s contract).
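One way an adapter might meet both requirements (thread safety per key and tolerance of overlapping retries) is sketched below as a hypothetical in-memory store. A per-key `Mutex` makes concurrent appends for the same key safe, and a seen-set skips entries already written. It assumes each entry carries a `"uuid"` string field; that field name is an assumption, not something this page specifies.

```ruby
# Hypothetical adapter sketch; the "uuid" field name is an assumption.
class InMemorySessionStore
  Bucket = Struct.new(:lock, :entries, :seen)

  def initialize
    @buckets = {}
    @guard = Mutex.new # protects creation of per-key buckets
  end

  # Safe to call concurrently, including for the same key.
  def append(key, entries)
    b = bucket(key)
    b.lock.synchronize do
      entries.each do |entry|
        id = entry["uuid"]
        next if id && b.seen.key?(id) # overlap from a retried batch
        b.seen[id] = true if id
        b.entries << entry
      end
    end
  end

  def entries_for(key)
    b = bucket(key)
    b.lock.synchronize { b.entries.dup }
  end

  private

  def bucket(key)
    @guard.synchronize { @buckets[key] ||= Bucket.new(Mutex.new, [], {}) }
  end
end

store = InMemorySessionStore.new
batch = [{ "uuid" => "a", "n" => 1 }, { "uuid" => "b", "n" => 2 }]
2.times.map { Thread.new { store.append("k", batch) } }.each(&:join)
store.entries_for("k").length # => 2
```

Entries without the assumed identifier are appended every time, so an adapter relying on this approach only gets idempotence for entries that carry one.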

Constant Summary

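MAX_PENDING_ENTRIES = 500

    Eager-flush thresholds (exposed for tests): a background flush is scheduled once the buffered entry count exceeds MAX_PENDING_ENTRIES or the buffered JSON size exceeds MAX_PENDING_BYTES.

MAX_PENDING_BYTES = 1 << 20

    1 MiB.

SEND_TIMEOUT_SECONDS = 60.0

    Default send_timeout, in seconds.

MIRROR_APPEND_MAX_ATTEMPTS = 3

    Bounded retry for transient adapter failures. The backoff list length is MIRROR_APPEND_MAX_ATTEMPTS - 1 (one delay between each pair of consecutive attempts).

MIRROR_APPEND_BACKOFF_S = [0.2, 0.8].freeze

    Delays, in seconds, slept before the second and third attempts.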

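Instance Method Summary

  • #close ⇒ Object
  • #enqueue(file_path, entries) ⇒ Object
  • #flush ⇒ Object
  • #initialize ⇒ TranscriptMirrorBatcher (constructor)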

Constructor Details

#initialize(store:, projects_dir:, on_error:, send_timeout: SEND_TIMEOUT_SECONDS, max_pending_entries: MAX_PENDING_ENTRIES, max_pending_bytes: MAX_PENDING_BYTES) ⇒ TranscriptMirrorBatcher

Returns a new instance of TranscriptMirrorBatcher.

Parameters:

  • store (SessionStore)

    the adapter to mirror into

  • projects_dir (String)

    base dir for file_path -> SessionKey mapping

  • on_error (#call)

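    called as on_error.call(key, message) after a batch exhausts retries; must not raise

  • send_timeout (Numeric)

    seconds to wait for one SessionStore#append before abandoning it; a timed-out append is not retried

  • max_pending_entries (Integer)

    eager-flush threshold on the number of buffered entries

  • max_pending_bytes (Integer)

    eager-flush threshold on the buffered JSON size, in bytes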

# File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 47

def initialize(store:, projects_dir:, on_error:, send_timeout: SEND_TIMEOUT_SECONDS,
               max_pending_entries: MAX_PENDING_ENTRIES, max_pending_bytes: MAX_PENDING_BYTES)
  @store = store
  @projects_dir = projects_dir
  @on_error = on_error
  @send_timeout = send_timeout
  @max_pending_entries = max_pending_entries
  @max_pending_bytes = max_pending_bytes
  @pending = []
  @pending_entries = 0
  @pending_bytes = 0
  # Fiber-aware lock: the critical section blocks on SessionStore#append
  # (a thread hop), so a Thread::Mutex would deadlock the reactor. The
  # semaphore serializes drains so append ordering matches enqueue order.
  @lock = Async::Semaphore.new(1)
end
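The mapping from file_path to SessionKey that projects_dir supports is not shown on this page. Purely as an illustration, the sketch below assumes transcripts live at `<projects_dir>/<project>/<session_id>.jsonl` and uses a hypothetical `SketchKey` struct and `sketch_key_for` helper; the SDK's real key fields and its handling of nested paths may differ.

```ruby
# Hypothetical key type; the SDK's real SessionKey is not shown on this page.
SketchKey = Struct.new(:project_key, :session_id)

# Assumes transcripts live at <projects_dir>/<project>/<session_id>.jsonl.
# Returns nil for anything else (outside projects_dir, nested, or not .jsonl).
def sketch_key_for(file_path, projects_dir)
  base = File.expand_path(projects_dir) + File::SEPARATOR
  full = File.expand_path(file_path)
  return nil unless full.start_with?(base) # trailing separator blocks "projects2/"

  parts = full.delete_prefix(base).split(File::SEPARATOR)
  return nil unless parts.length == 2 && parts[1].end_with?(".jsonl")

  SketchKey.new(parts[0], File.basename(parts[1], ".jsonl"))
end

key = sketch_key_for("/home/u/.claude/projects/-work-app/abc123.jsonl",
                     "/home/u/.claude/projects")
key.session_id # => "abc123"
```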

Instance Method Details

#close ⇒ Object

Final flush before teardown. Never raises.

# File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 97

def close
  flush
rescue StandardError => e
  warn "Claude SDK: TranscriptMirrorBatcher close flush failed: #{e.message}"
end

#enqueue(file_path, entries) ⇒ Object

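Buffer a frame and return immediately. If the entry-count or byte-size threshold is exceeded, schedule an eager flush: inside an Async task it runs fire-and-forget as a child task; outside a reactor it runs inline.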

entries are deep-stringified because the transport parses CLI output with symbolized keys, but SessionStore entries are opaque JSON blobs that must round-trip through string keys (Postgres JSONB / Redis) and feed fold_session_summary, which reads string keys.

# File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 71

def enqueue(file_path, entries)
  entries = deep_stringify(Array(entries))
  # An empty frame mirrors nothing (do_flush skips empty keys anyway), so
  # drop it here: otherwise its 2-byte "[]" inflates @pending_bytes and, in
  # eager mode (thresholds 0), schedules a no-op background drain per frame.
  return if entries.empty?

  # Approximate wire size — one stringify per frame (not per entry).
  size = JSON.generate(entries).bytesize
  @pending << { file_path: file_path, entries: entries }
  @pending_entries += entries.length
  @pending_bytes += size
  return unless @pending_entries > @max_pending_entries || @pending_bytes > @max_pending_bytes

  task = Async::Task.current?
  # Fire-and-forget on the reactor; @lock in #drain serializes against any
  # in-flight flush so append ordering holds. #drain never raises.
  task ? task.async { drain } : drain
end
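The private deep_stringify helper is not shown here. A minimal recursive version, written as an assumption about its behavior based on the description above, converts symbol keys (as produced by `JSON.parse(..., symbolize_names: true)`) to strings at every nesting level; `JSON.generate` on the result then gives the per-frame size that #enqueue adds to its byte count.

```ruby
require "json"

# Hypothetical re-implementation: string keys at every level, values untouched.
def deep_stringify(obj)
  case obj
  when Hash then obj.each_with_object({}) { |(k, v), h| h[k.to_s] = deep_stringify(v) }
  when Array then obj.map { |v| deep_stringify(v) }
  else obj
  end
end

# Simulates a transport that parsed the CLI frame with symbolized keys.
frame = JSON.parse('{"entries":[{"type":"user","message":{"role":"user"}}]}',
                   symbolize_names: true)
entries = deep_stringify(Array(frame[:entries])) # Array(nil) would give []
entries.first["message"]["role"] # => "user"
size = JSON.generate(entries).bytesize # approximate wire size for the byte threshold
```

Stringifying before measuring keeps the byte estimate consistent with what the store will actually receive.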

#flush ⇒ Object

Flush all pending entries, serialized after any in-flight eager flush.

# File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 92

def flush
  drain
end
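The ordering guarantee of #flush can be illustrated with a hypothetical `DrainSketch` class. A `Thread::Mutex` stands in for the `Async::Semaphore` (acceptable outside a fiber reactor; the constructor comment explains why the real class cannot use one), and frames are keyed by file_path directly rather than mapped to a SessionKey. The buffer is swapped while the drain lock is held, so a later drain cannot append ahead of an earlier one.

```ruby
# Hypothetical sketch, not the SDK's implementation.
class DrainSketch
  def initialize(store)
    @store = store
    @pending = []
    @buffer_lock = Mutex.new # short critical section: push or swap only
    @drain_lock = Mutex.new  # serializes drains, like the real semaphore
  end

  def enqueue(file_path, entries)
    @buffer_lock.synchronize { @pending << { file_path: file_path, entries: entries } }
  end

  def drain
    @drain_lock.synchronize do
      # Take everything buffered so far; later frames wait for the next drain.
      batch = @buffer_lock.synchronize do
        taken = @pending
        @pending = []
        taken
      end
      # group_by keeps first-seen key order and per-key frame order.
      batch.group_by { |frame| frame[:file_path] }.each do |path, frames|
        entries = frames.flat_map { |frame| frame[:entries] }
        next if entries.empty? # nothing to mirror for this key
        @store.append(path, entries)
      end
    end
  end
end

Recorder = Struct.new(:calls) do
  def append(key, entries)
    calls << [key, entries]
  end
end

rec = Recorder.new([])
sketch = DrainSketch.new(rec)
sketch.enqueue("a.jsonl", [1])
sketch.enqueue("b.jsonl", [2])
sketch.enqueue("a.jsonl", [3])
sketch.drain
rec.calls # => [["a.jsonl", [1, 3]], ["b.jsonl", [2]]]
```

Keeping the buffer lock separate from the drain lock means #enqueue never waits on a slow adapter, in line with the goal of keeping adapter latency off the hot path.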