Top

ml_models.anomaly_detector module

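ml_models.anomaly_detector ~~~~~~~~~~~~~ Functions for scoring the "outlierness" of variable values with the TFIDF-COS method (character-level TF-IDF plus cosine similarity), optionally spread over several processes.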

:copyright: © 2019 Niels Goet @ PCC Project

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
    ml_models.anomaly_detector
    ~~~~~~~~~~~~~
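    Functions for scoring the "outlierness" of variable values with the
    TFIDF-COS method (character-level TF-IDF plus cosine similarity),
    optionally spread over several processes.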

    :copyright: © 2019 Niels Goet @ PCC Project
"""
import multiprocessing

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import os
import sys
import pyprind
from typing import List

from pcc_validation import validation_utils

N_CPU = multiprocessing.cpu_count()  # Detect number of available CPUs

def implement_tfidf_cos(df: pd.DataFrame,
                        var_name: str,
                        primary_key: str,
                        id,
                        cosine_distances: List[float],
                        primary_key_vals: List[str]):
    """
    Applies the TFIDF-COS method, i.e. scores every value of a variable by
    one minus its average cosine similarity to the entire corpus of values
    in ``df`` (including itself) as a measure of "outlierness".

    The values are vectorised with a character-level TF-IDF model that is
    fitted on ``df[var_name]`` only. Nothing is returned: the scores and the
    matching primary keys are added to the two output lists, each in the row
    order of ``df``, with a single ``extend`` call per list.

    :param pd.DataFrame df: Data frame holding the columns ``var_name`` and
    ``primary_key``
    :param str var_name: Name of the column with the values to score
    :param str primary_key: Name of the column with the primary key of each
    row
    :param int id: The process id, only used to label the progress bar;
    supplied by :func execute_tfidf_cos_multi_threaded_job:
    :param List[float] cosine_distances: Output list that receives one score
    (1 - mean cosine similarity) per row of ``df``
    :param List[str] primary_key_vals: Output list that receives the primary
    key of every scored row
    :return None: A ``df`` without rows leaves both output lists untouched
    """
    variable_values = df[var_name].values
    primary_keys = df[primary_key].values
    total = len(variable_values)

    # A TF-IDF vocabulary cannot be fitted on zero documents, so an empty
    # chunk (e.g. fewer rows than worker processes) is simply a no-op.
    if total == 0:
        return

    vectorizer = TfidfVectorizer(analyzer='char')
    corpus_representation = vectorizer.fit_transform(variable_values)

    pb_bar = pyprind.ProgBar(total, stream=sys.stdout,
                             title='Implementing TFIDF-COS analysis for {} (process # {})'.format(var_name, id))

    # Collect locally first: the output lists may be multiprocessing manager
    # proxies, for which every single append is a round trip to the manager.
    distances = []
    for row in range(total):
        # The fitted matrix already holds the TF-IDF row of every value, so
        # it is reused instead of vectorising each value a second time. The
        # slice keeps the row two-dimensional, as cosine_similarity expects.
        cos_val = cosine_similarity(
            corpus_representation[row:row + 1],
            corpus_representation
        ).flatten()

        distances.append(1 - np.mean(cos_val))

        pb_bar.update()

    cosine_distances.extend(distances)
    primary_key_vals.extend(list(primary_keys))


def execute_tfidf_cos_multi_threaded_job(df, var_name, primary_key):
    """
    Executes :func implement_tfidf_cos: in several worker processes.

    The rows of ``df`` are divided into at most ``N_CPU`` contiguous chunks
    and every chunk is handed to its own process. Each worker only sees its
    own chunk. Every worker writes into its own pair of result lists, which
    are concatenated in chunk order once all workers have finished, so the
    primary keys and scores cannot get out of step and the result follows the
    row order of ``df``.

    :param pd.DataFrame df: Data frame holding the columns ``var_name`` and
    ``primary_key``
    :param str var_name: Name of the column with the values to score
    :param str primary_key: Name of the column with the primary key of each
    row
    :return pd.DataFrame results_df: One row per scored value with the
    columns ``primary_key`` and ``tfidf_cos_val``
    """
    n_rows = len(df)

    # Never ask for more chunks than there are rows: np.array_split rejects
    # zero sections, and a chunk without rows gives its worker nothing to
    # fit a TF-IDF vocabulary on.
    n_chunks = min(N_CPU, n_rows)

    # Split row positions rather than the data frame itself, then select the
    # rows of each chunk with iloc.
    if n_chunks:
        chunk_positions = np.array_split(np.arange(n_rows), n_chunks)
    else:
        chunk_positions = []

    cosine_distances = []
    primary_key_vals = []

    # The context manager shuts the manager's server process down again.
    with multiprocessing.Manager() as manager:
        jobs = []

        for i, positions in enumerate(chunk_positions):
            # One private pair of lists per worker: with lists shared by all
            # workers, writes from different processes could interleave and
            # pair a key with another row's score.
            chunk_distances = manager.list()
            chunk_keys = manager.list()
            process = multiprocessing.Process(target=implement_tfidf_cos,
                                              args=(df.iloc[positions],
                                                    var_name,
                                                    primary_key,
                                                    i,
                                                    chunk_distances,
                                                    chunk_keys),
                                              name=str(i))

            jobs.append((process, chunk_distances, chunk_keys))

        # Start the processes
        for process, _, _ in jobs:
            process.start()

        # Ensure all processes have finished
        for process, _, _ in jobs:
            process.join()

        # Copy the results into plain lists while the manager is still alive;
        # slicing a list proxy returns an ordinary list in a single call.
        for _, chunk_distances, chunk_keys in jobs:
            cosine_distances.extend(chunk_distances[:])
            primary_key_vals.extend(chunk_keys[:])

    results_df = pd.DataFrame.from_records({primary_key: primary_key_vals,
                                            'tfidf_cos_val': cosine_distances},
                                           index=range(len(primary_key_vals)))

    return results_df

Module variables

var N_CPU

Functions

def execute_tfidf_cos_multi_threaded_job(

df, var_name, primary_key)

Executes :func implement_tfidf_cos: in several worker processes (one per chunk of the data frame).

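:param pd.DataFrame df: A PCC data frame holding the columns named by var_name and primary_key :param str var_name: Name of the column with the variable values to score :param str primary_key: Name of the column with the primary key of each row :return pd.DataFrame results_df: A data frame with one row per scored value, holding its primary key and its tfidf_cos_val, i.e. one minus the average cosine similarity between the value and the corpus of its chunk (including itself)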

def execute_tfidf_cos_multi_threaded_job(df, var_name, primary_key):
    """
    Executes :func implement_tfidf_cos: in several worker processes.

    The rows of ``df`` are divided into at most ``N_CPU`` contiguous chunks
    and every chunk is handed to its own process. Each worker only sees its
    own chunk. Every worker writes into its own pair of result lists, which
    are concatenated in chunk order once all workers have finished, so the
    primary keys and scores cannot get out of step and the result follows the
    row order of ``df``.

    :param pd.DataFrame df: Data frame holding the columns ``var_name`` and
    ``primary_key``
    :param str var_name: Name of the column with the values to score
    :param str primary_key: Name of the column with the primary key of each
    row
    :return pd.DataFrame results_df: One row per scored value with the
    columns ``primary_key`` and ``tfidf_cos_val``
    """
    n_rows = len(df)

    # Never ask for more chunks than there are rows: np.array_split rejects
    # zero sections, and a chunk without rows gives its worker nothing to
    # fit a TF-IDF vocabulary on.
    n_chunks = min(N_CPU, n_rows)

    # Split row positions rather than the data frame itself, then select the
    # rows of each chunk with iloc.
    if n_chunks:
        chunk_positions = np.array_split(np.arange(n_rows), n_chunks)
    else:
        chunk_positions = []

    cosine_distances = []
    primary_key_vals = []

    # The context manager shuts the manager's server process down again.
    with multiprocessing.Manager() as manager:
        jobs = []

        for i, positions in enumerate(chunk_positions):
            # One private pair of lists per worker: with lists shared by all
            # workers, writes from different processes could interleave and
            # pair a key with another row's score.
            chunk_distances = manager.list()
            chunk_keys = manager.list()
            process = multiprocessing.Process(target=implement_tfidf_cos,
                                              args=(df.iloc[positions],
                                                    var_name,
                                                    primary_key,
                                                    i,
                                                    chunk_distances,
                                                    chunk_keys),
                                              name=str(i))

            jobs.append((process, chunk_distances, chunk_keys))

        # Start the processes
        for process, _, _ in jobs:
            process.start()

        # Ensure all processes have finished
        for process, _, _ in jobs:
            process.join()

        # Copy the results into plain lists while the manager is still alive;
        # slicing a list proxy returns an ordinary list in a single call.
        for _, chunk_distances, chunk_keys in jobs:
            cosine_distances.extend(chunk_distances[:])
            primary_key_vals.extend(chunk_keys[:])

    results_df = pd.DataFrame.from_records({primary_key: primary_key_vals,
                                            'tfidf_cos_val': cosine_distances},
                                           index=range(len(primary_key_vals)))

    return results_df

def implement_tfidf_cos(

df, var_name, primary_key, id, cosine_distances, primary_key_vals)

Applies the TFIDF-COS method, i.e. takes one minus the average cosine similarity between each value and the entire corpus of variable values (including itself) as a measure of "outlierness".

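:param pd.DataFrame df: A PCC data frame holding the columns named by var_name and primary_key :param str var_name: Name of the column with the variable values to score :param str primary_key: Name of the column with the primary key of each row :param int id: The process id, used to label the progress bar and supplied by :func execute_tfidf_cos_multi_threaded_job: :param List[float] cosine_distances: Output list that receives, for each value i, one minus the average cosine similarity between i and the entire corpus (including itself) :param List[str] primary_key_vals: Output list that receives the primary key of each scored value :return: None; the results are written to the two output lists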

def implement_tfidf_cos(df: pd.DataFrame,
                        var_name: str,
                        primary_key: str,
                        id,
                        cosine_distances: List[float],
                        primary_key_vals: List[str]):
    """
    Applies the TFIDF-COS method, i.e. scores every value of a variable by
    one minus its average cosine similarity to the entire corpus of values
    in ``df`` (including itself) as a measure of "outlierness".

    The values are vectorised with a character-level TF-IDF model that is
    fitted on ``df[var_name]`` only. Nothing is returned: the scores and the
    matching primary keys are added to the two output lists, each in the row
    order of ``df``, with a single ``extend`` call per list.

    :param pd.DataFrame df: Data frame holding the columns ``var_name`` and
    ``primary_key``
    :param str var_name: Name of the column with the values to score
    :param str primary_key: Name of the column with the primary key of each
    row
    :param int id: The process id, only used to label the progress bar;
    supplied by :func execute_tfidf_cos_multi_threaded_job:
    :param List[float] cosine_distances: Output list that receives one score
    (1 - mean cosine similarity) per row of ``df``
    :param List[str] primary_key_vals: Output list that receives the primary
    key of every scored row
    :return None: A ``df`` without rows leaves both output lists untouched
    """
    variable_values = df[var_name].values
    primary_keys = df[primary_key].values
    total = len(variable_values)

    # A TF-IDF vocabulary cannot be fitted on zero documents, so an empty
    # chunk (e.g. fewer rows than worker processes) is simply a no-op.
    if total == 0:
        return

    vectorizer = TfidfVectorizer(analyzer='char')
    corpus_representation = vectorizer.fit_transform(variable_values)

    pb_bar = pyprind.ProgBar(total, stream=sys.stdout,
                             title='Implementing TFIDF-COS analysis for {} (process # {})'.format(var_name, id))

    # Collect locally first: the output lists may be multiprocessing manager
    # proxies, for which every single append is a round trip to the manager.
    distances = []
    for row in range(total):
        # The fitted matrix already holds the TF-IDF row of every value, so
        # it is reused instead of vectorising each value a second time. The
        # slice keeps the row two-dimensional, as cosine_similarity expects.
        cos_val = cosine_similarity(
            corpus_representation[row:row + 1],
            corpus_representation
        ).flatten()

        distances.append(1 - np.mean(cos_val))

        pb_bar.update()

    cosine_distances.extend(distances)
    primary_key_vals.extend(list(primary_keys))