import inspect
from copy import deepcopy
from typing import Dict, Tuple, Optional
from types import SimpleNamespace
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from type_infer.dtype import dtype
from lightwood.api.types import PredictionArguments
from lightwood.helpers.ts import add_tn_num_conf_bounds, add_tn_cat_conf_bounds, get_ts_groups
from lightwood.data import EncodedDs
from lightwood.analysis.base import BaseAnalysisBlock
from lightwood.analysis.nc.norm import Normalizer
from lightwood.analysis.nc.icp import IcpRegressor, IcpClassifier, IcpTSRegressor
from lightwood.analysis.nc.base import CachedRegressorAdapter, CachedClassifierAdapter, CachedTSAdapter
from lightwood.analysis.nc.nc import BoostedAbsErrorErrFunc, RegressorNc, ClassifierNc, MarginErrFunc, TSNc, \
TSAbsErrorErrFunc
from lightwood.analysis.nc.util import clean_df, set_conf_range, get_numeric_conf_range, \
get_categorical_conf, get_anomalies, get_ts_conf_range
class ICP(BaseAnalysisBlock):
""" Confidence estimation block, uses inductive conformal predictors (ICPs) for model agnosticity """
def __init__(self,
                 confidence_normalizer: bool = False,
fixed_significance: Optional[float] = None,
deps: Optional[tuple] = tuple()
):
super().__init__(deps=deps)
self.fixed_significance = fixed_significance
self.confidence_normalizer = confidence_normalizer
        self.validation_size = 100  # determines the size of the nonconformity score arrays (has a sizable impact on runtime)
    def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
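        """
        Calibrates one ICP on held-out validation data (plus one per group in grouped time
        series tasks) and stores them, together with validation confidence estimates, in `info`.
        """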
ns = SimpleNamespace(**kwargs)
data_type = ns.dtype_dict[ns.target]
output = {'icp': {'__mdb_active': False}}
if 'confidence' in ns.normal_predictions.columns:
# bypass calibrator if model already outputs confidence
output['result_df'] = ns.normal_predictions[['confidence', 'lower', 'upper']]
return {**info, **output}
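        # `timestep_{i}` columns are auxiliary targets for horizons > 1 and are excluded from calibration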
fit_params = {'horizon': ns.tss.horizon or 0, 'columns_to_ignore': []}
fit_params['columns_to_ignore'].extend([f'timestep_{i}' for i in range(1, fit_params['horizon'])])
if ns.is_classification:
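            # recover the full class list, either from probability columns or from the target encoder's reverse map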
if ns.predictor.supports_proba:
all_cat_cols = [col for col in ns.normal_predictions.columns
if '__mdb_proba' in col and '__mdb_unknown_cat' not in col]
all_classes = np.array([col.replace('__mdb_proba_', '') for col in all_cat_cols])
else:
class_keys = sorted(ns.encoded_val_data.encoders[ns.target].rev_map.keys())
all_classes = np.array([ns.encoded_val_data.encoders[ns.target].rev_map[idx] for idx in class_keys])
if data_type != dtype.tags:
enc = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
enc.fit(all_classes.reshape(-1, 1))
                output['label_encoders'] = enc  # needed to represent categorical labels inside nonconformist
else:
output['label_encoders'] = None
adapter = CachedClassifierAdapter
nc_function = MarginErrFunc()
nc_class = ClassifierNc
icp_class = IcpClassifier
elif ns.is_multi_ts:
adapter = CachedTSAdapter
nc_function = TSAbsErrorErrFunc(horizon_length=ns.tss.horizon)
nc_class = TSNc
icp_class = IcpTSRegressor
else:
adapter = CachedRegressorAdapter
nc_function = BoostedAbsErrorErrFunc()
nc_class = RegressorNc
icp_class = IcpRegressor
result_df = pd.DataFrame()
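        # tag targets are not calibrated; all other targets get at least a '__default' ICP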
if ns.is_numerical or (ns.is_classification and data_type != dtype.tags):
model = adapter(ns.predictor)
norm_params = {'target': ns.target, 'dtype_dict': ns.dtype_dict, 'predictor': ns.predictor,
'encoders': ns.encoded_val_data.encoders, 'is_multi_ts': ns.is_multi_ts, 'stop_after': 1e2}
if self.confidence_normalizer:
normalizer = Normalizer(fit_params=norm_params)
normalizer.fit(ns.train_data)
normalizer.prediction_cache = normalizer(ns.encoded_val_data, args=PredictionArguments())
else:
normalizer = None
            # instantiate the ICP
nc = nc_class(model, nc_function, normalizer=normalizer)
if 'horizon_length' in inspect.signature(icp_class).parameters:
icp = icp_class(nc, horizon_length=ns.tss.horizon, cal_size=self.validation_size)
else:
icp = icp_class(nc, cal_size=self.validation_size)
output['icp']['__default'] = icp
icp_df = deepcopy(ns.data)
            # set up the prediction cache to avoid additional .predict() calls
try:
pred_is_list = isinstance(ns.normal_predictions['prediction'][0], list)
except KeyError:
pred_is_list = False
if ns.is_classification:
if ns.predictor.supports_proba:
icp.nc_function.model.prediction_cache = ns.normal_predictions[all_cat_cols].values
else:
if ns.is_multi_ts:
preds = np.array([p[0] for p in ns.normal_predictions['prediction']])
else:
preds = ns.normal_predictions['prediction'].values
                    predicted_classes = output['label_encoders'].transform(preds.reshape(-1, 1))  # expand to one-hot
icp.nc_function.model.prediction_cache = predicted_classes
elif ns.is_multi_ts or pred_is_list:
icp.nc_function.model.prediction_cache = np.array(
[np.array(p) for p in ns.normal_predictions['prediction']])
else:
icp.nc_function.model.prediction_cache = np.array(ns.normal_predictions['prediction'])
if not ns.is_classification:
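            # the training target's standard deviation is stored per group; used later to pick bound widths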
output['df_target_stddev'] = {'__default': ns.stats_info.df_target_stddev}
# fit additional ICPs in time series tasks with grouped columns
if ns.tss.is_timeseries and ns.tss.group_by:
# generate a multiindex
midx = pd.MultiIndex.from_frame(icp_df[[*ns.tss.group_by, f'__mdb_original_{ns.tss.order_by}']])
icp_df.index = midx
result_df.index = midx
# create an ICP for each possible group
group_info = ns.data[ns.tss.group_by].to_dict('list')
all_group_combinations = get_ts_groups(ns.data, ns.tss)
all_group_combinations.remove('__default')
output['icp']['__mdb_groups'] = all_group_combinations
            output['icp']['__mdb_group_keys'] = list(group_info.keys())
for combination in all_group_combinations:
output['icp'][tuple(combination)] = deepcopy(icp)
# calibrate ICP
icp_df = deepcopy(ns.data)
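        # separate features from the target (label-encoded for classification) before calibrating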
icp_df, y = clean_df(icp_df, ns, output.get('label_encoders', None))
if ns.tss.is_timeseries and ns.tss.group_by:
icp_df.index = midx
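        # store the calibration column order on the default ICP; needed to realign features at inference time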
output['icp']['__default'].index = icp_df.columns
output['icp']['__default'].calibrate(icp_df.values, y)
# get confidence estimation for validation dataset
conf, ranges = set_conf_range(
icp_df, icp, ns.dtype_dict[ns.target],
output, positive_domain=ns.stats_info.positive_domain, significance=self.fixed_significance)
if not ns.is_classification:
result_df = pd.DataFrame(index=icp_df.index, columns=['confidence', 'lower', 'upper'], dtype=float)
result_df.loc[icp_df.index, 'lower'] = ranges[:, 0]
result_df.loc[icp_df.index, 'upper'] = ranges[:, 1]
else:
result_df = pd.DataFrame(index=icp_df.index, columns=['confidence'], dtype=float)
result_df.loc[icp_df.index, 'confidence'] = conf
# calibrate additional grouped ICPs
if ns.tss.is_timeseries and ns.tss.group_by:
icps = output['icp']
group_keys = icps['__mdb_group_keys']
# add all predictions to DF
icps_df = deepcopy(ns.data)
midx = pd.MultiIndex.from_frame(icps_df[[*ns.tss.group_by, f'__mdb_original_{ns.tss.order_by}']])
icps_df.index = midx
if ns.is_multi_ts or pred_is_list:
icps_df[f'__predicted_{ns.target}'] = [np.array(p) for p in ns.normal_predictions['prediction']]
else:
icps_df[f'__predicted_{ns.target}'] = np.array(ns.normal_predictions['prediction'])
for group in icps['__mdb_groups']:
icp_df = icps_df
# filter irrelevant rows for each group combination
icp_df['__mdb_norm_index'] = np.arange(len(icp_df))
for key, val in zip(group_keys, group):
icp_df = icp_df[icp_df[key] == val]
if icps[tuple(group)].nc_function.normalizer is not None:
group_normalizer = icps[tuple(group)].nc_function.normalizer
norm_input_df = ns.encoded_val_data.data_frame.iloc[icp_df.pop('__mdb_norm_index')]
norm_input = EncodedDs(ns.encoded_val_data.encoders, norm_input_df, ns.target)
norm_cache = group_normalizer(norm_input, args=PredictionArguments())
icp_df[f'__norm_{ns.target}'] = norm_cache
# save relevant predictions in the caches, then calibrate the ICP
pred_cache = icp_df.pop(f'__predicted_{ns.target}').values
if ns.is_multi_ts and ns.is_classification:
                    pred_cache = output['label_encoders'].transform(
                        np.array([p[0] for p in pred_cache]).reshape(-1, 1))
elif ns.is_multi_ts:
pred_cache = np.array([np.array(p) for p in pred_cache])
elif ns.is_classification:
pred_cache = output['label_encoders'].transform(pred_cache.reshape(-1, 1))
icps[tuple(group)].nc_function.model.prediction_cache = pred_cache
icp_df, y = clean_df(icp_df, ns, output.get('label_encoders', None))
if icps[tuple(group)].nc_function.normalizer is not None:
icps[tuple(group)].nc_function.normalizer.prediction_cache = icp_df.pop(
f'__norm_{ns.target}').values
icps[tuple(group)].index = icp_df.columns # important at inference time
icps[tuple(group)].calibrate(icp_df.values, y)
# save training std() for bounds width selection
if not ns.is_classification:
icp_train_df = ns.data
for key, val in zip(group_keys, group):
icp_train_df = icp_train_df[icp_train_df[key] == val]
y_train = icp_train_df[ns.target].values
output['df_target_stddev'][tuple(group)] = y_train.std()
# get bounds for relevant rows in validation dataset
conf, group_ranges = set_conf_range(
icp_df, icps[tuple(group)],
ns.dtype_dict[ns.target],
output, group=tuple(group),
positive_domain=ns.stats_info.positive_domain, significance=self.fixed_significance)
# save group bounds
if not ns.is_classification:
result_df.loc[icp_df.index, 'lower'] = group_ranges[:, 0]
result_df.loc[icp_df.index, 'upper'] = group_ranges[:, 1]
result_df.loc[icp_df.index, 'confidence'] = conf
# consolidate all groups here
output['icp']['__mdb_active'] = True
result_df.index = ns.data.index
output['result_df'] = result_df
info = {**info, **output}
return info
    def explain(self, row_insights: pd.DataFrame, global_insights: Dict[str, object],
**kwargs) -> Tuple[pd.DataFrame, Dict[str, object]]:
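        """
        Attaches a confidence score to each prediction using the calibrated ICPs and, for
        numerical targets, lower/upper bounds; also flags anomalies in time series tasks.
        """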
ns = SimpleNamespace(**kwargs)
is_categorical = ns.target_dtype in (dtype.binary, dtype.categorical, dtype.cat_array, dtype.cat_tsarray)
is_numerical = ns.target_dtype in (dtype.integer, dtype.float,
dtype.quantity, dtype.num_array, dtype.num_tsarray)
is_multi_ts = ns.tss.is_timeseries and ns.tss.horizon > 1
is_anomaly_task = is_numerical and ns.tss.is_timeseries and ns.anomaly_detection
if 'confidence' in ns.predictions.columns:
# bypass calibrator if model already outputs confidence
row_insights['prediction'] = ns.predictions['prediction']
if 'upper' in ns.predictions.columns and 'lower' in ns.predictions.columns:
row_insights['upper'] = ns.predictions['upper']
row_insights['lower'] = ns.predictions['lower']
if not isinstance(ns.predictions['confidence'].iloc[0], list) and ns.tss.horizon > 1:
row_insights['confidence'] = ns.predictions['confidence'].astype(object)
row_insights['confidence'] = row_insights['confidence'].apply(
lambda x: [x for _ in range(ns.tss.horizon)])
else:
row_insights['confidence'] = ns.predictions['confidence']
return self._formatted(row_insights, global_insights, ns, is_numerical)
if ns.analysis['icp']['__mdb_active']:
icp_X = deepcopy(ns.data)
# replace observed data w/predictions
preds = ns.predictions['prediction']
if is_multi_ts and is_numerical:
preds = np.array([np.array(p) for p in preds])
for col in [f'timestep_{i}' for i in range(1, ns.tss.horizon)]:
if col in icp_X.columns:
icp_X.pop(col) # erase ignorable columns
target_cols = [ns.target_name] + [f'{ns.target_name}_timestep_{i}' for i in range(1, ns.tss.horizon)]
icp_X[target_cols] = preds
elif is_multi_ts and is_categorical:
preds = [p[0] for p in preds]
icp_X[ns.target_name] = preds
else:
icp_X[ns.target_name] = preds
if (is_numerical or is_categorical) and ns.analysis['icp'].get('__mdb_active', False):
base_icp = ns.analysis['icp']['__default']
# reorder DF index
index = base_icp.index.values
if ns.target_name not in index:
if is_multi_ts:
index = np.array(list(index) + [ns.target_name] +
[f'{ns.target_name}_timestep_{i}' for i in range(1, ns.tss.horizon)])
else:
index = np.append(index, ns.target_name)
icp_X = icp_X.reindex(columns=index) # important, else bounds can be invalid
# only one normalizer, even if it's a grouped time series task
normalizer = base_icp.nc_function.normalizer
if normalizer:
                    normalizer.prediction_cache = normalizer(ns.encoded_data, args=PredictionArguments())
icp_X['__mdb_selfaware_scores'] = normalizer.prediction_cache
# get ICP predictions
if is_multi_ts:
result_cols = ['significance', 'lower', 'upper', ] + \
[f'lower_timestep_{i}' for i in range(1, ns.tss.horizon)] + \
[f'upper_timestep_{i}' for i in range(1, ns.tss.horizon)] + \
[f'significance_timestep_{i}' for i in range(1, ns.tss.horizon)]
elif is_numerical:
result_cols = ['lower', 'upper', 'significance']
else:
result_cols = ['significance']
result = pd.DataFrame(index=icp_X.index, columns=result_cols)
# base ICP
X = deepcopy(icp_X)
                # calling `.values` repeatedly slowed this function down; caching it in a local variable is faster
icp_values = X.values
# get all possible ranges
if is_numerical:
base_icp.nc_function.model.prediction_cache = preds
all_confs = base_icp.predict(icp_values)
# categorical
else:
                    predicted_proba = any('__mdb_proba' in col for col in ns.predictions.columns)
if predicted_proba:
all_cat_cols = [col for col in ns.predictions.columns
if '__mdb_proba' in col and '__mdb_unknown_cat' not in col]
class_dists = ns.predictions[all_cat_cols].values
for icol, cat_col in enumerate(all_cat_cols):
row_insights.loc[X.index, cat_col] = class_dists[:, icol]
else:
ohe_enc = ns.analysis['label_encoders']
class_dists = ohe_enc.transform(np.array([p[0] for p in preds]).reshape(-1, 1))
base_icp.nc_function.model.prediction_cache = class_dists
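                    # wrap p-values (B, n_classes), then swap axes into (B, n_classes, 1), the layout the conf helpers expect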
all_ranges = np.array([base_icp.predict(icp_values)])
all_confs = np.swapaxes(np.swapaxes(all_ranges, 0, 2), 0, 1)
# convert (B, 2, 99) into (B, 2) given width or error rate constraints
if is_multi_ts and is_numerical:
significances, confs = get_ts_conf_range(all_confs,
df_target_stddev=ns.analysis['df_target_stddev'],
positive_domain=ns.positive_domain,
fixed_conf=ns.pred_args.fixed_confidence)
result = self._ts_assign_confs(result, X, confs, significances, ns.tss)
elif is_numerical:
significances, confs = get_numeric_conf_range(all_confs,
df_target_stddev=ns.analysis['df_target_stddev'],
positive_domain=ns.positive_domain,
fixed_conf=ns.pred_args.fixed_confidence)
result.loc[X.index, 'lower'] = confs[:, 0]
result.loc[X.index, 'upper'] = confs[:, 1]
result.loc[X.index, 'significance'] = significances
else:
significances = get_categorical_conf(all_confs)
result.loc[X.index, 'significance'] = significances.flatten()
# grouped time series, we replace bounds in rows that have a trained ICP
if ns.analysis['icp'].get('__mdb_groups', False):
icps = ns.analysis['icp']
group_keys = icps['__mdb_group_keys']
for group in icps['__mdb_groups']:
icp = icps[tuple(group)]
# check ICP has calibration scores
if icp.cal_scores[0].shape[0] > 0:
# filter rows by group
X = deepcopy(icp_X)
for key, val in zip(group_keys, group):
X = X[X[key] == val]
if X.size > 0:
# set ICP caches
if is_multi_ts and is_numerical:
target_cols = [ns.target_name] + [f'{ns.target_name}_timestep_{i}'
for i in range(1, ns.tss.horizon)]
icp.nc_function.model.prediction_cache = X[target_cols].values
                                    for col in target_cols:
                                        X.pop(col)
elif is_multi_ts and is_categorical:
ohe_enc = ns.analysis['label_encoders']
preds = X.pop(ns.target_name).values
pred_cache = ohe_enc.transform(np.array([p[0] for p in preds]).reshape(-1, 1))
icp.nc_function.model.prediction_cache = pred_cache
else:
icp.nc_function.model.prediction_cache = X.pop(ns.target_name).values
if icp.nc_function.normalizer:
icp.nc_function.normalizer.prediction_cache = X.pop('__mdb_selfaware_scores').values
# predict and get confidence level given width or error rate constraints
if is_multi_ts and is_numerical:
all_confs = icp.predict(X.values)
fixed_conf = ns.pred_args.fixed_confidence
significances, confs = get_ts_conf_range(
all_confs,
df_target_stddev=ns.analysis['df_target_stddev'],
positive_domain=ns.positive_domain,
group=tuple(group),
fixed_conf=fixed_conf
)
result = self._ts_assign_confs(result, X, confs, significances, ns.tss)
elif is_numerical:
all_confs = icp.predict(X.values)
fixed_conf = ns.pred_args.fixed_confidence
significances, confs = get_numeric_conf_range(
all_confs,
df_target_stddev=ns.analysis['df_target_stddev'],
positive_domain=ns.positive_domain,
group=tuple(group),
fixed_conf=fixed_conf
)
# only replace where grouped ICP is more informative (i.e. tighter)
if fixed_conf is None:
default_widths = result.loc[X.index, 'upper'] - result.loc[X.index, 'lower']
grouped_widths = np.subtract(confs[:, 1], confs[:, 0])
                                        mask = default_widths.values > grouped_widths
                                        insert_index = default_widths.index[mask]
                                        conf_index = np.flatnonzero(mask)
result.loc[insert_index, 'lower'] = confs[conf_index, 0]
result.loc[insert_index, 'upper'] = confs[conf_index, 1]
result.loc[insert_index, 'significance'] = significances[conf_index]
else:
all_ranges = np.array([icp.predict(X.values)])
all_confs = np.swapaxes(np.swapaxes(all_ranges, 0, 2), 0, 1)
significances = get_categorical_conf(all_confs)
result.loc[X.index, 'significance'] = significances.flatten()
row_insights['confidence'] = result['significance']
if is_numerical:
row_insights['lower'] = result['lower']
row_insights['upper'] = result['upper']
# anomaly detection
if is_anomaly_task:
row_insights['anomaly'] = None
if ns.target_name in ns.data.columns:
anomalies = get_anomalies(row_insights,
ns.data[ns.target_name],
cooldown=ns.pred_args.anomaly_cooldown)
if '__mdb_ts_inferred' in ns.data.columns:
ignore_idxs = np.where(ns.data['__mdb_ts_inferred'].values)
anomalies[ignore_idxs] = None
row_insights['anomaly'] = list(anomalies)
if ns.tss.is_timeseries and ns.tss.horizon > 1:
if is_numerical and ns.pred_args.simple_ts_bounds:
row_insights = add_tn_num_conf_bounds(row_insights, ns.tss)
elif not is_numerical:
row_insights = add_tn_cat_conf_bounds(row_insights, ns.tss)
row_insights, global_insights = self._formatted(row_insights, global_insights, ns, is_numerical)
return row_insights, global_insights
@staticmethod
def _formatted(row_insights, global_insights, ns, is_numerical):
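        """ Clips numerical predictions and bounds to a safe range, casts columns to the
        target dtype, and collapses per-timestep columns when the horizon is greater than one. """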
# clip if necessary
if is_numerical:
lower_limit = 0.0 if ns.positive_domain else -pow(2, 62)
upper_limit = pow(2, 62)
if not (ns.tss.is_timeseries and ns.tss.horizon > 1):
row_insights['upper'] = row_insights['upper'].clip(lower_limit, upper_limit)
row_insights['lower'] = row_insights['lower'].clip(lower_limit, upper_limit)
row_insights['prediction'] = row_insights['prediction'].clip(lower_limit, upper_limit)
else:
row_insights['upper'] = [np.array(row).clip(lower_limit, upper_limit).tolist()
for row in row_insights['upper']]
row_insights['lower'] = [np.array(row).clip(lower_limit, upper_limit).tolist()
for row in row_insights['lower']]
row_insights['prediction'] = [np.array(row).clip(lower_limit, upper_limit).tolist()
for row in row_insights['prediction']]
# Make sure the target and real values are of an appropriate type
if ns.target_dtype in (dtype.integer, ):
row_insights['prediction'] = row_insights['prediction'].astype(int)
row_insights['upper'] = row_insights['upper'].astype(int)
row_insights['lower'] = row_insights['lower'].astype(int)
elif ns.target_dtype in (dtype.float, dtype.quantity):
row_insights['prediction'] = row_insights['prediction'].astype(float)
row_insights['upper'] = row_insights['upper'].astype(float)
row_insights['lower'] = row_insights['lower'].astype(float)
elif ns.target_dtype in (dtype.short_text, dtype.rich_text, dtype.binary, dtype.categorical):
row_insights['prediction'] = row_insights['prediction'].astype(str)
# horizon collapse
if ns.tss.is_timeseries and is_numerical and ns.tss.horizon > 1:
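            # note: despite the `_sum` suffix, bounds collapse to the loosest per-step bound (min lower, max upper)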
row_insights['prediction_sum'] = row_insights['prediction'].apply(lambda x: sum(x))
row_insights['lower_sum'] = row_insights['lower'].apply(lambda x: min(x))
row_insights['upper_sum'] = row_insights['upper'].apply(lambda x: max(x))
row_insights['confidence_mean'] = row_insights['confidence'].apply(lambda x: np.mean(x))
return row_insights, global_insights
@staticmethod
def _ts_assign_confs(result, df, confs, significances, tss) -> pd.DataFrame:
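        """ Writes per-timestep bounds and significances into `result`, then merges them into
        single list-valued 'significance', 'lower' and 'upper' columns. """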
result.loc[df.index, 'lower'] = confs[:, 0, 0]
result.loc[df.index, 'upper'] = confs[:, 0, 1]
result.loc[df.index, 'significance'] = significances[:, 0]
for timestep in range(1, tss.horizon):
result.loc[df.index, f'lower_timestep_{timestep}'] = confs[:, timestep, 0]
result.loc[df.index, f'upper_timestep_{timestep}'] = confs[:, timestep, 1]
result.loc[df.index,
f'significance_timestep_{timestep}'] = significances[:, timestep]
# TODO: only if tighter
# merge all significances, lower and upper bounds into a single column
for base_col in ['significance', 'lower', 'upper']:
added_cols = [f'{base_col}_timestep_{t}' for t in range(1, tss.horizon)]
cols = [base_col] + added_cols
result.loc[df.index, base_col] = result.loc[df.index, cols].values.tolist()
return result