# -*- coding: utf-8 -*-
""" Transformation of Data
"""
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d, InterpolatedUnivariateSpline
from scipy.interpolate import UnivariateSpline
import scipy
def get_optimal_bin_size(n):
    """
    Return the number of histogram bins to use for ``n`` events.

    The count is ``2 * n**(1/3)``, truncated to an integer by ``int``.

    :param n: number of events
    :return: number of bins (``int``)
    """
    cube_root = n ** (1 / 3.0)
    return int(2 * cube_root)
def get_average_in_bins(n):
    """
    Return the mean number of events per bin for ``n`` events.

    The bin count comes from :func:`get_optimal_bin_size`; the division is
    always carried out in floating point.

    :param n: number of events
    :return: ``n`` divided by the bin count
    """
    bin_count = float(get_optimal_bin_size(n))
    return n / bin_count
class MySpline:
    """
    Piecewise-linear interpolator through the points ``(x, y)``.

    Only the two support arrays are stored as plain attributes, so instances
    can be pickled.
    """

    def __init__(self, x, y):
        # Support points; handed to ``np.interp`` unchanged on every call.
        self.x, self.y = x, y

    def __call__(self, x, *args, **kwargs):
        """
        Interpolate linearly at ``x``.

        Extra positional and keyword arguments are accepted and ignored.

        :param x: scalar or array of positions to evaluate
        :return: result of ``np.interp`` over the stored support points
        """
        support_x = self.x
        support_y = self.y
        return np.interp(x, support_x, support_y)
class CDF(Transform):
    """
    Calculates the cumulative distribution function (CDF) of the fitted data.

    Fitting stores percentile levels in ``self.y``, the matching data
    quantiles in ``self.x`` and a linear interpolator between them in
    ``self.spline``.
    """

    def __init__(self, *args):
        Transform.__init__(self, "CDF", *args)
        # Interpolator from data value to percentile level; built in ``_fit``.
        self.spline = None

    def _fit(self, x, y=None):
        """
        Build the percentile table and interpolator from ``x``.

        :param x: data whose percentiles are evaluated
        :param y: unused
        """
        # NOTE(review): ``self.n_bins`` is not set in this class; presumably
        # it is provided by ``Transform`` - confirm.
        n_points = 2 * self.n_bins
        percent_levels = np.linspace(0, 100, n_points)
        self.y = percent_levels
        quantiles = np.percentile(x, list(percent_levels))
        self.x = pd.Series(quantiles)
        self.spline = MySpline(self.x, self.y)
class ToFlat(Transform):
    """
    This transformation uses the CDF to transform input data to a
    flat distribution.
    """

    def __init__(self, x=None, *args):
        """
        :param x: optional data; when given, the transform is fitted at once
        :param args: forwarded to both ``Transform`` and the internal ``CDF``
        """
        Transform.__init__(self, "Flat", *args)
        self.cdf = CDF(*args)
        if x is None:
            return
        self.fit(x)

    def _fit(self, x, y=None):
        """
        Fit the internal ``CDF`` on ``x``.

        :param x: data to fit
        :param y: unused
        """
        self.cdf.fit(x)
class ToGauss(Transform):
    """
    Transform registered under the name "Gauss".

    Holds a :class:`ToFlat` instance in ``self.flat`` and fits it on the
    input data.
    """

    def __init__(self, *args):
        """
        :param args: forwarded to both ``Transform`` and the internal ``ToFlat``
        """
        Transform.__init__(self, "Gauss", *args)
        flattener = ToFlat(*args)
        self.flat = flattener

    def _fit(self, x, y=None):
        """
        Fit the internal ``ToFlat`` on ``x``.

        :param x: data to fit
        :param y: unused
        """
        self.flat.fit(x)
class MapTo(Transform):
    """
    Linear map from one value range to another.

    The constructor only precomputes the constants of the map: the offset
    between the two lower bounds and the lengths of both ranges.
    """

    def __init__(self, fromlow, fromhigh, tolow, tohigh, limit=False, *args):
        """
        :param fromlow: lower bound of the source range
        :param fromhigh: upper bound of the source range
        :param tolow: lower bound of the target range
        :param tohigh: upper bound of the target range
        :param limit: not used in this constructor
        :param args: forwarded to ``Transform``
        """
        # NOTE(review): ``limit`` is accepted but never stored here - confirm
        # whether it is needed elsewhere.
        Transform.__init__(self, "MapTo", *args)
        source_length = fromhigh - fromlow
        target_length = tohigh - tolow
        self.x_diff = float(fromlow - tolow)
        self.len_from = float(source_length)
        self.len_to = float(target_length)
class To11(Transform):
    """
    Transform registered under the name "Scaled -1 1".
    """

    def __init__(self, *args):
        """
        :param args: forwarded to ``Transform``
        """
        label = "Scaled -1 1"
        Transform.__init__(self, label, *args)
class ToNorm(Transform):
    """
    Transform registered under the name "Normalised".

    Fitting records the mean and variance of the input in ``self.mean`` and
    ``self.var``.
    """

    def __init__(self):
        Transform.__init__(self, "Normalised")
        # Neutral defaults until ``_fit`` has been called.
        self.mean = 0
        self.var = 1

    def _fit(self, x, y=None):
        """
        Store the mean and variance of ``x``.

        A variance of zero or NaN is replaced by 1.

        :param x: data to fit; assumed to reduce to a scalar variance
            (e.g. a 1-D array or Series) - TODO confirm
        :param y: unused
        """
        self.mean = np.mean(x)
        self.var = np.var(x)
        # The previous test ``self.var is 0 or np.nan`` was always true
        # (``np.nan`` is truthy), so the fitted variance was discarded on
        # every call. Compare by value and test for NaN explicitly instead.
        if self.var == 0 or np.isnan(self.var):
            self.var = 1
# class Pipe(Transform):
# """ Pipeline for the transform functions """
# def __init__(self, *args):
# Transform.__init__(self, "Piped", *args)
# self.functions = []
# def add(self, f):
# if f in self.functions:
# # self.io.warn("Function already in Pipeline!")
# return
# self.functions.append(f)
# def _fit(self, x, y=None):
# xx = x.copy()
# for f in self.functions:
# f.fit(xx)
# xx = f.transform(xx)
# self.is_processed = True
# def transform(self, x):
# xx = x.copy()
# for f in self.functions:
# xx = f.transform(xx)
# return xx
# def present(self, x, y):
# for f in self.functions:
# print(f)
# class ToPurity(Transform):
# def __init__(self, n_bins = None):
# Transform.__init__(self, "Purity", n_bins)
# self.flat = ToFlat(self.n_bins)
# self.spline = None
# self.purity = []
# self.purity_err = []
# self.bincenters = None
# def _fit(self, x, y=None):
# if y is None:
# return
# self.flat.fit(x)
# y1,x1 = np.histogram(self.flat.transform(x[y == 1]), self.n_bins)
# y0,x0 = np.histogram(self.flat.transform(x[y == 0]), x1)
# n_events_in_bin = y1 + y0
# self.purity = np.array(y1/(y1+y0*1.0))
# self.purity_err = (self.purity*(1-self.purity))/(n_events_in_bin*1.0)
# weight = np.array(1/(np.sqrt(self.purity_err*1.0) + 0.00001)) # Jo that's not good
# bincenters = np.array(0.5*(x1[1:]+x1[:-1]))
# bincenters[0] = 0
# bincenters[self.n_bins -1] = 1
# nan_values = np.isnan(self.purity)
# if len(nan_values[nan_values == True]) > 0:
# self.io.warn(str(len(bincenters)) + 'Nan Values in Spline: ' + str(len(nan_values[nan_values == True])))
# self.spline = UnivariateSpline(bincenters[~nan_values], self.purity[~nan_values], w=weight[~nan_values]/float(len(nan_values[nan_values == True])+1))
# self.bincenters = bincenters
# self.is_processed = True
# def transform(self, x):
# return self.spline(self.flat.transform(x))
class ToRawPurity(Transform):
    """
    Per-value purity of the input.

    For every distinct value of ``x``, the purity is the fraction of entries
    with that value whose label ``y`` equals 1.
    """

    def __init__(self, n_bins=None):
        """
        :param n_bins: forwarded to ``Transform``
        """
        Transform.__init__(self, "RawPurity", n_bins)
        self.purity = []
        # ``pur_err`` is kept for backward compatibility; ``_fit`` writes
        # ``purity_err``, so that attribute is initialised here as well and
        # exists before the first fit.
        self.pur_err = []
        self.purity_err = []

    def _fit(self, x, y=None):
        """
        Compute the purity and its error term for each distinct value of ``x``.

        Does nothing when ``y`` is ``None``.

        :param x: input values; must support boolean-mask indexing
        :param y: labels aligned with ``x``; entries equal to 1 count as signal
        """
        if y is None:
            return
        # ``Series.value_counts`` replaces the deprecated top-level
        # ``pd.value_counts``.
        signal_counts = pd.Series(x[y == 1]).value_counts()
        n_events_in_bin = pd.Series(x).value_counts()
        # Values that never occur with y == 1 are missing from
        # ``signal_counts`` and give NaN after index alignment; their purity
        # is 0.
        self.purity = (signal_counts / n_events_in_bin).fillna(0)
        self.purity_err = (self.purity * (1 - self.purity)) / (n_events_in_bin * 1.0)