Class: Spree::ImportRow
- Inherits:
-
Object
- Object
- Spree::ImportRow
- Defined in:
- app/models/spree/import_row.rb
Constant Summary collapse
- STALLED_PROCESSING_AFTER =
How long a row may sit in ‘processing` before we consider its owning worker dead (OOM, SIGKILL, deploy without graceful drain — none of these trigger a Sidekiq retry). After this window, stalled rows no longer block the import from completing.
1.hour
Instance Method Summary collapse
- #attribute_by_schema_field(schema_field, mappings = nil) ⇒ Object
-
#bulk_process!(mappings:, schema_fields:) ⇒ Object
Bulk processing mode for large imports.
- #data_json ⇒ Object
- #process!(mappings: nil, schema_fields: nil) ⇒ Object
- #publish_import_row_completed_event ⇒ Object
- #publish_import_row_failed_event ⇒ Object
- #to_schema_hash ⇒ Object
Instance Method Details
#attribute_by_schema_field(schema_field, mappings = nil) ⇒ Object
83 84 85 86 87 88 89 90 |
# File 'app/models/spree/import_row.rb', line 83 def attribute_by_schema_field(schema_field, mappings = nil) mappings ||= import.mappings.mapped mapping = mappings.find { |m| m.schema_field == schema_field } return unless mapping&.mapped? data_json[mapping.file_column] end |
#bulk_process!(mappings:, schema_fields:) ⇒ Object
Bulk processing mode for large imports. Uses update_columns to skip callbacks, validations, and event publishing.
104 105 106 107 108 109 110 111 112 |
# File 'app/models/spree/import_row.rb', line 104 def bulk_process!(mappings:, schema_fields:) update_columns(status: 'processing', updated_at: Time.current) processor = import.row_processor_class.new(self, mappings: mappings, schema_fields: schema_fields) self.item = processor.process! update_columns(status: 'completed', item_type: item.class.name, item_id: item.id, updated_at: Time.current) rescue StandardError => e Rails.error.report(e, handled: true, context: { import_row_id: id }, source: 'spree.core') update_columns(status: 'failed', validation_errors: e., updated_at: Time.current) end |
#data_json ⇒ Object
64 65 66 67 68 |
# File 'app/models/spree/import_row.rb', line 64 def data_json @data_json ||= JSON.parse(data) rescue JSON::ParserError {} end |
#process!(mappings: nil, schema_fields: nil) ⇒ Object
92 93 94 95 96 97 98 99 100 |
# File 'app/models/spree/import_row.rb', line 92 def process!(mappings: nil, schema_fields: nil) start_processing! self.item = import.row_processor_class.new(self, mappings: mappings, schema_fields: schema_fields).process! complete! rescue StandardError => e Rails.error.report(e, handled: true, context: { import_row_id: id }, source: 'spree.core') self.validation_errors = e. fail! end |
#publish_import_row_completed_event ⇒ Object
114 115 116 |
# File 'app/models/spree/import_row.rb', line 114 def publish_import_row_completed_event publish_event('import_row.completed') end |
#publish_import_row_failed_event ⇒ Object
118 119 120 |
# File 'app/models/spree/import_row.rb', line 118 def publish_import_row_failed_event publish_event('import_row.failed') end |
#to_schema_hash ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'app/models/spree/import_row.rb', line 70 def to_schema_hash @to_schema_hash ||= begin mappings = import.mappings.mapped schema_fields = import.schema_fields attributes = {} schema_fields.each do |field| attributes[field[:name]] = attribute_by_schema_field(field[:name], mappings) end attributes end end |