Source code for

#  This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).
#  Last modified by David J Turner ( 20/02/2023, 14:04. Copyright (c) The Contributors

import os
import warnings
from functools import wraps
# from multiprocessing.dummy import Pool
from multiprocessing import Pool
from subprocess import Popen, PIPE, TimeoutExpired
from typing import Tuple, Union

import fitsio
import pandas as pd
from fitsio import FITS
from tqdm import tqdm

from .. import XSPEC_VERSION
from ..exceptions import XSPECFitError, MultipleMatchError, NoMatchFoundError, XSPECNotFoundError
from ..samples.base import BaseSample
from ..sources import BaseSource

[docs]def execute_cmd(x_script: str, out_file: str, src: str, run_type: str, timeout: float) \ -> Tuple[Union[FITS, str], str, bool, list, list]: """ This function is called for the local compute option. It will run the supplied XSPEC script, then check parse the output for errors and check that the expected output file has been created. :param str x_script: The path to an XSPEC script to be run. :param str out_file: The expected path for the output file of that XSPEC script. :param str src: A string representation of the source object that this fit is associated with. :param str run_type: A flag that tells this function what type of run this is; e.g. fit or conv_factors. :param float timeout: The length of time (in seconds) which the XSPEC script is allowed to run for before being killed. :return: FITS object of the results, string repr of the source associated with this fit, boolean variable describing if this fit can be used, list of any errors found, list of any warnings found. :rtype: Tuple[Union[FITS, str], str, bool, list, list] """ if XSPEC_VERSION is None: raise XSPECNotFoundError("There is no XSPEC installation detectable on this machine.") # We assume the output will be usable to start with usable = True cmd = "xspec - {}".format(x_script) # I add exec to the beginning to make sure that the command inherits the same process ID as the shell, which # allows the timeout to kill the XSPEC run rather than the shell process. Entirely thanks to slayton on # xspec_proc = Popen("exec " + cmd, shell=True, stdout=PIPE, stderr=PIPE) # This makes sure the process is killed if it does timeout try: out, err = xspec_proc.communicate(timeout=timeout) except TimeoutExpired: xspec_proc.kill() out, err = xspec_proc.communicate() # Need to infer the name of the source to supply it in the warning source_name = x_script.split('/')[-1].split("_")[0] warnings.warn("An XSPEC fit for {} has timed out".format(source_name)) usable = False out = out.decode("UTF-8").split("\n") err = err.decode("UTF-8").split("\n") err_out_lines = [line.split("***Error: ")[-1] for line in out if "***Error" in line] warn_out_lines = [line.split("***Warning: ")[-1] for line in out if "***Warning" in line] err_err_lines = [line.split("***Error: ")[-1] for line in err if "***Error" in line] warn_err_lines = [line.split("***Warning: ")[-1] for line in err if "***Warning" in line] if usable and len(err_out_lines) == 0 and len(err_err_lines) == 0: usable = True else: usable = False error = err_out_lines + err_err_lines warn = warn_out_lines + warn_err_lines if os.path.exists(out_file + "_info.csv") and run_type == "fit": # The original version of the xga_output.tcl script output everything as one nice neat fits file # but life is full of extraordinary inconveniences and for some reason it didn't work if called from # a Jupyter Notebook. So now I'm going to smoosh all the csv outputs into one fits. results = pd.read_csv(out_file + "_results.csv", header="infer") # This is the csv with the fit results in, creates new fits file and adds in fitsio.write(out_file + ".fits", results.to_records(index=False), extname="results", clobber=True) del results # The information about individual spectra, exposure times, luminosities etc. spec_info = pd.read_csv(out_file + "_info.csv", header="infer") # Gets added into the existing file fitsio.write(out_file + ".fits", spec_info.to_records(index=False), extname="spec_info") del spec_info # This finds all of the matching spectrum plot csvs were generated rel_path = "/".join(out_file.split('/')[0:-1]) # This is mostly just used to find how many files there are spec_tabs = [rel_path + "/" + sp for sp in os.listdir(rel_path) if "{}_spec".format(out_file) in rel_path + "/" + sp] for spec_i in range(1, len(spec_tabs)+1): # Loop through and redefine names like this to ensure they're in the right order spec_plot = pd.read_csv(out_file + "_spec{}.csv".format(spec_i), header="infer") # Adds all the plot tables into the existing fits file in the right order fitsio.write(out_file + ".fits", spec_plot.to_records(index=False), extname="plot{}".format(spec_i)) del spec_plot # This reads in the fits we just made with FITS(out_file + ".fits") as res_tables: tab_names = [tab.get_extname() for tab in res_tables] if "results" not in tab_names or "spec_info" not in tab_names: usable = False # I'm going to try returning the file path as that should be pickleable res_tables = out_file + ".fits" elif os.path.exists(out_file) and run_type == "conv_factors": res_tables = out_file usable = True else: res_tables = None usable = False return res_tables, src, usable, error, warn
[docs]def xspec_call(xspec_func): """ This is used as a decorator for functions that produce XSPEC scripts. Depending on the system that XGA is running on (and whether the user requests parallel execution), the method of executing the XSPEC commands will change. This supports multi-threading. :return: """ @wraps(xspec_func) def wrapper(*args, **kwargs): # The first argument of all of these XSPEC functions will be the source object (or a list of), # so rather than return them from the XSPEC model function I'll just access them like this. if isinstance(args[0], BaseSource): sources = [args[0]] elif isinstance(args[0], (list, BaseSample)): sources = args[0] else: raise TypeError("Please pass a source object, or a list of source objects.") # This is the output from whatever function this is a decorator for # First return is a list of paths of XSPEC scripts to execute, second is the expected output paths, # and 3rd is the number of cores to use. # run_type describes the type of XSPEC script being run, for instance a fit or a fakeit run to measure # countrate to luminosity conversion constants script_list, paths, cores, run_type, src_inds, radii, timeout = xspec_func(*args, **kwargs) src_lookup = {repr(src): src_ind for src_ind, src in enumerate(sources)} rel_src_repr = [repr(sources[src_ind]) for src_ind in src_inds] # Make sure the timeout is converted to seconds, then just stored as a float timeout ='second').value # This is what the returned information from the execute command gets stored in before being parceled out # to source and spectrum objects results = {s: [] for s in src_lookup} if run_type == "fit": desc = "Running XSPEC Fits" elif run_type == "conv_factors": desc = "Running XSPEC Simulations" if len(script_list) > 0: # This mode runs the XSPEC locally in a multiprocessing pool. with tqdm(total=len(script_list), desc=desc) as fit, Pool(cores) as pool: def callback(results_in): """ Callback function for the apply_async pool method, gets called when a task finishes and something is returned. """ nonlocal fit # The progress bar will need updating nonlocal results # The dictionary the command call results are added to if results_in[0] is None: fit.update(1) return else: res_fits, rel_src, successful, err_list, warn_list = results_in results[rel_src].append([res_fits, successful, err_list, warn_list]) fit.update(1) for s_ind, s in enumerate(script_list): pth = paths[s_ind] src = rel_src_repr[s_ind] pool.apply_async(execute_cmd, args=(s, pth, src, run_type, timeout), callback=callback) pool.close() # No more tasks can be added to the pool pool.join() # Joins the pool, the code will only move on once the pool is empty. elif len(script_list) == 0: warnings.warn("All XSPEC operations had already been run.") # Now we assign the fit results to source objects for src_repr in results: # Made this lookup list earlier, using string representations of source objects. # Finds the ind of the list of sources that we should add these results to ind = src_lookup[src_repr] s = sources[ind] # This flag tells this method if the current set of fits are part of an annular spectra or not ann_fit = False ann_results = {} ann_lums = {} ann_obs_order = {} for res_set in results[src_repr]: if len(res_set) != 0 and res_set[1] and run_type == "fit": with FITS(res_set[0]) as res_table: global_results = res_table["RESULTS"][0] model = global_results["MODEL"].strip(" ") # Just define this to check if this is an annular fit or not first_key = res_table["SPEC_INFO"][0]["SPEC_PATH"].strip(" ").split("/")[-1].split('ra')[-1] first_key = first_key.split('_spec.fits')[0] if "_ident" in first_key: ann_fit = True inst_lums = {} obs_order = [] for line_ind, line in enumerate(res_table["SPEC_INFO"]): sp_info = line["SPEC_PATH"].strip(" ").split("/")[-1].split("_") # Want to derive the spectra storage key from the file name, this strips off some # unnecessary info sp_key = line["SPEC_PATH"].strip(" ").split("/")[-1].split('ra')[-1].split('_spec.fits')[0] # If its not an AnnularSpectra fit then we can just fetch the spectrum from the source # the normal way if not ann_fit: # This adds ra back on, and removes any ident information if it is there sp_key = 'ra' + sp_key # Finds the appropriate matching spectrum object for the current table line spec = s.get_products("spectrum", sp_info[0], sp_info[1], extra_key=sp_key)[0] else: obs_order.append([sp_info[0], sp_info[1]]) ann_id = int(sp_key.split("_ident")[-1].split("_")[1]) sp_key = 'ra' + sp_key.split('_ident')[0] first_part = sp_key.split('ri')[0] second_part = "_" + "_".join(sp_key.split('ro')[-1].split("_")[1:]) ann_sp_key = first_part + "ar" + "_".join(radii[ind].value.astype(str)) + second_part ann_specs = s.get_products("combined_spectrum", extra_key=ann_sp_key) if len(ann_specs) > 1: raise MultipleMatchError("I have found multiple matches for that AnnularSpectra, " "this is the developers fault, not yours.") elif len(ann_specs) == 0: raise NoMatchFoundError("Somehow I haven't found the AnnularSpectra that you " "fitted, this is the developers fault, not yours") else: ann_spec = ann_specs[0] spec = ann_spec.get_spectra(ann_id, sp_info[0], sp_info[1]) # Adds information from this fit to the spectrum object. spec.add_fit_data(str(model), line, res_table["PLOT"+str(line_ind+1)]) # The add_fit_data method formats the luminosities nicely, so we grab them back out # to help grab the luminosity needed to pass to the source object 'add_fit_data' method processed_lums = spec.get_luminosities(model) if spec.instrument not in inst_lums: inst_lums[spec.instrument] = processed_lums # Ideally the luminosity reported in the source object will be a PN lum, but its not impossible # that a PN value won't be available. - it shouldn't matter much, lums across the cameras are # consistent if "pn" in inst_lums: chosen_lums = inst_lums["pn"] # mos2 generally better than mos1, as mos1 has CCD damage after a certain point in its life elif "mos2" in inst_lums: chosen_lums = inst_lums["mos2"] else: chosen_lums = inst_lums["mos1"] if ann_fit: ann_results[spec.annulus_ident] = global_results ann_lums[spec.annulus_ident] = chosen_lums ann_obs_order[spec.annulus_ident] = obs_order elif not ann_fit: # Push global fit results, luminosities etc. into the corresponding source object. s.add_fit_data(model, global_results, chosen_lums, sp_key) elif len(res_set) != 0 and res_set[1] and run_type == "conv_factors": res_table = pd.read_csv(res_set[0], dtype={"lo_en": str, "hi_en": str}) # Gets the model name from the file name of the output results table model = res_set[0].split("_")[-3] # We can infer the storage key from the name of the results table, just makes it easier to # grab the correct spectra storage_key = res_set[0].split('/')[-1].split([-1][1:].split(model)[0][:-1] # Grabs the ObsID+instrument combinations from the headers of the csv. Makes sure they are unique # by going to a set (because there will be two columns for each ObsID+Instrument, rate and Lx) # First two columns are skipped because they are energy limits combos = list(set([c.split("_")[1] for c in res_table.columns[2:]])) # Getting the spectra for each column, then assigning rates and lums for comb in combos: spec = s.get_products("spectrum", comb[:10], comb[10:], extra_key=storage_key)[0] spec.add_conv_factors(res_table["lo_en"].values, res_table["hi_en"].values, res_table["rate_{}".format(comb)].values, res_table["Lx_{}".format(comb)].values, model) elif len(res_set) != 0 and not res_set[1]: for err in res_set[2]: raise XSPECFitError(err) if ann_fit: # We fetch the annular spectra object that we just fitted, searching by using the set ID of # the last spectra that was opened in the loop ann_spec = s.get_annular_spectra(set_id=spec.set_ident) try: ann_spec.add_fit_data(model, ann_results, ann_lums, ann_obs_order) # The most likely reason for running XSPEC fits to a profile is to create a temp. profile # so we check whether constant*tbabs*apec (single_temp_apec function)has been run and if so # generate a Tx profile automatically if model == "constant*tbabs*apec": temp_prof = ann_spec.generate_profile(model, 'kT', 'keV') s.update_products(temp_prof) # Normalisation profiles can be useful for many things, so we generate them too norm_prof = ann_spec.generate_profile(model, 'norm', 'cm^-5') s.update_products(norm_prof) if 'Abundanc' in ann_spec.get_results(0, 'constant*tbabs*apec'): met_prof = ann_spec.generate_profile(model, 'Abundanc', '') s.update_products(met_prof) else: raise NotImplementedError("How have you even managed to fit this model to a profile?! Its not" " supported yet.") except ValueError: warnings.warn("{src} annular spectra profile fit was not successful".format(src=ann_spec.src_name)) # If only one source was passed, turn it back into a source object rather than a source # object in a list. if len(sources) == 1: sources = sources[0] return sources return wrapper