#
# Copyright (C) 2007, 2015 - 2019, 2021 - 2025
# Smithsonian Astrophysical Observatory
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
"""Provide Astronomy-specific I/O routines for Sherpa.
This module contains read and write routines for handling `FITS
<https://en.wikipedia.org/wiki/FITS>`_ and ASCII format data. The
actual support is provided by the selected I/O backend package
(currently Crates, provided by CIAO, or the FITS support in the
AstroPy package, if installed).
Which backend is used?
----------------------
When this module is first imported, Sherpa tries to import the
backends installed with Sherpa in the order listed in the
``.sherpa.rc`` or ``.sherpa-standalone.rc`` file. The first module that imports
successfully is set as the active backend. The following command prints the
name of the backend:
>>> from sherpa.astro import io
>>> print(io.backend.name)
Change the backend
------------------
After the initial import, the backend can be changed by loading one of the
I/O backends shipped with sherpa (or any other module that provides the same
interface):
>>> import sherpa.astro.io.pyfits_backend
>>> io.backend = sherpa.astro.io.pyfits_backend
"""
from collections.abc import Callable, Sequence, Mapping
from configparser import ConfigParser
from contextlib import suppress
import importlib
import importlib.metadata
import logging
from pathlib import Path
import re
from typing import TYPE_CHECKING, Any, Type, TypeVar
import numpy as np
from sherpa import get_config
from sherpa.astro.data import DataIMG, DataIMGInt, DataARF, DataRMF, \
DataPHA, DataRosatRMF
from sherpa.astro.utils import reshape_2d_arrays
from sherpa.data import Data, Data1D, Data2D, Data2DInt
from sherpa.io import _check_args
from sherpa.utils import is_subclass
from sherpa.utils.err import ArgumentErr, DataErr, IOErr
from sherpa.utils.numeric_types import SherpaFloat, SherpaUInt
from .types import NamesType, HdrTypeArg, HdrType, DataType, \
Header, HeaderItem, Column, TableBlock, ImageBlock, \
BlockList, BlockType, SpecrespBlock, MatrixBlock, EboundsBlock
# Responses can often be send as the Data object or the instrument
# version, so support it would be good to support this with types
# like
#
# RMFType = DataRMF | RMF1D
#
# but that is annoying thanks to circular dependencies.
#
if TYPE_CHECKING:
from sherpa.astro.instrument import ARF1D, RMF1D
ARFType = DataARF | ARF1D
RMFType = DataRMF | RMF1D
else:
ARFType = DataARF
RMFType = DataRMF
config = ConfigParser()
config.read(get_config())
# What should we use for the default package?
io_opt = [o.strip().lower() + '_backend' for o in
config.get('options', 'io_pkg', fallback='dummy').split()]
ogip_emin_str = config.get('ogip', 'minimum_energy', fallback='1.0e-10')
ogip_emin : float | None
if ogip_emin_str.upper() == 'NONE':
ogip_emin = None
else:
emsg = "Invalid value for [ogip] minimum_energy config value; " + \
"it must be None or a float > 0"
try:
ogip_emin = float(ogip_emin_str)
except ValueError as ve:
raise ValueError(emsg) from ve
if ogip_emin <= 0.0:
raise ValueError(emsg)
for iotry in io_opt:
with suppress(ImportError):
backend = importlib.import_module('.' + iotry,
package='sherpa.astro.io')
break
else:
# None of the options in the rc file work, e.g. because it's an
# old file that does not have dummy listed
import sherpa.astro.io.dummy_backend as backend
error = logging.getLogger(__name__).error
warning = logging.getLogger(__name__).warning
info = logging.getLogger(__name__).info
T = TypeVar('T')
__all__ = ('backend',
'read_table', 'read_image', 'read_arf', 'read_rmf', 'read_arrays',
'read_pha', 'write_image', 'write_pha', 'write_table',
'write_arf', 'write_rmf',
'pack_table', 'pack_image', 'pack_pha', 'read_table_blocks')
# Note: write_arrays is not included in __all__, so don't add to the
# See Also section.
#
[docs]
def read_arrays(*args) -> Data:
"""Create a dataset from multiple arrays.
The return value defaults to a `sherpa.data.Data1D` instance,
but this can be changed by supplying the required class
as the last argument (anything that is derived from
`sherpa.data.Data`).
Parameters
----------
*args
There must be at least one argument. The number of
arguments depends on the data type being read in.
The supported argument types depends on the I/O backend
in use (as supported by the ``get_column_data`` routine
provided by the backend).
Returns
-------
data : a sherpa.data.Data derived object
Examples
--------
The following examples do not contain argument types specific to
a particular I/O backend.
Create a `sherpa.data.Data1D` instance from the data in the
arrays ``x`` and ``y`` (taken to be the independent and
dependent axes respectively):
>>> d = read_arrays(x, y)
As in the previous example, but explicitly declaring the data type:
>>> d = read_arrays(x, y, sherpa.data.Data1D)
Create a `sherpa.data.Data2D` instance with the independent
axes ``x0`` and ``x1``, and dependent axis ``y``:
>>> d = read_arrays(x0, x1, y, sherpa.data.Data2D)
"""
largs = list(args)
if len(largs) == 0:
raise IOErr('noarrays')
if is_subclass(largs[-1], Data):
dstype = largs.pop()
else:
dstype = Data1D
dargs = backend.get_column_data(*largs)
# Determine max number of args for dataset constructor
_check_args(len(dargs), dstype)
return dstype('', *dargs)
[docs]
def read_table(arg,
ncols: int = 2,
colkeys: NamesType | None = None,
dstype: Type[Data1D] = Data1D) -> Data1D:
"""Create a dataset from a tabular file.
The supported file types (e.g. ASCII or FITS) depends on the
selected I/O backend.
Parameters
----------
arg
The name of the file or a representation of the file
(the type depends on the I/O backend).
ncols : int, optional
The first ncols columns are read from the file.
colkeys : None or list of strings, optional
If given, select these columns from the file.
dstype : optional
The data type to create (it is expected to follow the
`sherpa.data.Data` interface).
Returns
-------
data : a sherpa.data.Data derived object
See Also
--------
write_table
Examples
--------
The following examples do not contain argument types specific to
a particular I/O backend.
Create a `sherpa.data.Data1D` object from the first two
columns in the file ``src.fits``:
>>> d = read_table('src.fits')
Create a `sherpa.data.Data1DInt` object from the first three
columns in the file ``src.fits``:
>>> d = read_table('src.fits', ncols=3, dstype=Data1DInt)
Create a `sherpa.data.Data1D` data set from the specified
columns in ``tbl.fits``, where ``WLEN`` is used for the
independent axis, ``FLUX`` the dependent axis, and
``FLUXERR`` for the statistical error on the dependent axis:
>>> d = read_table('tbl.fits', colkeys=['WLEN', 'FLUX', 'FLUXERR'])
"""
hdu, name = backend.get_table_data(arg, ncols, colkeys)
cols = [col.values for col in hdu.columns]
# Determine max number of args for dataset constructor
_check_args(len(cols), dstype)
return dstype(name, *cols)
# TODO: should this be exported?
#
def read_ascii(filename: str,
ncols: int = 2,
colkeys: NamesType | None = None,
dstype: Type[Data1D] = Data1D,
**kwargs) -> Data1D:
"""Create a dataset from an ASCII tabular file.
Parameters
----------
filename : str
The name of the file or a representation of the file
(the type depends on the I/O backend).
ncols : int, optional
The first ncols columns are read from the file.
colkeys : None or list of strings, optional
If given, select these columns from the file.
dstype : optional
The data type to create (it is expected to follow the
`sherpa.data.Data` interface).
**kwargs
The remaining arguments are passed through to the
``get_ascii_data`` routine of the I/O backend. It is
expected that these include ``sep`` and ``comment``,
which describe the column separators and indicator of
a comment line, respectively.
Returns
-------
data : a sherpa.data.Data derived object
Examples
--------
The following examples do not contain argument types specific to
a particular I/O backend.
Create a `sherpa.data.Data1D` object from the first two
columns in the file ``src.dat``:
>>> d = read_ascii('src.dat')
Create A `sherpa.data.Data1DInt` object from the first three
columns in the file ``src.dat``:
>>> d = read_ascii('src.dat', ncols=3, dstype=Data1DInt)
Create a `sherpa.data.Data1D` data set from the specified
columns in ``tbl.fits``, where ``WLEN`` is used for the
independent axis, ``FLUX`` the dependent axis, and
``FLUXERR`` for the statistical error on the dependent axis:
>>> d = read_ascii('tbl.fits', colkeys=['WLEN', 'FLUX', 'FLUXERR'])
"""
hdu, name = backend.get_ascii_data(filename, ncols=ncols, colkeys=colkeys,
dstype=dstype, **kwargs)
cols = [col.values for col in hdu.columns]
# Determine max number of args for dataset constructor
_check_args(len(cols), dstype)
return dstype(name, *cols)
# TODO: Can this read in Data2D or only DataIMG?
[docs]
def read_image(arg,
coord: str = 'logical',
dstype: Type[Data2D] = DataIMG) -> Data2D:
"""Create an image dataset from a file.
.. versionchanged:: 4.16.0
Setting coord to a value other than 'logical' will now
correctly change the coordinate setting for `DataIMG` datasets.
Parameters
----------
arg
The name of the file or a representation of the file
(the type depends on the I/O backend).
coord : {'logical', 'physical', 'world'}, optional
The type of axis coordinates to use. An error is raised if the
file does not contain the necessary metadata for the selected
coordinate system or the selected dstype does not support it.
dstype : optional
The data type to create (it is expected to follow the
`sherpa.data.Data` interface).
Returns
-------
data : a sherpa.data.Data derived object
See Also
--------
write_image
Examples
--------
The following examples do not contain argument types specific to
a particular I/O backend.
Create a `sherpa.astro.data.DataIMG` object from the FITS file
``img.fits``:
>>> d = read_image('img.fits')
Select the physical coordinate system from the file:
>>> d = read_image('img.fits', coord='physical')
"""
block, filename = backend.get_image_data(arg)
img = block.image
ny, nx = img.shape
x0 = np.arange(nx, dtype=SherpaFloat) + 1.
x1 = np.arange(ny, dtype=SherpaFloat) + 1.
x0, x1 = reshape_2d_arrays(x0, x1)
data = {"y": img.ravel(), "shape": img.shape}
# We have to be careful with issubclass checks because
#
# base class | sub class
# Data2D Data2DInt
# Data2D DataIMG
# DataIMG DataIMGInt
#
if issubclass(dstype, DataIMG):
header = {}
for item in _remove_structural_items(block.header):
header[item.name] = item.value
data["header"] = header
data["sky"] = block.sky
data["eqpos"] = block.eqpos
# What are the independent axes?
#
if issubclass(dstype, (Data2DInt, DataIMGInt)):
data['x0lo'] = x0 - 0.5
data['x1lo'] = x1 - 0.5
data['x0hi'] = x0 + 0.5
data['x1hi'] = x1 + 0.5
elif issubclass(dstype, Data2D):
data['x0'] = x0
data['x1'] = x1
else:
# backend.get_image_data should have caught this case
#
raise ArgumentErr("bad", "dstype argument",
"dstype is not derived from Data2D")
# Note that we only set the coordinates after creating the
# dataset, which assumes that the independent axes are always in
# logical units. They may not be, but in this case we need to drop
# the logical/physical/world conversion system (or improve it to
# be more flexible). This is an attempt to resolve issue #1762,
# where sherpa.astro.ui.load_image(infile, coord="physical") did
# not behave sensibly (likely due to #1414 which was to address
# issue #1380).
#
dataset = dstype(filename, **data)
if isinstance(dataset, DataIMG):
dataset.set_coord(coord)
return dataset
[docs]
def read_arf(arg) -> DataARF:
"""Create a DataARF object.
Parameters
----------
arg
The name of the file or a representation of the file
(the type depends on the I/O backend) containing the
ARF data.
Returns
-------
data : sherpa.astro.data.DataARF
"""
block, filename = backend.get_arf_data(arg)
header = {}
exposure = None
for item in _remove_structural_items(block.header):
if item.name == "EXPOSURE":
exposure = item.value
else:
header[item.name] = item.value
data = {"energ_lo": block.rget("ENERG_LO").values,
"energ_hi": block.rget("ENERG_HI").values,
"specresp": block.rget("SPECRESP").values,
"exposure": exposure,
"header": header,
"ethresh": ogip_emin}
blo = block.get("BIN_LO")
bhi = block.get("BIN_HI")
if blo is not None and bhi is not None:
data["bin_lo"] = blo.values
data["bin_hi"] = bhi.values
return DataARF(filename, **data)
def _extract_rmf(matrix: MatrixBlock,
ebounds: EboundsBlock,
filename: str) -> dict[str, Any]:
"""Extract the matrix data from the table.
Parameters
----------
matrix, ebounds : TableBlock
The MATRIX and EBOUNDS blocks.
filename : str
The file name (used for error/warning messages).
Returns
-------
data : dict
The arguments needed for creating a RMF.
"""
header = {}
detchans = 0
for item in _remove_structural_items(matrix.header):
if item.name == "DETCHANS":
detchans = int(item.value)
else:
header[item.name] = item.value
# Validate the data:
# - check that OFFSET is set in either F_CHAN column of MATRIX
# or CHANNEL column of EBOUNDS
# - flatten out the F_CHAN, N_CHAN, and MATRIX columns
#
f_chan = matrix.rget("F_CHAN")
channel = ebounds.rget("CHANNEL")
if f_chan.minval is not None:
offset = f_chan.minval
elif channel.minval is not None:
offset = channel.minval
else:
offset = 1
error("Failed to locate TLMIN keyword for F_CHAN column "
"in RMF file '%s'; Update the offset value in the "
"RMF data set to the appropriate TLMIN value prior "
"to fitting", filename)
# The n_grp column remains as is but the other columns
# - remove any rows where n_grp for that row is 0
# - flatten out any 2D structure or variable-length
# data
#
# This is not designed to be the fastest code.
#
n_grp_raw = matrix.rget("N_GRP").values
f_chan_raw = f_chan.values
n_chan_raw = matrix.rget("N_CHAN").values
mat_raw = matrix.rget("MATRIX").values
good = n_grp_raw > 0
ng_raw = n_grp_raw[good]
fc_raw = f_chan_raw[good]
nc_raw = n_chan_raw[good]
mx_raw = mat_raw[good]
# Since these values are sent to the fold_rmf routine in
# sherpa.astro.utils._utils, we want the types to match up to
# avoid conversion time in that routine. The n_grp, f_chan, and
# n_chan arrays should be SherpaUIntArray and the matrix should be
# given by sherpa.utils.numeric_types. This conversion should
# probably be done by DataRMF itself though.
#
f_chan_l = []
n_chan_l = []
mat_l = []
for ng, fc, nc, mxs in zip(ng_raw, fc_raw, nc_raw, mx_raw):
# fc and nc may be scalars rather than an array (this is known
# before the loop but for now we check for it each row).
# It is assumed that fc and nc have the same "size" (i.e.
# array or scalar).
#
try:
f_chan_l.append(fc[:ng])
n_chan_l.append(nc[:ng])
except IndexError:
f_chan_l.append([fc])
n_chan_l.append([nc])
# Loop through all the matrix elements, restricting to nchan
# (in case we do not have a VLF matrix array). The matrix may
# also be a scalar.
#
start = 0
for nchan in n_chan_l[-1]:
# nchan may be an unsigned int, which could lead to type casting
# errors, so explicitly convert to an int.
end = start + int(nchan)
try:
mat_l.append(mxs[start:end])
except IndexError:
# Assume the scalar case
mat_l.append([mxs])
start = end
def flatten(xs, dtype):
return np.concatenate(xs).astype(dtype)
return {"detchans": detchans,
"energ_lo": matrix.rget("ENERG_LO").values,
"energ_hi": matrix.rget("ENERG_HI").values,
"n_grp": np.asarray(n_grp_raw, dtype=SherpaUInt),
"f_chan": flatten(f_chan_l, dtype=SherpaUInt),
"n_chan": flatten(n_chan_l, dtype=SherpaUInt),
"matrix": flatten(mat_l, dtype=SherpaFloat),
"offset": offset,
"e_min": ebounds.rget("E_MIN").values,
"e_max": ebounds.rget("E_MAX").values,
"header": header,
"ethresh": ogip_emin # should we take this from the header?
}
[docs]
def read_rmf(arg) -> DataRMF:
"""Create a DataRMF object.
Parameters
----------
arg
The name of the file or a representation of the file
(the type depends on the I/O backend) containing the
RMF data.
Returns
-------
data : sherpa.astro.data.DataRMF or subclass
"""
matrixes, ebounds, filename = backend.get_rmf_data(arg)
# matrixes is guaranteed not to be empty
nmat = len(matrixes)
if nmat > 1:
# Warn the user that the multi-matrix RMF is not supported.
#
error("RMF in %s contains %d MATRIX blocks; "
"Sherpa only uses the first block!",
filename, nmat)
# Use the metadata from the first matrix
#
matrix = matrixes[0]
data = _extract_rmf(matrix, ebounds, filename)
return _rmf_factory(filename, data)
def _rmf_factory(filename: str,
data: Mapping[str, Any]) -> DataRMF:
response_map = {
'ROSAT': DataRosatRMF,
'DEFAULT': DataRMF,
}
rmf_telescop = data['header'].get('TELESCOP', 'DEFAULT')
rmf_class = response_map.get(rmf_telescop, DataRMF)
return rmf_class(filename, **data)
def _read_ancillary(header: Header,
key: str,
label: str,
dpath: Path,
read_func: Callable[[str], T],
output_once: bool) -> T | None:
"""Read in a file if the keyword is set.
Parameters
----------
header : Header
The header information. This may be updated.
key : str
The key to look for in data. If not set, or its value compares
case insensitively to "none" then nothing is read in.
label : str
This is used to identify the file being read in. It is only
used if output_once is set.
dname : Path
The location of the file containing the metadata. This is prepended
to the value read from the data argument if it is not an
absolute path.
read_func
The function used to read in the file: it expects one argument,
the file to read in, and returns the data object.
output_once : bool, optional
If set then the routine uses the Sherpa logger to display
informational or warning information.
Returns
-------
data : None or a sherpa.data.Data derived object
"""
val = header.get(key)
if val is None:
return None
if not isinstance(val.value, str):
return None
if val.value.lower() == "none":
return None
# Rely on pathlib to handle the path combinations.
#
filename = str(dpath / Path(val.value))
out = None
try:
out = read_func(filename)
if output_once:
info("read %s file %s", label, filename)
except Exception as exc:
if output_once:
warning("unable to read %s: %s", label, str(exc))
return out
def _process_pha_block(filename: str,
table: TableBlock,
*,
output_once: bool,
use_errors: bool,
use_background: bool) -> DataPHA:
"""Convert a PHA block into a DataPHA object.
This is assumed to be a "type I" block.
"""
# The assumption is that the table requires a CHANNEL column,
# e.g. as guaranteed by read_pha, but leave this check in.
#
chancol = table.get("CHANNEL")
if chancol is None:
raise IOErr("reqcol", "CHANNEL", filename)
channel = chancol.values
nrows = channel.size
expval = table.header.get("EXPOSURE")
exposure = None if expval is None else expval.value
def get(name: str,
expand: bool = False) -> np.ndarray | int | float | np.integer | np.floating | None:
"""Return the column values if they exist.
This checks for columns and then the header. For the header
case we can keep the scalar case (expand=False) or convert
into a ndarray.
"""
col = table.get(name)
if col is not None:
return col.values
colval = table.header.get(name)
if colval is None:
return None
if isinstance(colval.value, (str, np.floating)):
val = float(colval.value)
elif isinstance(colval.value, (bool, np.bool_, np.integer)):
val = int(colval.value)
else:
val = colval.value
if expand:
return np.full(nrows, val)
return val
def getcol(name: str) -> np.ndarray | None:
"""Return the column values if they exist.
This does not check for a matching keyword to support the
original code (even though the OGIP standard allows for these
names).
"""
col = table.get(name)
if col is None:
return None
return col.values
# Why do we remove the *FILE keywords? See #1885
#
unwanted = ["ANCRFILE", "BACKFILE", "RESPFILE",
"BACKSCAL", "AREASCAL", "EXPOSURE"]
header = _remove_structural_items(table.header)
basic_header = dict((k.name, k.value) for k in header
if k.name not in unwanted)
# The statistical and systematic errors can be
# - not set
# - a column
# - a keyword
#
# However, we can have values of 0, which is not useful for
# Sherpa, so we remove them (this is likely pointing at the
# handling of the POISERR keyword being suboptimal).
#
staterr = get("STAT_ERR", expand=True)
syserr = get("SYS_ERR", expand=True)
if staterr is not None and np.all(staterr == 0.0):
staterr = None
if syserr is not None and np.all(syserr == 0.0):
syserr = None
# Do we warn the user that the errors read in from the file are
# being ignored?
#
if not use_errors:
if output_once and staterr is not None or syserr is not None:
if staterr is None:
msg = 'systematic'
elif syserr is None:
msg = 'statistical'
# Extra warning
warning("systematic errors were not found in "
"file '%s'", filename)
else:
msg = 'statistical and systematic'
# TODO: remove extra space at end of first line
info("%s errors were found in file '%s'\nbut not used; "
"to use them, re-read with use_errors=True", msg, filename)
staterr = None
syserr = None
# For relative path we use the location of the PHA file as the
# base.
#
dpath = Path(filename).parent
albl = 'ARF'
rlbl = 'RMF'
if use_background:
albl = albl + ' (background)'
rlbl = rlbl + ' (background)'
arf = _read_ancillary(table.header, "ANCRFILE", albl, dpath,
read_arf, output_once)
rmf = _read_ancillary(table.header, "RESPFILE", rlbl, dpath,
read_rmf, output_once)
# Do we create the backgrounds directly?
#
kwargs = {"channel": channel,
"bin_lo": get("BIN_LO"),
"bin_hi": get("BIN_HI"),
# "grouping": get("GROUPING", expand=True),
# "quality": get("QUALITY", expand=True),
"grouping": getcol("GROUPING"),
"quality": getcol("QUALITY"),
"exposure": exposure,
"header": basic_header}
pha = DataPHA(filename, counts=get("COUNTS"),
staterror=staterr, syserror=syserr,
backscal=get("BACKSCAL"), areascal=get("AREASCAL"),
**kwargs)
pha.set_response(arf, rmf)
# Are there any backgrounds to process?
#
backgrounds = []
# Do we read in the background data?
# TODO: The use_background logic doesn't seem to make sense here.
#
backfile = table.header.get("BACKFILE")
if backfile is not None and isinstance(backfile.value, str) \
and backfile.value.lower() != "none" and \
not use_background:
bfilename = str(dpath / Path(backfile.value))
try:
backgrounds = _process_pha_backfile(bfilename, arf, rmf,
output_once=output_once,
use_errors=use_errors)
except Exception as exc:
if output_once:
warning("unable to read background: %s", str(exc))
for colname, scalname in [('BACKGROUND_UP', 'BACKSCUP'),
('BACKGROUND_DOWN', 'BACKSCDN')]:
col = table.get(colname)
if col is None:
continue
bkg = DataPHA(filename, counts=col.values,
backscal=get(scalname), **kwargs)
bkg.set_response(arf, rmf)
if output_once:
info("read %s into a dataset from file %s",
colname.lower(), filename)
backgrounds.append(bkg)
for idx, bkg in enumerate(backgrounds, 1):
if bkg.grouping is None:
bkg.grouping = pha.grouping
bkg.grouped = bkg.grouping is not None
if bkg.quality is None:
bkg.quality = pha.quality
pha.set_background(bkg, idx)
# set units *after* bkgs have been set
pha._set_initial_quantity()
return pha
def _process_pha_backfile(filename: str,
arf: DataARF | None,
rmf: DataRMF | None,
*,
output_once: bool,
use_errors: bool) -> list[DataPHA]:
bkgs = read_pha(filename, use_errors=use_errors, use_background=True)
if output_once:
info("read background file %s", filename)
# read_pha can return a single item or a list.
if isinstance(bkgs, DataPHA):
blist = [bkgs]
else:
blist = list(bkgs)
for bkg in blist:
if rmf is not None and bkg.get_response() == (None, None):
bkg.set_response(arf, rmf)
return blist
[docs]
def read_pha(arg,
use_errors: bool = False,
use_background: bool = False
) -> DataPHA | list[DataPHA]:
"""Create a DataPHA object.
.. versionchanged:: 4.17.0
Channel numbers that start at 0 are now left as is rather than
be renumbered to start at 1.
Parameters
----------
arg
The name of the file or a representation of the file
(the type depends on the I/O backend) containing the
PHA data.
use_errors : bool, optional
If the PHA file contains statistical error values for the
count (or count rate) column, should it be read in. This
defaults to ``False``.
use_background : bool, optional
Should the background PHA data (and optional responses) also
be read in and associated with the data set?
Returns
-------
data : sherpa.astro.data.DataPHA
"""
pha, filename = backend.get_pha_data(arg,
use_background=use_background)
# Check all the columns have the same shape (e.g. all type I or
# II). The CHANNEL column is guaranteed to exist at this point
# and be 1 or 2D.
#
chancol = pha.rget("CHANNEL")
channel = chancol.values
if len(channel.shape) == 1:
ptype = 1
elif len(channel.shape) == 2:
ptype = 2
else:
# This should not be possible
raise IOErr(f"PHA {filename}: unable to handle CHANNEL shape")
# Prior to 4.17.0 the code tried to ensure that the first channel
# was numbered 1. The approach is to now use the values read in
# from file / memory.
#
# Convert from RATE to COUNT. Ideally this would be passed to
# DataPHA so it knew (and could then enforce a non-Poisson
# statistic).
#
# This is done before any II to I conversion. The error checks
# should not be needed, because SpectrumBlock should enforce these
# invariants, but leave in for now.
#
if pha.get("COUNTS") is None:
rate = pha.get("RATE")
if rate is None:
raise IOErr("reqcol", "COUNTS or RATE", filename)
expval = pha.header.get("EXPOSURE")
if expval is None:
raise IOErr("nokeyword", filename, "EXPOSURE")
exposure = expval.value
# Change the RATE column into a "COUNTS" column.
#
rate.values *= exposure
rate.name = "COUNTS"
# What other columns need to be converted?
#
# How about SYS_ERR, BACKGROUND_UP/DOWN?
#
# TODO: this likely fails if STAT_ERR is a keyword
# and not a column.
#
stat_err = pha.get("STAT_ERR")
if stat_err is not None:
stat_err.values *= exposure
if ptype == 1:
phas = [pha]
else:
special = ["TG_M", "TG_PART", "TG_SRCID", "SPEC_NUM"]
tg_m = pha.get("TG_M")
tg_part = pha.get("TG_PART")
tg_srcid = pha.get("TG_SRCID")
spec_num = pha.get("SPEC_NUM")
phas = []
for idx in range(channel.shape[0]):
header = Header([item for item in pha.header.values
if item.name not in special])
if tg_m is not None:
header.add(HeaderItem("TG_M", tg_m.values[idx]))
if tg_part is not None:
header.add(HeaderItem("TG_PART", tg_part.values[idx]))
if tg_srcid is not None:
header.add(HeaderItem("TG_SRCID", tg_srcid.values[idx]))
if spec_num is not None:
header.add(HeaderItem("SPEC_NUM", spec_num.values[idx]))
cols = [Column(col.name, values=col.values[idx].copy(),
desc=col.desc, unit=col.unit,
minval=col.minval, maxval=col.maxval)
for col in pha.columns
if col.name not in special]
phas.append(TableBlock(pha.name, header=header, columns=cols))
output_once = True
datasets = []
for p in phas:
data = _process_pha_block(filename, p,
output_once=output_once,
use_errors=use_errors,
use_background=use_background)
output_once = False
datasets.append(data)
if len(datasets) == 1:
return datasets[0]
return datasets
def _empty_header(creator=False) -> Header:
"""Create an empty header."""
out = Header([])
if creator:
_add_creator(out)
return out
def _pack_header(header: HdrTypeArg) -> Header:
"""Convert a header and add a CREATOR keyword if not set."""
out = Header([HeaderItem(name=name, value=value)
for name, value in header.items()])
_add_creator(out)
return out
def _pack_table(dataset: Data1D) -> BlockList:
"""Identify the columns in the data.
This relies on the _fields attribute containing the data columns,
and _extra_fields the extra information (other than the name
column). We only return values from _fields that contain data.
"""
cols = []
for name in dataset._fields:
if name == 'name':
continue
val = getattr(dataset, name)
if val is None:
continue
cols.append(Column(name.upper(), val))
empty = _empty_header()
blocks: list[BlockType]
blocks = [TableBlock(name="TABLE", header=empty, columns=cols)]
return BlockList(header=empty, blocks=blocks)
def _pack_image(dataset: Data2D) -> ImageBlock:
"""Extract the needed data of a 2D dataset."""
if not isinstance(dataset, Data2D):
raise IOErr('notimage', dataset.name)
img = dataset.get_img()
# Data2D does not have a header or sky/eqpos
#
hdr = getattr(dataset, "header", {})
sky = getattr(dataset, "sky", None)
eqpos = getattr(dataset, "eqpos", None)
header = Header([HeaderItem(name, value)
for name, value in hdr.items()])
return ImageBlock("IMAGE", header=header, image=img,
sky=sky, eqpos=eqpos)
# This is to match NAXIS1 and the like, but we do not make it
# very prescriptive (ie enforce the FITS keyword standards).
#
NUMBERED_KEYWORD_PAT = re.compile(r"^([^\d]+)(\d+)$")
def _is_structural_keyword(key: str) -> bool:
"""Do we think this is a "structural" keyword?
Parameters
----------
key : str
The keyword (need not be upper case).
Returns
-------
flag : bool
Is this considered a structural keyword.
Notes
-----
The decision of what is a structural header and what isn't is
rather ad-hoc and may need to be updated over time. The current
code is based in part on the `HEASARC keyword list
<https://heasarc.gsfc.nasa.gov/docs/fcg/standard_dict>`_.
This does not strip out the OGIP keywords like HDUCLAS1 or
HDUVERS.
It does not remove secondary WCS keywords, like CTYPE1P.
"""
ukey = key.upper()
if ukey in ["BITPIX", "BLANK", "BLOCKED", "BSCALE", "BUNIT",
"BZERO", "DATAMAX", "DATAMIN", "END", "EXTEND",
"EXTLEVEL", "EXTNAME", "EXTVER", "GCOUNT", "GROUPS",
"NAXIS", "PCOUNT", "SIMPLE", "TFIELDS", "THEAP",
"XTENSION",
# other common ones
"CHECKSUM", "DATASUM", "CHECKVER",
# HEASARC
"TSORTKEY",
# CIAO uses this as well as EXTNAME
"HDUNAME"
]:
return True
# Keywords which end in 1, 2, ..
match = NUMBERED_KEYWORD_PAT.match(ukey)
if match is None:
return False
# The pattern is not completely robust so catch a case when
# it does not restrict to an integer > 0.
#
if match[2].startswith("0"):
return False
return match[1] in ["CDELT", "CROTA", "CRPIX", "CRVAL", "CTYPE", "CUNIT",
"NAXIS", "PSCAL", "PTYPE", "PZERO", "TBCOL",
"TDIM", "TDISP", "TFORM", "TNULL", "TSCAL",
"TTYPE", "TUNIT", "TZERO",
"TCTYP", "TCUNI", "TCRPX", "TCRVL", "TCDLT",
# HEASARC
"TDMAX", "TDMIN", "TLMAX", "TLMIN",
# CIAO Data-Model keywords
"DSTYP", "DSVAL", "DSFORM", "DSUNIT", "TDBIN",
"MTYPE", "MFORM",
]
def _remove_structural_items(header: Header) -> list[HeaderItem]:
"""Remove FITS keywords relating to file structure.
The aim is to allow writing out a header that was taken from a
file and not copy along "unwanted" values, since the data may
no-longer be relevant. Not all FITS header keys may be passed
along by a particular backend (e.g. crates).
Parameters
----------
header : Header
The input header
Returns
-------
nheader : list of HeaderItem
Header with unwanted keywords returned (including those
set to None).
Notes
-----
The tricky part is knowing what is unwanted, since copying
along a WCS transform for a column may be useful, or may
break things.
"""
return [item for item in header.values
if not _is_structural_keyword(item.name)]
# By making this a package-level value, users can actually change the
# text (and in fact the name) by using code like
#
# from sherpa.astro.io import CREATOR
# CREATOR.value = "not sherpa"
#
# but it does not seem worth documenting this at this time.
#
CREATOR = HeaderItem(name="CREATOR",
value=f"sherpa {importlib.metadata.version('sherpa')}",
desc="Program creating this file")
def _add_creator(header: Header) -> None:
"""Add a CREATOR card if not set."""
if header.get("CREATOR") is not None:
return
header.add(CREATOR)
def _pack_pha(dataset: DataPHA) -> BlockList:
"""Extract FITS column and header information.
Notes
-----
The `PHA Data Extension header page
<https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007/node6.html>`_
lists the following keywords as either required or
we-really-want-them:
EXTNAME = "SPECTRUM"
TELESCOP - the "telescope" (i.e. mission/satellite name).
INSTRUME - the instrument/detector.
FILTER - the instrument filter in use (if any)
EXPOSURE - the integration time (in seconds) for the PHA data (assumed to be corrected for deadtime, data drop-outs etc. )
BACKFILE - the name of the corresponding background file (if any)
CORRFILE - the name of the corresponding correction file (if any)
CORRSCAL - the correction scaling factor.
RESPFILE - the name of the corresponding (default) redistribution matrix file (RMF; see George et al. 1992a).
ANCRFILE - the name of the corresponding (default) ancillary response file (ARF; see George et al. 1992a).
HDUCLASS = "OGIP"
HDUCLAS1 = "SPECTRUM"
HDUVERS = "1.2.1"
POISSERR - whether Poissonian errors are appropriate to the data (see below).
CHANTYPE - whether the channels used in the file have been corrected in any way (see below).
DETCHANS - the total number of detector channels available.
We also add in the following, defaulting to the first value - we
should do better to support HDUCLAS3=RATE data!
HDUCLAS2 - indicating the type of data stored.
Allowed values are:
'TOTAL' for a gross PHA Spectrum (source + bkgd)
'NET' for a bkgd-subtracted PHA Spectrum
'BKG' for a bkgd PHA Spectrum
HDUCLAS3 - indicating further details of the type of data stored.
Allowed values are:
'COUNT' for PHA data stored as counts (rather than count/s)
'RATE' for PHA data stored in count/s
HDUCLAS4 - indicating whether this is a type I or II extension.
Allowed values are:
'TYPE:I' for type I (single spectrum) data
'TYPE:II' for type II (multiple spectra) data
The POISSERR keyword is not required if a STAT_ERR column is
present however it is recommended in this case for clarity. If
STAT_ERR is to be used for the errors then POISSERR is set to
false.
The TLMIN1 and TLMAX1 values are always set to match the CHANNEL
data range. This was changed in 4.17.0 as previously they were
only set when the channel range did not start at 1.
"""
# The logic here repeats some of the checks that probably should
# be done by the DataPHA class itself. However, it is likely
# that we don't want to make the DataPHA class always reject
# inconsistent state, as this could preclude certain workflows,
# so we need some validation here.
#
if not isinstance(dataset, DataPHA):
raise IOErr("notpha", dataset.name)
arf, rmf = dataset.get_response()
bkg = dataset.get_background()
# The default keywords; these will be over-ridden by
# anything set by the input.
#
default_header = {
"EXTNAME": "SPECTRUM",
"HDUCLASS": "OGIP",
"HDUCLAS1": "SPECTRUM",
"HDUCLAS2": "TOTAL",
"HDUCLAS3": "COUNT",
"HDUCLAS4": "TYPE:I",
"HDUVERS": "1.2.1",
"HDUDOC": "Arnaud et al. 1992a Legacy 2 p 65",
# The PHA file should have an EXPOSURE value, but just in case.
"EXPOSURE": 1.0,
# Rely on the DataPHA class to have set up TELESCOP/INSTRUME/FILTER
# based on any associated background or response. If the user has
# changed them then so be it.
#
"TELESCOP": "none",
"INSTRUME": "none",
"FILTER": "none",
"CORRFILE": "none",
"CORRSCAL": 0,
"CHANTYPE": "PI",
"RESPFILE": "none",
"ANCRFILE": "none",
"BACKFILE": "none"
}
# Merge the keywords
#
merged_header = default_header | dataset.header
header = _pack_header(merged_header)
if dataset.channel is not None and header.get("DETCHANS") is None:
header.add(HeaderItem("DETCHANS", len(dataset.channel),
desc="Number of channels"))
# Over-write the header value (if set). This value should really
# exist (OGIP standards) but may not, particularly for testing.
#
if dataset.exposure is not None:
header.delete("EXPOSURE")
header.add(HeaderItem("EXPOSURE", float(dataset.exposure),
unit="s", desc="Exposure time"))
# See #1885 for a discussion about the current behaviour.
#
if rmf is not None:
header.delete("RESPFILE")
header.add(HeaderItem("RESPFILE", rmf.name.split("/")[-1],
desc="RMF"))
if arf is not None:
header.delete("ANCRFILE")
header.add(HeaderItem("ANCRFILE", arf.name.split("/")[-1],
desc="ARF"))
if bkg is not None:
header.delete("BACKFILE")
header.add(HeaderItem("BACKFILE", bkg.name.split("/")[-1],
desc="Background"))
# Create the columns. Special case the CHANNEL and COUNTS
# columns.
#
# TODO: there is currently no way to know we should write out a
# RATE column instead of COUNTS.
#
cols = []
if dataset.channel is not None:
channel = Column("CHANNEL", dataset.channel.astype(np.int32),
desc="Channel values")
# Ensure the min/max ranges are written out as integers since
# data.channel may contain floating point values. Always
# re-write so that the file matches the data.
#
channel.minval = int(dataset.channel[0])
channel.maxval = int(dataset.channel[-1])
cols.append(channel)
if dataset.counts is not None:
if np.issubdtype(dataset.counts.dtype, np.integer):
countvals = dataset.counts.astype(np.int32)
elif np.issubdtype(dataset.counts.dtype, np.floating):
countvals = dataset.counts.astype(np.float32)
else:
raise DataErr("ogip-error", "PHA dataset", dataset.name,
"contains an unsupported COUNTS column")
cols.append(Column("COUNTS", countvals, desc="Counts"))
if dataset.staterror is not None:
desc = "Statistical error"
if np.ptp(dataset.staterror) == 0:
header.add(HeaderItem("STAT_ERR", dataset.staterror[0],
desc=desc))
else:
cols.append(Column("STAT_ERR",
dataset.staterror.astype(np.float32),
desc=desc))
def addfield(name, value, dtype, desc):
"""Add as keyword or array of the expected type"""
if value is None:
header.add(HeaderItem(name, dtype(0), desc=desc))
elif np.ptp(value) == 0:
header.add(HeaderItem(name, dtype(value[0]), desc=desc))
else:
cols.append(Column(name, value.astype(dtype), desc=desc))
# Should this convert the value/values back into a fractional error?
addfield("SYS_ERR", dataset.syserror, np.float32,
"Fractional systematic error")
addfield("GROUPING", dataset.grouping, np.int16,
"Grouping (1,-1,0)")
addfield("QUALITY", dataset.quality, np.int16,
"Quality (0 for okay)")
if dataset.bin_lo is not None and dataset.bin_hi is not None:
cols.extend([Column("BIN_LO", dataset.bin_lo),
Column("BIN_HI", dataset.bin_hi)])
def addscal(value, name, label):
"""Add the scaling value."""
if value is None:
header.add(HeaderItem(name, 1.0, desc=label))
elif np.isscalar(value):
header.add(HeaderItem(name, float(value), desc=label))
else:
# Ensure we have no scalar value in the header
header.delete(name)
# OGIP standard has this as a 4-byte real column/value
cols.append(Column(name, value.astype(np.float32),
desc=label))
# Note that CORRSCAL is set in default_header. Can it also
# be a vector?
#
addscal(dataset.backscal, "BACKSCAL", "Background scaling factor")
addscal(dataset.areascal, "AREASCAL", "Area scaling factor")
# Try to set the POISSERR keyword *IF NOT SET*, but Sherpa does
# not use this reliably.
#
if header.get("POISSERR") is None:
header.add(HeaderItem("POISSERR",
dataset.staterror is None))
pheader = _empty_header()
blocks: list[BlockType]
blocks = [TableBlock("SPECTRUM", header=header, columns=cols)]
return BlockList(header=pheader, blocks=blocks)
def _pack_arf(dataset: ARFType) -> BlockList:
"""Extract FITS column and header information.
There is currently no support for Type II ARF files.
Notes
-----
The `ARF Data Extension header page
<https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html>`_
lists the following keywords as either required or
we-really-want-them:
EXTNAME = 'SPECRESP'
TELESCOP - the "telescope" (ie mission/satellite name).
INSTRUME - the instrument/detector.
FILTER - the instrument filter in use (if any)
HDUCLASS = 'OGIP' - file format is OGIP standard.
HDUCLAS1 = 'RESPONSE' - extension contains response data.
HDUCLAS2 = 'SPECRESP' - extension contains an ARF.
HDUVERS = '1.1.0' - version of the file format.
We do not add the ARFVERSN, HDUVERS1, or HDUVERS2 keywords which
are marked as obsolete.
"""
# The UI layer wraps up a DataARF into ARF1D quite often, so
# support both types as input.
#
from sherpa.astro.instrument import ARF1D
if not isinstance(dataset, (DataARF, ARF1D)):
raise IOErr("data set is not an ARF")
# Check we have data. Should we allow missing columns?
#
for col in ["energ_lo", "energ_hi", "specresp"]:
if getattr(dataset, col) is None:
raise ArgumentErr("bad", "ARF",
f"{col.upper()} column is missing")
# The default keywords; these will be over-ridden by
# anything set by the input.
#
default_header: HdrType = {
"EXTNAME": "SPECRESP",
"HDUCLASS": "OGIP",
"HDUCLAS1": "RESPONSE",
"HDUCLAS2": "SPECRESP",
"HDUVERS": "1.1.0",
"TELESCOP": "none",
"INSTRUME": "none",
"FILTER": "none",
}
# Merge the keywords
#
merged_header = default_header | dataset.header
aheader = _pack_header(merged_header)
# The exposure time is not an OGIP-mandated value but
# may be used by CIAO, so copy it across if set.
#
if dataset.exposure is not None:
aheader.add(HeaderItem("EXPOSURE", float(dataset.exposure),
desc="Total exposure time",
unit="s"))
# The column ordering for the output file is determined by the
# order the keys are added to the data dict. Ensure the
# data type meets the FITS standard (Real4).
#
acols = [Column("ENERG_LO", unit="keV", desc="Energy",
values=dataset.energ_lo.astype(np.float32)),
Column("ENERG_HI", unit="keV", desc="Energy",
values=dataset.energ_hi.astype(np.float32)),
Column("SPECRESP", unit="cm^2", desc="Effective Area",
values=dataset.specresp.astype(np.float32))]
# Chandra files can have BIN_LO/HI values, so copy
# across if both set.
#
blo = dataset.bin_lo
bhi = dataset.bin_hi
if blo is not None and bhi is not None:
acols.extend([Column("BIN_LO", blo),
Column("BIN_HI", bhi)])
pheader = _empty_header(creator=True)
blocks: list[BlockType]
blocks = [SpecrespBlock(name="SPECRESP", header=aheader, columns=acols)]
return BlockList(header=pheader, blocks=blocks)
def _find_int_dtype(rows: Sequence[Sequence[int]]) -> type:
"""What data type should represent the matrix of integers?
There is no guarantee that each row has the same number of
elements.
"""
for row in rows:
if len(row) == 0:
continue
if np.max(row) > 32767:
return np.int32
return np.int16
def _make_int_array(rows: Sequence[Sequence[int]],
ncols: int) -> np.ndarray:
"""Convert a list of rows into a 2D array of "width" ncols.
The conversion is to a type determined by the maximum value in
rows (it selects between 2-byte and 4-byte integers).
Parameters
----------
rows : list of 1D arrays
The values to convert. Expected to be integers (>= 0).
ncols : int
The size of each row in the output. There must be no row
with more elements.
Returns
-------
out : ndarray of size (nrows, ncols)
"""
nrows = len(rows)
dtype = _find_int_dtype(rows)
out: np.ndarray = np.zeros((nrows, ncols), dtype=dtype)
for idx, row in enumerate(rows):
if len(row) == 0:
continue
out[idx, 0:len(row)] = row
return out
def _make_float32_array(rows: Sequence[Sequence[float]],
ncols: int) -> np.ndarray:
"""Convert a list of rows into a 2D array of "width" ncols.
The output has type numpy.float32.
Parameters
----------
rows : list of 1D arrays
The values to convert. Expected to have type float32.
ncols : int
The size of each row in the output. There must be no row
with more elements.
Returns
-------
out : ndarray of size (nrows, ncols)
"""
nrows = len(rows)
out = np.zeros((nrows, ncols), dtype=np.float32)
for idx, row in enumerate(rows):
out[idx, 0:len(row)] = row
return out
def _make_int_vlf(rows: Sequence[Sequence[int]]) -> np.ndarray:
"""Convert a list of rows into a VLF.
The conversion is to a type determined by the maximum value in
rows (it selects between 2-byte and 4-byte integers).
Parameters
----------
rows : list of 1D arrays
The values to convert. Expected to be integers (>= 0).
Returns
-------
out : ndarray of dtype object
"""
dtype = _find_int_dtype(rows)
out: list[np.ndarray] = []
for row in rows:
out.append(np.asarray(row, dtype=dtype))
return np.asarray(out, dtype=object)
def _reconstruct_rmf(rmf: RMFType) -> DataType:
"""Recreate the structure needed to write out as a FITS file.
This does not guarantee to create byte-identical data in a round
trip, but it should create equivalent data (e.g. the choice of
whether a column should be a Variable Length Field may differ, as
can the data types).
Parameters
----------
rmf : DataRMF or RMF1D instance
Returns
-------
out : dict
The re-constructed data and header values.
Notes
-----
The F_CHAN, N_CHAN, and MATRIX columns are stored as either
- 1D arrays (unlikely for MATRIX); N_GRP = 1 for all rows
- 2D arrays when N_GRP is a constant for all rows
- 2D arrays when N_GRP <= 4 (so some rows will have 0's in
when N_GRP for that row is less than the max N_GRP)
- 2D array with dtype=object indicates a VLF (each row
should be the correct type though)
"""
n_grp = []
f_chan: list[list[int]] = []
n_chan: list[list[int]] = []
matrix: list[list[float]] = []
# Used to reconstruct the original data
idx = 0
start = 0
# Header keyword
numelt = 0
# How big is the matrix?
matrix_size = set()
for ng in rmf.n_grp:
n_grp.append(ng)
f_chan.append([])
n_chan.append([])
matrix.append([])
# Short-cut when no data
if ng == 0:
# Record we have a zero-element row. Should we instead
# remove the last element of f_chan, n_chan, matrix?
#
matrix_size.add(0)
continue
# Grab the next ng elements from rmf.f_chan/n_chan
# and the corresponding rmf.matrix elements. Fortunately
# we can ignore F_CHAN when extracting data from matrix.
#
for _ in range(ng):
# ensure this is an integer and not a floating-point number
# which can happen when rmf.n_chan is stored as Unt64.
#
end = start + rmf.n_chan[idx].astype(np.int32)
mdata = rmf.matrix[start:end]
f_chan[-1].append(rmf.f_chan[idx])
n_chan[-1].append(rmf.n_chan[idx])
matrix[-1].extend(mdata.astype(np.float32))
idx += 1
start = end
matrix_size.add(len(matrix[-1]))
numelt += np.sum(n_chan[-1])
# N_GRP should be 2-byte integer.
#
n_grp_out = np.asarray(n_grp, dtype=np.int16)
numgrp = n_grp_out.sum()
# Can we convert F_CHAN/N_CHAN to fixed-length if either:
# - N_GRP is the same for all rows
# - max(N_GRP) < 4
#
# The decision to convert to the int16 or int32 types is made
# within the _make_int_xxx routines (the maximum value is used to
# decide what type to use).
#
if len(set(n_grp_out)) == 1 or n_grp_out.max() < 4:
ny = n_grp_out.max()
f_chan_out = _make_int_array(f_chan, ny)
n_chan_out = _make_int_array(n_chan, ny)
else:
f_chan_out = _make_int_vlf(f_chan)
n_chan_out = _make_int_vlf(n_chan)
# We can convert the matrix to fixed size if each row in matrix
# has the same size.
#
# The individual matrix elements are of type np.float32 so we
# should not need to do any conversion, but we are explicit in
# the fixed-length case.
#
if len(matrix_size) == 1:
ny = matrix_size.pop()
matrix_out = _make_float32_array(matrix, ny)
else:
# Since the elements can be lists, ensure they get converted
# to ndarray.
#
matrix_out = np.asarray([np.asarray(m, dtype=np.float32)
for m in matrix],
dtype=object)
return {"N_GRP": n_grp_out,
"F_CHAN": f_chan_out,
"N_CHAN": n_chan_out,
"MATRIX": matrix_out,
# Ensure these are integer types
"NUMGRP": int(numgrp),
"NUMELT": int(numelt)}
def _pack_rmf(dataset: RMFType) -> BlockList:
"""Extract FITS column and header information.
Unlike the other pack routines this returns data for
multiple blocks. At present this ignores the ORDER column
in the MATRIX block.
Notes
-----
The `RMF Data Extension header page
<https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html>`_
lists the following keywords as either required or
we-really-want-them. First for the MATRIX block:
EXTNAME = 'MATRIX' or 'SPECRESP MATRIX'
TELESCOP - the "telescope" (ie mission/satellite name).
INSTRUME - the instrument/detector.
FILTER - the instrument filter in use (if any)
CHANTYPE - PI or PHA
DETCHANS - the total number of raw detector PHA channels
HDUCLASS = 'OGIP' - file format is OGIP standard.
HDUCLAS1 = 'RESPONSE' - extension contains response data.
HDUCLAS2 = 'RSP_MATRIX' - extension contains a response matrix.
HDUVERS = '1.3.0' - version of the file format.
TLMIN# - the first channel in the response.
These are optional but mechanical so can be added:
NUMGRP - sum(N_GRP)
NUMELT - sum(N_CHAN)
These are optional:
LO_THRES - minimum probability threshold
HDUCLAS3 - 'REDIST', 'DETECTOR', 'FULL'
For the EBOUNDS extension we have:
EXTNAME = 'EBOUNDS'
TELESCOP
INSTRUME
FILTER
CHANTYPE
DETCHANS
HDUCLASS = 'OGIP'
HDUCLAS1 = 'RESPONSE'
HDUCLAS2 = 'EBOUNDS'
HDUVERS = '1.2.0'
We do not add the RMFVERSN, HDUVERS1, or HDUVERS2 keywords which
are marked as obsolete.
"""
# The UI layer wraps up a DataRMF into RMF1D quite often, so
# support both types as input.
#
from sherpa.astro.instrument import RMF1D
if not isinstance(dataset, (DataRMF, RMF1D)):
raise IOErr("data set is not a RMF")
# We convert the input into a dict (rmfdata) and then more
# dictionaries (both for headers and columns) before eventually
# creating the Block objects we return. This separates out the
# data manipulation - e.g. creating the correct column types and
# ensuring the headers contain the needed data - from creating the
# file contents (i.e. Block objects). The two steps could be
# combined but leave as is for now since this is not code that
# needs to be fast, but does need to be readable.
#
rmfdata = _reconstruct_rmf(dataset)
# The default keywords; these will be over-ridden by
# anything set by the input.
#
default_matrix_header = {
"EXTNAME": "MATRIX",
"HDUCLASS": "OGIP",
"HDUCLAS1": "RESPONSE",
"HDUCLAS2": "RSP_MATRIX",
"HDUVERS": "1.3.0",
"TELESCOP": "none",
"INSTRUME": "none",
"FILTER": "none",
"CHANTYPE": "PI",
"DETCHANS": "none",
"NUMGRP": 0,
"NUMELT": 0
}
ebounds_header: HdrType = {
"EXTNAME": "EBOUNDS",
"HDUCLASS": "OGIP",
"HDUCLAS1": "RESPONSE",
"HDUCLAS2": "EBOUNDS",
"HDUVERS": "1.2.0",
"TELESCOP": "none",
"INSTRUME": "none",
"FILTER": "none",
"CHANTYPE": "PI",
"DETCHANS": "none"
}
# Header Keys
header = dataset.header
# Merge the keywords (at least for the MATRIX block).
#
matrix_header = default_matrix_header | header
matrix_header["NUMGRP"] = rmfdata["NUMGRP"]
matrix_header["NUMELT"] = rmfdata["NUMELT"]
matrix_header["DETCHANS"] = dataset.detchans
# Copy values over.
#
for copykey in ["TELESCOP", "INSTRUME", "FILTER", "CHANTYPE",
"DETCHANS"]:
ebounds_header[copykey] = matrix_header[copykey]
# The column ordering for the output file is determined by the
# order the keys are added to the data dict.
#
# To allow the backend convert to Variable-Length fields, we
# have F_CHAN/N_CHAN/MATRIX either be a 2D ndarray (so fixed
# output) OR a list of rows (use VLF). It's not ideal: we could
# wrap up in a local VLF type just to indicate this to the
# backend.
#
matrix_data = {
"ENERG_LO": dataset.energ_lo,
"ENERG_HI": dataset.energ_hi,
"N_GRP": rmfdata["N_GRP"],
"F_CHAN": rmfdata["F_CHAN"],
"N_CHAN": rmfdata["N_CHAN"],
"MATRIX": rmfdata["MATRIX"],
"OFFSET": dataset.offset # copy over this value
}
# TODO: is this correct?
nchan = dataset.offset + dataset.detchans - 1
dchan = np.int32 if nchan > 32767 else np.int16
# Technically e_min/max can be empty, but we do not expect
# this, and this support should probably be removed. For
# now error out if we are sent such data.
#
if dataset.e_min is None or dataset.e_max is None:
raise IOErr(f"RMF {dataset.name} has no E_MIN or E_MAX data")
ebounds_data = {
"CHANNEL": np.arange(dataset.offset, nchan + 1, dtype=dchan),
"E_MIN": dataset.e_min.astype(np.float32),
"E_MAX": dataset.e_max.astype(np.float32)
}
# Now convert these into TableBlock types.
#
# Create the MATRIX block:
# ENERG_LO
# ENERG_HI
# N_GRP
# F_CHAN
# N_CHAN
# MATRIX
#
mheader = _pack_header(matrix_header)
mcols = [Column(name=name, values=value)
for name, value in matrix_data.items()
if name != "OFFSET"]
# Adjust the column objects.
#
for col in mcols:
if col.name.startswith("ENERG"):
col.unit = "keV"
continue
if col.name == "F_CHAN":
col.minval = matrix_data["OFFSET"]
continue
# Create the EBOUNDS block:
# CHANNEL
# E_MIN
# E_MAX
#
eheader = _pack_header(ebounds_header)
ecols = [Column(name=name, values=value)
for name, value in ebounds_data.items()]
for col in ecols:
if col.name == "CHANNEL":
continue
col.unit = "keV"
header = _empty_header()
blocks: list[BlockType]
blocks = [MatrixBlock(name="MATRIX", header=mheader, columns=mcols),
EboundsBlock(name="EBOUNDS", header=eheader, columns=ecols)]
return BlockList(header=header, blocks=blocks)
def write_arrays(filename: str,
args: Sequence[np.ndarray],
fields: NamesType | None = None,
ascii: bool = True,
clobber: bool = False) -> None:
"""Write out a collection of arrays.
Parameters
----------
filename : str
The name of the file.
args
The data to write out.
fields : None or list of str, optional
The names to use for each column. If ``None`` then the names
default to `col1` ... `colN`.
ascii : bool, optional
If `True` use an ASCII format, otherwise a binary format.
clobber : bool, optional
If `True` then the output file will be over-written if it
already exists, otherwise an error is raised.
See Also
--------
read_arrays
"""
backend.set_arrays(filename, args, fields, ascii=ascii, clobber=clobber)
[docs]
def write_table(filename: str,
dataset: Data1D,
ascii: bool = True,
clobber: bool = False) -> None:
"""Write out a table.
Parameters
----------
filename : str
The name of the file.
dataset
The data to write out.
ascii : bool, optional
If `True` use an ASCII format, otherwise a binary format.
clobber : bool, optional
If `True` then the output file will be over-written if it
already exists, otherwise an error is raised.
See Also
--------
read_table
"""
data = _pack_table(dataset)
backend.set_table_data(filename, data, ascii=ascii, clobber=clobber)
[docs]
def write_image(filename: str,
dataset: Data2D,
ascii: bool = True,
clobber: bool = False) -> None:
"""Write out an image.
Parameters
----------
filename : str
The name of the file.
dataset
The data to write out.
ascii : bool, optional
If `True` use an ASCII format, otherwise a binary format.
clobber : bool, optional
If `True` then the output file will be over-written if it
already exists, otherwise an error is raised.
See Also
--------
read_image
"""
data = _pack_image(dataset)
backend.set_image_data(filename, data, ascii=ascii, clobber=clobber)
[docs]
def write_pha(filename: str,
dataset: DataPHA,
ascii: bool = True,
clobber: bool = False) -> None:
"""Write out a PHA dataset.
Parameters
----------
filename : str
The name of the file.
dataset
The data to write out.
ascii : bool, optional
If `True` use an ASCII format, otherwise a binary format.
clobber : bool, optional
If `True` then the output file will be over-written if it
already exists, otherwise an error is raised.
See Also
--------
read_pha
"""
block = _pack_pha(dataset)
backend.set_pha_data(filename, block, ascii=ascii,
clobber=clobber)
[docs]
def write_arf(filename: str,
dataset: ARFType,
ascii: bool = True,
clobber: bool = False) -> None:
"""Write out an ARF.
This does not handle Type II files.
.. versionadded:: 4.16.0
Parameters
----------
filename : str
The name of the file.
dataset
The data to write out.
ascii : bool, optional
If `True` use an ASCII format, otherwise a binary format.
clobber : bool, optional
If `True` then the output file will be over-written if it
already exists, otherwise an error is raised.
See Also
--------
read_arf
"""
blocks = _pack_arf(dataset)
backend.set_arf_data(filename, blocks,
ascii=ascii, clobber=clobber)
[docs]
def write_rmf(filename: str,
dataset: RMFType,
clobber: bool = False) -> None:
"""Write out a RMF.
.. versionadded:: 4.16.0
Parameters
----------
filename : str
The name of the file.
dataset
The data to write out.
clobber : bool, optional
If `True` then the output file will be over-written if it
already exists, otherwise an error is raised.
See Also
--------
read_rmf
"""
blocks = _pack_rmf(dataset)
backend.set_rmf_data(filename, blocks, clobber=clobber)
[docs]
def pack_table(dataset: Data1D) -> object:
"""Convert a Sherpa data object into an I/O item (tabular).
Parameters
----------
dataset : a sherpa.data.Data1D derived object
Returns
-------
obj
An object representing the data, the format of which depends
on the I/O backend.
See Also
--------
pack_image, pack_pha
Examples
--------
>>> d = sherpa.data.Data1D('tmp', [1, 2, 3], [4, 10, 2])
>>> tbl = pack_table(d)
"""
data = _pack_table(dataset)
return backend.pack_table_data(data)
[docs]
def pack_image(dataset: Data2D) -> Any:
"""Convert a Sherpa data object into an I/O item (image).
Parameters
----------
dataset : a sherpa.data.Data2D derived object
Returns
-------
obj
An object representing the data, the format of which depends
on the I/O backend.
See Also
--------
pack_image, pack_pha
Examples
--------
>>> y, x = np.mgrid[:10, :5]
>>> z = (x-2)**2 + (y-2)**3
>>> d = sherpa.data.Data2D('img', x.flatten(), y.flatten(),
z.flatten(), shape=z.shape)
>>> img = pack_image(d)
"""
data = _pack_image(dataset)
return backend.pack_image_data(data)
[docs]
def pack_pha(dataset: DataPHA) -> Any:
"""Convert a Sherpa PHA data object into an I/O item (tabular).
Parameters
----------
dataset : a sherpa.astro.data.DataPHA derived object
Returns
-------
obj
An object representing the data, the format of which depends
on the I/O backend.
See Also
--------
pack_image, pack_table
"""
block = _pack_pha(dataset)
return backend.pack_pha_data(block)
[docs]
def read_table_blocks(arg,
make_copy: bool = False
) -> tuple[str,
dict[int, dict[str, np.ndarray]],
dict[int, HdrType]]:
"""Return the HDU elements (columns and header) from a FITS table.
Parameters
----------
arg
The data file, which can be the name of the file or an object
representing the opened file (the type of this depends on the I/O
backend in use).
make_copy : bool, optional
This argument is currently unused.
Returns
-------
filename, cols, hdr : str, dict, dict
The name of the file, the column data, and the header data
from the HDUs in the file. The keys of the dictionaries are the
block name and the values are dictionaries, with the keys
being the column or header name.
"""
def getkeys(hdr):
if hdr is None:
return {}
return {key.name: key.value for key in hdr.values}
def getcols(cols):
if cols is None:
return {}
return {col.name: col.values for col in cols}
# Desconstruct the data. It is not clear what to do if the file
# contains any image blocks, so we explicitly ignore them at this
# time (i.e. there is currently no requirement on the
# backend.read_table_blocks call on what to do in this situation).
#
blist, filename = backend.read_table_blocks(arg, make_copy=make_copy)
hdr = {1: getkeys(blist.header)}
cols: dict[int, dict[str, np.ndarray]]
cols = {1: {}}
for idx, block in enumerate(blist.blocks, 2):
if isinstance(block, ImageBlock):
continue
hdr[idx] = getkeys(block.header)
cols[idx] = getcols(block.columns)
return filename, cols, hdr