-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPredicate.py
192 lines (156 loc) · 7.28 KB
/
Predicate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import re
import pandas as pd
class Predicate:
    """Base class for a predicate made of a name and a list of parameters.

    Subclasses override ``eval`` to apply the predicate to a DataFrame; the
    base implementation always returns ``False``.
    """

    def __init__(self, predicate, params=None):
        """Store the predicate name and its parameters.

        Args:
            predicate: Name of the predicate.
            params: Parameters of the predicate. When omitted, a fresh empty
                list is created for this instance.
        """
        self.predicate = predicate
        # A literal ``[]`` default would be one list object shared by every
        # instance built without explicit params, so create it per instance.
        self.params = [] if params is None else params

    def __str__(self):
        """Return the predicate rendered as ``name(p1, p2, ...)``."""
        rendered_params = ", ".join(map(str, self.params))
        return f"{self.predicate}({rendered_params})"

    def getTitle(self):
        """Return the predicate name."""
        return self.predicate

    def eval(self, df):
        """Placeholder evaluation; the base class always returns ``False``."""
        return False

    def print(self):
        """Print the concrete type, name and parameters of this predicate."""
        print(f"Predicate ({type(self)}) name : ", self.predicate, "\nPredicate contents : ", self.params)
# TODO: Add the classes that inherit from Predicate for aggregation predicates
# and comparison predicates.
class AggregationPredicate(Predicate):  # count, sum, avg, min, max.
    """Predicate that appends an aggregated column to a DataFrame.

    ``params`` is read as ``[*group_columns, column, output_column]``: the
    last entry names the column that receives the result, the one before it
    names the column being aggregated, and any preceding entries are the
    columns to group by. The predicate name selects the aggregation
    ('Count', 'Min', 'Max', 'Sum' or 'Avg').
    """

    # Reducer handed to ``evalPredicate`` for each supported predicate name.
    _REDUCER_BY_PREDICATE = {
        'Count': len,
        'Min': pd.DataFrame.min,
        'Max': pd.DataFrame.max,
        'Sum': pd.DataFrame.sum,
        'Avg': pd.DataFrame.mean,
    }

    # The aggregated object is always a single column (a Series). Calling an
    # unbound DataFrame reduction on a Series is not reliable across pandas
    # versions, so those reducers are translated to their Series equivalents.
    _SERIES_REDUCERS = {
        pd.DataFrame.min: pd.Series.min,
        pd.DataFrame.max: pd.Series.max,
        pd.DataFrame.sum: pd.Series.sum,
        pd.DataFrame.mean: pd.Series.mean,
    }

    def __init__(self, predicate, params=None):
        """Store the predicate name and parameters (fresh list when omitted)."""
        super().__init__(predicate, [] if params is None else params)

    def parseColumn(self, df, head_variables=None):
        """Resolve the parameter names to DataFrame column labels.

        A name listed in ``head_variables`` is used as-is; any other name is
        wrapped as ``#name#``.

        Args:
            df: Unused; kept for interface compatibility.
            head_variables: Names that must not be wrapped in ``#``.

        Returns:
            A ``(group_columns, column, output_column)`` tuple where
            ``group_columns`` is a new list.

        Raises:
            IndexError: If ``params`` holds fewer than two entries.
        """
        head = () if head_variables is None else head_variables

        def resolve(name):
            return name if name in head else f"#{name}#"

        column = resolve(self.params[-2])
        output_column = resolve(self.params[-1])
        group_columns = [resolve(name) for name in self.params[:-2]]
        return group_columns, column, output_column

    def eval(self, df, head_variables=None):
        """Return a copy of ``df`` extended with the aggregated column.

        An unrecognised predicate name leaves the copy unchanged.

        Args:
            df: Input DataFrame; it is not modified.
            head_variables: Names that must not be wrapped in ``#`` when
                resolving column labels.
        """
        resultDf = df.copy()
        group_columns, column, output_column = self.parseColumn(resultDf, head_variables)
        # TODO: Verify behaviour of the following predicates
        reducer = self._REDUCER_BY_PREDICATE.get(self.predicate)
        if reducer is None:
            return resultDf
        return self.evalPredicate(resultDf, reducer, group_columns, column, output_column)

    def evalPredicate(self, df, function, columnGroup, column, outputColumn):
        """Apply ``function`` to ``column`` and store it in ``outputColumn``.

        With group columns, the reducer runs once per group and the per-group
        value is left-merged back onto every row. Without group columns, a
        single value is computed and broadcast to all rows; ``len`` counts
        the rows of the whole frame in that case.

        Args:
            df: DataFrame to aggregate.
            function: ``len`` or a reducer taking a Series; the
                ``pd.DataFrame`` min/max/sum/mean reducers are accepted and
                mapped to their Series equivalents.
            columnGroup: Column labels to group by (may be empty).
            column: Label of the column being aggregated.
            outputColumn: Label of the column that receives the result.

        Returns:
            A new DataFrame carrying the extra ``outputColumn``.
        """
        reducer = self._SERIES_REDUCERS.get(function, function)
        if len(columnGroup) > 0:
            aggregated = (
                df.groupby(columnGroup)[column]
                .apply(reducer)
                .reset_index(name=outputColumn)
            )
            return pd.merge(df, aggregated, on=columnGroup, how='left')
        # No grouping: one scalar for the whole frame.
        value = len(df) if function is len else reducer(df[column])
        return df.assign(**{outputColumn: value})
class ComparisonPredicate(Predicate):  # >=, >, =< (or <=), <, =\=, =:= .
    """Predicate that filters DataFrame rows with a binary comparison.

    ``params[0]`` and ``params[2]`` are the two operands; the predicate name
    is the operator. An operand is either a constant (see ``isStatic``) or a
    variable, which is looked up as a column under its own name when that
    column exists and as ``#name#`` otherwise.
    """

    # Operator to use once the operands have been swapped.
    _MIRRORED = {"<": ">", ">": "<", "=<": ">=", "<=": ">=", ">=": "=<"}

    # Numeric comparison for each ordering operator.
    _NUMERIC_COMPARATORS = {
        "<": lambda left, right: left < right,
        ">": lambda left, right: left > right,
        "=<": lambda left, right: left <= right,
        "<=": lambda left, right: left <= right,
        ">=": lambda left, right: left >= right,
    }

    # Quoted words or runs of digits/dots are treated as constants.
    _STATIC_PATTERN = re.compile(r"^(?:[\"']\w+[\"']|[0-9.]+)+$")

    def __init__(self, predicate, params=None):
        """Store the operator and operands (fresh list when omitted)."""
        super().__init__(predicate, [] if params is None else params)

    def __str__(self):
        """Return the comparison rendered as ``left operator right``."""
        return f"{self.params[0]} {self.predicate} {self.params[2]}"

    def eval(self, df):
        """Return the rows of ``df`` that satisfy the comparison.

        The equality and inequality operators compare values as they are;
        every other operator is delegated to ``filterNumeric``.

        Args:
            df: Input DataFrame; it is not modified.

        Returns:
            A new DataFrame holding only the matching rows.
        """
        resultDf = df.copy()
        left, right = self.params[0], self.params[2]
        predicate = self.predicate

        left_is_static = self.isStatic(left)
        has_static = left_is_static or self.isStatic(right)
        if left_is_static:
            # Keep the constant on the right-hand side: swap the operands
            # and mirror the operator so the meaning is unchanged.
            left, right = right, left
            predicate = self._MIRRORED.get(predicate, predicate)

        column1 = left if (left in resultDf.columns) else f"#{left}#"
        column2 = right
        if not has_static:
            column2 = right if (right in resultDf.columns) else f"#{right}#"

        if predicate == "=:=":
            other = column2 if has_static else resultDf[column2]
            return resultDf[resultDf[column1] == other]
        if predicate == "=\\=":
            other = column2 if has_static else resultDf[column2]
            return resultDf[resultDf[column1] != other]
        return self.filterNumeric(resultDf, predicate, column1, column2, has_static)

    def filterNumeric(self, df, predicate, column1, column2, isStatic=False):
        """Filter rows with an ordering comparison on numeric values.

        The compared columns are converted with ``pd.to_numeric`` (values
        that cannot be converted become NaN) on a copy, so the caller's
        DataFrame is left untouched. An operator that is not an ordering
        comparison returns the converted copy unfiltered.

        Args:
            df: DataFrame to filter.
            predicate: One of ``<``, ``>``, ``=<`` (or ``<=``), ``>=``.
            column1: Label of the left-hand column.
            column2: Label of the right-hand column, or the constant to
                compare against when ``isStatic`` is true.
            isStatic: Whether ``column2`` is a constant rather than a column.

        Returns:
            A new DataFrame holding only the matching rows.

        Raises:
            ValueError: If ``isStatic`` is true and ``column2`` cannot be
                converted to ``float``.
        """
        df = df.copy()
        # Making sure that columns are either int or float
        df[column1] = pd.to_numeric(df[column1], errors='coerce')
        if isStatic:
            other = float(column2)
        else:
            df[column2] = pd.to_numeric(df[column2], errors='coerce')
            other = df[column2]

        compare = self._NUMERIC_COMPARATORS.get(predicate)
        if compare is None:
            return df
        return df[compare(df[column1], other)]

    def isStatic(self, variable):
        """Return True when ``variable`` looks like a constant.

        A constant is a quoted word or a run of digits and dots, ignoring
        surrounding whitespace.
        """
        return bool(self._STATIC_PATTERN.match(str(variable).strip()))
class AtomicPredicate(Predicate):  # edb like, etc.
    """Predicate that maps its parameters positionally onto DataFrame columns.

    Each parameter names the column at the same position. Parameters made
    only of lowercase letters and digits are treated as constants and used
    to filter the rows of that column.
    """

    # Parameters matching this pattern are constants, not variables.
    _CONSTANT_PATTERN = re.compile(r"^[a-z0-9]+$")

    def __init__(self, predicate, params=None):
        """Store the predicate name and parameters (fresh list when omitted)."""
        super().__init__(predicate, [] if params is None else params)

    def eval(self, df, head_variables=None):
        """Rename the columns after the parameters and filter on constants.

        Columns are renamed to the stripped parameter text, wrapped as
        ``#name#`` unless the name is listed in ``head_variables``. Rows are
        then kept only where every constant parameter equals the value in
        its column, and columns whose label contains ``#_#`` are dropped.

        Args:
            df: Input DataFrame; it is not modified. It must have as many
                columns as there are parameters.
            head_variables: Names that must not be wrapped in ``#``.

        Returns:
            A new DataFrame with renamed columns and filtered rows.
        """
        head = () if head_variables is None else head_variables
        names = [str(param).strip() for param in self.params]

        resultDf = df.copy()
        resultDf.columns = [name if name in head else f"#{name}#" for name in names]

        # Static filter on columns that match with static variables in the body
        for position, (param, name) in enumerate(zip(self.params, names)):
            if not self._CONSTANT_PATTERN.match(name):
                continue
            # Compare against the stripped text that passed the constant
            # check; a raw string with surrounding whitespace would never
            # match. Non-string parameters are compared unchanged.
            expected = name if isinstance(param, str) else param
            resultDf = resultDf[resultDf.iloc[:, position] == expected]

        # Remove underscore columns
        resultDf = resultDf.loc[:, ~resultDf.columns.str.contains('#_#')]
        return resultDf