Source code for pynpoint.processing.psfpreparation

"""
Pipeline modules to prepare the data for the PSF subtraction.
"""

import time
import warnings

from typing import Optional, Tuple

import numpy as np

from astropy.coordinates import EarthLocation
from astropy.time import Time
from typeguard import typechecked

from pynpoint.core.processing import ProcessingModule
from pynpoint.util.module import progress, memory_frames
from pynpoint.util.image import create_mask, scale_image, shift_image


class PSFpreparationModule(ProcessingModule):
    """
    Module to prepare the data for PSF subtraction with PCA. The preparation steps include
    masking and an optional normalization.
    """

    __author__ = 'Markus Bonse, Tomas Stolker, Timothy Gebhard, Sven Kiefer'

    @typechecked
    def __init__(self,
                 name_in: str,
                 image_in_tag: str,
                 image_out_tag: str,
                 mask_out_tag: Optional[str] = None,
                 norm: bool = False,
                 resize: Optional[float] = None,
                 cent_size: Optional[float] = None,
                 edge_size: Optional[float] = None) -> None:
        """
        Parameters
        ----------
        name_in : str
            Unique name of the module instance.
        image_in_tag : str
            Tag of the database entry that is read as input.
        image_out_tag : str
            Tag of the database entry with images that is written as output.
        mask_out_tag : str, None, optional
            Tag of the database entry with the mask that is written as output. If set to None,
            no mask array is saved.
        norm : bool
            Normalize each image by its Frobenius norm. Only supported for 3D datasets (i.e.
            regular imaging).
        resize : float, None
            DEPRECATED. This parameter is currently ignored by the module and will be removed
            in a future version of PynPoint.
        cent_size : float, None, optional
            Radius of the central mask (in arcsec). No mask is used when set to None.
        edge_size : float, None, optional
            Outer radius (in arcsec) beyond which pixels are masked. No outer mask is used when
            set to None. If the value is larger than half the image size then it will be set to
            half the image size.

        Returns
        -------
        NoneType
            None
        """

        super().__init__(name_in)

        self.m_image_in_port = self.add_input_port(image_in_tag)

        if mask_out_tag is None:
            self.m_mask_out_port = None
        else:
            self.m_mask_out_port = self.add_output_port(mask_out_tag)

        self.m_image_out_port = self.add_output_port(image_out_tag)

        self.m_cent_size = cent_size
        self.m_edge_size = edge_size
        self.m_norm = norm

        # Raise a DeprecationWarning if the resize argument is used
        if resize is not None:
            warnings.warn('The \'resize\' parameter has been deprecated. Its value is currently '
                          'being ignored, and the argument will be removed in a future version '
                          'of PynPoint.', DeprecationWarning)

    @typechecked
    def run(self) -> None:
        """
        Run method of the module. Masks and normalizes the images.

        Returns
        -------
        NoneType
            None
        """

        # Get the PIXSCALE and MEMORY attributes
        pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
        memory = self._m_config_port.get_attribute('MEMORY')

        # Get the number of dimensions and shape
        ndim = self.m_image_in_port.get_ndim()
        im_shape = self.m_image_in_port.get_shape()

        if ndim == 3:
            # Number of images
            nimages = im_shape[-3]

            # Split into batches to comply with memory constraints
            frames = memory_frames(memory, nimages)

        elif ndim == 4:
            # Process all wavelengths per exposure at once
            frames = np.linspace(0, im_shape[-3], im_shape[-3]+1)

        if self.m_norm and ndim == 4:
            warnings.warn('The \'norm\' parameter does not support 4D datasets and will therefore '
                          'be ignored.')

        # Convert m_cent_size and m_edge_size from arcseconds to pixels
        if self.m_cent_size is not None:
            self.m_cent_size /= pixscale

        if self.m_edge_size is not None:
            self.m_edge_size /= pixscale

        # Create 2D disk mask which will be applied to every frame
        mask = create_mask((int(im_shape[-2]), int(im_shape[-1])),
                           (self.m_cent_size, self.m_edge_size)).astype(bool)

        # Keep track of the normalization vectors in case we are normalizing the images (if
        # we are not normalizing, this list will remain empty)
        norms = list()

        start_time = time.time()

        # Run the PSFpreparationModule for each subset of frames
        for i in range(frames[:-1].size):

            # Print progress to command line
            progress(i, len(frames[:-1]), 'Preparing images for PSF subtraction...', start_time)

            if ndim == 3:
                # Get the images and ensure they have the correct 3D shape with the following
                # three dimensions: (batch_size, height, width)
                images = self.m_image_in_port[frames[i]:frames[i+1], ]

                if images.ndim == 2:
                    warnings.warn('The input data has 2 dimensions whereas 3 dimensions are '
                                  'required. An extra dimension has been added.')

                    images = images[np.newaxis, ...]

            elif ndim == 4:
                # Process all wavelengths per exposure at once
                images = self.m_image_in_port[:, i, ]

            # Apply the mask, i.e., set all pixels to 0 where the mask is False
            images[:, ~mask] = 0.

            # If desired, normalize the images using the Frobenius norm
            if self.m_norm and ndim == 3:
                im_norm = np.linalg.norm(images, ord='fro', axis=(1, 2))
                images /= im_norm[:, np.newaxis, np.newaxis]
                norms.append(im_norm)

            # Write processed images to output port
            if ndim == 3:
                self.m_image_out_port.append(images, data_dim=3)
            elif ndim == 4:
                self.m_image_out_port.append(images, data_dim=4)

        # Store information about mask
        if self.m_mask_out_port is not None:
            self.m_mask_out_port.set_all(mask)
            self.m_mask_out_port.copy_attributes(self.m_image_in_port)

        # Copy attributes from input port
        self.m_image_out_port.copy_attributes(self.m_image_in_port)

        # If the norms list is not empty (i.e., if we have computed the norm for every image),
        # we can also save the corresponding norm vector as an additional attribute
        if norms:
            self.m_image_out_port.add_attribute(name='norm',
                                                value=np.hstack(norms),
                                                static=False)

        # Save cent_size and edge_size as attributes to the output port
        if self.m_cent_size is not None:
            self.m_image_out_port.add_attribute(name='cent_size',
                                                value=self.m_cent_size * pixscale,
                                                static=True)

        if self.m_edge_size is not None:
            self.m_image_out_port.add_attribute(name='edge_size',
                                                value=self.m_edge_size * pixscale,
                                                static=True)
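
# Example usage (not part of the original module): a minimal sketch of how
# PSFpreparationModule might be attached to a Pypeline. The directory paths,
# tag names, and mask radii below are hypothetical placeholders, and the
# example assumes the standard top-level imports
# (e.g. from pynpoint import Pypeline, PSFpreparationModule).
#
#     pipeline = Pypeline(working_place_in='working_dir',
#                         input_place_in='input_dir',
#                         output_place_in='output_dir')
#
#     module = PSFpreparationModule(name_in='prep',
#                                   image_in_tag='science',
#                                   image_out_tag='science_prep',
#                                   mask_out_tag=None,
#                                   norm=False,
#                                   cent_size=0.15,
#                                   edge_size=1.1)
#
#     pipeline.add_module(module)
#     pipeline.run_module('prep')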


class AngleInterpolationModule(ProcessingModule):
    """
    Module for calculating the parallactic angle values by interpolating between the start and
    end values of a data cube.
    """

    __author__ = 'Markus Bonse, Tomas Stolker'

    @typechecked
    def __init__(self,
                 name_in: str,
                 data_tag: str) -> None:
        """
        Parameters
        ----------
        name_in : str
            Unique name of the module instance.
        data_tag : str
            Tag of the database entry for which the parallactic angles are written as
            attributes.

        Returns
        -------
        NoneType
            None
        """

        super().__init__(name_in)

        self.m_data_in_port = self.add_input_port(data_tag)
        self.m_data_out_port = self.add_output_port(data_tag)

    @typechecked
    def run(self) -> None:
        """
        Run method of the module. Calculates the parallactic angles of each frame by linearly
        interpolating between the start and end values of the data cubes. The values are
        written as attributes to *data_tag*. A correction of 360 deg is applied when the start
        and end values of the angles change sign at +/-180 deg.

        Returns
        -------
        NoneType
            None
        """

        parang_start = self.m_data_in_port.get_attribute('PARANG_START')
        parang_end = self.m_data_in_port.get_attribute('PARANG_END')

        steps = self.m_data_in_port.get_attribute('NFRAMES')

        if 'NDIT' in self.m_data_in_port.get_all_non_static_attributes():
            ndit = self.m_data_in_port.get_attribute('NDIT')

            if not np.all(ndit == steps):
                warnings.warn('There is a mismatch between the NDIT and NFRAMES values. The '
                              'parallactic angles are calculated with a linear interpolation by '
                              'using NFRAMES steps. A frame selection should be applied after '
                              'the parallactic angles are calculated.')

        new_angles = []

        start_time = time.time()

        for i, _ in enumerate(parang_start):
            progress(i, len(parang_start), 'Interpolating parallactic angles...', start_time)

            if parang_start[i] < -170. and parang_end[i] > 170.:
                parang_start[i] += 360.
            elif parang_end[i] < -170. and parang_start[i] > 170.:
                parang_end[i] += 360.

            if steps[i] == 1:
                new_angles = np.append(new_angles, [(parang_start[i] + parang_end[i])/2.])

            elif steps[i] != 1:
                new_angles = np.append(new_angles, np.linspace(parang_start[i],
                                                               parang_end[i],
                                                               num=steps[i]))

        self.m_data_out_port.add_attribute('PARANG', new_angles, static=False)
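
# Example usage (not part of the original module): a hedged sketch of running
# AngleInterpolationModule on a database tag. It assumes that the tag 'science'
# already carries the PARANG_START, PARANG_END, and NFRAMES attributes (e.g.
# collected from the FITS headers during import) and that a Pypeline has been
# created as in the previous example.
#
#     module = AngleInterpolationModule(name_in='angle_interp',
#                                       data_tag='science')
#
#     pipeline.add_module(module)
#     pipeline.run_module('angle_interp')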


class SortParangModule(ProcessingModule):
    """
    Module to sort the images and attributes with increasing ``INDEX``.
    """

    __author__ = 'Tomas Stolker'

    @typechecked
    def __init__(self,
                 name_in: str,
                 image_in_tag: str,
                 image_out_tag: str) -> None:
        """
        Parameters
        ----------
        name_in : str
            Unique name of the module instance.
        image_in_tag : str
            Database tag with the input data.
        image_out_tag : str
            Database tag where the output data will be stored. Should be different from
            ``image_in_tag``.

        Returns
        -------
        NoneType
            None
        """

        super().__init__(name_in)

        self.m_image_in_port = self.add_input_port(image_in_tag)
        self.m_image_out_port = self.add_output_port(image_out_tag)

    @typechecked
    def run(self) -> None:
        """
        Run method of the module. Sorts the images and attributes with increasing ``INDEX``.
        Therefore, the images are sorted by their original (usually chronological) order.

        Returns
        -------
        NoneType
            None
        """

        memory = self._m_config_port.get_attribute('MEMORY')
        index = self.m_image_in_port.get_attribute('INDEX')

        ndim = self.m_image_in_port.get_ndim()
        nimages = self.m_image_in_port.get_shape()[-3]

        index_new = np.zeros(index.shape, dtype=int)

        if 'PARANG' in self.m_image_in_port.get_all_non_static_attributes():
            parang = self.m_image_in_port.get_attribute('PARANG')
            parang_new = np.zeros(parang.shape)

        else:
            parang_new = None

        if 'STAR_POSITION' in self.m_image_in_port.get_all_non_static_attributes():
            star = self.m_image_in_port.get_attribute('STAR_POSITION')
            star_new = np.zeros(star.shape)

        else:
            star_new = None

        index_sort = np.argsort(index)

        frames = memory_frames(memory, nimages)

        start_time = time.time()

        for i, _ in enumerate(frames[:-1]):
            progress(i, len(frames[:-1]), 'Sorting images in time...', start_time)

            index_new[frames[i]:frames[i+1]] = index[index_sort[frames[i]:frames[i+1]]]

            if parang_new is not None:
                parang_new[frames[i]:frames[i+1]] = parang[index_sort[frames[i]:frames[i+1]]]

            if star_new is not None:
                star_new[frames[i]:frames[i+1]] = star[index_sort[frames[i]:frames[i+1]]]

            # HDF5 indexing elements must be in increasing order
            for item in index_sort[frames[i]:frames[i+1]]:
                if ndim == 3:
                    self.m_image_out_port.append(self.m_image_in_port[item, ], data_dim=3)
                elif ndim == 4:
                    self.m_image_out_port.append(self.m_image_in_port[:, item, ], data_dim=4)

        self.m_image_out_port.copy_attributes(self.m_image_in_port)
        self.m_image_out_port.add_history('SortParangModule', 'sorted by INDEX')

        self.m_image_out_port.add_attribute('INDEX', index_new, static=False)

        if parang_new is not None:
            self.m_image_out_port.add_attribute('PARANG', parang_new, static=False)

        if star_new is not None:
            self.m_image_out_port.add_attribute('STAR_POSITION', star_new, static=False)

        self.m_image_out_port.close_port()
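
# Example usage (not part of the original module): a minimal sketch of sorting a
# dataset by its INDEX attribute with SortParangModule, e.g. after a frame
# selection has shuffled the order. The tag names are hypothetical and the
# Pypeline is assumed to exist already.
#
#     module = SortParangModule(name_in='sort',
#                               image_in_tag='science',
#                               image_out_tag='science_sorted')
#
#     pipeline.add_module(module)
#     pipeline.run_module('sort')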


class AngleCalculationModule(ProcessingModule):
    """
    Module for calculating the parallactic angles. The start time of the observation is taken
    and multiples of the exposure time are added to derive the parallactic angle of each frame
    inside the cube. Instrument-specific overheads are included.
    """

    __author__ = 'Alexander Bohn, Tomas Stolker'

    @typechecked
    def __init__(self,
                 name_in: str,
                 data_tag: str,
                 instrument: str = 'NACO') -> None:
        """
        Parameters
        ----------
        name_in : str
            Unique name of the module instance.
        data_tag : str
            Tag of the database entry for which the parallactic angles are written as
            attributes.
        instrument : str
            Instrument name ('NACO', 'SPHERE/IRDIS', or 'SPHERE/IFS').

        Returns
        -------
        NoneType
            None
        """

        super().__init__(name_in)

        # Parameters
        self.m_instrument = instrument

        # Set parameters according to choice of instrument
        if self.m_instrument == 'NACO':

            # pupil offset in degrees
            self.m_pupil_offset = 0.  # No offset here

            # no overheads in cube mode, since cube is read out after all individual exposures
            # see NACO manual page 62 (v102)
            self.m_O_START = 0.
            self.m_DIT_DELAY = 0.
            self.m_ROT = 0.

            # rotator offset in degrees
            self.m_rot_offset = 89.44  # According to NACO manual page 65 (v102)

        elif self.m_instrument == 'SPHERE/IRDIS':

            # pupil offset in degrees
            self.m_pupil_offset = -135.99  # According to SPHERE manual page 64 (v102)

            # overheads in cube mode (several NDITS) in hours
            self.m_O_START = 0.3 / 3600.  # According to SPHERE manual page 90/91 (v102)
            self.m_DIT_DELAY = 0.1 / 3600.  # According to SPHERE manual page 90/91 (v102)
            self.m_ROT = 0.838 / 3600.  # According to SPHERE manual page 90/91 (v102)

            # rotator offset in degrees
            self.m_rot_offset = 0.  # no offset here

        elif self.m_instrument == 'SPHERE/IFS':

            # pupil offset in degrees
            self.m_pupil_offset = -135.99 - 100.48  # According to SPHERE manual page 64 (v102)

            # overheads in cube mode (several NDITS) in hours
            self.m_O_START = 0.3 / 3600.  # According to SPHERE manual page 90/91 (v102)
            self.m_DIT_DELAY = 0.2 / 3600.  # According to SPHERE manual page 90/91 (v102)
            self.m_ROT = 1.65 / 3600.  # According to SPHERE manual page 90/91 (v102)

            # rotator offset in degrees
            self.m_rot_offset = 0.  # no offset here

        else:
            raise ValueError('The instrument argument should be set to either \'NACO\', '
                             '\'SPHERE/IRDIS\', or \'SPHERE/IFS\'.')

        self.m_data_in_port = self.add_input_port(data_tag)
        self.m_data_out_port = self.add_output_port(data_tag)

    @typechecked
    def _attribute_check(self,
                         ndit: np.ndarray,
                         steps: np.ndarray) -> None:

        if not np.all(ndit == steps):
            warnings.warn('There is a mismatch between the NDIT and NFRAMES values. A frame '
                          'selection should be applied after the parallactic angles are '
                          'calculated.')

        if self.m_instrument == 'SPHERE/IFS':
            warnings.warn('AngleCalculationModule has not been tested for SPHERE/IFS data.')

        if self.m_instrument in ('SPHERE/IRDIS', 'SPHERE/IFS'):

            if self._m_config_port.get_attribute('RA') != 'ESO INS4 DROT2 RA':
                warnings.warn('For SPHERE data it is recommended to use the header keyword '
                              '\'ESO INS4 DROT2 RA\' to specify the object\'s right ascension. '
                              'The input will be parsed accordingly. Using the regular '
                              '\'RA\' keyword will lead to wrong parallactic angles.')

            if self._m_config_port.get_attribute('DEC') != 'ESO INS4 DROT2 DEC':
                warnings.warn('For SPHERE data it is recommended to use the header keyword '
                              '\'ESO INS4 DROT2 DEC\' to specify the object\'s declination. '
                              'The input will be parsed accordingly. Using the regular '
                              '\'DEC\' keyword will lead to wrong parallactic angles.')

    @typechecked
    def run(self) -> None:
        """
        Run method of the module. Calculates the parallactic angles from the position of the
        object on the sky and the telescope location on Earth. The start of the observation is
        used to extrapolate for the observation time of each individual image of a data cube.
        The values are written as PARANG attributes to *data_tag*.

        Returns
        -------
        NoneType
            None
        """

        # Load cube sizes
        steps = self.m_data_in_port.get_attribute('NFRAMES')
        ndit = self.m_data_in_port.get_attribute('NDIT')

        self._attribute_check(ndit, steps)

        # Load exposure time [hours]
        exptime = self.m_data_in_port.get_attribute('DIT')/3600.

        # Load telescope location
        tel_lat = self.m_data_in_port.get_attribute('LATITUDE')
        tel_lon = self.m_data_in_port.get_attribute('LONGITUDE')

        # Load temporary target position
        tmp_ra = self.m_data_in_port.get_attribute('RA')
        tmp_dec = self.m_data_in_port.get_attribute('DEC')

        # Parse to degree depending on instrument
        if 'SPHERE' in self.m_instrument:

            # get sign of declination
            tmp_dec_sign = np.sign(tmp_dec)
            tmp_dec = np.abs(tmp_dec)

            # parse RA
            tmp_ra_s = tmp_ra % 100
            tmp_ra_m = ((tmp_ra - tmp_ra_s) / 1e2) % 100
            tmp_ra_h = ((tmp_ra - tmp_ra_s - tmp_ra_m * 1e2) / 1e4)

            # parse DEC
            tmp_dec_s = tmp_dec % 100
            tmp_dec_m = ((tmp_dec - tmp_dec_s) / 1e2) % 100
            tmp_dec_d = ((tmp_dec - tmp_dec_s - tmp_dec_m * 1e2) / 1e4)

            # get RA and DEC in degree
            ra = (tmp_ra_h + tmp_ra_m / 60. + tmp_ra_s / 3600.) * 15.
            dec = tmp_dec_sign * (tmp_dec_d + tmp_dec_m / 60. + tmp_dec_s / 3600.)

        else:
            ra = tmp_ra
            dec = tmp_dec

        # Load start times of exposures
        obs_dates = self.m_data_in_port.get_attribute('DATE')

        # Load pupil positions during observations
        if self.m_instrument == 'NACO':
            pupil_pos = self.m_data_in_port.get_attribute('PUPIL')

        elif self.m_instrument == 'SPHERE/IRDIS':
            pupil_pos = np.zeros(steps.shape)

        elif self.m_instrument == 'SPHERE/IFS':
            pupil_pos = np.zeros(steps.shape)

        new_angles = np.array([])
        pupil_pos_arr = np.array([])

        start_time = time.time()

        # Calculate parallactic angles for each cube
        for i, tmp_steps in enumerate(steps):
            progress(i, len(steps), 'Calculating parallactic angles...', start_time)

            t = Time(obs_dates[i].decode('utf-8'),
                     location=EarthLocation(lat=tel_lat, lon=tel_lon))

            sid_time = t.sidereal_time('apparent').value

            # Extrapolate sidereal times from start time of the cube for each frame of it
            sid_time_arr = np.linspace(sid_time+self.m_O_START,
                                       (sid_time+self.m_O_START) +
                                       (exptime+self.m_DIT_DELAY+self.m_ROT)*(tmp_steps-1),
                                       tmp_steps)

            # Convert to degrees
            sid_time_arr_deg = sid_time_arr * 15.

            # Calculate hour angle in degrees
            hour_angle = sid_time_arr_deg - ra[i]

            # Conversion to radians:
            hour_angle_rad = np.deg2rad(hour_angle)
            dec_rad = np.deg2rad(dec[i])
            lat_rad = np.deg2rad(tel_lat)

            p_angle = np.arctan2(np.sin(hour_angle_rad),
                                 (np.cos(dec_rad)*np.tan(lat_rad) -
                                  np.sin(dec_rad)*np.cos(hour_angle_rad)))

            new_angles = np.append(new_angles, np.rad2deg(p_angle))
            pupil_pos_arr = np.append(pupil_pos_arr, np.ones(tmp_steps)*pupil_pos[i])

        # Correct for rotator (SPHERE) or pupil offset (NACO)
        if self.m_instrument == 'NACO':
            # See NACO manual page 65 (v102)
            new_angles_corr = new_angles - (90. + (self.m_rot_offset-pupil_pos_arr))

        elif self.m_instrument == 'SPHERE/IRDIS':
            # See SPHERE manual page 64 (v102)
            new_angles_corr = new_angles - self.m_pupil_offset

        elif self.m_instrument == 'SPHERE/IFS':
            # See SPHERE manual page 64 (v102)
            new_angles_corr = new_angles - self.m_pupil_offset

        indices = np.where(new_angles_corr < -180.)[0]
        if indices.size > 0:
            new_angles_corr[indices] += 360.

        indices = np.where(new_angles_corr > 180.)[0]
        if indices.size > 0:
            new_angles_corr[indices] -= 360.

        self.m_data_out_port.add_attribute('PARANG', new_angles_corr, static=False)
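
# Example usage (not part of the original module): a hedged sketch of computing
# parallactic angles for SPHERE/IRDIS data with AngleCalculationModule. The tag
# name is a hypothetical placeholder; for SPHERE data the RA/DEC keywords in
# the central configuration should point to 'ESO INS4 DROT2 RA' and
# 'ESO INS4 DROT2 DEC' (see the warnings raised in _attribute_check above).
#
#     module = AngleCalculationModule(name_in='angle_calc',
#                                     data_tag='science',
#                                     instrument='SPHERE/IRDIS')
#
#     pipeline.add_module(module)
#     pipeline.run_module('angle_calc')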


class SDIpreparationModule(ProcessingModule):
    """
    Module for preparing continuum frames for dual-band simultaneous differential imaging.
    """

    __author__ = 'Gabriele Cugno, Tomas Stolker'

    @typechecked
    def __init__(self,
                 name_in: str,
                 image_in_tag: str,
                 image_out_tag: str,
                 wavelength: Tuple[float, float],
                 width: Tuple[float, float]) -> None:
        """
        Parameters
        ----------
        name_in : str
            Unique name of the module instance.
        image_in_tag : str
            Tag of the database entry that is read as input.
        image_out_tag : str
            Tag of the database entry that is written as output. Should be different from
            *image_in_tag*.
        wavelength : tuple(float, float)
            The central wavelengths of the line and continuum filter, (line, continuum), in
            arbitrary but identical units.
        width : tuple(float, float)
            The equivalent widths of the line and continuum filter, (line, continuum), in
            arbitrary but identical units.

        Returns
        -------
        NoneType
            None
        """

        super().__init__(name_in)

        self.m_image_in_port = self.add_input_port(image_in_tag)
        self.m_image_out_port = self.add_output_port(image_out_tag)

        self.m_line_wvl = wavelength[0]
        self.m_cnt_wvl = wavelength[1]

        self.m_line_width = width[0]
        self.m_cnt_width = width[1]

    @typechecked
    def run(self) -> None:
        """
        Run method of the module. Normalizes the images for the different filter widths,
        upscales the images, and crops the images to the initial image shape in order to
        align the PSF patterns.

        Returns
        -------
        NoneType
            None
        """

        wvl_factor = self.m_line_wvl/self.m_cnt_wvl
        width_factor = self.m_line_width/self.m_cnt_width

        nimages = self.m_image_in_port.get_shape()[0]

        start_time = time.time()

        for i in range(nimages):
            progress(i, nimages, 'Preparing images for dual-band SDI...', start_time)

            image = self.m_image_in_port[i, ]

            im_scale = width_factor * scale_image(image, wvl_factor, wvl_factor)

            if i == 0:
                npix_del = im_scale.shape[-1] - image.shape[-1]

                if npix_del % 2 == 0:
                    npix_del_a = int(npix_del/2)
                    npix_del_b = int(npix_del/2)

                else:
                    npix_del_a = int((npix_del-1)/2)
                    npix_del_b = int((npix_del+1)/2)

            im_crop = im_scale[npix_del_a:-npix_del_b, npix_del_a:-npix_del_b]

            if npix_del % 2 == 1:
                im_crop = shift_image(im_crop, (-0.5, -0.5), interpolation='spline')

            self.m_image_out_port.append(im_crop, data_dim=3)

        history = f'(line, continuum) = ({self.m_line_wvl}, {self.m_cnt_wvl})'

        self.m_image_out_port.copy_attributes(self.m_image_in_port)
        self.m_image_out_port.add_history('SDIpreparationModule', history)
        self.m_image_in_port.close_port()
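
# Example usage (not part of the original module): a minimal sketch of preparing
# continuum frames for dual-band SDI. The tag names, wavelengths, and filter
# widths below are hypothetical placeholders (here loosely based on an
# H-alpha/continuum filter pair, in micron); any consistent units can be used
# since only the wavelength and width ratios enter the calculation.
#
#     module = SDIpreparationModule(name_in='sdi_prep',
#                                   image_in_tag='continuum',
#                                   image_out_tag='continuum_scaled',
#                                   wavelength=(0.656, 0.648),
#                                   width=(0.0072, 0.0065))
#
#     pipeline.add_module(module)
#     pipeline.run_module('sdi_prep')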