Merge pull request #37 from nuhdv/main

I add the method FLAD. The test code has not yet been debugged, and a new version will be submitted later
This commit is contained in:
Hongzuo Xu 2023-10-22 09:01:43 +08:00 committed by GitHub
commit 2bbd126550
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 621 additions and 0 deletions

View File

@ -0,0 +1,481 @@
"""
FLAD is adapted from https://github.com/nuhdv/FLAD
"""
import numpy as np
import torch
from torch.utils.data import DataLoader
from deepod.core.base_model import BaseDeepAD
import torch.nn as nn
from torch.nn.utils import weight_norm
import math
class FLAD(BaseDeepAD):
def __init__(self, seq_len=100, stride=1, epochs=10, batch_size=32, lr=1e-3,
rep_dim=32, hidden_dims=32, kernel_size=3, act='ReLU', num_channels=[64, 128, 256], bias=False, dropout=0.2,
epoch_steps=-1, prt_steps=10, device='cuda',
verbose=2, random_state=42):
super(FLAD, self).__init__(
model_name='FLAD', data_type='ts', epochs=epochs, batch_size=batch_size, lr=lr,
seq_len=seq_len, stride=stride,
epoch_steps=epoch_steps, prt_steps=prt_steps, device=device,
verbose=verbose, random_state=random_state
)
self.hidden_dims = hidden_dims
self.rep_dim = rep_dim
self.kernel_size = kernel_size
self.dropout = dropout
self.act = act
self.bias = bias
self.num_channels = num_channels
return
def get_model(self, inputs):
backbone_dict, decoder_dict = {}, {}
cross_stitch_kwargs = {'alpha': 0.8, 'beta': 0.2, 'stages': ['layer1', 'layer2', 'layer3'],
'channels': {'layer1': 64, 'layer2': 128, 'layer3': 256},
'num_channels': self.num_channels}
TCN = SingleTaskModel(TCN_encoder(num_inputs=inputs, num_channels=self.num_channels, kernel_size=self.kernel_size, dropout=0.2),
TCN_decoder(num_inputs=inputs, num_channels=list(reversed(self.num_channels)), kernel_size=self.kernel_size, dropout=0.2), 'reconstruct')
TCN = torch.nn.DataParallel(TCN)
backbone_dict['reconstruct'] = TCN.module.encoder
decoder_dict['reconstruct'] = TCN.module.decoder
Trans = SingleTaskModel(Trans_encoder(num_inputs=inputs, feature_size=self.num_channels[-1], num_channels=self.num_channels, num_layers=1, dropout=0.1),
Trans_decoder(num_inputs=inputs, feature_size=self.num_channels[-1], num_layers=1, dropout=0.1), 'predict')
Trans = torch.nn.DataParallel(Trans)
backbone_dict['predict'] = Trans.module.encoder
decoder_dict['predict'] = Trans.module.decoder
model = CrossStitchNetwork(['reconstruct', 'predict'], torch.nn.ModuleDict(backbone_dict), torch.nn.ModuleDict(decoder_dict), **cross_stitch_kwargs)
return model
def training_prepare(self, X, y):
train_loader = DataLoader(X, batch_size=self.batch_size, shuffle=True)
net = self.get_model(self.n_features)
criterion = torch.nn.MSELoss(reduction="mean")
if self.verbose >= 2:
print(net)
return train_loader, net, criterion
def inference_prepare(self, X):
test_loader = DataLoader(X, batch_size=self.batch_size,
drop_last=False, shuffle=False)
self.criterion.reduction = 'none'
return test_loader
def training_forward(self, batch_x, net, criterion):
ts_batch = batch_x.float().to(self.device)
output, _ = net(ts_batch)
loss = criterion(output[:, -1], ts_batch[:, -1])
return loss
def inference_forward(self, batch_x, net, criterion):
batch_x = batch_x.float().to(self.device)
output, _ = net(batch_x)
error = torch.nn.L1Loss(reduction='none')(output[:, -1], batch_x[:, -1])
error = torch.sum(error, dim=1)
return output, error
class SingleTaskModel(nn.Module):
""" Single-task baseline model with encoder + decoder """
def __init__(self, encoder: nn.Module, decoder: nn.Module, task: str):
super(SingleTaskModel, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.task = task
def forward(self, x):
out_size = x.size()[2:]
out = self.decoder(self.backbone(x))
return {self.task: F.interpolate(out, out_size, mode='bilinear')}
class ChannelWiseMultiply(nn.Module):
def __init__(self, num_channels):
super(ChannelWiseMultiply, self).__init__()
self.param = nn.Parameter(torch.FloatTensor(num_channels), requires_grad=True)
def init_value(self, value):
with torch.no_grad():
self.param.data.fill_(value)
def forward(self, x):
return torch.mul(self.param.view(1, -1, 1), x)
class CrossStitchUnit(nn.Module):
def __init__(self, tasks, num_channels, alpha, beta):
super(CrossStitchUnit, self).__init__()
self.cross_stitch_unit = nn.ModuleDict(
{t: nn.ModuleDict({t: ChannelWiseMultiply(num_channels) for t in tasks}) for t in tasks})
for t_i in tasks:
for t_j in tasks:
if t_i == t_j:
self.cross_stitch_unit[t_i][t_j].init_value(alpha)
else:
self.cross_stitch_unit[t_i][t_j].init_value(beta)
def forward(self, task_features):
out = {}
for t_i in task_features.keys():
prod = torch.stack([self.cross_stitch_unit[t_i][t_j](task_features[t_j]) for t_j in task_features.keys()])
out[t_i] = torch.sum(prod, dim=0)
return out
class CrossStitchNetwork(nn.Module):
"""
Implementation of cross-stitch networks.
We insert a cross-stitch unit, to combine features from the task-specific backbones
after every stage.
Argument:
backbone:
nn.ModuleDict object which contains pre-trained task-specific backbones.
{task: backbone for task in p.TASKS.NAMES}
heads:
nn.ModuleDict object which contains the task-specific heads.
{task: head for task in p.TASKS.NAMES}
stages:
list of stages where we instert a cross-stitch unit between the task-specific backbones.
Note: the backbone modules require a method 'forward_stage' to get feature representations
at the respective stages.
channels:
dict which contains the number of channels in every stage
alpha, beta:
floats for initializing cross-stitch units (see paper)
"""
def __init__(self, TASKS, backbone: nn.ModuleDict, heads: nn.ModuleDict,
stages: list, channels: dict, alpha: float, beta: float, num_channels: list):
super(CrossStitchNetwork, self).__init__()
# Tasks, backbone and heads
self.tasks = TASKS
self.backbone = backbone
self.heads = heads
self.stages = stages
# Cross-stitch units
self.cross_stitch = nn.ModuleDict(
{stage: CrossStitchUnit(self.tasks, channels[stage], alpha, beta) for stage in stages})
def forward(self, x, tgt):
x = x.permute(0, 2, 1)
x = {task: x for task in self.tasks} # Feed as input to every single-task network
# Backbone
for stage in self.stages:
x['reconstruct'] = self.backbone['reconstruct'].forward_stage(x['reconstruct'], stage)
x['predict'], w = self.backbone['predict'].forward_stage(x['predict'], stage)
# Cross-stitch the task-specific features
x = self.cross_stitch[stage](x)
out = {task: self.heads[task](x[task], tgt) for task in self.tasks}
return out
class Chomp1d(nn.Module):
def __init__(self, chomp_size):
super(Chomp1d, self).__init__()
self.chomp_size = chomp_size
def forward(self, x):
return x[:, :, :-self.chomp_size].contiguous()
class pad1d(nn.Module):
def __init__(self, pad_size):
super(pad1d, self).__init__()
self.pad_size = pad_size
def forward(self, x):
return torch.cat([x, x[:, :, -self.pad_size:]], dim = 2).contiguous()
class TemporalBlockTranspose(nn.Module):
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding,
dropout=0.2):
super(TemporalBlockTranspose, self).__init__()
self.conv1 = weight_norm(nn.ConvTranspose1d(n_inputs, n_outputs, kernel_size,
stride=stride, padding=padding,
dilation=dilation))
self.pad1 = pad1d(padding)
self.relu1 = nn.ReLU()
self.dropout1 = nn.Dropout(dropout)
self.conv2 = weight_norm(nn.ConvTranspose1d(n_outputs, n_outputs, kernel_size,
stride=stride, padding=padding,
dilation=dilation))
self.pad2 = pad1d(padding)
self.relu2 = nn.ReLU()
self.dropout2 = nn.Dropout(dropout)
self.net = nn.Sequential(self.dropout1, self.relu1, self.pad1, self.conv1,
self.dropout2, self.relu2, self.pad2, self.conv2)
self.downsample = nn.ConvTranspose1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
self.relu = nn.ReLU()
self.init_weights()
def init_weights(self):
self.conv1.weight.data.normal_(0, 0.01)
self.conv2.weight.data.normal_(0, 0.01)
if self.downsample is not None:
self.downsample.weight.data.normal_(0, 0.01)
def forward(self, x):
out = self.net(x)
res = x if self.downsample is None else self.downsample(x)
return self.relu(out + res)
class TemporalBlock(nn.Module):
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
super(TemporalBlock, self).__init__()
self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation))
self.chomp1 = Chomp1d(padding)
self.relu1 = nn.ReLU()
self.dropout1 = nn.Dropout(dropout)
self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation))
self.chomp2 = Chomp1d(padding)
self.relu2 = nn.ReLU()
self.dropout2 = nn.Dropout(dropout)
self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1,
self.conv2, self.chomp2, self.relu2, self.dropout2)
self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
self.relu = nn.ReLU()
self.init_weights()
def init_weights(self):
self.conv1.weight.data.normal_(0, 0.01)
self.conv2.weight.data.normal_(0, 0.01)
if self.downsample is not None:
self.downsample.weight.data.normal_(0, 0.01)
def forward(self, x):
out = self.net(x)
res = x if self.downsample is None else self.downsample(x)
return self.relu(out + res)
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.src_mask = None
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def _generate_square_subsequent_mask(self, sz):
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
def forward(self, x):
if self.src_mask is None or self.src_mask.size(0) != len(x):
device = x.device
mask = self._generate_square_subsequent_mask(len(x)).to(device)
self.src_mask = mask
return self.dropout(x + self.pe[:x.size(0), :])
class TokenEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(TokenEmbedding, self).__init__()
padding = 1 if torch.__version__>='1.5.0' else 2
self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
kernel_size=3, padding=padding, padding_mode='circular')
for m in self.modules():
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight,mode='fan_in',nonlinearity='leaky_relu')
def forward(self, x):
x = self.tokenConv(x)
return x.permute(2, 0, 1)
class TCN_encoder(nn.Module):
def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
super(TCN_encoder, self).__init__()
self.layers = []
num_levels = len(num_channels)
for i in range(num_levels):
dilation_size = 2 ** i
in_channels = num_inputs if i == 0 else num_channels[i-1]
out_channels = num_channels[i]
self.layers += [TemporalBlock(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
padding=(kernel_size-1) * dilation_size, dropout=dropout)]
self.network = nn.Sequential(*self.layers)
def forward_stage(self, x, stage):
assert (stage in ['layer1', 'layer2', 'layer3', 'layer4'])
if stage == 'layer1':
x = self.layers[0](x)
return x
elif stage == 'layer2':
x = self.layers[1](x)
return x
elif stage == 'layer3':
x = self.layers[2](x)
return x
def forward(self, x):
out = x.permute(0, 2, 1)
return self.network(out)
class TCN_decoder(nn.Module):
def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
super(TCN_decoder, self).__init__()
layers = []
num_levels = len(num_channels)
for i in range(num_levels):
# no dilation in decoder
in_channels = num_channels[i]
out_channels = num_inputs if i == (num_levels - 1) else num_channels[i + 1]
dilation_size = 2 ** (num_levels - 1 - i)
padding_size = (kernel_size - 1) * dilation_size
layers += [TemporalBlockTranspose(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
padding=padding_size, dropout=dropout)]
self.network = nn.Sequential(*layers)
self.fcn = nn.Sequential(nn.Linear(num_channels[0], num_inputs), nn.Sigmoid())
def forward(self, x, tgt):
out = self.network(x)
out = out.permute(0, 2, 1)
return out[:, -1].view(out.shape[0], 1, out.shape[2])
class TransformerEncoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward=16, dropout=0.1):
super(TransformerEncoderLayer, self).__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
# Implementation of Feedforward model
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.activation = nn.LeakyReLU(True)
def forward(self, src, src_mask=None, src_key_padding_mask=None):
src2, weight = self.self_attn(src, src, src, attn_mask=src_mask,
key_padding_mask=src_key_padding_mask)
src = src + self.dropout1(src2)
src = self.norm1(src)
src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
src = src + self.dropout2(src2)
src = self.norm2(src)
return src, weight
class TransformerDecoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward=16, dropout=0.1):
super(TransformerDecoderLayer, self).__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
# Implementation of Feedforward model
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.dropout3 = nn.Dropout(dropout)
self.activation = nn.LeakyReLU(True)
def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
tgt2 = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask,
key_padding_mask=tgt_key_padding_mask)[0]
tgt = tgt + self.dropout1(tgt2)
tgt = self.norm1(tgt)
tgt2 = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask,
key_padding_mask=memory_key_padding_mask)[0]
tgt = tgt + self.dropout2(tgt2)
tgt = self.norm2(tgt)
tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
tgt = tgt + self.dropout3(tgt2)
tgt = self.norm3(tgt)
return tgt
class Trans_encoder(nn.Module):
def __init__(self, num_inputs, feature_size=512, num_channels=[64, 128, 256], num_layers=1, dropout=0.1):
super(Trans_encoder, self).__init__()
self.src_mask = None
self.embedding = TokenEmbedding(c_in=num_inputs * 2, d_model=feature_size)
# self.embed = TokenEmbedding(c_in=num_inputs, d_model=feature_size)
self.pos_encoder = PositionalEncoding(feature_size, dropout=0.1)
# self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=feature_size, dropout=dropout)
self.encoder_layer = TransformerEncoderLayer(d_model=feature_size, nhead=feature_size, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
self.layer1 = self._make_layer(inputs=num_inputs, feature_size=num_channels[0], num_layers=num_layers,
dropout=dropout)
self.layer2 = self._make_layer(inputs=num_channels[0], feature_size=num_channels[1], num_layers=num_layers,
dropout=dropout)
self.layer3 = self._make_layer(inputs=num_channels[1], feature_size=num_channels[2], num_layers=num_layers,
dropout=dropout)
def _make_layer(self, inputs, feature_size, num_layers, dropout):
# layers = []
embedding = TokenEmbedding(c_in=inputs, d_model=feature_size)
pos_encoder = PositionalEncoding(feature_size, dropout=0.1)
encoder_layer = TransformerEncoderLayer(d_model=feature_size, nhead=16, dropout=dropout)
transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
return nn.Sequential(embedding, pos_encoder, transformer_encoder)
def forward_stage(self, x, stage):
assert(stage in ['layer1', 'layer2', 'layer3'])
layer = getattr(self, stage)
x ,w = layer(x)
return x.permute(1, 2, 0), w
def forward(self, src, c):
src = self.embedding(torch.cat((src, c), dim=2))
src = src.permute(1, 0, 2)
if self.src_mask is None or self.src_mask.size(0) != len(src):
device = src.device
mask = self._generate_square_subsequent_mask(len(src)).to(device)
self.src_mask = mask
src = self.pos_encoder(src)
output = self.transformer_encoder(src, self.src_mask)
return output.permute(1, 2, 0)
class Trans_decoder(nn.Module):
def __init__(self, num_inputs, feature_size=512, num_layers=1, dropout=0.1):
super(Trans_decoder, self).__init__()
self.embed = TokenEmbedding(c_in=num_inputs, d_model=feature_size)
decoder_layer = TransformerDecoderLayer(d_model=feature_size, nhead=16, dropout=dropout)
self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
self.decoder = nn.Linear(feature_size, num_inputs)
self.fcn = nn.Sequential(nn.Linear(feature_size, num_inputs), nn.Sigmoid())
def forward(self, output, tgt):
tgt = tgt.permute(0, 2, 1)
out = self.transformer_decoder(self.embed(tgt), output.permute(2, 0, 1))
out = self.decoder(out)
return out.permute(1, 0, 2)[:, -1].view(out.shape[1], 1, out.shape[2])

140
deepod/test/test_flad.py Normal file
View File

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
import os
import sys
import unittest
# noinspection PyProtectedMember
from numpy.testing import assert_equal
import torch
import pandas as pd
# temporary solution for relative imports in case pyod is not installed
# if deepod is installed, no need to use the following line
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from deepod.models.time_series.flad import FLAD
class TestFLAD(unittest.TestCase):
def setUp(self):
train_file = 'data/omi-1/omi-1_train.csv'
test_file = 'data/omi-1/omi-1_test.csv'
train_df = pd.read_csv(train_file, sep=',', index_col=0)
test_df = pd.read_csv(test_file, index_col=0)
y = test_df['label'].values
train_df, test_df = train_df.drop('label', axis=1), test_df.drop('label', axis=1)
self.Xts_train = train_df.values
self.Xts_test = test_df.values
self.yts_test = y
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.clf = FLAD(seq_len=100, stride=5,
epochs=5, hidden_dims=50,
device=device, random_state=42)
self.clf.fit(self.Xts_train)
def test_parameters(self):
assert (hasattr(self.clf, 'decision_scores_') and
self.clf.decision_scores_ is not None)
assert (hasattr(self.clf, 'labels_') and
self.clf.labels_ is not None)
assert (hasattr(self.clf, 'threshold_') and
self.clf.threshold_ is not None)
def test_train_scores(self):
assert_equal(len(self.clf.decision_scores_), self.Xts_train.shape[0])
def test_prediction_scores(self):
pred_scores = self.clf.decision_function(self.Xts_test)
assert_equal(pred_scores.shape[0], self.Xts_test.shape[0])
def test_prediction_labels(self):
pred_labels = self.clf.predict(self.Xts_test)
assert_equal(pred_labels.shape, self.yts_test.shape)
# def test_prediction_proba(self):
# pred_proba = self.clf.predict_proba(self.X_test)
# assert (pred_proba.min() >= 0)
# assert (pred_proba.max() <= 1)
#
# def test_prediction_proba_linear(self):
# pred_proba = self.clf.predict_proba(self.X_test, method='linear')
# assert (pred_proba.min() >= 0)
# assert (pred_proba.max() <= 1)
#
# def test_prediction_proba_unify(self):
# pred_proba = self.clf.predict_proba(self.X_test, method='unify')
# assert (pred_proba.min() >= 0)
# assert (pred_proba.max() <= 1)
#
# def test_prediction_proba_parameter(self):
# with assert_raises(ValueError):
# self.clf.predict_proba(self.X_test, method='something')
def test_prediction_labels_confidence(self):
pred_labels, confidence = self.clf.predict(self.Xts_test, return_confidence=True)
assert_equal(pred_labels.shape, self.yts_test.shape)
assert_equal(confidence.shape, self.yts_test.shape)
assert (confidence.min() >= 0)
assert (confidence.max() <= 1)
# def test_prediction_proba_linear_confidence(self):
# pred_proba, confidence = self.clf.predict_proba(self.X_test,
# method='linear',
# return_confidence=True)
# assert (pred_proba.min() >= 0)
# assert (pred_proba.max() <= 1)
#
# assert_equal(confidence.shape, self.y_test.shape)
# assert (confidence.min() >= 0)
# assert (confidence.max() <= 1)
#
# def test_fit_predict(self):
# pred_labels = self.clf.fit_predict(self.X_train)
# assert_equal(pred_labels.shape, self.y_train.shape)
#
# def test_fit_predict_score(self):
# self.clf.fit_predict_score(self.X_test, self.y_test)
# self.clf.fit_predict_score(self.X_test, self.y_test,
# scoring='roc_auc_score')
# self.clf.fit_predict_score(self.X_test, self.y_test,
# scoring='prc_n_score')
# with assert_raises(NotImplementedError):
# self.clf.fit_predict_score(self.X_test, self.y_test,
# scoring='something')
#
# def test_predict_rank(self):
# pred_socres = self.clf.decision_function(self.X_test)
# pred_ranks = self.clf._predict_rank(self.X_test)
#
# # assert the order is reserved
# assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=3)
# assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
# assert_array_less(-0.1, pred_ranks)
#
# def test_predict_rank_normalized(self):
# pred_socres = self.clf.decision_function(self.X_test)
# pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)
#
# # assert the order is reserved
# assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=3)
# assert_array_less(pred_ranks, 1.01)
# assert_array_less(-0.1, pred_ranks)
# def test_plot(self):
# os, cutoff1, cutoff2 = self.clf.explain_outlier(ind=1)
# assert_array_less(0, os)
# def test_model_clone(self):
# clone_clf = clone(self.clf)
def tearDown(self):
pass
if __name__ == '__main__':
unittest.main()