Source code for mergeron.gen.enforcement_stats

"""Methods to format and print summary statistics on merger enforcement patterns."""

import enum
from collections.abc import Mapping

import numpy as np
from scipy.interpolate import make_interp_spline  # type: ignore

from .. import VERSION, ArrayBIGINT, Enameled, this_yaml  # noqa: TID252
from ..core import TABLE_TYPES, TTL_KEY, INVData, INVTableData  # noqa: TID252
from . import INVResolution

__version__ = VERSION


@this_yaml.register_class
@enum.unique
class IndustryGroup(str, Enameled):
    """Industry group of reported markets."""

    ALL = "All Markets"
    GRO = "Grocery Markets"
    OIL = "Oil Markets"
    CHM = "Chemical Markets"
    PHM = "Pharmaceuticals Markets"
    HOS = "Hospital Markets"
    EDS = "Electronically-Controlled Devices and Systems Markets"
    BRD = "Branded Consumer Goods Markets"
    OTH = '"Other" Markets'
    IIC = "Industries in Common"


@this_yaml.register_class
@enum.unique
class OtherEvidence(str, Enameled):
    """Additional evidence available, if any, for reported markets."""

    HOT = "Hot Documents Identified"
    NHT = "No Hot Documents Identified"
    HTU = "No Evidence on Hot Documents"
    NCC = "No Strong Customer Complaints"
    SCC = "Strong Customer Complaints"
    CCU = "No Evidence on Customer Complaints"
    END = "Entry Difficult"
    EEY = "Entry Easy"
    EEU = "No Entry Evidence"
    UNR = "Unrestricted on additional evidence"


@this_yaml.register_class
@enum.unique
class StatsGrpSelector(str, Enameled):
    """Measure used to summarize investigations data."""

    FC = "ByFirmCount"
    HD = "ByHHIandDelta"
    DL = "ByDelta"
    ZN = "ByConcZone"


@this_yaml.register_class
@enum.unique
class StatsReturnSelector(str, Enameled):
    """Statistics to report on investigations data."""

    CNT = "count"
    RPT = "rate, point"
    RIN = "rate, interval"


@this_yaml.register_class
@enum.unique
class SortSelector(str, Enameled):
    """Sort order for reporting investigations data."""

    UCH = "unchanged"
    REV = "reversed"


# Parameters and functions to interpolate selected HHI and ΔHHI values
# recorded in fractions to ranges of values in points on the HHI scale
HHI_DELTA_KNOTS = np.array([0, 100, 200, 300, 500, 800, 1200, 2500, 5001], int)
HHI_POST_ZONE_KNOTS = np.array([0, 1800, 2400, 10001], int)

hhi_delta_ranger, hhi_zone_post_ranger = (
    make_interp_spline(_f / 1e4, _f, k=0)
    for _f in (HHI_DELTA_KNOTS, HHI_POST_ZONE_KNOTS)
)
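# Illustrative note: with ``k=0`` each "ranger" is a zero-order (step) interpolant,
# so an HHI value recorded as a fraction should map to the lower knot of its range
# on the points scale, e.g. (values shown for illustration only):
#   hhi_delta_ranger(0.0150)    # expected ~100.0: ΔHHI of 150 pts. falls in [100, 200)
#   hhi_zone_post_ranger(0.20)  # expected ~1800.0: post-merger HHI of 2000 pts. falls in [1800, 2400)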

HMG_PRESUMPTION_ZONE_MAP: dict[int, dict[int, tuple[int, int, int]]] = {
    HHI_POST_ZONE_KNOTS[0]: {
        HHI_DELTA_KNOTS[0]: (0, 0, 0),
        HHI_DELTA_KNOTS[1]: (0, 0, 0),
        HHI_DELTA_KNOTS[2]: (0, 0, 0),
    },
    HHI_POST_ZONE_KNOTS[1]: {
        HHI_DELTA_KNOTS[0]: (0, 1, 1),
        HHI_DELTA_KNOTS[1]: (1, 1, 2),
        HHI_DELTA_KNOTS[2]: (1, 1, 2),
    },
    HHI_POST_ZONE_KNOTS[2]: {
        HHI_DELTA_KNOTS[0]: (0, 2, 1),
        HHI_DELTA_KNOTS[1]: (1, 2, 3),
        HHI_DELTA_KNOTS[2]: (2, 2, 4),
    },
}
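# Reading of the zone triples above (inferred from how the values are used, not
# stated in the source): the first element is the concentration-zone code keyed
# into ZONE_STRINGS (0 = Green, 1 = Yellow, 2 = Red), the second keys into
# ZONE_DETAIL_STRINGS_HHI, and the third keys into ZONE_DETAIL_STRINGS_DELTA.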

ZONE_VALS = np.unique(
    np.vstack([
        tuple(HMG_PRESUMPTION_ZONE_MAP[_k].values()) for _k in HMG_PRESUMPTION_ZONE_MAP
    ]),
    axis=0,
)

ZONE_STRINGS = {
    0: R"Green Zone (Safeharbor)",
    1: R"Yellow Zone",
    2: R"Red Zone (SLC Presumption)",
    TTL_KEY: "TOTAL",
}

ZONE_DETAIL_STRINGS_HHI = {
    0: Rf"HHI < {HHI_POST_ZONE_KNOTS[1]} pts.",
    1: R"HHI ∈ [{}, {}) pts. and ".format(*HHI_POST_ZONE_KNOTS[1:3]),
    2: Rf"HHI ⩾ {HHI_POST_ZONE_KNOTS[2]} pts. and ",
}

ZONE_DETAIL_STRINGS_DELTA = {
    0: "",
    1: Rf"ΔHHI < {HHI_DELTA_KNOTS[1]} pts.",
    2: Rf"ΔHHI ⩾ {HHI_DELTA_KNOTS[1]} pts.",
    3: R"ΔHHI ∈ [{}, {}) pts.".format(*HHI_DELTA_KNOTS[1:3]),
    4: Rf"ΔHHI ⩾ {HHI_DELTA_KNOTS[2]} pts.",
}


def enf_cnts_obs_by_group(
    _invdata_array_dict: INVData,
    _study_period: str,
    _table_ind_grp: IndustryGroup,
    _table_evid_cond: OtherEvidence,
    _stats_group: StatsGrpSelector,
    _enf_spec: INVResolution,
    /,
) -> ArrayBIGINT:
    """Summarize investigations data by reporting group.

    Parameters
    ----------
    _invdata_array_dict
        raw investigations data
    _study_period
        study period
    _table_ind_grp
        industry group
    _table_evid_cond
        additional evidence
    _stats_group
        grouping measure
    _enf_spec
        enforcement specification (see :class:`mergeron.gen.INVResolution`)

    Returns
    -------
    ArrayBIGINT
        Counts of markets resolved as enforced, cleared, or both, respectively.

    """
    if _stats_group == StatsGrpSelector.HD:
        raise ValueError(
            f"Clearance/enforcement statistics, '{_stats_group}' not valid here."
        )

    match _stats_group:
        case StatsGrpSelector.FC:
            cnts_func = enf_cnts_byfirmcount
            cnts_listing_func = enf_cnts_obs_byfirmcount
        case StatsGrpSelector.DL:
            cnts_func = enf_cnts_bydelta
            cnts_listing_func = enf_cnts_obs_byhhianddelta
        case StatsGrpSelector.ZN:
            cnts_func = enf_cnts_byconczone
            cnts_listing_func = enf_cnts_obs_byhhianddelta

    return cnts_func(
        cnts_listing_func(
            _invdata_array_dict,
            _study_period,
            _table_ind_grp,
            _table_evid_cond,
            _enf_spec,
        )
    )
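# Hypothetical usage sketch; the loader name is an assumption, not part of
# this module:
#   inv_data = load_investigations_data()  # hypothetical helper returning INVData
#   zone_cnts = enf_cnts_obs_by_group(
#       inv_data, "1996-2003", IndustryGroup.ALL, OtherEvidence.UNR,
#       StatsGrpSelector.ZN, INVResolution.ENFT,
#   )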


def enf_cnts_obs_byfirmcount(
    _data_array_dict: INVData,
    _data_period: str = "1996-2003",
    _table_ind_group: IndustryGroup = IndustryGroup.ALL,
    _table_evid_cond: OtherEvidence = OtherEvidence.UNR,
    _enf_spec: INVResolution = INVResolution.ENFT,
    /,
) -> ArrayBIGINT:
    """Summarize investigations data by firm count.

    Parameters
    ----------
    _data_array_dict
        raw investigations data
    _data_period
        data period
    _table_ind_group
        industry group
    _table_evid_cond
        additional evidence
    _enf_spec
        enforcement specification (see :class:`mergeron.gen.INVResolution`)

    Returns
    -------
    ArrayBIGINT
        Counts of markets resolved as enforced, cleared, or both, respectively,
        reported by number of pre-merger firms.

    """
    if _data_period not in _data_array_dict:
        raise ValueError(
            f"Invalid value of data period, {f'"{_data_period}"'}."
            f" Must be one of {tuple(_data_array_dict.keys())!r}."
        )

    data_array_dict_sub = _data_array_dict[_data_period][TABLE_TYPES[1]]
    table_no_ = table_no_lku(data_array_dict_sub, _table_ind_group, _table_evid_cond)
    cnts_array = data_array_dict_sub[table_no_].data_array

    ndim_in = 1
    stats_kept_indxs = []
    match _enf_spec:
        case INVResolution.CLRN:
            stats_kept_indxs = [-1, -2]
        case INVResolution.ENFT:
            stats_kept_indxs = [-1, -3]
        case INVResolution.BOTH:
            stats_kept_indxs = [-1, -3, -2]

    return np.hstack([cnts_array[:, :ndim_in], cnts_array[:, stats_kept_indxs]])


def enf_cnts_obs_byhhianddelta(
    _data_array_dict: INVData,
    _data_period: str = "1996-2003",
    _table_ind_group: IndustryGroup = IndustryGroup.ALL,
    _table_evid_cond: OtherEvidence = OtherEvidence.UNR,
    _enf_spec: INVResolution = INVResolution.ENFT,
    /,
) -> ArrayBIGINT:
    """Summarize investigations data by HHI and ΔHHI.

    Parameters
    ----------
    _data_array_dict
        raw investigations data
    _data_period
        data period
    _table_ind_group
        industry group
    _table_evid_cond
        additional evidence
    _enf_spec
        enforcement specification (see :class:`mergeron.gen.INVResolution`)

    Returns
    -------
    ArrayBIGINT
        Counts of markets resolved as enforced, cleared, or both, respectively,
        reported by HHI and ΔHHI.

    """
    if _data_period not in _data_array_dict:
        raise ValueError(
            f"Invalid value of data period, {f'"{_data_period}"'}."
            f" Must be one of {tuple(_data_array_dict.keys())!r}."
        )

    data_array_dict_sub = _data_array_dict[_data_period][TABLE_TYPES[0]]
    table_no_ = table_no_lku(data_array_dict_sub, _table_ind_group, _table_evid_cond)
    cnts_array = data_array_dict_sub[table_no_].data_array

    ndim_in = 2
    stats_kept_indxs = []
    match _enf_spec:
        case INVResolution.CLRN:
            stats_kept_indxs = [-1, -2]
        case INVResolution.ENFT:
            stats_kept_indxs = [-1, -3]
        case INVResolution.BOTH:
            stats_kept_indxs = [-1, -3, -2]

    return np.hstack([cnts_array[:, :ndim_in], cnts_array[:, stats_kept_indxs]])
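# Note: enf_cnts_obs_byfirmcount and enf_cnts_obs_byhhianddelta differ only in the
# table type selected (TABLE_TYPES[1] vs. TABLE_TYPES[0]) and in ndim_in, the number
# of leading key columns retained (1 for firm count; 2 for HHI and ΔHHI); both return
# those key columns plus the outcome columns selected for the given INVResolution.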


def table_no_lku(
    _data_array_dict_sub: Mapping[str, INVTableData],
    _table_ind_group: IndustryGroup = IndustryGroup.ALL,
    _table_evid_cond: OtherEvidence = OtherEvidence.UNR,
    /,
) -> str:
    """Look up the table number based on industry group and additional evidence."""
    if _table_evid_cond not in (
        _egl := [
            _data_array_dict_sub[_v].additional_evidence for _v in _data_array_dict_sub
        ]
    ):
        raise ValueError(
            f"Invalid value for additional evidence, {f'"{_table_evid_cond}"'}."
            f" Must be one of {_egl!r}."
        )
    if _table_ind_group not in (
        _igl := [_data_array_dict_sub[_v].industry_group for _v in _data_array_dict_sub]
    ):
        raise ValueError(
            f"Invalid value for industry group, {f'"{_table_ind_group}"'}."
            f" Must be one of {_igl!r}."
        )

    tno_ = next(
        _t
        for _t in _data_array_dict_sub
        if all((
            _data_array_dict_sub[_t].industry_group == _table_ind_group,
            _data_array_dict_sub[_t].additional_evidence == _table_evid_cond,
        ))
    )

    return tno_
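# Hypothetical usage sketch; ``inv_data`` is assumed to be an INVData mapping
# loaded elsewhere:
#   tables = inv_data["1996-2003"][TABLE_TYPES[0]]
#   table_no_ = table_no_lku(tables, IndustryGroup.GRO, OtherEvidence.HOT)
#   cnts_array = tables[table_no_].data_array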


def enf_cnts_byfirmcount(_raw_counts: ArrayBIGINT, /) -> ArrayBIGINT:
    """Summarize investigations data by firm count.

    Parameters
    ----------
    _raw_counts
        raw investigations data array

    Returns
    -------
    ArrayBIGINT
        Subtotals for columns other than the first, grouped by the first column.

    """
    if not _raw_counts[:, 0].any():
        return np.array([], int)

    ndim_in = 1
    return np.vstack([
        np.concatenate([
            (_i,),
            np.einsum(
                "ij->j", _raw_counts[_raw_counts[:, 0] == _i][:, ndim_in:], dtype=int
            ),
        ])
        for _i in np.unique(_raw_counts[:, 0])
    ])
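# Worked toy example (illustrative only): group on the first column and sum the
# remaining columns within each group,
#   enf_cnts_byfirmcount(np.array([[2, 5, 3], [2, 1, 1], [3, 4, 2]]))
#   # -> [[2, 6, 4],
#   #     [3, 4, 2]]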


def enf_cnts_bydelta(_raw_counts: ArrayBIGINT, /) -> ArrayBIGINT:
    """Summarize investigations data by ΔHHI.

    Parameters
    ----------
    _raw_counts
        raw investigations data array

    Returns
    -------
    ArrayBIGINT
        Subtotals for columns after the second, grouped by the second column.

    """
    if not _raw_counts[:, 1].any():
        return np.array([], int)

    ndim_in = 2
    return np.vstack([
        np.concatenate([
            (_k,),
            np.einsum(
                "ij->j", _raw_counts[_raw_counts[:, 1] == _k][:, ndim_in:], dtype=int
            ),
        ])
        for _k in HHI_DELTA_KNOTS[:-1]
    ])
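# Note: the second column of _raw_counts is expected to already hold the lower knot
# of each ΔHHI range (as produced by hhi_delta_ranger); knots in HHI_DELTA_KNOTS[:-1]
# with no matching observations contribute rows whose subtotal entries are all zero.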


def enf_cnts_byconczone(_raw_counts: ArrayBIGINT, /) -> ArrayBIGINT:
    """Summarize investigations data by concentration zone, as defined in the Guidelines.

    Includes sub-total detail for "Moderately Concentrated" and
    "Unconcentrated" markets.

    Parameters
    ----------
    _raw_counts
        raw investigations data array

    Returns
    -------
    ArrayBIGINT
        Subtotals by range of HHI and ΔHHI, with zone detail.

    """
    if not _raw_counts[:, 0].any() or np.isnan(_raw_counts[:, 0]).all():
        return np.array([], int)

    # Step 1: Tag and aggregate from HHI-post and ΔHHI to zone triple
    # NOTE: Although you could just map and not (partially) aggregate in this step,
    # the mapped array is a copy, and is larger without partial aggregation, so
    # aggregation reduces the footprint of this step in memory. Although this point
    # is more relevant for generated than observed data, using the same coding
    # pattern in both cases does make life easier.
    _ndim_in = 2
    _nkeys = 3
    cnts_byhhipostanddelta: ArrayBIGINT = np.zeros(
        (1, _nkeys + _raw_counts.shape[1] - _ndim_in), dtype=int
    )
    cnts_byconczone: ArrayBIGINT = np.zeros(
        (1, _nkeys + _raw_counts.shape[1] - _ndim_in), dtype=int
    )

    # Prepare to tag clearance stats by presumption zone
    hhi_zone_post_ranged = hhi_zone_post_ranger(_raw_counts[:, 0] / 1e4)
    hhi_delta_ranged = hhi_delta_ranger(_raw_counts[:, 1] / 1e4)

    for _hhi_zone_post_lim in HHI_POST_ZONE_KNOTS[:-1]:
        zone_test = hhi_zone_post_ranged == _hhi_zone_post_lim

        for hhi_zone_delta_lim in HHI_DELTA_KNOTS[:3]:
            delta_test = (
                (hhi_delta_ranged >= hhi_zone_delta_lim)
                if hhi_zone_delta_lim == HHI_DELTA_KNOTS[2]
                else (hhi_delta_ranged == hhi_zone_delta_lim)
            )
            zone_val = HMG_PRESUMPTION_ZONE_MAP[_hhi_zone_post_lim][hhi_zone_delta_lim]

            conc_test = zone_test & delta_test
            cnts_byhhipostanddelta = np.vstack((
                cnts_byhhipostanddelta,
                np.array(
                    (
                        *zone_val,
                        *np.einsum(
                            "ij->j", _raw_counts[:, _ndim_in:][conc_test], dtype=int
                        ),
                    ),
                    dtype=int,
                ),
            ))
    cnts_byhhipostanddelta = cnts_byhhipostanddelta[1:]

    for zone_val in ZONE_VALS:
        # Logical-and of multiple vectors:
        hhi_zone_test = (
            1
            * np.stack(
                [
                    cnts_byhhipostanddelta[:, _idx] == _val
                    for _idx, _val in enumerate(zone_val)
                ],
                axis=1,
            )
        ).prod(axis=1) == 1

        cnts_byconczone = np.vstack((
            cnts_byconczone,
            np.concatenate(
                (
                    zone_val,
                    np.einsum(
                        "ij->j",
                        cnts_byhhipostanddelta[hhi_zone_test][:, _nkeys:],
                        dtype=int,
                    ),
                ),
                dtype=int,
            ),
        ))

    return cnts_byconczone[1:]
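# Note on the returned layout: one row per unique zone triple in ZONE_VALS, with the
# three zone-key columns first (see HMG_PRESUMPTION_ZONE_MAP) followed by the summed
# outcome columns; the leading row of zeros used to seed np.vstack is dropped.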