Class: CanvasSync::Importers::BulkImporter

Inherits:
Object
  • Object
show all
Defined in:
lib/canvas_sync/importers/bulk_importer.rb

Defined Under Namespace

Classes: UserChunker

Constant Summary collapse

DEFAULT_BATCH_SIZE =

The batch import size can be customized by setting the `BULK_IMPORTER_BATCH_SIZE` environment variable; a missing, zero, negative, or non-numeric value falls back to this default.

10_000

Class Method Summary collapse

Class Method Details

.batch_size ⇒ Object



160
161
162
163
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 160

# Number of rows handed to the database in a single import statement.
#
# Reads BULK_IMPORTER_BATCH_SIZE from the environment. When the variable is
# unset, or its +to_i+ value is zero or negative, DEFAULT_BATCH_SIZE is used.
#
# @return [Integer] a positive batch size
def self.batch_size
  configured = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i
  return DEFAULT_BATCH_SIZE unless configured.positive?

  configured
end

.condition_sql(klass, columns, report_start = nil) ⇒ Object

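This method generates SQL that looks like: `(users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)`. When `report_start` is given and the table has a `canvas_synced_at` column (or, failing that, an `updated_at` column), an extra condition comparing that column against `report_start` is appended.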

This prevents activerecord-import from setting the `updated_at` column on rows that haven't actually changed, which lets you query for the rows that changed during a sync with something like:

started_at = Time.now
run_the_users_sync!
changed = User.where("updated_at >= ?", started_at)



146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 146

# Builds the WHERE condition used by the upsert's ON CONFLICT ... DO UPDATE.
#
# The base condition lets an existing row be updated only when at least one of
# +columns+ differs from the incoming (EXCLUDED) value, e.g.
#   ("users".sis_id, "users".email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
#
# When +report_start+ is given and the table has a +canvas_synced_at+ column
# (or, failing that, an +updated_at+ column), the update is further limited to
# rows whose timestamp is NULL or earlier than +report_start+. NULL has to be
# allowed explicitly: in SQL `NULL < x` is NULL (not true), so without the
# IS NULL branch a row that never received a timestamp could never be updated.
#
# @param klass [Class] model being imported; must respond to +quoted_table_name+ and +column_names+
# @param columns [Array<String>] columns compared against the incoming values (interpolated unquoted)
# @param report_start [#to_s, nil] report start time; interpolated as a SQL string literal
# @return [String] the SQL condition
def self.condition_sql(klass, columns, report_start = nil)
  table = klass.quoted_table_name
  columns_str = columns.map { |c| "#{table}.#{c}" }.join(", ")
  excluded_str = columns.map { |c| "EXCLUDED.#{c}" }.join(", ")
  condition_sql = "(#{columns_str}) IS DISTINCT FROM (#{excluded_str})"
  return condition_sql unless report_start

  # Prefer canvas_synced_at; fall back to updated_at; otherwise no time filter.
  timestamp_column = %w[canvas_synced_at updated_at].find { |col| klass.column_names.include?(col) }
  return condition_sql unless timestamp_column

  qualified = "#{table}.#{timestamp_column}"
  # Double any single quotes so the value cannot terminate the string literal early.
  start_literal = report_start.to_s.gsub("'", "''")
  "#{condition_sql} AND (#{qualified} IS NULL OR #{qualified} < '#{start_literal}')"
end

.import(report_file_path, mapping, klass, conflict_target, import_args: {}) {|row| ... } ⇒ Object

Does a bulk import of a set of models using the activerecord-import gem.

Parameters:

  • report_file_path (String)

    path to the report CSV

  • mapping (Hash)

    a hash describing the columns to import. See `model_mappings.yml` for an example of the format.

  • klass (Object)

    e.g., User

  • conflict_target (Symbol)

    the database column (as named in the mapping) that determines whether a given row is inserted or updated, e.g. canvas_user_id. An array of columns may also be given.

  • import_args (Hash) (defaults to: {})

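    Any options passed here are merged into the options given to activerecord-import's `import` call. Note: passing the key `:on_duplicate_key_ignore` overrides the default `:on_duplicate_key_update` behavior. The `:sync_start_time` key is also used to build the update condition (see `.condition_sql`).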

Yield Parameters:

  • row (CSV::Row)

    If a block is given, each row from the CSV is yielded to it. The block's return value replaces the row; returning nil or false, or calling `throw :skip`, drops the row. This can be used to filter or massage the data.



20
21
22
23
24
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 20

# Bulk-imports a report CSV into +klass+ (via activerecord-import).
#
# The work runs inside ClassCallbackExecutor.run_if_defined(klass, :sync_import)
# (presumably the model's +sync_import+ callbacks, when it defines any) and is
# delegated to +perform_in_batches+.
#
# @param report_file_path [String] path to the report CSV
# @param mapping [Hash] db column => column definition (see model_mappings.yml)
# @param klass [Class] model to import into, e.g. User
# @param conflict_target [Symbol, String, Array] database column(s) used to detect existing rows
# @param import_args [Hash] extra options forwarded to +perform_in_batches+
# @yieldparam row [CSV::Row] optional per-row filter/transform
# @return whatever ClassCallbackExecutor.run_if_defined returns
def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}, &blk)
  ClassCallbackExecutor.run_if_defined(klass, :sync_import) {
    perform_in_batches(
      report_file_path,
      mapping,
      klass,
      conflict_target,
      import_args: import_args,
      &blk
    )
  }
end

.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 100

# Imports one batch of rows with +klass.import+, upserting on +conflict_target+.
#
# By default, existing rows are updated (:on_duplicate_key_update) only where
# +condition_sql+ allows it. Passing :on_duplicate_key_ignore in +import_args+
# removes the update option. Every key of +import_args+ (including
# :sync_start_time, which is also fed to +condition_sql+) is merged into the
# options handed to +klass.import+.
#
# After the import, the rows whose ids come back in the result get
# canvas_synced_at / canvas_sync_batch_id set, for whichever of those columns
# the model actually has.
#
# @param klass [Class] model to import into
# @param columns [Array<String>] database columns, in the same order as each row's values
# @param rows [Array<Array>] the batch of value arrays
# @param conflict_target [Array<String>, nil] ON CONFLICT target column(s); omitted when blank
# @param import_args [Hash] extra import options
# @return the +klass.import+ result, or nil when +rows+ is empty
def self.perform_import(klass, columns, rows, conflict_target, import_args={})
  return nil if rows.length.zero?

  columns = columns.dup

  upsert = {
    condition: condition_sql(klass, columns, import_args[:sync_start_time]),
    columns: columns,
  }
  upsert[:conflict_target] = conflict_target if conflict_target.present?

  options = { validate: false, on_duplicate_key_update: upsert }.merge(import_args)
  # An explicit ignore request wins over the default update behaviour.
  options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)

  import_result = nil
  callback_env = { batch: rows, import_result: nil, import_options: options }

  ClassCallbackExecutor.run_if_defined(klass, :sync_batch_import, callback_env) do
    import_result = klass.import(columns, rows, options)
    callback_env[:import_result] = import_result

    # Only stamp the sync columns that exist on this model.
    sync_stamps = {
      canvas_synced_at: DateTime.now,
      canvas_sync_batch_id: JobBatches::Batch.current_context[:sync_batch_id],
    }.slice(*klass.column_names.map(&:to_sym))

    next unless sync_stamps.present? && import_result.ids.present?

    klass.where(id: import_result.ids).update_all(sync_stamps)
  end

  import_result
end

.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {}, &block) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 26

# Streams a report CSV through +raw_mapping+ and imports it in batches.
#
# The pipeline is a lazy enumerator, so each stage runs row by row as batches
# are pulled:
#   1. optional caller filter/transform (+block+)
#   2. optional grouping of User rows through UserChunker (when importing User
#      with a 'user_id' report column)
#   3. conversion of each CSV row into an array of values ordered like the mapping
#   4. removal of rows whose conflict-target key already appeared in the batch
#      being assembled
#   5. +perform_import+ for every +batch_size+ rows
#
# @param report_file_path [String] path to the report CSV (first line is headers)
# @param raw_mapping [Hash] db column => definition (:report_column, :type, :transform, :deprecated)
# @param klass [Class] model to import into
# @param conflict_target [String, Symbol, Array] database column(s) identifying an existing row
# @param import_args [Hash] forwarded to +perform_import+
# @yieldparam row [CSV::Row] return the (possibly modified) row to keep it;
#   return nil/false or `throw :skip` to drop it
def self.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {}, &block)
  # Drop deprecated mapping entries whose column no longer exists on the table.
  mapping = {}.with_indifferent_access
  raw_mapping.each do |db_col, opts|
    next if opts[:deprecated] && !klass.column_names.include?(db_col.to_s)

    mapping[db_col] = opts
  end

  csv_column_names = mapping.values.map { |value| value[:report_column].to_s }
  database_column_names = mapping.keys

  conflict_target = Array(conflict_target).map(&:to_s)
  # NOTE: a conflict-target column that is not in the mapping gets a nil index
  # here, and the duplicate check below then raises TypeError on the first row.
  conflict_target_indices = conflict_target.map { |ct| database_column_names.index(ct) }

  enumer = CSV.foreach(report_file_path, headers: true, header_converters: :symbol).lazy

  # Optionally filter/transform rows by the passed block; nil/false or `throw :skip` drops a row.
  if block
    enumer = enumer.filter_map do |row|
      catch(:skip) { block.call(row) }
    end
  end

  # Optionally chunk by a computed value, mainly so duplicate rows can be grouped and one chosen.
  chunker = nil
  chunker = UserChunker.new if defined?(User) && klass == User && csv_column_names.include?('user_id')
  if chunker
    enumer = enumer.chunk { |row| chunker.key(row) }.flat_map { |key, rows| chunker.choose(key, rows) }
  end

  # Convert each CSV row into an array of values in mapping order.
  enumer = enumer.map do |row|
    mapping.map do |db_col, col_def|
      value = nil
      value = row[col_def[:report_column]] if col_def[:report_column]

      if col_def[:type]
        if col_def[:type].to_sym == :datetime
          # TODO: add some timezone config to the mapping.
          # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one
          # Unparseable (or missing) values deliberately become nil.
          value = DateTime.parse(value).utc rescue nil # rubocop:disable Style/RescueModifier
        elsif col_def[:type].to_sym == :yaml
          # CSV parse from reading the report escapes the new lines and YAML throws an error with double backslashes
          # NOTE(review): YAML.load on report contents is only a safe load on Psych >= 4;
          # consider YAML.safe_load if the stored types allow it.
          value = YAML.load(value.gsub("\\n", "\n")) rescue nil # rubocop:disable Style/RescueModifier
        end
      end

      value = col_def[:transform].call(value, row) if col_def[:transform]

      value
    end
  end

  # Keys already seen in the batch currently being assembled; later rows with the
  # same key are dropped (presumably because one INSERT ... ON CONFLICT DO UPDATE
  # cannot affect the same row twice). Initialized here, before the lazy filter
  # captures it, and reset after each batch is imported below.
  row_ids = {}
  if conflict_target.present?
    enumer = enumer.reject do |row|
      key = conflict_target_indices.map { |ct| row[ct] }
      skip = row_ids.key?(key)
      row_ids[key] = true
      skip
    end
  end

  # Start importing. each_slice yields a batch only after its last row has passed
  # through the filter above, so resetting here scopes de-duplication to one batch.
  enumer.each_slice(batch_size) do |batch|
    perform_import(klass, database_column_names, batch, conflict_target, import_args)
    row_ids = {}
  end
end