Source code for synapse.ml.cyber.anomaly.complement_access

__author__ = "rolevin"

import random
from typing import List, Optional

from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params
from pyspark.sql import DataFrame, functions as f, types as t

from synapse.ml.cyber.utils.spark_utils import DataFrameUtils, ExplainBuilder


class ComplementAccessTransformer(Transformer):
    partitionKey = Param(
        Params._dummy(),
        "partitionKey",
        "The name of the partition_key field",
    )

    indexedColNamesArr = Param(
        Params._dummy(),
        "indexedColNamesArr",
        "The names of the fields to use to generate the complement set from",
    )

    complementsetFactor = Param(
        Params._dummy(),
        "complementsetFactor",
        "The estimated average size of the complement set to generate",
    )

    """
    Given a dataframe, it returns a new dataframe with access patterns sampled from
    the set of possible access patterns which did not occur in the given dataframe
    (i.e., it returns a sample from the complement set).
    """

    def __init__(
        self,
        partition_key: Optional[str],
        indexed_col_names_arr: List[str],
        complementset_factor: int,
    ):
        super().__init__()

        # we assume here that all indices of the columns are continuous within their partition_key
        ExplainBuilder.build(
            self,
            partitionKey=partition_key,
            indexedColNamesArr=indexed_col_names_arr,
            complementsetFactor=complementset_factor,
        )

    @staticmethod
    def _min_index_token(curr_col_name: str) -> str:
        return "__min_{0}__".format(curr_col_name)

    @staticmethod
    def _max_index_token(curr_col_name: str) -> str:
        return "__max_{0}__".format(curr_col_name)

    @staticmethod
    def _tuple_token() -> str:
        return "__tuple__"

    def _transform(self, df: DataFrame) -> DataFrame:
        """generate a dataframe which consists of a sample from the complement set

        Parameters
        ----------
        df: a given dataframe containing the columns in 'indexed_col_names_arr'

        Returns
        -------
        dataframe which consists of a sample from the complement set
        """
        complementset_factor = self.complementset_factor

        if complementset_factor == 0:
            return DataFrameUtils.make_empty(df)

        the_partition_key = self.partition_key
        indexed_col_names_arr = self.indexed_col_names_arr

        if the_partition_key is None:
            partition_key = "__dummy_partition_key__"
            df = df.withColumn(partition_key, f.lit(0)).cache()
        else:
            partition_key = the_partition_key
            df = df.cache()

        # per partition, the min and max observed index of each indexed column
        limits_dfs = [
            df.select(partition_key, curr_col_name)
            .distinct()
            .groupBy(partition_key)
            .agg(
                f.min(curr_col_name).alias(
                    ComplementAccessTransformer._min_index_token(curr_col_name),
                ),
                f.max(curr_col_name).alias(
                    ComplementAccessTransformer._max_index_token(curr_col_name),
                ),
            )
            .orderBy(partition_key)
            for curr_col_name in indexed_col_names_arr
        ]

        # build a UDF that draws 'factor' random index tuples, each component
        # sampled uniformly within the corresponding [min, max] bounds
        def make_randint(factor):
            schema = t.ArrayType(
                t.StructType(
                    [
                        t.StructField(curr_col_name, t.IntegerType())
                        for curr_col_name in indexed_col_names_arr
                    ],
                ),
            )

            @f.udf(schema)
            def randint(min_index_arr, max_index_arr):
                return [
                    tuple(
                        [
                            random.randint(min_index, max_index)
                            for min_index, max_index in zip(
                                min_index_arr,
                                max_index_arr,
                            )
                        ],
                    )
                    for _ in range(factor)
                ]

            return randint

        # attach the per-column bounds to every row of the input
        pre_complement_candidates_df = df.cache()

        for limits_df in limits_dfs:
            pre_complement_candidates_df = pre_complement_candidates_df.join(
                limits_df,
                partition_key,
            ).cache()

        cols = [f.col(partition_key)] + [
            f.col(curr_col_name) for curr_col_name in indexed_col_names_arr
        ]

        randint = make_randint(complementset_factor)

        # sample candidate tuples and explode them into one row per candidate
        complement_candidates_df = (
            pre_complement_candidates_df.withColumn(
                ComplementAccessTransformer._tuple_token(),
                f.explode(
                    randint(
                        f.array(
                            [
                                f.col(
                                    ComplementAccessTransformer._min_index_token(
                                        curr_col_name,
                                    ),
                                )
                                for curr_col_name in indexed_col_names_arr
                            ],
                        ),
                        f.array(
                            [
                                f.col(
                                    ComplementAccessTransformer._max_index_token(
                                        curr_col_name,
                                    ),
                                )
                                for curr_col_name in indexed_col_names_arr
                            ],
                        ),
                    ),
                ),
            )
            .select(
                *(
                    [partition_key]
                    + [
                        f.col(
                            "{0}.{1}".format(
                                ComplementAccessTransformer._tuple_token(),
                                curr_col_name,
                            ),
                        ).alias(curr_col_name)
                        for curr_col_name in indexed_col_names_arr
                    ]
                )
            )
            .distinct()
            .orderBy(*cols)
        )

        # the access tuples actually observed in the input
        tuples_df = df.select(*cols).distinct().orderBy(*cols)

        # keep only candidates that never occur in the input (left anti join),
        # i.e. a sample from the complement set
        res_df = (
            complement_candidates_df.join(
                tuples_df,
                [partition_key] + indexed_col_names_arr,
                how="left_anti",
            )
            .select(*cols)
            .orderBy(*cols)
        )

        if the_partition_key is None:
            res_df = res_df.drop(partition_key)

        return res_df
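
A minimal usage sketch (not part of the module above): it assumes a running SparkSession, and the column names and toy data are purely illustrative.

# Hypothetical example: sample complement access patterns from a toy access log.
from pyspark.sql import SparkSession
from synapse.ml.cyber.anomaly.complement_access import ComplementAccessTransformer

spark = SparkSession.builder.getOrCreate()

# Each row is an observed (tenant, user index, resource index) access.
df = spark.createDataFrame(
    [
        (0, 0, 0),
        (0, 1, 1),
        (0, 2, 2),
    ],
    ["tenant_id", "user_index", "res_index"],
)

# Draw roughly two candidate tuples per observed row from within the observed
# index ranges, keeping only (user_index, res_index) pairs that do not occur
# in df for the same tenant_id.
transformer = ComplementAccessTransformer(
    partition_key="tenant_id",
    indexed_col_names_arr=["user_index", "res_index"],
    complementset_factor=2,
)

complement_df = transformer.transform(df)
complement_df.show()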