Projections and Event Handling
In Thalo, projections listen for events and handle them. Events are delivered to a projection with an at-least-once guarantee: every event is eventually handled, even after a connection issue, at the cost of occasional redelivery.
Understanding Projections
A projection in Thalo is a component that listens for specific events and updates its state accordingly. It's typically used for creating read models or reacting to events in a certain way.
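As a rough illustration of the idea, a read model can be thought of as a fold over the events it is interested in. The sketch below is plain Rust with no Thalo dependency. The `Event` enum, the `CounterView` struct, and the `project` function are hypothetical names chosen for this example, not part of Thalo's API.

```rust
/// Events the read model might see. Only `Incremented` comes from the text;
/// `Other` is a hypothetical stand-in for any event this projection ignores.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Incremented { amount: i64 },
    Other,
}

/// The read model: a running total of all increments.
#[derive(Debug, Default, PartialEq)]
struct CounterView {
    count: i64,
}

impl CounterView {
    /// Update the state for one event, ignoring events of no interest.
    fn apply(&mut self, event: &Event) {
        match event {
            Event::Incremented { amount } => self.count += amount,
            Event::Other => {}
        }
    }
}

/// Build the read model by applying every event in order.
fn project(events: &[Event]) -> CounterView {
    let mut view = CounterView::default();
    for event in events {
        view.apply(event);
    }
    view
}

fn main() {
    let events = [
        Event::Incremented { amount: 2 },
        Event::Other,
        Event::Incremented { amount: 3 },
    ];
    let view = project(&events);
    println!("count = {}", view.count);
}
```

In a real projection the events would arrive over a subscription rather than from a slice, and the state would usually live in a database row instead of in memory.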
Implementing a Projection
Here's a basic outline for implementing a projection in Thalo:
use serde_json::json;
// Assumed to be in scope: `ProjectionClient`, `SubscriptionRequest`, `EventInterest`, and
// `Acknowledgement` from Thalo's client API, plus the `Incremented` event type.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut last_global_id = None; // This would typically be stored in a database
    let mut count = 0; // This is the projection state, and could be a row in a database

    let mut client = ProjectionClient::connect("http://localhost:4433").await?;
    let mut streaming = client
        .subscribe_to_events(SubscriptionRequest {
            name: "my_projection".to_string(),
            events: vec![EventInterest {
                category: "counter".to_string(),
                event: "Incremented".to_string(),
            }],
        })
        .await?
        .into_inner();

    while let Some(message) = streaming.message().await? {
        if last_global_id.map_or(false, |last_global_id| message.global_id <= last_global_id) {
            // Ignore, since we've already handled this event.
            // This logic keeps the projection idempotent, which is important since
            // projections have an at-least-once guarantee, meaning if a connection issue
            // occurs, we might reprocess an event we've already seen.
            continue;
        }

        // Update the count. The payload is wrapped as `{ <msg_type>: payload }`
        // before being deserialized into the event type.
        let event: Incremented = serde_json::from_str::<serde_json::Value>(&message.data)
            .and_then(|payload| serde_json::from_value(json!({ message.msg_type: payload })))?;
        count += event.amount;

        // In a transaction, save both the projection state (count) and last global id in a database
        last_global_id = Some(message.global_id);

        // Acknowledge we've handled this event
        client
            .acknowledge_event(Acknowledgement {
                name: "my_projection".to_string(),
                global_id: message.global_id,
            })
            .await?;
    }

    Ok(())
}
This example subscribes to `Incremented` events in the `counter` category, skips any event it has already handled, updates the count, and acknowledges each event once it has been processed.
Acknowledging Events
In Thalo, each projection is responsible for acknowledging the events it processes. An acknowledgement tells Thalo that the event has been handled, so the system can advance the projection's last acknowledged event position. Because delivery is at-least-once, an event may be delivered more than once, especially after a connection issue or system failure, so projections must handle events idempotently. Acknowledging each event after it has been processed keeps Thalo's record of handled events accurate.
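One common way to get idempotent handling is to store the last handled global id alongside the projection state and skip anything at or below it. The sketch below shows that check in isolation, in memory and without Thalo. `CheckpointedCounter` and `handle` are hypothetical names, and the `u64` id type is an assumption; in practice the state and the checkpoint would be saved together in one database transaction.

```rust
/// Projection state plus the checkpoint that makes handling idempotent.
#[derive(Debug, Default)]
struct CheckpointedCounter {
    count: i64,
    /// Global id of the last event applied (`None` until the first event).
    last_global_id: Option<u64>,
}

impl CheckpointedCounter {
    /// Apply an increment unless this event was already handled.
    /// Returns `true` if the event was applied, `false` if it was skipped.
    fn handle(&mut self, global_id: u64, amount: i64) -> bool {
        // Anything at or below the checkpoint is a redelivery.
        if matches!(self.last_global_id, Some(last) if global_id <= last) {
            return false;
        }
        // State and checkpoint change together, mirroring a single transaction.
        self.count += amount;
        self.last_global_id = Some(global_id);
        true
    }
}

fn main() {
    let mut projection = CheckpointedCounter::default();
    // (global_id, amount) pairs; the event with id 1 is delivered twice.
    let deliveries = [(0, 5), (1, 2), (1, 2), (2, 1)];
    for (global_id, amount) in deliveries {
        let applied = projection.handle(global_id, amount);
        println!("event {global_id}: applied = {applied}");
    }
    println!("count = {}", projection.count);
}
```

The redelivered event leaves the count unchanged, so handling the same delivery twice has the same effect as handling it once.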