# Copyright (C) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root for information.
from typing import List, Set, Optional, Tuple
import pandas as pd
import random
from synapse.ml.cyber.anomaly.collaborative_filtering import AccessAnomalyConfig
[docs]class DataFactory:
def __init__(
self,
num_hr_users: int = 7,
num_hr_resources: int = 30,
num_fin_users: int = 5,
num_fin_resources: int = 25,
num_eng_users: int = 10,
num_eng_resources: int = 50,
single_component: bool = True,
):
self.hr_users = ["hr_user_" + str(i) for i in range(num_hr_users)]
self.hr_resources = ["hr_res_" + str(i) for i in range(num_hr_resources)]
self.fin_users = ["fin_user_" + str(i) for i in range(num_fin_users)]
self.fin_resources = ["fin_res_" + str(i) for i in range(num_fin_resources)]
self.eng_users = ["eng_user_" + str(i) for i in range(num_eng_users)]
self.eng_resources = ["eng_res_" + str(i) for i in range(num_eng_resources)]
if single_component:
self.join_resources = ["ffa"]
else:
self.join_resources = []
self.rand = random.Random(42)
[docs] def to_pdf(
self,
users: List[str],
resources: List[str],
likelihoods: List[float],
) -> pd.DataFrame:
return pd.DataFrame(
data={
AccessAnomalyConfig.default_user_col: [str(u) for u in users],
AccessAnomalyConfig.default_res_col: [str(r) for r in resources],
AccessAnomalyConfig.default_likelihood_col: [
float(s) for s in likelihoods
],
},
)
[docs] def tups2pdf(self, tup_arr: List[Tuple[str, str, float]]) -> pd.DataFrame:
user_lst = [tup[0] for tup in tup_arr]
res_lst = [tup[1] for tup in tup_arr]
likelihood_lst = [tup[2] for tup in tup_arr]
return self.to_pdf(user_lst, res_lst, likelihood_lst)
[docs] def edges_between(
self,
users: List[str],
resources: List[str],
ratio: float,
full_node_coverage: bool,
not_set: Optional[Set[Tuple[str, str]]] = None,
) -> List[Tuple[str, str, float]]:
import itertools
if len(users) == 0 or len(resources) == 0:
return []
required_edge_cnt = len(users) * len(resources) * ratio
tups = []
seen = set([])
seen_users = set([])
seen_resources = set([])
# optimization for creating dense access patterns (fill all the possible pairs in advance)
cart = (
list(itertools.product(range(len(users)), range(len(resources))))
if ratio >= 0.5
else None
)
while len(tups) < required_edge_cnt or (
full_node_coverage
and (len(seen_users) < len(users))
or (len(seen_resources) < len(resources))
):
if cart is not None:
assert len(cart) > 0, cart
ii = self.rand.randint(0, len(cart) - 1)
ui, ri = cart[ii]
cart[ii] = cart[-1]
cart.pop()
else:
assert len(users) > 0, users
assert len(resources) > 0, resources
ui = self.rand.randint(0, len(users) - 1)
ri = self.rand.randint(0, len(resources) - 1)
user = users[ui]
res = resources[ri]
if ((ui, ri) in seen) or (
(not_set is not None) and ((user, res) in not_set)
):
continue
seen.add((ui, ri))
seen_users.add(ui)
seen_resources.add(ri)
assert users[ui] is not None
assert resources[ri] is not None
score = self.rand.randint(500, 1000)
tups.append((user, res, score))
return tups
[docs] def create_clustered_training_data(self, ratio: float = 0.25):
return self.tups2pdf(
self.edges_between(self.hr_users, self.join_resources, 1.0, True)
+ self.edges_between(self.fin_users, self.join_resources, 1.0, True)
+ self.edges_between(self.eng_users, self.join_resources, 1.0, True)
+ self.edges_between(self.hr_users, self.hr_resources, ratio, True)
+ self.edges_between(self.fin_users, self.fin_resources, ratio, True)
+ self.edges_between(self.eng_users, self.eng_resources, ratio, True),
)
[docs] def create_clustered_intra_test_data(
self,
train: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
not_set = (
set(
[
(
row[AccessAnomalyConfig.default_user_col],
row[AccessAnomalyConfig.default_res_col],
)
for _, row in train.iterrows()
],
)
if train is not None
else None
)
return self.tups2pdf(
self.edges_between(self.hr_users, self.join_resources, 1.0, True)
+ self.edges_between(self.fin_users, self.join_resources, 1.0, True)
+ self.edges_between(self.eng_users, self.join_resources, 1.0, True)
+ self.edges_between(
self.hr_users,
self.hr_resources,
0.025,
False,
not_set,
)
+ self.edges_between(
self.fin_users,
self.fin_resources,
0.05,
False,
not_set,
)
+ self.edges_between(
self.eng_users,
self.eng_resources,
0.035,
False,
not_set,
),
)
[docs] def create_clustered_inter_test_data(self) -> pd.DataFrame:
return self.tups2pdf(
self.edges_between(self.hr_users, self.join_resources, 1.0, True)
+ self.edges_between(self.fin_users, self.join_resources, 1.0, True)
+ self.edges_between(self.eng_users, self.join_resources, 1.0, True)
+ self.edges_between(self.hr_users, self.fin_resources, 0.025, False)
+ self.edges_between(self.hr_users, self.eng_resources, 0.025, False)
+ self.edges_between(self.fin_users, self.hr_resources, 0.05, False)
+ self.edges_between(self.fin_users, self.eng_resources, 0.05, False)
+ self.edges_between(self.eng_users, self.fin_resources, 0.035, False)
+ self.edges_between(self.eng_users, self.hr_resources, 0.035, False),
)
[docs] def create_fixed_training_data(self) -> pd.DataFrame:
users = [
6,
2,
8,
6,
7,
8,
2,
8,
10,
3,
11,
6,
7,
3,
2,
10,
11,
1,
8,
4,
9,
3,
5,
6,
7,
]
resources = [
2,
8,
2,
8,
2,
6,
4,
1,
5,
5,
6,
1,
6,
7,
6,
3,
3,
4,
4,
8,
2,
1,
7,
7,
5,
]
likelihoods = [
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
1.0,
46.77678298950195,
35.94552993774414,
36.470367431640625,
33.59884262084961,
10.0,
43.84599304199219,
14.908903121948242,
10.0,
19.817806243896484,
21.398120880126953,
14.908903121948242,
]
return self.to_pdf(users, resources, likelihoods)