Class: ClaudeAgentSDK::TranscriptMirrorBatcher
- Inherits: Object
  - Object
  - ClaudeAgentSDK::TranscriptMirrorBatcher
- Defined in: lib/claude_agent_sdk/transcript_mirror_batcher.rb
Overview
Batching layer between `transcript_mirror` stdout frames and a SessionStore.
The CLI subprocess emits `{"type":"transcript_mirror","filePath":…,"entries":…}` frames interleaved with normal SDK messages. The Query read loop peels these off and hands them to #enqueue, which accumulates them and flushes to SessionStore#append either when a `result` message arrives (explicit #flush) or when the pending buffer exceeds size thresholds (eager background flush). This keeps adapter latency off the hot path during model streaming.
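The accumulate-then-flush behaviour described above can be sketched without the Async machinery. This is a simplified, synchronous illustration, not the SDK's implementation: `ToyBatcher` and its `flushed` list are hypothetical names, and the "store" is just an in-memory array of batches.

```ruby
require "json"

# Hypothetical, synchronous stand-in for the batcher (illustration only).
class ToyBatcher
  attr_reader :flushed

  def initialize(max_entries:, max_bytes:)
    @max_entries = max_entries
    @max_bytes = max_bytes
    @pending = []
    @pending_entries = 0
    @pending_bytes = 0
    @flushed = [] # each element is one batch handed to the pretend store
  end

  # Buffer a frame; flush eagerly once either threshold is exceeded.
  def enqueue(file_path, entries)
    return if entries.empty?

    @pending << { file_path: file_path, entries: entries }
    @pending_entries += entries.length
    @pending_bytes += JSON.generate(entries).bytesize
    flush if @pending_entries > @max_entries || @pending_bytes > @max_bytes
  end

  # Hand everything pending to the store and reset the counters.
  def flush
    return if @pending.empty?

    batch = @pending
    @pending = []
    @pending_entries = 0
    @pending_bytes = 0
    @flushed << batch
  end
end
```

Note that the comparison is strictly greater-than, so a buffer sitting exactly at a threshold waits for the next frame or for an explicit flush.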
Adapter failures are retried (MIRROR_APPEND_MAX_ATTEMPTS total) with short backoff; timeouts are not retried since the in-flight call may still land. Failures never raise: the local-disk transcript is already durable, so the session continues unaffected. Instead they are reported via the on_error callback (which surfaces them as a MirrorErrorMessage). Adapters should dedupe by entry when present, since a retried batch may overlap a prior write.
The semaphore serializes appends, but a #send that exceeds send_timeout is abandoned (its worker thread keeps running) and the next drain proceeds, so two #append calls for the SAME key can briefly overlap. SessionStore#append must be thread-safe per key (see that method’s contract).
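To make the per-key thread-safety and dedupe requirements concrete, here is a minimal sketch of an in-memory adapter. It is not part of the SDK. The class name, the `append(key, entries)` signature, and the use of a `"uuid"` field as the dedupe identifier are all assumptions made for illustration.

```ruby
require "set"

# Hypothetical in-memory adapter (illustration only).
# Assumes entries are Hashes with string keys and an optional "uuid".
class InMemoryMirrorStore
  Slot = Struct.new(:lock, :entries, :seen)

  def initialize
    @guard = Mutex.new # protects the @slots table itself
    @slots = {}        # key => Slot
  end

  # Appends entries under key, skipping any whose "uuid" was already stored.
  # Overlapping calls for the same key are serialized by the slot's lock.
  def append(key, entries)
    slot = slot_for(key)
    slot.lock.synchronize do
      entries.each do |entry|
        id = entry["uuid"]
        # Set#add? returns nil when the id is already present.
        next if id && !slot.seen.add?(id)

        slot.entries << entry
      end
    end
    nil
  end

  # Returns a copy of the entries stored under key.
  def entries_for(key)
    slot = slot_for(key)
    slot.lock.synchronize { slot.entries.dup }
  end

  private

  def slot_for(key)
    @guard.synchronize { @slots[key] ||= Slot.new(Mutex.new, [], Set.new) }
  end
end
```

With this shape, a retried batch that overlaps an earlier write, or an abandoned append that lands late, only adds the entries that were not already stored.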
Constant Summary
- MAX_PENDING_ENTRIES = 500
  Eager-flush thresholds (exposed for tests).
- MAX_PENDING_BYTES = 1 << 20
  1 MiB
- SEND_TIMEOUT_SECONDS = 60.0
- MIRROR_APPEND_MAX_ATTEMPTS = 3
  Bounded retry for transient adapter failures. The backoff list length is MIRROR_APPEND_MAX_ATTEMPTS - 1 (one delay between each pair of attempts).
- MIRROR_APPEND_BACKOFF_S = [0.2, 0.8].freeze
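The relationship between the attempt count and the backoff list can be sketched as a small retry helper. This is an illustration under stated assumptions, not the SDK's code: `append_with_retry` and the injectable `sleeper` are hypothetical, and it assumes a timed-out append surfaces as `Timeout::Error`.

```ruby
require "timeout"

MAX_ATTEMPTS = 3                # mirrors MIRROR_APPEND_MAX_ATTEMPTS
BACKOFF_S = [0.2, 0.8].freeze   # mirrors MIRROR_APPEND_BACKOFF_S

# Runs the block up to MAX_ATTEMPTS times, sleeping between attempts.
# Returns nil on success, or the last error on failure. It does not raise
# for StandardError. Timeouts are returned immediately without a retry,
# because the in-flight call may still complete.
def append_with_retry(sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    attempt += 1
    yield
    nil
  rescue Timeout::Error => e
    e
  rescue StandardError => e
    return e if attempt >= MAX_ATTEMPTS

    # One delay between each pair of attempts: indexes 0 and 1.
    sleeper.call(BACKOFF_S[attempt - 1])
    retry
  end
end
```

Passing a custom `sleeper` keeps tests fast; in normal use the default lambda simply sleeps for the listed delay.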
Instance Method Summary
- #close ⇒ Object
  Final flush before teardown.
- #enqueue(file_path, entries) ⇒ Object
  Buffer a frame; schedule an eager background flush if thresholds are exceeded.
- #flush ⇒ Object
  Flush all pending entries, serialized after any in-flight eager flush.
- #initialize(store:, projects_dir:, on_error:, send_timeout: SEND_TIMEOUT_SECONDS, max_pending_entries: MAX_PENDING_ENTRIES, max_pending_bytes: MAX_PENDING_BYTES) ⇒ TranscriptMirrorBatcher (constructor)
  A new instance of TranscriptMirrorBatcher.
Constructor Details
#initialize(store:, projects_dir:, on_error:, send_timeout: SEND_TIMEOUT_SECONDS, max_pending_entries: MAX_PENDING_ENTRIES, max_pending_bytes: MAX_PENDING_BYTES) ⇒ TranscriptMirrorBatcher
Returns a new instance of TranscriptMirrorBatcher.
    # File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 47

    def initialize(store:, projects_dir:, on_error:, send_timeout: SEND_TIMEOUT_SECONDS,
                   max_pending_entries: MAX_PENDING_ENTRIES, max_pending_bytes: MAX_PENDING_BYTES)
      @store = store
      @projects_dir = projects_dir
      @on_error = on_error
      @send_timeout = send_timeout
      @max_pending_entries = max_pending_entries
      @max_pending_bytes = max_pending_bytes
      @pending = []
      @pending_entries = 0
      @pending_bytes = 0
      # Fiber-aware lock: the critical section blocks on SessionStore#append
      # (a thread hop), so a Thread::Mutex would deadlock the reactor. The
      # semaphore serializes drains so append ordering matches enqueue order.
      @lock = Async::Semaphore.new(1)
    end
Instance Method Details
#close ⇒ Object
Final flush before teardown. Never raises.
    # File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 97

    def close
      flush
    rescue StandardError => e
      warn "Claude SDK: TranscriptMirrorBatcher close flush failed: #{e.message}"
    end
#enqueue(file_path, entries) ⇒ Object
Buffer a frame; schedule an eager background flush if thresholds are exceeded. Synchronous and fire-and-forget.
entries are deep-stringified because the transport parses CLI output with symbolized keys, but SessionStore entries are opaque JSON blobs that must round-trip through string keys (Postgres JSONB / Redis) and feed fold_session_summary, which reads string keys.
    # File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 71

    def enqueue(file_path, entries)
      entries = deep_stringify(Array(entries))
      # An empty frame mirrors nothing (do_flush skips empty keys anyway), so
      # drop it here: otherwise its 2-byte "[]" inflates @pending_bytes and, in
      # eager mode (thresholds 0), schedules a no-op background drain per frame.
      return if entries.empty?

      # Approximate wire size: one stringify per frame (not per entry).
      size = JSON.generate(entries).bytesize
      @pending << { file_path: file_path, entries: entries }
      @pending_entries += entries.length
      @pending_bytes += size
      return unless @pending_entries > @max_pending_entries || @pending_bytes > @max_pending_bytes

      task = Async::Task.current?
      # Fire-and-forget on the reactor; @lock in #drain serializes against any
      # in-flight flush so append ordering holds. #drain never raises.
      task ? task.async { drain } : drain
    end
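The body of `deep_stringify` is not shown on this page. As a rough sketch of what such a helper might do, assuming it only needs to convert Hash keys to strings recursively through nested Hashes and Arrays, something like the following would satisfy the string-key round-trip described above. The real helper may differ.

```ruby
require "json"

# Hypothetical sketch of a deep key-stringifier (illustration only).
# Hash keys become strings; Arrays are walked; other values pass through.
def deep_stringify(value)
  case value
  when Hash
    value.each_with_object({}) { |(k, v), out| out[k.to_s] = deep_stringify(v) }
  when Array
    value.map { |v| deep_stringify(v) }
  else
    value
  end
end

# A frame parsed with symbolized keys, as the transport would produce it.
frame = [{ type: "user", message: { content: [{ text: "hi" }] } }]
entries = deep_stringify(frame)

# The per-frame size estimate used for the byte threshold.
size = JSON.generate(entries).bytesize
```

Because JSON serialization writes symbol and string keys identically, the byte estimate is the same either way; the stringification matters for consumers that look entries up by string key after a round-trip through the store.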
#flush ⇒ Object
Flush all pending entries, serialized after any in-flight eager flush.
    # File 'lib/claude_agent_sdk/transcript_mirror_batcher.rb', line 92

    def flush
      drain
    end