Class: Exwiw::Adapter::PostgresqlAdapter::StreamingResult

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/exwiw/adapter/postgresql_adapter.rb

Overview

A lazy, streaming stand-in for the materialized rows #execute used to return (‘connection.exec(sql).values`). It pulls rows off the wire one at a time via libpq’s single-row mode instead of buffering the whole result set, so the dump’s dominant memory cost — a Ruby array as large as the table — never materializes. The Runner drives it exactly like the old Array: #size to skip empty tables and log the count, then a single streaming pass (SqlBulkInsert#write_inserts -> each_slice) to write the INSERT.

Mirrors MongodbAdapter::StreamingResult; two SQL-specific differences:

- #size cannot be answered cheaply from the cursor, so it runs a
  separate `SELECT COUNT(*)` of the same query. (MongoDB uses
  count_documents, an index-only walk; the SQL COUNT re-runs the query
  plan but transfers no row data — Postgres prunes the unused
  projection of the wrapped subquery.) This keeps the Runner contract
  unchanged, so MongoDB and the other SQL adapters are untouched.
- the streaming pass ties up the connection until fully drained. The
  Runner always drains it (write_inserts) before issuing any further
  query (post_insert_sql / DELETE) on the same connection, so the
  ordering invariant holds.

Instance Method Summary collapse

Constructor Details

#initialize(connection:, data_sql:, count_sql:) ⇒ StreamingResult

Returns a new instance of StreamingResult.



31
32
33
34
35
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 31

def initialize(connection:, data_sql:, count_sql:)
  @connection = connection
  @data_sql = data_sql
  @count_sql = count_sql
end

Instance Method Details

#eachObject

Stream the result set row by row. Each row is an Array of String|nil in libpq’s text format — byte-identical to what ‘#exec(sql).values` produced, so the generated INSERT is unchanged.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 45

def each
  return enum_for(:each) { size } unless block_given?

  @connection.send_query(@data_sql)
  @connection.set_single_row_mode
  begin
    while (result = @connection.get_result)
      begin
        result.check
        result.each_row { |row| yield row }
      ensure
        result.clear
      end
    end
  rescue StandardError
    # If iteration is abandoned mid-stream (a SQL error surfaced by
    # #check, or the consumer raised), drain any results still queued so
    # a later query on this same connection does not fail with "another
    # command is already in progress".
    drain
    raise
  end
  self
end

#sizeObject Also known as: length



37
38
39
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 37

def size
  @size ||= @connection.exec(@count_sql).getvalue(0, 0).to_i
end