In this post we’ll look at an implementation of ABDM (Association-Based Distance Metric) and MVDM (Modified Value Difference Metric) for categorical encoding, which can be used in k-nearest-neighbours. There currently isn’t a paper on this, though one is forthcoming at the time of writing. This is a quick implementation based on the Seldon Alibi implementation, mainly because their implementation isn’t very “sklearn-esque”. This is also for my own edification!
High Level View of Shared Components
What’s the Algorithm?
The heart of both algorithms is multi-dimensional scaling: it takes an array of ordinal values and some reference data, and (somewhat magically) transforms them into a mapping from the ordinal values to a multi-dimensionally scaled version of them.
To do this, it performs several steps:
- Compute the conditional probability of the values with respect to the provided data
- Calculate the distance among all ordinal values based on the conditional probability
- Use multi-dimensional scaling to convert the distance matrix into a 1-D vector, so that we have a mapping from the ordinal values to an embedding.
Conditional (Empirical) Probability
The easiest way to represent this is to one-hot encode our data and perform a pivot to calculate the empirical mean by group (i.e. the conditional probability).
The treatment for continuous variables in our reference data is then to discretise them through binning or some other mechanism.
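A minimal sketch of that binning step, using sklearn’s KBinsDiscretizer (which is also what the ABDM transformer further below relies on) and a made-up column, might look like:

import numpy as np
import pandas as pd
from sklearn.preprocessing import KBinsDiscretizer

# made-up continuous column
x_num = pd.DataFrame({"x3": np.random.normal(size=100)})

# bin into ordinal "categories" so the column can be treated like any other categorical one
kbins = KBinsDiscretizer(n_bins=5, encode="ordinal", strategy="quantile")
x_binned = kbins.fit_transform(x_num).astype(int)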
The conditional probability calculation itself ends up being a one-liner:
mean_over = one_hot_dataframe.groupby(series_array).agg("mean")
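To make the one-liner concrete, here is a toy sketch with made-up data (the names series_array and one_hot_dataframe are just placeholders): each row of mean_over is the empirical distribution of the one-hot columns conditional on one value of the categorical column.

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
series_array = pd.Series(rng.choice(["a", "b", "c"], size=100), name="colour")
one_hot_dataframe = pd.get_dummies(pd.Series(rng.choice(3, size=100)), prefix="y").astype(float)

# one row per category of `colour`, one column per one-hot column
mean_over = one_hot_dataframe.groupby(series_array).agg("mean")
print(mean_over)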
Distance Calculation
We can then calculate the distance using any pair-wise distance measure, like so:
from sklearn.metrics.pairwise import manhattan_distances

d_pair_col = manhattan_distances(mean_over)
In ABDM, the conditional probability matrix is additionally run through a symmetric KL divergence as an improved measure of dissimilarity; this is just a preprocessing step before we apply the distance calculation.
It ends up looking something like this:
import numpy as np
from scipy.special import rel_entr  # elementwise KL divergence terms

eps = 1e-16
# symmetric KL divergence between two (empirical) distributions u and v
kl_div_dist = lambda u, v: np.sum(rel_entr(u + eps, v + eps) + rel_entr(v + eps, u + eps))

def iterable_wrapper(X, iterable_transform):
    # apply a pairwise transform over the rows of X, returning a symmetric matrix
    X = np.asarray(X)
    n_rows = X.shape[0]
    X_dist = np.zeros((n_rows, n_rows))
    for row in range(n_rows):
        for col in range(n_rows):
            if col < row:
                X_dist[row, col] = iterable_transform(X[row], X[col])
    X_dist += X_dist.T
    return X_dist
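As a quick sanity check (with made-up distributions), the symmetric KL divergence returns a single non-negative number that grows as the two distributions diverge:

p = np.array([0.7, 0.2, 0.1])
q = np.array([0.3, 0.3, 0.4])

print(kl_div_dist(p, p))  # ~0.0 for identical distributions
print(kl_div_dist(p, q))  # larger for more dissimilar distributions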
Multi-dimensional Scaling
For multi-dimensional scaling, we use scikit-learn’s implementation:
from sklearn.manifold import MDS

mds = MDS(
    n_components=2,
    max_iter=5000,
    eps=1e-9,
    random_state=0,
    n_init=4,
    dissimilarity="precomputed",
    metric=True,
)
mds.fit(d_pair_col)
emb = mds.embedding_

# use the embedded point furthest from the origin as a reference point,
# then map each category to its distance from that reference point
origin = emb[np.argsort(np.linalg.norm(emb, axis=1))[-1]].reshape(1, -1)
x_mapping = np.linalg.norm(emb - origin, axis=1)
final_mapping = [list(mean_over.index), x_mapping.tolist()]
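The resulting mapping can then be applied back onto the original categorical column, for example via pandas’ replace (a minimal sketch, assuming the toy series_array column from the earlier sketch):

# final_mapping is [categories, numeric values]; replace each category with its embedding
categories, values = final_mapping
encoded = series_array.replace(categories, values)

This is essentially what the transform step of the two classes below does.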
High Level View of Split Components
The difference between the two algorithms can be summarised by how the comparison dataset is produced, as well as the preprocessing used in each case.
- MVDM: the comparison dataset is a one-hot encoded version of the label.
- ABDM: the comparison dataset is the modelling dataset itself, with the numeric variables discretised. In addition, it uses a symmetric KL divergence as a preprocessing step before applying the Manhattan distance.
From here we can apply some preprocessing for both approaches and wrap it up. Would I use the code below in production? Maybe; it would just need some configuration options so that parts like the KBinsDiscretizer and MDS steps are easily tweakable.
# ABDM and MVDM
import numpy as np
import pandas as pd
from scipy.special import rel_entr
from sklearn.base import TransformerMixin
from sklearn.manifold import MDS
from sklearn.metrics.pairwise import manhattan_distances
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import KBinsDiscretizer, OneHotEncoder, StandardScaler

eps = 1e-16
# symmetric KL divergence between two (empirical) distributions u and v
kl_div_dist = lambda u, v: np.sum(rel_entr(u + eps, v + eps) + rel_entr(v + eps, u + eps))
def iterable_wrapper(X, iterable_transform):
    # apply a pairwise transform over the rows of X, returning a symmetric matrix
    X = np.asarray(X)
    n_rows = X.shape[0]
    X_dist = np.zeros((n_rows, n_rows))
    for row in range(n_rows):
        for col in range(n_rows):
            if col < row:
                X_dist[row, col] = iterable_transform(X[row], X[col])
    X_dist += X_dist.T
    return X_dist
def multidim_scaling(
    series_array,
    one_hot_dataframe,
    distance=manhattan_distances,
    iterable_transform=None,
):
    # conditional (empirical) probability of the one-hot columns given each category
    mean_over = one_hot_dataframe.groupby(series_array).agg("mean")
    output_index = mean_over.index
    if iterable_transform is not None:
        # e.g. symmetric KL divergence preprocessing for ABDM
        mean_over = iterable_wrapper(mean_over, iterable_transform)
    d_pair_col = distance(mean_over)
    mds = MDS(
        n_components=2,
        max_iter=5000,
        eps=1e-9,
        random_state=0,
        n_init=4,
        dissimilarity="precomputed",
        metric=True,
    )
    mds.fit(d_pair_col)
    emb = mds.embedding_
    # map each category to its distance from the embedded point furthest from the origin
    origin = emb[np.argsort(np.linalg.norm(emb, axis=1))[-1]].reshape(1, -1)
    x_mapping = np.linalg.norm(emb - origin, axis=1)
    return [list(output_index), x_mapping.tolist()]
class MVDMTransformer(TransformerMixin):
    """
    Usage:
    ```
    X = pd.DataFrame({'x0': np.random.choice(range(2), 100), 'x1': np.random.choice(range(2), 100), 'x2': np.random.choice(range(3), 100)})
    y_cats = 4
    y = np.random.choice(range(y_cats), 100)
    # alibi also suggests centering...
    mvdm = MVDMTransformer(['x0', 'x1', 'x2'])
    X_trans = mvdm.fit_transform(X, y)
    pipeline = make_pipeline(MVDMTransformer(['x0', 'x1', 'x2']), StandardScaler())
    X_trans = pipeline.fit_transform(X, y)
    # now apply k-nn for counterfactuals
    ```
    """

    def __init__(self, col=[]):
        """
        For numeric columns, please pre-process using
        `sklearn.preprocessing.KBinsDiscretizer`, or keep them as-is.
        """
        self.col = col
        self.y_ohe = None
        self.x_mds = None
        self.mapping = None

    def fit(self, X, y):
        mapping = {}
        X_sel = X[self.col]
        # the comparison dataset is a one-hot encoded version of the label
        y_ohe = OneHotEncoder(sparse=False).fit_transform(y.reshape(-1, 1))
        y_ohe = pd.DataFrame(y_ohe)
        for col in self.col:
            mapping[col] = multidim_scaling(X_sel[col], y_ohe)
        self.mapping = mapping.copy()
        return self

    def transform(self, X, y=None):
        # transforms the dataset to one with the categorical mapping applied
        X_sel = X[self.col].copy()
        for col in self.col:
            X_sel[col] = X_sel[col].replace(*self.mapping[col])
        return X_sel

    def fit_transform(self, X, y):
        return self.fit(X, y).transform(X)
class ABDMTransformer(TransformerMixin):
    """
    This is an unsupervised approach. We have two inputs:
    * categorical columns, whereby the transformation is done
    * numeric columns, whereby these are first binned and then used as categories to
      calculate statistics
    Usage:
    ```
    X = pd.DataFrame({'x0': np.random.choice(range(2), 100),
                      'x1': np.random.choice(range(2), 100),
                      'x2': np.random.choice(range(3), 100),
                      'x3': np.random.normal(size=100), 'x4': np.random.normal(size=100)})
    y_cats = 4
    y = np.random.choice(range(y_cats), 100)
    # alibi also suggests centering...
    abdm = ABDMTransformer(['x0', 'x1', 'x2'], ['x3', 'x4'])
    X_trans_all = abdm.fit_transform(X, y)
    abdm = ABDMTransformer(['x0', 'x1', 'x2'], [])
    X_trans_cats = abdm.fit_transform(X, y)
    pipeline = make_pipeline(ABDMTransformer(['x0', 'x1', 'x2']), StandardScaler())
    X_trans = pipeline.fit_transform(X, y)
    # See if there is a linear transformation between the two
    from sklearn.linear_model import SGDRegressor
    mod = SGDRegressor()
    d_all = manhattan_distances(X_trans_all)
    d_cats = manhattan_distances(X_trans_cats)
    mask = d_all != 0  # compare the same (non-zero) pairwise distances in both
    X_in = d_all[mask]
    y_in = d_cats[mask]
    mod.fit(X_in.reshape(-1, 1), y_in)
    # calculate mse
    from sklearn.metrics import mean_squared_error
    mean_squared_error(y_in, mod.predict(X_in.reshape(-1, 1)))
    # now apply k-nn for counterfactuals
    ```
    """

    def __init__(self, col=[], num_col=[]):
        """
        For numeric columns, please pre-process using
        `sklearn.preprocessing.KBinsDiscretizer`
        """
        self.col = col
        self.num_col = num_col
        self.kbins_discrete = None
        self.x_mds = None
        self.mapping = None

    def fit(self, X, y=None):
        mapping = {}
        X_sel = X[self.col]
        if len(self.num_col) > 0:
            # discretise numeric columns so they can be treated as categories
            X_num = X[self.num_col]
            self.kbins_discrete = KBinsDiscretizer(encode="ordinal")
            X_num = pd.DataFrame(
                self.kbins_discrete.fit_transform(X_num).astype(int),
                columns=self.num_col,
                index=X_sel.index,
            )
            X_sel = pd.concat([X_sel, X_num], axis=1)
        all_cols = list(X_sel.columns)
        for col in self.col:
            # the comparison dataset is every other (discretised) column, one-hot encoded
            all_other_cols = [x for x in all_cols if x != col]
            X_other = pd.DataFrame(
                OneHotEncoder(sparse=False).fit_transform(X_sel[all_other_cols])
            )
            mapping[col] = multidim_scaling(
                X_sel[col], X_other, iterable_transform=kl_div_dist
            )
        self.mapping = mapping.copy()
        return self

    def transform(self, X, y=None):
        # transforms the dataset to one with the categorical mapping applied
        X_sel = X[self.col].copy()
        for col in self.col:
            X_sel[col] = X_sel[col].replace(*self.mapping[col])
        return X_sel

    def fit_transform(self, X, y):
        return self.fit(X, y).transform(X)
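To close the loop on the “now apply k-nn for counterfactuals” comment in the docstrings, here is a minimal sketch of that last step, assuming the toy X and y from the MVDM usage example above and using sklearn’s NearestNeighbors:

from sklearn.neighbors import NearestNeighbors

# transform the categorical columns into their numeric embeddings, then standardise
pipeline = make_pipeline(MVDMTransformer(["x0", "x1", "x2"]), StandardScaler())
X_trans = pipeline.fit_transform(X, y)

# fit k-nearest-neighbours in the embedded space
knn = NearestNeighbors(n_neighbors=5).fit(X_trans)

# for a query point, its nearest neighbours are candidate counterfactuals
distances, indices = knn.kneighbors(X_trans[:1])
print(indices)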