"""
Glossary:
the term 'res' in this package is a shorthand for resource
"""
__author__ = "rolevin"

import os
from typing import List, Optional, Tuple

import numpy as np
from pyspark import SQLContext  # noqa
from pyspark.ml import Estimator, Transformer
from pyspark.ml.param.shared import Param, Params
from pyspark.ml.recommendation import ALS
from pyspark.sql import DataFrame, functions as f, types as t

from synapse.ml.cyber.anomaly.complement_access import ComplementAccessTransformer
from synapse.ml.cyber.feature import indexers, scalers
from synapse.ml.cyber.utils import spark_utils
def _make_dot():
    """
    Create a UDF that computes the dot product of two vectors (lists of doubles).
    If the vectors differ in length, the shorter one is padded with 1.0 so that
    the extra entries of the longer vector are summed in unscaled (bias terms).
    :return: the UDF
    """
    @f.udf(t.DoubleType())
    def dot(v, u):
        if (v is not None) and (u is not None):
            # pad the shorter vector with 1.0 up to the length of the longer one
            vv = (
                np.pad(np.array(v), (0, len(u) - len(v)), "constant", constant_values=1.0)
                if len(v) < len(u)
                else np.array(v)
            )
            uu = (
                np.pad(np.array(u), (0, len(v) - len(u)), "constant", constant_values=1.0)
                if len(u) < len(v)
                else np.array(u)
            )
            return float(vv.dot(uu))
        return None
    return dot
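# A worked example of the padding semantics above (hypothetical values):
# dot([1.0, 2.0], [3.0, 4.0, 5.0]) pads the first vector to [1.0, 2.0, 1.0],
# so the result is 1*3 + 2*4 + 1*5 = 16.0; the trailing entry of the longer
# vector is added in as-is, acting as a bias term.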
class AccessAnomalyConfig:
"""
Define default values for AccessAnomaly Params
"""
default_tenant_col = "tenant"
default_user_col = "user"
default_res_col = "res"
default_likelihood_col = "likelihood"
default_output_col = "anomaly_score"
default_rank = 10
default_max_iter = 25
default_reg_param = 1.0
default_num_blocks = None # |tenants| if separate_tenants is False else 10
default_separate_tenants = False
default_low_value = 5.0
default_high_value = 10.0
default_apply_implicit_cf = True
default_alpha = 1.0
default_complementset_factor = 2
default_neg_score = 1.0
class _UserResourceFeatureVectorMapping:
"""
Private class used to pass the mappings as calculated by the AccessAnomaliesEstimator.
An object representing the user and resource models
(mapping from name to latent vector)
and the relevant column names
"""
def __init__(
self,
tenant_col: str,
user_col: str,
user_vec_col: str,
res_col: str,
res_vec_col: str,
history_access_df: Optional[DataFrame],
user2component_mappings_df: Optional[DataFrame],
res2component_mappings_df: Optional[DataFrame],
user_feature_vector_mapping_df: DataFrame,
res_feature_vector_mapping_df: DataFrame,
):
self.tenant_col = tenant_col
self.user_col = user_col
self.user_vec_col = user_vec_col
self.res_col = res_col
self.res_vec_col = res_vec_col
self.history_access_df = history_access_df
self.user2component_mappings_df = user2component_mappings_df
self.res2component_mappings_df = res2component_mappings_df
self.user_feature_vector_mapping_df = user_feature_vector_mapping_df
self.res_feature_vector_mapping_df = res_feature_vector_mapping_df
assert self.history_access_df is None or set(
self.history_access_df.schema.fieldNames(),
) == {tenant_col, user_col, res_col}, self.history_access_df.schema.fieldNames()
def replace_mappings(
self,
user_feature_vector_mapping_df: Optional[DataFrame] = None,
res_feature_vector_mapping_df: Optional[DataFrame] = None,
):
"""
create a new model replacing the user and resource models with new ones (optional)
:param user_feature_vector_mapping_df: optional new user model mapping names to latent vectors
:param res_feature_vector_mapping_df: optional new resource model mapping names to latent vectors
:return:
"""
return _UserResourceFeatureVectorMapping(
self.tenant_col,
self.user_col,
self.user_vec_col,
self.res_col,
self.res_vec_col,
self.history_access_df,
self.user2component_mappings_df,
self.res2component_mappings_df,
user_feature_vector_mapping_df
if user_feature_vector_mapping_df is not None
else self.user_feature_vector_mapping_df,
res_feature_vector_mapping_df
if res_feature_vector_mapping_df is not None
else self.res_feature_vector_mapping_df,
)
def check(self):
"""
check the validity of the model
:return: boolean value where True indicating the verification succeeded
"""
return self._check_user_mapping() and self._check_res_mapping()
def _check_user_mapping(self):
field_map = {
ff.name: ff for ff in self.user_feature_vector_mapping_df.schema.fields
}
assert field_map.get(self.tenant_col) is not None, field_map
assert field_map.get(self.user_col) is not None
return (
self.user_feature_vector_mapping_df.select(self.tenant_col, self.user_col)
.distinct()
.count()
== self.user_feature_vector_mapping_df.count()
)
def _check_res_mapping(self):
field_map = {
ff.name: ff for ff in self.res_feature_vector_mapping_df.schema.fields
}
assert field_map.get(self.tenant_col) is not None, field_map
assert field_map.get(self.res_col) is not None
return (
self.res_feature_vector_mapping_df.select(self.tenant_col, self.res_col)
.distinct()
.count()
== self.res_feature_vector_mapping_df.count()
)
# noinspection PyPep8Naming
class AccessAnomalyModel(Transformer):
    """
    A pyspark.ml.Transformer model that can predict anomaly scores for (user, resource) access pairs.
    """
    outputCol = Param(
        Params._dummy(),
        "outputCol",
        "The name of the output column representing the calculated anomaly score. "
        "Values lie in (-inf, +inf) with an estimated mean of 0.0 and standard deviation of 1.0.",
    )
def __init__(
self,
userResourceFeatureVectorMapping: _UserResourceFeatureVectorMapping,
outputCol: str,
):
super().__init__()
self.user_res_feature_vector_mapping = userResourceFeatureVectorMapping
has_user2component_mappings = (
self.user_res_feature_vector_mapping.user2component_mappings_df is not None
)
has_res2component_mappings = (
self.user_res_feature_vector_mapping.res2component_mappings_df is not None
)
assert has_user2component_mappings == has_res2component_mappings
self.has_components = has_user2component_mappings and has_res2component_mappings
self.preserve_history = True
if self.has_components:
self._user_mapping_df = (
self.user_res_feature_vector_mapping.user_feature_vector_mapping_df.join(
self.user_res_feature_vector_mapping.user2component_mappings_df,
[self.tenant_col, self.user_col],
)
.select(
self.tenant_col,
self.user_col,
self.user_vec_col,
f.col("component").alias("user_component"),
)
.cache()
)
self._res_mapping_df = (
self.user_res_feature_vector_mapping.res_feature_vector_mapping_df.join(
self.user_res_feature_vector_mapping.res2component_mappings_df,
[self.tenant_col, self.res_col],
)
.select(
self.tenant_col,
self.res_col,
self.res_vec_col,
f.col("component").alias("res_component"),
)
.cache()
)
else:
self._user_mapping_df = (
self.user_res_feature_vector_mapping.user_feature_vector_mapping_df
)
self._res_mapping_df = (
self.user_res_feature_vector_mapping.res_feature_vector_mapping_df
)
spark_utils.ExplainBuilder.build(self, outputCol=outputCol)
@staticmethod
def _metadata_schema() -> t.StructType:
return t.StructType(
[
t.StructField("tenant_col", t.StringType(), False),
t.StructField("user_col", t.StringType(), False),
t.StructField("user_vec_col", t.StringType(), False),
t.StructField("res_col", t.StringType(), False),
t.StructField("res_vec_col", t.StringType(), False),
t.StructField("output_col", t.StringType(), False),
t.StructField("has_history_access_df", t.BooleanType(), False),
t.StructField("has_user2component_mappings_df", t.BooleanType(), False),
t.StructField("has_res2component_mappings_df", t.BooleanType(), False),
t.StructField(
"has_user_feature_vector_mapping_df",
t.BooleanType(),
False,
),
t.StructField(
"has_res_feature_vector_mapping_df",
t.BooleanType(),
False,
),
],
)
    def save(self, path: str, path_suffix: str = "", output_format: str = "parquet"):
        """
        Save the model to the given path: a metadata dataframe plus one
        sub-directory per available model dataframe.
        """
dfs = [
self.user_res_feature_vector_mapping.history_access_df,
self.user_res_feature_vector_mapping.user2component_mappings_df,
self.user_res_feature_vector_mapping.res2component_mappings_df,
self.user_res_feature_vector_mapping.user_feature_vector_mapping_df,
self.user_res_feature_vector_mapping.res_feature_vector_mapping_df,
]
        adf = next((df for df in dfs if df is not None), None)
        assert adf is not None
spark = spark_utils.DataFrameUtils.get_spark_session(adf)
metadata_df = spark.createDataFrame(
[
(
self.tenant_col,
self.user_col,
self.user_vec_col,
self.res_col,
self.res_vec_col,
self.output_col,
self.user_res_feature_vector_mapping.history_access_df is not None,
self.user_res_feature_vector_mapping.user2component_mappings_df
is not None,
self.user_res_feature_vector_mapping.res2component_mappings_df
is not None,
self.user_res_feature_vector_mapping.user_feature_vector_mapping_df
is not None,
self.user_res_feature_vector_mapping.res_feature_vector_mapping_df
is not None,
),
],
AccessAnomalyModel._metadata_schema(),
)
metadata_df.write.format(output_format).save(
os.path.join(path, "metadata_df", path_suffix),
)
if self.user_res_feature_vector_mapping.history_access_df is not None:
self.user_res_feature_vector_mapping.history_access_df.write.format(
output_format,
).save(os.path.join(path, "history_access_df", path_suffix))
if self.user_res_feature_vector_mapping.user2component_mappings_df is not None:
self.user_res_feature_vector_mapping.user2component_mappings_df.write.format(
output_format,
).save(
os.path.join(path, "user2component_mappings_df", path_suffix),
)
if self.user_res_feature_vector_mapping.res2component_mappings_df is not None:
self.user_res_feature_vector_mapping.res2component_mappings_df.write.format(
output_format,
).save(os.path.join(path, "res2component_mappings_df", path_suffix))
if (
self.user_res_feature_vector_mapping.user_feature_vector_mapping_df
is not None
):
self.user_res_feature_vector_mapping.user_feature_vector_mapping_df.write.format(
output_format,
).save(
os.path.join(path, "user_feature_vector_mapping_df", path_suffix),
)
if (
self.user_res_feature_vector_mapping.res_feature_vector_mapping_df
is not None
):
self.user_res_feature_vector_mapping.res_feature_vector_mapping_df.write.format(
output_format,
).save(
os.path.join(path, "res_feature_vector_mapping_df", path_suffix),
)
    @staticmethod
    def load(
        spark: SQLContext,
        path: str,
        output_format: str = "parquet",
    ) -> "AccessAnomalyModel":
        """
        Load an AccessAnomalyModel previously written by save() from the given path.
        """
metadata_df = spark.read.format(output_format).load(
os.path.join(path, "metadata_df"),
)
assert metadata_df.count() == 1
metadata_row = metadata_df.collect()[0]
tenant_col = metadata_row["tenant_col"]
user_col = metadata_row["user_col"]
user_vec_col = metadata_row["user_vec_col"]
res_col = metadata_row["res_col"]
res_vec_col = metadata_row["res_vec_col"]
output_col = metadata_row["output_col"]
has_history_access_df = metadata_row["has_history_access_df"]
has_user2component_mappings_df = metadata_row["has_user2component_mappings_df"]
has_res2component_mappings_df = metadata_row["has_res2component_mappings_df"]
has_user_feature_vector_mapping_df = metadata_row[
"has_user_feature_vector_mapping_df"
]
has_res_feature_vector_mapping_df = metadata_row[
"has_res_feature_vector_mapping_df"
]
history_access_df = (
spark.read.format(output_format).load(
os.path.join(path, "history_access_df"),
)
if has_history_access_df
else None
)
user2component_mappings_df = (
spark.read.format(output_format).load(
os.path.join(path, "user2component_mappings_df"),
)
if has_user2component_mappings_df
else None
)
res2component_mappings_df = (
spark.read.format(output_format).load(
os.path.join(path, "res2component_mappings_df"),
)
if has_res2component_mappings_df
else None
)
user_feature_vector_mapping_df = (
spark.read.format(output_format).load(
os.path.join(path, "user_feature_vector_mapping_df"),
)
if has_user_feature_vector_mapping_df
else None
)
res_feature_vector_mapping_df = (
spark.read.format(output_format).load(
os.path.join(path, "res_feature_vector_mapping_df"),
)
if has_res_feature_vector_mapping_df
else None
)
return AccessAnomalyModel(
_UserResourceFeatureVectorMapping(
tenant_col,
user_col,
user_vec_col,
res_col,
res_vec_col,
history_access_df,
user2component_mappings_df,
res2component_mappings_df,
user_feature_vector_mapping_df,
res_feature_vector_mapping_df,
),
output_col,
)
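    # A minimal save/load round-trip sketch (hypothetical path and session):
    #
    #   model.save("/tmp/access_anomaly_model")
    #   restored = AccessAnomalyModel.load(spark, "/tmp/access_anomaly_model")
    #   assert restored.output_col == model.output_col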
@property
def tenant_col(self):
return self.user_res_feature_vector_mapping.tenant_col
@property
def user_col(self):
return self.user_res_feature_vector_mapping.user_col
@property
def user_vec_col(self):
return self.user_res_feature_vector_mapping.user_vec_col
@property
def res_col(self):
return self.user_res_feature_vector_mapping.res_col
@property
def res_vec_col(self):
return self.user_res_feature_vector_mapping.res_vec_col
@property
def user_mapping_df(self):
return self._user_mapping_df
@property
def res_mapping_df(self):
return self._res_mapping_df
def _transform(self, df: DataFrame) -> DataFrame:
dot = _make_dot()
tenant_col = self.tenant_col
user_col = self.user_col
user_vec_col = self.user_vec_col
res_col = self.res_col
res_vec_col = self.res_vec_col
output_col = self.output_col
seen_token = "__seen__"
        def value_calc():
            # scoring rules:
            #   - pairs already present in the history dataframe score 0.0
            #   - unseen pairs score the dot product of the user and resource
            #     latent vectors
            #   - when component mappings exist, an access across different
            #     connected components scores +inf (maximally anomalous)
            #   - pairs with a missing user or resource vector score null
            return f.when(
                f.col(seen_token).isNull() | ~f.col(seen_token),
                f.when(
                    f.col(user_vec_col).isNotNull() & f.col(res_vec_col).isNotNull(),
                    f.when(
                        f.col("user_component") == f.col("res_component"),
                        dot(f.col(user_vec_col), f.col(res_vec_col)),
                    ).otherwise(f.lit(float("inf"))),
                ).otherwise(f.lit(None))
                if self.has_components
                else f.when(
                    f.col(user_vec_col).isNotNull() & f.col(res_vec_col).isNotNull(),
                    dot(f.col(user_vec_col), f.col(res_vec_col)),
                ).otherwise(f.lit(None)),
            ).otherwise(f.lit(0.0))
history_access_df = self.user_res_feature_vector_mapping.history_access_df
the_df = (
df.join(
history_access_df.withColumn(seen_token, f.lit(True)),
[tenant_col, user_col, res_col],
how="left",
)
if self.preserve_history and history_access_df is not None
else df.withColumn(seen_token, f.lit(False))
)
user_mapping_df = self.user_mapping_df
res_mapping_df = self.res_mapping_df
return (
the_df.join(user_mapping_df, [tenant_col, user_col], how="left")
.join(res_mapping_df, [tenant_col, res_col], how="left")
.withColumn(output_col, value_calc())
.drop(
user_vec_col,
res_vec_col,
"user_component",
"res_component",
seen_token,
)
)
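# A minimal scoring sketch for AccessAnomalyModel (hypothetical dataframe and
# default column names):
#
#   scored_df = model.transform(access_df)
#   scored_df.select("tenant", "user", "res", "anomaly_score").show()
#
# Previously seen pairs score 0.0; unseen pairs within the user's connected
# component score the latent-vector dot product.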
# noinspection PyPep8Naming
class ConnectedComponents:
    """
    Compute, per tenant, the connected components of the bipartite access
    graph whose nodes are users and resources and whose edges are access events.
    """
    def __init__(
        self,
        tenantCol: str,
        userCol: str,
        resCol: str,
        componentColName: str = "component",
    ):
        self.tenant_col = tenantCol
        self.user_col = userCol
        self.res_col = resCol
        self.component_col_name = componentColName
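    # NOTE: a minimal sketch of the connected-components step that
    # AccessAnomaly._fit relies on (it expects transform() to return a
    # (user2component, res2component) pair of dataframes); this iterative
    # label propagation is an assumed implementation, not necessarily the
    # original one.
    def transform(self, df: DataFrame) -> Tuple[DataFrame, DataFrame]:
        edges = df.select(self.tenant_col, self.user_col, self.res_col).distinct().cache()
        # start with each user labeled by its own name
        user_labels = edges.select(self.tenant_col, self.user_col).distinct().withColumn(
            self.component_col_name,
            f.col(self.user_col),
        )
        while True:
            # a resource takes the minimal label among its accessing users
            res_labels = (
                edges.join(user_labels, [self.tenant_col, self.user_col])
                .groupBy(self.tenant_col, self.res_col)
                .agg(f.min(self.component_col_name).alias(self.component_col_name))
            )
            # a user takes the minimal label among its accessed resources
            new_user_labels = (
                edges.join(res_labels, [self.tenant_col, self.res_col])
                .groupBy(self.tenant_col, self.user_col)
                .agg(f.min(self.component_col_name).alias(self.component_col_name))
                .cache()
            )
            # fixed point: stop when no user label changed in this round
            num_changed = (
                new_user_labels.alias("new")
                .join(user_labels.alias("old"), [self.tenant_col, self.user_col])
                .filter(
                    f.col("new." + self.component_col_name)
                    != f.col("old." + self.component_col_name),
                )
                .count()
            )
            user_labels = new_user_labels
            if num_changed == 0:
                break
        return user_labels, res_labels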
# noinspection PyPep8Naming
class AccessAnomaly(Estimator):
    """
    AccessAnomaly is a pyspark.ml.Estimator that fits an AccessAnomalyModel,
    a pyspark.ml.Transformer which scores (user, resource) access pairs for anomaly.
    """
tenantCol = Param(
Params._dummy(),
"tenantCol",
"The name of the tenant column. "
"This is a unique identifier used to partition the dataframe into independent "
"groups where the values in each such group are completely isolated from one another. "
"Note: if this column is irrelevant for your data, "
"then just create a tenant column and give it a single value for all rows.",
)
    userCol = Param(
        Params._dummy(),
        "userCol",
        "The name of the user column in the dataframe.",
    )
    resCol = Param(
        Params._dummy(),
        "resCol",
        "The name of the resource column in the dataframe.",
    )
likelihoodCol = Param(
Params._dummy(),
"likelihoodCol",
"The name of the column with the likelihood estimate for user, res access "
"(usually based on access counts per time unit). ",
)
    outputCol = Param(
        Params._dummy(),
        "outputCol",
        "The name of the output column representing the calculated anomaly score. "
        "Values lie in (-inf, +inf) with an estimated mean of 0.0 and standard deviation of 1.0.",
    )
rankParam = Param(
Params._dummy(),
"rankParam",
"rankParam is the number of latent factors in the model (defaults to 10).",
)
maxIter = Param(
Params._dummy(),
"maxIter",
"maxIter is the maximum number of iterations to run (defaults to 25).",
)
    regParam = Param(
        Params._dummy(),
        "regParam",
        "regParam specifies the regularization parameter in ALS (defaults to 1.0).",
    )
numBlocks = Param(
Params._dummy(),
"numBlocks",
"numBlocks is the number of blocks the users and items will be partitioned into "
"in order to parallelize computation "
"(defaults to |tenants| if separate_tenants is False else 10).",
)
    separateTenants = Param(
        Params._dummy(),
        "separateTenants",
        "separateTenants applies the algorithm per tenant in isolation. "
        "Setting to True may reduce runtime significantly when the number of tenants is large, "
        "and may increase accuracy (defaults to False).",
    )
lowValue = Param(
Params._dummy(),
"lowValue",
"lowValue is used to scale the values of likelihood_col to be in the range [lowValue, highValue] "
"(defaults to 5.0).",
)
highValue = Param(
Params._dummy(),
"highValue",
"highValue is used to scale the values of likelihood_col to be in the range [lowValue, highValue] "
"(defaults to 10.0).",
)
    applyImplicitCf = Param(
        Params._dummy(),
        "applyImplicitCf",
        "applyImplicitCf specifies whether to use the implicit (True) or explicit (False) "
        "feedback variant of ALS (defaults to True, i.e., implicit feedback).",
    )
    alphaParam = Param(
        Params._dummy(),
        "alphaParam",
        "alphaParam is a parameter applicable to the implicit feedback variant "
        "of ALS that governs the baseline confidence in preference observations "
        "(defaults to 1.0).",
    )
    complementsetFactor = Param(
        Params._dummy(),
        "complementsetFactor",
        "complementsetFactor is a parameter applicable to the explicit feedback variant "
        "of ALS that governs the size of the generated complement set of unseen user, res pairs "
        "(defaults to 2).",
    )
    negScore = Param(
        Params._dummy(),
        "negScore",
        "negScore is a parameter applicable to the explicit feedback variant of ALS that "
        "governs the likelihood value assigned to the generated complement-set pairs "
        "(defaults to 1.0).",
    )
historyAccessDf = Param(
Params._dummy(),
"historyAccessDf",
"historyAccessDf is an optional spark dataframe which includes the "
"list of seen user resource pairs for which the anomaly score should be zero.",
)
def __init__(
self,
tenantCol: str = AccessAnomalyConfig.default_tenant_col,
userCol: str = AccessAnomalyConfig.default_user_col,
resCol: str = AccessAnomalyConfig.default_res_col,
likelihoodCol: str = AccessAnomalyConfig.default_likelihood_col,
outputCol: str = AccessAnomalyConfig.default_output_col,
rankParam: int = AccessAnomalyConfig.default_rank,
maxIter: int = AccessAnomalyConfig.default_max_iter,
regParam: float = AccessAnomalyConfig.default_reg_param,
numBlocks: Optional[int] = AccessAnomalyConfig.default_num_blocks,
separateTenants: bool = AccessAnomalyConfig.default_separate_tenants,
lowValue: Optional[float] = AccessAnomalyConfig.default_low_value,
highValue: Optional[float] = AccessAnomalyConfig.default_high_value,
applyImplicitCf: bool = AccessAnomalyConfig.default_apply_implicit_cf,
alphaParam: Optional[float] = None,
complementsetFactor: Optional[int] = None,
negScore: Optional[float] = None,
historyAccessDf: Optional[DataFrame] = None,
):
super().__init__()
if applyImplicitCf:
alphaParam = (
alphaParam
if alphaParam is not None
else AccessAnomalyConfig.default_alpha
)
assert complementsetFactor is None and negScore is None
else:
assert alphaParam is None
complementsetFactor = (
complementsetFactor
if complementsetFactor is not None
else AccessAnomalyConfig.default_complementset_factor
)
negScore = (
negScore
if negScore is not None
else AccessAnomalyConfig.default_neg_score
)
# must either both be None or both be not None
assert (lowValue is None) == (highValue is None)
assert lowValue is None or lowValue >= 1.0
assert (lowValue is None or highValue is None) or highValue > lowValue
assert (lowValue is None or negScore is None) or (
lowValue is not None and negScore < lowValue
)
spark_utils.ExplainBuilder.build(
self,
tenantCol=tenantCol,
userCol=userCol,
resCol=resCol,
likelihoodCol=likelihoodCol,
outputCol=outputCol,
rankParam=rankParam,
maxIter=maxIter,
regParam=regParam,
numBlocks=numBlocks,
separateTenants=separateTenants,
lowValue=lowValue,
highValue=highValue,
applyImplicitCf=applyImplicitCf,
alphaParam=alphaParam,
complementsetFactor=complementsetFactor,
negScore=negScore,
historyAccessDf=historyAccessDf,
)
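    # A minimal end-to-end sketch (hypothetical dataframe `df` holding
    # tenant, user, res, likelihood rows with the default column names):
    #
    #   estimator = AccessAnomaly(likelihoodCol="likelihood")
    #   model = estimator.fit(df)
    #   scored_df = model.transform(df)
    #
    # With applyImplicitCf=False, complement (unseen) pairs are generated and
    # assigned negScore before training the explicit-feedback ALS.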
# --- getters and setters
@property
def indexed_user_col(self):
return self.user_col + "_index"
@property
def user_vec_col(self):
return self.user_col + "_vector"
@property
def indexed_res_col(self):
return self.res_col + "_index"
@property
def res_vec_col(self):
return self.res_col + "_vector"
@property
def scaled_likelihood_col(self):
return self.likelihood_col + "_scaled"
def _get_scaled_df(self, df: DataFrame) -> DataFrame:
return (
scalers.LinearScalarScaler(
input_col=self.likelihood_col,
partition_key=self.tenant_col,
output_col=self.scaled_likelihood_col,
min_required_value=self.low_value,
max_required_value=self.high_value,
)
.fit(df)
.transform(df)
if self.low_value is not None and self.high_value is not None
else df
)
def _enrich_and_normalize(self, indexed_df: DataFrame) -> DataFrame:
tenant_col = self.tenant_col
indexed_user_col = self.indexed_user_col
indexed_res_col = self.indexed_res_col
scaled_likelihood_col = self.scaled_likelihood_col
if not self.apply_implicit_cf:
complementset_factor = self.complementset_factor
neg_score = self.neg_score
assert complementset_factor is not None and neg_score is not None
comp_df = (
ComplementAccessTransformer(
tenant_col,
[indexed_user_col, indexed_res_col],
complementset_factor,
)
.transform(indexed_df)
.withColumn(scaled_likelihood_col, f.lit(neg_score))
)
else:
comp_df = None
scaled_df = self._get_scaled_df(indexed_df).select(
tenant_col,
indexed_user_col,
indexed_res_col,
scaled_likelihood_col,
)
return scaled_df.union(comp_df) if comp_df is not None else scaled_df
def _train_cf(self, als: ALS, df: DataFrame) -> Tuple[DataFrame, DataFrame]:
tenant_col = self.tenant_col
indexed_user_col = self.indexed_user_col
user_vec_col = self.user_vec_col
indexed_res_col = self.indexed_res_col
res_vec_col = self.res_vec_col
spark_model = als.fit(df)
user_mapping_df = (
spark_model.userFactors.select(
f.col("id").alias(indexed_user_col),
f.col("features").alias(user_vec_col),
)
.join(df.select(indexed_user_col, tenant_col).distinct(), indexed_user_col)
.select(tenant_col, indexed_user_col, user_vec_col)
)
res_mapping_df = (
spark_model.itemFactors.select(
f.col("id").alias(indexed_res_col),
f.col("features").alias(res_vec_col),
)
.join(df.select(indexed_res_col, tenant_col).distinct(), indexed_res_col)
.select(tenant_col, indexed_res_col, res_vec_col)
)
return user_mapping_df, res_mapping_df
    def create_spark_model_vectors_df(
        self,
        df: DataFrame,
    ) -> _UserResourceFeatureVectorMapping:
        """
        Train ALS (per tenant if separateTenants is True) and return the
        resulting user and resource latent-vector mappings.
        """
tenant_col = self.tenant_col
indexed_user_col = self.indexed_user_col
user_vec_col = self.user_vec_col
indexed_res_col = self.indexed_res_col
res_vec_col = self.res_vec_col
max_iter = self.max_iter
distinct_tenants = df.select(tenant_col).distinct().cache()
num_tenants = distinct_tenants.count()
separate_tenants = self.separate_tenants
num_blocks = (
self.num_blocks
if self.num_blocks is not None
else (num_tenants if not separate_tenants else 10)
)
als = ALS(
rank=self.rank_param,
maxIter=max_iter,
regParam=self.reg_param,
numUserBlocks=num_blocks,
numItemBlocks=num_blocks,
implicitPrefs=self.apply_implicit_cf,
userCol=self.indexed_user_col,
itemCol=self.indexed_res_col,
ratingCol=self.scaled_likelihood_col,
nonnegative=True,
coldStartStrategy="drop",
)
alpha = self.alpha_param
if alpha is not None:
als.setAlpha(alpha)
if separate_tenants:
tenants = [
row[tenant_col]
for row in distinct_tenants.orderBy(tenant_col).collect()
]
user_mapping_df: Optional[DataFrame] = None
res_mapping_df: Optional[DataFrame] = None
for curr_tenant in tenants:
curr_df = df.filter(f.col(tenant_col) == curr_tenant).cache()
curr_user_mapping_df, curr_res_mapping_df = self._train_cf(als, curr_df)
user_mapping_df = (
user_mapping_df.union(curr_user_mapping_df)
if user_mapping_df is not None
else curr_user_mapping_df
)
res_mapping_df = (
res_mapping_df.union(curr_res_mapping_df)
if res_mapping_df is not None
else curr_res_mapping_df
)
else:
user_mapping_df, res_mapping_df = self._train_cf(als, df)
assert user_mapping_df is not None and res_mapping_df is not None
return _UserResourceFeatureVectorMapping(
tenant_col,
indexed_user_col,
user_vec_col,
indexed_res_col,
res_vec_col,
None,
None,
None,
user_mapping_df,
res_mapping_df,
)
def _fit(self, df: DataFrame) -> AccessAnomalyModel:
# index the user and resource columns to allow running the spark ALS algorithm
the_indexer = indexers.MultiIndexer(
indexers=[
indexers.IdIndexer(
input_col=self.user_col,
partition_key=self.tenant_col,
output_col=self.indexed_user_col,
reset_per_partition=self.separate_tenants,
),
indexers.IdIndexer(
input_col=self.res_col,
partition_key=self.tenant_col,
output_col=self.indexed_res_col,
reset_per_partition=self.separate_tenants,
),
],
)
the_indexer_model = the_indexer.fit(df)
# indexed_df is the dataframe with the indices for user and resource
indexed_df = the_indexer_model.transform(df)
enriched_df = self._enrich_and_normalize(indexed_df).cache()
        user_res_feature_vector_mapping = self.create_spark_model_vectors_df(
            enriched_df,
        )
        user_res_norm_cf_df_model = ModelNormalizeTransformer(
            enriched_df,
            self.rank_param,
        ).transform(user_res_feature_vector_mapping)
# convert user and resource indices back to names
user_index_model = the_indexer_model.get_model_by_input_col(self.user_col)
res_index_model = the_indexer_model.get_model_by_input_col(self.res_col)
assert user_index_model is not None and res_index_model is not None
norm_user_mapping_df = user_res_norm_cf_df_model.user_feature_vector_mapping_df
norm_res_mapping_df = user_res_norm_cf_df_model.res_feature_vector_mapping_df
indexed_user_col = self.indexed_user_col
indexed_res_col = self.indexed_res_col
# do the actual index to name mapping (using undo_transform)
final_user_mapping_df = user_index_model.undo_transform(
norm_user_mapping_df,
).drop(indexed_user_col)
final_res_mapping_df = res_index_model.undo_transform(norm_res_mapping_df).drop(
indexed_res_col,
)
tenant_col, user_col, res_col = self.tenant_col, self.user_col, self.res_col
history_access_df = self.history_access_df
access_df = (
history_access_df
if history_access_df is not None
else df.select(tenant_col, user_col, res_col).cache()
)
user2component_mappings_df, res2component_mappings_df = ConnectedComponents(
tenant_col,
user_col,
res_col,
).transform(access_df)
return AccessAnomalyModel(
_UserResourceFeatureVectorMapping(
tenant_col=self.tenant_col,
user_col=self.user_col,
user_vec_col=self.user_vec_col,
res_col=self.res_col,
res_vec_col=self.res_vec_col,
history_access_df=history_access_df,
user2component_mappings_df=user2component_mappings_df,
res2component_mappings_df=res2component_mappings_df,
user_feature_vector_mapping_df=final_user_mapping_df.cache(),
res_feature_vector_mapping_df=final_res_mapping_df.cache(),
),
self.output_col,
)