We previously looked at Vowpal Wabbit and the design choices made there. This got me thinking: what can we learn if we constrain the interfaces of our feature store?
Here are some assumptions:
- All data arrives as events (or log data)
- We generate features (per pipeline only) from this log data
Based on this, we can immediately “steal” ideas for what some of the interfaces could look like.
Event File Format
The event file format in this instance would be a simple logger format. We won’t dwell on this too much, as there are a variety of ways this could look (I’m not even going to claim that the scheme I’ve laid out is “canonical” in any sense):
<TIMESTAMP> <ENTITY>[ID]: [<KEY>=<VALUE>]|<MSG>
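For instance, a purely illustrative (made-up) line following this scheme might be:

2020-09-23 user[42]: clicks=3 country=nz

where user is the entity, 42 is that entity’s ID, and the remainder is either a set of key-value pairs or a free-text message.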
Based on this, we can make the assumption that the log data should be sorted by timestamp. If it isn’t, we can (hopefully) run a simple sort on the file first. Because of this, we’ll limit <TIMESTAMP> to the sortable YYYY-MM-DD format (we’ll ignore the HH:MM:SS component for simplicity for now).
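As a sketch of that pre-sort (assuming the events live in a file called sample.txt, as used below), a plain lexicographic sort on the first field is enough once the timestamp is in a sortable format:

# GNU sort: order by the leading timestamp field and write the result back in place
sort -k 1,1 sample.txt -o sample.txt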
This format allows for data with multiple entities; we just need them to be split by <ENTITY> and the [ID] corresponding to that entity. For simplicity, I’m going to assume that we never need to “join” data across entities.
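For example (a hedged sketch; the user entity and events.log file here are made up), pulling a single entity’s rows out of a shared log is a one-line filter:

# keep only lines whose entity field looks like "user[...]"
awk '$2 ~ /^user\[/' events.log > user_events.log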
The Feature Pipeline (Take 1)
The process for the pipeline from here would be as follows:
- Filter by date to avoid time leakage (we can use awk)
- Magically flatten the log files so that each entity has a single row in the file, dropping the time component (we should be able to use awk; we may need to sort first!). A small illustration of what this flattening should do follows below.
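To make the flattening step concrete, the goal is roughly this (illustrative rows only): multiple event rows for the same entity collapse into a single row of accumulated key-value pairs, with the time component dropped.

20200922 a[2]: x:2
20200923 a[2]: y:7 z:3

becomes

a[2]: x:2 y:7 z:3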
The input file below is just generated off some log files and reformatted for sanity. We’ll use the %Y%m%d format, as it can be compared without date conversions.
while read mon day time rest; do
printf "%s %s\n" "$(date -d "$mon $day" +%Y%m%d)" "$rest"
done < /var/log/kern.log > ~/kern.txt
Or, for reproducibility, we’ll generate some data randomly in Python; with the settings below this creates a modest log file, and you can crank up num_rows if you want something several GB large.
import random

fname = "sample.txt"
date_list = [20200901 + i for i in range(25)]
# entity_list = list("abcde")
entity_list = list("a")  ## presume we only have one entity for now
id_list = range(100)
key = list("xyz")
values = range(10)
num_rows = 1e5

with open(fname, 'w') as f:
    for idx in range(int(num_rows)):
        date = random.choice(date_list)
        entity = random.choice(entity_list)
        ids = random.choice(id_list)
        num_kv = random.choice([1, 2, 3])
        kv = ""
        for _ in range(num_kv):
            k = random.choice(key)
            v = random.choice(values)
            kv += f" {k}:{v}"  # vw string format
        f.write(f"{date} {entity}[{ids}]:{kv}\n")
        if idx % 100000 == 0:
            print(idx)
Some sample output would look like this:
20200923 a[2]: y:7 z:3 z:7
20200922 a[2]: x:2
20200923 a[0]: z:9
20200923 a[2]: z:3
20200922 a[0]: x:6
20200922 a[0]: y:3 y:9
If we want to filter by date, taking 23 Sep 2020 and going back 1 day, it would be:
awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -1 days' +%Y%m%d` '{if($1 <= Date && $1 >= Date2) print }' ~/sample.txt | wc -l
We can try it with different combinations as well.
awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -14 days' +%Y%m%d` '{if($1 <= Date && $1 >= Date2) print }' ~/sample.txt | wc -l
Next we want to flatten the logs via awk, but we’ll need to sort first (n.b. sort is normally very expensive; here it can be done via sort -k 2,2). If we didn’t sort, it could still work as a reducer-style operation; i.e. we flatten, then sort, then flatten again, which may be “cheaper” (a sort-free sketch is shown after the full pipeline below).
To do this, we compose the awk script as follows. It uses the !a[$1]++ pattern to detect the first time each key appears in the (sorted) input; at that point we print the values accumulated for the previous key, and on every line we append the current values to the growing buffer.
!a[$1]++ {
    if ("_" in b) {
        print e, b["_"]; delete b
    }
}
{
    e = $1; $1 = ""; b["_"] = b["_"] "" $0
}
END {
    print e, b["_"]
}
Putting it all together (with some minimal changes in the date filtering):
awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -1 days' +%Y%m%d` '{if($1 <= Date && $1 >= Date2) {$1=""; gsub(/^[ \t]+|[ \t]+$/, ""); print $0} }' ~/sample.txt | sort -k 1,1 | awk '!a[$1]++{if("_" in b){print e, b["_"]; delete b}}{e=$1;$1="";b["_"]=b["_"]""$0} END{print e, b["_"]}' | awk '{ sub(/:[ ]+/, " | "); print }'
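As an aside on the “don’t sort first” idea above, here is a hedged, untested sketch of a sort-free alternative: it drops the !a[$1]++ trick and instead accumulates each entity’s values in an awk array, producing the “entity | values” rows in one pass (so the sort and reformat stages go away), at the cost of holding every entity’s buffer in memory:

awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -1 days' +%Y%m%d` '$1 <= Date && $1 >= Date2 { e=$2; sub(/:$/, "", e); $1=""; $2=""; b[e]=b[e]""$0 } END { for (e in b) print e, "|", b[e] }' ~/sample.txt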
Building Features Automatically Using a Python Script
Now it’s probably more than possible to just do the whole thing in awk, but I somewhat can’t be bothered at this stage.
import sys
import json

# read flattened "entity | k:v k:v ..." lines from stdin and emit per-key counts and sums
for line in sys.stdin:
    entity, value = line.split("|", 1)
    data = [x.split(":") for x in value.strip().split()]
    feat_sum = {}
    feat_count = {}
    for d in data:
        v_sum = feat_sum.get(d[0], 0.)
        v_sum += float(d[1])
        feat_sum[d[0]] = v_sum
        v_count = feat_count.get(d[0], 0.)
        v_count += 1
        feat_count[d[0]] = v_count
    feat = {
        **{f"count_{k}": v for k, v in feat_count.items()},
        **{f"sum_{k}": v for k, v in feat_sum.items()},
    }
    print(entity, "|", json.dumps(feat))
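To sanity-check the script in isolation (assuming it is saved as process.py, the name used in the pipelines below), we can feed it a single flattened line:

echo "a[2] | y:7 z:3 z:7" | python process.py

which should print something along the lines of a[2] | {"count_y": 1.0, "count_z": 2.0, "sum_y": 7.0, "sum_z": 10.0}.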
If we use Pandas, it’s probably a bit easier (to read…maybe):
import sys
import json
import pandas as pd

def to_numeric(d):
    return {k: float(v) for k, v in d.items()}

for line in sys.stdin:
    entity, value = line.split("|", 1)
    # build a one-row DataFrame per k:v token and stack them into one frame per entity
    data = pd.concat([pd.DataFrame([to_numeric(dict([x.split(":")]))]) for x in value.strip().split()])
    feat = {}
    # describe() has no "sum" row, so recover it as count * mean (both ignore NaNs)
    feat_info = [{f'count_{k}': v['count'], f'sum_{k}': v['count'] * v['mean']} for k, v in data.describe().to_dict().items()]
    for f in feat_info:
        feat = {**f, **feat}
    print(entity, "|", json.dumps(feat))
awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -1 days' +%Y%m%d` '{if($1 <= Date && $1 >= Date2) {$1=""; gsub(/^[ \t]+|[ \t]+$/, ""); print $0} }' ~/sample.txt | sort -k 1,1 | awk '!a[$1]++{if("_" in b){print e, b["_"]; delete b}}{e=$1;$1="";b["_"]=b["_"]""$0} END{print e, b["_"]}' | awk '{ sub(/:[ ]+/, " | "); print }' | python process.py
This gets us to the Vowpal Wabbit-style format for generating features. With this, we have a mechanism for converting log data to feature data with a time split.
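As an aside, if the downstream consumer really is Vowpal Wabbit, one of these rows could equally be rendered as a VW text line rather than JSON; a purely illustrative example (using the sample entity above, with the entity standing in for the example tag) might look like:

a[2] | count_y:1 sum_y:7 count_z:2 sum_z:10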
Note that if you benchmark the scripts, there are very big performance differences:
time (awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -10 days' +%Y%m%d` '{if($1 <= Date && $1 >= Date2) {$1=""; gsub(/^[ \t]+|[ \t]+$/, ""); print $0} }' ~/sample.txt | sort -k 1,1 | awk '!a[$1]++{if("_" in b){print e, b["_"]; delete b}}{e=$1;$1="";b["_"]=b["_"]""$0} END{print e, b["_"]}' | awk '{ sub(/:[ ]+/, " | "); print }' | head -n 100 | python process.py)
With the output: 1.06s user 0.09s system 108% cpu 1.059 total
time (awk -vDate=`date -d'2020-09-23' +%Y%m%d` -vDate2=`date -d'20200923 -10 days' +%Y%m%d` '{if($1 <= Date && $1 >= Date2) {$1=""; gsub(/^[ \t]+|[ \t]+$/, ""); print $0} }' ~/sample.txt | sort -k 1,1 | awk '!a[$1]++{if("_" in b){print e, b["_"]; delete b}}{e=$1;$1="";b["_"]=b["_"]""$0} END{print e, b["_"]}' | awk '{ sub(/:[ ]+/, " | "); print }' | head -n 100 | python process_pandas.py)
With the output: 146.98s user 0.55s system 98% cpu 2:29.32 total
We can see there is a massive difference between the two approaches (presumably because the Pandas version builds a one-row DataFrame per key:value token and runs describe for every entity, which carries a lot of overhead)! This means there is something to think more about in terms of custom workflows and transformations when building out the feature store.