# Source code for mergeron.gen.data_generation

"""Methods to generate data for analyzing merger enforcement policy."""

from __future__ import annotations

import zipfile
from itertools import starmap
from typing import TypedDict

import numpy as np
from attrs import Attribute, Converter, define, field, validators
from joblib import Parallel, cpu_count, delayed, parallel_config  # type: ignore
from numpy.random import SeedSequence
from ruamel import yaml

from .. import (  # noqa: TID252  # noqa
    _PKG_NAME,
    NTHREADS,
    VERSION,
    RECForm,
    this_yaml,
    yaml_rt_mapper,
)
from ..core import guidelines_boundaries as gbl  # noqa: TID252
from ..core.guidelines_boundaries import MGThresholds  # noqa: TID252
from . import (
    INVResolution,  # noqa: F401
    MarketsData,
    MarketShareSpec,
    PCMDistribution,
    PCMRestriction,
    PCMSpec,
    PriceSpec,
    SeedSequenceData,
    SHRDistribution,
    SSZConstant,
    UPPTestRegime,
    UPPTestsCounts,
)
from .data_generation_functions import (
    diversion_ratios_builder,
    market_share_sampler,
    prices_sampler,
)
from .upp_tests import compute_upp_test_counts

__version__ = VERSION

# Number of markets generated (and tested) per parallel work chunk; also the
# subsample size used when archiving generated datasets to HDF5.
H5_CHUNK_SIZE = 10**6
class SamplingFunctionKWArgs(TypedDict, total=False):
    """Keyword arguments of sampling methods defined below."""

    sample_size: int
    """number of draws to generate"""

    seed_data: SeedSequenceData | None
    """seed data to ensure independent and replicable draws"""

    nthreads: int
    """number of parallel threads to use"""
def _seed_data_conv(_v: SeedSequenceData | None, _i: MarketSample) -> SeedSequenceData: if isinstance(_v, SeedSequenceData): return _v _sseq = tuple(SeedSequence(pool_size=8) for _ in range(4)) _sdtt = _i.share_spec.dist_type == SHRDistribution.UNI _pst = _i.price_spec == PriceSpec.RNG return SeedSequenceData( share=_sseq[0], pcm=_sseq[1], fcounts=(None if _sdtt else _sseq[2]), price=(None if not _pst else (_sseq[2] if _sdtt else _sseq[3])), ) @this_yaml.register_class @define(kw_only=True)
class MarketSample:
    """Parameter specification for market data generation."""

    share_spec: MarketShareSpec = field(
        default=MarketShareSpec(SHRDistribution.UNI),
        validator=validators.instance_of(MarketShareSpec),
    )
    """Market-share specification, see :class:`MarketShareSpec`"""

    pcm_spec: PCMSpec = field(
        default=PCMSpec(PCMDistribution.UNI), validator=validators.instance_of(PCMSpec)
    )
    """Margin specification, see :class:`PCMSpec`"""

    @pcm_spec.validator
    def _psv(self, _a: Attribute[PCMSpec], _v: PCMSpec, /) -> None:
        # MNL-consistent margins require a recapture specification other than FIXED
        if (
            self.share_spec.recapture_form == RECForm.FIXED
            and _v.pcm_restriction == PCMRestriction.MNL
        ):
            raise ValueError(
                f'Specification of "PCMSpec.pcm_restriction", as {PCMRestriction.MNL!r} '
                f'requires that "MarketShareSpec.recapture_form" be {RECForm.INOUT!r} '
                f"or {RECForm.OUTIN!r}, not {RECForm.FIXED!r} as presently specified"
            )

    price_spec: PriceSpec = field(
        default=PriceSpec.SYM, validator=validators.instance_of(PriceSpec)
    )
    """Price specification, see :class:`PriceSpec`"""

    hsr_filing_test_type: SSZConstant = field(
        default=SSZConstant.ONE, validator=validators.instance_of(SSZConstant)
    )
    """Method for modeling HSR filing thresholds, see :class:`SSZConstant`"""

    sample_size: int = field(default=10**6, validator=validators.instance_of(int))
    """number of draws to simulate"""

    seed_data: SeedSequenceData = field(
        converter=Converter(_seed_data_conv, takes_self=True)  # type: ignore
    )
    """sequence of SeedSequences to ensure replicable data generation with
    appropriately independent random streams
    """

    @seed_data.default
    def __dsd(self) -> SeedSequenceData | None:
        # Generate a fresh, fully-populated seed bundle for this instance
        return _seed_data_conv(None, self)

    @seed_data.validator
    def _sdv(
        _i: MarketSample, _a: Attribute[SeedSequenceData], _v: SeedSequenceData, /
    ) -> None:
        # Reject seed components that the chosen specifications would ignore,
        # so a caller is not misled into thinking they affect the draws.
        if _i.share_spec.dist_type == SHRDistribution.UNI and any((
            _v.fcounts,
            _v.price,
        )):
            raise ValueError(
                "Attribute, seed_data.fcounts is ignored as irrelevant when "
                "market shares are drawn with Uniform distribution. "
                "Set seed_data.fcounts to None and retry."
            )
        if _i.price_spec != PriceSpec.RNG and _v.price is not None:
            raise ValueError(
                "Attribute, seed_data.price is ignored as irrelevant unless "
                "prices are asymmetric and uncorrelated and price-cost margins "
                "are also not symmetric. Set seed_data.price to None and retry."
            )

    nthreads: int = field(default=NTHREADS, validator=validators.instance_of(int))
    """number of parallel threads to use"""

    dataset: MarketsData | None = field(default=None, init=False)
    """generated market data; populated by :meth:`generate_sample`"""

    enf_counts: UPPTestsCounts | None = field(default=None, init=False)
    """estimated UPP test counts; populated by :meth:`estimate_enf_counts`"""
def _markets_sampler( self, /, *, sample_size: int, seed_data: SeedSequenceData, nthreads: int ) -> MarketsData: """ Generate share, diversion ratio, price, and margin data for MarketSpec. see :attr:`SamplingFunctionKWArgs` for description of keyword parameters Returns ------- Merging firms' shares, margins, etc. for each hypothetical merger in the sample """ # Scale up sample size to offset discards based on specified criteria shr_sample_size = sample_size * self.hsr_filing_test_type shr_sample_size *= ( SSZConstant.MNL_DEP if self.pcm_spec.pcm_restriction == PCMRestriction.MNL else 1 ) shr_sample_size = int(shr_sample_size) # Generate share data mktshr_data = market_share_sampler( shr_sample_size, self.share_spec, seed_data.fcounts, seed_data.share, nthreads, ) mktshr_array_ = mktshr_data.mktshr_array fcounts_ = mktshr_data.fcounts aggregate_purchase_prob_ = mktshr_data.aggregate_purchase_prob nth_firm_share_ = mktshr_data.nth_firm_share del mktshr_data # Generate merging-firm price and PCM data margin_data, price_data = prices_sampler( mktshr_array_[:, :2], nth_firm_share_, aggregate_purchase_prob_, self.pcm_spec, self.price_spec, self.hsr_filing_test_type, seed_data.pcm, seed_data.price, nthreads, ) pcm_array_ = margin_data.pcm_array price_array_ = price_data.price_array if shr_sample_size > sample_size: mnl_test_rows = margin_data.mnl_test * price_data.hsr_filing_test mktshr_array_ = mktshr_array_[mnl_test_rows][:sample_size] pcm_array_ = margin_data.pcm_array[mnl_test_rows][:sample_size] price_array_ = price_data.price_array[mnl_test_rows][:sample_size] fcounts_ = fcounts_[mnl_test_rows][:sample_size] aggregate_purchase_prob_ = aggregate_purchase_prob_[mnl_test_rows][ :sample_size ] nth_firm_share_ = nth_firm_share_[mnl_test_rows][:sample_size] del mnl_test_rows # Calculate diversion ratios divratio_array = diversion_ratios_builder( self.share_spec.recapture_form, self.share_spec.recapture_rate, mktshr_array_[:, :2], aggregate_purchase_prob_, ) return 
MarketsData( mktshr_array_[:, :2], pcm_array_, price_array_, divratio_array, np.einsum("ij,ij->i", mktshr_array_[:, :2], mktshr_array_[:, [1, 0]])[ :, None ], aggregate_purchase_prob_, fcounts_, nth_firm_share_, ( np.einsum("ij,ij->i", mktshr_array_[:, :2], mktshr_array_[:, [1, 0]]) + np.einsum("ij,ij->i", mktshr_array_, mktshr_array_) )[:, None], )
[docs] def generate_sample(self, /) -> None: """Populate :attr:`data` with generated data. Returns ------- None """ self.dataset = self._markets_sampler( seed_data=self.seed_data, sample_size=self.sample_size, nthreads=self.nthreads, )
def __sim_enf_cnts( self, _upp_test_parms: gbl.MGThresholds, _sim_test_regime: UPPTestRegime, /, *, seed_data: SeedSequenceData, sample_size: int = 10**6, nthreads: int = NTHREADS, ) -> UPPTestsCounts: """Generate market data and compute UPP test counts on same. Parameters ---------- _upp_test_parms Guidelines thresholds for testing UPP and related statistics _sim_test_regime Configuration to use for testing; UPPTestsRegime object specifying whether investigation results in enforcement, clearance, or both; and aggregation methods used for GUPPI and diversion ratio measures sample_size Number of draws to generate seed_data List of seed sequences, to assure independent samples in each thread nthreads Number of parallel processes to use Returns ------- UPPTestCounts object with of test counts by firm count, ΔHHI and concentration zone """ market_data_sample = self._markets_sampler( sample_size=sample_size, seed_data=seed_data, nthreads=nthreads ) upp_test_arrays: UPPTestsCounts = compute_upp_test_counts( market_data_sample, _upp_test_parms, _sim_test_regime ) return upp_test_arrays def __sim_enf_cnts_ll( self, _enf_parm_vec: gbl.MGThresholds, _sim_test_regime: UPPTestRegime, / ) -> UPPTestsCounts: """Parallelize data-generation and testing. The parameters `_sim_enf_cnts_kwargs` are passed unaltered to the parent function, `sim_enf_cnts()`, except that, if provided, `seed_data` is used to spawn a seed sequence for each thread, to assure independent samples in each thread, and `nthreads` defines the number of parallel processes used. The number of draws in each thread may be tuned, by trial and error, to the amount of memory (RAM) available. 
Parameters ---------- _enf_parm_vec Guidelines thresholds to test against _sim_test_regime Configuration to use for testing Returns ------- Arrays of enforcement counts or clearance counts by firm count, ΔHHI and concentration zone """ sample_sz = self.sample_size subsample_sz = H5_CHUNK_SIZE iter_count = (sample_sz / subsample_sz).__ceil__() thread_count = self.nthreads or cpu_count() if ( self.share_spec.recapture_form != RECForm.OUTIN and self.share_spec.recapture_rate != _enf_parm_vec.rec ): raise ValueError( "{} {} {}".format( f"Recapture rate from market sample spec, {self.share_spec.recapture_rate}", f"must match the value, {_enf_parm_vec.rec}", "the guidelines thresholds vector.", ) ) rng_seed_data = list( starmap( SeedSequenceData, zip( *[ _s.spawn(iter_count) if _s else [None] * iter_count for _s in ( getattr(self.seed_data, _a.name) for _a in self.seed_data.__attrs_attrs__ ) ], strict=True, ), ) ) sim_enf_cnts_kwargs: SamplingFunctionKWArgs = SamplingFunctionKWArgs({ "sample_size": subsample_sz, "nthreads": thread_count, }) with parallel_config( backend="threading", n_jobs=min(thread_count, iter_count), return_as="generator", ): res_list = Parallel()( delayed(self.__sim_enf_cnts)( _enf_parm_vec, _sim_test_regime, **sim_enf_cnts_kwargs, seed_data=_rng_seed_data_ch, ) for _rng_seed_data_ch in rng_seed_data ) res_list_stacks = UPPTestsCounts(*[ np.stack([getattr(_j, _k) for _j in res_list]) for _k in ("by_firm_count", "by_delta", "by_conczone") ]) upp_test_results = UPPTestsCounts(*[ ( np.array([], int) if not (_gv := getattr(res_list_stacks, _g.name)).any() else np.hstack(( _gv[0, :, :_h], np.einsum("ijk->jk", _gv[:, :, _h:], dtype=int), )) ) for _g, _h in zip(res_list_stacks.__attrs_attrs__, [1, 1, 3], strict=True) ]) del res_list, res_list_stacks return upp_test_results
[docs] def estimate_enf_counts( self, _enf_parm_vec: MGThresholds, _upp_test_regime: UPPTestRegime, / ) -> None: """Populate :attr:`enf_counts` with estimated UPP test counts. Parameters ---------- _enf_parm_vec Threshold values for various Guidelines criteria _upp_test_regime Specifies whether to analyze enforcement, clearance, or both and the GUPPI and diversion ratio aggregators employed, with default being to analyze enforcement based on the maximum merging-firm GUPPI and maximum diversion ratio between the merging firms Returns ------- None """ if self.dataset is None: self.enf_counts = self.__sim_enf_cnts_ll(_enf_parm_vec, _upp_test_regime) else: self.enf_counts = compute_upp_test_counts( self.dataset, _enf_parm_vec, _upp_test_regime )
[docs] def to_archive( self, zip_: zipfile.ZipFile, _subdir: str = "", /, *, save_dataset: bool = False ) -> None: """Serialize market sample to Zip archive.""" zpath = zipfile.Path(zip_, at=_subdir) name_root = f"{_PKG_NAME}_market_sample" with (zpath / f"{name_root}.yaml").open("w") as _yfh: this_yaml.dump(self, _yfh) if save_dataset: if self.dataset is None and self.enf_counts is None: raise ValueError( "No dataset and/or enforcement counts available for saving. " "Generate some data or set save_dataset to False to proceed." ) else: if self.dataset is not None: with (zpath / f"{name_root}_dataset.h5").open("wb") as _hfh: _hfh.write(self.dataset.to_h5bin()) if self.enf_counts is not None: with (zpath / f"{name_root}_enf_counts.yaml").open("w") as _yfh: this_yaml.dump(self.enf_counts, _yfh)
@staticmethod
[docs] def from_archive( zip_: zipfile.ZipFile, _subdir: str = "", /, *, restore_dataset: bool = False ) -> MarketSample: """Deserialize market sample from Zip archive.""" zpath = zipfile.Path(zip_, at=_subdir) name_root = f"{_PKG_NAME}_market_sample" market_sample_: MarketSample = this_yaml.load( (zpath / f"{name_root}.yaml").read_text() ) if restore_dataset: _dt = (_dp := zpath / f"{name_root}_dataset.h5").is_file() _et = (_ep := zpath / f"{name_root}_enf_counts.yaml").is_file() if not (_dt or _et): raise ValueError( "Archive has no sample data to restore. " "Delete second argument, or set it False, and rerun." ) else: if _dt: with _dp.open("rb") as _hfh: object.__setattr__( market_sample_, "dataset", MarketsData.from_h5f(_hfh) ) if _et: object.__setattr__( market_sample_, "enf_counts", this_yaml.load(_ep.read_text()) ) return market_sample_
@classmethod
[docs] def to_yaml( cls, _r: yaml.representer.RoundTripRepresenter, _d: MarketSample ) -> yaml.MappingNode: """Serialize market sample to YAML representation.""" retval: yaml.MappingNode = _r.represent_mapping( f"!{cls.__name__}", { _a.name: getattr(_d, _a.name) for _a in _d.__attrs_attrs__ if _a.name not in {"dataset", "enf_counts"} }, ) return retval
@classmethod
[docs] def from_yaml( cls, _c: yaml.constructor.RoundTripConstructor, _n: yaml.MappingNode ) -> MarketSample: """Deserialize market sample from YAML representation.""" return cls(**yaml_rt_mapper(_c, _n))