from typing import Optional
import numba as nb
import numpy as np
import pandas as pd
import sqlalchemy
[docs]
def construct_lightcurves(
db_engine: Optional[sqlalchemy.engine.base.Engine] = None,
attribute: str = "int_flux",
sources: Optional[pd.DataFrame] = None,
images: Optional[pd.DataFrame] = None,
):
"""Reconstruct a dataframe with lightcurves from the source relations defined in the standard database.
Parameters
----------
db_engine
A sqlalchemy database engine.
attribute: str
The name of the attribute to use as value for the lightcurve.
This can be any column name of the extraced_sources table.
sources: pandas.DataFrame
A DataFrame containing the extracted sources. By providing the sources
there is no need to read the extracted_sources from the database. If the
sources are already in memory, providing them like this is more performant
than having to read the data from the database every time this function
is called. This is especially relevant when this function is called several
times with a different attribute on the same DataFrame with extracted sources.
images: pandas.DataFrame
A DataFrame containing the images and their frequency band. By providing these,
there is no need to read them from the database.
Returns
-------
A pandas DataFrame where each row is a lightcurve and each column is correlated to each image.
"""
if sources is None or images is None:
if db_engine is None:
raise ValueError(
"Either the argument 'db_engine' or both 'sources' and 'images' must be provided."
)
with db_engine.connect() as conn:
if sources is None:
sources = pd.read_sql_query("SELECT * FROM extracted_sources", conn)
if images is None:
images = pd.read_sql_query(
"SELECT id,freq_band FROM images", conn
).set_index("id")
lightcurves = pd.DataFrame(
{
"id": [],
}
).set_index("id")
total_nr_sources = int(sources.src_id.max() + 1)
for im_id in range(sources.im_id.max() + 1):
fluxes = np.full(total_nr_sources, np.nan)
im_slice = sources[sources.im_id == im_id]
fluxes[im_slice.src_id.values] = im_slice[attribute].values
lightcurves[f"im_{im_id}"] = fluxes
# Update duplicate's history
duplicate_slice = im_slice[im_slice.is_duplicate.astype("bool")]
to_copy_slice = sources.loc[duplicate_slice["parent"]]
lightcurves.loc[duplicate_slice["src_id"], lightcurves.columns[:-1]] = (
lightcurves.loc[to_copy_slice["src_id"], lightcurves.columns[:-1]].values
)
nr_frequencybands = len(images["freq_band"].unique())
lightcurves_per_band = []
for band_id in range(nr_frequencybands):
band_mask = images["freq_band"] == band_id
lightcurve_for_band = lightcurves.loc[:, band_mask.values]
lightcurves_per_band.append(lightcurve_for_band)
return lightcurves_per_band
@nb.njit()
def _weighted_sum(ra, ra_err, dec, dec_err):
"""Private function used to calculate the weighted sums for ra and dec."""
nr_sources = ra.shape[0]
nr_images = ra.shape[1]
weighted_sum_ra = np.zeros(nr_sources)
sum_weight_ra = np.zeros(nr_sources)
weighted_sum_dec = np.zeros(nr_sources)
sum_weight_dec = np.zeros(nr_sources)
for i in range(nr_images):
mask = np.isfinite(ra[:, i])
w_ra = 1 / ra_err[mask, i] ** 2
w_dec = 1 / dec_err[mask, i] ** 2
weighted_sum_ra[mask] += w_ra * ra[mask, i]
sum_weight_ra[mask] += w_ra
weighted_sum_dec[mask] += w_dec * dec[mask, i]
sum_weight_dec[mask] += w_dec
return weighted_sum_ra, sum_weight_ra, weighted_sum_dec, sum_weight_dec
@nb.njit()
def _first_image_of_detection(lightcurve):
"""Private function used to determine the first image each source was found in"""
nr_sources = lightcurve.shape[0]
nr_images = lightcurve.shape[1]
first_image = np.full(nr_sources, fill_value=-1, dtype="int")
already_found = np.full(nr_sources, False, dtype="bool")
for i in range(nr_images):
mask = np.isfinite(lightcurve[:, i])
mask &= ~already_found
first_image[mask] = i
already_found[mask] = True
return first_image
[docs]
def construct_varmetric(
db_engine: sqlalchemy.engine.base.Engine,
sources: Optional[pd.DataFrame] = None,
images: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
r"""Calculate lightcurve properties which can be used for filtering and isolating potential transients.
The properties are based on the extracted_sources table.
These properties are:
- newsource
Reference to the id of the first extracted source in the lightcurve.
- v_int
The flux coefficient of variation (V_ν), based on the integrated flux values.
- eta_int
The ‘reduced chi-squared’ variability index (η_ν), based on the integrated flux values.
- sigma_rms_min
Integrated flux from the from the extracted source that triggered an new source entry, divided by the minimum value of the estimated-RMS-map within the source-extraction region.
- sigma_rms_max
Integrated flux from the from the extracted source that triggered an new source entry, divided by the maximum value of the estimated-RMS-map within the source-extraction region.
- lightcurve_max
The maximum flux value of the lightcurve based on the integrated flux.
- lightcurve_avg
The average flux value of the lightcurve based on the integrated flux.
- wm_ra
The weighted mean right ascension of the source, computed across all detections
using inverse-squared positional uncertainties (1/ra_err²) as weights. This represents
the best-estimate sky position in right ascension.
- wm_dec
The weighted mean declination of the source, computed across all detections
using inverse-squared positional uncertainties (1/dec_err²) as weights. This represents
the best-estimate sky position in declination.
- avg_ra_err
The average or mean of the right ascension uncertainties (ra_fir_err) across all detections
contributing to the lightcurve. Represents the average measurement uncertainty in RA.
- avg_dec_err
The average or mean of the declination uncertainties (dec_fit_err) across all detections
contributing to the lightcurve. Represents the average measurement uncertainty in Dec.
- nr_datapoints
The number of images that were used to calculate the variablility metrics per source.
If a source was not naturally detected in a specific image the flux value is NaN.
Such values are not used in the calculation of the variability metrics and do not
count towards the total nr_datapoints for that source.
- first_image
The id of the first image in which a source was detected
- first_detection_time
The acquisition time of the first image in which a source was detected
Parameters
----------
db_engine
A sqlalchemy database engine.
sources: pandas.DataFrame (optional)
A DataFrame containing the extracted sources. By providing the sources
there is no need to read the extracted_sources from the database. Since ony the provided sources
will be used to calculate the variability metric, this allows for the calculation of the varibility
metrics on a subset of extracted sources.
images: pandas.DataFrame (optional)
A DataFrame containing the images and their frequency band. By providing these,
there is no need to read them from the database. They do need to match the provided sources,
meaning that each im_id that is present in the sources table should be reflected in the images table.
It is recommended to leave this empty if the provided sources are all from the specified database.
In that case the function defaults to reading all image metadata from the database which should then match
the provided sources.
Returns
-------
dict
A dictionary with the lightcurve properties mentioned above.
"""
if sources is None or images is None:
if db_engine is None:
raise ValueError(
"Either the argument 'db_engine' or both 'sources' and 'images' must be provided."
)
with db_engine.connect() as conn:
if sources is None:
sources = pd.read_sql_query("SELECT * FROM extracted_sources", conn)
if images is None:
images = pd.read_sql_query("SELECT * FROM images", conn).set_index("id")
src_ids = np.unique(sources.src_id)
first_extracted_source_id = np.zeros(len(src_ids), dtype=int)
grouped = sources.groupby("src_id")
im_ids = grouped["im_id"].min()
first_extracted_source_id = grouped.id.min().values
int_flux_first_src = sources.loc[first_extracted_source_id].int_flux
lightcurves_int_all_bands = construct_lightcurves(
db_engine, attribute="int_flux", sources=sources, images=images
)
lightcurves_int_err_all_bands = construct_lightcurves(
db_engine, attribute="int_flux_err", sources=sources, images=images
)
ra_all_bands = construct_lightcurves(
db_engine, attribute="ra", sources=sources, images=images
)
ra_err_all_bands = construct_lightcurves(
db_engine, attribute="ra_fit_err", sources=sources, images=images
)
dec_all_bands = construct_lightcurves(
db_engine, attribute="dec", sources=sources, images=images
)
dec_err_all_bands = construct_lightcurves(
db_engine, attribute="dec_fit_err", sources=sources, images=images
)
# Remove force fits as these are not suitable to variability metrics: https://git.astron.nl/RD/trap/-/issues/32#note_134250
# Note: we do need the force fit sources to complete the lightcurve parent daisy-chain,
# so we only set the value to NaN but keep the source.
lightcurves_is_force_fit = construct_lightcurves(
db_engine, attribute="is_force_fit", sources=sources, images=images
)
nr_frequencybands = len(images["freq_band"].unique())
varmetric_per_band = []
for band_id in range(nr_frequencybands):
lightcurves_int = lightcurves_int_all_bands[band_id]
lightcurves_int_err = lightcurves_int_err_all_bands[band_id]
ra = ra_all_bands[band_id]
ra_err = ra_err_all_bands[band_id]
dec = dec_all_bands[band_id]
dec_err = dec_err_all_bands[band_id]
is_force_fit_mask = lightcurves_is_force_fit[band_id] == 1
lightcurves_int[is_force_fit_mask] = np.nan
lightcurves_int_err[is_force_fit_mask] = np.nan
ra[is_force_fit_mask] = np.nan
ra_err[is_force_fit_mask] = np.nan
dec[is_force_fit_mask] = np.nan
dec_err[is_force_fit_mask] = np.nan
lightcurve_integrated_flux = lightcurves_int.to_numpy()
lightcurve_integrated_flux_error = lightcurves_int_err.to_numpy()
nr_images_per_source = np.isfinite(lightcurve_integrated_flux).sum(axis=1)
multiple_sources_mask = nr_images_per_source > 1
lightcurve_integrated_flux_subset = lightcurve_integrated_flux[
multiple_sources_mask
]
lightcurve_integrated_flux_error = lightcurve_integrated_flux_error[
multiple_sources_mask
]
nr_images_per_source_masked = nr_images_per_source[multiple_sources_mask]
integrated_flux_mean = (
np.nansum(lightcurve_integrated_flux_subset, axis=1)
/ nr_images_per_source_masked
)
integrated_flux_mean_sq = (
np.nansum(lightcurve_integrated_flux_subset**2, axis=1)
/ nr_images_per_source_masked
)
weights = 1.0 / lightcurve_integrated_flux_error**2
weight_mean = np.nansum(weights, axis=1) / nr_images_per_source_masked
weighted_integrated_flux_mean = (
np.nansum(weights * lightcurve_integrated_flux_subset, axis=1)
/ nr_images_per_source_masked
)
weighted_integrated_flux_mean_squared = (
np.nansum(weights * lightcurve_integrated_flux_subset**2, axis=1)
/ nr_images_per_source_masked
)
eta = np.full(len(src_ids), np.nan)
with np.errstate(invalid="ignore", divide="ignore"):
eta[multiple_sources_mask] = (
nr_images_per_source_masked / (nr_images_per_source_masked - 1.0)
) * (
weighted_integrated_flux_mean_squared
- (weighted_integrated_flux_mean**2) / weight_mean
)
v_int = np.full(len(src_ids), np.nan)
v_int[multiple_sources_mask] = (
np.sqrt(
nr_images_per_source_masked
* (integrated_flux_mean_sq - integrated_flux_mean**2)
/ (nr_images_per_source_masked - 1.0)
)
/ integrated_flux_mean
)
# Varmetric caluclations based on: https://github.com/transientskp/tkp/blob/b34582712b82b888a5a7b51b3ee371e682b8c349/tkp/testutil/db_subs.py#L188
varmetric = pd.DataFrame(
{
"newsource": first_extracted_source_id,
"v_int": v_int,
"eta_int": eta,
}
)
# The following are added separately such that the gaps are filled with nans and the lengths match the rest
varmetric["lightcurve_max"] = np.nan
varmetric["lightcurve_avg"] = np.nan
varmetric["lightcurve_max"] = np.nanmax(lightcurve_integrated_flux, axis=1)
varmetric["lightcurve_avg"] = np.nanmean(lightcurve_integrated_flux, axis=1)
# Add weighted ra and dec metrics
src_ids = ra.index.to_numpy()
ra = ra.to_numpy()
ra_err = ra_err.to_numpy()
dec = dec.to_numpy()
dec_err = dec_err.to_numpy()
weighted_sum_ra, sum_weight_ra, weighted_sum_dec, sum_weight_dec = (
_weighted_sum(ra, ra_err, dec, dec_err)
)
with np.errstate(invalid="ignore", divide="ignore"):
# building sphinx fails on divide by zero warnings so explicitly ignore this
# specifically for sum_weight_ra and sum_weight_dec. We accept the nodata
# value that numpy defaults to in these cases.
varmetric["wm_ra"] = weighted_sum_ra / sum_weight_ra
varmetric["wm_dec"] = weighted_sum_dec / sum_weight_dec
varmetric["avg_ra_err"] = np.nanmean(ra_err, axis=1)
varmetric["avg_dec_err"] = np.nanmean(dec_err, axis=1)
varmetric["nr_datapoints"] = nr_images_per_source
first_image_detected_src = _first_image_of_detection(ra)
# It is possible for a particular source to not have any observations in one or more frequency bands.
# There will then be no first detection image for this band (i.e. first_image_detected_src=-1).
# In such a case, set all varmetric values for that row to NaN.
first_image_mask = first_image_detected_src != -1
first_image = np.full(len(varmetric), np.nan)
first_detection_time = np.full(len(varmetric), np.nan, dtype=object)
first_image[first_image_mask] = first_image_detected_src[first_image_mask]
varmetric["first_image"] = first_image
first_detection_time[first_image_mask] = images.taustart_ts.get(
first_image_detected_src[first_image_mask]
).values
varmetric["first_detection_time"] = first_detection_time
varmetric["freq_band"] = band_id
varmetric[~first_image_mask] = np.nan
varmetric["src_id"] = src_ids
varmetric_per_band.append(varmetric)
return pd.concat(varmetric_per_band).reset_index(drop=True)
[docs]
def update_varmetrics_iterative(
varmetric, source_list, extracted_sources, im_id, images
):
"""
Updates the varmetric dataframe using only the new data.
"""
def init_varmetric(source, latest_extracted_source_id):
flux = source["int_flux"]
weight_flux = 1.0 / (source["int_flux_err"] ** 2)
weigt_ra = 1.0 / (source["ra_fit_err"] ** 2)
weigt_dec = 1.0 / (source["dec_fit_err"] ** 2)
varmetric_dict = {
"newsource": latest_extracted_source_id,
"src_id": source["src_id"],
"freq_band": images["freq_band"],
"v_int": np.nan, # Variance requires more than one image
"eta_int": np.nan, # Variance requires more than one image
"lightcurve_max": source["int_flux"],
"lightcurve_avg": source["int_flux"],
"wm_ra": source["ra"],
"wm_dec": source["dec"],
"avg_ra_err": source["ra_fit_err"],
"avg_dec_err": source["dec_fit_err"],
"nr_datapoints": 1,
"first_image": im_id,
"first_detection_time": images["taustart_ts"],
# Running averages and sums
"_avg_f_int": flux,
"_avg_f_int_sq": flux**2,
"_avg_f_int_weight": weight_flux,
"_avg_weighted_f_int": weight_flux * flux,
"_avg_weighted_f_int_sq": weight_flux * flux**2,
"_sum_weight_ra": weigt_ra,
"_sum_weighted_ra": weigt_ra * source["ra"],
"_sum_weight_dec": weigt_dec,
"_sum_weighted_dec": weigt_dec * source["dec"],
}
return varmetric_dict
if im_id == 0:
return pd.DataFrame(
init_varmetric(
extracted_sources, source_list["latest_extracted_source_id"].values
)
)
# Iterate through each new observation
for i, row in extracted_sources.iterrows():
# Locate the specific row in the existing varmetric table
source_mask = (varmetric["src_id"] == row["src_id"]) & (
varmetric["freq_band"] == images["freq_band"]
)
# If this is a brand new source/band combo, initialize a row
if not source_mask.any():
latest_extracted_source_id = source_list.loc[
row["src_id"], "latest_extracted_source_id"
]
varmetric_new_row = init_varmetric(row, latest_extracted_source_id)
varmetric.loc[len(source_mask)] = varmetric_new_row
continue
nr_datapoints = varmetric.loc[source_mask].iloc[0]["nr_datapoints"]
nr_datapoints_new = nr_datapoints + 1
# New values from the source row
flux = row["int_flux"]
weight = 1.0 / (
row["int_flux_err"] ** 2
) # inverse-weight for this observation based on the error in the flux measurement. Lower error means larger weight.
source = varmetric.loc[source_mask].iloc[0]
# v_int
avg_f = (
nr_datapoints * source["lightcurve_avg"] + flux
) / nr_datapoints_new # running average for integrated flux
avg_f_int_sq = ( # running average for squared integrated flux
source["_avg_f_int_sq"]
+ (flux**2 - source["_avg_f_int_sq"]) / nr_datapoints_new
)
variance = avg_f_int_sq - (avg_f**2)
v_int = ( # updated flux coefficient of variation
np.sqrt((nr_datapoints_new * variance) / (nr_datapoints_new - 1.0))
) / avg_f
# eta_int
avg_f_int_weight = ( # running average for weight based on flux error
nr_datapoints * source["_avg_f_int_weight"] + weight
) / nr_datapoints_new
avg_weighted_f_int = ( # running average for weighted flux
nr_datapoints * source["_avg_weighted_f_int"] + (weight * flux)
) / (nr_datapoints_new)
avg_weighted_f_int_sq = ( # running average for weighted squared flux
nr_datapoints * source["_avg_weighted_f_int_sq"] + (weight * flux**2)
) / nr_datapoints_new
eta_int = ( # updated reduced chi-squared variability index
nr_datapoints_new
/ (nr_datapoints_new - 1.0)
* (avg_f_int_weight * avg_weighted_f_int_sq - avg_weighted_f_int**2)
) / avg_f_int_weight
# weighted mean ra, dec
# inverse-weight for this observation's position based on the error in the position. Lower error means larger weight.
w_ra = 1.0 / (row["ra_fit_err"] ** 2)
w_dec = 1.0 / (row["dec_fit_err"] ** 2)
# Running sums for weigted positions, used to get the weighted mean.
sum_weight_ra = source["_sum_weight_ra"] + w_ra
sum_weighted_ra = source["_sum_weighted_ra"] + w_ra * row["ra"]
sum_weight_dec = source["_sum_weight_dec"] + w_dec
sum_weighted_dec = source["_sum_weighted_dec"] + w_dec * row["dec"]
# Updated weighted mean position
wm_ra = sum_weighted_ra / sum_weight_ra
wm_dec = sum_weighted_dec / sum_weight_dec
# mean err of ra, dec
avg_ra_err = (
nr_datapoints * source["avg_ra_err"] + row["ra_fit_err"]
) / nr_datapoints_new
avg_dec_err = (
nr_datapoints * source["avg_dec_err"] + row["dec_fit_err"]
) / nr_datapoints_new
varmetric.loc[
source_mask,
[
"wm_ra",
"wm_dec",
"avg_ra_err",
"avg_dec_err",
"lightcurve_avg",
"_avg_f_int_sq",
"_avg_f_int_weight",
"_avg_weighted_f_int",
"_avg_weighted_f_int_sq",
"_sum_weight_ra",
"_sum_weighted_ra",
"_sum_weight_dec",
"_sum_weighted_dec",
"v_int",
"eta_int",
"nr_datapoints",
],
] = [
wm_ra,
wm_dec,
avg_ra_err,
avg_dec_err,
avg_f,
avg_f_int_sq,
avg_f_int_weight,
avg_weighted_f_int,
avg_weighted_f_int_sq,
sum_weight_ra,
sum_weighted_ra,
sum_weight_dec,
sum_weighted_dec,
v_int,
eta_int,
nr_datapoints_new,
]
if flux > source["lightcurve_max"]:
varmetric.loc[source_mask, "lightcurve_max"] = flux
return varmetric