Instant Data Analysis with Kafka Streams
The Streaming-Summary Problem
At 5 p.m., a business closes its doors and the owner wants to tally up purchases for the day. It’s easy, if tedious, to go through the stack of receipts (data) and calculate totals, averages, and variance for purchases that day (or any past day).
What if the business is open 24 hours a day and the owner wants to continuously produce descriptions of purchases for each hour? To make it interesting, assume receipts can show up late by minutes, weeks, or years. And sometimes customers submit updates to purchases they completed months ago. And the owner wants to calculate order statistics. And the customers don’t just have one receipt, they each have a receipt for errors, a receipt for database usage, and a receipt for CPU time. And there are a few thousand customers per second.
Some of the reporting that we do on usage of Agile Central is like the former scenario; we can take as long as we want to run calculations and it’s safe to assume we have all the data. But we also produce streaming summaries of usage over ten-minute windows, which allows us to quickly detect changes to performance that can’t wait until the end of the day.
All of the data we want to summarize is produced by application servers onto Kafka (a distributed log that we use to transmit messages). In the past, we’ve used Apache Samza for streaming work, but we’ve found it to be resource intensive and opaque. It was easy to lose days beating your head against configuration, only to discover an obvious exception was disappearing into the ether.
A First Pass
We’re fairly early adopters of Kafka Streams, and we’ve definitely had some growing pains. Early on, minor version changes seemed to break everything. The high-level API actually created much more work than it saved, and we switched to the Kafka Streams processor API. Also, all of our services are built from a Clojure service template, so we had to decide how to deal with the Java interop (we started out writing Java classes wrapped by Clojure and gradually moved towards gen-classed Clojure streams).
In the end, we had a streaming summary service that’s as stable as any of our analysis tools and handles production traffic without breaking a sweat (using far fewer computational resources than we would’ve dedicated to a comparable job in Samza).
The whole streams application is captured in one medium-intimidating block of code:
builder.addSource("Source", Serdes.String().deserializer(), OurCustomSerdeFactory.getSerde().deserializer(), inputTopic)
       .addProcessor("AggregateMetrics", () -> new MetricAggregationProcessor(props), "Source")
       .addSink("SinkRekeyed", this.rekeyedTopic, Serdes.String().serializer(), OurCustomSerdeFactory.getSerde().serializer(), "AggregateMetrics")
       .addSource("SummarySource", Serdes.String().deserializer(), OurCustomSerdeFactory.getSerde().deserializer(), this.rekeyedTopic)
       .addProcessor("Summarize", () -> new SummaryProcessor(props), "SummarySource")
       .addSink("SummarySink", this.outputTopic, Serdes.String().serializer(), OurCustomSerdeFactory.getSerde().serializer(), "Summarize");
Here’s what that does briefly:
- The source reads messages from Kafka, expecting the keys to be strings and the values to be encoded with our in-house serdes.
- The aggregation processor works over five-minute windows, collecting those messages and bundling up the values that should go together.
- That aggregator is backed by an in-memory data store, so we could potentially lose data on node failure, but we’re willing to tolerate this.
- Those messages are re-published to Kafka with a new key.
- We read those messages and run them through our summary processor, which outputs messages onto a final summary topic.
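The aggregation step in the middle of that list reduces to a small sketch: bundle values by environment and metric, then hand each bundle downstream under a composite key when the window closes. The plain Java below stands in for that logic; the real MetricAggregationProcessor implements the Kafka Streams Processor interface and keeps its bundles in a state store, so the class and method names here are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricAggregator {
    // (environment, metric) -> values collected during the current window.
    // In the real service this lives in the processor's in-memory store.
    private final Map<String, List<Double>> byKey = new HashMap<>();

    // Called once per incoming record; field names are illustrative.
    public void collect(String environment, String metric, double value) {
        String newKey = "environment=" + environment + ":metric=" + metric;
        byKey.computeIfAbsent(newKey, k -> new ArrayList<>()).add(value);
    }

    // Called when the window closes: hand each bundle downstream
    // under its new composite key, then start fresh.
    public Map<String, List<Double>> flush() {
        Map<String, List<Double>> out = new HashMap<>(byKey);
        byKey.clear();
        return out;
    }
}
```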
So Why is This So Complicated?
There are a few problems: we need to parallelize this work across multiple nodes; we need to make sure each node has at least the information it needs to produce some part of the summaries; and we’d rather not force each node to hold all the information. That means we have to understand how consumers share work in Kafka.
Each process working together to read from a topic in Kafka is part of a consumer group. To ensure that each message is read by exactly one process, the messages are divvied out according to their keys. Our incoming messages have string keys that are not useful in our summaries, hence the initial rekeying step.
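To make that concrete: Kafka’s default partitioner routes a record by hashing its key bytes modulo the partition count, so records with the same key always land on the same partition, and therefore the same consumer. A simplified sketch of that routing (the real implementation uses a murmur2 hash; the stand-in hash below only illustrates the determinism):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner:
    // the same key bytes map to the same partition, every time,
    // which is why rekeying controls which worker sees which data.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // force non-negative
    }
}
```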
To return to the analogy of the business owner summarizing receipts, imagine the proprietor now wants to know the average bill for each day of the week. The receipts, though, are kept in drawers according to the customer’s last name, so she’ll need to pull them all out and group them according to their respective day of the week before doing any computation.
Likewise, to create a meaningful summary, we need a node to eventually collect all of the data for a given environment (like “test”, “development”, or “production”) and a given metric (like “database wait time” or “CPU wait time”). For five minutes, our workers collect as many values in their store for a given metric and environment as they can, and then publish them with a new key that might look like “environment=prod:metric=dbWaitTime”. Then the processors downstream can read only the messages with that key and know that they have enough information to produce a summary.
When a downstream processor gets a new set of values for a summary, we might need to update what we think the summary for that ten-minute window will be. It’s easy to maintain an exact measure of some statistics, like average, minimum, and standard deviation, on a running basis. But calculating order statistics, like p90, requires us to hold onto much more data: potentially every single value we’ve seen, sometimes millions of values per summary. We explored a variety of exotic data structures and approximation techniques before falling back to Algorithms 101 and using a slightly modified bucket sort. We just truncate the values to a few decimal places, and because they’re not widely distributed, the storage and sort problems become much less painful.
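The truncation trick amounts to a counting (bucket) sort: round each value to a fixed precision, count occurrences per bucket, and walk the sorted counts until you’ve passed 90% of them. This is a minimal illustration of the approach, not our production code; the two-decimal precision is an assumption for the example.

```java
import java.util.Map;
import java.util.TreeMap;

public class TruncatedP90 {
    private long total = 0;
    // Buckets keyed by the value truncated to two decimal places.
    // Because observed values cluster tightly, the map stays small
    // even after millions of observations.
    private final TreeMap<Long, Long> buckets = new TreeMap<>();

    public void add(double value) {
        long bucket = (long) (value * 100); // truncate to 2 decimals
        buckets.merge(bucket, 1L, Long::sum);
        total++;
    }

    // Walk the buckets in sorted order until we've covered 90% of
    // the counts; that bucket's value is the (nearest-rank) p90.
    public double p90() {
        long target = (long) Math.ceil(total * 0.90);
        long seen = 0;
        for (Map.Entry<Long, Long> e : buckets.entrySet()) {
            seen += e.getValue();
            if (seen >= target) return e.getKey() / 100.0;
        }
        throw new IllegalStateException("no data");
    }
}
```

Inserting a late-arriving value is just another `add`, after which `p90()` reflects the update, which matters for the late-data problem below.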
But there’s still another storage problem: when we’ve seen ten minutes’ worth of data for a given summary, should we send the summary to our output topic and throw away what’s in the key-value store? If so, data that shows up even a second late can’t update our original estimate. Or should we hoard our precious data forever, allowing it to slowly pile to the ceiling until the JVM collapses?
If our hypothetical business owner wanted to recalculate the p90 of order totals when a receipt arrived a month late, she could, assuming she kept all of the receipts for that day. She’d just insert it in order and take the 90th percentile. But what if she’d thrown the old records away?
We parameterize how late we allow records to show up and regularly clean the key-value store in an effort to avoid either outcome. A custom timestamp extractor allows us to use the time on our app servers as Kafka’s stream time (rather than the built-in timestamp added in Kafka 0.10, which could be incorrect in cases where publication of Kafka messages is delayed).
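Assuming each record carries its app-server event time, those two ideas, using event time as stream time and evicting window state once it’s older than the allowed lateness, might look roughly like this. The field names, window math, and eviction policy are illustrative sketches, not our actual code.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WindowStore {
    private final long windowMs;
    private final long allowedLatenessMs; // the parameterized lateness bound
    // window start time -> accumulated [sum, count] for that window
    private final Map<Long, double[]> windows = new HashMap<>();
    private long streamTime = Long.MIN_VALUE; // max event time seen so far

    public WindowStore(long windowMs, long allowedLatenessMs) {
        this.windowMs = windowMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // eventTimeMs comes from the app server that produced the record,
    // not from when Kafka happened to receive it.
    public boolean add(long eventTimeMs, double value) {
        streamTime = Math.max(streamTime, eventTimeMs);
        long windowStart = eventTimeMs - (eventTimeMs % windowMs);
        if (windowStart + windowMs + allowedLatenessMs < streamTime) {
            return false; // too late: this window's state was already evicted
        }
        double[] agg = windows.computeIfAbsent(windowStart, k -> new double[2]);
        agg[0] += value; // sum
        agg[1] += 1;     // count
        evictExpired();
        return true;
    }

    // Drop windows the lateness bound says we'll never update again,
    // so state can't pile to the ceiling.
    private void evictExpired() {
        Iterator<Map.Entry<Long, double[]>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            long start = it.next().getKey();
            if (start + windowMs + allowedLatenessMs < streamTime) it.remove();
        }
    }

    public int openWindows() { return windows.size(); }
}
```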
Though we’ve had issues getting the key-value store to work correctly against Kafka 0.10, and though, as fairly early adopters of Kafka Streams, lots of our problems are un-Google-able, our streaming services have been pretty ironclad, and site engineers and DBAs can quickly see changes to performance. The intermediate topic moves more than 30,000 messages per second, so it’s fun to watch the throughput graphs as well.