Using your own pre-processing methods in Lightwood
Date: 2021.10.07
For the notebook below, we’ll be exploring how to make custom pre-processing methods for our data. Lightwood has standard cleaning protocols to handle a variety of different data types, however, we want users to feel comfortable augmenting and addressing their own changes. To do so, we’ll highlight the approach we would take below:
We will use data from Kaggle.
The data has several columns, but ultimately aims to use text to predict a readability score. There are also some columns that I do not want to use when making predictions, such as url_legal
, license
, among others.
In this tutorial, we’re going to focus on making changes to 2 columns: (1) excerpt, a text column, and ensuring we remove stop words using NLTK. (2) target, the goal to predict; we will make this explicitly non-negative.
Note, for this ACTUAL challenge, negative and positive are meaningful. We are using this as an example dataset to demonstrate how you can make changes to your underlying dataset and proceed to building powerful predictors.
Let’s get started!
[1]:
import numpy as np
import pandas as pd
import torch
import nltk
import os
import sys
# Lightwood modules
import lightwood as lw
from lightwood import ProblemDefinition, \
JsonAI, \
json_ai_from_problem, \
code_from_json_ai, \
predictor_from_code
INFO:lightwood-2520:No torchvision detected, image helpers not supported.
INFO:lightwood-2520:No torchvision/pillow detected, image encoder not supported
1) Load your data
Lightwood uses pandas
in order to handle datasets, as this is a very standard package in datascience. We can load our dataset using pandas in the following manner (make sure your data is in the data folder!)
[2]:
# Load the data
data = pd.read_csv("https://mindsdb-example-data.s3.eu-west-2.amazonaws.com/jupyter/train.csv.zip")
data.head()
[2]:
id | url_legal | license | excerpt | target | standard_error | |
---|---|---|---|---|---|---|
0 | c12129c31 | NaN | NaN | When the young people returned to the ballroom... | -0.340259 | 0.464009 |
1 | 85aa80a4c | NaN | NaN | All through dinner time, Mrs. Fayre was somewh... | -0.315372 | 0.480805 |
2 | b69ac6792 | NaN | NaN | As Roger had predicted, the snow departed as q... | -0.580118 | 0.476676 |
3 | dd1000b26 | NaN | NaN | And outside before the palace a great garden w... | -1.054013 | 0.450007 |
4 | 37c1b32fb | NaN | NaN | Once upon a time there were Three Bears who li... | 0.247197 | 0.510845 |
We see 6 columns, a variety which are numerical, missing numbers, text, and identifiers or “ids”. For our predictive task, we are only interested in 2 such columns, the excerpt and target columns.
2) Create a JSON-AI default object
Before we create a custom cleaner object, let’s first create JSON-AI syntax for our problem based on its specifications. We can do so by setting up a ProblemDefinition
. The ProblemDefinition
allows us to specify the target, the column we intend to predict, along with other details.
The end goal of JSON-AI is to provide a set of instructions on how to compile a machine learning pipeline.
In this case, let’s specify our target, the aptly named target column. We will also tell JSON-AI to throw away features we never intend to use, such as “url_legal”, “license”, and “standard_error”. We can do so in the following lines:
[3]:
# Setup the problem definition
problem_definition = {
'target': 'target',
"ignore_features": ["url_legal", "license", "standard_error"]
}
# Generate the j{ai}son syntax
json_ai = json_ai_from_problem(data, problem_definition)
INFO:lightwood-2520:Dropping features: ['url_legal', 'license', 'standard_error']
INFO:type_infer-2520:Analyzing a sample of 2478
INFO:type_infer-2520:from a total population of 2834, this is equivalent to 87.4% of your data.
INFO:type_infer-2520:Infering type for: id
INFO:type_infer-2520:Doing text detection for column: id
INFO:type_infer-2520:Column id has data type categorical
INFO:type_infer-2520:Infering type for: excerpt
INFO:type_infer-2520:Doing text detection for column: excerpt
INFO:type_infer-2520:Infering type for: target
INFO:type_infer-2520:Column target has data type float
WARNING:type_infer-2520:Column id is an identifier of type "Hash-like identifier"
INFO:dataprep_ml-2520:Starting statistical analysis
INFO:dataprep_ml-2520:Dropping features: ['id']
INFO:dataprep_ml-2520:Finished statistical analysis
Lightwood, as it processes the data, will provide the user a few pieces of information.
It drops the features we specify in the
ignore_features
argumentIt takes a small sample of data from each column to automatically infer the data type
For each column that was not ignored, it identifies the most likely data type.
It notices that “ID” is a hash-like-identifier.
It conducts a small statistical analysis on the distributions in order to generate syntax.
As soon as you request a JSON-AI object, Lightwood automatically creates functional syntax from your data. You can see it as follows:
[4]:
print(json_ai.to_json())
{
"encoders": {
"target": {
"module": "NumericEncoder",
"args": {
"is_target": "True",
"positive_domain": "$statistical_analysis.positive_domain"
}
},
"excerpt": {
"module": "PretrainedLangEncoder",
"args": {
"output_type": "$dtype_dict[$target]",
"stop_after": "$problem_definition.seconds_per_encoder"
}
}
},
"dtype_dict": {
"excerpt": "rich_text",
"target": "float"
},
"dependency_dict": {},
"model": {
"module": "BestOf",
"args": {
"submodels": [
{
"module": "Neural",
"args": {
"fit_on_dev": true,
"stop_after": "$problem_definition.seconds_per_mixer",
"search_hyperparameters": true
}
},
{
"module": "XGBoostMixer",
"args": {
"stop_after": "$problem_definition.seconds_per_mixer",
"fit_on_dev": true
}
},
{
"module": "Regression",
"args": {
"stop_after": "$problem_definition.seconds_per_mixer"
}
},
{
"module": "RandomForest",
"args": {
"stop_after": "$problem_definition.seconds_per_mixer",
"fit_on_dev": true
}
}
]
}
},
"problem_definition": {
"target": "target",
"pct_invalid": 2,
"unbias_target": true,
"seconds_per_mixer": 21384.0,
"seconds_per_encoder": 85536.0,
"expected_additional_time": 15.51400375366211,
"time_aim": 259200,
"target_weights": null,
"positive_domain": false,
"timeseries_settings": {
"is_timeseries": false,
"order_by": null,
"window": null,
"group_by": null,
"use_previous_target": true,
"horizon": null,
"historical_columns": null,
"target_type": "",
"allow_incomplete_history": true,
"eval_incomplete": false,
"interval_periods": []
},
"anomaly_detection": false,
"use_default_analysis": true,
"embedding_only": false,
"dtype_dict": {},
"ignore_features": [
"url_legal",
"license",
"standard_error"
],
"fit_on_all": true,
"strict_mode": true,
"seed_nr": 1
},
"identifiers": {
"id": "Hash-like identifier"
},
"imputers": [],
"accuracy_functions": [
"r2_score"
]
}
The above shows the minimal syntax required to create a functional JSON-AI object. For each feature you consider in the dataset, we specify the name of the feature, the type of encoder (feature-engineering method) to process the feature, and key word arguments to process the encoder. For the output, we perform a similar operation, but specify the types of mixers, or algorithms used in making a predictor that can estimate the target. Lastly, we populate the “problem_definition” key with the ingredients for our ML pipeline.
These are the only elements required to get off the ground with JSON-AI. However, we’re interested in making a custom approach. So, let’s make this syntax a file, and introduce our own changes.
3) Build your own cleaner module
Let’s make a file called MyCustomCleaner.py
. To write this file, we will use dataprep_ml.cleaners.cleaner
as inspiration. dataprep_ml
is a companion library that is part of the broader MindsDB ecosystem, and specializes in data cleaning, data splitting and data analysis.
The goal output of the cleaner is to provide pre-processing to your dataset - the output is only a pandas DataFrame. In theory, any pre-processing can be done here. However, data can be highly irregular - our default Cleaner
function has several main goals:
Strip away any identifier, etc. unwanted columns
Apply a cleaning function to each column in the dataset, according to that column’s data type
Standardize NaN values within each column for appropriate downstream treatment
You can choose to omit many of these details and completely write this module from scratch, but the easiest way to introduce your custom changes is to borrow the Cleaner
function, and add core changes in a custom block.
This can be done as follows
You can see individual cleaning functions in dataprep_ml.cleaners
. If you want to entirely replace a cleaning technique given a particular data-type, we invite you to change dataprep_ml.cleaners.get_cleaning_func
using the argument custom_cleaning_functions
; in this dictionary, for a datatype (specified in type_infer.dtype
), you can assign your own function to override our defaults.
[5]:
%%writefile MyCustomCleaner.py
import numpy as np
import pandas as pd
from type_infer.dtype import dtype
from lightwood.helpers import text
from lightwood.helpers.log import log
from lightwood.api.types import TimeseriesSettings
from nltk.corpus import stopwords
stop_words = set(stopwords.words("english"))
from typing import Dict
# Borrow cleaner functions
from dataprep_ml.cleaners import (
_remove_columns,
_get_columns_to_clean,
get_cleaning_func,
)
# Use for standardizing NaNs
VALUES_FOR_NAN_AND_NONE_IN_PANDAS = [np.nan, "nan", "NaN", "Nan", "None"]
def cleaner(
data: pd.DataFrame,
dtype_dict: Dict[str, str],
identifiers: Dict[str, str],
target: str,
mode: str,
timeseries_settings: TimeseriesSettings,
anomaly_detection: bool,
custom_cleaning_functions: Dict[str, str] = {},
) -> pd.DataFrame:
"""
The cleaner is a function which takes in the raw data, plus additional information about it's types and about the problem. Based on this it generates a "clean" representation of the data, where each column has an ideal standardized type and all malformed or otherwise missing or invalid elements are turned into ``None``
:param data: The raw data
:param dtype_dict: Type information for each column
:param identifiers: A dict containing all identifier typed columns
:param target: The target columns
:param mode: Can be "predict" or "train"
:param timeseries_settings: Timeseries related settings, only relevant for timeseries predictors, otherwise can be the default object
:param anomaly_detection: Are we detecting anomalies with this predictor?
:returns: The cleaned data
""" # noqa
data = _remove_columns(
data,
identifiers,
target,
mode,
timeseries_settings,
anomaly_detection,
dtype_dict,
)
for col in _get_columns_to_clean(data, dtype_dict, mode, target):
log.info("Cleaning column =" + str(col))
# Get and apply a cleaning function for each data type
# If you want to customize the cleaner, it's likely you can to modify ``get_cleaning_func``
fn, vec = get_cleaning_func(dtype_dict[col], custom_cleaning_functions)
if not vec:
data[col] = data[col].apply(fn)
if vec:
data[col] = fn(data[col])
# ------------------------ #
# INTRODUCE YOUR CUSTOM BLOCK
# If column data type is a text type, remove stop-words
if dtype_dict[col] in (dtype.rich_text, dtype.short_text):
data[col] = data[col].apply(
lambda x: " ".join(
[word for word in x.split() if word not in stop_words]
)
)
# Enforce numerical columns as non-negative
if dtype_dict[col] in (dtype.integer, dtype.float):
log.info("Converted " + str(col) + " into strictly non-negative")
data[col] = data[col].apply(lambda x: x if x > 0 else 0.0)
# ------------------------ #
data[col] = data[col].replace(
to_replace=VALUES_FOR_NAN_AND_NONE_IN_PANDAS, value=None
)
return data
Writing MyCustomCleaner.py
Place your custom module in ~/lightwood_modules
or /etc/lightwood_modules
We automatically search for custom scripts in your ~/lightwood_modules
and /etc/lightwood_modules
path. Place your file there. Later, you’ll see when we autogenerate code, that you can change your import location if you choose.
[6]:
from lightwood import load_custom_module
# Lightwood automatically does this for us if we want
load_custom_module('MyCustomCleaner.py')
4) Introduce your custom cleaner in JSON-AI
Now let’s introduce our custom cleaner. JSON-AI keeps a lightweight syntax but fills in many default modules (like splitting, cleaning). As you can see, it is also agnostic to the origin of the module, as long as it behaves as expected of the other modules that could be used in any given key.
For the custom cleaner, we’ll work by editing the “cleaner” key. We will change properties within it as follows: (1) “module” - place the name of the function. In our case it will be “MyCustomCleaner.cleaner” (2) “args” - any keyword argument specific to your cleaner’s internals.
This will look as follows:
"cleaner": {
"module": "MyCustomCleaner.cleaner",
"args": {
"identifiers": "$identifiers",
"data": "data",
"dtype_dict": "$dtype_dict",
"target": "$target",
"mode": "$mode",
"timeseries_settings": "$problem_definition.timeseries_settings",
"anomaly_detection": "$problem_definition.anomaly_detection"
}
You may be wondering what the “$” variables reference. In certain cases, we’d like JSON-AI to auto-fill internal variables when automatically generating code, for example, we’ve already specified the “target” - it would be easier to simply refer in a modular sense what that term is. That is what these variables represent.
As we borrowed most of the default Cleaner
; we keep these arguments. In theory, if we were writing much of these details from scratch, we can customize these values as necessary.
5) Generate Python code representing your ML pipeline
Now we’re ready to load up our custom JSON-AI and generate the predictor code!
We can do this by first reading in our custom json-syntax, and then calling the function code_from_json_ai
.
[7]:
# Make changes to your JSON-AI
json_ai.cleaner = {
"module": "MyCustomCleaner.cleaner",
"args": {
"identifiers": "$identifiers",
"data": "data",
"dtype_dict": "$dtype_dict",
"target": "$target",
"mode": "$mode",
"timeseries_settings": "$problem_definition.timeseries_settings.to_dict()",
"anomaly_detection": "$problem_definition.anomaly_detection"
}
}
#Generate python code that fills in your pipeline
code = code_from_json_ai(json_ai)
print(code)
# Save code to a file (Optional)
with open('custom_cleaner_pipeline.py', 'w') as fp:
fp.write(code)
import lightwood
from lightwood import __version__ as lightwood_version
from lightwood.analysis import *
from lightwood.api import *
from lightwood.data import *
from lightwood.encoder import *
from lightwood.ensemble import *
from lightwood.helpers.device import *
from lightwood.helpers.general import *
from lightwood.helpers.ts import *
from lightwood.helpers.log import *
from lightwood.helpers.numeric import *
from lightwood.helpers.parallelism import *
from lightwood.helpers.seed import *
from lightwood.helpers.text import *
from lightwood.helpers.torch import *
from lightwood.mixer import *
from dataprep_ml.insights import statistical_analysis
from dataprep_ml.cleaners import cleaner
from dataprep_ml.splitters import splitter
from dataprep_ml.imputers import *
from mindsdb_evaluator import evaluate_accuracies
from mindsdb_evaluator.accuracy import __all__ as mdb_eval_accuracy_metrics
import pandas as pd
from typing import Dict, List, Union, Optional
import os
from types import ModuleType
import importlib.machinery
import sys
import time
for import_dir in [
os.path.join(
os.path.expanduser("~/lightwood_modules"), lightwood_version.replace(".", "_")
),
os.path.join("/etc/lightwood_modules", lightwood_version.replace(".", "_")),
]:
if os.path.exists(import_dir) and os.access(import_dir, os.R_OK):
for file_name in list(os.walk(import_dir))[0][2]:
if file_name[-3:] != ".py":
continue
mod_name = file_name[:-3]
loader = importlib.machinery.SourceFileLoader(
mod_name, os.path.join(import_dir, file_name)
)
module = ModuleType(loader.name)
loader.exec_module(module)
sys.modules[mod_name] = module
exec(f"import {mod_name}")
class Predictor(PredictorInterface):
target: str
mixers: List[BaseMixer]
encoders: Dict[str, BaseEncoder]
ensemble: BaseEnsemble
mode: str
def __init__(self):
seed(1)
self.target = "target"
self.mode = "inactive"
self.problem_definition = ProblemDefinition.from_dict(
{
"target": "target",
"pct_invalid": 2,
"unbias_target": True,
"seconds_per_mixer": 21384.0,
"seconds_per_encoder": 85536.0,
"expected_additional_time": 15.51400375366211,
"time_aim": 259200,
"target_weights": None,
"positive_domain": False,
"timeseries_settings": {
"is_timeseries": False,
"order_by": None,
"window": None,
"group_by": None,
"use_previous_target": True,
"horizon": None,
"historical_columns": None,
"target_type": "",
"allow_incomplete_history": True,
"eval_incomplete": False,
"interval_periods": [],
},
"anomaly_detection": False,
"use_default_analysis": True,
"embedding_only": False,
"dtype_dict": {},
"ignore_features": ["url_legal", "license", "standard_error"],
"fit_on_all": True,
"strict_mode": True,
"seed_nr": 1,
}
)
self.accuracy_functions = ["r2_score"]
self.identifiers = {"id": "Hash-like identifier"}
self.dtype_dict = {"excerpt": "rich_text", "target": "float"}
self.lightwood_version = "24.12.3.0"
self.pred_args = PredictionArguments()
# Any feature-column dependencies
self.dependencies = {"target": [], "excerpt": []}
self.input_cols = ["excerpt"]
# Initial stats analysis
self.statistical_analysis = None
self.ts_analysis = None
self.runtime_log = dict()
self.global_insights = dict()
# Feature cache
self.feature_cache = dict()
@timed_predictor
def analyze_data(self, data: pd.DataFrame) -> None:
# Perform a statistical analysis on the unprocessed data
self.statistical_analysis = statistical_analysis(
data,
self.dtype_dict,
self.problem_definition.to_dict(),
{"id": "Hash-like identifier"},
)
# Instantiate post-training evaluation
self.analysis_blocks = [
ICP(fixed_significance=None, confidence_normalizer=False, deps=[]),
ConfStats(deps=["ICP"]),
AccStats(deps=["ICP"]),
PermutationFeatureImportance(deps=["AccStats"]),
]
@timed_predictor
def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
# Preprocess and clean data
log.info("Cleaning the data")
self.imputers = {}
data = MyCustomCleaner.cleaner(
data=data,
identifiers=self.identifiers,
dtype_dict=self.dtype_dict,
target=self.target,
mode=self.mode,
timeseries_settings=self.problem_definition.timeseries_settings.to_dict(),
anomaly_detection=self.problem_definition.anomaly_detection,
)
# Time-series blocks
return data
@timed_predictor
def split(self, data: pd.DataFrame) -> Dict[str, pd.DataFrame]:
# Split the data into training/testing splits
log.info("Splitting the data into train/test")
train_test_data = splitter(
data=data,
pct_train=0.8,
pct_dev=0.1,
pct_test=0.1,
tss=self.problem_definition.timeseries_settings.to_dict(),
seed=self.problem_definition.seed_nr,
target=self.target,
dtype_dict=self.dtype_dict,
)
return train_test_data
@timed_predictor
def prepare(self, data: Dict[str, pd.DataFrame]) -> None:
# Prepare encoders to featurize data
self.mode = "train"
if self.statistical_analysis is None:
raise Exception("Please run analyze_data first")
# Column to encoder mapping
self.encoders = {
"target": NumericEncoder(
is_target=True,
positive_domain=self.statistical_analysis.positive_domain,
),
"excerpt": PretrainedLangEncoder(
output_type=self.dtype_dict[self.target],
stop_after=self.problem_definition.seconds_per_encoder,
),
}
# Prepare the training + dev data
concatenated_train_dev = pd.concat([data["train"], data["dev"]])
prepped_encoders = {}
# Prepare input encoders
parallel_encoding = parallel_encoding_check(data["train"], self.encoders)
if parallel_encoding:
log.debug("Preparing in parallel...")
for col_name, encoder in self.encoders.items():
if col_name != self.target and not encoder.is_trainable_encoder:
prepped_encoders[col_name] = (
encoder,
concatenated_train_dev[col_name],
"prepare",
)
prepped_encoders = mut_method_call(prepped_encoders)
else:
log.debug("Preparing sequentially...")
for col_name, encoder in self.encoders.items():
if col_name != self.target and not encoder.is_trainable_encoder:
log.debug(f"Preparing encoder for {col_name}...")
encoder.prepare(concatenated_train_dev[col_name])
prepped_encoders[col_name] = encoder
# Store encoders
for col_name, encoder in prepped_encoders.items():
self.encoders[col_name] = encoder
# Prepare the target
if self.target not in prepped_encoders:
if self.encoders[self.target].is_trainable_encoder:
self.encoders[self.target].prepare(
data["train"][self.target], data["dev"][self.target]
)
else:
self.encoders[self.target].prepare(
pd.concat([data["train"], data["dev"]])[self.target]
)
# Prepare any non-target encoders that are learned
for col_name, encoder in self.encoders.items():
if col_name != self.target and encoder.is_trainable_encoder:
priming_data = pd.concat([data["train"], data["dev"]])
kwargs = {}
if self.dependencies[col_name]:
kwargs["dependency_data"] = {}
for col in self.dependencies[col_name]:
kwargs["dependency_data"][col] = {
"original_type": self.dtype_dict[col],
"data": priming_data[col],
}
# If an encoder representation requires the target, provide priming data
if hasattr(encoder, "uses_target"):
kwargs["encoded_target_values"] = self.encoders[self.target].encode(
priming_data[self.target]
)
encoder.prepare(
data["train"][col_name], data["dev"][col_name], **kwargs
)
@timed_predictor
def featurize(self, split_data: Dict[str, pd.DataFrame]):
# Featurize data into numerical representations for models
log.info("Featurizing the data")
tss = self.problem_definition.timeseries_settings
feature_data = dict()
for key, data in split_data.items():
if key != "stratified_on":
# compute and store two splits - full and filtered (useful for time series post-train analysis)
if key not in self.feature_cache:
featurized_split = EncodedDs(self.encoders, data, self.target)
filtered_subset = EncodedDs(
self.encoders, filter_ts(data, tss), self.target
)
for k, s in zip(
(key, f"{key}_filtered"), (featurized_split, filtered_subset)
):
self.feature_cache[k] = s
for k in (key, f"{key}_filtered"):
feature_data[k] = self.feature_cache[k]
return feature_data
@timed_predictor
def fit(self, enc_data: Dict[str, pd.DataFrame]) -> None:
# Fit predictors to estimate target
self.mode = "train"
# --------------- #
# Extract data
# --------------- #
# Extract the featurized data into train/dev/test
encoded_train_data = enc_data["train"]
encoded_dev_data = enc_data["dev"]
encoded_test_data = enc_data["test_filtered"]
log.info("Training the mixers")
# --------------- #
# Fit Models
# --------------- #
# Assign list of mixers
self.mixers = [
Neural(
fit_on_dev=True,
search_hyperparameters=True,
net="DefaultNet",
stop_after=self.problem_definition.seconds_per_mixer,
target=self.target,
dtype_dict=self.dtype_dict,
target_encoder=self.encoders[self.target],
),
XGBoostMixer(
fit_on_dev=True,
use_optuna=True,
stop_after=self.problem_definition.seconds_per_mixer,
target=self.target,
dtype_dict=self.dtype_dict,
input_cols=self.input_cols,
target_encoder=self.encoders[self.target],
),
Regression(
stop_after=self.problem_definition.seconds_per_mixer,
target=self.target,
dtype_dict=self.dtype_dict,
target_encoder=self.encoders[self.target],
),
RandomForest(
fit_on_dev=True,
stop_after=self.problem_definition.seconds_per_mixer,
target=self.target,
dtype_dict=self.dtype_dict,
target_encoder=self.encoders[self.target],
),
]
# Train mixers
trained_mixers = []
for mixer in self.mixers:
try:
if mixer.trains_once:
self.fit_mixer(
mixer,
ConcatedEncodedDs([encoded_train_data, encoded_dev_data]),
encoded_test_data,
)
else:
self.fit_mixer(mixer, encoded_train_data, encoded_dev_data)
trained_mixers.append(mixer)
except Exception as e:
log.warning(f"Exception: {e} when training mixer: {mixer}")
if True and mixer.stable:
raise e
# Update mixers to trained versions
if not trained_mixers:
raise Exception(
"No mixers could be trained! Please verify your problem definition or JsonAI model representation."
)
self.mixers = trained_mixers
# --------------- #
# Create Ensembles
# --------------- #
log.info("Ensembling the mixer")
# Create an ensemble of mixers to identify best performing model
# Dirty hack
self.ensemble = BestOf(
data=encoded_test_data,
fit=True,
ts_analysis=None,
target=self.target,
mixers=self.mixers,
args=self.pred_args,
accuracy_functions=self.accuracy_functions,
)
self.supports_proba = self.ensemble.supports_proba
@timed_predictor
def fit_mixer(self, mixer, encoded_train_data, encoded_dev_data) -> None:
mixer.fit(encoded_train_data, encoded_dev_data)
@timed_predictor
def analyze_ensemble(self, enc_data: Dict[str, pd.DataFrame]) -> None:
# Evaluate quality of fit for the ensemble of mixers
# --------------- #
# Extract data
# --------------- #
# Extract the featurized data into train/dev/test
encoded_train_data = enc_data["train"]
encoded_dev_data = enc_data["dev"]
encoded_test_data = enc_data["test"]
# --------------- #
# Analyze Ensembles
# --------------- #
log.info("Analyzing the ensemble of mixers")
self.model_analysis, self.runtime_analyzer = model_analyzer(
data=encoded_test_data,
train_data=encoded_train_data,
ts_analysis=None,
stats_info=self.statistical_analysis,
pdef=self.problem_definition,
accuracy_functions=self.accuracy_functions,
predictor=self.ensemble,
target=self.target,
dtype_dict=self.dtype_dict,
analysis_blocks=self.analysis_blocks,
)
@timed_predictor
def learn(self, data: pd.DataFrame) -> None:
if self.problem_definition.ignore_features:
log.info(f"Dropping features: {self.problem_definition.ignore_features}")
data = data.drop(
columns=self.problem_definition.ignore_features, errors="ignore"
)
self.mode = "train"
n_phases = 8 if self.problem_definition.fit_on_all else 7
# Perform stats analysis
log.info(f"[Learn phase 1/{n_phases}] - Statistical analysis")
self.analyze_data(data)
# Pre-process the data
log.info(f"[Learn phase 2/{n_phases}] - Data preprocessing")
data = self.preprocess(data)
# Create train/test (dev) split
log.info(f"[Learn phase 3/{n_phases}] - Data splitting")
train_dev_test = self.split(data)
# Prepare encoders
log.info(f"[Learn phase 4/{n_phases}] - Preparing encoders")
self.prepare(train_dev_test)
# Create feature vectors from data
log.info(f"[Learn phase 5/{n_phases}] - Feature generation")
enc_train_test = self.featurize(train_dev_test)
# Prepare mixers
log.info(f"[Learn phase 6/{n_phases}] - Mixer training")
if not self.problem_definition.embedding_only:
self.fit(enc_train_test)
else:
self.mixers = []
self.ensemble = Embedder(
self.target, mixers=list(), data=enc_train_test["train"]
)
self.supports_proba = self.ensemble.supports_proba
# Analyze the ensemble
log.info(f"[Learn phase 7/{n_phases}] - Ensemble analysis")
self.analyze_ensemble(enc_train_test)
# ------------------------ #
# Enable model partial fit AFTER it is trained and evaluated for performance with the appropriate train/dev/test splits.
# This assumes the predictor could continuously evolve, hence including reserved testing data may improve predictions.
# SET `json_ai.problem_definition.fit_on_all=False` TO TURN THIS BLOCK OFF.
# Update the mixers with partial fit
if self.problem_definition.fit_on_all and all(
[not m.trains_once for m in self.mixers]
):
log.info(f"[Learn phase 8/{n_phases}] - Adjustment on validation requested")
self.adjust(
enc_train_test["test"].data_frame,
ConcatedEncodedDs(
[enc_train_test["train"], enc_train_test["dev"]]
).data_frame,
adjust_args={"learn_call": True},
)
self.feature_cache = (
dict()
) # empty feature cache to avoid large predictor objects
@timed_predictor
def adjust(
self,
train_data: Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame],
dev_data: Optional[Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame]] = None,
adjust_args: Optional[dict] = None,
) -> None:
# Update mixers with new information
self.mode = "train"
# --------------- #
# Prepare data
# --------------- #
if dev_data is None:
data = train_data
split = splitter(
data=data,
pct_train=0.8,
pct_dev=0.2,
pct_test=0,
tss=self.problem_definition.timeseries_settings.to_dict(),
seed=self.problem_definition.seed_nr,
target=self.target,
dtype_dict=self.dtype_dict,
)
train_data = split["train"]
dev_data = split["dev"]
if adjust_args is None or not adjust_args.get("learn_call"):
train_data = self.preprocess(train_data)
dev_data = self.preprocess(dev_data)
dev_data = EncodedDs(self.encoders, dev_data, self.target)
train_data = EncodedDs(self.encoders, train_data, self.target)
# --------------- #
# Update/Adjust Mixers
# --------------- #
log.info("Updating the mixers")
for mixer in self.mixers:
mixer.partial_fit(train_data, dev_data, adjust_args)
@timed_predictor
def predict(self, data: pd.DataFrame, args: Dict = {}) -> pd.DataFrame:
self.mode = "predict"
n_phases = 3 if self.pred_args.all_mixers else 4
if len(data) == 0:
raise Exception(
"Empty input, aborting prediction. Please try again with some input data."
)
self.pred_args = PredictionArguments.from_dict(args)
log.info(f"[Predict phase 1/{n_phases}] - Data preprocessing")
if self.problem_definition.ignore_features:
log.info(f"Dropping features: {self.problem_definition.ignore_features}")
data = data.drop(
columns=self.problem_definition.ignore_features, errors="ignore"
)
for col in self.input_cols:
if col not in data.columns:
data[col] = [None] * len(data)
# Pre-process the data
data = self.preprocess(data)
# Featurize the data
log.info(f"[Predict phase 2/{n_phases}] - Feature generation")
encoded_ds = self.featurize({"predict_data": data})["predict_data"]
encoded_data = encoded_ds.get_encoded_data(include_target=False)
log.info(f"[Predict phase 3/{n_phases}] - Calling ensemble")
@timed
def _timed_call(encoded_ds):
if self.pred_args.return_embedding:
embedder = Embedder(self.target, mixers=list(), data=encoded_ds)
df = embedder(encoded_ds, args=self.pred_args)
else:
df = self.ensemble(encoded_ds, args=self.pred_args)
return df
df = _timed_call(encoded_ds)
if not (
any(
[
self.pred_args.all_mixers,
self.pred_args.return_embedding,
self.problem_definition.embedding_only,
]
)
):
log.info(f"[Predict phase 4/{n_phases}] - Analyzing output")
df, global_insights = explain(
data=data,
encoded_data=encoded_data,
predictions=df,
ts_analysis=None,
problem_definition=self.problem_definition,
stat_analysis=self.statistical_analysis,
runtime_analysis=self.runtime_analyzer,
target_name=self.target,
target_dtype=self.dtype_dict[self.target],
explainer_blocks=self.analysis_blocks,
pred_args=self.pred_args,
)
self.global_insights = {**self.global_insights, **global_insights}
self.feature_cache = (
dict()
) # empty feature cache to avoid large predictor objects
return df
def test(
self,
data: pd.DataFrame,
metrics: list,
args: Dict[str, object] = {},
strict: bool = False,
) -> pd.DataFrame:
preds = self.predict(data, args)
preds = preds.rename(columns={"prediction": self.target})
filtered = []
# filter metrics if not supported
for metric in metrics:
# metric should be one of: an actual function, registered in the model class, or supported by the evaluator
if not (
callable(metric)
or metric in self.accuracy_functions
or metric in mdb_eval_accuracy_metrics
):
if strict:
raise Exception(f"Invalid metric: {metric}")
else:
log.warning(f"Invalid metric: {metric}. Skipping...")
else:
filtered.append(metric)
metrics = filtered
try:
labels = self.model_analysis.histograms[self.target]["x"]
except:
if strict:
raise Exception("Label histogram not found")
else:
label_map = (
None # some accuracy functions will crash without this, be mindful
)
scores = evaluate_accuracies(
data,
preds[self.target],
self.target,
metrics,
ts_analysis=self.ts_analysis,
labels=labels,
)
# TODO: remove once mdb_eval returns an actual list
scores = {k: [v] for k, v in scores.items() if not isinstance(v, list)}
return pd.DataFrame.from_records(
scores
) # TODO: add logic to disaggregate per-mixer
As you can see, an end-to-end pipeline of our entire ML procedure has been generating. There are several abstracted functions to enable transparency as to what processes your data goes through in order to build these models.
The key steps of the pipeline are as follows:
Run a statistical analysis with
analyze_data
Clean your data with
preprocess
Make a training/dev/testing split with
split
Prepare your feature-engineering pipelines with
prepare
Create your features with
featurize
Fit your predictor models with
fit
You can customize this further if necessary, but you have all the steps necessary to train a model!
We recommend familiarizing with these steps by calling the above commands, ideally in order. Some commands (namely prepare
, featurize
, and fit
) do depend on other steps.
If you want to omit the individual steps, we recommend your simply call the learn
method, which compiles all the necessary steps implemented to give your fully trained predictive models starting with unprocessed data!
6) Call python to run your code and see your preprocessed outputs
Once we have code, we can turn this into a python object by calling predictor_from_code
. This instantiates the PredictorInterface
object.
This predictor object can be then used to run your pipeline.
[8]:
# Turn the code above into a predictor object
predictor = predictor_from_code(code)
[9]:
predictor.mode = "train"
# Perform stats analysis
predictor.analyze_data(data)
# Pre-process the data
cleaned_data = predictor.preprocess(data)
cleaned_data.head()
INFO:dataprep_ml-2520:Starting statistical analysis
INFO:dataprep_ml-2520:Dropping features: ['id']
INFO:dataprep_ml-2520:Finished statistical analysis
DEBUG:lightwood-2520: `analyze_data` runtime: 0.05 seconds
INFO:dataprep_ml-2520:Cleaning the data
INFO:dataprep_ml-2520:Dropping features: ['id']
INFO:lightwood-2520:Cleaning column =excerpt
INFO:lightwood-2520:Cleaning column =target
INFO:lightwood-2520:Converted target into strictly non-negative
DEBUG:lightwood-2520: `preprocess` runtime: 0.08 seconds
[9]:
excerpt | target | |
---|---|---|
0 | When young people returned ballroom, presented... | 0.000000 |
1 | All dinner time, Mrs. Fayre somewhat silent, e... | 0.000000 |
2 | As Roger predicted, snow departed quickly came... | 0.000000 |
3 | And outside palace great garden walled round, ... | 0.000000 |
4 | Once upon time Three Bears lived together hous... | 0.247197 |
[10]:
print("\033[1m" + "Original Data\n" + "\033[0m")
print("Excerpt:\n", data.iloc[0]["excerpt"])
print("\nTarget:\n", data.iloc[0]["target"])
print("\033[1m" + "\n\nCleaned Data\n" + "\033[0m")
print("Excerpt:\n", cleaned_data.iloc[0]["excerpt"])
print("\nTarget:\n", cleaned_data.iloc[0]["target"])
Original Data
Excerpt:
When the young people returned to the ballroom, it presented a decidedly changed appearance. Instead of an interior scene, it was a winter landscape.
The floor was covered with snow-white canvas, not laid on smoothly, but rumpled over bumps and hillocks, like a real snow field. The numerous palms and evergreens that had decorated the room, were powdered with flour and strewn with tufts of cotton, like snow. Also diamond dust had been lightly sprinkled on them, and glittering crystal icicles hung from the branches.
At each end of the room, on the wall, hung a beautiful bear-skin rug.
These rugs were for prizes, one for the girls and one for the boys. And this was the game.
The girls were gathered at one end of the room and the boys at the other, and one end was called the North Pole, and the other the South Pole. Each player was given a small flag which they were to plant on reaching the Pole.
This would have been an easy matter, but each traveller was obliged to wear snowshoes.
Target:
-0.340259125
Cleaned Data
Excerpt:
When young people returned ballroom, presented decidedly changed appearance. Instead interior scene, winter landscape. The floor covered snow-white canvas, laid smoothly, rumpled bumps hillocks, like real snow field. The numerous palms evergreens decorated room, powdered flour strewn tufts cotton, like snow. Also diamond dust lightly sprinkled them, glittering crystal icicles hung branches. At end room, wall, hung beautiful bear-skin rug. These rugs prizes, one girls one boys. And game. The girls gathered one end room boys other, one end called North Pole, South Pole. Each player given small flag plant reaching Pole. This would easy matter, traveller obliged wear snowshoes.
Target:
0.0
As you can see, the cleaning-process we introduced cut out the stop-words from the Excerpt, and enforced the target data to stay positive.
We hope this tutorial was informative on how to introduce a custom preprocessing method to your datasets! For more customization tutorials, please check our documentation.
If you want to download the Jupyter-notebook version of this tutorial, check out the source github location found here: lightwood/docssrc/source/tutorials/custom_cleaner
.