"""
Module for reading FITS files obtained with VLT/VISIR for the NEAR experiment.
"""
import os
import math
import time
import shlex
import subprocess
import threading
import warnings
from typing import Optional, Union, Tuple
import numpy as np
from astropy.io import fits
from typeguard import typechecked
from pynpoint.core.processing import ReadingModule
from pynpoint.util.attributes import set_static_attr, set_nonstatic_attr, set_extra_attr
from pynpoint.util.module import progress, memory_frames
from pynpoint.util.image import crop_image
[docs]class NearReadingModule(ReadingModule):
"""
Pipeline module for reading VLT/VISIR data of the NEAR experiment. The FITS files and required
header information are read from the input directory and stored in two datasets, corresponding
to chop A and chop B. The primary HDU of the FITS files should contain the main header
information, while the subsequent HDUs contain each a single image (alternated for chop A and
chop B) and some additional header information for that image. The last HDU is ignored as it
contains the average of all images.
"""
__author__ = 'Jasper Jonker, Tomas Stolker, Anna Boehle'
@typechecked
def __init__(self,
name_in: str,
input_dir: Optional[str] = None,
chopa_out_tag: str = 'chopa',
chopb_out_tag: str = 'chopb',
subtract: bool = False,
crop: Optional[Union[Tuple[int, int, float], Tuple[None, None, float]]] = None,
combine: Optional[str] = None):
"""
Parameters
----------
name_in : str
Unique name of the instance.
input_dir : str, None
Input directory where the FITS files are located. The default input folder of the
Pypeline is used if set to None.
chopa_out_tag : str
Database entry where the chop A images will be stored. Should be different from
``chop_b_out_tag``.
chopb_out_tag : str
Database entry where the chop B images will be stored. Should be different from
``chop_a_out_tag``.
subtract : bool
If True, the other chop position is subtracted before saving out the chop A and chop B
images.
crop: tuple(int, int, float), None
The pixel position (x, y) around which the chop A and chop B images are cropped and
the new image size (arcsec), together provided as (pos_x, pos_y, size). The same size
will be used for both image dimensions. It is recommended to crop the images around
the approximate coronagraph position. No cropping is applied if set to None.
combine: str, None
Method ('mean' or 'median') for combining (separately) the chop A and chop B frames
from each cube into a single frame. All frames are stored if set to None.
Returns
-------
NoneType
None
"""
super().__init__(name_in, input_dir=input_dir)
self.m_chopa_out_port = self.add_output_port(chopa_out_tag)
self.m_chopb_out_port = self.add_output_port(chopb_out_tag)
self.m_subtract = subtract
self.m_crop = crop
self.m_combine = combine
@staticmethod
@typechecked
def _uncompress_file(filename: str) -> None:
"""
Internal function to uncompress a .Z file.
Parameters
----------
filename : str
Compressed .Z file.
Returns
-------
NoneType
None
"""
try:
# try running a subprocess with the 'uncompress' command
command = 'uncompress ' + filename
subprocess.check_call(shlex.split(command))
except(FileNotFoundError, OSError):
# or else run a subprocess with the 'gunzip' command
command = 'gunzip -d ' + filename
subprocess.check_call(shlex.split(command))
[docs] @typechecked
def uncompress(self) -> None:
"""
Method to check if the input directory contains compressed files ending with .fits.Z.
If this is the case, the files will be uncompressed using multithreading. The number
of threads can be set with the ``CPU`` parameter in the configuration file.
Returns
-------
NoneType
None
"""
cpu = self._m_config_port.get_attribute('CPU')
# list all files ending with .fits.Z in the input location
files = []
for item in os.listdir(self.m_input_location):
if item.endswith('.fits.Z'):
files.append(os.path.join(self.m_input_location, item))
if files:
# subdivide the file indices by number of CPU
indices = memory_frames(cpu, len(files))
start_time = time.time()
for i, _ in enumerate(indices[:-1]):
progress(i, len(indices[:-1]), 'Uncompressing NEAR data...', start_time)
# select subset of compressed files
subset = files[indices[i]:indices[i+1]]
# create a list of threads to uncompress CPU number of files
# each file is processed by a different thread
threads = []
for filename in subset:
thread = threading.Thread(target=self._uncompress_file, args=(filename, ))
threads.append(thread)
# start the threads
for item in threads:
item.start()
# join the threads
for item in threads:
item.join()
[docs] @staticmethod
@typechecked
def read_images(filename: str,
im_shape: Tuple[int, int, int]) -> Tuple[np.ndarray, np.ndarray]:
"""
Function that opens a FITS file and separates the chop A and chop B images. The primary HDU
contains only a general header. The subsequent HDUs contain a single image with a small
extra header. The last HDU is the average of all images, which will be ignored.
Parameters
----------
filename : str
Absolute path and filename of the FITS file.
im_shape : tuple(int, int, int)
Shape of a stack of images for chop A or B.
Returns
-------
numpy.array
Array containing the images of chop A.
numpy.array
Array containing the images of chop B.
"""
# open the FITS file
hdulist = fits.open(filename)
# initialize the image arrays for chop A and B
chopa = np.zeros(im_shape, dtype=np.float32)
chopb = np.zeros(im_shape, dtype=np.float32)
count_chopa, count_chopb = 0, 0
prev_cycle = None
for i in range(2*im_shape[0]):
# get the chop position (HCYCLE1 = chop A, HCYCLE2 = chop B)
# primary HDU is skipped with +1
if 'ESO DET FRAM TYPE' in hdulist[i+1].header:
cycle = hdulist[i+1].header['ESO DET FRAM TYPE']
else:
hdulist.close()
raise ValueError(f'Frame type not found in the FITS header. Image number: {i}.')
# write the HDU image to the chop A or B array
# count the number of chop A and B images
if cycle == 'HCYCLE1' and cycle != prev_cycle:
chopa[count_chopa, ] = hdulist[i+1].data.byteswap().newbyteorder()
count_chopa += 1
prev_cycle = cycle
elif cycle == 'HCYCLE2' and cycle != prev_cycle:
chopb[count_chopb, ] = hdulist[i+1].data.byteswap().newbyteorder()
count_chopb += 1
prev_cycle = cycle
elif cycle == prev_cycle:
warnings.warn(f'Previous and current chop position ({cycle}) are the same. '
'Skipping the current image.')
else:
hdulist.close()
raise ValueError(f'Frame type ({cycle}) not a valid value. Expecting HCYCLE1 or '
'HCYCLE2 as value for ESO DET FRAM TYPE.')
# check if the number of chop A and B images is equal, this error should never occur
if count_chopa != count_chopb:
warnings.warn('The number of images is not equal for chop A and chop B.')
hdulist.close()
return chopa, chopb
[docs] @typechecked
def run(self) -> None:
"""
Run the module. The FITS files are collected from the input directory and uncompressed if
needed. The images are then sorted by the two chop positions (chop A and chop B). The
required FITS header keywords (which should be set in the configuration file) are also
imported and stored as attributes to the two output datasets in the HDF5 database.
Returns
-------
NoneType
None
"""
# clear the output ports
self.m_chopa_out_port.del_all_data()
self.m_chopa_out_port.del_all_attributes()
self.m_chopb_out_port.del_all_data()
self.m_chopb_out_port.del_all_attributes()
# uncompress the FITS files if needed
self.uncompress()
# find and sort the FITS files
files = []
for filename in os.listdir(self.m_input_location):
if filename.endswith('.fits'):
files.append(os.path.join(self.m_input_location, filename))
files.sort()
# check if there are FITS files present in the input location
assert files, f'No FITS files found in {self.m_input_location}.'
# if cropping chop A, get pixscale and convert crop_size to pixels and swap x/y
if self.m_crop is not None:
pixscale = self._m_config_port.get_attribute('PIXSCALE')
self.m_crop = (self.m_crop[1], self.m_crop[0], int(math.ceil(self.m_crop[2]/pixscale)))
start_time = time.time()
for i, filename in enumerate(files):
progress(i, len(files), 'Preprocessing NEAR data...', start_time)
# get the primary header data and the image shape
header, im_shape = self.read_header(filename)
# get the images of chop A and chop B
chopa, chopb = self.read_images(filename, im_shape)
if self.m_subtract:
chopa = chopa - chopb
chopb = -1. * np.copy(chopa)
if self.m_crop is not None:
chopa = crop_image(chopa,
center=self.m_crop[0:2],
size=self.m_crop[2],
copy=False)
chopb = crop_image(chopb,
center=self.m_crop[0:2],
size=self.m_crop[2],
copy=False)
if self.m_combine is not None:
if self.m_combine == 'mean':
chopa = np.mean(chopa, axis=0)
chopb = np.mean(chopb, axis=0)
elif self.m_combine == 'median':
chopa = np.median(chopa, axis=0)
chopb = np.median(chopb, axis=0)
header[self._m_config_port.get_attribute('NFRAMES')] = 1
# append the images of chop A and B
self.m_chopa_out_port.append(chopa, data_dim=3)
self.m_chopb_out_port.append(chopb, data_dim=3)
# starting value for the INDEX attribute
first_index = 0
for port in (self.m_chopa_out_port, self.m_chopb_out_port):
# set the static attributes
set_static_attr(fits_file=filename,
header=header,
config_port=self._m_config_port,
image_out_port=port,
check=True)
# set the non-static attributes
set_nonstatic_attr(header=header,
config_port=self._m_config_port,
image_out_port=port,
check=True)
# set the remaining attributes
set_extra_attr(fits_file=filename,
nimages=im_shape[0]//2,
config_port=self._m_config_port,
image_out_port=port,
first_index=first_index)
# increase the first value of the INDEX attribute
first_index += im_shape[0]//2
# flush the output port
port.flush()
# add history information
self.m_chopa_out_port.add_history('NearReadingModule', 'Chop A')
self.m_chopb_out_port.add_history('NearReadingModule', 'Chop B')
# close all connections to the database
self.m_chopa_out_port.close_port()