Class: Aspera::Agent::Trsdk
Constant Summary collapse
- LOCAL_SOCKET_ADDR =
'127.0.0.1'
- PORT_SEP =
':'
- AUTO_LOCAL_TCP_PORT =
port zero means select a random available high port
"#{PORT_SEP}0"
Constants inherited from Base
Instance Method Summary collapse
-
#initialize(url: AUTO_LOCAL_TCP_PORT, external: false, keep: false, **base_options) ⇒ Trsdk
constructor
A new instance of Trsdk.
- #shutdown ⇒ Object
- #start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
- #stop_daemon ⇒ Object
- #wait_for_transfers_completion ⇒ Object
Methods inherited from Base
agent_list, factory_create, #wait_for_completion
Constructor Details
#initialize(url: AUTO_LOCAL_TCP_PORT, external: false, keep: false, **base_options) ⇒ Trsdk
Returns a new instance of Trsdk.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/aspera/agent/trsdk.rb', line 24 def initialize( url: AUTO_LOCAL_TCP_PORT, external: false, keep: false, ** ) super(**) @keep = keep is_local_auto_port = url.eql?(AUTO_LOCAL_TCP_PORT) raise 'Cannot use options `keep` or `external` with port zero' if is_local_auto_port && (@keep || external) # keep PID for optional shutdown @daemon_pid = nil daemon_endpoint = url Log.log.debug{Log.dump(:daemon_endpoint, daemon_endpoint)} # retry loop begin # no address: local bind daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{daemon_endpoint}" if daemon_endpoint.match?(/^#{PORT_SEP}[0-9]+$/o) # Create stub (without credentials) @transfer_client = Transfersdk::TransferService::Stub.new(daemon_endpoint, :this_channel_is_insecure) # Initiate actual connection get_info_response = @transfer_client.get_info(Transfersdk::InstanceInfoRequest.new) Log.log.debug{"Daemon info: #{get_info_response}"} Log.log.warn{'Attached to existing daemon'} unless @daemon_pid || external || @keep at_exit{shutdown} rescue GRPC::Unavailable => e # if transferd is external: do not start it, or other error raise if external || !e..include?('failed to connect') # we already tried to start a daemon, but it failed Aspera.assert(@daemon_pid.nil?){"Daemon started with PID #{@daemon_pid}, but connection failed to #{daemon_endpoint}}"} Log.log.warn('no daemon present, starting daemon...') if external # transferd only supports local ip and port daemon_uri = URI.parse("ipv4://#{daemon_endpoint}") Aspera.assert(daemon_uri.scheme.eql?('ipv4')){"Invalid scheme daemon URI #{daemon_endpoint}"} # create a config file for daemon config = { address: daemon_uri.host, port: daemon_uri.port, fasp_runtime: { use_embedded: false, user_defined: { bin: Products::Trsdk.sdk_directory, etc: Products::Trsdk.sdk_directory } } } # config file and logs are created in same folder transferd_base_tmp = TempFileManager.instance.new_file_path_global('transferd') Log.log.debug{"transferd base tmp #{transferd_base_tmp}"} conf_file = "#{transferd_base_tmp}.conf" log_stdout = "#{transferd_base_tmp}.out" log_stderr = "#{transferd_base_tmp}.err" File.write(conf_file, config.to_json) @daemon_pid = Process.spawn(Ascp::Installation.instance.path(:transferd), '--config', conf_file, out: log_stdout, err: log_stderr) begin # wait for process to initialize, max 2 seconds Timeout.timeout(2.0) do # this returns if process dies (within 2 seconds) _, status = Process.wait2(@daemon_pid) raise "Transfer daemon exited with status #{status.exitstatus}. Check files: #{log_stdout} and #{log_stderr}" end rescue Timeout::Error nil end Log.log.debug{"Daemon started with pid #{@daemon_pid}"} Process.detach(@daemon_pid) if @keep at_exit {shutdown} # update port for next connection attempt (if auto high port was requested) daemon_endpoint = "#{LOCAL_SOCKET_ADDR}#{PORT_SEP}#{Products::Trsdk.daemon_port_from_log(log_stdout)}" if is_local_auto_port # local daemon started, try again retry end end |
Instance Method Details
#shutdown ⇒ Object
148 149 150 |
# File 'lib/aspera/agent/trsdk.rb', line 148 def shutdown stop_daemon unless @keep end |
#start_transfer(transfer_spec, token_regenerator: nil) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/aspera/agent/trsdk.rb', line 98 def start_transfer(transfer_spec, token_regenerator: nil) # create a transfer request transfer_request = Transfersdk::TransferRequest.new( transferType: Transfersdk::TransferType::FILE_REGULAR, # transfer type (file/stream) config: Transfersdk::TransferConfig.new, # transfer configuration transferSpec: transfer_spec.to_json) # transfer definition # send start transfer request to the transfer manager daemon start_transfer_response = @transfer_client.start_transfer(transfer_request) Log.log.debug{"start transfer response #{start_transfer_response}"} @transfer_id = start_transfer_response.transferId Log.log.debug{"transfer started with id #{@transfer_id}"} end |
#stop_daemon ⇒ Object
152 153 154 155 156 157 158 159 160 |
# File 'lib/aspera/agent/trsdk.rb', line 152 def stop_daemon if !@daemon_pid.nil? Log.log.debug("Stopping daemon #{@daemon_pid}") Process.kill('INT', @daemon_pid) _, status = Process.wait2(@daemon_pid) Log.log.debug("daemon stopped #{status}") @daemon_pid = nil end end |
#wait_for_transfers_completion ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/aspera/agent/trsdk.rb', line 111 def wait_for_transfers_completion # set to true when we know the total size of the transfer session_started = false bytes_expected = nil # monitor transfer status @transfer_client.monitor_transfers(Transfersdk::RegistrationRequest.new(transferId: [@transfer_id])) do |response| Log.log.debug{Log.dump(:response, response.to_h)} # Log.log.debug{"#{response.sessionInfo.preTransferBytes} #{response.transferInfo.bytesTransferred}"} case response.status when :RUNNING if !session_started notify_progress(:session_start, session_id: @transfer_id) session_started = true end if bytes_expected.nil? && !response.sessionInfo.preTransferBytes.eql?(0) bytes_expected = response.sessionInfo.preTransferBytes notify_progress(:session_size, session_id: @transfer_id, info: bytes_expected) end notify_progress(:transfer, session_id: @transfer_id, info: response.transferInfo.bytesTransferred) when :COMPLETED notify_progress(:transfer, session_id: @transfer_id, info: bytes_expected) if bytes_expected notify_progress(:end, session_id: @transfer_id) break when :FAILED, :CANCELED notify_progress(:end, session_id: @transfer_id) raise Transfer::Error, JSON.parse(response.)['Description'] when :QUEUED, :UNKNOWN_STATUS, :PAUSED, :ORPHANED notify_progress(:pre_start, session_id: nil, info: response.status.to_s.downcase) else Log.log.error{"unknown status#{response.status}"} end end # TODO: return status return [] end |