"""
Tokenizer base class and implementations.
This module is part of the internal implementation and should not be imported directly.
Use the public API in the root package instead.
"""
from abc import abstractmethod, ABC
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Column
from typing import Iterator
import re
import numpy as np
from ..utils import get_logger
log = get_logger(__name__)
class Tokenizer(ABC):
    """
    Abstract base class for tokenizers.

    Subclasses must define a ``NAME`` attribute (used for display, output
    column naming, and equality) and implement :meth:`tokenize`.
    """

    def __str__(self):
        return self.NAME

    def tokenize_spark(self, input_col: 'Column'):
        '''
        return a column expression that gives the same output
        as the tokenize method. required for efficiency when building metadata for
        certain methods
        '''
        # spark treats whitespace differently than str.split
        # so make a udf to keep tokenization consistent
        @F.pandas_udf(T.ArrayType(T.StringType()))
        def t(itr: Iterator[pd.Series]) -> Iterator[pd.Series]:
            for s in itr:
                yield s.apply(self.tokenize)

        return t(input_col)

    @abstractmethod
    def tokenize(self, s):
        '''
        convert the string into a BAG of tokens (tokens should not be deduped)
        '''
        pass

    def out_col_name(self, input_col):
        '''
        the name of the output column from the tokenizer
        e.g. for a 3gram tokenizer, the tokens from the name columns could
        be "3gram(name)"
        '''
        return f'{str(self)}({input_col})'

    def tokenize_set(self, s):
        '''
        tokenize the string and return a set or None if the tokenize returns None
        '''
        r = self.tokenize(s)
        return set(r) if r is not None else None

    def __eq__(self, o):
        return isinstance(o, type(self)) and self.NAME == o.NAME

    # BUG FIX: defining __eq__ without __hash__ implicitly sets
    # __hash__ = None, making tokenizer instances unhashable (unusable in
    # sets / as dict keys). Hash on NAME, consistent with __eq__: equal
    # tokenizers share a NAME, hence a hash.
    def __hash__(self):
        return hash(self.NAME)
class StrippedWhiteSpaceTokenizer(Tokenizer):
    """
    Whitespace tokenizer that normalizes whitespace runs, lowercases, and
    drops every character other than lowercase letters, digits, and spaces
    before splitting.
    """
    WHITESPACE_NORM = re.compile(r'[\s]+')
    RE = re.compile('[^a-z0-9 ]+')
    NAME='stripped_whitespace_tokens'

    def __init__(self):
        pass

    def tokenize(self, s):
        """Return the cleaned word tokens of s, or None for non-string input."""
        if not isinstance(s, str):
            return None
        # collapse whitespace runs to single spaces, then lowercase
        normalized = self.WHITESPACE_NORM.sub(' ', s).lower()
        # remove everything outside [a-z0-9 ] and split on spaces
        return self.RE.sub('', normalized).split()
class ShingleTokenizer(Tokenizer):
    """
    Tokenizer that produces shingles: each token is the concatenation of n
    consecutive word tokens, e.g. for n=2, 'a bb c' -> ['abb', 'bbc'].
    """
    # reuse the stripped-whitespace tokenizer for the initial word split
    base_tokenize = StrippedWhiteSpaceTokenizer().tokenize

    def __init__(self, n):
        # n = number of consecutive word tokens joined into each shingle
        self._n = n
        self.NAME = f'{self._n}shingle_tokens'

    def tokenize(self, s : str) -> list:
        '''
        Return the list of n-token shingles of s, [] if s has fewer than n
        word tokens, or None if s is not a string.
        '''
        single_toks = self.base_tokenize(s)
        if single_toks is None:
            return None
        if len(single_toks) < self._n:
            return []
        # offsets[i] = start of token i within the concatenated string;
        # offsets[-1] = total length
        offsets = [0]
        for tok in single_toks:
            offsets.append(offsets[-1] + len(tok))
        combined = ''.join(single_toks)
        # BUG FIX: the start offsets were previously sliced as
        # offsets[:len(single_toks) - self._n], which is one element short —
        # the final shingle was always dropped, and inputs with exactly n
        # tokens returned [] instead of their single shingle.
        n_shingles = len(single_toks) - self._n + 1
        return [combined[offsets[i]:offsets[i + self._n]]
                for i in range(n_shingles)]
class WhiteSpaceTokenizer(Tokenizer):
    """Tokenizer that lowercases the input and splits on whitespace runs."""
    NAME='whitespace_tokens'

    def __init__(self):
        pass

    def tokenize(self, s):
        """Return lowercased whitespace-delimited tokens, or None for non-strings."""
        if not isinstance(s, str):
            return None
        return s.lower().split()
class NumericTokenizer(Tokenizer):
    """Tokenizer that extracts maximal runs of ASCII digits."""
    NAME = 'num_tokens'

    def __init__(self):
        # matches maximal digit runs
        self._re = re.compile('[0-9]+')

    def tokenize(self, s):
        """Return every digit run in s, or None for non-string input."""
        if not isinstance(s, str):
            return None
        return self._re.findall(s)
class AlphaNumericTokenizer(Tokenizer):
    """Tokenizer that extracts maximal alphanumeric runs from the lowercased input."""
    # TODO drop short tokens and stop words?
    # stopword removal didn't improve accuracy
    #STOP_WORDS = set(stopwords.words('english'))
    NAME = 'alnum_tokens'

    def __init__(self):
        # matches maximal runs of lowercase letters / digits
        self._re = re.compile('[a-z0-9]+')

    def tokenize(self, s):
        """Return the alphanumeric runs of s.lower(), or None for non-strings."""
        return self._re.findall(s.lower()) if isinstance(s, str) else None
class QGramTokenizer(Tokenizer):
    """Tokenizer producing all overlapping character q-grams of the lowercased string."""

    def __init__(self, n):
        # q-gram width
        self._q = n
        self.NAME = f'{self._q}gram_tokens'

    def tokenize(self, s : str) -> list:
        """Return the q-grams of s, [] when s is shorter than q, None for non-strings."""
        if not isinstance(s, str):
            return None
        q = self._q
        if len(s) < q:
            return []
        lowered = s.lower()
        # TODO can this be optimized?
        grams = []
        for start in range(len(lowered) - q + 1):
            grams.append(lowered[start:start + q])
        return grams
class StrippedQGramTokenizer(Tokenizer):
    """Q-gram tokenizer that first removes all non-word characters from the input."""
    RE = re.compile('\\W+')

    def __init__(self, n):
        # q-gram width
        self._q = n
        self.NAME = f'stripped_{self._q}gram_tokens'

    def _preproc(self, s : str) -> str:
        # strip all non-word chars
        return self.RE.sub('', s)

    def tokenize(self, s : str) -> list:
        """Return q-grams of the stripped, lowercased string; [] when too short; None for non-strings."""
        if not isinstance(s, str):
            return None
        cleaned = self._preproc(s).lower()
        q = self._q
        if len(cleaned) < q:
            return []
        # TODO can this be optimized?
        return [cleaned[i:i + q] for i in range(len(cleaned) - q + 1)]