Source code for mergeron.perks.guidelines_boundary_functions_extra

"""
Specialized methods for defining and analyzing boundaries for Guidelines standards.

These methods (functions) provide rely on scipy of sympy for core computations,
and may provide improved precision than core functions, but tend to have
poor performance

"""

from __future__ import annotations

from typing import Literal

import numpy as np
from mpmath import mp, mpf  # type: ignore
from scipy.spatial.distance import minkowski  # type: ignore
from sympy import lambdify, simplify, solve, symbols  # type: ignore

from .. import DEFAULT_REC, VERSION  # noqa: TID252
from ..core import GuidelinesBoundary, MPFloat  # noqa: TID252
from ..core import guidelines_boundary_functions as gbf  # noqa: TID252
from . import GuidelinesBoundaryCallable

__version__ = VERSION


mp.dps = 32
mp.trap_complex = True


[docs] def dh_area_quad(_dh_val: float = 0.01, /) -> float: """ Area under the ΔHHI boundary. When the given ΔHHI bound matches a Guidelines safeharbor, the area under the boundary is half the intrinsic clearance rate for the ΔHHI safeharbor. Parameters ---------- _dh_val Merging-firms' ΔHHI bound. Returns ------- Area under ΔHHI boundary. """ _dh_val = mpf(f"{_dh_val}") _s_naught = (1 - mp.sqrt(1 - 2 * _dh_val)) / 2 return float( mp.nstr( _s_naught + mp.quad(lambda x: _dh_val / (2 * x), [_s_naught, 1 - _s_naught]), mp.prec, ) )
[docs] def hhi_delta_boundary_qdtr(_dh_val: float = 0.01, /) -> GuidelinesBoundaryCallable: """ Generate the list of share combination on the ΔHHI boundary. Parameters ---------- _dh_val: Merging-firms' ΔHHI bound. Returns ------- Callable to generate array of share-pairs, area under boundary. """ _dh_val = mpf(f"{_dh_val}") _s_1, _s_2 = symbols("s_1, s_2", positive=True) _hhi_eqn = _s_2 - _dh_val / (2 * _s_1) hhi_bdry = solve(_hhi_eqn, _s_2)[0] s_nought = float(solve(_hhi_eqn.subs({_s_2: 1 - _s_1}), _s_1)[0]) hhi_bdry_area = float( mp.nstr( 2 * ( s_nought + mp.quad(lambdify(_s_1, hhi_bdry, "mpmath"), (s_nought, 1 - s_nought)) ), mp.prec, ) ) return GuidelinesBoundaryCallable( lambdify(_s_1, hhi_bdry, "numpy"), hhi_bdry_area, s_nought )
[docs] def diversion_share_boundary_qdtr_wtd_avg( _delta_star: float = 0.075, _r_val: float = DEFAULT_REC, /, *, weighting: Literal["own-share", "cross-product-share"] | None = "own-share", recapture_form: Literal["inside-out", "fixed"] = "inside-out", ) -> GuidelinesBoundaryCallable: R""" Share combinations for the share-weighted average share-ratio boundary. Parameters ---------- _delta_star Diversion share, :math:`\overline{d} / \overline{r}` or :math:`\overline{g} / (m^* \cdot \overline{r})`. _r_val Recapture rate. weighting Whether "own-share" or "cross-product-share" (or None for simple, unweighted average) recapture_form Whether recapture rate is share-proportional ("inside-out") or has fixed value for both merging firms ("fixed"). Returns ------- Array of share-pairs, area under boundary. """ _delta_star, _r_val = (mpf(_v) for _v in (f"{_delta_star}", f"{_r_val}")) _s_mid = _delta_star / (1 + _delta_star) s_naught = mpf("0.0") s_1, s_2 = symbols("s_1:3", positive=True) match weighting: case "own-share": _bdry_eqn = ( s_1 * s_2 / (1 - s_1) + s_2 * s_1 / ( (1 - (_r_val * s_2 + (1 - _r_val) * s_1)) if recapture_form == "inside-out" else (1 - s_2) ) - (s_1 + s_2) * _delta_star ) _bdry_func = solve(_bdry_eqn, s_2)[0] if recapture_form == "inside-out": s_naught = mpf(solve(simplify(_bdry_eqn.subs({s_2: 1 - s_1})), s_1)[0]) bdry_area = mp.fmul( "2", ( s_naught + mp.quad(lambdify(s_1, _bdry_func, "mpmath"), (s_naught, _s_mid)) ), ) - (_s_mid**2 + s_naught**2) case "cross-product-share": mp.trap_complex = False d_star = symbols("d", positive=True) _bdry_eqn = ( s_2 * s_2 / (1 - s_1) + s_1 * s_1 / ( (1 - (_r_val * s_2 + (1 - _r_val) * s_1)) if recapture_form == "inside-out" else (1 - s_2) ) - (s_1 + s_2) * d_star ) _bdry_func = solve(_bdry_eqn, s_2)[1] bdry_area = ( mp.fmul( "2", mp.quad( lambdify(s_1, _bdry_func.subs({d_star: _delta_star}), "mpmath"), (0, _s_mid), ).real, ) - _s_mid**2 ) case _: _bdry_eqn = ( 1 / 2 * s_2 / (1 - s_1) + 1 / 2 * s_1 / ( (1 - (_r_val * s_2 + (1 - _r_val) * s_1)) if recapture_form == "inside-out" else (1 - s_2) ) - _delta_star ) _bdry_func = solve(_bdry_eqn, s_2)[0] bdry_area = ( mp.fmul("2", mp.quad(lambdify(s_1, _bdry_func, "mpmath"), (0, _s_mid))) - _s_mid**2 ) return GuidelinesBoundaryCallable( lambdify(s_1, _bdry_func, "numpy"), float(bdry_area), float(s_naught) )
[docs] def diversion_share_boundary_distance( _delta_star: float = 0.075, _r_val: float = DEFAULT_REC, /, *, agg_method: Literal["arithmetic mean", "distance"] = "arithmetic mean", weighting: Literal["own-share", "cross-product-share"] | None = "own-share", recapture_form: Literal["inside-out", "fixed"] = "inside-out", dps: int = 5, ) -> GuidelinesBoundary: R""" Share combinations for the share-ratio boundaries using various aggregators. Reimplements the arithmetic-averages and distance estimations from function, `diversion_share_boundary_wtd_avg` but uses the Minkowski-distance function, `scipy.spatial.distance.minkowski` for all aggregators. This reimplementation is useful for testing the output of `diversion_share_boundary_wtd_avg` but runs considerably slower. Parameters ---------- _delta_star Diversion share, :math:`\overline{d} / \overline{r}` or :math:`\overline{g} / (m^* \cdot \overline{r})`. _r_val Recapture rate. agg_method Whether "arithmetic mean" or "distance". weighting Whether "own-share" or "cross-product-share". recapture_form Whether recapture rate is share-proportional ("inside-out") or has fixed value for both merging firms ("fixed"). dps Number of decimal places for rounding returned shares and area. Returns ------- Array of share-pairs, area under boundary. """ _delta_star = mpf(f"{_delta_star}") # parameters for iteration _s_mid = mp.fdiv(_delta_star, 1 + _delta_star) _weights_base = (mpf("0.5"),) * 2 _bdry_step_sz = mp.power(10, -dps) _theta = _bdry_step_sz * (10 if weighting == "cross-product-share" else 1) # initial conditions bdry_points = [(_s_mid, _s_mid)] s_1_pre, s_2_pre = _s_mid, _s_mid s_2_oddval, s_2_oddsum, s_2_evnsum = True, 0.0, 0.0 for s_1 in mp.arange(_s_mid - _bdry_step_sz, 0, -_bdry_step_sz): # The wtd. avg. GUPPI is not always convex to the origin, so we # increment s_2 after each iteration in which our algorithm # finds (s1, s2) on the boundary s_2 = s_2_pre * (1 + _theta) if (s_1 + s_2) > mpf("0.99875"): # 1: # inaccuracy at 3-9s and up break while True: de_1 = s_2 / (1 - s_1) de_2 = ( s_1 / (1 - gbf.lerp(s_1, s_2, _r_val)) if recapture_form == "inside-out" else s_1 / (1 - s_2) ) weights_i = ( ( _w := mp.fdiv( s_2 if weighting == "cross-product-share" else s_1, s_1 + s_2 ), 1 - _w, ) if weighting else _weights_base ) match agg_method: case "arithmetic mean": delta_test = minkowski((de_1, de_2), (0.0, 0.0), p=1, w=weights_i) case "distance": delta_test = minkowski((de_1, de_2), (0.0, 0.0), p=2, w=weights_i) _test_flag, _incr_decr = ( (delta_test > _delta_star, -1) if weighting == "cross-product-share" else (delta_test < _delta_star, 1) ) if _test_flag: s_2 += _incr_decr * _bdry_step_sz else: break # Build-up boundary points bdry_points.append((s_1, s_2)) # Build up area terms s_2_oddsum += s_2 if s_2_oddval else 0 s_2_evnsum += s_2 if not s_2_oddval else 0 s_2_oddval = not s_2_oddval # Hold share points s_2_pre = s_2 s_1_pre = s_1 if s_2_oddval: s_2_evnsum -= s_2_pre else: s_2_oddsum -= s_1_pre s_intcpt = gbf._diversion_share_boundary_intcpt( s_1_pre, _delta_star, _r_val, recapture_form=recapture_form, agg_method=agg_method, weighting=weighting, ) print(s_1_pre, _delta_star, _r_val, s_intcpt) print(type(s_intcpt)) if weighting == "own-share": bdry_prtlarea = ( _bdry_step_sz * (4 * s_2_oddsum + 2 * s_2_evnsum + _s_mid + s_2_pre) / 3 ) # Area under boundary bdry_area_total = 2 * (s_1_pre + bdry_prtlarea) - ( mp.power(_s_mid, "2") + mp.power(s_1_pre, "2") ) else: print([type(_f) for _f in (_s_mid, s_2_oddsum, s_2_evnsum, s_intcpt)]) bdry_prtlarea = ( _bdry_step_sz * (4 * s_2_oddsum + 2 * s_2_evnsum + _s_mid + s_intcpt) / 3 ) # Area under boundary bdry_area_total = 2 * bdry_prtlarea - mp.power(_s_mid, "2") bdry_points.append((mpf("0.0"), s_intcpt)) # Points defining boundary to point-of-symmetry return GuidelinesBoundary( np.vstack((bdry_points[::-1], np.flip(bdry_points[1:], 1))), round(float(bdry_area_total), dps), )
[docs] def diversion_share_boundary_xact_avg_mp( _delta_star: float = 0.075, _r_val: float = DEFAULT_REC, /, *, recapture_form: Literal["inside-out", "fixed"] = "inside-out", dps: int = 5, ) -> GuidelinesBoundary: R""" Share combinations along the simple average diversion-ratio boundary. Notes ----- An analytical expression for the exact average boundary is derived and plotted from the y-intercept to the ray of symmetry as follows:: from sympy import latex, plot as symplot, solve, symbols s_1, s_2 = symbols("s_1 s_2") g_val, r_val, m_val = 0.06, 0.80, 0.30 d_hat = g_val / (r_val * m_val) # recapture_form = "inside-out" sag = solve( (s_2 / (1 - s_1)) + (s_1 / (1 - (r_val * s_2 + (1 - r_val) * s_1))) - 2 * d_hat, s_2 )[0] symplot( sag, (s_1, 0., d_hat / (1 + d_hat)), ylabel=s_2 ) # recapture_form = "fixed" sag = solve((s_2/(1 - s_1)) + (s_1/(1 - s_2)) - 2 * d_hat, s_2)[0] symplot( sag, (s_1, 0., d_hat / (1 + d_hat)), ylabel=s_2 ) Parameters ---------- _delta_star Diversion share, :math:`\overline{d} / \overline{r}` or :math:`\overline{g} / (m^* \cdot \overline{r})`. _r_val Recapture rate. recapture_form Whether recapture rate is share-proportional ("inside-out") or has fixed value for both merging firms ("fixed"). dps Number of decimal places for rounding returned shares. Returns ------- Array of share-pairs, area under boundary, area under boundary. """ _delta_star = mpf(f"{_delta_star}") _s_mid = _delta_star / (1 + _delta_star) _bdry_step_sz = 10**-dps _bdry_start = np.array([(_s_mid, _s_mid)]) _s_1 = np.array(mp.arange(_s_mid - _bdry_step_sz, 0, -_bdry_step_sz)) if recapture_form == "inside-out": s_intcpt = mp.fdiv( mp.fsub( 2 * _delta_star * _r_val + 1, mp.fabs(2 * _delta_star * _r_val - 1) ), 2 * mpf(f"{_r_val}"), ) nr_t1 = 1 + 2 * _delta_star * _r_val * (1 - _s_1) - _s_1 * (1 - _r_val) nr_sqrt_mdr = 4 * _delta_star * _r_val nr_sqrt_mdr2 = nr_sqrt_mdr * _r_val nr_sqrt_md2r2 = nr_sqrt_mdr2 * _delta_star nr_sqrt_t1 = nr_sqrt_md2r2 * (_s_1**2 - 2 * _s_1 + 1) nr_sqrt_t2 = nr_sqrt_mdr2 * _s_1 * (_s_1 - 1) nr_sqrt_t3 = nr_sqrt_mdr * (2 * _s_1 - _s_1**2 - 1) nr_sqrt_t4 = (_s_1**2) * (_r_val**2 - 6 * _r_val + 1) nr_sqrt_t5 = _s_1 * (6 * _r_val - 2) + 1 nr_t2_mdr = nr_sqrt_t1 + nr_sqrt_t2 + nr_sqrt_t3 + nr_sqrt_t4 + nr_sqrt_t5 # Alternative grouping of terms in np.sqrt nr_sqrt_s1sq = (_s_1**2) * ( nr_sqrt_md2r2 + nr_sqrt_mdr2 - nr_sqrt_mdr + _r_val**2 - 6 * _r_val + 1 ) nr_sqrt_s1 = _s_1 * ( -2 * nr_sqrt_md2r2 - nr_sqrt_mdr2 + 2 * nr_sqrt_mdr + 6 * _r_val - 2 ) nr_sqrt_nos1 = nr_sqrt_md2r2 - nr_sqrt_mdr + 1 nr_t2_s1 = nr_sqrt_s1sq + nr_sqrt_s1 + nr_sqrt_nos1 if not np.isclose( np.einsum("i->", nr_t2_mdr.astype(float)), np.einsum("i->", nr_t2_s1.astype(float)), rtol=0, atol=0.5 * dps, ): raise RuntimeError( "Calculation of sq. root term in exact average GUPPI" f"with recapture spec, {f'"{recapture_form}"'} is incorrect." ) s_2 = (nr_t1 - np.sqrt(nr_t2_s1)) / (2 * _r_val) else: s_intcpt = mp.fsub(_delta_star + 1 / 2, mp.fabs(_delta_star - 1 / 2)) s_2 = ( (1 / 2) + _delta_star - _delta_star * _s_1 - np.sqrt( ((_delta_star**2) - 1) * (_s_1**2) + (-2 * (_delta_star**2) + _delta_star + 1) * _s_1 + (_delta_star**2) - _delta_star + (1 / 4) ) ) bdry_inner = np.stack((_s_1, s_2), axis=1) bdry_end = np.array([(mpf("0.0"), s_intcpt)]) bdry = np.vstack(( bdry_end, bdry_inner[::-1], _bdry_start, np.flip(bdry_inner, 1), np.flip(bdry_end, 1), )) s_2 = np.concatenate((np.array([_s_mid]), s_2)) bdry_ends = [0, -1] bdry_odds = np.array(range(1, len(s_2), 2), int) bdry_evns = np.array(range(2, len(s_2), 2), int) # Double the area under the half-curve, and subtract the double-counted bit. bdry_area_simpson = 2 * _bdry_step_sz * ( (4 / 3) * np.sum(s_2.take(bdry_odds)) + (2 / 3) * np.sum(s_2.take(bdry_evns)) + (1 / 3) * np.sum(s_2.take(bdry_ends)) ) - mp.power(_s_mid, 2) return GuidelinesBoundary(bdry, float(mp.nstr(bdry_area_simpson, dps)))
# diversion_share_boundary_wtd_avg_autoroot # this function is about half as fast as the manual one! ... and a touch less precise def _diversion_share_boundary_wtd_avg_autoroot( _delta_star: float = 0.075, _r_val: float = DEFAULT_REC, /, *, agg_method: Literal[ "arithmetic mean", "geometric mean", "distance" ] = "arithmetic mean", weighting: Literal["own-share", "cross-product-share", None] = "own-share", recapture_form: Literal["inside-out", "fixed"] = "inside-out", dps: int = 5, ) -> GuidelinesBoundary: R""" Share combinations on the share-weighted average diversion share boundary. Parameters ---------- _delta_star Diversion share, :math:`\overline{d} / \overline{r}` or :math:`\overline{g} / (m^* \cdot \overline{r})`. _r_val Recapture rate. agg_method Whether "arithmetic mean", "geometric mean", or "distance". weighting Whether "own-share" or "cross-product-share" (or None for simple, unweighted average). recapture_form Whether recapture rate is share-proportional ("inside-out") or has fixed value for both merging firms ("fixed"). dps Number of decimal places for rounding returned shares and area. Returns ------- Array of share-pairs, area under boundary. Notes ----- An analytical expression for the share-weighted arithmetic mean boundary is derived and plotted from y-intercept to the ray of symmetry as follows:: from sympy import plot as symplot, solve, symbols s_1, s_2 = symbols("s_1 s_2", positive=True) g_val, r_val, m_val = 0.06, 0.80, 0.30 delta_star = g_val / (r_val * m_val) # recapture_form == "inside-out" oswag = solve( s_1 * s_2 / (1 - s_1) + s_2 * s_1 / (1 - (r_val * s_2 + (1 - r_val) * s_1)) - (s_1 + s_2) * delta_star, s_2 )[0] symplot( oswag, (s_1, 0., d_hat / (1 + d_hat)), ylabel=s_2 ) cpswag = solve( s_2 * s_2 / (1 - s_1) + s_1 * s_1 / (1 - (r_val * s_2 + (1 - r_val) * s_1)) - (s_1 + s_2) * delta_star, s_2 )[1] symplot( cpwag, (s_1, 0.0, d_hat / (1 + d_hat)), ylabel=s_2 ) # recapture_form == "fixed" oswag = solve( s_1 * s_2 / (1 - s_1) + s_2 * s_1 / (1 - s_2) - (s_1 + s_2) * delta_star, s_2 )[0] symplot( oswag, (s_1, 0., d_hat / (1 + d_hat)), ylabel=s_2 ) cpswag = solve( s_2 * s_2 / (1 - s_1) + s_1 * s_1 / (1 - s_2) - (s_1 + s_2) * delta_star, s_2 )[1] symplot( cpswag, (s_1, 0.0, d_hat / (1 + d_hat)), ylabel=s_2 ) """ _delta_star, _r_val = (mpf(f"{_v}") for _v in (_delta_star, _r_val)) _s_mid = mp.fdiv(_delta_star, 1 + _delta_star) # initial conditions bdry = [(_s_mid, _s_mid)] s_1_pre, s_2_pre = _s_mid, _s_mid s_2_oddval, s_2_oddsum, s_2_evnsum = True, 0.0, 0.0 # parameters for iteration _step_size = mp.power(10, -dps) theta_ = _step_size * (10 if weighting == "cross-product-share" else 1) for s_1 in mp.arange(_s_mid - _step_size, 0, -_step_size): def delta_test(x: MPFloat) -> MPFloat: _de_1 = x / (1 - s_1) _de_2 = ( s_1 / (1 - gbf.lerp(s_1, x, _r_val)) if recapture_form == "inside-out" else s_1 / (1 - x) ) _w = ( mp.fdiv(s_1 if weighting == "cross-product-share" else x, s_1 + x) if weighting else 0.5 ) match agg_method: case "geometric mean": delta_test = mp.expm1( gbf.lerp(mp.log1p(_de_1), mp.log1p(_de_2), _w) ) case "distance": delta_test = mp.sqrt(gbf.lerp(_de_1**2, _de_2**2, _w)) case _: delta_test = gbf.lerp(_de_1, _de_2, _w) return _delta_star - delta_test try: s_2 = mp.findroot( delta_test, x0=(s_2_pre * (1 - theta_), s_2_pre * (1 + theta_)), tol=mp.sqrt(_step_size), solver="ridder", ) except (mp.ComplexResult, ValueError, ZeroDivisionError) as _e: print(s_1, s_2_pre) raise _e # Build-up boundary points bdry.append((s_1, s_2)) # Build up area terms s_2_oddsum += s_2 if s_2_oddval else 0 s_2_evnsum += s_2 if not s_2_oddval else 0 s_2_oddval = not s_2_oddval # Hold share points s_2_pre = s_2 s_1_pre = s_1 if (s_1_pre + s_2_pre) > mpf("0.99875"): # Loss of accuracy at 3-9s and up break if s_2_oddval: s_2_evnsum -= s_2_pre else: s_2_oddsum -= s_1_pre _s_intcpt = gbf._diversion_share_boundary_intcpt( s_2_pre, _delta_star, _r_val, recapture_form=recapture_form, agg_method=agg_method, weighting=weighting, ) if weighting == "own-share": gbd_prtlarea = ( _step_size * (4 * s_2_oddsum + 2 * s_2_evnsum + _s_mid + s_2_pre) / 3 ) # Area under boundary bdry_area_total = float( 2 * (s_1_pre + gbd_prtlarea) - (mp.power(_s_mid, "2") + mp.power(s_1_pre, "2")) ) else: gbd_prtlarea = ( _step_size * (4 * s_2_oddsum + 2 * s_2_evnsum + _s_mid + _s_intcpt) / 3 ) # Area under boundary bdry_area_total = float(2 * gbd_prtlarea - mp.power(_s_mid, "2")) bdry.append((mpf("0.0"), _s_intcpt)) bdry_array = np.array(bdry, float) # Points defining boundary to point-of-symmetry return GuidelinesBoundary( np.vstack((bdry_array[::-1], bdry_array[1:, ::-1]), dtype=float), round(float(bdry_area_total), dps), )