Module: Echoes::KittyGraphics

Defined in:
lib/echoes/kitty_graphics.rb,
lib/echoes/kitty_graphics_appkit.rb

Overview

Minimum-viable Kitty graphics protocol decoder. Wire format:

\e_G<comma-separated-options>;<base64-payload>\e\

The parser hands us the body (without the `\e_G…\e\` framing) split into `meta` and the still-base64 `payload`. We accumulate chunks keyed by image id (m=1 means more chunks follow, m=0 marks the last), base64-decode the full payload, and decode the image (PNG, f=100, is the default) into RGBA via AppKit. We then cache the image (a=t), cache and display it (a=T), or display an already-cached image (a=p) via `screen.put_kitty_image`.

State lives on the parser (per pane), not in module globals, so chunks from different panes don’t collide.
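To make the wire format concrete, here is a minimal sketch of the split described above. The helper name `split_kitty_frame` is hypothetical and is not part of Echoes; the real parser may frame and split the body differently.

```ruby
# Hypothetical helper (not part of Echoes): strip the APC framing and
# split the body into the option string and the base64 payload.
def split_kitty_frame(frame)
  m = frame.match(/\A\e_G([^;\e]*)(?:;(.*?))?\e\\\z/m)
  return nil unless m
  [m[1], m[2].to_s]   # a frame without ';' yields an empty payload
end

png_stub = "\x89PNG\r\n\x1a\n".b                       # just the PNG signature
frame    = "\e_Ga=T,f=100,i=1;#{[png_stub].pack('m0')}\e\\"
meta, payload = split_kitty_frame(frame)
```

Here `meta` holds the comma-separated options and `payload` is still base64, which is the shape the methods below expect.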

Defined Under Namespace

Modules: AppKitPng

Constant Summary

CHUNK_LIMIT_BYTES = 16 * 1024 * 1024

CACHE_LIMIT = 16
    Keep the most recent N images (LRU).

DEFAULT_FORMAT = '100'.freeze
    PNG

Class Method Details

.assemble_chunk(state, meta, payload) ⇒ Object

Add one APC chunk’s payload to the per-id buffer. Returns `[options, payload]` (the saved options Hash and the fully assembled, still-base64 payload) when this chunk closes the image (m=0 or no m=). Returns nil while more chunks are expected, or when the buffered payload would exceed CHUNK_LIMIT_BYTES.



# File 'lib/echoes/kitty_graphics.rb', line 43

def assemble_chunk(state, meta, payload)
  opts = parse_options(meta)
  id   = opts['i'] || opts['I'] || ''
  more = opts['m'] == '1'

  buf = state[:chunks][id]
  if buf
    # Subsequent chunk: append payload, update saved-options
    # only with newly-seen non-id keys (Kitty's spec: only `m`
    # / `q` / `S` / size differ across chunks).
    return nil if buf[1].bytesize + payload.bytesize > CHUNK_LIMIT_BYTES
    buf[0]['m'] = opts['m'] if opts.key?('m')
    buf[0]['q'] = opts['q'] if opts.key?('q')
    buf[1] << payload
  else
    # First chunk: stash both options and payload.
    return nil if payload.bytesize > CHUNK_LIMIT_BYTES
    buf = [opts, +"".b]
    buf[1] << payload
    state[:chunks][id] = buf
  end

  return nil if more

  state[:chunks].delete(id)
  [buf[0], buf[1]]
end
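
For context, the input to this method can be produced by a client-side chunker along the following lines. This is a sketch under stated assumptions: `chunk_payload`, its parameters, and the 4096-byte default are all hypothetical, and repeating `i=` on every chunk is a choice made here so that an accumulator keyed by image id finds its buffer, not something taken from the Kitty specification.

```ruby
# Hypothetical client-side chunker (not part of Echoes). Splits an
# already base64-encoded payload into [meta, payload] pairs. Every
# chunk except the last carries m=1; the last carries m=0.
def chunk_payload(b64, first_options:, id:, chunk_size: 4096)
  unless chunk_size.positive? && (chunk_size % 4).zero?
    raise ArgumentError, 'chunk_size must be a positive multiple of 4'
  end

  pieces = b64.scan(/.{1,#{chunk_size}}/)
  pieces = [''] if pieces.empty?
  pieces.each_with_index.map do |piece, idx|
    more = idx == pieces.size - 1 ? 0 : 1
    # Assumption for this example: repeat i= on every chunk.
    meta = idx.zero? ? "#{first_options},i=#{id},m=#{more}" : "i=#{id},m=#{more}"
    [meta, piece]
  end
end

b64    = ['x' * 5000].pack('m0')   # strict base64, no line breaks
chunks = chunk_payload(b64, first_options: 'a=T,f=100', id: 7)
```

Feeding each pair to `assemble_chunk` in order would return nil for every chunk but the last.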

.cache_image(state, id, image) ⇒ Object



# File 'lib/echoes/kitty_graphics.rb', line 272

def cache_image(state, id, image)
  return if id.empty?
  state[:cache].delete(id)         # touch (LRU)
  state[:cache][id] = image
  state[:cache].shift while state[:cache].size > CACHE_LIMIT
end
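
The eviction above relies on Ruby Hashes preserving insertion order: deleting and re-inserting a key moves it to the end, and `shift` removes the oldest entry. A standalone illustration follows, with `lru_put` and `LIMIT` as names invented for this example:

```ruby
# Standalone illustration of the delete-then-reinsert LRU idiom.
# LIMIT is an arbitrary value chosen for this example.
LIMIT = 2

def lru_put(cache, key, value)
  cache.delete(key)                      # drop the old slot so the key moves to the end
  cache[key] = value
  cache.shift while cache.size > LIMIT   # evict from the front (oldest first)
end

cache = {}
lru_put(cache, 'a', 1)
lru_put(cache, 'b', 2)
lru_put(cache, 'a', 3)   # refreshes 'a'; 'b' is now the oldest
lru_put(cache, 'c', 4)   # evicts 'b'
```

Note that in the listing above only writes refresh an entry; an a=p lookup reads `state[:cache][id]` without reordering it.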

.decode_image(bytes, format, opts = {}) ⇒ Object

Decode an image payload to {rgba:, width:, height:}. Returns nil for any format not listed below.

f=100 / unset — PNG (and anything else NSBitmapImageRep
                eats: JPEG, GIF, TIFF, BMP)
f=24          — raw RGB packed, dims from s= / v=
f=32          — raw RGBA packed, dims from s= / v=


# File 'lib/echoes/kitty_graphics.rb', line 247

def decode_image(bytes, format, opts = {})
  case format.to_s
  when '100', ''
    decode_png(bytes)
  when '24'
    load_appkit
    AppKitPng.from_rgb(bytes, opts['s'].to_i, opts['v'].to_i)
  when '32'
    load_appkit
    AppKitPng.from_rgba(bytes, opts['s'].to_i, opts['v'].to_i)
  end
end
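
For the raw formats, the byte count is fully determined by `s=`, `v=`, and the bytes per pixel. The listing hands the values straight to `AppKitPng`, whose own validation is not shown here, so the following pre-check is only a sketch; `raw_size_ok?` and `BYTES_PER_PIXEL` are hypothetical names.

```ruby
# Hypothetical pre-check (not part of Echoes): does a raw pixel buffer
# have exactly the number of bytes that s= (width) and v= (height) imply?
BYTES_PER_PIXEL = { '24' => 3, '32' => 4 }.freeze

def raw_size_ok?(bytes, format, opts)
  bpp = BYTES_PER_PIXEL[format.to_s]
  return false unless bpp                 # not a raw format
  width  = opts['s'].to_i                 # nil.to_i is 0, so missing keys fail below
  height = opts['v'].to_i
  return false unless width.positive? && height.positive?
  bytes.bytesize == width * height * bpp
end

pixels = "\x00".b * 12                    # 2 x 2 pixels of packed RGB
```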

.decode_payload(b64) ⇒ Object

Tolerant base64 decode: strips whitespace (some clients line-wrap APC payloads) and returns nil on invalid input. Avoids `require 'base64'` (no longer in default gems on Ruby 3.4+) by going through `String#unpack1('m0')`.

The kitty spec explicitly permits omitted padding, and `kitten icat --transfer-mode stream` splits its base64 at arbitrary byte boundaries, so the payload assembled across all m=1 chunks can end mid-quad. Pad to a multiple of 4 before the strict decode so a valid image is not rejected with EBADPNG.



# File 'lib/echoes/kitty_graphics.rb', line 210

def decode_payload(b64)
  cleaned = b64.to_s.delete("\r\n\t ")
  return ''.b if cleaned.empty?
  pad = (4 - cleaned.bytesize % 4) % 4
  cleaned += '=' * pad if pad > 0
  cleaned.unpack1('m0')
rescue ArgumentError
  nil
end
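
The padding step matters because the `m0` directive decodes strictly. A standalone illustration, using only core `String#unpack1` and `Array#pack`:

```ruby
# Standalone illustration: strict base64 decoding rejects input whose
# padding was stripped, and re-padding to a multiple of 4 repairs it.
unpadded = ['hello'].pack('m0').delete('=')   # 7 characters, padding removed

strict_failed =
  begin
    unpadded.unpack1('m0')
    false
  rescue ArgumentError
    true
  end

pad      = (4 - unpadded.bytesize % 4) % 4
repaired = (unpadded + '=' * pad).unpack1('m0')
```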

.decode_png(bytes) ⇒ Object

PNG → {rgba:, width:, height:}. Implemented in kitty_graphics_appkit.rb; loaded lazily so non-GUI tests (which don’t link AppKit) still pass.



# File 'lib/echoes/kitty_graphics.rb', line 263

def decode_png(bytes)
  load_appkit
  AppKitPng.decode(bytes)
end

.display_image(screen, image, opts) ⇒ Object



# File 'lib/echoes/kitty_graphics.rb', line 279

def display_image(screen, image, opts)
  return unless screen.respond_to?(:put_kitty_image)
  raw_id = opts['i'].to_s
  raw_id = opts['I'].to_s if raw_id.empty?
  screen.put_kitty_image(
    rgba:             image[:rgba],
    width:            image[:width],
    height:           image[:height],
    cells_w:          (opts['c'] && !opts['c'].empty?) ? opts['c'].to_i : nil,
    cells_h:          (opts['r'] && !opts['r'].empty?) ? opts['r'].to_i : nil,
    # Sub-cell pixel offsets for fine alignment. The kitty spec
    # caps these at one cell minus one pixel; we let them
    # through as-is — the renderer clips at the cell rect, so
    # anything larger just gets cropped.
    px_x_offset:      opts['X'].to_i,
    px_y_offset:      opts['Y'].to_i,
    suppress_cursor:  opts['C'] == '1',
    image_id:         raw_id.empty? ? nil : raw_id,
  )
end

.handle_chunk(state, meta, payload, screen:, writer:) ⇒ Object

Top-level dispatcher — one call per APC frame.



# File 'lib/echoes/kitty_graphics.rb', line 72

def handle_chunk(state, meta, payload, screen:, writer:)
  assembled = assemble_chunk(state, meta, payload)
  return unless assembled
  opts, b64 = assembled

  action = opts['a'].to_s
  action = 'T' if action.empty?  # default action

  case action
  when 'T', 't'
    bytes = decode_payload(b64)
    return respond(writer, opts, error: 'EBADPNG') unless bytes

    # Transmission medium (`t=…`):
    #   d (default) — payload is the image bytes (already decoded above)
    #   f          — payload is an absolute file path; we read it
    #   t          — same as `f`, then delete the file (temp file)
    #   s          — shared memory; not supported
    bytes = resolve_transmission(bytes, opts)
    return respond(writer, opts, error: 'ENOENT') unless bytes

    # Compression (`o=…`):
    #   (unset) — payload is uncompressed
    #   z       — payload is raw zlib (deflate). kitten icat
    #             defaults to o=z for any non-PNG transmission,
    #             so this is the common case in the wild.
    bytes = inflate_if_needed(bytes, opts['o'])
    return respond(writer, opts, error: 'EBADDATA') unless bytes

    image = decode_image(bytes, opts['f'] || DEFAULT_FORMAT, opts)
    return respond(writer, opts, error: 'EBADPNG') unless image

    cache_image(state, opts['i'] || opts['I'] || '', image)
    if action == 'T'
      display_image(screen, image, opts)
    end
    respond(writer, opts, ok: true)

  when 'p'
    id    = opts['i'] || opts['I'] || ''
    image = state[:cache][id]
    if image
      display_image(screen, image, opts)
      respond(writer, opts, ok: true)
    else
      respond(writer, opts, error: 'ENOENT')
    end

  when 'd'
    # kitty spec: a=d deletes placements (lowercase d-selector)
    # or placements + images (uppercase selector). Default
    # selector is 'a' (all placements on the visible screen).
    #
    # Supported selectors (subset; expand as use cases arrive):
    #   d=a / d=A  — all placements; uppercase also flushes
    #                 the bitmap cache so the next a=p has to
    #                 re-decode.
    #   d=i / d=I  — placements matching i= / I=; uppercase
    #                 also flushes that one cache entry.
    #   (omitted)  — treated as d=a per spec.
    sel    = (opts['d'] || 'a').to_s
    target = opts['i'] || opts['I']
    upper  = sel == sel.upcase && !sel.empty?
    case sel.downcase
    when 'a', ''
      screen.placements.clear if screen.respond_to?(:placements)
      state[:cache].clear if upper
    when 'i', 'n'
      if target && !target.empty?
        if screen.respond_to?(:placements)
          screen.placements.reject! { |pl| pl[:image_id] == target }
        end
        state[:cache].delete(target) if upper
      end
    end
    respond(writer, opts, ok: true)

  when 'q'
    # Capability probe. kitten icat (and others) send a tiny
    # dummy frame with a=q before transmitting real images;
    # they read the OK/error reply to decide whether the
    # terminal supports the protocol. We don't decode the
    # payload — just answer whether we'd accept this
    # format / transmission / compression combo.
    if supported_format?(opts['f']) &&
       supported_transmission?(opts['t']) &&
       supported_compression?(opts['o'])
      respond(writer, opts, ok: true)
    else
      respond(writer, opts, error: 'EBADF')
    end
  end
end
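
The `d=` handling above reads two things from one character: the lowercased letter picks the scope, and the case decides whether cached image data is dropped as well. A standalone sketch of that decomposition, where `split_delete_selector` and the `:free_data` key are hypothetical names:

```ruby
# Standalone illustration of how a d= selector decomposes into a
# scope (which placements) and a flag (also drop cached image data).
def split_delete_selector(raw)
  sel = raw.to_s
  sel = 'a' if sel.empty?                 # omitted selector behaves like d=a
  { scope: sel.downcase, free_data: sel == sel.upcase }
end

all_lower = split_delete_selector(nil)    # all placements, cache kept
all_upper = split_delete_selector('A')    # all placements, cache flushed
by_id     = split_delete_selector('I')    # one image id, its cache entry dropped
```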

.inflate_if_needed(bytes, compression) ⇒ Object

Zlib inflate when the wire says `o=z`. Returns the original bytes when no compression was applied, or nil on a corrupt stream or an unrecognized `o=` value (so the caller can answer EBADDATA).



# File 'lib/echoes/kitty_graphics.rb', line 190

def inflate_if_needed(bytes, compression)
  case compression.to_s
  when ''  then bytes
  when 'z' then Zlib::Inflate.inflate(bytes)
  else          nil
  end
rescue Zlib::Error
  nil
end
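
A standalone round trip through Ruby’s `zlib` library shows both outcomes the method distinguishes: a valid stream inflates back to the input, and invalid input raises a `Zlib::Error` subclass, which the method maps to nil.

```ruby
require 'zlib'

raw        = 'RGBA' * 1024
compressed = Zlib::Deflate.deflate(raw)        # zlib-wrapped deflate stream
restored   = Zlib::Inflate.inflate(compressed)

corrupt_rejected =
  begin
    Zlib::Inflate.inflate('not a zlib stream')
    false
  rescue Zlib::Error
    true
  end
```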

.load_appkit ⇒ Object



# File 'lib/echoes/kitty_graphics.rb', line 268

def load_appkit
  require_relative 'kitty_graphics_appkit'
end

.parse_options(meta) ⇒ Object

Parse "a=T,f=100,i=1" into {"a"=>"T", "f"=>"100", "i"=>"1"}. Keys with no `=` map to "" so the caller can still tell they were present.



# File 'lib/echoes/kitty_graphics.rb', line 29

def parse_options(meta)
  out = {}
  meta.to_s.split(',').each do |pair|
    k, v = pair.split('=', 2)
    next if k.nil? || k.empty?
    out[k] = (v || '').to_s
  end
  out
end
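
The edge cases are easiest to see on a deliberately messy input. The following is a standalone copy of the same loop under the hypothetical name `parse_kv`, written only for illustration:

```ruby
# Standalone copy of the parsing loop, for illustration only.
def parse_kv(meta)
  meta.to_s.split(',').each_with_object({}) do |pair, out|
    k, v = pair.split('=', 2)     # limit 2 keeps any later '=' inside the value
    next if k.nil? || k.empty?    # skips empty pairs and pairs with no key
    out[k] = v.to_s               # a bare key maps to ""
  end
end

parsed = parse_kv('a=T,f=100,i=1,q,,=x,k=a=b')
```

The bare `q` survives with an empty value, while the empty pair and the keyless `=x` are dropped.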

.resolve_transmission(decoded_payload, opts) ⇒ Object

Read image bytes off the filesystem when the wire requested `t=f` or `t=t`; for direct transmission (`t=d` or unset) the decoded payload is returned unchanged. Returns nil on a missing or unreadable file, on any read error, or for an unsupported medium. For `t=t` the file is unlinked after a successful read; the kitty client uses this when sending a one-shot tempfile it owns.



# File 'lib/echoes/kitty_graphics.rb', line 225

def resolve_transmission(decoded_payload, opts)
  case opts['t'].to_s
  when '', 'd'
    decoded_payload
  when 'f', 't'
    path = decoded_payload.dup.force_encoding('UTF-8')
    return nil if path.empty? || !File.file?(path) || !File.readable?(path)
    data = File.binread(path)
    File.delete(path) if opts['t'] == 't'
    data
  else
    nil  # 's' / anything else — not supported
  end
rescue StandardError
  nil
end
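
From the client’s side, a `t=t` transfer might look like the sketch below. `tempfile_frame` is a hypothetical helper, not part of Echoes. The `tty-graphics-protocol` file-name prefix follows our reading of the Kitty documentation, which asks for that string in the path of temp files, and should be treated as an assumption; the listing above does not check for it.

```ruby
require 'tempfile'

# Hypothetical client-side sketch (not part of Echoes): write image
# bytes to a temp file and build a t=t frame whose payload is the
# base64-encoded absolute path.
def tempfile_frame(image_bytes, id:)
  # Without a block, Tempfile.create returns a File that is NOT
  # removed automatically; the terminal is expected to delete it.
  file = Tempfile.create(['tty-graphics-protocol-', '.png'])
  file.binmode
  file.write(image_bytes)
  file.close
  path = File.expand_path(file.path)
  ["\e_Ga=T,f=100,t=t,i=#{id};#{[path].pack('m0')}\e\\", path]
end

frame, path = tempfile_frame('fake image bytes', id: 3)
```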

.respond(writer, opts, ok: false, error: nil) ⇒ Object

Spec: q=0 verbose, q=1 suppress success, q=2 suppress all. We always carry `i=`/`I=` back so the client can correlate.



# File 'lib/echoes/kitty_graphics.rb', line 302

def respond(writer, opts, ok: false, error: nil)
  return unless writer
  quiet = opts['q'].to_s
  return if quiet == '2'
  return if quiet == '1' && ok

  id_part =
    if (id = opts['i']) && !id.empty? then "i=#{id}"
    elsif (n = opts['I']) && !n.empty? then "I=#{n}"
    else ''
    end
  msg = ok ? 'OK' : error.to_s
  writer.call("\e_G#{id_part};#{msg}\e\\")
rescue StandardError
  # Writing to a closed pty etc. — never let response failure
  # break the pane.
end
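
The quiet rules reduce to a small decision table. A standalone sketch, with `reply?` and `reply_frame` as hypothetical names used only here:

```ruby
# Standalone illustration of the q= rules: should a reply be written?
def reply?(quiet, ok)
  case quiet.to_s
  when '2' then false   # suppress everything
  when '1' then !ok     # suppress only successes
  else true             # q=0 or unset: always reply
  end
end

# Shape of a reply that echoes the image id back to the client.
def reply_frame(id, message)
  "\e_Gi=#{id};#{message}\e\\"
end

ok_frame = reply_frame(31, 'OK')
```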

.supported_compression?(o) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/echoes/kitty_graphics.rb', line 180

def supported_compression?(o)
  case o.to_s
  when '', 'z' then true   # uncompressed, or raw zlib
  else false
  end
end

.supported_format?(f) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/echoes/kitty_graphics.rb', line 166

def supported_format?(f)
  case f.to_s
  when '', '100', '24', '32' then true
  else false
  end
end

.supported_transmission?(t) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/echoes/kitty_graphics.rb', line 173

def supported_transmission?(t)
  case t.to_s
  when '', 'd', 'f', 't' then true   # direct, file, tempfile
  else false                          # 's' (shared mem) etc.
  end
end
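
These three predicates back the a=q capability probe. Seen from the client, a probe and its interpretation could look like the following sketch. `QUERY` mirrors the one-pixel RGB query form shown in the Kitty documentation, and `graphics_supported?` is a hypothetical helper; how a real client reads the reply from the terminal is outside this example.

```ruby
# Hypothetical client-side sketch: a capability query and a check of
# the terminal's reply. "AAAA" is base64 for three zero bytes, i.e.
# one packed RGB pixel, matching s=1,v=1,f=24.
QUERY = "\e_Gi=31,s=1,v=1,a=q,t=d,f=24;AAAA\e\\".freeze

def graphics_supported?(reply)
  m = reply.match(/\e_G([^;]*);(.*?)\e\\/m)
  !m.nil? && m[2] == 'OK'
end

accepted = graphics_supported?("\e_Gi=31;OK\e\\")
rejected = graphics_supported?("\e_Gi=31;EBADF\e\\")
```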