Let’s try to build a minimalistic feature store pipeline using tools from the SciPy stack. We can use the strategy pattern to manage the data pipelines that flow through it, so that the features we build stay consistent.
import pandas as pd
from sklearn.base import TransformerMixin
from collections import namedtuple
from sklearn.preprocessing import StandardScaler
from sklearn_pandas import DataFrameMapper
import numpy as np
AttributeValue = namedtuple('AttributeValue', ['a', 'v'])
class Identity(TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X
class TimeFilter(TransformerMixin):
    def __init__(self, start_date=AttributeValue("start_date", 1), end_date=AttributeValue("end_date", 4)):
        """
        Presume the configuration is an attribute-value pairing on the data set.
        We could make them full objects, but use a namedtuple for ease of reading.
        Usage:
        X = pd.DataFrame({"x": [1, 2, 3, 4], "start_date": [0, 1, 2, 3], "end_date": [2, 3, 4, 5]})
        TimeFilter().transform(X)
        """
        self.start_date = start_date
        self.end_date = end_date

    def fit(self, X, y=None):
        return self

    def transform(self, df):
        # keep rows whose start column is >= the start value and whose end column is <= the end value
        return df[(df[self.start_date.a] >= self.start_date.v) & (df[self.end_date.a] <= self.end_date.v)]
if __name__ == "__main__":
    # small example frame, same as in the TimeFilter docstring
    X = pd.DataFrame({"x": [1, 2, 3, 4], "start_date": [0, 1, 2, 3], "end_date": [2, 3, 4, 5]})
    mapper = DataFrameMapper([
        (['x'], StandardScaler()),
        ('start_date', Identity()),
        ('end_date', Identity()),
    ], df_out=True)
    # fit the mapper on the rows that pass the default time filter
    mapper.fit_transform(TimeFilter().transform(X))
    # now apply onto a bigger dataset, for multiple time periods...
    X_full = pd.DataFrame({"x": np.arange(100), "start_date": np.arange(100), "end_date": np.arange(100) + 2})
    df_all = []
    for start_date, end_date in zip(np.arange(90), np.arange(90) + 3):
        df_all.append(
            mapper.transform(
                TimeFilter(
                    start_date=AttributeValue("start_date", start_date),
                    end_date=AttributeValue("end_date", end_date),
                ).transform(X_full)
            ).assign(timestamp=start_date)
        )
    # and here we have applied the said feature over multiple time periods!
    df_all = pd.concat(df_all)
    print(df_all)
In this part, the “strategy pattern” is implemented here:
for start_date, end_date in zip(np.arange(90), np.arange(90) + 3):
    df_all.append(
        mapper.transform(
            TimeFilter(
                start_date=AttributeValue("start_date", start_date),
                end_date=AttributeValue("end_date", end_date),
            ).transform(X_full)
        ).assign(timestamp=start_date)
    )
as part of the TimeFilter object: each iteration constructs a TimeFilter with a different window, while the mapper stays the same. Of course we should aim to do a slightly better job, but we’ll leave the refactoring exercise for later.
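One possible direction for that refactoring, as a minimal sketch rather than a finished design: pull the loop into a helper that receives the filter strategy and the feature step as arguments. The names `WindowFilter` and `build_features` are hypothetical and not part of the code above; `WindowFilter` is a pandas-only stand-in for TimeFilter so that the sketch runs without scikit-learn, and the feature step is a plain function where a fitted mapper's `transform` could be passed instead.

```python
import numpy as np
import pandas as pd


class WindowFilter:
    """Hypothetical stand-in for TimeFilter: keeps rows inside [start, end]."""

    def __init__(self, start, end, start_col="start_date", end_col="end_date"):
        self.start, self.end = start, end
        self.start_col, self.end_col = start_col, end_col

    def transform(self, df):
        mask = (df[self.start_col] >= self.start) & (df[self.end_col] <= self.end)
        return df[mask]


def build_features(df, windows, make_filter, featurize):
    """Build one filter per window, then apply the same feature step to each slice."""
    frames = []
    for start, end in windows:
        subset = make_filter(start, end).transform(df)
        frames.append(featurize(subset).assign(timestamp=start))
    return pd.concat(frames, ignore_index=True)


X_full = pd.DataFrame({
    "x": np.arange(100),
    "start_date": np.arange(100),
    "end_date": np.arange(100) + 2,
})
windows = [(s, s + 3) for s in range(90)]

# The feature step here just selects a column; it is a placeholder
# for whatever transformation the pipeline actually needs.
features = build_features(X_full, windows, WindowFilter, lambda d: d[["x"]])
print(features.head())
```

Because the filter is passed in as `make_filter`, a different filtering rule can be swapped in without touching the loop, which is the point of the strategy pattern here.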