Source code for draco.analysis.beamform
"""Beamform visibilities to the location of known sources."""
import healpy
import numpy as np
import scipy.interpolate
from caput import config, interferometry
from caput import time as ctime
from cora.util import units
from skyfield.api import Angle, Star
from draco.analysis.sidereal import _search_nearest
from ..core import containers, io, task
from ..util._fast_tools import beamform
from ..util.tools import (
baseline_vector,
calculate_redundancy,
correct_phase_wrap,
find_contiguous_slices,
invert_no_zero,
polarization_map,
)
# Constants
NU21 = units.nu21
C = units.c
[docs]
class BeamFormBase(task.SingleTask):
"""Base class for beam forming tasks.
Defines a few useful methods. Not to be used directly
but as parent class for BeamForm and BeamFormCat.
Attributes
----------
collapse_ha : bool
Sum over hour-angle/time to complete the beamforming. Default is True.
polarization : string
Determines the polarizations that will be output:
- 'I' : Stokes I only.
- 'full' : 'XX', 'XY', 'YX' and 'YY' in this order. (default)
- 'copol' : 'XX' and 'YY' only.
- 'stokes' : 'I', 'Q', 'U' and 'V' in this order. Not implemented.
weight : string
How to weight the redundant baselines when adding:
- 'natural' : each baseline weighted by its redundancy (default)
- 'uniform' : each baseline given equal weight
- 'inverse_variance' : each baseline weighted by the weight attribute
no_beam_model : string
Do not include a primary beam factor in the beamforming
weights, i.e., use uniform weighting as a function of hour angle
and declination.
timetrack : float
How long (in seconds) to track sources at each side of transit.
Default is 900 seconds. Total transit time will be 2 * timetrack.
variable_timetrack : bool
Scale the total time to track each source by the secant of the
source declination, so that all sources are tracked through
the same angle on the sky. Default is False.
freqside : int
Number of frequencies to process at each side of the source.
Default (None) processes all frequencies.
"""
collapse_ha = config.Property(proptype=bool, default=True)
polarization = config.enum(["I", "full", "copol", "stokes"], default="full")
weight = config.enum(["natural", "uniform", "inverse_variance"], default="natural")
no_beam_model = config.Property(proptype=bool, default=False)
timetrack = config.Property(proptype=float, default=900.0)
variable_timetrack = config.Property(proptype=bool, default=False)
freqside = config.Property(proptype=int, default=None)
data_available = True
[docs]
def setup(self, manager):
"""Generic setup method.
To be complemented by specific setup methods in daughter tasks.
Parameters
----------
manager : either `ProductManager`, `BeamTransfer` or `TransitTelescope`
Contains a TransitTelescope object describing the telescope.
"""
# Get the TransitTelescope object
self.telescope = io.get_telescope(manager)
self.latitude = np.deg2rad(self.telescope.latitude)
# Polarizations.
if self.polarization == "I":
self.process_pol = ["XX", "YY"]
self.return_pol = ["I"]
elif self.polarization == "full":
self.process_pol = ["XX", "XY", "YX", "YY"]
self.return_pol = self.process_pol
elif self.polarization == "copol":
self.process_pol = ["XX", "YY"]
self.return_pol = self.process_pol
elif self.polarization == "stokes":
self.process_pol = ["XX", "XY", "YX", "YY"]
self.return_pol = ["I", "Q", "U", "V"]
msg = "Stokes parameters are not implemented"
raise RuntimeError(msg)
else:
# This should never happen. config.enum should bark first.
msg = "Invalid polarization parameter: {0}"
msg = msg.format(self.polarization)
raise ValueError(msg)
self.npol = len(self.process_pol)
self.map_pol_feed = {
pstr: list(self.telescope.polarisation).index(pstr) for pstr in ["X", "Y"]
}
# Ensure that if we are using variable time tracking,
# then we are also collapsing over hour angle.
if self.variable_timetrack:
if self.collapse_ha:
self.log.info(
"Tracking source for declination dependent amount of time "
f"[{self.timetrack:0.0f} seconds at equator]"
)
else:
raise NotImplementedError(
"Must collapse over hour angle if tracking "
"sources for declination dependent "
"amount of time."
)
else:
self.log.info(
f"Tracking source for fixed amount of time [{self.timetrack:0.0f} seconds]"
)
[docs]
def process(self):
"""Generic process method.
Performs all the beamforming, but not the data parsing.
To be complemented by specific process methods in daughter tasks.
Returns
-------
formed_beam : `containers.FormedBeam` or `containers.FormedBeamHA`
Formed beams at each source. Shape depends on parameter
`collapse_ha`.
"""
# Perform data dependent beam initialization
self._initialize_beam_with_data()
# Contruct containers for formed beams
if self.collapse_ha:
# Container to hold the formed beams
formed_beam = containers.FormedBeam(
freq=self.freq,
object_id=self.source_cat.index_map["object_id"],
pol=np.array(self.return_pol),
distributed=True,
)
else:
# Container to hold the formed beams
formed_beam = containers.FormedBeamHA(
freq=self.freq,
ha=np.arange(self.nha, dtype=np.int64),
object_id=self.source_cat.index_map["object_id"],
pol=np.array(self.return_pol),
distributed=True,
)
# Initialize container to zeros.
formed_beam.ha[:] = 0.0
formed_beam.attrs["tag"] = "_".join(
[tag for tag in [self.tag_data, self.tag_catalog] if tag is not None]
)
# Initialize container to zeros.
formed_beam.beam[:] = 0.0
formed_beam.weight[:] = 0.0
# Copy catalog information
formed_beam["position"][:] = self.source_cat["position"][:]
if "redshift" in self.source_cat:
formed_beam.add_dataset("redshift")
formed_beam["redshift"][:] = self.source_cat["redshift"][:]
# Ensure container is distributed in frequency
formed_beam.redistribute("freq")
if self.freqside is None:
# Indices of local frequency axis. Full axis if freqside is None.
f_local_indices = np.arange(self.ls, dtype=np.int32)
f_mask = np.zeros(self.ls, dtype=bool)
fbb = formed_beam.beam[:]
fbw = formed_beam.weight[:]
# For each source, beamform and populate container.
for src in range(self.nsource):
if src % 1000 == 0:
self.log.info(f"Source {src}/{self.nsource}")
# Declination of this source
dec = np.radians(self.sdec[src])
if self.freqside is not None:
# Get the frequency bin this source is closest to.
freq_diff = abs(self.freq["centre"] - self.sfreq[src])
sfreq_index = np.argmin(freq_diff)
# Start and stop indices to process in global frequency axis
freq_idx0 = np.amax([0, sfreq_index - self.freqside])
freq_idx1 = np.amin([self.nfreq, sfreq_index + self.freqside + 1])
# Mask in full frequency axis
f_mask = np.ones(self.nfreq, dtype=bool)
f_mask[freq_idx0:freq_idx1] = False
# Restrict frequency mask to local range
f_mask = f_mask[self.lo : (self.lo + self.ls)]
# TODO: In principle I should be able to skip
# sources that have no indices to be processed
# in this rank. I am getting a NaN error, however.
# I may need an mpiutil.barrier() call before the
# return statement.
if f_mask.all():
# If there are no indices to be processed in
# the local frequency range, skip source.
continue
# Frequency indices to process in local range
f_local_indices = np.arange(self.ls, dtype=np.int32)[np.invert(f_mask)]
if self.is_sstream:
# Get RA bin this source is closest to.
# Phasing will actually be done at src position.
sra_index = np.searchsorted(self.ra, self.sra[src])
else:
# Cannot use searchsorted, because RA might not be
# monotonically increasing. Slower.
# Notice: in case there is more than one transit,
# this will pick a single transit quasi-randomly!
transit_diff = abs(self.ra - self.sra[src])
sra_index = np.argmin(transit_diff)
# For now, skip sources that do not transit in the data
ra_cadence = self.ra[1] - self.ra[0]
if transit_diff[sra_index] > 1.5 * ra_cadence:
continue
if self.variable_timetrack:
ha_side = int(self.ha_side / np.cos(dec))
else:
ha_side = int(self.ha_side)
# Compute hour angle array
ha_array, ra_index_range, ha_mask = self._ha_array(
self.ra, sra_index, self.sra[src], ha_side, self.is_sstream
)
# Arrays to store beams and weights for this source
# for all polarizations prior to combining polarizations
if self.collapse_ha:
formed_beam_full = np.zeros((self.npol, self.ls), dtype=np.float64)
weight_full = np.zeros((self.npol, self.ls), dtype=np.float64)
else:
formed_beam_full = np.zeros(
(self.npol, self.ls, self.nha), dtype=np.float64
)
weight_full = np.zeros((self.npol, self.ls, self.nha), dtype=np.float64)
# Loop over polarisations
for pol, pol_str in enumerate(self.process_pol):
primary_beam = self._beamfunc(pol_str, dec, ha_array)
# Fringestop and sum over products
# 'beamform' does not normalize sum.
this_formed_beam = beamform(
self.vis[pol],
self.sumweight[pol],
dec,
self.latitude,
np.cos(ha_array),
np.sin(ha_array),
self.bvec[pol][0],
self.bvec[pol][1],
f_local_indices,
ra_index_range,
)
sumweight_inrange = self.sumweight[pol][:, ra_index_range, :]
visweight_inrange = self.visweight[pol][:, ra_index_range, :]
if self.collapse_ha:
# Sum over RA. Does not multiply by weights because
# this_formed_beam was never normalized (this avoids
# re-work and makes code more efficient).
this_sumweight = np.sum(
np.sum(sumweight_inrange, axis=-1) * primary_beam**2, axis=1
)
formed_beam_full[pol] = np.sum(
this_formed_beam * primary_beam, axis=1
) * invert_no_zero(this_sumweight)
if self.weight != "inverse_variance":
this_weight2 = np.sum(
np.sum(
sumweight_inrange**2
* invert_no_zero(visweight_inrange),
axis=-1,
)
* primary_beam**2,
axis=1,
)
weight_full[pol] = this_sumweight**2 * invert_no_zero(
this_weight2
)
else:
weight_full[pol] = this_sumweight
else:
# Need to divide by weight here for proper
# normalization because it is not done in
# beamform()
this_sumweight = np.sum(sumweight_inrange, axis=-1)
# Populate only where ha_mask is true. Zero otherwise.
formed_beam_full[pol][:, ha_mask] = (
this_formed_beam * invert_no_zero(this_sumweight)
)
if self.weight != "inverse_variance":
this_weight2 = np.sum(
sumweight_inrange**2 * invert_no_zero(visweight_inrange),
axis=-1,
)
# Populate only where ha_mask is true. Zero otherwise.
weight_full[pol][:, ha_mask] = (
this_sumweight**2 * invert_no_zero(this_weight2)
)
else:
weight_full[pol][:, ha_mask] = this_sumweight
# Ensure weights are zero for non-processed frequencies
weight_full[pol][f_mask] = 0.0
# Combine polarizations if needed.
# TODO: For now I am ignoring differences in the X and
# Y beams and just adding them as is.
if self.polarization == "I":
formed_beam_full = np.sum(
formed_beam_full * weight_full, axis=0
) * invert_no_zero(np.sum(weight_full, axis=0))
weight_full = np.sum(weight_full, axis=0)
# Add an axis for the polarization
if self.collapse_ha:
formed_beam_full = np.reshape(formed_beam_full, (1, self.ls))
weight_full = np.reshape(weight_full, (1, self.ls))
else:
formed_beam_full = np.reshape(
formed_beam_full, (1, self.ls, self.nha)
)
weight_full = np.reshape(weight_full, (1, self.ls, self.nha))
elif self.polarization == "stokes":
# TODO: Not implemented
pass
# Populate container.
fbb[src] = formed_beam_full
# Scale the weights by a factor of 2 to account for the fact that we
# have taken the real-component of the fringestopped visibility, which
# has a variance that is 1/2 the variance of the complex visibility
# that was encoded in our original weight dataset.
fbw[src] = 2.0 * weight_full
if not self.collapse_ha:
if self.is_sstream:
formed_beam.ha[src, :] = ha_array
else:
# Populate only where ha_mask is true.
formed_beam.ha[src, ha_mask] = ha_array
return formed_beam
[docs]
def process_finish(self):
"""Clear lists holding copies of data.
These lists will persist beyond this task being done, so
the data stored there will continue to use memory.
"""
for attr in ["vis", "visweight", "bvec", "sumweight"]:
try:
delattr(self, attr)
except AttributeError:
pass
def _ha_array(self, ra, source_ra_index, source_ra, ha_side, is_sstream=True):
"""Hour angle for each RA/time bin to be processed.
Also return the indices of these bins in the full RA/time axis.
Parameters
----------
ra : array
RA axis in the data
source_ra_index : int
Index in data.index_map['ra'] closest to source_ra
source_ra : float
RA of the quasar
ha_side : int
Number of RA/HA bins on each side of transit.
is_sstream : bool
True if data is sidereal stream. Flase if time stream
Returns
-------
ha_array : np.ndarray
Hour angle array in the range -180. to 180
ra_index_range : np.ndarray of int
Indices (in data.index_map['ra']) corresponding
to ha_array.
"""
# RA range to track this quasar through the beam.
ra_index_range = np.arange(
source_ra_index - ha_side, source_ra_index + ha_side + 1, dtype=np.int32
)
# Number of RA bins in data.
nra = len(ra)
if is_sstream:
# Wrap RA indices around edges.
ra_index_range[ra_index_range < 0] += nra
ra_index_range[ra_index_range >= nra] -= nra
# Hour angle array (convert to radians)
ha_array = np.deg2rad(ra[ra_index_range] - source_ra)
# For later convenience it is better if `ha_array` is
# in the range -pi to pi instead of 0 to 2pi.
ha_array = (ha_array + np.pi) % (2.0 * np.pi) - np.pi
# In this case the ha_mask is trivial
ha_mask = np.ones(len(ra_index_range), dtype=bool)
else:
# Mask-out indices out of range
ha_mask = (ra_index_range >= 0) & (ra_index_range < nra)
# Return smaller HA range, and mask.
ra_index_range = ra_index_range[ha_mask]
# Hour angle array (convert to radians)
ha_array = np.deg2rad(ra[ra_index_range] - source_ra)
# For later convenience it is better if `ha_array` is
# in the range -pi to pi instead of 0 to 2pi.
ha_array = (ha_array + np.pi) % (2.0 * np.pi) - np.pi
return ha_array, ra_index_range, ha_mask
def _initialize_beam_with_data(self):
"""Beam initialization that requires data.
This is called at the start of the process method
and can be overridden to perform any beam initialization
that requires the data and catalog to be parsed first.
"""
# Find the index of the local frequencies in
# the frequency axis of the telescope instance
if not self.no_beam_model:
self.freq_local_telescope_index = np.array(
[
np.argmin(np.abs(nu - self.telescope.frequencies))
for nu in self.freq_local
]
)
def _beamfunc(self, pol, dec, ha):
"""Calculate the primary beam at the location of a source as it transits.
Uses the frequencies in the freq_local_telescope_index attribute.
Parameters
----------
pol : str
String specifying the polarisation,
either 'XX', 'XY', 'YX', or 'YY'.
dec : float
The declination of the source in radians.
ha : np.ndarray[nha,]
The hour angle of the source in radians.
Returns
-------
primary_beam : np.ndarray[nfreq, nha]
The primary beam as a function of frequency and hour angle
at the sources declination for the requested polarisation.
"""
nfreq = self.freq_local.size
if self.no_beam_model:
return np.ones((nfreq, ha.size), dtype=np.float64)
angpos = np.array([(0.5 * np.pi - dec) * np.ones_like(ha), ha]).T
primary_beam = np.zeros((nfreq, ha.size), dtype=np.float64)
for ff, freq in enumerate(self.freq_local_telescope_index):
bii = self.telescope.beam(self.map_pol_feed[pol[0]], freq, angpos)
if pol[0] != pol[1]:
bjj = self.telescope.beam(self.map_pol_feed[pol[1]], freq, angpos)
else:
bjj = bii
primary_beam[ff] = np.sum(bii * bjj.conjugate(), axis=1)
return primary_beam
def _process_data(self, data):
"""Store code for parsing and formating data prior to beamforming."""
# Easy access to communicator
self.comm_ = data.comm
self.tag_data = data.attrs["tag"] if "tag" in data.attrs else None
# Extract data info
if "ra" in data.index_map:
self.is_sstream = True
self.ra = data.index_map["ra"]
# Calculate the epoch for the data so we can calculate the correct
# CIRS coordinates
if "lsd" not in data.attrs:
raise ValueError(
"SiderealStream must have an LSD attribute to calculate the epoch."
)
lsd = np.mean(data.attrs["lsd"])
self.epoch = self.telescope.lsd_to_unix(lsd)
dt = 240.0 * ctime.SIDEREAL_S * np.median(np.abs(np.diff(self.ra)))
else:
self.is_sstream = False
# Convert data timestamps into LSAs (degrees)
self.ra = self.telescope.unix_to_lsa(data.time)
self.epoch = data.time.mean()
dt = np.median(np.abs(np.diff(data.time)))
self.freq = data.index_map["freq"]
self.nfreq = len(self.freq)
# Ensure data is distributed in freq axis
data.redistribute(0)
# Number of RA bins to track each source at each side of transit
self.ha_side = self.timetrack / dt
self.nha = 2 * int(self.ha_side) + 1
# polmap: indices of each vis product in
# polarization list: ['XX', 'XY', 'YX', 'YY']
polmap = polarization_map(data.index_map, self.telescope)
# Baseline vectors in meters
bvec_m = baseline_vector(data.index_map, self.telescope)
# MPI distribution values
self.lo = data.vis.local_offset[0]
self.ls = data.vis.local_shape[0]
self.freq_local = self.freq["centre"][self.lo : self.lo + self.ls]
# These are to be used when gathering results in the end.
# Tuple (not list!) of number of frequencies in each rank
self.fsize = tuple(data.comm.allgather(self.ls))
# Tuple (not list!) of displacements of each rank array in full array
self.foffset = tuple(data.comm.allgather(self.lo))
fullpol = ["XX", "XY", "YX", "YY"]
# Save subsets of the data for each polarization, changing
# the ordering to 'C' (needed for the cython part).
# This doubles the memory usage.
self.vis, self.visweight, self.bvec, self.sumweight = [], [], [], []
for pol in self.process_pol:
pol = fullpol.index(pol)
polmask = polmap == pol
# Swap order of product(1) and RA(2) axes, to reduce striding
# through memory later on.
self.vis.append(
np.copy(np.moveaxis(data.vis[:, polmask, :], 1, 2), order="C")
)
# Restrict visweight to the local frequencies
self.visweight.append(
np.copy(
np.moveaxis(
data.weight[self.lo : self.lo + self.ls][:, polmask, :], 1, 2
).astype(np.float64),
order="C",
)
)
# Multiply bvec_m by frequencies to get vector in wavelengths.
# Shape: (2, nfreq_local, nvis), for each pol.
self.bvec.append(
np.copy(
bvec_m[:, np.newaxis, polmask]
* self.freq_local[np.newaxis, :, np.newaxis]
* 1e6
/ C,
order="C",
)
)
if self.weight == "inverse_variance":
# Weights for sum are just the visibility weights
self.sumweight.append(self.visweight[-1])
else:
# Ensure zero visweights result in zero sumweights
this_sumweight = (self.visweight[-1] > 0.0).astype(np.float64)
ssi = data.input_flags[:]
ssp = data.index_map["prod"][:]
sss = data.reverse_map["stack"]["stack"][:]
nstack = data.vis.shape[1]
# this redundancy takes into account input flags.
# It has shape (nstack, ntime)
redundancy = np.moveaxis(
calculate_redundancy(ssi, ssp, sss, nstack)[polmask].astype(
np.float64
),
0,
1,
)[np.newaxis, :, :]
# redundancy = (self.telescope.redundancy[polmask].
# astype(np.float64)[np.newaxis, np.newaxis, :])
this_sumweight *= redundancy
if self.weight == "uniform":
this_sumweight = (this_sumweight > 0.0).astype(np.float64)
self.sumweight.append(np.copy(this_sumweight, order="C"))
def _process_catalog(self, catalog):
"""Process the catalog to get CIRS coordinates at the correct epoch.
Note that `self._process_data` must have been called before this.
"""
if "position" not in catalog:
raise ValueError("Input is missing a position table.")
if not hasattr(self, "epoch"):
self.log.warning("Epoch not set. Was the requested data not available?")
self.data_available = False
return
coord = catalog.attrs.get("coordinates", None)
if coord == "CIRS":
self.log.info("Catalog already in CIRS coordinates.")
self.sra = catalog["position"]["ra"]
self.sdec = catalog["position"]["dec"]
else:
self.log.info("Converting catalog from ICRS to CIRS coordinates.")
self.sra, self.sdec = icrs_to_cirs(
catalog["position"]["ra"], catalog["position"]["dec"], self.epoch
)
if self.freqside is not None:
if "redshift" not in catalog:
raise ValueError("Input is missing a required redshift table.")
self.sfreq = NU21 / (catalog["redshift"]["z"][:] + 1.0) # MHz
self.source_cat = catalog
self.nsource = len(self.sra)
self.tag_catalog = catalog.attrs["tag"] if "tag" in catalog.attrs else None
[docs]
class BeamForm(BeamFormBase):
"""BeamForm for a single source catalog and multiple visibility datasets."""
[docs]
def setup(self, manager, source_cat):
"""Parse the source catalog and performs the generic setup.
Parameters
----------
manager : either `ProductManager`, `BeamTransfer` or `TransitTelescope`
Contains a TransitTelescope object describing the telescope.
source_cat : :class:`containers.SourceCatalog`
Catalog of points to beamform at.
"""
super().setup(manager)
self.catalog = source_cat
[docs]
def process(self, data):
"""Parse the visibility data and beamforms all sources.
Parameters
----------
data : `containers.SiderealStream` or `containers.TimeStream`
Data to beamform on.
Returns
-------
formed_beam : `containers.FormedBeam` or `containers.FormedBeamHA`
Formed beams at each source.
"""
# Process and make available various data
self._process_data(data)
self._process_catalog(self.catalog)
if not self.data_available:
return None
# Call generic process method.
return super().process()
[docs]
class BeamFormCat(BeamFormBase):
"""BeamForm for multiple source catalogs and a single visibility dataset."""
[docs]
def setup(self, manager, data):
"""Parse the visibility data and performs the generic setup.
Parameters
----------
manager : either `ProductManager`, `BeamTransfer` or `TransitTelescope`
Contains a TransitTelescope object describing the telescope.
data : `containers.SiderealStream` or `containers.TimeStream`
Data to beamform on.
"""
super().setup(manager)
# Process and make available various data
self._process_data(data)
[docs]
def process(self, source_cat):
"""Parse the source catalog and beamforms all sources.
Parameters
----------
source_cat : :class:`containers.SourceCatalog`
Catalog of points to beamform at.
Returns
-------
formed_beam : `containers.FormedBeam` or `containers.FormedBeamHA`
Formed beams at each source.
"""
self._process_catalog(source_cat)
if not self.data_available:
return None
# Call generic process method.
return super().process()
[docs]
class BeamFormExternalMixin:
"""Base class for tasks that beamform using an external model of the primary beam.
The primary beam is provided to the task during setup. Do not use this class
directly, instead use BeamFormExternal and BeamFormExternalCat.
"""
[docs]
def setup(self, beam, *args):
"""Initialize the beam.
Parameters
----------
beam : GridBeam
Model for the primary beam.
args : optional
Additional argument to pass to the super class
"""
super().setup(*args)
self._initialize_beam(beam)
def _initialize_beam(self, beam):
"""Initialize based on the beam container type.
Parameters
----------
beam : GridBeam
Container holding the model for the primary beam.
Currently only accepts GridBeam type containers.
"""
if isinstance(beam, containers.GridBeam):
self._initialize_grid_beam(beam)
self._beamfunc = self._grid_beam
else:
raise ValueError(f"Do not recognize beam container: {beam.__class__}")
def _initialize_beam_with_data(self):
"""Ensure that the beam and visibilities have the same frequency axis."""
if not np.array_equal(self.freq_local, self._beam_freq):
raise RuntimeError("Beam and visibility frequency axes do not match.")
def _initialize_grid_beam(self, gbeam):
"""Create an interpolator for a GridBeam.
Parameters
----------
gbeam : GridBeam
Model for the primary beam on a celestial grid where
(theta, phi) = (declination, hour angle) in degrees. The beam
must be in power units and must have a length 1 input axis that
contains the "baseline averaged" beam, which will be applied to
all baselines of a given polarisation.
"""
# Make sure the beam is in celestial coordinates
if gbeam.coords != "celestial":
raise RuntimeError(
"GridBeam must be converted to celestial coordinates for beamforming."
)
# Make sure there is a single beam to use for all inputs
if gbeam.input.size > 1:
raise NotImplementedError(
"Do not support input-dependent beams at the moment."
)
# Distribute over frequencies, extract local frequencies
gbeam.redistribute("freq")
lo = gbeam.beam.local_offset[0]
nfreq = gbeam.beam.local_shape[0]
self._beam_freq = gbeam.freq[lo : lo + nfreq]
# Find the relevant indices into the polarisation axis
process_pol = getattr(self, "process_pol", list(gbeam.pol))
ipol = np.array([list(gbeam.pol).index(pstr) for pstr in process_pol])
npol = ipol.size
self._beam_pol = [gbeam.pol[ip] for ip in ipol]
# Extract beam
flag = gbeam.weight[:].local_array[:, :, 0][:, ipol] > 0.0
beam = np.where(flag, gbeam.beam[:].local_array[:, :, 0][:, ipol].real, 0.0)
# Convert the declination and hour angle axis to radians, make sure they are sorted
ha = (gbeam.phi + 180.0) % 360.0 - 180.0
isort = np.argsort(ha)
ha = np.radians(ha[isort])
dec = np.radians(gbeam.theta)
# Create a 2D interpolator for the beam at each frequency and polarisation
self._beam = [
[
scipy.interpolate.RectBivariateSpline(dec, ha, beam[ff, pp][:, isort])
for pp in range(npol)
]
for ff in range(nfreq)
]
# Create a similair interpolator for the flag array
self._beam_flag = [
[
scipy.interpolate.RectBivariateSpline(
dec, ha, flag[ff, pp][:, isort].astype(np.float32)
)
for pp in range(npol)
]
for ff in range(nfreq)
]
self.log.info("Grid beam initialized.")
def _grid_beam(self, pol, dec, ha):
"""Interpolate a GridBeam to the requested declination and hour angles.
Parameters
----------
pol : str
String specifying the polarisation,
either 'XX', 'XY', 'YX', or 'YY'.
dec : float
The declination of the source in radians.
ha : np.ndarray[nha,]
The hour angle of the source in radians.
Returns
-------
primay_beam : np.ndarray[nfreq, nha]
The primary beam as a function of frequency and hour angle
at the sources declination for the requested polarisation.
"""
pp = self._beam_pol.index(pol)
primay_beam = np.array(
[self._beam[ff][pp](dec, ha)[0] for ff in range(self._beam_freq.size)]
)
# If the interpolated flags deviate from 1.0, then we mask
# the interpolated beam, since some fraction the underlying
# data used to construct the interpolator was masked.
flag = np.array(
[
np.abs(self._beam_flag[ff][pp](dec, ha)[0] - 1.0) < 0.01
for ff in range(self._beam_freq.size)
]
)
return np.where(flag, primay_beam, 0.0)
[docs]
class BeamFormExternal(BeamFormExternalMixin, BeamForm):
"""Beamform a single catalog and multiple datasets using an external beam model.
The setup method requires [beam, manager, source_cat] as arguments.
"""
[docs]
class BeamFormExternalCat(BeamFormExternalMixin, BeamFormCat):
"""Beamform multiple catalogs and a single dataset using an external beam model.
The setup method requires [beam, manager, data] as arguments.
"""
[docs]
class RingMapBeamForm(task.SingleTask):
"""Beamform by extracting the pixel containing each source form a RingMap.
This is significantly faster than `Beamform` or `BeamformCat` with the caveat
that they can beamform exactly on a source whereas this task is at the mercy of
what was done to produce the `RingMap` (use `DeconvolveHybridM` for best
results).
Unless it has an explicit `lsd` attribute, the ring map is assumed to be in the
same coordinate epoch as the catalog. If it does, the input catalog is assumed to be
in ICRS and then is precessed to the CIRS coordinates in the epoch of the map.
"""
[docs]
def setup(self, telescope: io.TelescopeConvertible, ringmap: containers.RingMap):
"""Set the telescope object.
Parameters
----------
telescope
The telescope object to use.
ringmap
The ringmap to extract the sources from. See the class documentation for how
the epoch is determined.
"""
self.telescope = io.get_telescope(telescope)
self.ringmap = ringmap
[docs]
def process(self, catalog: containers.SourceCatalog) -> containers.FormedBeam:
"""Extract sources from a ringmap.
Parameters
----------
catalog
The catalog to extract sources from.
Returns
-------
sources
The source spectra.
"""
ringmap = self.ringmap
src_ra, src_dec = self._process_catalog(catalog)
# Container to hold the formed beams
formed_beam = containers.FormedBeam(
object_id=catalog.index_map["object_id"],
axes_from=ringmap,
attrs_from=catalog,
distributed=True,
)
# Initialize container to zeros.
formed_beam.beam[:] = 0.0
formed_beam.weight[:] = 0.0
# Copy catalog information
formed_beam["position"][:] = catalog["position"][:]
if "redshift" in catalog:
formed_beam.add_dataset("redshift")
formed_beam["redshift"][:] = catalog["redshift"][:]
# Ensure containers are distributed in frequency
formed_beam.redistribute("freq")
ringmap.redistribute("freq")
has_weight = "weight" in ringmap.datasets
# Get the pixel indices
ra_ind, za_ind = self._source_ind(src_ra, src_dec)
# Dereference the datasets
fbb = formed_beam.beam[:]
fbw = formed_beam.weight[:]
rmm = ringmap.map[:]
rmw = ringmap.weight[:] if has_weight else invert_no_zero(ringmap.rms[:]) ** 2
# Loop over sources and extract the polarised pencil beams containing them from
# the ringmaps
for si, (ri, zi) in enumerate(zip(ra_ind, za_ind)):
fbb[si] = rmm[0, :, :, ri, zi]
fbw[si] = rmw[:, :, ri, zi] if has_weight else rmw[:, :, ri]
return formed_beam
def _process_catalog(
self, catalog: containers.SourceCatalog
) -> tuple[np.ndarray, np.ndarray]:
"""Get the current epoch coordinates of the catalog."""
if "position" not in catalog:
raise ValueError("Input is missing a position table.")
# Calculate the epoch for the data so we can calculate the correct
# CIRS coordinates
if "lsd" not in self.ringmap.attrs:
self.log.info(
"Input map has no epoch set, assuming that it matches the catalog."
)
src_ra, src_dec = catalog["position"]["ra"], catalog["position"]["dec"]
else:
lsd = (
self.ringmap.attrs["lsd"][0]
if isinstance(self.ringmap.attrs["lsd"], np.ndarray)
else self.ringmap.attrs["lsd"]
)
epoch = self.telescope.lsd_to_unix(lsd)
# Get the source positions at the current epoch
src_ra, src_dec = icrs_to_cirs(
catalog["position"]["ra"], catalog["position"]["dec"], epoch
)
return src_ra, src_dec
def _source_ind(
self, src_ra: np.ndarray, src_dec: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
"""Get the RA/ZA ringmap pixel indices of the sources."""
# Get the grid size of the map in RA and sin(ZA)
dra = np.median(np.abs(np.diff(self.ringmap.index_map["ra"])))
dza = np.median(np.abs(np.diff(self.ringmap.index_map["el"])))
za_min = self.ringmap.index_map["el"][:].min()
# Get the source indices in RA
# NOTE: that we need to take into account that sources might be less than 360
# deg, but still closer to ind=0
max_ra_ind = len(self.ringmap.ra) - 1
ra_ind = (np.rint(src_ra / dra) % max_ra_ind).astype(np.int64)
# Get the indices for the ZA direction
za_ind = np.rint(
(np.sin(np.radians(src_dec - self.telescope.latitude)) - za_min) / dza
).astype(np.int64)
return ra_ind, za_ind
[docs]
class RingMapStack2D(RingMapBeamForm):
"""Stack RingMap's on sources directly.
Parameters
----------
num_ra, num_dec : int
The number of RA and DEC pixels to stack either side of the source.
num_freq : int
Number of final frequency channels either side of the source redshift to
stack.
freq_width : float
Length of frequency interval either side of source to use in MHz.
weight : {"patch", "dec", "enum"}
How to weight the data. If `"input"` the data is weighted on a pixel by pixel
basis according to the input data. If `"patch"` then the inverse of the
variance of the extracted patch is used. If `"dec"` then the inverse variance
of each declination strip is used.
"""
num_ra = config.Property(proptype=int, default=10)
num_dec = config.Property(proptype=int, default=10)
num_freq = config.Property(proptype=int, default=256)
freq_width = config.Property(proptype=float, default=100.0)
weight = config.enum(["patch", "dec", "input"], default="input")
[docs]
def process(self, catalog: containers.SourceCatalog) -> containers.FormedBeam:
"""Extract sources from a ringmap.
Parameters
----------
catalog
The catalog to extract sources from.
Returns
-------
sources
The source spectra.
"""
from mpi4py import MPI
ringmap = self.ringmap
# Get the current epoch catalog position
src_ra, src_dec = self._process_catalog(catalog)
src_z = catalog["redshift"]["z"]
# Get the pixel indices
ra_ind, za_ind = self._source_ind(src_ra, src_dec)
# Ensure containers are distributed in frequency
ringmap.redistribute("freq")
# Get the frequencies on this rank
fs = ringmap.map.local_offset[2]
fe = fs + ringmap.map.local_shape[2]
local_freq = ringmap.freq[fs:fe]
# Dereference the datasets
rmm = ringmap.map[:].local_array
rmw = (
ringmap.weight[:].local_array
if "weight" in ringmap.datasets
else invert_no_zero(ringmap.rms[:].local_array) ** 2
)
# Calculate the frequencies bins to use
nbins = 2 * self.num_freq + 1
bin_edges = np.linspace(
-self.freq_width, self.freq_width, nbins + 1, endpoint=True
)
# Calculate the edges of the frequency distribution, sources outside this range
# will be dropped
global_fmin = ringmap.freq.min()
global_fmax = ringmap.freq.max()
# Create temporary array to accumulate into
wstack = np.zeros(
(nbins + 2, len(ringmap.pol), 2 * self.num_ra + 1, 2 * self.num_dec + 1)
)
weight = np.zeros(
(nbins + 2, len(ringmap.pol), 2 * self.num_ra + 1, 2 * self.num_dec + 1)
)
rmvar = rmm[0].var(axis=2)
w_global = invert_no_zero(np.where(rmvar < 3e-7, 0.0, rmvar))
# Loop over sources and extract the polarised pencil beams containing them from
# the ringmaps
for si, (ri, zi, z) in enumerate(zip(ra_ind, za_ind, src_z)):
source_freq = 1420.406 / (1 + z)
if source_freq > global_fmax or source_freq < global_fmin:
continue
# Get bin indices
bin_ind = np.digitize(local_freq - source_freq, bin_edges)
# Get the slices to extract the enclosing angular region
ri_slice = slice(ri - self.num_ra, ri + self.num_ra + 1)
zi_slice = slice(zi - self.num_dec, zi + self.num_dec + 1)
b = rmm[0, :, :, ri_slice, zi_slice]
w = rmw[:, :, ri_slice, np.newaxis]
if self.weight == "patch":
# Replace the weights with the variance of the patch
w = (w != 0) * invert_no_zero(b.var(axis=(2, 3)))[
:, :, np.newaxis, np.newaxis
]
elif self.weight == "dec":
# w = (w != 0) * invert_no_zero(b.var(axis=2))[:, :, np.newaxis, :]
w = (w != 0) * w_global[:, :, np.newaxis, zi_slice]
bw = b * w
# TODO: this is probably slow so should be moved into Cython
for lfi, bi in enumerate(bin_ind):
wstack[bi] += bw[:, lfi]
weight[bi] += w[:, lfi]
# Arrays to reduce the data into
wstack_all = np.zeros_like(wstack)
weight_all = np.zeros_like(weight)
self.comm.Allreduce(wstack, wstack_all, op=MPI.SUM)
self.comm.Allreduce(weight, weight_all, op=MPI.SUM)
stack_all = wstack_all * invert_no_zero(weight_all)
# Create the container to store the data in
bin_centres = 0.5 * (bin_edges[1:] + bin_edges[:-1])
stack = containers.Stack3D(
freq=bin_centres,
delta_ra=np.arange(-self.num_ra, self.num_ra + 1),
delta_dec=np.arange(-self.num_dec, self.num_dec + 1),
axes_from=ringmap,
attrs_from=ringmap,
)
stack.attrs["tag"] = catalog.attrs["tag"]
stack.stack[:] = stack_all[1:-1].transpose((1, 2, 3, 0))
return stack
[docs]
class HybridVisBeamForm(task.SingleTask):
"""Beamform on a catalog of sources using the HybridVisStream data product.
Attributes
----------
window : float
Window size in degrees. For each source, right ascensions corresponding to
abs(ra - source_ra) <= window are extracted from the hybrid beamformed
visibility at the declination closest to the sources location. Default is
5 degrees.
ignore_rot : bool
Ignore the telescope rotation_angle when calculating the baseline distances
used to beamform in the east-west direction. Defaults to False.
"""
window = config.Property(proptype=float, default=5.0)
ignore_rot = config.Property(proptype=bool, default=False)
[docs]
def setup(self, manager, catalog):
"""Define the observer and the catalog of sources.
Parameters
----------
manager : draco.core.io.TelescopeConvertible
Observer object holding the geographic location of the telescope.
Note that if ignore_rot is False and this object has a non-zero
rotation_angle, then the beamforming will account for the phase
due to the north-south component of the rotation.
catalog : draco.core.containers.SourceCatalog
Beamform on sources in this catalog.
"""
self.telescope = io.get_telescope(manager)
self.latitude = np.radians(self.telescope.latitude)
if not self.ignore_rot and hasattr(self.telescope, "rotation_angle"):
self.log.info(
"Correcting for phase due to north-south component of a "
f"{self.telescope.rotation_angle:0.2f} degree rotation."
)
self.rot = np.radians(self.telescope.rotation_angle)
else:
self.rot = 0.0
self.catalog = catalog
[docs]
def process(self, hvis):
"""Finish beamforming in the east-west direction.
Parameters
----------
hvis : draco.core.containers.HybridVisStream
Visibilities beamformed in the north-south direction to
a grid of declinations along the meridian.
Returns
-------
out : draco.core.containers.HybridFormedBeamHA
Visibilities beamformed to the location of sources
in a catalog.
"""
hvis.redistribute("freq")
fringestopped = hvis.attrs.get("fringestopped", False)
# Get source positions
lsd = hvis.attrs.get("lsd", hvis.attrs.get("csd"))
src_ra, src_dec = (
self.catalog["position"]["ra"][:],
self.catalog["position"]["dec"][:],
)
if lsd is not None:
epoch = np.atleast_1d(self.telescope.lsd_to_unix(lsd))
coords = [icrs_to_cirs(src_ra, src_dec, ep) for ep in epoch]
src_ra = np.mean([coord[0] for coord in coords], axis=0)
src_dec = np.mean([coord[1] for coord in coords], axis=0)
# Find nearest declination
dec = np.degrees(np.arcsin(hvis.index_map["el"]) + self.latitude)
nearest_dec = _search_nearest(dec, src_dec)
delta_dec = np.max(np.abs(np.diff(dec)))
valid_src = np.abs(src_dec - dec[nearest_dec]) < delta_dec
self.log.info(
f"There are {np.sum(valid_src)} catalog sources in this declination range."
)
# Find hour angle window
ra = hvis.ra
ha_arr = correct_phase_wrap(ra[np.newaxis, :] - src_ra[:, np.newaxis], deg=True)
valid = np.abs(ha_arr) <= self.window
nha = np.sum(valid, axis=-1)
ra_rad = np.radians(ra)
# Calculate baseline distances
freq = hvis.freq[hvis.vis[:].local_bounds]
lmbda = C * 1e-6 / freq
ew = hvis.index_map["ew"]
u = ew[np.newaxis, :, np.newaxis] / lmbda[:, np.newaxis, np.newaxis]
v = np.sin(self.rot) * u
# Dereference input datasets
vis = hvis.vis[:].local_array # pol, freq, ew, el, ra
weight = hvis.weight[:].local_array # pol, freq, ew, ra
# Create the output container
out = containers.FormedBeamHAEW(
object_id=self.catalog.index_map["object_id"],
ha=np.arange(np.max(nha), dtype=int),
axes_from=hvis,
attrs_from=hvis,
distributed=hvis.distributed,
comm=hvis.comm,
)
if "redshift" in hvis:
out.add_dataset("redshift")
out.redshift[:] = hvis.redshift[:]
out.position["ra"][:] = src_ra
out.position["dec"][:] = src_dec
out.redistribute("freq")
# Dereference output datasets
ofb = out.beam[:].local_array
ofb[:] = 0.0
owe = out.weight[:].local_array
owe[:] = 0.0
oha = out.ha[:]
oha[:] = 0.0
# Loop over sources and fringestop
for ss, (idec, sdec) in enumerate(zip(nearest_dec, np.radians(src_dec))):
in_range = np.flatnonzero(valid[ss])
if (in_range.size == 0) or not valid_src[ss]:
continue
cos_dec = np.cos(np.radians(dec[idec]))
isort = np.argsort(ha_arr[ss, in_range])
in_range = in_range[isort]
islcs = find_contiguous_slices(in_range)
count = 0
for islc in islcs:
svis = vis[..., idec, islc] # pol, freq, ew, ha
sweight = weight[..., islc]
nsample = svis.shape[-1]
oslc = slice(count, count + nsample)
count = count + nsample
oha[ss, oslc] = ha_arr[ss, islc]
ha = np.radians(ha_arr[ss, islc])
# Loop over local frequencies and fringestop
for ff in range(svis.shape[1]):
owe[ss, :, ff, :, oslc] = sweight[:, ff]
# Calculate the phase
phi = interferometry.fringestop_phase(
ha, self.latitude, sdec, u[ff], v[ff]
)
# If the container has already been fringestopped,
# then we need to remove that contribution from the phase
if fringestopped:
omega = 2.0 * np.pi * ew * cos_dec / lmbda[ff]
phi *= np.exp(
-1.0j * omega[:, np.newaxis] * ra_rad[np.newaxis, islc]
)
ofb[ss, :, ff, :, oslc] = svis[:, ff] * phi
return out
[docs]
class FitBeamFormed(BeamFormExternalMixin, task.SingleTask):
"""Fit beamformed visibilities as a function of hour angle to a primary beam model.
Must provide a GridBeam object in celestial coordinates as argument to setup.
Attributes
----------
weight : "uniform" or "inverse_variance"
How to weight different hour angles during the fit.
max_ha : float
Only consider hour angles less than this value in degrees.
If not provided, then will include all hour angles in the
input container.
epsilon : float
Regularisation used during the fit.
"""
weight = config.enum(["uniform", "inverse_variance"], default="uniform")
max_ha = config.Property(proptype=float, default=None)
min_num_background = config.Property(proptype=int, default=5)
min_frac_beam = config.Property(proptype=float, default=0.50)
epsilon = config.Property(proptype=float, default=1.0e-10)
[docs]
def process(self, data):
"""Fit a model to the beamformed visibilites along the hour angle axis.
Parameters
----------
data : FormedBeamHA or FormedBeamHAEW
Visibilities beamformed to the location of sources in a catalog.
Returns
-------
out : FitFormedBeam or FitFormedBeamEW
Best-fit parameters describing the hour-angle dependence.
"""
container_lookup = {
containers.FormedBeamHA: containers.FitFormedBeam,
containers.FormedBeamHAEW: containers.FitFormedBeamEW,
}
# Distrbitue over frequency
data.redistribute("freq")
# Identify local frequencies
self.freq_local = data.freq[data.beam[:].local_bounds]
self._initialize_beam_with_data()
# Create output container
OutputContainer = container_lookup[data.__class__]
out = OutputContainer(
axes_from=data,
attrs_from=data,
distributed=data.distributed,
comm=data.comm,
)
out.redistribute("freq")
# Initialize everything to zero
for dset in out.datasets.values():
dset[:] = 0.0
# Copy over source coordinates
out.position[:] = data.position[:]
if "redshift" in data:
out.add_dataset("redshift")
out.redshift[:] = data.redshift[:]
# Dereference datasets
beam = data.beam[:].local_array
weight = data.weight[:].local_array
obeam = out.beam[:].local_array
oweight = out.weight[:].local_array
obkg = out.background[:].local_array
oweightbkg = out.weight_background[:].local_array
ocorr = out.corr_background_beam[:].local_array
# Get source coordinates
src_dec = np.radians(data.position["dec"][:])
src_ha = data.ha[:]
max_nha = src_ha.shape[1]
# Loop over sources
for ss, sdec in enumerate(src_dec):
# Ignore missing sources
if not np.any(weight[ss] > 0.0):
continue
# Extract hour angle axis
nha = max_nha - np.min(np.flatnonzero(src_ha[ss, ::-1] != 0.0))
slc = slice(0, nha)
sha = np.radians(src_ha[ss, slc])
# Loop over polarisation
for pp, pol in enumerate(data.pol):
b = beam[ss, pp, ..., slc]
w = weight[ss, pp, ..., slc]
if self.weight == "uniform":
sigma = np.sqrt(invert_no_zero(w))
w = w > 0.0
if self.max_ha is not None:
flag_ha = np.abs(sha) <= np.radians(self.max_ha)
w = w * flag_ha
else:
flag_ha = np.ones(nha, dtype=bool)
# Get the template as a function of hour angle
X = self.get_template(pol, sdec, sha)
if "ew" in out.index_map:
X = X[:, np.newaxis, :, :]
# Identify frequencies/baselines that are missing a significant fraction of samples
f = w > 0
offsrc = X[..., 1] < 0.05
flag_background = np.sum(f * offsrc, axis=-1) > self.min_num_background
flag_beam = (
np.sum(f * X[..., 1], axis=-1)
* invert_no_zero(np.sum(flag_ha * X[..., 1], axis=-1))
) > self.min_frac_beam
flag = flag_background & flag_beam
if not np.any(flag):
continue
# Construct the inverse parameter covariance matrix
XT = np.swapaxes(X, -2, -1)
A = np.matmul(XT, w[..., np.newaxis] * X) + np.eye(2) * self.epsilon
# Solve for the parameters and their covariance
proj_wb = np.sum(XT * (w * b)[..., np.newaxis, :], axis=-1)
coeff = np.linalg.solve(A, proj_wb)
cov = np.linalg.solve(A, np.eye(2))
# Save to output container
obeam[ss, pp] = coeff[..., 1]
obkg[ss, pp] = coeff[..., 0]
if self.weight == "uniform":
B = np.matmul(cov, XT * (w * sigma)[..., np.newaxis, :])
cov = np.matmul(B, np.swapaxes(B, -2, -1))
oweight[ss, pp] = flag * invert_no_zero(cov[..., 1, 1])
oweightbkg[ss, pp] = flag * invert_no_zero(cov[..., 0, 0])
ocorr[ss, pp] = cov[..., 0, 1] * np.sqrt(
oweight[ss, pp] * oweightbkg[ss, pp]
)
return out
[docs]
def get_template(self, pol, dec, ha):
"""Get the template as a function of hour angle to fit to transit.
Parameters
----------
pol : str
String specifying the polarisation,
either 'XX', 'XY', 'YX', or 'YY'.
dec : float
The declination of the source in radians.
ha : np.ndarray[nha,]
The hour angle of the source in radians.
Returns
-------
t : np.ndarray[nha, 2]
Template for the source transit as a function of hour angle.
First column is entirely 1 and corresponds to an overall additive
offset. Second column is the primary beam model versus hour angle.
"""
t = np.ones((self.freq_local.size, ha.size, 2), dtype=float)
t[..., 1] = self._beamfunc(pol, dec, ha)
return t
[docs]
class HealpixBeamForm(task.SingleTask):
"""Beamform by extracting the pixel containing each source form a Healpix map.
Unless it has an explicit `epoch` attribute, the Healpix map is assumed to be in the
same coordinate epoch as the catalog. If it does, the input catalog is assumed to be
in ICRS and then is precessed to the CIRS coordinates in the epoch of the map.
Attributes
----------
fwhm : float
Smooth the map with a Gaussian with the specified FWHM in degrees. If `None`
(default), leave at native map resolution. This will modify the input map in
place.
"""
fwhm = config.Property(proptype=float, default=None)
[docs]
def setup(self, hpmap: containers.Map):
"""Set the map to extract beams from at each catalog location.
Parameters
----------
hpmap
The Healpix map to extract the sources from.
"""
self.map = hpmap
mv = self.map.map[:]
self.map.redistribute("freq")
self.log.info("Smoothing input Healpix map.")
for lfi, _ in mv.enumerate(axis=0):
for pi in range(mv.shape[1]):
mv[lfi, pi] = healpy.smoothing(
mv[lfi, pi],
fwhm=np.radians(self.fwhm),
)
[docs]
def process(self, catalog: containers.SourceCatalog) -> containers.FormedBeam:
"""Extract sources from a ringmap.
Parameters
----------
catalog
The catalog to extract sources from.
Returns
-------
formed_beam
The source spectra.
"""
if "position" not in catalog:
raise ValueError("Input is missing a position table.")
# Container to hold the formed beams
formed_beam = containers.FormedBeam(
object_id=catalog.index_map["object_id"],
axes_from=self.map,
distributed=True,
)
# Initialize container to zeros.
formed_beam.beam[:] = 0.0
formed_beam.weight[:] = 0.0
# Copy catalog information
formed_beam["position"][:] = catalog["position"][:]
if "redshift" in catalog:
formed_beam.add_dataset("redshift")
formed_beam["redshift"][:] = catalog["redshift"][:]
# Get the source positions at the epoch of the input map
epoch = self.map.attrs.get("epoch", None)
epoch = ctime.ensure_unix(epoch) if epoch is not None else None
if epoch:
src_ra, src_dec = icrs_to_cirs(
catalog["position"]["ra"], catalog["position"]["dec"], epoch
)
else:
self.log.info(
"Input map has no epoch set, assuming that it matches the catalog."
)
src_ra = catalog["position"]["ra"]
src_dec = catalog["position"]["dec"]
# Use Healpix to get the pixels containing the sources
pix_ind = healpy.ang2pix(self.map.nside, src_ra, src_dec, lonlat=True)
# Ensure containers are distributed in frequency
formed_beam.redistribute("freq")
self.map.redistribute("freq")
formed_beam.beam[:] = self.map.map[:, :, pix_ind].transpose(2, 1, 0)
# Set to some non-zero value as the Map container doesn't have a weight
formed_beam.weight[:] = 1.0
return formed_beam
[docs]
def icrs_to_cirs(ra, dec, epoch, apparent=True):
"""Convert a set of positions from ICRS to CIRS at a given data.
Parameters
----------
ra, dec : float or np.ndarray
Positions of source in ICRS coordinates including an optional
redshift position.
epoch : time_like
Time to convert the positions to. Can be any type convertible to a
time using `caput.time.ensure_unix`.
apparent : bool
Calculate the apparent position (includes abberation and deflection).
Returns
-------
ra_cirs, dec_cirs : float or np.ndarray
Arrays of the positions in *CIRS* coordiantes.
"""
positions = Star(ra=Angle(degrees=ra), dec=Angle(degrees=dec))
epoch = ctime.unix_to_skyfield_time(ctime.ensure_unix(epoch))
earth = ctime.skyfield_wrapper.ephemeris["earth"]
positions = earth.at(epoch).observe(positions)
if apparent:
positions = positions.apparent()
ra_cirs, dec_cirs, _ = positions.cirs_radec(epoch)
return ra_cirs._degrees, dec_cirs._degrees