Source code for synapse.ml.cyber.anomaly.complement_access
__author__ = "rolevin"
from typing import List, Optional
from synapse.ml.cyber.utils.spark_utils import DataFrameUtils, ExplainBuilder
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params
from pyspark.sql import DataFrame, functions as f, types as t
import random
class ComplementAccessTransformer(Transformer):
    """
    Given a dataframe, returns a new dataframe with access patterns sampled from
    the set of possible access patterns that did not occur in the given dataframe
    (i.e., it returns a sample from the complement set).
    """
partitionKey = Param(
Params._dummy(),
"partitionKey",
"The name of the partition_key field name",
)
indexedColNamesArr = Param(
Params._dummy(),
"indexedColNamesArr",
"The name of the fields to use to generate the complement set from",
)
complementsetFactor = Param(
Params._dummy(),
"complementsetFactor",
"The estimated average size of the complement set to generate",
)
"""
Given a dataframe it returns a new dataframe with access patterns sampled from
the set of possible access patterns which did not occur in the given dataframe
(i.e., it returns a sample from the complement set).
"""
def __init__(
self,
partition_key: Optional[str],
indexed_col_names_arr: List[str],
complementset_factor: int,
):
super().__init__()
        # we assume here that the indices of each column are contiguous within their partition_key
ExplainBuilder.build(
self,
partitionKey=partition_key,
indexedColNamesArr=indexed_col_names_arr,
complementsetFactor=complementset_factor,
)
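    # Helpers that name the temporary columns holding the per-partition min/max
    # index bounds and the exploded tuple of randomly generated indices.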
@staticmethod
def _min_index_token(curr_col_name: str) -> str:
return "__min_{0}__".format(curr_col_name)
@staticmethod
def _max_index_token(curr_col_name: str) -> str:
return "__max_{0}__".format(curr_col_name)
@staticmethod
def _tuple_token() -> str:
return "__tuple__"
def _transform(self, df: DataFrame) -> DataFrame:
"""generate a dataframe which consists of a sample from the complement set
Parameters
----------
df: a given dataframe containing the columns in 'indexed_col_names_arr'
Returns
-------
dataframe which which consists of a sample from the complement set
"""
complementset_factor = self.complementset_factor
if complementset_factor == 0:
return DataFrameUtils.make_empty(df)
the_partition_key = self.partition_key
indexed_col_names_arr = self.indexed_col_names_arr
if the_partition_key is None:
partition_key = "__dummy_partition_key__"
df = df.withColumn(partition_key, f.lit(0)).cache()
else:
partition_key = the_partition_key
df = df.cache()
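        # For every indexed column, compute the per-partition min and max index.
        # These bounds define the ranges from which candidate (complement) indices
        # are later sampled uniformly at random.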
limits_dfs = [
df.select(partition_key, curr_col_name)
.distinct()
.groupBy(partition_key)
.agg(
f.min(curr_col_name).alias(
ComplementAccessTransformer._min_index_token(curr_col_name),
),
f.max(curr_col_name).alias(
ComplementAccessTransformer._max_index_token(curr_col_name),
),
)
.orderBy(partition_key)
for curr_col_name in indexed_col_names_arr
]
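        # Build a UDF that, given the per-partition min/max bounds of each indexed
        # column, returns 'factor' random tuples of indices drawn uniformly within
        # those bounds (one struct field per indexed column).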
def make_randint(factor):
schema = t.ArrayType(
t.StructType(
[
t.StructField(curr_col_name, t.IntegerType())
for curr_col_name in indexed_col_names_arr
],
),
)
@f.udf(schema)
def randint(min_index_arr, max_index_arr):
return [
tuple(
[
random.randint(min_index, max_index)
for min_index, max_index in zip(
min_index_arr,
max_index_arr,
)
],
)
for _ in range(factor)
]
return randint
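        # Attach the per-partition min/max bounds of every indexed column to each row.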
pre_complement_candidates_df = df.cache()
for limits_df in limits_dfs:
pre_complement_candidates_df = pre_complement_candidates_df.join(
limits_df,
partition_key,
).cache()
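        # Output column order: the partition key followed by the indexed columns.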
cols = [f.col(partition_key)] + [
f.col(curr_col_name) for curr_col_name in indexed_col_names_arr
]
randint = make_randint(complementset_factor)
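        # Explode the generated random tuples into candidate rows, extract the
        # per-column indices from each struct, and deduplicate the candidates.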
complement_candidates_df = (
pre_complement_candidates_df.withColumn(
ComplementAccessTransformer._tuple_token(),
f.explode(
randint(
f.array(
[
f.col(
ComplementAccessTransformer._min_index_token(
curr_col_name,
),
)
for curr_col_name in indexed_col_names_arr
],
),
f.array(
[
f.col(
ComplementAccessTransformer._max_index_token(
curr_col_name,
),
)
for curr_col_name in indexed_col_names_arr
],
),
),
),
)
.select(
*(
[partition_key]
+ [
f.col(
"{0}.{1}".format(
ComplementAccessTransformer._tuple_token(),
curr_col_name,
),
).alias(curr_col_name)
for curr_col_name in indexed_col_names_arr
]
)
)
.distinct()
.orderBy(*cols)
)
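        # Collect the distinct tuples that actually occur in the input and remove
        # them from the candidates via a left anti join, leaving only samples from
        # the complement set of access patterns.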
tuples_df = df.select(*cols).distinct().orderBy(*cols)
res_df = (
complement_candidates_df.join(
tuples_df,
[partition_key] + indexed_col_names_arr,
how="left_anti",
)
.select(*cols)
.orderBy(*cols)
)
if the_partition_key is None:
res_df = res_df.drop(partition_key)
return res_df
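# ---------------------------------------------------------------------------
# Illustrative usage sketch: assumes a SparkSession named `spark` is available
# and that the indexed columns already contain contiguous integer indices per
# partition, as the transformer expects.
#
#     access_df = spark.createDataFrame(
#         [(0, 0, 0), (0, 1, 1), (0, 2, 0)],
#         ["tenant", "user_index", "res_index"],
#     )
#     transformer = ComplementAccessTransformer(
#         partition_key="tenant",
#         indexed_col_names_arr=["user_index", "res_index"],
#         complementset_factor=2,
#     )
#     complement_df = transformer.transform(access_df)
#
# `complement_df` then holds (tenant, user_index, res_index) tuples sampled from
# the per-partition index ranges that do not appear in `access_df`.
# ---------------------------------------------------------------------------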