Pedro Santos

Vectorizing Real-Time Kafka Events for RAG

In the previous post, I set up pgvector and Ollama for embedding and vector search. Now I need to fill the database with data. Not documents or PDFs. Real-time saga events flowing through Kafka.

Every time a saga finishes (success or failure), the system publishes to a notify-ending topic. My AI agent listens to that topic and vectorizes every event. Over time, this builds a searchable history of all saga executions.

The Kafka Consumer

The entry point is a standard Spring Kafka listener:

@KafkaListener(
    groupId = "${spring.kafka.consumer.group-id}",
    topics = "${spring.kafka.topic.notify-ending}")
public void onSagaEnded(String payload) {
    var event = parseEvent(payload).orElse(null);
    if (event == null) {
        log.warn("[OperationsService] Failed to parse event payload");
        return;
    }

    // Vectorize ALL events for learning
    String historyText = buildHistoryText(event);
    vectorize(event, historyText);

    // Diagnose only failures
    if (event.getStatus() == SagaStatusEnum.FAIL) {
        diagnose(event, historyText);
    }
}

Two key decisions here. First, I vectorize all events, not just failures. Success events are valuable too. They establish what "normal" looks like, which helps the vector search distinguish between common patterns and anomalies.

Second, diagnosis only runs on failures. There's no point asking the LLM to analyze a successful saga. But the successful saga still gets stored as context for future failure analysis.

Building the Text Representation

The event object carries structured data: source, status, message, timestamps. I need to convert this into a text string that embeds well.

private String buildHistoryText(Event event) {
    return event.getEventHistory().stream()
        .map(h -> h.getSource() + " [" + h.getStatus() + "]: " + h.getMessage())
        .collect(Collectors.joining("\n"));
}

A typical output looks like:

ORCHESTRATOR [SUCCESS]: Saga started!
PRODUCT_VALIDATION_SERVICE [SUCCESS]: Products are validated successfully!
PAYMENT_SERVICE [ROLLBACK]: Fail to realize payment: New customer limit exceeded: R$450.00 > R$500.00
PAYMENT_SERVICE [FAIL]: Rollback executed for payment!
PRODUCT_VALIDATION_SERVICE [FAIL]: Rollback executed on product validation!
ORCHESTRATOR [FAIL]: Saga finished with errors!

This format works well for embeddings because it preserves the sequence of events, the service names, and the error messages. When two failures have similar histories, their vector representations will be close together.
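"Close together" here means high cosine similarity between the vectors, the same metric behind pgvector's cosine distance operator. As a minimal plain-Java illustration (the toy three-dimensional vectors stand in for real 768-dimension nomic-embed-text output):

```java
public class CosineDemo {

    // Cosine similarity: dot product of the vectors divided by the
    // product of their magnitudes. 1.0 = identical direction, 0 = unrelated.
    public static double cosine(float[] a, float[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot   += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static void main(String[] args) {
        // Toy vectors: two similar payment failures vs. one success history
        float[] paymentFailA = {0.9f, 0.1f, 0.0f};
        float[] paymentFailB = {0.8f, 0.2f, 0.1f};
        float[] success      = {0.0f, 0.1f, 0.9f};

        System.out.printf("similar failures:   %.3f%n", cosine(paymentFailA, paymentFailB));
        System.out.printf("failure vs success: %.3f%n", cosine(paymentFailA, success));
    }
}
```

The two failure vectors score much higher against each other than against the success vector, which is exactly the property the search in the next post relies on.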

Vectorizing with Metadata

The vectorization step creates an embedding and stores it with metadata for later filtering:

private void vectorize(Event event, String historyText) {
    String profileKey = classifyProfile(event);

    var metadata = new Metadata()
        .put("orderId",    event.getOrderId())
        .put("status",     event.getStatus().toString())
        .put("profileKey", profileKey)
        .put("createdAt",  LocalDateTime.now().toString());

    var segment = TextSegment.from(historyText, metadata);
    embeddingStore.add(embeddingModel.embed(segment).content(), segment);
}

The profileKey buckets the customer into categories like new:high-value, vip:any, or returning:low-value (classifyProfile above extracts the order from the event and delegates to this classifier):

public String classify(Order order) {
    if (order == null || order.getOrderId() == null) return "default";
    String clientType   = resolveClientType(order);
    String valueSegment = resolveValueSegment(order, clientType);
    return clientType + ":" + valueSegment;
}

private String resolveClientType(Order order) {
    if (order.getClientType() == null) return "new";
    return switch (order.getClientType().toLowerCase()) {
        case "vip"       -> "vip";
        case "returning" -> "returning";
        default          -> "new";
    };
}

private String resolveValueSegment(Order order, String clientType) {
    if ("vip".equals(clientType)) return "any";
    return order.getTotalAmount() >= 200.0 ? "high-value" : "low-value";
}

This metadata serves two purposes. It lets me filter searches by profile (only find incidents from similar customers). And it lets me query the embedding store for analytics (how many failures per profile, what's the most common failure pattern for VIPs).
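The analytics side can be sketched in plain Java. This is an illustrative stand-in (the StoredSegment record is hypothetical, not the real store schema; in practice the metadata lives on the LangChain4j TextSegment rows in pgvector), but the grouping logic is the same:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical flattened view of one stored embedding row's metadata
record StoredSegment(String orderId, String status, String profileKey) {}

public class ProfileAnalytics {

    // Count failed saga executions per customer profile
    public static Map<String, Long> failuresPerProfile(List<StoredSegment> rows) {
        return rows.stream()
            .filter(r -> "FAIL".equals(r.status()))
            .collect(Collectors.groupingBy(StoredSegment::profileKey,
                                           Collectors.counting()));
    }

    public static void main(String[] args) {
        var rows = List.of(
            new StoredSegment("abc", "SUCCESS", "vip:any"),
            new StoredSegment("def", "FAIL",    "new:high-value"),
            new StoredSegment("ghi", "FAIL",    "new:high-value"),
            new StoredSegment("jkl", "FAIL",    "vip:any"));

        // Counts per profile, e.g. new:high-value has two failures here
        System.out.println(failuresPerProfile(rows));
    }
}
```

The filtering side works the same way conceptually: restrict the candidate set by profileKey before (or while) running the vector similarity search, so a VIP incident is only compared against other VIP incidents.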

Processing Speed

Each vectorization involves two operations: calling Ollama to generate the embedding and writing to pgvector.

The Ollama call takes 5-15ms for nomic-embed-text on a modern laptop. The pgvector write takes 1-2ms. Total overhead per event: under 20ms.

At the volumes my system handles (hundreds of sagas per day), this is negligible. The Kafka consumer processes events as fast as they arrive. No backpressure, no batching needed.

For higher volumes (thousands per second), you'd want to batch the embeddings and use bulk inserts. LangChain4j's EmbeddingStoreIngestor supports this, but I haven't needed it.
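If I ever get there, the batching itself is straightforward. Here is a minimal plain-Java sketch of splitting incoming events into fixed-size batches (the batch size and the commented helper names are illustrative); the per-batch work would then be one bulk embedding call and one bulk store insert instead of a round trip per event:

```java
import java.util.ArrayList;
import java.util.List;

public class Batcher {

    // Split a list into consecutive batches of at most batchSize elements,
    // so embeddings can be generated and inserted in bulk.
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        var batches = new ArrayList<List<T>>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        var events = List.of("e1", "e2", "e3", "e4", "e5");
        for (var batch : Batcher.partition(events, 2)) {
            // Hypothetical bulk path:
            //   var embeddings = embeddingModel.embedAll(segments(batch));
            //   embeddingStore.addAll(embeddings, segments(batch));
            System.out.println(batch);
        }
    }
}
```

A batch size in the tens keeps each Ollama request small while still amortizing the HTTP and insert overhead.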

Virtual Threads

The vectorization runs on a Kafka consumer thread. With Spring Boot's virtual threads enabled, each message gets its own virtual thread. The Ollama HTTP call and the pgvector write don't block the consumer group.

spring:
  threads:
    virtual:
      enabled: true

Without virtual threads, a slow Ollama response could delay message processing and cause consumer lag. With virtual threads, it's a non-issue.
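Outside Spring, the same effect is easy to demonstrate with Java 21's virtual threads directly. In this sketch the blocking sleep stands in for a slow Ollama HTTP call; a hundred of them complete in roughly the time of one, because parking a virtual thread doesn't occupy a platform thread:

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class VirtualThreadDemo {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();

        // One virtual thread per task; the blocking sleep (a stand-in for a
        // slow embedding call) parks the virtual thread, not its carrier.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, 100).forEach(i -> executor.submit(() -> {
                try {
                    Thread.sleep(Duration.ofMillis(200));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        } // close() waits for all submitted tasks to finish

        long elapsed = System.currentTimeMillis() - start;
        System.out.println("elapsed ms: " + elapsed);
    }
}
```

With platform threads and a default-sized pool, the same 100 blocking calls would serialize behind the pool; with virtual threads they overlap, which is why a slow embedding call no longer translates into consumer lag.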

What Gets Stored in pgvector

After running for a while, the saga_history_embeddings table looks like this:

| embedding (vector) | text | metadata |
| --- | --- | --- |
| [0.023, -0.114, 0.089, ...] | ORCHESTRATOR [SUCCESS]: Saga started... | {orderId: "abc", status: "SUCCESS", profileKey: "vip:any"} |
| [0.045, -0.098, 0.102, ...] | ORCHESTRATOR [SUCCESS]: Saga started... PAYMENT [ROLLBACK]: blocked... | {orderId: "def", status: "FAIL", profileKey: "new:high-value"} |

Each row is one saga execution. The embedding captures the semantic meaning of the entire history. Two payment failures for the same reason will have similar embeddings even if the order IDs, amounts, and timestamps are different.

What's Next

The data is flowing in. In the next post, I'll show how to search it: finding similar past incidents when a new saga fails, tuning the similarity threshold, and injecting the results into the LLM prompt for diagnosis.

The repo: github.com/pedrop3/saga-orchestration
