Source code for lightwood.encoder.numeric.numeric

import math
from typing import Union, Dict
from copy import deepcopy as dc

import torch
import numpy as np
import pandas as pd
from type_infer.dtype import dtype

from lightwood.encoder.base import BaseEncoder
from lightwood.helpers.general import is_none


[docs]class NumericEncoder(BaseEncoder):
    """
    The numeric encoder takes numbers (float or integer) and converts them into tensors of the form:
    ``[1 if the number is positive, otherwise 0, natural_log(abs(number)), number/absolute_mean, 1 if the number is none, otherwise 0]``

    If encoding target values, the representation drops the trailing none flag and becomes:
    ``[1 if the number is positive, otherwise 0, natural_log(abs(number)), number/absolute_mean]``
    since target values can't be none.

    The ``absolute_mean`` is computed in the ``prepare`` method and is simply the mean of the absolute values of all numbers fed to ``prepare`` (which are not none).

    ``none`` stands for any number that is an actual python ``None`` value or any sort of non-numeric value (a string, nan, inf).
    """  # noqa

    def __init__(self, data_type: dtype = None,
                 target_weights: Dict[float, float] = None,
                 is_target: bool = False,
                 positive_domain: bool = False):
        """
        :param data_type: The data type of the number (integer, float, quantity)
        :param target_weights: a dictionary of weights to use on the examples.
        :param is_target: Indicates whether the encoder refers to a target column or feature column (True == target)
        :param positive_domain: Forces the encoder to always output positive values
        """
        super().__init__(is_target)
        self._abs_mean = None
        self.positive_domain = positive_domain
        self.decode_log = False
        self.output_size = 4 if not self.is_target else 3

        # Weight-balance info if encoder represents target
        self.target_weights = None
        self.index_weights = None
        if self.is_target and target_weights is not None:
            self.target_weights = dc(target_weights)
            self.index_weights = torch.tensor(list(self.target_weights.values()))
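    # Illustrative note (assumed values for exposition): with ``_abs_mean == 5.0``,
    # a feature encoder would map 10.0 to roughly
    # ``[1.0, ln(10) ~= 2.3026, 10.0 / 5.0 == 2.0, 0.0]``, i.e. sign flag,
    # log magnitude, normalized value and none flag, in that order.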
[docs]    def prepare(self, priming_data: pd.Series):
        """
        ``NumericEncoder`` uses a rule-based form to prepare results on training (priming) data. The averages etc. are taken from this distribution.

        :param priming_data: an iterable data structure containing numbers, which will be used to compute the values used for normalizing the encoded representations
        """  # noqa
        if self.is_prepared:
            raise Exception('You can only call "prepare" once for a given encoder.')

        self._abs_mean = priming_data.abs().mean()
        self.is_prepared = True
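    # Worked example (illustrative values, not from the original source):
    # priming with ``pd.Series([2.0, -4.0, 6.0])`` gives
    # ``_abs_mean == (2 + 4 + 6) / 3 == 4.0``.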
[docs]    def encode(self, data: Union[np.ndarray, pd.Series]):
        """
        :param data: A pandas series or numpy array containing the numbers to be encoded
        :returns: A torch tensor with the representations of each number
        """
        if not self.is_prepared:
            raise Exception('You need to call "prepare" before calling "encode" or "decode".')

        if isinstance(data, pd.Series):
            data = data.values

        inp_data = np.nan_to_num(data.astype(float), nan=0, posinf=np.finfo(np.float32).max, neginf=np.finfo(np.float32).min)  # noqa
        if not self.positive_domain:
            sign = np.vectorize(self._sign_fn, otypes=[float])(inp_data)
        else:
            sign = np.zeros(len(data))
        log_value = np.vectorize(self._log_fn, otypes=[float])(inp_data)
        log_value = np.nan_to_num(log_value, nan=0, posinf=20, neginf=-20)

        norm = np.vectorize(self._norm_fn, otypes=[float])(inp_data)
        norm = np.nan_to_num(norm, nan=0, posinf=20, neginf=-20)

        if self.is_target:
            components = [sign, log_value, norm]
        else:
            nones = np.vectorize(self._none_fn, otypes=[float])(data)
            components = [sign, log_value, norm, nones]

        return torch.Tensor(np.asarray(components)).T
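    # Shape note (illustrative): encoding a series of N values returns a tensor
    # of shape (N, 4) for feature columns, or (N, 3) for target columns, since
    # targets omit the none flag.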
    @staticmethod
    def _sign_fn(x: float) -> float:
        return 0 if x < 0 else 1

    @staticmethod
    def _log_fn(x: float) -> float:
        # -20 acts as a floor for log(0); exp(-20) is ~2e-9, effectively zero
        return math.log(abs(x)) if abs(x) > 0 else -20

    def _norm_fn(self, x: float) -> float:
        return x / self._abs_mean

    @staticmethod
    def _none_fn(x: float) -> float:
        return 1 if is_none(x) else 0
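    # Sketch of the helpers on sample inputs (illustrative, assuming ``is_none``
    # treats NaN as none, as the class docstring states):
    # ``_sign_fn(-3.5) == 0``, ``_log_fn(math.e) == 1.0``,
    # ``_log_fn(0.0) == -20``, ``_none_fn(float('nan')) == 1``.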
[docs]    def decode(self, encoded_values: torch.Tensor, decode_log: bool = None) -> list:
        """
        :param encoded_values: The encoded values to decode into single numbers
        :param decode_log: Whether to decode the ``log`` or ``linear`` part of the representation, since the encoded vector contains both a log and a linear part
        :returns: The decoded array
        """  # noqa
        if not self.is_prepared:
            raise Exception('You need to call "prepare" before calling "encode" or "decode".')

        if decode_log is None:
            decode_log = self.decode_log

        # force = True prevents side effects on the original encoded_values
        ev = encoded_values.numpy(force=True)

        # set "divergent" value as default (note: finfo.max() instead of pow(10, 63))
        ret = np.full((ev.shape[0],), dtype=float, fill_value=np.finfo(np.float64).max)

        # `none` filter (if not a target column)
        if not self.is_target:
            mask_none = ev[:, -1] == 1
            ret[mask_none] = np.nan
        else:
            # boolean dtype so the all-False mask can safely be used for indexing
            mask_none = np.zeros_like(ret, dtype=bool)

        # sign component
        sign = np.ones(ev.shape[0], dtype=float)
        mask_sign = ev[:, 0] < 0.5
        sign[mask_sign] = -1

        # real component
        if decode_log:
            real_value = np.exp(ev[:, 1]) * sign
            overflow_mask = ev[:, 1] >= 63
            real_value[overflow_mask] = 10 ** 63
            valid_mask = ~overflow_mask
        else:
            real_value = ev[:, 2] * self._abs_mean
            valid_mask = np.ones_like(real_value, dtype=bool)

        # final filters
        if self.positive_domain:
            real_value = abs(real_value)

        ret[valid_mask] = real_value[valid_mask]

        # set nan back to None
        if mask_none.sum() > 0:
            ret = ret.astype(object)
            ret[mask_none] = None

        return ret.tolist()  # TODO: update signature on BaseEncoder and replace all encs to return ndarrays
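    # Round-trip sketch (illustrative, assuming ``_abs_mean == 4.0``): the row
    # ``[1.0, log(8), 2.0, 0.0]`` decodes to ``2.0 * 4.0 == 8.0`` through the
    # linear component (``decode_log=False``), or to ``exp(log(8)) * 1 == 8.0``
    # through the log component (``decode_log=True``).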
    def get_weights(self, label_data):
        # get a sorted list of intervals to assign weights. Keys are the interval edges.
        target_weight_keys = np.array(list(self.target_weights.keys()))
        target_weight_values = np.array(list(self.target_weights.values()))
        sorted_indices = np.argsort(target_weight_keys)

        # get sorted arrays for vectorized numpy operations
        target_weight_keys = target_weight_keys[sorted_indices]
        target_weight_values = target_weight_values[sorted_indices]

        # find the indices of the bins according to the keys. clip to the length of the weight values (searchsorted
        # returns indices from 0 to N, with N = len(target_weight_keys)).
        assigned_target_weight_indices = np.clip(a=np.searchsorted(target_weight_keys, label_data),
                                                 a_min=0,
                                                 a_max=len(target_weight_keys) - 1).astype(np.int32)

        return target_weight_values[assigned_target_weight_indices]
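
# Minimal usage sketch (illustrative, with made-up sample values): prepares the
# encoder on a small series, then round-trips two values through encode/decode.
if __name__ == '__main__':
    enc = NumericEncoder()
    enc.prepare(pd.Series([1.0, -2.0, 3.0, 4.0]))  # _abs_mean == 2.5
    encoded = enc.encode(pd.Series([5.0, -5.0]))
    print(encoded.shape)        # torch.Size([2, 4]): sign, log, norm, none flag
    print(enc.decode(encoded))  # [5.0, -5.0] via the linear component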