Implementing CQRS and Event Sourcing in Spring Boot

Introduction

As applications grow, a single unified data model can become a bottleneck. Read operations often need different data shapes than write operations, and auditing every change gets harder over time. Traditional CRUD approaches store only the current state, losing the history of how that state was reached. CQRS (Command Query Responsibility Segregation) and Event Sourcing address these problems by separating read and write concerns while preserving a complete history of all changes. This guide explains how the two patterns work, when to use them, and how to implement both in a Spring Boot application, using an order-management example throughout.

What Is CQRS?

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates the write model (commands) from the read model (queries). Instead of using the same model for both reading and writing data, the application handles these operations through independent paths with potentially different data structures and storage mechanisms.

CQRS Architecture

┌─────────────────────────────────────────────────────────────┐
│                        Client                                │
└─────────────────┬───────────────────────┬───────────────────┘
                  │                       │
          Commands│                       │Queries
                  ▼                       ▼
        ┌─────────────────┐     ┌─────────────────┐
        │  Command Side   │     │   Query Side    │
        │  (Write Model)  │     │  (Read Model)   │
        └────────┬────────┘     └────────┬────────┘
                 │                       │
                 ▼                       ▼
        ┌─────────────────┐     ┌─────────────────┐
        │  Event Store    │────▶│  Read Database  │
        │  (Write DB)     │     │  (Projections)  │
        └─────────────────┘     └─────────────────┘

Benefits of CQRS

  • Optimized reads: Query models can be denormalized and indexed specifically for read patterns
  • Independent scaling: Scale read and write sides based on their actual load
  • Simpler models: Each side focuses on its specific concern without compromise
  • Better performance: With separate stores, reads don’t compete with writes for resources
  • Flexibility: Use different storage technologies for each side

What Is Event Sourcing?

Event Sourcing stores all changes to application state as a sequence of events rather than saving only the current state. Each event represents something that happened in the system: an immutable fact such as OrderCreated, PaymentReceived, or OrderShipped. The current state is derived by replaying those events in order.

Event Sourcing Concept

Traditional: Store current state
┌───────────────────────────────┐
│ Order #123                    │
│ Status: Shipped               │
│ Total: $150                   │
└───────────────────────────────┘

Event Sourcing: Store all events
┌───────────────────────────────┐
│ 1. OrderCreated { id: 123 }   │
│ 2. ItemAdded { item: "A" }    │
│ 3. ItemAdded { item: "B" }    │
│ 4. PaymentReceived { $150 }   │
│ 5. OrderShipped { carrier }   │
└───────────────────────────────┘

Benefits of Event Sourcing

  • Complete audit trail: Every change is recorded and queryable
  • Time travel: Reconstruct state at any point in time
  • Debugging: Replay events to understand how bugs occurred
  • Analytics: Analyze historical patterns and trends
  • Event replay: Rebuild read models or create new projections
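
To make "time travel" concrete, here is a minimal plain-Java sketch that replays an ordered event list up to a cutoff. The event and snapshot types (`OrderCreated`, `ItemAdded`, `OrderSnapshot`, and so on) are simplified stand-ins modeled on the diagram above, not the article's final event classes, and the sketch assumes the events are already sorted by time.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;

// Hypothetical, simplified events for illustration only.
interface HistoryEvent { Instant at(); }
record OrderCreated(String id, Instant at) implements HistoryEvent {}
record ItemAdded(String item, BigDecimal price, Instant at) implements HistoryEvent {}
record PaymentReceived(BigDecimal amount, Instant at) implements HistoryEvent {}
record OrderShipped(String carrier, Instant at) implements HistoryEvent {}

// State derived from the events; never stored directly.
record OrderSnapshot(String status, BigDecimal total) {}

final class OrderReplay {
    // Folds every event that happened at or before asOf into a snapshot.
    static OrderSnapshot stateAsOf(List<HistoryEvent> events, Instant asOf) {
        String status = "NONE";
        BigDecimal total = BigDecimal.ZERO;
        for (HistoryEvent event : events) {
            if (event.at().isAfter(asOf)) {
                break; // events are assumed to be ordered by time
            }
            if (event instanceof OrderCreated) {
                status = "CREATED";
            } else if (event instanceof ItemAdded added) {
                total = total.add(added.price());
            } else if (event instanceof PaymentReceived) {
                status = "PAID";
            } else if (event instanceof OrderShipped) {
                status = "SHIPPED";
            }
        }
        return new OrderSnapshot(status, total);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        List<HistoryEvent> events = List.of(
                new OrderCreated("123", t0),
                new ItemAdded("A", new BigDecimal("100"), t0.plusSeconds(1)),
                new ItemAdded("B", new BigDecimal("50"), t0.plusSeconds(2)),
                new PaymentReceived(new BigDecimal("150"), t0.plusSeconds(3)),
                new OrderShipped("carrier", t0.plusSeconds(4)));
        System.out.println(stateAsOf(events, t0.plusSeconds(2)));
        System.out.println(stateAsOf(events, t0.plusSeconds(10)));
    }
}
```

Passing a later cutoff to the same method yields the later state, which is why no separate history table is needed.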

How CQRS and Event Sourcing Work Together

While CQRS and Event Sourcing are separate patterns, they complement each other naturally:

  1. The command side validates intent and produces events
  2. Events are persisted to the event store
  3. Event handlers (projections) consume events and update read models
  4. The query side reads from optimized projections
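
The four steps can be sketched in a few lines of plain Java before any Spring code is involved. Everything here is hypothetical and in-memory (`ItemAddedToCart`, `CartSummary`, `InMemoryCqrsFlow`); it only shows the direction of data flow, not a production design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event and read-model types for this sketch.
record ItemAddedToCart(String cartId, String sku, int quantity) {}
record CartSummary(String cartId, int totalItems) {}

final class InMemoryCqrsFlow {
    private final List<ItemAddedToCart> eventStore = new ArrayList<>();  // write side
    private final Map<String, CartSummary> readModel = new HashMap<>();  // query side

    // Step 1: validate intent. Step 2: persist the event. Step 3: project it.
    void addItem(String cartId, String sku, int quantity) {
        if (quantity <= 0) {
            throw new IllegalArgumentException("quantity must be positive");
        }
        ItemAddedToCart event = new ItemAddedToCart(cartId, sku, quantity);
        eventStore.add(event);
        project(event);
    }

    // The projection folds each event into a denormalized summary.
    private void project(ItemAddedToCart event) {
        readModel.merge(
                event.cartId(),
                new CartSummary(event.cartId(), event.quantity()),
                (old, added) -> new CartSummary(old.cartId(), old.totalItems() + added.totalItems()));
    }

    // Step 4: queries read the projection and never touch the event store.
    CartSummary summary(String cartId) {
        return readModel.get(cartId);
    }

    int eventCount() {
        return eventStore.size();
    }
}
```

In the Spring Boot implementation that follows, the list becomes a database table, and the direct call to `project` becomes an event listener.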

Project Setup

Dependencies

// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.boot:spring-boot-starter-validation")
    
    // For async event handling
    implementation("org.springframework.boot:spring-boot-starter-amqp")
    
    // Or use Axon Framework for full CQRS/ES support
    // implementation("org.axonframework:axon-spring-boot-starter:4.9.0")
    
    runtimeOnly("org.postgresql:postgresql")
    
    compileOnly("org.projectlombok:lombok")
    annotationProcessor("org.projectlombok:lombok")
}

Defining Domain Events

Events are immutable records of what happened. Use Java records for clean, immutable event definitions.

// Base event interface
public sealed interface OrderEvent permits
        OrderCreatedEvent,
        OrderItemAddedEvent,
        OrderPaymentReceivedEvent,
        OrderShippedEvent,
        OrderCancelledEvent {
    
    String orderId();
    Instant occurredAt();
}

// Concrete events
public record OrderCreatedEvent(
        String orderId,
        String customerId,
        Instant occurredAt
) implements OrderEvent {}

public record OrderItemAddedEvent(
        String orderId,
        String productId,
        int quantity,
        BigDecimal price,
        Instant occurredAt
) implements OrderEvent {}

public record OrderPaymentReceivedEvent(
        String orderId,
        String paymentId,
        BigDecimal amount,
        Instant occurredAt
) implements OrderEvent {}

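public record OrderShippedEvent(
        String orderId,
        String trackingNumber,
        String carrier,
        Instant occurredAt
) implements OrderEvent {}

// Named in the sealed interface's permits clause, so it must exist for the
// code to compile. The reason field is an assumption made for illustration.
public record OrderCancelledEvent(
        String orderId,
        String reason,
        Instant occurredAt
) implements OrderEvent {}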

Implementing the Event Store

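The event store persists events in append-only fashion: rows are inserted, never updated or deleted. A unique constraint on the aggregate ID and version columns is worth adding so that the database itself rejects two writers that try to append the same version concurrently.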

Event Entity

@Entity
@Table(name = "event_store")
@Getter // getters only: stored events are never modified after insert
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StoredEvent {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(nullable = false)
    private String aggregateId;
    
    @Column(nullable = false)
    private String aggregateType;
    
    @Column(nullable = false)
    private String eventType;
    
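    // Binds the String as JSON on Hibernate 6+ (Spring Boot 3); without it,
    // PostgreSQL rejects a varchar parameter for a jsonb column.
    // Needs org.hibernate.annotations.JdbcTypeCode and org.hibernate.type.SqlTypes.
    @JdbcTypeCode(SqlTypes.JSON)
    @Column(columnDefinition = "jsonb", nullable = false)
    private String payload;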
    
    @Column(nullable = false)
    private Integer version;
    
    @Column(nullable = false)
    private Instant occurredAt;
    
    @Column(nullable = false)
    private Instant storedAt;
}

Event Store Repository

@Repository
public interface EventStoreRepository extends JpaRepository<StoredEvent, Long> {
    
    List<StoredEvent> findByAggregateIdOrderByVersionAsc(String aggregateId);
    
    Optional<StoredEvent> findTopByAggregateIdOrderByVersionDesc(String aggregateId);
    
    @Query("SELECT e FROM StoredEvent e WHERE e.storedAt > :since ORDER BY e.id")
    List<StoredEvent> findEventsSince(@Param("since") Instant since);
}

Event Store Service

@Service
@RequiredArgsConstructor
@Slf4j
public class EventStore {
    
    private final EventStoreRepository repository;
    private final ObjectMapper objectMapper;
    private final ApplicationEventPublisher eventPublisher;
    
    @Transactional
    public void save(String aggregateId, String aggregateType, List<Object> events, int expectedVersion) {
        // Optimistic concurrency check
        int currentVersion = getCurrentVersion(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "Expected version " + expectedVersion + " but found " + currentVersion
            );
        }
        
        int version = expectedVersion;
        Instant now = Instant.now();
        
        for (Object event : events) {
            version++;
            StoredEvent storedEvent = StoredEvent.builder()
                    .aggregateId(aggregateId)
                    .aggregateType(aggregateType)
                    .eventType(event.getClass().getSimpleName())
                    .payload(serialize(event))
                    .version(version)
                    .occurredAt(extractOccurredAt(event))
                    .storedAt(now)
                    .build();
            
            repository.save(storedEvent);
            
            // Publish for projections. Plain @EventListener methods run
            // synchronously on this thread, before the transaction commits.
            eventPublisher.publishEvent(event);
            log.debug("Stored and published event: {}", event.getClass().getSimpleName());
        }
    }
    
    public List<Object> load(String aggregateId) {
        return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
                .stream()
                .map(stored -> deserialize(stored.getPayload(), stored.getEventType()))
                .toList();
    }
    
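    private int getCurrentVersion(String aggregateId) {
        return repository.findTopByAggregateIdOrderByVersionDesc(aggregateId)
                .map(StoredEvent::getVersion)
                .orElse(0);
    }
    
    // Reads the business timestamp from the event. Falling back to "now" for
    // types that are not an OrderEvent is an assumption of this example.
    private Instant extractOccurredAt(Object event) {
        return event instanceof OrderEvent orderEvent ? orderEvent.occurredAt() : Instant.now();
    }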
    
    private String serialize(Object event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize event", e);
        }
    }
    
    private Object deserialize(String payload, String eventType) {
        try {
            Class<?> eventClass = Class.forName("com.example.events." + eventType);
            return objectMapper.readValue(payload, eventClass);
        } catch (Exception e) {
            throw new EventSerializationException("Failed to deserialize event", e);
        }
    }
}
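
The version check in `save` is the heart of optimistic concurrency, so it may help to see it in isolation. The following in-memory sketch uses a hypothetical `InMemoryEventStore` class with `synchronized` methods standing in for the database's guarantees; in the JPA version, a read-then-insert like this is only safe against concurrent writers if the database also enforces uniqueness of aggregate ID plus version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the JPA-backed event store.
final class InMemoryEventStore {
    private final Map<String, List<Object>> streams = new HashMap<>();

    // Appends only if the caller saw the latest version of the stream.
    synchronized void append(String aggregateId, int expectedVersion, List<?> newEvents) {
        List<Object> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        int currentVersion = stream.size(); // version == number of stored events
        if (currentVersion != expectedVersion) {
            throw new IllegalStateException(
                    "Expected version " + expectedVersion + " but found " + currentVersion);
        }
        stream.addAll(newEvents);
    }

    // Returns an immutable copy so callers cannot rewrite history.
    synchronized List<Object> load(String aggregateId) {
        return List.copyOf(streams.getOrDefault(aggregateId, List.of()));
    }

    public static void main(String[] args) {
        InMemoryEventStore store = new InMemoryEventStore();
        store.append("order-1", 0, List.of("OrderCreated"));
        // Two writers both loaded the stream at version 1.
        store.append("order-1", 1, List.of("ItemAdded"));
        try {
            store.append("order-1", 1, List.of("PaymentReceived"));
        } catch (IllegalStateException e) {
            System.out.println("Second writer rejected: " + e.getMessage());
        }
    }
}
```

The rejected writer would typically reload the aggregate, re-run its command against the new state, and retry.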

Aggregate Implementation

Aggregates encapsulate domain logic and produce events.

public class OrderAggregate {
    
    private String orderId;
    private String customerId;
    private OrderStatus status;
    private List<OrderItem> items = new ArrayList<>();
    private BigDecimal totalAmount = BigDecimal.ZERO;
    private int version = 0;
    
    private final List<OrderEvent> uncommittedEvents = new ArrayList<>();
    
    // Factory method for creating new orders
    public static OrderAggregate create(String orderId, String customerId) {
        OrderAggregate order = new OrderAggregate();
        order.apply(new OrderCreatedEvent(orderId, customerId, Instant.now()));
        return order;
    }
    
    // Reconstitute from events
    public static OrderAggregate fromEvents(List<OrderEvent> events) {
        OrderAggregate order = new OrderAggregate();
        events.forEach(order::applyEvent);
        order.version = events.size();
        return order;
    }
    
    // Command: Add item
    public void addItem(String productId, int quantity, BigDecimal price) {
        if (status != OrderStatus.CREATED) {
            throw new InvalidOrderStateException("Cannot add items to " + status + " order");
        }
        apply(new OrderItemAddedEvent(orderId, productId, quantity, price, Instant.now()));
    }
    
    // Command: Receive payment
    public void receivePayment(String paymentId, BigDecimal amount) {
        if (status != OrderStatus.CREATED) {
            throw new InvalidOrderStateException("Cannot pay for " + status + " order");
        }
        if (amount.compareTo(totalAmount) < 0) {
            throw new InsufficientPaymentException("Payment " + amount + " less than " + totalAmount);
        }
        apply(new OrderPaymentReceivedEvent(orderId, paymentId, amount, Instant.now()));
    }
    
    // Command: Ship order
    public void ship(String trackingNumber, String carrier) {
        if (status != OrderStatus.PAID) {
            throw new InvalidOrderStateException("Cannot ship " + status + " order");
        }
        apply(new OrderShippedEvent(orderId, trackingNumber, carrier, Instant.now()));
    }
    
    private void apply(OrderEvent event) {
        applyEvent(event);
        uncommittedEvents.add(event);
    }
    
    private void applyEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> {
                this.orderId = e.orderId();
                this.customerId = e.customerId();
                this.status = OrderStatus.CREATED;
            }
            case OrderItemAddedEvent e -> {
                items.add(new OrderItem(e.productId(), e.quantity(), e.price()));
                totalAmount = totalAmount.add(e.price().multiply(BigDecimal.valueOf(e.quantity())));
            }
            case OrderPaymentReceivedEvent e -> {
                this.status = OrderStatus.PAID;
            }
            case OrderShippedEvent e -> {
                this.status = OrderStatus.SHIPPED;
            }
            default -> throw new UnknownEventException("Unknown event: " + event.getClass());
        }
    }
    
    public List<OrderEvent> getUncommittedEvents() {
        return List.copyOf(uncommittedEvents);
    }
    
    public void markCommitted() {
        uncommittedEvents.clear();
    }
    
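    public int getVersion() {
        return version;
    }
    
    // Needed by the command handler when it saves the aggregate
    public String getOrderId() {
        return orderId;
    }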
}

Command Handlers

@Service
@RequiredArgsConstructor
public class OrderCommandHandler {
    
    private final EventStore eventStore;
    
    @Transactional
    public String handle(CreateOrderCommand command) {
        String orderId = UUID.randomUUID().toString();
        OrderAggregate order = OrderAggregate.create(orderId, command.customerId());
        
        eventStore.save(
            orderId,
            "Order",
            new ArrayList<>(order.getUncommittedEvents()),
            0
        );
        
        return orderId;
    }
    
    @Transactional
    public void handle(AddItemCommand command) {
        OrderAggregate order = loadAggregate(command.orderId());
        order.addItem(command.productId(), command.quantity(), command.price());
        saveAggregate(order);
    }
    
    @Transactional
    public void handle(ReceivePaymentCommand command) {
        OrderAggregate order = loadAggregate(command.orderId());
        order.receivePayment(command.paymentId(), command.amount());
        saveAggregate(order);
    }
    
    private OrderAggregate loadAggregate(String orderId) {
        List<Object> events = eventStore.load(orderId);
        if (events.isEmpty()) {
            throw new OrderNotFoundException(orderId);
        }
        return OrderAggregate.fromEvents(
            events.stream().map(e -> (OrderEvent) e).toList()
        );
    }
    
    private void saveAggregate(OrderAggregate order) {
        eventStore.save(
            order.getOrderId(),
            "Order",
            new ArrayList<>(order.getUncommittedEvents()),
            order.getVersion()
        );
        order.markCommitted();
    }
}
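
The handler above refers to command types that are not shown. One plausible shape, assuming Java records, is sketched below. The accessor names match the calls made in `OrderCommandHandler`; the validation rules in the compact constructors are assumptions added for illustration.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Commands express intent. They are validated early and carry no behavior.
record CreateOrderCommand(String customerId) {
    CreateOrderCommand {
        Objects.requireNonNull(customerId, "customerId");
    }
}

record AddItemCommand(String orderId, String productId, int quantity, BigDecimal price) {
    AddItemCommand {
        Objects.requireNonNull(orderId, "orderId");
        Objects.requireNonNull(productId, "productId");
        Objects.requireNonNull(price, "price");
        if (quantity <= 0) {
            throw new IllegalArgumentException("quantity must be positive");
        }
        if (price.signum() < 0) {
            throw new IllegalArgumentException("price must not be negative");
        }
    }
}

record ReceivePaymentCommand(String orderId, String paymentId, BigDecimal amount) {
    ReceivePaymentCommand {
        Objects.requireNonNull(orderId, "orderId");
        Objects.requireNonNull(paymentId, "paymentId");
        Objects.requireNonNull(amount, "amount");
        if (amount.signum() <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
    }
}
```

Structural checks like these belong on the command, while rules that depend on the order's current state (for example "cannot add items to a paid order") stay in the aggregate.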

Read Model Projections

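Projections listen to events and maintain denormalized read models. The listeners below use Spring's @EventListener, which by default runs synchronously on the publishing thread, so here the read model is updated as part of the command's transaction when both sides share a database.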

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderProjection {
    
    private final OrderReadRepository repository;
    
    @EventListener
    public void on(OrderCreatedEvent event) {
        OrderView view = OrderView.builder()
                .orderId(event.orderId())
                .customerId(event.customerId())
                .status("CREATED")
                .totalAmount(BigDecimal.ZERO)
                .itemCount(0)
                .createdAt(event.occurredAt())
                .build();
        repository.save(view);
        log.info("Projection: Order {} created", event.orderId());
    }
    
    @EventListener
    public void on(OrderItemAddedEvent event) {
        repository.findById(event.orderId()).ifPresent(view -> {
            BigDecimal itemTotal = event.price().multiply(BigDecimal.valueOf(event.quantity()));
            view.setTotalAmount(view.getTotalAmount().add(itemTotal));
            view.setItemCount(view.getItemCount() + 1);
            repository.save(view);
        });
    }
    
    @EventListener
    public void on(OrderPaymentReceivedEvent event) {
        repository.findById(event.orderId()).ifPresent(view -> {
            view.setStatus("PAID");
            view.setPaidAt(event.occurredAt());
            repository.save(view);
        });
    }
    
    @EventListener
    public void on(OrderShippedEvent event) {
        repository.findById(event.orderId()).ifPresent(view -> {
            view.setStatus("SHIPPED");
            view.setTrackingNumber(event.trackingNumber());
            view.setShippedAt(event.occurredAt());
            repository.save(view);
        });
    }
}
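
One thing to watch in the projection above: the item-added handler increments a counter, so delivering the same event twice (for example during a replay, or with at-least-once delivery from a message broker) would count the item twice. A common remedy is to remember the last version applied per aggregate. The sketch below is plain Java with hypothetical names, and it assumes each event arrives with its per-aggregate version, which the article's event records do not carry (the `StoredEvent` row does).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical idempotent projection keyed by order ID.
final class IdempotentItemCountProjection {
    private final Map<String, Integer> itemCounts = new HashMap<>();
    private final Map<String, Integer> lastAppliedVersion = new HashMap<>();

    // Returns true if the event was applied, false if it was a duplicate or stale.
    boolean onItemAdded(String orderId, int eventVersion) {
        int last = lastAppliedVersion.getOrDefault(orderId, 0);
        if (eventVersion <= last) {
            return false; // already seen: applying it again would double count
        }
        itemCounts.merge(orderId, 1, Integer::sum);
        lastAppliedVersion.put(orderId, eventVersion);
        return true;
    }

    int itemCount(String orderId) {
        return itemCounts.getOrDefault(orderId, 0);
    }
}
```

In a database-backed projection, the last applied version would usually live in the same row as the view so that both are updated atomically.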

Query Handlers

@Service
@RequiredArgsConstructor
public class OrderQueryHandler {
    
    private final OrderReadRepository repository;
    
    public OrderView handle(GetOrderQuery query) {
        return repository.findById(query.orderId())
                .orElseThrow(() -> new OrderNotFoundException(query.orderId()));
    }
    
    public Page<OrderView> handle(GetCustomerOrdersQuery query) {
        return repository.findByCustomerId(
            query.customerId(),
            PageRequest.of(query.page(), query.size(), Sort.by("createdAt").descending())
        );
    }
    
    public List<OrderView> handle(GetPendingOrdersQuery query) {
        return repository.findByStatus("CREATED");
    }
}

Common Mistakes to Avoid

Making Events Mutable

Events represent facts that happened. They must be immutable. Use Java records or mark all fields final.
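
Records make the fields final, but a record holding a mutable collection can still be changed through that collection. A small sketch of the defensive copy, using a hypothetical `OrderItemsReplaced` event that is not part of the article's model:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event with a collection field.
record OrderItemsReplaced(String orderId, List<String> productIds) {
    OrderItemsReplaced {
        // List.copyOf returns an unmodifiable copy, so later changes to the
        // caller's list cannot alter this event.
        productIds = List.copyOf(productIds);
    }
}

final class ImmutableEventDemo {
    public static void main(String[] args) {
        List<String> ids = new ArrayList<>(List.of("A", "B"));
        OrderItemsReplaced event = new OrderItemsReplaced("123", ids);
        ids.add("C"); // does not affect the event
        System.out.println(event.productIds().size());
    }
}
```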

Ignoring Eventual Consistency

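Once projections run asynchronously or write to a separate database, read models are only eventually consistent with the event store: a query issued right after a command may not see its effect yet. Design UIs to handle this delay gracefully, for example by showing the submitted change optimistically or by polling until the projection catches up.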
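
The delay can be pictured with a queue sitting between the write side and the projection. This toy sketch uses hypothetical names and an explicit `drain` call in place of a real background consumer, to show that a successful write is not immediately visible to queries:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical projection that applies events only when drained.
final class LaggingProjection {
    private final Queue<String> pending = new ArrayDeque<>();
    private final Set<String> visibleOrders = new HashSet<>();

    // Write side: the event is accepted and the caller returns immediately.
    void publishOrderCreated(String orderId) {
        pending.add(orderId);
    }

    // Stands in for an asynchronous consumer catching up.
    void drain() {
        while (!pending.isEmpty()) {
            visibleOrders.add(pending.poll());
        }
    }

    // Query side: sees only what the projection has processed so far.
    boolean isVisible(String orderId) {
        return visibleOrders.contains(orderId);
    }
}
```

A client that needs to read its own write can poll `isVisible` (or a version number exposed by the read model) until the projection has caught up.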

Storing Large Data in Events

Events should contain only what changed. Store large blobs separately and reference them by ID.

No Event Versioning

Event schemas evolve. Plan for versioning with upcasters that transform old events to new formats.
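
As a rough illustration of an upcaster, the sketch below upgrades a stored payload (already parsed into a map) from an old schema to the current one. The schema change itself is invented for the example: version 1 is assumed to have stored a `tracking` field, which version 2 renames to `trackingNumber` and supplements with `carrier`. It also assumes a schema version is stored alongside each event, which the `StoredEvent` entity shown earlier does not yet do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical upcaster for a shipped-order event payload.
final class OrderShippedUpcaster {
    static final int CURRENT_SCHEMA_VERSION = 2;

    // Returns a payload in the current schema, leaving the input untouched.
    static Map<String, Object> upcast(int schemaVersion, Map<String, Object> payload) {
        if (schemaVersion >= CURRENT_SCHEMA_VERSION) {
            return payload; // already current
        }
        Map<String, Object> upgraded = new HashMap<>(payload);
        // v1 -> v2: rename "tracking" and default the new "carrier" field.
        upgraded.put("trackingNumber", upgraded.remove("tracking"));
        upgraded.putIfAbsent("carrier", "UNKNOWN");
        return upgraded;
    }
}
```

Upcasting on read keeps the stored events untouched, which preserves the append-only guarantee while letting the domain code deal with a single, current event shape.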

Using CQRS/ES for Simple CRUD

These patterns add complexity. For simple CRUD applications, Spring Data JPA is usually sufficient.

When to Use CQRS and Event Sourcing

These patterns are ideal for systems that need:

  • Complete audit trails and compliance requirements
  • Complex domain logic with many business rules
  • High scalability with different read/write loads
  • Event-driven microservices architectures
  • Temporal queries (“what was the state on date X?”)

Conclusion

Using CQRS and Event Sourcing in Spring Boot transforms how your application handles data. By separating commands from queries and storing every change as an immutable event, you gain scalability, complete auditability, and the ability to reconstruct state at any point in time. Start small with one aggregate and one projection, validate the patterns work for your domain, then expand as your system grows. For complementary architecture patterns, read Building Reactive APIs with Spring WebFlux. For API design in event-driven systems, explore GraphQL APIs with Spring Boot and Netflix DGS. For deeper guidance on event sourcing implementations, see the EventStoreDB documentation and the Axon Framework documentation.