Here at Boundary we do analytics a little bit differently than the standard “store it and batch it” approach. We’ve built a streaming analytics system that processes data as it comes to us in real time. Given our first-hand experience, we think streaming algorithms are really cool and insanely practical. Fortunately, they’re heavily researched. Unfortunately, they’re also relatively new in the professional world, which means that when talking about these algorithms, the divide between the academic world and industry can feel pretty large. We thought we’d do our part to help bridge that gap by publishing a few blog posts about cool streaming algorithms we’ve come across.
Defining the Problem
The problem that inspired this post can be stated pretty simply.
Given a single pass over an infinite stream of data, find the most frequently occurring items in the stream.
This is a pretty common problem, and one we encounter at Boundary. It turns out that the provably most efficient exact solution to this problem scales badly (we’ll come back to this). This is also a common enough problem that there’s a lot of research on getting around this restriction by finding an approximate solution. Approximation algorithms are awesome: they’re almost always far more space efficient or computationally efficient than the exact solution to the problem they’re trying to solve, in exchange for some (hopefully bounded) form of error in the solution. Depending on the problem and the kind of error it introduces, using an approximate algorithm can feel like getting something for nothing.
We’ll be looking at an approximation algorithm called SpaceSaving that tries to solve this problem in the minimum amount of space. It turns out, the algorithm is able to give you an approximate solution with bounded error (the bound on error is determined ahead of time) in a fixed amount of space.
That’s pretty awesome.
Graham Cormode and Marios Hadjieleftheriou published an excellent overview of known approximate solutions to this particular problem. If this blog post is even mildly interesting to you, I highly recommend their paper; I’ll be using some of its terminology in this post.
We need to spend a few minutes on definitions before diving in:
We’re going to assume that our infinite stream of data contains elements of some infinite set. We’ll call the set of items we’ve seen so far $A$. So, at any point in time, we’ve seen exactly $|A|$ distinct elements.
We’re going to use $N$ to refer to the length of the stream so far. This is the total number of items seen so far, not the total number of distinct items seen so far, so $N$ is always at least as large as $|A|$.
We also need a formal, mathematical definition of the problem. Finding the most frequently occurring items means finding items that occur more often than some threshold. This threshold is usually defined as some fraction of $N$, which bounds the solution space relative to the size of the stream without forcing a solution to include exactly $k$ items (for some arbitrary choice of $k$). Given that, the problem can be formalized as finding all items in a stream whose frequency is greater than $\phi N$ for some fraction $0 < \phi < 1$. The approximate version of the problem introduces some error $\epsilon$ (with $0 < \epsilon < \phi$) and asks for every item with frequency greater than $\phi N$, while allowing the answer to also include items whose frequency is only greater than $(\phi - \epsilon) N$. In English, we’re trying to find items whose frequency accounts for more than a given percent of the stream, and we’re going to allow some error around what the actual percentage is. That is, if we specify that we’d like to see items that each account for at least 10% of the stream, and 2% error, it’s acceptable to include items that account for only 8% of the stream. We’ll borrow some nomenclature from the overview paper I mentioned above, and refer to this as “the frequent items problem”. Frequent items are also referred to in the literature as “heavy hitters”. We’ll throw that name around too.
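To make the two thresholds concrete, here’s a small Python sketch that, given exact per-item frequencies (which we won’t have for a real infinite stream), splits items into those a correct answer must include and those it’s merely allowed to include. The function name `frequent_item_bounds` and the sample numbers are hypothetical, chosen to mirror the 10%/2% example.

```python
def frequent_item_bounds(freqs, n, phi, eps):
    """Given exact frequencies, return (must_report, may_report) for the
    approximate frequent items problem with threshold phi and error eps."""
    # Items above phi * n have to appear in any correct answer.
    must_report = {item for item, f in freqs.items() if f > phi * n}
    # Items above (phi - eps) * n may appear without violating the error bound.
    may_report = {item for item, f in freqs.items() if f > (phi - eps) * n}
    return must_report, may_report


# A stream of 100 items: "a" is 15%, "b" is 9%, "c" is 5% of the stream.
must, may = frequent_item_bounds({"a": 15, "b": 9, "c": 5}, n=100, phi=0.10, eps=0.02)
# must == {"a"}; may == {"a", "b"}; "c" must not be reported
```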
It’s pretty common for items in a stream to have some kind of explicit value attached. The canonical example is a stream of cash-register transactions, where the world’s unluckiest cashier has to record how much money the store made in an infinite number of transactions. We can tweak our definition of the frequent items problem to cover this by letting items in a stream have non-negative weights. If you see an item $i$ with weight $w$, that’s basically equivalent to seeing item $i$ exactly $w$ times in a row. There’s more rigor you can apply, especially if you’d like to talk about negative weights, but we’ll ignore all of that for this blog post.
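For non-negative integer weights, that equivalence is easy to check directly. This Python sketch (both function names are hypothetical) sums weights per item and compares the result with expanding each weighted item into repeated unweighted occurrences.

```python
from collections import Counter


def weighted_totals(stream):
    """Sum the weights for each item in a stream of (item, weight) pairs."""
    totals = Counter()
    for item, weight in stream:
        totals[item] += weight
    return totals


def expanded_totals(stream):
    """Count items after replacing (item, w) with w unweighted copies of item.
    Only meaningful for non-negative integer weights."""
    return Counter(item for item, weight in stream for _ in range(weight))


sales = [("coffee", 3), ("bagel", 1), ("coffee", 2)]
# Both give coffee=5, bagel=1.
assert weighted_totals(sales) == expanded_totals(sales)
```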
Actually estimating the frequency of individual items can be formalized as finding an estimate $\hat{f}_i$ of an item’s frequency $f_i$ such that $|f_i - \hat{f}_i| \le \epsilon N$. This says that estimates are allowed to be incorrect, but the difference between the estimate and the true value has to be bounded by a fraction of the length of the stream so far. The error term $\epsilon$ here is the same as the $\epsilon$ above. Again, we’ll borrow some nomenclature, and refer to this as “the frequency estimation problem”.
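As a quick illustration, here’s a hypothetical Python check of that bound for a whole set of estimates. The function name and the numbers are made up; items missing from either map are treated as having frequency or estimate 0.

```python
def within_estimation_bound(true_freqs, estimates, n, eps):
    """Return True if |f_i - f_hat_i| <= eps * n for every item in either map."""
    items = set(true_freqs) | set(estimates)
    return all(
        abs(true_freqs.get(i, 0) - estimates.get(i, 0)) <= eps * n for i in items
    )


true_freqs = {"a": 900, "b": 90, "c": 10}  # N = 1000
# With eps = 0.01, every estimate may be off by at most 10.
ok = within_estimation_bound(true_freqs, {"a": 905, "b": 99, "c": 19}, n=1000, eps=0.01)
bad = within_estimation_bound(true_freqs, {"a": 915, "b": 90, "c": 10}, n=1000, eps=0.01)
# ok is True, bad is False
```

Note that the estimate for “c” is nearly double its true value and still passes: the allowed error is a fraction of $N$, not of $f_i$.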
Finally, we’re going to be pragmatic about what infinity means here: really, really, really big. Annoyingly, we’re going to define it as big enough that any solution involving more computing power or more storage just won’t work: the stream is big enough that you can never buy enough RAM or add enough nodes to your cluster-computing setup of choice. Think about something like the Twitter firehose or a high-volume stream of netflow data, or something with equal volume that never stops.
Solving the Problem Exactly
Let’s also spend a moment looking at the exact solution for counting frequent items, so that we have a baseline for understanding what we gain and lose by using an approximate algorithm like SpaceSaving.
In pseudo-code the simplest way to find the most frequent items in a finite data-set looks something like:
weights = {} // Initialize a weight counter for all items to 0
for (item, weight) in data_set:
    weights[item] += weight
sort_by_weight(weights)
heavy_hitters = take_largest_10(weights)
In English, this keeps a running sum of weights per-item, and eventually sorts the list of observed items by weight. There are a few things to point out:
So, the exact solution to the problem requires keeping track of every item seen so far so that you can sum weights together. This means that the exact approach needs $O(|A|)$ space, one counter for every distinct item. This approach also needs to do a lookup for every item in the stream to increment the corresponding counter, and then needs to sort the list of all items, which takes $O(N + |A| \log |A|)$ time. This can be done a little more efficiently by using a priority queue instead of sorting the entire list in place: if you’re after the top-$k$ elements of a stream, you can finish in $O(N + |A| \log k)$ time.
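Here’s a runnable Python sketch of that exact baseline, using the priority-queue variant. The function name is hypothetical; `heapq.nlargest` is the standard-library helper that selects the top $k$ without sorting everything.

```python
import heapq
from collections import defaultdict


def exact_heavy_hitters(stream, k):
    """Exact top-k items by total weight. Keeps one counter per distinct
    item, so memory grows with |A|."""
    weights = defaultdict(int)
    for item, weight in stream:
        weights[item] += weight  # one hash lookup per stream element
    # nlargest keeps a heap of size k instead of sorting all |A| items.
    return heapq.nlargest(k, weights.items(), key=lambda pair: pair[1])


stream = [("a", 3), ("b", 1), ("a", 2), ("c", 4)]
print(exact_heavy_hitters(stream, 2))  # [('a', 5), ('c', 4)]
```

The `weights` dictionary is exactly the $O(|A|)$ memory problem described next.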
Since the time-bounds on this approach are fairly reasonable, we’re going to leave them alone for now. They will act as a nice baseline later, though, so don’t forget about them entirely.
However, the space bounds for this approach are a little more problematic. Scaling linearly with the number of distinct items in the stream, $|A|$, isn’t really going to cut it; $|A|$ is only bounded by $N$, the length of the stream so far, which we defined so that it can be arbitrarily large. Our machine is going to start running out of available memory as our stream gets larger and larger and larger. This is the point at which you say “just add more RAM” and I remind you that we (in)conveniently defined the problem so that wasn’t an option. We’ll have to find another approach.
Wait, seriously, that works?
Invariably, while explaining the idea for this blog post to folks, someone would say: “wait, can’t you just keep around a fixed size priority queue and update it as you see new items? Just kick out the small things as you go.” As it turns out, this doesn’t work: an item that occurs frequently but with low weight each time can get ignored, as can a frequent item that doesn’t show up until well into the stream, since neither will ever be able to displace the smallest item in the priority queue.
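Here’s a minimal Python sketch of that naive idea as we understand it (the function name and the exact admission rule are hypothetical choices), along with a stream that breaks it: the most frequent item arrives late, one unit at a time, and never gets in.

```python
def naive_top_m(stream, m):
    """The 'just kick out the small things' idea: keep at most m counters and
    only admit a new item if its weight beats the smallest current count."""
    counts = {}
    for item, weight in stream:
        if item in counts:
            counts[item] += weight
        elif len(counts) < m:
            counts[item] = weight
        else:
            smallest = min(counts, key=counts.get)
            if weight > counts[smallest]:
                del counts[smallest]
                counts[item] = weight
            # Otherwise the newcomer is dropped and this occurrence is forgotten.
    return counts


# "b" makes up 20 of the 30 items, but only starts arriving once the slots are full.
stream = [("a", 1)] * 5 + [("c", 1)] * 5 + [("b", 1)] * 20
print(naive_top_m(stream, m=2))  # {'a': 5, 'c': 5}
```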
However, it turns out that, with a bit of a tweak and a lot of math, an eerily similar approach ends up working.
As we mentioned above, the SpaceSaving algorithm devised by Metwally et al. (and the accompanying StreamSummary data structure) solves both the frequent items problem and the frequency estimation problem using a fixed amount of space. In a little more detail, SpaceSaving is a deterministic, counter-based algorithm for solving the frequent items problem with fixed error guarantees relative to $N$. On heavily skewed data sets, the algorithm performs extremely well and the error bounds can be made extremely tight (the paper does an excellent job of showing the difference in error when the algorithm is applied to Zipfian data, but I’m not going to cover that here).
So, what’s the secret?
SpaceSaving works by keeping exact (item, count) pairs for the first $m$ distinct items (we’ll discuss what $m$ is shortly) observed in the stream. Subsequent items, if they’re already monitored, increment the count in the proper pair. If an incoming item isn’t already being monitored, it replaces the item in the (item, count) pair with the smallest count, and then increments that count as usual. Every monitored item also needs an error associated with it. The first $m$ items are counted exactly, so their error counts are set to 0. Whenever an item replaces the previous minimum, it could have been seen anywhere between 1 and min(count)+1 times (assuming unit weights), so its error counter gets set to min(count), which is the most that item’s count may have been overestimated by.
In pseudo-code, SpaceSaving looks like this:
counts = { } // An empty map of item to count (missing keys read as 0)
errors = { } // An empty map of item to error count (missing keys read as 0)
for (item, weight) in stream:
    if len(counts) < m:
        counts[item] += weight // the first m distinct items are counted exactly
    else:
        if item in counts:
            counts[item] += weight
        else:
            prev_min = item_with_min_count(counts)
            counts[item] = counts[prev_min] + weight
            errors[item] = counts[prev_min]
            counts.remove_key(prev_min)
            errors.remove_key(prev_min)
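For something you can actually run, here’s a hedged Python translation of that pseudo-code. The function name `space_saving` is hypothetical, and finding the minimum with a linear scan over the counters is our simplification, not StreamSummary. It evicts the old minimum before inserting the newcomer, which has the same effect as the pseudo-code’s order.

```python
def space_saving(stream, m):
    """SpaceSaving over (item, weight) pairs with at most m monitored items.

    Returns (counts, errors): counts[item] overestimates the item's true
    frequency by at most errors[item].
    """
    counts = {}  # item -> estimated count
    errors = {}  # item -> maximum possible overestimate of counts[item]
    for item, weight in stream:
        if item in counts:
            counts[item] += weight
        elif len(counts) < m:
            # One of the first m distinct items: counted exactly.
            counts[item] = weight
            errors[item] = 0
        else:
            # Evict the item with the smallest count. This is an O(m) scan;
            # StreamSummary exists to make this step cheap.
            prev_min = min(counts, key=counts.get)
            min_count = counts.pop(prev_min)
            del errors[prev_min]
            # The newcomer inherits the evicted count as its possible error.
            counts[item] = min_count + weight
            errors[item] = min_count
    return counts, errors


counts, errors = space_saving([(ch, 1) for ch in "aaabbc"], m=2)
print(counts, errors)  # {'a': 3, 'c': 3} {'a': 0, 'c': 2}
```

On that six-item stream, “c” is seen only once but replaces “b” (count 2), so it ends up with count 3 and error 2. The counts still sum to 6, the length of the stream.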
The algorithm has a few nice properties right off the bat:
- It’s deterministic. The same input in the same order always produces the same results. The algorithm doesn’t introduce any randomness.
- Every item in the stream increments some counter, so the sum of all counters will be equal to the current size of the stream, $N$.
- With a predetermined $\epsilon$ for the frequent items problem, the algorithm uses space inversely proportional to the error. In other words, the space required is $O(1/\epsilon)$. This is where the $m$ mentioned above comes from: the algorithm will solve the frequent items problem with the given error if $m$ is chosen to be larger than $1/\epsilon$.
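Why $m > 1/\epsilon$ is enough follows from two facts: the counters always sum to $N$, and a newcomer’s error is the minimum count at the moment it was inserted (the minimum never decreases, because counts only grow and a replacement starts at min(count) + weight). Before the counters fill up, every error is 0. Once all $m$ counters are in use, for any monitored item $i$ with true frequency $f_i$:

$$
\min(\text{count}) \;\le\; \frac{1}{m}\sum_{j=1}^{m}\text{count}_j \;=\; \frac{N}{m} \;<\; \epsilon N \qquad (\text{since } m > 1/\epsilon),
$$

$$
0 \;\le\; \text{count}_i - f_i \;\le\; \text{error}_i \;\le\; \min(\text{count}) \;<\; \epsilon N .
$$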
Whoa. Those are all pretty cool guarantees. Determinism is particularly cool for an approximate algorithm, but the fixed size and the sum of the counters always being $N$ are kind of confusing. The $m$ items that the algorithm tracks aren’t all going to be frequent items. This is where the error counts from the algorithm come in. A monitored item is guaranteed to be a frequent item if the count minus the error reported by the algorithm is larger than $\phi N$. From above, remember that $\phi N$ is just some fraction of the number of items seen so far, so this says that an item monitored by the algorithm is a frequent item if the portion of its count that we can guarantee is larger than the cutoff. Conservative and correct. This wouldn’t be quite so useful, except that the authors also prove that any item whose true frequency is greater than min(count) is monitored by the algorithm. The combination of these two properties is what makes SpaceSaving an approximate solution to the frequent items problem, and not just something weird you can do to your data.
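Both checks are easy to express directly on the algorithm’s output. This Python sketch (the function name is hypothetical) takes the `counts` and `errors` maps a SpaceSaving run would produce and splits monitored items into those guaranteed to be frequent and those that merely might be. The sample input is what SpaceSaving produces with $m = 2$ on the six-item unit-weight stream `aaabbc`: “c” appeared only once, but inherited an error of 2 when it replaced “b”.

```python
def report_frequent(counts, errors, n, phi):
    """Split SpaceSaving output into guaranteed and candidate frequent items.

    guaranteed: count - error > phi * n, so the true frequency is above the cutoff.
    candidates: count > phi * n, so the true frequency *might* be above it.
    """
    cutoff = phi * n
    guaranteed = {i for i, c in counts.items() if c - errors[i] > cutoff}
    candidates = {i for i, c in counts.items() if c > cutoff}
    return guaranteed, candidates


guaranteed, candidates = report_frequent({"a": 3, "c": 3}, {"a": 0, "c": 2}, n=6, phi=0.4)
# guaranteed == {"a"}; candidates == {"a", "c"}
```

The candidate set is only guaranteed to contain every truly frequent item when $\phi N$ is at least min(count), which holds once $m > 1/\epsilon$ and $\epsilon \le \phi$.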
This is also where the error $\epsilon$ comes back into play. Even though the algorithm monitors $m$ items, it doesn’t guarantee they’re all frequent items. In fact, it might report items that are not quite frequent enough, or might fail to report borderline items if their error terms are high enough.
That’s a pretty cool algorithm.
The authors also spend part of the paper proposing a StreamSummary data structure that efficiently maintains (item, count, error) counters and tracks the minimum (there’s an implementation here). You can do the same thing with a priority queue plus a hash table that checks whether or not an item is being monitored. This ends up being less space-efficient than StreamSummary, in exchange for better insert/update and query times. We’ve been playing with an implementation in that style, and might post about it at some point in the future.
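Python’s `heapq` module has no way to change the key of an element already in the heap, so a sketch in that style needs a small hand-rolled indexed heap. This is our hypothetical illustration of the priority-queue-plus-hash-table idea (class and method names are made up), not Boundary’s implementation and not StreamSummary: the dict answers “is this item monitored?” in O(1), and each update costs O(log m). Because counts only ever increase, an updated entry only ever needs to sift down.

```python
class HeapSpaceSaving:
    """SpaceSaving on an indexed binary min-heap plus a dict of positions.
    Heap entries are [count, error, item], ordered by count."""

    def __init__(self, m):
        self.m = m
        self.heap = []  # min-heap of [count, error, item]
        self.pos = {}   # item -> index of its entry in self.heap

    def _swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
        self.pos[self.heap[i][2]] = i
        self.pos[self.heap[j][2]] = j

    def _sift_up(self, i):
        # Only needed when a brand-new entry is appended at the bottom.
        while i > 0:
            parent = (i - 1) // 2
            if self.heap[parent][0] <= self.heap[i][0]:
                break
            self._swap(i, parent)
            i = parent

    def _sift_down(self, i):
        # Counts only grow, so an updated entry can only need to move down.
        n = len(self.heap)
        while True:
            smallest = i
            for child in (2 * i + 1, 2 * i + 2):
                if child < n and self.heap[child][0] < self.heap[smallest][0]:
                    smallest = child
            if smallest == i:
                return
            self._swap(i, smallest)
            i = smallest

    def add(self, item, weight=1):
        if item in self.pos:
            # Monitored: dict lookup, then restore heap order.
            i = self.pos[item]
            self.heap[i][0] += weight
            self._sift_down(i)
        elif len(self.heap) < self.m:
            # Still filling up: counted exactly, error 0.
            self.heap.append([weight, 0, item])
            self.pos[item] = len(self.heap) - 1
            self._sift_up(len(self.heap) - 1)
        else:
            # Replace the minimum (the heap root) in place.
            min_count, _, evicted = self.heap[0]
            del self.pos[evicted]
            self.heap[0] = [min_count + weight, min_count, item]
            self.pos[item] = 0
            self._sift_down(0)

    def snapshot(self):
        """Return {item: (count, error)} for every monitored item."""
        return {item: (count, error) for count, error, item in self.heap}


s = HeapSpaceSaving(m=2)
for ch in "aaabbc":
    s.add(ch)
print(s.snapshot())  # {'c': (3, 2), 'a': (3, 0)}
```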
The only downside of SpaceSaving/StreamSummary seems to be approximating the individual frequencies of items. The way $\epsilon$ is defined, the error in the individual frequencies is relative to the number of items seen so far, not relative to the frequencies themselves. This means that the error on the frequency count for an individual element can be arbitrarily large relative to that element’s true frequency, and in practice it can be outrageous. We haven’t thrown an implementation into production here at Boundary because of this: we’d like to be able to guarantee our customers that the flow data we report to them is correct.
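Plugging some hypothetical numbers into the frequency-estimation bound shows how lopsided this can get. Say $N = 10^7$ and $\epsilon = 10^{-4}$:

$$
|f_i - \hat{f}_i| \;\le\; \epsilon N \;=\; 10^{-4} \times 10^{7} \;=\; 1000 .
$$

An item that truly appeared $f_i = 10$ times can then be reported with an estimate anywhere up to $1010$ (SpaceSaving only ever overestimates), roughly 100 times its true frequency, and the algorithm is still within its guarantee.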
Despite this downside, we think SpaceSaving (and StreamSummary) is particularly cool because it’s so simple: no hashing is involved and the correctness proofs are relatively accessible. It’s also a perfect example of the kind of tradeoff you have to make when dealing with an approximate algorithm. By giving up on error bounds relative to individual item frequencies, you gain bounded error on frequent items relative to $N$ and get to solve the problem in a fixed amount of space.
Bibliography
We linked to the following papers above:
There’s much more excellent reading to be done on heavy hitter algorithms, and probabilistic algorithms in general: the gang at Aggregate Knowledge has done a short post on Count-Min Sketch, which solves the same heavy-hitters problem by taking a Bloom-filter-esque approach.