Source code for MatchFlow.tools

"""
Public API functions for MatchFlow.

This module provides the main functions that users will interact with.
Implementation details are hidden in the _internal package.
"""

from typing import List, Optional, Callable, Any, Union, Literal, Dict, Type
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.types import StructType, StructField, DoubleType
from sklearn.base import BaseEstimator
import xxhash
import numpy as np
import pickle
from pathlib import Path
import logging
import sys

from ._internal.ml_model import MLModel, SKLearnModel, SparkMLModel
from ._internal.utils import get_logger, convert_arrays_for_spark, save_training_data_streaming, load_training_data_streaming
from ._internal.labeler import Labeler, CLILabeler, GoldLabeler
from ._internal.active_learning.ent_active_learner import EntropyActiveLearner
from ._internal.active_learning.cont_entropy_active_learner import ContinuousEntropyActiveLearner
from ._internal.featurization import (
    create_features,
    get_base_sim_functions,
    get_base_tokenizers,
    get_extra_tokenizers,
    featurize
)

logger = get_logger(__name__)

# Re-export the public functions
__all__ = [
    'create_features',
    'get_base_sim_functions',
    'get_base_tokenizers',
    'get_extra_tokenizers',
    'featurize',
    'down_sample',
    'create_seeds',
    'train_matcher',
    'apply_matcher',
    'label_data',
    'label_pairs',
    'save_features',
    'load_features',
    'save_dataframe',
    'load_dataframe'
]


def down_sample(
    fvs: Union[pd.DataFrame, SparkDataFrame],
    percent: float,
    search_id_column: str,
    score_column: str = 'score',
    bucket_size: int = 1_000,
) -> Union[pd.DataFrame, SparkDataFrame]:
    """
    Down sample by score_column to produce percent * fvs.count() rows.

    Parameters
    ----------
    fvs : Union[pd.DataFrame, SparkDataFrame]
        the feature vectors to be down sampled
    percent : float
        the portion of the vectors to be output, (0.0, 1.0]
    search_id_column : str
        the name of the column containing unique identifiers for each record
    score_column : str, default='score'
        the column that scored the vectors, should be positively correlated
        with the probability of the pair being a match
    bucket_size : int, default=1000
        the size of the buckets for partitioning

    Returns
    -------
    Union[pd.DataFrame, SparkDataFrame]
        the down sampled dataset with percent * fvs.count() rows and the same
        schema as fvs
    """
    if isinstance(fvs, pd.DataFrame):
        if percent <= 0 or percent > 1.0:
            raise ValueError('percent must be in the range (0.0, 1.0]')
        fvs = fvs.copy()
        total = len(fvs)
        nparts = max(total // bucket_size, 1)
        fvs["_hash_bucket"] = fvs[search_id_column].astype(str).apply(
            lambda val: xxhash.xxh64(val).intdigest() % nparts)
        fvs["_rank_desc"] = fvs.groupby("_hash_bucket")[score_column] \
            .rank(method="min", ascending=False)
        bucket_sizes = fvs["_hash_bucket"].value_counts().to_dict()
        fvs["_bucket_size"] = fvs["_hash_bucket"].map(bucket_sizes)
        fvs["_percent_rank"] = fvs.apply(
            lambda row: 0.0 if row["_bucket_size"] == 1
            else (row["_rank_desc"] - 1) / (row["_bucket_size"] - 1),
            axis=1,
        )
        fvs = fvs[fvs["_percent_rank"] <= percent].copy()
        fvs = fvs.drop(columns=["_hash_bucket", "_rank_desc", "_bucket_size", "_percent_rank"])
    elif isinstance(fvs, SparkDataFrame):
        if percent <= 0 or percent > 1.0:
            raise ValueError('percent must be in the range (0.0, 1.0]')
        if bucket_size < 1000:
            raise ValueError('bucket_size must be >= 1000')
        if isinstance(score_column, str):
            score_column = F.col(score_column)
        # temp columns for sampling
        percentile_col = '_PERCENTILE'
        hash_col = '_HASH'
        window = Window().partitionBy(hash_col).orderBy(score_column.desc())
        nparts = max(fvs.count() // bucket_size, 1)
        fvs = fvs.withColumn(hash_col, F.xxhash64(search_id_column) % nparts)\
                 .select('*', F.percent_rank().over(window).alias(percentile_col))\
                 .filter(F.col(percentile_col) <= percent)\
                 .drop(percentile_col, hash_col)

    return fvs
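
# Usage sketch for down_sample (illustrative only; the DataFrame contents and
# column names below are assumptions for the example, not requirements of the API):
#
#     import pandas as pd
#     fvs = pd.DataFrame({'_id': range(6),
#                         'id2': [1, 1, 1, 2, 2, 2],
#                         'score': [0.9, 0.5, 0.1, 0.8, 0.4, 0.2]})
#     sampled = down_sample(fvs, percent=0.5, search_id_column='id2')
#     # `sampled` keeps roughly the top-scoring half of each hash bucket,
#     # with the same schema as `fvs`.
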
def create_seeds(
    fvs: Union[pd.DataFrame, SparkDataFrame],
    nseeds: int,
    labeler: Labeler,
    score_column: str = 'score',
    parquet_file_path: str = 'active-matcher-training-data.parquet'
) -> Union[pd.DataFrame, SparkDataFrame]:
    """Create labeled seed examples for active learning.

    Parameters
    ----------
    fvs : Union[pd.DataFrame, SparkDataFrame]
        DataFrame containing feature vectors with scores
    nseeds : int
        the number of seeds you want to use to train an initial model
    labeler : Labeler
        the labeler object you want to use to assign labels to rows
    score_column : str, default='score'
        the name of the score column in your fvs DataFrame
    parquet_file_path : str, default='active-matcher-training-data.parquet'
        the path to save the labeled data to

    Returns
    -------
    Union[pd.DataFrame, SparkDataFrame]
        A DataFrame with labeled seeds, schema is (previous schema of fvs, `label`)
        where the values in label are either 0.0 or 1.0
    """
    if nseeds == 0:
        raise ValueError("no seeds would be created")

    return_pandas = isinstance(fvs, pd.DataFrame)
    spark = None if return_pandas else SparkSession.builder.getOrCreate()

    existing_training_data = load_training_data_streaming(parquet_file_path, logger)
    if existing_training_data is not None:
        logger.info(f'Found {len(existing_training_data)} existing labeled examples')
        # Check if we already have enough seeds
        if len(existing_training_data) >= nseeds:
            logger.info(f'Already have {len(existing_training_data)} labeled examples, returning existing data')
            result_df = existing_training_data.head(nseeds)
            # Convert to Spark DataFrame if input was Spark DataFrame
            if not return_pandas:
                result_df = convert_arrays_for_spark(result_df)
                return spark.createDataFrame(result_df)
            return result_df
        else:
            logger.info(f'Using {len(existing_training_data)} existing examples, need {nseeds - len(existing_training_data)} more')
            seeds = existing_training_data.to_dict('records')
            pos_count = existing_training_data['label'].sum()
            neg_count = len(existing_training_data) - pos_count
    else:
        logger.info('No existing labeled examples found, creating new seeds')
        seeds = []
        pos_count = 0
        neg_count = 0

    existing_ids = set(existing_training_data['_id'].tolist()) if existing_training_data is not None else set()

    # gold labeler overrides the score column with 1.0 for gold pairs and 0.0 for non-gold pairs
    if isinstance(labeler, GoldLabeler):
        gold_pairs = labeler._gold
        if isinstance(fvs, pd.DataFrame) and (score_column not in fvs.columns or fvs[score_column].isna().all()):
            is_gold_mask = [tuple(x) in gold_pairs for x in fvs[['id1', 'id2']].to_numpy()]
            fvs[score_column] = np.where(is_gold_mask, 1.0, 0.0)
        elif isinstance(fvs, SparkDataFrame) and score_column not in [col.name for col in fvs.schema.fields]:
            fvs = fvs.withColumn(
                score_column,
                F.when(F.struct('id1', 'id2').isin([F.struct(F.lit(p[0]), F.lit(p[1])) for p in gold_pairs]), 1.0)
                 .otherwise(0.0))

    if isinstance(fvs, pd.DataFrame):
        fvs = fvs[fvs[score_column].notna()]
        fvs_length = len(fvs)
        if nseeds > len(fvs):
            raise ValueError("number of seeds would exceed the size of the fvs DataFrame")
        maybe_pos = fvs.nlargest(fvs_length // 2, score_column).iterrows()
        maybe_neg = fvs.nsmallest(fvs_length // 2, score_column).iterrows()
    elif isinstance(fvs, SparkDataFrame):
        if isinstance(score_column, str):
            score_column = F.col(score_column)
        fvs = fvs.filter((~F.isnan(score_column)) & (score_column.isNotNull()))
        fvs_length = fvs.count()
        if nseeds > fvs_length:
            raise ValueError("number of seeds would exceed the size of the fvs DataFrame")
        # lowest scoring vectors
        maybe_neg = fvs.sort(score_column, ascending=True)\
                       .limit(fvs_length // 2)\
                       .toPandas()\
                       .iterrows()
        # highest scoring vectors
        maybe_pos = fvs.sort(score_column, ascending=False)\
                       .limit(fvs_length // 2)\
                       .toPandas()\
                       .iterrows()

    i = 0
    while pos_count + neg_count < nseeds and i < fvs_length:
        try:
            _, ex = next(maybe_pos) if pos_count <= neg_count else next(maybe_neg)
            # Skip if this _id already exists in seeds
            if ex['_id'] in existing_ids:
                logger.debug(f'Skipping _id={ex["_id"]}, already exists in seeds')
                i += 1
                continue
            label = float(labeler(ex['id1'], ex['id2']))
            if label == -1.0:
                # User requested to stop
                break
            elif label == 2.0:
                # User marked as unsure
                i += 1
                continue
            elif label == 1.0:
                # Positive match
                pos_count += 1
            else:
                # label == 0.0, Negative match
                neg_count += 1
            ex['label'] = label
            seeds.append(ex)
            new_seed_df = pd.DataFrame([ex])
            save_training_data_streaming(new_seed_df, parquet_file_path, logger)
            existing_ids.add(ex['_id'])
        except StopIteration:
            print("Ran out of examples before reaching nseeds")
            break
        i += 1

    if not seeds:
        raise RuntimeError("No seeds were labeled before stopping")

    print(f"seeds: pos_count = {pos_count} neg_count = {neg_count}")

    if return_pandas:
        return pd.DataFrame(seeds)
    else:
        seeds_clean = []
        for seed in seeds:
            seed_clean = {k: v.tolist() if isinstance(v, np.ndarray)
                          else float(v) if isinstance(v, np.number)
                          else v
                          for k, v in seed.items()}
            seeds_clean.append(seed_clean)
        return spark.createDataFrame(seeds_clean)
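
# Usage sketch for create_seeds (illustrative only; `labeler` is assumed to be a
# Labeler constructed elsewhere, and `fvs` is assumed to contain the 'id1', 'id2',
# '_id', and 'score' columns that the implementation above expects):
#
#     seeds = create_seeds(fvs, nseeds=20, labeler=labeler)
#     # `seeds` has the schema of `fvs` plus a `label` column of 0.0/1.0 values.
#     # Each newly labeled example is also written to the parquet file at
#     # `parquet_file_path`, so an interrupted labeling session can be resumed.
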
def train_matcher(
    model: MLModel,
    labeled_data: Union[pd.DataFrame, SparkDataFrame],
    feature_col: str = "feature_vectors",
    label_col: str = "label",
) -> MLModel:
    """Train a matcher model on labeled data.

    Parameters
    ----------
    model : MLModel
        An MLModel instance to train
    labeled_data : Union[pd.DataFrame, SparkDataFrame]
        DataFrame containing the labeled data
    feature_col : str, default="feature_vectors"
        Name of the column containing feature vectors
    label_col : str, default="label"
        Name of the column containing labels

    Returns
    -------
    MLModel
        The trained model
    """
    # The user either supplies a pre-trained model, a custom MLModel, or an MLModel
    # configured with the necessary params. We return the trained model here;
    # apply_matcher expects to receive a trained model.
    return model.train(labeled_data, feature_col, label_col)
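
# Usage sketch for train_matcher (illustrative only; `model` is any MLModel
# instance, e.g. an SKLearnModel or SparkMLModel constructed per its own docs,
# and `labeled_data` is assumed to come from create_seeds or label_data):
#
#     trained = train_matcher(model, labeled_data,
#                             feature_col='feature_vectors', label_col='label')
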
def apply_matcher(
    model: MLModel,
    df: Union[pd.DataFrame, SparkDataFrame],
    feature_col: str,
    prediction_col: str,
    confidence_col: Optional[str] = None,
) -> Union[pd.DataFrame, SparkDataFrame]:
    """Apply a trained model to make predictions.

    Parameters
    ----------
    model : MLModel
        A trained MLModel instance
    df : Union[pd.DataFrame, SparkDataFrame]
        The DataFrame to make predictions on
    feature_col : str
        Name of the column containing feature vectors
    prediction_col : str
        Name of the column to store predictions in
    confidence_col : str, optional
        Name of the column to store confidence scores in. If provided, both
        predictions and confidence scores are computed in a single pass.

    Returns
    -------
    Union[pd.DataFrame, SparkDataFrame]
        The input DataFrame with predictions added (and confidence scores if requested)
    """
    if confidence_col is not None:
        return model.predict_with_confidence(df, feature_col, prediction_col, confidence_col)
    else:
        return model.predict(df, feature_col, prediction_col)
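
# Usage sketch for apply_matcher (illustrative only; assumes `trained` came from
# train_matcher and that `fvs` has a 'feature_vectors' column):
#
#     predictions = apply_matcher(trained, fvs,
#                                 feature_col='feature_vectors',
#                                 prediction_col='prediction',
#                                 confidence_col='confidence')
#     # Passing confidence_col computes predictions and confidences in one pass.
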
def label_data(
    model: MLModel,
    mode: Literal["batch", "continuous"],
    labeler: Labeler,
    fvs: Union[pd.DataFrame, SparkDataFrame],
    seeds: Optional[Union[pd.DataFrame, SparkDataFrame]] = None,
    parquet_file_path: str = 'active-matcher-training-data.parquet',
    **learner_kwargs
) -> Union[pd.DataFrame, SparkDataFrame]:
    """Generate labeled data using active learning.

    Parameters
    ----------
    model : MLModel
        An MLModel instance
    mode : Literal["batch", "continuous"]
        Whether to use batch or continuous active learning
    labeler : Labeler
        A Labeler instance
    fvs : Union[pd.DataFrame, SparkDataFrame]
        The data that needs to be labeled
    seeds : Union[pd.DataFrame, SparkDataFrame], optional
        Initial labeled examples to start with
    parquet_file_path : str, default='active-matcher-training-data.parquet'
        The path to save the labeled data to
    **learner_kwargs :
        Additional keyword arguments to pass to the active learner constructor.
        For batch mode, see EntropyActiveLearner (e.g. batch_size, max_iter).
        For continuous mode, see ContinuousEntropyActiveLearner
        (e.g. queue_size, max_labeled, on_demand_stop).

    Returns
    -------
    Union[pd.DataFrame, SparkDataFrame]
        DataFrame with ids of potential matches and the corresponding label
    """
    spark = SparkSession.builder.getOrCreate()

    return_pandas = isinstance(fvs, pd.DataFrame)
    if isinstance(fvs, pd.DataFrame):
        spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
        fvs = spark.createDataFrame(fvs)

    if seeds is None:
        seeds = create_seeds(fvs=fvs,
                             nseeds=min(10, fvs.count()),
                             labeler=labeler,
                             score_column='score',
                             parquet_file_path=parquet_file_path if mode == "batch" else None)
    if isinstance(seeds, SparkDataFrame):
        seeds = seeds.toPandas()

    if mode == "batch":
        learner = EntropyActiveLearner(model, labeler, parquet_file_path=parquet_file_path, **learner_kwargs)
    elif mode == "continuous":
        learner = ContinuousEntropyActiveLearner(model, labeler, parquet_file_path=parquet_file_path, **learner_kwargs)
    else:
        raise ValueError(f"mode must be either 'batch' or 'continuous', not {mode}")

    labeled_data = learner.train(fvs, seeds)

    if return_pandas:
        return labeled_data
    else:
        # Convert arrays to PySpark-compatible format
        labeled_data = convert_arrays_for_spark(labeled_data)
        return spark.createDataFrame(labeled_data)
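
# Usage sketch for label_data (illustrative only; the keyword arguments shown are
# the examples named in the docstring above and are forwarded to the underlying
# active learner):
#
#     labeled = label_data(model, mode='batch', labeler=labeler, fvs=fvs,
#                          batch_size=10, max_iter=50)
#     # With mode='continuous', kwargs such as queue_size or max_labeled apply instead.
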
def label_pairs(
    labeler: Labeler,
    pairs: Union[pd.DataFrame, SparkDataFrame]
) -> Union[pd.DataFrame, SparkDataFrame]:
    """Label pairs without active learning.

    Parameters
    ----------
    labeler : Labeler
        A Labeler instance
    pairs : Union[pd.DataFrame, SparkDataFrame]
        The pairs to label

    Returns
    -------
    Union[pd.DataFrame, SparkDataFrame]
        DataFrame with labeled pairs
    """
    if isinstance(pairs, pd.DataFrame):
        labeled_pairs = []
        id1_col, id2_col = pairs.columns[0], pairs.columns[1]
        for _, row in pairs.iterrows():
            label = labeler(row[id1_col], row[id2_col])
            if label == -1.0:
                # -1.0 means the user wants to stop labeling
                break
            labeled_pairs.append({
                id1_col: row[id1_col],
                id2_col: row[id2_col],
                'label': label
            })
        labeled_pairs = pd.DataFrame(labeled_pairs)
    elif isinstance(pairs, SparkDataFrame):
        spark = SparkSession.builder.getOrCreate()
        labeled_pairs = []
        id1_col, id2_col = pairs.columns[0], pairs.columns[1]
        for row in pairs.collect():
            label = labeler(row[id1_col], row[id2_col])
            if label == -1.0:
                # -1.0 means the user wants to stop labeling
                break
            labeled_pairs.append({
                id1_col: row[id1_col],
                id2_col: row[id2_col],
                'label': float(label)
            })
        if labeled_pairs:
            labeled_pairs = spark.createDataFrame(labeled_pairs)
        else:
            # Create empty DF with the correct schema
            id1_type = pairs.schema[id1_col].dataType
            id2_type = pairs.schema[id2_col].dataType
            schema = StructType([
                StructField(id1_col, id1_type, True),
                StructField(id2_col, id2_type, True),
                StructField('label', DoubleType(), True)
            ])
            labeled_pairs = spark.createDataFrame([], schema)
    return labeled_pairs
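
# Usage sketch for label_pairs (illustrative only; the first two columns of
# `pairs` are treated as the id columns, whatever they are named):
#
#     import pandas as pd
#     pairs = pd.DataFrame({'id1': [1, 2], 'id2': [7, 9]})
#     labeled_pairs = label_pairs(labeler, pairs)
#     # Returns the id columns plus a `label` column; labeling stops early if the
#     # labeler returns -1.0.
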
def save_features(features, path):
    """
    Save a list of feature objects to disk using pickle serialization.

    Parameters
    ----------
    features : List[Callable]
        List of feature objects to save
    path : str
        Path where to save the features file

    Returns
    -------
    None
    """
    path = Path(path)
    logger.info(f"Saving {len(features)} features to {path}")
    with open(path, 'wb') as f:
        pickle.dump(features, f)
    logger.info(f"Successfully saved features to {path}")

def load_features(path):
    """
    Load a list of feature objects from disk using pickle deserialization.

    Parameters
    ----------
    path : str
        Path to the saved features file

    Returns
    -------
    List[Callable]
        List of loaded feature objects
    """
    path = Path(path)
    logger.info(f"Loading features from {path}")
    with open(path, 'rb') as f:
        features = pickle.load(f)
    logger.info(f"Successfully loaded {len(features)} features from {path}")
    return features
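
# Usage sketch for save_features / load_features (illustrative only; the file
# name is a placeholder):
#
#     save_features(features, 'features.pkl')
#     features = load_features('features.pkl')
#     # Features are pickled, so reloading generally requires a compatible
#     # environment (same package versions used to create them).
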
def save_dataframe(dataframe, path):
    """
    Save a DataFrame to disk, automatically detecting whether it's a pandas or Spark DataFrame.

    Parameters
    ----------
    dataframe : Union[pd.DataFrame, pyspark.sql.DataFrame]
        DataFrame to save (pandas or Spark)
    path : str
        Path where to save the DataFrame

    Returns
    -------
    None
    """
    path = Path(path)

    if isinstance(dataframe, pd.DataFrame):
        logger.info(f"Saving pandas DataFrame with shape {dataframe.shape} to {path}")
        dataframe.to_parquet(path)
        logger.info(f"Successfully saved pandas DataFrame to {path}")
    elif isinstance(dataframe, SparkDataFrame):
        spark = SparkSession.builder.getOrCreate()
        master = spark.sparkContext.master
        path_str = str(path)
        # if we are on a cluster but without a distributed file system, save as pandas
        if not master.startswith("local") and "://" not in path_str:
            pdf = dataframe.toPandas()
            logger.info(f"Saving pandas DataFrame with shape {pdf.shape} to {path_str}")
            pdf.to_parquet(path_str)
            logger.info(f"Successfully saved pandas DataFrame to {path_str}")
            return
        logger.info(f"Saving Spark DataFrame to {path}")
        dataframe.write.mode('overwrite').parquet(str(path))
        logger.info(f"Successfully saved Spark DataFrame to {path}")
    else:
        raise TypeError(f"Unsupported DataFrame type: {type(dataframe)}. "
                        f"Expected pandas.DataFrame or pyspark.sql.DataFrame")

def load_dataframe(path, df_type):
    """
    Load a DataFrame from disk based on the specified type.

    Parameters
    ----------
    path : str
        Path to the saved DataFrame file
    df_type : str
        Type of DataFrame to load ('pandas' or 'sparkdf')

    Returns
    -------
    Union[pd.DataFrame, pyspark.sql.DataFrame]
        Loaded DataFrame
    """
    path = Path(path)
    logger.info(f"Loading DataFrame from {path} as {df_type}")

    if df_type.lower() == 'pandas':
        dataframe = pd.read_parquet(path)
        logger.info(f"Successfully loaded pandas DataFrame with shape {dataframe.shape} from {path}")
        return dataframe
    elif df_type.lower() == 'sparkdf':
        spark = SparkSession.builder.getOrCreate()
        master = spark.sparkContext.master
        path_str = str(path)
        # if we are on a cluster but without a distributed file system, load as pandas
        if not master.startswith("local") and "://" not in path_str:
            pdf = pd.read_parquet(path_str)
            logger.info(f"Successfully loaded pandas DataFrame with shape {pdf.shape} from {path_str}")
            return spark.createDataFrame(pdf)
        spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
        dataframe = spark.read.parquet(str(path))
        logger.info(f"Successfully loaded Spark DataFrame from {path}")
        return dataframe
    else:
        raise ValueError(f"Unsupported DataFrame type: {df_type}. "
                         f"Expected 'pandas' or 'sparkdf'")
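
# Usage sketch for save_dataframe / load_dataframe (illustrative only; paths are
# placeholders). df_type selects how the saved parquet data is read back:
#
#     save_dataframe(fvs, 'fvs.parquet')
#     fvs_pd = load_dataframe('fvs.parquet', 'pandas')
#     fvs_spark = load_dataframe('fvs.parquet', 'sparkdf')
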