Skip to main content

Projections and Event Handling

In Thalo, projections play a crucial role in listening to and handling events. They ensure an at-least-once delivery guarantee, maintaining the integrity and consistency of the system.

Understanding Projections

A projection in Thalo is a component that listens for specific events and updates its state accordingly. It's typically used for creating read models or reacting to events in a certain way.

Implementing a Projection

Here's a basic outline for implementing a projection in Thalo:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut last_global_id = None; // This would typically be stored in a database
let mut count = 0; // This is the projection state, and could be a row in a database

let mut client = ProjectionClient::connect("http://localhost:4433").await?;
let mut streaming = client
.subscribe_to_events(SubscriptionRequest {
name: "my_projection".to_string(),
events: vec![EventInterest {
category: "counter".to_string(),
event: "Incremented".to_string(),
}],
})
.await?
.into_inner();

while let Some(message) = streaming.message().await? {
if last_global_id.map_or(false, |last_global_id| message.global_id <= last_global_id) {
// Ignore, since we've already handled this event.
// This logic keeps the projection idempotent, which is important since
// projections have an at-least-once guarantee, meaning if a connection issue
// occurs, we might reprocess event we've already seen.
continue;
}

// Update the count
let event: Incremented = serde_json::from_str::<serde_json::Value>(&message.data)
.and_then(|payload| serde_json::from_value(json!({ message.msg_type: payload })))?;

count += event.amount;

// In a transaction, save both the projection state (count) and last global id in a database
last_global_id = Some(message.global_id);

// Acknowledge we've handled this event
client
.acknowledge_event(Acknowledgement {
name: "my_projection".to_string(),
global_id: message.global_id,
})
.await?;
}

Ok(())
}

This example demonstrates subscribing to events, processing them, and acknowledging each event to ensure at-least-once delivery.

Acknowledging Events

In Thalo, each projection is responsible for acknowledging the events it processes. This acknowledgement informs Thalo that the event has been handled, allowing the system to update the last acknowledged event position. It's crucial for projections to implement idempotent behavior, as Thalo's at-least-once delivery guarantee means events might be delivered multiple times, especially in cases of connection issues or system failures. Properly acknowledging events ensures that Thalo maintains an accurate record of which events have been processed, enhancing the system's reliability and consistency.