"""
Pipeline modules for resizing of images.
"""
import math
import time
from typing import Tuple, Union
import numpy as np
from typeguard import typechecked
from pynpoint.core.processing import ProcessingModule
from pynpoint.util.apply_func import image_scaling
from pynpoint.util.image import crop_image
from pynpoint.util.module import memory_frames, progress
[docs]class CropImagesModule(ProcessingModule):
"""
Pipeline module for cropping of images around a given position.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
size: float,
center: Union[Tuple[int, int], None]) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
image_out_tag : str
Tag of the database entry that is written as output. Should be different from
*image_in_tag*.
size : float
New image size (arcsec). The same size will be used for both image dimensions.
center : tuple(int, int), None
Tuple (x0, y0) with the new image center. Python indexing starts at 0. The center of
the input images will be used when *center* is set to *None*. Note that if the image
is even-sized, it is not possible to a uniquely define a pixel position in the center
of the image. The image center is determined (with pixel precision) with the
:func:`~pynpoint.util.image.center_pixel` function.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_image_out_port = self.add_output_port(image_out_tag)
self.m_size = size
self.m_center = center
if self.m_center is not None:
self.m_center = (self.m_center[1], self.m_center[0]) # (y, x)
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Decreases the image size by cropping around an given position.
The module always returns odd-sized images.
Returns
-------
NoneType
None
"""
# Get memory and number of images to split the frames into chunks
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in_port.get_shape()[0]
# Get the numnber of dimensions and shape
ndim = self.m_image_in_port.get_ndim()
im_shape = self.m_image_in_port.get_shape()
if ndim == 3:
# Number of images
nimages = im_shape[-3]
# Split into batches to comply with memory constraints
frames = memory_frames(memory, nimages)
elif ndim == 4:
# Process all wavelengths per exposure at once
frames = np.linspace(0, im_shape[-3], im_shape[-3]+1)
# Convert size parameter from arcseconds to pixels
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
print(f'New image size (arcsec) = {self.m_size}')
self.m_size = int(math.ceil(self.m_size / pixscale))
print(f'New image size (pixels) = {self.m_size}')
if self.m_center is not None:
print(f'New image center (x, y) = ({self.m_center[1]}, {self.m_center[0]})')
# Crop images chunk by chunk
start_time = time.time()
for i in range(len(frames[:-1])):
# Update progress bar
progress(i, len(frames[:-1]), 'Cropping images...', start_time)
# Select images in the current chunk
if ndim == 3:
images = self.m_image_in_port[frames[i]:frames[i+1], ]
elif ndim == 4:
# Process all wavelengths per exposure at once
images = self.m_image_in_port[:, i, ]
# crop images according to input parameters
images = crop_image(images, self.m_center, self.m_size, copy=False)
# Write processed images to output port
if ndim == 3:
self.m_image_out_port.append(images, data_dim=3)
elif ndim == 4:
self.m_image_out_port.append(images, data_dim=4)
# Save history and copy attributes
history = f'image size (pix) = {self.m_size}'
self.m_image_out_port.add_history('CropImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
[docs]class ScaleImagesModule(ProcessingModule):
"""
Pipeline module for rescaling of an image.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
scaling: Union[Tuple[float, float, float],
Tuple[None, None, float],
Tuple[float, float, None]],
pixscale: bool = True) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
image_out_tag : str
Tag of the database entry that is written as output. Should be different from
``image_in_tag``.
scaling : tuple(float, float, float)
Tuple with the scaling factors for the image size and flux, (scaling_x, scaling_y,
scaling_flux). Upsampling and downsampling of the image corresponds to
``scaling_x/y`` > 1 and 0 < ``scaling_x/y`` < 1, respectively.
pixscale : bool
Adjust the pixel scale by the average scaling in x and y direction.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_image_out_port = self.add_output_port(image_out_tag)
if scaling[0] is None:
self.m_scaling_x = 1.
else:
self.m_scaling_x = scaling[0]
if scaling[1] is None:
self.m_scaling_y = 1.
else:
self.m_scaling_y = scaling[1]
if scaling[2] is None:
self.m_scaling_flux = 1.
else:
self.m_scaling_flux = scaling[2]
self.m_pixscale = pixscale
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Rescales an image with a fifth order spline interpolation and a
reflecting boundary condition.
Returns
-------
NoneType
None
"""
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
self.apply_function_to_images(image_scaling,
self.m_image_in_port,
self.m_image_out_port,
'Scaling images',
func_args=(self.m_scaling_y,
self.m_scaling_x,
self.m_scaling_flux))
history = f'scaling = ({self.m_scaling_x:.2f}, {self.m_scaling_y:.2f}, ' \
f'{self.m_scaling_flux:.2f})'
self.m_image_out_port.add_history('ScaleImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
if self.m_pixscale:
mean_scaling = (self.m_scaling_x+self.m_scaling_y)/2.
self.m_image_out_port.add_attribute('PIXSCALE', pixscale/mean_scaling)
self.m_image_out_port.close_port()
[docs]class AddLinesModule(ProcessingModule):
"""
Module to add lines of pixels to increase the size of an image.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
lines: Tuple[int, int, int, int]) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
image_out_tag : str
Tag of the database entry that is written as output, including the images with
increased size. Should be different from *image_in_tag*.
lines : tuple(int, int, int, int)
The number of lines that are added in left, right, bottom, and top direction.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_image_out_port = self.add_output_port(image_out_tag)
self.m_lines = np.asarray(lines)
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Adds lines of zero-value pixels to increase the size of an image.
Returns
-------
NoneType
None
"""
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in_port.get_shape()[0]
frames = memory_frames(memory, nimages)
shape_in = self.m_image_in_port.get_shape()
shape_out = (shape_in[-2]+int(self.m_lines[2])+int(self.m_lines[3]),
shape_in[-1]+int(self.m_lines[0])+int(self.m_lines[1]))
self.m_lines[1] = shape_out[1] - self.m_lines[1] # right side of image
self.m_lines[3] = shape_out[0] - self.m_lines[3] # top side of image
start_time = time.time()
for i in range(len(frames[:-1])):
progress(i, len(frames[:-1]), 'Adding lines...', start_time)
image_in = self.m_image_in_port[frames[i]:frames[i+1], ]
image_out = np.zeros((frames[i+1]-frames[i], shape_out[0], shape_out[1]))
image_out[:,
int(self.m_lines[2]):int(self.m_lines[3]),
int(self.m_lines[0]):int(self.m_lines[1])] = image_in
self.m_image_out_port.append(image_out, data_dim=3)
history = f'number of lines = {self.m_lines}'
self.m_image_out_port.add_history('AddLinesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
[docs]class RemoveLinesModule(ProcessingModule):
"""
Module to decrease the dimensions of an image by removing lines of pixels.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
lines: Tuple[int, int, int, int]) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
Tag of the database entry that is read as input.
image_out_tag : str
Tag of the database entry that is written as output, including the images with
decreased size. Should be different from *image_in_tag*.
lines : tuple(int, int, int, int)
The number of lines that are removed in left, right, bottom, and top direction.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
self.m_image_in_port = self.add_input_port(image_in_tag)
self.m_image_out_port = self.add_output_port(image_out_tag)
self.m_lines = lines
[docs] @typechecked
def run(self) -> None:
"""
Run method of the module. Removes the lines given by *lines* from each frame.
Returns
-------
NoneType
None
"""
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in_port.get_shape()[0]
frames = memory_frames(memory, nimages)
start_time = time.time()
for i in range(len(frames[:-1])):
progress(i, len(frames[:-1]), 'Removing lines...', start_time)
image_in = self.m_image_in_port[frames[i]:frames[i+1], ]
image_out = image_in[:,
int(self.m_lines[2]):image_in.shape[1]-int(self.m_lines[3]),
int(self.m_lines[0]):image_in.shape[2]-int(self.m_lines[1])]
self.m_image_out_port.append(image_out, data_dim=3)
history = f'number of lines = {self.m_lines}'
self.m_image_out_port.add_history('RemoveLinesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()