Roberto de Vargas Neto

From Stream to Database: Processing Market Data with Spring Boot, Redis, and Flyway

Hey folks!

Continuing the My Broker B3 series, today we build trading-broker-asset β€” the Java service that consumes the quotes published to Kafka by trading-broker-market-data and transforms them into structured data in MySQL, with high-performance caching in Redis.

If the Python service is the collector, this one is the processor.


πŸ—οΈ This Service's Role in the Ecosystem

```
[trading-broker-market-data]
         │
         │ Kafka: trading-assets-market-data-v1
         ▼
[trading-broker-asset]
         │
    ┌────┴────┐
    ▼         ▼
  MySQL     Redis
(assets)  market:price:{TICKER}
              │
              ├─► [b3-matching-engine]   (reads prices from here)
              └─► [trading-broker-order] (validates tickers from here)
```

Two critical consumers depend on the data this service maintains: the Matching Engine (for price decisions) and the Order API (to validate whether a ticker exists and is active).


🎯 MVP Focus

In this phase, I prioritized:

  • Robust Kafka consumer with error handling
  • Asset upsert: creates on first event, updates on subsequent ones
  • Redis cache with 10-minute TTL
  • REST API for catalog queries
  • Status filter: only ACTIVE assets are returned

πŸ› οΈ Tech Stack

| Technology | Usage |
| --- | --- |
| Java 21 + Spring Boot 3.5.11 | Service core |
| Spring Kafka | Quote event consumption |
| MySQL + Flyway | Versioned asset catalog |
| Spring Data Redis | Real-time price cache |
| SpringDoc OpenAPI | Swagger UI documentation |

πŸ—οΈ Implementation Pillars

1. The Database Schema (Flyway)

```sql
CREATE TABLE assets (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL UNIQUE,
    name VARCHAR(200),
    current_price DECIMAL(19, 4),
    last_update DATETIME,
    status ENUM('ACTIVE', 'INACTIVE') NOT NULL DEFAULT 'ACTIVE',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

Why Flyway? Schema versioning ensures that any environment (local, CI, production) has exactly the same database structure, without surprises.
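As a rough sketch of how this wires up in Spring Boot (paths and file names below are illustrative defaults, not taken from the repository):

```yaml
# src/main/resources/application.yml
spring:
  flyway:
    enabled: true
    locations: classpath:db/migration
```

With these defaults, the `CREATE TABLE` script above would live in a versioned file such as `src/main/resources/db/migration/V1__create_assets_table.sql`. Flyway applies any pending versions at startup and records each one in its `flyway_schema_history` table, so every environment converges to the same schema.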

2. The Kafka Consumer

```java
@KafkaListener(
        topics = "trading-assets-market-data-v1",
        groupId = "trading-broker-asset"
)
public void consume(AssetMarketDataDTO dto) {
    log.info("Market data received for ticker: {}", dto.getTicker());
    try {
        assetService.updateAsset(dto);
    } catch (Exception e) {
        log.error("Failed to process market data for ticker {}: {}",
                dto.getTicker(), e.getMessage(), e);
        // Rethrow so Kafka can apply retry policy instead of silently discarding
        throw e;
    }
}
```

Why rethrow the exception? If processing fails silently, the offset is committed and the message is discarded β€” the asset is never updated. Rethrowing lets Kafka apply its retry policy.
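Stripped of the framework, the retry policy amounts to "re-run the failed work a bounded number of times, pausing between attempts". Here is a minimal plain-Java sketch of that fixed-backoff idea (illustrative only — the real service relies on Spring Kafka's error handling, not this helper):

```java
import java.util.function.Supplier;

public class RetryDemo {
    // Retry a task up to maxAttempts times, sleeping backoffMs between attempts.
    // Mirrors the fixed-backoff behaviour a Kafka error handler applies to a failed record.
    static <T> T retryWithBackoff(Supplier<T> task, int maxAttempts, long backoffMs)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) Thread.sleep(backoffMs);
            }
        }
        // Exhausted: in Kafka terms this is where a dead-letter topic would take over
        throw last;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fails twice, succeeds on the third attempt
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("transient failure");
            return "processed";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The key property is the same one the consumer relies on: a transient failure (say, MySQL briefly unavailable) is retried instead of the offset being committed over a lost update.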

3. The Asset Upsert

```java
@Transactional
public void updateAsset(AssetMarketDataDTO dto) {
    Asset asset = assetRepository.findByTicker(dto.getTicker())
            .orElse(Asset.builder()
                    .ticker(dto.getTicker())
                    .status(AssetStatus.ACTIVE)
                    .build());

    asset.setName(dto.getName());
    asset.setCurrentPrice(dto.getPrice());
    // Prefer the market timestamp from the event; fall back to now() only if it is missing
    asset.setLastUpdate(dto.getUpdatedAt() != null ? dto.getUpdatedAt() : LocalDateTime.now());

    assetRepository.save(asset);

    // Update Redis cache after persisting
    marketPriceCacheService.updatePrice(dto.getTicker(), dto.getPrice());
}
```

Important decision: we prefer the real updatedAt from the Kafka event over LocalDateTime.now(), falling back to the processing time only when the event carries no timestamp. This way lastUpdate in the database reflects when the price was generated by the market, not when this service happened to process it.

4. Redis Cache with TTL

```java
private static final Duration CACHE_TTL = Duration.ofMinutes(10);

public void updatePrice(String ticker, BigDecimal price) {
    String key = "market:price:" + ticker;
    // TTL of 10 minutes — prevents stale prices for inactive or delisted assets
    redisTemplate.opsForValue().set(key, price.toString(), CACHE_TTL);
}
```

Why TTL? If an asset is deactivated or delisted, its price will expire from Redis in 10 minutes. Without TTL, the Matching Engine could use prices for assets that no longer exist.
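The expiry behaviour Redis gives us for free can be pictured with a tiny in-memory sketch (illustrative only; the service talks to Redis via RedisTemplate, not this class):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class TtlCacheDemo {
    // A value plus the instant after which it must no longer be served
    record Entry(String value, Instant expiresAt) {}

    private final Map<String, Entry> store = new HashMap<>();

    void set(String key, String value, Duration ttl) {
        store.put(key, new Entry(value, Instant.now().plus(ttl)));
    }

    // Returns null once the TTL has elapsed: a stale price simply disappears
    String get(String key) {
        Entry e = store.get(key);
        if (e == null || Instant.now().isAfter(e.expiresAt())) {
            store.remove(key);
            return null;
        }
        return e.value();
    }

    public static void main(String[] args) throws InterruptedException {
        TtlCacheDemo cache = new TtlCacheDemo();
        cache.set("market:price:PETR4", "47.37", Duration.ofMillis(50));
        System.out.println(cache.get("market:price:PETR4")); // fresh: 47.37
        Thread.sleep(60);
        System.out.println(cache.get("market:price:PETR4")); // expired: null
    }
}
```

If the publisher keeps sending events, every update resets the TTL; if it stops (asset delisted, feed down), the key vanishes within 10 minutes and readers like the Matching Engine see a cache miss instead of a stale price.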

5. Status Filter in Queries

```java
public List<AssetDTO> findAllActive() {
    // Query directly by status — not findAll() + filter in memory
    return assetRepository.findAllByStatus(AssetStatus.ACTIVE).stream()
            .map(AssetDTO::fromEntity)
            .toList();
}

public Optional<AssetDTO> findByTicker(String ticker) {
    // Only returns ACTIVE assets — inactive returns 404
    return assetRepository.findByTickerAndStatus(ticker.toUpperCase(), AssetStatus.ACTIVE)
            .map(AssetDTO::fromEntity);
}
```

When trading-broker-order validates a ticker before creating an order, it calls GET /api/v1/assets/{ticker}. If the asset is inactive, it gets a 404 and the order is rejected. Correct and predictable.
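The active-only lookup boils down to a status check at query time. A minimal plain-Java stand-in (the real code delegates this to Spring Data's findByTickerAndStatus; names here are illustrative):

```java
import java.util.List;
import java.util.Optional;

public class AssetLookupDemo {
    enum Status { ACTIVE, INACTIVE }
    record Asset(String ticker, Status status) {}

    // Only ACTIVE assets are visible; an inactive ticker behaves like a missing one (404)
    static Optional<Asset> findActiveByTicker(List<Asset> catalog, String ticker) {
        return catalog.stream()
                .filter(a -> a.ticker().equalsIgnoreCase(ticker))
                .filter(a -> a.status() == Status.ACTIVE)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Asset> catalog = List.of(
                new Asset("PETR4", Status.ACTIVE),
                new Asset("OIBR3", Status.INACTIVE));
        System.out.println(findActiveByTicker(catalog, "petr4").isPresent()); // true
        System.out.println(findActiveByTicker(catalog, "OIBR3").isPresent()); // false
    }
}
```

An empty Optional maps straight to the 404 the Order API expects, so "inactive" and "nonexistent" are indistinguishable to callers.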


🌐 REST API

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | `/api/v1/assets` | List all active assets |
| GET | `/api/v1/assets/{ticker}` | Find asset by ticker (404 if inactive) |

πŸ“„ Swagger UI: http://localhost:8083/swagger-ui.html


βœ… Validating the Execution

With the application running and trading-broker-market-data publishing to Kafka:

  • βœ… Consumer connected to the trading-assets-market-data-v1 topic
  • βœ… Assets being created/updated in MySQL in real time
  • βœ… Redis being updated with TTL after each event
  • βœ… Logs showing Updating ticker PETR4 with price R$ 47.37

πŸš€ What's Next?

With the asset catalog working, the next step is b3-market-sync-api β€” which synchronizes prices directly to the B3's Redis β€” followed by b3-matching-engine-api, which uses those prices to execute orders.


πŸ”Ž About the Series

⬅️ Previous Post: Tooling Tips: MongoDB and Kafka

➑️ Next Post: Syncing the Real Market: Consuming Brapi and Feeding Redis with Spring Boot

πŸ“˜ Series Index: Series Roadmap

