In the last post, we talked about the benefits of an event-driven table; however, we didn't cover how it would be denormalised!

In this post we’ll offer some solutions and thoughts on this, along with a few considerations to keep in mind.

An event table can be framed as a long table in an EAVT-like (entity, attribute, value, time) structure. We can construct an example as shown below:

import pandas as pd
import numpy as np


event_df = pd.DataFrame({
    'entity': np.random.choice(range(10), 1000),
    # occurrence of a particular event
    'attribute': np.random.choice(range(10), 1000), 
    # the value (assume, without loss of generality, it is a number; if it is categorical, we can trivially one-hot encode as event x category)
    'value': np.random.normal(size=1000), 
    # when the event actually happened
    'time': np.random.uniform(0, 100, 1000), 
})
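
As flagged in the comment above, a categorical value column can be reduced to the numeric case by one-hot encoding it into one indicator column per category. A minimal sketch (the category labels here are made up):

# hypothetical: if 'value' were categorical, expand it into one indicator
# column per category (event x category) so each column can be aggregated numerically
cat_values = pd.Series(np.random.choice(['a', 'b', 'c'], 1000), name='value')
value_onehot = pd.get_dummies(cat_values, prefix='value')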

Then, to construct a flattened representation, what are we expecting? We’re expecting a flattened table with (in this case) 10 entities, i.e. one row per entity. The columns then have to be aggregated along two dimensions:

  • By the “time delta” (the look-back window)
  • By the group-by aggregation

For example, the interpretation of a “feature” in a denormalised format would be:

  • How many times did the event occur in the last X period? In this case, the “time delta” would be X and the group-by aggregation would be “count” (see the sketch below).
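
To make this concrete, here is a minimal pandas sketch of that feature, assuming a hypothetical point in time of 50 and a window X of 5:

point_in_time = 50  # hypothetical snapshot time
window = 5          # the "time delta" X

# keep only the events that fall in the window ending at the point in time
recent = event_df[(event_df['time'] < point_in_time) &
                  (event_df['time'] >= point_in_time - window)]

# "count" aggregation: how many times each attribute occurred per entity
counts = recent.groupby(['entity', 'attribute']).size().unstack(fill_value=0)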

The naive approach is simply to use a pivot table; however, this violates some requirements of a feature store, most notably by allowing feature leakage. Instead we need to:

  • Filter the table, so that only the appropriate timestamps are kept
  • Then perform the relevant pivot, so that the result is repeatable.

As an aside, it is important to compute the time relative to a “point in time” rather than relative to the instance! One of the key reasons is that this reflects how a model is deployed, rather than what or how an entity responds.
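
Putting those two steps together, here is a minimal filter-then-pivot sketch (the snapshot times are made up) showing how the same feature definition can be rebuilt at any point in time:

def snapshot_features(df, point_in_time, delta, agg='sum'):
    # step 1: keep only events inside the window ending at the point in time
    snap = df[(df['time'] < point_in_time) & (df['time'] >= point_in_time - delta)]
    # step 2: pivot so each attribute becomes a column, aggregated per entity
    return snap.pivot_table(index='entity', columns='attribute',
                            values='value', aggfunc=agg)

train_feats = snapshot_features(event_df, point_in_time=50, delta=5)
score_feats = snapshot_features(event_df, point_in_time=80, delta=5)  # same definition, later snapshot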

from sklearn.preprocessing import OneHotEncoder
from sklearn.base import TransformerMixin
from sklearn_pandas import DataFrameMapper


class SnapshotAggregator(TransformerMixin):
    """
    sa = SnapshotAggregator(5, 'count', 2)
    sa.fit(event_df)
    out = sa.transform(event_df)

    # we could combine multiple
    sa = SnapshotAggregator(5, 'sum', 4)
    sa.fit(event_df)
    out2 = sa.transform(event_df)

    sa = SnapshotAggregator(5, 'count', 4)
    sa.fit(event_df)
    out3 = sa.transform(event_df)

    feats = (out
     .merge(out2, on='entity', how='outer')
     .merge(out3, on='entity', how='outer'))

    """
    def __init__(self, time, agg, delta, categories = 'auto'):
        self.time = time
        self.agg = agg
        self.delta = delta
        self.categories = categories
        self.mapper = DataFrameMapper([(['attribute'], OneHotEncoder(categories = categories))])

    def gen_colnames(self, names):
        names = ["{}_{}_{}".format(x, self.delta, self.agg) for x in names]
        return names

    def time_filter(self, X, **kwargs):
        # allow the reference time and window to be overridden at call time,
        # e.g. to rebuild the features at a different snapshot
        time = kwargs.get('time', self.time)
        delta = kwargs.get('delta', self.delta)

        df_snap = X[(X['time'] < time) & (X['time'] >= time - delta)]
        return df_snap

    def fit(self, X, y=None, **kwargs):
        # need to save the number of attributes we see, so that we can fill it out when scoring
        df_snap = self.time_filter(X, **kwargs)
        self.mapper.fit(df_snap)
        return self

    def transform(self, X, **kwargs):
        """
        Note that this is destructive compared with expectations around what scikit-learn
        actually does, due to the groupby transformation
        """
        df_snap = self.time_filter(X, **kwargs)
        # one-hot encode the attribute column for the filtered window
        df_out = self.mapper.transform(df_snap)
        # weight each indicator column by the event's value, so that 'sum'
        # adds up values per attribute and non-occurrences stay at zero
        df_out = df_out * df_snap['value'].values.reshape(-1, 1)
        df_out = pd.DataFrame(df_out, columns = self.gen_colnames(self.mapper.transformed_names_))
        if self.agg == 'count':
            # zeros represent attributes that did not occur for an event;
            # convert them to NaN so the group-by count ignores them
            df_out = df_out.replace(0, np.nan)
        df_out['entity'] = df_snap['entity'].values
        return df_out.groupby('entity').agg(self.agg).reset_index()



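As a quick illustration of pointing the same transformer at different snapshots, here is a sketch of how it might be used; the times and the fixed attribute vocabulary are assumptions for the sake of the example.

# fix the attribute vocabulary up front so that the training snapshot and a
# later scoring snapshot produce identical columns (hypothetical values)
sa = SnapshotAggregator(50, 'count', 5, categories=[list(range(10))])
sa.fit(event_df)

train_out = sa.transform(event_df)           # features as at time 50
score_out = sa.transform(event_df, time=80)  # same definition, later snapshot
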
This may not be fully streamlined or “automated” in the sense that it achieves everything end to end, but it does offer a principled way to rebuild features at different snapshots so that the same definitions can be used in both training and scoring. A fuller example will be provided in a future post.