"""Machine Learning Model base classes and implementations.
This module provides base classes and implementations for machine learning models,
supporting both scikit-learn and PySpark ML models. It includes functionality for
training, prediction, confidence estimation, and entropy calculation.
"""
from abc import abstractmethod, ABC, abstractproperty
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, DoubleType
import pyspark.sql.types as T
from pyspark.ml.functions import vector_to_array, array_to_vector
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import DataFrame as SparkDataFrame, SparkSession
from pyspark.ml import Transformer
import numpy as np
import warnings
from typing import Iterator, overload, Union, Optional
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_is_fitted
from sklearn.exceptions import NotFittedError
from joblib import parallel_backend
from threadpoolctl import ThreadpoolController
from .utils import get_logger
log = get_logger(__name__)
class MLModel(ABC):
    """Abstract base class for machine learning models.

    Defines the interface that all machine learning models must implement,
    whether they are scikit-learn models or PySpark ML models. It provides
    methods for training, prediction, confidence estimation, and entropy
    calculation.

    Attributes
    ----------
    nan_fill : float or None
        Value to use for filling NaN values in feature vectors
    use_vectors : bool
        Whether the model expects feature vectors in vector format
    use_floats : bool
        Whether the model uses float32 (True) or float64 (False) precision
    """

    # ``abc.abstractproperty`` is deprecated since Python 3.3 and was removed
    # in Python 3.12; ``@property`` stacked on ``@abstractmethod`` is the
    # supported, behaviorally-identical spelling.
    @property
    @abstractmethod
    def nan_fill(self) -> Optional[float]:
        """Value to use for filling NaN values in feature vectors.

        Returns
        -------
        float or None
            The value to use for filling NaN values, or None if no filling
            is needed
        """

    @property
    @abstractmethod
    def use_vectors(self) -> bool:
        """Whether the model expects feature vectors in vector format.

        Returns
        -------
        bool
            True if the model expects vectors, False if it expects arrays
        """

    @property
    @abstractmethod
    def use_floats(self) -> bool:
        """Whether the model uses float32 or float64 precision.

        Returns
        -------
        bool
            True if the model uses float32, False if it uses float64
        """

    @property
    @abstractmethod
    def trained_model(self):
        """The trained ML Model object.

        Returns
        -------
        object
            The underlying trained model object, or None before training
        """

    @abstractmethod
    def predict(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, output_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Make predictions using the trained model.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors to predict on
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store predictions in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with predictions added in the output_col
        """

    @abstractmethod
    def predict_with_confidence(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, prediction_col: str, confidence_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Make predictions and confidence scores using the trained model.

        This method is more efficient than calling predict() and
        prediction_conf() separately as it computes both in a single pass
        when possible.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors to predict on
        vector_col : str
            Name of the column containing feature vectors
        prediction_col : str
            Name of the column to store predictions in
        confidence_col : str
            Name of the column to store confidence scores in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with predictions and confidence scores added
        """

    @abstractmethod
    def prediction_conf(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, label_column: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Calculate prediction confidence scores.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors
        vector_col : str
            Name of the column containing feature vectors
        label_column : str
            Name of the column containing true labels

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with confidence scores added
        """

    @abstractmethod
    def entropy(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, output_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Calculate entropy of predictions.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store entropy values in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with entropy values added in the output_col
        """

    @abstractmethod
    def train(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, label_column: str):
        """Train the model on the given data.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing training data
        vector_col : str
            Name of the column containing feature vectors
        label_column : str
            Name of the column containing labels

        Returns
        -------
        MLModel
            The trained model (self)
        """

    @abstractmethod
    def params_dict(self) -> dict:
        """Get a dictionary of model parameters.

        Returns
        -------
        dict
            Dictionary containing model parameters and configuration
        """

    def prep_fvs(self, fvs: 'Union[pd.DataFrame, SparkDataFrame]', feature_col: str = 'feature_vectors') -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Prepare feature vectors for model input.

        Handles NaN filling and conversion between vector and array formats
        based on the model's requirements (``nan_fill``, ``use_vectors``,
        ``use_floats``).

        Parameters
        ----------
        fvs : pandas.DataFrame or pyspark.sql.DataFrame
            DataFrame containing feature vectors
        feature_col : str, optional
            Name of the column containing feature vectors

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            DataFrame with prepared feature vectors

        Raises
        ------
        TypeError
            If ``fvs`` is neither a pandas nor a Spark DataFrame
        """
        if isinstance(fvs, pd.DataFrame):
            # Guard against empty frames: .iloc[0] on an empty column raises.
            if self.nan_fill is not None and len(fvs) > 0:
                fvs = fvs.copy()  # never mutate the caller's frame
                first = fvs[feature_col].iloc[0]
                if hasattr(first, '__iter__') and not isinstance(first, str):
                    # Column of sequences: fill NaNs element-wise.
                    filled_features = []
                    for feature in fvs[feature_col]:
                        if isinstance(feature, (list, np.ndarray)):
                            filled_features.append(
                                np.nan_to_num(np.asarray(feature), nan=self.nan_fill))
                        else:
                            filled_features.append(feature)
                    fvs[feature_col] = filled_features
                else:
                    # Column of scalars: plain fillna.
                    fvs[feature_col] = fvs[feature_col].fillna(self.nan_fill)
        elif isinstance(fvs, SparkDataFrame):
            # Work in array form first: F.transform and array<...> casts are
            # only defined for array columns, not VectorUDT columns. The
            # previous version applied the NaN fill and the precision cast
            # after converting to a vector (when use_vectors is True), which
            # fails at analysis time — fill and cast before any vector
            # conversion instead.
            fvs = convert_to_array(fvs, feature_col)
            if self.nan_fill is not None:
                fvs = fvs.withColumn(
                    feature_col,
                    F.transform(feature_col,
                                lambda x: F.when(x.isNotNull() & ~F.isnan(x), x)
                                           .otherwise(self.nan_fill)))
            target_type = 'array<float>' if self.use_floats else 'array<double>'
            fvs = fvs.withColumn(feature_col, fvs[feature_col].cast(target_type))
            if self.use_vectors:
                fvs = convert_to_vector(fvs, feature_col)
        else:
            raise TypeError(f"Unsupported DataFrame type: {type(fvs)}")
        return fvs
def convert_to_vector(df, col):
    """Ensure *col* is in Spark vector format.

    pandas DataFrames are returned unchanged after validating that the
    column exists; Spark DataFrames get the column converted with
    ``array_to_vector`` unless it already holds ``VectorUDT`` values.

    Parameters
    ----------
    df : pandas.DataFrame or pyspark.sql.DataFrame
        The DataFrame containing the column to convert
    col : str
        Name of the column to convert

    Returns
    -------
    pandas.DataFrame or pyspark.sql.DataFrame
        DataFrame with the column converted to vector format

    Raises
    ------
    ValueError
        If ``col`` is missing from a pandas DataFrame
    TypeError
        If ``df`` is neither a pandas nor a Spark DataFrame
    """
    if isinstance(df, pd.DataFrame):
        if col not in df.columns:
            raise ValueError(f"Column '{col}' not found in DataFrame")
        return df
    if isinstance(df, SparkDataFrame):
        already_vector = isinstance(df.schema[col].dataType, VectorUDT)
        return df if already_vector else df.withColumn(col, array_to_vector(col))
    raise TypeError(f"Unsupported DataFrame type: {type(df)}")
# Spark array types treated as "already converted" by convert_to_array.
# NOTE(review): DataType equality/hashing includes ``containsNull`` (these
# instances use the default True), so an array<double> column declared with
# containsNull=False will not match this set — TODO confirm the nullability
# produced by upstream columns before relying on exact-type membership.
_DOUBLE_ARRAY = T.ArrayType(T.DoubleType())
_FLOAT_ARRAY = T.ArrayType(T.FloatType())
_ARRAY_TYPES = {_DOUBLE_ARRAY, _FLOAT_ARRAY}
def convert_to_array(df, col):
    """Convert a vector column to array format for pandas and Spark DataFrames.

    pandas DataFrames are returned unchanged after validating that the
    column exists; Spark DataFrames get the column converted with
    ``vector_to_array`` unless it is already a float/double array.

    Parameters
    ----------
    df : pandas.DataFrame or pyspark.sql.DataFrame
        The DataFrame containing the column to convert
    col : str
        Name of the column to convert

    Returns
    -------
    pandas.DataFrame or pyspark.sql.DataFrame
        DataFrame with the column converted to array format

    Raises
    ------
    ValueError
        If ``col`` is missing from a pandas DataFrame
    TypeError
        If ``df`` is neither a pandas nor a Spark DataFrame
    """
    if isinstance(df, pd.DataFrame):
        if col in df.columns:
            return df
        else:
            raise ValueError(f"Column '{col}' not found in DataFrame")
    elif isinstance(df, SparkDataFrame):
        dtype = df.schema[col].dataType
        # Check the type structurally rather than by equality against fixed
        # instances: DataType equality includes ``containsNull``, so an
        # array<double> column with containsNull=False would previously be
        # passed to vector_to_array and fail at analysis time.
        is_numeric_array = isinstance(dtype, T.ArrayType) and isinstance(
            dtype.elementType, (T.DoubleType, T.FloatType))
        if not is_numeric_array:
            df = df.withColumn(col, vector_to_array(col))
        return df
    else:
        raise TypeError(f"Unsupported DataFrame type: {type(df)}")
class SKLearnModel(MLModel):
    """Scikit-learn model wrapper.

    This class wraps scikit-learn models to provide a consistent interface
    with PySpark ML models. It handles conversion between pandas and PySpark
    DataFrames, and manages model training and prediction.

    Parameters
    ----------
    model : sklearn.base.BaseEstimator or type
        The scikit-learn model class or instance to use. A fitted instance
        is adopted as the trained model; otherwise the value is treated as
        a model class to be constructed with ``model_args`` at train time.
    nan_fill : float or None, optional
        Value to use for filling NaN values
    use_floats : bool, optional
        Whether to use float32 (True) or float64 (False) precision
    **model_args : dict
        Additional arguments to pass to the model constructor
    """

    def __init__(self, model, nan_fill=None, use_floats=True, **model_args):
        try:
            # Fitted estimator instance: adopt it directly.
            check_is_fitted(model)
            self._trained_model = model
            self._model = model.__class__
            self._model_args = {}
        except (NotFittedError, TypeError):
            # NotFittedError: unfitted instance; TypeError: a class was passed.
            # NOTE(review): an unfitted *instance* ends up in ``self._model``
            # and is later *called* like a constructor by get_model(); that
            # only works for classes — confirm callers pass a class when the
            # model is unfitted.
            self._trained_model = None
            self._model_args = model_args.copy()
            self._model = model
        self._nan_fill = nan_fill
        self._use_floats = use_floats
        self._vector_buffer = None  # reusable scratch space, see _allocate_buffer

    def params_dict(self):
        """Return a dict describing the model class and its configuration."""
        return {
            'model' : str(self._model),
            'nan_fill' : self._nan_fill,
            'model_args' : self._model_args.copy()
        }

    def _no_threads(self):
        """Pin OpenMP/BLAS thread pools to a single thread.

        Called inside Spark pandas UDFs so executor tasks do not
        oversubscribe cores with nested native parallelism.
        """
        tpc = ThreadpoolController()
        tpc.limit(limits=1, user_api='openmp')
        tpc.limit(limits=1, user_api='blas')

    @property
    def nan_fill(self):
        """Value used to fill NaNs in feature vectors (None = no filling)."""
        return self._nan_fill

    @property
    def use_vectors(self):
        # scikit-learn consumes plain numpy arrays, never Spark vectors.
        return False

    @property
    def use_floats(self):
        return self._use_floats

    @property
    def trained_model(self):
        return self._trained_model

    def get_model(self):
        """Construct a fresh, unfitted estimator from the stored class/args."""
        return self._model(**self._model_args)

    def _allocate_buffer(self, nrows, ncols):
        """Return an (nrows, ncols) view over a reusable scratch buffer,
        growing the buffer when the requested size exceeds its capacity."""
        needed_size = nrows * ncols
        if self._vector_buffer is None or self._vector_buffer.size < needed_size:
            self._vector_buffer = np.empty(
                needed_size,
                dtype=(np.float32 if self.use_floats else np.float64))
        return self._vector_buffer[:needed_size].reshape(nrows, ncols)

    def _make_feature_matrix(self, vecs):
        """Stack a sequence of feature vectors into a 2-D matrix, applying
        ``nan_fill`` in place. Returns None when ``vecs`` is empty."""
        if len(vecs) == 0:
            return None
        buffer = self._allocate_buffer(len(vecs), len(vecs[0]))
        X = np.stack(vecs, axis=0, out=buffer)
        if self._nan_fill is not None:
            np.nan_to_num(X, copy=False, nan=self._nan_fill)
        return X

    def _predict(self, vec_itr : Iterator[pd.Series]) -> Iterator[pd.Series]:
        """Iterator-style pandas UDF body yielding batched predictions."""
        warnings.filterwarnings('ignore', category=RuntimeWarning)
        self._no_threads()
        for vecs in vec_itr:
            X = self._make_feature_matrix(vecs.values)
            yield pd.Series(self._trained_model.predict(X))

    def predict(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, output_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Make predictions using the trained model.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors to predict on.
            A pandas DataFrame is modified in place.
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store predictions in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with predictions added in the output_col

        Raises
        ------
        RuntimeError
            If the model has not been trained
        TypeError
            If ``df`` is neither a pandas nor a Spark DataFrame
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        if isinstance(df, pd.DataFrame):
            X = self._make_feature_matrix(df[vector_col].tolist())
            df[output_col] = self._trained_model.predict(X)
            return df
        if isinstance(df, SparkDataFrame):
            df = convert_to_array(df, vector_col)
            f = F.pandas_udf(self._predict, T.DoubleType())
            return df.withColumn(output_col, f(vector_col))
        # Previously fell through and silently returned None.
        raise TypeError(f"Unsupported DataFrame type: {type(df)}")

    def _predict_with_confidence(self, vec_itr : Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
        """Iterator-style pandas UDF body yielding prediction/confidence pairs."""
        warnings.filterwarnings('ignore', category=RuntimeWarning)
        self._no_threads()
        for vecs in vec_itr:
            X = self._make_feature_matrix(vecs.values)
            predictions = self._trained_model.predict(X)
            # Confidence = max class probability per row.
            confidences = self._trained_model.predict_proba(X).max(axis=1)
            yield pd.DataFrame({
                'prediction': predictions,
                'confidence': confidences
            })

    def predict_with_confidence(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, prediction_col: str, confidence_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Make predictions and confidence scores in a single pass.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors to predict on.
            A pandas DataFrame is modified in place.
        vector_col : str
            Name of the column containing feature vectors
        prediction_col : str
            Name of the column to store predictions in
        confidence_col : str
            Name of the column to store confidence scores in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with predictions and confidence scores added

        Raises
        ------
        RuntimeError
            If the model has not been trained
        TypeError
            If ``df`` is neither a pandas nor a Spark DataFrame
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        if isinstance(df, pd.DataFrame):
            X = self._make_feature_matrix(df[vector_col].tolist())
            predictions = self._trained_model.predict(X)
            probs = self._trained_model.predict_proba(X).max(axis=1)
            df[prediction_col] = predictions
            df[confidence_col] = probs
            return df
        if isinstance(df, SparkDataFrame):
            df = convert_to_array(df, vector_col)
            schema = StructType([
                StructField("prediction", DoubleType(), True),
                StructField("confidence", DoubleType(), True)
            ])
            f = F.pandas_udf(self._predict_with_confidence, schema)
            # Compute once into a struct, then unpack into the two columns.
            result = df.withColumn("temp_result", f(vector_col))
            result = result.withColumn(prediction_col, F.col("temp_result.prediction"))\
                           .withColumn(confidence_col, F.col("temp_result.confidence"))\
                           .drop("temp_result")
            return result
        raise TypeError(f"Unsupported DataFrame type: {type(df)}")

    def _prediction_conf(self, vec_itr : Iterator[pd.Series]) -> Iterator[pd.Series]:
        """Iterator-style pandas UDF body yielding max class probabilities."""
        warnings.filterwarnings('ignore', category=RuntimeWarning)
        self._no_threads()
        for vecs in vec_itr:
            X = self._make_feature_matrix(vecs.values)
            probs = self._trained_model.predict_proba(X)
            yield pd.Series(probs.max(axis=1))

    def prediction_conf(self, df, vector_col : str, output_col : str):
        """Add the maximum class probability of each row as ``output_col``.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors. A pandas DataFrame
            is modified in place.
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store confidence scores in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with confidence scores added

        Raises
        ------
        RuntimeError
            If the model has not been trained
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        if isinstance(df, pd.DataFrame):
            # pandas support for consistency with predict() /
            # predict_with_confidence(); previously crashed on pandas input.
            X = self._make_feature_matrix(df[vector_col].tolist())
            df[output_col] = self._trained_model.predict_proba(X).max(axis=1)
            return df
        df = convert_to_array(df, vector_col)
        f = F.pandas_udf(self._prediction_conf, T.DoubleType())
        return df.withColumn(output_col, f(vector_col))

    def _entropy(self, vec_itr : Iterator[pd.Series]) -> Iterator[pd.Series]:
        """Iterator-style pandas UDF body yielding Shannon entropy (bits)."""
        warnings.filterwarnings('ignore', category=RuntimeWarning)
        self._no_threads()
        for vecs in vec_itr:
            X = self._make_feature_matrix(vecs.values)
            probs = self._trained_model.predict_proba(X)
            # nan_to_num maps the 0*log2(0) -> nan terms to 0.
            yield pd.Series(np.nan_to_num((-probs * np.log2(probs)).sum(axis=1)))

    def entropy(self, df, vector_col : str, output_col : str):
        """Add the entropy of the predicted class distribution as ``output_col``.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors. A pandas DataFrame
            is modified in place.
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store entropy values in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with entropy values added in the output_col

        Raises
        ------
        RuntimeError
            If the model has not been trained
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        if isinstance(df, pd.DataFrame):
            # pandas support for consistency with predict(); previously
            # crashed on pandas input.
            X = self._make_feature_matrix(df[vector_col].tolist())
            probs = self._trained_model.predict_proba(X)
            df[output_col] = np.nan_to_num((-probs * np.log2(probs)).sum(axis=1))
            return df
        df = convert_to_array(df, vector_col)
        f = F.pandas_udf(self._entropy, T.DoubleType())
        return df.withColumn(output_col, f(vector_col))

    def train(self, df, vector_col : str, label_column : str):
        """Fit a fresh estimator on the given data.

        Spark input is collected to the driver with toPandas() first, so the
        training set must fit in driver memory.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing training data
        vector_col : str
            Name of the column containing feature vectors
        label_column : str
            Name of the column containing labels

        Returns
        -------
        SKLearnModel
            The trained model (self)
        """
        if isinstance(df, SparkDataFrame):
            df = convert_to_array(df, vector_col)
            df = df.toPandas()
        X = self._make_feature_matrix(df[vector_col].values)
        y = df[label_column].values
        self._trained_model = self._model(**self._model_args)
        self._trained_model.fit(X, y)
        return self

    def cross_val_scores(self, df, vector_col : str, label_column : str, cv : int = 10):
        """Compute cross-validation scores for a fresh (unfitted) estimator.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing training data
        vector_col : str
            Name of the column containing feature vectors
        label_column : str
            Name of the column containing labels
        cv : int, optional
            Number of cross-validation folds

        Returns
        -------
        numpy.ndarray
            Score of the estimator for each fold
        """
        # Bug fix: the previous version unconditionally called toPandas(),
        # which crashes on pandas input; mirror train()'s handling instead.
        if isinstance(df, SparkDataFrame):
            df = convert_to_array(df, vector_col)
            df = df.toPandas()
        X = self._make_feature_matrix(df[vector_col].values)
        y = df[label_column].values
        return cross_val_score(self.get_model(), X, y, cv=cv)
class SparkMLModel(MLModel):
    """PySpark ML model wrapper.

    Wraps a ``pyspark.ml`` estimator (or an already-fitted Transformer) and
    exposes the common :class:`MLModel` interface. pandas DataFrames passed
    to the prediction/training methods are converted to Spark DataFrames
    first.

    Parameters
    ----------
    model : pyspark.ml.Transformer or estimator class
        A fitted Transformer is adopted as the trained model; anything else
        is treated as an estimator class constructed with ``model_args``.
    nan_fill : float, optional
        Value to use for filling NaN values (default 0.0)
    **model_args : dict
        Additional arguments passed to the estimator constructor
    """

    def __init__(self, model, nan_fill = 0.0, **model_args):
        if isinstance(model, Transformer):
            # Already fitted: adopt directly.
            self._trained_model = model
            self._model = model.__class__
            self._model_args = {}
        else:
            self._trained_model = None
            self._model_args = model_args.copy()
            self._model = model
        self._nan_fill = nan_fill

    @property
    def nan_fill(self):
        return self._nan_fill

    @property
    def use_vectors(self):
        # Spark ML estimators require VectorUDT feature columns.
        return True

    @property
    def use_floats(self):
        # Spark ML vectors hold double-precision values.
        return False

    @property
    def trained_model(self):
        return self._trained_model

    def get_model(self):
        """Construct a fresh, unfitted estimator from the stored class/args."""
        return self._model(**self._model_args)

    def params_dict(self):
        """Return a dict describing the model class and its configuration."""
        return {
            'model' : str(self._model),
            'model_args' : self._model_args.copy()
        }

    def prediction_conf(self, df, vector_col : str, output_col : str):
        """Add the maximum class probability of each row as ``output_col``.

        Parameters
        ----------
        df : pyspark.sql.DataFrame
            The DataFrame containing the feature vectors
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store confidence scores in

        Returns
        -------
        pyspark.sql.DataFrame
            The input columns plus the confidence column

        Raises
        ------
        RuntimeError
            If the model has not been trained
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        df = convert_to_vector(df, vector_col)
        cols = df.columns
        out = F.array_max(vector_to_array(F.col(self._trained_model.getProbabilityCol()))).alias(output_col)
        return self._trained_model.setFeaturesCol(vector_col)\
                                  .transform(df)\
                                  .select(*cols, out)

    def predict(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, output_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Make predictions using the trained model.

        pandas input is converted to Spark for the transform and collected
        back to pandas; Spark input stays Spark.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors to predict on
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store predictions in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with predictions added in the output_col

        Raises
        ------
        RuntimeError
            If the model has not been trained
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        return_pandas = isinstance(df, pd.DataFrame)
        if return_pandas:
            spark = SparkSession.builder.getOrCreate()
            spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
            df = spark.createDataFrame(df)
        if not hasattr(self, '_model_args'):
            # NOTE(review): instances built through __init__ always have
            # _model_args; this guard appears aimed at objects deserialized
            # from an older version of the class — confirm before removing.
            return SparkMLModel(self._trained_model).predict(df, vector_col, output_col)
        df = convert_to_vector(df, vector_col)
        cols = df.columns
        out = F.col(self._trained_model.getPredictionCol()).alias(output_col)
        predictions = self._trained_model.setFeaturesCol(vector_col)\
                                         .transform(df)\
                                         .select(*cols, out)
        if return_pandas:
            return predictions.toPandas()
        else:
            return predictions

    def predict_with_confidence(self, df: 'Union[pd.DataFrame, SparkDataFrame]', vector_col: str, prediction_col: str, confidence_col: str) -> 'Union[pd.DataFrame, SparkDataFrame]':
        """Make predictions and confidence scores in a single transform pass.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing the feature vectors to predict on
        vector_col : str
            Name of the column containing feature vectors
        prediction_col : str
            Name of the column to store predictions in
        confidence_col : str
            Name of the column to store confidence scores in

        Returns
        -------
        pandas.DataFrame or pyspark.sql.DataFrame
            The input DataFrame with predictions and confidence scores added

        Raises
        ------
        RuntimeError
            If the model has not been trained
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to predict')
        return_pandas = isinstance(df, pd.DataFrame)
        if return_pandas:
            spark = SparkSession.builder.getOrCreate()
            spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
            df = spark.createDataFrame(df)
        if not hasattr(self, '_model_args'):
            # See the matching guard in predict().
            return SparkMLModel(self._trained_model).predict_with_confidence(df, vector_col, prediction_col, confidence_col)
        df = convert_to_vector(df, vector_col)
        cols = df.columns
        pred_out = F.col(self._trained_model.getPredictionCol()).alias(prediction_col)
        conf_out = F.array_max(vector_to_array(F.col(self._trained_model.getProbabilityCol()))).alias(confidence_col)
        result = self._trained_model.setFeaturesCol(vector_col)\
                                    .transform(df)\
                                    .select(*cols, pred_out, conf_out)
        if return_pandas:
            return result.toPandas()
        else:
            return result

    def _entropy_component(self, p_col, idx):
        """-p*log2(p) term for class ``idx`` (0 when p == 0)."""
        return F.when(p_col.getItem(idx) != 0.0, -p_col.getItem(idx) * F.log2(p_col.getItem(idx))).otherwise(0.0)

    def _entropy_expr(self, probs, classes=None):
        """Column expression for the Shannon entropy (bits) of a
        probability-array column.

        Parameters
        ----------
        probs : str
            Name of the probability array column
        classes : int or None, optional
            Number of leading classes to include. None (default) covers the
            whole array. The old default of 2 silently truncated multiclass
            entropies to the first two classes.
        """
        p_col = F.col(probs)
        if classes is None:
            # Sum -p*log2(p) over every element of the probability array,
            # regardless of how many classes the model produced.
            return F.aggregate(
                p_col,
                F.lit(0.0),
                lambda acc, p: acc + F.when(p != 0.0, -p * F.log2(p)).otherwise(0.0))
        e = self._entropy_component(p_col, 0)
        for i in range(1, classes):
            e = e + self._entropy_component(p_col, i)
        return e

    def entropy(self, df, vector_col : str, output_col : str):
        """Add the entropy of the predicted class distribution as ``output_col``.

        Parameters
        ----------
        df : pyspark.sql.DataFrame
            The DataFrame containing the feature vectors
        vector_col : str
            Name of the column containing feature vectors
        output_col : str
            Name of the column to store entropy values in

        Returns
        -------
        pyspark.sql.DataFrame
            The input columns plus the entropy column

        Raises
        ------
        RuntimeError
            If the model has not been trained
        """
        if self._trained_model is None:
            raise RuntimeError('Model must be trained to compute entropy')
        df = convert_to_vector(df, vector_col)
        prob_col = self._trained_model.getProbabilityCol()
        prob_array = 'prob_array'
        cols = df.columns
        return self._trained_model.setFeaturesCol(vector_col)\
                                  .transform(df)\
                                  .select(*cols, vector_to_array(prob_col).alias(prob_array))\
                                  .withColumn(output_col, self._entropy_expr(prob_array))\
                                  .drop(prob_array)

    def train(self, df, vector_col : str, label_column : str):
        """Fit a fresh estimator on the given data.

        The fitted model's output columns are renamed to '__*_TMP' names so
        later transform() calls cannot collide with caller columns.

        Parameters
        ----------
        df : pandas.DataFrame or pyspark.sql.DataFrame
            The DataFrame containing training data
        vector_col : str
            Name of the column containing feature vectors
        label_column : str
            Name of the column containing labels

        Returns
        -------
        SparkMLModel
            The trained model (self)
        """
        if isinstance(df, pd.DataFrame):
            spark = SparkSession.builder.getOrCreate()
            spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
            df = spark.createDataFrame(df)
        df = convert_to_vector(df, vector_col)
        self._trained_model = self.get_model().setFeaturesCol(vector_col)\
                                              .setLabelCol(label_column)\
                                              .fit(df)\
                                              .setPredictionCol('__PREDICTION_TMP')\
                                              .setProbabilityCol('__PROB_TMP')\
                                              .setRawPredictionCol('__RAW_PREDICTION_TMP')
        return self