Although Vowpal Wabbit has mostly lost out in the world of Deep Learning, I still think there is much value in thinking about how it serves features. Launching a Vowpal Wabbit daemon and loading new data into it is very much the kind of pipeline I would like to see when thinking about how a feature store could be built.

The Data Format

The data format makes use of Unix-like ideals, whereby it presumes denormalised data in the form:

<label> <weight> <tag> | <feat1> <feat2>

The features are all encoded in a sparse format of the form <key>:<value>. The pipeline can then load rows through what is essentially a JSON Lines-like interface to perform both scoring and training.
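To make the layout concrete, here is a minimal sketch that renders a JSON row as one line of text. It mirrors the simplified `<label> <weight> <tag> | <feat1> <feat2>` layout shown above rather than the full Vowpal Wabbit grammar (it ignores namespaces, for instance), and `to_vw_line` is a made-up helper name, not part of any library.

```python
import json


def to_vw_line(label, features, weight=1.0, tag=""):
    """Render one example in the simplified layout used in this post."""
    # label, weight and (optional) tag go before the bar
    header = " ".join(str(part) for part in (label, weight, tag) if part != "")
    # features go after the bar as sparse key:value pairs
    feats = " ".join(f"{key}:{value}" for key, value in features.items())
    return f"{header} | {feats}"


row = json.loads('{"hello": 1, "world": 2}')
line = to_vw_line(1, row)
print(line)
```

Because each example is a single line, the output can be appended to a file or piped to another process, which is what makes the format attractive for a pipeline.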

Why Should We Care?

This kind of file-based format is very powerful for generating data for consumption in a pipeline. I think an approach similar to this can serve as a loose first prototype. We can probably rely more on the JSON interface than the raw vw format. Something like:

mq --table my_table --data my_data

will consume the incoming row of data and perform “ETL” on it to generate some data in a denormalised table. This format also allows us to “play back” items to repopulate a table, or even to overwrite items, depending on how it is implemented.

from tinydb import TinyDB, Query
import argparse
import tempfile
import json
import os


parser = argparse.ArgumentParser(description="Maquette interface")
parser.add_argument("--table", type=str, action="store")
parser.add_argument("--data", type=str, action="store")


def insert_data(table, data):
    try:
        json_data = json.loads(data)
    except ValueError:
        # silently skip input that is not valid JSON
        return None
    
    tbl = db.table(table)
    tbl.insert(json_data)
    return None


tmpdir = tempfile.gettempdir()
db_file = os.path.join(tmpdir, 'test.db')
db = TinyDB(str(db_file))

args = parser.parse_args(['--table', 'mytable', '--data', '{"hello":1, "world":2}']) 
insert_data(args.table, args.data)

args = parser.parse_args(['--table', 'mytable', '--data', '{"hello":1, "world":3}']) 
insert_data(args.table, args.data)


tbl = db.table('mytable')
query = Query()
# test.db persists in the temp directory, so this is the result on a fresh database only
tbl.search(query.hello == 1)  # [{'hello': 1, 'world': 2}, {'hello': 1, 'world': 3}]
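The “play back” idea mentioned earlier can be sketched without TinyDB at all. The following is only an illustration under my own assumptions: the log is a JSON Lines stream, `replay` is a hypothetical helper, and the table is rebuilt in memory. Without a key every valid row is appended; with a key, a later row overwrites an earlier row that shares the same key value.

```python
import io
import json


def replay(lines, key=None):
    """Rebuild a table (a list of dicts) from a JSON Lines log."""
    rows = {}
    for n, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except ValueError:
            continue  # skip malformed rows, as insert_data does
        # no key: use the line number so nothing is overwritten
        row_id = record[key] if key is not None else n
        rows[row_id] = record
    return list(rows.values())


# io.StringIO stands in for a log file on disk
log = io.StringIO(
    '{"id": 1, "world": 2}\n'
    '{"id": 2, "world": 5}\n'
    '{"id": 1, "world": 3}\n'
)
appended = replay(log)           # repopulate: keeps all three rows
log.seek(0)
latest = replay(log, key="id")   # overwrite: keeps the last row per id
print(appended)
print(latest)
```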

Okay, so we have a start, but this isn’t quite enough. We still need to define:

  • The primary key
  • What aggregation types we want to do
  • What timestamp or time granularity we should consider

Let’s ignore time for now (discussion for another day), and try the first two items. Let’s say we want to key each row on an id and perform both a count and a sum on the incoming data:

from tinydb import TinyDB, where
import argparse
import tempfile
import json
import os


parser = argparse.ArgumentParser(description="Maquette interface")
parser.add_argument("--table", type=str, action="store")
parser.add_argument("--data", type=str, action="store")
parser.add_argument("--id", type=str, action="store")


def insert_data(db, table, data, key):
    try:
        json_data = json.loads(data)
    except ValueError:
        # silently skip input that is not valid JSON
        return None

    tbl = db.table(table)
    feats = tbl.search(where(key) == json_data[key])
    # start from the stored aggregates if this key has been seen before,
    # otherwise from an empty row, so the first record is counted as well
    row = dict(feats[0]) if feats else {key: json_data[key]}
    for k, v in json_data.items():
        if k == key:
            continue

        # hard code for now
        sum_field = f"{k}_sum"
        count_field = f"{k}_count"
        row[sum_field] = row.get(sum_field, 0) + v
        row[count_field] = row.get(count_field, 0) + 1

    # only the aggregates are stored, not the raw incoming values
    tbl.upsert(row, where(key) == row[key])
    return None


tmpdir = tempfile.gettempdir()
db_file = os.path.join(tmpdir, 'test.db')
db = TinyDB(str(db_file))
tbl = db.table('mytable')

args = parser.parse_args(['--table', 'mytable', '--data', '{"hello":1, "world":2, "id":1}', "--id", "id"]) 
insert_data(db, args.table, args.data, args.id)
print(tbl.search(where(args.id) == 1))

args = parser.parse_args(['--table', 'mytable', '--data', '{"hello":1, "world":3, "id":1}', "--id", "id"]) 
insert_data(db, args.table, args.data, args.id)
print(tbl.search(where(args.id) == 1))
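The sum and count above are hard-coded. One way the “aggregation types” could be made configurable is a small registry of named functions. This is a sketch under my own assumptions rather than a settled design: `AGGREGATIONS` and `update_row` are hypothetical names, `max` is added purely as an example, and the row is a plain dict so the logic can be tried without TinyDB.

```python
import json

# Hypothetical registry: aggregation name -> function(previous value or None, new value)
AGGREGATIONS = {
    "sum": lambda old, new: new if old is None else old + new,
    "count": lambda old, new: 1 if old is None else old + 1,
    "max": lambda old, new: new if old is None else max(old, new),
}


def update_row(row, record, key, aggs=("sum", "count")):
    """Fold one incoming record into the stored aggregate row."""
    # copy the stored row, or start a new one holding only the primary key
    row = dict(row) if row else {key: record[key]}
    for field, value in record.items():
        if field == key:
            continue
        for name in aggs:
            out = f"{field}_{name}"
            row[out] = AGGREGATIONS[name](row.get(out), value)
    return row


row = None
for data in ('{"hello":1, "world":2, "id":1}', '{"hello":1, "world":3, "id":1}'):
    row = update_row(row, json.loads(data), key="id", aggs=("sum", "count", "max"))
print(row)
```

The resulting dict could then be written back with the same upsert call used above, and the list of aggregation names could come from another command-line flag.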

Okay, so now we have the start of an update mechanism for incoming data! In the future, we’ll look into how we can “daemonise” it.