Class: AthenaUtils::AthenaClient

Inherits:
Object
  • Object
show all
Defined in:
lib/athena_utils/athena_client.rb

Constant Summary collapse

DEFAULT_WAIT_TIME =

Default wait time between query status checks, in seconds.

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database, work_group, wait_time = DEFAULT_WAIT_TIME) ⇒ AthenaClient

Returns a new instance of AthenaClient.



18
19
20
21
22
# File 'lib/athena_utils/athena_client.rb', line 18

# Stores the settings this client uses for its queries.
#
# @param database name of the Athena DB; kept in @database
# @param work_group Athena Work Group to use with queries; kept in @work_group
# @param wait_time time to wait before checking query results again;
#   falls back to DEFAULT_WAIT_TIME when omitted
def initialize(database, work_group, wait_time = DEFAULT_WAIT_TIME)
  @database = database
  @work_group = work_group
  @wait_time = wait_time
end

Instance Attribute Details

#aws_athena_client ⇒ Object



24
25
26
# File 'lib/athena_utils/athena_client.rb', line 24

# Returns the Athena client held in @aws_athena_client, building it with
# create_aws_athena_client the first time it is needed and reusing it after.
def aws_athena_client
  @aws_athena_client ||= create_aws_athena_client
end

#aws_s3_client ⇒ Object



32
33
34
# File 'lib/athena_utils/athena_client.rb', line 32

# Returns the S3 client held in @aws_s3_client, building it with
# create_aws_s3_client the first time it is needed and reusing it after.
def aws_s3_client
  @aws_s3_client ||= create_aws_s3_client
end

#database ⇒ Object (readonly)

database is the name of the Athena DB; work_group is the Athena Work Group to use with queries.



9
10
11
# File 'lib/athena_utils/athena_client.rb', line 9

# Reader for @database, the name of the Athena DB.
def database
  @database
end

#wait_time ⇒ Object

wait_time is the time, in seconds, to wait before checking query results again.



13
14
15
# File 'lib/athena_utils/athena_client.rb', line 13

# Reader for @wait_time, the time to wait before checking query results
# again (the value #wait passes to sleep between polling passes).
def wait_time
  @wait_time
end

#work_group ⇒ Object (readonly)

database is the name of the Athena DB; work_group is the Athena Work Group to use with queries.



9
10
11
# File 'lib/athena_utils/athena_client.rb', line 9

# Reader for @work_group, the Athena Work Group to use with queries.
def work_group
  @work_group
end

Instance Method Details

#create_aws_athena_client ⇒ Object



28
29
30
# File 'lib/athena_utils/athena_client.rb', line 28

# Builds a new Aws::Athena::Client, passing no arguments to the constructor.
# NOTE(review): presumably region and credentials come from the SDK's
# default configuration — confirm.
def create_aws_athena_client
  Aws::Athena::Client.new
end

#create_aws_s3_client ⇒ Object



36
37
38
# File 'lib/athena_utils/athena_client.rb', line 36

# Builds a new Aws::S3::Client, passing no arguments to the constructor.
# NOTE(review): presumably region and credentials come from the SDK's
# default configuration — confirm.
def create_aws_s3_client
  Aws::S3::Client.new
end

#query(query) ⇒ Object



40
41
42
43
# File 'lib/athena_utils/athena_client.rb', line 40

# Starts +query+ via query_async and then waits for that single execution
# to finish.
#
# @param query query text handed to query_async
# @return the entry that wait returns for this query's execution id
def query(query)
  execution_id = query_async(query)
  finished = wait([execution_id])
  finished[execution_id]
end

#query_async(query) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/athena_utils/athena_client.rb', line 45

# Submits +query+ to Athena without waiting for it to complete.
#
# The execution is started against +database+ in +work_group+.
#
# @param query query text sent as the query_string
# @return the query_execution_id reported by start_query_execution
def query_async(query)
  aws_athena_client
    .start_query_execution(
      query_string: query,
      query_execution_context: { database: database },
      work_group: work_group
    )
    .query_execution_id
end

#wait(query_execution_ids) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/athena_utils/athena_client.rb', line 57

# Polls Athena until every given query execution has finished.
#
# Each pass asks Athena for the state of every execution that has not yet
# produced a result. If any are still pending after a pass, the method
# sleeps for +wait_time+ before polling again.
#
# @param query_execution_ids collection of execution ids to wait for;
#   an id that appears more than once is polled only once
# @return [Hash] AthenaQueryResults objects keyed by execution id
# @raise [AthenaQueryError] as soon as an execution reports a state other
#   than SUCCEEDED, RUNNING or QUEUED
def wait(query_execution_ids)
  results = {}
  # De-duplicate up front: results is keyed by id, so comparing its size with
  # a list containing repeated ids would never match and the loop would spin
  # forever.
  pending = query_execution_ids.uniq

  until pending.empty?
    # Keep only the executions that are still in progress for the next pass.
    pending = pending.reject do |query_execution_id|
      query_status = aws_athena_client.get_query_execution(
        query_execution_id: query_execution_id
      )

      case query_status[:query_execution][:status][:state]
      when 'SUCCEEDED'
        results[query_execution_id] = AthenaQueryResults.new(query_status, aws_s3_client)
        true
      when 'RUNNING', 'QUEUED'
        false # still in progress; poll again on the next pass
      else
        raise AthenaQueryError, "Query failed #{query_status}"
      end
    end

    sleep(wait_time) unless pending.empty?
  end

  results
end