Anomaly Detection on data streams, in one line of Bash

2019/11/02

During the writing of my master thesis, I studied thoroughly Anomaly Detection algorithms with a focus on those of them that could be deployed not only on batches, but on streams of data, in order to achieve real-time Anomaly Detection, on a big scale.

After research, I chose 5 algorithms with such capabilities that looked promising, studied the relevant papers and understood how they worked. I then proceeded to implement each one, run it on a cluster (with the help of Apache Flink) with real and synthetic datasets, and find out the strenghts and weaknesses of each.

I plan on releasing all my implementations and experiments in the near future when I find some free time.

Gaussian model to detect anomalies

One of these algorithms was based on a simple Gaussian-distribution model, which surpisingly, despite its simplicity, turned out to be the most robust on the real-world dataset I had used (the popular KDD Cup 99 dataset in case you are wondering). This algorithm was by far the fastest, and was yielding ROC AUC scores of over 0.97, which is pretty good for an algorithm that does only one pass on the data.

I thought it would be fun to implement it in one line of Bash. While this is technically feasible, the awk script would look very compressed and ugly, so I expanded it in a few more lines :) . But altogether, it is one line.

Background

Without further ado, let’s begin with some theory. We start by assuming that our data are following the Gaussian distribution.

Therefore, given an instance \( x \sim \mathcal{N}(\mu, \Sigma), x \in \mathbb{R}^d, x=[x_1, x_2, \ldots, x_d]^T \), its probability density function is:

\[ p(x; \mu, \Sigma)= \frac{1}{(2\pi)^{d/2}|\Sigma|^{\frac{1}{2}}}\exp\bigg(-\frac{1}{2}(x-\mu)^T\Sigma^{-1}(x-\mu)\bigg) \]

where \(\mu \in \mathbb{R}^d\) is the mean and \( \Sigma \in \mathbb{R}^{d \times d} \) is the covariance matrix.

Now assuming that variables \( x_i \sim \mathcal{N}(\mu_i, \sigma_i^{2}) \) are all independent, we get:

\[ p(x) = p(x_1, x_2, \ldots, x_d) = p(x_1; \mu_1, \sigma_1^2)p(x_2; \mu_2, \sigma_2^2)\ldots p(x_d; \mu_d, \sigma_d^2) = \prod_{j=1}^{d} p(x_j; \mu_j, \sigma_j^2) \] where:

\[ p(x; \mu, \sigma^2)=\frac{1}{\sqrt{2\pi}\sigma} \exp\Big(- \frac{(x-\mu)^2}{2\sigma^2}\Big) \]

To train the model (which basically consists of the values \( \mu_i, \sigma_i^2, \forall i \in [1, \ldots, d] \)), one needs to calculate the following parameters (MLE):

Then, in the evaluation phase, given a new example \( x \), we compute:

\[ p(x) = \prod_{i=1}^{d}{ \frac{1}{\sqrt{2\pi}\sigma_i} \exp\Big(- \frac{(x_i-\mu_i)^2}{2\sigma_i^2}\Big) } \] and we flag \(x\) as anomaly if the value of \( p(x) \) is smaller than a threshold value \(\epsilon\) (hyperparameter).

Now here is the imporant part. How will we compute these values incrementally?

By having the tuples \( T_i = \big( \sum_{j=1}^{n}{x_i^{(j)}}, ~ \sum_{j=1}^{n}{{(x_i^{(j)})}^2}, ~ n \big) \), where \( n \) is the count of the instances, available at any point of the computation! This way, when training or evaluating, in batch or on stream, we have access to all parameters of the model \( \mu_i \) and \( \sigma_i^2 \) at any time by calculating:

\[ \mu_i = \frac{\sum_{j=1}^{n}{x_i^{(j)}}}{n} = \frac{T_i[0]}{T_i[2]}, ~~~~~ \sigma_i^2 = \frac{\sum_{j=1}^{n}{{(x_i^{(j)})}^2}}{n} - \mu_i^2 = \frac{T_i[1]}{T_i[2]} - (\frac{T_i[0]}{T_i[2]})^2 \]

So let’s go ahead and fetch the dataset, then implement the algorithm with awk.

Fetch and prepare the dataset

The dataset is available here. I will be using the 10 percent version, and only the HTTP protocol.

We’ll use curl for the GET request, pipe it into zcat (because it is gzipped), and then use awk to keep only the HTTP rows, and filter out most features, keeping only the 4 most important columns: duration, src_bytes and dst_bytes, plus the label. We also edit the label to be 1 in case of anomaly, or 0, in case of normal observation.

URL=http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz # 10 percent
OUTFILE=http.csv

curl $URL | zcat | awk 'BEGIN{ FS=","; OFS="," } { LABEL=($NF=="normal." ? 0 : 1); if($3 == "http") print $1, $5, $6, LABEL }' > $OUTFILE

So now our dataset with 64293 rows is in the http.csv file.

Implementation

We’ll put our code in a file called gauss.awk. Start by defining some variables:

BEGIN {
    FS = ",";

    # < NF to ignore the last column (label)
    for (i = 1; i < NF; i++) {
        SUMS[i] = 0;
        SQUARES[i] = 0;
    }
}

FS is a special variable used by awk, denoting the input field separator. Since we have a csv, we use commas. SUMS and SQUARES are two arrays, with the one holding the cumulative sum of the values of each feature, and the other holding the cumulative sum of the squares of these values.

Next we move on to the main awk loop, and write the code which will apply to every row of the input file.

Here we will first update the model, by updating the SUMS and SQUARES arrays. For each feature (or column, call it however you like), we update the respective value using the current row that awk is parsing.

After that, we use the current row and the model (the arrays), to calculate an anomaly score, according to the formula we discussed earlier (see \( p(x) \)). To get an accurate value of \( \pi \) for the calculations, I am using atan2(0, -1).

{
    # Update the model
    for (i = 1; i < NF; i++) {
        SUMS[i] += $i;
        SQUARES[i] += $i ** 2;
    }

    # Calculate score using the model
    score = 1;
    for (i = 1; i < NF; i++) {
        mu = SUMS[i] / NR;
        sigma2 = (SQUARES[i] / NR) - mu ** 2;
        if (sigma2 == 0)
            score = 0;
        else {
            score *= exp(- (($i - mu) ** 2) / (2 * sigma2)) / sqrt(2 * atan2(0, -1) * sigma2);
        }
    }

    # Output the score
    print score, $NF
}

Finally, we output the score, along with the actual label.

Now let’s run this:

$ awk -f gauss.awk http.csv
...
2.01308e-12 0
2.02266e-12 0
2.04232e-12 0
2.04342e-12 0
2.0474e-12 0
2.02895e-12 0
2.0324e-12 0
2.03025e-12 0

Looks good. Some warnings on stderr about the exp() function are normal, it is because some arguments passed in exp are < -745 and therefore return incredibly small values but awk handles them as 0 so we ’re good.

Now I want to calculate the ROC AUC score, and for that, I will be using a simple python script with the numpy module, and the roc_auc_score function from the scikit-learn module:

import numpy as np
from sklearn.metrics import roc_auc_score
from sys import stdin

preds = []
actuals = []

for line in stdin:
    s = line.split(' ')
    preds.append(-float(s[0]))
    actuals.append(int(s[1]))

preds_arr = np.array(preds)
actuals_arr = np.array(actuals)

print(roc_auc_score(actuals_arr, preds_arr))

Now I can pipe the output of awk into the script, which I’ll call auroc.py, like that:

$ awk -f gauss.awk  http.csv | python3 auroc.py

, and this prints the score, which is 0.9762313543238387, a pretty good value.

Last but not least, we can even choose a threshold and start flagging the values. This is what an anomaly detector actually does. What’s the intuition behind this? Since the scores are nothing more than the probability of the observation in the Gaussian distribution (the model), we will be flagging the value as anomaly, if this score is lower than a particular threshold, meaning that it is having a low probability of occurring.

A good threshold here could be 9.83056225277626e-15 (estimated with some trial and error), so we can just change the last lines of the script to:

if (score < 9.83056225277626e-15)
    print 1, $NF
else
    print 0, $NF

, so that it prints 1 if the score is lower than the threshold, or 0 if is not, along with the actual label. You can now calculate the F1 score if you want with some greps of the output.

Putting it all together

Now let’s glue everything together in one line:

$ curl -s $URL | zcat | awk 'BEGIN{ FS=","; OFS="," } { LABEL=($NF=="normal." ? 0 : 1); if($3 == "http") print $1, $5, $6, LABEL }' | awk -f gauss.awk 2> /dev/null | python3 auroc.py
0.9762313543238387

I got rid of the http.csv file and piped the output of the first awk directly into the second one, and I also redirected the stderr of the second awk to /dev/null to silence the warnings, so that we can get a clean output in the terminal.

So there you go, one pass on the data with a few lines of awk are enough to handle many cases of spotting outliers in datasets.

As always, for any questions or suggestions, send me an email.

Sidenote: Big Data Frameworks

This algorithm can be implemented in any Big Data framework (Apache Spark, Hadoop, Flink, whatever…) in a very straightforward way, with the functional operators map and reduce. Assume that our training data are \( n \) d-dimensional vectors \( x^{(j)} = [x_1^{(j)}, x_2^{(j)}, \ldots, x_d^{(j)}]^T, j \in [1, \ldots, n] \). Then the values of vectors \( \mu \) and \( \sigma^2 \) can be computed by:

// Scala
dataSet
  .map { x => (x, pow(x, 2), 1) }
  .reduce { (x1, x2) => (x1._1 + x2._1, x1._2 + x2._2, x1._3 + x2._3) }
  .collect()

References

This simple Gaussian model is presented in the great introductory course of Andrew Ng on Coursera. If you are interested in Machine Learning, I believe there is no better introductory course on the Internet than this one.