Vectorizing Real-Time Kafka Events for RAG
In the previous post, I set up pgvector and Ollama for embedding and vector search. Now I need to fill the database with data. Not documents or PDFs. Real-time saga events flowing through Kafka.
Every time a saga finishes (success or failure), the system publishes to a notify-ending topic. My AI agent listens to that topic and vectorizes every event. Over time, this builds a searchable history of all saga executions.
The Kafka Consumer
The entry point is a standard Spring Kafka listener:
```java
@KafkaListener(
        groupId = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.topic.notify-ending}")
public void onSagaEnded(String payload) {
    var event = parseEvent(payload).orElse(null);
    if (event == null) {
        log.warn("[OperationsService] Failed to parse event payload");
        return;
    }

    // Vectorize ALL events for learning
    String historyText = buildHistoryText(event);
    vectorize(event, historyText);

    // Diagnose only failures
    if (event.getStatus() == SagaStatusEnum.FAIL) {
        diagnose(event, historyText);
    }
}
```
Two key decisions here. First, I vectorize all events, not just failures. Success events are valuable too. They establish what "normal" looks like, which helps the vector search distinguish between common patterns and anomalies.
Second, diagnosis only runs on failures. There's no point asking the LLM to analyze a successful saga. But the successful saga still gets stored as context for future failure analysis.
Building the Text Representation
The event object carries structured data: source, status, message, timestamps. I need to convert this into a text string that embeds well.
```java
private String buildHistoryText(Event event) {
    return event.getEventHistory().stream()
            .map(h -> h.getSource() + " [" + h.getStatus() + "]: " + h.getMessage())
            .collect(Collectors.joining("\n"));
}
```
A typical output looks like:
```
ORCHESTRATOR [SUCCESS]: Saga started!
PRODUCT_VALIDATION_SERVICE [SUCCESS]: Products are validated successfully!
PAYMENT_SERVICE [ROLLBACK]: Fail to realize payment: New customer limit exceeded: R$450.00 > R$500.00
PAYMENT_SERVICE [FAIL]: Rollback executed for payment!
PRODUCT_VALIDATION_SERVICE [FAIL]: Rollback executed on product validation!
ORCHESTRATOR [FAIL]: Saga finished with errors!
```
This format works well for embeddings because it preserves the sequence of events, the service names, and the error messages. When two failures have similar histories, their vector representations will be close together.
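If you want to play with the format outside the saga project, the transformation is just a stream-join. Here's a self-contained sketch where HistoryEntry is a simplified stand-in for the real history class:

```java
import java.util.List;
import java.util.stream.Collectors;

public class HistoryTextDemo {
    // Simplified stand-in for the real saga history entry
    record HistoryEntry(String source, String status, String message) {}

    // Same shape as the production buildHistoryText: "SOURCE [STATUS]: message" per line
    static String buildHistoryText(List<HistoryEntry> history) {
        return history.stream()
                .map(h -> h.source() + " [" + h.status() + "]: " + h.message())
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        var history = List.of(
                new HistoryEntry("ORCHESTRATOR", "SUCCESS", "Saga started!"),
                new HistoryEntry("PAYMENT_SERVICE", "ROLLBACK", "Fail to realize payment"),
                new HistoryEntry("ORCHESTRATOR", "FAIL", "Saga finished with errors!"));
        System.out.println(buildHistoryText(history));
    }
}
```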
Vectorizing with Metadata
The vectorization step creates an embedding and stores it with metadata for later filtering:
```java
private void vectorize(Event event, String historyText) {
    String profileKey = classifyProfile(event);

    var metadata = new Metadata()
            .put("orderId", event.getOrderId())
            .put("status", event.getStatus().toString())
            .put("profileKey", profileKey)
            .put("createdAt", LocalDateTime.now().toString());

    var segment = TextSegment.from(historyText, metadata);
    embeddingStore.add(embeddingModel.embed(segment).content(), segment);
}
```
The profileKey comes from a small classifier over the order data (classifyProfile extracts the order from the event and delegates to it), bucketing the customer into categories like new:high-value, vip:any, or returning:low-value:
```java
public String classify(Order order) {
    if (order == null || order.getOrderId() == null) return "default";
    String clientType = resolveClientType(order);
    String valueSegment = resolveValueSegment(order, clientType);
    return clientType + ":" + valueSegment;
}

private String resolveClientType(Order order) {
    if (order.getClientType() == null) return "new";
    return switch (order.getClientType().toLowerCase()) {
        case "vip" -> "vip";
        case "returning" -> "returning";
        default -> "new";
    };
}

private String resolveValueSegment(Order order, String clientType) {
    if ("vip".equals(clientType)) return "any";
    return order.getTotalAmount() >= 200.0 ? "high-value" : "low-value";
}
```
This metadata serves two purposes. It lets me filter searches by profile (only find incidents from similar customers). And it lets me query the embedding store for analytics (how many failures per profile, what's the most common failure pattern for VIPs).
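The analytics side is plain aggregation once the metadata is pulled back out of the store. A sketch with stand-in types (SagaRecord and the hardcoded list are mine for illustration; in practice the rows come from the embedding store):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ProfileAnalytics {
    // Stand-in for the metadata stored alongside each embedding
    record SagaRecord(String orderId, String status, String profileKey) {}

    // Count failed sagas per customer profile, e.g. {new:high-value=2, vip:any=1}
    static Map<String, Long> failuresPerProfile(List<SagaRecord> records) {
        return records.stream()
                .filter(r -> "FAIL".equals(r.status()))
                .collect(Collectors.groupingBy(SagaRecord::profileKey, Collectors.counting()));
    }

    public static void main(String[] args) {
        var records = List.of(
                new SagaRecord("a1", "FAIL", "new:high-value"),
                new SagaRecord("b2", "FAIL", "new:high-value"),
                new SagaRecord("c3", "SUCCESS", "vip:any"),
                new SagaRecord("d4", "FAIL", "vip:any"));
        System.out.println(failuresPerProfile(records));
    }
}
```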
Processing Speed
Each vectorization involves two operations: calling Ollama to generate the embedding and writing to pgvector.
The Ollama call takes 5-15ms for nomic-embed-text on a modern laptop. The pgvector write takes 1-2ms. Total overhead per event: under 20ms.
At the volumes my system handles (hundreds of sagas per day), this is negligible. The Kafka consumer processes events as fast as they arrive. No backpressure, no batching needed.
For higher volumes (thousands per second), you'd want to batch the embeddings and use bulk inserts. LangChain4j's EmbeddingStoreIngestor supports this, but I haven't needed it.
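If you did roll batching by hand instead of using the ingestor, the core of it is chunking events into fixed-size lists before embedding; everything here (names, batch size) is my sketch, not project code:

```java
import java.util.ArrayList;
import java.util.List;

public class Batching {
    // Split a list into consecutive chunks of at most batchSize elements
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        // 5 items with batch size 2 -> three chunks: [1, 2], [3, 4], [5]
        System.out.println(partition(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

Each chunk would then go through a bulk embed call (LangChain4j's embedAll) and a single bulk insert, instead of one round trip per event.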
Virtual Threads
The vectorization runs on a Kafka consumer thread. With Spring Boot's virtual threads enabled, the listener containers run on virtual threads, so the blocking Ollama HTTP call and the pgvector write park the virtual thread cheaply instead of pinning a platform thread.
```yaml
spring:
  threads:
    virtual:
      enabled: true
```
Without virtual threads, a slow Ollama response ties up a platform thread and can contribute to consumer lag. With virtual threads the blocking is cheap, though messages within a partition are still processed in order.
What Gets Stored in pgvector
After running for a while, the saga_history_embeddings table looks like this:
| embedding (vector) | text | metadata |
|---|---|---|
| [0.023, -0.114, 0.089, ...] | ORCHESTRATOR [SUCCESS]: Saga started... | {orderId: "abc", status: "SUCCESS", profileKey: "vip:any"} |
| [0.045, -0.098, 0.102, ...] | ORCHESTRATOR [SUCCESS]: Saga started... PAYMENT [ROLLBACK]: blocked... | {orderId: "def", status: "FAIL", profileKey: "new:high-value"} |
Each row is one saga execution. The embedding captures the semantic meaning of the entire history. Two payment failures for the same reason will have similar embeddings even if the order IDs, amounts, and timestamps are different.
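"Close together" here means a small angle between the vectors: pgvector can rank rows by cosine distance. As a pure-Java illustration of the measure (toy 3-dimensional vectors standing in for real 768-dimensional embeddings):

```java
public class CosineDemo {
    // Cosine similarity: dot(a, b) / (|a| * |b|); 1.0 means identical direction
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    public static void main(String[] args) {
        // Pretend embeddings: two similar payment failures, one unrelated product failure
        double[] paymentFailA = {0.9, 0.1, 0.0};
        double[] paymentFailB = {0.8, 0.2, 0.1};
        double[] productFail  = {0.1, 0.1, 0.9};
        // The similar pair scores much higher than the unrelated pair
        System.out.printf("%.3f vs %.3f%n",
                cosine(paymentFailA, paymentFailB),
                cosine(paymentFailA, productFail));
    }
}
```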
What's Next
The data is flowing in. In the next post, I'll show how to search it: finding similar past incidents when a new saga fails, tuning the similarity threshold, and injecting the results into the LLM prompt for diagnosis.
The repo: github.com/pedrop3/saga-orchestration