Class: Aspera::Api::Httpgw

Inherits:
Rest
  • Object
show all
Defined in:
lib/aspera/api/httpgw.rb

Overview

Start a transfer using the Aspera HTTP Gateway; uploads use a secure web socket (wss). References: api.ibm.com/explorer/catalog/aspera/product/ibm-aspera/api/http-gateway-api/doc/guides-toc and developer.ibm.com/apis/catalog?search=%22aspera%20http%22. HTTP Gateway upload protocol:

#     type                Contents            Ack                 Counter

v1

0     JSON.transfer_spec  Transfer Spec       "end upload"        sent_general
1..   JSON.slice_upload   File base64 chunks  "end upload"        sent_general

v2

0     JSON.transfer_spec  Transfer Spec       "end upload"        sent_general
1     JSON.slice_upload   File start          "end_slice_upload"  sent_v2_delimiter
2..   Binary              File binary chunks  "end upload"        sent_general
last  JSON.slice_upload   File end            "end_slice_upload"  sent_v2_delimiter

Constant Summary collapse

DEFAULT_BASE_PATH =
'/aspera/http-gwy'
INFO_ENDPOINT =
'info'
MSG_SEND_TRANSFER_SPEC =
'transfer_spec'
MSG_SEND_SLICE_UPLOAD =
'slice_upload'
API_V1 =

upload API versions

'v1'
API_V2 =
'v2'
THR_RECV =
'recv'
LOG_WS_SEND =
'ws: send: '.red
LOG_WS_RECV =
"ws: #{THR_RECV}: ".green

Constants inherited from Rest

Rest::ENTITY_NOT_FOUND, Rest::JSON_DECODE

Instance Attribute Summary

Attributes inherited from Rest

#auth_params, #base_url

Instance Method Summary collapse

Methods inherited from Rest

array_params, array_params?, basic_token, build_uri, #call, #cancel, #create, decode_query, #delete, io_http_session, #lookup_by_name, #oauth, #params, parse_header, #read, remote_certificate_chain, set_parameters, start_http_session, #update, user_agent

Constructor Details

#initialize(url:, api_version: API_V2, upload_chunk_size: 64_000, synchronous: false, notify_cb: nil, **opts) ⇒ Httpgw

Returns a new instance of Httpgw.

Parameters:

  • url (String)

    URL of the HTTP Gateway, without version



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/aspera/api/httpgw.rb', line 278

def initialize(
  url:,
  api_version:       API_V2,
  upload_chunk_size: 64_000,
  synchronous:       false,
  notify_cb:         nil,
  **opts
)
  # Creates a client for an Aspera HTTP Gateway.
  # @param url [String] gateway URL, without version (scheme https:// is added when missing)
  # @param api_version [String] requested upload API version (API_V1 or API_V2)
  # @param upload_chunk_size [Integer] number of bytes read and sent per slice
  # @param synchronous [Boolean] wait for gateway acknowledgements while sending
  # @param notify_cb [Proc, nil] progress notification callback
  # @raise [ArgumentError] if upload_chunk_size is not a positive Integer
  # @raise [RuntimeError] if the URL scheme is not https
  Log.log.debug{Log.dump(:gw_url, url)}
  # a zero or negative chunk size would make #upload divide by zero or read empty slices forever
  unless upload_chunk_size.is_a?(Integer) && upload_chunk_size.positive?
    raise ArgumentError, "upload_chunk_size shall be a positive integer: #{upload_chunk_size}"
  end
  # add scheme if missing
  url = "https://#{url}" unless url.match?(%r{\A[a-z]{1,6}://})
  raise 'GW URL shall be with scheme https' unless url.start_with?('https://')
  # remove trailing slashes, then a trailing API version if present
  # (`o` flag: the regex interpolation is evaluated only once)
  # TODO: issue warning ?
  url = url.sub(%r{/+\z}, '').sub(%r{/#{API_V1}\z}o, '')
  # assume GW is always under specific path (TODO: remove this ?)
  url = File.join(url, DEFAULT_BASE_PATH) unless url.end_with?(DEFAULT_BASE_PATH)
  @gw_root_url = url
  super(base_url: "#{@gw_root_url}/#{API_V1}", **opts)
  @upload_version = api_version
  @upload_chunk_size = upload_chunk_size
  @synchronous = synchronous
  @notify_cb = notify_cb
  # get API info
  @api_info = read(INFO_ENDPOINT)[:data].freeze
  Log.log.debug{Log.dump(:api_info, @api_info)}
  # web socket endpoint: by default use v2 (newer gateways), without base64 encoding
  # if the gateway does not advertise the requested version, revert to the old API
  return if @upload_version.eql?(API_V1)
  endpoints = @api_info['endpoints']
  return if endpoints.is_a?(Array) && endpoints.any?{|endpoint| endpoint.include?(@upload_version)}
  Log.log.warn{"API version #{@upload_version} not supported, reverting to #{API_V1}"}
  @upload_version = API_V1
end

Instance Method Details

#download(transfer_spec) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/aspera/api/httpgw.rb', line 246

# Downloads the files described by a transfer spec through the gateway.
# The spec is completed in place: `zip_required` defaults to false, `source_root` to '/',
# and a missing `download_name` is derived from the first source path (extension removed),
# followed by the number of files when several paths are requested.
# @param transfer_spec [Hash] transfer spec with string keys (modified in place)
# @return result of the GET on the download URL, saved under `destination_root`
def download(transfer_spec)
  transfer_spec['zip_required'] ||= false
  transfer_spec['source_root'] ||= '/'
  # normally provided by the application (e.g. a package name); otherwise build one
  unless transfer_spec.key?('download_name')
    paths = transfer_spec['paths']
    derived_name = File.basename(paths.first['source'], '.*')
    # tell how many files are included when there is more than one
    derived_name += " #{paths.length} Files" if paths.length > 1
    transfer_spec['download_name'] = derived_name
  end
  response = create('download', {'transfer_spec' => transfer_spec})
  # the transfer identifier is the last component of the returned URL
  transfer_uuid = response[:data]['url'].split('/').last
  # a zip archive is produced when requested, or when more than one file is downloaded
  archived = transfer_spec['zip_required'] || transfer_spec['paths'].length > 1
  local_name =
    if archived
      transfer_spec['download_name'] + '.zip'
    else
      File.basename(transfer_spec['paths'].first['source'])
    end
  call(
    operation:    'GET',
    subpath:      "download/#{transfer_uuid}",
    save_to_file: File.join(transfer_spec['destination_root'], local_name)
  )
end

#infoObject



273
274
275
# File 'lib/aspera/api/httpgw.rb', line 273

# Gateway information read from its `info` endpoint when this object was created.
# @return the frozen `data` part of that response
def info
  @api_info
end

#process_read_threadObject

Main function of the web socket read thread.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/aspera/api/httpgw.rb', line 109

# Main loop of the web socket read thread (started by #upload once the handshake is done).
# Reads the socket one byte at a time, feeds the bytes to a web socket frame parser, and passes
# the payload of each complete frame to #process_received_message.
# Any exception raised while reading or processing is stored in @shared_info[:read_exception]
# under the shared mutex, the condition variable is signaled so that a waiting sender wakes up,
# and the loop stops; the sending side (#ws_send, #upload) re-raises that exception.
# The loop also stops when the socket reports end of file.
def process_read_thread
  Log.log.debug{"#{LOG_WS_RECV}read thread started"}
  frame_parser = ::WebSocket::Frame::Incoming::Client.new(version: @ws_handshake.version)
  # NOTE(review): eof? on a socket presumably blocks until data arrives or the peer closes — confirm for the type returned by Rest.io_http_session
  until @ws_io.eof?
    begin # rubocop:disable Style/RedundantBegin
      # read byte by byte until a frame is complete
      # blocking read
      byte = @ws_io.read(1)
      Log.log.trace2{"#{LOG_WS_RECV}read: #{byte} (#{byte.class}) eof=#{@ws_io.eof?}"}
      frame_parser << byte
      frame_ok = frame_parser.next
      # frame not complete yet: keep reading
      next if frame_ok.nil?
      process_received_message(frame_ok.data.to_s)
      Log.log.trace2{"#{LOG_WS_RECV}counts: #{@shared_info[:count]}"}
    rescue => e
      # hand the failure over to the sending side and wake it up, then stop reading
      Log.log.debug{"#{LOG_WS_RECV}Exception: #{e}"}
      @shared_info[:mutex].synchronize do
        @shared_info[:read_exception] = e
        @shared_info[:cond_var].signal
      end
      break
    end
  end
  # log the recorded failure, if any
  Log.log.debug do
    "#{LOG_WS_RECV}exception: #{@shared_info[:read_exception]},cls=#{@shared_info[:read_exception].class})"
  end unless @shared_info[:read_exception].nil?
  Log.log.debug{"#{LOG_WS_RECV}read thread stopped (ws eof=#{@ws_io.eof?})"}
end

#process_received_message(message) ⇒ Object

Processes one message received by the read thread.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/aspera/api/httpgw.rb', line 81

# Processes one text message received from the gateway by the read thread.
# Acknowledgement messages increment the matching counter in @shared_info[:count] and signal
# the condition variable, so that a sender waiting in synchronous mode wakes up.
# Any other message is treated as an error report from the gateway and raised.
# @param message [String] frame payload (not modified)
# @raise [RuntimeError] with the gateway's `message` field when the payload is JSON
#   (plain or quoted base64) containing it, else with the raw payload text
def process_received_message(message)
  Log.log.debug{"#{LOG_WS_RECV}message: [#{message}] (#{message.class})"}
  counter =
    if message.eql?(MSG_RECV_DATA_RECEIVED_SIGNAL)
      :received_general
    elsif message.eql?(MSG_RECV_SLICE_UPLOAD_SIGNAL)
      :received_v2_delimiter
    end
  unless counter.nil?
    @shared_info[:mutex].synchronize do
      @shared_info[:count][counter] += 1
      @shared_info[:cond_var].signal
    end
    return
  end
  # anything else is an error report: extract its text without mutating the caller's string
  text = message.chomp
  details =
    begin
      if text.length > 1 && text.start_with?('"') && text.end_with?('"')
        # quoted, base64-encoded JSON: remove the double quotes (1..-2) before decoding
        JSON.parse(Base64.strict_decode64(text[1..-2]))
      elsif text.start_with?('{') && text.end_with?('}')
        JSON.parse(text)
      end
    rescue JSON::ParserError, ArgumentError
      # malformed payload: reported verbatim below rather than as a decoding error
      nil
    end
  error_message = details['message'] if details.is_a?(Hash)
  # never `raise nil` (TypeError): fall back to the raw text when no message field is present
  raise error_message.to_s unless error_message.nil?
  raise "unknown message from gateway: [#{text}]"
end

#upload(transfer_spec) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/aspera/api/httpgw.rb', line 138

# Uploads the files of a transfer spec through the gateway web socket.
# Sends the transfer spec, then each file as slices: v1 sends base64 data inside JSON messages,
# v2 sends binary frames delimited by a JSON message before the first and after the last slice.
# Progress is reported through @notify_cb. The read thread, the web socket and the HTTP session
# are released even when the upload fails.
# @param transfer_spec [Hash] transfer spec; its `paths` are modified by process_upload_list
# @raise the read thread's exception, Errno::EPIPE or Net::ReadTimeout on failure
def upload(transfer_spec)
  # identify this session uniquely
  session_id = SecureRandom.uuid
  @notify_cb&.call(session_id: nil, type: :pre_start, info: 'starting')
  # process files to send, modify `paths` in transfer_spec
  files_to_send = process_upload_list(transfer_spec)
  # total size of all files is last element
  total_bytes_to_transfer = files_to_send.pop
  Log.log.trace1{Log.dump(:modified_tspec, transfer_spec)}
  Log.log.trace1{Log.dump(:files_to_send, files_to_send)}
  # TODO: check that this is available in endpoints: @api_info['endpoints']
  upload_url = File.join(@gw_root_url, @upload_version, 'upload')
  @notify_cb&.call(session_id: nil, type: :pre_start, info: 'connecting wss')
  # open web socket to end point (equivalent to Net::HTTP.start)
  http_session = Rest.start_http_session(upload_url)
  begin
    # get the underlying socket i/o
    @ws_io = Rest.io_http_session(http_session)
    @ws_handshake = ::WebSocket::Handshake::Client.new(url: upload_url, headers: {})
    @ws_io.write(@ws_handshake.to_s)
    sleep(0.1)
    @ws_handshake << @ws_io.readuntil("\r\n\r\n")
    Aspera.assert(@ws_handshake.finished?){'Error in websocket handshake'}
    Log.log.debug{"#{LOG_WS_SEND}handshake success"}
    # data shared between main thread and read thread
    @shared_info = {
      read_exception: nil, # error message if any in callback
      count:          {
        sent_general:          0,
        received_general:      0,
        sent_v2_delimiter:     0,
        received_v2_delimiter: 0
      },
      mutex:          Mutex.new,
      cond_var:       ConditionVariable.new
    }
    # start read thread after handshake
    @ws_read_thread = Thread.new {process_read_thread}
    @notify_cb&.call(session_id: session_id, type: :session_start)
    # notify progress bar of the total size (once)
    @notify_cb&.call(session_id: session_id, type: :session_size, info: total_bytes_to_transfer)
    sleep(1)
    # first step send transfer spec
    ws_snd_json(MSG_SEND_TRANSFER_SPEC, transfer_spec)
    # aggregate size sent
    session_sent_bytes = 0
    files_to_send.each_with_index do |file_to_send, file_index|
      # index of last slice (i.e number of slices - 1)
      last_slice = (file_to_send[:size] - 1) / @upload_chunk_size
      slice_info = {
        name:         file_to_send[:name],
        # TODO: get mime type?
        type:         'application/octet-stream',
        size:         file_to_send[:size],
        slice:        0, # current slice index
        total_slices: last_slice + 1,
        fileIndex:    file_index
      }
      # NOTE(review): for a real empty file the loop below never runs, so no slice message is sent — confirm the gateway accepts this
      file = file_to_send[:file]
      # binary mode: data must be sent as-is (no newline conversion, e.g. on Windows)
      file = File.open(file, 'rb') unless file.is_a?(Transfer::FauxFile)
      begin
        until file.eof?
          slice_bin_data = file.read(@upload_chunk_size)
          # interrupt main thread if read thread failed
          raise @shared_info[:read_exception] unless @shared_info[:read_exception].nil?
          begin
            if @upload_version.eql?(API_V1)
              slice_info[:data] = Base64.strict_encode64(slice_bin_data)
              ws_snd_json(MSG_SEND_SLICE_UPLOAD, slice_info)
            else
              # send once, before data, at beginning
              ws_snd_json(MSG_SEND_SLICE_UPLOAD, slice_info) if slice_info[:slice].eql?(0)
              ws_send(ws_type: :binary, data: slice_bin_data)
              Log.log.trace1{"#{LOG_WS_SEND}buffer: file: #{file_index}, slice: #{slice_info[:slice]}/#{last_slice}"}
              # send once, after data, at end
              ws_snd_json(MSG_SEND_SLICE_UPLOAD, slice_info) if slice_info[:slice].eql?(last_slice)
            end
          rescue Errno::EPIPE => e
            # a broken pipe is usually the consequence of an error reported by the gateway
            raise @shared_info[:read_exception] unless @shared_info[:read_exception].nil?
            raise e
          rescue Net::ReadTimeout => e
            Log.log.warn{'A timeout condition using HTTPGW may signal a permission problem on destination. Check ascp logs on httpgw.'}
            raise e
          end
          session_sent_bytes += slice_bin_data.length
          @notify_cb&.call(type: :transfer, session_id: session_id, info: session_sent_bytes)
          slice_info[:slice] += 1
        end
      ensure
        file.close
      end
    end
    # throttling may have skipped last one
    @notify_cb&.call(type: :transfer, session_id: session_id, info: session_sent_bytes)
    @notify_cb&.call(type: :end, session_id: session_id)
    ws_send(ws_type: :close, data: nil)
    Log.log.debug{"Finished upload, waiting for end of #{THR_RECV} thread."}
    @ws_read_thread.join
    Log.log.debug{'Read thread joined'}
  ensure
    # on failure the read thread may still be blocked on the socket: stop it before closing
    @ws_read_thread.kill if @ws_read_thread&.alive?
    # session no more used
    @ws_io = nil
    http_session&.finish
  end
end

#ws_send(ws_type:, data:) ⇒ Object

Sends one frame on the HTTP Gateway web socket.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/aspera/api/httpgw.rb', line 57

# Sends one web socket frame to the gateway.
# Only binary frames are counted here (as general messages).
# In synchronous mode, waits under the shared mutex (woken by the condition variable, or every 2 s)
# while the read thread is alive, no read error is recorded, and the backlog exceeds one message.
# @param ws_type [Symbol] frame type (e.g. :text, :binary, :close)
# @param data [String, nil] frame payload
# @raise the exception recorded by the read thread, if any
def ws_send(ws_type:, data:)
  Log.log.trace1{"#{LOG_WS_SEND}sending: #{ws_type} (#{data&.length || 0} bytes)"}
  counts = @shared_info[:count]
  counts[:sent_general] += 1 if ws_type.eql?(:binary)
  frame = ::WebSocket::Frame::Outgoing::Client.new(data: data, type: ws_type, version: @ws_handshake.version)
  @ws_io.write(frame.to_s)
  if @synchronous
    lock = @shared_info[:mutex]
    lock.synchronize do
      loop do
        # no more acknowledgements will come once the read thread stopped or failed
        break unless @ws_read_thread.alive? && @shared_info[:read_exception].nil?
        general_backlog = counts[:sent_general] - counts[:received_general]
        # NOTE(review): operands look swapped (received - sent) compared to the general counter,
        # so this term only exceeds 1 if more delimiters are acknowledged than sent — confirm intent
        delimiter_backlog = counts[:received_v2_delimiter] - counts[:sent_v2_delimiter]
        # one message of difference is allowed, else it would stay blocked
        break if general_backlog <= 1 && delimiter_backlog <= 1
        next if @shared_info[:cond_var].wait(lock, 2.0)
        Log.log.trace1{"#{LOG_WS_SEND}#{'timeout'.blue}: #{@shared_info[:count]}"}
      end
    end
  end
  error = @shared_info[:read_exception]
  raise error unless error.nil?
  Log.log.trace2{"#{LOG_WS_SEND}counts: #{@shared_info[:count]}"}
end

#ws_snd_json(msg_type, payload) ⇒ Object

Sends a JSON message on the HTTP Gateway web socket.



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/aspera/api/httpgw.rb', line 42

# Sends `{msg_type => payload}` as a JSON text frame on the gateway web socket.
# With the v2 upload API, slice_upload messages are counted as delimiters; every other
# message counts as a general message.
# @param msg_type [String] message type, used as the single top-level JSON key
# @param payload [Hash] message content (a :data entry is summarized in trace logs)
def ws_snd_json(msg_type, payload)
  is_v2_delimiter = @upload_version.eql?(API_V2) && msg_type.eql?(MSG_SEND_SLICE_UPLOAD)
  @shared_info[:count][is_v2_delimiter ? :sent_v2_delimiter : :sent_general] += 1
  Log.log.trace1 do
    # avoid dumping (possibly large) base64 slice data in logs
    printable = payload.dup
    printable[:data] = "[data #{printable[:data].length} bytes]" if printable.key?(:data)
    "#{LOG_WS_SEND}json: #{msg_type}: #{JSON.generate(printable)}"
  end
  message = JSON.generate({msg_type => payload})
  ws_send(ws_type: :text, data: message)
end