# Source code for xga.sources.base

#  This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).

import os
import pickle
from copy import deepcopy
from itertools import product
from shutil import copyfile
from typing import Tuple, List, Dict, Union
from warnings import warn, simplefilter

import numpy as np
import pandas as pd
from astropy import wcs
from astropy.coordinates import SkyCoord
from astropy.cosmology import Planck15
from astropy.cosmology.core import Cosmology
from astropy.units import Quantity, UnitBase, Unit, UnitConversionError, deg
from fitsio import FITS
from numpy import ndarray
from regions import SkyRegion, EllipseSkyRegion, CircleSkyRegion, EllipsePixelRegion, CirclePixelRegion

from .. import xga_conf, BLACKLIST
from ..exceptions import NotAssociatedError, NoValidObservationsError, MultipleMatchError, \
NoProductAvailableError, NoMatchFoundError, ModelNotAssociatedError, ParameterNotAssociatedError, \
NotSampleMemberError
from ..imagetools.misc import pix_deg_scale
from ..imagetools.misc import sky_deg_scale
from ..products import PROD_MAP, EventList, BaseProduct, BaseAggregateProduct, Image, Spectrum, ExpMap, \
RateMap, PSFGrid, BaseProfile1D, AnnularSpectra
from ..sourcetools.misc import coord_to_name
from ..utils import ALLOWED_PRODUCTS, XMM_INST, dict_search, xmm_det, xmm_sky, OUTPUT, CENSUS, SRC_REGION_COLOURS

# This disables an annoying astropy warning that pops up all the time with XMM images
# Don't know if I should do this really
simplefilter('ignore', wcs.FITSFixedWarning)

[docs]class BaseSource:
"""
The overlord of all XGA classes, the superclass for all source classes. This contains a huge amount of
functionality upon which the rest of XGA is built, includes selecting observations, reading in data products,
and storing newly created data products. Base functionality is included, but this type of source shouldn't
often need to be instantiated by a user.

:param float ra: The right ascension (in degrees) of the source.
:param float dec: The declination (in degrees) of the source.
:param float redshift: The redshift of the source, default is None. Not supplying a redshift means that
proper distance units such as kpc cannot be used.
:param str name: The name of the source, default is None in which case a name will be assembled from the
coordinates given.
:param cosmology: An astropy cosmology object to use for analysis of this source, default is Planck15.
:param bool load_products: Should existing XGA generated products for this source be loaded in, default
is True.
:param bool load_fits: Should existing XSPEC fits for this source be loaded in, will only work if
load_products is True. Default is False.
:param bool in_sample: A boolean argument that tells the source whether it is part of a sample or not, setting
to True suppresses some warnings so that they can be displayed at the end of the sample progress bar. Default
is False. User should only set to True to remove warnings.
"""
def __init__(self, ra: float, dec: float, redshift: float = None, name: str = None, cosmology=Planck15,
load_products: bool = True, load_fits: bool = False, in_sample: bool = False):
"""
The init method for the BaseSource, the most general type of XGA source which acts as a superclass for all
others.

:param float ra: The right ascension (in degrees) of the source.
:param float dec: The declination (in degrees) of the source.
:param float redshift: The redshift of the source, default is None. Not supplying a redshift means that
proper distance units such as kpc cannot be used.
:param str name: The name of the source, default is None in which case a name will be assembled from the
coordinates given.
:param cosmology: An astropy cosmology object to use for analysis of this source, default is Planck15.
:param bool load_products: Should existing XGA generated products for this source be loaded in, default
is True.
:param bool load_fits: Should existing XSPEC fits for this source be loaded in, will only work if
load_products is True. Default is False.
:param bool in_sample: A boolean argument that tells the source whether it is part of a sample or
not, setting to True suppresses some warnings so that they can be displayed at the end of the sample
progress bar. Default is False. User should only set to True to remove warnings.
"""
# This tells the source that it is a part of a sample, which we will check to see whether to suppress warnings
self._samp_member = in_sample
# This is what the warnings (or warning codes) are stored in instead, so an external process working on the
#  sample (or in the sample init) can look up what warnings are there.
self._supp_warn = []

# This sets up the user-defined coordinate attribute
self._ra_dec = np.array([ra, dec])
if name is not None:
# We don't be liking spaces in source names, we also don't like underscores
self._name = name.replace(" ", "").replace("_", "-")
else:
self._name = coord_to_name(self.ra_dec)

# This is where profile products generated by XGA will live for this source
if not os.path.exists(OUTPUT + "profiles/{}".format(self.name)):
os.makedirs(OUTPUT + "profiles/{}".format(self.name))

# And create an inventory file for that directory
if not os.path.exists(OUTPUT + "profiles/{}/inventory.csv".format(self.name)):
with open(OUTPUT + "profiles/{}/inventory.csv".format(self.name), 'w') as inven:
inven.writelines(["file_name,obs_ids,insts,info_key,src_name,type"])

# We now create a directory for custom region files for the source to be stored in
if not os.path.exists(OUTPUT + "regions/{0}/{0}_custom.reg".format(self.name)):
os.makedirs(OUTPUT + "regions/{}".format(self.name))
# And a start to the custom file itself, with red (pnt src) as the default colour
with open(OUTPUT + "regions/{0}/{0}_custom.reg".format(self.name), 'w') as reggo:
reggo.write("global color=white\n")

# Only want ObsIDs, not pointing coordinates as well
# Don't know if I'll always use the simple method
matches, excluded = simple_xmm_match(ra, dec)

# This will store information on the observations that were never included in analysis (so it's distinct from
#  the disassociated_obs information) - I don't know if this is the solution I'll stick with, but it'll do
blacklisted_obs = {}
for row_ind, row in excluded.iterrows():
# Just blacklist all instruments because for an ObsID to be in the excluded return
#  from simple_xmm_match this has to be the case
blacklisted_obs[row['ObsID']] = ['pn', 'mos1', 'mos2']

# This checks that the observations have at least one usable instrument
obs = matches["ObsID"].values
instruments = {o: [] for o in obs}
for o in obs:
# As the simple_xmm_match will only tell us about observations in which EVERY instrument is
#  blacklisted, I have to check in the blacklist to see whether some individual instruments
#  have to be excluded
excl_pn = False
excl_mos1 = False
excl_mos2 = False
if o in BLACKLIST['ObsID'].values:
if BLACKLIST[BLACKLIST['ObsID'] == o]['EXCLUDE_PN'].values[0] == 'T':
excl_pn = True
if BLACKLIST[BLACKLIST['ObsID'] == o]['EXCLUDE_MOS1'].values[0] == 'T':
excl_mos1 = True
if BLACKLIST[BLACKLIST['ObsID'] == o]['EXCLUDE_MOS2'].values[0] == 'T':
excl_mos2 = True

# Here we see if PN is allowed by the census (things like CalClosed observations are excluded in
#  the census) and if PN is allowed by the blacklist (individual instruments can be blacklisted).
if matches[matches["ObsID"] == o]["USE_PN"].values[0] and not excl_pn:
instruments[o].append("pn")
# If excluded by the blacklist, then that needs
elif excl_pn:
# The behaviour writing PN to the dictionary changes slightly depending on whether the ObsID
#  has an entry yet or not
if o not in blacklisted_obs:
blacklisted_obs[o] = ["pn"]
else:
blacklisted_obs[o] += ['pn']

# Now we repeat the same process for MOS1 and 2 - its quite clunky and there's probably a more
#  elegant way that I could write this, but ah well
if matches[matches["ObsID"] == o]["USE_MOS1"].values[0] and not excl_mos1:
instruments[o].append("mos1")
# If excluded by the blacklist, then that needs
elif excl_mos1:
# The behaviour writing MOS1 to the dictionary changes slightly depending on whether the ObsID
#  has an entry yet or not
if o not in blacklisted_obs:
blacklisted_obs[o] = ["mos1"]
else:
blacklisted_obs[o] += ['mos1']

if matches[matches["ObsID"] == o]["USE_MOS2"].values[0] and not excl_mos2:
instruments[o].append("mos2")
# If excluded by the blacklist, then that needs
elif excl_mos2:
# The behaviour writing MOS2 to the dictionary changes slightly depending on whether the ObsID
#  has an entry yet or not
if o not in blacklisted_obs:
blacklisted_obs[o] = ["mos2"]
else:
blacklisted_obs[o] += ['mos2']

# Information about which ObsIDs/instruments are available, and which have been blacklisted, is stored
#  in class attributes here.
self._obs = [o for o in obs if len(instruments[o]) > 0]
self._instruments = {o: instruments[o] for o in self._obs if len(instruments[o]) > 0}
self._blacklisted_obs = blacklisted_obs

# self._obs can be empty after this cleaning step, so do quick check and raise error if so.
if len(self._obs) == 0:
raise NoValidObservationsError("{s} has {n} observations ({a}), none of which have the necessary"
" files.".format(s=self.name, n=len(self._obs), a=", ".join(self._obs)))

# Here I set up the ObsID directories for products generated by XGA to be stored in, they also get an
#  inventory file to store information about them - largely because some of the informative file names
#  I was using were longer than 256 characters which my OS does not support
for o in self._obs:
if not os.path.exists(OUTPUT + o):
os.mkdir(OUTPUT + o)

if not os.path.exists(OUTPUT + '{}/inventory.csv'.format(o)):
with open(OUTPUT + '{}/inventory.csv'.format(o), 'w') as inven:
inven.writelines(['file_name,obs_id,inst,info_key,src_name,type'])

# Check in a box of half-side 5 arcminutes, should give an idea of which are on-axis
try:
on_axis_match = simple_xmm_match(ra, dec, Quantity(5, 'arcmin'))[0]["ObsID"].values
except NoMatchFoundError:
on_axis_match = np.array([])
self._onaxis = list(np.array(self._obs)[np.isin(self._obs, on_axis_match)])

# nhlookup returns average and weighted average values, so just take the first
self._nH = nh_lookup(self.ra_dec)[0]
self._redshift = redshift
# This method uses the instruments attribute to check and see whether a particular ObsID-Instrument
#  combination is allowed for this source. As that attribute was constructed using the blacklist information
#  we can be sure that every ObsID-Instrument combination loaded in here is allowed to be here. The only
#  other way for them to change is through using the dissociate observation capability
self._products, region_dict, self._att_files = self._initial_products()

# Want to update the ObsIDs associated with this source after seeing if all files are present
self._obs = list(self._products.keys())
self._instruments = {o: instruments[o] for o in self._obs if len(instruments[o]) > 0}

self._cosmo = cosmology
if redshift is not None:
self._lum_dist = self._cosmo.luminosity_distance(self._redshift)
self._ang_diam_dist = self._cosmo.angular_diameter_distance(self._redshift)
else:
self._lum_dist = None
self._ang_diam_dist = None

# This is a queue for products to be generated for this source, will be a numpy array in practise.
# Items in the same row will all be generated in parallel, whereas items in the same column will
# be combined into a command stack and run in order.
self.queue = None
# Another attribute destined to be an array, will contain the output type of each command submitted to
# the queue array.
self.queue_type = None
# This contains an array of the paths of the final output of each command in the queue
self.queue_path = None
# This contains an array of the extra information needed to instantiate class
# after the SAS command has run
self.queue_extra_info = None
# Defining this here, although it won't be set to a boolean value in this superclass
self._detected = None
# This block defines various dictionaries that are used in the sub source classes, when context allows
# us to find matching source regions.
self._regions = None
self._other_regions = None
self._alt_match_regions = None
self._interloper_regions = []

# Set up an attribute where a default central coordinate will live
self._default_coord = self.ra_dec

# Init the the radius multipliers that define the outer and inner edges of a background annulus
self._back_inn_factor = 1.05
self._back_out_factor = 1.5

# Initialisation of fit result attributes
self._fit_results = {}
self._test_stat = {}
self._dof = {}
self._total_count_rate = {}
self._total_exp = {}
self._luminosities = {}

# Initialisation of attributes related to Extended and GalaxyCluster sources
self._peaks = None
# Initialisation of allowed overdensity radii as None
if not hasattr(self, 'r200'):
self._r200 = None
if not hasattr(self, 'r500'):
self._r500 = None
if not hasattr(self, 'r2500'):
self._r2500 = None
# Initialisation of cluster observables as None
self._richness = None
self._richness_err = None

self._wl_mass = None
self._wl_mass_err = None

self._peak_lo_en = Quantity(0.5, 'keV')
self._peak_hi_en = Quantity(2.0, 'keV')
# Peaks don't really have any meaning for the BaseSource class, so even though this is a boolean variable
#  when populated properly I set it to None here
self._use_peak = None

# These attributes pertain to the cleaning of observations (as in disassociating them from the source if
#  they don't include enough of the object we care about).
self._disassociated = False
self._disassociated_obs = {}

# If there is an existing XGA output directory, then it makes sense to search for products that XGA
#  may have already generated and load them in - saves us wasting time making them again.
# The user does have control over whether this happens or not though.
# This goes at the end of init to make sure everything necessary has been declared

# Now going to save load_fits in an attribute, just because if the observation is cleaned we need to
#  run _existing_xga_products again, same for load_products

@property
def ra_dec(self) -> Quantity:
"""
A getter for the original ra and dec entered by the user.

:return: The ra-dec coordinates entered by the user when the source was first defined
:rtype: Quantity
"""
# Easier for it be internally kep as a numpy array, but I want the user to have astropy coordinates
return Quantity(self._ra_dec, 'deg')

@property
def default_coord(self) -> Quantity:
"""
A getter for the default analysis coordinate of this source.
:return: An Astropy quantity containing the default analysis coordinate.
:rtype: Quantity
"""
return self._default_coord

@default_coord.setter
def default_coord(self, new_coord: Quantity):
"""
Setter for the default analysis coordinate of this source.

:param Quantity new_coord: The new default coordinate.
"""
if not new_coord.unit.is_equivalent('deg'):
raise UnitConversionError("The new coordinate must be in degrees")
else:
new_coord = new_coord.to("deg")

self._default_coord = new_coord

def _initial_products(self) -> Tuple[dict, dict, dict]:
"""
Assembles the initial dictionary structure of existing XMM data products associated with this source.

:return: A dictionary structure detailing the data products available at initialisation, another
dictionary containing paths to region files, and another dictionary containing paths to attitude files.
:rtype: Tuple[dict, dict, dict]
"""

def read_default_products(en_lims: tuple) -> Tuple[str, dict]:
"""
This nested function takes pairs of energy limits defined in the config file and runs
through the default XMM products defined in the config file, filling in the energy limits and
checking if the file paths exist. Those that do exist are read into the relevant product object and
returned.

:param tuple en_lims: A tuple containing a lower and upper energy limit to generate file names for,
the first entry should be the lower limit, the second the upper limit.
:return: A dictionary key based on the energy limits for the file paths to be stored under, and the
dictionary of file paths.
:rtype: tuple[str, dict]
"""
not_these = ["root_xmm_dir", "lo_en", "hi_en", evt_key, "attitude_file"]
# Formats the generic paths given in the config file for this particular obs and energy range
files = {k.split('_')[1]: v.format(lo_en=en_lims[0], hi_en=en_lims[1], obs_id=obs_id)
for k, v in xga_conf["XMM_FILES"].items() if k not in not_these and inst in k}

# It is not necessary to check that the files exist, as this happens when the product classes
# are instantiated. So whether the file exists or not, an object WILL exist, and you can check if
# you should use it for analysis using the .usable attribute

# This looks up the class which corresponds to the key (which is the product
# ID in this case e.g. image), then instantiates an object of that class
lo = Quantity(float(en_lims[0]), 'keV')
hi = Quantity(float(en_lims[1]), 'keV')
prod_objs = {key: PROD_MAP[key](file, obs_id=obs_id, instrument=inst, stdout_str="", stderr_str="",
gen_cmd="", lo_en=lo, hi_en=hi)
for key, file in files.items() if os.path.exists(file)}
# If both an image and an exposure map are present for this energy band, a RateMap object is generated
if "image" in prod_objs and "expmap" in prod_objs:
prod_objs["ratemap"] = RateMap(prod_objs["image"], prod_objs["expmap"])
# Adds in the source name to the products
for prod in prod_objs:
prod_objs[prod].src_name = self._name
# As these files existed already, I don't have any stdout/err strings to pass, also no
# command string.

bound_key = "bound_{l}-{u}".format(l=float(en_lims[0]), u=float(en_lims[1]))
return bound_key, prod_objs

# This dictionary structure will contain paths to all available data products associated with this
# source instance, both pre-generated and made with XGA.
obs_dict = {obs: {} for obs in self._obs}
# Regions will get their own dictionary, I don't care about keeping the reg_file paths as
# an attribute because they get read into memory in the init of this class
reg_dict = {}
# Attitude files also get their own dictionary, they won't be read into memory by XGA
att_dict = {}
# Use itertools to create iterable and avoid messy nested for loop
# product makes iterable of tuples, with all combinations of the events files and ObsIDs
for oi in product(obs_dict, XMM_INST):
# Produces a list of the combinations of upper and lower energy bounds from the config file.
en_comb = zip(xga_conf["XMM_FILES"]["lo_en"], xga_conf["XMM_FILES"]["hi_en"])

# This is purely to make the code easier to read
obs_id = oi[0]
inst = oi[1]
if inst not in self._instruments[obs_id]:
continue
evt_key = "clean_{}_evts".format(inst)
evt_file = xga_conf["XMM_FILES"][evt_key].format(obs_id=obs_id)
# This is the path to the region file specified in the configuration file, but the next step is that
#  we make a local copy (if the original file exists) and then make use of that so that any modifications
#  don't harm the original file.
reg_file = xga_conf["XMM_FILES"]["region_file"].format(obs_id=obs_id)

# Attitude file is a special case of data product, only SAS should ever need it, so it doesn't
# have a product object
att_file = xga_conf["XMM_FILES"]["attitude_file"].format(obs_id=obs_id)

if os.path.exists(evt_file) and os.path.exists(att_file):
# An instrument subsection of an observation will ONLY be populated if the events file exists
# Otherwise nothing can be done with it.
obs_dict[obs_id][inst] = {"events": EventList(evt_file, obs_id=obs_id, instrument=inst,
stdout_str="", stderr_str="", gen_cmd="")}
att_dict[obs_id] = att_file
# Dictionary updated with derived product names
obs_dict[obs_id][inst].update({gen_return[0]: gen_return[1] for gen_return in map_ret})

# As mentioned above, we make a local copy of the region file if the original file path exists
#  and if a local copy DOESN'T already exist
reg_copy_path = OUTPUT+"{o}/{o}_xga_copy.reg".format(o=obs_id)
if os.path.exists(reg_file) and not os.path.exists(reg_copy_path):
# A local copy of the region file is made and used
copyfile(reg_file, reg_copy_path)
# Regions dictionary updated with path to local region file, if it exists
reg_dict[obs_id] = reg_copy_path
# In the case where there is already a local copy of the region file
elif os.path.exists(reg_copy_path):
reg_dict[obs_id] = reg_copy_path
else:
reg_dict[obs_id] = None

# Cleans any observations that don't have at least one instrument associated with them
obs_dict = {o: v for o, v in obs_dict.items() if len(v) != 0}
if len(obs_dict) == 0:
raise NoValidObservationsError("{s} has {n} observations ({a}), none of which have the necessary"
" files.".format(s=self.name, n=len(self._obs), a=", ".join(self._obs)))
return obs_dict, reg_dict, att_dict

[docs]    def update_products(self, prod_obj: Union[BaseProduct, BaseAggregateProduct, BaseProfile1D, List[BaseProduct],
List[BaseAggregateProduct], List[BaseProfile1D]],
update_inv: bool = True):
"""
Setter method for the products attribute of source objects. Cannot delete existing products,
but will overwrite existing products. Raises errors if the ObsID is not associated
with this source or the instrument is not associated with the ObsID. Lists of products can also be passed
and will be added to the source storage structure, these lists may also contain None values, as typically
XGA will return None if a profile fails to generate (for instance), in which case that entry will simply
be ignored.

:param BaseProduct/BaseAggregateProduct/BaseProfile1D/List[BaseProduct]/List[BaseProfile1D] prod_obj: The
new product object(s) to be added to the source object.
:param bool update_inv: This flag is to avoid unnecessary read-writes when this method is called by a method
(such as _existing_xga_products) which want to add products to the source storage structure, but don't
want the inventory file altered (as they know the product is already in there).
"""
# Aggregate products are things like PSF grids and sets of annular spectra.
if not isinstance(prod_obj, (BaseProduct, BaseAggregateProduct, BaseProfile1D, list)) and prod_obj is not None:
raise TypeError("Only product objects can be assigned to sources.")
elif isinstance(prod_obj, list) and not all([isinstance(p, (BaseProduct, BaseAggregateProduct, BaseProfile1D))
or p is None for p in prod_obj]):
raise TypeError("If a list is passed, only product objects (or None values) may be included.")
elif not isinstance(prod_obj, list):
prod_obj = [prod_obj]

for po in prod_obj:
if po is not None:
if isinstance(po, Image):
extra_key = po.storage_key
en_key = "bound_{l}-{u}".format(l=float(po.energy_bounds[0].value),
u=float(po.energy_bounds[1].value))
elif type(po) == Spectrum or type(po) == AnnularSpectra or isinstance(po, BaseProfile1D):
extra_key = po.storage_key
elif type(po) == PSFGrid:
# The first part of the key is the model used (by default its ELLBETA for example), and
#  the second part is the number of bins per side. - Enough to uniquely identify the PSF.
extra_key = po.model + "_" + str(po.num_bins)
else:
extra_key = None

# All information about where to place it in our storage hierarchy can be pulled from the product
# object itself
obs_id = po.obs_id
inst = po.instrument
p_type = po.type

# Previously, merged images/exposure maps were stored in a separate dictionary, but now everything lives
#  together - merged products do get a 'combined' prefix on their product type key though
if obs_id == "combined":
p_type = "combined_" + p_type

# 'Combined' will effectively be stored as another ObsID
if "combined" not in self._products:
self._products["combined"] = {}

# The product gets the name of this source object added to it
po.src_name = self.name

# Double check that something is trying to add products from another source to the current one.
if obs_id != "combined" and obs_id not in self._products:
raise NotAssociatedError("{o} is not associated with this X-ray source.".format(o=obs_id))
elif inst != "combined" and inst not in self._products[obs_id]:
raise NotAssociatedError("{i} is not associated with XMM observation {o}".format(i=inst, o=obs_id))

if extra_key is not None and obs_id != "combined":
# If there is no entry for this 'extra key' (energy band for instance) already, we must make one
if extra_key not in self._products[obs_id][inst]:
self._products[obs_id][inst][extra_key] = {}
self._products[obs_id][inst][extra_key][p_type] = po

elif extra_key is None and obs_id != "combined":
self._products[obs_id][inst][p_type] = po

# Here we deal with merged products, they live in the same dictionary, but with no instrument entry
#  and ObsID = 'combined'
elif extra_key is not None and obs_id == "combined":
if extra_key not in self._products[obs_id]:
self._products[obs_id][extra_key] = {}
self._products[obs_id][extra_key][p_type] = po

elif extra_key is None and obs_id == "combined":
self._products[obs_id][p_type] = po

# This is for an image being added, so we look for a matching exposure map. If it exists we can
#  make a ratemap
if p_type == "image":
# No chance of an expmap being PSF corrected, so we just use the energy key to
#  look for one that matches our new image
exs = [prod for prod in self.get_products("expmap", obs_id, inst, just_obj=False) if en_key in prod]
if len(exs) == 1:
new_rt = RateMap(po, exs[0][-1])
new_rt.src_name = self.name
self._products[obs_id][inst][extra_key]["ratemap"] = new_rt

# However, if its an exposure map that's been added, we have to look for matching image(s). There
#  could be multiple, because there could be a normal image, and a PSF corrected image
elif p_type == "expmap":
# PSF corrected extra keys are built on top of energy keys, so if the en_key is within the extra
#  key string it counts as a match
ims = [prod for prod in self.get_products("image", obs_id, inst, just_obj=False)
if en_key in prod[-2]]
# If there is at least one match, we can go to work
if len(ims) != 0:
for im in ims:
new_rt = RateMap(im[-1], po)
new_rt.src_name = self.name
self._products[obs_id][inst][im[-2]]["ratemap"] = new_rt

# The same behaviours hold for combined_image and combined_expmap, but they get
#  stored in slightly different places
elif p_type == "combined_image":
exs = [prod for prod in self.get_products("combined_expmap", just_obj=False) if en_key in prod]
if len(exs) == 1:
new_rt = RateMap(po, exs[0][-1])
new_rt.src_name = self.name
# Remember obs_id for combined products is just 'combined'
self._products[obs_id][extra_key]["combined_ratemap"] = new_rt

elif p_type == "combined_expmap":
ims = [prod for prod in self.get_products("combined_image", just_obj=False) if en_key in prod[-2]]
if len(ims) != 0:
for im in ims:
new_rt = RateMap(im[-1], po)
new_rt.src_name = self.name
self._products[obs_id][im[-2]]["combined_ratemap"] = new_rt

if isinstance(po, BaseProfile1D) and not os.path.exists(po.save_path):
po.save()
# Here we make sure to store a record of the added product in the relevant inventory file
if isinstance(po, BaseProduct) and po.obs_id != 'combined' and update_inv:
inven = pd.read_csv(OUTPUT + "{}/inventory.csv".format(po.obs_id), dtype=str)

# Don't want to store a None value as a string for the info_key
if extra_key is None:
info_key = ''
else:
info_key = extra_key

# I want only the name of the file as it is in the storage directory, I don't want an
#  absolute path, so I remove the leading information about the absolute location in
#  the .path string
f_name = po.path.split(OUTPUT + "{}/".format(po.obs_id))[-1]

# Images, exposure maps, and other such things are not source specific, so I don't want
#  the inventory file to assign them a specific source
if isinstance(po, Image):
s_name = ''
else:
s_name = po.src_name

# Creates new pandas series to be appended to the inventory dataframe
new_line = pd.Series([f_name, po.obs_id, po.instrument, info_key, s_name, po.type],
['file_name', 'obs_id', 'inst', 'info_key', 'src_name', 'type'], dtype=str)
# Concatenates the series with the inventory dataframe
inven = pd.concat([inven, new_line.to_frame().T], ignore_index=True)

# Checks for rows that are exact duplicates, this should never happen as far as I can tell, but
#  if it did I think it would cause problems so better to be safe and add this.
inven.drop_duplicates(subset=None, keep='first', inplace=True)
# Saves the updated inventory file
inven.to_csv(OUTPUT + "{}/inventory.csv".format(po.obs_id), index=False)

elif isinstance(po, BaseProduct) and po.obs_id == 'combined' and update_inv:
inven = pd.read_csv(OUTPUT + "combined/inventory.csv".format(po.obs_id), dtype=str)

# Don't want to store a None value as a string for the info_key
if extra_key is None:
info_key = ''
else:
info_key = extra_key

# We know that this particular product is a combination of multiple ObsIDs, and those ObsIDs
#  are not stored explicitly within the product object. However we are currently within the
#  source object that they were generated from, thus we do have that information available
# Using the _instruments attribute also gives us access to inst information
i_str = "/".join([i for o in self._instruments for i in self._instruments[o]])
o_str = "/".join([o for o in self._instruments for i in self._instruments[o]])
# They cannot be stored as lists for a single column entry in a csv though, so I am smushing
#  them into strings

f_name = po.path.split(OUTPUT + "combined/")[-1]
if isinstance(po, Image):
s_name = ''
else:
s_name = po.src_name

# Creates new pandas series to be appended to the inventory dataframe
new_line = pd.Series([f_name, o_str, i_str, info_key, s_name, po.type],
['file_name', 'obs_ids', 'insts', 'info_key', 'src_name', 'type'], dtype=str)
# Concatenates the series with the inventory dataframe
inven = pd.concat([inven, new_line.to_frame().T], ignore_index=True)
inven.drop_duplicates(subset=None, keep='first', inplace=True)
inven.to_csv(OUTPUT + "combined/inventory.csv".format(po.obs_id), index=False)

elif isinstance(po, BaseProfile1D) and po.obs_id != 'combined' and update_inv:
inven = pd.read_csv(OUTPUT + "profiles/{}/inventory.csv".format(self.name), dtype=str)

# Don't want to store a None value as a string for the info_key
if extra_key is None:
info_key = ''
else:
info_key = extra_key

f_name = po.save_path.split(OUTPUT + "profiles/{}/".format(self.name))[-1]
i_str = po.instrument
o_str = po.obs_id
# Creates new pandas series to be appended to the inventory dataframe
new_line = pd.Series([f_name, o_str, i_str, info_key, po.src_name, po.type],
['file_name', 'obs_ids', 'insts', 'info_key', 'src_name', 'type'], dtype=str)
# Concatenates the series with the inventory dataframe
inven = pd.concat([inven, new_line.to_frame().T], ignore_index=True)
inven.drop_duplicates(subset=None, keep='first', inplace=True)
inven.to_csv(OUTPUT + "profiles/{}/inventory.csv".format(self.name), index=False)

elif isinstance(po, BaseProfile1D) and po.obs_id == 'combined' and update_inv:
inven = pd.read_csv(OUTPUT + "profiles/{}/inventory.csv".format(self.name), dtype=str)

# Don't want to store a None value as a string for the info_key
if extra_key is None:
info_key = ''
else:
info_key = extra_key

f_name = po.save_path.split(OUTPUT + "profiles/{}/".format(self.name))[-1]
i_str = "/".join([i for o in self._instruments for i in self._instruments[o]])
o_str = "/".join([o for o in self._instruments for i in self._instruments[o]])
# Creates new pandas series to be appended to the inventory dataframe
new_line = pd.Series([f_name, o_str, i_str, info_key, po.src_name, po.type],
['file_name', 'obs_ids', 'insts', 'info_key', 'src_name', 'type'], dtype=str)
# Concatenates the series with the inventory dataframe
inven = pd.concat([inven, new_line.to_frame().T], ignore_index=True)
inven.drop_duplicates(subset=None, keep='first', inplace=True)
inven.to_csv(OUTPUT + "profiles/{}/inventory.csv".format(self.name), index=False)

"""
A method specifically for searching an existing XGA output directory for relevant files and loading
them in as XGA products. This will retrieve images, exposure maps, and spectra; then the source product
structure is updated. The method also finds previous fit results and loads them in.

:param bool read_fits: Boolean flag that controls whether past fits are read back in or not.
"""

def parse_image_like(file_path: str, exact_type: str, merged: bool = False) -> BaseProduct:
"""
Very simple little function that takes the path to an XGA generated image-like product (so either an
image or an exposure map), parses the file path and makes an XGA object of the correct type by using
the exact_type variable.

:param str file_path: Absolute path to an XGA-generated XMM data product.
:param str exact_type: Either 'image' or 'expmap', the type of product that the file_path leads to.
:param bool merged: Whether this is a merged file or not.
:return: An XGA product object.
:rtype: BaseProduct
"""
# Get rid of the absolute part of the path, then split by _ to get the information from the file name
im_info = file_path.split("/")[-1].split("_")

if not merged:
# I know its hard coded but this will always be the case, these are files I generate with XGA.
obs_id = im_info[0]
ins = im_info[1]
else:
ins = "combined"
obs_id = "combined"

en_str = [entry for entry in im_info if "keV" in entry][0]
lo_en, hi_en = en_str.split("keV")[0].split("-")

# Have to be astropy quantities before passing them into the Product declaration
lo_en = Quantity(float(lo_en), "keV")
hi_en = Quantity(float(hi_en), "keV")

# Different types of Product objects, the empty strings are because I don't have the stdout, stderr,
#  or original commands for these objects.
if exact_type == "image" and "psfcorr" not in file_path:
final_obj = Image(file_path, obs_id, ins, "", "", "", lo_en, hi_en)
elif exact_type == "image" and "psfcorr" in file_path:
final_obj = Image(file_path, obs_id, ins, "", "", "", lo_en, hi_en)
final_obj.psf_corrected = True
final_obj.psf_bins = int([entry for entry in im_info if "bin" in entry][0].split('bin')[0])
final_obj.psf_iterations = int([entry for entry in im_info if "iter" in
entry][0].split('iter')[0])
final_obj.psf_model = [entry for entry in im_info if "mod" in entry][0].split("mod")[0]
final_obj.psf_algorithm = [entry for entry in im_info if "algo" in entry][0].split("algo")[0]
elif exact_type == "expmap":
final_obj = ExpMap(file_path, obs_id, ins, "", "", "", lo_en, hi_en)
else:
raise TypeError("Only image and expmap are allowed.")

return final_obj

og_dir = os.getcwd()
# This is used for spectra that should be part of an AnnularSpectra object
ann_spec_constituents = {}
# This is to store whether all components could be loaded in successfully
ann_spec_usable = {}
for obs in self._obs:
if os.path.exists(OUTPUT + obs):
os.chdir(OUTPUT + obs)
cur_d = os.getcwd() + '/'
# Loads in the inventory file for this ObsID

# Here we read in instruments and exposure maps which are relevant to this source
im_lines = inven[(inven['type'] == 'image') | (inven['type'] == 'expmap')]
# Instruments is a dictionary with ObsIDs on the top level and then valid instruments on
#  the lower level. As such we can be sure here we're only reading in instruments we decided
#  are valid
for i in self.instruments[obs]:
# Fetches lines of the inventory which match the current ObsID and instrument
rel_ims = im_lines[(im_lines['obs_id'] == obs) & (im_lines['inst'] == i)]
for r_ind, r in rel_ims.iterrows():
self.update_products(parse_image_like(cur_d+r['file_name'], r['type']), update_inv=False)

# For spectra we search for products that have the name of this object in, as they are for
#  specific parts of the observation.
# Have to replace any + characters with x, as that's what we did in evselect_spectrum due to SAS
#  having some issues with the + character in file names
named = [os.path.abspath(f) for f in os.listdir(".") if os.path.isfile(f) and
self._name.replace("+", "x") in f and obs in f
and (XMM_INST[0] in f or XMM_INST[1] in f or XMM_INST[2] in f)]
specs = [f for f in named if "spec" in f.split('/')[-1] and "back" not in f.split('/')[-1]]

for sp in specs:
# Filename contains a lot of useful information, so splitting it out to get it
sp_info = sp.split("/")[-1].split("_")
# Reading these out into variables mostly for my own sanity while writing this
obs_id = sp_info[0]
inst = sp_info[1]
# I now store the central coordinate in the file name, and read it out into astropy quantity
#  for when I need to define the spectrum object
central_coord = Quantity([float(sp_info[3].strip('ra')), float(sp_info[4].strip('dec'))], 'deg')
# Also read out the inner and outer radii into astropy quantities (I know that
#  they will be in degree units).
r_inner = Quantity(np.array(sp_info[5].strip('ri').split('and')).astype(float), 'deg')
r_outer = Quantity(np.array(sp_info[6].strip('ro').split('and')).astype(float), 'deg')
# Check if there is only one r_inner and r_outer value each, if so its a circle
#  (otherwise its an ellipse)
if len(r_inner) == 1:
r_inner = r_inner[0]
r_outer = r_outer[0]

# Only check the actual filename, as I have no knowledge of what strings might be in the
#  user's path to xga output
if 'grpTrue' in sp.split('/')[-1]:
grp_ind = sp_info.index('grpTrue')
grouped = True
else:
grouped = False

# mincnt or minsn information will only be in the filename if the spectrum is grouped
if grouped and 'mincnt' in sp.split('/')[-1]:
min_counts = int(sp_info[grp_ind+1].split('mincnt')[-1])
min_sn = None
elif grouped and 'minsn' in sp.split('/')[-1]:
min_sn = float(sp_info[grp_ind+1].split('minsn')[-1])
min_counts = None
else:
# We still need to pass the variables to the spectrum definition, even if it isn't
#  grouped
min_sn = None
min_counts = None

# Only if oversampling was applied will it appear in the filename
if 'ovsamp' in sp.split('/')[-1]:
over_sample = int(sp_info[-2].split('ovsamp')[-1])
else:
over_sample = None

if "region" in sp.split('/')[-1]:
region = True
else:
region = False

# I split the 'spec' part of the end of the name of the spectrum, and can use the parts of the
#  file name preceding it to search for matching arf/rmf files
sp_info_str = cur_d + sp.split('/')[-1].split('_spec')[0]

# Fairly self explanatory, need to find all the separate products needed to define an XGA
#  spectrum
arf = [f for f in named if "arf" in f and "back" not in f and sp_info_str == f.split('.arf')[0]]
rmf = [f for f in named if "rmf" in f and "back" not in f and sp_info_str == f.split('.rmf')[0]]
# As RMFs can be generated for source and background spectra separately, or one for both,
#  we need to check for matching RMFs to the spectrum we found
if len(rmf) == 0:
rmf = [f for f in named if "rmf" in f and "back" not in f and inst in f and "universal" in f]

# Exact same checks for the background spectrum
back = [f for f in named if "backspec" in f and inst in f
and sp_info_str == f.split('_backspec')[0]]
back_arf = [f for f in named if "arf" in f and inst in f
and sp_info_str == f.split('_back.arf')[0] and "back" in f]
back_rmf = [f for f in named if "rmf" in f and "back" in f and inst in f
and sp_info_str == f.split('_back.rmf')[0]]
if len(back_rmf) == 0:
back_rmf = rmf

# If exactly one match has been found for all of the products, we define an XGA spectrum and
#  add it the source object.
if len(arf) == 1 and len(rmf) == 1 and len(back) == 1 and len(back_arf) == 1 and len(back_rmf) == 1:
# Defining our XGA spectrum instance
obj = Spectrum(sp, rmf[0], arf[0], back[0], central_coord, r_inner, r_outer, obs_id, inst,
grouped, min_counts, min_sn, over_sample, "", "", "", region, back_rmf[0],
back_arf[0])

if "ident" in sp.split('/')[-1]:
set_id = int(sp.split('ident')[-1].split('_')[0])
ann_id = int(sp.split('ident')[-1].split('_')[1])
obj.annulus_ident = ann_id
obj.set_ident = set_id
if set_id not in ann_spec_constituents:
ann_spec_constituents[set_id] = []
ann_spec_usable[set_id] = True
ann_spec_constituents[set_id].append(obj)
else:
# And adding it to the source storage structure, but only if its not a member
#  of an AnnularSpectra
try:
self.update_products(obj, update_inv=False)
except NotAssociatedError:
pass

elif len(arf) == 1 and len(rmf) == 1 and len(back) == 1 and len(back_arf) == 0:
# Defining our XGA spectrum instance
obj = Spectrum(sp, rmf[0], arf[0], back[0], central_coord, r_inner, r_outer, obs_id, inst,
grouped, min_counts, min_sn, over_sample, "", "", "", region)

if "ident" in sp.split('/')[-1]:
set_id = int(sp.split('ident')[-1].split('_')[0])
ann_id = int(sp.split('ident')[-1].split('_')[1])
obj.annulus_ident = ann_id
obj.set_ident = set_id
if set_id not in ann_spec_constituents:
ann_spec_constituents[set_id] = []
ann_spec_usable[set_id] = True
ann_spec_constituents[set_id].append(obj)
else:
# And adding it to the source storage structure, but only if its not a member
#  of an AnnularSpectra
try:
self.update_products(obj, update_inv=False)
except NotAssociatedError:
pass
else:
warn_text = "{src} spectrum {sp} cannot be loaded in due to a mismatch in available" \
" ancillary files".format(src=self.name, sp=sp)
if not self._samp_member:
warn(warn_text, stacklevel=2)
else:
self._supp_warn.append(warn_text)
if "ident" in sp.split("/")[-1]:
set_id = int(sp.split('ident')[-1].split('_')[0])
ann_spec_usable[set_id] = False

os.chdir(og_dir)

# Here we will load in existing xga profile objects
os.chdir(OUTPUT + "profiles/{}".format(self.name))
saved_profs = [pf for pf in os.listdir('.') if '.xga' in pf and 'profile' in pf and self.name in pf]
for pf in saved_profs:
try:
try:
self.update_products(temp_prof, update_inv=False)
except NotAssociatedError:
pass
except (EOFError, pickle.UnpicklingError):
warn_text = "A profile save ({}) appears to be corrupted, it has not been " \
"loaded; you can safely delete this file".format(os.getcwd() + '/' + pf)
if not self._samp_member:
# If these errors have been raised then I think that the pickle file has been broken (see issue #935)
warn(warn_text, stacklevel=2)
else:
self._supp_warn.append(warn_text)
os.chdir(og_dir)

# If spectra that should be a part of annular spectra object(s) have been found, then I need to create
#  those objects and add them to the storage structure
if len(ann_spec_constituents) != 0:
for set_id in ann_spec_constituents:
if ann_spec_usable[set_id]:
ann_spec_obj = AnnularSpectra(ann_spec_constituents[set_id])
if self._redshift is not None:
# If we know the redshift we will add the radii to the annular spectra in proper distance units
self.update_products(ann_spec_obj, update_inv=False)

# Here we load in any combined images and exposure maps that may have been generated
os.chdir(OUTPUT + 'combined')
cur_d = os.getcwd() + '/'
# This creates a set of observation-instrument strings that describe the current combinations associated
#  with this source, for testing against to make sure we're loading in combined images/expmaps that
#  do belong with this source
src_oi_set = set([o+i for o in self._instruments for i in self._instruments[o]])

# Loads in the inventory file for this ObsID
rel_inven = inven[(inven['type'] == 'image') | (inven['type'] == 'expmap')]
for row_ind, row in rel_inven.iterrows():
o_split = row['obs_ids'].split('/')
i_split = row['insts'].split('/')
# Assemble a set of observations-instrument strings for the current row, to test against the
#  src_oi_set we assembled earlier
test_oi_set = set([o+i_split[o_ind] for o_ind, o in enumerate(o_split)])
# First we make sure the sets are the same length, if they're not then we know before starting that this
#  row's file can't be okay for us to load in. Then we compute the union between the test_oi_set and
#  the src_oi_set, and if that is the same length as the original src_oi_set then we know that they match
#  exactly and the product can be loaded
if len(src_oi_set) == len(test_oi_set) and len(src_oi_set | test_oi_set) == len(src_oi_set):
self.update_products(parse_image_like(cur_d+row['file_name'], row['type'], merged=True),
update_inv=False)

os.chdir(og_dir)

if os.path.exists(OUTPUT + "XSPEC/" + self.name) and read_fits:
ann_obs_order = {}
ann_results = {}
ann_lums = {}
prev_fits = [OUTPUT + "XSPEC/" + self.name + "/" + f
for f in os.listdir(OUTPUT + "XSPEC/" + self.name) if ".xcm" not in f and ".fits" in f]
for fit in prev_fits:
fit_name = fit.split("/")[-1]
fit_info = fit_name.split("_")
storage_key = "_".join(fit_info[1:-1])
# Load in the results table
fit_data = FITS(fit)

# This bit is largely copied from xspec.py, sorry for my laziness
global_results = fit_data["RESULTS"][0]
model = global_results["MODEL"].strip(" ")

if "_ident" in storage_key:
set_id, ann_id = storage_key.split("_ident")[-1].split("_")
set_id = int(set_id)
ann_id = int(ann_id)
if set_id not in ann_results:
ann_results[set_id] = {}
ann_lums[set_id] = {}
ann_obs_order[set_id] = {}

if model not in ann_results[set_id]:
ann_results[set_id][model] = {}
ann_lums[set_id][model] = {}
ann_obs_order[set_id][model] = {}

else:
set_id = None
ann_id = None

try:
inst_lums = {}
obs_order = []
for line_ind, line in enumerate(fit_data["SPEC_INFO"]):
sp_info = line["SPEC_PATH"].strip(" ").split("/")[-1].split("_")
# Want to derive the spectra storage key from the file name, this strips off some
#  unnecessary info
sp_key = line["SPEC_PATH"].strip(" ").split("/")[-1].split('ra')[-1].split('_spec.fits')[0]

# If its not an AnnularSpectra fit then we can just fetch the spectrum from the source
#  the normal way
if set_id is None:
# This adds ra back on, and removes any ident information if it is there
sp_key = 'ra' + sp_key
# Finds the appropriate matching spectrum object for the current table line
spec = self.get_products("spectrum", sp_info[0], sp_info[1], extra_key=sp_key)[0]
else:
sp_key = 'ra' + sp_key.split('_ident')[0]
ann_spec = self.get_annular_spectra(set_id=set_id)
spec = ann_spec.get_spectra(ann_id, sp_info[0], sp_info[1])
obs_order.append([sp_info[0], sp_info[1]])

# Adds information from this fit to the spectrum object.

# The add_fit_data method formats the luminosities nicely, so we grab them back out
#  to help grab the luminosity needed to pass to the source object 'add_fit_data' method
processed_lums = spec.get_luminosities(model)
if spec.instrument not in inst_lums:
inst_lums[spec.instrument] = processed_lums

# Ideally the luminosity reported in the source object will be a PN lum, but its not impossible
#  that a PN value won't be available. - it shouldn't matter much, lums across the cameras are
#  consistent
if "pn" in inst_lums:
chosen_lums = inst_lums["pn"]
# mos2 generally better than mos1, as mos1 has CCD damage after a certain point in its life
elif "mos2" in inst_lums:
chosen_lums = inst_lums["mos2"]
elif "mos1" in inst_lums:
chosen_lums = inst_lums["mos1"]
else:
chosen_lums = None

if set_id is not None:
ann_results[set_id][model][spec.annulus_ident] = global_results
ann_lums[set_id][model][spec.annulus_ident] = chosen_lums
ann_obs_order[set_id][model][spec.annulus_ident] = obs_order
else:
# Push global fit results, luminosities etc. into the corresponding source object.
except (OSError, NoProductAvailableError, IndexError, NotAssociatedError):
chosen_lums = {}
warn_text = "{src} fit {f} could not be loaded in as there are no matching spectra " \
"available".format(src=self.name, f=fit_name)
if not self._samp_member:
warn(warn_text, stacklevel=2)
else:
self._supp_warn.append(warn_text)
fit_data.close()

if len(ann_results) != 0:
for set_id in ann_results:
try:
rel_ann_spec = self.get_annular_spectra(set_id=set_id)
for model in ann_results[set_id]:
ann_obs_order[set_id][model])
# if model == "constant*tbabs*apec":
#     temp_prof = rel_ann_spec.generate_profile(model, 'kT', 'keV')
#     self.update_products(temp_prof)
#
#     # Normalisation profiles can be useful for many things, so we generate them too
#     norm_prof = rel_ann_spec.generate_profile(model, 'norm', 'cm^-5')
#     self.update_products(norm_prof)
#
#     if 'Abundanc' in rel_ann_spec.get_results(0, 'constant*tbabs*apec'):
#         met_prof = rel_ann_spec.generate_profile(model, 'Abundanc', '')
#         self.update_products(met_prof)
except (NoProductAvailableError, ValueError):
warn_text = "A previous annular spectra profile fit for {src} was not successful, or no " \
"matching spectrum has been loaded, so it cannot be read " \
"in".format(src=self.name)
if not self._samp_member:
warn(warn_text, stacklevel=2)
else:
self._supp_warn.append(warn_text)

os.chdir(og_dir)

# And finally loading in any conversion factors that have been calculated using XGA's fakeit interface
if os.path.exists(OUTPUT + "XSPEC/" + self.name) and read_fits:
conv_factors = [OUTPUT + "XSPEC/" + self.name + "/" + f for f in os.listdir(OUTPUT + "XSPEC/" + self.name)
if ".xcm" not in f and "conv_factors" in f]
for conv_path in conv_factors:
res_table = pd.read_csv(conv_path, dtype={"lo_en": str, "hi_en": str})
# Gets the model name from the file name of the output results table
model = conv_path.split("_")[-3]

# We can infer the storage key from the name of the results table, just makes it easier to
#  grab the correct spectra
storage_key = conv_path.split('/')[-1].split(self.name)[-1][1:].split(model)[0][:-1]

# Grabs the ObsID+instrument combinations from the headers of the csv. Makes sure they are unique
#  by going to a set (because there will be two columns for each ObsID+Instrument, rate and Lx)
# First two columns are skipped because they are energy limits
combos = list(set([c.split("_")[1] for c in res_table.columns[2:]]))
# Getting the spectra for each column, then assigning rates and luminosities.
# Due to the danger of a fit using a piece of data (an ObsID-instrument combo) that isn't currently
#  associated with the source, we first fetch the spectra, then in a second loop we assign the factors
rel_spec = []
try:
for comb in combos:
spec = self.get_products("spectrum", comb[:10], comb[10:], extra_key=storage_key)[0]
rel_spec.append(spec)

for comb_ind, comb in enumerate(combos):
res_table["rate_{}".format(comb)].values,
res_table["Lx_{}".format(comb)].values, model)

# This triggers in the case of something like issue #738, where a previous fit used data that is
#  not loaded into this source (either because it was manually removed, or because the central
#  position has changed etc.)
except NotAssociatedError:
warn_text = "Existing fit for {s} could not be loaded due to a mismatch in available " \
"data".format(s=self.name)
if not self._samp_member:
warn(warn_text, stacklevel=2)
else:
self._supp_warn.append(warn_text)

[docs]    def get_products(self, p_type: str, obs_id: str = None, inst: str = None, extra_key: str = None,
just_obj: bool = True) -> List[BaseProduct]:
"""
This is the getter for the products data structure of Source objects. Passing a 'product type'
such as 'events' or 'images' will return every matching entry in the products data structure.

:param str p_type: Product type identifier. e.g. image or expmap.
:param str obs_id: Optionally, a specific obs_id to search can be supplied.
:param str inst: Optionally, a specific instrument to search can be supplied.
:param str extra_key: Optionally, an extra key (like an energy bound) can be supplied.
:param bool just_obj: A boolean flag that controls whether this method returns just the product objects,
or the other information that goes with it like ObsID and instrument.
:return: List of matching products.
:rtype: List[BaseProduct]
"""

def unpack_list(to_unpack: list):
"""
A recursive function to go through every layer of a nested list and flatten it all out. It
doesn't return anything because to make life easier the 'results' are appended to a variable
in the namespace above this one.

:param list to_unpack: The list that needs unpacking.
"""
# Must iterate through the given list
for entry in to_unpack:
# If the current element is not a list then all is chill, this element is ready for appending
# to the final list
if not isinstance(entry, list):
out.append(entry)
else:
# If the current element IS a list, then obviously we still have more unpacking to do,
# so we call this function recursively.
unpack_list(entry)

if obs_id not in self._products and obs_id is not None:
raise NotAssociatedError("{0} is not associated with {1} .".format(obs_id, self.name))
elif (obs_id is not None and obs_id in self._products) and \
(inst is not None and inst not in self._products[obs_id]):
raise NotAssociatedError("{0} is associated with {1}, but {2} is not associated with that "
"observation".format(obs_id, self.name, inst))

matches = []
# Iterates through the dict search return, but each match is likely to be a very nested list,
# with the degree of nesting dependant on product type (as event lists live a level up from
# images for instance
for match in dict_search(p_type, self._products):
out = []
unpack_list(match)
# Only appends if this particular match is for the obs_id and instrument passed to this method
# Though all matches will be returned if no obs_id/inst is passed
if (obs_id == out[0] or obs_id is None) and (inst == out[1] or inst is None) \
and (extra_key in out or extra_key is None) and not just_obj:
matches.append(out)
elif (obs_id == out[0] or obs_id is None) and (inst == out[1] or inst is None) \
and (extra_key in out or extra_key is None) and just_obj:
matches.append(out[-1])
return matches

def _load_regions(self, reg_paths) -> Tuple[dict, dict]:
"""
An internal method that reads and parses region files found for observations
associated with this source. Also computes simple matches to find regions likely
to be related to the source.

:return: Two dictionaries, the first contains the regions for each of the ObsIDs and the second contains
the regions that have been very simply matched to the source. These should be ordered from closest to
furthest from the passed source coordinates.
:rtype: Tuple[dict, dict]
"""

def dist_from_source(reg):
"""
Calculates the euclidean distance between the centre of a supplied region, and the
position of the source.

:param reg: A region object.
:return: Distance between region centre and source position.
"""
ra = reg.center.ra.value
dec = reg.center.dec.value
return np.sqrt(abs(ra - self._ra_dec[0]) ** 2 + abs(dec - self._ra_dec[1]) ** 2)

# Read in the custom region file that every XGA has associated with it. Sources within will be added to the
#  source list for every ObsID?
for reg in custom_regs:
if not isinstance(reg, SkyRegion):
raise TypeError("Custom sources can only be defined in RA-Dec coordinates.")

reg_dict = {}
match_dict = {}
# As we only allow one set of regions per observation, we shall assume that we can use the
# WCS transform from ANY of the images to convert pixels to degrees

for obs_id in reg_paths:
if reg_paths[obs_id] is not None:
# Apparently can happen that there are no regions in a region file, so if that is the case
#  then I just set the ds9_regs to [None] because I know the rest of the code can deal with that.
#  It can't deal with an empty list
if len(ds9_regs) == 0:
ds9_regs = [None]
else:
ds9_regs = [None]

if isinstance(ds9_regs[0], PixelRegion):
# If regions exist in pixel coordinates, we need an image WCS to convert them to RA-DEC, so we need
#  one of the images supplied in the config file, not anything that XGA generates.
#  But as this method is only run once, before XGA generated products are loaded in, it
#  should be fine
inst = [k for k in self._products[obs_id] if k in ["pn", "mos1", "mos2"]][0]
en = [k for k in self._products[obs_id][inst] if "-" in k][0]
# Making an assumption here, that if there are regions there will be images
# Getting the radec_wcs property from the Image object
im = [i for i in self.get_products("image", obs_id, inst, just_obj=False) if en in i]

if len(im) != 1:
raise NoProductAvailableError("There is no image available for observation {o}, associated "
"with {n}. An image is require to translate pixel regions "
"to RA-DEC.".format(o=obs_id, n=self.name))
sky_regs = [reg.to_sky(w) for reg in ds9_regs]
reg_dict[obs_id] = np.array(sky_regs)
elif isinstance(ds9_regs[0], SkyRegion):
reg_dict[obs_id] = np.array(ds9_regs)
else:
# So there is an entry in this for EVERY ObsID
reg_dict[obs_id] = np.array([None])

# Here we add the custom sources to the source list, we know they are sky regions as we have
#  already enforced it. If there was no region list for a particular ObsID (detected by the first
#  entry in the reg dict being None) and there IS a custom region, we just replace the None with the
#  custom region
if reg_dict[obs_id][0] is not None:
reg_dict[obs_id] = np.append(reg_dict[obs_id], custom_regs)
elif reg_dict[obs_id][0] is None and len(custom_regs) != 0:
reg_dict[obs_id] = custom_regs
else:
reg_dict[obs_id] = np.array([None])

# I'm going to ensure that all regions are elliptical, I don't want to hunt through every place in XGA
#  where I made that assumption
for reg_ind, reg in enumerate(reg_dict[obs_id]):
if isinstance(reg, CircleSkyRegion):
# Multiply radii by two because the ellipse based sources want HEIGHT and WIDTH, not RADIUS
# Give small angle (though won't make a difference as circular) to avoid problems with angle=0
#  that I've noticed previously
new_reg.visual['color'] = reg.visual['color']
reg_dict[obs_id][reg_ind] = new_reg

# Hopefully this bodge doesn't have any unforeseen consequences
if reg_dict[obs_id][0] is not None and len(reg_dict[obs_id]) > 1:
# Quickly calculating distance between source and center of regions, then sorting
# and getting indices. Thus I only match to the closest 5 regions.
diff_sort = np.array([dist_from_source(r) for r in reg_dict[obs_id]]).argsort()
# Unfortunately due to a limitation of the regions module I think you need images
#  to do this contains match...
within = np.array([reg.contains(SkyCoord(*self._ra_dec, unit='deg'), w)
for reg in reg_dict[obs_id][diff_sort[0:5]]])

# Make sure to re-order the region list to match the sorted within array
reg_dict[obs_id] = reg_dict[obs_id][diff_sort]

# Expands it so it can be used as a mask on the whole set of regions for this observation
within = np.pad(within, [0, len(diff_sort) - len(within)])
match_dict[obs_id] = within
# In the case of only one region being in the list, we simplify the above expression
elif reg_dict[obs_id][0] is not None and len(reg_dict[obs_id]) == 1:
if reg_dict[obs_id][0].contains(SkyCoord(*self._ra_dec, unit='deg'), w):
match_dict[obs_id] = np.array([True])
else:
match_dict[obs_id] = np.array([False])
else:
match_dict[obs_id] = np.array([False])

return reg_dict, match_dict

[docs]    def update_queue(self, cmd_arr: np.ndarray, p_type_arr: np.ndarray, p_path_arr: np.ndarray,
extra_info: np.ndarray, stack: bool = False):
"""
Small function to update the numpy array that makes up the queue of products to be generated.

:param np.ndarray cmd_arr: Array containing SAS commands.
:param np.ndarray p_type_arr: Array of product type identifiers for the products generated
by the cmd array. e.g. image or expmap.
:param np.ndarray p_path_arr: Array of final product paths if cmd is successful
:param np.ndarray extra_info: Array of extra information dictionaries
:param stack: Should these commands be executed after a preceding line of commands,
or at the same time.
:return:
"""
if self.queue is None:
# I could have done all of these in one array with 3 dimensions, but felt this was easier to read
# and with no real performance penalty
self.queue = cmd_arr
self.queue_type = p_type_arr
self.queue_path = p_path_arr
self.queue_extra_info = extra_info
elif stack:
self.queue = np.vstack((self.queue, cmd_arr))
self.queue_type = np.vstack((self.queue_type, p_type_arr))
self.queue_path = np.vstack((self.queue_path, p_path_arr))
self.queue_extra_info = np.vstack((self.queue_extra_info, extra_info))
else:
self.queue = np.append(self.queue, cmd_arr, axis=0)
self.queue_type = np.append(self.queue_type, p_type_arr, axis=0)
self.queue_path = np.append(self.queue_path, p_path_arr, axis=0)
self.queue_extra_info = np.append(self.queue_extra_info, extra_info, axis=0)

[docs]    def get_queue(self) -> Tuple[List[str], List[str], List[List[str]], List[dict]]:
"""
Calling this indicates that the queue is about to be processed, so this function combines SAS
commands along columns (command stacks), and returns N SAS commands to be run concurrently,
where N is the number of columns.

:return: List of strings, where the strings are bash commands to run SAS procedures, another
list of strings, where the strings are expected output types for the commands, a list of
lists of strings, where the strings are expected output paths for products of the SAS commands.
:rtype: Tuple[List[str], List[str], List[List[str]]]
"""
if self.queue is None:
# This returns empty lists if the queue is undefined
processed_cmds = []
types = []
paths = []
extras = []
elif len(self.queue.shape) == 1 or self.queue.shape[1] <= 1:
processed_cmds = list(self.queue)
types = list(self.queue_type)
paths = [[str(path)] for path in self.queue_path]
extras = list(self.queue_extra_info)
else:
processed_cmds = [";".join(col) for col in self.queue.T]
types = list(self.queue_type[-1, :])
paths = [list(col.astype(str)) for col in self.queue_path.T]
extras = []
for col in self.queue_path.T:
# This nested dictionary comprehension combines a column of extra information
# dictionaries into one, for ease of access.
comb_extra = {k: v for ext_dict in col for k, v in ext_dict.items()}
extras.append(comb_extra)

# This is only likely to be called when processing is beginning, so this will wipe the queue.
self.queue = None
self.queue_type = None
self.queue_path = None
self.queue_extra_info = None
# The returned paths are lists of strings because we want to include every file in a stack to be able
# to check that exists
return processed_cmds, types, paths, extras

[docs]    def get_att_file(self, obs_id: str) -> str:
"""
Fetches the path to the attitude file for an XMM observation.

:param obs_id: The ObsID to fetch the attitude file for.
:return: The path to the attitude file.
:rtype: str
"""
if obs_id not in self._products:
raise NotAssociatedError("{o} is not associated with {s}".format(o=obs_id, s=self.name))
else:
return self._att_files[obs_id]

@property
def obs_ids(self) -> List[str]:
"""
Property getter for ObsIDs associated with this source that are confirmed to have events files.

:return: A list of the associated XMM ObsIDs.
:rtype: List[str]
"""
return self._obs

@property
def blacklisted(self) -> Dict:
"""
A property getter that returns the dictionary of ObsIDs and their instruments which have been
blacklisted, and thus not considered for use in any analysis of this source.

:return: The dictionary (with ObsIDs as keys) of blacklisted data.
:rtype: Dict
"""
return self._blacklisted_obs

def _source_type_match(self, source_type: str) -> Tuple[Dict, Dict, Dict]:
"""
A method that looks for matches not just based on position, but also on the type of source
we want to find. Finding no matches is allowed, but the source will be declared as undetected.
An error will be thrown if more than one match of the correct type per observation is found.

:param str source_type: Should either be ext or pnt, describes what type of source I
should be looking for in the region files.
:return: A dictionary containing the matched region for each ObsID + a combined region, another
dictionary containing any sources that matched to the coordinates and weren't chosen,
and a final dictionary with sources that aren't the target, or in the 2nd dictionary.
:rtype: Tuple[Dict, Dict, Dict]
"""
# Definitions of the colours of XCS regions can be found in the thesis of Dr Micheal Davidson
#  University of Edinburgh - 2005. These are the default XGA colour meanings
# Red - Point source
# Green - Extended source
# Magenta - PSF-sized extended source
# Blue - Extended source with significant point source contribution
# Cyan - Extended source with significant Run1 contribution
# Yellow - Extended source with less than 10 counts
try:
# Gets the allowed colours for the current source type
allowed_colours = SRC_REGION_COLOURS[source_type]
except KeyError:
raise ValueError("{} is not a recognised source type, please "
"don't use this internal function!".format(source_type))

# Here we store the actual matched sources
results_dict = {}
# And in this one go all the sources that aren't the matched source, we'll need to subtract them.
anti_results_dict = {}
# Sources in this dictionary are within the target source region AND matched to initial coordinates,
# but aren't the chosen source.
alt_match_dict = {}
# Goes through all the ObsIDs associated with this source, and checks if they have regions
#  If not then Nones are added to the various dictionaries, otherwise you end up with a list of regions
#  with missing ObsIDs
for obs in self.obs_ids:
if obs in self._initial_regions:
# This sets up an array of matched regions, accounting for the problems that can occur when
#  there is only one region in the region list (numpy's indexing gets very angry). The array
#  of matched region(s) set up here is used in this method.
if len(self._initial_regions[obs]) == 1 and not self._initial_region_matches[obs][0]:
init_region_matches = np.array([])
elif len(self._initial_regions[obs]) == 1 and self._initial_region_matches[obs][0]:
init_region_matches = self._initial_regions[obs]
elif len(self._initial_regions[obs][self._initial_region_matches[obs]]) == 0:
init_region_matches = np.array([])
else:
init_region_matches = self._initial_regions[obs][self._initial_region_matches[obs]]

# If there are no matches then the returned result is just None.
if len(init_region_matches) == 0:
results_dict[obs] = None
else:
interim_reg = []
# The only solution I could think of is to go by the XCS standard of region files, so green
#  is extended, red is point etc. - not ideal but I'll just explain in the documentation
# for entry in self._initial_regions[obs][self._initial_region_matches[obs]]:
for entry in init_region_matches:
if entry.visual["color"] in allowed_colours:
interim_reg.append(entry)

# Different matching possibilities
if len(interim_reg) == 0:
results_dict[obs] = None
elif len(interim_reg) == 1:
results_dict[obs] = interim_reg[0]
# Matching to multiple sources would be very problematic, so throw an error
elif len(interim_reg) > 1 and source_type == "pnt":
# I made the _load_regions method sort the outputted region dictionaries by distance from the
#  input coordinates, so I know that the 0th entry will be the closest to the source coords.
#  Hence I choose that one for pnt source multi-matches like this, see comment 2 of issue #639
#  for an example.
results_dict[obs] = interim_reg[0]
warn_text = "{ns} matches for the point source {n} are found in the {o} region " \
"file. The source nearest to the passed coordinates is accepted, all others " \
"will be placed in the alternate match category and will not be removed " \
if not self._samp_member:
warn(warn_text, stacklevel=2)
else:
self._supp_warn.append(warn_text)

elif len(interim_reg) > 1 and source_type == "ext":
raise MultipleMatchError("More than one match for {n} is found in the region file "
"for observation {o}, this cannot yet be dealt with "
"for extended sources.".format(o=obs, n=self.name))

# Alt match is used for when there is a secondary match to a point source
alt_match_reg = [entry for entry in init_region_matches if entry != results_dict[obs]]
alt_match_dict[obs] = alt_match_reg

# These are all the sources that aren't a match, and so should be removed from any analysis
not_source_reg = [reg for reg in self._initial_regions[obs] if reg != results_dict[obs]
and reg not in alt_match_reg]
anti_results_dict[obs] = not_source_reg

else:
results_dict[obs] = None
alt_match_dict[obs] = []
anti_results_dict[obs] = []

return results_dict, alt_match_dict, anti_results_dict

@property
def detected(self) -> dict:
"""
A property getter to return if a match of the correct type has been found.

:return: The detected boolean attribute.
:rtype: bool
"""
if self._detected is None:
raise ValueError("detected is currently None, BaseSource objects don't have the type "
"context needed to define if the source is detected or not.")
else:
return self._detected

@property
def matched_regions(self) -> dict:
"""
Property getter for the matched regions associated with this particular source.

:return: A dictionary of matching regions, or None if such a match has not been performed.
:rtype: dict
"""
return self._regions

[docs]    def source_back_regions(self, reg_type: str, obs_id: str = None, central_coord: Quantity = None) \
-> Tuple[SkyRegion, SkyRegion]:
"""
A method to retrieve source region and background region objects for a given source type with a
given central coordinate.

:param str reg_type: The type of region which we wish to get from the source.
:param str obs_id: The ObsID that the region is associated with (if appropriate).
:param Quantity central_coord: The central coordinate of the region.
:return: The method returns both the source region and the associated background region.
:rtype:
"""
# Doing an initial check so I can throw a warning if the user wants a region-list region AND has supplied
#  custom central coordinates
if reg_type == "region" and central_coord is not None:
warn("You cannot use custom central coordinates with a region from supplied region files", stacklevel=2)

if central_coord is None:
central_coord = self._default_coord

if type(central_coord) == Quantity:
centre = SkyCoord(*central_coord.to("deg"))
elif type(central_coord) == SkyCoord:
centre = central_coord
else:
raise TypeError("central_coord must be of type Quantity or SkyCoord.")

# In case combined gets passed as the ObsID at any point
if obs_id == "combined":
obs_id = None

# The search radius won't be used by the user, just peak finding solutions
allowed_rtype = ["r2500", "r500", "r200", "region", "custom", "search", "point"]
if type(self) == BaseSource:
raise TypeError("BaseSource class does not have the necessary information "
"to select a source region.")
elif obs_id is not None and obs_id not in self.obs_ids:
raise NotAssociatedError("The ObsID {o} is not associated with {s}.".format(o=obs_id, s=self.name))
elif reg_type not in allowed_rtype:
raise ValueError("The only allowed region types are {}".format(", ".join(allowed_rtype)))
elif reg_type == "region" and obs_id is None:
raise ValueError("ObsID cannot be None when getting region file regions.")
elif reg_type == "region" and obs_id is not None:
src_reg = self._regions[obs_id]
elif reg_type in ["r2500", "r500", "r200"] and reg_type not in self._radii:
raise ValueError("There is no {r} associated with {s}".format(r=reg_type, s=self.name))
elif reg_type != "region" and reg_type in self._radii:
# We know for certain that the radius will be in degrees, but it has to be converted to degrees
#  before being stored in the radii attribute
elif reg_type != "region" and reg_type not in self._radii:
raise ValueError("{} is a valid region type, but is not associated with this "
"source.".format(reg_type))
else:
raise ValueError("OH NO")

# Here is where we initialise the background regions, first in pixel coords, then converting to ra-dec.
# TODO Verify that just using the first image is okay
im = self.get_products("image")[0]
# TODO Try and remember why I had to convert to pixel regions to make it work
if isinstance(src_reg, EllipseSkyRegion):
# Here we multiply the inner width/height by 1.05 (to just slightly clear the source region),
#  and the outer width/height by 1.5 (standard for XCS) - default values
# Ideally this would be an annulus region, but they are bugged in regions v0.4, so we must bodge
in_reg = EllipsePixelRegion(src_pix_reg.center, src_pix_reg.width * self._back_inn_factor,
src_pix_reg.height * self._back_inn_factor, src_pix_reg.angle)
out_reg = EllipsePixelRegion(src_pix_reg.center, src_pix_reg.width * self._back_out_factor,
src_pix_reg.height * self._back_out_factor, src_pix_reg.angle)
bck_reg = out_reg.symmetric_difference(in_reg)
elif isinstance(src_reg, CircleSkyRegion):
in_reg = CirclePixelRegion(src_pix_reg.center, src_pix_reg.radius * self._back_inn_factor)
out_reg = CirclePixelRegion(src_pix_reg.center, src_pix_reg.radius * self._back_out_factor)
bck_reg = out_reg.symmetric_difference(in_reg)

return src_reg, bck_reg

[docs]    def within_region(self, region: SkyRegion) -> List[SkyRegion]:
"""
This method finds interloper sources that lie within the user supplied region.

:param SkyRegion region: The region in which we wish to search for interloper sources (for instance
a source region or background region).
:return: A list of regions that lie within the user supplied region.
:rtype: List[SkyRegion]
"""
im = self.get_products("image")[0]

for r in self._interloper_regions])
reg_within = np.array(self._interloper_regions)[crossover]

return reg_within

[docs]    def get_interloper_regions(self, flattened: bool = False) -> Union[List, Dict]:
"""
This get method provides a way to access the regions that have been designated as interlopers (i.e.
not the source region that a particular Source has been designated to investigate) for all observations.
They can either be retrieved in a dictionary with ObsIDs as the keys, or a flattened single list with no
ObsID context.

:param bool flattened: If true then the regions are returned as a single list of region objects. Otherwise
they are returned as a dictionary with ObsIDs as keys. Default is False.
:return: Either a list of region objects, or a dictionary with ObsIDs as keys.
:rtype: Union[List,Dict]
"""
if type(self) == BaseSource:
raise TypeError("BaseSource objects don't have enough information to know which sources "
"are interlopers.")

# If flattened then a list is returned rather than the original dictionary with
if not flattened:
ret_reg = self._other_regions
else:
# Iterate through the ObsIDs in the dictionary and add the resulting lists together
ret_reg = []
for o in self._other_regions:
ret_reg += self._other_regions[o]

return ret_reg

[docs]    def get_source_mask(self, reg_type: str, obs_id: str = None, central_coord: Quantity = None) \
-> Tuple[np.ndarray, np.ndarray]:
"""
Method to retrieve source and background masks for the given region type.

:param str reg_type: The type of region for which to retrieve the mask.
:param str obs_id: The ObsID that the mask is associated with (if appropriate).
:param Quantity central_coord: The central coordinate of the region.
:return: The source and background masks for the requested ObsID (or the combined image if no ObsID).
:rtype: Tuple[np.ndarray, np.ndarray]
"""
if obs_id == "combined":
obs_id = None

if central_coord is None:
central_coord = self._default_coord

# Don't need to do a bunch of checks, because the method I call to make the
#  mask does all the checks anyway
src_reg, bck_reg = self.source_back_regions(reg_type, obs_id, central_coord)

# I assume that if no ObsID is supplied, then the user wishes to have a mask for the combined data
if obs_id is None:
comb_images = self.get_products("combined_image")
if len(comb_images) != 0:
else:
raise NoProductAvailableError("There are no combined products available to generate a mask for.")
else:
# Just grab the first instrument that comes out the get method, the masks should be the same.

# If the masks are None, then they are set to an array of zeros

"""
Internal method that makes interloper masks in the first place; I allow this because interloper
masks will never change, so can be safely generated and stored in an init of a source class.

:param Image mask_image: The image for which to create the interloper mask.
:return: A numpy array of 0s and 1s which acts as a mask to remove interloper sources.
:rtype: ndarray
"""
for r in self._interloper_regions:
if r is not None:
# The central coordinate of the current region
c = Quantity([r.center.ra.value, r.center.dec.value], 'deg')
try:
# Checks if the central coordinate can be converted to pixels for the mask_image, if it fails then
#  its likely off of the image, as a ValueError will be thrown if a pixel coordinate is less
#  than zero, or greater than the size of the image in that axis

# If the rotation angle is zero then the conversion to mask by the regions module will be upset,
#  so I perturb the angle by 0.1 degrees
if isinstance(pr, EllipsePixelRegion) and pr.angle.value == 0:
pr.angle += Quantity(0.1, 'deg')
except ValueError:
pass

#          for reg in self._interloper_regions if reg is not None]
interlopers = sum([m for m in masks if m is not None])

[docs]    def get_interloper_mask(self, obs_id: str = None) -> ndarray:
"""
Returns a mask for a given ObsID (or combined data if no ObsID given) that will remove any sources
that have not been identified as the source of interest.

:param str obs_id: The ObsID that the mask is associated with (if appropriate).
:return: A numpy array of 0s and 1s which acts as a mask to remove interloper sources.
:rtype: ndarray
"""
if type(self) == BaseSource:
raise TypeError("BaseSource objects don't have enough information to know which sources "
"are interlopers.")

if obs_id is not None and obs_id != "combined" and obs_id not in self.obs_ids:
raise NotAssociatedError("{o} is not associated with {s}; only {a} are "
"available".format(o=obs_id, s=self.name, a=", ".join(self.obs_ids)))
elif obs_id is not None and obs_id != "combined":
elif obs_id is None or obs_id == "combined" and "combined" not in self._interloper_masks:
comb_ims = self.get_products("combined_image")
if len(comb_ims) == 0:
raise NoProductAvailableError("There are no combined images available for which to fetch"
im = comb_ims[0]
elif obs_id is None or obs_id == "combined" and "combined" in self._interloper_masks:

[docs]    def get_mask(self, reg_type: str, obs_id: str = None, central_coord: Quantity = None) -> \
Tuple[np.ndarray, np.ndarray]:
"""
Method to retrieve source and background masks for the given region type, WITH INTERLOPERS REMOVED.

:param str reg_type: The type of region for which to retrieve the interloper corrected mask.
:param str obs_id: The ObsID that the mask is associated with (if appropriate).
:param Quantity central_coord: The central coordinate of the region.
:return: The source and background masks for the requested ObsID (or the combined image if no ObsID).
:rtype: Tuple[np.ndarray, np.ndarray]
"""
# Grabs the source masks without interlopers removed

# Multiplies the uncorrected source and background masks with the interloper masks to correct
#  for interloper sources

central_coord: Quantity = None, remove_interlopers: bool = True) -> np.ndarray:
"""
A simple, but powerful method, to generate mask a mask within a custom radius for a given ObsID.

:param str obs_id: The ObsID for which to generate the mask, default is None which will return a mask
generated from a combined image.
:param Quantity central_coord: The central coordinates of the mask, the default is None which
will use the default coordinates of the source.
:param bool remove_interlopers: Whether an interloper mask should be combined with the custom mask to
remove interloper point sources.
:return: A numpy array containing the desired mask.
:rtype: np.ndarray
"""
if central_coord is None:
central_coord = self._default_coord

if obs_id is None:
# Doesn't matter which combined ratemap, just need the size and coord conversion powers
rt = self.get_combined_ratemaps()
else:
# Again so long as the image matches the ObsID passed in by the user I don't care what instrument
#  its from
rt = self.get_ratemaps(obs_id=obs_id)

# If its not an instance of RateMap that means a list of RateMaps has been returned, and as I only want
#  the WCS information and the shape of the image I don't care which one we use
if not isinstance(rt, RateMap):
rt = rt[0]

# Convert the inner and outer radii to degrees so they can be easily converted to pixels

# Making sure the inner and outer radii are whole integer numbers, as they are now in pixel units
# Convert the chosen central coordinates to pixels
pix_centre = rt.coord_conv(central_coord, 'pix')

# And applying an interloper mask if the user wants that.
if remove_interlopers:

[docs]    def get_snr(self, outer_radius: Union[Quantity, str], central_coord: Quantity = None, lo_en: Quantity = None,
hi_en: Quantity = None, obs_id: str = None, inst: str = None, psf_corr: bool = False,
psf_model: str = "ELLBETA", psf_bins: int = 4, psf_algo: str = "rl", psf_iter: int = 15,
allow_negative: bool = False,  exp_corr: bool = True) -> float:
"""
This takes a region type and central coordinate and calculates the signal to noise ratio.

:param Quantity/str outer_radius: The radius that SNR should be calculated within, this can either be a
named radius such as r500, or an astropy Quantity.
:param Quantity central_coord: The central coordinate of the region.
:param Quantity lo_en: The lower energy bound of the ratemap to use to calculate the SNR. Default is None,
in which case the lower energy bound for peak finding will be used (default is 0.5keV).
:param Quantity hi_en: The upper energy bound of the ratemap to use to calculate the SNR. Default is None,
in which case the upper energy bound for peak finding will be used (default is 2.0keV).
:param str obs_id: An ObsID of a specific ratemap to use for the SNR calculation. Default is None, which
means the combined ratemap will be used. Please note that inst must also be set to use this option.
:param str inst: The instrument of a specific ratemap to use for the SNR calculation. Default is None, which
means the combined ratemap will be used.
:param bool psf_corr: Sets whether you wish to use a PSF corrected ratemap or not.
:param str psf_model: If the ratemap you want to use is PSF corrected, this is the PSF model used.
:param int psf_bins: If the ratemap you want to use is PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the ratemap you want to use is PSF corrected, this is the algorithm used.
:param int psf_iter: If the ratemap you want to use is PSF corrected, this is the number of iterations.
:param bool allow_negative: Should pixels in the background subtracted count map be allowed to go below
zero, which results in a lower signal to noise (and can result in a negative signal to noise).
:param bool exp_corr: Should signal to noises be measured with exposure time correction, default is True. I
recommend that this be true for combined observations, as exposure time could change quite dramatically
across the combined product.
:return: The signal to noise ratio.
:rtype: float
"""
# Checking if the user passed any energy limits of their own
if lo_en is None:
lo_en = self._peak_lo_en
if hi_en is None:
hi_en = self._peak_hi_en

# Parsing the ObsID and instrument options, see if they want to use a specific ratemap
if all([obs_id is None, inst is None]):
# Here the user hasn't set ObsID or instrument, so we use the combined data
rt = self.get_combined_ratemaps(lo_en, hi_en, psf_corr, psf_model, psf_bins, psf_algo, psf_iter)

elif all([obs_id is not None, inst is not None]):
# Both ObsID and instrument have been set by the user
rt = self.get_ratemaps(obs_id, inst, lo_en, hi_en, psf_corr, psf_model, psf_bins, psf_algo, psf_iter)
else:
raise ValueError("If you wish to use a specific ratemap for {s}'s signal to noise calculation, please "
" pass both obs_id and inst.".format(s=self.name))

# Grabs the interloper removed source and background region masks. If the ObsID is None the get_mask
#  method understands that means it should return the mask for the combined data
else:
# Here we have the case where the user has passed a custom outer radius, so we need to generate a
obs_id=obs_id, central_coord=central_coord)

# We use the ratemap's built in signal to noise calculation method

return sn

[docs]    def get_counts(self, outer_radius: Union[Quantity, str], central_coord: Quantity = None, lo_en: Quantity = None,
hi_en: Quantity = None, obs_id: str = None, inst: str = None, psf_corr: bool = False,
psf_model: str = "ELLBETA", psf_bins: int = 4, psf_algo: str = "rl", psf_iter: int = 15) -> Quantity:
"""
This takes a region type and central coordinate and calculates the background subtracted X-ray counts.

:param Quantity/str outer_radius: The radius that counts should be calculated within, this can either be a
named radius such as r500, or an astropy Quantity.
:param Quantity central_coord: The central coordinate of the region.
:param Quantity lo_en: The lower energy bound of the ratemap to use to calculate the counts. Default is None,
in which case the lower energy bound for peak finding will be used (default is 0.5keV).
:param Quantity hi_en: The upper energy bound of the ratemap to use to calculate the counts. Default is None,
in which case the upper energy bound for peak finding will be used (default is 2.0keV).
:param str obs_id: An ObsID of a specific ratemap to use for the counts calculation. Default is None, which
means the combined ratemap will be used. Please note that inst must also be set to use this option.
:param str inst: The instrument of a specific ratemap to use for the counts calculation. Default is None, which
means the combined ratemap will be used.
:param bool psf_corr: Sets whether you wish to use a PSF corrected ratemap or not.
:param str psf_model: If the ratemap you want to use is PSF corrected, this is the PSF model used.
:param int psf_bins: If the ratemap you want to use is PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the ratemap you want to use is PSF corrected, this is the algorithm used.
:param int psf_iter: If the ratemap you want to use is PSF corrected, this is the number of iterations.
:return: The background subtracted counts.
:rtype: Quantity
"""
# Checking if the user passed any energy limits of their own
if lo_en is None:
lo_en = self._peak_lo_en
if hi_en is None:
hi_en = self._peak_hi_en

# Parsing the ObsID and instrument options, see if they want to use a specific ratemap
if all([obs_id is None, inst is None]):
# Here the user hasn't set ObsID or instrument, so we use the combined data
rt = self.get_combined_ratemaps(lo_en, hi_en, psf_corr, psf_model, psf_bins, psf_algo, psf_iter)

elif all([obs_id is not None, inst is not None]):
# Both ObsID and instrument have been set by the user
rt = self.get_ratemaps(obs_id, inst, lo_en, hi_en, psf_corr, psf_model, psf_bins, psf_algo, psf_iter)
else:
raise ValueError("If you wish to use a specific ratemap for {s}'s signal to noise calculation, please "
" pass both obs_id and inst.".format(s=self.name))

# Grabs the interloper removed source and background region masks. If the ObsID is None the get_mask
#  method understands that means it should return the mask for the combined data
else:
# Here we have the case where the user has passed a custom outer radius, so we need to generate a
obs_id=obs_id, central_coord=central_coord)

# We use the ratemap's built in background subtracted counts calculation method

return cnts

regions_to_search: Union[np.ndarray, list] = None) -> np.ndarray:
"""
This function finds and returns any interloper regions (by default) that have any part of their boundary
within the specified radii, centered on the specified central coordinate. Users may also pass their own
array of regions to check.

:param Quantity inner_radius: The inner radius of the area to search for interlopers in.
:param Quantity outer_radius: The outer radius of the area to search for interlopers in.
:param Quantity deg_central_coord: The central coordinate (IN DEGREES) of the area to search for
interlopers in.
:param np.ndarray/list regions_to_search: An optional parameter that allows the user to pass a specific
list of regions to check. Default is None, in which case the interloper_regions internal list
will be used.
:return: A numpy array of the interloper regions (or user passed regions) within the specified area.
:rtype: np.ndarray
"""
rotation: float) -> np.ndarray:
"""
An internal function to generate thirty x-y positions on the boundary of a particular region.

:param float reg_cen_x: The x position of the centre of the region, in degrees.
:param float reg_cen_y: The y position of the centre of the region, in degrees
:param float reg_major_rad: The semi-major axis of the region, in degrees.
:param float reg_minor_rad: The semi-minor axis of the region, in degrees.
:param float rotation: The rotation of the region, in radians.
:return: An array of thirty x-y coordinates on the boundary of the region.
:rtype: np.ndarray
"""
# Just the numpy array of angles (in radians) to find the x-y points of
angs = np.linspace(0, 2 * np.pi, 30)

# This is just the parametric equation of an ellipse - I only include the displacement to the
#  central coordinates of the region AFTER it has been rotated

# Sets of the rotation matrix
rot_mat = np.array([[np.cos(rotation), -1 * np.sin(rotation)], [np.sin(rotation), np.cos(rotation)]])

# Just rotates the edge coordinates to match the known rotation of this particular region
edge_coords = (rot_mat @ np.vstack([x, y])).T

# Now I re-centre the region
edge_coords[:, 0] += reg_cen_x
edge_coords[:, 1] += reg_cen_y

return edge_coords

if deg_central_coord.unit != deg:
raise UnitConversionError("The central coordinate must be in degrees for this function.")

# If no custom regions array was passed, we use the internal array of interloper regions
if regions_to_search is None:
regions_to_search = self._interloper_regions.copy()

# Then we can check to make sure that the outer radius is larger than the inner radius

# I think my last attempt at this type of function was made really slow by something to with the regions
#  module, so I'm going to try and move away from that here
# This is horrible I know, but it basically generates points on the boundary of each interloper, and then
#  calculates their distance from the central coordinate. So you end up with an Nx30 (because 30 is
#  how many points I generate) and N is the number of potential interlopers
int_dists = np.array([np.sqrt(np.sum((perimeter_points(r.center.ra.value, r.center.dec.value,
r.width.to('deg').value/2,
- deg_central_coord.value) ** 2, axis=1))
for r in regions_to_search])

# Finds which of the possible interlopers have any part of their boundary within the annulus in consideration

return np.array(regions_to_search)[int_within]

@staticmethod
def _interloper_sas_string(reg: EllipseSkyRegion, im: Image, output_unit: Union[UnitBase, str]) -> str:
"""
Converts ellipse sky regions into SAS region strings for use in SAS tasks.

:param EllipseSkyRegion reg: The interloper region to generate a SAS string for
:param Image im: The XGA image to use for coordinate conversion.
:param UnitBase/str output_unit: The output unit for this SAS region, either xmm_sky or xmm_det.
:return: The SAS string region for this interloper
:rtype: str
"""

if output_unit == xmm_det:
c_str = "DETX,DETY"
raise NotImplementedError("This coordinate system is not yet supported, and isn't a priority. Please "
"submit an issue on https://github.com/DavidT3/XGA/issues if you particularly "
"want this.")
elif output_unit == xmm_sky:
c_str = "X,Y"
else:
raise NotImplementedError("Only detector and sky coordinates are currently "
"supported for generating SAS region strings.")

cen = Quantity([reg.center.ra.value, reg.center.dec.value], 'deg')
sky_to_deg = sky_deg_scale(im, cen).value
conv_cen = im.coord_conv(cen, output_unit)
# Have to divide the width by two, I need to know the half-width for SAS regions, then convert
#  from degrees to XMM sky coordinates using the factor we calculated in the main function
w = reg.width.to('deg').value / 2 / sky_to_deg
# We do the same for the height
h = reg.height.to('deg').value / 2 / sky_to_deg
if w == h:
shape_str = "(({t}) IN circle({cx},{cy},{r}))"
shape_str = shape_str.format(t=c_str, cx=conv_cen[0].value, cy=conv_cen[1].value, r=h)
else:
# The rotation angle from the region object is in degrees already
shape_str = "(({t}) IN ellipse({cx},{cy},{w},{h},{rot}))".format(t=c_str, cx=conv_cen[0].value,
cy=conv_cen[1].value, w=w, h=h,
rot=reg.angle.value)
return shape_str

output_unit: Union[UnitBase, str] = xmm_sky, rot_angle: Quantity = Quantity(0, 'deg'),
interloper_regions: np.ndarray = None, central_coord: Quantity = None) -> str:
"""
A method to generate a SAS region string for an arbitrary circular or elliptical annular region, with
interloper sources removed.

:param Quantity inner_radius: The inner radius/radii of the region you wish to generate in SAS, if the
quantity has multiple elements then an elliptical region will be generated, with the first element
being the inner radius on the semi-major axis, and the second on the semi-minor axis.
:param Quantity outer_radius: The inner outer_radius/radii of the region you wish to generate in SAS, if the
quantity has multiple elements then an elliptical region will be generated, with the first element
being the outer radius on the semi-major axis, and the second on the semi-minor axis.
:param str obs_id: The ObsID of the observation you wish to generate the SAS region for.
:param str inst: The instrument of the observation you to generate the SAS region for.
:param UnitBase/str output_unit: The output unit for this SAS region, either xmm_sky or xmm_det.
:param np.ndarray interloper_regions: The interloper regions to remove from the source region,
default is None, in which case the function will run self.regions_within_radii.
:param Quantity rot_angle: The rotation angle of the source region, default is zero degrees.
:param Quantity central_coord: The coordinate on which to centre the source region, default is
None in which case the function will use the default_coord of the source object.
:return: A string for use in a SAS routine that describes the source region, and the regions
to cut out of it.
:rtype: str
"""

if central_coord is None:
central_coord = self._default_coord

# These checks/conversions are already done by the evselect_spectrum command, but I don't
#  mind doing them again

# Then we can check to make sure that the outer radius is larger than the inner radius
raise ValueError("A SAS circular region for {s} cannot have an inner_radius larger than or equal to its "
raise ValueError("A SAS elliptical region for {s} cannot have inner radii larger than or equal to its "

if output_unit == xmm_det:
c_str = "DETX,DETY"
raise NotImplementedError("This coordinate system is not yet supported, and isn't a priority. Please "
"submit an issue on https://github.com/DavidT3/XGA/issues if you particularly "
"want this.")
elif output_unit == xmm_sky:
c_str = "X,Y"
else:
raise NotImplementedError("Only detector and sky coordinates are currently "
"supported for generating SAS region strings.")

# We need a matching image to perform the coordinate conversion we require
rel_im = self.get_products("image", obs_id, inst)[0]
# We can set our own offset value when we call this function, but I don't think I need to
sky_to_deg = sky_deg_scale(rel_im, central_coord).value

# We need our chosen central coordinates in the right units of course
xmm_central_coord = rel_im.coord_conv(central_coord, output_unit)
# And just to make sure the central coordinates are in degrees
deg_central_coord = rel_im.coord_conv(central_coord, deg)

# If the user doesn't pass any regions, then we have to find them ourselves. I decided to allow this
#  so that within_radii can just be called once externally for a set of ObsID-instrument combinations,
#  like in evselect_spectrum for instance.
if interloper_regions is None and inner_radius.isscalar:
elif interloper_regions is None and not inner_radius.isscalar:

# So now we convert our interloper regions into their SAS equivalents
sas_interloper = [self._interloper_sas_string(i, rel_im, output_unit) for i in interloper_regions]

# And we need to define a SAS string for the actual region of interest
sas_source_area = "(({t}) IN annulus({cx},{cy},{ri},{ro}))"
sas_source_area = sas_source_area.format(t=c_str, cx=xmm_central_coord[0].value,
# If the inner radius is zero then we write a circle region, because it seems that's a LOT faster in SAS
sas_source_area = "(({t}) IN circle({cx},{cy},{r}))"
sas_source_area = sas_source_area.format(t=c_str, cx=xmm_central_coord[0].value,
cy=xmm_central_coord[1].value,
sas_source_area = "(({t}) IN elliptannulus({cx},{cy},{wi},{hi},{wo},{ho},{rot},{rot}))"
sas_source_area = sas_source_area.format(t=c_str, cx=xmm_central_coord[0].value,
cy=xmm_central_coord[1].value,
sas_source_area = "(({t}) IN ellipse({cx},{cy},{w},{h},{rot}))"
sas_source_area = sas_source_area.format(t=c_str, cx=xmm_central_coord[0].value,
cy=xmm_central_coord[1].value,
rot=rot_angle.to('deg').value)

# Combining the source region with the regions we need to cut out
if len(sas_interloper) == 0:
final_src = sas_source_area
else:
final_src = sas_source_area + " &&! " + " &&! ".join(sas_interloper)

return final_src

@property
def nH(self) -> Quantity:
"""
Property getter for neutral hydrogen column attribute.

:return: Neutral hydrogen column surface density.
:rtype: Quantity
"""
return self._nH

@property
def redshift(self) -> float:
"""
Property getter for the redshift of this source object.

:return: Redshift value
:rtype: float
"""
return self._redshift

@property
def on_axis_obs_ids(self) -> list:
"""
This method returns an array of ObsIDs that this source is approximately on axis in.

:return: ObsIDs for which the source is approximately on axis.
:rtype: list
"""
return self._onaxis

@property
def cosmo(self) -> Cosmology:
"""
This method returns whatever cosmology object is associated with this source object.

:return: An astropy cosmology object specified for this source on initialization.
:rtype: Cosmology
"""
return self._cosmo

# This is used to name files and directories so this is not allowed to change.
@property
def name(self) -> str:
"""
The name of the source, either given at initialisation or generated from the user-supplied coordinates.

:return: The name of the source.
:rtype: str
"""
return self._name

[docs]    def add_fit_data(self, model: str, tab_line, lums: dict, spec_storage_key: str):
"""
A method that stores fit results and global information about a the set of spectra in a source object.
Any variable parameters in the fit are stored in an internal dictionary structure, as are any luminosities
calculated. Other parameters of interest are store in other internal attributes. This probably shouldn't
ever be used by the user, just other parts of XGA, hence why I've asked for a spec_storage_key to be passed
in rather than all the spectrum configuration options individually.

:param str model: The XSPEC definition of the model used to perform the fit. e.g. constant*tbabs*apec
:param tab_line: The table line with the fit data.
:param dict lums: The various luminosities measured during the fit.
:param str spec_storage_key: The storage key of any spectrum that was used in this particular fit. The
ObsID and instrument used don't matter, as the storage key will be the same and is based off of the
settings when the spectra were generated.
"""
# Just headers that will always be present in tab_line that are not fit parameters
not_par = ['MODEL', 'TOTAL_EXPOSURE', 'TOTAL_COUNT_RATE', 'TOTAL_COUNT_RATE_ERR',

# Various global values of interest
self._total_exp[spec_storage_key] = float(tab_line["TOTAL_EXPOSURE"])
if spec_storage_key not in self._total_count_rate:
self._total_count_rate[spec_storage_key] = {}
self._test_stat[spec_storage_key] = {}
self._dof[spec_storage_key] = {}
self._total_count_rate[spec_storage_key][model] = [float(tab_line["TOTAL_COUNT_RATE"]),
float(tab_line["TOTAL_COUNT_RATE_ERR"])]
self._test_stat[spec_storage_key][model] = float(tab_line["TEST_STATISTIC"])
self._dof[spec_storage_key][model] = float(tab_line["DOF"])

# The parameters available will obviously be dynamic, so have to find out what they are and then
#  then for each result find the +- errors
par_headers = [n for n in tab_line.dtype.names if n not in not_par]
mod_res = {}
# The parameter name and the parameter index used by XSPEC are separated by |
par_info = par.split("|")
par_name = par_info[0]

# The parameter index can also have an - or + after it if the entry in question is an uncertainty
if par_info[1][-1] == "-":
ident = par_info[1][:-1]
pos = 1
elif par_info[1][-1] == "+":
ident = par_info[1][:-1]
pos = 2
else:
ident = par_info[1]
pos = 0

# Sets up the dictionary structure for the results
if par_name not in mod_res:
mod_res[par_name] = {ident: [0, 0, 0]}
elif ident not in mod_res[par_name]:
mod_res[par_name][ident] = [0, 0, 0]

mod_res[par_name][ident][pos] = float(tab_line[par])

# Storing the fit results
if spec_storage_key not in self._fit_results:
self._fit_results[spec_storage_key] = {}
self._fit_results[spec_storage_key][model] = mod_res

# And now storing the luminosity results
if spec_storage_key not in self._luminosities:
self._luminosities[spec_storage_key] = {}
self._luminosities[spec_storage_key][model] = lums

[docs]    def get_results(self, outer_radius: Union[str, Quantity], model: str,
inner_radius: Union[str, Quantity] = Quantity(0, 'arcsec'), par: str = None,
group_spec: bool = True, min_counts: int = 5, min_sn: float = None, over_sample: float = None):
"""
Important method that will retrieve fit results from the source object. Either for a specific
parameter of a given region-model combination, or for all of them. If a specific parameter is requested,
all matching values from the fit will be returned in an N row, 3 column numpy array (column 0 is the value,
column 1 is err-, and column 2 is err+). If no parameter is specified, the return will be a dictionary
of such numpy arrays, with the keys corresponding to parameter names.

:param str/Quantity outer_radius: The name or value of the outer radius that was used for the generation of
the spectra which were fitted to produce the desired result (for instance 'r200' would be acceptable
for a GalaxyCluster, or Quantity(1000, 'kpc')). If 'region' is chosen (to use the regions in
region files), then any inner radius will be ignored.
:param str model: The name of the fitted model that you're requesting the results
from (e.g. constant*tbabs*apec).
:param str/Quantity inner_radius: The name or value of the inner radius that was used for the generation of
the spectra which were fitted to produce the desired result (for instance 'r500' would be acceptable
for a GalaxyCluster, or Quantity(300, 'kpc')). By default this is zero arcseconds, resulting in a
circular spectrum.
:param str par: The name of the parameter you want a result for.
:param bool group_spec: Whether the spectra that were fitted for the desired result were grouped.
:param float min_counts: The minimum counts per channel, if the spectra that were fitted for the
desired result were grouped by minimum counts.
:param float min_sn: The minimum signal to noise per channel, if the spectra that were fitted for the
desired result were grouped by minimum signal to noise.
:param float over_sample: The level of oversampling applied on the spectra that were fitted.
:return: The requested result value, and uncertainties.
"""
# First I want to retrieve the spectra that were fitted to produce the result they're looking for,
#  because then I can just grab the storage key from one of them
# I just take the first spectrum in the list because the storage key will be the same for all of them
if isinstance(specs, list):
storage_key = specs[0].storage_key
else:
storage_key = specs.storage_key

# Bunch of checks to make sure the requested results actually exist
if len(self._fit_results) == 0:
raise ModelNotAssociatedError("There are no XSPEC fits associated with {s}".format(s=self.name))
elif storage_key not in self._fit_results:
raise ModelNotAssociatedError("Those spectra have no associated XSPEC fit to {s}".format(s=self.name))
elif model not in self._fit_results[storage_key]:
av_mods = ", ".join(self._fit_results[storage_key].keys())
raise ModelNotAssociatedError("{m} has not been fitted to those spectra of {s}; available "
"models are {a}".format(m=model, s=self.name, a=av_mods))
elif par is not None and par not in self._fit_results[storage_key][model]:
av_pars = ", ".join(self._fit_results[storage_key][model].keys())
raise ParameterNotAssociatedError("{p} was not a free parameter in the {m} fit to {s}, "
"the options are {a}".format(p=par, m=model, s=self.name, a=av_pars))

fit_data = self._fit_results[storage_key][model]
proc_data = {}  # Where the output will ive
for p_key in fit_data:
# Used to shape the numpy array the data is transferred into
num_entries = len(fit_data[p_key])
# 'Empty' new array to write out the results into, done like this because results are stored
#  in nested dictionaries with their XSPEC parameter number as an extra key
new_data = np.zeros((num_entries, 3))

# If a parameter is unlinked in a fit with multiple spectra (like normalisation for instance),
#  there can be N entries for the same parameter, writing them out in order to a numpy array
for incr, par_index in enumerate(fit_data[p_key]):
new_data[incr, :] = fit_data[p_key][par_index]

# Just makes the output a little nicer if there is only one entry
if new_data.shape[0] == 1:
proc_data[p_key] = new_data[0]
else:
proc_data[p_key] = new_data

# If no specific parameter was requested, the user gets all of them
if par is None:
return proc_data
else:
return proc_data[par]

[docs]    def get_luminosities(self, outer_radius: Union[str, Quantity], model: str,
inner_radius: Union[str, Quantity] = Quantity(0, 'arcsec'), lo_en: Quantity = None,
hi_en: Quantity = None, group_spec: bool = True, min_counts: int = 5, min_sn: float = None,
over_sample: float = None):
"""
Get method for luminosities calculated from model fits to spectra associated with this source.
Either for given energy limits (that must have been specified when the fit was first performed), or
for all luminosities associated with that model. Luminosities are returned as a 3 column numpy array;
the 0th column is the value, the 1st column is the err-, and the 2nd is err+.

:param str/Quantity outer_radius: The name or value of the outer radius that was used for the generation of
the spectra which were fitted to produce the desired result (for instance 'r200' would be acceptable
for a GalaxyCluster, or Quantity(1000, 'kpc')). If 'region' is chosen (to use the regions in
region files), then any inner radius will be ignored.
:param str model: The name of the fitted model that you're requesting the luminosities
from (e.g. constant*tbabs*apec).
:param str/Quantity inner_radius: The name or value of the inner radius that was used for the generation of
the spectra which were fitted to produce the desired result (for instance 'r500' would be acceptable
for a GalaxyCluster, or Quantity(300, 'kpc')). By default this is zero arcseconds, resulting in a
circular spectrum.
:param Quantity lo_en: The lower energy limit for the desired luminosity measurement.
:param Quantity hi_en: The upper energy limit for the desired luminosity measurement.
:param bool group_spec: Whether the spectra that were fitted for the desired result were grouped.
:param float min_counts: The minimum counts per channel, if the spectra that were fitted for the
desired result were grouped by minimum counts.
:param float min_sn: The minimum signal to noise per channel, if the spectra that were fitted for the
desired result were grouped by minimum signal to noise.
:param float over_sample: The level of oversampling applied on the spectra that were fitted.
:return: The requested luminosity value, and uncertainties.
"""
# First I want to retrieve the spectra that were fitted to produce the result they're looking for,
#  because then I can just grab the storage key from one of them
# I just take the first spectrum in the list because the storage key will be the same for all of them
if isinstance(specs, list):
storage_key = specs[0].storage_key
else:
storage_key = specs.storage_key

# Checking the input energy limits are valid, and assembles the key to look for lums in those energy
#  bounds. If the limits are none then so is the energy key
if lo_en is not None and hi_en is not None and lo_en > hi_en:
raise ValueError("The low energy limit cannot be greater than the high energy limit")
elif lo_en is not None and hi_en is not None:
en_key = "bound_{l}-{u}".format(l=lo_en.to("keV").value, u=hi_en.to("keV").value)
else:
en_key = None

# Checks that the requested region, model and energy band actually exist
if len(self._luminosities) == 0:
raise ModelNotAssociatedError("There are no XSPEC fits associated with {s}".format(s=self.name))
elif storage_key not in self._luminosities:
raise ModelNotAssociatedError("These spectra have no associated XSPEC fit to {s}.".format(s=self.name))
elif model not in self._luminosities[storage_key]:
av_mods = ", ".join(self._luminosities[storage_key].keys())
raise ModelNotAssociatedError("{m} has not been fitted to these spectra of {s}; "
"available models are {a}".format(m=model, s=self.name, a=av_mods))
elif en_key is not None and en_key not in self._luminosities[storage_key][model]:
av_bands = ", ".join([en.split("_")[-1] + "keV" for en in self._luminosities[storage_key][model].keys()])
raise ParameterNotAssociatedError("{l}-{u}keV was not an energy band for the fit with {m}; available "
"energy bands are {b}".format(l=lo_en.to("keV").value,
u=hi_en.to("keV").value,
m=model, b=av_bands))

# If no limits specified,the user gets all the luminosities, otherwise they get the one they asked for
if en_key is None:
parsed_lums = {}
for lum_key in self._luminosities[storage_key][model]:
lum_value = self._luminosities[storage_key][model][lum_key]
parsed_lum = Quantity([lum.value for lum in lum_value], lum_value[0].unit)
parsed_lums[lum_key] = parsed_lum
return parsed_lums
else:
lum_value = self._luminosities[storage_key][model][en_key]
parsed_lum = Quantity([lum.value for lum in lum_value], lum_value[0].unit)
return parsed_lum

[docs]    def convert_radius(self, radius: Quantity, out_unit: Union[Unit, str] = 'deg') -> Quantity:
"""
A simple method to convert radii between different distance units, it automatically checks whether
the requested conversion is possible, given available information. For instance it would fail if you
requested a conversion from arcseconds to a proper distance if no redshift information were available.

:param Unit/str out_unit: The unit to convert the input radius to.
:rtype: Quantity
"""
# If a string representation was passed, we make it an astropy unit
if isinstance(out_unit, str):
out_unit = Unit(out_unit)

if out_unit.is_equivalent('kpc') and self._redshift is None:
raise UnitConversionError("You cannot convert to this unit without redshift information.")

else:
raise UnitConversionError("Cannot understand {} as a distance unit".format(str(out_unit)))

[docs]    def get_radius(self, rad_name: str, out_unit: Union[Unit, str] = 'deg') -> Quantity:
"""
Allows a radius associated with this source to be retrieved in specified distance units. Note
that physical distance units such as kiloparsecs may only be used if the source has
redshift information.

:param str rad_name: The name of the desired radius, r200 for instance.
:param Unit/str out_unit: An astropy unit, either a Unit instance or a string
representation. Default is degrees.
:return: The desired radius in the desired units.
:rtype: Quantity
"""

# In case somebody types in R500 rather than r500 for instance.

@property
def num_pn_obs(self) -> int:
"""
Getter method that gives the number of PN observations.

:return: Integer number of PN observations associated with this source
:rtype: int
"""
return len([o for o in self.obs_ids if 'pn' in self._products[o]])

@property
def num_mos1_obs(self) -> int:
"""
Getter method that gives the number of MOS1 observations.

:return: Integer number of MOS1 observations associated with this source
:rtype: int
"""
return len([o for o in self.obs_ids if 'mos1' in self._products[o]])

@property
def num_mos2_obs(self) -> int:
"""
Getter method that gives the number of MOS2 observations.

:return: Integer number of MOS2 observations associated with this source
:rtype: int
"""
return len([o for o in self.obs_ids if 'mos2' in self._products[o]])

# As this is an intrinsic property of which matched observations are valid, there will be no setter
@property
def instruments(self) -> Dict:
"""
A property of a source that details which instruments have valid data for which observations.

:return: A dictionary of ObsIDs and their associated valid instruments.
:rtype: Dict
"""
return self._instruments

@property
def disassociated(self) -> bool:
"""
Property that describes whether this source has had ObsIDs disassociated from it.

:return: A boolean flag, True means that ObsIDs/instruments have been removed, False means they haven't.
:rtype: bool
"""
return self._disassociated

@property
def disassociated_obs(self) -> dict:
"""
Property that details exactly what data has been disassociated from this source, if any.

:return: Dictionary describing which instruments of which ObsIDs have been disassociated from this source.
:rtype: dict
"""
return self._disassociated_obs

[docs]    def disassociate_obs(self, to_remove: Union[dict, str, list]):
"""
Method that uses the supplied dictionary to safely remove data from the source. This data will no longer
be used in any analyses, and would typically be removed because it is of poor quality, or doesn't contribute
enough to justify its presence.

:param dict/str/list to_remove: Either a dictionary of observations to remove, (in the style of
the source.instruments dictionary with the top level keys being ObsIDs, and the lower levels
being instrument names), a string containing an ObsID, or a list of ObsIDs.
"""
# Users can pass just an ObsID string, but we then need to convert it to the form
#  that the rest of the function requires
if isinstance(to_remove, str):
to_remove = {to_remove: deepcopy(self.instruments[to_remove])}
# Here is where they have just passed a list of ObsIDs, and we need to fill in the blanks with the instruments
#  currently loaded for those ObsIDs
elif isinstance(to_remove, list):
to_remove = {o: deepcopy(self.instruments[o]) for o in to_remove}
# Here deals with when someone might have passed a dictionary where there is a single instrument, and
#  they haven't put it in a list; e.g. {'0201903501': 'pn'}. This detects instances like that and then
#  puts the individual instrument in a list as is expected by the rest of the function
elif isinstance(to_remove, dict) and not all([isinstance(v, list) for v in to_remove.values()]):
new_to_remove = {}
for o in to_remove:
if not isinstance(to_remove[o], list):
new_to_remove[o] = [deepcopy(to_remove[o])]
else:
new_to_remove[o] = deepcopy(to_remove[o])

# I use deepcopy again because there have been issues with this function still pointing to old memory
#  addresses, so I'm quite paranoid in this bit of code
to_remove = deepcopy(new_to_remove)

# We also check to make sure that the data we're being asked to remove actually is associated with the
#  source. We shall be forgiving if it isn't, and just issue a warning to let the user know that they are
#  assuming data was here that actually isn't present
# Iterating through the keys (ObsIDs) in to_remove
for o in to_remove:
if o not in self.obs_ids:
warn("{o} data cannot be removed from {s} as they are not associated with "
"it.".format(o=o, s=self.name), stacklevel=2)
# Check to see whether any of the instruments for o are not actually associated with the source
elif any([i not in self.instruments[o] for i in to_remove[o]]):
bad_list = [i for i in to_remove[o] if i not in self.instruments[o]]
warn("{o}-{ib} data cannot be removed from {s} as they are not associated "

# Sets the attribute that tells us whether any data has been removed
if not self._disassociated:
self._disassociated = True

# We want to store knowledge of what data has been removed, if there hasn't been anything taken away yet
#  then we can just set it equal to the to_remove dictionary
if len(self._disassociated_obs) == 0:
self._disassociated_obs = to_remove
# Otherwise we have to add the data to the existing dictionary structure
else:
for o in to_remove:
if o not in self._disassociated_obs:
self._disassociated_obs[o] = to_remove[o]
else:
self._disassociated_obs[o] += to_remove[o]

# If we're un-associating certain observations, odds on the combined products are no longer valid
if "combined" in self._products:
del self._products["combined"]
self._fit_results = {}
self._test_stat = {}
self._dof = {}
self._total_count_rate = {}
self._total_exp = {}
self._luminosities = {}

for o in to_remove:
for i in to_remove[o]:
del self._products[o][i]
del self._instruments[o][self._instruments[o].index(i)]

if len(self._instruments[o]) == 0:
del self._products[o]
del self._detected[o]
del self._initial_regions[o]
del self._initial_region_matches[o]
del self._regions[o]
del self._other_regions[o]
del self._alt_match_regions[o]
# These are made on demand, so need to check if its actually present first
if self._peaks is not None:
del self._peaks[o]

del self._obs[self._obs.index(o)]
if o in self._onaxis:
del self._onaxis[self._onaxis.index(o)]
del self._instruments[o]

if len(self._obs) == 0:
raise NoValidObservationsError("No observations remain associated with {} after cleaning".format(self.name))

# We attempt to load in matching XGA products if that was the behaviour set by load_products on init

@property
def luminosity_distance(self) -> Quantity:
"""
Tells the user the luminosity distance to the source if a redshift was supplied, if not returns None.

:return: The luminosity distance to the source, calculated using the cosmology associated with this source.
:rtype: Quantity
"""
return self._lum_dist

@property
def angular_diameter_distance(self) -> Quantity:
"""
Tells the user the angular diameter distance to the source if a redshift was supplied, if not returns None.

:return: The angular diameter distance to the source, calculated using the cosmology
associated with this source.
:rtype: Quantity
"""
return self._ang_diam_dist

@property
"""
The factors by which to multiply outer radius by to get inner and outer radii for background regions.

:return: An array of the two factors.
:rtype: ndarray
"""
return np.array([self._back_inn_factor, self._back_out_factor])

[docs]    def obs_check(self, reg_type: str, threshold_fraction: float = 0.5) -> Dict:
"""
This method uses exposure maps and region masks to determine which ObsID/instrument combinations
are not contributing to the analysis. It calculates the area intersection of the mask and exposure
maps, and if (for a given ObsID-Instrument) the ratio of that area to the full area of the region
calculated is less than the threshold fraction, that ObsID-instrument will be included in the returned
rejection dictionary.

:param str reg_type: The region type for which to calculate the area intersection.
:param float threshold_fraction: Intersection area/ full region area ratios below this value will mean an
ObsID-Instrument is rejected.
:return: A dictionary of ObsID keys on the top level, then instruments a level down, that
should be rejected according to the criteria supplied to this method.
:rtype: Dict
"""
# Again don't particularly want to do this local import, but its just easier
from xga.sas import eexpmap

# Going to ensure that individual exposure maps exist for each of the ObsID/instrument combinations
#  first, then checking where the source lies on the exposure map
eexpmap(self, self._peak_lo_en, self._peak_hi_en)

extra_key = "bound_{l}-{u}".format(l=self._peak_lo_en.to("keV").value, u=self._peak_hi_en.to("keV").value)

area = {o: {} for o in self.obs_ids}
full_area = {o: {} for o in self.obs_ids}
for o in self.obs_ids:
# Exposure maps of the peak finding energy range for this ObsID
exp_maps = self.get_products("expmap", o, extra_key=extra_key)
full_area[o] = m.sum()

for ex in exp_maps:
# Grabs exposure map data, then alters it so anything that isn't zero is a one
ex_data = ex.data.copy()
ex_data[ex_data > 0] = 1
# We do this because it then becomes very easy to calculate the intersection area of the mask
#  with the XMM chips. Just mask the modified expmap, then sum.
area[o][ex.instrument] = (ex_data * m).sum()

if max(list(full_area.values())) == 0:
# Everything has to be rejected in this case
reject_dict = deepcopy(self._instruments)
else:
reject_dict = {}
for o in area:
for i in area[o]:
if full_area[o] != 0:
frac = (area[o][i] / full_area[o])
else:
frac = 0
if frac <= threshold_fraction and o not in reject_dict:
reject_dict[o] = [i]
elif frac <= threshold_fraction and o in reject_dict:
reject_dict[o].append(i)

return reject_dict

# And here I'm adding a bunch of get methods that should mean the user never has to use get_products, for
#  individual product types. It will also mean that they will never have to figure out extra keys themselves
#  and I can make lists of 1 product return just as the product without being a breaking change
[docs]    def get_spectra(self, outer_radius: Union[str, Quantity], obs_id: str = None, inst: str = None,
inner_radius: Union[str, Quantity] = Quantity(0, 'arcsec'), group_spec: bool = True,
min_counts: int = 5, min_sn: float = None,
over_sample: float = None) -> Union[Spectrum, List[Spectrum]]:
"""
A useful method that wraps the get_products function to allow you to easily retrieve XGA Spectrum objects.
Simply pass the desired ObsID/instrument, and the same settings you used to generate the spectrum
in evselect_spectrum, and the spectra(um) will be provided to you. If no match is found then a
NoProductAvailableError will be raised.

:param str/Quantity outer_radius: The name or value of the outer radius that was used for the generation of
the spectrum (for instance 'r200' would be acceptable for a GalaxyCluster, or Quantity(1000, 'kpc')). If
'region' is chosen (to use the regions in region files), then any inner radius will be ignored.
:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all spectra matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all spectra matching the other criteria will be returned.
:param str/Quantity inner_radius: The name or value of the inner radius that was used for the generation of
the spectrum (for instance 'r500' would be acceptable for a GalaxyCluster, or Quantity(300, 'kpc')). By
default this is zero arcseconds, resulting in a circular spectrum.
:param bool group_spec: Was the spectrum you wish to retrieve grouped?
:param float min_counts: If the spectrum you wish to retrieve was grouped on minimum counts, what was
the minimum number of counts?
:param float min_sn: If the spectrum you wish to retrieve was grouped on minimum signal to noise, what was
the minimum signal to noise.
:param float over_sample: If the spectrum you wish to retrieve was over sampled, what was the level of
over sampling used?
:return: An XGA Spectrum object (if there is an exact match), or a list of XGA Spectrum objects (if there
were multiple matching products).
:rtype: Union[Spectrum, List[Spectrum]]
"""
else:
raise TypeError("You may only a quantity or a string as inner_radius")

else:
raise TypeError("You may only a quantity or a string as outer_radius")

if over_sample is not None:
over_sample = int(over_sample)
if min_counts is not None:
min_counts = int(min_counts)
if min_sn is not None:
min_sn = float(min_sn)

# Sets up the extra part of the storage key name depending on if grouping is enabled
if group_spec and min_counts is not None:
extra_name = "_mincnt{}".format(min_counts)
elif group_spec and min_sn is not None:
extra_name = "_minsn{}".format(min_sn)
else:
extra_name = ''

# And if it was oversampled during generation then we need to include that as well
if over_sample is not None:
extra_name += "_ovsamp{ov}".format(ov=over_sample)

# The key under which these spectra will be stored
spec_storage_name = "ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}"
spec_storage_name = spec_storage_name.format(ra=self.default_coord[0].value,
dec=self.default_coord[1].value,
gr=group_spec)
else:
spec_storage_name = "region_grp{gr}".format(gr=group_spec)

# Adds on the extra information about grouping to the storage key
spec_storage_name += extra_name
matched_prods = self.get_products('spectrum', obs_id=obs_id, inst=inst, extra_key=spec_storage_name)
if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any spectra matching your input.")

return matched_prods

[docs]    def get_annular_spectra(self, radii: Quantity = None, group_spec: bool = True, min_counts: int = 5,
min_sn: float = None, over_sample: float = None, set_id: int = None) -> AnnularSpectra:
"""
Another useful method that wraps the get_products function, though this one gets you AnnularSpectra.
Pass the radii used to generate the annuli, and the same settings you used to generate the spectrum
in spectrum_set, and the AnnularSpectra will be returned (if it exists). If no match is found then
a NoProductAvailableError will be raised. This method has an additional way of looking for a matching
spectrum, if the set ID is known then that can be passed by the user and used to find an exact match.

:param Quantity radii: The annulus boundary radii that were used to generate the annular spectra set
that you wish to retrieve. By default this is None, which means the method will return annular
:param bool group_spec: Was the spectrum set you wish to retrieve grouped?
:param float min_counts: If the spectrum set you wish to retrieve was grouped on minimum counts, what was
the minimum number of counts?
:param float min_sn: If the spectrum set you wish to retrieve was grouped on minimum signal to
noise, what was the minimum signal to noise.
:param float over_sample: If the spectrum set you wish to retrieve was over sampled, what was the level of
over sampling used?
:param int set_id: The unique identifier of the annular spectrum set. Passing a value for this parameter
will override any other information that you have given this method.
:return: An XGA AnnularSpectra object if there is an exact match.
:rtype: AnnularSpectra
"""
if group_spec and min_counts is not None:
extra_name = "_mincnt{}".format(min_counts)
elif group_spec and min_sn is not None:
extra_name = "_minsn{}".format(min_sn)
else:
extra_name = ''

# And if it was oversampled during generation then we need to include that as well
if over_sample is not None:
extra_name += "_ovsamp{ov}".format(ov=over_sample)

# Combines the annular radii into a string, and makes sure the radii are in degrees, as radii are in
#  degrees in the storage key
# We're dealing with the best case here, the user has passed radii, so we can generate an exact
#  storage key and look for a single match
spec_storage_name = "ra{ra}_dec{dec}_ar{ar}_grp{gr}"
spec_storage_name = spec_storage_name.format(ra=self.default_coord[0].value,
dec=self.default_coord[1].value,
spec_storage_name += extra_name
else:
# This is a worse case, we don't have radii, so we split the known parts of the key into a list
#  and we'll look for partial matches
pos_str = "ra{ra}_dec{dec}".format(ra=self.default_coord[0].value, dec=self.default_coord[1].value)
grp_str = "grp{gr}".format(gr=group_spec) + extra_name
spec_storage_name = [pos_str, grp_str]

# If the user hasn't passed a set ID AND the user has passed radii then we'll go looking with out
#  properly constructed storage key
if set_id is None and radii is not None:
matched_prods = self.get_products('combined_spectrum', extra_key=spec_storage_name)
# But if the user hasn't passed an ID AND the radii are None then we look for partial matches
elif set_id is None and radii is None:
matched_prods = [p for p in self.get_products('combined_spectrum')
if spec_storage_name[0] in p.storage_key and spec_storage_name[1] in p.storage_key]
# However if they have passed a setID then this over-rides everything else
else:
# With the set ID we fetch ALL annular spectra, then use their set_id property to match against
#  whatever the user passed in
matched_prods = [p for p in self.get_products('combined_spectrum') if p.set_ident == set_id]

if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("No matching AnnularSpectra can be found.")

return matched_prods

def _get_phot_prod(self, prod_type: str, obs_id: str = None, inst: str = None, lo_en: Quantity = None,
hi_en: Quantity = None, psf_corr: bool = False, psf_model: str = "ELLBETA",
psf_bins: int = 4, psf_algo: str = "rl", psf_iter: int = 15) \
-> Union[Image, ExpMap, RateMap, List[Image], List[ExpMap], List[RateMap]]:
"""
An internal method which is the basis of the get_images, get_expmaps, and get_ratemaps methods.

:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all images/expmaps/ratemaps matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all images/expmaps/ratemaps matching the other criteria will be returned.
:param Quantity lo_en: The lower energy limit of the images/expmaps/ratemaps you wish to
retrieve, the default is None (which will retrieve all images/expmaps/ratemaps regardless of
energy limit).
:param Quantity hi_en: The upper energy limit of the images/expmaps/ratemaps you wish to
retrieve, the default is None (which will retrieve all images/expmaps/ratemaps regardless of
energy limit).
:param bool psf_corr: Sets whether you wish to retrieve a PSF corrected images/ratemaps or not.
:param str psf_model: If the images/ratemaps you want are PSF corrected, this is the PSF model used.
:param int psf_bins: If the images/ratemaps you want are PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the images/ratemaps you want are PSF corrected, this is the algorithm used.
:param int psf_iter: If the images/ratemaps you want are PSF corrected, this is the number of iterations.
:return: An XGA Image/RateMap/ExpMap object (if there is an exact match), or a list of XGA
Image/RateMap/ExpMap objects (if there were multiple matching products).
:rtype: Union[Image, ExpMap, RateMap, List[Image], List[ExpMap], List[RateMap]]
"""
# Checks to make sure that an allowed combination of lo_en and hi_en has been passed.
if all([lo_en is None, hi_en is None]):
# Sets a flag to tell the rest of the method whether we have energy lims or not
with_lims = False
energy_key = None
elif all([lo_en is not None, hi_en is not None]):
with_lims = True
# We have energy limits here so we assemble the key that describes the energy range
energy_key = "bound_{l}-{h}".format(l=lo_en.to('keV').value, h=hi_en.to('keV').value)
else:
raise ValueError("lo_en and hi_en must be either BOTH None or BOTH an Astropy quantity.")

# If we are looking for a PSF corrected image/ratemap then we assemble the extra key with PSF details
if psf_corr and prod_type in ["image", "ratemap"]:
extra_key = "_" + psf_model + "_" + str(psf_bins) + "_" + psf_algo + str(psf_iter)

if not psf_corr and with_lims:
# Simplest case, just calling get_products and passing in our information
matched_prods = self.get_products(prod_type, obs_id, inst, extra_key=energy_key)
elif not psf_corr and not with_lims:
matched_prods = [p for p in broad_matches if not p.psf_corrected]
elif psf_corr and with_lims:
# Here we need to add the extra key to the energy key
matched_prods = self.get_products(prod_type, obs_id, inst, extra_key=energy_key + extra_key)
elif psf_corr and not with_lims:
# Here we don't know the energy key, so we have to look for partial matches in the get_products return
broad_matches = self.get_products(prod_type, obs_id, inst, extra_key=None, just_obj=False)
matched_prods = [p[-1] for p in broad_matches if extra_key in p[-2]]

if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any {p}s matching your input.".format(p=prod_type))

return matched_prods

[docs]    def get_images(self, obs_id: str = None, inst: str = None, lo_en: Quantity = None, hi_en: Quantity = None,
psf_corr: bool = False, psf_model: str = "ELLBETA", psf_bins: int = 4, psf_algo: str = "rl",
psf_iter: int = 15) -> Union[Image, List[Image]]:
"""
A method to retrieve XGA Image objects. This supports the retrieval of both PSF corrected and non-PSF
corrected images, as well as setting the energy limits of the specific image you would like. A
NoProductAvailableError error will be raised if no matches are found.

:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all images matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all images matching the other criteria will be returned.
:param Quantity lo_en: The lower energy limit of the image you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:param Quantity hi_en: The upper energy limit of the image you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:param bool psf_corr: Sets whether you wish to retrieve a PSF corrected image or not.
:param str psf_model: If the image you want is PSF corrected, this is the PSF model used.
:param int psf_bins: If the image you want is PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the image you want is PSF corrected, this is the algorithm used.
:param int psf_iter: If the image you want is PSF corrected, this is the number of iterations.
:return: An XGA Image object (if there is an exact match), or a list of XGA Image objects (if there
were multiple matching products).
:rtype: Union[Image, List[Image]]
"""
return self._get_phot_prod("image", obs_id, inst, lo_en, hi_en, psf_corr, psf_model, psf_bins, psf_algo,
psf_iter)

[docs]    def get_expmaps(self, obs_id: str = None, inst: str = None, lo_en: Quantity = None, hi_en: Quantity = None) \
-> Union[ExpMap, List[ExpMap]]:
"""
A method to retrieve XGA ExpMap objects. This supports setting the energy limits of the specific
exposure maps you would like. A NoProductAvailableError error will be raised if no matches are found.

:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all exposure maps matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all exposure maps matching the other criteria will be returned.
:param Quantity lo_en: The lower energy limit of the exposure maps you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:param Quantity hi_en: The upper energy limit of the exposure maps you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:return: An XGA ExpMap object (if there is an exact match), or a list of XGA ExpMap objects (if there
were multiple matching products).
:rtype: Union[ExpMap, List[ExpMap]]
"""
return self._get_phot_prod("expmap", obs_id, inst, lo_en, hi_en, False)

[docs]    def get_ratemaps(self, obs_id: str = None, inst: str = None, lo_en: Quantity = None, hi_en: Quantity = None,
psf_corr: bool = False, psf_model: str = "ELLBETA", psf_bins: int = 4, psf_algo: str = "rl",
psf_iter: int = 15) -> Union[RateMap, List[RateMap]]:
"""
A method to retrieve XGA RateMap objects. This supports the retrieval of both PSF corrected and non-PSF
corrected ratemaps, as well as setting the energy limits of the specific ratemap you would like. A
NoProductAvailableError error will be raised if no matches are found.

:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all ratemaps matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all ratemaps matching the other criteria will be returned.
:param Quantity lo_en: The lower energy limit of the ratemaps you wish to retrieve, the default
is None (which will retrieve all ratemaps regardless of energy limit).
:param Quantity hi_en: The upper energy limit of the ratemaps you wish to retrieve, the default
is None (which will retrieve all ratemaps regardless of energy limit).
:param bool psf_corr: Sets whether you wish to retrieve a PSF corrected ratemap or not.
:param str psf_model: If the ratemap you want is PSF corrected, this is the PSF model used.
:param int psf_bins: If the ratemap you want is PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the ratemap you want is PSF corrected, this is the algorithm used.
:param int psf_iter: If the ratemap you want is PSF corrected, this is the number of iterations.
:return: An XGA RateMap object (if there is an exact match), or a list of XGA RateMap objects (if there
were multiple matching products).
:rtype: Union[RateMap, List[RateMap]]
"""
return self._get_phot_prod("ratemap", obs_id, inst, lo_en, hi_en, psf_corr, psf_model, psf_bins, psf_algo,
psf_iter)

# The combined photometric products don't really NEED their own get methods, but I figured I would just for
#  clarity's sake
[docs]    def get_combined_images(self, lo_en: Quantity = None, hi_en: Quantity = None, psf_corr: bool = False,
psf_model: str = "ELLBETA", psf_bins: int = 4, psf_algo: str = "rl",
psf_iter: int = 15) -> Union[Image, List[Image]]:
"""
A method to retrieve combined XGA Image objects, as in those images that have been created by
merging all available data for this source. This supports the retrieval of both PSF corrected and non-PSF
corrected images, as well as setting the energy limits of the specific image you would like. A
NoProductAvailableError error will be raised if no matches are found.

:param Quantity lo_en: The lower energy limit of the image you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:param Quantity hi_en: The upper energy limit of the image you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:param bool psf_corr: Sets whether you wish to retrieve a PSF corrected image or not.
:param str psf_model: If the image you want is PSF corrected, this is the PSF model used.
:param int psf_bins: If the image you want is PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the image you want is PSF corrected, this is the algorithm used.
:param int psf_iter: If the image you want is PSF corrected, this is the number of iterations.
:return: An XGA Image object (if there is an exact match), or a list of XGA Image objects (if there
were multiple matching products).
:rtype: Union[Image, List[Image]]
"""

# Checks to make sure that an allowed combination of lo_en and hi_en has been passed.
if all([lo_en is None, hi_en is None]):
# Sets a flag to tell the rest of the method whether we have energy lims or not
with_lims = False
energy_key = None
elif all([lo_en is not None, hi_en is not None]):
with_lims = True
# We have energy limits here so we assemble the key that describes the energy range
energy_key = "bound_{l}-{h}".format(l=lo_en.to('keV').value, h=hi_en.to('keV').value)
else:
raise ValueError("lo_en and hi_en must be either BOTH None or BOTH an Astropy quantity.")

# If we are looking for a PSF corrected image then we assemble the extra key with PSF details
if psf_corr:
extra_key = "_" + psf_model + "_" + str(psf_bins) + "_" + psf_algo + str(psf_iter)

if not psf_corr and with_lims:
# Simplest case, just calling get_products and passing in our information
matched_prods = self.get_products('combined_image', extra_key=energy_key)
elif not psf_corr and not with_lims:
matched_prods = [p for p in broad_matches if not p.psf_corrected]
elif psf_corr and with_lims:
# Here we need to add the extra key to the energy key
matched_prods = self.get_products('combined_image', extra_key=energy_key + extra_key)
elif psf_corr and not with_lims:
# Here we don't know the energy key, so we have to look for partial matches in the get_products return
matched_prods = [p[-1] for p in broad_matches if extra_key in p[-2]]

if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any combined images matching your input.")

return matched_prods

[docs]    def get_combined_expmaps(self, lo_en: Quantity = None, hi_en: Quantity = None) -> Union[ExpMap, List[ExpMap]]:
"""
A method to retrieve combined XGA ExpMap objects, as in those exposure maps that have been created by
merging all available data for this source. This supports setting the energy limits of the specific
exposure maps you would like. A NoProductAvailableError error will be raised if no matches are found.

:param Quantity lo_en: The lower energy limit of the exposure maps you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:param Quantity hi_en: The upper energy limit of the exposure maps you wish to retrieve, the default
is None (which will retrieve all images regardless of energy limit).
:return: An XGA ExpMap object (if there is an exact match), or a list of XGA Image objects (if there
were multiple matching products).
:rtype: Union[ExpMap, List[ExpMap]]
"""
if all([lo_en is None, hi_en is None]):
energy_key = None
elif all([lo_en is not None, hi_en is not None]):
energy_key = "bound_{l}-{h}".format(l=lo_en.to('keV').value, h=hi_en.to('keV').value)
else:
raise ValueError("lo_en and hi_en must be either BOTH None or BOTH an Astropy quantity.")

matched_prods = self.get_products('combined_expmap', extra_key=energy_key)
if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any combined exposure maps matching your input.")

return matched_prods

[docs]    def get_combined_ratemaps(self, lo_en: Quantity = None, hi_en: Quantity = None,  psf_corr: bool = False,
psf_model: str = "ELLBETA", psf_bins: int = 4, psf_algo: str = "rl",
psf_iter: int = 15) -> Union[RateMap, List[RateMap]]:
"""
A method to retrieve combined XGA RateMap objects, as in those ratemap that have been created by
merging all available data for this source. This supports the retrieval of both PSF corrected and non-PSF
corrected ratemaps, as well as setting the energy limits of the specific ratemap you would like. A
NoProductAvailableError error will be raised if no matches are found.

:param Quantity lo_en: The lower energy limit of the ratemaps you wish to retrieve, the default
is None (which will retrieve all ratemaps regardless of energy limit).
:param Quantity hi_en: The upper energy limit of the ratemaps you wish to retrieve, the default
is None (which will retrieve all ratemaps regardless of energy limit).
:param bool psf_corr: Sets whether you wish to retrieve a PSF corrected ratemap or not.
:param str psf_model: If the ratemap you want is PSF corrected, this is the PSF model used.
:param int psf_bins: If the ratemap you want is PSF corrected, this is the number of PSFs per
side in the PSF grid.
:param str psf_algo: If the ratemap you want is PSF corrected, this is the algorithm used.
:param int psf_iter: If the ratemap you want is PSF corrected, this is the number of iterations.
:return: An XGA RateMap object (if there is an exact match), or a list of XGA RateMap objects (if there
were multiple matching products).
:rtype: Union[RateMap, List[RateMap]]
"""
# This function is essentially identical to get_images, but I'm going to be lazy and not write
#  a separate internal function to do both.

# Checks to make sure that an allowed combination of lo_en and hi_en has been passed.
if all([lo_en is None, hi_en is None]):
# Sets a flag to tell the rest of the method whether we have energy lims or not
with_lims = False
energy_key = None
elif all([lo_en is not None, hi_en is not None]):
with_lims = True
# We have energy limits here so we assemble the key that describes the energy range
energy_key = "bound_{l}-{h}".format(l=lo_en.to('keV').value, h=hi_en.to('keV').value)
else:
raise ValueError("lo_en and hi_en must be either BOTH None or BOTH an Astropy quantity.")

# If we are looking for a PSF corrected ratemap then we assemble the extra key with PSF details
if psf_corr:
extra_key = "_" + psf_model + "_" + str(psf_bins) + "_" + psf_algo + str(psf_iter)

if not psf_corr and with_lims:
# Simplest case, just calling get_products and passing in our information
matched_prods = self.get_products('combined_ratemap', extra_key=energy_key)
elif not psf_corr and not with_lims:
matched_prods = [p for p in broad_matches if not p.psf_corrected]
elif psf_corr and with_lims:
# Here we need to add the extra key to the energy key
matched_prods = self.get_products('combined_ratemap', extra_key=energy_key + extra_key)
elif psf_corr and not with_lims:
# Here we don't know the energy key, so we have to look for partial matches in the get_products return
matched_prods = [p[-1] for p in broad_matches if extra_key in p[-2]]

if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any combined ratemaps matching your input.")

return matched_prods

def _get_prof_prod(self, search_key: str, obs_id: str = None, inst: str = None, central_coord: Quantity = None,
radii: Quantity = None, lo_en: Quantity = None, hi_en: Quantity = None) \
-> Union[BaseProfile1D, List[BaseProfile1D]]:
"""
The internal method which is the guts of get_profiles and get_combined_profiles. It parses the input and
searches for full and partial matches in this source's product storage structure.

:param str search_key: The exact search key which defined profile type, and whether it is combined or not.
:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all profiles matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all profiles matching the other criteria will be returned.
:param Quantity central_coord: The central coordinate of the profile you wish to retrieve, the default
is None which means the method will use the default coordinate of this source.
:param Quantity radii: The central radii of the profile points, it is not likely that this option will be
used often as you likely won't know the radial values a priori.
:param Quantity lo_en: The lower energy bound of the profile you wish to retrieve (if applicable), default
is None, and if this argument is passed hi_en must be too.
:param Quantity hi_en: The higher energy bound of the profile you wish to retrieve (if applicable), default
is None, and if this argument is passed lo_en must be too.
:return: An XGA profile object (if there is an exact match), or a list of XGA profile objects (if there
were multiple matching products).
:rtype: Union[BaseProfile1D, List[BaseProfile1D]]
"""
if all([lo_en is None, hi_en is None]):
energy_key = "_"
elif all([lo_en is not None, hi_en is not None]):
energy_key = "bound_{l}-{h}_".format(l=lo_en.to('keV').value, h=hi_en.to('keV').value)
else:
raise ValueError("lo_en and hi_en must be either BOTH None or BOTH an Astropy quantity.")

if central_coord is None:
central_coord = self.default_coord
cen_chunk = "ra{r}_dec{d}_".format(r=central_coord[0].value, d=central_coord[1].value)

else:

broad_prods = self.get_products(search_key, obs_id, inst, just_obj=False)
matched_prods = []

matched_prods.append(p[-1])
elif cen_chunk in p[-2] and energy_key in p[-2] and not rad_info:
matched_prods.append(p[-1])

return matched_prods

[docs]    def get_profiles(self, profile_type: str, obs_id: str = None, inst: str = None, central_coord: Quantity = None,
radii: Quantity = None, lo_en: Quantity = None, hi_en: Quantity = None) \
-> Union[BaseProfile1D, List[BaseProfile1D]]:
"""
This is the generic get method for XGA profile objects stored in this source. You still must remember
the profile type value to use it, but once entered it will return a list of all matching profiles (or a
single object if only one match is found).

:param str profile_type: The string profile type of the profile(s) you wish to retrieve.
:param str obs_id: Optionally, a specific obs_id to search for can be supplied. The default is None,
which means all profiles matching the other criteria will be returned.
:param str inst: Optionally, a specific instrument to search for can be supplied. The default is None,
which means all profiles matching the other criteria will be returned.
:param Quantity central_coord: The central coordinate of the profile you wish to retrieve, the default
is None which means the method will use the default coordinate of this source.
:param Quantity radii: The central radii of the profile points, it is not likely that this option will be
used often as you likely won't know the radial values a priori.
:param Quantity lo_en: The lower energy bound of the profile you wish to retrieve (if applicable), default
is None, and if this argument is passed hi_en must be too.
:param Quantity hi_en: The higher energy bound of the profile you wish to retrieve (if applicable), default
is None, and if this argument is passed lo_en must be too.
:return: An XGA profile object (if there is an exact match), or a list of XGA profile objects (if there
were multiple matching products).
:rtype: Union[BaseProfile1D, List[BaseProfile1D]]
"""
if "profile" in profile_type:
warn("The profile_type you passed contains the word 'profile', which is appended onto "
"a profile type by XGA, you need to try this again without profile on the end, unless"
" you gave a generic profile a type with 'profile' in.", stacklevel=2)

search_key = profile_type + "_profile"
if all([obs_id is None, inst is None]):
search_key = "combined_" + search_key

search_key = profile_type + "_profile"
if search_key not in ALLOWED_PRODUCTS:
warn("{} seems to be a custom profile, not an XGA default type. If this is not "
"true then you have passed an invalid profile type.".format(search_key), stacklevel=2)

matched_prods = self._get_prof_prod(search_key, obs_id, inst, central_coord, radii, lo_en, hi_en)
if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any {p} profiles matching your input.".format(p=profile_type))

return matched_prods

[docs]    def get_combined_profiles(self, profile_type: str, central_coord: Quantity = None, radii: Quantity = None,
lo_en: Quantity = None, hi_en: Quantity = None) \
-> Union[BaseProfile1D, List[BaseProfile1D]]:
"""
The generic get method for XGA profiles made using all available data which are stored in this source.
You still must remember the profile type value to use it, but once entered it will return a list
of all matching profiles (or a single object if only one match is found).

:param str profile_type: The string profile type of the profile(s) you wish to retrieve.
:param Quantity central_coord: The central coordinate of the profile you wish to retrieve, the default
is None which means the method will use the default coordinate of this source.
:param Quantity radii: The central radii of the profile points, it is not likely that this option will be
used often as you likely won't know the radial values a priori.
:param Quantity lo_en: The lower energy bound of the profile you wish to retrieve (if applicable), default
is None, and if this argument is passed hi_en must be too.
:param Quantity hi_en: The higher energy bound of the profile you wish to retrieve (if applicable), default
is None, and if this argument is passed lo_en must be too.
:return: An XGA profile object (if there is an exact match), or a list of XGA profile objects (if there
were multiple matching products).
:rtype: Union[BaseProfile1D, List[BaseProfile1D]]
"""
if "profile" in profile_type:
warn("The profile_type you passed contains the word 'profile', which is appended onto "
"a profile type by XGA, you need to try this again without profile on the end, unless"
" you gave a generic profile a type with 'profile' in.", stacklevel=2)

search_key = "combined_" + profile_type + "_profile"

if search_key not in ALLOWED_PRODUCTS:
warn("That profile type seems to be a custom profile, not an XGA default type. If this is not "
"true then you have passed an invalid profile type.", stacklevel=2)

matched_prods = self._get_prof_prod(search_key, None, None, central_coord, radii, lo_en, hi_en)
if len(matched_prods) == 1:
matched_prods = matched_prods[0]
elif len(matched_prods) == 0:
raise NoProductAvailableError("Cannot find any combined {p} profiles matching your "
"input.".format(p=profile_type))

return matched_prods

@property
def fitted_models(self) -> List[str]:
"""
This property cycles through all the available fit results, and finds the unique names of XSPEC models
that have been fitted to this source.

:return: A list of model names.
:rtype: List[str]
"""
models = []
for s_key in self._fit_results:
models += list(self._fit_results[s_key].keys())

return models

[docs]    def snr_ranking(self, outer_radius: Union[Quantity, str], lo_en: Quantity = None, hi_en: Quantity = None,
allow_negative: bool = False) -> Tuple[np.ndarray, np.ndarray]:
"""
This method generates a list of ObsID-Instrument pairs, ordered by the signal to noise measured for the
given region, with element zero being the lowest SNR, and element N being the highest.

:param Quantity/str outer_radius: The radius that SNR should be calculated within, this can either be a
named radius such as r500, or an astropy Quantity.
:param Quantity lo_en: The lower energy bound of the ratemap to use to calculate the SNR. Default is None,
in which case the lower energy bound for peak finding will be used (default is 0.5keV).
:param Quantity hi_en: The upper energy bound of the ratemap to use to calculate the SNR. Default is None,
in which case the upper energy bound for peak finding will be used (default is 2.0keV).
:param bool allow_negative: Should pixels in the background subtracted count map be allowed to go below
zero, which results in a lower signal to noise (and can result in a negative signal to noise).
:return: Two arrays, the first an N by 2 array, with the ObsID, Instrument combinations in order
of ascending signal to noise, then an array containing the order SNR ratios.
:rtype: Tuple[np.ndarray, np.ndarray]
"""
# Set up some lists for the ObsID-Instrument combos and their SNRs respectively
obs_inst = []
snrs = []
# We loop through the ObsIDs associated with this source and the instruments associated with those ObsIDs
for obs_id in self.instruments:
for inst in self.instruments[obs_id]:
# Use our handy get_snr method to calculate the SNRs we want, then add that and the
#  ObsID-inst combo into their respective lists
snrs.append(self.get_snr(outer_radius, self.default_coord, lo_en, hi_en, obs_id, inst,
allow_negative))
obs_inst.append([obs_id, inst])

# Make our storage lists into arrays, easier to work with that way
obs_inst = np.array(obs_inst)
snrs = np.array(snrs)

# We want to order the output by SNR, with the lowest being first and the highest being last, so we
#  use a numpy function to output the index order needed to re-order our two arrays
reorder_snrs = np.argsort(snrs)
# Then we use that to re-order them
snrs = snrs[reorder_snrs]
obs_inst = obs_inst[reorder_snrs]

# And return our ordered dictionaries
return obs_inst, snrs

[docs]    def count_ranking(self, outer_radius: Union[Quantity, str], lo_en: Quantity = None,
hi_en: Quantity = None) -> Tuple[np.ndarray, Quantity]:
"""
This method generates a list of ObsID-Instrument pairs, ordered by the counts measured for the
given region, with element zero being the lowest counts, and element N being the highest.

:param Quantity/str outer_radius: The radius that counts should be calculated within, this can either be a
named radius such as r500, or an astropy Quantity.
:param Quantity lo_en: The lower energy bound of the ratemap to use to calculate the counts. Default is None,
in which case the lower energy bound for peak finding will be used (default is 0.5keV).
:param Quantity hi_en: The upper energy bound of the ratemap to use to calculate the counts. Default is None,
in which case the upper energy bound for peak finding will be used (default is 2.0keV).
:return: Two arrays, the first an N by 2 array, with the ObsID, Instrument combinations in order
of ascending counts, then an array containing the order counts ratios.
:rtype: Tuple[np.ndarray, Quantity]
"""
# Set up some lists for the ObsID-Instrument combos and their cnts respectively
obs_inst = []
cnts = []
# We loop through the ObsIDs associated with this source and the instruments associated with those ObsIDs
for obs_id in self.instruments:
for inst in self.instruments[obs_id]:
cnts.append(self.get_counts(outer_radius, self.default_coord, lo_en, hi_en, obs_id, inst))
obs_inst.append([obs_id, inst])

# Make our storage lists into arrays, easier to work with that way
obs_inst = np.array(obs_inst)
cnts = Quantity(cnts)

# We want to order the output by counts, with the lowest being first and the highest being last, so we
#  use a numpy function to output the index order needed to re-order our two arrays
reorder_cnts = np.argsort(cnts)
# Then we use that to re-order them
cnts = cnts[reorder_cnts]
obs_inst = obs_inst[reorder_cnts]

# And return our ordered dictionaries'
return obs_inst, cnts

[docs]    def offset(self, off_unit: Union[Unit, str] = "arcmin") -> Quantity:
"""
This method calculates the separation between the user supplied ra_dec coordinates, and the peak
coordinates in the requested off_unit. If there is no peak attribute and error will be thrown, and if no
peak has been calculated then the result will be 0.

:param Unit/str off_unit: The unit that the offset should be in.
:return: The offset between ra_dec and peak, in the requested unit.
:rtype: Quantity
"""
# Check that the source has a peak attribute to fetch, otherwise throw error
if not hasattr(self, 'peak'):
raise AttributeError("This source does not have a peak attribute, and so an offset cannot be calculated.")

# Calculate the euclidean distance between ra_dec and peak
sep = np.sqrt(abs(self.ra_dec[0] - self.peak[0]) ** 2 + abs(self.ra_dec[1] - self.peak[1]) ** 2)
# Convert the separation to the requested unit - this will throw an error if the unit is stupid

# Return the converted separation
return conv_sep

@property
def peak_lo_en(self) -> Quantity:
"""
This property returns the lower energy bound of the image used for peak finding.

:return: A quantity containing the lower energy limit used for peak finding.
:rtype: Quantity
"""
return self._peak_lo_en

@property
def peak_hi_en(self) -> Quantity:
"""
This property returns the upper energy bound of the image used for peak finding.

:return: A quantity containing the upper energy limit used for peak finding.
:rtype: Quantity
"""
return self._peak_hi_en

@property
def use_peak(self) -> bool:
"""
This property shows whether a particular XGA source object has been setup to use peak coordinates
or not. The property is either True, False, or None (if its a BaseSource).

:return: If the source is set to use peaks, True, otherwise False.
:rtype: bool
"""
return self._use_peak

@property
def suppressed_warnings(self) -> List[str]:
"""
A property getter (with no setter) for the warnings that have been suppressed from display to the user as
the source was declared as a member of a sample.

:return: The list of suppressed warnings for this source.
:rtype: List[str]
"""
if not self._samp_member:
raise NotSampleMemberError("The source for {n} is not a member of a sample, and as such warnings have "
"not been suppressed.".format(n=self.name))
else:
return self._supp_warn

[docs]    def info(self):
"""
Very simple function that just prints a summary of important information related to the source object..
"""
print("\n-----------------------------------------------------")
print("Source Name - {}".format(self._name))
print("User Coordinates - ({0}, {1}) degrees".format(*self._ra_dec))
if self._peaks is not None:
print("X-ray Peak - ({0}, {1}) degrees".format(*self._peaks["combined"].value))
print("nH - {}".format(self.nH))
if self._redshift is not None:
print("Redshift - {}".format(round(self._redshift, 3)))
print("XMM ObsIDs - {}".format(self.__len__()))
print("PN Observations - {}".format(self.num_pn_obs))
print("MOS1 Observations - {}".format(self.num_mos1_obs))
print("MOS2 Observations - {}".format(self.num_mos2_obs))
print("On-Axis - {}".format(len(self._onaxis)))
print("With regions - {}".format(len(self._initial_regions)))
print("Total regions - {}".format(sum([len(self._initial_regions[o]) for o in self._initial_regions])))
print("Obs with 1 detection - {}".format(sum([1 for o in self._initial_region_matches if
self._initial_region_matches[o].sum() == 1])))
print("Obs with >1 matches - {}".format(sum([1 for o in self._initial_region_matches if
self._initial_region_matches[o].sum() > 1])))
print("Images associated - {}".format(len(self.get_products("image"))))
print("Exposure maps associated - {}".format(len(self.get_products("expmap"))))
print("Combined Ratemaps associated - {}".format(len(self.get_products("combined_ratemap"))))
print("Spectra associated - {}".format(len(self.get_products("spectrum"))))

if len(self._fit_results) != 0:
print("Fitted Models - {}".format(" | ".join(self.fitted_models)))

if self._regions is not None and "custom" in self._radii:
if self._redshift is not None:
else:
if len(self.get_products('combined_image')) != 0:
print("Custom Region SNR - {}".format(self.get_snr("custom", self._default_coord).round(2)))

if self._r200 is not None:
print("R200 - {}".format(self._r200))
if len(self.get_products('combined_image')) != 0:
print("R200 SNR - {}".format(self.get_snr("r200", self._default_coord).round(2)))
if self._r500 is not None:
print("R500 - {}".format(self._r500))
if len(self.get_products('combined_image')) != 0:
print("R500 SNR - {}".format(self.get_snr("r500", self._default_coord).round(2)))
if self._r2500 is not None:
print("R2500 - {}".format(self._r2500))
if len(self.get_products('combined_image')) != 0:
print("R2500 SNR - {}".format(self.get_snr("r2500", self._default_coord).round(2)))

# There's probably a neater way of doing the observables - maybe a formatting function?
if self._richness is not None and self._richness_err is not None \
and not isinstance(self._richness_err, (list, tuple, ndarray)):
print("Richness - {0}±{1}".format(self._richness, self._richness_err))
elif self._richness is not None and self._richness_err is not None \
and isinstance(self._richness_err, (list, tuple, ndarray)):
print("Richness - {0} -{1}+{2}".format(self._richness, self._richness_err[0], self._richness_err[1]))
elif self._richness is not None and self._richness_err is None:
print("Richness - {0}".format(self._richness))

if self._wl_mass is not None and self._wl_mass_err is not None \
and not isinstance(self._wl_mass_err, (list, tuple, ndarray)):
print("Weak Lensing Mass - {0}±{1}".format(self._wl_mass, self._richness_err))
elif self._wl_mass is not None and self._wl_mass_err is not None \
and isinstance(self._wl_mass_err, (list, tuple, ndarray)):
print("Weak Lensing Mass - {0} -{1}+{2}".format(self._wl_mass, self._wl_mass_err[0],
self._wl_mass_err[1]))
elif self._wl_mass is not None and self._wl_mass_err is None:
print("Weak Lensing Mass - {0}".format(self._wl_mass))

if 'get_temperature' in dir(self):
try:
tx = self.get_temperature('r500', 'constant*tbabs*apec').value.round(2)
# Just average the uncertainty for this
print("R500 Tx - {0}±{1}[keV]".format(tx[0], tx[1:].mean()))
except (ModelNotAssociatedError, NoProductAvailableError):
pass

try:
lx = self.get_luminosities('r500', 'constant*tbabs*apec', lo_en=Quantity(0.5, 'keV'),
hi_en=Quantity(2.0, 'keV')).to('10^44 erg/s').value.round(2)
print("R500 0.5-2.0keV Lx - {0}±{1}[e+44 erg/s]".format(lx[0], lx[1:].mean()))

except (ModelNotAssociatedError, NoProductAvailableError):
pass
print("-----------------------------------------------------\n")

def __len__(self) -> int:
"""
Method to return the length of the products dictionary (which means the number of
individual ObsIDs associated with this source), when len() is called on an instance of this class.

:return: The integer length of the top level of the _products nested dictionary.
:rtype: int
"""
return len(self.obs_ids)

# Was going to write this as a subclass of BaseSource, as it will behave largely the same, but I don't
#  want it declaring XGA products for tens of thousands of images etc.
# As such will replicate the base functionality of BaseSource that will allow evselect_image, expmap, cifbuild
# SAS wrappers to work.
# This does have a lot of straight copied code from BaseSource, but I don't mind in this instance
[docs]class NullSource:
"""
A useful, but very limited, source class. By default this source class will include all ObsIDs present in the
XGA census, and as such can be used for bulk generation of SAS products. It can also be made to only include
certain ObsIDs.

:param List obs: An optional list of ObsIDs to include in the NullSource, otherwise all available ObsIDs
will be included.
"""
def __init__(self, obs: List[str] = None):
"""
The method used to initiate the NullSource class.
"""
# To find all census entries with non-na coordinates
cleaned_census = CENSUS.dropna()
self._ra_dec = np.array([None, None])
# The user can specify ObsIDs to associate with the NullSource, or associate all
#  of them by leaving it as None
if obs is None:
self._name = "AllObservations"
obs = cleaned_census["ObsID"].values
else:
# I know this is an ugly nested if statements, but I only wanted to run obs_check once
obs = np.array(obs)
obs_check = [o in cleaned_census["ObsID"].values for o in obs]
# If all user entered ObsIDs are in the census, then all is fine
if all(obs_check):
self._name = "{}Observations".format(len(obs))
# If they aren't all in the census then that is decidedly not fine
elif not all(obs_check):
not_valid = np.array(obs)[~np.array(obs_check)]
raise ValueError("The following are not present in the XGA census, "
"{}".format(", ".join(not_valid)))

# Find out which
instruments = {o: [] for o in obs}
for o in obs:
if cleaned_census[cleaned_census["ObsID"] == o]["USE_PN"].values[0]:
instruments[o].append("pn")
if cleaned_census[cleaned_census["ObsID"] == o]["USE_MOS1"].values[0]:
instruments[o].append("mos1")
if cleaned_census[cleaned_census["ObsID"] == o]["USE_MOS2"].values[0]:
instruments[o].append("mos2")

# This checks that the observations have at least one usable instrument
self._obs = [o for o in obs if len(instruments[o]) > 0]
self._instruments = {o: instruments[o] for o in self._obs if len(instruments[o]) > 0}

# Here we check to make sure that there is at least one valid ObsID remaining
if len(self._obs) == 0:
raise NoValidObservationsError("After checks using the XGA census, all ObsIDs associated with this "
"NullSource are considered unusable.")

# The SAS generation routine might need this information
self._att_files = {o: xga_conf["XMM_FILES"]["attitude_file"].format(obs_id=o) for o in self._obs}

# Need the event list objects declared unfortunately
self._products = {o: {} for o in self._obs}
for o in self._obs:
for inst in self._instruments[o]:
evt_key = "clean_{}_evts".format(inst)
evt_file = xga_conf["XMM_FILES"][evt_key].format(obs_id=o)
self._products[o][inst] = {"events": EventList(evt_file, obs_id=o, instrument=inst, stdout_str="",
stderr_str="", gen_cmd="")}

# This is a queue for products to be generated for this source, will be a numpy array in practise.
# Items in the same row will all be generated in parallel, whereas items in the same column will
# be combined into a command stack and run in order.
self.queue = None
# Another attribute destined to be an array, will contain the output type of each command submitted to
# the queue array.
self.queue_type = None
# This contains an array of the paths of the final output of each command in the queue
self.queue_path = None
# This contains an array of the extra information needed to instantiate class
# after the SAS command has run
self.queue_extra_info = None

[docs]    def get_att_file(self, obs_id: str) -> str:
"""
Fetches the path to the attitude file for an XMM observation.

:param obs_id: The ObsID to fetch the attitude file for.
:return: The path to the attitude file.
:rtype: str
"""
if obs_id not in self._products:
raise NotAssociatedError("{o} is not associated with {s}".format(o=obs_id, s=self.name))
else:
return self._att_files[obs_id]

@property
def obs_ids(self) -> List[str]:
"""
Property getter for ObsIDs associated with this source that are confirmed to have events files.

:return: A list of the associated XMM ObsIDs.
:rtype: List[str]
"""
return self._obs

@property
def instruments(self) -> Dict:
"""
A property of a source that details which instruments have valid data for which observations.

:return: A dictionary of ObsIDs and their associated valid instruments.
:rtype: Dict
"""
return self._instruments

[docs]    def update_queue(self, cmd_arr: np.ndarray, p_type_arr: np.ndarray, p_path_arr: np.ndarray,
extra_info: np.ndarray, stack: bool = False):
"""
Small function to update the numpy array that makes up the queue of products to be generated.

:param np.ndarray cmd_arr: Array containing SAS commands.
:param np.ndarray p_type_arr: Array of product type identifiers for the products generated
by the cmd array. e.g. image or expmap.
:param np.ndarray p_path_arr: Array of final product paths if cmd is successful
:param np.ndarray extra_info: Array of extra information dictionaries
:param stack: Should these commands be executed after a preceding line of commands,
or at the same time.
:return:
"""
if self.queue is None:
# I could have done all of these in one array with 3 dimensions, but felt this was easier to read
# and with no real performance penalty
self.queue = cmd_arr
self.queue_type = p_type_arr
self.queue_path = p_path_arr
self.queue_extra_info = extra_info
elif stack:
self.queue = np.vstack((self.queue, cmd_arr))
self.queue_type = np.vstack((self.queue_type, p_type_arr))
self.queue_path = np.vstack((self.queue_path, p_path_arr))
self.queue_extra_info = np.vstack((self.queue_extra_info, extra_info))
else:
self.queue = np.append(self.queue, cmd_arr, axis=0)
self.queue_type = np.append(self.queue_type, p_type_arr, axis=0)
self.queue_path = np.append(self.queue_path, p_path_arr, axis=0)
self.queue_extra_info = np.append(self.queue_extra_info, extra_info, axis=0)

[docs]    def get_queue(self) -> Tuple[List[str], List[str], List[List[str]], List[dict]]:
"""
Calling this indicates that the queue is about to be processed, so this function combines SAS
commands along columns (command stacks), and returns N SAS commands to be run concurrently,
where N is the number of columns.

:return: List of strings, where the strings are bash commands to run SAS procedures, another
list of strings, where the strings are expected output types for the commands, a list of
lists of strings, where the strings are expected output paths for products of the SAS commands.
:rtype: Tuple[List[str], List[str], List[List[str]]]
"""
if self.queue is None:
# This returns empty lists if the queue is undefined
processed_cmds = []
types = []
paths = []
extras = []
elif len(self.queue.shape) == 1 or self.queue.shape[1] <= 1:
processed_cmds = list(self.queue)
types = list(self.queue_type)
paths = [[str(path)] for path in self.queue_path]
extras = list(self.queue_extra_info)
else:
processed_cmds = [";".join(col) for col in self.queue.T]
types = list(self.queue_type[-1, :])
paths = [list(col.astype(str)) for col in self.queue_path.T]
extras = []
for col in self.queue_path.T:
# This nested dictionary comprehension combines a column of extra information
# dictionaries into one, for ease of access.
comb_extra = {k: v for ext_dict in col for k, v in ext_dict.items()}
extras.append(comb_extra)

# This is only likely to be called when processing is beginning, so this will wipe the queue.
self.queue = None
self.queue_type = None
self.queue_path = None
self.queue_extra_info = None
# The returned paths are lists of strings because we want to include every file in a stack to be able
# to check that exists
return processed_cmds, types, paths, extras

[docs]    def update_products(self, prod_obj: BaseProduct):
"""
This method will ONLY store images and exposure maps. Ideally I wouldn't store them as product objects
at all, but unfortunately exposure maps require an image to be generated. Unlike all other source classes,
ratemaps will not be generated when matching images and exposure maps are added.

:param BaseProduct prod_obj: The new product object to be added to the source object.
"""
if not isinstance(prod_obj, (BaseProduct, BaseAggregateProduct)) and prod_obj is not None:
raise TypeError("Only product objects can be assigned to sources.")
elif prod_obj.type != 'image' and prod_obj.type != 'expmap':
raise TypeError("Only images and exposure maps can be stored in a NullSource, {} objects "
"cannot".format(prod_obj.type))

if prod_obj is not None:
en_bnds = prod_obj.energy_bounds
extra_key = "bound_{l}-{u}".format(l=float(en_bnds[0].value), u=float(en_bnds[1].value))
# As the extra_key variable can be altered if the Image is PSF corrected, I'll also make
#  this variable with just the energy key
en_key = "bound_{l}-{u}".format(l=float(en_bnds[0].value), u=float(en_bnds[1].value))

# Secondary checking step now I've added PSF correction
if type(prod_obj) == Image and prod_obj.psf_corrected:
extra_key += "_" + prod_obj.psf_model + "_" + str(prod_obj.psf_bins) + "_" + prod_obj.psf_algorithm + \
str(prod_obj.psf_iterations)

# All information about where to place it in our storage hierarchy can be pulled from the product
# object itself
obs_id = prod_obj.obs_id
inst = prod_obj.instrument
p_type = prod_obj.type

# Double check that something is trying to add products from another source to the current one.
if obs_id != "combined" and obs_id not in self._products:
raise NotAssociatedError("{o} is not associated with this null source.".format(o=obs_id))
elif inst != "combined" and inst not in self._products[obs_id]:
raise NotAssociatedError(
"{i} is not associated with XMM observation {o}".format(i=inst, o=obs_id))

if extra_key not in self._products[obs_id][inst]:
self._products[obs_id][inst][extra_key] = {}
self._products[obs_id][inst][extra_key][p_type] = prod_obj

[docs]    def get_products(self, p_type: str, obs_id: str = None, inst: str = None, extra_key: str = None,
just_obj: bool = True) -> List[BaseProduct]:
"""
This is the getter for the products data structure of Source objects. Passing a 'product type'
such as 'events' or 'images' will return every matching entry in the products data structure.

:param str p_type: Product type identifier. e.g. image or expmap.
:param str obs_id: Optionally, a specific obs_id to search can be supplied.
:param str inst: Optionally, a specific instrument to search can be supplied.
:param str extra_key: Optionally, an extra key (like an energy bound) can be supplied.
:param bool just_obj: A boolean flag that controls whether this method returns just the product objects,
or the other information that goes with it like ObsID and instrument.
:return: List of matching products.
:rtype: List[BaseProduct]
"""

def unpack_list(to_unpack: list):
"""
A recursive function to go through every layer of a nested list and flatten it all out. It
doesn't return anything because to make life easier the 'results' are appended to a variable
in the namespace above this one.

:param list to_unpack: The list that needs unpacking.
"""
# Must iterate through the given list
for entry in to_unpack:
# If the current element is not a list then all is chill, this element is ready for appending
# to the final list
if not isinstance(entry, list):
out.append(entry)
else:
# If the current element IS a list, then obviously we still have more unpacking to do,
# so we call this function recursively.
unpack_list(entry)

if obs_id not in self._products and obs_id is not None:
raise NotAssociatedError("{o} is not associated with {s}.".format(o=obs_id, s=self.name))
elif inst not in XMM_INST and inst is not None:
raise ValueError("{} is not an allowed instrument".format(inst))

matches = []
# Iterates through the dict search return, but each match is likely to be a very nested list,
# with the degree of nesting dependant on product type (as event lists live a level up from
# images for instance
for match in dict_search(p_type, self._products):
out = []
unpack_list(match)
# Only appends if this particular match is for the obs_id and instrument passed to this method
# Though all matches will be returned if no obs_id/inst is passed
if (obs_id == out[0] or obs_id is None) and (inst == out[1] or inst is None) \
and (extra_key in out or extra_key is None) and not just_obj:
matches.append(out)
elif (obs_id == out[0] or obs_id is None) and (inst == out[1] or inst is None) \
and (extra_key in out or extra_key is None) and just_obj:
matches.append(out[-1])
return matches

# This is used to name files and directories so this is not allowed to change.
@property
def name(self) -> str:
"""
The name of the source, either given at initialisation or generated from the user-supplied coordinates.

:return: The name of the source.
:rtype: str
"""
return self._name

@property
def num_pn_obs(self) -> int:
"""
Getter method that gives the number of PN observations.

:return: Integer number of PN observations associated with this source
:rtype: int
"""
return len([o for o in self.obs_ids if 'pn' in self._products[o]])

@property
def num_mos1_obs(self) -> int:
"""
Getter method that gives the number of MOS1 observations.

:return: Integer number of MOS1 observations associated with this source
:rtype: int
"""
return len([o for o in self.obs_ids if 'mos1' in self._products[o]])

@property
def num_mos2_obs(self) -> int:
"""
Getter method that gives the number of MOS2 observations.

:return: Integer number of MOS2 observations associated with this source
:rtype: int
"""
return len([o for o in self.obs_ids if 'mos2' in self._products[o]])

[docs]    def info(self):
"""
Just prints a couple of pieces of information about the NullSource
"""
print("\n-----------------------------------------------------")
print("Source Name - {}".format(self._name))
print("XMM ObsIDs - {}".format(self.__len__()))
print("PN Observations - {}".format(self.num_pn_obs))
print("MOS1 Observations - {}".format(self.num_mos1_obs))
print("MOS2 Observations - {}".format(self.num_mos2_obs))
print("-----------------------------------------------------\n")

def __len__(self) -> int:
"""
Method to return the length of the products dictionary (which means the number of
individual ObsIDs associated with this source), when len() is called on an instance of this class.

:return: The integer length of the top level of the _products nested dictionary.
:rtype: int
"""
return len(self.obs_ids)