Class: CanvasSync::Importers::BulkImporter

Inherits: Object
Defined in:
lib/canvas_sync/importers/bulk_importer.rb

Defined Under Namespace

Classes: UserChunker

Constant Summary

DEFAULT_BATCH_SIZE = 10_000

The batch import size can be customized by setting the `BULK_IMPORTER_BATCH_SIZE` environment variable.

Class Method Summary

.batch_size ⇒ Object
.condition_sql(klass, columns, report_start = nil) ⇒ Object
.import(report_file_path, *args, import_args: {}, &blk) ⇒ Object
.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object
.perform_in_batches(report_file_path, klass, mapping, import_args: {}, &block) ⇒ Object

Class Method Details

.batch_size ⇒ Object



# File 'lib/canvas_sync/importers/bulk_importer.rb', line 184

def self.batch_size
  batch_size = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i
  batch_size > 0 ? batch_size : DEFAULT_BATCH_SIZE
end
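
For example, the override can be exercised directly (a quick sketch; batch_size simply reads the environment variable and falls back to DEFAULT_BATCH_SIZE when it is unset or not a positive integer):

ENV["BULK_IMPORTER_BATCH_SIZE"] = "50000"
CanvasSync::Importers::BulkImporter.batch_size # => 50000

ENV.delete("BULK_IMPORTER_BATCH_SIZE")
CanvasSync::Importers::BulkImporter.batch_size # => 10_000 (DEFAULT_BATCH_SIZE)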

.condition_sql(klass, columns, report_start = nil) ⇒ Object

This method generates SQL that looks like:

(users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)

This prevents activerecord-import from setting the `updated_at` column for rows that haven't actually changed. This allows you to query for rows that have changed by doing something like:

started_at = Time.now
run_the_users_sync!
changed = User.where("updated_at >= ?", started_at)



# File 'lib/canvas_sync/importers/bulk_importer.rb', line 170

def self.condition_sql(klass, columns, report_start = nil)
  columns_str = columns.map { |c| "#{klass.quoted_table_name}.#{c}" }.join(", ")
  excluded_str = columns.map { |c| "EXCLUDED.#{c}" }.join(", ")
  condition_sql = "(#{columns_str}) IS DISTINCT FROM (#{excluded_str})"

  if klass.column_names.include?("canvas_synced_at") && report_start
    condition_sql += " AND #{klass.quoted_table_name}.canvas_synced_at < '#{report_start}'"
  elsif klass.column_names.include?("updated_at") && report_start
    condition_sql += " AND #{klass.quoted_table_name}.updated_at < '#{report_start}'"
  end

  condition_sql
end
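
As an illustration, given a hypothetical User model with sis_id and email columns, the call would produce something like the following (the exact quoting depends on your adapter's quoted_table_name):

CanvasSync::Importers::BulkImporter.condition_sql(User, %w[sis_id email])
# => ("users".sis_id, "users".email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)

# With report_start and an updated_at column, a staleness guard is appended:
CanvasSync::Importers::BulkImporter.condition_sql(User, %w[sis_id email], "2024-01-01 00:00:00")
# => ... AND "users".updated_at < '2024-01-01 00:00:00'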

.import(report_file_path, *args, import_args: {}, &blk) ⇒ Object

NEW:    (report_file_path, klass, mapping, import_args: {}, &blk)
LEGACY: (report_file_path, columns, klass, conflict_target, import_args: {}, &blk)



# File 'lib/canvas_sync/importers/bulk_importer.rb', line 23

def self.import(report_file_path, *args, import_args: {}, &blk) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/LineLength
  if args.length == 2
    klass = args[0]
    mapping = args[1]
  elsif args.length == 3
    klass = args[1]
    mapping = { columns: args[0], conflict_target: args[2] }
  end

  ClassCallbackExecutor.run_if_defined(klass, :sync_import) do
    perform_in_batches(report_file_path, klass, mapping, import_args: import_args, &blk)
  end
end
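
A usage sketch of the two call forms (the User model and column names are hypothetical; the mapping hash is the shape consumed by perform_in_batches below — note it must be passed as an explicit hash so it lands in *args rather than being parsed as keywords):

# New-style: model plus a mapping hash
CanvasSync::Importers::BulkImporter.import(
  report_file_path,
  User,
  {
    columns: {
      canvas_user_id: { report_column: :canvas_user_id },
      email: { report_column: :email },
    },
    conflict_target: :canvas_user_id,
  }
)

# Legacy: columns hash, then model, then conflict target
CanvasSync::Importers::BulkImporter.import(
  report_file_path,
  { canvas_user_id: { report_column: :canvas_user_id }, email: { report_column: :email } },
  User,
  :canvas_user_id
)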

.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object



# File 'lib/canvas_sync/importers/bulk_importer.rb', line 124

def self.perform_import(klass, columns, rows, conflict_target, import_args={})
  return if rows.length.zero?
  columns = columns.dup

  update_conditions = {
    condition: condition_sql(klass, columns, import_args[:sync_start_time]),
    columns: columns
  }
  update_conditions[:conflict_target] = conflict_target if conflict_target.present?

  options = { validate: false, on_duplicate_key_update: update_conditions }.merge(import_args)
  options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)

  result = nil
  callback_env = {
    batch: rows,
    import_result: nil,
    import_options: options,
  }
  ClassCallbackExecutor.run_if_defined(klass, :sync_batch_import, callback_env) do
    result = klass.import(columns, rows, options)
    callback_env[:import_result] = result

    global_updates = {
      canvas_synced_at: DateTime.now,
      canvas_sync_batch_id: JobBatches::Batch.current_context[:sync_batch_id],
    }
    global_updates.slice!(*klass.column_names.map(&:to_sym))
    if global_updates.present? && result.ids.present?
      klass.where(id: result.ids).update_all(global_updates)
    end
  end

  result
end
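
A sketch of calling it directly (User and the row values are hypothetical). Rows are arrays of values in the same order as columns; passing on_duplicate_key_ignore through import_args drops the conditional on_duplicate_key_update options entirely:

CanvasSync::Importers::BulkImporter.perform_import(
  User,
  %w[canvas_user_id email],
  [["123", "a@example.com"], ["456", "b@example.com"]],
  ["canvas_user_id"],
  { on_duplicate_key_ignore: true }
)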

.perform_in_batches(report_file_path, klass, mapping, import_args: {}, &block) ⇒ Object



# File 'lib/canvas_sync/importers/bulk_importer.rb', line 37

def self.perform_in_batches(report_file_path, klass, mapping, import_args: {}, &block)
  conflict_target = Array(mapping[:conflict_target]).map(&:to_s)

  col_mapping = {}.with_indifferent_access
  mapping[:columns].each do |db_col, opts|
    next if opts[:deprecated] && !klass.column_names.include?(db_col.to_s)

    col_mapping[db_col] = opts
  end

  csv_column_names = col_mapping.values.map { |value| value[:report_column].to_s }
  database_column_names = col_mapping.keys

  enumer = CSV.foreach(report_file_path, headers: true, header_converters: :symbol).lazy

  # Optionally filter rows by a passed block
  if block
    enumer = enumer.filter_map do |row|
      catch :skip do
        block.call(row)
      end
    end
  end

  # Convert rows to a hash that we can manipulate if desired
  enumer = enumer.map {|row| row.to_h.with_indifferent_access }

  # Optionally chunk by a computed value. Mainly so we can group duplicate rows and choose one
  chunker = nil
  chunker = UserChunker.new if defined?(User) && klass == User && csv_column_names.include?('user_id')
  if chunker
    enumer = enumer.chunk{|row| chunker.key(row) }.flat_map{|key, rows| chunker.choose(key, rows) }
  end

  # Pre-alter rows
  (mapping[:row_alterations] || []).each do |alterer|
    enumer = enumer.filter_map do |row|
      catch :skip do
        klass.instance_exec(row, &alterer)
        row
      end
    end
  end

  # Prepare the rows for import
  enumer = enumer.map do |row|
    col_mapping.map do |db_col, col_def|
      value = nil
      value = row[col_def[:report_column]] if col_def[:report_column]

      if col_def[:type]
        if col_def[:type].to_sym == :datetime
          # TODO: add some timezone config to the mapping.
          # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one
          value = DateTime.parse(value).utc rescue nil # rubocop:disable Style/RescueModifier
        elsif col_def[:type].to_sym == :yaml
          # CSV parse from reading the report escapes the new lines and YAML throws an error with double backslashes
          value = YAML.load(value.gsub("\\n", "\n")) rescue nil # rubocop:disable Style/RescueModifier
        end
      end

      value = col_def[:transform].call(value, row) if col_def[:transform]

      value
    end
  end

  # Reject rows within a single batch that have the same ID (use indices so we can use the post-transformed values)
  conflict_target_indices = conflict_target.map{|ct| database_column_names.index(ct) }
  row_ids = Set.new
  if conflict_target.present?
    enumer = enumer.reject do |row|
      key = conflict_target_indices.map{|ct| row[ct] }
      next true if row_ids.include?(key)

      row_ids << key
      false
    end
  end

  # Start importing
  enumer.each_slice(batch_size) do |batch|
    perform_import(klass, database_column_names, batch, conflict_target, import_args)
    row_ids.clear
  end
end
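
Putting it together, a mapping might look like the following (hypothetical model and columns; :datetime and :yaml are the two types handled above, and the optional block can drop rows with throw :skip):

mapping = {
  conflict_target: :canvas_user_id,
  columns: {
    canvas_user_id: { report_column: :canvas_user_id },
    email:          { report_column: :email, transform: ->(value, _row) { value&.downcase } },
    last_login_at:  { report_column: :last_login, type: :datetime },
  },
}

CanvasSync::Importers::BulkImporter.perform_in_batches("users.csv", User, mapping) do |row|
  throw :skip if row[:email].nil? # skip rows without an email
  row
end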