Source code for pynpoint.util.tests

"""
Functions for testing the pipeline and its modules.
"""

import os
import math
import shutil
import subprocess

from typing import List, Optional, Tuple, Union

import h5py
import numpy as np

from typeguard import typechecked
from astropy.io import fits
from scipy.ndimage import shift


@typechecked
def create_config(filename: str) -> None:
    """
    Create a configuration file.

    Parameters
    ----------
    filename : str
        Configuration filename.

    Returns
    -------
    NoneType
        None
    """

    # Full configuration content, written in one call; the text is identical
    # to the header/settings sections expected by the test cases
    config_text = ('[header]\n\n'
                   'INSTRUMENT: INSTRUME\n'
                   'NFRAMES: NAXIS3\n'
                   'EXP_NO: ESO DET EXP NO\n'
                   'NDIT: ESO DET NDIT\n'
                   'PARANG_START: ESO ADA POSANG\n'
                   'PARANG_END: ESO ADA POSANG END\n'
                   'DITHER_X: ESO SEQ CUMOFFSETX\n'
                   'DITHER_Y: ESO SEQ CUMOFFSETY\n'
                   'DIT: None\n'
                   'LATITUDE: None\n'
                   'LONGITUDE: None\n'
                   'PUPIL: None\n'
                   'DATE: None\n'
                   'RA: None\n'
                   'DEC: None\n\n'
                   '[settings]\n\n'
                   'PIXSCALE: 0.027\n'
                   'MEMORY: 39\n'
                   'CPU: 1\n')

    with open(filename, 'w') as config_file:
        config_file.write(config_text)
@typechecked
def create_random(path: str,
                  nimages: int = 5) -> None:
    """
    Create a dataset of images with Gaussian distributed pixel values.

    Parameters
    ----------
    path : str
        Working folder.
    nimages : int
        Number of images. Annotated as ``int`` (the docstring already said
        so) because the value is used as an array dimension, which would
        fail for a non-integer value.

    Returns
    -------
    NoneType
        None
    """

    if not os.path.exists(path):
        os.makedirs(path)

    file_in = os.path.join(path, 'PynPoint_database.hdf5')

    # Fixed seed so the test data are reproducible
    np.random.seed(1)

    images = np.random.normal(loc=0, scale=2e-4, size=(nimages, 11, 11))

    with h5py.File(file_in, 'w') as h5_file:
        dset = h5_file.create_dataset('images', data=images)
        dset.attrs['PIXSCALE'] = 0.01

        # Parallactic angles 0, 1, ..., nimages-1 as static header attribute
        h5_file.create_dataset('header_images/PARANG', data=np.arange(float(nimages)))
@typechecked
def create_fits(path: str,
                filename: str,
                image: np.ndarray,
                ndit: int,
                exp_no: int,
                dither_x: float,
                dither_y: float) -> None:
    """
    Create a FITS file with images and header information.

    Parameters
    ----------
    path : str
        Working folder.
    filename : str
        FITS filename.
    image : np.ndarray
        Images.
    ndit : int
        Number of integrations.
    exp_no : int
        Exposure number.
    dither_x : float
        Horizontal dither position relative to the image center.
    dither_y : float
        Vertical dither position relative to the image center.

    Returns
    -------
    NoneType
        None
    """

    hdu = fits.PrimaryHDU()

    header = hdu.header
    header['INSTRUME'] = 'IMAGER'
    # Removed a dead assignment of 1. to EXP NO that was immediately
    # overwritten by exp_no; the final card order is unchanged
    header['HIERARCH ESO DET EXP NO'] = exp_no
    header['HIERARCH ESO DET NDIT'] = ndit
    header['HIERARCH ESO ADA POSANG'] = 0.
    header['HIERARCH ESO ADA POSANG END'] = 180.
    header['HIERARCH ESO SEQ CUMOFFSETX'] = dither_x
    header['HIERARCH ESO SEQ CUMOFFSETY'] = dither_y

    hdu.data = image
    hdu.writeto(os.path.join(path, filename))
@typechecked
def create_fake_data(path: str) -> None:
    """
    Create an ADI dataset with a star and planet.

    Parameters
    ----------
    path : str
        Working folder.

    Returns
    -------
    NoneType
        None
    """

    if not os.path.exists(path):
        os.makedirs(path)

    ndit = 10
    npix = 21
    fwhm = 3.
    sep = 6.
    contrast = 1e-1
    pos_star = 10.
    exp_no = 1

    # One parallactic angle per frame (previously hard-coded 10 = ndit)
    parang = np.linspace(0., 180., ndit)

    np.random.seed(1)

    # Convert FWHM to the standard deviation of the Gaussian PSF
    sigma = fwhm / (2.*math.sqrt(2.*math.log(2.)))

    # Pixel grid; previously hard-coded 21. instead of npix
    x = np.arange(0., float(npix), 1.)
    y = np.arange(0., float(npix), 1.)
    xx, yy = np.meshgrid(x, y)

    # The normalized stellar PSF is identical in every frame, so it is
    # computed once instead of inside the loop
    star = np.exp(-((xx-pos_star)**2+(yy-pos_star)**2)/(2.*sigma**2))/(2.*np.pi*sigma**2)

    images = np.zeros((ndit, npix, npix))

    for i, item in enumerate(parang):
        # Gaussian background noise for this frame
        images[i, ] = np.random.normal(loc=0, scale=2e-4, size=(npix, npix))

        # Planet offset rotates with the parallactic angle
        x_shift = sep*math.cos(math.radians(item))
        y_shift = sep*math.sin(math.radians(item))

        images[i, ] += star + shift(contrast*star, (x_shift, y_shift), order=5)

    create_fits(path, 'images.fits', images, ndit, exp_no, 0., 0.)
@typechecked
def create_ifs_data(path: str) -> None:
    """
    Create an IFS dataset with a star and planet.

    Parameters
    ----------
    path : str
        Working folder.

    Returns
    -------
    NoneType
        None
    """

    ndit = 10
    npix = 21
    nwavel = 3
    fwhm = 3.
    sep = 6.
    contrast = 1.
    pos_star = 10.
    exp_no = 1

    # One parallactic angle per frame (previously hard-coded 10 = ndit)
    parang = np.linspace(0., 180., ndit)
    wavelength = [1., 1.1, 1.2]

    if not os.path.exists(path):
        os.makedirs(path)

    # Convert FWHM to the standard deviation of the Gaussian PSF
    sigma = fwhm / (2.*math.sqrt(2.*math.log(2.)))

    # Pixel grid; previously hard-coded 21. instead of npix
    x = y = np.arange(0., float(npix), 1.)
    xx, yy = np.meshgrid(x, y)

    np.random.seed(1)

    images = np.random.normal(loc=0, scale=0.05, size=(nwavel, ndit, npix, npix))

    # PSF width scales with wavelength; the per-wavelength PSFs are
    # loop-invariant over the frame index, so compute them once
    stars = []
    for wav_item in wavelength:
        sigma_scale = sigma*wav_item
        stars.append(np.exp(-((xx-pos_star)**2+(yy-pos_star)**2)/(2.*sigma_scale**2)))

    for i, par_item in enumerate(parang):
        # Planet offset rotates with the parallactic angle (invariant over
        # wavelength, so computed once per frame)
        x_shift = sep*math.cos(math.radians(par_item))
        y_shift = sep*math.sin(math.radians(par_item))

        for j, star in enumerate(stars):
            images[j, i, ] += star + shift(contrast*star, (x_shift, y_shift), order=5)

    create_fits(path, 'images.fits', images, ndit, exp_no, 0., 0.)
@typechecked
def create_star_data(path: str,
                     npix: int = 11,
                     pos_star: float = 5.) -> None:
    """
    Create a dataset with a PSF and Gaussian noise.

    Parameters
    ----------
    path : str
        Working folder.
    npix : int
        Number of pixels in each dimension.
    pos_star : float
        Pixel position of the PSF center, applied to both image axes
        (was previously missing from the docstring).

    Returns
    -------
    NoneType
        None
    """

    fwhm = 3.
    nimages = 5
    exp_no = [1, 2]
    parang_start = [0., 90.]
    parang_end = [90., 180.]

    if not os.path.exists(path):
        os.makedirs(path)

    np.random.seed(1)

    # PSF width, pixel grid, and PSF image do not depend on the cube index,
    # so they are computed once before the loop (the random draws stay
    # inside the loop so the RNG call order is unchanged)
    sigma = fwhm / (2. * math.sqrt(2.*math.log(2.)))

    x = y = np.arange(0., float(npix), 1.)
    xx, yy = np.meshgrid(x, y)

    psf = np.exp(-((xx-pos_star)**2+(yy-pos_star)**2)/(2.*sigma**2))

    for j, item in enumerate(exp_no):
        images = np.random.normal(loc=0, scale=0.1, size=(nimages, npix, npix))
        images += psf

        hdu = fits.PrimaryHDU()

        header = hdu.header
        header['INSTRUME'] = 'IMAGER'
        header['HIERARCH ESO DET EXP NO'] = item
        header['HIERARCH ESO DET NDIT'] = nimages
        header['HIERARCH ESO ADA POSANG'] = parang_start[j]
        header['HIERARCH ESO ADA POSANG END'] = parang_end[j]
        header['HIERARCH ESO SEQ CUMOFFSETX'] = 'None'
        header['HIERARCH ESO SEQ CUMOFFSETY'] = 'None'

        hdu.data = images
        hdu.writeto(os.path.join(path, f'images_{j}.fits'))
@typechecked
def create_dither_data(path: str) -> None:
    """
    Create a dithering dataset with a stellar PSF.

    Parameters
    ----------
    path : str
        Working folder.

    Returns
    -------
    NoneType
        None
    """

    if not os.path.exists(path):
        os.makedirs(path)

    ndit = 5
    npix = 21
    fwhm = 3.
    exp_no = [1, 2, 3, 4]

    # Star positions for the four dither quadrants
    pos_star = [(5., 5.), (5., 15.), (15., 15.), (15., 5.)]

    # Removed an unused 'parang' array that was never referenced

    np.random.seed(1)

    # Convert FWHM to the standard deviation of the Gaussian PSF
    sigma = fwhm / (2.*math.sqrt(2.*math.log(2.)))

    # Pixel grid; previously hard-coded 21. instead of npix
    x = np.arange(0., float(npix), 1.)
    y = np.arange(0., float(npix), 1.)
    xx, yy = np.meshgrid(x, y)

    for i, item in enumerate(exp_no):
        images = np.random.normal(loc=0, scale=0.1, size=(ndit, npix, npix))

        # The PSF is the same in every frame of the cube, so add it with a
        # single broadcast instead of a per-frame loop
        images += np.exp(-((xx-pos_star[i][0])**2+(yy-pos_star[i][1])**2)/(2.*sigma**2))

        create_fits(path, f'images_{i}.fits', images, ndit, item,
                    pos_star[i][0]-10., pos_star[i][1]-10.)
@typechecked
def create_waffle_data(path: str) -> None:
    """
    Create data with satellite spots and Gaussian noise.

    Parameters
    ----------
    path : str
        Working folder.

    Returns
    -------
    NoneType
        None
    """

    if not os.path.exists(path):
        os.makedirs(path)

    fwhm = 3
    npix = 101

    x_spot = [25., 25., 75., 75.]
    y_spot = [25., 75., 75., 25.]

    # Standard deviation of the Gaussian spots, converted from the FWHM
    sigma = fwhm / (2. * math.sqrt(2.*math.log(2.)))

    x = y = np.arange(0., npix, 1.)
    xx, yy = np.meshgrid(x, y)

    # Sum the four normalized Gaussian satellite spots onto a blank frame
    image = np.zeros((npix, npix))

    for spot_x, spot_y in zip(x_spot, y_spot):
        spot = np.exp(-((xx-spot_x)**2+(yy-spot_y)**2)/(2.*sigma**2))/(2.*np.pi*sigma**2)
        image += spot

    hdu = fits.PrimaryHDU()

    header = hdu.header
    header['INSTRUME'] = 'IMAGER'
    header['HIERARCH ESO DET EXP NO'] = 'None'
    header['HIERARCH ESO DET NDIT'] = 'None'
    header['HIERARCH ESO ADA POSANG'] = 'None'
    header['HIERARCH ESO ADA POSANG END'] = 'None'
    header['HIERARCH ESO SEQ CUMOFFSETX'] = 'None'
    header['HIERARCH ESO SEQ CUMOFFSETY'] = 'None'

    hdu.data = image
    hdu.writeto(os.path.join(path, 'images.fits'))
@typechecked
def remove_test_data(path: str,
                     folders: Optional[List[str]] = None,
                     files: Optional[List[str]] = None) -> None:
    """
    Function to remove data created by the test cases.

    Parameters
    ----------
    path : str
        Working folder.
    folders : list(str, )
        Folders to remove.
    files : list(str, )
        Files to remove.

    Returns
    -------
    NoneType
        None
    """

    # These two files are produced by every test pipeline
    os.remove(path+'PynPoint_database.hdf5')
    os.remove(path+'PynPoint_config.ini')

    # Optional extra folders and files created by individual tests
    for folder in folders or []:
        shutil.rmtree(path+folder)

    for file_item in files or []:
        os.remove(path+file_item)
@typechecked
def create_near_data(path: str) -> None:
    """
    Create a stack of images with Gaussian distributed pixel values.

    Parameters
    ----------
    path : str
        Working folder.

    Returns
    -------
    NoneType
        None
    """

    if not os.path.exists(path):
        os.makedirs(path)

    np.random.seed(1)

    image = np.random.normal(loc=0., scale=1., size=(10, 10))

    exp_no = [1, 2]

    for i, item in enumerate(exp_no):
        fits_file = os.path.join(path, f'images_{i}.fits')

        primary_header = fits.Header()
        primary_header['INSTRUME'] = 'VISIR'
        primary_header['HIERARCH ESO DET CHOP NCYCLES'] = 5
        primary_header['HIERARCH ESO DET SEQ1 DIT'] = 1.
        primary_header['HIERARCH ESO TPL EXPNO'] = item
        primary_header['HIERARCH ESO DET CHOP ST'] = 'T'
        primary_header['HIERARCH ESO DET CHOP CYCSKIP'] = 0
        primary_header['HIERARCH ESO DET CHOP CYCSUM'] = 'F'

        chopa_header = fits.Header()
        chopa_header['HIERARCH ESO DET FRAM TYPE'] = 'HCYCLE1'

        chopb_header = fits.Header()
        chopb_header['HIERARCH ESO DET FRAM TYPE'] = 'HCYCLE2'

        hdu = [fits.PrimaryHDU(header=primary_header)]

        # Alternate chop A / chop B frames for each of the 5 cycles
        for _ in range(5):
            hdu.append(fits.ImageHDU(image, header=chopa_header))
            hdu.append(fits.ImageHDU(image, header=chopb_header))

        # last image is the average of all images
        hdu.append(fits.ImageHDU(image))

        hdulist = fits.HDUList(hdu)
        hdulist.writeto(fits_file)

        try:
            # Pass an argument list instead of a shell string so filenames
            # with spaces or shell metacharacters are handled safely
            subprocess.call(['compress', fits_file])
        except FileNotFoundError:
            # 'compress' is not installed; the previous shell-based call was
            # also a silent no-op in that case, so keep this best-effort
            pass