Class: Journaled::Outbox::BatchProcessor
- Inherits: Object
  - Object
  - Journaled::Outbox::BatchProcessor
- Defined in: lib/journaled/outbox/batch_processor.rb
Overview
Processes batches of outbox events.
This class handles the core business logic of:
- Fetching events from the database (with FOR UPDATE)
- Sending them to Kinesis one at a time to guarantee ordering
- Handling successful deliveries (deleting events)
- Handling permanent failures (marking with failed_at)
- Handling ephemeral failures (stopping processing and committing)
Events are processed one at a time to guarantee ordering. If an event fails with an ephemeral error, processing stops and the transaction commits, which deletes the events that succeeded and marks the permanent failures. The processing loop then re-enters and starts a new batch.
All operations happen within a single database transaction for consistency. The Worker class delegates to this class for the actual event processing.
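The ordering and short-circuit behavior described above can be sketched in plain Ruby. This is a minimal illustration, not the library's implementation: `PermanentError`, `EphemeralError`, `send_in_order`, and the `deliver` callable are hypothetical names, and the sketch assumes that a permanent failure is recorded without halting the batch while an ephemeral failure stops it.

```ruby
# Hypothetical error classes standing in for whatever the real sender distinguishes.
class PermanentError < StandardError; end
class EphemeralError < StandardError; end

# Sends events strictly in order and stops at the first ephemeral failure.
# `deliver` is a hypothetical callable that raises on failure.
def send_in_order(events, deliver)
  result = { succeeded: [], failed: [] }

  events.each do |event|
    begin
      deliver.call(event)
      result[:succeeded] << event
    rescue PermanentError
      # Retrying will not help: record the failure and keep going.
      result[:failed] << event
    rescue EphemeralError
      # Later events must not overtake this one, so stop here.
      # Events that were not sent are left out of the result entirely.
      break
    end
  end

  result
end

# Toy delivery function: :bad fails permanently, :flaky fails ephemerally.
deliver = lambda do |event|
  raise PermanentError if event == :bad
  raise EphemeralError if event == :flaky
end

result = send_in_order([:a, :bad, :b, :flaky, :c], deliver)
```

In this sketch `:flaky` and `:c` appear in neither list, which mirrors the idea that only successes and permanent failures are acted on when the transaction commits.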
Instance Method Summary
- #initialize ⇒ BatchProcessor (constructor): A new instance of BatchProcessor.
- #process_batch ⇒ Hash: Process a single batch of events.
Constructor Details
#initialize ⇒ BatchProcessor
Returns a new instance of BatchProcessor.
# File 'lib/journaled/outbox/batch_processor.rb', line 21
def initialize
  @batch_sender = KinesisBatchSender.new
end
Instance Method Details
#process_batch ⇒ Hash
Process a single batch of events.
Wraps the entire batch processing in a single transaction:
- SELECT FOR UPDATE (claim events)
- Send to Kinesis (batch sender handles one-at-a-time and short-circuiting)
- Delete successful events
- Mark failed events (batch sender only returns permanent failures)
# File 'lib/journaled/outbox/batch_processor.rb', line 34
def process_batch
  ActiveRecord::Base.transaction do
    events = Event.fetch_batch_for_update
    Rails.logger.info("[journaled] Processing batch of #{events.count} events")

    result = batch_sender.send_batch(events)

    # Delete successful events
    Event.where(id: result[:succeeded].map(&:id)).delete_all if result[:succeeded].any?

    # Mark failed events
    mark_events_as_failed(result[:failed]) if result[:failed].any?

    Rails.logger.info(
      "[journaled] Batch complete: #{result[:succeeded].count} succeeded, " \
      "#{result[:failed].count} marked as failed (batch size: #{events.count})",
    )

    {
      succeeded: result[:succeeded].count,
      failed_permanently: result[:failed].count,
    }
  end
end
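A caller might consume the Hash returned by #process_batch roughly as follows. This is a hedged sketch only: `FakeBatchProcessor` and `drain` are hypothetical names introduced so the example runs without a database, and the stopping rule (quit when a batch reports no work) is an assumption, not the Worker's documented behavior.

```ruby
# Hypothetical stand-in for Journaled::Outbox::BatchProcessor. It replays
# canned results in the documented return shape instead of touching a database.
class FakeBatchProcessor
  def initialize(results)
    @results = results
  end

  def process_batch
    @results.shift || { succeeded: 0, failed_permanently: 0 }
  end
end

# Calls process_batch repeatedly and accumulates the per-batch counts.
# Stops when a batch reports no work or after max_batches iterations.
def drain(processor, max_batches: 10)
  totals = { succeeded: 0, failed_permanently: 0 }

  max_batches.times do
    stats = processor.process_batch
    break if stats.values.sum.zero?

    totals[:succeeded] += stats[:succeeded]
    totals[:failed_permanently] += stats[:failed_permanently]
  end

  totals
end

processor = FakeBatchProcessor.new([
  { succeeded: 3, failed_permanently: 1 },
  { succeeded: 2, failed_permanently: 0 },
])
totals = drain(processor)
```

A batch that hits an ephemeral failure on its very first event would also report zero for both counts, so a real caller would likely pause and retry rather than treat a zero result as "outbox empty".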