Source code for concentrationMetrics.model

# encoding: utf-8

# (c) 2017-2023 Open Risk (www.openriskmanagement.com), all rights reserved
#
# ConcentrationMetrics is licensed under the MIT license, a copy of which is included
# in the source distribution of concentrationMetrics. This is notwithstanding any licenses of
# third-party software included in this distribution. You may not use this file except in
# compliance with the License.
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language governing permissions and
# limitations under the License.


""" This module provides the key concentrationMetrics objects

* Index_ implements the main index calculation functionality

.. moduleauthor:: Open Risk
"""

import os

import numpy as np
import pandas as pd

# Paths to the package source and its bundled datasets directory,
# derived from the location of this module (note the trailing slash)
package_name = 'concentrationMetrics'
module_path = os.path.dirname(__file__)
source_path = module_path
dataset_path = source_path + "/datasets/"


class Index(object):
    """The concentration _`Index` object provides the main interface to the
    various index calculations.

    """

    def __init__(self, data=None, index=None, *args):
        # We only do something if the constructor has been passed some data
        if data is not None:
            self.data = data
            self.index = index
            self.arguments = args
            self.results = None
            results = []
            # Compute the selected index for each row of the data array
            for i in range(data.shape[0]):
                calc = self.call_method(index, data[i, :], *args)
                results.append(calc)
            self.results = results

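    # Usage sketch (illustrative, with hypothetical exposure values): the
    # constructor treats each row of a 2-D numpy array as a separate portfolio
    # of positive exposures and computes the named index for every row.
    #
    #   >>> import numpy as np
    #   >>> from concentrationMetrics.model import Index
    #   >>> portfolios = np.array([[10.0, 10.0, 10.0],
    #   ...                        [25.0, 5.0, 5.0]])
    #   >>> ind = Index(data=portfolios, index='hhi')
    #   >>> ind.print()  # prints the index name, a separator, and one value per row
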
    def print(self, cols=None):
        """Print the calculated results (all of them, or only the first cols entries)."""
        print(self.index)
        print('--------')
        if cols is None:
            for i in self.results:
                print(i)
        else:
            for i in self.results[:cols]:
                print(i)

    # Call an index calculation method by name
    def call_method(self, name, data, *args):
        return getattr(self, name)(data, *args)

    # Calculate total size
    def total_size(self, data):
        return np.sum(data)

    def get_weights(self, data):
        """Calculate data weights.

        :param data: Positive numerical data values
        :type data: numpy array
        :return: Vector of weights
        :raise: ValueError if the data contain negative values or sum to zero

        .. _get_weights:

        """
        # If the data are two-dimensional, use the first column only
        if len(data.shape) > 1:
            data = data[:, 0]
        if not (data >= 0).all():
            raise ValueError('Input data vector must not have negative values')
        ts = self.total_size(data)
        if not ts > 0:
            raise ValueError('Input data vector must have some non-zero values')
        return np.true_divide(data, ts)

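    # Sketch (illustrative numbers): for exposures [1, 1, 2] the total is 4,
    # so get_weights returns [0.25, 0.25, 0.5]; negative entries or an
    # all-zero vector raise ValueError.
    #
    #   >>> Index().get_weights(np.array([1.0, 1.0, 2.0]))
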
    def cr(self, data, n):
        """Calculate the Concentration Ratio.

        :param data: Positive numerical data
        :type data: numpy array
        :param n: Integer selecting the top-n entries
        :type n: int
        :return: Concentration Ratio (Float)
        :raise: ValueError if n is out of range

        `Open Risk Manual Entry for Concentration Ratio <https://www.openriskmanual.org/wiki/Concentration_Ratio>`_

        """
        if n < 0 or n > data.size:
            raise ValueError('n must be a positive integer no larger than the data size')
        data = np.array(sorted(data, reverse=True))
        weights = self.get_weights(data)
        return weights[:n].sum()

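    # Sketch (illustrative numbers): the top-2 concentration ratio of the
    # exposures [50, 30, 10, 10] is (50 + 30) / 100 = 0.8.
    #
    #   >>> Index().cr(np.array([50.0, 30.0, 10.0, 10.0]), 2)  # ~0.8
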
    def berger_parker(self, data):
        """Calculate the Berger-Parker Index (special case of the Concentration Ratio with n=1).

        :param data: Positive numerical data
        :type data: numpy array
        :return: Berger-Parker (Float)

        `Open Risk Manual Entry for Berger-Parker Index <https://www.openriskmanual.org/wiki/Concentration_Ratio>`_

        """
        return self.cr(data, 1)

    def hhi(self, data, normalized=True, ci=None, samples=None):
        """Calculate the Herfindahl-Hirschman index.

        :param data: Positive numerical data
        :type data: numpy array
        :param normalized: if True, return the normalized HHI
        :type normalized: bool
        :param ci: confidence interval (not used in this direct calculation)
        :type ci: float
        :param samples: number of bootstrap samples (not used in this direct calculation)
        :type samples: int
        :return: HHI (Float)

        `Open Risk Manual Entry for the Herfindahl-Hirschman Index <https://www.openriskmanual.org/wiki/Herfindahl-Hirschman_Index>`_

        """
        # Normalize the data
        weights = self.get_weights(data)
        n = weights.size
        # Compute the HHI
        if n == 0:
            return 0
        h = np.square(weights).sum()
        if normalized:
            return (h - 1.0 / n) / (1.0 - 1.0 / n)
        else:
            return h

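    # Sketch (illustrative numbers): for exposures [100, 50, 25, 25] the
    # weights are [0.5, 0.25, 0.125, 0.125], so the raw HHI is
    # 0.5^2 + 0.25^2 + 2 * 0.125^2 = 0.34375 and the normalized HHI is
    # (0.34375 - 1/4) / (1 - 1/4) = 0.125.
    #
    #   >>> x = np.array([100.0, 50.0, 25.0, 25.0])
    #   >>> Index().hhi(x, normalized=False)  # ~0.34375
    #   >>> Index().hhi(x)                    # ~0.125
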
    def simpson(self, data):
        """Calculate the Simpson index.

        :param data: Positive numerical data
        :type data: numpy array
        :return: Simpson (Float)

        `Open Risk Manual Entry for Simpson Index <https://www.openriskmanual.org/wiki/Simpson_Index>`_

        """
        # Based on the HHI calculation
        return 1.0 - self.hhi(data, normalized=False, ci=None, samples=None)

    def invsimpson(self, data):
        """Calculate the Inverse Simpson index.

        :param data: Positive numerical data
        :type data: numpy array
        :return: Inverse Simpson (Float)

        `Open Risk Manual Entry for Inverse Simpson Index <https://www.openriskmanual.org/wiki/Inverse_Simpson_Index>`_

        """
        # Based on the HHI calculation
        return 1.0 / self.hhi(data, normalized=False, ci=None, samples=None)

    def hk(self, data, a):
        """Calculate the inverted Hannah-Kay index.

        :param data: Positive numerical data
        :type data: numpy array
        :param a: Index parameter alpha (strictly positive)
        :return: HK (Float)

        `Open Risk Manual Entry for Hannah Kay Index <https://www.openriskmanual.org/wiki/Hannah_Kay_Index>`_

        """
        weights = self.get_weights(data)
        n = weights.size
        if n == 0:
            return 0
        if a <= 0:
            raise ValueError('Alpha must be strictly positive')
        elif a == 1:
            # Limiting case alpha -> 1
            weights_nz = weights[weights != 0]
            log_weights = np.log(weights_nz)
            h = np.multiply(weights_nz, log_weights).sum()
            return np.exp(h)
        else:
            h1 = np.power(weights, a).sum()
            h2 = np.power(h1, 1.0 / (a - 1.0))
            return h2

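    # Sketch (illustrative numbers): with alpha=2 the formula above reduces to
    # the sum of squared weights, i.e. the non-normalized HHI.
    #
    #   >>> x = np.array([100.0, 50.0, 25.0, 25.0])
    #   >>> Index().hk(x, 2)                  # ~0.34375
    #   >>> Index().hhi(x, normalized=False)  # same value
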
    def hoover(self, data):
        """Calculate the Hoover index.

        :param data: Positive numerical data
        :type data: numpy array
        :return: Hoover (Float)

        `Open Risk Manual Entry for Hoover Index <https://www.openriskmanual.org/wiki/Hoover_Index>`_

        """
        weights = self.get_weights(data)
        n = weights.size
        if n == 0:
            return 0
        return 0.5 * np.absolute(weights - 1.0 / n).sum()

    def hti(self, data):
        """Calculate the Hall-Tideman index.

        :param data: Positive numerical data
        :type data: numpy array
        :return: HTI (Float)

        `Open Risk Manual Entry for Hall-Tideman Index <https://www.openriskmanual.org/wiki/Hall-Tideman_Index>`_

        """
        data = np.array(sorted(data, reverse=True))
        weights = self.get_weights(data)
        n = weights.size
        if n == 0:
            return 0
        i = np.arange(1, n + 1)
        return 1.0 / (2.0 * np.multiply(i, weights).sum() - 1.0)

    def gini(self, data):
        """Calculate the Gini index.

        :param data: Positive numerical data
        :type data: numpy array
        :return: Gini (Float)

        .. note:: The formula appears also with the opposite sign convention

        `Open Risk Manual Entry for Gini Index <https://www.openriskmanual.org/wiki/Gini_Index>`_

        """
        data = np.array(sorted(data, reverse=True))
        weights = self.get_weights(data)
        n = weights.size
        if n == 0:
            return 0
        i = np.arange(1, n + 1)
        return 1.0 + (1.0 - 2.0 * np.multiply(i, weights).sum()) / n

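    # Sketch (illustrative numbers): a perfectly equal portfolio has a Gini
    # index of zero, while a portfolio of n names fully concentrated in one
    # name gives (n - 1) / n.
    #
    #   >>> Index().gini(np.array([10.0, 10.0, 10.0, 10.0]))  # 0.0
    #   >>> Index().gini(np.array([40.0, 0.0, 0.0, 0.0]))     # 0.75
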
    def shannon(self, data, normalized=False):
        """Calculate the Shannon entropy index.

        :param data: Positive numerical data
        :type data: numpy array
        :param normalized: if True, return the normalized value 1 - H / log(n)
        :type normalized: bool
        :return: Shannon entropy (Float)

        `Open Risk Manual Entry for Shannon Entropy Index <https://www.openriskmanual.org/wiki/Shannon_Index>`_

        """
        weights = self.get_weights(data)
        # Remove zero weights
        weights_nz = weights[weights != 0]
        n = weights_nz.size
        if n == 0:
            return 0
        log_weights = np.log(weights_nz)
        h = - np.multiply(weights_nz, log_weights).sum()
        if normalized:
            return 1.0 - h / np.log(n)
        else:
            return h

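    # Sketch (illustrative numbers): a uniform portfolio of n names has
    # entropy ln(n), so its normalized value 1 - H / ln(n) is zero, while
    # full concentration in a single name gives a (non-normalized) entropy
    # of zero.
    #
    #   >>> Index().shannon(np.array([1.0, 1.0, 1.0, 1.0]))                   # ~ln(4)
    #   >>> Index().shannon(np.array([1.0, 1.0, 1.0, 1.0]), normalized=True)  # 0.0
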
    def atkinson(self, data, epsilon):
        """Calculate the Atkinson inequality index.

        :param data: Positive numerical data
        :type data: numpy array
        :param epsilon: Index parameter
        :type epsilon: float
        :return: Atkinson inequality (Float)

        .. Todo:: Resolve divide by zero when N is very large

        `Open Risk Manual Entry for Atkinson Index <https://www.openriskmanual.org/wiki/Atkinson_Index>`_

        """
        weights = self.get_weights(data)
        n = weights.size
        if n == 0:
            return 0
        if epsilon <= 0:
            raise ValueError('Epsilon must be strictly positive (>0.0)')
        elif epsilon == 1:
            weights_nz = weights[weights != 0]
            n = weights_nz.size
            log_weights = np.log(weights_nz)
            h = log_weights.sum() / n
            return 1 - n * np.exp(h)
        else:
            n2 = np.power(n, epsilon / (epsilon - 1.0))
            h1 = np.power(weights, 1.0 - epsilon).sum()
            h2 = np.power(h1, 1.0 / (1.0 - epsilon))
            return 1 - n2 * h2

    def gei(self, data, alpha):
        """Calculate the Generalized Entropy Index.

        :param data: Positive numerical data
        :type data: numpy array
        :param alpha: Index parameter
        :return: Generalized Entropy Index (Float)

        `Open Risk Manual Entry for Generalized Entropy Index <https://www.openriskmanual.org/wiki/Generalized_Entropy_Index>`_

        """
        weights = self.get_weights(data)
        n = weights.size
        if n == 0:
            return 0
        if alpha == 0:
            weights_nz = weights[weights != 0]
            n = weights_nz.size
            log_weights = np.log(weights_nz)
            h = log_weights.sum() / n
            index = - (np.log(n) + h)
        elif alpha == 1:
            weights_nz = weights[weights != 0]
            n = weights_nz.size
            log_weights = np.log(weights_nz)
            h = np.multiply(weights_nz, log_weights).sum()
            index = np.log(n) + h
        else:
            n2 = np.power(n, alpha)
            h1 = n2 * np.power(weights, alpha).sum() - n
            index = h1 / n / alpha / (alpha - 1.0)
        return index

    def theil(self, data):
        """Calculate the Theil Index (Generalized Entropy Index for a=1).

        :param data: Positive numerical data
        :type data: numpy array
        :return: Theil Index (Float)

        `Open Risk Manual Entry for Theil Index <https://www.openriskmanual.org/wiki/Theil_Index>`_

        """
        weights = self.get_weights(data)
        return self.gei(weights, 1)

    def kolm(self, data, alpha):
        """Calculate the Kolm index.

        :param data: Positive numerical data
        :type data: numpy array
        :param alpha: Index parameter
        :return: Kolm Index (Float)

        `Open Risk Manual Entry for Kolm Index <https://www.openriskmanual.org/wiki/Kolm_Index>`_

        """
        n = data.size
        if n == 0:
            return 0
        mu = data.mean()
        weights = self.get_weights(data) - mu
        n_weights = np.multiply(alpha, weights)
        h = np.exp(n_weights).sum()
        return (np.log(h) - np.log(n)) / alpha

    def ellison_glaeser(self, data, na, ni):
        """Ellison and Glaeser (1997) indexes of industrial concentration.

        .. note:: Implemented as in equation (5) of the original reference

        .. note:: Input data are a data frame of three columns of the following type:

            +-----------+------------+------------+
            | Exposure  | Area       | Industry   |
            +-----------+------------+------------+
            | Float     | Categorical| Categorical|
            +-----------+------------+------------+

        .. note:: The ordering of the columns is important. The index is not
            symmetric with respect to area and industry factors

        :param data: exposure data
        :type data: pandas dataframe
        :param na: number of areas
        :type na: int
        :param ni: number of industries
        :type ni: int
        :return: EG indexes (list)

        `Open Risk Manual Entry for Ellison-Glaeser Index <https://www.openriskmanual.org/wiki/Ellison_Glaeser_Index>`_

        """
        #
        # Group by area
        #
        # Compute area totals and total exposure
        area_groups = data.groupby(['Area']).sum()
        area_totals = area_groups['Exposure'].values
        total_exposure = area_totals.sum()
        # Compute area fraction of total
        xa = area_totals / total_exposure
        # Compute area HHI
        hhi_g = self.hhi(area_totals, normalized=False)
        #
        # Group by industry
        #
        industry_groups = data.groupby(['Industry'])
        hhi_i = []
        industry_totals = []
        total = 0
        eg_indexes = []
        s = np.zeros((ni, na))
        for industry_index, group in industry_groups:
            # Compute industry totals
            x = group['Exposure'].values
            i_total = x.sum()
            total += i_total
            industry_totals.append(i_total)
            # Compute the industry-specific HHI
            hhi_i_val = self.hhi(x, normalized=False)
            hhi_i.append(hhi_i_val)
            # Compute the industry/area fraction of the industry total:
            # group the industry group further by area and sum up
            industry_group = pd.DataFrame(group).groupby('Area').sum()
            # The aggregated area values for this industry
            ig_values = industry_group.values[:, 0]
            # The index of the aggregated values
            ai = list(industry_group.index)
            # For all area and industry pairs with non-zero exposure
            for a in range(len(ai)):
                share = ig_values[a] / i_total
                s[industry_index, ai[a]] = share
            # Compute the EG industry concentration index
            egi = 0
            for a in range(len(ai)):
                egi += (s[industry_index, a] - xa[a]) ** 2
            val = hhi_i[industry_index]
            # Original EG formula scaled so that a uniform distribution has zero gi
            gi = egi / (1.0 - hhi_g) / (1 - val)
            eg_indexes.append(gi)
        return eg_indexes

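    # Sketch of the expected input layout (illustrative data): areas and
    # industries encoded as integer labels 0..na-1 and 0..ni-1 so that they
    # can serve as array indices, with one exposure row per observation.
    #
    #   >>> df = pd.DataFrame({'Exposure': [100.0, 50.0, 30.0, 20.0],
    #   ...                    'Area':     [0, 0, 1, 1],
    #   ...                    'Industry': [0, 1, 0, 1]})
    #   >>> Index().ellison_glaeser(df, na=2, ni=2)  # one value per industry
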
    def compute(self, data, *args, ci=None, samples=None, index='hhi'):
        """Compute an index value, optionally with bootstrapped confidence interval estimates.

        :param data: Positive numerical data
        :param args: Additional arguments passed on to the index calculation
        :param ci: Confidence level (e.g. 0.95); if None, only the index value is returned
        :param samples: Number of bootstrap samples
        :param index: Name of the index calculation method
        :return: Index value, or a (lower bound, value, upper bound) tuple if ci is given
        """
        # Actual value of the index
        value = self.call_method(index, data, *args)
        if ci is not None:
            sample_values = []
            for s in range(samples):
                # Resample the data with replacement and recompute the index
                sample_data = np.random.choice(data, size=len(data), replace=True)
                sample_values.append(self.call_method(index, sample_data, *args))
            values = np.array(sample_values)
            values.sort()
            lower_bound_index = int((1.0 - ci) * samples)
            upper_bound_index = int(ci * samples)
            lower_bound = values[lower_bound_index]
            upper_bound = values[upper_bound_index]
            return lower_bound, value, upper_bound
        else:
            return value

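    # Sketch (illustrative confidence level and sample count): point estimate
    # of the HHI plus a bootstrapped interval.
    #
    #   >>> x = np.array([100.0, 50.0, 25.0, 25.0])
    #   >>> Index().compute(x, index='hhi')                         # point estimate
    #   >>> Index().compute(x, ci=0.95, samples=1000, index='hhi')  # (lower, value, upper)
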
    def describe(self, index):
        # TODO Semantic documentation
        print(index)
        return

    def margalev(self, data):
        """Calculate the Margalef index.

        :param data: Categorical data
        :type data: list
        :return: D (Float)

        `Open Risk Manual Entry for Margalef Index <https://www.openriskmanual.org/wiki/Margalef_Index>`_

        """
        n = len(data)
        s = len(list(set(data)))
        if n == 0:
            return 0
        return (s - 1) / np.log(n)

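    # Sketch (illustrative data): for the categorical sample ['A', 'A', 'B', 'C']
    # there are n=4 observations and s=3 distinct categories, so the index is
    # (3 - 1) / ln(4) ~ 1.44.
    #
    #   >>> Index().margalev(['A', 'A', 'B', 'C'])  # ~1.44
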
    def tango(self, data):
        """Calculate the Tango temporal clustering index.

        :param data: Categorical data
        :type data: list
        :return: D (Float)

        `Open Risk Manual Entry for Tango Clustering Index <https://www.openriskmanual.org/wiki/Tango_Index>`_

        """
        pass

    def graph_density(self, adjacency_matrix):
        """Calculate the Graph Density of an Adjacency Matrix.

        :param adjacency_matrix: Network adjacency matrix
        :type adjacency_matrix: matrix
        :return: D (Float)

        `Open Risk Manual Entry for Graph Density <https://www.openriskmanual.org/wiki/Graph_Density>`_

        """
        pass

    def network_entropy(self, adjacency_matrix):
        """Calculate the Network Entropy of an Adjacency Matrix.

        :param adjacency_matrix: Network adjacency matrix
        :type adjacency_matrix: matrix
        :return: D (Float)

        `Open Risk Manual Entry for Network Entropy <https://www.openriskmanual.org/wiki/Network_Entropy>`_

        """
        pass

    def global_clustering(self, adjacency_matrix):
        """Calculate the Global Clustering Coefficient of an Adjacency Matrix.

        :param adjacency_matrix: Network adjacency matrix
        :type adjacency_matrix: matrix
        :return: D (Float)

        `Open Risk Manual Entry for Global Clustering <https://www.openriskmanual.org/wiki/Global_Clustering>`_

        """
        pass

    def average_clustering(self, adjacency_matrix):
        """Calculate the Average Clustering Coefficient of an Adjacency Matrix.

        :param adjacency_matrix: Network adjacency matrix
        :type adjacency_matrix: matrix
        :return: D (Float)

        `Open Risk Manual Entry for Average Clustering <https://www.openriskmanual.org/wiki/Average_Clustering>`_

        """
        pass