Source code for sherpa.estmethods

#
#  Copyright (C) 2007, 2015, 2016, 2019 - 2021, 2023 - 2025
#  Smithsonian Astrophysical Observatory
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import annotations

from collections.abc import Callable, Sequence
import logging
from typing import Any, Protocol, SupportsFloat
import warnings

import numpy as np
from numpy.linalg import LinAlgError

from sherpa.stats import StatCallback
from sherpa.utils import NoNewAttributesAfterInit, \
    FuncCounter, OutOfBoundErr, Knuth_close, \
    print_fields, is_iterable, list_to_open_interval, quad_coef, \
    demuller, zeroin
from sherpa.utils.parallel import SupportsLock, SupportsProcess, \
    SupportsQueue, multi, ncpus, context, process_tasks
from sherpa.utils.types import ArrayType, FitFunc, StatFunc

from . import _est_funcs  # type: ignore


# TODO: this should not be set globally
_ = np.seterr(invalid='ignore')


__all__ = ('EstNewMin', 'Covariance', 'Confidence',
           'Projection', 'est_success', 'est_failure', 'est_hardmin',
           'est_hardmax', 'est_hardminmax', 'est_newmin', 'est_maxiter',
           'est_hitnan')


warning = logging.getLogger(__name__).warning


# The return type for the estimation routines.
#
# It looks like the limits can be numbers or None.
#
ArrayVal = Sequence[SupportsFloat | None] | np.ndarray
EstReturn = tuple[ArrayVal, ArrayVal,
                  Sequence[int | None] | np.ndarray,
                  int, np.ndarray | None]


est_success = 0
est_failure = 1
est_hardmin = 2
est_hardmax = 3
est_hardminmax = 4
est_newmin = 5
est_maxiter = 6
est_hitnan = 7


# This class is used to send around the "new parameter values" in
# sherpa.fit.Fit.est_errors.
#
[docs] class EstNewMin(Exception): "Reached a new minimum fit statistic" pass
[docs] class EstMethod(NoNewAttributesAfterInit): """Estimate errors on a set of parameters. .. versionchanged:: 4.17.1 The estfunc argument is now unused and will be removed in a later release. """ # defined pre-instantiation for pickling config = {'sigma': 1, 'eps': 0.01, 'maxiters': 200, 'soft_limits': False} def __init__(self, name: str, estfunc: Any = None ) -> None: self.name = name if estfunc is not None: warnings.warn("EstMethod: the estfunc argument is deprecated", DeprecationWarning) # config should be defined pre-instantiation for pickling # however, for some unknown reason membership in self.__dict__ # requires declaration in __init__() self.config = self.config.copy() super().__init__() def __getattr__(self, name): if name in self.__dict__.get('config', ()): return self.config[name] raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __setattr__(self, name, val): if name in self.__dict__.get('config', ()): self.config[name] = val else: NoNewAttributesAfterInit.__setattr__(self, name, val) def __repr__(self) -> str: return f"<{type(self).__name__} error-estimation method instance '{self.name}'>" def __str__(self) -> str: # Put name first always keylist = list(self.config.keys()) keylist = ['name'] + keylist full_config = {'name': self.name} full_config.update(self.config) return print_fields(keylist, full_config) def __setstate__(self, state): self.__dict__.update(state) # obtain config values from object class # TODO: pylint points out there's no name set for # the initialization call self.__dict__['config'] = getattr(self.__class__(), 'config', {}) # update new config dict with user defined from old self.__dict__['config'].update(state.get('config', {}))
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, # integers freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs: Any = None, statkwargs: Any = None ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ # This does not use abc.abstractmethod as inheriting from # NoNewAttributesAfterInit. # raise NotImplementedError()
[docs] class Covariance(EstMethod): """The covariance method for estimating errors.""" def __init__(self, name: str = 'covariance') -> None: super().__init__(name)
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs: Any = None, statkwargs: Any = None ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ if statargs is not None or statkwargs is not None: warning("statargs/kwargs set but values unused") stat_cb = StatCallback(statfunc) def fit_cb(scb, pars, parmins, parmaxes, i): # parameter i is a no-op usually return fitfunc(scb, pars, parmins, parmaxes)[2] # remin means reminimize -- *generally* not done (only # proj needs to reminimize). # # covar still needs to pass a reminimize flag to # get_one_sided_interval; a value less than zero is # interpreted as "never reminimize", so pass that value here. remin = -1.0 tol = -1.0 return covariance(pars, parmins=parmins, parmaxes=parmaxes, parhardmins=parhardmins, parhardmaxes=parhardmaxes, sigma=self.sigma, eps=self.eps, tol=tol, maxiters=self.maxiters, remin=remin, limit_parnums=limit_parnums, stat_cb=stat_cb, fit_cb=fit_cb, report_progress=report_progress)
[docs] class Confidence(EstMethod): """The confidence method for estimating errors.""" # defined pre-instantiation for pickling _added_config = {'remin': 0.01, 'fast': False, 'parallel': True, 'numcores': ncpus, 'maxfits': 5, 'max_rstat': 3, 'tol': 0.2, 'verbose': False, 'openinterval': False} def __init__(self, name: str = 'confidence') -> None: super().__init__(name) # Update EstMethod.config dict with Confidence specifics self.config.update(self._added_config)
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, # integers freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs: Any = None, statkwargs: Any = None ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ if statargs is not None or statkwargs is not None: warning("statargs/kwargs set but values unused") stat_cb = StatCallback(statfunc) def fit_cb(pars, parmins, parmaxes, i): # freeze model parameter i (current_pars, current_parmins, current_parmaxes) = freeze_par(pars, parmins, parmaxes, i) fit_pars = fitfunc(statfunc, current_pars, current_parmins, current_parmaxes)[1] # If stat is not chi-squared, and fit method is # lmdif, need to recalculate stat at end, just # like in sherpa/sherpa/fit.py:fit() stat = statfunc(fit_pars)[0] # stat = fitfunc(scb, pars, parmins, parmaxes)[2] # thaw model parameter i thaw_par(i) return stat # # convert stat call back to have the same signature as fit call back # def stat_cb_extra_args(fcn): def stat_cb_wrapper(x, *args): return fcn(x) return stat_cb_wrapper statcb = stat_cb_extra_args(stat_cb) if 1 == len(pars): fitcb = statcb else: fitcb = fit_cb return confidence(pars, parmins=parmins, parmaxes=parmaxes, parhardmins=parhardmins, parhardmaxes=parhardmaxes, sigma=self.sigma, eps=self.eps, tol=self.tol, maxiters=self.maxiters, remin=self.remin, verbose=self.verbose, limit_parnums=limit_parnums, stat_cb=statcb, fit_cb=fitcb, report_progress=report_progress, get_par_name=get_par_name, do_parallel=self.parallel, numcores=self.numcores, open_interval=self.openinterval)
[docs] class Projection(EstMethod): """The projection method for estimating errors.""" # defined pre-instantiation for pickling _added_config = {'remin': 0.01, 'fast': False, 'parallel': True, 'numcores': ncpus, 'maxfits': 5, 'max_rstat': 3, 'tol': 0.2} def __init__(self, name: str = 'projection') -> None: super().__init__(name) # Update EstMethod.config dict with Projection specifics self.config.update(self._added_config)
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, # integers freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs: Any = None, statkwargs: Any = None ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ if statargs is not None or statkwargs is not None: warning("statargs/kwargs set but values unused") if fitfunc is None: raise TypeError("fitfunc should not be none") stat_cb = StatCallback(statfunc) def fit_cb(pars, parmins, parmaxes, i): # freeze model parameter i (current_pars, current_parmins, current_parmaxes) = freeze_par(pars, parmins, parmaxes, i) fit_pars = fitfunc(statfunc, current_pars, current_parmins, current_parmaxes)[1] # If stat is not chi-squared, and fit method is # lmdif, need to recalculate stat at end, just # like in sherpa/sherpa/fit.py:fit() stat = statfunc(fit_pars)[0] # stat = fitfunc(scb, pars, parmins, parmaxes)[2] # thaw model parameter i thaw_par(i) return stat return projection(pars, parmins=parmins, parmaxes=parmaxes, parhardmins=parhardmins, parhardmaxes=parhardmaxes, sigma=self.sigma, eps=self.eps, tol=self.tol, maxiters=self.maxiters, remin=self.remin, limit_parnums=limit_parnums, stat_cb=stat_cb, fit_cb=fit_cb, report_progress=report_progress, get_par_name=get_par_name, do_parallel=self.parallel, numcores=self.numcores)
def covariance(pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, sigma: float, eps: float, tol: float, maxiters: int, remin: float, limit_parnums: np.ndarray, # integers stat_cb: Callable, fit_cb: Callable, report_progress: Callable ) -> EstReturn: """Estimate errors using the covariance method.""" # Do nothing with tol # Do nothing with report_progress (generally fast enough we don't # need to report back per-parameter progress) # Even though we only want limits on certain parameters, we have to # compute the matrix for *all* thawed parameters. So we will do that, # and then pick the parameters of interest out of the result. try: info = _est_funcs.info_matrix(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps, maxiters, remin, stat_cb) except EstNewMin as emin: # catch the EstNewMin exception and attach the modified # parameter values to the exception obj. These modified # parvals determine the new lower statistic. raise EstNewMin(pars) from emin # Invert matrix, take its square root and multiply by sigma to get # parameter uncertainties; parameter uncertainties are the # diagonal elements of the matrix. # Use simpler matrix inversion function from numpy. If that # doesn't work, assume it's an ill-conditioned or singular matrix, # and call pinv from numpy -- pinv will call the SVD function to # invert the matrix. But call pinv *only* when inv is shown not # to work in a particular case -- use inv by default! # The reason for this is that pinv can give back very strange # results, when you don't *need* to use pinv, and it *also* # happens that the ratio between smallest and largest diagonal # elements approaches the machine precision for the data type. # The result is that errors that come from the largest diagonal # element are ludicrously small; you can't have a parameter value # of order 1.0, and an error of order 10^-30, for example. The # simpler inv function for inverting matrices does not appear to # have the same issue. try: inv_info = np.linalg.inv(info) except LinAlgError: # catch the SVD exception and exit gracefully inv_info = np.zeros_like(info) inv_info[:] = np.nan except: try: inv_info = np.linalg.pinv(info) except LinAlgError: # catch the SVD exception and exit gracefully inv_info = np.zeros_like(info) inv_info[:] = np.nan diag = (sigma * np.sqrt(inv_info)).diagonal() # limit_parnums lists the indices of the array pars, that # correspond to the parameters of interest. We will pick out # the diagonal elements corresponding to entries in limits_parnums, # and return only those bounds to the user. upper_bounds = [] lower_bounds = [] error_flags = [] for num in limit_parnums: eflag = est_success ubound = diag[num] lbound = -diag[num] # What happens when lbound or ubound is NaN? This is # presumably why the code is written as it is below (e.g. a # pass if the values can be added to pars[num]). # if pars[num] + ubound < parhardmaxes[num]: pass else: ubound = np.nan eflag = est_hardmax if pars[num] + lbound > parhardmins[num]: pass else: lbound = np.nan if eflag == est_hardmax: eflag = est_hardminmax else: eflag = est_hardmin upper_bounds.append(ubound) lower_bounds.append(lbound) error_flags.append(eflag) return (np.array(lower_bounds), np.array(upper_bounds), np.array(error_flags), 0, inv_info) def projection(pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, sigma: float, eps: float, tol: float, maxiters: int, remin: float, limit_parnums: np.ndarray, # integers stat_cb: Callable, fit_cb: Callable, report_progress: Callable, get_par_name: Callable, do_parallel: bool, numcores: int ) -> EstReturn: """Estimate errors using the projection method.""" # Number of parameters to be searched on (*not* number of thawed # parameters, just number we are searching on) numsearched = len(limit_parnums) # _est_funcs.projection can be called on any subset of the thawed # parameters. So we made a change here to call _est_funcs.projection # once per parameter number listed in limit_parnums, instead of # calling _est_funcs.projection once, with the original limit_parnums # array. This way, we can report pack progress after the confidence # limit search is completed for each parameter, without descending # into the C++ code. # # It does mean we have to take apart the tuple returned by each call # to _est_funcs.projection; take the data we've pulled out, and # upon exiting the while loop, constructing a new tuple to return. # SMD 03/17/2009 proj_func = _est_funcs.projection # LocalEstFunction def func(counter: int, # unused singleparnum: int, lock: SupportsLock | None = None ) -> tuple[SupportsFloat, SupportsFloat, int, int, None]: try: singlebounds = proj_func(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps, tol, maxiters, remin, [singleparnum], stat_cb, fit_cb) except EstNewMin as emin: # catch the EstNewMin exception and attach the modified # parameter values to the exception obj. These modified # parvals determine the new lower statistic. raise EstNewMin(pars) from emin if lock is not None: lock.acquire() report_progress(singleparnum, singlebounds[0], singlebounds[1]) if lock is not None: lock.release() return (singlebounds[0][0], singlebounds[1][0], singlebounds[2][0], singlebounds[3], None) if numsearched < 2 or not multi or numcores < 2: do_parallel = False if not do_parallel: lower_limits = np.zeros(numsearched) upper_limits = np.zeros(numsearched) eflags = np.zeros(numsearched, dtype=int) nfits = 0 for i, pnum in enumerate(limit_parnums): singlebounds = func(i, pnum) lower_limits[i] = singlebounds[0] upper_limits[i] = singlebounds[1] eflags[i] = singlebounds[2] nfits = nfits + singlebounds[3] return (lower_limits, upper_limits, eflags, nfits, None) return parallel_est(func, limit_parnums, pars, numcores) #################################confidence################################### class ConfArgs: """The class ConfArgs is responsible for the arguments to the fit call back function.""" def __init__(self, xpars: ArrayType, smin: ArrayType, smax: ArrayType, hmin: ArrayType, hmax: ArrayType, target_stat: SupportsFloat ) -> None: self.ith_par = 0 self.xpars = np.array(xpars, copy=True) self.slimit = (np.array(smin, copy=True), np.array(smax, copy=True)) self.hlimit = (np.array(hmin, copy=True), np.array(hmax, copy=True)) self.target_stat = target_stat def __call__(self ) -> tuple[int, np.ndarray, np.ndarray, np.ndarray, SupportsFloat]: return (self.ith_par, self.xpars, self.slimit, self.hlimit, self.target_stat) def __str__(self) -> str: a2s = np.array2string msg = '' msg += '# smin = ' + a2s(self.slimit[0], precision=6) + '\n' msg += '# smax = ' + a2s(self.slimit[1], precision=6) + '\n' msg += '# hmin = ' + a2s(self.hlimit[0], precision=6) + '\n' msg += '# hmax = ' + a2s(self.hlimit[1], precision=6) + '\n#\n' msg += '# Note: for the intermediate steps, the notation:\n' msg += ' par.name -/+: f( x ) = stat\n' msg += '# ==> `stat` is the statistic when parameter `par.name` is frozen at `x`\n' msg += '# while searching for the `lower/upper` confidence level, respectively.\n#' return msg def get_par(self) -> SupportsFloat: """return the current (worked on) par""" return self.xpars[self.ith_par] def get_hlimit(self, dir: int) -> SupportsFloat: """ return the current (worked on) hard limit""" return self.hlimit[dir][self.ith_par] def get_slimit(self, dir: int) -> SupportsFloat: """ return the current (worked on) soft limit""" return self.slimit[dir][self.ith_par] class ConfBlog: def __init__(self, blogger, prefix, verbose, lock) -> None: self.blogger = blogger self.prefix = prefix self.verbose = verbose self.lock = lock class Limit: """Represent a limit. This should not be used directly: use `LowerLimit` or `UpperLimit` instead. """ def __init__(self, limit: SupportsFloat) -> None: self.limit = limit class LowerLimit(Limit): "A lower limit" def __str__(self) -> str: return f'LowerLimit: limit={self.limit:e}' def is_beyond_limit(self, x) -> bool: return x < self.limit class UpperLimit(Limit): "An upper limit" def __str__(self) -> str: return f'UpperLimit: limit={self.limit:e}' def is_beyond_limit(self, x: SupportsFloat) -> bool: # The float calls are for typing checks. return float(x) > float(self.limit) class ConfBracket: """The class ConfBracket is responsible for bracketing the root within the interval (a,b) where f(a)*f(b) < 0.0""" neg_pos = (-1, 1) def __init__(self, myargs, trial_points) -> None: self.myargs = myargs self.trial_points = trial_points self.fcn: Callable | None = None # TODO: rename the dir and iter arguments def __call__(self, dir: int, iter, step_size, open_interval, maxiters, tol, bloginfo: ConfBlog ) -> ConfRootNone: # # Either 1) a root has been found (ConfRootZero), 2) an interval # where the root has been confined (ConfRootBracket) or 3) No # possible chance for a root (ConfRootNone), ie by trying points # upto/beyond the hard limit and no chance for a root has been found. # return self.find(dir, iter, step_size, open_interval, maxiters, tol, bloginfo) # TODO: rename the dir and iter arguments # TODO: the iter argument gets over-ridden below, so do we need it? (and rename it) def find(self, dir: int, iter, step_size, open_interval, maxiters, tol, bloginfo: ConfBlog, base=2.0 ) -> ConfRootNone: assert self.fcn is not None, 'callback func has not been set' hlimit = [LowerLimit(self.myargs.get_hlimit(dir)), UpperLimit(self.myargs.get_hlimit(dir))] slimit = [LowerLimit(self.myargs.get_slimit(dir)), UpperLimit(self.myargs.get_slimit(dir))] xxx = self.trial_points[0] fff = self.trial_points[1] conf_step = ConfStep(xxx, fff) mymaxiters = min(maxiters, 16) plateau = 0 max_plateau_iter = 5 try: # TODO: rename iter argument for iter in range(mymaxiters): if 0 == iter: x = conf_step.covar(dir, iter, step_size, base) elif 1 == iter: x = conf_step.secant(dir, iter, step_size, base) # x = conf_step.covar( dir, iter, step_size, base ) else: x = conf_step.quad(dir, iter, step_size, base, bloginfo) if x is None or np.isnan(x): return ConfRootNone() # Make sure x is not beyond the **hard** limit if hlimit[dir].is_beyond_limit(x): x = hlimit[dir].limit f = self.fcn(x, self.myargs()) # print 'find(): beyond hard limit: f(%.14e)=%.14e' % (x,f) if abs(f) <= tol: return ConfRootZero(x) if f >= 0.0: return ConfRootBracket(self.fcn, self.trial_points, open_interval) return ConfRootNone() if slimit[dir].is_beyond_limit(x): f = self.fcn(x, self.myargs()) # print 'find(): beyond soft limit: f(%.14e)=%.14e' % (x,f) if abs(f) <= tol: return ConfRootZero(x) if f >= 0.0: return ConfRootBracket(self.fcn, self.trial_points, open_interval) if f < fff[-2]: # if the fit beyond the soft limit is a better fit # then the confidence for the parameter does not exist return ConfRootNone() else: f = self.fcn(x, self.myargs()) # print 'find(): f(%.14e)=%.14e' % (x,f) if abs(f) <= tol: return ConfRootZero(x) if f >= 0.0: return ConfRootBracket(self.fcn, self.trial_points, open_interval) if Knuth_close(fff[-2], fff[-1], 1.0e-6): plateau += 1 if plateau > max_plateau_iter: # print 'find( %d ): plateau = %d', (iter,plateau) return ConfRootNone(None) # else: # if plateau > 0: # plateau -= 1 return ConfRootNone(None) except OutOfBoundErr: return ConfRootNone() class ConfRootNone: """The base class for the root of the confidence interval""" def __init__(self, root: SupportsFloat | None = None ) -> None: """If self.root == None, then 1) points up to the hard limits were tried and it was not possible to bracketed the solution. 2) a parameter beyond the soft limit has been tried and the new stat was found to be **less** then the initial minimum""" self.root = root def __call__(self, tol, bloginfo: ConfBlog ) -> SupportsFloat | None: return self.root def __str__(self) -> str: return 'No possible root exist' class ConfRootBracket(ConfRootNone): """The class contains the bracket where the confidence root has been bracketed, ie where f(a)*f(b) < 0""" def __init__(self, fcn, trial_points, open_interval ) -> None: super().__init__(root=None) self.fcn = fcn self.trial_points = trial_points self.open_interval = open_interval def __call__(self, tol, bloginfo: ConfBlog ) -> SupportsFloat | tuple[SupportsFloat, SupportsFloat] | None: def warn_user_about_open_interval(listval): if bloginfo.lock is not None: bloginfo.lock.acquire() if 0 == bloginfo.verbose: prefix = '%s ' % bloginfo.prefix.lstrip() else: prefix = '%s ' % bloginfo.prefix interval = list_to_open_interval(listval) bloginfo.blogger.info(prefix + 'WARNING: The confidence level lies within ' + interval) if bloginfo.lock is not None: bloginfo.lock.release() xxx = self.trial_points[0] fff = self.trial_points[1] if np.sign(fff[-2]) == np.sign(fff[-1]): self.root = None return None answer = zeroin(self.fcn, xxx[-2], xxx[-1], fa=fff[-2], fb=fff[-1], maxfev=32, tol=tol) if abs(answer[0][1]) > tol: xafa = answer[1][0] xa = xafa[0] fa = xafa[1] xbfb = answer[1][1] xb = xbfb[0] fb = xbfb[1] if np.sign(fa) != np.sign(fb): if not self.open_interval: warn_user_about_open_interval([xa, xb]) return (xa + xb) / 2.0 return min(xa, xb), max(xa, xb) return None self.root = answer[0][0] return self.root def __str__(self) -> str: msg = 'root is within the interval ( f(%e)=%e, f(%e)=%e )' \ % (self.trial_points[0][-2], self.trial_points[1][-2], self.trial_points[0][-1], self.trial_points[1][-1], ) return msg class ConfRootZero(ConfRootNone): """The class with the root/zero of the confidence interval""" def __str__(self) -> str: return f'root = {self.root:e}' class ConfStep: def __init__(self, xtrial, ftrial) -> None: self.xtrial = xtrial self.ftrial = ftrial def covar(self, dir, iter, stepsize, base): return self.xtrial[-1] + \ ConfBracket.neg_pos[dir] * pow(base, iter) * stepsize # return self.xtrial[ 0 ] + \ # ConfBracket.neg_pos[ dir ] * pow( base, iter ) * stepsize def Halley(self, coeffs, x, maxfev=8, tol=1.0e-3): for nfev in range(maxfev): ax = coeffs[0] * x fval = (ax + coeffs[1]) * x + coeffs[2] if abs(fval) <= tol: return [x, fval] fdif = 2.0 * ax + coeffs[1] fdif2 = 2.0 numer = 2.0 * fval * fdif denom = 2.0 * fdif * fdif - fval * fdif2 x -= numer / denom nfev += 1 return [x, fval] def is_same_dir(self, dir, current_pos, proposed_pos) -> bool: delta = proposed_pos - current_pos return np.sign(delta) == ConfBracket.neg_pos[dir] def quad(self, dir, iter, step_size, base, bloginfo): coeffs = quad_coef(self.xtrial[-3:], self.ftrial[-3:]) delta = ConfBracket.neg_pos[dir] delta *= abs(self.xtrial[-1] - self.xtrial[-2]) lastx = self.xtrial[-1] mroot = demuller(np.poly1d(coeffs), lastx + delta, lastx + 2 * delta, lastx + 3 * delta, tol=1.0e-2) xroot = mroot[0][0] if xroot is None or np.isnan(xroot): return self.covar(dir, iter, step_size, base) try: [xroot, froot] = self.Halley(coeffs, xroot, tol=1.0e-3) except ZeroDivisionError: xroot = None if xroot is not None and not np.isnan(xroot) and \ self.is_same_dir(dir, self.xtrial[-1], xroot): return xroot return self.covar(dir, iter, step_size, base) def secant(self, dir, iter, step_size, base): xb = self.xtrial[-2] fb = self.ftrial[-2] xa = self.xtrial[-1] fa = self.ftrial[-1] if abs(fb) > abs(fa) or 0.0 == fa: return self.covar(dir, iter, step_size, base) s = fb / fa p = (xa - xb) * s if 1.0 == s: return self.covar(dir, iter, step_size, base) q = 1.0 - s x = xb - p / q if self.is_same_dir(dir, xa, x): return x return self.covar(dir, iter, step_size, base) def confidence(pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, sigma: float, eps: float, tol: float, maxiters: int, remin: float, verbose: bool, # different to covariance/projection limit_parnums: np.ndarray, # integers stat_cb: Callable, fit_cb: Callable, report_progress: Callable, get_par_name: Callable, do_parallel: bool, numcores: int, open_interval ) -> EstReturn: def get_prefix(index, name, minus_plus): '''To print the prefix/indent when verbose is on''' blank = 3 * index * ' ' return [f"{blank}{name} {mtext}:" for mtext in minus_plus] def get_delta_root(arg, dir, par_at_min): my_neg_pos = ConfBracket.neg_pos[dir] if is_iterable(arg): # return map( lambda x: my_neg_pos * abs( x - par_at_min ), arg ) return arg if arg is not None: arg -= par_at_min return my_neg_pos * abs(arg) return arg def get_step_size(error_scales, upper_scales, index, par): if 0 != error_scales[index]: # if covar error is NaN then set it to fraction of the par value. ith_covar_err = 0.0625 * abs(par) else: ith_covar_err = abs(upper_scales[index]) if 0.0 == ith_covar_err: # just in case covar and/or par is 0 ith_covar_err = 1.0e-6 return ith_covar_err def monitor_func(fcn, history): def myfunc(x, *args): fval = fcn(x, *args) history[0].append(x) history[1].append(fval) return fval return myfunc def print_status(myblog, verbose, prefix, answer, lock): if lock is not None: lock.acquire() if 0 == verbose: msg = '%s\t' % prefix.lstrip() else: msg = '%s\t' % prefix if is_iterable(answer): msg += list_to_open_interval(answer) elif answer is None: msg += '-----' else: msg += '%g' % answer myblog(msg) if lock is not None: lock.release() # # Work in the translated coordinate. Hence the 'errors/confidence' # are the zeros/roots in the translated coordinate system. # def translated_fit_cb(fcn, myargs): def translated_fit_cb_wrapper(x, *args): hlimit = myargs.hlimit slimit = myargs.slimit hmin = hlimit[0] hmax = hlimit[1] xpars = myargs.xpars ith_par = myargs.ith_par # The parameter must be within the hard limits if x < hmin[ith_par] or x > hmax[ith_par]: raise OutOfBoundErr smin = slimit[0] smax = slimit[1] orig_ith_xpar = xpars[ith_par] xpars[ith_par] = x translated_stat = fcn( xpars, smin, smax, ith_par) - myargs.target_stat xpars[ith_par] = orig_ith_xpar return translated_stat return translated_fit_cb_wrapper def verbose_fitcb(fcn, bloginfo): if 0 == bloginfo.verbose: return fcn def verbose_fcn(x, *args): fval = fcn(x, *args) msg = '%s f( %e ) =' % (bloginfo.prefix, x) if fval is None: msg = '%s None' % msg else: msg = '%s %e' % (msg, fval) bloginfo.blogger.info(msg) return fval return verbose_fcn sherpablog = logging.getLogger('sherpa') # where to print progress report # Get minimum fit statistic, and calculate target statistic value orig_min_stat = stat_cb(pars) delta_stat = sigma * sigma target_stat = orig_min_stat + delta_stat lower_scales = None upper_scales = None error_scales = None nfits = 0 results = None try: (lower_scales, upper_scales, error_scales, nfits, results) = covariance(pars, parmins, parmaxes, parhardmins, parhardmaxes, 1.0, eps, tol, maxiters, remin, limit_parnums, stat_cb, fit_cb, report_progress) except EstNewMin as e: raise e except: error_scales = np.full(len(pars), est_hardminmax) myargs = ConfArgs(pars, parmins, parmaxes, parhardmins, parhardmaxes, target_stat) if 0 != verbose: msg = '#\n# f' + np.array2string(np.asarray(pars), precision=6) msg += ' = %e\n' % orig_min_stat msg += '# sigma = %e\n' % sigma msg += '# target_stat = %e\n' % target_stat msg += '# tol = %e\n' % eps msg += '%s' % myargs sherpablog.info(msg) # TODO: this dictionary is used to store a value, but the value is # never used. Do we need it? store = {} # LocalEstFunc def func(counter: int, singleparnum: int, lock: SupportsLock | None = None ) -> tuple[SupportsFloat, SupportsFloat, int, int, None]: counter_cb = FuncCounter(fit_cb) # # These are the bounds to be returned by this method # conf_int = [[], []] error_flags = [] # # If the user has requested a specific parameter to be # calculated then 'ith_par' represents the index of the # free parameter to deal with. # myargs.ith_par = singleparnum fitcb = translated_fit_cb(counter_cb, myargs) par_name = get_par_name(myargs.ith_par) ith_covar_err = get_step_size(error_scales, upper_scales, counter, pars[myargs.ith_par]) trial_points = [[], []] fitcb = monitor_func(fitcb, trial_points) bracket = ConfBracket(myargs, trial_points) # the parameter name is set, may as well get the prefix prefix = get_prefix(counter, par_name, ['-', '+']) myfitcb = [verbose_fitcb(fitcb, ConfBlog(sherpablog, prefix[0], verbose, lock)), verbose_fitcb(fitcb, ConfBlog(sherpablog, prefix[1], verbose, lock))] for dirn in range(2): # # trial_points stores the history of the points for the # parameter which has been evaluated in order to locate # the root. Note the first point is 'given' since the info # of the minimum is crucial to the search. # bracket.trial_points[0].append(pars[myargs.ith_par]) bracket.trial_points[1].append(- delta_stat) myblog = ConfBlog(sherpablog, prefix[dirn], verbose, lock) # have to set the callback func otherwise disaster. bracket.fcn = myfitcb[dirn] root = bracket(dirn, iter, ith_covar_err, open_interval, maxiters, eps, myblog) myzero = root(eps, myblog) delta_zero = get_delta_root(myzero, dirn, pars[myargs.ith_par]) conf_int[dirn].append(delta_zero) status_prefix = get_prefix(counter, par_name, ['lower bound', 'upper bound']) print_status(myblog.blogger.info, verbose, status_prefix[dirn], delta_zero, lock) # This should really set the error flag appropriately. error_flags.append(est_success) # # include the minimum point to separate the -/+ interval # store[par_name] = trial_points return (conf_int[0][0], conf_int[1][0], error_flags[0], counter_cb.nfev, None) if len(limit_parnums) < 2 or not multi or numcores < 2: do_parallel = False if not do_parallel: lower_limits = [] upper_limits = [] eflags = [] nfits = 0 for i, lpar in enumerate(limit_parnums): lower_limit, upper_limit, flags, nfit, extra = func( i, lpar) lower_limits.append(lower_limit) upper_limits.append(upper_limit) eflags.append(flags) nfits += nfit return (lower_limits, upper_limits, eflags, nfits, None) return parallel_est(func, limit_parnums, pars, numcores) #################################confidence################################### class LocalEstFunc(Protocol): """Process a single parameter.""" def __call__(self, counter: int, singleparnum: int, lock: SupportsLock | None = None ) -> tuple[SupportsFloat, SupportsFloat, int, int, None]: ... def parallel_est(estfunc: LocalEstFunc, limit_parnums: np.ndarray, # integers pars: np.ndarray, numcores: int = ncpus ) -> EstReturn: """Run a function on a sequence of inputs in parallel. A specialized version of sherpa.utils.parallel.parallel_map. Parameters ---------- estfunc : function This function accepts three arguments and returns a value. limit_parnums : sequence pars : sequence The current parameter values numcores : int, optional The number of calls to ``function`` to run in parallel. When set to ``None``, all the available CPUs on the machine - as set either by the 'numcores' setting of the 'parallel' section of Sherpa's preferences or by `multiprocessing.cpu_count` - are used. Returns ------- ans : tuple """ # See sherpa.utils.parallel for a discussion of how multiprocessing is # being used to run code in parallel. # manager = context.Manager() out_q = manager.Queue() err_q = manager.Queue() # Unlike sherpa.utils.parallel.parallel_map, these routines do use a lock # for serializing screen output. # lock = manager.Lock() size = len(limit_parnums) parids = np.arange(size) # if len(limit_parnums) is less than numcores, only use length number of # processes if size < numcores: numcores = size # group limit_parnums into numcores-worth of chunks limit_parnums = np.array_split(limit_parnums, numcores) parids = np.array_split(parids, numcores) def worker(parids, parnums) -> None: results = [] for parid, singleparnum in zip(parids, parnums): try: result = estfunc(parid, singleparnum, lock) results.append((parid, result)) except EstNewMin: # catch the EstNewMin exception and include the exception # class and the modified parameter values to the error queue. # These modified parvals determine the new lower statistic. # The exception class will be re-raised with the # parameter values attached. C++ Python exceptions are not # picklable for use in the queue. err_q.put(EstNewMin(pars)) return except Exception as e: err_q.put(e) return out_q.put(results) tasks = [context.Process(target=worker, args=(parid, parnum)) for parid, parnum in zip(parids, limit_parnums)] return run_tasks(tasks, out_q, err_q, size) def run_tasks(tasks: Sequence[SupportsProcess], out_q: SupportsQueue, err_q: SupportsQueue, size: int ) -> EstReturn: """Run the processes, exiting early if necessary, and return the results. A specialized version of sherpa.utils.parallel.run_tasks (note the different order of the queues). Parameters ---------- procs : list of multiprocessing.Process tasks The processes to run. out_q, err_q : manager.Queue The success and error and channels used by the processes. size : int The number of results being returned (can be larger than len(procs)). Returns ------- result : tuple """ process_tasks(tasks, err_q) lower_limits = size * [None] upper_limits = size * [None] eflags = size * [None] nfits = 0 while not out_q.empty(): for parid, singlebounds in out_q.get(): # Have to guarantee that the tuple returned by projection # is always (array, array, array, int) for this to work. lower_limits[parid] = singlebounds[0] upper_limits[parid] = singlebounds[1] eflags[parid] = singlebounds[2] nfits += singlebounds[3] return (lower_limits, upper_limits, eflags, nfits, None)