Source code for pynpoint.processing.resizing

"""
Pipeline modules for resizing of images.
"""

import math
import time

from typing import Tuple, Union

import numpy as np

from typeguard import typechecked

from pynpoint.core.processing import ProcessingModule
from pynpoint.util.apply_func import image_scaling
from pynpoint.util.image import crop_image
from pynpoint.util.module import memory_frames, progress


[docs]class CropImagesModule(ProcessingModule): """ Pipeline module for cropping of images around a given position. """ @typechecked def __init__(self, name_in: str, image_in_tag: str, image_out_tag: str, size: float, center: Union[Tuple[int, int], None]) -> None: """ Parameters ---------- name_in : str Unique name of the module instance. image_in_tag : str Tag of the database entry that is read as input. image_out_tag : str Tag of the database entry that is written as output. Should be different from *image_in_tag*. size : float New image size (arcsec). The same size will be used for both image dimensions. center : tuple(int, int), None Tuple (x0, y0) with the new image center. Python indexing starts at 0. The center of the input images will be used when *center* is set to *None*. Note that if the image is even-sized, it is not possible to a uniquely define a pixel position in the center of the image. The image center is determined (with pixel precision) with the :func:`~pynpoint.util.image.center_pixel` function. Returns ------- NoneType None """ super().__init__(name_in) self.m_image_in_port = self.add_input_port(image_in_tag) self.m_image_out_port = self.add_output_port(image_out_tag) self.m_size = size self.m_center = center if self.m_center is not None: self.m_center = (self.m_center[1], self.m_center[0]) # (y, x)
[docs] @typechecked def run(self) -> None: """ Run method of the module. Decreases the image size by cropping around an given position. The module always returns odd-sized images. Returns ------- NoneType None """ # Get memory and number of images to split the frames into chunks memory = self._m_config_port.get_attribute('MEMORY') nimages = self.m_image_in_port.get_shape()[0] # Get the numnber of dimensions and shape ndim = self.m_image_in_port.get_ndim() im_shape = self.m_image_in_port.get_shape() if ndim == 3: # Number of images nimages = im_shape[-3] # Split into batches to comply with memory constraints frames = memory_frames(memory, nimages) elif ndim == 4: # Process all wavelengths per exposure at once frames = np.linspace(0, im_shape[-3], im_shape[-3]+1) # Convert size parameter from arcseconds to pixels pixscale = self.m_image_in_port.get_attribute('PIXSCALE') print(f'New image size (arcsec) = {self.m_size}') self.m_size = int(math.ceil(self.m_size / pixscale)) print(f'New image size (pixels) = {self.m_size}') if self.m_center is not None: print(f'New image center (x, y) = ({self.m_center[1]}, {self.m_center[0]})') # Crop images chunk by chunk start_time = time.time() for i in range(len(frames[:-1])): # Update progress bar progress(i, len(frames[:-1]), 'Cropping images...', start_time) # Select images in the current chunk if ndim == 3: images = self.m_image_in_port[frames[i]:frames[i+1], ] elif ndim == 4: # Process all wavelengths per exposure at once images = self.m_image_in_port[:, i, ] # crop images according to input parameters images = crop_image(images, self.m_center, self.m_size, copy=False) # Write processed images to output port if ndim == 3: self.m_image_out_port.append(images, data_dim=3) elif ndim == 4: self.m_image_out_port.append(images, data_dim=4) # Save history and copy attributes history = f'image size (pix) = {self.m_size}' self.m_image_out_port.add_history('CropImagesModule', history) self.m_image_out_port.copy_attributes(self.m_image_in_port) self.m_image_out_port.close_port()
[docs]class ScaleImagesModule(ProcessingModule): """ Pipeline module for rescaling of an image. """ @typechecked def __init__(self, name_in: str, image_in_tag: str, image_out_tag: str, scaling: Union[Tuple[float, float, float], Tuple[None, None, float], Tuple[float, float, None]], pixscale: bool = True) -> None: """ Parameters ---------- name_in : str Unique name of the module instance. image_in_tag : str Tag of the database entry that is read as input. image_out_tag : str Tag of the database entry that is written as output. Should be different from ``image_in_tag``. scaling : tuple(float, float, float) Tuple with the scaling factors for the image size and flux, (scaling_x, scaling_y, scaling_flux). Upsampling and downsampling of the image corresponds to ``scaling_x/y`` > 1 and 0 < ``scaling_x/y`` < 1, respectively. pixscale : bool Adjust the pixel scale by the average scaling in x and y direction. Returns ------- NoneType None """ super().__init__(name_in) self.m_image_in_port = self.add_input_port(image_in_tag) self.m_image_out_port = self.add_output_port(image_out_tag) if scaling[0] is None: self.m_scaling_x = 1. else: self.m_scaling_x = scaling[0] if scaling[1] is None: self.m_scaling_y = 1. else: self.m_scaling_y = scaling[1] if scaling[2] is None: self.m_scaling_flux = 1. else: self.m_scaling_flux = scaling[2] self.m_pixscale = pixscale
[docs] @typechecked def run(self) -> None: """ Run method of the module. Rescales an image with a fifth order spline interpolation and a reflecting boundary condition. Returns ------- NoneType None """ pixscale = self.m_image_in_port.get_attribute('PIXSCALE') self.apply_function_to_images(image_scaling, self.m_image_in_port, self.m_image_out_port, 'Scaling images', func_args=(self.m_scaling_y, self.m_scaling_x, self.m_scaling_flux)) history = f'scaling = ({self.m_scaling_x:.2f}, {self.m_scaling_y:.2f}, ' \ f'{self.m_scaling_flux:.2f})' self.m_image_out_port.add_history('ScaleImagesModule', history) self.m_image_out_port.copy_attributes(self.m_image_in_port) if self.m_pixscale: mean_scaling = (self.m_scaling_x+self.m_scaling_y)/2. self.m_image_out_port.add_attribute('PIXSCALE', pixscale/mean_scaling) self.m_image_out_port.close_port()
[docs]class AddLinesModule(ProcessingModule): """ Module to add lines of pixels to increase the size of an image. """ @typechecked def __init__(self, name_in: str, image_in_tag: str, image_out_tag: str, lines: Tuple[int, int, int, int]) -> None: """ Parameters ---------- name_in : str Unique name of the module instance. image_in_tag : str Tag of the database entry that is read as input. image_out_tag : str Tag of the database entry that is written as output, including the images with increased size. Should be different from *image_in_tag*. lines : tuple(int, int, int, int) The number of lines that are added in left, right, bottom, and top direction. Returns ------- NoneType None """ super().__init__(name_in) self.m_image_in_port = self.add_input_port(image_in_tag) self.m_image_out_port = self.add_output_port(image_out_tag) self.m_lines = np.asarray(lines)
[docs] @typechecked def run(self) -> None: """ Run method of the module. Adds lines of zero-value pixels to increase the size of an image. Returns ------- NoneType None """ memory = self._m_config_port.get_attribute('MEMORY') nimages = self.m_image_in_port.get_shape()[0] frames = memory_frames(memory, nimages) shape_in = self.m_image_in_port.get_shape() shape_out = (shape_in[-2]+int(self.m_lines[2])+int(self.m_lines[3]), shape_in[-1]+int(self.m_lines[0])+int(self.m_lines[1])) self.m_lines[1] = shape_out[1] - self.m_lines[1] # right side of image self.m_lines[3] = shape_out[0] - self.m_lines[3] # top side of image start_time = time.time() for i in range(len(frames[:-1])): progress(i, len(frames[:-1]), 'Adding lines...', start_time) image_in = self.m_image_in_port[frames[i]:frames[i+1], ] image_out = np.zeros((frames[i+1]-frames[i], shape_out[0], shape_out[1])) image_out[:, int(self.m_lines[2]):int(self.m_lines[3]), int(self.m_lines[0]):int(self.m_lines[1])] = image_in self.m_image_out_port.append(image_out, data_dim=3) history = f'number of lines = {self.m_lines}' self.m_image_out_port.add_history('AddLinesModule', history) self.m_image_out_port.copy_attributes(self.m_image_in_port) self.m_image_out_port.close_port()
[docs]class RemoveLinesModule(ProcessingModule): """ Module to decrease the dimensions of an image by removing lines of pixels. """ @typechecked def __init__(self, name_in: str, image_in_tag: str, image_out_tag: str, lines: Tuple[int, int, int, int]) -> None: """ Parameters ---------- name_in : str Unique name of the module instance. image_in_tag : str Tag of the database entry that is read as input. image_out_tag : str Tag of the database entry that is written as output, including the images with decreased size. Should be different from *image_in_tag*. lines : tuple(int, int, int, int) The number of lines that are removed in left, right, bottom, and top direction. Returns ------- NoneType None """ super().__init__(name_in) self.m_image_in_port = self.add_input_port(image_in_tag) self.m_image_out_port = self.add_output_port(image_out_tag) self.m_lines = lines
[docs] @typechecked def run(self) -> None: """ Run method of the module. Removes the lines given by *lines* from each frame. Returns ------- NoneType None """ memory = self._m_config_port.get_attribute('MEMORY') nimages = self.m_image_in_port.get_shape()[0] frames = memory_frames(memory, nimages) start_time = time.time() for i in range(len(frames[:-1])): progress(i, len(frames[:-1]), 'Removing lines...', start_time) image_in = self.m_image_in_port[frames[i]:frames[i+1], ] image_out = image_in[:, int(self.m_lines[2]):image_in.shape[1]-int(self.m_lines[3]), int(self.m_lines[0]):image_in.shape[2]-int(self.m_lines[1])] self.m_image_out_port.append(image_out, data_dim=3) history = f'number of lines = {self.m_lines}' self.m_image_out_port.add_history('RemoveLinesModule', history) self.m_image_out_port.copy_attributes(self.m_image_in_port) self.m_image_out_port.close_port()