Class: Exwiw::Adapter::PostgresqlAdapter::StreamingResult
- Inherits:
- Object
  - Object
  - Exwiw::Adapter::PostgresqlAdapter::StreamingResult
- Includes:
- Enumerable
- Defined in:
- lib/exwiw/adapter/postgresql_adapter.rb
Overview
A lazy, streaming stand-in for the materialized rows that #execute used to return (`connection.exec(sql).values`). It pulls rows off the wire one at a time via libpq’s single-row mode instead of buffering the whole result set, so the dump’s dominant memory cost (a Ruby Array as large as the table) never materializes. The Runner drives it exactly like the old Array: it calls #size to skip empty tables and log the count, then makes a single streaming pass (SqlBulkInsert#write_inserts -> each_slice) to write the INSERT.
Mirrors MongodbAdapter::StreamingResult; two SQL-specific differences:
- #size cannot be answered cheaply from the cursor, so it runs a
separate `SELECT COUNT(*)` of the same query. (MongoDB uses
count_documents, an index-only walk; the SQL COUNT re-runs the query
plan but transfers no row data — Postgres prunes the unused
projection of the wrapped subquery.) This keeps the Runner contract
unchanged, so MongoDB and the other SQL adapters are untouched.
- the streaming pass ties up the connection until fully drained. The
Runner always drains it (write_inserts) before issuing any further
query (post_insert_sql / DELETE) on the same connection, so the
ordering invariant holds.
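The Runner contract described above (#size first, then one batched pass) can be sketched without a database. Everything in this block is hypothetical: `FakeStreamingResult` is an in-memory stand-in, `dump_table` is not the real Runner or SqlBulkInsert code, and the quoting is deliberately naive (no escaping).

```ruby
# Hypothetical stand-in: exposes the same surface as StreamingResult
# (#size plus #each) but is backed by an in-memory Array, not a PG connection.
class FakeStreamingResult
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  def size
    @rows.size
  end

  def each(&block)
    return enum_for(:each) { size } unless block_given?

    @rows.each(&block)
    self
  end
end

# Hypothetical runner step: skip empty tables, otherwise make a single
# pass over the rows in batches and build one INSERT per batch.
def dump_table(name, result, batch_size: 2)
  return [] if result.size.zero?

  result.each_slice(batch_size).map do |rows|
    values = rows.map do |row|
      # Naive quoting for illustration only; real code must escape values.
      "(#{row.map { |v| v.nil? ? 'NULL' : "'#{v}'" }.join(', ')})"
    end
    "INSERT INTO #{name} VALUES #{values.join(', ')};"
  end
end

ROWS = [["1", "ann"], ["2", nil], ["3", "cy"]].freeze
STATEMENTS = dump_table("users", FakeStreamingResult.new(ROWS))
```

Because `each_slice` comes from Enumerable and only needs #each, the caller never has to know whether the rows are buffered or streamed.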
Instance Method Summary
- #each ⇒ Object
  Stream the result set row by row.
- #initialize(connection:, data_sql:, count_sql:) ⇒ StreamingResult (constructor)
  A new instance of StreamingResult.
- #size ⇒ Object (also: #length)
Constructor Details
#initialize(connection:, data_sql:, count_sql:) ⇒ StreamingResult
Returns a new instance of StreamingResult.
    # File 'lib/exwiw/adapter/postgresql_adapter.rb', line 31
    def initialize(connection:, data_sql:, count_sql:)
      @connection = connection
      @data_sql = data_sql
      @count_sql = count_sql
    end
Instance Method Details
#each ⇒ Object
Stream the result set row by row. Each row is an Array of String|nil in libpq’s text format, byte-identical to what `#exec(sql).values` produced, so the generated INSERT is unchanged.
    # File 'lib/exwiw/adapter/postgresql_adapter.rb', line 45
    def each
      return enum_for(:each) { size } unless block_given?

      @connection.send_query(@data_sql)
      @connection.set_single_row_mode
      begin
        while (result = @connection.get_result)
          begin
            result.check
            result.each_row { |row| yield row }
          ensure
            result.clear
          end
        end
      rescue StandardError
        # If iteration is abandoned mid-stream (a SQL error surfaced by
        # #check, or the consumer raised), drain any results still queued so
        # a later query on this same connection does not fail with "another
        # command is already in progress".
        drain
        raise
      end
      self
    end
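The first line of #each uses `enum_for(:each) { size }`, which returns an Enumerator whose size is computed lazily by the block rather than by iterating. The following is a minimal sketch of that idiom only; `CountedSource` and its counters are hypothetical and there is no PG connection involved.

```ruby
# Hypothetical class that records how often #size and a full pass are used.
class CountedSource
  include Enumerable

  attr_reader :size_calls, :passes

  def initialize(rows)
    @rows = rows
    @size_calls = 0
    @passes = 0
  end

  def size
    @size_calls += 1
    @rows.size
  end

  def each(&block)
    # Without a block, hand back an Enumerator. The block given to enum_for
    # is only evaluated when Enumerator#size is called.
    return enum_for(:each) { size } unless block_given?

    @passes += 1
    @rows.each(&block)
    self
  end
end

SOURCE = CountedSource.new([["1"], ["2"], ["3"]])
ENUM = SOURCE.each       # no pass over the rows yet
ENUM_SIZE = ENUM.size    # answered via #size, still no pass
PASSES_AFTER_SIZE = SOURCE.passes
```

In the real class this presumably means asking an Enumerator for its size costs one COUNT query rather than a full streaming pass, though that depends on how callers use it.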
#size ⇒ Object Also known as: length
    # File 'lib/exwiw/adapter/postgresql_adapter.rb', line 37
    def size
      @size ||= @connection.exec(@count_sql).getvalue(0, 0).to_i
    end
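A rough sketch of how the count query and the memoized #size might fit together. The wrapper `count_sql_for`, the subquery alias, and the stub classes are all assumptions for illustration; the source only states that the count is a `SELECT COUNT(*)` of the same query. The stub imitates just the two calls #size relies on (`exec` and `getvalue`).

```ruby
# Hypothetical helper: wrap the data query so only a count is returned.
# Assumes data_sql has no trailing semicolon.
def count_sql_for(data_sql)
  "SELECT COUNT(*) FROM (#{data_sql}) AS exwiw_count"
end

# Stub result: getvalue returns a String, as text-format results do.
StubResult = Struct.new(:value) do
  def getvalue(_row, _col)
    value
  end
end

# Stub connection that records every query it is asked to run.
class StubConnection
  attr_reader :queries

  def initialize(count)
    @count = count
    @queries = []
  end

  def exec(sql)
    @queries << sql
    StubResult.new(@count.to_s)
  end
end

# Hypothetical reduced version of the class, keeping only #size.
class SizedResult
  def initialize(connection:, count_sql:)
    @connection = connection
    @count_sql = count_sql
  end

  # Memoized: the COUNT query runs at most once per instance.
  def size
    @size ||= @connection.exec(@count_sql).getvalue(0, 0).to_i
  end
  alias length size
end

COUNT_SQL = count_sql_for("SELECT id, name FROM users WHERE active")
CONN = StubConnection.new(42)
RESULT = SizedResult.new(connection: CONN, count_sql: COUNT_SQL)
SIZES = [RESULT.size, RESULT.length]
```

Since `0` is truthy in Ruby, `@size ||=` also caches an empty table's count, so skipping empty tables does not trigger a second query.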