Source code for tjpcov.wigner_transform

# Sukhdeep: This code is copied from Skylens. Skylens is not ready to be public
# yet, but TJPCov have our permission to use this code.

import itertools
from functools import partial
from multiprocessing import Pool, cpu_count

import numpy as np
from scipy.interpolate import RectBivariateSpline, interp1d
from scipy.special import binom
from scipy.special import eval_jacobi as jacobi
from scipy.special import jn

# FIXME:
# 1. Do we need to pass logger?
# 2. Need to add inverse transform functionality.


[docs] class WignerTransform: """Class to compute curved sky Hankel transforms with wigner-d matrices.""" def __init__(self, theta, ell, s1_s2, ncpu=None): """Initialize the class for the given angles, scales and spins. Args: theta: Values of angular separation, theta, at which the Hankel transform is done. Should be in radians. ell: ell values at which the Hankel transform is done. Should be integers s1_s2: List of spin pairs of the tracers. Each spin pair should be a tuple. e.g. for 3X2 analysis, pass [(0,0),(0,2),(2,2),(2,-2)]. (0,0): (galaxy,galaxy) (0,2): (galaxy,shear). (2,0) is equivalent. (2,2): (shear,shear), xi+ (2,-2): (shear,shear), xi- ncpu: Number of python processes to use when computing wigner-d matrices. """
[docs] self.name = "Wigner"
[docs] self.ell = ell
[docs] self.grad_ell = np.gradient(ell)
[docs] self.norm = (2 * ell + 1.0) / (4.0 * np.pi)
# ignoring some factors of -1, assuming sum and differences of s1,s2 # are even for all correlations we need. # for inverse wigner transform
[docs] self.grad_theta = np.gradient(theta)
[docs] self.inv_norm = np.sin(theta) * 2 * np.pi
[docs] self.inv_wig_norm = self.inv_norm * self.grad_theta
[docs] self.wig_d = {}
[docs] self.wig_3j = {}
[docs] self.s1_s2s = s1_s2
[docs] self.theta = {}
self.theta = theta # compute the wigner-d matrices. for s1, s2 in s1_s2: self.wig_d[(s1, s2)] = wigner_d_parallel( s1, s2, theta, self.ell, ncpu=ncpu )
[docs] self.taper_f = None
[docs] self.taper_f2 = None
[docs] def cl_grid(self, ell_cl, cl, taper=False, **taper_kwargs): """Interpolate the input C_ell. This is done in case the ell values of C_ell are different from the grid on which wigner-d matrices were computed during intialization. Args: ell_cl (array): ell at which the input C_ell is computed. cl (array): input C_ell taper (bool): if True apply the tapering to the input C_ell. Tapering can help in reducing ringing. Returns: array: C_ell evaluated at the initialization ells. """ if taper: self.taper_f = self.taper(ell=ell_cl, **taper_kwargs) cl = cl * self.taper_f cl_int = interp1d( ell_cl, cl, bounds_error=False, fill_value=0, kind="linear" ) cl2 = cl_int(self.ell) return cl2
[docs] def cl_cov_grid(self, ell_cl, cl_cov, taper=False, **taper_kwargs): """Interpolate the input 2D covariances. This is done in case in case the ell values are different from the grid on which wigner-d matrices were computed during intialization. Args: ell_cl (array): ell at which the input covariance was computed. cl (array): input covariance taper (bool): if True apply the tapering to the input C_ell. Tapering can help in reducing ringing. **taper_kwargs: Arguments to pass to the tapering method Returns: array: covariance evaluated at the initialization ells. """ # TODO: This method is not used in TJPCov. Consider enforcing passing a # covariance that is sampled at the ells given at intialization. if taper: # FIXME there is no check on change in taper_kwargs if self.taper_f2 is None or not np.all( np.isclose(self.taper_f["ell"], cl_cov) ): self.taper_f = self.taper(ell=ell_cl, **taper_kwargs) taper_f2 = np.outer( self.taper_f["taper_f"], self.taper_f["taper_f"] ) self.taper_f2 = {"ell": ell_cl, "taper_f2": taper_f2} cl_cov = cl_cov * self.taper_f2["taper_f2"] # TODO: Note that here we are extrapolating by using the last value of # the array. In cl_grid, the extrapolated values were 0. cl_int = RectBivariateSpline( ell_cl, ell_cl, cl_cov, ) # interp2d is slow. Make sure ell_cl is on regular grid. cl2 = cl_int(self.ell, self.ell) return cl2
# TODO: These methods are not used anywhere. Should we keep them? They are # nice, though. Commented out for now. # # def projected_correlation( # self, ell_cl=[], cl=[], s1_s2=(), taper=False, **taper_kwargs # ): # """ # Convert input C_ell to the correlation function. # Args: # cl: # Input C_ell # ell_cl: # ell values at which input C_ell is computer. # s1_s2: # Tuple of the spin factors of the tracers. Used to identify the # correct wigner-d matrix to use. # taper: # If true, apply tapering to the input C_ell # taper_kwargs: # Arguments to be passed to the tapering function. # """ # cl2 = self.cl_grid(ell_cl=ell_cl, cl=cl, taper=taper, **taper_kwargs) # w = np.dot(self.wig_d[s1_s2] * self.grad_ell * self.norm, cl2) # return self.theta, w # def inv_projected_correlation( # self, theta_xi=[], xi=[], s1_s2=[], taper=False, **kwargs # ): # """ # Convert input xi to C_ell, the inverse hankel transform # Args: # xi: # The input correlation function # theta_xi: # theta values at which xi is computed. # s1_s2: # Tuple of the spin factors of the tracers. Used to identify the # correct wigner-d matrix to use. # """ # wig_d = self.wig_d[s1_s2].T # wig_theta = self.theta # wig_norm = self.inv_wig_norm # xi2 = self.cl_grid( # ell_cl=theta_xi, cl=xi, taper=taper, wig_l=wig_theta, **kwargs # ) # cl = np.dot(wig_d * wig_norm, xi2) # return self.ell, cl
[docs] def projected_covariance( self, ell_cl, cl_cov, s1_s2, s1_s2_cross=None, taper=False, **kwargs ): """Convert C_ell covariance to correlation function. This function assumes that cl_cov is 2D matrix. Args: cl_cov: C_ell covariance matrix. ell_cl: ell values at which input C_ell is computed. s1_s2: Tuple of the spin factors of the first set of tracers. Used to identify the correct wigner-d matrix to use. s1_s2_cross: Tuple of the spin factors of the second set of tracers, if different from s1_s2. Used to identify the correct wigner-d matrix to use. taper (bool): if True apply the tapering to the input C_ell. Tapering can help in reducing ringing. **kwargs: Arguments to pass to the tapering method Returns: tuple: - theta (array): angles given at initialization - cov (array): real space covariance at the given angles """ if (ell_cl.size != self.ell.size) or np.all(ell_cl != self.ell): # TODO: This option is not used in TJPCov. We can generate the # covariance with all the ells to avoid doing this that will be # less accurate. Consider enforcing passing a covariance that is # sampled at the ells given at intialization and removing this # method. # # Raise NotImplementedError because although it is implemented, it # has not been tested if the extrapolation done in cl_cov_grid # breaks things or not. raise NotImplementedError( "The covariance is assumed to be computed at the same ells as " "those used at intialization" ) cl_cov2 = self.cl_cov_grid( cl_cov=cl_cov, ell_cl=ell_cl, s1_s2=s1_s2, taper=taper, **kwargs, ) else: cl_cov2 = cl_cov if s1_s2_cross is None: s1_s2_cross = s1_s2 cov = np.einsum( "rk,kl,sl->rs", self.wig_d[s1_s2] * np.sqrt(self.norm) * self.grad_ell, cl_cov2, self.wig_d[s1_s2_cross] * np.sqrt(self.norm), optimize=True, ) # FIXME: Check normalization return self.theta, cov
[docs] def taper( self, ell, large_k_lower=10, large_k_upper=100, low_k_lower=0, low_k_upper=1.0e-5, ): """Apply tapering to input C_ell. Tapering is useful to reduce the ringing. This function uses the cosine function to apply the tapering. See eq. 71 in https://arxiv.org/pdf/2105.04548.pdf for the function and meaning of input parameters. Args: ell: ell values at which input C_ell is computed. large_k_lower: large_k_upper: low_k_lower: low_k_upper: """ raise NotImplementedError("Tapering is not implemented yet")
# TODO: Commented out because it is not used anywhere in the covariance # computation and k variable is not defined. If needed we can solve # these issues. # # FIXME there is no check on change in taper_kwargs # if self.taper_f is None or not np.all( # np.isclose(self.taper_f["k"], k) # ): # taper_f = np.zeros_like(k) # x = k > large_k_lower # taper_f[x] = np.cos( # (k[x] - large_k_lower) # / (large_k_upper - large_k_lower) # * np.pi # / 2.0 # ) # x = k < large_k_lower and k > low_k_upper # taper_f[x] = 1 # x = k < low_k_upper # taper_f[x] = np.cos( # (k[x] - low_k_upper) # / (low_k_upper - low_k_lower) # * np.pi # / 2.0 # ) # self.taper_f = {"taper_f": taper_f, "k": k} # return self.taper_f
[docs] def diagonal_err(self, cov): """Returns the diagonal error from the covariance. Useful for errorbar plots. Args: cov (array): Covariance Returns: array: Diagonal errors (i.e. sqrt(diag(cov))) """ return np.sqrt(np.diagonal(cov))
[docs] def wigner_d(s1, s2, theta, ell, l_use_bessel=1.0e4): """Function to compute the wigner-d matrices. Args: s1,s2: Spin factors for the wigner-d matrix. theta: Angular separation for which to compute the wigner-d matrix. The matrix depends on cos(theta). ell: The spherical harmonics mode ell for which to compute the matrix. l_use_bessel: Due to numerical issues, we need to switch from wigner-d matrix to bessel functions at high ell (see the note below). This defines the scale at which the switch happens. Returns: array: Wigner-d matrix """ l0 = np.copy(ell) if l_use_bessel is not None: # FIXME: This is not great. Due to a issues with the scipy # hypergeometric function, jacobi can output nan for large ell, # ell>1.e4 # As a temporary fix, for ell>1.e4, we are replacing the wigner # function with the bessel function. Fingers and toes crossed!!! # mpmath is slower and also has convergence issues at large ell. # https://github.com/scipy/scipy/issues/4446 ell = np.atleast_1d(ell) x = ell < l_use_bessel ell = np.atleast_1d(ell[x]) k = np.amin([ell - s1, ell - s2, ell + s1, ell + s2], axis=0) a = np.absolute(s1 - s2) lamb = 0 # lambda if s2 > s1: lamb = s2 - s1 b = 2 * ell - 2 * k - a d_mat = (-1) ** lamb # binom gives array of shape ell with elements choose(2l[i]-k[i], k[i]+a) d_mat *= np.sqrt(binom(2 * ell - k, k + a)) d_mat /= np.sqrt(binom(k + b, b)) d_mat = np.atleast_1d(d_mat) x = k < 0 d_mat[x] = 0 d_mat = d_mat.reshape(1, len(d_mat)) theta = theta.reshape(len(theta), 1) d_mat = d_mat * ((np.sin(theta / 2.0) ** a) * (np.cos(theta / 2.0) ** b)) d_mat *= jacobi(ell, a, b, np.cos(theta)) if l_use_bessel is not None: ell = np.atleast_1d(l0) x = ell >= l_use_bessel ell = np.atleast_1d(ell[x]) d_mat = np.append(d_mat, jn(s1 - s2, ell * theta), axis=1) return d_mat
[docs] def wigner_d_parallel(s1, s2, theta, ell, ncpu=None, l_use_bessel=1.0e4): """Compute the wigner-d matrix in parallel using multiprocessing Pool. This function calls the wigner-d function defined above. Args: s1,s2: Spin factors for the wigner-d matrix. theta: Angular separation for which to compute the wigner-d matrix. The matrix depends on cos(theta). ell: The spherical harmonics mode ell for which to compute the matrix. ncpu: number of processes to use for computing the matrix. l_use_bessel: Due to numerical issues, we need to switch from wigner-d matrix to bessel functions at high ell (see the note below). This defines the scale at which the switch happens. Returns: array: Wigner-d matrix """ if ncpu is None: ncpu = cpu_count() p = Pool(ncpu) d_mat = np.array( p.map(partial(wigner_d, s1, s2, theta, l_use_bessel=l_use_bessel), ell) ) p.close() return d_mat[:, :, 0].T
# This function was called bin_mat before and was not used. I think I fixed # the eror and checked the output with the previous bin_cov. It should be safe # to use the faster version, but if weird results appear, it might be this # function has some remaining bug.
[docs] def bin_cov(r, cov, r_bins): # works for cov and skewness """Function to apply the binning operator. This function works on both one dimensional vectors and two dimensional covariance covrices. Args: r: theta or ell values at which the un-binned vector is computed. cov: Unbinned covariance. It also works for a vector of C_ell or xi r_bins: theta or ell bins to which the values should be binned. Returns: array_like: Binned covariance or vector of C_ell or xi """ bin_center = 0.5 * (r_bins[1:] + r_bins[:-1]) n_bins = len(bin_center) ndim = len(cov.shape) cov_int = np.zeros([n_bins] * ndim, dtype="float64") norm_int = np.zeros([n_bins] * ndim, dtype="float64") bin_idx = np.digitize(r, r_bins) - 1 r2 = np.sort( np.unique(np.append(r, r_bins)) ) # this takes care of problems around bin edges dr = np.gradient(r2) r2_idx = [i for i in np.arange(len(r2)) if r2[i] in r] dr = dr[r2_idx] r_dr = r * dr ls = ["i", "j", "k", "ell"] s1 = ls[0] s2 = ls[0] r_dr_m = r_dr for i in np.arange(ndim - 1): s1 = s2 + "," + ls[i + 1] s2 += ls[i + 1] # works ok for 2-d case r_dr_m = np.einsum(s1 + "->" + s2, r_dr_m, r_dr) cov_r_dr = cov * r_dr_m for indxs in itertools.product( np.arange(min(bin_idx), n_bins), repeat=ndim ): norm_ijk = 1 cov_t = [] for nd in np.arange(ndim): slc = [slice(None)] * (ndim) slc[nd] = bin_idx == indxs[nd] if nd == 0: cov_t = cov_r_dr[slc[0]][:, slc[1]] else: cov_t = cov_t[slc[0]][:, slc[1]] norm_ijk *= np.sum(r_dr[slc[nd]]) if norm_ijk == 0: continue cov_int[indxs] = np.sum(cov_t) / norm_ijk norm_int[indxs] = norm_ijk return bin_center, cov_int