Source code for trap.post_processing

from typing import Optional

import numba as nb
import numpy as np
import pandas as pd
import sqlalchemy


[docs] def construct_lightcurves( db_engine: Optional[sqlalchemy.engine.base.Engine] = None, attribute: str = "int_flux", sources: Optional[pd.DataFrame] = None, images: Optional[pd.DataFrame] = None, ): """Reconstruct a dataframe with lightcurves from the source relations defined in the standard database. Parameters ---------- db_engine A sqlalchemy database engine. attribute: str The name of the attribute to use as value for the lightcurve. This can be any column name of the extraced_sources table. sources: pandas.DataFrame A DataFrame containing the extracted sources. By providing the sources there is no need to read the extracted_sources from the database. If the sources are already in memory, providing them like this is more performant than having to read the data from the database every time this function is called. This is especially relevant when this function is called several times with a different attribute on the same DataFrame with extracted sources. images: pandas.DataFrame A DataFrame containing the images and their frequency band. By providing these, there is no need to read them from the database. Returns ------- A pandas DataFrame where each row is a lightcurve and each column is correlated to each image. """ if sources is None or images is None: if db_engine is None: raise ValueError( "Either the argument 'db_engine' or both 'sources' and 'images' must be provided." ) with db_engine.connect() as conn: if sources is None: sources = pd.read_sql_query("SELECT * FROM extracted_sources", conn) if images is None: images = pd.read_sql_query( "SELECT id,freq_band FROM images", conn ).set_index("id") lightcurves = pd.DataFrame( { "id": [], } ).set_index("id") total_nr_sources = int(sources.src_id.max() + 1) for im_id in range(sources.im_id.max() + 1): fluxes = np.full(total_nr_sources, np.nan) im_slice = sources[sources.im_id == im_id] fluxes[im_slice.src_id.values] = im_slice[attribute].values lightcurves[f"im_{im_id}"] = fluxes # Update duplicate's history duplicate_slice = im_slice[im_slice.is_duplicate.astype("bool")] to_copy_slice = sources.loc[duplicate_slice["parent"]] lightcurves.loc[duplicate_slice["src_id"], lightcurves.columns[:-1]] = ( lightcurves.loc[to_copy_slice["src_id"], lightcurves.columns[:-1]].values ) nr_frequencybands = len(images["freq_band"].unique()) lightcurves_per_band = [] for band_id in range(nr_frequencybands): band_mask = images["freq_band"] == band_id lightcurve_for_band = lightcurves.loc[:, band_mask.values] lightcurves_per_band.append(lightcurve_for_band) return lightcurves_per_band
@nb.njit() def _weighted_sum(ra, ra_err, dec, dec_err): """Private function used to calculate the weighted sums for ra and dec.""" nr_sources = ra.shape[0] nr_images = ra.shape[1] weighted_sum_ra = np.zeros(nr_sources) sum_weight_ra = np.zeros(nr_sources) weighted_sum_dec = np.zeros(nr_sources) sum_weight_dec = np.zeros(nr_sources) for i in range(nr_images): mask = np.isfinite(ra[:, i]) w_ra = 1 / ra_err[mask, i] ** 2 w_dec = 1 / dec_err[mask, i] ** 2 weighted_sum_ra[mask] += w_ra * ra[mask, i] sum_weight_ra[mask] += w_ra weighted_sum_dec[mask] += w_dec * dec[mask, i] sum_weight_dec[mask] += w_dec return weighted_sum_ra, sum_weight_ra, weighted_sum_dec, sum_weight_dec @nb.njit() def _first_image_of_detection(lightcurve): """Private function used to determine the first image each source was found in""" nr_sources = lightcurve.shape[0] nr_images = lightcurve.shape[1] first_image = np.full(nr_sources, fill_value=-1, dtype="int") already_found = np.full(nr_sources, False, dtype="bool") for i in range(nr_images): mask = np.isfinite(lightcurve[:, i]) mask &= ~already_found first_image[mask] = i already_found[mask] = True return first_image
[docs] def construct_varmetric( db_engine: sqlalchemy.engine.base.Engine, sources: Optional[pd.DataFrame] = None, images: Optional[pd.DataFrame] = None, ) -> pd.DataFrame: r"""Calculate lightcurve properties which can be used for filtering and isolating potential transients. The properties are based on the extracted_sources table. These properties are: - newsource Reference to the id of the first extracted source in the lightcurve. - v_int The flux coefficient of variation (V_ν), based on the integrated flux values. - eta_int The ‘reduced chi-squared’ variability index (η_ν), based on the integrated flux values. - sigma_rms_min Integrated flux from the from the extracted source that triggered an new source entry, divided by the minimum value of the estimated-RMS-map within the source-extraction region. - sigma_rms_max Integrated flux from the from the extracted source that triggered an new source entry, divided by the maximum value of the estimated-RMS-map within the source-extraction region. - lightcurve_max The maximum flux value of the lightcurve based on the integrated flux. - lightcurve_avg The average flux value of the lightcurve based on the integrated flux. - wm_ra The weighted mean right ascension of the source, computed across all detections using inverse-squared positional uncertainties (1/ra_err²) as weights. This represents the best-estimate sky position in right ascension. - wm_dec The weighted mean declination of the source, computed across all detections using inverse-squared positional uncertainties (1/dec_err²) as weights. This represents the best-estimate sky position in declination. - avg_ra_err The average or mean of the right ascension uncertainties (ra_fir_err) across all detections contributing to the lightcurve. Represents the average measurement uncertainty in RA. - avg_dec_err The average or mean of the declination uncertainties (dec_fit_err) across all detections contributing to the lightcurve. Represents the average measurement uncertainty in Dec. - nr_datapoints The number of images that were used to calculate the variablility metrics per source. If a source was not naturally detected in a specific image the flux value is NaN. Such values are not used in the calculation of the variability metrics and do not count towards the total nr_datapoints for that source. - first_image The id of the first image in which a source was detected - first_detection_time The acquisition time of the first image in which a source was detected Parameters ---------- db_engine A sqlalchemy database engine. sources: pandas.DataFrame (optional) A DataFrame containing the extracted sources. By providing the sources there is no need to read the extracted_sources from the database. Since ony the provided sources will be used to calculate the variability metric, this allows for the calculation of the varibility metrics on a subset of extracted sources. images: pandas.DataFrame (optional) A DataFrame containing the images and their frequency band. By providing these, there is no need to read them from the database. They do need to match the provided sources, meaning that each im_id that is present in the sources table should be reflected in the images table. It is recommended to leave this empty if the provided sources are all from the specified database. In that case the function defaults to reading all image metadata from the database which should then match the provided sources. Returns ------- dict A dictionary with the lightcurve properties mentioned above. """ if sources is None or images is None: if db_engine is None: raise ValueError( "Either the argument 'db_engine' or both 'sources' and 'images' must be provided." ) with db_engine.connect() as conn: if sources is None: sources = pd.read_sql_query("SELECT * FROM extracted_sources", conn) if images is None: images = pd.read_sql_query("SELECT * FROM images", conn).set_index("id") src_ids = np.unique(sources.src_id) first_extracted_source_id = np.zeros(len(src_ids), dtype=int) grouped = sources.groupby("src_id") im_ids = grouped["im_id"].min() first_extracted_source_id = grouped.id.min().values int_flux_first_src = sources.loc[first_extracted_source_id].int_flux lightcurves_int_all_bands = construct_lightcurves( db_engine, attribute="int_flux", sources=sources, images=images ) lightcurves_int_err_all_bands = construct_lightcurves( db_engine, attribute="int_flux_err", sources=sources, images=images ) ra_all_bands = construct_lightcurves( db_engine, attribute="ra", sources=sources, images=images ) ra_err_all_bands = construct_lightcurves( db_engine, attribute="ra_fit_err", sources=sources, images=images ) dec_all_bands = construct_lightcurves( db_engine, attribute="dec", sources=sources, images=images ) dec_err_all_bands = construct_lightcurves( db_engine, attribute="dec_fit_err", sources=sources, images=images ) # Remove force fits as these are not suitable to variability metrics: https://git.astron.nl/RD/trap/-/issues/32#note_134250 # Note: we do need the force fit sources to complete the lightcurve parent daisy-chain, # so we only set the value to NaN but keep the source. lightcurves_is_force_fit = construct_lightcurves( db_engine, attribute="is_force_fit", sources=sources, images=images ) nr_frequencybands = len(images["freq_band"].unique()) varmetric_per_band = [] for band_id in range(nr_frequencybands): lightcurves_int = lightcurves_int_all_bands[band_id] lightcurves_int_err = lightcurves_int_err_all_bands[band_id] ra = ra_all_bands[band_id] ra_err = ra_err_all_bands[band_id] dec = dec_all_bands[band_id] dec_err = dec_err_all_bands[band_id] is_force_fit_mask = lightcurves_is_force_fit[band_id] == 1 lightcurves_int[is_force_fit_mask] = np.nan lightcurves_int_err[is_force_fit_mask] = np.nan ra[is_force_fit_mask] = np.nan ra_err[is_force_fit_mask] = np.nan dec[is_force_fit_mask] = np.nan dec_err[is_force_fit_mask] = np.nan lightcurve_integrated_flux = lightcurves_int.to_numpy() lightcurve_integrated_flux_error = lightcurves_int_err.to_numpy() nr_images_per_source = np.isfinite(lightcurve_integrated_flux).sum(axis=1) multiple_sources_mask = nr_images_per_source > 1 lightcurve_integrated_flux_subset = lightcurve_integrated_flux[ multiple_sources_mask ] lightcurve_integrated_flux_error = lightcurve_integrated_flux_error[ multiple_sources_mask ] nr_images_per_source_masked = nr_images_per_source[multiple_sources_mask] integrated_flux_mean = ( np.nansum(lightcurve_integrated_flux_subset, axis=1) / nr_images_per_source_masked ) integrated_flux_mean_sq = ( np.nansum(lightcurve_integrated_flux_subset**2, axis=1) / nr_images_per_source_masked ) weights = 1.0 / lightcurve_integrated_flux_error**2 weight_mean = np.nansum(weights, axis=1) / nr_images_per_source_masked weighted_integrated_flux_mean = ( np.nansum(weights * lightcurve_integrated_flux_subset, axis=1) / nr_images_per_source_masked ) weighted_integrated_flux_mean_squared = ( np.nansum(weights * lightcurve_integrated_flux_subset**2, axis=1) / nr_images_per_source_masked ) eta = np.full(len(src_ids), np.nan) with np.errstate(invalid="ignore", divide="ignore"): eta[multiple_sources_mask] = ( nr_images_per_source_masked / (nr_images_per_source_masked - 1.0) ) * ( weighted_integrated_flux_mean_squared - (weighted_integrated_flux_mean**2) / weight_mean ) v_int = np.full(len(src_ids), np.nan) v_int[multiple_sources_mask] = ( np.sqrt( nr_images_per_source_masked * (integrated_flux_mean_sq - integrated_flux_mean**2) / (nr_images_per_source_masked - 1.0) ) / integrated_flux_mean ) # Varmetric caluclations based on: https://github.com/transientskp/tkp/blob/b34582712b82b888a5a7b51b3ee371e682b8c349/tkp/testutil/db_subs.py#L188 varmetric = pd.DataFrame( { "newsource": first_extracted_source_id, "v_int": v_int, "eta_int": eta, } ) # The following are added separately such that the gaps are filled with nans and the lengths match the rest varmetric["lightcurve_max"] = np.nan varmetric["lightcurve_avg"] = np.nan varmetric["lightcurve_max"] = np.nanmax(lightcurve_integrated_flux, axis=1) varmetric["lightcurve_avg"] = np.nanmean(lightcurve_integrated_flux, axis=1) # Add weighted ra and dec metrics src_ids = ra.index.to_numpy() ra = ra.to_numpy() ra_err = ra_err.to_numpy() dec = dec.to_numpy() dec_err = dec_err.to_numpy() weighted_sum_ra, sum_weight_ra, weighted_sum_dec, sum_weight_dec = ( _weighted_sum(ra, ra_err, dec, dec_err) ) with np.errstate(invalid="ignore", divide="ignore"): # building sphinx fails on divide by zero warnings so explicitly ignore this # specifically for sum_weight_ra and sum_weight_dec. We accept the nodata # value that numpy defaults to in these cases. varmetric["wm_ra"] = weighted_sum_ra / sum_weight_ra varmetric["wm_dec"] = weighted_sum_dec / sum_weight_dec varmetric["avg_ra_err"] = np.nanmean(ra_err, axis=1) varmetric["avg_dec_err"] = np.nanmean(dec_err, axis=1) varmetric["nr_datapoints"] = nr_images_per_source first_image_detected_src = _first_image_of_detection(ra) # It is possible for a particular source to not have any observations in one or more frequency bands. # There will then be no first detection image for this band (i.e. first_image_detected_src=-1). # In such a case, set all varmetric values for that row to NaN. first_image_mask = first_image_detected_src != -1 first_image = np.full(len(varmetric), np.nan) first_detection_time = np.full(len(varmetric), np.nan, dtype=object) first_image[first_image_mask] = first_image_detected_src[first_image_mask] varmetric["first_image"] = first_image first_detection_time[first_image_mask] = images.taustart_ts.get( first_image_detected_src[first_image_mask] ).values varmetric["first_detection_time"] = first_detection_time varmetric["freq_band"] = band_id varmetric[~first_image_mask] = np.nan varmetric["src_id"] = src_ids varmetric_per_band.append(varmetric) return pd.concat(varmetric_per_band).reset_index(drop=True)
[docs] def update_varmetrics_iterative( varmetric, source_list, extracted_sources, im_id, images ): """ Updates the varmetric dataframe using only the new data. """ def init_varmetric(source, latest_extracted_source_id): flux = source["int_flux"] weight_flux = 1.0 / (source["int_flux_err"] ** 2) weigt_ra = 1.0 / (source["ra_fit_err"] ** 2) weigt_dec = 1.0 / (source["dec_fit_err"] ** 2) varmetric_dict = { "newsource": latest_extracted_source_id, "src_id": source["src_id"], "freq_band": images["freq_band"], "v_int": np.nan, # Variance requires more than one image "eta_int": np.nan, # Variance requires more than one image "lightcurve_max": source["int_flux"], "lightcurve_avg": source["int_flux"], "wm_ra": source["ra"], "wm_dec": source["dec"], "avg_ra_err": source["ra_fit_err"], "avg_dec_err": source["dec_fit_err"], "nr_datapoints": 1, "first_image": im_id, "first_detection_time": images["taustart_ts"], # Running averages and sums "_avg_f_int": flux, "_avg_f_int_sq": flux**2, "_avg_f_int_weight": weight_flux, "_avg_weighted_f_int": weight_flux * flux, "_avg_weighted_f_int_sq": weight_flux * flux**2, "_sum_weight_ra": weigt_ra, "_sum_weighted_ra": weigt_ra * source["ra"], "_sum_weight_dec": weigt_dec, "_sum_weighted_dec": weigt_dec * source["dec"], } return varmetric_dict if im_id == 0: return pd.DataFrame( init_varmetric( extracted_sources, source_list["latest_extracted_source_id"].values ) ) # Iterate through each new observation for i, row in extracted_sources.iterrows(): # Locate the specific row in the existing varmetric table source_mask = (varmetric["src_id"] == row["src_id"]) & ( varmetric["freq_band"] == images["freq_band"] ) # If this is a brand new source/band combo, initialize a row if not source_mask.any(): latest_extracted_source_id = source_list.loc[ row["src_id"], "latest_extracted_source_id" ] varmetric_new_row = init_varmetric(row, latest_extracted_source_id) varmetric.loc[len(source_mask)] = varmetric_new_row continue nr_datapoints = varmetric.loc[source_mask].iloc[0]["nr_datapoints"] nr_datapoints_new = nr_datapoints + 1 # New values from the source row flux = row["int_flux"] weight = 1.0 / ( row["int_flux_err"] ** 2 ) # inverse-weight for this observation based on the error in the flux measurement. Lower error means larger weight. source = varmetric.loc[source_mask].iloc[0] # v_int avg_f = ( nr_datapoints * source["lightcurve_avg"] + flux ) / nr_datapoints_new # running average for integrated flux avg_f_int_sq = ( # running average for squared integrated flux source["_avg_f_int_sq"] + (flux**2 - source["_avg_f_int_sq"]) / nr_datapoints_new ) variance = avg_f_int_sq - (avg_f**2) v_int = ( # updated flux coefficient of variation np.sqrt((nr_datapoints_new * variance) / (nr_datapoints_new - 1.0)) ) / avg_f # eta_int avg_f_int_weight = ( # running average for weight based on flux error nr_datapoints * source["_avg_f_int_weight"] + weight ) / nr_datapoints_new avg_weighted_f_int = ( # running average for weighted flux nr_datapoints * source["_avg_weighted_f_int"] + (weight * flux) ) / (nr_datapoints_new) avg_weighted_f_int_sq = ( # running average for weighted squared flux nr_datapoints * source["_avg_weighted_f_int_sq"] + (weight * flux**2) ) / nr_datapoints_new eta_int = ( # updated reduced chi-squared variability index nr_datapoints_new / (nr_datapoints_new - 1.0) * (avg_f_int_weight * avg_weighted_f_int_sq - avg_weighted_f_int**2) ) / avg_f_int_weight # weighted mean ra, dec # inverse-weight for this observation's position based on the error in the position. Lower error means larger weight. w_ra = 1.0 / (row["ra_fit_err"] ** 2) w_dec = 1.0 / (row["dec_fit_err"] ** 2) # Running sums for weigted positions, used to get the weighted mean. sum_weight_ra = source["_sum_weight_ra"] + w_ra sum_weighted_ra = source["_sum_weighted_ra"] + w_ra * row["ra"] sum_weight_dec = source["_sum_weight_dec"] + w_dec sum_weighted_dec = source["_sum_weighted_dec"] + w_dec * row["dec"] # Updated weighted mean position wm_ra = sum_weighted_ra / sum_weight_ra wm_dec = sum_weighted_dec / sum_weight_dec # mean err of ra, dec avg_ra_err = ( nr_datapoints * source["avg_ra_err"] + row["ra_fit_err"] ) / nr_datapoints_new avg_dec_err = ( nr_datapoints * source["avg_dec_err"] + row["dec_fit_err"] ) / nr_datapoints_new varmetric.loc[ source_mask, [ "wm_ra", "wm_dec", "avg_ra_err", "avg_dec_err", "lightcurve_avg", "_avg_f_int_sq", "_avg_f_int_weight", "_avg_weighted_f_int", "_avg_weighted_f_int_sq", "_sum_weight_ra", "_sum_weighted_ra", "_sum_weight_dec", "_sum_weighted_dec", "v_int", "eta_int", "nr_datapoints", ], ] = [ wm_ra, wm_dec, avg_ra_err, avg_dec_err, avg_f, avg_f_int_sq, avg_f_int_weight, avg_weighted_f_int, avg_weighted_f_int_sq, sum_weight_ra, sum_weighted_ra, sum_weight_dec, sum_weighted_dec, v_int, eta_int, nr_datapoints_new, ] if flux > source["lightcurve_max"]: varmetric.loc[source_mask, "lightcurve_max"] = flux return varmetric