Source code for pynpoint.readwrite.fitsreading

Module for reading FITS files.

import os
import time
import warnings

from typing import List, Optional, Tuple, Union

import numpy as np

from import fits
from typeguard import typechecked

from pynpoint.core.processing import ReadingModule
from pynpoint.util.attributes import set_static_attr, set_nonstatic_attr, set_extra_attr
from pynpoint.util.module import progress

[docs]class FitsReadingModule(ReadingModule): """ Reads FITS files from the given *input_dir* or the default directory of the Pypeline. The FITS files need to contain either single images (2D) or cubes of images (3D). Individual images should have the same shape and type. The header of the FITS is scanned for the required static attributes (should be identical for each FITS file) and non-static attributes. Static entries will be saved as HDF5 attributes while non-static attributes will be saved as separate data sets in a subfolder of the database named *header_* + image_tag. If the FITS files in the input directory have changing static attributes or the shape of the input images is changing a warning appears. FitsReadingModule overwrites by default all existing data with the same tags in the central database. """ __author__ = 'Markus Bonse, Tomas Stolker' @typechecked def __init__(self, name_in: str, input_dir: Optional[str] = None, image_tag: str = 'im_arr', overwrite: bool = True, check: bool = True, filenames: Optional[Union[str, List[str]]] = None, ifs_data: bool = False) -> None: """ Parameters ---------- name_in : str Unique name of the module instance. input_dir : str, None Input directory where the FITS files are located. If not specified the Pypeline default directory is used. image_tag : str Tag of the read data in the HDF5 database. Non static header information is stored with the tag: *header_* + image_tag / header_entry_name. overwrite : bool Overwrite existing data and header in the central database. check : bool Print a warning if certain attributes from the configuration file are not present in the FITS header. If set to `False`, attributes are still written to the dataset but there will be no warning if a keyword is not found in the FITS header. filenames : str, list(str, ), None If a string, then a path of a text file should be provided. This text file should contain a list of FITS files. If a list, then the paths of the FITS files should be provided directly. If set to None, the FITS files in the `input_dir` are read. All paths should be provided either relative to the Python working folder (i.e., the folder where Python is executed) or as absolute paths. ifs_data : bool Import IFS data which is stored as a 4D array with the wavelength and temporal dimensions as first and second dimension, respectively. If set to ``False`` (default), the data is imported as a 3D array with the temporal dimension as first dimension. Returns ------- NoneType None """ super().__init__(name_in, input_dir=input_dir) self.m_image_out_port = self.add_output_port(image_tag) self.m_overwrite = overwrite self.m_check = check self.m_filenames = filenames self.m_ifs_data = ifs_data
[docs] @typechecked def read_single_file(self, fits_file: str, overwrite_tags: list) -> Tuple[fits.header.Header, tuple]: """ Function which reads a single FITS file and appends it to the database. The function gets a list of *overwriting_tags*. If a new key (header entry or image data) is found that is not on this list the old entry is overwritten if *self.m_overwrite* is active. After replacing the old entry the key is added to the *overwriting_tags*. This procedure guaranties that all previous database information, that does not belong to the new data set that is read by FitsReadingModule is replaced and the rest is kept. Parameters ---------- fits_file : str Absolute path and filename of the FITS file. overwrite_tags : list(str, ) The list of database tags that will not be overwritten. Returns ------- FITS header. tuple(int, ) Image shape. """ hdu_list = if hdu_list[0].data is not None: images = hdu_list[0].data.byteswap().newbyteorder() elif len(hdu_list) > 1: for i, item in enumerate(hdu_list[1:]): if isinstance(item, fits.hdu.image.ImageHDU): warnings.simplefilter('always', UserWarning) warnings.warn(f"No data was found in the PrimaryHDU " f"so reading data from the ImageHDU " f"at number {i+1} instead.") images = hdu_list[i+1].data.byteswap().newbyteorder() break else: raise RuntimeError(f"No data was found in {fits_file}.") images = np.nan_to_num(images) if images.ndim == 4 and not self.m_ifs_data: raise ValueError('The input data is 4D but ifs_data is set to False. Reading in 4D ' 'data is only possible by setting the argument to True.') if images.ndim < 3 and self.m_ifs_data: raise ValueError('It is only possible to read 3D or 4D data when ifs_data is set to ' 'True.') if self.m_overwrite and self.m_image_out_port.tag not in overwrite_tags: overwrite_tags.append(self.m_image_out_port.tag) if self.m_ifs_data: self.m_image_out_port.set_all(images, data_dim=4) else: self.m_image_out_port.set_all(images, data_dim=3) self.m_image_out_port.del_all_attributes() else: if self.m_ifs_data: self.m_image_out_port.append(images, data_dim=4) else: self.m_image_out_port.append(images, data_dim=3) header = hdu_list[0].header fits_header = [] for key in header: fits_header.append(f'{key} = {header[key]}') hdu_list.close() header_out_port = self.add_output_port('fits_header/'+os.path.basename(fits_file)) header_out_port.set_all(fits_header) return header, images.shape
@typechecked def _txt_file_list(self) -> list: """ Internal function to import a list of FITS files from a text file. """ with open(self.m_filenames) as file_obj: files = file_obj.readlines() # remove newlines files = [x.strip() for x in files] # remove of empty lines files = filter(None, files) return list(files)
[docs] @typechecked def run(self) -> None: """ Run method of the module. Looks for all FITS files in the input directory and imports the images into the database. Note that previous database information is overwritten if ``overwrite=True``. The filenames are stored as attributes. Returns ------- NoneType None """ files = [] if isinstance(self.m_filenames, str): files = self._txt_file_list() for item in files: if not os.path.isfile(item): raise ValueError(f'The file {item} does not exist. Please check that the ' f'path is correct.') elif isinstance(self.m_filenames, list): files = self.m_filenames for item in files: if not os.path.isfile(item): raise ValueError(f'The file {item} does not exist. Please check that the ' f'path is correct.') elif isinstance(self.m_filenames, type(None)): for filename in os.listdir(self.m_input_location): if filename.endswith('.fits') and not filename.startswith('._'): files.append(os.path.join(self.m_input_location, filename)) assert files, 'No FITS files found in %s.' % self.m_input_location files.sort() overwrite_tags = [] first_index = 0 start_time = time.time() for i, fits_file in enumerate(files): progress(i, len(files), 'Reading FITS files...', start_time) header, shape = self.read_single_file(fits_file, overwrite_tags) if len(shape) == 2: nimages = 1 elif len(shape) == 3: if self.m_ifs_data: nimages = 1 else: nimages = shape[0] elif len(shape) == 4: nimages = shape[1] else: raise ValueError('Data read from FITS file has an invalid shape.') set_static_attr(fits_file=fits_file, header=header, config_port=self._m_config_port, image_out_port=self.m_image_out_port, check=self.m_check) set_nonstatic_attr(header=header, config_port=self._m_config_port, image_out_port=self.m_image_out_port, check=self.m_check) set_extra_attr(fits_file=fits_file, nimages=nimages, config_port=self._m_config_port, image_out_port=self.m_image_out_port, first_index=first_index) first_index += nimages self.m_image_out_port.flush() self.m_image_out_port.close_port()