"""
Pipeline modules for frame selection.
"""
import time
import math
import warnings
import multiprocessing as mp
from typing import Optional, Tuple, Union
import numpy as np
from typeguard import typechecked
from skimage.metrics import structural_similarity, mean_squared_error
from pynpoint.core.processing import ProcessingModule
from pynpoint.util.apply_func import image_stat
from pynpoint.util.image import crop_image, pixel_distance, center_pixel
from pynpoint.util.module import progress
from pynpoint.util.remove import write_selected_data, write_selected_attributes
from pynpoint.util.star import star_positions
[docs]class RemoveFramesModule(ProcessingModule):
"""
Pipeline module for removing images by their index number.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
selected_out_tag: str,
removed_out_tag: str,
frames: Union[str, range, list, np.ndarray]) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
selected_out_tag : str
Tag of the database entry with the remaining images after removing the specified
images. Should be different from *image_in_tag*.
removed_out_tag : str
Tag of the database entry with the images that are removed. Should be different
from *image_in_tag*.
frames : str, list, range, numpy.ndarray
The frame indices that have to be removed or a database tag pointing to a list of
frame indices.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_selected_out_port = self.add_output_port(selected_out_tag)
self.m_removed_out_port = self.add_output_port(removed_out_tag)
if isinstance(frames, str):
self.m_index_in_port = self.add_input_port(frames)
else:
self.m_index_in_port = None
if isinstance(frames, (range, list)):
self.m_frames = np.asarray(frames, dtype=int)
elif isinstance(frames, np.ndarray):
self.m_frames = frames
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Removes the frames and corresponding attributes, updates the
NFRAMES attribute, and saves the data and attributes.
Returns
-------
NoneType
None
"""
if self.m_index_in_port is not None:
self.m_frames = self.m_index_in_port.get_all()
if np.size(np.where(self.m_frames >= self.m_image_in_port.get_shape()[0])) > 0:
raise ValueError(f'Some values in \'frames\' are larger than the total number of '
f'available frames, {self.m_image_in_port.get_shape()[0]}.')
write_selected_data(memory=self._m_config_port.get_attribute('MEMORY'),
indices=self.m_frames,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port)
write_selected_attributes(indices=self.m_frames,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port,
module_type='RemoveFramesModule',
history=f'frames removed = {np.size(self.m_frames)}')
self.m_image_in_port.close_port()
[docs]class FrameSelectionModule(ProcessingModule):
"""
Pipeline module for applying a frame selection.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
selected_out_tag: str,
removed_out_tag: str,
index_out_tag: Optional[str] = None,
method: str = 'median',
threshold: Union[float, Tuple[float, float]] = 4.,
fwhm: Optional[float] = 0.1,
aperture: Union[Tuple[str, float], Tuple[str, float, float]] = ('circular', 0.2),
position: Optional[Union[Tuple[int, int, float], Tuple[None, None, float],
Tuple[int, int, None]]] = None) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
selected_out_tag : str
Tag of the database entry with the selected images that are
written as output. Should be different from
``image_in_tag``.
removed_out_tag : str
Tag of the database entry with the removed images that are
written as output. Should be different from
``image_in_tag``.
index_out_tag : str, None
Tag of the database entry with the list of frames indices
that are removed with the frames selection. No data is
written when set to ``None``.
method : str
Method used for the frame selection. Either sigma clipping
is applied with respect to the median (``method='median'``)
or maximum (``method='max'``) aperture flux or fluxes
outside a specified range (``method='range'``) are removed.
In the latter case, the ``threshold`` argument should be
a tuple with the minimum and maximum flux value.
threshold : float, tuple(float, float)
Threshold in units of sigma for the frame selection in case
``method='median'`` or ``method='max'``. All images that are
a ``threshold`` number of sigmas away from the
median/maximum aperture flux will be removed. In case
``method='range'``, the argument should be a tuple with the
minimum and maximum flux. Aperture fluxes within this range
will be selected and stored at ``selected_out_tag``.
fwhm : float, None
The FWHM (arcsec) of the Gaussian kernel that is used to
smooth the images before the brightest pixel is located.
Recommended to be similar in size to the FWHM of the
stellar PSF. No smoothing is applied if set to ``None``.
aperture : tuple(str, float), tuple(str, float, float)
Tuple with the aperture properties for measuring the
photometry around the location of the brightest pixel. The
first element contains the aperture type ('circular',
'annulus', or 'ratio'). For a circular aperture, the second
element contains the aperture radius (arcsec). For the
other two types, the second and third element are the
inner and outer radii (arcsec) of the aperture.
position : tuple(int, int, float), None
Subframe that is selected to search for the star. The tuple
contains the center (pix) and size (arcsec) (pos_x, pos_y,
size). Setting ``position`` to None will use the full image
to search for the star. If `position=(None, None, size)`
then the center of the image will be used. If
``position=(pos_x, pos_y, None)`` then a fixed position is
used for the aperture.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_selected_out_port = self.add_output_port(selected_out_tag)
self.m_removed_out_port = self.add_output_port(removed_out_tag)
if index_out_tag is None:
self.m_index_out_port = None
else:
self.m_index_out_port = self.add_output_port(index_out_tag)
self.m_method = method
self.m_fwhm = fwhm
self.m_aperture = aperture
self.m_threshold = threshold
self.m_position = position
[docs] @staticmethod
@typechecked
def aperture_phot(image: np.ndarray,
position: np.ndarray,
aperture: Union[Tuple[str, float, float],
Tuple[str, None, float]]) -> np.float64:
"""
Parameters
----------
image : np.ndarray
Input image (2D).
position : np.ndarray
Center position (y, x) of the aperture.
aperture : tuple(str, float, float)
Tuple with the aperture properties for measuring the
photometry around the location of the brightest pixel. The
first element contains the aperture type ('circular',
'annulus', or 'ratio'). For a circular aperture, the second
element is empty and the third element contains the aperture
radius (pix). For the other two types, the second and third
element are the inner and outer radii (pix) of the aperture.
Returns
-------
np.float64
Photometry value.
"""
check_pos_in = any(np.floor(position[:]-aperture[2]) < 0.)
check_pos_out = any(np.ceil(position[:]+aperture[2]) > image.shape[0])
if check_pos_in or check_pos_out:
phot = np.nan
else:
im_crop = crop_image(image, tuple(position), 2*int(math.ceil(aperture[2])))
npix = im_crop.shape[0]
x_grid = y_grid = np.linspace(-(npix-1)/2, (npix-1)/2, npix)
xx_grid, yy_grid = np.meshgrid(x_grid, y_grid)
rr_grid = np.sqrt(xx_grid**2 + yy_grid**2)
if aperture[0] == 'circular':
phot = np.sum(im_crop[rr_grid < aperture[2]])
elif aperture[0] == 'annulus':
phot = np.sum(im_crop[(rr_grid > aperture[1]) & (rr_grid < aperture[2])])
elif aperture[0] == 'ratio':
phot = np.sum(im_crop[rr_grid < aperture[1]]) / \
np.sum(im_crop[(rr_grid > aperture[1]) & (rr_grid < aperture[2])])
return phot
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Smooths the images with a Gaussian
kernel, locates the brightest pixel in each image, measures the
integrated flux around the brightest pixel, calculates the
median and standard deviation of the photometry, and applies
sigma clipping to remove low quality images.
Returns
-------
NoneType
None
"""
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
nimages = self.m_image_in_port.get_shape()[0]
if self.m_fwhm is not None:
self.m_fwhm = int(math.ceil(self.m_fwhm/pixscale))
if self.m_position is not None and self.m_position[2] is not None:
self.m_position = (self.m_position[0],
self.m_position[0],
int(math.ceil(self.m_position[2]/pixscale)))
if len(self.m_aperture) == 2:
self.m_aperture = (self.m_aperture[0],
None,
self.m_aperture[1]/pixscale)
elif len(self.m_aperture) == 3:
self.m_aperture = (self.m_aperture[0],
self.m_aperture[1]/pixscale,
self.m_aperture[2]/pixscale)
starpos = star_positions(self.m_image_in_port, self.m_fwhm, self.m_position)
phot = np.zeros(nimages)
start_time = time.time()
for i in range(nimages):
progress(i, nimages, 'Aperture photometry...', start_time)
phot[i] = self.aperture_phot(self.m_image_in_port[i, ], starpos[i, :], self.m_aperture)
if self.m_method == 'median':
phot_ref = np.nanmedian(phot)
print(f'Median = {phot_ref:.2f}')
elif self.m_method == 'max':
phot_ref = np.nanmax(phot)
print(f'Maximum = {phot_ref:.2f}')
elif self.m_method == 'range':
phot_ref = np.nanmedian(phot)
print(f'Median = {phot_ref:.2f}')
phot_std = np.nanstd(phot)
print(f'Standard deviation = {phot_std:.2f}')
if self.m_method in ['median', 'max']:
index_del = np.logical_or((phot > phot_ref + self.m_threshold*phot_std),
(phot < phot_ref - self.m_threshold*phot_std))
elif self.m_method == 'range':
index_del = np.logical_or((phot < self.m_threshold[0]),
(phot > self.m_threshold[1]))
index_del[np.isnan(phot)] = True
index_del = np.where(index_del)[0]
index_sel = np.ones(nimages, dtype=bool)
index_sel[index_del] = False
write_selected_data(memory=self._m_config_port.get_attribute('MEMORY'),
indices=index_del,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port)
history = f'frames removed = {np.size(index_del)}'
if self.m_index_out_port is not None:
self.m_index_out_port.set_all(index_del, data_dim=1)
self.m_index_out_port.copy_attributes(self.m_image_in_port)
self.m_index_out_port.add_attribute('STAR_POSITION', starpos, static=False)
self.m_index_out_port.add_history('FrameSelectionModule', history)
write_selected_attributes(indices=index_del,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port,
module_type='FrameSelectionModule',
history=history)
self.m_selected_out_port.add_attribute('STAR_POSITION', starpos[index_sel], static=False)
self.m_selected_out_port.add_history('FrameSelectionModule', history)
self.m_removed_out_port.add_attribute('STAR_POSITION', starpos[index_del], static=False)
self.m_removed_out_port.add_history('FrameSelectionModule', history)
self.m_image_in_port.close_port()
[docs]class RemoveLastFrameModule(ProcessingModule):
"""
Pipeline module for removing every NDIT+1 image from NACO dataset obtained in cube mode. This
frame contains the average pixel values of the cube.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
image_out_tag : str
Tag of the database entry that is written as output.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_image_out_port = self.add_output_port(image_out_tag)
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Removes every NDIT+1 frame and saves the data and attributes.
Returns
-------
NoneType
None
"""
ndit = self.m_image_in_port.get_attribute('NDIT')
nframes = self.m_image_in_port.get_attribute('NFRAMES')
index = self.m_image_in_port.get_attribute('INDEX')
nframes_new = []
index_new = []
start_time = time.time()
for i, item in enumerate(ndit):
progress(i, len(ndit), 'Removing the last image of each FITS cube...', start_time)
if nframes[i] != item+1:
warnings.warn(f'Number of frames ({nframes[i]}) is not equal to NDIT+1.')
frame_start = np.sum(nframes[0:i])
frame_end = np.sum(nframes[0:i+1]) - 1
nframes_new.append(nframes[i]-1)
index_new.extend(index[frame_start:frame_end])
self.m_image_out_port.append(self.m_image_in_port[frame_start:frame_end, ])
nframes_new = np.asarray(nframes_new, dtype=int)
index_new = np.asarray(index_new, dtype=int)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.add_attribute('NFRAMES', nframes_new, static=False)
self.m_image_out_port.add_attribute('INDEX', index_new, static=False)
history = 'frames removed = NDIT+1'
self.m_image_out_port.add_history('RemoveLastFrameModule', history)
self.m_image_out_port.close_port()
[docs]class RemoveStartFramesModule(ProcessingModule):
"""
Pipeline module for removing a fixed number of images at the beginning of each cube. This can
be useful for NACO data in which the background is higher at the beginning of the cube.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
frames: int = 1) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
image_out_tag : str
Tag of the database entry that is written as output.
frames : int
Number of frames that are removed at the beginning of each cube.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_image_out_port = self.add_output_port(image_out_tag)
self.m_frames = int(frames)
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Removes a constant number of images at the beginning of each cube
and saves the data and attributes.
Returns
-------
NoneType
None
"""
if self.m_image_out_port.tag == self.m_image_in_port.tag:
raise ValueError('Input and output port should have a different tag.')
nframes = self.m_image_in_port.get_attribute('NFRAMES')
index = self.m_image_in_port.get_attribute('INDEX')
index_new = []
if 'PARANG' in self.m_image_in_port.get_all_non_static_attributes():
parang = self.m_image_in_port.get_attribute('PARANG')
parang_new = []
else:
parang = None
if 'STAR_POSITION' in self.m_image_in_port.get_all_non_static_attributes():
star = self.m_image_in_port.get_attribute('STAR_POSITION')
star_new = []
else:
star = None
start_time = time.time()
for i, _ in enumerate(nframes):
progress(i, len(nframes), 'Removing images at the begin of each cube...', start_time)
frame_start = np.sum(nframes[0:i]) + self.m_frames
frame_end = np.sum(nframes[0:i+1])
if frame_start >= frame_end:
raise ValueError('The number of frames in the original data cube is equal or '
'smaller than the number of frames that have to be removed.')
index_new.extend(index[frame_start:frame_end])
if parang is not None:
parang_new.extend(parang[frame_start:frame_end])
if star is not None:
star_new.extend(star[frame_start:frame_end])
self.m_image_out_port.append(self.m_image_in_port[frame_start:frame_end, ])
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.add_attribute('NFRAMES', nframes-self.m_frames, static=False)
self.m_image_out_port.add_attribute('INDEX', index_new, static=False)
if parang is not None:
self.m_image_out_port.add_attribute('PARANG', parang_new, static=False)
if star is not None:
self.m_image_out_port.add_attribute('STAR_POSITION', np.asarray(star_new), static=False)
history = 'frames removed = '+str(self.m_frames)
self.m_image_out_port.add_history('RemoveStartFramesModule', history)
self.m_image_out_port.close_port()
[docs]class ImageStatisticsModule(ProcessingModule):
"""
Pipeline module for calculating image statistics for the full images or a subsection of the
images.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
stat_out_tag: str,
position: Optional[Union[Tuple[int, int, float],
Tuple[None, None, float]]] = None) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry with the images that are read as input.
stat_out_tag : str
Tag of the database entry with the statistical results that are written as output. The
result is stored in the following order: minimum, maximum, sum, mean, median, and
standard deviation.
position : tuple(int, int, float)
Position (x, y) (pix) and radius (arcsec) of the circular area in which the statistics
are calculated. The full image is used if set to None.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_stat_out_port = self.add_output_port(stat_out_tag)
self.m_position = position
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Calculates the minimum, maximum, sum, mean, median, and standard
deviation of the pixel values of each image separately. NaNs are ignored for each
calculation. The values are calculated for either the full images or a circular
subsection of the images.
Returns
-------
NoneType
None
"""
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
nimages = self.m_image_in_port.get_shape()[0]
im_shape = self.m_image_in_port.get_shape()[1:]
if self.m_position is None:
indices = None
else:
if self.m_position[0] is None and self.m_position[1] is None:
center = center_pixel(self.m_image_in_port[0, ])
self.m_position = (center[0], # y position
center[1], # x position
self.m_position[2]/pixscale) # radius (pix)
else:
self.m_position = (int(self.m_position[1]), # y position
int(self.m_position[0]), # x position
self.m_position[2]/pixscale) # radius (pix)
rr_grid, _, _ = pixel_distance(im_shape, position=self.m_position[0:2])
rr_reshape = np.reshape(rr_grid, (rr_grid.shape[0]*rr_grid.shape[1]))
indices = np.where(rr_reshape <= self.m_position[2])[0]
self.apply_function_to_images(image_stat,
self.m_image_in_port,
self.m_stat_out_port,
'Calculating image statistics',
func_args=(indices, ))
history = f'number of images = {nimages}'
self.m_stat_out_port.copy_attributes(self.m_image_in_port)
self.m_stat_out_port.add_history('ImageStatisticsModule', history)
self.m_stat_out_port.close_port()
[docs]class FrameSimilarityModule(ProcessingModule):
"""
Pipeline module for measuring the similarity between frames.
"""
__author__ = 'Benedikt Schmidhuber, Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_tag: str,
method: str = 'MSE',
mask_radius: Tuple[float, float] = (0., 5.),
window_size: float = 0.1,
temporal_median: str = 'full') -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_tag : str
Tag of the database entry that is read as input.
method : str
Method for the similarity measure. There are three measures available:
- `MSE` - Mean Squared Error
- `PCC` - Pearson Correlation Coefficient
- `SSIM` - Structural Similarity
These measures compare each image to the temporal median of the image set.
mask_radius : tuple(float, float)
Inner and outer radius (arcsec) of the mask that is applied to the images.
The inner radius is actually not used so can be set to zero.
window_size : float
Size (arcsec) of the sliding window that is used when the SSIM similarity is
calculated.
temporal_median : str
Option to calculate the temporal median for each position ('full') or as a constant
value ('constant') for the entire set. The latter is computationally less expensive.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_tag)
self.m_image_out_port = self.add_output_port(image_tag)
if method not in ('MSE', 'PCC', 'SSIM'):
raise ValueError(f'The chosen method \'{method}\' is not available. Please ensure '
f'that you have selected one of \'MSE\', \'PCC\', or \'SSIM\'.')
if temporal_median not in ('full', 'constant'):
raise ValueError(f'The chosen temporal_median \'{temporal_median}\' is not '
f'available. Please ensure that you have selected one of \'full\', '
f'\'constant\'.')
self.m_method = method
self.m_temporal_median = temporal_median
self.m_mask_radii = mask_radius
self.m_window_size = window_size
@staticmethod
@typechecked
def _similarity(images: np.ndarray,
reference_index: int,
mode: str,
window_size: int,
temporal_median: Union[bool, np.ndarray] = False) -> Tuple[int, float]:
"""
Internal function to compute the MSE as defined by Ruane et al. 2019.
"""
@typechecked
def _temporal_median(reference_index: int,
images: np.ndarray) -> np.ndarray:
"""
Internal function to calculate the temporal median for all frames, except the one with
the ``reference_index``.
"""
image_m = np.concatenate((images[:reference_index], images[reference_index+1:]))
return np.median(image_m, axis=0)
image_x_i = images[reference_index]
if isinstance(temporal_median, bool):
image_m = _temporal_median(reference_index, images=images)
else:
image_m = temporal_median
if mode == 'MSE':
return reference_index, mean_squared_error(image_x_i, image_m)
if mode == 'PCC':
# calculate the covariance matrix of the flattened images
cov_mat = np.cov(image_x_i.flatten(), image_m.flatten(), ddof=1)
# the variances are stored in the diagonal, therefore take the sqrt to obtain std
std = np.sqrt(np.diag(cov_mat))
# does not matter whether [0, 1] or [1, 0] as cov_mat is symmetric
return reference_index, cov_mat[0, 1] / (std[0] * std[1])
if mode == 'SSIM':
# winsize needs to be odd
if int(window_size) % 2 == 0:
winsize = int(window_size) + 1
else:
winsize = int(window_size)
# TODO Unclear what value to pass to data_range
# Previously the argument was not requires
data_range = np.amax(image_x_i)-np.amin(image_x_i)
struc_sim = structural_similarity(im1=image_x_i,
im2=image_m,
win_size=winsize,
data_range=data_range)
return reference_index, struc_sim
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Computes the similarity between frames based on the Mean Squared
Error (MSE), the Pearson Correlation Coefficient (PCC), or the Structural Similarity (SSIM).
The correlation values are stored as non-static attribute (``MSE``, ``PCC``, or ``SSIM``)
to the input data. After running this module, the
:class:`~pynpoint.processing.frameselection.SelectByAttributeModule` can be used to select
the images with the highest correlation.
Returns
-------
NoneType
None
"""
cpu = self._m_config_port.get_attribute('CPU')
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
# get number of images
nimages = self.m_image_in_port.get_shape()[0]
# convert arcsecs to pixels
self.m_mask_radii = (math.floor(self.m_mask_radii[0] / pixscale),
math.floor(self.m_mask_radii[1] / pixscale))
self.m_window_size = int(self.m_window_size / pixscale)
# overlay the same mask over all images
images = self.m_image_in_port.get_all()
# close the port during the calculations
self.m_image_out_port.close_port()
# Change the radius to the image size
images = crop_image(images, None, int(2.*self.m_mask_radii[1]))
if self.m_temporal_median == 'constant':
temporal_median = np.median(images, axis=0)
else:
temporal_median = False
# compare images and store similarity
similarities = np.zeros(nimages)
pool = mp.Pool(cpu)
async_results = []
for i in range(nimages):
async_results.append(pool.apply_async(FrameSimilarityModule._similarity,
args=(images,
i,
self.m_method,
self.m_window_size,
temporal_median)))
pool.close()
start_time = time.time()
# wait for all processes to finish
while mp.active_children():
# number of finished processes
nfinished = sum([i.ready() for i in async_results])
progress(nfinished, nimages, 'Calculating image similarity...', start_time)
# check if new processes have finished every 5 seconds
time.sleep(5)
if nfinished != nimages:
print('\r ')
print('\rCalculating image similarity... [DONE]')
# get the results for every async_result object
for async_result in async_results:
reference, similarity = async_result.get()
similarities[reference] = similarity
pool.terminate()
# reopen the port after the calculation
self.m_image_out_port.open_port()
self.m_image_out_port.add_attribute(f'{self.m_method}', similarities, static=False)
self.m_image_out_port.close_port()
[docs]class SelectByAttributeModule(ProcessingModule):
"""
Pipeline module for selecting frames based on attribute values.
"""
__author__ = 'Benedikt Schmidhuber, Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
selected_out_tag: str,
removed_out_tag: str,
attribute_tag: str,
number_frames: int = 100,
order: str = 'descending') -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
selected_out_tag : str
Tag of the database entry to which the selected frames are written.
removed_out_tag : str
Tag of the database entry to which the removed frames are written.
attribute_tag : str
Name of the attribute which is used to sort and select the frames.
number_frames : int
Number of frames that are selected.
order : str
Order in which the frames are selected. Can be either 'descending' (will select the
lowest attribute values) or 'ascending' (will select the highest attribute values).
Returns
-------
NoneType
None
Examples
--------
The example below selects the first 100 frames with an ascending order of the ``INDEX``
values that are stored to the 'im_arr' dataset::
SelectByAttributeModule(name_in='frame_selection',
image_in_tag='im_arr',
attribute_tag='INDEX',
selected_frames=100,
order='ascending',
selected_out_tag='im_arr_selected',
removed_out_tag='im_arr_removed'))
The example below selects the 200 frames with the largest ``SSIM`` values that
are stored to the 'im_arr' dataset::
SelectByAttributeModule(name_in='frame_selection',
image_in_tag='im_arr',
attribute_tag='SSIM',
selected_frames=200,
order='descending',
selected_out_tag='im_arr_selected',
removed_out_tag='im_arr_removed'))
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_selected_out_port = self.add_output_port(selected_out_tag)
self.m_removed_out_port = self.add_output_port(removed_out_tag)
if order not in ('ascending', 'descending'):
raise ValueError('The selected order is not available. The available options are '
'\'ascending\' or \'descending\'.')
self.m_attribute_tag = attribute_tag
self.m_number_frames = number_frames
self.m_order = order
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Selects images according to a specified attribute tag and
ordering, e.g. the highest 150 ``INDEX`` frames, or the lowest 50 ``PCC`` frames.
The order of the selected images is determined by the `descending` or `ascending`
attribute values. To sort the images again by their original order, the
:class:`~pynpoint.processing.psfpreparation.SortParangModule` can be used.
Returns
-------
NoneType
None
"""
nimages = self.m_image_in_port.get_shape()[0]
attribute = self.m_image_in_port.get_attribute(f'{self.m_attribute_tag}')
if nimages != attribute.size:
raise ValueError(f'The attribute {{self.m_attribute_tag}} does not have the same '
f'length ({len(attribute)}) as the tag has images ({nimages}). '
f'Please check the attribute you have chosen for selection.')
if self.m_order == 'descending':
# sort attribute in descending order
sorting_order = np.argsort(attribute)[::-1]
else:
# sort attribute in ascending order
sorting_order = np.argsort(attribute)
index_del = sorting_order[self.m_number_frames:]
write_selected_data(memory=self._m_config_port.get_attribute('MEMORY'),
indices=index_del,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port)
write_selected_attributes(indices=index_del,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port,
module_type='SelectByAttributeModule',
history=f'selected tag = {self.m_attribute_tag}')
self.m_image_in_port.close_port()
[docs]class ResidualSelectionModule(ProcessingModule):
"""
Pipeline module for applying a frame selection on the residuals of the PSF subtraction.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
selected_out_tag: str,
removed_out_tag: str,
percentage: float = 10.,
annulus_radii: Tuple[float, float] = (0.1, 0.2)) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
selected_out_tag : str
Tag of the database entry with the selected images that are written as output.
removed_out_tag : str
Tag of the database entry with the removed images that are written as output.
percentage : float
The percentage of best frames that is selected.
annulus_radii : tuple(float, float)
Inner and outer radius of the annulus (arcsec).
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_selected_out_port = self.add_output_port(selected_out_tag)
self.m_removed_out_port = self.add_output_port(removed_out_tag)
self.m_percentage = percentage
self.m_annulus_radii = annulus_radii
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Applies a frame selection on the derotated residuals from the
PSF subtraction. The pixels within an annulus (e.g. at the separation of an expected
planet) are selected and the standard deviation is calculated. The chosen percentage
of images with the lowest standard deviation are stored as output.
Returns
-------
NoneType
None
"""
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
nimages = self.m_image_in_port.get_shape()[0]
npix = self.m_image_in_port.get_shape()[-1]
rr_grid, _, _ = pixel_distance((npix, npix), position=None)
pixel_select = np.where((rr_grid > self.m_annulus_radii[0]/pixscale) &
(rr_grid < self.m_annulus_radii[1]/pixscale))
start_time = time.time()
phot_annulus = np.zeros(nimages)
for i in range(nimages):
progress(i, nimages, 'Aperture photometry...', start_time)
phot_annulus[i] = np.sum(np.abs(self.m_image_in_port[i][pixel_select]))
print(f'Minimum, maximum = {np.amin(phot_annulus):.2f}, {np.amax(phot_annulus):.2f}')
print(f'Mean, median = {np.nanmean(phot_annulus):.2f}, {np.nanmedian(phot_annulus):.2f}')
print(f'Standard deviation = {np.nanstd(phot_annulus):.2f}')
n_select = int(nimages*self.m_percentage/100.)
index_del = np.argsort(phot_annulus)[n_select:]
write_selected_data(memory=self._m_config_port.get_attribute('MEMORY'),
indices=index_del,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port)
write_selected_attributes(indices=index_del,
image_in_port=self.m_image_in_port,
selected_out_port=self.m_selected_out_port,
removed_out_port=self.m_removed_out_port,
module_type='ResidualSelectionModule',
history=f'frames removed = {index_del.size}')
self.m_image_in_port.close_port()