Source code for xga.products.lightcurve

#  This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).
#  Last modified by David J Turner (turne540@msu.edu) 18/11/2025, 22:52. Copyright (c) The Contributors
import re
from datetime import datetime
from typing import Union, List, Tuple, Dict
from warnings import warn

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from astropy.time import Time, TimeDelta
from astropy.units import Quantity, Unit, UnitConversionError
from fitsio import FITS, FITSHDR, read_header
from matplotlib.axes import Axes
from matplotlib.figure import Figure

from xga.exceptions import FailedProductError, IncompatibleProductError, NotAssociatedError, XGADeveloperError, \
    TelescopeNotAssociatedError
from xga.products import BaseProduct, BaseAggregateProduct
from xga.utils import dict_search


[docs] class LightCurve(BaseProduct): """ This is the XGA LightCurve product class, which is used to interface with X-ray lightcurves generated for a variety of sources. It provides simple access to data and information about the lightcurve, fitting capabilities, and the ability to easily create lightcurve visualisations. :param str path: The path to the lightcurve. :param str obs_id: The ObsID from which this lightcurve was generated. :param str instrument: The instrument from which this lightcurve. :param str stdout_str: The stdout from the generation process. :param str stderr_str: The stderr for the generation process. :param str gen_cmd: The generation command for the lightcurve. :param Quantity central_coord: The central coordinate of the region from which this lightcurve was extracted. :param Quantity inn_rad: The inner radius of the lightcurve region. :param Quantity out_rad: The outer radius of the lightcurve region. :param Quantity lo_en: The lower energy bound for this lightcurve. :param Quantity hi_en: The upper energy bound for this lightcurve. :param Quantity time_bin_size: The time bin size used to generate the lightcurve. :param str pattern_expr: The event selection pattern used to generate the lightcurve. :param bool region: Whether this was generated from a region in a region file :param bool is_back_sub: Whether this lightcurve is background subtracted or not. :param str telescope: The telescope that this product is derived from. Default is None. """ def __init__(self, path: str, obs_id: str, instrument: str, stdout_str: str, stderr_str: str, gen_cmd: str, central_coord: Quantity, inn_rad: Quantity, out_rad: Quantity, lo_en: Quantity, hi_en: Quantity, time_bin_size: Quantity, pattern_expr: str = "", region: bool = False, is_back_sub: bool = True, telescope: str = None): """ This is the XGA LightCurve product class, which is used to interface with X-ray lightcurves generated for a variety of sources. It provides simple access to data and information about the lightcurve, fitting capabilities, and the ability to easily create lightcurve visualisations. :param str path: The path to the lightcurve. :param str obs_id: The ObsID from which this lightcurve was generated. :param str instrument: The instrument from which this lightcurve. :param str stdout_str: The stdout from the generation process. :param str stderr_str: The stderr for the generation process. :param str gen_cmd: The generation command for the lightcurve. :param Quantity central_coord: The central coordinate of the region from which this lightcurve was extracted. :param Quantity inn_rad: The inner radius of the lightcurve region. :param Quantity out_rad: The outer radius of the lightcurve region. :param Quantity lo_en: The lower energy bound for this lightcurve. :param Quantity hi_en: The upper energy bound for this lightcurve. :param Quantity time_bin_size: The time bin size used to generate the lightcurve. :param str pattern_expr: The event selection pattern used to generate the lightcurve. :param bool region: Whether this was generated from a region in a region file :param bool is_back_sub: Whether this lightcurve is background subtracted or not. :param str telescope: The telescope that this product is derived from. Default is None. """ # A validity check to help remind me to pass the telescope to the super-class init when this merges with # multi-mission XGA if hasattr(super(), 'telescope'): raise XGADeveloperError("S3 streaming event lists have been merged into multi-mission XGA, and the " "call to BaseProduct init in EventList needs to be updated.") else: self._tele = telescope # Call the BaseProduct init, sets up some attributes # TODO Support streaming of remote data super().__init__(path, obs_id, instrument, stdout_str, stderr_str, gen_cmd, None) # Set the product type self._prod_type = "lightcurve" # Store the size of the time binning used to generate the lightcurve as an attribute self._time_bin = time_bin_size.to('s') # Checking the passed event selection pattern if self.telescope == 'xmm': # Unfortunate local import to avoid circular import errors from ..sas import check_pattern self._pattern_expr, self._pattern_name = check_pattern(pattern_expr) else: self._pattern_expr = "" self._pattern_name = "" # Put the energy bounds into an attribute self._energy_bounds = (lo_en, hi_en) # Storing the central coordinate of this light curve self._central_coord = central_coord # Storing the region information self._inner_rad = inn_rad self._outer_rad = out_rad # And also the shape of the region if self._inner_rad.isscalar: self._shape = 'circular' else: self._shape = 'elliptical' # This describes whether this spectrum was generated directly from a region present in a region file self._region = region lc_storage_name = "bound_{l}-{u}_ra{ra}_dec{dec}_ri{ri}_ro{ro}" if not self._region and self.inner_rad.isscalar: lc_storage_name = lc_storage_name.format(ra=self.central_coord[0].value, dec=self.central_coord[1].value, ri=self._inner_rad.value, ro=self._outer_rad.value, l=self.energy_bounds[0].to('keV').value, u=self.energy_bounds[1].to('keV').value) elif not self._region and not self._inner_rad.isscalar: inn_rad_str = 'and'.join(self._inner_rad.value.astype(str)) out_rad_str = 'and'.join(self._outer_rad.value.astype(str)) lc_storage_name = lc_storage_name.format(ra=self.central_coord[0].value, dec=self.central_coord[1].value, ri=inn_rad_str, ro=out_rad_str, l=self.energy_bounds[0].to('keV').value, u=self.energy_bounds[1].to('keV').value) else: lc_storage_name = "region" # Add the time bin information lc_storage_name += "_timebin{tb}_pattern{p}".format(tb=time_bin_size.value, p=self._pattern_name) # And we save the completed key to an attribute self._storage_key = lc_storage_name # The definition of this product has defined whether the data in 'RATE' are background subtracted or not, and # that info is stored in this attribute - changes how we read in data self._is_back_sub = is_back_sub # Here we set up attributes to store the various information we can pull from a light curve file - they # are all initially set to None because we only read the information into memory if the user actually # calls one of the properties which uses one of these attributes # This will store the background subtracted count rate and uncertainty self._bck_sub_cnt_rate = None self._bck_sub_cnt_rate_err = None # This should store the values (in counts/per second) for the source, instrumental-effect-corrected, count- # rate of the light curve self._src_cnt_rate = None # The count-rate should be accompanied by uncertainties, and they will live in this attribute self._src_cnt_rate_err = None # This stores an important quantity, the reference time for the light curve time values self._ref_time = None # This stores the 'time system' - so for default XMM lightcurves for instance it will be TT (terrestrial time) self._time_sys = None # Here we store the x-axis, the time steps which the count-rates are attributed to self._time = None # The correction factor for each data point - should include vignetting and deadtime correction self._frac_exp = None # The background count-rate and count-rate uncertainty, scaled to account for the area difference between # the source and background regions self._bck_cnt_rate = None self._bck_cnt_rate_err = None # These attributes store 2D quantities that describe the start and end points of the good-time-intervals for # the source and background light-curves. The zeroth column is start, and the other column is stop self._src_gti = None self._bck_gti = None # In these attributes we store the XMM recorded start and stop times for this light curve, as well as the # place where the time values were assigned (i.e. relative to satellite or barycentre of the solar system) self._time_start = None self._time_stop = None self._time_assign = None # This just stores whether the data have been read into memory or not (set by the _read_on_demand method) self._read_in = False # This is used to store the header of the main data table, but again it is only read in its entirety if the # user actually asks for it through the 'header' property self._header = None # Defining properties first @property def storage_key(self) -> str: """ This property returns the storage key which this object assembles to place the LightCurve in an XGA source's storage structure. The key is based on the properties of the LightCurve, and some configuration options, and is basically human-readable. :return: String storage key. :rtype: str """ return self._storage_key @property def central_coord(self) -> Quantity: """ This property provides the central coordinates (RA-Dec) of the region that this light curve was generated from. :return: Astropy Quantity object containing the central coordinate in degrees. :rtype: Quantity """ return self._central_coord @property def shape(self) -> str: """ Returns the shape of the outer edge of the region this light curve was generated from. :return: The shape (either circular or elliptical). :rtype: str """ return self._shape @property def inner_rad(self) -> Quantity: """ Gives the inner radius (if circular) or radii (if elliptical - semi-major, semi-minor) of the region in which this light curve was generated. :return: The inner radius(ii) of the region. :rtype: Quantity """ return self._inner_rad @property def outer_rad(self) -> Quantity: """ Gives the outer radius (if circular) or radii (if elliptical - semi-major, semi-minor) of the region in which this light curve was generated. :return: The outer radius(ii) of the region. :rtype: Quantity """ return self._outer_rad @property def time_bin_size(self) -> Quantity: """ Gives the time bin size used to generate the lightcurve. :return: The time bin size used to generate the lightcurve. :rtype: Quantity """ return self._time_bin @property def count_rate(self) -> Quantity: """ Returns the background-subtracted instrumental-effect-corrected count-rates for this light curve. :return: Background-subtracted instrumental-effect-corrected count-rates, in units of ct/s. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._bck_sub_cnt_rate @property def count_rate_err(self) -> Quantity: """ Returns the background-subtracted instrumental-effect-corrected count-rate uncertainties for this light curve. :return: Background-subtracted instrumental-effect-corrected count-rate uncertainties, in units of ct/s. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._bck_sub_cnt_rate_err @property def src_count_rate(self) -> Quantity: """ Returns the source instrumental-effect-corrected count-rates for this light curve. :return: Source instrumental-effect-corrected count-rates, in units of ct/s. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._src_cnt_rate @property def src_count_rate_err(self) -> Quantity: """ Returns the source instrumental-effect-corrected count-rate uncertainties for this light curve. :return: Source instrumental-effect-corrected count-rate uncertainties, in units of ct/s. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._src_cnt_rate_err @property def ref_time(self) -> Time: """ Returns the reference time for this lightcurve, which is what the 'time' values are calculated from. :return: An Astropy Time object that defines the reference time for this lightcurve. :rtype: Time """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._ref_time @property def time_system(self) -> str: """ Returns the time system for this lightcurve; e.g. TT or terrestrial time. :return: The time system. :rtype: str """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._time_sys @property def time(self) -> Quantity: """ Returns the time steps that correspond to the count-rates measured for this light curve :return: Background-subtracted and instrumental-effect-corrected count-rate uncertainties, in units of seconds. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._time @property def datetime(self) -> datetime: """ Returns the time steps for this light curve, but in a datetime format, and no longer relative to a reference time. :return: The absolute datetimes that the time steps correspond to. :rtype: np.ndarray(datetime) """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return (self.ref_time + TimeDelta(self.time, format='sec', scale=self.time_system.lower())).to_datetime() @property def bck_count_rate(self) -> Quantity: """ Returns the background count-rates for this light curve. :return: Background count-rates, in units of ct/s. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._bck_cnt_rate @property def bck_count_rate_err(self) -> Quantity: """ Returns the background count-rate uncertainties for this light curve. :return: Background count-rate uncertainties, in units of ct/s. :rtype: Quantity """ # The version of read on demand for this class will itself check to see if the data have already been # read in, so we don't need to check that here - this method will read our LC data from disk into this # XGA product instance self._read_on_demand() return self._bck_cnt_rate_err @property def frac_exp(self) -> Quantity: """ This should contain the correction factor for which all sensitivity effects (dead time, vignetting) are taken into account - i.e. dividing by this should make lightcurves across different instruments/telescope consistent. :return: Fractional exposure. :rtype: Quantity """ # Make sure the file is read in self._read_on_demand() return self._frac_exp @property def src_gti(self) -> Quantity: """ Returns a 2D quantity with start (column 0) and end (column 1) times for the good-time-intervals of the source light curve. :return: A 2D astropy quantity with start (column 0) and end (column 1) times for the source good-time-intervals (in seconds). :rtype: Quantity """ # Make sure the GTI information is read into memory self._read_on_demand() return self._src_gti @property def bck_gti(self) -> Quantity: """ Returns a 2D quantity with start (column 0) and end (column 1) times for the good-time-intervals of the background light curve. :return: A 2D astropy quantity with start (column 0) and end (column 1) times for the background good-time-intervals (in seconds). :rtype: Quantity """ # Make sure the GTI information is read into memory self._read_on_demand() return self._bck_gti @property def start_time(self) -> Quantity: """ A property getter to access the recorded start time for this light curve. :return: Light curve start time, in seconds. :rtype: Quantity """ self._read_on_demand() return self._time_start @property def stop_time(self) -> Quantity: """ A property getter to access the recorded stop time for this light curve. :return: Light curve stop time, in seconds. :rtype: Quantity """ self._read_on_demand() return self._time_stop @property def start_datetime(self) -> datetime: """ A property getter to access the recorded start datetime for this light curve. :return: Light curve start datetime. :rtype: datetime """ self._read_on_demand() return (self.ref_time + TimeDelta(self._time_start, format='sec', scale=self.time_system.lower())).to_datetime() @property def stop_datetime(self) -> datetime: """ A property getter to access the recorded stop datetime for this light curve. :return: Light curve stop datetime. :rtype: datetime """ self._read_on_demand() return (self.ref_time + TimeDelta(self._time_stop, format='sec', scale=self.time_system.lower())).to_datetime() @property def time_assign(self) -> str: """ A property getter to access the physical location that the assigned times are based on. :return: The TASSIGN entry of the light curve file. :rtype: str """ self._read_on_demand() return self._time_assign @property def header(self) -> FITSHDR: """ Property getter allowing access to the main fits header of the light curve. :return: The header of the primary data table (RATE) of the light curve that was read in. :rtype: FITSHDR """ if self._header is None and self.usable: self._header = read_header(self.path) elif not self.usable: reasons = ", ".join(self.not_usable_reasons) raise FailedProductError("SAS failed to generate this product successfully, so you cannot access " "the header from it; reason given is {}.".format(reasons)) return self._header # Then define internal methods def _read_on_demand(self): """ This method is called by properties that deliver data to the user, either directly or via other methods of this class, such as view(). It will ensure that the data haven't already been read from the source file into memory, and that the source file has actually been classed as usable, and then read the relevant data into attributes of this class. """ # TODO The way this is laid out is a bit weird # Usable flag to check that nothing went wrong in the light-curve generation, and the _read_in flag to # check that we haven't already read this in to memory - no sense doing it again if self.usable and not self._read_in: with (FITS(self.path) as all_lc): # This chunk reads out the various columns of the 'RATE' entry in the light curve file, storing # them in suitably unit-ed astropy quantities if self._is_back_sub: # TODO I should email the XMM help desk about this and double check self._bck_sub_cnt_rate = Quantity(all_lc['RATE'].read_column('RATE'), 'ct/s') if 'ERROR' in all_lc['RATE'].get_colnames(): self._bck_sub_cnt_rate_err = Quantity(all_lc['RATE'].read_column('ERROR'), 'ct/s') else: self._bck_sub_cnt_rate_err = Quantity(all_lc['RATE'].read_column('RATE_ERR'), 'ct/s') if np.isnan(self._bck_sub_cnt_rate).any(): good_ent = np.where(~np.isnan(self._bck_sub_cnt_rate)) self._bck_sub_cnt_rate = self._bck_sub_cnt_rate[good_ent] self._bck_sub_cnt_rate_err = self._bck_sub_cnt_rate_err[good_ent] else: good_ent = np.arange(0, len(self._bck_sub_cnt_rate)) else: # If we weren't told that the rate data are background subtracted when the light curve was # declared, then we store the values in the source count rate attributes self._src_cnt_rate = Quantity(all_lc['RATE'].read_column('RATE'), 'ct/s') if 'ERROR' in all_lc['RATE'].get_colnames(): self._src_cnt_rate_err = Quantity(all_lc['RATE'].read_column('ERROR'), 'ct/s') else: self._src_cnt_rate_err = Quantity(all_lc['RATE'].read_column('RATE_ERR'), 'ct/s') if np.isnan(self._src_cnt_rate).any(): good_ent = np.where(~np.isnan(self._src_cnt_rate)) self._src_cnt_rate = self._src_cnt_rate[good_ent] self._src_cnt_rate_err = self._src_cnt_rate_err[good_ent] else: good_ent = np.arange(0, len(self._src_cnt_rate)) self._time = Quantity(all_lc['RATE'].read_column('TIME'), 's')[good_ent] self._frac_exp = Quantity(all_lc['RATE'].read_column('FRACEXP'))[good_ent] if "BACKV" in all_lc['RATE'].read_column('RATE'): self._bck_cnt_rate = Quantity(all_lc['RATE'].read_column('BACKV'), 'ct/s')[good_ent] self._bck_cnt_rate_err = Quantity(all_lc['RATE'].read_column('BACKE'), 'ct/s')[good_ent] else: self._bck_cnt_rate = Quantity(np.full(len(all_lc['RATE'].read_column('TIME')), np.nan), 'ct/s')[good_ent] self._bck_cnt_rate_err = Quantity(np.full(len(all_lc['RATE'].read_column('TIME')), np.nan), 'ct/s')[good_ent] # Grab the start, stop, and time assign values from the overall header of the light curve hdr = all_lc['RATE'].read_header() if 'TSTART' in hdr: self._time_start = Quantity(hdr['TSTART'], 's') elif 'TSTARTI' in hdr and 'TSTARTF' in hdr: self._time_start = Quantity(hdr['TSTARTI']+hdr['TSTARTF'], 's') else: raise KeyError("Neither TSTART or TSTARTI/TSTARTF are present in the light curve header.") if 'TSTOP' in hdr: self._time_stop = Quantity(hdr['TSTOP'], 's') elif 'TSTOPI' in hdr and 'TSTOPF' in hdr: self._time_stop = Quantity(hdr['TSTOPI']+hdr['TSTOPF'], 's') else: raise KeyError("Neither TSTOP or TSTOPI/TSTOPF are present in the light curve header.") if 'TASSIGN' in hdr: self._time_assign = hdr['TASSIGN'] else: self._time_assign = None if 'MJDREF' in hdr: self._ref_time = Time(hdr['MJDREF'], format='mjd') elif 'MJDREFI' in hdr and 'MJDREFF' in hdr: self._ref_time = Time(hdr['MJDREFI']+hdr['MJDREFF'], format='mjd') self._time_sys = hdr['TIMESYS'] # TODO NEED TO ASK EROSITA TEAM IF THE COMBINED LIGHTCURVES ARE MEANT TO HAVE GTIs WRITTEN # I now read in the GTIs after dealing with the header keywords because I might need to construct # my own GTI entry if there is no specific GTI in the lightcurve if self.telescope == 'xmm': # Here we read out the beginning and end times of the GTIs for source and background self._src_gti = Quantity([all_lc['SRC_GTIS'].read_column('START'), all_lc['SRC_GTIS'].read_column('STOP')], 's').T self._bck_gti = Quantity([all_lc['BKG_GTIS'].read_column('START'), all_lc['BKG_GTIS'].read_column('STOP')], 's').T elif ((self.telescope == 'erosita' or self.telescope == 'erass') and 'SRCGTI{}'.format(self.instrument[-1]) in all_lc): self._src_gti = Quantity([all_lc['SRCGTI{}'.format(self.instrument[-1])].read_column('START'), all_lc['SRCGTI{}'.format(self.instrument[-1])].read_column('STOP')], 's').T self._bck_gti = self._src_gti elif "STDGTI" in all_lc: self._src_gti = Quantity([all_lc['STDGTI'].read_column('START'), all_lc['STDGTI'].read_column('STOP')], 's').T self._bck_gti = self._src_gti else: self._src_gti = Quantity([[self._time_start.value, self._time_stop.value]], 's') self._bck_gti = self._src_gti # TODO add calculation for error prop of src-bck or bck+bckcorr # And set this attribute to make sure that no further reading in is done self._read_in = True elif not self.usable: reasons = ", ".join(self.not_usable_reasons) raise FailedProductError("Failed to generate this product successfully, so you cannot access " "data from it; reason given is {}.".format(reasons)) # Then define user-facing methods
[docs] def overlap_check(self, lightcurves: Union['LightCurve', List['LightCurve']]) -> Union[np.ndarray, bool]: """ A simple method which checks whether a passed LightCurve (or list of lightcurves) overlap temporally with this lightcurve. :param LightCurve/List[LightCurve] lightcurves: A LightCurve, or a list of LightCurves, to check for overlap with this LightCurve. :rtype: np.ndarray/bool :return: A boolean value (or an array of boolean values if multiple LightCurve instances were passed) which is True if the passed LightCurve temporally overlaps with this one, and False if it does not. """ # Make sure that the input is iterable to normalise the behaviour later on if isinstance(lightcurves, LightCurve): lightcurves = [lightcurves] # Grabs the start and stop datetimes (which take into account the reference times of each lightcurve) for # the passed lightcurves, puts them all in non-scalar quantities starts = np.array([lc.start_datetime for lc in lightcurves]) ends = np.array([lc.stop_datetime for lc in lightcurves]) # Simply constructs a boolean array that tells us if each lightcurve starts in, finishes in, or completely # encloses the light curve we're checking against overlap = ((starts >= self.start_datetime) & (starts < self.stop_datetime)) | \ ((ends >= self.start_datetime) & (ends < self.stop_datetime)) | \ ((starts <= self.start_datetime) & (ends >= self.stop_datetime)) if len(overlap) == 1: overlap = overlap[0] return overlap
[docs] def get_data(self, date_time: bool = False, fracexp_corr: bool = False) \ -> Tuple[Quantity, Quantity, Union[TimeDelta, np.ndarray]]: """ A get method to retrieve the count-rate (and error) and timing data from this LightCurve. Alternatively, the 'count_rate', 'count_rate_err', and 'time' properties can be used to access the information. This implementation is meant to be analogous to the 'get_data' method of the AggregateLightCurve class. :param bool date_time: Whether the time data should be returned as an array of datetimes (not the default), or an Astropy TimeDelta object from the MJD reference time defined in the file header. :param bool fracexp_corr: Controls whether the data should be corrected for vignetting and deadtime effects by dividing by the 'FRACEXP' entry in the lightcurve. Default is False. :return: The count rate data, count rate uncertainty data, and time data. :rtype: Tuple[Quantity, Quantity, Union[TimeDelta, np.ndarray]] """ # Read out the count-rate and count-rate-error data, applying fractional exposure correction # if the user requested it if fracexp_corr: cr_data = self.count_rate / self.frac_exp # TODO THIS SCALING OF ERRORS IS PROBABLY WRONG? cr_err_data = self.count_rate_err / self.frac_exp else: cr_data = self.count_rate cr_err_data = self.count_rate_err # Grab the time data t_data = self.datetime # If the user wants the time data as a TimeDelta from the reference MJD time then calculate that if not date_time: t_data = (Time(t_data) - self.ref_time).sec # Return the requested information return cr_data, cr_err_data, t_data
[docs] def get_view(self, ax: Axes, time_unit: Union[str, Unit] = Unit('s'), lo_time_lim: Quantity = None, hi_time_lim: Quantity = None, colour: str = 'black', plot_sep: bool = False, src_colour: str = 'tab:cyan', bck_colour: str = 'firebrick', custom_title: str = None, label_font_size: int = 15, title_font_size: int = 18, highlight_bad_times: bool = True, fracexp_corr: bool = False, alpha: float = 0.8) -> Axes: """ A method that allows the user to retrieve a populated lightcurve visualisation axes, in a form that allows them to then add their own plots in additon to what has been automatically constructed. This is an alternative to the view method, which calls this method and then displays the visualisation as constructed here. :param Axes ax: The matplotlib axes that should be populated with a lightcurve visualization. :param str/Unit time_unit: The unit to be used for the time axis. :param Quantity lo_time_lim: The lower x-limit (i.e. lower time limit) of the data to be displayed. :param Quantity hi_time_lim: The upper x-limit (i.e. upper time limit) of the data to be displayed. :param str colour: The colour to be used to plot data points (if background and source lightcurves are not plotted separately). :param bool plot_sep: Should the source and background lightcurves be plotted separately. Default is False. :param str src_colour: The colour to be used to plot source lightcurve data points, if plot_sep is True. :param str bck_colour: The colour to be used to plot background lightcurve data points, if plot_sep is True. :param str custom_title: A title to be added to the axes, which would override the automatically constructed figure title. :param int label_font_size: The fontsize to be used for labels. :param int title_font_size: The fontsize to be used for the title. :param bool highlight_bad_times: Should periods of time that are NOT within a GTI be highlighted? Default is True. :param bool fracexp_corr: Controls whether the plotted data should be corrected for vignetting and deadtime effects by dividing by the 'FRACEXP' entry in the lightcurve. Default is False. :param float alpha: The alpha value to be used for the plotted data. Default is 0.8. :return: The input Axes, but populated with a lightcurve visualisation. :rtype: Axes """ if isinstance(time_unit, str): time_unit = Unit(time_unit) if not self.time.unit.is_equivalent(time_unit): raise UnitConversionError("You have supplied a 'time_unit' that cannot be converted to seconds.") time_x = self.time.to(time_unit) - self.start_time.to(time_unit) if lo_time_lim is None: lo_time_lim = time_x.min() elif lo_time_lim is not None and lo_time_lim.unit.is_equivalent(time_unit): lo_time_lim = lo_time_lim.to(time_unit) if hi_time_lim is None: hi_time_lim = time_x.max() elif hi_time_lim is not None and hi_time_lim.unit.is_equivalent(time_unit): hi_time_lim = hi_time_lim.to(time_unit) # If the user wants the FRAC_EXP correction to be applied, we set the correction array to those values. # Otherwise it is just left as ones if fracexp_corr: corr_arr = self.frac_exp else: corr_arr = np.ones(len(self.frac_exp)) if plot_sep: if self.src_count_rate is None: raise ValueError("This light-curve is background subtracted, so we cannot plot the total and " "background separately.") ax.errorbar(time_x.value / corr_arr, self.src_count_rate.value / corr_arr, yerr=self.src_count_rate_err.value / corr_arr, capsize=2, color=src_colour, label='Source', fmt='x', alpha=alpha) ax.errorbar(time_x.value / corr_arr, self.bck_count_rate.value / corr_arr, yerr=self.bck_count_rate_err.value / corr_arr, capsize=2, color=bck_colour, label='Background', fmt='x', alpha=alpha) else: ax.errorbar(time_x.value / corr_arr, self.count_rate.value / corr_arr, yerr=self.count_rate_err.value / corr_arr, capsize=2, color=colour, label='Background subtracted', fmt='x', alpha=alpha) if highlight_bad_times and (len(self.src_gti) != 1 or self.src_gti[0, 0] != self.start_time or self.src_gti[0, 1] != self.stop_time): for ind in range(len(self.src_gti)): if ind == 0: # This is where the first GTI DOESN'T begin with the start of the observation - so there is a # bad period between the start of the LC time coverage and the first GTI start if (self.src_gti[ind, 0] - self.start_time).to('s') != 0: bad_start = Quantity(0, 's') else: bad_start = self.src_gti[ind, 1] - self.start_time.to(time_unit) # Here the first BAD time interval goes from the end of the first GTI to the start of the # next one, OR to the end of the obs if there isn't a next one if len(self.src_gti) != 1: bad_stop = self.src_gti[ind + 1, 0] - self.start_time.to(time_unit) else: bad_stop = self.stop_time - self.start_time # bad_stop = self.src_gti[ind, 0] - self.start_time.to(time_unit) label = "Bad time interval" elif ind == (len(self.src_gti)-1): bad_start = self.src_gti[ind, 1] bad_stop = self.stop_time - self.start_time label = "" else: bad_start = self.src_gti[ind, 1] - self.start_time.to(time_unit) bad_stop = self.src_gti[ind+1, 0] - self.start_time.to(time_unit) label = "" ax.axvspan(bad_start.value, bad_stop.value, color='firebrick', alpha=0.3, label=label) if custom_title is not None: ax.set_title(custom_title, fontsize=title_font_size) elif self.src_name is not None: ax.set_title("{s} {t} {o} {i} {l}-{u}keV Lightcurve".format(s=self.src_name, t=self.telescope, o=self.obs_id, i=self.instrument.upper(), l=self.energy_bounds[0].to('keV').value, u=self.energy_bounds[1].to('keV').value), fontsize=title_font_size) else: ax.set_title("{t} {o} {i} {l}-{u}keV Lightcurve".format(s=self.src_name, t=self.telescope, o=self.obs_id, i=self.instrument.upper(), l=self.energy_bounds[0].to('keV').value, u=self.energy_bounds[1].to('keV').value), fontsize=title_font_size) if lo_time_lim < time_x.min(): warn('The lower time limit is smaller than the lowest time value, it has been set to the ' 'lowest available value.', stacklevel=2) lo_time_lim = time_x.min() if hi_time_lim > time_x.max(): warn('The upper time limit is higher than the greatest time value, it has been set to the ' 'greatest available value.', stacklevel=2) hi_time_lim = time_x.max() ax.minorticks_on() ax.tick_params(direction='in', which='both', right=True, top=True) # Setting the axis limits ax.set_xlim(lo_time_lim.value, hi_time_lim.value) ax.set_xlabel("Relative Time [{}]".format(time_unit.to_string('latex')), fontsize=label_font_size) ax.set_ylabel("Count-rate [{}]".format(self.count_rate.unit.to_string('latex')), fontsize=label_font_size) ax.legend(loc='best') return ax
[docs] def view(self, figsize: tuple = (14, 6), time_unit: Union[str, Unit] = Unit('s'), lo_time_lim: Quantity = None, hi_time_lim: Quantity = None, colour: str = 'black', plot_sep: bool = False, src_colour: str = 'tab:cyan', bck_colour: str = 'firebrick', custom_title: str = None, label_font_size: int = 15, title_font_size: int = 18, highlight_bad_times: bool = True, fracexp_corr: bool = False, alpha: float = 0.8): """ A method that creates and displays a visualisation of this lightcurve. :param tuple figsize: The figure size to use for this lightcurve visualisation. :param str/Unit time_unit: The unit to be used for the time axis. :param Quantity lo_time_lim: The lower x-limit (i.e. lower time limit) of the data to be displayed. :param Quantity hi_time_lim: The upper x-limit (i.e. upper time limit) of the data to be displayed. :param str colour: The colour to be used to plot data points (if background and source lightcurves are not plotted separately). :param bool plot_sep: Should the source and background lightcurves be plotted separately. Default is False. :param str src_colour: The colour to be used to plot source lightcurve data points, if plot_sep is True. :param str bck_colour: The colour to be used to plot background lightcurve data points, if plot_sep is True. :param str custom_title: A title to be added to the axes, which would override the automatically constructed figure title. :param int label_font_size: The fontsize to be used for labels. :param int title_font_size: The fontsize to be used for the title. :param bool highlight_bad_times: Should periods of time that are NOT within a GTI be highlighted? Default is True. :param bool fracexp_corr: Controls whether the plotted data should be corrected for vignetting and deadtime effects by dividing by the 'FRACEXP' entry in the lightcurve. Default is False. :param float alpha: The alpha value to be used for the plotted data. Default is 0.8. """ # Create figure object fig = plt.figure(figsize=figsize) ax = plt.gca() ax = self.get_view(ax, time_unit, lo_time_lim, hi_time_lim, colour, plot_sep, src_colour, bck_colour, custom_title, label_font_size, title_font_size, highlight_bad_times, fracexp_corr, alpha) plt.tight_layout() # Display the image plt.show() # Wipe the figure plt.close("all")
[docs] class AggregateLightCurve(BaseAggregateProduct): """ The init method for the AggregateLightCurve class, performs checks and organises the light-curves which have been passed in, for easy retrieval. It also allows for analysis to be performed on the combined data, and for visualisations to be created. This class is designed to package light-curves generated for the same source, with the same settings, and for the same energy bounds - if interested in the time varying behaviours of multiple energy bands then the HardnessCurve and AggregateHardnessCurve products should be used. It can take light-curves from different instruments, and will deal with them simultaneously rather than stacking them. Light curves that are part of an AggregateLightCurve will be separated into 'time chunks', where a time chunk is a period that has uninterrupted coverage. For instance, three XMM observations separated by a year each would be in three different time chunks, but if there were a fourth observation that was taken by another telescope and happened concurrently (even if it didn't start and end at the same time) with the first XMM observation, then it would be in the same time chunk. :param Union[List[LightCurve], np.ndarray] lightcurves: A list or array of LightCurve objects that are to be collated in an AggregateLightCurve. These must be for the same source, and generated with the same settings. """ def __init__(self, lightcurves: Union[List[LightCurve], np.ndarray]): """ The init method for the AggregateLightCurve class, performs checks and organises the light-curves which have been passed in, for easy retrieval. It also allows for analysis to be performed on the combined data, and for visualisations to be created. This class is designed to package light-curves generated for the same source, with the same settings, and for the same energy bounds - if interested in the time varying behaviours of multiple energy bands then the HardnessCurve and AggregateHardnessCurve products should be used. It can take light-curves from different instruments, and will deal with them simultaneously rather than stacking them. Light curves that are part of an AggregateLightCurve will be separated into 'time chunks', where a time chunk is a period that has uninterrupted coverage. For instance, three XMM observations separated by a year each would be in three different time chunks, but if there were a fourth observation that was taken by another telescope and happened concurrently (even if it didn't start and end at the same time) with the first XMM observation, then it would be in the same time chunk. :param Union[List[LightCurve], np.ndarray] lightcurves: A list or array of LightCurve objects that are to be collated in an AggregateLightCurve. These must be for the same source, and generated with the same settings. """ if isinstance(lightcurves, list): lightcurves = np.array(lightcurves) elif not isinstance(lightcurves, np.ndarray): raise TypeError("The AggregateLightCurve argument must be either a list of LightCurve objects, or a " "numpy array of LightCurve Objects.") # Check that all the elements of the list/array are actually LightCurve objects if any([not isinstance(lc, LightCurve) for lc in lightcurves]): raise TypeError("You cannot pass a list/array containing an object that is not an instance of LightCurve.") # Obviously we need to make sure there are enough light curves to aggregate if len(lightcurves) < 2: raise ValueError("At least two light curves must be passed in order to declare an AggregateLightCurve.") # We need to do some checking on the input light-curves, to make sure they're for the same source, region, and # energy bands - the first of the light curves is selected as the comparison point. We're going to check that # each of them has the same central coordinates, region shape, inner radius, outer radius, and energy bounds comp_lc = lightcurves[0] if not all([np.array_equal(comp_lc.central_coord, lc.central_coord) for lc in lightcurves[1:]]): raise IncompatibleProductError("Central coordinates of all lightcurves passed to AggregateLightCurve must " "be the same.") elif not all([lc.shape == comp_lc.shape for lc in lightcurves[1:]]): raise IncompatibleProductError("Region shape of all lightcurves passed to AggregateLightCurve must " "be the same.") elif not all([np.array_equal(comp_lc.inner_rad, lc.inner_rad) for lc in lightcurves[1:]]): raise IncompatibleProductError("Inner radii of lightcurves passed to AggregateLightCurve must " "be the same.") elif not all([np.array_equal(comp_lc.outer_rad, lc.outer_rad) for lc in lightcurves[1:]]): raise IncompatibleProductError("Outer radii of lightcurves passed to AggregateLightCurve must " "be the same.") # Past me made the energy_bounds return a tuple for some reason - just need to convert to a Quantity to compare elif not all([np.array_equal(Quantity(comp_lc.energy_bounds), Quantity(lc.energy_bounds)) for lc in lightcurves[1:]]): raise IncompatibleProductError("The energy bounds of lightcurves passed to AggregateLightCurve must " "be the same - if interested in the time varying behaviours of multiple " "energy bands then the HardnessCurve and AggregateHardnessCurve products " "should be used.") # Checks that the time bin sizes used to generate the lightcurves are the same elif not all([lc.time_bin_size == comp_lc.time_bin_size for lc in lightcurves[1:]]): raise IncompatibleProductError("Time bin sizes of lightcurves passed to AggregateLightCurve must " "be the same.") elif len(list(set([lc.src_name for lc in lightcurves]))) > 1: raise ValueError("Some of the passed light curves were not generated for the same object; their " "source names must match.") # Pulls out all the ObsIDs and instruments associated with these light curves obs_ids = list(set([lc.obs_id for lc in lightcurves])) insts = list(set([lc.instrument for lc in lightcurves])) if len(obs_ids) == 1: obs_id_to_pass = obs_ids[0] else: obs_id_to_pass = 'combined' if len(insts) == 1: inst_to_pass = insts[0] else: inst_to_pass = 'combined' # This just sorts the lightcurves by their start time, for earliest to latest start_sort = np.argsort(Quantity([lc.start_time for lc in lightcurves])) lightcurves = lightcurves[start_sort] super().__init__([lc.path for lc in lightcurves], 'lightcurve', obs_id_to_pass, inst_to_pass) # Storing the name of the object for which we've generated light curves - we've already checked that all the # input light curves have the same src name. self._src_name = lightcurves[0].src_name # Make sure to add the energy bounds to the attribute - we require and know that they are the same for # all the lightcurves that have been passed. self._energy_bounds = lightcurves[0].energy_bounds # This stores the ObsIDs and their instruments, for the lightcurves associated with this object self._rel_obs = {} # This array determines which lightcurves have overlapping temporal coverage overlapping = np.full((len(lightcurves), len(lightcurves)), False) for lc_ind, lc in enumerate(lightcurves): # If a telescope is already present in the rel_obs dict then we need to add the current ObsID as part # of a new list, otherwise we need to make sure we add the current lightcurve ObsID and instrument, or # just instrument if the ObsID is already there if lc.telescope not in self._rel_obs: self._rel_obs[lc.telescope] = {lc.obs_id: [lc.instrument]} elif lc.telescope in self._rel_obs and lc.obs_id not in self._rel_obs[lc.telescope]: self._rel_obs[lc.telescope][lc.obs_id] = [lc.instrument] elif lc.telescope in self._rel_obs and lc.obs_id in self._rel_obs[lc.telescope]: self._rel_obs[lc.telescope][lc.obs_id].append(lc.instrument) # Use the LightCurve overlap checking method to figure out which of our set of light curves overlaps with # the current one. Checking like this does include the current light curve which obviously will overlap, # but we're setting up an overlap matrix and those values will be the diagonal, so we don't mind cur_overlap = lc.overlap_check(lightcurves) overlapping[lc_ind, :] = cur_overlap # Get the entries one up from the diagonal, we'll use them to split the light curves up into time chunks, which # is where there are gaps between coverage. This works because there will be False overlap entries in the # shifted diagonal of the matrix, and those are cases where there is a break in the observations split = np.diag(overlapping, 1) split = np.insert(split, 0, False) # Here we want to index the light curves into time chunk groupings groupings = np.zeros(len(lightcurves), dtype=int) # This gives us the indices of where we need to split the light curves into chunks, as we know that the light # curves are already sorted into the correct temporal order split_inds = np.argwhere(~split).T[0] # Just sets up the time chunk ID 1D matrix for our light curves for split_ind_ind, split_ind in enumerate(split_inds): if split_ind_ind != (len(split_inds)-1): groupings[split_ind: split_inds[split_ind_ind+1]] = split_ind_ind else: # As I want it to go to the very end of the array groupings[split_ind:] = split_ind_ind self._time_chunk_ids = np.arange(0, len(split_inds)) self._component_products = {tel: {} for tel in self.telescopes} # Maybe there is a more elegant, in-line, way of doing this, but I cannot be bothered to think of it for lc_ind, lc in enumerate(lightcurves): rel_grp = groupings[lc_ind] if lc.obs_id not in self._component_products[lc.telescope]: self._component_products[lc.telescope][lc.obs_id] = {lc.instrument: {rel_grp: lc}} elif (lc.obs_id in self._component_products[lc.telescope] and lc.instrument not in self._component_products[lc.telescope][lc.obs_id]): self._component_products[lc.telescope][lc.obs_id][lc.instrument] = {rel_grp: lc} elif (lc.obs_id in self._component_products[lc.telescope] and lc.instrument in self._component_products[lc.telescope][lc.obs_id]): self._component_products[lc.telescope][lc.obs_id][lc.instrument][rel_grp] = lc # This is a validation step, ensuring that all like light curves (i.e. same # telescope, same instrument) have the same reference time ref_times = {} for lc in lightcurves: cur_ref_time = ref_times.setdefault(lc.telescope, lc.ref_time) if cur_ref_time != lc.ref_time: raise IncompatibleProductError("The time system reference times of AggregateLightCurve component " "lightcurves from the same telescope passed to AggregateLightCurve " "must be the same.") # Make a reference time dictionary attribute self._ref_times = ref_times # This is all helps to set the storage key as the same as the LightCurve, but we do account for different # patterns accepted for different instruments - as mos1 and 2 should be treated the same we don't look at # the specific MOS instrument, same with eROSITA telescope modules. self._patterns = {tel: {} for tel in self.telescopes} for lc in lightcurves: patt = lc.storage_key.split('_pattern')[-1] # Turns mos1 and mos2 into just mos, and tm1-7 into tm rel_inst = re.sub(r'\d+', '', lc.instrument) if rel_inst not in self._patterns[lc.telescope]: self._patterns[lc.telescope][rel_inst] = patt elif lc.instrument in self._patterns[lc.telescope] and self._patterns[lc.telescope][rel_inst] != patt: raise IncompatibleProductError( "Lightcurves for the same instrument ({t}-{i}) must have the same event " "selection pattern.".format(t=lc.telescope, i=rel_inst.upper())) patts = [str(tel) + "_".join([pk + 'pattern' + pv for pk, pv in pd.items()]) for tel, pd in self._patterns.items()] # This is what the AggregateLightCurve will be stored under in an XGA source product storage structure. self._storage_key = lightcurves[0].storage_key.split('_pattern')[0] + '_' + '_'.join(patts) # Start by defining properties, then internal (protected) methods, and then user facing methods @property def obs_ids(self) -> dict: """ A property of this spectrum set that details which ObsIDs of which telescopes have contributed lightcurves to this object. :return: A dictionary where the keys are telescope names and the values are lists of ObsIDs :rtype: dict """ return {t: list(self._rel_obs[t].keys()) for t in self._rel_obs} @property def instruments(self) -> dict: """ A property of this aggregate light curve that details which ObsIDs and instruments of which telescopes have contributed lightcurves to this object. The top level keys are telescopes, lower level keys are ObsIDs, and the values are lists of instruments. :return: A dictionary where the top level keys are telescopes, the lower level keys are ObsIDs, and their values are lists of related instruments. :rtype: dict """ return self._rel_obs @property def associated_instruments(self) -> dict: """ Returns a dictionary containing unique instrument names associated with each telescope, for the constituent light curves of this AggregateLightCurve. NOTE - there is no guarantee that the instruments of a telescope are associated with a light curve in every time chunk. :return: Dictionary with telescope names as keys, and lists of unique associated instruments as values. :rtype: dict """ assoc_inst = {tel: list(set([inst for inst_list in oi_insts.values() for inst in inst_list])) for tel, oi_insts in self.instruments.items()} return assoc_inst @property def telescopes(self) -> List[str]: """ Property getter for telescopes that are associated with this aggregate light curve. :return: A list of telescope names with valid data related to this aggregate light curve. :rtype: List[str] """ return list(self._rel_obs.keys()) @property def src_name(self) -> str: """ Method to return the name of the object a product is associated with. The product becomes aware of this once it is added to a source object. :return: The name of the source object this product is associated with. :rtype: str """ return self._src_name # This needs a setter, as this property only becomes not-None when the product is added to a source object. @src_name.setter def src_name(self, name: str): """ Property setter for the src_name attribute of a product, should only really be called by a source object, not by a user. :param str name: The name of the source object associated with this product. """ self._src_name = name for p in self.all_lightcurves: p.src_name = name @property def all_lightcurves(self) -> List[LightCurve]: """ Simple extra wrapper for get_lightcurve that allows the user to retrieve every single lightcurve associated with this AggregateLightCurve instance, for all time chunk IDs, telescopes, ObsIDs, and Instruments. :return: A list of every single lightcurve associated with this object. :rtype: List[LightCurve] """ # This stores the flattened set of all lightcurves all_lcs = [] # Iterate through the time chunks associated with this AggregateLightCurve, as we need to pass an ID to # get_lightcurves to retrieve lightcurves for tc_i in self.time_chunk_ids: # If there is only one light curve per time chunk then get_lightcurves will just return a # LightCurve object, but I always want it to be a list rel_lcs = self.get_lightcurves(tc_i) if isinstance(rel_lcs, LightCurve): rel_lcs = [rel_lcs] # Add the list of lightcurves for the current time chunk to the overall flattened lightcurve list all_lcs += rel_lcs return all_lcs @property def central_coord(self) -> Quantity: """ This property provides the central coordinates (RA-Dec) of the region that this set of light curves was generated from. :return: Astropy Quantity object containing the central coordinate in degrees. :rtype: Quantity """ return self.all_lightcurves[0].central_coord @property def shape(self) -> str: """ Returns the shape of the outer edge of the region this set of light curves was generated from. :return: The shape (either circular or elliptical). :rtype: str """ return self.all_lightcurves[0].shape @property def inner_rad(self) -> Quantity: """ Gives the inner radius (if circular) or radii (if elliptical - semi-major, semi-minor) of the region in which this set of light curves was generated. :return: The inner radius(ii) of the region. :rtype: Quantity """ return self.all_lightcurves[0].inner_rad @property def outer_rad(self): """ Gives the outer radius (if circular) or radii (if elliptical - semi-major, semi-minor) of the region in which this set of light curves was generated. :return: The outer radius(ii) of the region. :rtype: Quantity """ return self.all_lightcurves[0].outer_rad @property def time_bin_size(self) -> Quantity: """ Gives the time bin size used to generate this set of light curves. :return: The time bin size used to generate this set of light curves. :rtype: Quantity """ return self.all_lightcurves[0].time_bin_size @property def ref_times(self) -> Dict[str, Time]: """ Returns the time system reference times of this aggregate lightcurve's components. There will be one reference time for each telescope associated with this object. :return: Dictionary mapping telescope names to their reference times. :rtype: Dict[str, Time] """ return self._ref_times @property def time_chunk_ids(self) -> np.ndarray: """ Getter for the time chunk IDs associated with this AggregateLightCurve. Light curves that are part of an AggregateLightCurve will be separated into 'time chunks', where a time chunk is a period that has uninterrupted coverage. For instance, three XMM observations separated by a year each would be in three different time chunks, but if there were a fourth observation that was taken by another telescope and happened concurrently (even if it didn't start and end at the same time) with the first XMM observation, then it would be in the same time chunk. :return: np.ndarray :rtype: An array of integer time chunk identifiers, ordered from earlier to later times. """ return self._time_chunk_ids @property def num_time_chunks(self) -> int: """ Getter for the number of time chunks associated with this AggregateLightCurve. Light curves that are part of an AggregateLightCurve will be separated into 'time chunks', where a time chunk is a period that has uninterrupted coverage. For instance, three XMM observations separated by a year each would be in three different time chunks, but if there were a fourth observation that was taken by another telescope and happened concurrently (even if it didn't start and end at the same time) with the first XMM observation, then it would be in the same time chunk. :return: np.ndarray :rtype: An array of integer time chunk identifiers, ordered from earlier to later times. """ return len(self._time_chunk_ids) @property def time_chunks(self) -> Quantity: """ A getter for the start and stop times of the time chunks associated with this AggregateLightCurve. The left hand column are start times, and the right hand column are stop times. These are the earliest and latest times of coverage for all the observations in the particular time chunk. :return: A Nx2 non-scalar Astropy Quantity, where the left hand column are chunk start times, and the right hand column are chunk stop times. :rtype: Quantity """ # Stores the chunk start and stop times until they are turned into a non-scalar Quantity at the end chunk_bounds = [] # Iterates through the time chunks associated with this object for tc_id in self.time_chunk_ids: # Fetching lightcurves, making sure the return is a list rel_lcs = self.get_lightcurves(tc_id) if isinstance(rel_lcs, LightCurve): rel_lcs = [rel_lcs] # Finds the minimum start_time (i.e. the earliest start time) for lightcurves in this time chunk, and # the maximum (i.e. latest) stop time - these define the bounds of this chunk tc_start = min(Quantity([lc.start_time for lc in rel_lcs])) tc_end = max(Quantity([lc.stop_time for lc in rel_lcs])) chunk_bounds.append(Quantity([tc_start, tc_end])) return Quantity(chunk_bounds) @property def datetime_chunks(self) -> np.ndarray: """ A getter for the start and stop datetimes of the time chunks associated with this AggregateLightCurve. The left hand column are start datetimes, and the right hand column are stop datetimes. These are the earliest and latest times of coverage for all the observations in the particular time chunk. :return: A Nx2 array of datetime objects, where the left hand column are chunk start datetimes, and the right hand column are chunk stop datetimes. :rtype: np.ndarray(datetime) """ # This behaves exactly the same as time_chunks, but works on datetime objects instead. chunk_bounds = [] for tc_id in self.time_chunk_ids: rel_lcs = self.get_lightcurves(tc_id) if isinstance(rel_lcs, LightCurve): rel_lcs = [rel_lcs] tc_start = min([lc.start_datetime for lc in rel_lcs]) tc_end = max([lc.stop_datetime for lc in rel_lcs]) chunk_bounds.append([tc_start, tc_end]) return np.array(chunk_bounds) @property def time_chunk_lengths(self) -> Quantity: """ Computes and returns lengths (in seconds) of each time chunk. :return: An astropy quantity containing the time chunk lengths in seconds. :rtype: Quantity """ return np.diff(self.time_chunks, axis=1).flatten() @property def overall_time_window(self) -> Quantity: """ Returns the beginning time of the first time chunk, and the end time of the last time chunk; represents the entire time window covered by this AggregateLightCurve (not necessarily continuously). :return: Two element astropy Quantity, with the first element being the start time of the first time chunk, and the second element being the end time of the last time chunk. :rtype: Quantity """ return Quantity([self.time_chunks[:, 0].min(), self.time_chunks[:, 1].max()]) @property def overall_datetime_window(self) -> np.ndarray: """ Returns the beginning datetime of the first time chunk, and the end datetime of the last time chunk; represents the entire time window covered by this AggregateLightCurve (not necessarily continuously). :return: Two element astropy Quantity, with the first element being the start datetime of the first time chunk, and the second element being the end datetime of the last time chunk. :rtype: np.ndarray """ return np.array([self.datetime_chunks[:, 0].min(), self.datetime_chunks[:, 1].max()]) @property def overall_time_window_coverage_fraction(self) -> float: """ Provides the fraction of the overall time window (from the beginning of the first observation to the end of the last observation) that is actually covered by constituent light curves. :return: The overall time window coverage fraction. :rtype: float """ return (self.time_chunks[:, 1] - self.time_chunks[:, 0]).sum() / np.diff(self.overall_time_window)[0] @property def storage_key(self) -> str: """ This property returns the storage key which this object assembles to place the AggregateLightCurve in an XGA source's storage structure. The key is based on the properties of the AggregateLightCurve, and some of the configuration options, and is basically human-readable. :return: String storage key. :rtype: str """ return self._storage_key @property def event_selection_patterns(self) -> dict: """ The event selection patterns used for different telescope instruments that are associated with this AggregateLightCurve. :return: A dictionary where top level keys are telescope names, lower level keys are instrument names, and values are event selection patterns. :rtype: dict """ return self._patterns # Now the protected methods of the class def _validate_tel_inst(self, telescope: str = None, inst: str = None) -> Tuple[str, str]: """ Internal method to check telescope and instrument inputs, and fill in values if Nones are passed and the light curve only has one telescope and instrument associated. :param str telescope: The name of the telescope to validate. :param str inst: The name of the instrument to validate. :return: Validated telescope and instrument names. :rtype Tuple[str, str] """ # Check the telescope input, if it is None, and we only have one telescope in the AggLC we # can save the user the trouble and select that single telescope. Otherwise, we throw # an error and tell them to set a value if telescope is None and len(self.telescopes) != 1: raise ValueError("For AggregateLightCurve instances containing data from multiple telescopes, a value " "must be passed to the 'telescope' argument of 'get_data'.") elif telescope is None: telescope = self.telescopes[0] elif telescope not in self.telescopes: raise TelescopeNotAssociatedError("The telescope name {0} is not associated with any constituent " "products in this AggregateLightCurve.".format(telescope)) # Now we do the same thing for instrument name, but with the added context # of the telescope name already set up above if inst is None and len(self.associated_instruments[telescope]) != 1: raise ValueError("This AggregateLightCurve instance contains data from multiple instruments of {t}, so a " "value must be passed to the 'inst' argument of 'get_data'.".format(t=telescope)) elif inst is None: inst = self.associated_instruments[telescope][0] return telescope, inst def _get_gtis(self, which_gti: str = "src", inst: str = None, telescope: str = None, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> Tuple[Quantity, np.ndarray]: """ Internal get method to retrieve the good time intervals (GTIs) for the source OR background light curves of a particular instrument of a particular telescope from this AggregateLightCurve. The returned GTIs are in time chunk order. A time interval within which to retrieve GTIs can be specified. :param str which_gti: Which GTIs to retrieve, either 'src' or 'bck'. :param str inst: The instrument for which to retrieve the overall GTI information. Default is None, which will automatically select the instrument name if only one is represented in this AggregateLightCurve. If multiple instruments are represented, the user must pass a value to choose which GTIs to retrieve. :param str telescope: The telescope for which to retrieve the overall GTI information. Default is None, which will automatically select the telescope name if only one is represented in this AggregateLightCurve. If multiple telescopes are represented, the user must pass a value to choose which GTIs to retrieve. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: The requested good-time-intervals for the source and background light curves, and an array indicating which time chunk each GTI belongs in. Data are in the correct temporal order. :rtype: Tuple[Quantity, np.ndarray] """ # This is an internal method, so I feel okay using a string 'which_gti' rather than # a boolean variable to determine whether to retrieve the source or background GTI. # Leaves the option open for some other kind of GTI retrieval to be added as well. # Need to check that the variable value is valid however if which_gti not in ["src", "bck"]: raise ValueError("The 'which_gti' parameter can only take the values 'src' or 'bck'.") # Validate the telescope and instrument inputs and fill in values if Nones are passed and # the AggregateLightCurve only has one telescope and instrument associated. telescope, inst = self._validate_tel_inst(telescope, inst) # We call a class method that will return the time interval IDs that represent data within the # user specified time window. The default behavior is to return all time chunk IDs, as the # default values of interval_start and interval_end are None. rel_time_chunk_ids = self.time_chunk_ids_within_interval(interval_start, interval_end, over_run) # Then we'll store the GTIs and chunk IDs in these lists gtis = [] tc_data = [] # Iterate through the time chunk IDs for tc_id in rel_time_chunk_ids: try: # Grab the light curves, but catch if there isn't an entry for the chosen instrument for this # time chunk and handle it gracefully rel_lc = self.get_lightcurves(tc_id, inst=inst, telescope=telescope) except NotAssociatedError: continue # Append the current time chunk's chosen instrument's GTIs to the list if which_gti == "src": rel_gti = rel_lc.src_gti elif which_gti == "bck": rel_gti = rel_lc.bck_gti gtis.append(rel_gti) rel_tc = np.full(len(rel_gti), tc_id) tc_data.append(rel_tc) # Concatenate the lists of GTIs and chunk IDs to form a single array of GTIs and chunk IDs gtis = np.concatenate(gtis) tc_data = np.concatenate(tc_data) return gtis, tc_data # Then define user-facing methods
[docs] def get_lightcurves(self, time_chunk_id: int, obs_id: str = None, inst: str = None, telescope: str = None) -> Union[List[LightCurve], LightCurve]: """ This is the getter for the lightcurves stored in the AggregateLightCurve data storage structure. They can be retrieved based on ObsID and instrument. :param int time_chunk_id: The time chunk identifier to retrieve lightcurves for. :param str obs_id: Optionally, a specific obs_id to search for can be supplied. :param str inst: Optionally, a specific instrument to search for can be supplied. :param str telescope: Optionally, a specific telescope to search for can be supplied. :return: List of matching lightcurves, or just a LightCurve object if one match is found. :rtype: Union[List[LightCurve], LightCurve] """ def unpack_list(to_unpack: list): """ A recursive function to go through every layer of a nested list and flatten it all out. It doesn't return anything because to make life easier the 'results' are appended to a variable in the namespace above this one. :param list to_unpack: The list that needs unpacking. """ # Must iterate through the given list for entry in to_unpack: # If the current element is not a list then all is chill, this element is ready for appending # to the final list if not isinstance(entry, list): out.append(entry) else: # If the current element IS a list, then obviously we still have more unpacking to do, # so we call this function recursively. unpack_list(entry) if time_chunk_id not in self.time_chunk_ids: tc_str = ", ".join(self.time_chunk_ids.astype(str)) raise IndexError("{i} is not a time chunk ID associated with this AggregateLightCurve object. " "Allowed time chunk IDs are; {a}".format(i=time_chunk_id, a=tc_str)) elif telescope not in self.telescopes and telescope is not None: raise TelescopeNotAssociatedError("{0} is not associated with chunk {1} of this " "AggregateLightCurve.".format(telescope, time_chunk_id)) elif ((telescope is not None and telescope in self.telescopes) and (obs_id not in self.obs_ids[telescope] and obs_id is not None)): raise NotAssociatedError("ObsID {o} is not associated with telescope {t} for chunk {tc} of this " "AggregateLightCurve.".format(o=obs_id, tc=time_chunk_id, t=telescope)) elif ((telescope is not None and telescope in self.telescopes) and (obs_id is not None and obs_id in self.obs_ids) and (inst is not None and inst not in self.instruments[obs_id])): raise NotAssociatedError("Instrument {i} is not associated with {t}-{o} for time chunk {tc} of this " "AggregateLightCurve.".format(o=obs_id, i=inst, tc=time_chunk_id, t=telescope)) matches = [] for match in dict_search(time_chunk_id, self._component_products): out = [] unpack_list(match) if ((telescope == out[0] or telescope is None) and (obs_id == out[1] or obs_id is None) and (inst == out[2] or inst is None)): matches.append(out[-1]) # Here I only return the object if one match was found if len(matches) == 1: matches = matches[0] elif len(matches) == 0: # This probably means that an instrument was requested without a specific ObsID and it doesn't exist for # the specified time chunk raise NotAssociatedError("The requested data are not associated with time chunk {} of this " "AggregateLightCurve.".format(time_chunk_id)) return matches
[docs] def get_data(self, inst: str = None, telescope: str = None, date_time: bool = False, fracexp_corr: bool = False, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> Tuple[Quantity, Quantity, Union[TimeDelta, np.ndarray], np.ndarray]: """ A get method to retrieve count-rate, count-rate error, timing, and time chunk data for a particular instrument of a particular telescope from this AggregateLightCurve. The returned data are in the correct temporal order. A time interval within which to retrieve data can be specified. :param str inst: The instrument for which to retrieve the overall count-rate and time data. Default is None, which will automatically select the instrument name if only one is represented in this AggregateLightCurve. If multiple instruments are represented, the user must pass a value to choose which data to retrieve. :param str telescope: The telescope for which to retrieve the overall count-rate and time data. Default is None, which will automatically select the telescope name if only one is represented in this AggregateLightCurve. If multiple telescopes are represented, the user must pass a value to choose which data to retrieve. :param bool date_time: Whether the time data should be returned as an array of datetimes (not the default), or an Astropy TimeDelta object with the time as a different from MJD 50814.0 in seconds (the default). :param bool fracexp_corr: Controls whether the data should be corrected for vignetting and deadtime effects by dividing by the 'FRACEXP' entry in the lightcurve. Default is False. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: The count rate data, count rate uncertainty data, and time data for the selected instrument. The final element of the return is an array indicating which time chunk each data point belongs to. Data are in the correct temporal order. :rtype: Tuple[Quantity, Quantity, Union[TimeDelta, np.ndarray], np.ndarray] """ # Validate the telescope and instrument inputs and fill in values if Nones are passed and # the AggregateLightCurve only has one telescope and instrument associated. telescope, inst = self._validate_tel_inst(telescope, inst) # We call a class method that will return the time interval IDs that represent data within the # user specified time window. The default behavior is to return all time chunk IDs, as the # default values of interval_start and interval_end are None. rel_time_chunk_ids = self.time_chunk_ids_within_interval(interval_start, interval_end, over_run) # These store the countrates, errors, and times that we pull out for the chosen instrument for all # time chunk IDs that are within the user-specified time window (defaults to the entire time window # of this AggregateLightCurve instance). cr_data = [] cr_err_data = [] t_data = [] tc_data = [] # Iterate through the time chunk IDs for tc_id in rel_time_chunk_ids: try: # Grab the light curves, but catch if there isn't an entry for the chosen instrument for this # time chunk and handle it gracefully rel_lc = self.get_lightcurves(tc_id, inst=inst, telescope=telescope) except NotAssociatedError: continue # Append the current time chunk's chosen instrument's count rate data and # error to their lists - the get_data method of the LightCurve class # will handle fractional exposure correction and the conversion of # times to date-times (if the user requested that) rel_cr, rel_cr_err, rel_time = rel_lc.get_data(date_time, fracexp_corr) rel_tc = np.full(len(rel_cr), tc_id) cr_data.append(rel_cr) cr_err_data.append(rel_cr_err) t_data.append(rel_time) tc_data.append(rel_tc) # Concatenate the light curve arrays of time, count-rate, and count-rate-error into a single # array each - that is what we're going to return t_data = np.concatenate(t_data) cr_data = np.concatenate(cr_data) cr_err_data = np.concatenate(cr_err_data) tc_data = np.concatenate(tc_data) # Concatenate the count rate data and error into one quantity each and return everything return cr_data, cr_err_data, t_data, tc_data
[docs] def get_src_gtis(self, inst: str = None, telescope: str = None, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> Tuple[Quantity, np.ndarray]: """ A get method to retrieve the good time intervals (GTIs) for the source light curves of a particular instrument of a particular telescope from this AggregateLightCurve. The returned GTIs are in time chunk order. A time interval within which to retrieve GTIs can be specified. :param str inst: The instrument for which to retrieve the overall GTI information. Default is None, which will automatically select the instrument name if only one is represented in this AggregateLightCurve. If multiple instruments are represented, the user must pass a value to choose which GTIs to retrieve. :param str telescope: The telescope for which to retrieve the overall GTI information. Default is None, which will automatically select the telescope name if only one is represented in this AggregateLightCurve. If multiple telescopes are represented, the user must pass a value to choose which GTIs to retrieve. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: The requested good-time-intervals for the source light curves, and an array indicating which time chunk each GTI belongs in. Data are in the correct temporal order. :rtype: Tuple[Quantity, np.ndarray] """ return self._get_gtis('src', inst, telescope, interval_start, interval_end, over_run)
[docs] def get_bck_gtis(self, inst: str = None, telescope: str = None, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> Tuple[Quantity, np.ndarray]: """ A get method to retrieve the good time intervals (GTIs) for the background light curves of a particular instrument of a particular telescope from this AggregateLightCurve. The returned GTIs are in time chunk order. A time interval within which to retrieve GTIs can be specified. :param str inst: The instrument for which to retrieve the overall GTI information. Default is None, which will automatically select the instrument name if only one is represented in this AggregateLightCurve. If multiple instruments are represented, the user must pass a value to choose which GTIs to retrieve. :param str telescope: The telescope for which to retrieve the overall GTI information. Default is None, which will automatically select the telescope name if only one is represented in this AggregateLightCurve. If multiple telescopes are represented, the user must pass a value to choose which GTIs to retrieve. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: The requested good-time-intervals for the background light curves, and an array indicating which time chunk each GTI belongs in. Data are in the correct temporal order. :rtype: Tuple[Quantity, np.ndarray] """ return self._get_gtis('bck', inst, telescope, interval_start, interval_end, over_run)
[docs] def time_chunk_ids_within_interval(self, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> np.ndarray: """ Fetches the IDs of time chunks that fall within a specified interval. The interval can be defined either as a duration offset from a reference time (`Quantity`) or as absolute timestamps (`Time` or `datetime`). Depending on the `over_run` parameter, the method returns IDs of chunks fully contained within the interval or only partially overlapping the interval. Raises validation exceptions in case of incompatible interval types or invalid configurations. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: An array of integer time chunk IDs for the time chunks that satisfy the filtering criteria. :rtype: np.ndarray :raises ValueError: If the start of the interval is not before the end of the interval. :raises TypeError: If the provided interval types are not one of the accepted formats or if their types are mismatched. """ # Convert any Astropy Time objects to a standard Python datetime if isinstance(interval_start, Time): interval_start = interval_start.to_datetime() if isinstance(interval_end, Time): interval_end = interval_end.to_datetime() # We allow the user to pass None for limit values, in which case the overall start or end time of # the window covered by this object is used # Handle the start time first, matching the data format of the interval end time if it was provided if interval_start is None and (interval_end is None or isinstance(interval_end, Quantity)): interval_start = self.overall_time_window[0] elif interval_start is None: interval_start = self.overall_datetime_window[0] # Do the same for the end of the time interval if interval_end is None and (interval_start is None or isinstance(interval_start, Quantity)): interval_end = self.overall_time_window[1] elif interval_end is None: interval_end = self.overall_datetime_window[1] # If the user passed the limits themselves, we could still have a mismatch in format (e.g. the start # as a time from reference time, and the end as a datetime). We wish to avoid dealing with # limits in different formats, so raise an error if this is the case. # Also check the object types passed to ensure they haven't done anything really silly like passed a string if (not isinstance(interval_start, (Quantity, Time, datetime)) or not isinstance(interval_end, (Quantity, Time, datetime))): raise TypeError("The 'interval_start' and 'interval_end' arguments must be one of the following; " "None, an astropy Quantity, an astropy Time, or an Python datetime.") elif type(interval_start) != type(interval_end): raise TypeError("The 'interval_start' ({bf}) and 'interval_end' ({ef}) arguments must be of " "the same type.".format(bf=type(interval_start), ef=type(interval_end))) # Validity check on the interval, obviously can't have the start time being at the same time or after # the end of the interval if interval_start >= interval_end: raise ValueError("Time passed to 'interval_start' argument must be before the time " "passed to 'interval_end'.") # Now we've done all our validation checks on the inputs, we will fetch the 'correct' format of time chunk # bounds (i.e. seconds from reference time, or datetime) for the input interval limits # We need only check the type of one of the interval limits now, as we have made sure the start # and end of interval data formats are the same if isinstance(interval_start, Quantity): ch_starts = self.time_chunks[:, 0] ch_ends = self.time_chunks[:, 1] else: ch_starts = self.datetime_chunks[:, 0] ch_ends = self.datetime_chunks[:, 1] # Now that we've normalized all the time/datetime interval formats, and the # formats of the time chunks we're comparing them too, we do one last # validity check to ensure that the user didn't pass intervals outside # the time window of this AggregateLightCurve if interval_start < ch_starts[0]: raise ValueError("The value of 'interval_start' ({ins}) is before the start ({wis}) of the time window " "covered by this AggregateLightCurve.".format(ins=interval_start, wis=ch_starts[0])) if interval_end > ch_ends[-1]: raise ValueError("The value of 'interval_end' ({ine}) is after the end ({wie}) of the time window " "covered by this AggregateLightCurve.".format(ine=interval_end, wie=ch_ends[-1])) # The user can choose between two slightly different matching criteria - if # 'over_run' is False then the time chunks they want us to return have to # be entirely contained within the interval they specified # If 'over_run' is True (the default), then even time chunks that start or end # outside the interval will be included in the return, so long as some part # of them is within the user's interval if not over_run: ch_filter = ((ch_starts >= interval_start) & (ch_ends <= interval_end)) else: ch_filter = (((ch_starts >= interval_start) & (ch_starts <= interval_end)) | ((ch_ends >= interval_start) & (ch_ends <= interval_end)) | ((ch_starts <= interval_start) & (ch_ends >= interval_end))) # Apply the newly created filter to the time chunk IDs property, which will give us the IDs of the # time chunks that match the user's criteria. rel_ch_ids = self.time_chunk_ids[ch_filter] # Now we return them return rel_ch_ids
[docs] def obs_ids_within_interval(self, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> dict: """ Fetches the ObsIDs of data associated with time chunks that fall within a specified interval. The interval can be defined either as a duration offset from a reference time (`Quantity`) or as absolute timestamps (`Time` or `datetime`). Depending on the `over_run` parameter, the method returns ObsIDs related to chunks fully contained within the interval or only partially overlapping the interval. Raises validation exceptions in case of incompatible interval types or invalid configurations. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: A dictionary with telescope names as keys, and values being lists of ObsIDs within the specified interval. :rtype: dict """ # First run the class method that will take the interval information and return the relevant time chunk IDs rel_ch_ids = self.time_chunk_ids_within_interval(interval_start, interval_end, over_run) # Unfortunate to have a nested for loop going on, but this is all pretty low overhead so I don't mind # We just iterate through the time chunk IDs (get_lightcurves doesn't support passing multiple time # chunk IDs), fetch the relevant light curves, and build the rel_obsids dictionary with telescope # names as keys, and values being lists of relevant ObsIDs rel_obsids = {} for ch_id in rel_ch_ids: rel_lcs = self.get_lightcurves(ch_id) rel_lcs = rel_lcs if isinstance(rel_lcs, list) else [rel_lcs] for rel_lc in rel_lcs: rel_obsids.setdefault(rel_lc.telescope, []) rel_obsids[rel_lc.telescope].append(rel_lc.obs_id) return rel_obsids
[docs] def time_chunk_good_fractions(self, src_gti: bool = True, inst: str = None, telescope: str = None) -> np.ndarray: """ A method to retrieve the good time fractions of each time chunk, for a particular instrument of a particular telescope. The good time fractions are the fraction of a time chunk that falls within a good-time-interval. :param bool src_gti: Controls whether the good fractions are calculated using the source or background good-time-intervals. Default is True, which will use the source GTI information. :param str inst: The instrument for which to calculate good time fractions of time chunks. Default is None, which will automatically select the instrument name if only one is represented in this AggregateLightCurve. If multiple instruments are represented, the user must pass a value to choose which GTIs to retrieve. :param str telescope: The telescope for which to calculate good time fractions of time chunks. Default is None, which will automatically select the telescope name if only one is represented in this AggregateLightCurve. If multiple telescopes are represented, the user must pass a value to choose which GTIs to retrieve. :return: The fraction of each time chunk that within a good-time interval. :rtype: np.ndarray """ # We account for the user wanting the good time fractions for background light # curves, which are sometimes different from source GTIs if src_gti: rel_gti, chunk_indices = self.get_src_gtis(inst, telescope) else: rel_gti, chunk_indices = self.get_bck_gtis(inst, telescope) gti_durations = rel_gti[:, 1] - rel_gti[:, 0] # Sum GTI durations for each time chunk using bincount total_gti_per_chunk = np.bincount(chunk_indices, weights=gti_durations, minlength=self.num_time_chunks) # Calculate fractions (avoid division by zero) fractions = total_gti_per_chunk / self.time_chunk_lengths return fractions
[docs] def check_times_within_gti(self, times: Quantity, src_gti: bool = True, inst: str = None, telescope: str = None) -> np.ndarray: """ Check whether given times are within the good-time-intervals (GTIs) of any component light curve within this AggregateLightCurve that matches the specified instrument and telescope. Comparisons are made to the source or background GTIs depending on the passed value of the 'src_gti' parameter. Times must be in the form of seconds from reference time of the telescope of interest. :param Quantity/np.ndarray/Time times: Times to be checked against the GTIs. Can be scalar or an array. Passing astropy Time objects, Python datetime objects (or arrays of datetimes), or seconds from reference time are all supported. :param bool src_gti: Flag indicating whether to use source GTIs (True) or background GTIs (False). Defaults to True. :param str inst: The instrument whose light curve GTIs we are to compare the input times with. If None, value will be inferred if only one telescope and instrument are associated with this object. :param str telescope: The telescope whose light curve GTIs we are to compare the input times with. If None, value will be inferred if only one telescope and instrument are associated with this object. :return: A boolean array indicating whether each input time falls within any of the GTIs. :rtype: np.ndarray """ # Validate the telescope and instrument inputs and fill in values if Nones are passed and # the AggregateLightCurve only has one telescope and instrument associated. telescope, inst = self._validate_tel_inst(telescope, inst) if isinstance(times, Time): times = (times - self.ref_times[telescope]).to('s') elif ((not isinstance(times, Quantity) and (isinstance(times, np.ndarray) and all([isinstance(en, datetime) for en in times]))) or isinstance(times, datetime)): times = (Time(times) - self.ref_times[telescope]).to('s') elif not isinstance(times, Quantity) and isinstance(times, np.ndarray): raise TypeError("If an array is passed to the 'times' argument, every element must be of type 'datetime'.") # If the 'times' argument is a single value, turn it into an array if times.isscalar: times = Quantity([times]) # The user may choose to compare to source or background GTI information, so # depending on what has been passed to 'src_gti' we retrieve different GTIs if src_gti: rel_gti, chunk_indices = self.get_src_gtis(inst, telescope) else: rel_gti, chunk_indices = self.get_bck_gtis(inst, telescope) # Now we generate the boolean array that will indicate which entries in 'times' # are within a GTI and which aren't time_within = ((rel_gti[:, 0] <= times[..., None]) & (rel_gti[:, 1] >= times[..., None])).any(axis=1) return time_within
[docs] def get_view(self, fig: Figure, inst: str = None, custom_title: str = None, label_font_size: int = 18, title_font_size: int = 20, inst_cmap: str = 'viridis', y_lims: Quantity = None, time_chunk_ids: Union[int, List[int]] = None, yscale: str = 'linear', fracexp_corr: bool = False, show_legend: bool = True, alpha: float = 0.8, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True) -> Tuple[dict, Figure]: """ A get method for a populated visualisation of the light curves present in this AggregateLightCurve. :param Figure fig: The matplotlib Figure object to create axes on, and thus make the plot. :param str inst: A specific instrument to display data for. Default is None, in which case all instruments are plotted. :param str custom_title: A custom title to add to the visualisation - the default is None, in which case a title containing the source name and energy band will be generated. :param int label_font_size: The font size for axes labels, default is 18. :param int title_font_size: The font size for the title, default is 20. :param str inst_cmap: The colormap from which we draw colours to uniquely identify different instruments plotted in this get_view method. :param Quantity y_lims: The lower and upper limits that should be applied to the y-axis of this plot. The default is None, in which case they will be determined automatically based on the data. :param int/List[int] time_chunk_ids: This parameter can be used to control which time chunks are plotted on this AggregateLightCurve view. The default is None, in which case all time chunks are plotted; however the user may also pass a list of chunk IDs (or a single chunk ID) to limit the data that are shown. :param str yscale: The scaling that should be applied to the y-axis of this figure. Default is linear. Any matplotlib scale can be used. :param bool fracexp_corr: Controls whether the plotted data should be corrected for vignetting and deadtime effects by dividing by the 'FRACEXP' entry in the lightcurve. Default is False. :param bool show_legend: Controls whether a legend is included in each panel of the visualization. :param float alpha: The alpha value to be used for the plotted data. Default is 0.8. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. :return: A dictionary of axes objects that have been added, and the figure object that was passed in. :rtype: Tuple[dict, Figure] """ # The user can either pass the time chunk IDs that they are interested in plotting, or a time # window defined by interval_start and interval_end. The 'interval_*' arguments will override # the 'time_chunk_ids' argument if both are passed. if any([interval_start is not None, interval_end is not None]): # If the time_chunk_ids variable is not None at this point, then we're about to override it, # and we want to keep track of that to warn the user time_chunks_overridden = time_chunk_ids is not None # Run the method that will get the relevant time chunk IDs for the user-specified interval time_chunk_ids = self.time_chunk_ids_within_interval(interval_start, interval_end, over_run) # Warn the user that we overrode the time_chunk_ids variable with the interval start and end arguments if time_chunks_overridden: warn("Both 'time_chunk_ids' and 'interval_start'/'interval_end' arguments were passed. The interval " "arguments have overridden the 'time_chunk_ids' argument.", stacklevel=2) # We check the input for the time_chunk_ids argument first, because not only does it determine the data that # we plot, but it determines how we set up the figure if time_chunk_ids is not None and isinstance(time_chunk_ids, int): # The 'all_rel_lcs' variable is actually only used for auto-setting the y-axis limits, the light curve # retrieval for plotting will happen separately (I just found it more convenient that way, even if this # isn't particularly elegant). all_rel_lcs = self.get_lightcurves(time_chunk_ids, inst=inst) if isinstance(all_rel_lcs, LightCurve): all_rel_lcs = [all_rel_lcs] time_chunk_ids = np.array([time_chunk_ids]) elif time_chunk_ids is not None and isinstance(time_chunk_ids, (list, np.ndarray)): # This has to be done in a for-loop rather than a list comprehension, because I have to be able to catch # not associated errors # all_rel_lcs = [lc for tc_id in time_chunk_ids for lc in self.get_lightcurves(time_chunk_id=tc_id)] all_rel_lcs = [] for tc_id in time_chunk_ids: try: # Possible that this will return a single lightcurve, rather than a list of them cur_lcs = self.get_lightcurves(time_chunk_id=tc_id, inst=inst) # So we make sure that it IS a list if isinstance(cur_lcs, LightCurve): cur_lcs = [cur_lcs] all_rel_lcs += cur_lcs except NotAssociatedError: pass time_chunk_ids = np.array(time_chunk_ids) elif time_chunk_ids is not None: raise TypeError("Only integers and lists of integers may be passed for the 'time_chunk_ids' argument.") else: all_rel_lcs = self.all_lightcurves time_chunk_ids = self.time_chunk_ids # This sets the fraction of the total x-width of the figure that is set between each axes buffer_frac = 0.008 # This calculates the total time length of all time chunks - note that we are selecting those specified by the # time_chunk_ids array, which by default is all time chunks, but can be set by the user chunk_len = (self.datetime_chunks[:, 1] - self.datetime_chunks[:, 0]) chunk_len = np.array([float(cl.total_seconds()) for cl in chunk_len])[time_chunk_ids] # Then finds what fraction of the total coverage each time chunk covers, taking into account the buffer (again # this is for those time chunks specified by the time chunk id argument, with the default being all chunks) chunk_frac = chunk_len / (chunk_len.sum() + buffer_frac*len(chunk_len)-1) # Proportion of vertical to horizontal extent of the slanted line that breaks the separate axes break_slant = 1.3 # Set properties of the break lines break_kwargs = dict(marker=[(-1, -break_slant), (1, break_slant)], markersize=12, linestyle="none", color='k', mec='k', mew=1, clip_on=False) # To store the to-be-setup axes in axes_dict = {} # This is added to each iteration so that the next axes knows what x-position to start at cumu_x_pos = 0 # Iterate through the selected time chunks, each will have a sub-axes for tc_id_ind, tc_id in enumerate(time_chunk_ids): # Grab the fraction of time coverage for this time chunk (e.g. size of this axes) rel_frac = chunk_frac[tc_id_ind] # We deal with axes differently depending on whether they are the first time chunk in a series, one in # the middle of a series of time chunks, or the last. We also have to account for the fact that there # could only be one or two time chunks. This all defines whether we plot break lines, which axes # boundaries are visible, which axis tick labels are shown etc. if tc_id_ind == 0: # Adding a new axes, the size defined by the fractional coverage time, and the position by # cumu_x_pos (though that should always be zero for tc_id == 0). It extends the full height of # the figure axes_dict[tc_id_ind] = fig.add_axes([cumu_x_pos, 0.0, rel_frac, 1]) # Set up a y-label - other axes won't have this because they share the y-axis and we only want to # label the first one y_lab = "Count-rate [{}]".format(self.all_lightcurves[0].count_rate.unit.to_string('latex')) axes_dict[tc_id_ind].set_ylabel(y_lab, fontsize=label_font_size) # We set the upper and lower y-axis limits based on the maximum and minimum count rates across all # the lightcurves that are to be plotted, as the y-axis is shared - if the user hasn't specified # their own y-axis limits if y_lims is None and not fracexp_corr: low_lim = min([np.nanmin(lc.count_rate-lc.count_rate_err) for lc in all_rel_lcs]).value*0.95 upp_lim = max([np.nanmax(lc.count_rate+lc.count_rate_err) for lc in all_rel_lcs]).value*1.05 elif y_lims is None and fracexp_corr: low_lim = min([np.nanmin(lc.count_rate/lc.frac_exp - lc.count_rate_err/lc.frac_exp) for lc in all_rel_lcs]).value * 0.95 upp_lim = max([np.nanmax(lc.count_rate/lc.frac_exp + lc.count_rate_err/lc.frac_exp) for lc in all_rel_lcs]).value * 1.05 else: # The user has specified axis limits, so we make sure to convert them to the y-axis unit low_lim, upp_lim = y_lims.to(self.all_lightcurves[0].count_rate.unit).value axes_dict[tc_id_ind].set_ylim(low_lim, upp_lim) # If there is more than one time chunk, we turn off the line on the right hand side of this initial # axes - so there is no unsightly barrier between it and the next axes - we also add slanted lines # to indicate a break in the y-axis if len(time_chunk_ids) != 1: axes_dict[tc_id_ind].spines.right.set_visible(False) axes_dict[tc_id_ind].plot([1, 1], [1, 0], transform=axes_dict[tc_id_ind].transAxes, **break_kwargs) # We make sure the ticks look how we want them axes_dict[tc_id_ind].tick_params(direction='in', which='both', right=False, left=True, top=True) # In this case we are at a time chunk that is not the first, and not the last elif tc_id_ind != (len(time_chunk_ids) - 1): # Add the axes at the correct position, making sure to share the y-axis with the first axes axes_dict[tc_id_ind] = fig.add_axes([cumu_x_pos, 0.0, rel_frac, 1], sharey=axes_dict[0]) # Both the left hand and the right hand axis lines are turned off axes_dict[tc_id_ind].spines.left.set_visible(False) axes_dict[tc_id_ind].spines.right.set_visible(False) # We make sure to setup the ticks as we want them - making sure that the y-axis is not labelled for # this one, and that the left and right ticks are turned off axes_dict[tc_id_ind].tick_params(direction='in', which='both', right=False, left=False, top=True, labelleft=False) # This is what sets up the break lines on the left and right hand sides of the x-axis, at the top # and bottom axes_dict[tc_id_ind].plot([1, 1], [1, 0], transform=axes_dict[tc_id_ind].transAxes, **break_kwargs) axes_dict[tc_id_ind].plot([0, 0], [0, 1], transform=axes_dict[tc_id_ind].transAxes, **break_kwargs) # Finally this is triggered when we're at the last time chunks else: axes_dict[tc_id_ind] = fig.add_axes([cumu_x_pos, 0.0, rel_frac, 1], sharey=axes_dict[0]) # The right hand axis line is visible, but not the left axes_dict[tc_id_ind].spines.right.set_visible(True) axes_dict[tc_id_ind].spines.left.set_visible(False) # And we make sure to add the tick setup and ensure that the final break lines on the left are drawn axes_dict[tc_id_ind].tick_params(direction='in', which='both', right=True, left=False, top=True, labelleft=False) axes_dict[tc_id_ind].plot([0, 0], [0, 1], transform=axes_dict[tc_id_ind].transAxes, **break_kwargs) # Iterate the cumulative position cumu_x_pos += (rel_frac+buffer_frac) # And turn on minor ticks, because I prefer how that looks axes_dict[tc_id_ind].minorticks_on() # Set the y-scale to the specified type (default is linear, but the user can override that) axes_dict[tc_id_ind].set_yscale(yscale) # Setting the x-axis limits, based on the known time coverage of the time chunk axes_dict[tc_id_ind].set_xlim(self.datetime_chunks[tc_id, 0], self.datetime_chunks[tc_id, 1]) # Create a single x-axis label for all axes, it looks ugly and is unnecessary to have one for each fig.text(0.5, -0.04, "Time", ha='center', fontsize=label_font_size) # Fetching the colormap object specified by the user - this is what we draw colours from for each instrument # involved in this AggregateLightCurve. We want them all to be different (of course), but we also want # particular instruments to have consistent colouring across subplots (i.e. time chunks) rel_cmap = plt.get_cmap(inst_cmap) # This goes through all the instruments for all the ObsIDs for all the telescopes, constructing a list of # unique instrument names uniq_insts = sorted(list(set([i for tel in self.instruments for oi in self.instruments[tel] for i in self.instruments[tel][oi]]))) # Then we simply loop through the instruments, normalising their index in the list by the total number (we want # to feed values between zero and one into the colormap), and get the colours out inst_colours = {inst: rel_cmap(inst_ind / max(1, len(uniq_insts)-1)) for inst_ind, inst in enumerate(uniq_insts)} # Now we need to populate our carefully set up axes with DATA for tc_id_ind, tc_id in enumerate(time_chunk_ids): ax = axes_dict[tc_id_ind] try: # That this is the first part of the try-except, and won't trigger the except, is quite deliberate. It # sets up the x-axis tick labels, even if there are no data for the requested instrument (if the # user has even requested a specific instrument) - makes the plot look nice # The labels will have hour, minute, date, month (shortened word so as not to be ambigious with # american date system), and year ax.xaxis.set_major_formatter(mdates.DateFormatter('%Hh-%Mm %d-%b-%Y')) for label in ax.get_xticklabels(which='major'): label.set(y=label.get_position()[1] - 0.03, rotation=40, horizontalalignment='right') # This is what _might_ trigger the NotAssociatedError rel_lcs = self.get_lightcurves(tc_id, inst=inst) except NotAssociatedError: continue # Make sure rel_lcs, even if it only contains a single light curve if isinstance(rel_lcs, LightCurve): rel_lcs = [rel_lcs] # Now we cycle through the light curves for the current time chunk and add them to the plot for rel_lc in rel_lcs: ident = "{t} {o}-{i}".format(t=rel_lc.pretty_telescope_name, o=rel_lc.obs_id, i=rel_lc.instrument) if fracexp_corr: plt_cr = rel_lc.count_rate.value / rel_lc.frac_exp plt_cr_err = rel_lc.count_rate_err.value / rel_lc.frac_exp else: plt_cr = rel_lc.count_rate.value plt_cr_err = rel_lc.count_rate_err.value ax.errorbar(rel_lc.datetime, plt_cr, yerr=plt_cr_err, capsize=2, label=ident, fmt='x', color=inst_colours[rel_lc.instrument], alpha=alpha) if show_legend: ax.legend(loc='best') # Check if the user has defined a custom title, and if not then we build one and add it to the plot if custom_title is not None: fig.suptitle(custom_title, fontsize=title_font_size, y=1.05) elif self.src_name is not None: fig.suptitle("{s} {l}-{u}keV Aggregate Lightcurve".format(s=self.src_name, l=self.energy_bounds[0].to('keV').value, u=self.energy_bounds[1].to('keV').value), fontsize=title_font_size, y=1.05) else: fig.suptitle("{l}-{u}keV Aggregate Lightcurve".format(l=self.energy_bounds[0].to('keV').value, u=self.energy_bounds[1].to('keV').value), fontsize=title_font_size, y=1.05) return axes_dict, fig
[docs] def view(self, figsize: tuple = (14, 6), inst: str = None, custom_title: str = None, label_font_size: int = 15, title_font_size: int = 18, inst_cmap: str = 'viridis', y_lims: Quantity = None, time_chunk_ids: Union[int, List[int]] = None, yscale: str = 'linear', fracexp_corr: bool = False, show_legend: bool = True, alpha: float = 0.8, interval_start: Union[Quantity, Time, datetime] = None, interval_end: Union[Quantity, Time, datetime] = None, over_run: bool = True): """ This method creates a combined visualisation of all the light curves associated with this object (apart from when you specify a single instrument, then it uses all the light curves from that instrument). The data are displayed in the correct temporal order, with the x-axis labels indicating the date and time rather than the mission specific internal time. :param tuple figsize: The size of the visualisation figure, default is (14, 6). Adjusting this value is the best way to achieve nice looking plots when labels are overlapping, particularly when there are many observations and time chunks to plot in the x-direction. :param str inst: A specific instrument to display data for. Default is None, in which case all instruments are plotted. :param str custom_title: A custom title to add to the visualisation - the default is None, in which case a title containing the source name and energy band will be generated. :param int label_font_size: The font size for axes labels, default is 18. :param int title_font_size: The font size for the title, default is 20. :param str inst_cmap: The colormap from which we draw colours to uniquely identify different instruments plotted in this view method. :param Quantity y_lims: The lower and upper limits that should be applied to the y-axis of this plot. The default is None, in which case they will be determined automatically based on the data. :param int/List[int] time_chunk_ids: This parameter can be used to control which time chunks are plotted on this AggregateLightCurve view. The default is None, in which case all time chunks are plotted; however the user may also pass a list of chunk IDs (or a single chunk ID) to limit the data that are shown. :param str yscale: The scaling that should be applied to the y-axis of this figure. Default is linear. Any matplotlib scale can be used. :param bool fracexp_corr: Controls whether the plotted data should be corrected for vignetting and deadtime effects by dividing by the 'FRACEXP' entry in the lightcurve. Default is False. :param bool show_legend: Controls whether a legend is included in each panel of the visualization. :param float alpha: The alpha value to be used for the plotted data. Default is 0.8. :param interval_start: The starting point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window start. :param interval_end: The ending point of the time interval. Can be a Quantity indicating a duration from the reference time, an astropy Time, or a Python datetime, or None to use the overall window end. :param over_run: A boolean flag. If True, includes chunks partially overlapping the interval. If False, matches chunks entirely contained within the interval. """ # Create figure object fig = plt.figure(figsize=figsize) ax_dict, fig = self.get_view(fig, inst, custom_title, label_font_size, title_font_size, inst_cmap, y_lims, time_chunk_ids, yscale, fracexp_corr, show_legend, alpha, interval_start, interval_end, over_run) # plt.tight_layout() # Display the plot plt.show() # Wipe the figure plt.close("all")