Class: Exwiw::Adapter::PostgresqlAdapter::StreamingResult
- Inherits: Object
- Includes: Enumerable
- Defined in: lib/exwiw/adapter/postgresql_adapter.rb
Overview
A lazy, streaming stand-in for the materialized rows #execute used to
return (connection.exec(sql).values). It pulls rows off the wire one
at a time via libpq's single-row mode instead of buffering the whole
result set, so the dump's dominant memory cost — a Ruby array as large
as the table — never materializes. The Runner drives it exactly like the
old Array: #size to skip empty tables and log the count, then a single
streaming pass (SqlBulkInsert#write_inserts -> each_slice) to write the
INSERT.
Mirrors MongodbAdapter::StreamingResult; two SQL-specific differences:
- #size cannot be answered cheaply from the cursor, so it runs a
separate `SELECT COUNT(*)` of the same query. (MongoDB uses
count_documents, an index-only walk; the SQL COUNT re-runs the query
plan but transfers no row data — Postgres prunes the unused
projection of the wrapped subquery.) This keeps the Runner contract
unchanged, so MongoDB and the other SQL adapters are untouched.
- the streaming pass ties up the connection until fully drained. The
Runner always drains it (write_inserts) before issuing any further
query (post_insert_sql / DELETE) on the same connection, so the
ordering invariant holds.
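The Runner contract described above (check #size first, then make one batched pass) can be sketched as follows. This is a minimal illustration, not the Runner's actual code: `FakeStreamingResult`, `dump_table`, and the `batch_size` parameter are hypothetical names, and the rows come from an in-memory array so the sketch runs without a database.

```ruby
# Hypothetical stand-in for a streaming result: Enumerable plus #size.
# It yields rows from an in-memory array instead of a live connection.
class FakeStreamingResult
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  def size
    @rows.length
  end

  def each
    return enum_for(:each) { size } unless block_given?

    @rows.each { |row| yield row }
    self
  end
end

# Hypothetical runner step: skip empty tables, otherwise make exactly one
# pass over the rows in fixed-size batches (each_slice calls #each once).
def dump_table(table, result, batch_size: 2)
  return [] if result.size.zero?

  statements = []
  result.each_slice(batch_size) do |rows|
    statements << "-- #{table}: batch of #{rows.length} row(s)"
  end
  statements
end
```

Because each_slice only ever holds one batch, the consumer's working set is bounded by the batch size rather than by the table size, which is the point of streaming.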
Instance Method Summary
- #each ⇒ Object
  Stream the result set row by row.
- #initialize(connection:, data_sql:, count_sql:) ⇒ StreamingResult (constructor)
  A new instance of StreamingResult.
- #size ⇒ Object (also: #length)
Constructor Details
#initialize(connection:, data_sql:, count_sql:) ⇒ StreamingResult
Returns a new instance of StreamingResult.
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 31

def initialize(connection:, data_sql:, count_sql:)
  @connection = connection
  @data_sql = data_sql
  @count_sql = count_sql
end
Instance Method Details
#each ⇒ Object
Stream the result set row by row. Each row is an Array of String|nil
in libpq's text format — byte-identical to what #exec(sql).values
produced, so the generated INSERT is unchanged.
# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 45

def each
  return enum_for(:each) { size } unless block_given?

  @connection.send_query(@data_sql)
  @connection.set_single_row_mode
  begin
    while (result = @connection.get_result)
      begin
        result.check
        result.each_row { |row| yield row }
      ensure
        result.clear
      end
    end
  rescue StandardError
    # If iteration is abandoned mid-stream (a SQL error surfaced by
    # #check, or the consumer raised), drain any results still queued so
    # a later query on this same connection does not fail with "another
    # command is already in progress".
    drain
    raise
  end
  self
end
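To make the error path of #each concrete, here is a sketch that runs the same loop against test doubles. Everything in it is an assumption for illustration: `FakeResult` and `FakeConnection` are hypothetical stand-ins that only imitate the handful of connection and result methods used above, and the inline drain loop is a guess at what the private `drain` helper might do, since its body is not shown on this page.

```ruby
# Hypothetical test double for one single-row-mode result.
class FakeResult
  def initialize(rows, error: nil)
    @rows = rows
    @error = error
  end

  # Raises if this result carries an error, like a failed query would.
  def check
    raise @error if @error
  end

  def each_row(&block)
    @rows.each(&block)
  end

  def clear; end
end

# Hypothetical connection double: hands out queued results, then nil.
class FakeConnection
  def initialize(results)
    @queue = results
  end

  def send_query(_sql); end

  def set_single_row_mode; end

  def get_result
    @queue.shift
  end

  def pending
    @queue.length
  end
end

# Same shape as #each above, with an assumed drain written inline.
def stream(connection, sql)
  return enum_for(:stream, connection, sql) unless block_given?

  connection.send_query(sql)
  connection.set_single_row_mode
  begin
    while (result = connection.get_result)
      begin
        result.check
        result.each_row { |row| yield row }
      ensure
        result.clear
      end
    end
  rescue StandardError
    # Assumed drain: read and discard until no results remain queued.
    while (leftover = connection.get_result)
      leftover.clear
    end
    raise
  end
end
```

After a failure, `pending` on the fake connection is zero, which models the property the real code needs: the connection is idle again before the exception reaches the caller.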
#size ⇒ Object
Also known as: length

# File 'lib/exwiw/adapter/postgresql_adapter.rb', line 37

def size
  @size ||= @connection.exec(@count_sql).getvalue(0, 0).to_i
end
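The overview says the count is a `SELECT COUNT(*)` wrapped around the same query. One way such a statement could be built, and how the `||=` memoization behaves, is sketched below. The helper `count_sql_for`, the alias `exwiw_count`, and the doubles `FakeCountResult`, `CountingConnection`, and `SizeOnly` are all hypothetical; the adapter's real count SQL may be constructed differently.

```ruby
# Hypothetical helper: wrap a data query so the server counts its rows
# without sending them. Assumes data_sql has no trailing semicolon.
def count_sql_for(data_sql)
  "SELECT COUNT(*) FROM (#{data_sql}) AS exwiw_count"
end

# Hypothetical result double: returns the count as text, the way a
# text-format result would.
FakeCountResult = Struct.new(:value) do
  def getvalue(_row, _col)
    value
  end
end

# Hypothetical connection double that records how often it is queried.
class CountingConnection
  attr_reader :calls

  def initialize(count)
    @count = count
    @calls = 0
  end

  def exec(_sql)
    @calls += 1
    FakeCountResult.new(@count.to_s)
  end
end

# Only the #size half of the class, copied from the listing above.
class SizeOnly
  def initialize(connection:, count_sql:)
    @connection = connection
    @count_sql = count_sql
  end

  def size
    @size ||= @connection.exec(@count_sql).getvalue(0, 0).to_i
  end
  alias length size
end
```

Since 0 is truthy in Ruby, `||=` also memoizes the count of an empty table, so the COUNT query runs once however many times #size or #length is called.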