Class: CanvasSync::Importers::BulkImporter
- Inherits:
- 
      Object
      
        - Object
- CanvasSync::Importers::BulkImporter
 
- Defined in:
- lib/canvas_sync/importers/bulk_importer.rb
Defined Under Namespace
Classes: NullRowBuffer, RowBuffer, UserRowBuffer
Constant Summary collapse
- DEFAULT_BATCH_SIZE =
          The batch import size can be customized by setting the `BULK_IMPORTER_BATCH_SIZE` environment variable to a positive integer; any other value (or an unset variable) falls back to this default.
- 10_000
Class Method Summary collapse
- .batch_size ⇒ Object
- 
  
    
      .condition_sql(klass, columns, report_start = nil)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    This method generates SQL that looks like: (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email). 
- 
  
    
      .import(report_file_path, mapping, klass, conflict_target, import_args: {}) {|row| ... } ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Does a bulk import of a set of models using the activerecord-import gem. 
- .perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object
- .perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {}) ⇒ Object
Class Method Details
.batch_size ⇒ Object
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 152
#
# Number of rows sent to a single bulk import.
#
# Reads BULK_IMPORTER_BATCH_SIZE from the environment. Anything that does not
# convert to a positive integer (unset, "0", negative, non-numeric) falls back
# to DEFAULT_BATCH_SIZE.
#
# @return [Integer]
def self.batch_size
  configured = ENV.fetch("BULK_IMPORTER_BATCH_SIZE", nil).to_i
  return DEFAULT_BATCH_SIZE unless configured.positive?

  configured
end
.condition_sql(klass, columns, report_start = nil) ⇒ Object
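This method generates SQL that looks like: (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email). When report_start is given, it also appends a guard such as AND users.canvas_synced_at < 'report_start' (using updated_at instead when the table has no canvas_synced_at column).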
This prevents activerecord-import from setting the `updated_at` column for rows that haven't actually changed. This allows you to query for rows that have changed by doing something like:
    started_at = Time.now
    run_the_users_sync!
    changed = User.where("updated_at >= ?", started_at)
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 138
#
# Builds the condition for activerecord-import's ON CONFLICT ... DO UPDATE so an
# existing row is only rewritten when at least one imported column differs, e.g.
#   (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
#
# When +report_start+ is given, the condition additionally requires the existing
# row's canvas_synced_at column (preferred) or updated_at column, whichever the
# table has, to be older than +report_start+.
#
# NOTE(review): +report_start+ is interpolated verbatim into the SQL, so it must
# come from trusted code (e.g. a sync start time), never from user input.
#
# @param klass [Class] model responding to +quoted_table_name+ and +column_names+
# @param columns [Array<String, Symbol>] columns to compare
# @param report_start [#to_s, nil] optional cut-off timestamp
# @return [String] SQL condition fragment
def self.condition_sql(klass, columns, report_start = nil)
  table = klass.quoted_table_name
  target_cols = columns.map { |col| "#{table}.#{col}" }
  incoming_cols = columns.map { |col| "EXCLUDED.#{col}" }
  clauses = ["(#{target_cols.join(', ')}) IS DISTINCT FROM (#{incoming_cols.join(', ')})"]

  if report_start
    # Prefer canvas_synced_at; fall back to updated_at; add nothing if neither exists.
    timestamp_col = %w[canvas_synced_at updated_at].find { |name| klass.column_names.include?(name) }
    clauses << "#{table}.#{timestamp_col} < '#{report_start}'" if timestamp_col
  end

  clauses.join(" AND ")
end
.import(report_file_path, mapping, klass, conflict_target, import_args: {}) {|row| ... } ⇒ Object
Does a bulk import of a set of models using the activerecord-import gem.
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 20
#
# Bulk-imports the rows of a CSV report into +klass+ using the activerecord-import gem.
#
# The work is wrapped in the model's :sync_import callback (via
# ClassCallbackExecutor.run_if_defined) and delegated to +perform_in_batches+.
#
# @param report_file_path [String] path to the CSV report
# @param mapping [Hash] database column => column definition
# @param klass [Class] model receiving the rows
# @param conflict_target [String, Symbol, Array, nil] upsert conflict column(s)
# @param import_args [Hash] extra options forwarded to the import
# @yieldparam row each raw CSV row; return the row to keep (possibly modified) or nil to skip it
# @return whatever the callback wrapper returns
def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}, &row_filter)
  ClassCallbackExecutor.run_if_defined(klass, :sync_import) do
    perform_in_batches(
      report_file_path,
      mapping,
      klass,
      conflict_target,
      import_args: import_args,
      &row_filter
    )
  end
end
.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 92
#
# Upserts one batch of already-formatted rows into +klass+ via activerecord-import.
# It then stamps the returned records with the sync bookkeeping columns
# (canvas_synced_at, canvas_sync_batch_id) that the model actually has.
#
# @param klass [Class] model receiving the rows
# @param columns [Array<String>] database column names, in the same order as each row's values
# @param rows [Array<Array>] row value arrays
# @param conflict_target [Array<String>, nil] ON CONFLICT column(s); omitted when blank
# @param import_args [Hash] options merged over the defaults passed to +klass.import+.
#   +:sync_start_time+ (if present) is also used to build the update condition, and
#   +:on_duplicate_key_ignore+ disables the upsert clause.
# @return the result of +klass.import+. Returns nil when +rows+ is empty, or when
#   ClassCallbackExecutor does not run the block.
def self.perform_import(klass, columns, rows, conflict_target, import_args = {})
  return if rows.empty?

  columns = columns.dup

  # Only overwrite existing rows whose values actually changed (see condition_sql).
  update_conditions = {
    condition: condition_sql(klass, columns, import_args[:sync_start_time]),
    columns: columns,
  }
  update_conditions[:conflict_target] = conflict_target if conflict_target.present?

  options = { validate: false, on_duplicate_key_update: update_conditions }.merge(import_args)
  # A caller asking to ignore duplicates must not also receive an upsert clause.
  options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)

  result = nil
  callback_env = {
    batch: rows,
    import_result: nil,
    import_options: options,
  }
  ClassCallbackExecutor.run_if_defined(klass, :sync_batch_import, callback_env) do
    result = klass.import(columns, rows, options)
    callback_env[:import_result] = result

    global_updates = {
      canvas_synced_at: DateTime.now,
      canvas_sync_batch_id: JobBatches::Batch.current_context[:sync_batch_id],
    }
    # Only write bookkeeping columns that exist on this model.
    global_updates.slice!(*klass.column_names.map(&:to_sym))
    if global_updates.present? && result.ids.present?
      klass.where(id: result.ids).update_all(global_updates)
    end
  end

  result
end
.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {}) ⇒ Object
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 26
#
# Streams a CSV report and maps each row onto the database columns in the mapping.
# Rows sharing the same +conflict_target+ key are dropped within a batch, and the
# result is imported in batches of +batch_size+ rows via +perform_import+.
#
# @param report_file_path [String] path to a CSV file with a header row
#   (headers are converted to symbols)
# @param raw_mapping [Hash] database column => column definition. A definition may
#   contain :report_column, :type, :transform and :deprecated.
# @param klass [Class] model the rows are imported into
# @param conflict_target [String, Symbol, Array, nil] column(s) identifying duplicate rows
# @param import_args [Hash] forwarded unchanged to +perform_import+
# @yieldparam row [CSV::Row] each raw CSV row (block optional). The block's return
#   value replaces the row; a nil return skips it.
# @raise [ArgumentError] if a conflict-target column is not part of the (filtered) mapping
# @return the result of the final batcher flush
def self.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {})
  # Deprecated mapping entries are skipped when the model no longer has that column.
  mapping = {}.with_indifferent_access
  raw_mapping.each do |db_col, opts|
    next if opts[:deprecated] && !klass.column_names.include?(db_col.to_s)

    mapping[db_col] = opts
  end

  csv_column_names = mapping.values.map { |value| value[:report_column].to_s }
  database_column_names = mapping.keys

  conflict_target = Array(conflict_target).map(&:to_s)
  missing_targets = conflict_target - database_column_names
  # Fail fast: a missing column would otherwise produce a nil index and raise a
  # TypeError on the first row processed.
  unless missing_targets.empty?
    raise ArgumentError,
          "conflict_target column(s) not present in the import mapping: #{missing_targets.join(', ')}"
  end
  conflict_target_indices = conflict_target.map { |ct| database_column_names.index(ct) }

  # Keys of rows already queued in the current batch. The set is reset on every
  # flush, so duplicates are only dropped within a single batch. Presumably this is
  # because one upsert statement cannot touch the same row twice; confirm.
  row_ids = {}
  batcher = CanvasSync::BatchProcessor.new(of: batch_size) do |batch|
    row_ids = {}
    perform_import(klass, database_column_names, batch, conflict_target, import_args)
  end

  # Converts one CSV row into an array of values ordered like database_column_names
  # and queues it. +next+ exits the lambda, skipping duplicate rows.
  row_buffer_out = lambda do |row|
    formatted_row = mapping.map do |_db_col, col_def|
      value = col_def[:report_column] ? row[col_def[:report_column]] : nil

      if col_def[:type] && col_def[:type].to_sym == :datetime
        # TODO: add some timezone config to the mapping.
        # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one
        value = begin
          DateTime.parse(value).utc
        rescue ArgumentError, TypeError
          # Missing (nil) or unparseable timestamps are imported as NULL; other
          # errors are no longer silently swallowed.
          nil
        end
      end

      value = col_def[:transform].call(value, row) if col_def[:transform]
      value
    end

    if conflict_target.present?
      key = conflict_target_indices.map { |idx| formatted_row[idx] }
      next if row_ids[key]

      row_ids[key] = true
    end

    batcher << formatted_row
  end

  # User imports that carry a user_id column go through UserRowBuffer; all other
  # imports use NullRowBuffer (presumably a pass-through; see those classes).
  row_buffer_class =
    if defined?(User) && klass == User && csv_column_names.include?('user_id')
      UserRowBuffer
    else
      NullRowBuffer
    end
  row_buffer = row_buffer_class.new(&row_buffer_out)

  CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row|
    row = yield(row) if block_given?
    next if row.nil?

    row_buffer << row
  end

  row_buffer.flush
  batcher.flush
end