Module: Daytona::FileTransfer

Defined in:
lib/daytona/file_transfer.rb

Overview

rubocop:disable Metrics/ModuleLength

Class Method Summary collapse

Class Method Details

.assert_download_length!(parser, remote_path) ⇒ Object

Raises:



217
218
219
220
221
222
223
# File 'lib/daytona/file_transfer.rb', line 217

# Checks that the number of bytes the parser emitted equals the size the
# part declared. Does nothing when the parser reports no declared size.
#
# @param parser [#part_total_bytes, #part_bytes_emitted] download stream parser
# @param remote_path [String] remote path, used only in the error message
# @return [nil] when no size was declared or the sizes agree
# @raise [Sdk::Error] when a declared size exists and differs from the emitted count
def self.assert_download_length!(parser, remote_path)
  declared = parser.part_total_bytes
  return unless declared

  emitted = parser.part_bytes_emitted
  return if emitted == declared

  raise Sdk::Error,
        "Multipart response length mismatch for #{remote_path}: " \
        "got #{emitted} bytes, expected #{declared}"
end

.assign_download_boundary(parser, content_type) ⇒ Object

Raises:



150
151
152
153
154
155
# File 'lib/daytona/file_transfer.rb', line 150

# Reads the multipart boundary out of a Content-Type header value and stores
# it on the parser.
#
# @param parser [#boundary_token=] download stream parser to configure
# @param content_type [String, nil] raw Content-Type header value
# @return [String] the boundary that was assigned
# @raise [Sdk::Error] when no boundary can be extracted from the header
def self.assign_download_boundary(parser, content_type)
  token = extract_multipart_boundary(content_type)

  if token
    parser.boundary_token = token
  else
    raise Sdk::Error, 'Missing multipart boundary in download response'
  end
end

.drain_source_to(source, sink, cancel_event, remote_path) ⇒ Object

rubocop:enable Metrics/AbcSize, Metrics/MethodLength



291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/daytona/file_transfer.rb', line 291

# Copies everything readable from +source+ into +sink+ in 64 KiB reads.
#
# The readable object comes from open_drain_source, which also says whether
# this method owns it; an owned reader is closed on the way out, even when an
# error is raised. Cancellation is checked after each successful read and
# before the corresponding write.
#
# @param source [String, #read] bytes or a readable object to copy from
# @param sink [#write] destination for the copied chunks
# @param cancel_event [#set?, nil] optional cancellation token
# @param remote_path [String] remote path, used only in the error message
# @return [nil]
# @raise [Sdk::Error] when the cancellation token is set while data remains
def self.drain_source_to(source, sink, cancel_event, remote_path)
  reader, close_when_done = open_drain_source(source)
  chunk_size = 64 * 1024

  begin
    piece = reader.read(chunk_size)
    # A nil read marks EOF; an empty chunk is treated as end-of-data as well.
    while piece && !piece.empty?
      raise Sdk::Error, "Upload cancelled: #{remote_path}" if cancel_event&.set?

      sink.write(piece)
      piece = reader.read(chunk_size)
    end
  ensure
    reader.close if close_when_done && reader.respond_to?(:close)
  end
end

.extract_multipart_boundary(content_type) ⇒ Object



143
144
145
146
147
148
# File 'lib/daytona/file_transfer.rb', line 143

# Pulls the +boundary+ parameter out of a Content-Type header value.
#
# A quoted boundary is returned exactly as written between the quotes. An
# unquoted boundary runs up to the next +;+ (or the end of the header), so it
# can pick up trailing spaces or a stray CR/LF; that whitespace is stripped
# because it would never match the delimiter lines in the body. An unquoted
# value that is blank after stripping is treated as missing.
#
# @param content_type [String, nil] raw Content-Type header value
# @return [String, nil] the boundary token, or nil when none is present
def self.extract_multipart_boundary(content_type)
  match = content_type&.match(/boundary=(?:"([^"]+)"|([^;]+))/i)
  return unless match

  quoted, bare = match.captures
  return quoted if quoted

  token = bare.strip
  token unless token.empty?
end

.open_drain_source(source) ⇒ Object

Raises:



376
377
378
379
380
381
# File 'lib/daytona/file_transfer.rb', line 376

# Turns an upload source into something readable, plus an ownership flag.
#
# Anything that already responds to +read+ is handed back untouched and is
# not owned by the caller of this method. A String is wrapped in a StringIO
# over a binary copy of its bytes, and that wrapper is marked as owned.
#
# @param source [String, #read] upload source
# @return [Array(Object, Boolean)] the readable object and whether the
#   receiver of this pair is responsible for closing it
# @raise [Sdk::Error] when the source is neither readable nor a String
def self.open_drain_source(source)
  if source.respond_to?(:read)
    [source, false]
  elsif source.is_a?(String)
    [StringIO.new(source.b), true]
  else
    raise Sdk::Error, "Unsupported upload source: #{source.class}"
  end
end

.raise_upload_error(response, _cancel_event, remote_path) ⇒ Object

Raises:



348
349
350
351
352
# File 'lib/daytona/file_transfer.rb', line 348

# Raises when the upload response indicates a failure; otherwise returns nil.
#
# Conditions are checked in a fixed order and the first match wins: timeout,
# then a transfer aborted by a callback (reported as a cancellation), then any
# other unsuccessful response.
#
# @param response [#timed_out?, #return_code, #success?, #code, #body] completed response
# @param _cancel_event [Object, nil] accepted but not used here
# @param remote_path [String] remote path, used only in the error messages
# @return [nil] when the response is successful
# @raise [Sdk::Error] on timeout, cancellation, or a non-success response
def self.raise_upload_error(response, _cancel_event, remote_path)
  failure =
    if response.timed_out?
      "Upload timed out: #{remote_path}"
    elsif response.return_code == :aborted_by_callback
      "Upload cancelled: #{remote_path}"
    elsif !response.success?
      "HTTP #{response.code}: #{response.body}"
    end

  raise Sdk::Error, failure if failure
end

.recorded_upload_bytes(body, remote_path) ⇒ Object



366
367
368
369
370
371
372
373
374
# File 'lib/daytona/file_transfer.rb', line 366

# Looks up the byte count reported for +remote_path+ in an upload response body.
#
# The body is parsed as JSON on a best-effort basis: any StandardError raised
# while parsing is swallowed and treated as "no information". The count is
# taken from the first Hash entry under the top-level "files" key whose
# "path" equals +remote_path+, and only an Integer "bytes" value is accepted.
#
# @param body [String, nil] raw response body
# @param remote_path [String] path to look up among the reported files
# @return [Integer, nil] the reported byte count, or nil when unavailable
def self.recorded_upload_bytes(body, remote_path)
  payload =
    begin
      JSON.parse(body)
    rescue StandardError
      nil
    end
  return nil unless payload.is_a?(Hash)

  entry = Array(payload['files']).find do |item|
    item.is_a?(Hash) && item['path'] == remote_path
  end
  reported = entry && entry['bytes']
  reported if reported.is_a?(Integer)
end

.stream_download(api_client:, remote_path:, timeout:, on_progress: nil, cancel_event: nil, &block) ⇒ Object

rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

Raises:



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/daytona/file_transfer.rb', line 158

# Streams one remote file from the +/files/bulk-download+ endpoint, feeding
# the multipart response through a MultipartDownloadStreamParser and handing
# each chunk the parser emits to the given block.
#
# @param api_client [Object] client providing +config+ (base_url, verify_ssl,
#   verify_ssl_host) and +default_headers+
# @param remote_path [String] path requested from the server
# @param timeout [Integer] passed straight through as the Typhoeus +timeout+ option
# @param on_progress [#call, nil] optional callback invoked with a
#   DownloadProgress for every chunk the parser emits
# @param cancel_event [#set?, nil] optional cancellation token
# @yieldparam chunk [String] a chunk emitted by the parser
# @return [nil] when the download completes and the length check passes
# @raise [Sdk::Error] on cancellation, a parser-reported error, a non-success
#   HTTP response, a missing multipart boundary, or a length mismatch
#
# NOTE(review): +block+ is called unconditionally, so a block appears to be
# required; calling without one would fail on +block.call+ — confirm callers.
# NOTE(review): unlike the upload path, there is no dedicated timeout error
# here; a timed-out response presumably surfaces through the parser error or
# the generic HTTP check — confirm.
def self.stream_download(api_client:, remote_path:, timeout:, on_progress: nil, cancel_event: nil, &block)
  config = api_client.config
  bytes_received = 0
  # Declared before the proc so the closure below can read it; it is still nil
  # until the parser is built, hence the safe navigation inside the proc.
  parser = nil
  # Sink handed to the parser: checks cancellation, reports progress, then
  # forwards the chunk to the caller's block.
  wrapped_block = proc do |chunk|
    raise Sdk::Error, "Download cancelled: #{remote_path}" if cancel_event&.set?

    # Bytes are only tallied when someone is listening for progress.
    if on_progress
      bytes_received += chunk.bytesize
      on_progress.call(DownloadProgress.new(
                         bytes_received: bytes_received,
                         total_bytes: parser&.part_total_bytes
                       ))
    end
    block.call(chunk)
  end
  parser = MultipartDownloadStreamParser.new(&wrapped_block)
  # Filled in by the on_complete callback below.
  response = nil

  # POST a JSON body naming the single path; the reply is requested as multipart.
  request = Typhoeus::Request.new(
    "#{config.base_url}/files/bulk-download",
    method: :post,
    headers: api_client.default_headers.dup.merge(
      'Accept' => 'multipart/form-data',
      'Content-Type' => 'application/json'
    ),
    body: JSON.generate(paths: [remote_path]),
    timeout: timeout,
    ssl_verifypeer: config.verify_ssl,
    ssl_verifyhost: config.verify_ssl_host ? 2 : 0
  )

  # The boundary is taken from the response Content-Type header; this raises
  # Sdk::Error when the header carries no boundary.
  request.on_headers do |stream_response|
    assign_download_boundary(parser, stream_response.headers['Content-Type'])
  end

  # Returning +:abort+ from the on_body callback tells libcurl to tear down the
  # connection immediately, which is how cancellation actually severs the
  # transfer rather than just stopping our own bookkeeping.
  request.on_body do |chunk|
    next :abort if cancel_event&.set?

    parser << chunk
  end

  # Keep the final response for the status check below and finalize the parser
  # unless the transfer was cancelled.
  request.on_complete do |completed_response|
    response = completed_response
    parser.finish! unless cancel_event&.set?
  end

  request.run

  # Failure precedence: cancellation, then a parser-reported error, then the
  # HTTP status, and finally the emitted-versus-declared length check.
  raise Sdk::Error, "Download cancelled: #{remote_path}" if cancel_event&.set?
  raise Sdk::Error, parser.error_message if parser.error_message
  raise Sdk::Error, "HTTP #{response.code}" if response && !response.success?

  assert_download_length!(parser, remote_path)
end

.stream_upload(api_client:, remote_path:, source:, timeout:, on_progress: nil, cancel_event: nil) ⇒ Object

Uploads source to /files/bulk-upload via Typhoeus (libcurl), which streams the request body straight from disk without buffering it in memory. Local file paths are uploaded directly; in-memory IOs/bytes are first drained to a tempfile so we have a stable file handle for libcurl.

The daemon owns atomicity (writes to a sibling tempfile then renames), so a client-side abort just leaves no destination file at all.

rubocop:disable Metrics/MethodLength, Metrics/ParameterLists

Parameters:

  • api_client

    The OpenAPI-generated toolbox API client (auth/base-url only).

  • remote_path (String)

    Destination path in the sandbox.

  • source (String, IO)

    Local file path or any IO-like object responding to read(n).

  • timeout (Integer)

    Typhoeus timeout in seconds (0 disables).

  • on_progress (Proc, nil) (defaults to: nil)

    Optional callback invoked with Daytona::UploadProgress as libcurl reports real network upload progress.

  • cancel_event (#set?, nil) (defaults to: nil)

    Optional cancellation token. Checked while staging non-file sources and during the libcurl transfer itself.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/daytona/file_transfer.rb', line 243

# Uploads +source+ to +remote_path+ using a file on disk as the request body.
#
# The source is first resolved to an on-disk path via with_upload_file. That
# file's size is recorded before the transfer, the file is opened for the
# duration of the request, and afterwards the response is checked first for
# transport/HTTP failure and then for a byte-count mismatch.
#
# @param api_client [Object] client providing +config+ and default headers
# @param remote_path [String] destination path in the sandbox
# @param source [String, #read] local file path, bytes, or a readable object
# @param timeout [Integer] request timeout forwarded to the request builder
# @param on_progress [#call, nil] optional upload progress callback
# @param cancel_event [#set?, nil] optional cancellation token
# @return [nil] when the upload succeeds and the size check passes
# @raise [Sdk::Error] on cancellation, timeout, HTTP failure, or size mismatch
def self.stream_upload(api_client:, remote_path:, source:, timeout:, on_progress: nil, cancel_event: nil)
  with_upload_file(source, cancel_event, remote_path) do |staged_path|
    client_config = api_client.config
    # Measured up front so it can be compared with what the server reports.
    bytes_to_send = File.size(staged_path)
    on_transfer = upload_progress_callback(on_progress, cancel_event)

    reply = with_open_upload_file(staged_path) do |handle|
      pending = upload_request(
        api_client: api_client,
        config: client_config,
        remote_path: remote_path,
        file: handle,
        timeout: timeout,
        progress_callback: on_transfer
      )
      pending.run
    end

    raise_upload_error(reply, cancel_event, remote_path)
    verify_upload_response(reply, remote_path, bytes_to_send)
  end
end

.upload_progress_callback(on_progress, cancel_event) ⇒ Object

rubocop:enable Metrics/MethodLength, Metrics/ParameterLists



332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# File 'lib/daytona/file_transfer.rb', line 332

# Builds the transfer-progress callback used for uploads.
#
# The returned proc yields 1 once the cancellation token is set and 0
# otherwise. When +on_progress+ is given, it is invoked with an UploadProgress
# only when the uploaded byte count rises above the last value reported, so
# repeated callbacks with the same count are reported once.
#
# @param on_progress [#call, nil] receiver of UploadProgress updates
# @param cancel_event [#set?, nil] optional cancellation token
# @return [Proc] five-argument callback; only the last argument (bytes
#   uploaded so far) is used
def self.upload_progress_callback(on_progress, cancel_event)
  # Starts below zero so that an initial count of 0 is still reported once.
  reported_up_to = -1

  proc do |_clientp, _dltotal, _dlnow, _ultotal, ulnow|
    if cancel_event&.set?
      1
    else
      uploaded = ulnow.to_i
      if on_progress && uploaded > reported_up_to
        reported_up_to = uploaded
        on_progress.call(UploadProgress.new(bytes_sent: uploaded))
      end

      0
    end
  end
end

.upload_request(api_client:, config:, remote_path:, file:, timeout:, progress_callback:) ⇒ Object

rubocop:disable Metrics/MethodLength, Metrics/ParameterLists



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/daytona/file_transfer.rb', line 313

# Builds (but does not run) the POST request for +/files/bulk-upload+.
#
# The body is a two-field form: the destination path and the open file. The
# same callback is registered under both progress option names.
#
# @param api_client [Object] client providing +default_headers+
# @param config [Object] provides base_url, verify_ssl and verify_ssl_host
# @param remote_path [String] destination path sent alongside the file
# @param file [File] open file handle to upload
# @param timeout [Integer] request timeout
# @param progress_callback [Proc] transfer-progress callback
# @return [Typhoeus::Request] the configured request
def self.upload_request(api_client:, config:, remote_path:, file:, timeout:, progress_callback:)
  # Work on a copy so the client's default headers are left untouched. The
  # Content-Type default is dropped — presumably so the form body's own
  # content type is used instead; confirm against Typhoeus behaviour.
  request_headers = api_client.default_headers.dup
  request_headers.delete('Content-Type')

  form_fields = {
    'files[0].path' => remote_path,
    'files[0].file' => file
  }
  host_check = config.verify_ssl_host ? 2 : 0

  Typhoeus::Request.new(
    "#{config.base_url}/files/bulk-upload",
    method: :post,
    headers: request_headers,
    body: form_fields,
    timeout: timeout,
    ssl_verifypeer: config.verify_ssl,
    ssl_verifyhost: host_check,
    noprogress: false,
    progressfunction: progress_callback,
    xferinfofunction: progress_callback
  )
end

.verify_upload_response(response, remote_path, expected_bytes) ⇒ Object

Compares the daemon’s reported bytes-written against what the SDK sent. Catches server-side miscounts (or extra-byte injection) at the upload call site instead of surfacing later as a download mismatch.

Raises:



357
358
359
360
361
362
363
364
# File 'lib/daytona/file_transfer.rb', line 357

# Compares the byte count reported in the upload response with the number of
# bytes that were sent. A response that reports no count for the path is
# accepted as-is.
#
# @param response [#body] completed upload response
# @param remote_path [String] path whose reported size is looked up
# @param expected_bytes [Integer] number of bytes sent
# @return [nil] when no count is reported or the counts agree
# @raise [Sdk::Error] when a count is reported and differs from +expected_bytes+
def self.verify_upload_response(response, remote_path, expected_bytes)
  daemon_bytes = recorded_upload_bytes(response.body, remote_path)
  return if daemon_bytes.nil?
  return if daemon_bytes == expected_bytes

  raise Sdk::Error,
        "Upload size mismatch for #{remote_path}: sent #{expected_bytes} bytes, " \
        "daemon recorded #{daemon_bytes}"
end

.with_open_upload_file(upload_path) ⇒ Object



305
306
307
308
309
310
# File 'lib/daytona/file_transfer.rb', line 305

# Opens +upload_path+ for binary reading, yields the handle, and guarantees
# the handle is closed afterwards, including when the block raises.
#
# @param upload_path [String] path of the file to open
# @yieldparam file [File] handle opened in 'rb' mode
# @return [Object] whatever the block returns
def self.with_open_upload_file(upload_path)
  # The block form of File.open closes the handle for us on every exit path.
  File.open(upload_path, 'rb') { |handle| yield(handle) }
end

.with_upload_file(source, cancel_event, remote_path) ⇒ Object

Yields a path on disk that holds the source’s bytes, ready for libcurl to stream. Local files are passed through unchanged; everything else is drained into a tempfile that gets unlinked when we return.

rubocop:disable Metrics/AbcSize, Metrics/MethodLength

Raises:



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/daytona/file_transfer.rb', line 268

# Yields a path on disk that holds the source's bytes.
#
# A String naming an existing filesystem entry is yielded unchanged. Any other
# source is drained into a binary-mode tempfile (whose extension mirrors
# +remote_path+) that is closed before the block runs and unlinked afterwards,
# even when the block raises.
#
# @param source [String, #read] local path, raw bytes, or a readable object
# @param cancel_event [#set?, nil] optional cancellation token
# @param remote_path [String] destination path; supplies the tempfile
#   extension and appears in error messages
# @yieldparam path [String] path of the file to upload
# @return [Object] whatever the block returns
# @raise [Sdk::Error] when cancellation is already requested on entry, or when
#   draining the source fails
def self.with_upload_file(source, cancel_event, remote_path)
  raise Sdk::Error, "Upload cancelled: #{remote_path}" if cancel_event&.set?

  # Only probe the filesystem with Strings that can be a path at all:
  # File.exist? raises ArgumentError for a String containing a NUL byte, and
  # raw binary content passed as a String routinely contains one. Such Strings
  # are staged as content instead of crashing the upload.
  path_candidate = source.is_a?(String) && !source.include?("\0")
  return yield(source) if path_candidate && File.exist?(source)

  tmp = Tempfile.new(['daytona-upload-', File.extname(remote_path).to_s])
  tmp.binmode
  begin
    drain_source_to(source, tmp, cancel_event, remote_path)
    # Flush and close before yielding so the block sees the complete file.
    tmp.flush
    tmp.close
    yield(tmp.path)
  ensure
    tmp.close unless tmp.closed?
    begin
      tmp.unlink
    rescue StandardError
      # tempfile already gone, nothing to do
    end
  end
end