__author__ = 'rolevin'
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Union
from synapse.ml.cyber.utils.spark_utils import ExplainBuilder, HasSetInputCol, HasSetOutputCol
from pyspark.ml import Estimator, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.sql import DataFrame, functions as f, types as t
def _pyudf(func, use_pandas):
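    # Wrap a scalar Python function as a Spark UDF returning DoubleType; when
    # use_pandas is True, a vectorized (scalar) pandas UDF is used instead.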
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
return pandas_udf(func, t.DoubleType(), PandasUDFType.SCALAR) if use_pandas else udf(func, t.DoubleType())
class PerPartitionScalarScalerModel(ABC, Transformer, HasSetInputCol, HasSetOutputCol):
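    """
    Abstract base model that scales the values of inputCol into outputCol, either
    globally or within each partition defined by partitionKey. Subclasses provide
    the actual scaling logic via _make_partitioned_stats_method and
    _make_unpartitioned_stats_method.
    """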
partitionKey = Param(
Params._dummy(),
"partitionKey",
"The name of the column to partition by, i.e., scale the values of inputCol within each partition. "
)
def __init__(self,
input_col: str,
partition_key: Optional[str],
output_col: str,
per_group_stats: Union[DataFrame, Dict[str, float]],
use_pandas: bool = True):
super().__init__()
ExplainBuilder.build(self, inputCol=input_col, partitionKey=partition_key, outputCol=output_col)
self._per_group_stats = per_group_stats
self._use_pandas = use_pandas
@property
def per_group_stats(self):
return self._per_group_stats
@property
def use_pandas(self):
return self._use_pandas
@abstractmethod
def _make_partitioned_stats_method(self) -> Callable:
raise NotImplementedError
@abstractmethod
def _make_unpartitioned_stats_method(self) -> Callable:
raise NotImplementedError
    def is_partitioned(self) -> bool:
if self.partition_key is not None or isinstance(self.per_group_stats, DataFrame):
assert self.partition_key is not None and isinstance(self.per_group_stats, DataFrame)
res = True
        elif self.partition_key is None or isinstance(self.per_group_stats, dict):
            assert self.partition_key is None and isinstance(self.per_group_stats, dict)
res = False
else:
assert False, 'unsupported type for per_group_stats: {0}'.format(type(self.per_group_stats))
return res
def _make_stats_method(self) -> Callable:
return self._make_partitioned_stats_method() if self.is_partitioned() else _pyudf(
self._make_unpartitioned_stats_method(), self.use_pandas
)
def _transform(self, df: DataFrame) -> DataFrame:
stats_method = self._make_stats_method()
input_col = self.input_col
output_col = self.output_col
if self.is_partitioned():
partition_key = self.partition_key
per_group_stats = self.per_group_stats
with_stats_df = df.join(per_group_stats, partition_key, how='left')
else:
with_stats_df = df
return with_stats_df.withColumn(output_col, stats_method(f.col(input_col)))
class PerPartitionScalarScalerEstimator(ABC, Estimator, HasSetInputCol, HasSetOutputCol):
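    """
    Abstract base estimator that computes statistics over inputCol, either globally
    or per partition defined by partitionKey, and produces a corresponding
    PerPartitionScalarScalerModel. Subclasses define the aggregations via
    _apply_on_cols and build the concrete model via _create_model.
    """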
partitionKey = Param(
Params._dummy(),
"partitionKey",
"The name of the column to partition by, i.e., scale the values of inputCol within each partition. "
)
def __init__(self,
input_col: str,
partition_key: Optional[str],
output_col: str,
use_pandas: bool = True):
super().__init__()
ExplainBuilder.build(self, inputCol=input_col, partitionKey=partition_key, outputCol=output_col)
self._use_pandas = use_pandas
@property
def use_pandas(self):
return self._use_pandas
@abstractmethod
def _apply_on_cols(self) -> List[Callable]:
raise NotImplementedError()
@abstractmethod
def _create_model(self, per_group_stats: Union[DataFrame, Dict[str, float]]) -> PerPartitionScalarScalerModel:
raise NotImplementedError()
def _fit(self, df: DataFrame) -> PerPartitionScalarScalerModel:
partition_key = self.partition_key
apply_on_cols = self._apply_on_cols()
if partition_key is None:
rows = df.select(*apply_on_cols).collect()
assert len(rows) == 1
per_group_stats = rows[0].asDict()
else:
per_group_stats = df.groupBy(partition_key).agg(*apply_on_cols)
assert per_group_stats is not None
return self._create_model(per_group_stats)
class StandardScalarScalerConfig:
"""
The tokens to use for temporary representation of mean and standard deviation
"""
mean_token = '__mean__'
std_token = '__std__'
class StandardScalarScalerModel(PerPartitionScalarScalerModel):
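    """
    Model that standardizes the values of inputCol: the (global or per-partition)
    mean is subtracted and the result is divided by the standard deviation, then
    multiplied by coefficientFactor. When the standard deviation is zero, only the
    mean is subtracted.
    """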
coefficientFactor = Param(
Params._dummy(),
"coefficientFactor",
"After scaling values of outputCol are multiplied by coefficient (defaults to 1.0). "
)
def __init__(self,
input_col: str,
partition_key: Optional[str],
output_col: str,
per_group_stats: Union[DataFrame, Dict[str, float]],
coefficient_factor: float = 1.0,
use_pandas: bool = True):
super().__init__(input_col, partition_key, output_col, per_group_stats, use_pandas)
self.coefficient_factor = coefficient_factor
def _make_partitioned_stats_method(self) -> Callable:
assert isinstance(self.per_group_stats, DataFrame)
coefficient_factor = self.coefficient_factor
        def norm(x):
            return f.when(
                f.col(StandardScalarScalerConfig.std_token) != 0.0,
                f.lit(coefficient_factor) * (
                    (x - f.col(StandardScalarScalerConfig.mean_token)) / f.col(StandardScalarScalerConfig.std_token)
                )
            ).otherwise(x - f.col(StandardScalarScalerConfig.mean_token))
return norm
def _make_unpartitioned_stats_method(self) -> Callable:
assert isinstance(self.per_group_stats, dict)
mean = self.per_group_stats[StandardScalarScalerConfig.mean_token]
std = self.per_group_stats[StandardScalarScalerConfig.std_token]
coefficient_factor = self.coefficient_factor
assert mean is not None
assert std is not None
        def norm(x):
            # Guard against division by zero, mirroring the partitioned branch:
            # when the standard deviation is zero, fall back to a plain mean shift.
            return coefficient_factor * (x - mean) / std if std != 0.0 else x - mean
return norm
class StandardScalarScaler(PerPartitionScalarScalerEstimator):
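    """
    Estimator that computes the mean and population standard deviation of inputCol,
    either globally or per partition, and returns a StandardScalarScalerModel that
    standardizes the column accordingly.

    Example (illustrative sketch; ``df`` and the column names ``tenant`` and
    ``score`` are assumed here, not defined by this module)::

        scaler = StandardScalarScaler('score', 'tenant', 'scaled_score')
        model = scaler.fit(df)
        scaled_df = model.transform(df)
    """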
coefficientFactor = Param(
Params._dummy(),
"coefficientFactor",
"After scaling values of outputCol are multiplied by coefficient (defaults to 1.0). "
)
def __init__(self,
input_col: str,
partition_key: Optional[str],
output_col: str,
coefficient_factor: float = 1.0,
use_pandas: bool = True):
super().__init__(input_col, partition_key, output_col, use_pandas)
self.coefficient_factor = coefficient_factor
def _apply_on_cols(self) -> List[Callable]:
input_col = self.input_col
return [
f.mean(f.col(input_col)).alias(StandardScalarScalerConfig.mean_token),
f.stddev_pop(f.col(input_col)).alias(StandardScalarScalerConfig.std_token)
]
def _create_model(self, per_group_stats: Union[DataFrame, Dict[str, float]]) -> PerPartitionScalarScalerModel:
return StandardScalarScalerModel(
self.input_col,
self.partition_key,
self.output_col,
per_group_stats,
self.coefficient_factor,
self.use_pandas
)
class LinearScalarScalerConfig:
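    """
    The tokens to use for temporary representation of the minimum and maximum actual values
    """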
min_actual_value_token = '__min_actual_value__'
max_actual_value_token = '__max_actual_value__'
class LinearScalarScalerModel(PerPartitionScalarScalerModel):
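    """
    Model that linearly rescales the values of inputCol from their actual (global or
    per-partition) range to [min_required_value, max_required_value]. When the actual
    range is degenerate (min equals max), the output is the midpoint of the required range.
    """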
def __init__(self,
input_col: str,
partition_key: Optional[str],
output_col: str,
per_group_stats: Union[DataFrame, Dict[str, float]],
min_required_value: float,
max_required_value: float,
use_pandas: bool = True):
super().__init__(input_col, partition_key, output_col, per_group_stats, use_pandas)
self.min_required_value = min_required_value
self.max_required_value = max_required_value
def _make_partitioned_stats_method(self) -> Callable:
req_delta = self.max_required_value - self.min_required_value
        def actual_delta():
            return (f.col(LinearScalarScalerConfig.max_actual_value_token)
                    - f.col(LinearScalarScalerConfig.min_actual_value_token))

        def a():
            return f.when(actual_delta() != f.lit(0.0), f.lit(req_delta) / actual_delta()).otherwise(f.lit(0.0))

        def b():
            return f.when(
                actual_delta() != f.lit(0.0),
                self.max_required_value - a() * f.col(LinearScalarScalerConfig.max_actual_value_token)
            ).otherwise(f.lit((self.min_required_value + self.max_required_value) / 2.0))
def norm(x):
return a() * x + b()
return norm
def _make_unpartitioned_stats_method(self) -> Callable:
        assert isinstance(self.per_group_stats, dict)
min_actual_value = self.per_group_stats[LinearScalarScalerConfig.min_actual_value_token]
max_actual_value = self.per_group_stats[LinearScalarScalerConfig.max_actual_value_token]
delta = max_actual_value - min_actual_value
a = (self.max_required_value - self.min_required_value) / delta if delta != 0.0 else 0.0
        b = (self.max_required_value - a * max_actual_value if delta != 0.0
             else (self.min_required_value + self.max_required_value) / 2.0)
def norm(x):
return a * x + b
return norm
class LinearScalarScaler(PerPartitionScalarScalerEstimator):
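    """
    Estimator that computes the minimum and maximum of inputCol, either globally or
    per partition, and returns a LinearScalarScalerModel that rescales the column to
    the range [minRequiredValue, maxRequiredValue].

    Example (illustrative sketch; ``df`` and the column names ``tenant`` and
    ``score`` are assumed here, not defined by this module)::

        scaler = LinearScalarScaler('score', 'tenant', 'scaled_score', 0.0, 1.0)
        model = scaler.fit(df)
        scaled_df = model.transform(df)
    """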
minRequiredValue = Param(
Params._dummy(),
"minRequiredValue",
"Scale the outputCol to have a value between [minRequiredValue, maxRequiredValue]."
)
maxRequiredValue = Param(
Params._dummy(),
"maxRequiredValue",
"Scale the outputCol to have a value between [minRequiredValue, maxRequiredValue]."
)
def __init__(self,
input_col: str,
partition_key: Optional[str],
output_col: str,
min_required_value: float = 0.0,
max_required_value: float = 1.0,
use_pandas: bool = True):
super().__init__(input_col, partition_key, output_col, use_pandas)
self.min_required_value = min_required_value
self.max_required_value = max_required_value
def _apply_on_cols(self) -> List[Callable]:
input_col = self.input_col
return [
f.min(f.col(input_col)).alias(LinearScalarScalerConfig.min_actual_value_token),
f.max(f.col(input_col)).alias(LinearScalarScalerConfig.max_actual_value_token)
]
def _create_model(self, per_group_stats: Union[DataFrame, Dict[str, float]]) -> PerPartitionScalarScalerModel:
return LinearScalarScalerModel(
self.input_col,
self.partition_key,
self.output_col,
per_group_stats,
self.min_required_value,
self.max_required_value,
self.use_pandas
)