Using your own pre-processing methods in Lightwood

Date: 2021.10.07

In this notebook, we explore how to write custom pre-processing methods for your data. Lightwood ships with standard cleaning protocols that handle a variety of data types; however, we want users to feel comfortable augmenting them with their own changes. To do so, we'll highlight the approach we would take below:

We will use data from Kaggle.

The data has several columns, but the task is ultimately to use the text to predict a readability score. There are also some columns we do not want to use when making predictions, such as url_legal and license, among others.

In this tutorial, we're going to focus on making changes to 2 columns: (1) excerpt, a text column, from which we will remove stop words using NLTK; (2) target, the value we want to predict, which we will make explicitly non-negative.

Note: in the actual challenge, negative and positive target values are both meaningful. We are using this dataset purely as an example to demonstrate how you can make changes to your underlying data and proceed to building powerful predictors.

Let’s get started!

[1]:
import numpy as np
import pandas as pd
import torch
import nltk

import os
import sys

# Lightwood modules
import lightwood as lw
from lightwood import ProblemDefinition, \
                      JsonAI, \
                      json_ai_from_problem, \
                      code_from_json_ai, \
                      predictor_from_code
INFO:lightwood-2730:No torchvision detected, image helpers not supported.
INFO:lightwood-2730:No torchvision/pillow detected, image encoder not supported
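
Note: the custom cleaner we build below relies on NLTK's English stop-word list, which ships as a separate corpus. If you have never downloaded it, a one-time call to nltk.download takes care of it; as a quick sketch, this also previews what stop-word filtering does to a sentence:

import nltk
from nltk.corpus import stopwords

# One-time download of the stop-word corpus (a no-op if it is already present)
nltk.download("stopwords")

stop_words = set(stopwords.words("english"))
sentence = "When the young people returned to the ballroom"
print(" ".join(w for w in sentence.split() if w not in stop_words))
# -> When young people returned ballroom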

1) Load your data

Lightwood uses pandas to handle datasets, as this is a very standard package in data science. We can load our dataset with pandas in the following manner (here we read it directly from a URL; if you keep a local copy, point read_csv to that file instead):

[2]:
# Load the data
data = pd.read_csv("https://mindsdb-example-data.s3.eu-west-2.amazonaws.com/jupyter/train.csv.zip")
data.head()
[2]:
id url_legal license excerpt target standard_error
0 c12129c31 NaN NaN When the young people returned to the ballroom... -0.340259 0.464009
1 85aa80a4c NaN NaN All through dinner time, Mrs. Fayre was somewh... -0.315372 0.480805
2 b69ac6792 NaN NaN As Roger had predicted, the snow departed as q... -0.580118 0.476676
3 dd1000b26 NaN NaN And outside before the palace a great garden w... -1.054013 0.450007
4 37c1b32fb NaN NaN Once upon a time there were Three Bears who li... 0.247197 0.510845

We see 6 columns: a mix of numerical values, missing values, text, and identifiers or “ids”. For our predictive task, we are only interested in 2 of these columns, excerpt and target.

2) Create a JSON-AI default object

Before we create a custom cleaner object, let’s first create JSON-AI syntax for our problem based on its specifications. We can do so by setting up a ProblemDefinition. The ProblemDefinition allows us to specify the target, the column we intend to predict, along with other details.

The end goal of JSON-AI is to provide a set of instructions on how to compile a machine learning pipeline.

In this case, let’s specify our target, the aptly named target column. We will also tell JSON-AI to throw away features we never intend to use, such as “url_legal”, “license”, and “standard_error”. We can do so in the following lines:

[3]:
# Setup the problem definition
problem_definition = {
    'target': 'target',
    "ignore_features": ["url_legal", "license", "standard_error"]
}

# Generate the j{ai}son syntax
json_ai = json_ai_from_problem(data, problem_definition)

INFO:lightwood-2730:Dropping features: ['url_legal', 'license', 'standard_error']
INFO:type_infer-2730:Analyzing a sample of 2478
INFO:type_infer-2730:from a total population of 2834, this is equivalent to 87.4% of your data.
INFO:type_infer-2730:Infering type for: id
INFO:type_infer-2730:Doing text detection for column: id
INFO:type_infer-2730:Column id has data type categorical
INFO:type_infer-2730:Infering type for: excerpt
INFO:type_infer-2730:Doing text detection for column: excerpt
INFO:type_infer-2730:Infering type for: target
INFO:type_infer-2730:Column target has data type float
WARNING:type_infer-2730:Column id is an identifier of type "Hash-like identifier"
INFO:dataprep_ml-2730:Starting statistical analysis
INFO:dataprep_ml-2730:Dropping features: ['id']
INFO:dataprep_ml-2730:Finished statistical analysis

As it processes the data, Lightwood will provide the user with a few pieces of information:

  1. It drops the features we specify in the ignore_features argument

  2. It takes a small sample of data from each column to automatically infer the data type

  3. For each column that was not ignored, it identifies the most likely data type.

  4. It notices that “ID” is a hash-like-identifier.

  5. It conducts a small statistical analysis on the distributions in order to generate syntax.

As soon as you request a JSON-AI object, Lightwood automatically creates functional syntax from your data. You can see it as follows:

[4]:
print(json_ai.to_json())
{
    "encoders": {
        "target": {
            "module": "NumericEncoder",
            "args": {
                "is_target": "True",
                "positive_domain": "$statistical_analysis.positive_domain"
            }
        },
        "excerpt": {
            "module": "PretrainedLangEncoder",
            "args": {
                "output_type": "$dtype_dict[$target]",
                "stop_after": "$problem_definition.seconds_per_encoder"
            }
        }
    },
    "dtype_dict": {
        "excerpt": "rich_text",
        "target": "float"
    },
    "dependency_dict": {},
    "model": {
        "module": "BestOf",
        "args": {
            "submodels": [
                {
                    "module": "Neural",
                    "args": {
                        "fit_on_dev": true,
                        "stop_after": "$problem_definition.seconds_per_mixer",
                        "search_hyperparameters": true
                    }
                },
                {
                    "module": "XGBoostMixer",
                    "args": {
                        "stop_after": "$problem_definition.seconds_per_mixer",
                        "fit_on_dev": true
                    }
                },
                {
                    "module": "Regression",
                    "args": {
                        "stop_after": "$problem_definition.seconds_per_mixer"
                    }
                },
                {
                    "module": "RandomForest",
                    "args": {
                        "stop_after": "$problem_definition.seconds_per_mixer",
                        "fit_on_dev": true
                    }
                }
            ]
        }
    },
    "problem_definition": {
        "target": "target",
        "pct_invalid": 2,
        "unbias_target": true,
        "seconds_per_mixer": 21384.0,
        "seconds_per_encoder": 85536.0,
        "expected_additional_time": 15.443372249603271,
        "time_aim": 259200,
        "target_weights": null,
        "positive_domain": false,
        "timeseries_settings": {
            "is_timeseries": false,
            "order_by": null,
            "window": null,
            "group_by": null,
            "use_previous_target": true,
            "horizon": null,
            "historical_columns": null,
            "target_type": "",
            "allow_incomplete_history": true,
            "eval_incomplete": false,
            "interval_periods": []
        },
        "anomaly_detection": false,
        "use_default_analysis": true,
        "embedding_only": false,
        "dtype_dict": {},
        "ignore_features": [
            "url_legal",
            "license",
            "standard_error"
        ],
        "fit_on_all": true,
        "strict_mode": true,
        "seed_nr": 1
    },
    "identifiers": {
        "id": "Hash-like identifier"
    },
    "imputers": [],
    "accuracy_functions": [
        "r2_score"
    ]
}

The above shows the minimal syntax required to create a functional JSON-AI object. For each feature in the dataset, we specify the name of the feature, the encoder (feature-engineering method) used to process it, and keyword arguments to configure that encoder. For the output, we perform a similar operation, but specify the types of mixers, the algorithms used to build a predictor that can estimate the target. Lastly, we populate the “problem_definition” key with the ingredients for our ML pipeline.

These are the only elements required to get off the ground with JSON-AI. However, we’re interested in making a custom approach, so let’s introduce our own changes to this syntax (you can also dump it to a file for inspection, as sketched below).
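
If you prefer to inspect or hand-edit the generated syntax outside of Python, you can dump it to disk first; a minimal sketch (the rest of this tutorial simply edits the json_ai object in memory):

# Optional: save the generated JSON-AI syntax for inspection or manual editing
with open("default_pipeline.json", "w") as fp:
    fp.write(json_ai.to_json())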

3) Build your own cleaner module

Let’s make a file called MyCustomCleaner.py. To write this file, we will use dataprep_ml.cleaners.cleaner as inspiration. dataprep_ml is a companion library that is part of the broader MindsDB ecosystem, and specializes in data cleaning, data splitting and data analysis.

The goal of the cleaner is to pre-process your dataset - its output is simply a pandas DataFrame. In theory, any pre-processing can be done here. However, since data can be highly irregular, our default Cleaner function has several main goals:

  1. Strip away identifiers and other unwanted columns

  2. Apply a cleaning function to each column in the dataset, according to that column’s data type

  3. Standardize NaN values within each column for appropriate downstream treatment

You can choose to omit many of these details and completely write this module from scratch, but the easiest way to introduce your custom changes is to borrow the Cleaner function, and add core changes in a custom block.

This can be done as follows.

You can see individual cleaning functions in dataprep_ml.cleaners. If you want to entirely replace a cleaning technique given a particular data-type, we invite you to change dataprep_ml.cleaners.get_cleaning_func using the argument custom_cleaning_functions; in this dictionary, for a datatype (specified in type_infer.dtype), you can assign your own function to override our defaults.
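
As a rough sketch of that mechanism (an illustration only: per the Dict[str, str] annotation in the cleaner signature below, some versions may expect a resolvable function name rather than the callable itself, so check dataprep_ml.cleaners.get_cleaning_func for the exact contract), an override could look like this:

from type_infer.dtype import dtype

def clean_short_text(x):
    # Hypothetical custom rule: lowercase and strip surrounding whitespace
    return str(x).strip().lower()

# Map a type_infer dtype to your own routine and pass this dict through the
# `custom_cleaning_functions` argument. NOTE: depending on the dataprep_ml
# version, the value may need to be the function's name (a string) instead of
# the callable itself - check get_cleaning_func before relying on this.
custom_cleaning_functions = {dtype.short_text: clean_short_text}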

[5]:
%%writefile MyCustomCleaner.py

import numpy as np
import pandas as pd
from type_infer.dtype import dtype

from lightwood.helpers import text
from lightwood.helpers.log import log
from lightwood.api.types import TimeseriesSettings

from nltk.corpus import stopwords

stop_words = set(stopwords.words("english"))

from typing import Dict

# Borrow cleaner functions
from dataprep_ml.cleaners import (
    _remove_columns,
    _get_columns_to_clean,
    get_cleaning_func,
)

# Use for standardizing NaNs
VALUES_FOR_NAN_AND_NONE_IN_PANDAS = [np.nan, "nan", "NaN", "Nan", "None"]


def cleaner(
    data: pd.DataFrame,
    dtype_dict: Dict[str, str],
    identifiers: Dict[str, str],
    target: str,
    mode: str,
    timeseries_settings: TimeseriesSettings,
    anomaly_detection: bool,
    custom_cleaning_functions: Dict[str, str] = {},
) -> pd.DataFrame:
    """
    The cleaner is a function which takes in the raw data, plus additional information about its types and about the problem. Based on this it generates a "clean" representation of the data, where each column has an ideal standardized type and all malformed or otherwise missing or invalid elements are turned into ``None``

    :param data: The raw data
    :param dtype_dict: Type information for each column
    :param identifiers: A dict containing all identifier typed columns
    :param target: The target column
    :param mode: Can be "predict" or "train"
    :param timeseries_settings: Timeseries related settings, only relevant for timeseries predictors, otherwise can be the default object
    :param anomaly_detection: Are we detecting anomalies with this predictor?

    :returns: The cleaned data
    """  # noqa

    data = _remove_columns(
        data,
        identifiers,
        target,
        mode,
        timeseries_settings,
        anomaly_detection,
        dtype_dict,
    )

    for col in _get_columns_to_clean(data, dtype_dict, mode, target):

        log.info("Cleaning column =" + str(col))
        # Get and apply a cleaning function for each data type
        # If you want to customize the cleaner, you will likely want to modify ``get_cleaning_func``
        fn, vec = get_cleaning_func(dtype_dict[col], custom_cleaning_functions)
        if not vec:
            data[col] = data[col].apply(fn)
        if vec:
            data[col] = fn(data[col])

        # ------------------------ #
        # INTRODUCE YOUR CUSTOM BLOCK

        # If column data type is a text type, remove stop-words
        if dtype_dict[col] in (dtype.rich_text, dtype.short_text):
            data[col] = data[col].apply(
                lambda x: " ".join(
                    [word for word in x.split() if word not in stop_words]
                )
            )

        # Enforce numerical columns as non-negative
        if dtype_dict[col] in (dtype.integer, dtype.float):
            log.info("Converted " + str(col) + " into strictly non-negative")
            data[col] = data[col].apply(lambda x: x if x > 0 else 0.0)

        # ------------------------ #
        data[col] = data[col].replace(
            to_replace=VALUES_FOR_NAN_AND_NONE_IN_PANDAS, value=None
        )

    return data

Writing MyCustomCleaner.py

Place your custom module in ~/lightwood_modules or /etc/lightwood_modules

We automatically search for custom scripts in your ~/lightwood_modules and /etc/lightwood_modules paths. Place your file there. Later, when we autogenerate code, you’ll see that you can change your import location if you choose.

[6]:
from lightwood import load_custom_module

# Lightwood automatically does this for us if we want
load_custom_module('MyCustomCleaner.py')

4) Introduce your custom cleaner in JSON-AI

Now let’s introduce our custom cleaner. JSON-AI keeps a lightweight syntax but fills in many default modules (like splitting and cleaning). It is also agnostic to where a module comes from, as long as it behaves the same way as the other modules that could be used for any given key.

For the custom cleaner, we’ll work by editing the “cleaner” key. We will change the following properties within it: (1) “module” - the name of the function; in our case it will be “MyCustomCleaner.cleaner”. (2) “args” - any keyword arguments specific to your cleaner’s internals.

This will look as follows:

"cleaner": {
    "module": "MyCustomCleaner.cleaner",
    "args": {
        "identifiers": "$identifiers",
        "data": "data",
        "dtype_dict": "$dtype_dict",
        "target": "$target",
        "mode": "$mode",
        "timeseries_settings": "$problem_definition.timeseries_settings",
        "anomaly_detection": "$problem_definition.anomaly_detection"
    }
}

You may be wondering what the “$” variables reference. In certain cases, we’d like JSON-AI to auto-fill internal variables when it generates code. For example, we’ve already specified the “target” column, so rather than repeating its name, we can simply refer to it in a modular way. That is what these “$” variables represent.
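
For instance, in the pipeline code generated later in this tutorial, the placeholders in the “cleaner” block resolve to attributes of the Predictor object, roughly as follows:

# How the "$" placeholders resolve inside the generated Predictor.preprocess():
#   "identifiers": "$identifiers"  ->  identifiers=self.identifiers
#   "dtype_dict":  "$dtype_dict"   ->  dtype_dict=self.dtype_dict
#   "target":      "$target"       ->  target=self.target
#   "mode":        "$mode"         ->  mode=self.mode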

As we borrowed most of the default Cleaner, we keep these arguments. If we were writing the module from scratch, we could customize these values as necessary.

5) Generate Python code representing your ML pipeline

Now we’re ready to load up our custom JSON-AI and generate the predictor code!

We can do this by first adding our custom cleaner to the JSON-AI object, and then calling the function code_from_json_ai.

[7]:
# Make changes to your JSON-AI
json_ai.cleaner = {
        "module": "MyCustomCleaner.cleaner",
        "args": {
            "identifiers": "$identifiers",
            "data": "data",
            "dtype_dict": "$dtype_dict",
            "target": "$target",
            "mode": "$mode",
            "timeseries_settings": "$problem_definition.timeseries_settings.to_dict()",
            "anomaly_detection": "$problem_definition.anomaly_detection"
        }
}

# Generate python code that fills in your pipeline
code = code_from_json_ai(json_ai)

print(code)

# Save code to a file (Optional)
with open('custom_cleaner_pipeline.py', 'w') as fp:
    fp.write(code)
import lightwood
from lightwood import __version__ as lightwood_version
from lightwood.analysis import *
from lightwood.api import *
from lightwood.data import *
from lightwood.encoder import *
from lightwood.ensemble import *
from lightwood.helpers.device import *
from lightwood.helpers.general import *
from lightwood.helpers.ts import *
from lightwood.helpers.log import *
from lightwood.helpers.numeric import *
from lightwood.helpers.parallelism import *
from lightwood.helpers.seed import *
from lightwood.helpers.text import *
from lightwood.helpers.torch import *
from lightwood.mixer import *

from dataprep_ml.insights import statistical_analysis
from dataprep_ml.cleaners import cleaner
from dataprep_ml.splitters import splitter
from dataprep_ml.imputers import *

from mindsdb_evaluator import evaluate_accuracies
from mindsdb_evaluator.accuracy import __all__ as mdb_eval_accuracy_metrics

import pandas as pd
from typing import Dict, List, Union, Optional
import os
from types import ModuleType
import importlib.machinery
import sys
import time


for import_dir in [
    os.path.join(
        os.path.expanduser("~/lightwood_modules"), lightwood_version.replace(".", "_")
    ),
    os.path.join("/etc/lightwood_modules", lightwood_version.replace(".", "_")),
]:
    if os.path.exists(import_dir) and os.access(import_dir, os.R_OK):
        for file_name in list(os.walk(import_dir))[0][2]:
            if file_name[-3:] != ".py":
                continue
            mod_name = file_name[:-3]
            loader = importlib.machinery.SourceFileLoader(
                mod_name, os.path.join(import_dir, file_name)
            )
            module = ModuleType(loader.name)
            loader.exec_module(module)
            sys.modules[mod_name] = module
            exec(f"import {mod_name}")


class Predictor(PredictorInterface):
    target: str
    mixers: List[BaseMixer]
    encoders: Dict[str, BaseEncoder]
    ensemble: BaseEnsemble
    mode: str

    def __init__(self):
        seed(1)
        self.target = "target"
        self.mode = "inactive"
        self.problem_definition = ProblemDefinition.from_dict(
            {
                "target": "target",
                "pct_invalid": 2,
                "unbias_target": True,
                "seconds_per_mixer": 21384.0,
                "seconds_per_encoder": 85536.0,
                "expected_additional_time": 15.443372249603271,
                "time_aim": 259200,
                "target_weights": None,
                "positive_domain": False,
                "timeseries_settings": {
                    "is_timeseries": False,
                    "order_by": None,
                    "window": None,
                    "group_by": None,
                    "use_previous_target": True,
                    "horizon": None,
                    "historical_columns": None,
                    "target_type": "",
                    "allow_incomplete_history": True,
                    "eval_incomplete": False,
                    "interval_periods": [],
                },
                "anomaly_detection": False,
                "use_default_analysis": True,
                "embedding_only": False,
                "dtype_dict": {},
                "ignore_features": ["url_legal", "license", "standard_error"],
                "fit_on_all": True,
                "strict_mode": True,
                "seed_nr": 1,
            }
        )
        self.accuracy_functions = ["r2_score"]
        self.identifiers = {"id": "Hash-like identifier"}
        self.dtype_dict = {"excerpt": "rich_text", "target": "float"}
        self.lightwood_version = "24.5.2.0"
        self.pred_args = PredictionArguments()

        # Any feature-column dependencies
        self.dependencies = {"target": [], "excerpt": []}

        self.input_cols = ["excerpt"]

        # Initial stats analysis
        self.statistical_analysis = None
        self.ts_analysis = None
        self.runtime_log = dict()
        self.global_insights = dict()

        # Feature cache
        self.feature_cache = dict()

    @timed_predictor
    def analyze_data(self, data: pd.DataFrame) -> None:
        # Perform a statistical analysis on the unprocessed data

        self.statistical_analysis = statistical_analysis(
            data,
            self.dtype_dict,
            self.problem_definition.to_dict(),
            {"id": "Hash-like identifier"},
        )

        # Instantiate post-training evaluation
        self.analysis_blocks = [
            ICP(fixed_significance=None, confidence_normalizer=False, deps=[]),
            ConfStats(deps=["ICP"]),
            AccStats(deps=["ICP"]),
            PermutationFeatureImportance(deps=["AccStats"]),
        ]

    @timed_predictor
    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
        # Preprocess and clean data

        log.info("Cleaning the data")
        self.imputers = {}
        data = MyCustomCleaner.cleaner(
            data=data,
            identifiers=self.identifiers,
            dtype_dict=self.dtype_dict,
            target=self.target,
            mode=self.mode,
            timeseries_settings=self.problem_definition.timeseries_settings.to_dict(),
            anomaly_detection=self.problem_definition.anomaly_detection,
        )

        # Time-series blocks

        return data

    @timed_predictor
    def split(self, data: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        # Split the data into training/testing splits

        log.info("Splitting the data into train/test")
        train_test_data = splitter(
            data=data,
            pct_train=0.8,
            pct_dev=0.1,
            pct_test=0.1,
            tss=self.problem_definition.timeseries_settings.to_dict(),
            seed=self.problem_definition.seed_nr,
            target=self.target,
            dtype_dict=self.dtype_dict,
        )

        return train_test_data

    @timed_predictor
    def prepare(self, data: Dict[str, pd.DataFrame]) -> None:
        # Prepare encoders to featurize data

        self.mode = "train"

        if self.statistical_analysis is None:
            raise Exception("Please run analyze_data first")

        # Column to encoder mapping
        self.encoders = {
            "target": NumericEncoder(
                is_target=True,
                positive_domain=self.statistical_analysis.positive_domain,
            ),
            "excerpt": PretrainedLangEncoder(
                output_type=self.dtype_dict[self.target],
                stop_after=self.problem_definition.seconds_per_encoder,
            ),
        }

        # Prepare the training + dev data
        concatenated_train_dev = pd.concat([data["train"], data["dev"]])

        prepped_encoders = {}

        # Prepare input encoders
        parallel_encoding = parallel_encoding_check(data["train"], self.encoders)

        if parallel_encoding:
            log.debug("Preparing in parallel...")
            for col_name, encoder in self.encoders.items():
                if col_name != self.target and not encoder.is_trainable_encoder:
                    prepped_encoders[col_name] = (
                        encoder,
                        concatenated_train_dev[col_name],
                        "prepare",
                    )
            prepped_encoders = mut_method_call(prepped_encoders)

        else:
            log.debug("Preparing sequentially...")
            for col_name, encoder in self.encoders.items():
                if col_name != self.target and not encoder.is_trainable_encoder:
                    log.debug(f"Preparing encoder for {col_name}...")
                    encoder.prepare(concatenated_train_dev[col_name])
                    prepped_encoders[col_name] = encoder

        # Store encoders
        for col_name, encoder in prepped_encoders.items():
            self.encoders[col_name] = encoder

        # Prepare the target
        if self.target not in prepped_encoders:
            if self.encoders[self.target].is_trainable_encoder:
                self.encoders[self.target].prepare(
                    data["train"][self.target], data["dev"][self.target]
                )
            else:
                self.encoders[self.target].prepare(
                    pd.concat([data["train"], data["dev"]])[self.target]
                )

        # Prepare any non-target encoders that are learned
        for col_name, encoder in self.encoders.items():
            if col_name != self.target and encoder.is_trainable_encoder:
                priming_data = pd.concat([data["train"], data["dev"]])
                kwargs = {}
                if self.dependencies[col_name]:
                    kwargs["dependency_data"] = {}
                    for col in self.dependencies[col_name]:
                        kwargs["dependency_data"][col] = {
                            "original_type": self.dtype_dict[col],
                            "data": priming_data[col],
                        }

                # If an encoder representation requires the target, provide priming data
                if hasattr(encoder, "uses_target"):
                    kwargs["encoded_target_values"] = self.encoders[self.target].encode(
                        priming_data[self.target]
                    )

                encoder.prepare(
                    data["train"][col_name], data["dev"][col_name], **kwargs
                )

    @timed_predictor
    def featurize(self, split_data: Dict[str, pd.DataFrame]):
        # Featurize data into numerical representations for models

        log.info("Featurizing the data")

        tss = self.problem_definition.timeseries_settings

        feature_data = dict()
        for key, data in split_data.items():
            if key != "stratified_on":

                # compute and store two splits - full and filtered (useful for time series post-train analysis)
                if key not in self.feature_cache:
                    featurized_split = EncodedDs(self.encoders, data, self.target)
                    filtered_subset = EncodedDs(
                        self.encoders, filter_ts(data, tss), self.target
                    )

                    for k, s in zip(
                        (key, f"{key}_filtered"), (featurized_split, filtered_subset)
                    ):
                        self.feature_cache[k] = s

                for k in (key, f"{key}_filtered"):
                    feature_data[k] = self.feature_cache[k]

        return feature_data

    @timed_predictor
    def fit(self, enc_data: Dict[str, pd.DataFrame]) -> None:
        # Fit predictors to estimate target

        self.mode = "train"

        # --------------- #
        # Extract data
        # --------------- #
        # Extract the featurized data into train/dev/test
        encoded_train_data = enc_data["train"]
        encoded_dev_data = enc_data["dev"]
        encoded_test_data = enc_data["test_filtered"]

        log.info("Training the mixers")

        # --------------- #
        # Fit Models
        # --------------- #
        # Assign list of mixers
        self.mixers = [
            Neural(
                fit_on_dev=True,
                search_hyperparameters=True,
                net="DefaultNet",
                stop_after=self.problem_definition.seconds_per_mixer,
                target=self.target,
                dtype_dict=self.dtype_dict,
                target_encoder=self.encoders[self.target],
            ),
            XGBoostMixer(
                fit_on_dev=True,
                use_optuna=True,
                stop_after=self.problem_definition.seconds_per_mixer,
                target=self.target,
                dtype_dict=self.dtype_dict,
                input_cols=self.input_cols,
                target_encoder=self.encoders[self.target],
            ),
            Regression(
                stop_after=self.problem_definition.seconds_per_mixer,
                target=self.target,
                dtype_dict=self.dtype_dict,
                target_encoder=self.encoders[self.target],
            ),
            RandomForest(
                fit_on_dev=True,
                stop_after=self.problem_definition.seconds_per_mixer,
                target=self.target,
                dtype_dict=self.dtype_dict,
                target_encoder=self.encoders[self.target],
            ),
        ]

        # Train mixers
        trained_mixers = []
        for mixer in self.mixers:
            try:
                if mixer.trains_once:
                    self.fit_mixer(
                        mixer,
                        ConcatedEncodedDs([encoded_train_data, encoded_dev_data]),
                        encoded_test_data,
                    )
                else:
                    self.fit_mixer(mixer, encoded_train_data, encoded_dev_data)
                trained_mixers.append(mixer)
            except Exception as e:
                log.warning(f"Exception: {e} when training mixer: {mixer}")
                if True and mixer.stable:
                    raise e

        # Update mixers to trained versions
        if not trained_mixers:
            raise Exception(
                "No mixers could be trained! Please verify your problem definition or JsonAI model representation."
            )
        self.mixers = trained_mixers

        # --------------- #
        # Create Ensembles
        # --------------- #
        log.info("Ensembling the mixer")
        # Create an ensemble of mixers to identify best performing model
        # Dirty hack
        self.ensemble = BestOf(
            data=encoded_test_data,
            fit=True,
            ts_analysis=None,
            target=self.target,
            mixers=self.mixers,
            args=self.pred_args,
            accuracy_functions=self.accuracy_functions,
        )
        self.supports_proba = self.ensemble.supports_proba

    @timed_predictor
    def fit_mixer(self, mixer, encoded_train_data, encoded_dev_data) -> None:
        mixer.fit(encoded_train_data, encoded_dev_data)

    @timed_predictor
    def analyze_ensemble(self, enc_data: Dict[str, pd.DataFrame]) -> None:
        # Evaluate quality of fit for the ensemble of mixers

        # --------------- #
        # Extract data
        # --------------- #
        # Extract the featurized data into train/dev/test
        encoded_train_data = enc_data["train"]
        encoded_dev_data = enc_data["dev"]
        encoded_test_data = enc_data["test"]

        # --------------- #
        # Analyze Ensembles
        # --------------- #
        log.info("Analyzing the ensemble of mixers")
        self.model_analysis, self.runtime_analyzer = model_analyzer(
            data=encoded_test_data,
            train_data=encoded_train_data,
            ts_analysis=None,
            stats_info=self.statistical_analysis,
            pdef=self.problem_definition,
            accuracy_functions=self.accuracy_functions,
            predictor=self.ensemble,
            target=self.target,
            dtype_dict=self.dtype_dict,
            analysis_blocks=self.analysis_blocks,
        )

    @timed_predictor
    def learn(self, data: pd.DataFrame) -> None:
        if self.problem_definition.ignore_features:
            log.info(f"Dropping features: {self.problem_definition.ignore_features}")
            data = data.drop(
                columns=self.problem_definition.ignore_features, errors="ignore"
            )

        self.mode = "train"
        n_phases = 8 if self.problem_definition.fit_on_all else 7

        # Perform stats analysis
        log.info(f"[Learn phase 1/{n_phases}] - Statistical analysis")
        self.analyze_data(data)

        # Pre-process the data
        log.info(f"[Learn phase 2/{n_phases}] - Data preprocessing")
        data = self.preprocess(data)

        # Create train/test (dev) split
        log.info(f"[Learn phase 3/{n_phases}] - Data splitting")
        train_dev_test = self.split(data)

        # Prepare encoders
        log.info(f"[Learn phase 4/{n_phases}] - Preparing encoders")
        self.prepare(train_dev_test)

        # Create feature vectors from data
        log.info(f"[Learn phase 5/{n_phases}] - Feature generation")
        enc_train_test = self.featurize(train_dev_test)

        # Prepare mixers
        log.info(f"[Learn phase 6/{n_phases}] - Mixer training")
        if not self.problem_definition.embedding_only:
            self.fit(enc_train_test)
        else:
            self.mixers = []
            self.ensemble = Embedder(
                self.target, mixers=list(), data=enc_train_test["train"]
            )
            self.supports_proba = self.ensemble.supports_proba

        # Analyze the ensemble
        log.info(f"[Learn phase 7/{n_phases}] - Ensemble analysis")
        self.analyze_ensemble(enc_train_test)

        # ------------------------ #
        # Enable model partial fit AFTER it is trained and evaluated for performance with the appropriate train/dev/test splits.
        # This assumes the predictor could continuously evolve, hence including reserved testing data may improve predictions.
        # SET `json_ai.problem_definition.fit_on_all=False` TO TURN THIS BLOCK OFF.

        # Update the mixers with partial fit
        if self.problem_definition.fit_on_all and all(
            [not m.trains_once for m in self.mixers]
        ):
            log.info(f"[Learn phase 8/{n_phases}] - Adjustment on validation requested")
            self.adjust(
                enc_train_test["test"].data_frame,
                ConcatedEncodedDs(
                    [enc_train_test["train"], enc_train_test["dev"]]
                ).data_frame,
                adjust_args={"learn_call": True},
            )

        self.feature_cache = (
            dict()
        )  # empty feature cache to avoid large predictor objects

    @timed_predictor
    def adjust(
        self,
        train_data: Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame],
        dev_data: Optional[Union[EncodedDs, ConcatedEncodedDs, pd.DataFrame]] = None,
        adjust_args: Optional[dict] = None,
    ) -> None:
        # Update mixers with new information

        self.mode = "train"

        # --------------- #
        # Prepare data
        # --------------- #
        if dev_data is None:
            data = train_data
            split = splitter(
                data=data,
                pct_train=0.8,
                pct_dev=0.2,
                pct_test=0,
                tss=self.problem_definition.timeseries_settings.to_dict(),
                seed=self.problem_definition.seed_nr,
                target=self.target,
                dtype_dict=self.dtype_dict,
            )
            train_data = split["train"]
            dev_data = split["dev"]

        if adjust_args is None or not adjust_args.get("learn_call"):
            train_data = self.preprocess(train_data)
            dev_data = self.preprocess(dev_data)

        dev_data = EncodedDs(self.encoders, dev_data, self.target)
        train_data = EncodedDs(self.encoders, train_data, self.target)

        # --------------- #
        # Update/Adjust Mixers
        # --------------- #
        log.info("Updating the mixers")

        for mixer in self.mixers:
            mixer.partial_fit(train_data, dev_data, adjust_args)

    @timed_predictor
    def predict(self, data: pd.DataFrame, args: Dict = {}) -> pd.DataFrame:

        self.mode = "predict"
        n_phases = 3 if self.pred_args.all_mixers else 4

        if len(data) == 0:
            raise Exception(
                "Empty input, aborting prediction. Please try again with some input data."
            )

        self.pred_args = PredictionArguments.from_dict(args)

        log.info(f"[Predict phase 1/{n_phases}] - Data preprocessing")
        if self.problem_definition.ignore_features:
            log.info(f"Dropping features: {self.problem_definition.ignore_features}")
            data = data.drop(
                columns=self.problem_definition.ignore_features, errors="ignore"
            )
        for col in self.input_cols:
            if col not in data.columns:
                data[col] = [None] * len(data)

        # Pre-process the data
        data = self.preprocess(data)

        # Featurize the data
        log.info(f"[Predict phase 2/{n_phases}] - Feature generation")
        encoded_ds = self.featurize({"predict_data": data})["predict_data"]
        encoded_data = encoded_ds.get_encoded_data(include_target=False)

        log.info(f"[Predict phase 3/{n_phases}] - Calling ensemble")

        @timed
        def _timed_call(encoded_ds):
            if self.pred_args.return_embedding:
                embedder = Embedder(self.target, mixers=list(), data=encoded_ds)
                df = embedder(encoded_ds, args=self.pred_args)
            else:
                df = self.ensemble(encoded_ds, args=self.pred_args)
            return df

        df = _timed_call(encoded_ds)

        if not (
            any(
                [
                    self.pred_args.all_mixers,
                    self.pred_args.return_embedding,
                    self.problem_definition.embedding_only,
                ]
            )
        ):
            log.info(f"[Predict phase 4/{n_phases}] - Analyzing output")
            df, global_insights = explain(
                data=data,
                encoded_data=encoded_data,
                predictions=df,
                ts_analysis=None,
                problem_definition=self.problem_definition,
                stat_analysis=self.statistical_analysis,
                runtime_analysis=self.runtime_analyzer,
                target_name=self.target,
                target_dtype=self.dtype_dict[self.target],
                explainer_blocks=self.analysis_blocks,
                pred_args=self.pred_args,
            )
            self.global_insights = {**self.global_insights, **global_insights}

        self.feature_cache = (
            dict()
        )  # empty feature cache to avoid large predictor objects

        return df

    def test(
        self,
        data: pd.DataFrame,
        metrics: list,
        args: Dict[str, object] = {},
        strict: bool = False,
    ) -> pd.DataFrame:

        preds = self.predict(data, args)
        preds = preds.rename(columns={"prediction": self.target})
        filtered = []

        # filter metrics if not supported
        for metric in metrics:
            # metric should be one of: an actual function, registered in the model class, or supported by the evaluator
            if not (
                callable(metric)
                or metric in self.accuracy_functions
                or metric in mdb_eval_accuracy_metrics
            ):
                if strict:
                    raise Exception(f"Invalid metric: {metric}")
                else:
                    log.warning(f"Invalid metric: {metric}. Skipping...")
            else:
                filtered.append(metric)

        metrics = filtered
        try:
            labels = self.model_analysis.histograms[self.target]["x"]
        except:
            if strict:
                raise Exception("Label histogram not found")
            else:
                label_map = (
                    None  # some accuracy functions will crash without this, be mindful
                )
        scores = evaluate_accuracies(
            data,
            preds[self.target],
            self.target,
            metrics,
            ts_analysis=self.ts_analysis,
            labels=labels,
        )

        # TODO: remove once mdb_eval returns an actual list
        scores = {k: [v] for k, v in scores.items() if not isinstance(v, list)}

        return pd.DataFrame.from_records(
            scores
        )  # TODO: add logic to disaggregate per-mixer

As you can see, an end-to-end pipeline for our entire ML procedure has been generated. The process is broken up into several abstracted functions to provide transparency into what your data goes through in order to build these models.

The key steps of the pipeline are as follows:

  1. Run a statistical analysis with analyze_data

  2. Clean your data with preprocess

  3. Make a training/dev/testing split with split

  4. Prepare your feature-engineering pipelines with prepare

  5. Create your features with featurize

  6. Fit your predictor models with fit

You can customize this further if necessary, but you have all the steps necessary to train a model!

We recommend familiarizing yourself with these steps by calling the above commands, ideally in order. Some commands (namely prepare, featurize, and fit) depend on earlier steps.

If you want to skip the individual steps, we recommend you simply call the learn method, which runs all of the necessary steps to give you a fully trained predictive model starting from unprocessed data!
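
To make both routes concrete, here is a minimal sketch (assuming you have already built the predictor object with predictor_from_code(code), as we do in the next section; note that full training can take a while with the default time budgets):

# Route 1: one call runs every phase, from statistical analysis to ensembling
predictor.learn(data)

# Route 2: run the phases yourself, in order
predictor.mode = "train"
predictor.analyze_data(data)               # 1. statistical analysis
cleaned_data = predictor.preprocess(data)  # 2. our custom cleaner runs here
splits = predictor.split(cleaned_data)     # 3. train/dev/test split
predictor.prepare(splits)                  # 4. prepare the encoders
enc_data = predictor.featurize(splits)     # 5. encode the features
predictor.fit(enc_data)                    # 6. train the mixers and ensemble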

6) Call python to run your code and see your preprocessed outputs

Once we have the code, we can turn it into a Python object by calling predictor_from_code. This instantiates the PredictorInterface object.

This predictor object can then be used to run your pipeline.

[8]:
# Turn the code above into a predictor object
predictor = predictor_from_code(code)
[9]:
predictor.mode = "train"

# Perform stats analysis
predictor.analyze_data(data)

# Pre-process the data
cleaned_data = predictor.preprocess(data)

cleaned_data.head()
INFO:dataprep_ml-2730:Starting statistical analysis
INFO:dataprep_ml-2730:Dropping features: ['id']
INFO:dataprep_ml-2730:Finished statistical analysis
DEBUG:lightwood-2730: `analyze_data` runtime: 0.05 seconds
INFO:dataprep_ml-2730:Cleaning the data
INFO:dataprep_ml-2730:Dropping features: ['id']
INFO:lightwood-2730:Cleaning column =excerpt
INFO:lightwood-2730:Cleaning column =target
INFO:lightwood-2730:Converted target into strictly non-negative
DEBUG:lightwood-2730: `preprocess` runtime: 0.07 seconds
[9]:
excerpt target
0 When young people returned ballroom, presented... 0.000000
1 All dinner time, Mrs. Fayre somewhat silent, e... 0.000000
2 As Roger predicted, snow departed quickly came... 0.000000
3 And outside palace great garden walled round, ... 0.000000
4 Once upon time Three Bears lived together hous... 0.247197
[10]:
print("\033[1m"  + "Original Data\n" + "\033[0m")
print("Excerpt:\n", data.iloc[0]["excerpt"])
print("\nTarget:\n", data.iloc[0]["target"])

print("\033[1m"  + "\n\nCleaned Data\n" + "\033[0m")
print("Excerpt:\n", cleaned_data.iloc[0]["excerpt"])
print("\nTarget:\n", cleaned_data.iloc[0]["target"])
Original Data

Excerpt:
 When the young people returned to the ballroom, it presented a decidedly changed appearance. Instead of an interior scene, it was a winter landscape.
The floor was covered with snow-white canvas, not laid on smoothly, but rumpled over bumps and hillocks, like a real snow field. The numerous palms and evergreens that had decorated the room, were powdered with flour and strewn with tufts of cotton, like snow. Also diamond dust had been lightly sprinkled on them, and glittering crystal icicles hung from the branches.
At each end of the room, on the wall, hung a beautiful bear-skin rug.
These rugs were for prizes, one for the girls and one for the boys. And this was the game.
The girls were gathered at one end of the room and the boys at the other, and one end was called the North Pole, and the other the South Pole. Each player was given a small flag which they were to plant on reaching the Pole.
This would have been an easy matter, but each traveller was obliged to wear snowshoes.

Target:
 -0.340259125


Cleaned Data

Excerpt:
 When young people returned ballroom, presented decidedly changed appearance. Instead interior scene, winter landscape. The floor covered snow-white canvas, laid smoothly, rumpled bumps hillocks, like real snow field. The numerous palms evergreens decorated room, powdered flour strewn tufts cotton, like snow. Also diamond dust lightly sprinkled them, glittering crystal icicles hung branches. At end room, wall, hung beautiful bear-skin rug. These rugs prizes, one girls one boys. And game. The girls gathered one end room boys other, one end called North Pole, South Pole. Each player given small flag plant reaching Pole. This would easy matter, traveller obliged wear snowshoes.

Target:
 0.0

As you can see, the cleaning process we introduced removed the stop words from the excerpt and forced the target values to be non-negative.
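
Once you are happy with the preprocessing, the remaining step is to train and query the predictor; a minimal sketch, assuming the model has been fully trained (for example via predictor.learn(data)):

# Generate predictions for a few rows; the output DataFrame has a "prediction" column
predictions = predictor.predict(data.iloc[:5])
print(predictions["prediction"])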

We hope this tutorial was informative on how to introduce a custom preprocessing method to your datasets! For more customization tutorials, please check our documentation.

If you want to download the Jupyter-notebook version of this tutorial, check out the source GitHub location found here: lightwood/docssrc/source/tutorials/custom_cleaner.