In this post we’ll talk briefly about the P2 algorithm, which will allow one to calculate percentiles on a stream (approximately)!

This post is more about implementing the algorithm, and I’ll leave the discussion around what it does to a later stage.

Implementation

For percentile, which isn’t the minimum of maximum, we require keeping tabs of 5 markers:

  • The minimum
  • $p/2$ quantile
  • $p$ quantile
  • $(1+p)/2$ quantile
  • The maximum

In this format, we need to retain a few items:

  • marker heights
  • marker positions
  • desired marker positions

Because we have 5 marker points, then the algorithm only kicks in when we have at least 5 points.

import numpy as np

class ApproxQuantile1D(object):
    # applies approximate quantile assuming the input object is a 1d list

    def __init__(self, p=50):
        self.p = p
        self._p = p/100  # this is so it is the same definition in numpy

        ## initialisation is set for when count is < 5 numbers
        self.n = None  # marker position
        self.ns = None  # desired marker position
        self.dns = None
        self.q = None  # marker heights
        self.reset()

    def reset(self):
        self.n = [0, 1, 2, 3, 4]  # initialise marker positions
        self.ns = [0, 2*self._p, 4*self._p, 2+2*self._p, 4]  # initialise desired marker position
        self.dns = [0, self._p/2, self._p, (1+self._p) /2, 1]
        self.q = []

    def fit(self, X):
        self.reset()
        return self.partial_fit(X)

    def partial_fit(self, X):
        for x in X:
            self.partial_fit_(x)
        return self

    def partial_fit_(self, x):
        # assume x is a single number
        # part A initialization
        if len(self.q) < 5:
            self.q.append(x)
            self.q = sorted(self.q)
            return self
        
        # part B
        if x <= self.q[0]:
            self.q[0] = x  # new min
            k = 0
        if x >= self.q[4]:
            self.q[4] = x
            k = 3
        elif x < self.q[1]:
            k = 0
        elif x < self.q[2]:
            k = 1
        elif x < self.q[3]:
            k = 2
        elif x < self.q[4]:
            k = 3
        else:
            raise Exception("All cases should have been covered?")
        
        # item 2 in algorithm - increment markers
        for i in range(k+1, 5):
            self.n[i] += 1
        for i in range(5):
            self.ns[i] += self.dns[i]

        for i in [1, 2, 3]:
            d = self.ns[i] - self.n[i]
            if (d >= 1 and self.n[i+1] - self.n[i] > 1) or (d <= -1 and self.n[i-1] - self.n[i] < -1):  # item 3 in algorithm
                d_sign = np.sign(d)
                q_para = self.parabolic(i, d_sign)
                # q_linear = self.linear(i, d_sign)
                if (self.q[i] < q_para) and (q_para < self.q[i+1]):
                    self.q[i] = q_para
                else:
                    self.q[i] = self.linear(i, d_sign)
                self.n[i] += d_sign
        return self
    
    def parabolic(self, i, d):
        # copy from paper
        i = int(i)
        d = int(d)
        return self.q[i] + ((d/(self.n[i+1] - self.n[i-1])) * (
            (self.n[i] - self.n[i-1] + d) * ((self.q[i+1] - self.q[i])/(self.n[i+1] - self.n[i])) + (self.n[i+1] - self.n[i] + d) * ((self.q[i] - self.q[i-1])/(self.n[i] - self.n[i-1]))
        ))
    def linear(self, i, d):
        i = int(i)
        d = int(d)
        return self.q[i] + d*((self.q[i+d]-self.q[i])/(self.n[i+d] - self.n[i]))

    def score(self):
        # returns the percentile
        if self.p == 0:
            return self.q[0]
        if self.p == 100:
            return self.q[4]
        if len(self.q) < 5:
            target_index = int(int(len(self.q) * self._p))
            return self.q[target_index]        
        return self.q[2]

class FiveNumberSummary(object):
    def __init__(self):
        self.summary = [ApproxQuantile1D(0), ApproxQuantile1D(25), ApproxQuantile1D(50), ApproxQuantile1D(75), ApproxQuantile1D(100)]

    def fit(self, X):
        for s in self.summary:
            s.fit(X)
        return self

    def partial_fit(self, X):
        for s in self.summary:
            s.partial_fit(X)
        return self

    def score(self):
        return [
            s.score() for s in self.summary
        ]

# examples - its not perfect, and I _probably_ have implemented something wrong
# but its a pretty good start.
X = np.random.normal(size=100)
X1 = np.random.uniform(size=100)
fns = FiveNumberSummary()
fns.fit(X)
print("approx summary for X", fns.score())
print("summary for X", [np.percentile(X, p) for p in [0, 25, 50, 75, 100]])

fns.partial_fit(X1)
fns.score()
X2 = np.hstack([X, X1])
print("approx summary for X2", fns.score())
print("summary for X2", [np.percentile(X2, p) for p in [0, 25, 50, 75, 100]])

The original paper “The $P^2$ Algorithm for Dynamic Statistical Computing Calculation of Quantiles and Histograms Without Storing Observations” can be found here: https://www.cse.wustl.edu/~jain/papers/ftp/psqr.pdf