Class: Exwiw::Adapter::PostgresqlAdapter::StreamingResult

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/exwiw/adapter/postgresql_adapter.rb

Overview

A lazy, streaming stand-in for the materialized rows #execute used to return (connection.exec(sql).values). It pulls rows off the wire one at a time via libpq's single-row mode instead of buffering the whole result set, so the dump's dominant memory cost — a Ruby array as large as the table — never materializes. The Runner drives it exactly like the old Array: #size to skip empty tables and log the count, then a single streaming pass (SqlBulkInsert#write_inserts -> each_slice) to write the INSERT.

Mirrors MongodbAdapter::StreamingResult; two SQL-specific differences:

- #size cannot be answered cheaply from the cursor, so it runs a
separate `SELECT COUNT(*)` of the same query. (MongoDB uses
count_documents, an index-only walk; the SQL COUNT re-runs the query
plan but transfers no row data — Postgres prunes the unused
projection of the wrapped subquery.) This keeps the Runner contract
unchanged, so MongoDB and the other SQL adapters are untouched.
- the streaming pass ties up the connection until fully drained. The
Runner always drains it (write_inserts) before issuing any further
query (post_insert_sql / DELETE) on the same connection, so the
ordering invariant holds.

Instance Method Summary collapse

Constructor Details

#initialize(connection:, data_sql:, count_sql:) ⇒ StreamingResult

Returns a new instance of StreamingResult.



31
32
33
34
35
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 31

def initialize(connection:, data_sql:, count_sql:)
  @connection = connection
  @data_sql = data_sql
  @count_sql = count_sql
end

Instance Method Details

#eachObject

Stream the result set row by row. Each row is an Array of String|nil in libpq's text format — byte-identical to what #exec(sql).values produced, so the generated INSERT is unchanged.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 45

def each
  return enum_for(:each) { size } unless block_given?

  @connection.send_query(@data_sql)
  @connection.set_single_row_mode
  begin
    while (result = @connection.get_result)
      begin
        result.check
        result.each_row { |row| yield row }
      ensure
        result.clear
      end
    end
  rescue StandardError
    # If iteration is abandoned mid-stream (a SQL error surfaced by
    # #check, or the consumer raised), drain any results still queued so
    # a later query on this same connection does not fail with "another
    # command is already in progress".
    drain
    raise
  end
  self
end

#sizeObject Also known as: length



37
38
39
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 37

def size
  @size ||= @connection.exec(@count_sql).getvalue(0, 0).to_i
end