Class: Journaled::Outbox::BatchProcessor

Inherits:
Object
Defined in:
lib/journaled/outbox/batch_processor.rb

Overview

Processes batches of outbox events

This class handles the core business logic of:

  • Fetching events from the database (with FOR UPDATE)

  • Sending them to Kinesis one at a time to guarantee ordering

  • Handling successful deliveries (deleting events)

  • Handling permanent failures (marking with failed_at)

  • Handling ephemeral failures (stopping processing and committing)

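Events are processed one at a time to guarantee ordering. If an event fails with an ephemeral error, processing stops and the transaction commits, which deletes the events already sent and marks permanent failures with failed_at. The events that were not attempted stay in the outbox and are picked up when the processing loop runs again.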
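The short-circuiting behaviour described above can be illustrated with a minimal, self-contained sketch. It is not the library's KinesisBatchSender: the error classes PermanentError and EphemeralError and the helper send_in_order are hypothetical names chosen for illustration, and the actual Kinesis call is replaced by a block.

```ruby
# Hypothetical error classes (assumptions); the real sender's error
# taxonomy is not shown in this document.
class PermanentError < StandardError; end
class EphemeralError < StandardError; end

# Sends events one at a time, in order, by yielding each to the block.
# - Success: the event is recorded under :succeeded.
# - Permanent failure: the event is recorded under :failed and sending continues.
# - Ephemeral failure: sending stops; that event and all later ones are
#   left untouched so they can be retried in a later batch.
def send_in_order(events)
  result = { succeeded: [], failed: [] }
  events.each do |event|
    begin
      yield event
      result[:succeeded] << event
    rescue PermanentError
      result[:failed] << event
    rescue EphemeralError
      break
    end
  end
  result
end

events = %w[a b c d e]
result = send_in_order(events) do |event|
  raise PermanentError if event == "b"
  raise EphemeralError if event == "d"
end

p result
```

In this run, "a" and "c" are reported as succeeded and "b" as permanently failed, while "d" and "e" appear in neither list. That mirrors why the transaction can safely commit after an ephemeral failure: only events with a final outcome are deleted or marked.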

All operations happen within a single database transaction for consistency. The Worker class delegates to this for actual event processing.

Instance Method Summary

Constructor Details

#initialize ⇒ BatchProcessor

Returns a new instance of BatchProcessor.



# File 'lib/journaled/outbox/batch_processor.rb', line 21

def initialize
  @batch_sender = KinesisBatchSender.new
end

Instance Method Details

#process_batch ⇒ Hash

Process a single batch of events

Wraps the entire batch processing in a single transaction:

  1. SELECT FOR UPDATE (claim events)

  2. Send to Kinesis (batch sender handles one-at-a-time and short-circuiting)

  3. Delete successful events

  4. Mark failed events (batch sender only returns permanent failures)

Returns:

  • (Hash)

    Statistics with :succeeded and :failed_permanently counts



# File 'lib/journaled/outbox/batch_processor.rb', line 34

def process_batch
  ActiveRecord::Base.transaction do
    events = Event.fetch_batch_for_update
    Rails.logger.info("[journaled] Processing batch of #{events.count} events")

    result = batch_sender.send_batch(events)

    # Delete successful events
    Event.where(id: result[:succeeded].map(&:id)).delete_all if result[:succeeded].any?

    # Mark failed events
    mark_events_as_failed(result[:failed]) if result[:failed].any?

    Rails.logger.info(
      "[journaled] Batch complete: #{result[:succeeded].count} succeeded, " \
      "#{result[:failed].count} marked as failed (batch size: #{events.count})",
    )

    {
      succeeded: result[:succeeded].count,
      failed_permanently: result[:failed].count,
    }
  end
end
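As a rough illustration of how a caller might consume the returned statistics, the sketch below accumulates totals across repeated calls. StubProcessor and drain are hypothetical stand-ins so the example runs without Rails, a database, or Kinesis; the real Worker loop is not shown in this document and may behave differently.

```ruby
# Stand-in for BatchProcessor (an assumption for illustration only).
# It returns pre-canned statistics hashes with the same keys that
# #process_batch documents: :succeeded and :failed_permanently.
class StubProcessor
  def initialize(batches)
    @batches = batches
  end

  def process_batch
    @batches.shift || { succeeded: 0, failed_permanently: 0 }
  end
end

# Calls process_batch repeatedly and sums the counts. It stops on the
# first batch that reports no work. This is a simplification: a zero
# result can also mean the first event hit an ephemeral failure, in
# which case a real loop would more likely wait and retry than stop.
def drain(processor)
  totals = { succeeded: 0, failed_permanently: 0 }
  loop do
    stats = processor.process_batch
    break if stats.values.sum.zero?

    totals.merge!(stats) { |_key, running, batch| running + batch }
  end
  totals
end

processor = StubProcessor.new([
  { succeeded: 3, failed_permanently: 1 },
  { succeeded: 2, failed_permanently: 0 },
])
totals = drain(processor)

p totals
```

Because the hash carries only counts, a caller can aggregate or log it without holding references to the event records themselves.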