Source code for cetra.cetra

#!/usr/bin/env python3

# Copyright (c) 2024 Leigh C. Smith - lsmith@ast.cam.ac.uk
# Exceptions are noted within their relevant locations within this file
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import os
from pycuda.compiler import SourceModule
from pycuda import gpuarray
import pycuda.driver as drv
import numpy as np
import warnings
from scipy.interpolate import interp1d
from copy import deepcopy
from time import time
from dataclasses import dataclass
from tqdm.auto import tqdm
from .gjt_durations import get_transit_duration_limits


class Constants(object):
    seconds_per_day = 86400  # s
    G = 6.67430e-11  # m**3 * kg**-1 * s**-2
    solar_radius = 6.957e+8  # m, IAU
    solar_mass = 1.9891e+30  # kg, IAU
    jupiter_radius = 7.1492e+7  # m, IAU, equatorial


_cu_src = None
_resample_kernel_1 = None
_resample_kernel_2 = None
_linear_search_kernel = None
_periodic_search_k1 = None
_periodic_search_k2 = None
_detrender_quadfit = None
_detrender_qtrfit = None
_detrender_calc_IC = None
_detrender_fit_trend = None


def _ensure_kernels() -> None:
    global _cu_src
    global _resample_kernel_1, _resample_kernel_2
    global _linear_search_kernel, _periodic_search_k1, _periodic_search_k2
    global _detrender_quadfit, _detrender_qtrfit, _detrender_calc_IC, _detrender_fit_trend
    if _cu_src is not None:
        return
    import pycuda.autoinit
    cu_src_file_path = os.path.join(os.path.dirname(__file__), "cetra.cu")
    with open(cu_src_file_path, "r") as cu_src_file:
        _cu_src = SourceModule(cu_src_file.read())
    _resample_kernel_1 = _cu_src.get_function("resample_k1")
    _resample_kernel_2 = _cu_src.get_function("resample_k2")
    _linear_search_kernel = _cu_src.get_function("linear_search")
    _periodic_search_k1 = _cu_src.get_function("periodic_search_k1")
    _periodic_search_k2 = _cu_src.get_function("periodic_search_k2")
    _detrender_quadfit = _cu_src.get_function("detrender_quadfit")
    _detrender_qtrfit = _cu_src.get_function("detrender_qtrfit")
    _detrender_calc_IC = _cu_src.get_function("detrender_calc_IC")
    _detrender_fit_trend = _cu_src.get_function("detrender_fit_trend")


[docs] class LightCurve(object): """ A stellar light curve, resampled to a regular cadence via GPU. On construction the input data are validated, sorted chronologically, and resampled onto a uniform time grid using a GPU kernel (``resample_k1`` / ``resample_k2``). Gaps in coverage are filled with null points (``flux = NaN``, ``flux_error = inf``) so that any point in time can be located by simple index arithmetic. Attributes ---------- time : ndarray Resampled time array in days. flux : ndarray Resampled flux array (relative to baseline). flux_error : ndarray Resampled flux error array. cadence : float Cadence of the resampled light curve in days. size : int Number of points in the resampled light curve. reference_time : float Value of ``time[0]``; used to form ``offset_time``. offset_time : ndarray ``time - reference_time`` (starts at zero). flux_weight : ndarray ``1 / flux_error ** 2`` offset_flux : ndarray ``1 - flux`` flat_loglike : float Log-likelihood of the constant (no-transit) flux model. input_cadence : float Detected cadence of the raw input data in days. input_num_points : int Number of points in the raw input data. transit_mask : ndarray or None Boolean array where ``True`` marks in-transit points. ``None`` until :meth:`mask_transit` is called. offset_trend : ndarray or None Fitted trend array (set by the deprecated ``get_trend``). error_multiplier : float or None Error inflation factor (set by the deprecated ``get_trend``). """ def __init__(self, times: np.ndarray, fluxes: np.ndarray, flux_errors: np.ndarray, resample_cadence: float | None = None, verbose: bool = True) -> None: """ Basic light curve validation and class instance initialisation. Parameters ---------- times : array-like Sequence of light curve observation time points in days. fluxes : array-like Sequence of light curve relative (to baseline) flux points. flux_errors : array-like Sequence of light curve relative (to baseline) flux error points. resample_cadence : float, optional The cadence (in seconds) to use for resampling the light curve. By default, the module will try to detect the underlying cadence. verbose : bool, optional If True (the default), reports various messages. """ # make sure the input arrays are numpy arrays times, fluxes, flux_errors = map(np.asarray, [times, fluxes, flux_errors]) # get the number of elements self.input_num_points = len(times) # all arrays must be the same length if len(fluxes) != self.input_num_points: raise RuntimeError("len(fluxes) != len(times)") if len(flux_errors) != self.input_num_points: raise RuntimeError("len(flux_errors) != len(times)") # make sure the input time and flux arrays contain no NaNs if np.any(np.isnan(times)): raise ValueError("one or more time values is NaN") if np.any(np.isnan(flux_errors)): raise ValueError("one or more flux error values is NaN") # make sure the input time array contains no infs if np.any(np.isinf(times)): raise ValueError("one or more time values is infinite") # NaN fluxes are allowed - but they must have infinite error if np.any(np.logical_and(np.isnan(fluxes), ~np.isinf(flux_errors))): raise ValueError( "One or more flux values with finite error is NaN. NaN fluxes " "are allowed, but they must have infinite error." ) # make sure the fluxes and errors are valid # zero flux is valid, zero flux error is not if np.any(fluxes < 0.0): raise ValueError("one or more flux values is negative") if np.any(flux_errors <= 0.0): raise ValueError("one or more flux error values is zero or negative") # make sure that the duration of the light curve is positive self.input_epoch_baseline = np.ptp(times) if self.input_epoch_baseline <= 0: raise RuntimeError("The light curve duration is invalid") # warn if the flux doesn't appear to have been normalised mean_flux = np.mean(fluxes) if mean_flux < 0.99 or mean_flux > 1.01: warnings.warn( "The mean flux is far from 1.0, has it been normalised by the baseline flux?", UserWarning ) # chronological sort, store as instance variables srt_idx = np.argsort(times) self.input_time = times[srt_idx] self.input_flux = fluxes[srt_idx] self.input_flux_error = flux_errors[srt_idx] # attempt to determine the cadence # delta t in milliseconds ms_per_day = Constants.seconds_per_day * 1000 _dt = np.round(np.diff(self.input_time * ms_per_day)).astype(np.int64) # remove zero delta times, which can happen with multi-camera light curves _dt = _dt[_dt>0] # count the unique delta times uq, ct = np.unique(_dt, return_counts=True) if ct.max() > 1: # modal value if possible modal_dt = uq[np.argmax(ct)] self.input_cadence = modal_dt / ms_per_day else: # median otherwise self.input_cadence = np.nanmedian(_dt) / ms_per_day self.input_cadence_range = _dt.min() / ms_per_day, _dt.max() / ms_per_day # the time point of the first and last observations self.input_time_start = np.min(self.input_time) self.input_time_end = np.max(self.input_time) # these will be populated if we run detrending on the light curve self.offset_trend = None self.error_multiplier = None # this will be populated if we mask transit(s) in the light curve # will be an array where True is in-transit self.transit_mask = None # resample the new light curve to regularise the cadence # a regularised cadence is necessary so that we can cheaply determine # the array location of a given point in time # resampled points containing no observed data have null fluxes and # infinite flux errors if resample_cadence is None: self.cadence = self.input_cadence # impose a minimum 1s resample cadence - lets be sensible self.cadence = max(self.cadence, 1.0 / Constants.seconds_per_day) else: self.cadence = resample_cadence / Constants.seconds_per_day # perform the resampling self.time, self.flux, self.flux_error = self.resample(self.cadence) # the number of points in the resampled light curve self.size = len(self.time) # the offset time is an array of times starting at zero, # it is related to the input time sequence as: # offset_time = input_time - reference_time self.reference_time = self.time[0] self.offset_time = self.time - self.reference_time # flux errors to weights self.flux_weight = 1.0 / self.flux_error ** 2 # to save a lot of 1-f operations later, lets do it now self.offset_flux = 1.0 - self.flux self.offset_flux_error = self.flux_error # log-likelihood of constant flux model self.flat_loglike = np.nansum( - 0.5 * self.offset_flux ** 2 / self.offset_flux_error**2 - 0.5 * np.log(2 * np.pi * self.offset_flux_error**2) ) if verbose: # report the input light curve information print(str(self)) def __str__(self): return f"""input light curve has {self.input_num_points} elements, \ cadence: {self.input_cadence * Constants.seconds_per_day:.0f}s \ (range: {self.input_cadence_range[0] * Constants.seconds_per_day:.0f}s -> \ {self.input_cadence_range[1] * Constants.seconds_per_day:.0f}s) constant flux model log-likelihood: {self.flat_loglike:.3e} resampled light curve has {self.size} elements, \ cadence: {self.cadence * Constants.seconds_per_day:.0f}s"""
[docs] def resample(self, new_cadence: float, cuda_blocksize: int = 1024 ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Resample the light curve at a new cadence. (Note: Any existing transit masks will be lost, it's recommended to remask again using the Transit objects.) Parameters ---------- new_cadence : float The desired output observation cadence (i.e. the time between samples). cuda_blocksize : int, optional The number of threads per block. Should be a multiple of 32 and less than or equal to 1024. Default 1024. Returns ------- A LightCurve instance with the new sampling cadence """ _ensure_kernels() # input check if new_cadence <= 0.0 or not np.isfinite(new_cadence): raise ValueError("New cadence must be finite and greater than zero") # transit mask warning if self.transit_mask is not None: warnings.warn("Transit masks are removed on resampling") # generate the new time sequence new_times = np.arange( start=self.input_time_start, stop=self.input_time_end + new_cadence, step=new_cadence ) # send some arrays to the gpu # send the time as an offset from the first _time = to_gpu(self.input_time - self.input_time_start, np.float64) # send the flux and error arrays as they are _flux = to_gpu(self.input_flux, np.float64) _ferr = to_gpu(self.input_flux_error, np.float64) # initialise output arrays on the gpu _rflux_out = gpuarray.zeros(new_times.size, dtype=np.float64) _err_out = gpuarray.zeros(new_times.size, dtype=np.float64) _sum_fw = gpuarray.zeros(new_times.size, dtype=np.float64) _sum_w = gpuarray.zeros(new_times.size, dtype=np.float64) # type specification _cadence = np.float64(new_cadence) _n_elem = np.int32(self.input_num_points) _n_elem_out = np.int32(new_times.size) # set the cuda block and grid sizes _blocksize = (cuda_blocksize, 1, 1) _gridsize1 = (int(np.ceil(_n_elem / cuda_blocksize)), 1, 1) _gridsize2 = (int(np.ceil(_n_elem_out / cuda_blocksize)), 1, 1) # run the kernel to sum the fluxes and their weights _resample_kernel_1( _time, _flux, _ferr, _cadence, _n_elem, _sum_fw, _sum_w, block=_blocksize, grid=_gridsize1 ) # now run the division kernel _resample_kernel_2( _sum_fw, _sum_w, _rflux_out, _err_out, _n_elem_out, block=_blocksize, grid=_gridsize2 ) return new_times, _rflux_out.get(), _err_out.get()
[docs] def pad(self, num_points_prepend: int, num_points_append: int, verbose: bool = True) -> None: """ Pad the light curve with null data. (Note: Any existing transit masks will be lost, it's recommended to remask again using the Transit objects.) Parameters ---------- num_points_prepend : int The number of points to prepend to the light curve. num_points_append : int The number of points to append to the light curve. verbose : bool, optional If True (the default), reports various messages. """ # transit mask warning if self.transit_mask is not None: warnings.warn("Transit masks are removed on padding") # prepend _t0 = self.time[0] - np.arange(num_points_prepend, 0, -1) * self.cadence _f0 = np.full(num_points_prepend, np.nan, dtype=self.flux.dtype) _ef0 = np.full(num_points_prepend, np.inf, dtype=self.flux_error.dtype) self.time = np.insert(self.time, 0, _t0) self.flux = np.insert(self.flux, 0, _f0) self.flux_error = np.insert(self.flux_error, 0, _ef0) if self.offset_trend is not None: # prepend NaNs self.offset_trend = np.insert(self.offset_trend, 0, _f0) # append _t1 = self.time[-1] + np.arange(1, num_points_append + 1) * self.cadence _f1 = np.full(num_points_append, np.nan, dtype=self.flux.dtype) _ef1 = np.full(num_points_append, np.inf, dtype=self.flux_error.dtype) self.time = np.append(self.time, _t1) self.flux = np.append(self.flux, _f1) self.flux_error = np.append(self.flux_error, _ef1) if self.offset_trend is not None: # append NaNs self.offset_trend = np.append(self.offset_trend, _f1) # update derivative instance variables self.size = len(self.time) self.reference_time = self.time[0] self.offset_time = self.time - self.reference_time self.flux_weight = 1.0 / self.flux_error ** 2 self.offset_flux = 1.0 - self.flux self.offset_flux_error = self.flux_error # the log-likelihood is the same if verbose: print(f"padded {num_points_prepend} null points to the start " f"and {num_points_append} null points to the end of the " f"light curve")
[docs] def mask_transit(self, transit: 'Transit', duration_multiplier: float = 1.0, return_mask: bool = False) -> np.ndarray | None: """ Mask a transit Parameters ---------- transit : Transit The transit to mask (can be single or have period) duration_multiplier : float, optional A multiplier on the transit duration, applied to ensure that all in-transit flux is removed. Default 1.0, i.e. no border. return_mask : bool, optional If `True`, returns the mask for this transit only. Default `False`. Returns ------- 1D ndarray with in-transit points as `True` and out of transit as `False`, if `return_mask==True`. """ # compute the phase phase = self.time - transit.t0 # if periodic if transit.period is not None: phase %= transit.period phase[phase > (0.5 * transit.period)] -= transit.period # duration to mask mask_duration = transit.duration * duration_multiplier # generate the mask in_transit = np.abs(phase) < (0.5 * mask_duration) if self.transit_mask is None: # if this is a new mask, set the transit mask self.transit_mask = in_transit else: # if this is not a new mask, update the existing self.transit_mask += in_transit # return the mask for this transit only, if requested if return_mask: return in_transit
[docs] def copy(self) -> 'LightCurve': """ Return a copy of this LightCurve instance Returns ------- A copy of this LightCurve instance """ return deepcopy(self)
def __copy__(self) -> 'LightCurve': return self.copy()
[docs] class TransitModel(object): """ A normalised transit shape, downsampled for use by the GPU kernels. The model is stored in GPU shared memory during the linear search, so its size is constrained by the available shared memory per block. The default of 1024 samples gives a maximum nearest-neighbour interpolation error of ~1 %. Attributes ---------- model : ndarray The downsampled transit model (flux values, max 1.0 at baseline). offset_model : ndarray ``1 - model`` (pre-computed to save repeated GPU operations). size : int Number of samples in the downsampled model. input_model : ndarray The raw model array supplied by the user (or loaded from file). """ def __init__(self, transit_model: np.ndarray | str, downsamples: int = 1024, verbose: bool = True) -> None: """ Initialise the TransitModel class Parameters ---------- transit_model : array-like or string transit_model can be either a 1D array containing floating point offsets relative to baseline flux, or a string corresponding to one of three internal models. If a 1D array is supplied, the maximum offset from baseline flux should be 1. Ideally the array length will be greater than the value of `downsamples` (which is 1024 elements by default). CETRA does not force transit depth to be positive. (So e.g. a flare model can be provided and the depths returned will be negative.) Alternatively, one of the strings 'b32', 'b93' or 'b99' can be supplied. In this case, a model with impact parameter 0.32, 0.93 or 0.99, respectively, will be used. All other parameters are: ``{'rp': 0.03, 'u': (0.4804, 0.1867), 'period': 10.0, 'semimajor_axis': 20.0}``. downsamples : int, optional Downsample the transit model to this number of samples for use by the GPU kernels. Ideally a power of 2. It's best to be conservative with this value, since the volume of GPU shared memory (in which the transit model is stored) is fairly limited. The default value is 1024 elements. The larger the value, the smaller the max error in the model. A value of 1024 gives a maximum error of ~1%, 2048 is ~0.5%, 512 is ~2%. verbose : bool, optional If True (the default), reports various messages. """ # record input self.input_model = transit_model # now deal with any transit model inputs if isinstance(transit_model, list) or isinstance(transit_model, np.ndarray): # use the user-provided transit model if np.any(np.isnan(transit_model)): raise ValueError("transit_model cannot contain NaNs") elif np.any(np.isinf(transit_model)): raise ValueError("transit_model cannot contain Infs") else: self.input_model = np.asarray(transit_model) if verbose: print(f"user-provided transit model with {len(transit_model)} elements") elif transit_model in ['b32', 'b93', 'b99']: # the user has specified one of the internal transit models _impact_params = {'b32': 0.32, 'b93': 0.93, 'b99': 0.99} tmod_file_path = os.path.join(os.path.dirname(__file__), f"transit_model_{transit_model}.npz") tmod = np.load(tmod_file_path) self.input_model = tmod["model_array"] if verbose: print(f"using internal transit model with impact parameter: " f"{_impact_params[transit_model]}") else: # the user input is invalid raise RuntimeError( "transit_model not recognised. " "Is it an array, or one of 'b32', 'b93' or 'b99'?" ) # generate the internal transit model # self.interpolator is a 1d interpolator for the transit # model with x values between 0 and 1 # self.transit_model is a 1d array of the transit model with a given # number of evenly spaced samples self.interpolator, self.model = self.interpolate(downsamples) # store the transit model size as an instance variable self.size = int(downsamples) _ensure_kernels() # to save a lot of 1-f transit model operations later, lets do it now self.offset_model = 1.0 - self.model # send the offset transit model to the gpu self.offset_model_gpu_f4 = to_gpu(self.offset_model, np.float32) self.offset_model_gpu_f8 = to_gpu(self.offset_model, np.float64) if verbose: # report the transit model size print(f"transit model size: {self.size} elements") # report the nearest neighbour error _nn_error = self.nn_error() print(f"maximum nearest-neighbour error: {100 * _nn_error[0]:.2e}%") print(f" mean nearest-neighbour error: {100 * _nn_error[1]:.2e}%")
[docs] def copy(self) -> 'TransitModel': """ Return a copy of this TransitModel instance Returns ------- A copy of this TransitModel instance """ return TransitModel(self.input_model, downsamples=self.size, verbose=False)
def __copy__(self): return self.copy()
[docs] def interpolate(self, samples: int, kind: str = 'linear' ) -> tuple[interp1d, np.ndarray]: """ Resample an input model using interpolation Parameters ---------- samples : int The required number of samples kind : str, optional A scipy.interpolate.interp1d interpolation method. 'linear' is the default. Returns ------- The interpolator object and the model with the required number of samples """ i1d = interp1d( np.linspace(0, 1, len(self.input_model)), self.input_model, kind=kind, copy=True, bounds_error=False, fill_value=1.0, assume_sorted=True ) return i1d, i1d(np.linspace(0, 1, samples))
[docs] def nn_error(self) -> tuple[float, float]: """ Return the maximum and mean error in the model due to the use of nearest-neighbour interpolation. """ s_inter = (self.size - 1) * 2 + 1 _, m_all = self.interpolate(s_inter) m_orig = m_all[0::2] m_inter = m_all[1::2] frac_diff = m_orig[1:] - m_inter return np.max(np.abs(frac_diff)), np.mean(np.abs(frac_diff))
[docs] def get_model_lc(self, times: np.ndarray, transit: 'Transit' ) -> tuple[np.ndarray, np.ndarray]: """ Get orbital phase and model flux at the given time points for the given Transit object. Parameters ---------- times : array-like The time points for which the model flux is sought. transit : Transit The Transit object for the given transit. Returns ------- Two 1D ndarrays of orbital phases and model fluxes for the given time points """ # compute the phase phase = times - transit.t0 # if periodic if transit.period is not None: phase %= transit.period phase[phase > (0.5 * transit.period)] -= transit.period # compute the model flux model = self.interpolator(phase / transit.duration + 0.5) # set the depth model = 1.0 - transit.depth * (1.0 - model) return phase, model
[docs] @dataclass class Transit(object): """ A single (or periodic) transit. Attributes ---------- t0 : float Mid-transit time in days (BJD or whichever epoch the input used). duration : float Transit duration in days. depth : float Transit depth as a fraction of the baseline flux. depth_error : float Uncertainty on the transit depth. period : float or None Orbital period in days. ``None`` for single-transit candidates. Notes ----- The signal-to-noise ratio is ``depth / depth_error`` and is shown by ``repr``. """ t0: float duration: float depth: float depth_error: float period: float = None def __repr__(self): if self.period is not None: repr_period = f"{self.period:.6f}" else: repr_period = "None" return (f"Transit(\n" f" t0 = {self.t0:.3f}\n" f" duration = {self.duration:.3f}\n" f" depth = {self.depth:.3e}\n" f" depth_error = {self.depth_error:.3e}\n" f" period = {repr_period}\n" f" SNR = {self.depth / self.depth_error:.2f}\n" f")")
[docs] def copy(self) -> 'Transit': """ Return a copy of this Transit instance Returns ------- A copy of this Transit instance """ return deepcopy(self)
def __copy__(self): return self.copy()
[docs] @dataclass class LinearResult(object): """ Output of :meth:`TransitDetector.linear_search`. Holds the 2-D likelihood, depth, and depth-variance arrays over the ``(duration, t0)`` grid, plus references to the light curve and transit model that produced them. Attributes ---------- light_curve : LightCurve The light curve used in the search. transit_model : TransitModel The transit model used in the search. duration_array : ndarray 1-D array of tested transit durations in days, shape ``(n_durations,)``. t0_array : ndarray 1-D array of tested mid-transit times in days, shape ``(n_t0,)``. like_ratio_array : ndarray 2-D likelihood ratio (vs. constant flux) array, shape ``(n_durations, n_t0)``. depth_array : ndarray 2-D best-fit depth array, shape ``(n_durations, n_t0)``. depth_variance_array : ndarray 2-D depth variance array, shape ``(n_durations, n_t0)``. """ light_curve: LightCurve transit_model: TransitModel duration_array: np.ndarray t0_array: np.ndarray like_ratio_array: np.ndarray depth_array: np.ndarray depth_variance_array: np.ndarray
[docs] def copy(self) -> 'LinearResult': """ Return a copy of this LinearResult instance Returns ------- A copy of this LinearResult instance """ return LinearResult( light_curve=self.light_curve.copy(), transit_model=self.transit_model.copy(), duration_array=self.duration_array.copy(), t0_array=self.t0_array.copy(), like_ratio_array=self.like_ratio_array.copy(), depth_array=self.depth_array.copy(), depth_variance_array=self.depth_variance_array.copy(), )
def __copy__(self): return self.copy()
[docs] def get_max_likelihood_parameters(self): """ Return the parameters of the maximum likelihood TCE Returns ------- The maximum likelihood TCE as a Transit object. """ warnings.warn( "`LinearResult.get_max_likelihood_parameters()` will be " "deprecated in a future version. Use " "`TransitDetector.get_max_likelihood_single_transit()` instead.", DeprecationWarning, ) d, t = np.unravel_index( np.nanargmax(self.like_ratio_array), self.like_ratio_array.shape ) return self.get_params(d, t)
[docs] def get_max_snr_parameters(self, absolute_depth=False): """ Return the parameters of the maximum SNR TCE Parameters ---------- absolute_depth : bool, optional If `True`, computes SNR as abs(S/N), otherwise SNR is S/N. `False` by default. Returns ------- The maximum SNR TCE as a Transit object. """ warnings.warn( "`LinearResult.get_max_snr_parameters()` will be " "deprecated in a future version. Use " "`TransitDetector.get_max_snr_single_transit()` instead.", DeprecationWarning, ) snr_array = self.depth_array / np.sqrt(self.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) d, t = np.unravel_index(np.nanargmax(snr_array), snr_array.shape) return self.get_params(d, t)
[docs] def get_params(self, duration_index: int, t0_index: int) -> Transit: """ Find the parameters of a TCE given duration and t0 indices Parameters ---------- duration_index : int Duration index t0_index : int t0 index Returns ------- The corresponding TCE as a Transit object """ # find or compute the requested parameters t0 = self.t0_array[t0_index] duration = self.duration_array[duration_index] depth = self.depth_array[duration_index, t0_index] depth_error = np.sqrt(self.depth_variance_array[duration_index, t0_index]) # generate the grid search Transit object return Transit( t0=t0, duration=duration, depth=depth, depth_error=depth_error )
[docs] @dataclass class PeriodicResult(object): """ Output of :meth:`TransitDetector.period_search`. Holds the 1-D periodogram arrays and a reference to the :class:`LinearResult` from which it was derived. Attributes ---------- linear_result : LinearResult The linear search result used to produce this periodogram. period_array : ndarray Tested orbital periods in days, shape ``(n_periods,)``. like_ratio_array : ndarray Joint likelihood ratio for each period, shape ``(n_periods,)``. depth_array : ndarray Best-fit depth at each period, shape ``(n_periods,)``. depth_variance_array : ndarray Depth variance at each period, shape ``(n_periods,)``. duration_index_array : ndarray Index into ``duration_array`` of the best-fit duration at each period, shape ``(n_periods,)``. t0_index_array : ndarray Index into ``t0_array`` of the best-fit t0 at each period, shape ``(n_periods,)``. light_curve : LightCurve Convenience reference to ``linear_result.light_curve``. transit_model : TransitModel Convenience reference to ``linear_result.transit_model``. duration_array : ndarray Convenience reference to ``linear_result.duration_array``. t0_array : ndarray Convenience reference to ``linear_result.t0_array``. """ linear_result: LinearResult period_array: np.ndarray like_ratio_array: np.ndarray depth_array: np.ndarray depth_variance_array: np.ndarray duration_index_array: np.ndarray t0_index_array: np.ndarray
[docs] def copy(self) -> 'PeriodicResult': """ Return a copy of this PeriodicResult instance Returns ------- A copy of this PeriodicResult instance """ return PeriodicResult( linear_result=self.linear_result.copy(), period_array=self.period_array.copy(), like_ratio_array=self.like_ratio_array.copy(), depth_array=self.depth_array.copy(), depth_variance_array=self.depth_variance_array.copy(), duration_index_array=self.duration_index_array.copy(), t0_index_array=self.t0_index_array.copy(), )
def __copy__(self): return self.copy() def __post_init__(self): """ post-initialisation setup steps """ # reference some arrays from the linear result directly from this # class self.light_curve = self.linear_result.light_curve self.transit_model = self.linear_result.transit_model self.duration_array = self.linear_result.duration_array self.t0_array = self.linear_result.t0_array
[docs] def get_max_likelihood_parameters(self): """ Return the parameters of the maximum likelihood TCE Returns ------- The maximum likelihood TCE as a Transit object. """ warnings.warn( "`PeriodicResult.get_max_likelihood_parameters()` will be " "deprecated in a future version. Use " "`TransitDetector.get_max_likelihood_periodic_transit()` instead.", DeprecationWarning, ) idx = np.nanargmax(self.like_ratio_array) return self.get_params(idx)
[docs] def get_max_snr_parameters(self, absolute_depth=False): """ Return the parameters of the maximum SNR TCE Parameters ---------- absolute_depth : bool, optional If `True`, computes SNR as abs(S/N), otherwise SNR is S/N. `False` by default. Returns ------- The maximum SNR TCE as a Transit object. """ warnings.warn( "`PeriodicResult.get_max_snr_parameters()` will be " "deprecated in a future version. Use " "`TransitDetector.get_max_snr_periodic_transit()` instead.", DeprecationWarning, ) snr_array = self.depth_array / np.sqrt(self.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) idx = np.nanargmax(snr_array) return self.get_params(idx)
[docs] def get_params(self, period_index: int) -> Transit: """ Find the parameters of a TCE given a period index Parameters ---------- period_index : int The period index Returns ------- The corresponding TCE as a Transit object """ t0_index = self.t0_index_array[period_index] duration_index = self.duration_index_array[period_index] # find or compute the requested parameters period = self.period_array[period_index] t0 = self.t0_array[t0_index] duration = self.duration_array[duration_index] depth = self.depth_array[period_index] depth_error = np.sqrt(self.depth_variance_array[period_index]) # we might want t0 to be the first complete transit, but it could # come back before the start of the data. quick fix... while t0 < self.light_curve.input_time_start: t0 += period # generate the grid search Transit object return Transit( t0=t0, duration=duration, depth=depth, depth_error=depth_error, period=period )
[docs] class TransitDetector(object): """ Main orchestrator for the CETRA two-stage detection pipeline. Wraps a :class:`LightCurve` and :class:`TransitModel`, builds the duration and t0 grids, then exposes :meth:`linear_search` and :meth:`period_search` to run the GPU kernels and extract transit parameters. Attributes ---------- lc : LightCurve A padded copy of the input light curve (padded by half the maximum duration at each end to handle edge transits). durations : ndarray Transit duration grid in days. transit_model : TransitModel The transit model in use. t0_array : ndarray Grid of mid-transit times searched during the linear search. t0_stride_length : float Spacing between t0 grid points in days. periods : ndarray or None Period grid used in the most recent :meth:`period_search`, or ``None`` if it has not been run. min_durations : ndarray or None Per-period minimum physically-motivated transit duration, populated after :meth:`period_search`. max_durations : ndarray or None Per-period maximum physically-motivated transit duration, populated after :meth:`period_search`. linear_result : LinearResult or None Result of the most recent :meth:`linear_search`, or ``None``. periodic_result : PeriodicResult or None Result of the most recent :meth:`period_search`, or ``None``. """ def __init__( self, light_curve: LightCurve, transit_model: TransitModel | None = None, durations: np.ndarray | None = None, min_duration: float = 0.02, max_duration: float = 1.0, duration_log_step: float = 1.1, t0_stride_fraction: float = 0.01, verbose: bool = True ) -> None: """ Initialise the transit detector. Parameters ---------- light_curve : LightCurve The light curve. transit_model : TransitModel, optional This TransitModel will be used, if provided, instead of the default. The default model is for a transit with the following parameters: ``{'rp': 0.03, 'b': 0.32, 'u': (0.4804, 0.1867), 'period': 10.0, 'semimajor_axis': 20.0}``. durations : array-like, optional User-specified grid of durations in days. If not provided, the module computes a grid using the minimum and maximum durations and the log step. min_duration : float, optional Minimum transit duration to check in days, default 0.02. Unnecessary if an array of durations is provided. max_duration : float, optional Maximum transit duration to check in days, default 1.0. Unnecessary if an array of durations is provided. duration_log_step : float, optional The log-spacing of the durations to be used if the duration grid is to be internally determined. Default 1.1. Unnecessary if an array of durations is provided. t0_stride_fraction : float, optional The fraction of the minimum duration that determines the length of each t0 stride. The default is 1% of the minimum duration. verbose : bool, optional If True (the default), reports various messages. """ # store a copy of the input LightCurve as an instance variable if isinstance(light_curve, LightCurve): self.lc = light_curve.copy() else: raise RuntimeError("`light_curve` should be a LightCurve instance") # validate/generate the duration grid self.durations = duration_grid( durations=durations, min_duration=min_duration, max_duration=max_duration, log_step=duration_log_step, verbose=verbose ) _ensure_kernels() # how many durations are there? self.duration_count = len(self.durations) # send the durations to the gpu self.durations_gpu_f4 = to_gpu(self.durations, np.float32) # Pad the light curve with null data to make simpler the algorithm # that searches for transits that begin before or end after the data. # This requires a regular observing cadence, another benefit of the # light curve resampling operation. # pad by half of the maximum duration num_pad = int(np.ceil(0.5 * np.max(self.durations) / self.lc.cadence)) self.lc.pad(num_pad, num_pad, verbose=verbose) # deal with the transit model if transit_model is None: self.transit_model = TransitModel('b32', verbose=verbose) elif isinstance(transit_model, TransitModel): self.transit_model = transit_model else: raise RuntimeError("transit_model not recognised") # determine the t0 stride length self.t0_stride_length = np.min(self.durations) * t0_stride_fraction if verbose: print(f"t0 stride length: {self.t0_stride_length * Constants.seconds_per_day:.3f} seconds") # generate the grid of t0s # we go from the start to the end of the PADDED light curve self.t0_array = np.arange(self.lc.time[0], self.lc.time[-1], self.t0_stride_length) self.num_t0_strides = len(self.t0_array) if verbose: print(f"{self.num_t0_strides:d} t0 strides") # initialise instance variables that get populated later self.linear_n_warps = None self.periods = None self.min_durations = None self.max_durations = None self.period_count = None self.like_ratio_2d_gpu = None self.depth_2d_gpu = None self.var_depth_2d_gpu = None self.linear_result = None self.periodic_result = None
[docs] def get_trend( self, detection_kernel_width, detrending_kernel_width, IC_type=0, dIC_threshold=10.0, min_depth_ppm=10.0, min_obs_count=20, full_output=False, n_warps=4096, verbose=True ): """ Obtain the trend of the light curve using a quadratic+transit model, after a preliminary detection of likely transit signals. Parameters ---------- detection_kernel_width : float Width of the detection kernel in days. This might be motivated by some prior knowledge about the activity or rotation rate of the target, but should be longer than the maximum transit duration. detrending_kernel_width : float Width of the detrending kernel in days. This might be motivated by some prior knowledge about the activity or rotation rate of the target, but should be longer than the maximum transit duration. Note: in the detrending kernel, when the locations of the transits are 'known', only a single depth value is fitted within a kernel width. This means transits closer together than the kernel width are likely to be poorly modelled as they'll have the same depth. Perhaps it's better to simply mask the detected transits after all... IC_type : int The information criterion type. 0 is Bayesian (default), 1 is Akaike. dIC_threshold : float The information criterion difference threshold to use to select regions in the t0,duration space in which the quadratic+transit model is more likely than the quadratic model. The statistic is defined as dIC = IC_quadratic - IC_transit, so larger dIC values mean the quad+transit model is more preferred. Default 10. min_depth_ppm : float, optional Minimum transit depth to consider in ppm. Default 10 ppm. min_obs_count : int, optional Minimum number of observations required in the kernel window. Default 20. full_output : bool, optional If `True`, returns some intermediate arrays. If `False` (the default), nothing is returned. n_warps : int, optional The number of warps to use, default 4096. We want this to be around a low integer multiple of the number of concurrent warps able to run on the GPU. The A100 has 108 SMs * 64 warps = 6912 concurrent warps. The RTX A5000 has 64 SMs * 48 warps = 3072 concurrent warps. Striding in this way limits the number of reads of the transit model from global into shared memory. This value shouldn't exceed the number of t0 strides times the number of durations. verbose : bool, optional If `True`, reports with additional verbosity. Returns ------- If `full_output == True`, then arrays of: 1) The number of data points in each detection kernel window ndarray of len(t0_array) 2) The log-likelihoods of the quadratic models ndarray of len(t0_array) 3) The log-likelihoods of the quadratic+transit models ndarray of shape (len(durations), len(t0_array)) 4) The delta IC array ndarray of shape (len(durations), len(t0_array)) 5) The transit(s) model ndarray of len(light curve) 6) The fitted trend array ndarray of len(light curve) 7) The estimated error multiplier single floating point value """ warnings.warn( "`TransitDetector.get_trend()` is now unsupported, it will be removed " "in a future version.", DeprecationWarning, ) # verify the IC type input if IC_type not in [0, 1]: raise ValueError("Information criterion type must be int(0) or int(1).") if verbose: print("obtaining trend") # step 1, fit the basic quadratic models while storing some # intermediate variables. # compute the log-likelihoods # these are the same for all durations, so we can save time (vs. a # previous version of this code) by doing it only once for all # durations # log-likelihood of the quadratic model ll_quad = gpuarray.empty(self.num_t0_strides, dtype=np.float64) # intermediate variables: # key for variable names: # s: sum # w: weight # x: time # y: flux sw = gpuarray.zeros(self.num_t0_strides, np.float64) swx = gpuarray.zeros(self.num_t0_strides, np.float64) swy = gpuarray.zeros(self.num_t0_strides, np.float64) swxx = gpuarray.zeros(self.num_t0_strides, np.float64) swxy = gpuarray.zeros(self.num_t0_strides, np.float64) swxxx = gpuarray.zeros(self.num_t0_strides, np.float64) swxxy = gpuarray.zeros(self.num_t0_strides, np.float64) swxxxx = gpuarray.zeros(self.num_t0_strides, np.float64) # number of data points in window num_pts = gpuarray.zeros(self.num_t0_strides, np.int32) # block and grid sizes block_size_k1 = 512, 1, 1 grid_size_k1 = int(np.ceil(self.num_t0_strides / block_size_k1[0])), 1 # no shared memory needed for this kernel # type specification _detection_kernel_half_width = np.int32( np.ceil(0.5 * detection_kernel_width / self.lc.cadence) ) _cadence = np.float32(self.lc.cadence) _t0_stride_length = np.float32(self.t0_stride_length) _min_in_window = np.int32(min_obs_count) _n_elem = np.int32(self.lc.size) _t0_stride_count = np.int32(self.num_t0_strides) # send the light curve to the gpu _time = to_gpu(self.lc.offset_time, np.float64) _flux = to_gpu(self.lc.offset_flux, np.float64) _wght = to_gpu(self.lc.flux_weight, np.float64) # record the start time _k1_start_time = time() # run the quadratic fit kernel _detrender_quadfit( _time, _flux, _wght, _detection_kernel_half_width, _min_in_window, _cadence, _n_elem, _t0_stride_length, _t0_stride_count, sw, swx, swy, swxx, swxy, swxxx, swxxy, swxxxx, num_pts, ll_quad, block=block_size_k1, grid=grid_size_k1 ) # make sure the operation is finished before recording the time # (device calls are asynchronous) drv.Context.synchronize() # record the stop time _k1_stop_time = time() # report elapsed if verbose: print(f"kernel 1 completed in {_k1_stop_time - _k1_start_time:.3f} seconds") # step 2, fit the quadratic plus transit models for a range of transit # durations. # compute the log-likelihoods # initialise output arrays on the gpu # t0s are along the rows, durations along columns # i.e. # [[t0,d0 t1,d0 t2,d0] # [t0,d1 t1,d1 t2,d1] # [t0,d2 t1,d2 t2,d2]] # Numpy and C are row-major outshape_2d = self.duration_count, self.num_t0_strides ll_qtr = gpuarray.empty(outshape_2d, dtype=np.float64) # block and grid sizes block_size_k2 = 32, 1, 1 # 1 warp per block grid_size_k2 = int(np.ceil(n_warps / outshape_2d[0])), int(outshape_2d[0]) # shared memory size - # space for the transit model as f64 # + 6 elements per thread for the f64 intermediate arrays smem_size_k2 = int( 8 * self.transit_model.size + 8 * 6 * block_size_k2[0] ) # type specification _min_depth_ppm = np.float32(min_depth_ppm) _tm_size = np.int32(self.transit_model.size) _duration_count = np.int32(self.duration_count) # record the start time _k2_start_time = time() # run the quadratic fit kernel _detrender_qtrfit( _time, _flux, _wght, _detection_kernel_half_width, _min_depth_ppm, _min_in_window, _cadence, _n_elem, self.transit_model.offset_model_gpu_f8, _tm_size, self.durations_gpu_f4, _duration_count, _t0_stride_length, _t0_stride_count, sw, swx, swy, swxx, swxy, swxxx, swxxy, swxxxx, num_pts, ll_qtr, block=block_size_k2, grid=grid_size_k2, shared=smem_size_k2 ) # make sure the operation is finished before recording the time # (device calls are asynchronous) drv.Context.synchronize() # record the stop time _k2_stop_time = time() # report elapsed if verbose: print(f"kernel 2 completed in {_k2_stop_time - _k2_start_time:.3f} seconds") # step 3, compute the information criteria # create the output array delta_IC = gpuarray.empty(outshape_2d, dtype=np.float64) # block and grid sizes block_size_k3 = 512, 1, 1 grid_size_k3 = (int(np.ceil(self.num_t0_strides / block_size_k3[0])), int(np.ceil(self.duration_count / block_size_k3[1]))) # record the start time _k3_start_time = time() # run the information criteria calculation kernel _detrender_calc_IC( ll_quad, ll_qtr, num_pts, _min_in_window, np.int32(0), _duration_count, _t0_stride_count, delta_IC, block=block_size_k3, grid=grid_size_k3 ) # make sure the operation is finished before recording the time # (device calls are asynchronous) drv.Context.synchronize() # record the stop time _k3_stop_time = time() # report elapsed _search_stop_time = time() if verbose: print(f"kernel 3 completed in {_k3_stop_time - _k3_start_time:.3f} seconds") # step 4, build the model containing the likely transits # record the start time _s4_start_time = time() # the transit mask # we're looking to specify the transits in the light curve time array # NOT the t0 array that we've been using up until now _transit_mask = np.zeros(self.lc.size, dtype=np.float64) # grab the delta IC array from the device _delta_IC = delta_IC.get() # nullify elements below the dIC threshold _delta_IC[_delta_IC < dIC_threshold] = np.nan # collapse the dIC array to obtain the duration index of the highest # dIC for a given t0 column # must be mindful of the fact that there are likely many all-nan # columns # this might be done more efficiently on the GPU, but let's get it # working first and see how slow it is allnan = np.count_nonzero(~np.isnan(_delta_IC), axis=0) == 0 _delta_IC[np.isnan(_delta_IC)] = -np.inf idx = np.argmax(_delta_IC, axis=0, keepdims=True) # these are the non-null t0s and their corresponding best durations # and dICs _t0 = self.t0_array[~allnan] _dur = self.durations[idx[0][~allnan]] _dIC = np.take_along_axis(_delta_IC, idx, axis=0)[0][~allnan] # now work from the most to the least likely transit, populating the # transit mask array while np.any(np.isfinite(_dIC)): max_dIC_idx = np.nanargmax(_dIC) t0 = _t0[max_dIC_idx] dur = _dur[max_dIC_idx] # generate the model with just this transit _tm = 1.0 - self.transit_model.interpolator((self.lc.time - t0)/dur + 0.5) # check whether this transit mask overlaps an existing transit if not np.any(_transit_mask[_tm > 0] > 0): # if not, add this transit to the mask _transit_mask += _tm # nullify this region in the _dIC array _dIC[np.abs(_t0 - t0) < (0.5 * dur)] = np.nan # record the stop time _s4_stop_time = time() # report elapsed _search_stop_time = time() if verbose: print(f"step 4 completed in {_s4_stop_time - _s4_start_time:.3f} seconds") # step 5, fit the light curve plus complete transit model # send the transit(s) mask to the gpu _transit_mask_gpu_f8 = to_gpu(_transit_mask, np.float64) # create the trend array on the gpu _trend = gpuarray.empty(self.lc.size, dtype=np.float64) # block and grid sizes block_size_k4 = 512, 1, 1 grid_size_k4 = int(np.ceil(self.lc.size / block_size_k4[0])), 1 # type specification for detrending kernel _detrending_kernel_half_width = np.int32( np.ceil(0.5 * detrending_kernel_width / self.lc.cadence) ) # record the start time _k4_start_time = time() # run the fitting kernel _detrender_fit_trend( _time, _flux, _wght, _transit_mask_gpu_f8, _detrending_kernel_half_width, _min_in_window, _n_elem, _trend, block=block_size_k4, grid=grid_size_k4 ) # make sure the operation is finished before recording the time # (device calls are asynchronous) drv.Context.synchronize() # record the stop time _k4_stop_time = time() # report elapsed if verbose: print(f"kernel 4 completed in {_k4_stop_time - _k4_start_time:.3f} seconds") # read the trend from the device trend = _trend.get() # measure the error multiplier detrended_offset_flux = 1 - (self.lc.flux / (1-trend)) std_resids = np.abs(detrended_offset_flux) / self.lc.offset_flux_error to_include = np.isfinite(std_resids) & (_transit_mask == 0.0) error_multiplier = np.median(std_resids[to_include]) * 1.4826 # record the results self.lc.offset_trend = trend self.lc.error_multiplier = error_multiplier if full_output: return (num_pts.get(), ll_quad.get(), ll_qtr.get(), delta_IC.get(), _transit_mask, trend, error_multiplier)
[docs] def check_period(self, period: float, min_duration: float, max_duration: float ) -> tuple[float, float, float, int, int]: """ Find the maximum likelihood ratio (vs. constant flux model), depth, depth variance, t0 and duration for a given period. Parameters ---------- period : float The period to check min_duration : float Minimum duration to check. max_duration : float, optional Maximum duration to check. Must be less than the period or results are invalid, CETRA will warn and set it to the period when necessary. Returns ------- Tuple of len=5 containing the maximum likelihood ratio and the corresponding depth, depth variance, t0 index and duration index for the input period. """ # the duration should always be less than the period, so that transit # windows can never overlap if max_duration > period: warnings.warn("the max duration should not be larger than the period") max_duration = period # which durations to run _idx_durations_in_range = np.where( (self.durations >= min_duration) & (self.durations <= max_duration) )[0] num_d_in_range = np.int32(_idx_durations_in_range.size) # return some null output if no valid durations if num_d_in_range == 0: return np.nan, np.nan, np.nan, -1, -1 # find the indices and number of durations that are in range first_d_in_range_idx = np.int32(_idx_durations_in_range[0]) last_d_in_range_idx = np.int32(_idx_durations_in_range[-1]) # maximum possible number of transits _lc_baseline = self.lc.time[-1] - self.lc.time[0] max_transit_count = np.int32(np.floor(_lc_baseline / period) + 1) # the period in strides _period_in_strides = np.float32(period / self.t0_stride_length) # block and grid sizes # the second and third block dimensions should always have size 1 block_size_k1 = 512, 1, 1 grid_size_k1 = ( int(np.ceil(_period_in_strides / block_size_k1[0])), int(num_d_in_range) ) # shared memory size, space for 2x single-precision floats per thread # we need to do a max() reduction while also finding the thread index # of the maximum value smem_size_k1 = int(2 * 4 * block_size_k1[0]) # temporary arrays for the reduction operation tmp_size = grid_size_k1[0] * grid_size_k1[1] # first pass output tmp_likerat_gpu = gpuarray.empty(tmp_size, dtype=np.float32) tmp_depth_gpu = gpuarray.empty(tmp_size, dtype=np.float32) tmp_var_depth_gpu = gpuarray.empty(tmp_size, dtype=np.float32) tmp_dur_index_gpu = gpuarray.empty(tmp_size, dtype=np.int32) tmp_t0_index_gpu = gpuarray.empty(tmp_size, dtype=np.int32) # second pass output sgl_likerat_gpu = gpuarray.empty(1, dtype=np.float32) sgl_depth_gpu = gpuarray.empty(1, dtype=np.float32) sgl_var_depth_gpu = gpuarray.empty(1, dtype=np.float32) sgl_dur_index_gpu = gpuarray.empty(1, dtype=np.int32) sgl_t0_index_gpu = gpuarray.empty(1, dtype=np.int32) # type specification _tm_size = np.int32(self.transit_model.size) _duration_count = np.int32(self.duration_count) _all_t0_stride_count = np.int32(self.num_t0_strides) # run the first kernel _periodic_search_k1( _period_in_strides, self.like_ratio_2d_gpu, self.depth_2d_gpu, self.var_depth_2d_gpu, _all_t0_stride_count, first_d_in_range_idx, last_d_in_range_idx, max_transit_count, tmp_likerat_gpu, tmp_depth_gpu, tmp_var_depth_gpu, tmp_dur_index_gpu, tmp_t0_index_gpu, block=block_size_k1, grid=grid_size_k1, shared=smem_size_k1 ) # run parameters for the second kernel # block and grid sizes block_size_k2 = 1024, 1, 1 # the second and third dimensions should have size 1 grid_size_k2 = ( int(np.ceil(tmp_size / block_size_k2[0])), int(1) ) # shared memory size, space for 2x single-precision floats per thread # we need to do a max() reduction while also finding the thread id of the maximum value smem_size_k2 = int(2 * 4 * block_size_k2[0]) # final reduction operation to obtain the best parameters _periodic_search_k2( tmp_likerat_gpu, tmp_depth_gpu, tmp_var_depth_gpu, tmp_dur_index_gpu, tmp_t0_index_gpu, sgl_likerat_gpu, sgl_depth_gpu, sgl_var_depth_gpu, sgl_dur_index_gpu, sgl_t0_index_gpu, np.int32(tmp_size), block=block_size_k2, grid=grid_size_k2, shared=smem_size_k2 ) # read outputs lrat_out = sgl_likerat_gpu.get()[0] depth_out = sgl_depth_gpu.get()[0] vdepth_out = sgl_var_depth_gpu.get()[0] t0_idx_out = sgl_t0_index_gpu.get()[0] dur_idx_out = sgl_dur_index_gpu.get()[0] return lrat_out, depth_out, vdepth_out, t0_idx_out, dur_idx_out
[docs] def get_max_likelihood_single_transit(self) -> Transit | None: """ Return the parameters of the maximum likelihood single transit event in the light curve, using the results of the linear search. Returns ------- The maximum likelihood single transit as a Transit object. """ if self.linear_result is None: warnings.warn( "A maximum likelihood single transit cannot be identified without " "first running a linear search" ) return # find the indices of the maximum likelihood ratio in the 2D array d, t = np.unravel_index( np.nanargmax(self.linear_result.like_ratio_array), self.linear_result.like_ratio_array.shape ) return self.linear_result.get_params(d, t)
[docs] def get_max_snr_single_transit(self, absolute_depth: bool = False) -> Transit | None: """ Return the parameters of the maximum SNR single transit event in the light curve, using the results of the linear search. Parameters ---------- absolute_depth : bool, optional If `True`, computes SNR as abs(S/N), otherwise SNR is S/N. `False` by default. Returns ------- The maximum SNR single transit as a Transit object. """ if self.linear_result is None: warnings.warn( "A maximum SNR single transit cannot be identified without " "first running a linear search" ) return # compute the SNR array snr_array = self.linear_result.depth_array / np.sqrt(self.linear_result.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) # find the indices of the maximum SNR in the 2D array d, t = np.unravel_index(np.nanargmax(snr_array), snr_array.shape) return self.linear_result.get_params(d, t)
[docs] def get_single_transits_above_snr_threshold( self, snr_threshold: float, duration_multiplier: float = 1.3, max_transits: int = 200, absolute_depth: bool = False ) -> list[Transit] | None: """ Return the parameters of single transits with SNR above a given threshold. May miss transits that overlap with more significant ones. Parameters ---------- snr_threshold : float The SNR threshold above which to return TCEs. duration_multiplier : float, optional The multiplier on the transit duration to use when masking out transits. max_transits : int, optional The maximum number of transits to return. This is to prevent the method from running indefinitely if the SNR threshold is set too low. Default is 200. absolute_depth : bool, optional If `True`, computes SNR as abs(S/N), otherwise SNR is S/N. `False` by default. Returns ------- A list of Transit objects for transits with SNR above the given threshold. """ if self.linear_result is None: warnings.warn( "A search for single transits cannot be performed without " "first running a linear search" ) return # this method manipulates the light curve mask and the results of the linear # search. We'll make a copy of the light curve, then restore it and rerun # the linear search at the end of this method. starting_light_curve = self.lc.copy() # break out the snr array snr_array = self.linear_result.depth_array / np.sqrt(self.linear_result.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) # somewhere to store the transits that we find transits = [] # run the search while np.nanmax(snr_array) > snr_threshold and len(transits) < max_transits: # nullify areas in the likelihood array where the snr is below the threshold lr_valid = np.where(snr_array >= snr_threshold, self.linear_result.like_ratio_array, np.nan) # identify the monotransit with the maximum likelihood ratio dt_idx = np.unravel_index(np.nanargmax(lr_valid), lr_valid.shape) tr = self.linear_result.get_params(*dt_idx) # record the transit in the list with the others transits.append(tr) # mask the transit in the light curve self.lc.mask_transit(tr, duration_multiplier=duration_multiplier) # rerun the linear search with the now masked light curve _ = self.linear_search(n_warps=self.linear_n_warps, verbose=False) # break out the snr array again snr_array = self.linear_result.depth_array / np.sqrt(self.linear_result.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) # restore the original light curve and recompute the original linear search results self.lc = starting_light_curve _ = self.linear_search(n_warps=self.linear_n_warps, verbose=False) return transits
[docs] def get_max_likelihood_periodic_transit(self) -> Transit | None: """ Return the parameters of the maximum likelihood periodic transit event in the light curve, using the results of the period search. Returns ------- The maximum likelihood periodic transit as a Transit object. """ if self.periodic_result is None: warnings.warn( "A maximum likelihood periodic transit cannot be identified without " "first running a period search" ) return # find the index of the maximum likelihood ratio in the periodogram idx = np.nanargmax(self.periodic_result.like_ratio_array) return self.periodic_result.get_params(idx)
[docs] def get_max_snr_periodic_transit(self, absolute_depth: bool = False) -> Transit | None: """ Return the parameters of the maximum SNR periodic transit event in the light curve, using the results of the period search. Parameters ---------- absolute_depth : bool, optional If `True`, computes SNR as abs(S/N), otherwise SNR is S/N. `False` by default. Returns ------- The maximum SNR periodic transit as a Transit object. """ if self.periodic_result is None: warnings.warn( "A maximum SNR periodic transit cannot be identified without " "first running a period search" ) return # compute the SNR array snr_array = self.periodic_result.depth_array / np.sqrt(self.periodic_result.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) # find the index of the maximum SNR in the periodogram idx = np.nanargmax(snr_array) return self.periodic_result.get_params(idx)
[docs] def get_periodic_transits_above_snr_threshold( self, snr_threshold: float, duration_multiplier: float = 1.3, max_transits: int = 10, absolute_depth: bool = False ) -> list[Transit] | None: """ Return the parameters of periodic transits with SNR above a given threshold. May miss transits that overlap with more significant ones. Parameters ---------- snr_threshold : float The SNR threshold above which to return TCEs. duration_multiplier : float, optional The multiplier on the transit duration to use when masking out transits. max_transits : int, optional The maximum number of transits to return. This is to prevent the method from running indefinitely if the SNR threshold is set too low. Default is 10. absolute_depth : bool, optional If `True`, computes SNR as abs(S/N), otherwise SNR is S/N. `False` by default. Returns ------- A list of Transit objects for transits with SNR above the given threshold. """ if self.periodic_result is None: warnings.warn( "A search for periodic transits cannot be performed without " "first running a period search" ) return # This method manipulates the light curve mask and the results of the linear # search. We'll make a copy of the light curve, then restore it and rerun # the linear search at the end of this method. starting_light_curve = self.lc.copy() # break out the snr array snr_array = self.periodic_result.depth_array / np.sqrt(self.periodic_result.depth_variance_array) if absolute_depth: snr_array = np.abs(snr_array) # identify the periods that meet the threshold idx_meets_threshold = np.where(snr_array > snr_threshold)[0] # reorder by descending likelihood ratio idx_meets_threshold = idx_meets_threshold[ np.argsort(-self.periodic_result.like_ratio_array[idx_meets_threshold]) ] # work on a copy of the PeriodicResult.like_ratio_array so we don't mess with the original like_ratio_array = self.periodic_result.like_ratio_array.copy() # somewhere to store the transits that we find transits = [] # check each signal for idx in idx_meets_threshold: # grab the period period = self.periodic_result.period_array[idx] min_duration = self.min_durations[idx] max_duration = self.max_durations[idx] # recheck the period # not strictly necessary on the first loop but its cheap and avoids a conditional ret = self.check_period(period, min_duration, max_duration) # break out the returned values _like_ratio, _depth, _var_depth, _t0_idx, _dur_idx = ret # if the likelihood ratio is unchanged, this is probably a new TCE new_TCE = _like_ratio == like_ratio_array[idx] # update the PeriodResult object like_ratio_array[idx] = _like_ratio # record the new TCE if new_TCE: # grab the corresponding transit object tr = self.periodic_result.get_params(idx) # record the transit in the list with the others transits.append(tr) # mask the detected transit in the light curve self.lc.mask_transit(tr, duration_multiplier=duration_multiplier) # rerun the linear search with the masked light curve _ = self.linear_search(n_warps=self.linear_n_warps, verbose=False) # stop if we've reached the max number of TCEs if len(transits) == max_transits: break # restore the original light curve and recompute the original linear search results self.lc = starting_light_curve _ = self.linear_search(n_warps=self.linear_n_warps, verbose=False) return transits
def to_gpu(arr: np.ndarray, dtype: np.dtype) -> gpuarray.GPUArray: """ Convenience function for sending an array to the gpu with a specific data type. Parameters ---------- arr: array-like The array to send to the gpu dtype: dtype The numpy data type to use Returns ------- A gpuarray """ return gpuarray.to_gpu(np.ascontiguousarray(arr, dtype=dtype))
[docs] def duration_grid( durations: np.ndarray | None = None, min_duration: float = 0.02, max_duration: float = 1.0, log_step: float = 1.1, verbose: bool = True ) -> np.ndarray: """ Validate or generate the duration grid Parameters ---------- durations : array-like, optional User-specified grid of durations in days. If not provided, the module computes a grid using the minimum and maximum durations and the log step. min_duration : float, optional Minimum transit duration to check in days, default 0.02. max_duration : float, optional Maximum transit duration to check in days, default 1.0. log_step : float, optional The log-spacing of the durations to be used if the duration grid is to be internally determined. Default 1.1. verbose : bool, optional If True (the default), reports various messages. Returns ------- The validated duration grid """ if durations is None: if log_step <= 1.0: raise ValueError("duration log step must be greater than 1.0") else: durations = [min_duration] while durations[-1] < max_duration: next_duration = durations[-1] * log_step if next_duration <= max_duration: durations.append(next_duration) else: break durations = np.asarray(durations) # sort the array (in case input is unsorted) _durations = np.sort(durations) # raise an error if the duration grid is too sparse if len(_durations) == 0: raise RuntimeError("The duration grid is empty") if verbose: # report the duration grid size print(f"{len(_durations)} durations, " f"{_durations[0]:.2f} -> {_durations[-1]:.2f} days") return _durations
[docs] def period_grid( epoch_baseline: float, min_period: float = 0.0, max_period: float = np.inf, n_transits_min: int = 2, R_star: float = 1.0, M_star: float = 1.0, oversampling_factor: int = 3 ) -> np.ndarray: """ Generates the optimal period grid. Grabbed this nice code from TLS. Thanks Hippke and Heller! Original copyright for this code belongs to Michael Hippke, it was published under an MIT license. Some modifications made by L.C. Smith. Parameters ---------- epoch_baseline : float The length of the light curve in days. min_period : float, optional The minimum period to consider. Set to 0.0 for no lower limit, which in practice means it's limited by the astrophysics (or the sampling cadence), this is the default setting. max_period : float, optional The maximum period to consider. Set to inf for no upper limit, which in practice means it's limited by the light curve duration and the minimum number of required transits, this is the default setting. n_transits_min : int, optional The minimum number of transits that must have coverage. This parameter impacts the period grid, in that the maximum period is limited to the epoch baseline divided by n_transits_min. The default requirement is that there are 2 transits. R_star : float, optional The stellar radius (in solar radii) to use, default 1.0. M_star : float, optional The stellar mass (in solar masses) to use, default 1.0. oversampling_factor : int, optional Oversample the period grid by this factor, default 3. Increasing this improves detection efficiency but at a higher computational cost. Returns ------- P_days : ndarray Period grid in days """ # unit conversions M_star *= Constants.solar_mass R_star *= Constants.solar_radius epoch_baseline = epoch_baseline * Constants.seconds_per_day # boundary conditions f_min = n_transits_min / epoch_baseline f_max = 1.0 / (2 * np.pi) * np.sqrt(Constants.G * M_star / (3 * R_star) ** 3) # Ofir et al. 2014 stuff A = ( (2 * np.pi) ** (2.0 / 3) / np.pi * R_star / (Constants.G * M_star) ** (1.0 / 3) / (epoch_baseline * oversampling_factor) ) C = f_min ** (1.0 / 3) - A / 3.0 N_opt = (f_max ** (1.0 / 3) - f_min ** (1.0 / 3) + A / 3) * 3 / A X = np.arange(N_opt) + 1 f_x = (A / 3 * X + C) ** 3 P_x = 1 / f_x # convert period grid from seconds to days P_x /= Constants.seconds_per_day # check limits P_x = P_x[np.where((P_x >= min_period) & (P_x <= max_period))[0]] # ascending order P_x = np.sort(P_x) return P_x
[docs] def concatenate_lightcurves( lc_list: list[LightCurve], resample_cadence: float | None = None ) -> LightCurve: """ Concatenate multiple LightCurve objects to produce a single LightCurve object. If LightCurves overlap in time space their trends will be lost. If this is the case, it's recommended that their trends are instead subtracted by the user before generating a new LightCurve instance. (Note: Any existing transit masks will be lost, it's recommended to remask again using the Transit objects.) Parameters ---------- lc_list : array-like Array of LightCurve objects. resample_cadence : float, optional The cadence (in seconds) to use for resampling the light curve. By default, the cadence of the first listed light curve will be used. Returns ------- A new LightCurve instance. """ # input validation try: if not isinstance(lc_list[0], LightCurve): raise TypeError() except TypeError: raise TypeError("lc_list must be an array-like object containing LightCurves") except IndexError: raise IndexError("lc_list contains no items") # transit mask warning for lc in lc_list: if lc.transit_mask is not None: warnings.warn("Transit masks are removed on concatenation") # grab the data from the combined light curve arrays times = [] fluxes = [] errors = [] tlims = [] for lc in lc_list: times.append(lc.input_time) fluxes.append(lc.input_flux) errors.append(lc.input_flux_error) tlims.append((np.min(lc.input_time), np.max(lc.input_time))) # apply the error multiplier now if present for n, lc in enumerate(lc_list): if lc.error_multiplier is not None: errors[n] *= lc.error_multiplier # concatenate the light curve data times, fluxes, errors = map(np.concatenate, [times, fluxes, errors]) # set a resampling cadence if one isn't given if resample_cadence is None: resample_cadence = lc_list[0].cadence * Constants.seconds_per_day # create a new LightCurve new_lc = LightCurve( times=times, fluxes=fluxes, flux_errors=errors, resample_cadence=resample_cadence ) # now we must migrate any previously determined trends, but only do this # if they all have them if np.all([lc.offset_trend is not None for lc in lc_list]): # now check that the light curves don't overlap in time space _overlap = False for n, lim0 in enumerate(tlims[:-1]): for lim1 in tlims[n+1:]: if (lim1[1] >= lim0[0] >= lim1[0] or lim1[1] >= lim0[1] >= lim1[0]): _overlap = True # raise an error if they overlap if _overlap: raise RuntimeError( "LightCurve objects overlap, trend cannot be propagated" ) # otherwise, carry on # extract the trends _time = [] _trend = [] for lc in lc_list: _time.append(lc.time) _trend.append(lc.offset_trend) _time, _trend = map(np.concatenate, [_time, _trend]) # must be in chronological order _order = np.argsort(_time) _time, _trend = map(lambda arr: arr[_order], [_time, _trend]) # now do some simple linear interpolation to obtain the new trend new_trend = np.interp(new_lc.time, _time, _trend, left=np.nan, right=np.nan) # apply the trend to the new light curve new_lc.offset_trend = new_trend # return the new light curve return new_lc
if __name__ == "__main__": raise RuntimeError("Don't try to run this module directly")