Source code for synapse.ml.cyber.feature.scalers

__author__ = "rolevin"

from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Union

from synapse.ml.cyber.utils.spark_utils import (
    ExplainBuilder,
    HasSetInputCol,
    HasSetOutputCol,
)

from pyspark.ml import Estimator, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.sql import DataFrame, functions as f, types as t


def _pyudf(func, use_pandas):
    from pyspark.sql.functions import pandas_udf, PandasUDFType, udf

    return (
        pandas_udf(func, t.DoubleType(), PandasUDFType.SCALAR)
        if use_pandas
        else udf(func, t.DoubleType())
    )
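
# Illustrative note (not part of the original module): _pyudf wraps an
# element-wise callable as a DoubleType UDF. With use_pandas=True the callable
# receives a pandas Series per batch; otherwise it receives one value per row.
# The `norm` closures defined by the models below are written so the same
# function works in either mode, e.g. `_pyudf(lambda x: x * 2.0, True)` would
# return a UDF that doubles a numeric column.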


class PerPartitionScalarScalerModel(ABC, Transformer, HasSetInputCol, HasSetOutputCol):
    partitionKey = Param(
        Params._dummy(),
        "partitionKey",
        "The name of the column to partition by, i.e., scale the values of inputCol within each partition.",
    )

    def __init__(
        self,
        input_col: str,
        partition_key: Optional[str],
        output_col: str,
        per_group_stats: Union[DataFrame, Dict[str, float]],
        use_pandas: bool = True,
    ):
        super().__init__()
        ExplainBuilder.build(
            self,
            inputCol=input_col,
            partitionKey=partition_key,
            outputCol=output_col,
        )
        self._per_group_stats = per_group_stats
        self._use_pandas = use_pandas

    @property
    def per_group_stats(self):
        return self._per_group_stats

    @property
    def use_pandas(self):
        return self._use_pandas

    @abstractmethod
    def _make_partitioned_stats_method(self) -> Callable:
        raise NotImplementedError

    @abstractmethod
    def _make_unpartitioned_stats_method(self) -> Callable:
        raise NotImplementedError

    def is_partitioned(self) -> bool:
        if self.partition_key is not None or isinstance(
            self.per_group_stats,
            DataFrame,
        ):
            assert self.partition_key is not None and isinstance(
                self.per_group_stats,
                DataFrame,
            )
            res = True
        elif self.partition_key is None or isinstance(self.per_group_stats, Dict):
            assert self.partition_key is None and isinstance(self.per_group_stats, Dict)
            res = False
        else:
            assert False, "unsupported type for per_group_stats: {0}".format(
                type(self.per_group_stats),
            )

        return res

    def _make_stats_method(self) -> Callable:
        return (
            self._make_partitioned_stats_method()
            if self.is_partitioned()
            else _pyudf(self._make_unpartitioned_stats_method(), self.use_pandas)
        )

    def _transform(self, df: DataFrame) -> DataFrame:
        stats_method = self._make_stats_method()
        input_col = self.input_col
        output_col = self.output_col

        if self.is_partitioned():
            partition_key = self.partition_key
            per_group_stats = self.per_group_stats
            with_stats_df = df.join(per_group_stats, partition_key, how="left")
        else:
            with_stats_df = df

        return with_stats_df.withColumn(output_col, stats_method(f.col(input_col)))


class PerPartitionScalarScalerEstimator(
    ABC,
    Estimator,
    HasSetInputCol,
    HasSetOutputCol,
):
    partitionKey = Param(
        Params._dummy(),
        "partitionKey",
        "The name of the column to partition by, i.e., scale the values of inputCol within each partition.",
    )

    def __init__(
        self,
        input_col: str,
        partition_key: Optional[str],
        output_col: str,
        use_pandas: bool = True,
    ):
        super().__init__()
        ExplainBuilder.build(
            self,
            inputCol=input_col,
            partitionKey=partition_key,
            outputCol=output_col,
        )
        self._use_pandas = use_pandas

    @property
    def use_pandas(self):
        return self._use_pandas

    @abstractmethod
    def _apply_on_cols(self) -> List[Callable]:
        raise NotImplementedError()

    @abstractmethod
    def _create_model(
        self,
        per_group_stats: Union[DataFrame, Dict[str, float]],
    ) -> PerPartitionScalarScalerModel:
        raise NotImplementedError()

    def _fit(self, df: DataFrame) -> PerPartitionScalarScalerModel:
        partition_key = self.partition_key
        apply_on_cols = self._apply_on_cols()

        if partition_key is None:
            rows = df.select(*apply_on_cols).collect()
            assert len(rows) == 1
            per_group_stats = rows[0].asDict()
        else:
            per_group_stats = df.groupBy(partition_key).agg(*apply_on_cols)

        assert per_group_stats is not None
        return self._create_model(per_group_stats)


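# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a hypothetical
# max-abs scaler showing how a concrete estimator/model pair plugs into the
# two abstract classes above. The estimator's _apply_on_cols() yields the
# aggregate column(s) computed per partition (or globally), _create_model()
# wraps the resulting stats, and the model turns those stats into either a
# Column expression (partitioned case) or a plain callable (unpartitioned
# case). The class names and the "__max_abs__" token are made up for the
# example; zero-division handling is omitted for brevity.
# ---------------------------------------------------------------------------
class _ExampleMaxAbsScalerModel(PerPartitionScalarScalerModel):
    def _make_partitioned_stats_method(self) -> Callable:
        # stats were joined onto the input DataFrame, so refer to them by column
        return lambda x: x / f.col("__max_abs__")

    def _make_unpartitioned_stats_method(self) -> Callable:
        max_abs = self.per_group_stats["__max_abs__"]
        return lambda x: x / max_abs


class _ExampleMaxAbsScaler(PerPartitionScalarScalerEstimator):
    def _apply_on_cols(self) -> List[Callable]:
        return [f.max(f.abs(f.col(self.input_col))).alias("__max_abs__")]

    def _create_model(
        self,
        per_group_stats: Union[DataFrame, Dict[str, float]],
    ) -> PerPartitionScalarScalerModel:
        return _ExampleMaxAbsScalerModel(
            self.input_col,
            self.partition_key,
            self.output_col,
            per_group_stats,
            self.use_pandas,
        )

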
class StandardScalarScalerConfig:
    """
    The tokens to use for temporary representation of mean and standard deviation.
    """

    mean_token = "__mean__"
    std_token = "__std__"


class StandardScalarScalerModel(PerPartitionScalarScalerModel):
    coefficientFactor = Param(
        Params._dummy(),
        "coefficientFactor",
        "After scaling, values of outputCol are multiplied by this coefficient (defaults to 1.0).",
    )

    def __init__(
        self,
        input_col: str,
        partition_key: Optional[str],
        output_col: str,
        per_group_stats: Union[DataFrame, Dict[str, float]],
        coefficient_factor: float = 1.0,
        use_pandas: bool = True,
    ):
        super().__init__(
            input_col,
            partition_key,
            output_col,
            per_group_stats,
            use_pandas,
        )
        self.coefficient_factor = coefficient_factor

    def _make_partitioned_stats_method(self) -> Callable:
        assert isinstance(self.per_group_stats, DataFrame)
        coefficient_factor = self.coefficient_factor

        def norm(x):
            return f.when(
                f.col(StandardScalarScalerConfig.std_token) != 0.0,
                f.lit(coefficient_factor)
                * (
                    (x - f.col(StandardScalarScalerConfig.mean_token))
                    / f.col(StandardScalarScalerConfig.std_token)
                ),
            ).otherwise(x - f.col(StandardScalarScalerConfig.mean_token))

        return norm

    def _make_unpartitioned_stats_method(self) -> Callable:
        assert isinstance(self.per_group_stats, dict)

        mean = self.per_group_stats[StandardScalarScalerConfig.mean_token]
        std = self.per_group_stats[StandardScalarScalerConfig.std_token]
        coefficient_factor = self.coefficient_factor

        assert mean is not None
        assert std is not None

        def norm(x):
            return coefficient_factor * (x - mean) / std

        return norm


class StandardScalarScaler(PerPartitionScalarScalerEstimator):
    coefficientFactor = Param(
        Params._dummy(),
        "coefficientFactor",
        "After scaling, values of outputCol are multiplied by this coefficient (defaults to 1.0).",
    )

    def __init__(
        self,
        input_col: str,
        partition_key: Optional[str],
        output_col: str,
        coefficient_factor: float = 1.0,
        use_pandas: bool = True,
    ):
        super().__init__(input_col, partition_key, output_col, use_pandas)
        self.coefficient_factor = coefficient_factor

    def _apply_on_cols(self) -> List[Callable]:
        input_col = self.input_col

        return [
            f.mean(f.col(input_col)).alias(StandardScalarScalerConfig.mean_token),
            f.stddev_pop(f.col(input_col)).alias(StandardScalarScalerConfig.std_token),
        ]

    def _create_model(
        self,
        per_group_stats: Union[DataFrame, Dict[str, float]],
    ) -> PerPartitionScalarScalerModel:
        return StandardScalarScalerModel(
            self.input_col,
            self.partition_key,
            self.output_col,
            per_group_stats,
            self.coefficient_factor,
            self.use_pandas,
        )


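# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). The SparkSession
# argument and the "tenant"/"score" column names are made up for the example.
# ---------------------------------------------------------------------------
def _example_standard_scalar_scaler(spark) -> DataFrame:
    df = spark.createDataFrame(
        [("t1", 1.0), ("t1", 3.0), ("t2", 10.0), ("t2", 30.0)],
        ["tenant", "score"],
    )
    scaler = StandardScalarScaler(
        input_col="score",
        partition_key="tenant",
        output_col="scaled_score",
    )
    model = scaler.fit(df)  # aggregates __mean__/__std__ per tenant
    # scaled_score = coefficient_factor * (score - mean) / std within each
    # tenant (the std != 0 branch applies to this data)
    return model.transform(df)

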
class LinearScalarScalerConfig:
    min_actual_value_token = "__min_actual_value__"
    max_actual_value_token = "__max_actual_value__"


class LinearScalarScalerModel(PerPartitionScalarScalerModel):
    def __init__(
        self,
        input_col: str,
        partition_key: Optional[str],
        output_col: str,
        per_group_stats: Union[DataFrame, Dict[str, float]],
        min_required_value: float,
        max_required_value: float,
        use_pandas: bool = True,
    ):
        super().__init__(
            input_col,
            partition_key,
            output_col,
            per_group_stats,
            use_pandas,
        )
        self.min_required_value = min_required_value
        self.max_required_value = max_required_value

    def _make_partitioned_stats_method(self) -> Callable:
        req_delta = self.max_required_value - self.min_required_value

        def actual_delta():
            return f.col(LinearScalarScalerConfig.max_actual_value_token) - f.col(
                LinearScalarScalerConfig.min_actual_value_token,
            )

        def a():
            return f.when(
                actual_delta() != f.lit(0),
                f.lit(req_delta) / actual_delta(),
            ).otherwise(f.lit(0.0))

        def b():
            return f.when(
                actual_delta() != f.lit(0),
                self.max_required_value
                - a() * f.col(LinearScalarScalerConfig.max_actual_value_token),
            ).otherwise(
                f.lit((self.min_required_value + self.max_required_value) / 2.0),
            )

        def norm(x):
            return a() * x + b()

        return norm

    def _make_unpartitioned_stats_method(self) -> Callable:
        assert isinstance(self.per_group_stats, Dict)

        min_actual_value = self.per_group_stats[
            LinearScalarScalerConfig.min_actual_value_token
        ]
        max_actual_value = self.per_group_stats[
            LinearScalarScalerConfig.max_actual_value_token
        ]

        delta = max_actual_value - min_actual_value

        a = (
            (self.max_required_value - self.min_required_value) / delta
            if delta != 0.0
            else 0.0
        )
        b = (
            self.max_required_value - a * max_actual_value
            if delta != 0.0
            else (self.min_required_value + self.max_required_value) / 2.0
        )

        def norm(x):
            return a * x + b

        return norm


class LinearScalarScaler(PerPartitionScalarScalerEstimator):
    minRequiredValue = Param(
        Params._dummy(),
        "minRequiredValue",
        "Scale the outputCol to have a value between [minRequiredValue, maxRequiredValue].",
    )

    maxRequiredValue = Param(
        Params._dummy(),
        "maxRequiredValue",
        "Scale the outputCol to have a value between [minRequiredValue, maxRequiredValue].",
    )

    def __init__(
        self,
        input_col: str,
        partition_key: Optional[str],
        output_col: str,
        min_required_value: float = 0.0,
        max_required_value: float = 1.0,
        use_pandas: bool = True,
    ):
        super().__init__(input_col, partition_key, output_col, use_pandas)
        self.min_required_value = min_required_value
        self.max_required_value = max_required_value

    def _apply_on_cols(self) -> List[Callable]:
        input_col = self.input_col

        return [
            f.min(f.col(input_col)).alias(
                LinearScalarScalerConfig.min_actual_value_token,
            ),
            f.max(f.col(input_col)).alias(
                LinearScalarScalerConfig.max_actual_value_token,
            ),
        ]

    def _create_model(
        self,
        per_group_stats: Union[DataFrame, Dict[str, float]],
    ) -> PerPartitionScalarScalerModel:
        return LinearScalarScalerModel(
            self.input_col,
            self.partition_key,
            self.output_col,
            per_group_stats,
            self.min_required_value,
            self.max_required_value,
            self.use_pandas,
        )


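# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). The SparkSession
# argument and the "tenant"/"score" column names are made up for the example.
# ---------------------------------------------------------------------------
def _example_linear_scalar_scaler(spark) -> DataFrame:
    df = spark.createDataFrame(
        [("t1", 1.0), ("t1", 3.0), ("t2", 10.0), ("t2", 30.0)],
        ["tenant", "score"],
    )
    scaler = LinearScalarScaler(
        input_col="score",
        partition_key="tenant",
        output_col="scaled_score",
        min_required_value=0.0,
        max_required_value=1.0,
    )
    model = scaler.fit(df)  # aggregates per-tenant min/max of "score"
    # each tenant's observed [min, max] range is mapped linearly onto
    # [0.0, 1.0]; a tenant with a single distinct value gets the midpoint 0.5
    return model.transform(df)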