Merge pull request #41 from yyysjz1997/main

Revised DCdetector and added more evaluation metrics
This commit is contained in:
Hongzuo Xu 2023-11-09 20:28:00 +08:00 committed by GitHub
commit bdbfc905c1
19 changed files with 3226 additions and 14 deletions

View File

@@ -3,6 +3,7 @@ from deepod.metrics._anomaly_detection import auc_pr
from deepod.metrics._anomaly_detection import tabular_metrics
from deepod.metrics._anomaly_detection import ts_metrics
from deepod.metrics._tsad_adjustment import point_adjustment
from deepod.metrics._anomaly_detection import ts_metrics_enhanced
__all__ = [
@@ -10,5 +11,6 @@ __all__ = [
'auc_roc',
'tabular_metrics',
'ts_metrics',
'point_adjustment'
'point_adjustment',
'ts_metrics_enhanced'
]

View File

@@ -38,4 +38,33 @@ def get_best_f1(label, score):
return best_f1, best_p, best_r
# revised by Yiyuan Yang 2023/11/08
# Compared with ts_metrics, this function returns more metrics and takes one extra input, y_test (predicted event labels)
def ts_metrics_enhanced(y_true, y_score, y_test):
"""
Use Case Demo:
from deepod.models.time_series import DCdetector
clf = DCdetector()
clf.fit(X_train)
pred, scores = clf.decision_function(X_test)
from deepod.metrics import point_adjustment
from deepod.metrics import ts_metrics_enhanced
adj_eval_metrics = ts_metrics_enhanced(labels, point_adjustment(labels, scores), pred)
print('adj_eval_metrics',adj_eval_metrics)
"""
from .affiliation.generics import convert_vector_to_events
from .vus.metrics import get_range_vus_roc
from .affiliation.metrics import pr_from_events
best_f1, best_p, best_r = get_best_f1(y_true, y_score)
events_pred = convert_vector_to_events(y_test)
events_gt = convert_vector_to_events(y_true)
Trange = (0, len(y_test))
affiliation = pr_from_events(events_pred, events_gt, Trange)
vus_results = get_range_vus_roc(y_score, y_true, 100) # default slidingWindow = 100
return auc_roc(y_true, y_score), auc_pr(y_true, y_score), best_f1, best_p, best_r, affiliation['Affiliation_Precision'], affiliation['Affiliation_Recall'], vus_results["R_AUC_ROC"], vus_results["R_AUC_PR"], vus_results["VUS_ROC"], vus_results["VUS_PR"]
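A minimal usage sketch of the function above (illustrative only; the synthetic arrays, segment positions, and threshold are assumptions, not part of this commit). The unpacked names follow the order of the return statement.
# Illustrative sketch: unpacking the 11 metrics returned by ts_metrics_enhanced
import numpy as np
from deepod.metrics import ts_metrics_enhanced, point_adjustment
rng = np.random.default_rng(0)
labels = np.zeros(1000, dtype=int)
labels[300:330] = 1                    # one ground-truth anomalous segment
scores = rng.random(1000)
scores[295:335] += 1.0                 # scores elevated around the anomaly
pred = (scores > 0.9).astype(int)      # binary event predictions
(auroc, aupr, best_f1, best_p, best_r,
 aff_precision, aff_recall,
 r_auc_roc, r_auc_pr, vus_roc, vus_pr) = ts_metrics_enhanced(
    labels, point_adjustment(labels, scores), pred)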

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from ._integral_interval import interval_intersection
def t_start(j, Js = [(1,2),(3,4),(5,6)], Trange = (1,10)):
"""
Helper for `E_gt_func`
:param j: index from 0 to len(Js) (included) on which to get the start
:param Js: ground truth events, as a list of couples
:param Trange: range of the series where Js is included
:return: generalized start such that the middle of t_start and t_stop
always gives the affiliation zone
"""
b = max(Trange)
n = len(Js)
if j == n:
return(2*b - t_stop(n-1, Js, Trange))
else:
return(Js[j][0])
def t_stop(j, Js = [(1,2),(3,4),(5,6)], Trange = (1,10)):
"""
Helper for `E_gt_func`
:param j: index from -1 to len(Js)-1 (included) on which to get the stop
:param Js: ground truth events, as a list of couples
:param Trange: range of the series where Js is included
:return: generalized stop such that the middle of t_start and t_stop
always gives the affiliation zone
"""
if j == -1:
a = min(Trange)
return(2*a - t_start(0, Js, Trange))
else:
return(Js[j][1])
def E_gt_func(j, Js, Trange):
"""
Get the affiliation zone of element j of the ground truth
:param j: index from 0 to len(Js) (excluded) on which to get the zone
:param Js: ground truth events, as a list of couples
:param Trange: range of the series where Js is included, can
be (-math.inf, math.inf) for distance measures
:return: affiliation zone of element j of the ground truth represented
as a couple
"""
range_left = (t_stop(j-1, Js, Trange) + t_start(j, Js, Trange))/2
range_right = (t_stop(j, Js, Trange) + t_start(j+1, Js, Trange))/2
return((range_left, range_right))
def get_all_E_gt_func(Js, Trange):
"""
Get the affiliation partition from the ground truth point of view
:param Js: ground truth events, as a list of couples
:param Trange: range of the series where Js is included, can
be (-math.inf, math.inf) for distance measures
:return: affiliation partition of the events
"""
# E_gt is the limit of affiliation/attraction for each ground truth event
E_gt = [E_gt_func(j, Js, Trange) for j in range(len(Js))]
return(E_gt)
def affiliation_partition(Is = [(1,1.5),(2,5),(5,6),(8,9)], E_gt = [(1,2.5),(2.5,4.5),(4.5,10)]):
"""
Cut the events into the affiliation zones
The presentation given here is from the ground truth point of view,
but it is also used in the reversed direction in the main function.
:param Is: events as a list of couples
:param E_gt: range of the affiliation zones
:return: a list of list of intervals (each interval represented by either
a couple or None for empty interval). The outer list is indexed by each
affiliation zone of `E_gt`. The inner list is indexed by the events of `Is`.
"""
out = [None] * len(E_gt)
for j in range(len(E_gt)):
E_gt_j = E_gt[j]
discarded_idx_before = [I[1] < E_gt_j[0] for I in Is] # end point of predicted I is before the begin of E
discarded_idx_after = [I[0] > E_gt_j[1] for I in Is] # start of predicted I is after the end of E
kept_index = [not(a or b) for a, b in zip(discarded_idx_before, discarded_idx_after)]
Is_j = [x for x, y in zip(Is, kept_index)]
out[j] = [interval_intersection(I, E_gt[j]) for I in Is_j]
return(out)
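A quick illustrative call of the two helpers above, reusing the toy events from this file's default arguments (annotation only, not part of the diff):
# Illustrative sketch: affiliation zones and partition for toy events
Js = [(1, 2), (3, 4), (5, 6)]            # ground truth events
Trange = (1, 10)                          # range of the series
E_gt = get_all_E_gt_func(Js, Trange)      # [(1.0, 2.5), (2.5, 4.5), (4.5, 10.0)]
Is = [(1, 1.5), (2, 5), (5, 6), (8, 9)]   # predicted events
parts = affiliation_partition(Is, E_gt)
# parts[j] gives, for zone j, the intersection of each prediction with that zone
# (None when a prediction does not overlap the zone).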

View File

@@ -0,0 +1,464 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import math
from .generics import _sum_wo_nan
"""
In order to shorten the length of the variables,
the general convention in this file is to let:
- I for a predicted event (start, stop),
- Is for a list of predicted events,
- J for a ground truth event,
- Js for a list of ground truth events.
"""
def interval_length(J = (1,2)):
"""
Length of an interval
:param J: couple representing the start and stop of an interval, or None
:return: length of the interval, and 0 for a None interval
"""
if J is None:
return(0)
return(J[1] - J[0])
def sum_interval_lengths(Is = [(1,2),(3,4),(5,6)]):
"""
Sum of length of the intervals
:param Is: list of intervals represented by starts and stops
:return: sum of the interval length
"""
return(sum([interval_length(I) for I in Is]))
def interval_intersection(I = (1, 3), J = (2, 4)):
"""
Intersection between two intervals I and J
I and J should be either empty or represent a positive interval (no point)
:param I: an interval represented by start and stop
:param J: a second interval of the same form
:return: an interval representing the start and stop of the intersection (or None if empty)
"""
if I is None:
return(None)
if J is None:
return(None)
I_inter_J = (max(I[0], J[0]), min(I[1], J[1]))
if I_inter_J[0] >= I_inter_J[1]:
return(None)
else:
return(I_inter_J)
def interval_subset(I = (1, 3), J = (0, 6)):
"""
Checks whether I is a subset of J
:param I: a non empty interval represented by start and stop
:param J: a second non empty interval of the same form
:return: True if I is a subset of J
"""
if (I[0] >= J[0]) and (I[1] <= J[1]):
return True
else:
return False
def cut_into_three_func(I, J):
"""
Cut an interval I into a partition of 3 subsets:
the elements before J,
the elements belonging to J,
and the elements after J
:param I: an interval represented by start and stop, or None for an empty one
:param J: a non empty interval
:return: a triplet of three intervals, each represented by either (start, stop) or None
"""
if I is None:
return((None, None, None))
I_inter_J = interval_intersection(I, J)
if I == I_inter_J:
I_before = None
I_after = None
elif I[1] <= J[0]:
I_before = I
I_after = None
elif I[0] >= J[1]:
I_before = None
I_after = I
elif (I[0] <= J[0]) and (I[1] >= J[1]):
I_before = (I[0], I_inter_J[0])
I_after = (I_inter_J[1], I[1])
elif I[0] <= J[0]:
I_before = (I[0], I_inter_J[0])
I_after = None
elif I[1] >= J[1]:
I_before = None
I_after = (I_inter_J[1], I[1])
else:
raise ValueError('unexpected unconsidered case')
return(I_before, I_inter_J, I_after)
def get_pivot_j(I, J):
"""
Get the single point of J that is the closest to I, called 'pivot' here,
with the requirement that I should be outside J
:param I: a non empty interval (start, stop)
:param J: another non empty interval, with empty intersection with I
:return: the element j of J that is the closest to I
"""
if interval_intersection(I, J) is not None:
raise ValueError('I and J should have a void intersection')
j_pivot = None # j_pivot is a border of J
if max(I) <= min(J):
j_pivot = min(J)
elif min(I) >= max(J):
j_pivot = max(J)
else:
raise ValueError('I should be outside J')
return(j_pivot)
def integral_mini_interval(I, J):
"""
In the specific case where interval I is located outside J,
integral of distance from x to J over the interval x \in I.
This is the *integral* i.e. the sum.
It's not the mean (not divided by the length of I yet)
:param I: an interval (start, stop), or None
:param J: a non empty interval, with empty intersection with I
:return: the integral of distances d(x, J) over x \in I
"""
if I is None:
return(0)
j_pivot = get_pivot_j(I, J)
a = min(I)
b = max(I)
return((b-a)*abs((j_pivot - (a+b)/2)))
def integral_interval_distance(I, J):
"""
For any non empty intervals I, J, compute the
integral of distance from x to J over the interval x \in I.
This is the *integral* i.e. the sum.
It's not the mean (not divided by the length of I yet)
The interval I can intersect J or not
:param I: an interval (start, stop), or None
:param J: a non empty interval
:return: the integral of distances d(x, J) over x \in I
"""
# I and J are single intervals (not generic sets)
# I is a predicted interval in the range of affiliation of J
def f(I_cut):
return(integral_mini_interval(I_cut, J))
# If I_middle is fully included in J, the distance to J is always 0
def f0(I_middle):
return(0)
cut_into_three = cut_into_three_func(I, J)
# Distance for now, not the mean:
# Distance left: Between cut_into_three[0] and the point min(J)
d_left = f(cut_into_three[0])
# Distance middle: Between cut_into_three[1] = I inter J, and J
d_middle = f0(cut_into_three[1])
# Distance right: Between cut_into_three[2] and the point max(J)
d_right = f(cut_into_three[2])
# It's an integral so summable
return(d_left + d_middle + d_right)
def integral_mini_interval_P_CDFmethod__min_piece(I, J, E):
"""
Helper of `integral_mini_interval_Pprecision_CDFmethod`
In the specific case where interval I is located outside J,
compute the integral $\int_{d_min}^{d_max} \min(m, x) dx$, with:
- m the smallest distance from J to E,
- d_min the smallest distance d(x, J) from x \in I to J
- d_max the largest distance d(x, J) from x \in I to J
:param I: a single predicted interval, a non empty interval (start, stop)
:param J: ground truth interval, a non empty interval, with empty intersection with I
:param E: the affiliation/influence zone for J, represented as a couple (start, stop)
:return: the integral $\int_{d_min}^{d_max} \min(m, x) dx$
"""
if interval_intersection(I, J) is not None:
raise ValueError('I and J should have a void intersection')
if not interval_subset(J, E):
raise ValueError('J should be included in E')
if not interval_subset(I, E):
raise ValueError('I should be included in E')
e_min = min(E)
j_min = min(J)
j_max = max(J)
e_max = max(E)
i_min = min(I)
i_max = max(I)
d_min = max(i_min - j_max, j_min - i_max)
d_max = max(i_max - j_max, j_min - i_min)
m = min(j_min - e_min, e_max - j_max)
A = min(d_max, m)**2 - min(d_min, m)**2
B = max(d_max, m) - max(d_min, m)
C = (1/2)*A + m*B
return(C)
def integral_mini_interval_Pprecision_CDFmethod(I, J, E):
"""
Integral of the probability of distances over the interval I.
In the specific case where interval I is located outside J,
compute the integral $\int_{x \in I} Fbar(dist(x,J)) dx$.
This is the *integral* i.e. the sum (not the mean)
:param I: a single predicted interval, a non empty interval (start, stop)
:param J: ground truth interval, a non empty interval, with empty intersection with I
:param E: the affiliation/influence zone for J, represented as a couple (start, stop)
:return: the integral $\int_{x \in I} Fbar(dist(x,J)) dx$
"""
integral_min_piece = integral_mini_interval_P_CDFmethod__min_piece(I, J, E)
e_min = min(E)
j_min = min(J)
j_max = max(J)
e_max = max(E)
i_min = min(I)
i_max = max(I)
d_min = max(i_min - j_max, j_min - i_max)
d_max = max(i_max - j_max, j_min - i_min)
integral_linear_piece = (1/2)*(d_max**2 - d_min**2)
integral_remaining_piece = (j_max - j_min)*(i_max - i_min)
DeltaI = i_max - i_min
DeltaE = e_max - e_min
output = DeltaI - (1/DeltaE)*(integral_min_piece + integral_linear_piece + integral_remaining_piece)
return(output)
def integral_interval_probaCDF_precision(I, J, E):
"""
Integral of the probability of distances over the interval I.
Compute the integral $\int_{x \in I} Fbar(dist(x,J)) dx$.
This is the *integral* i.e. the sum (not the mean)
:param I: a single (non empty) predicted interval in the zone of affiliation of J
:param J: ground truth interval
:param E: affiliation/influence zone for J
:return: the integral $\int_{x \in I} Fbar(dist(x,J)) dx$
"""
# I and J are single intervals (not generic sets)
def f(I_cut):
if I_cut is None:
return(0)
else:
return(integral_mini_interval_Pprecision_CDFmethod(I_cut, J, E))
# If I_middle is fully included in J, the integrand is 1 on I_middle, so the integral is |I_middle|
def f0(I_middle):
if I_middle is None:
return(0)
else:
return(max(I_middle) - min(I_middle))
cut_into_three = cut_into_three_func(I, J)
# Distance for now, not the mean:
# Distance left: Between cut_into_three[0] and the point min(J)
d_left = f(cut_into_three[0])
# Distance middle: Between cut_into_three[1] = I inter J, and J
d_middle = f0(cut_into_three[1])
# Distance right: Between cut_into_three[2] and the point max(J)
d_right = f(cut_into_three[2])
# It's an integral so summable
return(d_left + d_middle + d_right)
def cut_J_based_on_mean_func(J, e_mean):
"""
Helper function for the recall.
Partition J into two intervals: before and after e_mean
(e_mean represents the center element of E the zone of affiliation)
:param J: ground truth interval
:param e_mean: a float number (center value of E)
:return: a couple partitioning J into (J_before, J_after)
"""
if J is None:
J_before = None
J_after = None
elif e_mean >= max(J):
J_before = J
J_after = None
elif e_mean <= min(J):
J_before = None
J_after = J
else: # e_mean is across J
J_before = (min(J), e_mean)
J_after = (e_mean, max(J))
return((J_before, J_after))
def integral_mini_interval_Precall_CDFmethod(I, J, E):
"""
Integral of the probability of distances over the interval J.
In the specific case where interval J is located outside I,
compute the integral $\int_{y \in J} Fbar_y(dist(y,I)) dy$.
This is the *integral* i.e. the sum (not the mean)
:param I: a single (non empty) predicted interval
:param J: ground truth (non empty) interval, with empty intersection with I
:param E: the affiliation/influence zone for J, represented as a couple (start, stop)
:return: the integral $\int_{y \in J} Fbar_y(dist(y,I)) dy$
"""
# The interval J should be located outside I
# (so it's either the left piece or the right piece w.r.t I)
i_pivot = get_pivot_j(J, I)
e_min = min(E)
e_max = max(E)
e_mean = (e_min + e_max) / 2
# If i_pivot is outside E (which is possible), then
# the distance is worse than for any random element within E,
# so we set the recall to 0
if i_pivot <= min(E):
return(0)
elif i_pivot >= max(E):
return(0)
# Otherwise, we have at least i_pivot in E and so d < M so min(d,M)=d
cut_J_based_on_e_mean = cut_J_based_on_mean_func(J, e_mean)
J_before = cut_J_based_on_e_mean[0]
J_after = cut_J_based_on_e_mean[1]
iemin_mean = (e_min + i_pivot)/2
cut_Jbefore_based_on_iemin_mean = cut_J_based_on_mean_func(J_before, iemin_mean)
J_before_closeE = cut_Jbefore_based_on_iemin_mean[0] # before e_mean and closer to e_min than i_pivot ~ J_before_before
J_before_closeI = cut_Jbefore_based_on_iemin_mean[1] # before e_mean and closer to i_pivot than e_min ~ J_before_after
iemax_mean = (e_max + i_pivot)/2
cut_Jafter_based_on_iemax_mean = cut_J_based_on_mean_func(J_after, iemax_mean)
J_after_closeI = cut_Jafter_based_on_iemax_mean[0] # after e_mean and closer to i_pivot than e_max ~ J_after_before
J_after_closeE = cut_Jafter_based_on_iemax_mean[1] # after e_mean and closer to e_max than i_pivot ~ J_after_after
if J_before_closeE is not None:
j_before_before_min = min(J_before_closeE) # == min(J)
j_before_before_max = max(J_before_closeE)
else:
j_before_before_min = math.nan
j_before_before_max = math.nan
if J_before_closeI is not None:
j_before_after_min = min(J_before_closeI) # == j_before_before_max if existing
j_before_after_max = max(J_before_closeI) # == max(J_before)
else:
j_before_after_min = math.nan
j_before_after_max = math.nan
if J_after_closeI is not None:
j_after_before_min = min(J_after_closeI) # == min(J_after)
j_after_before_max = max(J_after_closeI)
else:
j_after_before_min = math.nan
j_after_before_max = math.nan
if J_after_closeE is not None:
j_after_after_min = min(J_after_closeE) # == j_after_before_max if existing
j_after_after_max = max(J_after_closeE) # == max(J)
else:
j_after_after_min = math.nan
j_after_after_max = math.nan
# <-- J_before_closeE --> <-- J_before_closeI --> <-- J_after_closeI --> <-- J_after_closeE -->
# j_bb_min j_bb_max j_ba_min j_ba_max j_ab_min j_ab_max j_aa_min j_aa_max
# (with `b` for before and `a` for after in the previous variable names)
# vs e_mean m = min(t-e_min, e_max-t) d=|i_pivot-t| min(d,m) \int min(d,m)dt \int d dt \int_(min(d,m)+d)dt \int_{t \in J}(min(d,m)+d)dt
# Case J_before_closeE & i_pivot after J before t-e_min i_pivot-t min(i_pivot-t,t-e_min) = t-e_min t^2/2-e_min*t i_pivot*t-t^2/2 t^2/2-e_min*t+i_pivot*t-t^2/2 = (i_pivot-e_min)*t (i_pivot-e_min)*tB - (i_pivot-e_min)*tA = (i_pivot-e_min)*(tB-tA)
# Case J_before_closeI & i_pivot after J before t-e_min i_pivot-t min(i_pivot-t,t-e_min) = i_pivot-t i_pivot*t-t^2/2 i_pivot*t-t^2/2 i_pivot*t-t^2/2+i_pivot*t-t^2/2 = 2*i_pivot*t-t^2 2*i_pivot*tB-tB^2 - 2*i_pivot*tA + tA^2 = 2*i_pivot*(tB-tA) - (tB^2 - tA^2)
# Case J_after_closeI & i_pivot after J after e_max-t i_pivot-t min(i_pivot-t,e_max-t) = i_pivot-t i_pivot*t-t^2/2 i_pivot*t-t^2/2 i_pivot*t-t^2/2+i_pivot*t-t^2/2 = 2*i_pivot*t-t^2 2*i_pivot*tB-tB^2 - 2*i_pivot*tA + tA^2 = 2*i_pivot*(tB-tA) - (tB^2 - tA^2)
# Case J_after_closeE & i_pivot after J after e_max-t i_pivot-t min(i_pivot-t,e_max-t) = e_max-t e_max*t-t^2/2 i_pivot*t-t^2/2 e_max*t-t^2/2+i_pivot*t-t^2/2 = (e_max+i_pivot)*t-t^2 (e_max+i_pivot)*tB-tB^2 - (e_max+i_pivot)*tA + tA^2 = (e_max+i_pivot)*(tB-tA) - (tB^2 - tA^2)
#
# Case J_before_closeE & i_pivot before J before t-e_min t-i_pivot min(t-i_pivot,t-e_min) = t-e_min t^2/2-e_min*t t^2/2-i_pivot*t t^2/2-e_min*t+t^2/2-i_pivot*t = t^2-(e_min+i_pivot)*t tB^2-(e_min+i_pivot)*tB - tA^2 + (e_min+i_pivot)*tA = (tB^2 - tA^2) - (e_min+i_pivot)*(tB-tA)
# Case J_before_closeI & i_pivot before J before t-e_min t-i_pivot min(t-i_pivot,t-e_min) = t-i_pivot t^2/2-i_pivot*t t^2/2-i_pivot*t t^2/2-i_pivot*t+t^2/2-i_pivot*t = t^2-2*i_pivot*t tB^2-2*i_pivot*tB - tA^2 + 2*i_pivot*tA = (tB^2 - tA^2) - 2*i_pivot*(tB-tA)
# Case J_after_closeI & i_pivot before J after e_max-t t-i_pivot min(t-i_pivot,e_max-t) = t-i_pivot t^2/2-i_pivot*t t^2/2-i_pivot*t t^2/2-i_pivot*t+t^2/2-i_pivot*t = t^2-2*i_pivot*t tB^2-2*i_pivot*tB - tA^2 + 2*i_pivot*tA = (tB^2 - tA^2) - 2*i_pivot*(tB-tA)
# Case J_after_closeE & i_pivot before J after e_max-t t-i_pivot min(t-i_pivot,e_max-t) = e_max-t e_max*t-t^2/2 t^2/2-i_pivot*t e_max*t-t^2/2+t^2/2-i_pivot*t = (e_max-i_pivot)*t (e_max-i_pivot)*tB - (e_max-i_pivot)*tA = (e_max-i_pivot)*(tB-tA)
if i_pivot >= max(J):
part1_before_closeE = (i_pivot-e_min)*(j_before_before_max - j_before_before_min) # (i_pivot-e_min)*(tB-tA) # j_before_before_max - j_before_before_min
part2_before_closeI = 2*i_pivot*(j_before_after_max-j_before_after_min) - (j_before_after_max**2 - j_before_after_min**2) # 2*i_pivot*(tB-tA) - (tB^2 - tA^2) # j_before_after_max - j_before_after_min
part3_after_closeI = 2*i_pivot*(j_after_before_max-j_after_before_min) - (j_after_before_max**2 - j_after_before_min**2) # 2*i_pivot*(tB-tA) - (tB^2 - tA^2) # j_after_before_max - j_after_before_min
part4_after_closeE = (e_max+i_pivot)*(j_after_after_max-j_after_after_min) - (j_after_after_max**2 - j_after_after_min**2) # (e_max+i_pivot)*(tB-tA) - (tB^2 - tA^2) # j_after_after_max - j_after_after_min
out_parts = [part1_before_closeE, part2_before_closeI, part3_after_closeI, part4_after_closeE]
elif i_pivot <= min(J):
part1_before_closeE = (j_before_before_max**2 - j_before_before_min**2) - (e_min+i_pivot)*(j_before_before_max-j_before_before_min) # (tB^2 - tA^2) - (e_min+i_pivot)*(tB-tA) # j_before_before_max - j_before_before_min
part2_before_closeI = (j_before_after_max**2 - j_before_after_min**2) - 2*i_pivot*(j_before_after_max-j_before_after_min) # (tB^2 - tA^2) - 2*i_pivot*(tB-tA) # j_before_after_max - j_before_after_min
part3_after_closeI = (j_after_before_max**2 - j_after_before_min**2) - 2*i_pivot*(j_after_before_max - j_after_before_min) # (tB^2 - tA^2) - 2*i_pivot*(tB-tA) # j_after_before_max - j_after_before_min
part4_after_closeE = (e_max-i_pivot)*(j_after_after_max - j_after_after_min) # (e_max-i_pivot)*(tB-tA) # j_after_after_max - j_after_after_min
out_parts = [part1_before_closeE, part2_before_closeI, part3_after_closeI, part4_after_closeE]
else:
raise ValueError('The i_pivot should be outside J')
out_integral_min_dm_plus_d = _sum_wo_nan(out_parts) # integral on all J, i.e. sum of the disjoint parts
# We have for each point t of J:
# \bar{F}_{t, recall}(d) = 1 - (1/|E|) * (min(d,m) + d)
# Since t is a single-point here, and we are in the case where i_pivot is inside E.
# The integral is then given by:
# C = \int_{t \in J} \bar{F}_{t, recall}(D(t)) dt
# = \int_{t \in J} 1 - (1/|E|) * (min(d,m) + d) dt
# = |J| - (1/|E|) * [\int_{t \in J} (min(d,m) + d) dt]
# = |J| - (1/|E|) * out_integral_min_dm_plus_d
DeltaJ = max(J) - min(J)
DeltaE = max(E) - min(E)
C = DeltaJ - (1/DeltaE) * out_integral_min_dm_plus_d
return(C)
def integral_interval_probaCDF_recall(I, J, E):
"""
Integral of the probability of distances over the interval J.
Compute the integral $\int_{y \in J} Fbar_y(dist(y,I)) dy$.
This is the *integral* i.e. the sum (not the mean)
:param I: a single (non empty) predicted interval
:param J: ground truth (non empty) interval
:param E: the affiliation/influence zone for J
:return: the integral $\int_{y \in J} Fbar_y(dist(y,I)) dy$
"""
# I and J are single intervals (not generic sets)
# E is the outside affiliation interval of J (even for recall!)
# (in particular J \subset E)
#
# J is the portion of the ground truth affiliated to I
# I is a predicted interval (can be outside E possibly since it's recall)
def f(J_cut):
if J_cut is None:
return(0)
else:
return integral_mini_interval_Precall_CDFmethod(I, J_cut, E)
# If J_middle is fully included in I, the integrand is 1 on J_middle, so the integral is |J_middle|
def f0(J_middle):
if J_middle is None:
return(0)
else:
return(max(J_middle) - min(J_middle))
cut_into_three = cut_into_three_func(J, I) # it's J that we cut into 3, depending on the position w.r.t I
# since we integrate over J this time.
#
# Distance for now, not the mean:
# Distance left: Between cut_into_three[0] and the point min(I)
d_left = f(cut_into_three[0])
# Distance middle: Between cut_into_three[1] = J inter I, and I
d_middle = f0(cut_into_three[1])
# Distance right: Between cut_into_three[2] and the point max(I)
d_right = f(cut_into_three[2])
# It's an integral so summable
return(d_left + d_middle + d_right)
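For intuition, a small worked example of the interval helpers defined above (illustrative annotation only, not part of the diff):
# Illustrative sketch: splitting an interval and integrating the distance
before, middle, after = cut_into_three_func((0, 5), (2, 3))
# before=(0, 2), middle=(2, 3), after=(3, 5)
# integral of d(x, J) for x in I=(0, 5), J=(2, 3):
# the left piece contributes (2-0)*|2-1| = 2, the middle contributes 0,
# the right piece contributes (5-3)*|3-4| = 2, so the total is 4.0
total = integral_interval_distance((0, 5), (2, 3))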

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import math
from ._affiliation_zone import (
get_all_E_gt_func,
affiliation_partition)
from ._integral_interval import (
integral_interval_distance,
integral_interval_probaCDF_precision,
integral_interval_probaCDF_recall,
interval_length,
sum_interval_lengths)
def affiliation_precision_distance(Is = [(1,2),(3,4),(5,6)], J = (2,5.5)):
"""
Compute the individual average distance from Is to a single ground truth J
:param Is: list of predicted events within the affiliation zone of J
:param J: couple representing the start and stop of a ground truth interval
:return: individual average precision directed distance number
"""
if all([I is None for I in Is]): # no prediction in the current area
return(math.nan) # undefined
return(sum([integral_interval_distance(I, J) for I in Is]) / sum_interval_lengths(Is))
def affiliation_precision_proba(Is = [(1,2),(3,4),(5,6)], J = (2,5.5), E = (0,8)):
"""
Compute the individual precision probability from Is to a single ground truth J
:param Is: list of predicted events within the affiliation zone of J
:param J: couple representing the start and stop of a ground truth interval
:param E: couple representing the start and stop of the zone of affiliation of J
:return: individual precision probability in [0, 1], or math.nan if undefined
"""
if all([I is None for I in Is]): # no prediction in the current area
return(math.nan) # undefined
return(sum([integral_interval_probaCDF_precision(I, J, E) for I in Is]) / sum_interval_lengths(Is))
def affiliation_recall_distance(Is = [(1,2),(3,4),(5,6)], J = (2,5.5)):
"""
Compute the individual average distance from a single J to the predictions Is
:param Is: list of predicted events within the affiliation zone of J
:param J: couple representing the start and stop of a ground truth interval
:return: individual average recall directed distance number
"""
Is = [I for I in Is if I is not None] # filter possible None in Is
if len(Is) == 0: # there is no prediction in the current area
return(math.inf)
E_gt_recall = get_all_E_gt_func(Is, (-math.inf, math.inf)) # here from the point of view of the predictions
Js = affiliation_partition([J], E_gt_recall) # partition of J depending on its proximity to Is
return(sum([integral_interval_distance(J[0], I) for I, J in zip(Is, Js)]) / interval_length(J))
def affiliation_recall_proba(Is = [(1,2),(3,4),(5,6)], J = (2,5.5), E = (0,8)):
"""
Compute the individual recall probability from a single ground truth J to Is
:param Is: list of predicted events within the affiliation zone of J
:param J: couple representing the start and stop of a ground truth interval
:param E: couple representing the start and stop of the zone of affiliation of J
:return: individual recall probability in [0, 1]
"""
Is = [I for I in Is if I is not None] # filter possible None in Is
if len(Is) == 0: # there is no prediction in the current area
return(0)
E_gt_recall = get_all_E_gt_func(Is, E) # here from the point of view of the predictions
Js = affiliation_partition([J], E_gt_recall) # partition of J depending on its proximity to Is
return(sum([integral_interval_probaCDF_recall(I, J[0], E) for I, J in zip(Is, Js)]) / interval_length(J))
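A usage sketch of the four per-event functions above, reusing the toy intervals from their default arguments (annotation only):
# Illustrative sketch: per-ground-truth-event affiliation quantities
Is = [(1, 2), (3, 4), (5, 6)]   # predictions affiliated with J
J = (2, 5.5)                    # one ground truth event
E = (0, 8)                      # affiliation zone of J
d_p = affiliation_precision_distance(Is, J)   # average directed distance Is -> J
d_r = affiliation_recall_distance(Is, J)      # average directed distance J -> Is
p = affiliation_precision_proba(Is, J, E)     # precision probability in [0, 1]
r = affiliation_recall_proba(Is, J, E)        # recall probability in [0, 1]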

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from itertools import groupby
from operator import itemgetter
import math
import gzip
import glob
import os
def convert_vector_to_events(vector = [0, 1, 1, 0, 0, 1, 0]):
"""
Convert a binary vector (indicating 1 for the anomalous instances)
to a list of events. The events are considered as durations,
i.e. setting 1 at index i corresponds to an anomalous interval [i, i+1).
:param vector: a list of elements belonging to {0, 1}
:return: a list of couples, each couple representing the start and stop of
each event
"""
positive_indexes = [idx for idx, val in enumerate(vector) if val > 0]
events = []
for k, g in groupby(enumerate(positive_indexes), lambda ix : ix[0] - ix[1]):
cur_cut = list(map(itemgetter(1), g))
events.append((cur_cut[0], cur_cut[-1]))
# Consistent conversion in case of range anomalies (for indexes):
# A positive index i is considered as the interval [i, i+1),
# so the last index should be moved by 1
events = [(x, y+1) for (x,y) in events]
return(events)
def infer_Trange(events_pred, events_gt):
"""
Given the list of events events_pred and events_gt, get the
smallest possible Trange corresponding to the start and stop indexes
of the whole series.
Trange will not influence the measure of distances, but will impact the
measures of probabilities.
:param events_pred: a list of couples corresponding to predicted events
:param events_gt: a list of couples corresponding to ground truth events
:return: a couple corresponding to the smallest range containing the events
"""
if len(events_gt) == 0:
raise ValueError('The gt events should contain at least one event')
if len(events_pred) == 0:
# empty prediction, base Trange only on events_gt (which is non empty)
return(infer_Trange(events_gt, events_gt))
min_pred = min([x[0] for x in events_pred])
min_gt = min([x[0] for x in events_gt])
max_pred = max([x[1] for x in events_pred])
max_gt = max([x[1] for x in events_gt])
Trange = (min(min_pred, min_gt), max(max_pred, max_gt))
return(Trange)
def has_point_anomalies(events):
"""
Checking whether events contain point anomalies, i.e.
events starting and stopping at the same time.
:param events: a list of couples corresponding to predicted events
:return: True if the events have any point anomalies, False otherwise
"""
if len(events) == 0:
return(False)
return(min([x[1] - x[0] for x in events]) == 0)
def _sum_wo_nan(vec):
"""
Sum of elements, ignoring math.isnan ones
:param vec: vector of floating numbers
:return: sum of the elements, ignoring math.isnan ones
"""
vec_wo_nan = [e for e in vec if not math.isnan(e)]
return(sum(vec_wo_nan))
def _len_wo_nan(vec):
"""
Count of elements, ignoring math.isnan ones
:param vec: vector of floating numbers
:return: count of the elements, ignoring math.isnan ones
"""
vec_wo_nan = [e for e in vec if not math.isnan(e)]
return(len(vec_wo_nan))
def read_gz_data(filename = 'data/machinetemp_groundtruth.gz'):
"""
Load a file compressed with gz, such that each line of the
file is either 0 (representing a normal instance) or 1 (representing
an anomalous instance).
:param filename: file path to the gz compressed file
:return: list of integers with either 0 or 1
"""
with gzip.open(filename, 'rb') as f:
content = f.read().splitlines()
content = [int(x) for x in content]
return(content)
def read_all_as_events():
"""
Load the files contained in the folder `data/` and convert
to events. The length of the series is kept.
The convention for the file name is: `dataset_algorithm.gz`
:return: two dictionaries:
- the first containing the list of events for each dataset and algorithm,
- the second containing the range of the series for each dataset
"""
filepaths = glob.glob('data/*.gz')
datasets = dict()
Tranges = dict()
for filepath in filepaths:
vector = read_gz_data(filepath)
events = convert_vector_to_events(vector)
# ad hoc cut for those files
cut_filepath = (os.path.split(filepath)[1]).split('_')
data_name = cut_filepath[0]
algo_name = (cut_filepath[1]).split('.')[0]
if not data_name in datasets:
datasets[data_name] = dict()
Tranges[data_name] = (0, len(vector))
datasets[data_name][algo_name] = events
return(datasets, Tranges)
def f1_func(p, r):
"""
Compute the f1 function
:param p: precision numeric value
:param r: recall numeric value
:return: f1 numeric value
"""
return(2*p*r/(p+r))
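An illustrative call of the conversion helpers above (annotation only):
# Illustrative sketch: binary vector -> events, and the smallest covering range
vector = [0, 1, 1, 0, 0, 1, 0]
events = convert_vector_to_events(vector)   # [(1, 3), (5, 6)]
Trange = infer_Trange(events, events)       # (1, 6)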

View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from .generics import (
infer_Trange,
has_point_anomalies,
_len_wo_nan,
_sum_wo_nan,
read_all_as_events)
from ._affiliation_zone import (
get_all_E_gt_func,
affiliation_partition)
from ._single_ground_truth_event import (
affiliation_precision_distance,
affiliation_recall_distance,
affiliation_precision_proba,
affiliation_recall_proba)
def test_events(events):
"""
Verify the validity of the input events
:param events: list of events, each represented by a couple (start, stop)
:return: None. Raise an error for incorrectly formed or non-ordered events
"""
if type(events) is not list:
raise TypeError('Input `events` should be a list of couples')
if not all([type(x) is tuple for x in events]):
raise TypeError('Input `events` should be a list of tuples')
if not all([len(x) == 2 for x in events]):
raise ValueError('Input `events` should be a list of couples (start, stop)')
if not all([x[0] <= x[1] for x in events]):
raise ValueError('Input `events` should be a list of couples (start, stop) with start <= stop')
if not all([events[i][1] < events[i+1][0] for i in range(len(events) - 1)]):
raise ValueError('Couples of input `events` should be disjoint and ordered')
def pr_from_events(events_pred, events_gt, Trange):
"""
Compute the affiliation metrics including the precision/recall in [0,1],
along with the individual precision/recall distances and probabilities
:param events_pred: list of predicted events, each represented by a couple
indicating the start and the stop of the event
:param events_gt: list of ground truth events, each represented by a couple
indicating the start and the stop of the event
:param Trange: range of the series where events_pred and events_gt are included,
represented as a couple (start, stop)
:return: dictionary with precision, recall, and the individual metrics
"""
# testing the inputs
test_events(events_pred)
test_events(events_gt)
# other tests
minimal_Trange = infer_Trange(events_pred, events_gt)
if not Trange[0] <= minimal_Trange[0]:
raise ValueError('`Trange` should include all the events')
if not minimal_Trange[1] <= Trange[1]:
raise ValueError('`Trange` should include all the events')
if len(events_gt) == 0:
raise ValueError('Input `events_gt` should have at least one event')
if has_point_anomalies(events_pred) or has_point_anomalies(events_gt):
raise ValueError('Cannot manage point anomalies currently')
if Trange is None:
# Set as default, but Trange should be indicated if probabilities are used
raise ValueError('Trange should be indicated (or inferred with the `infer_Trange` function)')
E_gt = get_all_E_gt_func(events_gt, Trange)
aff_partition = affiliation_partition(events_pred, E_gt)
# Computing precision distance
d_precision = [affiliation_precision_distance(Is, J) for Is, J in zip(aff_partition, events_gt)]
# Computing recall distance
d_recall = [affiliation_recall_distance(Is, J) for Is, J in zip(aff_partition, events_gt)]
# Computing precision
p_precision = [affiliation_precision_proba(Is, J, E) for Is, J, E in zip(aff_partition, events_gt, E_gt)]
# Computing recall
p_recall = [affiliation_recall_proba(Is, J, E) for Is, J, E in zip(aff_partition, events_gt, E_gt)]
if _len_wo_nan(p_precision) > 0:
p_precision_average = _sum_wo_nan(p_precision) / _len_wo_nan(p_precision)
else:
p_precision_average = p_precision[0] # math.nan
p_recall_average = sum(p_recall) / len(p_recall)
dict_out = dict({'Affiliation_Precision': p_precision_average,
'Affiliation_Recall': p_recall_average,
'individual_precision_probabilities': p_precision,
'individual_recall_probabilities': p_recall,
'individual_precision_distances': d_precision,
'individual_recall_distances': d_recall})
return(dict_out)
def produce_all_results():
"""
Produce the affiliation precision/recall for all files
contained in the `data` folder
:return: a dictionary indexed by data names, each containing a dictionary
indexed by algorithm names, each containing the results of the affiliation
metrics (precision, recall, individual probabilities and distances)
"""
datasets, Tranges = read_all_as_events() # read all the events in folder `data`
results = dict()
for data_name in datasets.keys():
results_data = dict()
for algo_name in datasets[data_name].keys():
if algo_name != 'groundtruth':
results_data[algo_name] = pr_from_events(datasets[data_name][algo_name],
datasets[data_name]['groundtruth'],
Tranges[data_name])
results[data_name] = results_data
return(results)
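A usage sketch for pr_from_events with toy events (annotation only; the event positions and Trange are illustrative assumptions):
# Illustrative sketch: affiliation precision/recall for toy events
events_pred = [(2, 5), (11, 12)]
events_gt = [(3, 6), (10, 14)]
res = pr_from_events(events_pred, events_gt, Trange=(0, 20))
print(res['Affiliation_Precision'], res['Affiliation_Recall'])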

View File

@@ -0,0 +1,343 @@
from random import shuffle
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import cm
import pandas as pd
from tqdm import tqdm as tqdm
import time
from sklearn.preprocessing import MinMaxScaler
import random
import os
import sys
module_path = os.path.abspath(os.path.join('../..'))
if module_path not in sys.path:
sys.path.append(module_path)
from deepod.metrics.vus.utils.slidingWindows import find_length
from deepod.metrics.vus.utils.metrics import metricor
from deepod.metrics.vus.models.distance import Fourier
from deepod.metrics.vus.models.feature import Window
def generate_new_label(label,lag):
if lag < 0:
return np.array(list(label[-lag:]) + [0]*(-lag))
elif lag > 0:
return np.array([0]*lag + list(label[:-lag]))
elif lag == 0:
return label
def compute_anomaly_acc_lag(methods_scores,label,slidingWindow,methods_keys):
lag_range = list(range(-slidingWindow//4,slidingWindow//4,5))
methods_acc = {}
for i,methods_score in enumerate(tqdm(methods_keys)):
dict_acc = {
'R_AUC_ROC': [],
'AUC_ROC': [],
'R_AUC_PR': [],
'AUC_PR': [],
'VUS_ROC': [],
'VUS_PR': [],
'Precision': [],
'Recall': [],
'F': [],
'ExistenceReward':[],
'OverlapReward': [],
'Precision@k': [],
'Rprecision': [],
'Rrecall': [],
'RF': []}
for lag in tqdm(lag_range):
new_label = generate_new_label(label,lag)
grader = metricor()
R_AUC, R_AP, R_fpr, R_tpr, R_prec = grader.RangeAUC(labels=new_label, score=methods_scores[methods_score], window=slidingWindow, plot_ROC=True)
L, fpr, tpr= grader.metric_new(new_label, methods_scores[methods_score], plot_ROC=True)
precision, recall, AP = grader.metric_PR(new_label, methods_scores[methods_score])
Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d = generate_curve(new_label,methods_scores[methods_score],2*slidingWindow)
L1 = [ elem for elem in L]
dict_acc['R_AUC_ROC'] +=[R_AUC]
dict_acc['AUC_ROC'] +=[L1[0]]
dict_acc['R_AUC_PR'] +=[R_AP]
dict_acc['AUC_PR'] +=[AP]
dict_acc['VUS_ROC'] +=[avg_auc_3d]
dict_acc['VUS_PR'] +=[avg_ap_3d]
dict_acc['Precision'] +=[L1[1]]
dict_acc['Recall'] +=[L1[2]]
dict_acc['F'] +=[L1[3]]
dict_acc['ExistenceReward']+=[L1[5]]
dict_acc['OverlapReward'] +=[L1[6]]
dict_acc['Precision@k'] +=[L1[9]]
dict_acc['Rprecision'] +=[L1[7]]
dict_acc['Rrecall'] +=[L1[4]]
dict_acc['RF'] +=[L1[8]]
methods_acc[methods_score] = dict_acc
return methods_acc
def compute_anomaly_acc_percentage(methods_scores,label,slidingWindow,methods_keys,pos_first_anom):
list_pos = []
step_a = max(0,(len(label) - pos_first_anom-200))//20
step_b = max(0,pos_first_anom-200)//20
pos_a = min(len(label),pos_first_anom + 200)
pos_b = max(0,pos_first_anom - 200)
list_pos.append((pos_b,pos_a))
for pos_iter in range(20):
pos_a = min(len(label),pos_a + step_a)
pos_b = max(0,pos_b - step_b)
list_pos.append((pos_b,pos_a))
methods_acc = {}
print(list_pos)
for i,methods_score in enumerate(tqdm(methods_keys)):
dict_acc = {
'R_AUC_ROC': [],
'AUC_ROC': [],
'R_AUC_PR': [],
'AUC_PR': [],
'VUS_ROC': [],
'VUS_PR': [],
'Precision': [],
'Recall': [],
'F': [],
'ExistenceReward':[],
'OverlapReward': [],
'Precision@k': [],
'Rprecision': [],
'Rrecall': [],
'RF': []}
for end_pos in tqdm(list_pos):
new_label = label[end_pos[0]:end_pos[1]]
new_score = np.array(methods_scores[methods_score])[end_pos[0]:end_pos[1]]
grader = metricor()
R_AUC, R_AP, R_fpr, R_tpr, R_prec = grader.RangeAUC(labels=new_label, score=new_score, window=slidingWindow, plot_ROC=True)
L, fpr, tpr= grader.metric_new(new_label, new_score, plot_ROC=True)
precision, recall, AP = grader.metric_PR(new_label, new_score)
Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d = generate_curve(new_label,new_score,2*slidingWindow)
L1 = [ elem for elem in L]
dict_acc['R_AUC_ROC'] +=[R_AUC]
dict_acc['AUC_ROC'] +=[L1[0]]
dict_acc['R_AUC_PR'] +=[R_AP]
dict_acc['AUC_PR'] +=[AP]
dict_acc['VUS_ROC'] +=[avg_auc_3d]
dict_acc['VUS_PR'] +=[avg_ap_3d]
dict_acc['Precision'] +=[L1[1]]
dict_acc['Recall'] +=[L1[2]]
dict_acc['F'] +=[L1[3]]
dict_acc['ExistenceReward']+=[L1[5]]
dict_acc['OverlapReward'] +=[L1[6]]
dict_acc['Precision@k'] +=[L1[9]]
dict_acc['Rprecision'] +=[L1[7]]
dict_acc['Rrecall'] +=[L1[4]]
dict_acc['RF'] +=[L1[8]]
methods_acc[methods_score] = dict_acc
return methods_acc
def compute_anomaly_acc_noise(methods_scores,label,slidingWindow,methods_keys):
lag_range = list(range(-slidingWindow//2,slidingWindow//2,10))
methods_acc = {}
for i,methods_score in enumerate(tqdm(methods_keys)):
dict_acc = {
'R_AUC_ROC': [],
'AUC_ROC': [],
'R_AUC_PR': [],
'AUC_PR': [],
'VUS_ROC': [],
'VUS_PR': [],
'Precision': [],
'Recall': [],
'F': [],
'ExistenceReward':[],
'OverlapReward': [],
'Precision@k': [],
'Rprecision': [],
'Rrecall': [],
'RF': []}
for lag in tqdm(lag_range):
new_label = label
grader = metricor()
noise = np.random.normal(-0.1,0.1,len(methods_scores[methods_score]))
new_score = np.array(methods_scores[methods_score]) + noise
new_score = (new_score - min(new_score))/(max(new_score) - min(new_score))
R_AUC, R_AP, R_fpr, R_tpr, R_prec = grader.RangeAUC(labels=new_label, score=new_score, window=slidingWindow, plot_ROC=True)
L, fpr, tpr= grader.metric_new(new_label, new_score, plot_ROC=True)
precision, recall, AP = grader.metric_PR(new_label, new_score)
Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d = generate_curve(new_label,new_score,2*slidingWindow)
L1 = [ elem for elem in L]
dict_acc['R_AUC_ROC'] +=[R_AUC]
dict_acc['AUC_ROC'] +=[L1[0]]
dict_acc['R_AUC_PR'] +=[R_AP]
dict_acc['AUC_PR'] +=[AP]
dict_acc['VUS_ROC'] +=[avg_auc_3d]
dict_acc['VUS_PR'] +=[avg_ap_3d]
dict_acc['Precision'] +=[L1[1]]
dict_acc['Recall'] +=[L1[2]]
dict_acc['F'] +=[L1[3]]
dict_acc['ExistenceReward']+=[L1[5]]
dict_acc['OverlapReward'] +=[L1[6]]
dict_acc['Precision@k'] +=[L1[9]]
dict_acc['Rprecision'] +=[L1[7]]
dict_acc['Rrecall'] +=[L1[4]]
dict_acc['RF'] +=[L1[8]]
methods_acc[methods_score] = dict_acc
return methods_acc
def compute_anomaly_acc_pairwise(methods_scores,label,slidingWindow,method1,method2):
lag_range = list(range(-slidingWindow//4,slidingWindow//4,5))
methods_acc = {}
method_key = [method1]
if method2 is not None:
method_key = [method1,method2]
for i,methods_score in enumerate(tqdm(method_key)):
dict_acc = {
'R_AUC_ROC': [],
'AUC_ROC': [],
'R_AUC_PR': [],
'AUC_PR': [],
'VUS_ROC': [],
'VUS_PR': [],
'Precision': [],
'Recall': [],
'F': [],
'ExistenceReward':[],
'OverlapReward': [],
'Precision@k': [],
'Rprecision': [],
'Rrecall': [],
'RF': []}
for lag in tqdm(range(60)):
new_lag = random.randint(-slidingWindow//4,slidingWindow//4)
new_label = generate_new_label(label,new_lag)
noise = np.random.normal(-0.1,0.1,len(methods_scores[methods_score]))
new_score = np.array(methods_scores[methods_score]) + noise
new_score = (new_score - min(new_score))/(max(new_score) - min(new_score))
grader = metricor()
R_AUC, R_AP, R_fpr, R_tpr, R_prec = grader.RangeAUC(labels=new_label, score=new_score, window=slidingWindow, plot_ROC=True)
L, fpr, tpr= grader.metric_new(new_label, new_score, plot_ROC=True)
precision, recall, AP = grader.metric_PR(new_label, new_score)
#range_anomaly = grader.range_convers_new(new_label)
Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d = generate_curve(new_label,new_score,2*slidingWindow)
L1 = [ elem for elem in L]
dict_acc['R_AUC_ROC'] +=[R_AUC]
dict_acc['AUC_ROC'] +=[L1[0]]
dict_acc['R_AUC_PR'] +=[R_AP]
dict_acc['AUC_PR'] +=[AP]
dict_acc['VUS_ROC'] +=[avg_auc_3d]
dict_acc['VUS_PR'] +=[avg_ap_3d]
dict_acc['Precision'] +=[L1[1]]
dict_acc['Recall'] +=[L1[2]]
dict_acc['F'] +=[L1[3]]
dict_acc['ExistenceReward']+=[L1[5]]
dict_acc['OverlapReward'] +=[L1[6]]
dict_acc['Precision@k'] +=[L1[9]]
dict_acc['Rprecision'] +=[L1[7]]
dict_acc['Rrecall'] +=[L1[4]]
dict_acc['RF'] +=[L1[8]]
methods_acc[methods_score] = dict_acc
return methods_acc
def normalize_dict_exp(methods_acc_lag,methods_keys):
key_metrics = [
'VUS_ROC',
'VUS_PR',
'R_AUC_ROC',
'R_AUC_PR',
'AUC_ROC',
'AUC_PR',
'Rprecision',
'Rrecall',
'RF',
'Precision',
'Recall',
'F',
'Precision@k'
][::-1]
norm_methods_acc_lag = {}
for key in methods_keys:
norm_methods_acc_lag[key] = {}
for key_metric in key_metrics:
ts = methods_acc_lag[key][key_metric]
new_ts = list(np.array(ts) - np.mean(ts))
norm_methods_acc_lag[key][key_metric] = new_ts
return norm_methods_acc_lag
def group_dict(methods_acc_lag,methods_keys):
key_metrics = [
'VUS_ROC',
'VUS_PR',
'R_AUC_ROC',
'R_AUC_PR',
'AUC_ROC',
'AUC_PR',
'Rprecision',
'Rrecall',
'RF',
'Precision',
'Recall',
'F',
'Precision@k'
][::-1]
norm_methods_acc_lag = {key:[] for key in key_metrics}
for key in methods_keys:
for key_metric in key_metrics:
ts = list(methods_acc_lag[key][key_metric])
new_ts = list(np.array(ts) - np.mean(ts))
norm_methods_acc_lag[key_metric] += new_ts
return norm_methods_acc_lag
def generate_curve(label,score,slidingWindow):
tpr_3d, fpr_3d, prec_3d, window_3d, avg_auc_3d, avg_ap_3d = metricor().RangeAUC_volume(labels_original=label, score=score, windowSize=1*slidingWindow)
X = np.array(tpr_3d).reshape(1,-1).ravel()
X_ap = np.array(tpr_3d)[:,:-1].reshape(1,-1).ravel()
Y = np.array(fpr_3d).reshape(1,-1).ravel()
W = np.array(prec_3d).reshape(1,-1).ravel()
Z = np.repeat(window_3d, len(tpr_3d[0]))
Z_ap = np.repeat(window_3d, len(tpr_3d[0])-1)
return Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d
def box_plot(data, edge_color, fill_color):
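# Note: this helper assumes a matplotlib Axes object named `ax` already exists in the calling scope.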
bp = ax.boxplot(data, patch_artist=True)
for element in ['boxes', 'whiskers', 'fliers', 'means', 'medians', 'caps']:
plt.setp(bp[element], color=edge_color)
for patch in bp['boxes']:
patch.set(facecolor=fill_color)
return bp
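A hedged sketch of how the robustness helpers above could be driven on synthetic inputs (the data, method names, and window size are illustrative assumptions, not part of this commit):
# Illustrative sketch: robustness of metrics to label lag, on synthetic scores
import numpy as np
label = np.zeros(2000, dtype=int)
label[900:950] = 1                      # one anomalous segment
methods_scores = {
    'method_a': np.random.rand(2000),
    'method_b': np.random.rand(2000),
}
slidingWindow = 100
acc = compute_anomaly_acc_lag(methods_scores, label, slidingWindow,
                              list(methods_scores.keys()))
# acc['method_a']['VUS_ROC'] holds one value per tested lag of the labels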

View File

@@ -0,0 +1,192 @@
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import cm
import pandas as pd
from tqdm import tqdm as tqdm
import time
from sklearn.preprocessing import MinMaxScaler
import random
import os
import sys
module_path = os.path.abspath(os.path.join('../..'))
if module_path not in sys.path:
sys.path.append(module_path)
from deepod.metrics.vus.utils.slidingWindows import find_length
from deepod.metrics.vus.utils.metrics import metricor
from deepod.metrics.vus.models.distance import Fourier
from deepod.metrics.vus.models.feature import Window
from deepod.metrics.vus.models.cnn import cnn
from deepod.metrics.vus.models.AE_mlp2 import AE_MLP2
from deepod.metrics.vus.models.lstm import lstm
from deepod.metrics.vus.models.ocsvm import OCSVM
from deepod.metrics.vus.models.poly import POLY
from deepod.metrics.vus.models.pca import PCA
from deepod.metrics.vus.models.norma import NORMA
from deepod.metrics.vus.models.matrix_profile import MatrixProfile
from deepod.metrics.vus.models.lof import LOF
from deepod.metrics.vus.models.iforest import IForest
def find_section_length(label,length):
best_i = None
best_sum = None
current_subseq = False
for i in range(len(label)):
changed = False
if label[i] == 1:
if current_subseq == False:
current_subseq = True
if best_i is None:
changed = True
best_i = i
best_sum = np.sum(label[max(0,i-200):min(len(label),i+9800)])
else:
if np.sum(label[max(0,i-200):min(len(label),i+9800)]) < best_sum:
changed = True
best_i = i
best_sum = np.sum(label[max(0,i-200):min(len(label),i+9800)])
else:
changed = False
if changed:
diff = i+9800 - len(label)
pos1 = max(0,i-200 - max(0,diff))
pos2 = min(i+9800,len(label))
else:
current_subseq = False
if best_i is not None:
return best_i-pos1,(pos1,pos2)
else:
return None,None
def generate_data(filepath,init_pos,max_length):
df = pd.read_csv(filepath, header=None).to_numpy()
name = filepath.split('/')[-1]
#max_length = 30000
data = df[init_pos:init_pos+max_length,0].astype(float)
label = df[init_pos:init_pos+max_length,1]
pos_first_anom,pos = find_section_length(label,max_length)
data = df[pos[0]:pos[1],0].astype(float)
label = df[pos[0]:pos[1],1]
slidingWindow = find_length(data)
#slidingWindow = 70
X_data = Window(window = slidingWindow).convert(data).to_numpy()
data_train = data[:int(0.1*len(data))]
data_test = data
X_train = Window(window = slidingWindow).convert(data_train).to_numpy()
X_test = Window(window = slidingWindow).convert(data_test).to_numpy()
return pos_first_anom,slidingWindow,data,X_data,data_train,data_test,X_train,X_test,label
def compute_score(methods,slidingWindow,data,X_data,data_train,data_test,X_train,X_test):
methods_scores = {}
for method in methods:
start_time = time.time()
if method == 'IForest':
clf = IForest(n_jobs=1)
x = X_data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'LOF':
clf = LOF(n_neighbors=20, n_jobs=1)
x = X_data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'MatrixProfile':
clf = MatrixProfile(window = slidingWindow)
x = data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'NormA':
clf = NORMA(pattern_length = slidingWindow, nm_size=3*slidingWindow)
x = data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*((slidingWindow-1)//2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'PCA':
clf = PCA()
x = X_data
clf.fit(x)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
elif method == 'POLY':
clf = POLY(power=3, window = slidingWindow)
x = data
clf.fit(x)
measure = Fourier()
measure.detector = clf
measure.set_param()
clf.decision_function(measure=measure)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'OCSVM':
X_train_ = MinMaxScaler(feature_range=(0,1)).fit_transform(X_train.T).T
X_test_ = MinMaxScaler(feature_range=(0,1)).fit_transform(X_test.T).T
clf = OCSVM(nu=0.05)
clf.fit(X_train_, X_test_)
score = clf.decision_scores_
score = np.array([score[0]]*math.ceil((slidingWindow-1)/2) + list(score) + [score[-1]]*((slidingWindow-1)//2))
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'LSTM':
clf = lstm(slidingwindow = slidingWindow, predict_time_steps=1, epochs = 50, patience = 5, verbose=0)
clf.fit(data_train, data_test)
measure = Fourier()
measure.detector = clf
measure.set_param()
clf.decision_function(measure=measure)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'AE':
clf = AE_MLP2(slidingWindow = slidingWindow, epochs=100, verbose=0)
clf.fit(data_train, data_test)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
elif method == 'CNN':
clf = cnn(slidingwindow = slidingWindow, predict_time_steps=1, epochs = 100, patience = 5, verbose=0)
clf.fit(data_train, data_test)
measure = Fourier()
measure.detector = clf
measure.set_param()
clf.decision_function(measure=measure)
score = clf.decision_scores_
score = MinMaxScaler(feature_range=(0,1)).fit_transform(score.reshape(-1,1)).ravel()
#end_time = time.time()
#time_exec = end_time - start_time
#print(method,"\t time: {}".format(time_exec))
methods_scores[method] = score
return methods_scores

View File

@@ -0,0 +1,11 @@
from .utils.metrics import metricor
from .analysis.robustness_eval import generate_curve
def get_range_vus_roc(score, labels, slidingWindow):
grader = metricor()
R_AUC_ROC, R_AUC_PR, _, _, _ = grader.RangeAUC(labels=labels, score=score, window=slidingWindow, plot_ROC=True)
_, _, _, _, _, _,VUS_ROC, VUS_PR = generate_curve(labels, score, 2*slidingWindow)
metrics = {'R_AUC_ROC': R_AUC_ROC, 'R_AUC_PR': R_AUC_PR, 'VUS_ROC': VUS_ROC, 'VUS_PR': VUS_PR}
return metrics
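A usage sketch for get_range_vus_roc on synthetic data (annotation only; the array contents and window value are illustrative assumptions):
# Illustrative sketch: range-based and volume-under-surface metrics
import numpy as np
labels = np.zeros(1000, dtype=int)
labels[400:430] = 1
score = np.random.rand(1000)
score[395:435] += 1.0
metrics = get_range_vus_roc(score, labels, slidingWindow=100)
# dict with keys 'R_AUC_ROC', 'R_AUC_PR', 'VUS_ROC', 'VUS_PR'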

View File

@@ -0,0 +1,848 @@
# -*- coding: utf-8 -*-
"""Classes of distance measure for model type A
"""
import numpy as np
# import matplotlib.pyplot as plt
# import random
from arch import arch_model
# import pandas as pd
import math
# import pmdarima as pm
# from pmdarima import model_selection
# import os
# import dis
# import statistics
# from sklearn import metrics
# import sklearn
class Euclidean:
""" The function class for Lp euclidean norm
----------
Power : int, optional (default=1)
The power of the lp norm. For power = k, the measure is calculated by |x - y|_k
neighborhood : int, optional (default=max (100, 10*window size))
The length of the neighborhood used to derive the normalizing constant D, which is based on
the difference between the maximum and minimum in the neighborhood minus the window.
window: int, optional (default = length of input data)
The length of the subsequence to be compared
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, power = 1, neighborhood = 100, window = 20, norm = False):
self.power = power
self.window = window
self.neighborhood = neighborhood
self.detector = None
self.decision_scores_ = []
self.norm = norm
self.X_train = 2
def measure(self, X, Y, index):
"""Derive the decision score based on the given distance measure
Parameters
----------
X : numpy array of shape (n_samples, )
The real input samples subsequence.
Y : numpy array of shape (n_samples, )
The estimated input samples subsequence.
Index : int
the index of the starting point in the subsequence
Returns
-------
score : float
dissimilarity score between the two subsequences
"""
X_train = self.X_train
X_train = self.detector.X_train_
power = self.power
window = self.window
neighborhood = self.neighborhood
norm = self.norm
data = X_train
if norm == False:
if X.shape[0] == 0:
score = 0
else:
score = np.linalg.norm(X-Y, power)/(X.shape[0])
self.decision_scores_.append((index, score))
return score
elif type(X_train) == int:
print('Error! Detector is not fed to the object and X_train is not known')
elif neighborhood != 'all':
length = X.shape[0]
neighbor = int(self.neighborhood/2)
if index + neighbor < self.n_train_ and index - neighbor > 0:
region = np.concatenate((data[index - neighbor: index], data[index + window: index + neighbor] ))
D = np.max(region) - np.min(region)
elif index + neighbor >= self.n_train_ and index + window < self.n_train_:
region = np.concatenate((data[self.n_train_ - neighborhood: index], data[index + window: self.n_train_] ))
D = np.max(region) - np.min(region)
elif index + window >= self.n_train_:
region = data[self.n_train_ - neighborhood: index]
D = np.max(region) - np.min(region)
else:
region = np.concatenate((data[0: index], data[index + window: index + neighborhood] ))
D = np.max(region) - np.min(region)
score = np.linalg.norm(X-Y, power)/D/(X.shape[0]**power)
self.decision_scores_.append((index, score))
return score
def set_param(self):
if self.detector != None:
self.window = self.detector.window
self.neighborhood = self.detector.neighborhood
self.n_train_ = self.detector.n_train_
self.X_train = self.detector.X_train_
else:
print('Error! Detector is not fed to the object and X_train is not known')
return self
class Mahalanobis:
""" The function class for Mahalanobis measure
----------
Probability : boolean, optional (default=False)
Whether to derive the anomaly score by the probability that such a point occurs
neighborhood : int, optional (default=max (100, 10*window size))
The length of the neighborhood used to derive the normalizing constant D, which is based on
the difference between the maximum and minimum in the neighborhood minus the window.
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, probability = False):
self.probability = probability
self.detector = None
self.decision_scores_ = []
self.mu = 0
def set_param(self):
'''update the parameters with the detector that is used
'''
self.n_initial_ = self.detector.n_initial_
self.estimation = self.detector.estimation
self.X_train = self.detector.X_train_
self.window = self.detector.window
window = self.window
resid = self.X_train - self.estimation
number = max(100, self.window)
self.residual = np.zeros((window, number))
for i in range(number):
self.residual[:, i] = resid[self.n_initial_+i:self.n_initial_+i+window]
self.mu = np.zeros(number)
self.cov = np.cov(self.residual, rowvar=1)
if self.window == 1:
self.cov = (np.sum(np.square(self.residual))/(number - 1))**0.5
return self
def norm_pdf_multivariate(self, x):
'''multivariate normal density function
'''
try:
mu = self.mu
except:
mu = np.zeros(x.shape[0])
sigma = self.cov
size = x.shape[0]
if size == len(mu) and (size, size) == sigma.shape:
det = np.linalg.det(sigma)
if det == 0:
raise NameError("The covariance matrix can't be singular")
norm_const = 1.0/ ( math.pow((2*math.pi),float(size)/2) * math.pow(det,1.0/2) )
x_mu = np.matrix(x - mu)
inv = np.linalg.inv(sigma)
result = math.pow(math.e, -0.5 * (x_mu * inv * x_mu.T))
return norm_const * result
else:
raise NameError("The dimensions of the input don't match")
def normpdf(self, x):
'''univariate normal
'''
mean = 0
sd = np.asscalar(self.cov)
var = float(sd)**2
denom = (2*math.pi*var)**.5
num = math.exp(-(float(x)-float(mean))**2/(2*var))
return num/denom
def measure(self, X, Y, index):
"""Derive the decision score based on the given distance measure
Parameters
----------
X : numpy array of shape (n_samples, )
The real input samples subsequence.
Y : numpy array of shape (n_samples, )
The estimated input samples subsequence.
index : int
the index of the starting point in the subsequence
Returns
-------
score : float
dissimilarity score between the two subsequences
"""
mu = np.zeros(self.detector.window)
cov = self.cov
if self.probability == False:
if X.shape[0] == mu.shape[0]:
score = np.matmul(np.matmul((X-Y-mu).T, cov), (X-Y-mu))/(X.shape[0])
self.decision_scores_.append((index, score))
return score
else:
return (X-Y).T.dot(X-Y)
else:
if len(X) > 1:
prob = self.norm_pdf_multivariate(X-Y)
elif len(X) == 1:
X = np.asscalar(X)
Y = np.asscalar(Y)
prob = self.normpdf(X-Y)
else:
prob = 1
score = 1 - prob
score = max(score, 0)
self.decision_scores_.append((index, score))
return score
class Garch:
""" The function class for garch measure
----------
p, q : int, optional (default=1, 1)
The order of the garch model to be fitted on the residual
mean : string, optional (default='zero' )
The forecast conditional mean.
vol: string, optional (default = 'garch')
he forecast conditional variance.
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, p = 1, q = 1, mean = 'zero', vol = 'garch'):
self.p = p
self.q = q
self.vol = vol
self.mean = mean
self.decision_scores_ = []
def set_param(self):
'''update the parameters with the detector that is used
'''
q = self.q
p=self.p
mean = self.mean
vol = self.vol
if self.detector != None:
self.n_initial_ = self.detector.n_initial_
self.estimation = self.detector.estimation
self.X_train = self.detector.X_train_
self.window = self.detector.window
window = self.window
resid = 10 * (self.X_train - self.estimation)
model = arch_model(resid, mean=mean, vol=vol, p=p, q=q)
model_fit = model.fit(disp='off')
self.votility = model_fit.conditional_volatility/10
else:
print('Error! Detector not fed to the measure')
return self
def measure(self, X, Y, index):
"""Derive the decision score based on the given distance measure
Parameters
----------
X : numpy array of shape (n_samples, )
The real input samples subsequence.
Y : numpy array of shape (n_samples, )
The estimated input samples subsequence.
index : int
the index of the starting point in the subsequence
Returns
-------
score : float
dissimilarity score between the two subsequences
"""
X = np.array(X)
Y = np.array(Y)
length = len(X)
score = 0
if length != 0:
for i in range(length):
sigma = self.votility[index + i]
if sigma != 0:
score += abs(X[i]-Y[i])/sigma
score = score/length
return score
class SSA_DISTANCE:
""" The function class for SSA measure
good for contextual anomolies
----------
method : string, optional (default='linear' )
The method to fit the line and derives the SSA score
e: float, optional (default = 1)
The upper bound to start new line search for linear method
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, method ='linear', e = 1):
self.method = method
self.decision_scores_ = []
self.e = e
def Linearization(self, X2):
"""Obtain the linearized curve.
Parameters
----------
X2 : numpy array of shape (n, )
the time series curve to be fitted
e : float (taken from self.e)
the error tolerance used to decide when to start a new line segment
Returns
-------
fit: dict, parameters of the fitted piecewise-linear curve
"""
e = self.e
i = 0
fit = {}
fit['index'] = []
fit['rep'] = []
while i < len(X2):
fit['index'].append(i)
try:
fit['Y'+str(i)]= X2[i]
except:
print(X2.shape, X2)
fit['rep'].append(np.array([i, X2[i]]))
if i+1 >= len(X2):
break
k = X2[i+1]-X2[i]
b = -i*(X2[i+1]-X2[i])+X2[i]
fit['reg' +str(i)]= np.array([k, b])
i += 2
if i >= len(X2):
break
d = np.abs(X2[i]- (k * i+b))
while d < e:
i +=1
if i >= len(X2):
break
d = np.abs(X2[i]- (k * i+b))
return fit
def set_param(self):
'''update the parameters with the detector that is used.
Since the SSA measure doesn't need the attributes of the detector
or characteristics of X_train, the process is omitted.
'''
return self
def measure(self, X2, X3, start_index):
"""Obtain the SSA similarity score.
Parameters
----------
X2 : numpy array of shape (n, )
the reference timeseries
X3 : numpy array of shape (n, )
the tested timeseries
e : float (taken from self.e)
the error tolerance used in the linearization
Returns
-------
score: float, the higher the score, the more dissimilar the two curves
"""
#linearization of data X2 and X3
X2 = np.array(X2)
X3 = np.array(X3)
e = self.e
fit = self.Linearization(X2)
fit2 = self.Linearization(X3)
#line alignment
Index = []
test_list = fit['index'] + fit2['index']
[Index.append(x) for x in test_list if x not in Index]
Y = 0
#Similarity Computation
for i in Index:
if i in fit['index'] and i in fit2['index']:
Y += abs(fit['Y'+str(i)]-fit2['Y'+str(i)])
elif i in fit['index']:
J = np.max(np.where(np.array(fit2['index']) < i ))
index = fit2['index'][J]
k = fit2['reg'+str(index)][0]
b = fit2['reg'+str(index)][1]
value = abs(k * i + b - fit['Y'+str(i)])
Y += value
elif i in fit2['index']:
J = np.max(np.where(np.array(fit['index']) < i ))
index = fit['index'][J]
k = fit['reg'+str(index)][0]
b = fit['reg'+str(index)][1]
value = abs(k * i + b - fit2['Y'+str(i)])
Y += value
if len(Index) != 0:
score = Y/len(Index)
else:
score = 0
self.decision_scores_.append((start_index, score))
if len(X2) == 1:
print('Error! SSA measure doesn\'t apply to singleton' )
else:
return score
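# A minimal usage sketch (illustrative toy values, assuming 1-D numpy inputs):
# SSA_DISTANCE can be called standalone, since measure() only relies on
# Linearization and the tolerance self.e.
#   import numpy as np
#   ssa = SSA_DISTANCE(e=1)
#   score = ssa.measure(np.array([1., 2., 3., 4.]), np.array([2., 2., 2., 2.]), 0)
#   # a higher score means the two linearized curves are more dissimilar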
class Fourier:
""" The function class for Fourier measure
good for contextual anomolies
----------
power: int, optional (default = 2)
Lp norm for dissimiarlity measure considered
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, power = 2):
self.decision_scores_ = []
self.power = power
def set_param(self):
'''update the parameters with the detector that is used
since the FFT measure doesn't need the attributes of the detector
or characteristics of X_train, the process is omitted.
'''
return self
def measure(self, X2, X3, start_index):
"""Obtain the SSA similarity score.
Parameters
----------
X2 : numpy array of shape (n, )
the reference timeseries
X3 : numpy array of shape (n, )
the tested timeseries
start_index : int,
current index of the subsequence that is being measured
Returns
-------
score: float, the higher the score, the more dissimilar the two curves
"""
power = self.power
X2 = np.array(X2)
X3 = np.array(X3)
if len(X2) == 0:
score = 0
else:
X2 = np.fft.fft(X2);
X3 = np.fft.fft(X3)
score = np.linalg.norm(X2 - X3, ord = power)/len(X3)
self.decision_scores_.append((start_index, score))
return score
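# A minimal usage sketch (illustrative toy values): Fourier compares two
# equal-length subsequences in the frequency domain and needs no fitted detector.
#   import numpy as np
#   f = Fourier(power=2)
#   x = np.sin(np.linspace(0, 6.28, 50))
#   score = f.measure(x, x + 0.1, 0)  # small score: the two curves are close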
class DTW:
""" The function class for dynamic time warping measure
Parameters
----------
method : string, optional (default='L2')
The distance measure used to derive DTW.
Available: "L2", "L1", and custom
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, method = 'L2'):
self.decision_scores_ = []
if type(method) == str:
if method == 'L1':
distance = lambda x, y: abs(x-y)
elif method == 'L2':
distance = lambda x, y: (x-y)**2
else:
distance = method
self.distance = distance
def set_param(self):
'''update the parameters with the detector that is used
since the DTW measure doesn't need the attributes of the detector
or characteristics of X_train, the process is omitted.
'''
return self
def measure(self, X1, X2, start_index):
"""Obtain the SSA similarity score.
Parameters
----------
X1 : numpy array of shape (n, )
the reference timeseries
X2 : numpy array of shape (n, )
the tested timeseries
start_index : int,
current index of the subsequence that is being measured
Returns
-------
score: float, the higher the score, the more dissimilar the two curves
"""
distance = self.distance
X1 = np.array(X1)
X2 = np.array(X2)
value = 1
if len(X1)==0:
value =0
X1= np.zeros(5)
X2 = X1
M = np.zeros((len(X1), len(X2)))
for index_i in range(len(X1)):
for index_j in range(len(X1) - index_i):
L = []
i = index_i
j = index_i + index_j
D = distance(X1[i], X2[j])
try:
L.append(M[i-1, j-1])
except:
L.append(np.inf)
try:
L.append(M[i, j-1])
except:
L.append(np.inf)
try:
L.append(M[i-1, j])
except:
L.append(np.inf)
D += min(L)
M[i,j] = D
if i !=j:
L = []
j = index_i
i = index_i + index_j
D = distance(X1[i], X2[j])
try:
L.append(M[i-1, j-1])
except:
L.append(np.inf)
try:
L.append(M[i, j-1])
except:
L.append(np.inf)
try:
L.append(M[i-1, j])
except:
L.append(np.inf)
D += min(L)
M[i,j] = D
score = M[len(X1)-1, len(X1)-1]/len(X1)
if value == 0:
score = 0
self.decision_scores_.append((start_index, score))
return score
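# A minimal usage sketch (illustrative toy values): DTW only needs two
# equal-length subsequences; with the default 'L2' method the point-wise
# distance is the squared difference.
#   import numpy as np
#   d = DTW(method='L2')
#   score = d.measure(np.array([0., 1., 2., 3.]), np.array([0., 0., 1., 2.]), 0)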
class EDRS:
""" The function class for edit distance on real sequences
Parameters
----------
method : string, optional (default='L1')
The distance measure used to derive EDRS.
Available: "L1" and custom
ep : float, optional (default=False, i.e. estimated from the residual in set_param)
the threshold value used to decide D_ij
vol : boolean, optional (default=False)
whether to adopt a changing volatility, estimated by GARCH,
for ep at different windows.
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, method = 'L1', ep = False, vol = False):
self.decision_scores_ = []
if type(method) == str:
if method == 'L1':
distance = lambda x, y: abs(x-y)
else:
distance = method
self.distance = distance
self.ep = ep
self.vot = vol
def set_param(self):
'''update ep based on the volatility of the model
'''
estimation = np.array(self.detector.estimation )
initial = self.detector.n_initial_
X = np.array(self.detector.X_train_)
self.initial = initial
residual = estimation[initial:] - X[initial:]
number = len(residual)
#var = (np.sum(np.square(residual))/(number - 1))**0.5
vot = self.vot
if vot == False:
var = np.var(residual)
else:
model = arch_model(10 * residual, mean='Constant', vol='garch', p=1, q=1)
model_fit = model.fit(disp='off')
var = model_fit.conditional_volatility/10
if self.ep == False:
self.ep = 3 * (np.sum(np.square(residual))/(len(residual) - 1))**0.5
else:
self.ep = self.ep
return self
def measure(self, X1, X2, start_index):
"""Obtain the SSA similarity score.
Parameters
----------
X1 : numpy array of shape (n, )
the reference timeseries
X2 : numpy array of shape (n, )
the tested timeseries
start_index : int,
current index of the subsequence that is being measured
Returns
-------
score: float, the higher the score, the more dissimilar the two curves
"""
distance = self.distance
X1 = np.array(X1)
X2 = np.array(X2)
vot = self.vot
if vot == False:
ep = self.ep
else:
try:
ep = self.ep[start_index - self.initial]
except:
#sometimes start_index is out of the range of the estimated volatility
ep = 0
value = 1
if len(X1)==0:
value =0
X1= np.zeros(5)
X2 = X1
M = np.zeros((len(X1), len(X2)))
M[:, 0] = np.arange(len(X1))
M[0, :] = np.arange(len(X1))
for index_i in range(1, len(X1)):
for index_j in range(len(X1) - index_i):
L = []
i = index_i
j = index_i + index_j
D = distance(X1[i], X2[j])
if D < ep:
M[i, j]= M[i-1, j-1]
else:
try:
L.append(M[i-1, j-1])
except:
L.append(np.inf)
try:
L.append(M[i, j-1])
except:
L.append(np.inf)
try:
L.append(M[i-1, j])
except:
L.append(np.inf)
M[i,j] = 1 + min(L)
if i !=j:
L = []
j = index_i
i = index_i + index_j
D = distance(X1[i], X2[j])
if D < ep:
M[i, j]= M[i-1, j-1]
else:
try:
L.append(M[i-1, j-1])
except:
L.append(np.inf)
try:
L.append(M[i, j-1])
except:
L.append(np.inf)
try:
L.append(M[i-1, j])
except:
L.append(np.inf)
M[i,j] = 1 + min(L)
score = M[len(X1)-1, len(X1)-1]/len(X1)
if value == 0:
score = 0
self.decision_scores_.append((start_index, score))
return score
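# A minimal usage sketch (illustrative toy values): when no detector is attached,
# pass an explicit threshold ep so that set_param() is not required.
#   import numpy as np
#   ed = EDRS(method='L1', ep=0.5)
#   score = ed.measure(np.array([0., 1., 2., 3.]), np.array([0., 1., 5., 3.]), 0)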
class TWED:
""" Function class for Time-warped edit distance(TWED) measure
----------
method : string, optional (default='L2' )
The distance measure to derive DTW.
Avaliable "L2", "L1", and custom
gamma: float, optiona (default = 0.1)
mismatch penalty
v : float, optional (default = False)
stifness parameter
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is
fitted.
detector: Object classifier
the anomaly detector that is used
"""
def __init__(self, gamma = 0.1, v = 0.1):
self.decision_scores_ = []
self.gamma = gamma
self.v = v
def set_param(self):
'''No need'''
return self
def measure(self, A, B, start_index):
"""Obtain the SSA similarity score.
Parameters
----------
A : numpy array of shape (n, )
the reference timeseries
B : numpy array of shape (n, )
the tested timeseries
start_index : int,
current index of the subsequence that is being measured
Returns
-------
score: float, the higher the score, the more dissimilar the two curves
"""
#code modified from Wikipedia
Dlp = lambda x,y: abs(x-y)
timeSB = np.arange(1,len(B)+1)
timeSA = np.arange(1,len(A)+1)
nu = self.v
_lambda = self.gamma
# Reference :
# Marteau, P.; F. (2009). "Time Warp Edit Distance with Stiffness Adjustment for Time Series Matching".
# IEEE Transactions on Pattern Analysis and Machine Intelligence. 31 (2): 306-318. arXiv:cs/0703033
# http://people.irisa.fr/Pierre-Francois.Marteau/
# Check if input arguments
if len(A) != len(timeSA):
print("The length of A is not equal length of timeSA")
return None, None
if len(B) != len(timeSB):
print("The length of B is not equal length of timeSB")
return None, None
if nu < 0:
print("nu is negative")
return None, None
# Add padding
A = np.array([0] + list(A))
timeSA = np.array([0] + list(timeSA))
B = np.array([0] + list(B))
timeSB = np.array([0] + list(timeSB))
n = len(A)
m = len(B)
# Dynamical programming
DP = np.zeros((n, m))
# Initialize DP Matrix and set first row and column to infinity
DP[0, :] = np.inf
DP[:, 0] = np.inf
DP[0, 0] = 0
# Compute minimal cost
for i in range(1, n):
for j in range(1, m):
# Calculate and save cost of various operations
C = np.ones((3, 1)) * np.inf
# Deletion in A
C[0] = (
DP[i - 1, j]
+ Dlp(A[i - 1], A[i])
+ nu * (timeSA[i] - timeSA[i - 1])
+ _lambda
)
# Deletion in B
C[1] = (
DP[i, j - 1]
+ Dlp(B[j - 1], B[j])
+ nu * (timeSB[j] - timeSB[j - 1])
+ _lambda
)
# Keep data points in both time series
C[2] = (
DP[i - 1, j - 1]
+ Dlp(A[i], B[j])
+ Dlp(A[i - 1], B[j - 1])
+ nu * (abs(timeSA[i] - timeSB[j]) + abs(timeSA[i - 1] - timeSB[j - 1]))
)
# Choose the operation with the minimal cost and update DP Matrix
DP[i, j] = np.min(C)
distance = DP[n - 1, m - 1]
self.M = DP
self.decision_scores_.append((start_index, distance))
return distance
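# A minimal usage sketch (illustrative toy values): TWED works directly on two
# raw subsequences; gamma is the mismatch penalty and v the stiffness parameter.
#   import numpy as np
#   tw = TWED(gamma=0.1, v=0.1)
#   dist = tw.measure(np.array([1., 2., 3.]), np.array([1., 2., 4.]), 0)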

View File

@ -0,0 +1,359 @@
# -*- coding: utf-8 -*-
"""Classes of feature mapping for model type B
"""
import numpy as np
# import matplotlib.pyplot as plt
# import random
# from arch import arch_model
import pandas as pd
import math
# import pmdarima as pm
# from pmdarima import model_selection
# import os
# import dis
# import statistics
# from sklearn import metrics
# import sklearn
from tsfresh import extract_features
from statsmodels.tsa.seasonal import seasonal_decompose
# import itertools
# import functools
import warnings
from builtins import range
# from collections import defaultdict
from numpy.linalg import LinAlgError
# from scipy.signal import cwt, find_peaks_cwt, ricker, welch
# from scipy.stats import linregress
# from statsmodels.tools.sm_exceptions import MissingDataError
with warnings.catch_warnings():
# Ignore warnings of the patsy package
warnings.simplefilter("ignore", DeprecationWarning)
from statsmodels.tsa.ar_model import AR
# from statsmodels.tsa.stattools import acf, adfuller, pacf
from hurst import compute_Hc
class Window:
""" The class for rolling window feature mapping.
The mapping converts the original timeseries X into a matrix.
The matrix consists of rows of sliding windows of original X.
"""
def __init__(self, window = 100):
self.window = window
self.detector = None
def convert(self, X):
n = self.window
X = pd.Series(X)
L = []
if n == 0:
df = X
else:
for i in range(n):
L.append(X.shift(i))
df = pd.concat(L, axis = 1)
df = df.iloc[n-1:]
return df
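# A minimal usage sketch (illustrative toy values): convert() turns a 1-D series
# into a DataFrame whose rows are sliding windows of length `window`.
#   import numpy as np
#   W = Window(window=3)
#   mat = W.convert(np.arange(6))  # shape (4, 3); first row is [2, 1, 0]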
class tf_Stat:
'''Statistical feature extraction using the tsfresh package.
It calculates 763 features in total, so it might be overly complicated for some models.
Recommended for methods like Isolation Forest, which randomly pick a feature
and then perform the classification. To use with other distance-based models like KNN,
LOF, CBLOF, etc., first pass a function that gives weights to individual features so that
inconsequential features won't cloud the important ones (mean, variance, kurtosis, etc.).
'''
def __init__(self, window = 100, step = 25):
self.window = window
self.step = step
self.detector = None
def convert(self, X):
window = self.window
step = self.step
pos = math.ceil(window/2)
#step <= window
length = X.shape[0]
Xd = pd.DataFrame(X)
Xd.columns = pd.Index(['x'], dtype='object')
Xd['id'] = 1
Xd['time'] = Xd.index
test = np.array(extract_features(Xd.iloc[0+pos-math.ceil(window/2):0+pos + math.floor(window/2)], column_id="id", column_sort="time", column_kind=None, column_value=None).fillna(0))
M = np.zeros((length - window, test.shape[1]+1 ))
i = 0
while i + window <= M.shape[0]:
M[i:i+step, 0]= X[pos + i: pos + i + step]
vector = np.array(extract_features(Xd.iloc[i+pos-math.ceil(window/2):i+pos + math.floor(window/2)], column_id="id", column_sort="time", column_kind=None, column_value=None).fillna(0))
M[i:i+step, 1:] = vector
i+= step
num = M.shape[0]
if i < num:
M[i: num, 0]= X[pos + i: pos + num]
M[i: num, 1:] = np.array(extract_features(Xd.iloc[i+pos-math.ceil(window/2):], column_id="id", column_sort="time", column_kind=None, column_value=None).fillna(0))
return M
class Stat:
'''Statistical feature extraction.
Features include [mean, variance, skewness, kurtosis, autocorrelation, maximum,
minimum, entropy, seasonality, Hurst exponent, AR coef]
'''
def __init__(self, window = 100, data_step = 10, param = [{"coeff": 0, "k": 5}], lag = 1, freq = 720):
self.window = window
self.data_step = data_step
self.detector = None
self.param = param
self.lag = lag
self.freq =freq
if data_step > int(window/2):
raise ValueError('data_step shouldn\'t be greater than half of the window')
def convert(self, X):
freq = self.freq
n = self.window
data_step = self.data_step
X = pd.Series(X)
L = []
if n == 0:
df = X
raise ValueError('window length is set to zero')
else:
for i in range(n):
L.append(X.shift(i))
df = pd.concat(L, axis = 1)
df = df.iloc[n:]
df2 = pd.concat(L[:data_step], axis = 1)
df = df.reset_index()
#value
x0 = df2[math.ceil(n/2) : - math.floor(n/2)].reset_index()
#mean
x1 = (df.mean(axis=1))
#variance
x2 = df.var(axis=1)
#AR-coef
self.ar_function = lambda x: self.ar_coefficient(x)
x3 = df.apply(self.ar_function, axis =1, result_type='expand' )
#autocorrelation
self.auto_function = lambda x: self.autocorrelation(x)
x4 = df.apply(self.auto_function, axis =1, result_type='expand' )
#kurtosis
x5 = (df.kurtosis(axis=1))
#skewness
x6 = (df.skew(axis=1))
#maximum
x7 = (df.max(axis=1))
#minimum
x8 = (df.min(axis=1))
#entropy
self.entropy_function = lambda x: self.sample_entropy(x)
x9 = df.apply(self.entropy_function, axis =1, result_type='expand')
#seasonality
result = seasonal_decompose(X, model='additive', freq = freq, extrapolate_trend='freq')
#seasonal
x10 = pd.Series(np.array(result.seasonal[math.ceil(n/2) : - math.floor(n/2)]))
#trend
x11 = pd.Series(np.array(result.trend[math.ceil(n/2) : - math.floor(n/2)]))
#resid
x12 = pd.Series(np.array(result.resid[math.ceil(n/2) : - math.floor(n/2)]))
#Hurst component
self.hurst_function = lambda x: self.hurst_f(x)
x13 = df.apply(self.hurst_function, axis =1, result_type='expand')
L = [x0, x1,x2,x3,x4,x5,x6,x7,x8,x9,x10,x11,x12, x13]
M = pd.concat(L, axis = 1)
M = M.drop(columns=['index'])
return M
def ar_coefficient(self, x):
"""
This feature calculator fits the unconditional maximum likelihood
of an autoregressive AR(k) process.
The k parameter is the maximum lag of the process
.. math::
X_{t}=\\varphi_0 +\\sum _{{i=1}}^{k}\\varphi_{i}X_{{t-i}}+\\varepsilon_{t}
For each configuration in param, which should contain the maxlag "k", such an AR process is calculated. Then
the coefficients :math:`\\varphi_{i}` whose index :math:`i` is contained in "coeff" are returned.
:param x: the time series to calculate the feature of
:type x: numpy.ndarray
:param param: contains dictionaries {"coeff": x, "k": y} with x,y int
:type param: list
:return x: the different feature values
:return type: pandas.Series
"""
calculated_ar_params = {}
param = self.param
x_as_list = list(x)
res = {}
for parameter_combination in param:
k = parameter_combination["k"]
p = parameter_combination["coeff"]
column_name = "coeff_{}__k_{}".format(p, k)
if k not in calculated_ar_params:
try:
calculated_AR = AR(x_as_list)
calculated_ar_params[k] = calculated_AR.fit(maxlag=k, solver="mle").params
except (LinAlgError, ValueError):
calculated_ar_params[k] = [np.NaN] * k
mod = calculated_ar_params[k]
if p <= k:
try:
res[column_name] = mod[p]
except IndexError:
res[column_name] = 0
else:
res[column_name] = np.NaN
L = [(key, value) for key, value in res.items()]
L0 = []
for item in L:
L0.append(item[1])
return L0
def autocorrelation(self, x):
"""
Calculates the autocorrelation of the specified lag, according to the formula [1]
.. math::
\\frac{1}{(n-l)\\sigma^{2}} \\sum_{t=1}^{n-l}(X_{t}-\\mu )(X_{t+l}-\\mu)
where :math:`n` is the length of the time series :math:`X_i`, :math:`\\sigma^2` its variance and :math:`\\mu` its
mean. `l` denotes the lag.
.. rubric:: References
[1] https://en.wikipedia.org/wiki/Autocorrelation#Estimation
:param x: the time series to calculate the feature of
:type x: numpy.ndarray
:param lag: the lag
:type lag: int
:return: the value of this feature
:return type: float
"""
lag = self.lag
# This is important: If a series is passed, the product below is calculated
# based on the index, which corresponds to squaring the series.
if isinstance(x, pd.Series):
x = x.values
if len(x) < lag:
return np.nan
# Slice the relevant subseries based on the lag
y1 = x[:(len(x) - lag)]
y2 = x[lag:]
# Subtract the mean of the whole series x
x_mean = np.mean(x)
# The result is sometimes referred to as "covariation"
sum_product = np.sum((y1 - x_mean) * (y2 - x_mean))
# Return the normalized unbiased covariance
v = np.var(x)
if np.isclose(v, 0):
return np.NaN
else:
return sum_product / ((len(x) - lag) * v)
def _into_subchunks(self, x, subchunk_length, every_n=1):
"""
Split the time series x into subwindows of length "subchunk_length", starting every "every_n".
For example, the input data [0, 1, 2, 3, 4, 5, 6] will be turned into the matrix
0 2 4
1 3 5
2 4 6
with the settings subchunk_length = 3 and every_n = 2
"""
len_x = len(x)
assert subchunk_length > 1
assert every_n > 0
# how often can we shift a window of size subchunk_length over the input?
num_shifts = (len_x - subchunk_length) // every_n + 1
shift_starts = every_n * np.arange(num_shifts)
indices = np.arange(subchunk_length)
indexer = np.expand_dims(indices, axis=0) + np.expand_dims(shift_starts, axis=1)
return np.asarray(x)[indexer]
def sample_entropy(self, x):
"""
Calculate and return sample entropy of x.
.. rubric:: References
| [1] http://en.wikipedia.org/wiki/Sample_Entropy
| [2] https://www.ncbi.nlm.nih.gov/pubmed/10843903?dopt=Abstract
:param x: the time series to calculate the feature of
:type x: numpy.ndarray
:return: the value of this feature
:return type: float
"""
x = np.array(x)
# if one of the values is NaN, we can not compute anything meaningful
if np.isnan(x).any():
return np.nan
m = 2 # common value for m, according to wikipedia...
tolerance = 0.2 * np.std(x) # 0.2 is a common value for r, according to wikipedia...
# Split time series and save all templates of length m
# Basically we turn [1, 2, 3, 4] into [1, 2], [2, 3], [3, 4]
xm = self._into_subchunks(x, m)
# Now calculate the maximum distance between each of those pairs
# np.abs(xmi - xm).max(axis=1)
# and check how many are below the tolerance.
# For speed reasons, we are not doing this in a nested for loop,
# but with numpy magic.
# Example:
# if x = [1, 2, 3]
# then xm = [[1, 2], [2, 3]]
# so we will subtract xm from [1, 2] => [[0, 0], [-1, -1]]
# and from [2, 3] => [[1, 1], [0, 0]]
# taking the abs and max gives us:
# [0, 1] and [1, 0]
# as the diagonal elements are always 0, we subtract 1.
B = np.sum([np.sum(np.abs(xmi - xm).max(axis=1) <= tolerance) - 1 for xmi in xm])
# Similar for computing A
xmp1 = self._into_subchunks(x, m + 1)
A = np.sum([np.sum(np.abs(xmi - xmp1).max(axis=1) <= tolerance) - 1 for xmi in xmp1])
# Return SampEn
return -np.log(A / B)
def hurst_f(self, x):
H,c, M = compute_Hc(x)
return [H, c]
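# A minimal usage sketch (illustrative toy values; assumes an older statsmodels
# that still provides AR and the `freq` argument of seasonal_decompose, as
# imported above). The series must cover at least two full seasonal cycles.
#   import numpy as np
#   X = np.sin(np.linspace(0, 100, 2000)) + 0.1 * np.random.rand(2000)
#   M = Stat(window=100, data_step=10, freq=200).convert(X)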

View File

@ -0,0 +1,440 @@
from sklearn import metrics
import numpy as np
import math
# import matplotlib.pyplot as plt
class metricor:
def __init__(self, a = 1, probability = True, bias = 'flat', ):
self.a = a
self.probability = probability
self.bias = bias
def detect_model(self, model, label, contamination = 0.1, window = 100, is_A = False, is_threshold = True):
if is_threshold:
score = self.scale_threshold(model.decision_scores_, model._mu, model._sigma)
else:
score = self.scale_contamination(model.decision_scores_, contamination = contamination)
if is_A is False:
scoreX = np.zeros(len(score)+window)
scoreX[math.ceil(window/2): len(score)+window - math.floor(window/2)] = score
else:
scoreX = score
self.score_=scoreX
L = self.metric(label, scoreX)
return L
def labels_conv(self, preds):
'''return indices of predicted anomalies
'''
# p = np.zeros(len(preds))
index = np.where(preds >= 0.5)
return index[0]
def labels_conv_binary(self, preds):
'''return predicted label
'''
p = np.zeros(len(preds))
index = np.where(preds >= 0.5)
p[index[0]] = 1
return p
def w(self, AnomalyRange, p):
MyValue = 0
MaxValue = 0
start = AnomalyRange[0]
AnomalyLength = AnomalyRange[1] - AnomalyRange[0] + 1
for i in range(start, start +AnomalyLength):
bi = self.b(i, AnomalyLength)
MaxValue += bi
if i in p:
MyValue += bi
return MyValue/MaxValue
def Cardinality_factor(self, Anomolyrange, Prange):
score = 0
start = Anomolyrange[0]
end = Anomolyrange[1]
for i in Prange:
if i[0] >= start and i[0] <= end:
score +=1
elif start >= i[0] and start <= i[1]:
score += 1
elif end >= i[0] and end <= i[1]:
score += 1
elif start >= i[0] and end <= i[1]:
score += 1
if score == 0:
return 0
else:
return 1/score
def b(self, i, length):
bias = self.bias
if bias == 'flat':
return 1
elif bias == 'front-end bias':
return length - i + 1
elif bias == 'back-end bias':
return i
else:
if i <= length/2:
return i
else:
return length - i + 1
def scale_threshold(self, score, score_mu, score_sigma):
return (score >= (score_mu + 3*score_sigma)).astype(int)
def metric_new(self, label, score, plot_ROC=False, alpha=0.2,coeff=3):
'''input:
Real labels and anomaly score in prediction
output:
AUC,
Precision,
Recall,
F-score,
Range-precision,
Range-recall,
Range-Fscore,
Precision@k,
k is chosen to be # of outliers in real labels
'''
if np.sum(label) == 0:
print('All labels are 0. Label must have ground truth values for calculating the AUC score.')
return None
if np.isnan(score).any() or score is None:
print('Score must not be none.')
return None
#area under curve
auc = metrics.roc_auc_score(label, score)
# plot ROC curve
if plot_ROC:
fpr, tpr, thresholds = metrics.roc_curve(label, score)
# display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=auc)
# display.plot()
#precision, recall, F
preds = score > (np.mean(score)+coeff*np.std(score))
if np.sum(preds) == 0:
preds = score > (np.mean(score)+2*np.std(score))
if np.sum(preds) == 0:
preds = score > (np.mean(score)+1*np.std(score))
Precision, Recall, F, Support = metrics.precision_recall_fscore_support(label, preds, zero_division=0)
precision = Precision[1]
recall = Recall[1]
f = F[1]
#range anomaly
Rrecall, ExistenceReward, OverlapReward = self.range_recall_new(label, preds, alpha)
Rprecision = self.range_recall_new(preds, label, 0)[0]
if Rprecision + Rrecall==0:
Rf=0
else:
Rf = 2 * Rrecall * Rprecision / (Rprecision + Rrecall)
# top-k
k = int(np.sum(label))
threshold = np.percentile(score, 100 * (1-k/len(label)))
# precision_at_k = metrics.top_k_accuracy_score(label, score, k)
p_at_k = np.where(preds > threshold)[0]
TP_at_k = sum(label[p_at_k])
precision_at_k = TP_at_k/k
L = [auc, precision, recall, f, Rrecall, ExistenceReward, OverlapReward, Rprecision, Rf, precision_at_k]
if plot_ROC:
return L, fpr, tpr
return L
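# A minimal usage sketch (illustrative toy values): metric_new expects binary
# ground-truth labels with at least one anomaly and continuous anomaly scores,
# and returns a list of ten metrics.
#   import numpy as np
#   labels = np.zeros(500); labels[100:120] = 1
#   scores = np.random.rand(500); scores[100:120] += 1
#   auc, p, r, f, Rrec, Exist, Overlap, Rprec, Rf, p_at_k = metricor().metric_new(labels, scores)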
def metric_PR(self, label, score):
precision, recall, thresholds = metrics.precision_recall_curve(label, score)
# plt.figure()
# disp = metrics.PrecisionRecallDisplay(precision=precision, recall=recall)
# disp.plot()
AP = metrics.auc(recall, precision)
#AP = metrics.average_precision_score(label, score)
return precision, recall, AP
def range_recall_new(self, labels, preds, alpha):
p = np.where(preds == 1)[0] # positions of predicted label==1
range_pred = self.range_convers_new(preds)
range_label = self.range_convers_new(labels)
Nr = len(range_label) # total # of real anomaly segments
ExistenceReward = self.existence_reward(range_label, p)
OverlapReward = 0
for i in range_label:
OverlapReward += self.w(i, p) * self.Cardinality_factor(i, range_pred)
score = alpha * ExistenceReward + (1-alpha) * OverlapReward
if Nr != 0:
return score/Nr, ExistenceReward/Nr, OverlapReward/Nr
else:
return 0,0,0
def range_convers_new(self, label):
'''
input: arrays of binary values
output: list of ordered pair [[a0,b0], [a1,b1]... ] of the inputs
'''
L = []
i = 0
j = 0
while j < len(label):
# print(i)
while label[i] == 0:
i+=1
if i >= len(label):
break
j = i+1
# print('j'+str(j))
if j >= len(label):
if j==len(label):
L.append((i,j-1))
break
while label[j] != 0:
j+=1
if j >= len(label):
L.append((i,j-1))
break
if j >= len(label):
break
L.append((i, j-1))
i = j
return L
def existence_reward(self, labels, preds):
'''
labels: list of ordered pairs
preds: predicted data
'''
score = 0
for i in labels:
if np.sum(np.multiply(preds <= i[1], preds >= i[0])) > 0:
score += 1
return score
def num_nonzero_segments(self, x):
count=0
if x[0]>0:
count+=1
for i in range(1, len(x)):
if x[i]>0 and x[i-1]==0:
count+=1
return count
def extend_postive_range(self, x, window=5):
label = x.copy().astype(float)
L = self.range_convers_new(label) # index of non-zero segments
length = len(label)
for k in range(len(L)):
s = L[k][0]
e = L[k][1]
x1 = np.arange(e,min(e+window//2,length))
label[x1] += np.sqrt(1 - (x1-e)/(window))
x2 = np.arange(max(s-window//2,0),s)
label[x2] += np.sqrt(1 - (s-x2)/(window))
label = np.minimum(np.ones(length), label)
return label
def extend_postive_range_individual(self, x, percentage=0.2):
label = x.copy().astype(float)
L = self.range_convers_new(label) # index of non-zero segments
length = len(label)
for k in range(len(L)):
s = L[k][0]
e = L[k][1]
l0 = int((e-s+1)*percentage)
x1 = np.arange(e,min(e+l0,length))
label[x1] += np.sqrt(1 - (x1-e)/(2*l0))
x2 = np.arange(max(s-l0,0),s)
label[x2] += np.sqrt(1 - (s-x2)/(2*l0))
label = np.minimum(np.ones(length), label)
return label
def TPR_FPR_RangeAUC(self, labels, pred, P, L):
product = labels * pred
TP = np.sum(product)
# recall = min(TP/P,1)
P_new = (P+np.sum(labels))/2 # so TPR is neither large nor small
# P_new = np.sum(labels)
recall = min(TP/P_new,1)
# recall = TP/np.sum(labels)
# print('recall '+str(recall))
existence = 0
for seg in L:
if np.sum(product[seg[0]:(seg[1]+1)])>0:
existence += 1
existence_ratio = existence/len(L)
# print(existence_ratio)
# TPR_RangeAUC = np.sqrt(recall*existence_ratio)
# print(existence_ratio)
TPR_RangeAUC = recall*existence_ratio
FP = np.sum(pred) - TP
# TN = np.sum((1-pred) * (1-labels))
# FPR_RangeAUC = FP/(FP+TN)
N_new = len(labels) - P_new
FPR_RangeAUC = FP/N_new
Precision_RangeAUC = TP/np.sum(pred)
return TPR_RangeAUC, FPR_RangeAUC, Precision_RangeAUC
def RangeAUC(self, labels, score, window=0, percentage=0, plot_ROC=False, AUC_type='window'):
# AUC_type='window'/'percentage'
score_sorted = -np.sort(-score)
P = np.sum(labels)
# print(np.sum(labels))
if AUC_type=='window':
labels = self.extend_postive_range(labels, window=window)
else:
labels = self.extend_postive_range_individual(labels, percentage=percentage)
# print(np.sum(labels))
L = self.range_convers_new(labels)
TPR_list = [0]
FPR_list = [0]
Precision_list = [1]
for i in np.linspace(0, len(score)-1, 250).astype(int):
threshold = score_sorted[i]
# print('thre='+str(threshold))
pred = score>= threshold
TPR, FPR, Precision = self.TPR_FPR_RangeAUC(labels, pred, P,L)
TPR_list.append(TPR)
FPR_list.append(FPR)
Precision_list.append(Precision)
TPR_list.append(1)
FPR_list.append(1) # otherwise, range-AUC will stop earlier than (1,1)
tpr = np.array(TPR_list)
fpr = np.array(FPR_list)
prec = np.array(Precision_list)
width = fpr[1:] - fpr[:-1]
height = (tpr[1:] + tpr[:-1])/2
AUC_range = np.sum(width*height)
width_PR = tpr[1:-1] - tpr[:-2]
height_PR = (prec[1:] + prec[:-1])/2
AP_range = np.sum(width_PR*height_PR)
if plot_ROC:
return AUC_range, AP_range, fpr, tpr, prec
return AUC_range
# TPR_FPR_window
def RangeAUC_volume(self, labels_original, score, windowSize):
score_sorted = -np.sort(-score)
tpr_3d=[]
fpr_3d=[]
prec_3d=[]
auc_3d=[]
ap_3d=[]
window_3d = np.arange(0, windowSize+1, 1)
P = np.sum(labels_original)
for window in window_3d:
labels = self.extend_postive_range(labels_original, window)
# print(np.sum(labels))
L = self.range_convers_new(labels)
TPR_list = [0]
FPR_list = [0]
Precision_list = [1]
for i in np.linspace(0, len(score)-1, 250).astype(int):
threshold = score_sorted[i]
# print('thre='+str(threshold))
pred = score>= threshold
TPR, FPR, Precision = self.TPR_FPR_RangeAUC(labels, pred, P,L)
TPR_list.append(TPR)
FPR_list.append(FPR)
Precision_list.append(Precision)
TPR_list.append(1)
FPR_list.append(1) # otherwise, range-AUC will stop earlier than (1,1)
tpr = np.array(TPR_list)
fpr = np.array(FPR_list)
prec = np.array(Precision_list)
tpr_3d.append(tpr)
fpr_3d.append(fpr)
prec_3d.append(prec)
width = fpr[1:] - fpr[:-1]
height = (tpr[1:] + tpr[:-1])/2
AUC_range = np.sum(width*height)
auc_3d.append(AUC_range)
width_PR = tpr[1:-1] - tpr[:-2]
height_PR = (prec[1:] + prec[:-1])/2
AP_range = np.sum(width_PR*height_PR)
ap_3d.append(AP_range)
return tpr_3d, fpr_3d, prec_3d, window_3d, sum(auc_3d)/len(window_3d), sum(ap_3d)/len(window_3d)
def generate_curve(label,score,slidingWindow):
tpr_3d, fpr_3d, prec_3d, window_3d, avg_auc_3d, avg_ap_3d = metricor().RangeAUC_volume(labels_original=label, score=score, windowSize=1*slidingWindow)
X = np.array(tpr_3d).reshape(1,-1).ravel()
X_ap = np.array(tpr_3d)[:,:-1].reshape(1,-1).ravel()
Y = np.array(fpr_3d).reshape(1,-1).ravel()
W = np.array(prec_3d).reshape(1,-1).ravel()
Z = np.repeat(window_3d, len(tpr_3d[0]))
Z_ap = np.repeat(window_3d, len(tpr_3d[0])-1)
return Y, Z, X, X_ap, W, Z_ap,avg_auc_3d, avg_ap_3d
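# A minimal usage sketch (illustrative toy values): generate_curve returns the
# range-AUC surfaces over window sizes 0..slidingWindow together with the
# range-AUC/range-AP averages over windows (the last two returned elements),
# which serve as VUS-style scores.
#   import numpy as np
#   labels = np.zeros(1000); labels[300:320] = 1
#   scores = np.random.rand(1000); scores[300:320] += 1
#   *_, avg_auc_3d, avg_ap_3d = generate_curve(labels, scores, slidingWindow=100)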

View File

@ -0,0 +1,24 @@
from statsmodels.tsa.stattools import acf
from scipy.signal import argrelextrema
import numpy as np
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
# determine sliding window (period) based on ACF
def find_length(data):
if len(data.shape)>1:
return 0
data = data[:min(20000, len(data))]
base = 3
auto_corr = acf(data, nlags=400, fft=True)[base:]
local_max = argrelextrema(auto_corr, np.greater)[0]
try:
max_local_max = np.argmax([auto_corr[lcm] for lcm in local_max])
if local_max[max_local_max]<3 or local_max[max_local_max]>300:
return 125
return local_max[max_local_max]+base
except:
return 125
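# A minimal usage sketch (illustrative toy values): find_length estimates a
# period (sliding window size) from the ACF of a 1-D series; implausible peak
# positions fall back to the default 125.
#   import numpy as np
#   t = np.arange(2000)
#   period = find_length(np.sin(2 * np.pi * t / 50))  # expected to be about 50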

View File

@ -1,3 +1,6 @@
# Revised by Yiyuan Yang on 2023/10/28
# There are some differences between the original code and the revised code.
import torch
import torch.nn as nn
import torch.nn.functional as F
@ -24,7 +27,7 @@ class DCdetector(BaseDeepAD):
def __init__(self, seq_len=100, stride=1, lr=0.0001, epochs=5, batch_size=128,
epoch_steps=20, prt_steps=1, device='cuda',
n_heads=1, d_model=256, e_layers=3, patch_size=None,
verbose=2, random_state=42):
verbose=2, random_state=42, threshold_ = 1):
super(DCdetector, self).__init__(
model_name='DCdetector', data_type='ts', epochs=epochs, batch_size=batch_size, lr=lr,
seq_len=seq_len, stride=stride,
@ -38,8 +41,8 @@ class DCdetector(BaseDeepAD):
self.n_heads = n_heads
self.d_model = d_model
self.e_layers = e_layers
self.threshold = threshold_
self.criterion = nn.MSELoss()
return
def fit(self, X, y=None):
@ -59,27 +62,25 @@ class DCdetector(BaseDeepAD):
self.model.train()
for e in range(self.epochs):
loss = self.training(dataloader)
print(f'Epoch {e + 1},\t L1 = {loss}')
print(f'Epoch {e + 1}')
self.decision_scores_ = self.decision_function(X)
self.labels_ = self._process_decision_scores()
return
def decision_function(self, X, return_rep=False):
seqs = get_sub_seqs(X, seq_len=self.seq_len, stride=1)
dataloader = DataLoader(seqs, batch_size=self.batch_size,
shuffle=False, drop_last=False)
shuffle=False, drop_last=True)
self.model.eval()
loss, _ = self.inference(dataloader) # (n,d)
loss_final = np.mean(loss, axis=1) # (n,)
padding_list = np.zeros([X.shape[0]-loss.shape[0], loss.shape[1]])
loss_pad = np.concatenate([padding_list, loss], axis=0)
loss_final_pad = np.hstack([0 * np.ones(X.shape[0] - loss_final.shape[0]), loss_final])
return loss_final_pad
def training(self, dataloader):
loss_list = []
@ -125,7 +126,84 @@ class DCdetector(BaseDeepAD):
return np.average(loss_list)
def inference(self, dataloader):
temperature = 50
temperature = 10  # hyperparameter, needs to be tuned
attens_energy = []
preds = []
for input_data in dataloader: # test_set
input = input_data.float().to(self.device)
series, prior = self.model(input)
series_loss = 0.0
prior_loss = 0.0
for u in range(len(prior)):
if u == 0:
series_loss = my_kl_loss(series[u], (
prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)).detach()) * temperature
prior_loss = my_kl_loss(
(prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)),
series[u].detach()) * temperature
else:
series_loss += my_kl_loss(series[u], (
prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)).detach()) * temperature
prior_loss += my_kl_loss(
(prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)),
series[u].detach()) * temperature
metric = torch.softmax((-series_loss - prior_loss), dim=-1)
cri = metric.detach().cpu().numpy()
attens_energy.append(cri)
attens_energy = np.concatenate(attens_energy, axis=0) # anomaly scores
test_energy = np.array(attens_energy) # anomaly scores
return test_energy, preds # (n,d)
def predict(self, X, return_confidence=True):
seqs = get_sub_seqs(X, seq_len=self.seq_len, stride=1)
dataloader = DataLoader(seqs, batch_size=self.batch_size,
shuffle=False, drop_last=True)
attens_energy = []
for input_data in dataloader: # threshold
input = input_data.float().to(self.device)
series, prior = self.model(input)
series_loss = 0.0
prior_loss = 0.0
for u in range(len(prior)):
if u == 0:
series_loss = my_kl_loss(series[u], (
prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)).detach())
prior_loss = my_kl_loss(
(prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)),
series[u].detach())
else:
series_loss += my_kl_loss(series[u], (
prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)).detach())
prior_loss += my_kl_loss(
(prior[u] / torch.unsqueeze(torch.sum(prior[u], dim=-1), dim=-1).repeat(1, 1, 1,
self.seq_len)),
series[u].detach())
metric = torch.softmax((-series_loss - prior_loss), dim=-1)
cri = metric.detach().cpu().numpy()
attens_energy.append(cri)
attens_energy = np.concatenate(attens_energy, axis=0).reshape(-1)
test_energy = np.array(attens_energy)
thresh = np.percentile(test_energy, 100 - self.threshold)  # hyperparameter, needs to be tuned
print('Threshold:', thresh)
temperature = 10  # hyperparameter, needs to be tuned
attens_energy = []
preds = []
@ -158,7 +236,19 @@ class DCdetector(BaseDeepAD):
attens_energy = np.concatenate(attens_energy, axis=0) # anomaly scores
test_energy = np.array(attens_energy) # anomaly scores
return test_energy, preds # (n,d)
preds = (test_energy > thresh).astype(int)
loss_final = np.mean(test_energy, axis=1) # (n,)
loss_final_pad = np.hstack([0 * np.ones(X.shape[0] - loss_final.shape[0]), loss_final])
preds_final = np.mean(preds, axis=1) # (n,)
preds_final_pad = np.hstack([0 * np.ones(X.shape[0] - preds_final.shape[0]), preds_final])
if return_confidence:
return loss_final_pad, preds_final_pad
else:
return preds_final_pad
def predict(self, X, return_confidence=False):
@ -199,6 +289,11 @@ class DCdetector(BaseDeepAD):
return
# Proposed Model
class Encoder(nn.Module):

View File

@ -32,8 +32,8 @@ class TestDCdetector(unittest.TestCase):
self.yts_test = y
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.clf = DCdetector(seq_len=100, stride=1, epochs=2,
batch_size=32, lr=1e-4, patch_size=[1,2,5],
self.clf = DCdetector(seq_len=90, stride=1, epochs=2,
batch_size=32, lr=1e-4, patch_size=[5],
device=device, random_state=42)
self.clf.fit(self.Xts_train)
@ -53,7 +53,7 @@ class TestDCdetector(unittest.TestCase):
assert_equal(pred_scores.shape[0], self.Xts_test.shape[0])
def test_prediction_labels(self):
pred_labels = self.clf.predict(self.Xts_test)
pred_labels = self.clf.predict(self.Xts_test, return_confidence=False)
assert_equal(pred_labels.shape, self.yts_test.shape)
# def test_prediction_proba(self):