"""
Modules for accessing data and attributes in the central database.
"""
import os
import warnings
from abc import ABCMeta, abstractmethod
from typing import Dict, List, Optional, Tuple, Union
import h5py
import numpy as np
from typeguard import typechecked
from pynpoint.util.type_aliases import NonStaticAttribute, StaticAttribute
class DataStorage:
    """
    Instances of DataStorage manage the opening and closing of the Pypeline HDF5 database. They
    hold an internal h5py data bank (``self.m_data_bank``) which gives direct access to the data
    while the storage is open (``self.m_open == True``).
    """

    @typechecked
    def __init__(self,
                 location_in: str) -> None:
        """
        Constructor of a DataStorage instance. It needs the location of the HDF5 file (Pypeline
        database) as input. The file itself is only opened (or created if it does not exist) when
        :func:`~pynpoint.core.dataio.DataStorage.open_connection` is called.

        Parameters
        ----------
        location_in : str
            Location (directory + filename) of the HDF5 database. A bare filename refers to the
            current working directory.

        Returns
        -------
        NoneType
            None

        Raises
        ------
        AssertionError
            If the directory part of ``location_in`` does not exist.
        """

        directory = os.path.split(location_in)[0]

        # An empty directory component means "current working directory", which always exists;
        # os.path.isdir('') would wrongly report it as missing.
        # The error is raised explicitly (instead of with an assert statement) so that the
        # validation is not stripped when Python runs with the -O flag.
        if directory and not os.path.isdir(directory):
            raise AssertionError('Input directory for DataStorage does not exist - input '
                                 f'requested: {location_in}.')

        self._m_location = location_in
        self.m_data_bank = None
        self.m_open = False

    @typechecked
    def open_connection(self) -> None:
        """
        Opens the connection to the HDF5 file by opening an old file or creating a new one
        (h5py mode ``'a'``). Nothing happens if the connection is already open.

        Returns
        -------
        NoneType
            None
        """

        if not self.m_open:
            self.m_data_bank = h5py.File(self._m_location, mode='a')
            self.m_open = True

    @typechecked
    def close_connection(self) -> None:
        """
        Closes the connection to the HDF5 file and drops the reference to the data bank. Nothing
        happens if the connection is not open.

        Returns
        -------
        NoneType
            None
        """

        if self.m_open:
            self.m_data_bank.close()
            self.m_data_bank = None
            self.m_open = False
class Port(metaclass=ABCMeta):
    """
    Abstract base with the functionality shared by the InputPort, OutputPort, and ConfigPort.
    Every Port carries an internal tag (``self._m_tag``) which is its key to a dataset in the
    DataStorage. A port knows at most one DataStorage instance and keeps track of whether it has
    activated that storage (``self._m_data_base_active``).
    """

    @abstractmethod
    @typechecked
    def __init__(self,
                 tag: str,
                 data_storage_in: Optional[DataStorage] = None) -> None:
        """
        Abstract constructor of a Port. The tag / key identifies the database entry the port is
        connected to. A DataStorage can be passed directly; otherwise it has to be set afterwards
        with :func:`~pynpoint.core.dataio.Port.set_database_connection`.

        Parameters
        ----------
        tag : str
            Input Tag.
        data_storage_in : pynpoint.core.dataio.DataStorage, None
            The data storage to which the port is connected.

        Returns
        -------
        NoneType
            None
        """

        assert isinstance(tag, str), 'Port tag needs to be a string.'

        # A freshly created port has not opened its storage yet
        self._m_data_base_active = False
        self._m_data_storage = data_storage_in
        self._m_tag = tag

    @property
    @typechecked
    def tag(self) -> str:
        """
        Read-only access to the internal tag.

        Returns
        -------
        str
            Database tag name.
        """

        return self._m_tag

    @typechecked
    def open_port(self) -> None:
        """
        Opens the connection of the :class:`~pynpoint.core.dataio.DataStorage` and marks the port
        as active. Does nothing if the port is already active.

        Returns
        -------
        NoneType
            None
        """

        if self._m_data_base_active:
            return None

        self._m_data_storage.open_connection()
        self._m_data_base_active = True

        return None

    @typechecked
    def close_port(self) -> None:
        """
        Closes the connection of the :class:`~pynpoint.core.dataio.DataStorage` and marks the port
        as inactive. Does nothing if the port is not active.

        Returns
        -------
        NoneType
            None
        """

        if not self._m_data_base_active:
            return None

        self._m_data_storage.close_connection()
        self._m_data_base_active = False

        return None

    @typechecked
    def set_database_connection(self,
                                data_base_in: DataStorage) -> None:
        """
        Sets the internal DataStorage instance.

        Parameters
        ----------
        data_base_in : pynpoint.core.dataio.DataStorage
            The input DataStorage.

        Returns
        -------
        NoneType
            None
        """

        self._m_data_storage = data_base_in
class ConfigPort(Port):
    """
    ConfigPort can be used to read the 'config' tag from a (HDF5) database. This tag contains
    the central settings used by PynPoint, as well as the relevant FITS header keywords. A single
    attribute of the dataset is read with
    :func:`~pynpoint.core.dataio.ConfigPort.get_attribute`.
    """

    @typechecked
    def __init__(self,
                 tag: str,
                 data_storage_in: Optional[DataStorage] = None) -> None:
        """
        Constructor of the ConfigPort class, which reads the settings stored in the central
        database under the tag `config`. Example of reading an attribute from a port instance:

        .. code-block:: python

            memory = self._m_config_port.get_attribute('MEMORY')

        Parameters
        ----------
        tag : str
            The tag name of the port. Only the value `config` is accepted.
        data_storage_in : pynpoint.core.dataio.DataStorage, None
            The input DataStorage. It is possible to give the constructor of a ConfigPort a
            DataStorage instance which will link the port to that DataStorage. Otherwise the
            DataStorage is set later by calling
            :func:`~pynpoint.core.dataio.Port.set_database_connection`.

        Returns
        -------
        NoneType
            None

        Raises
        ------
        ValueError
            If ``tag`` is not `config`.
        """

        super().__init__(tag, data_storage_in)

        if tag != 'config':
            raise ValueError('The tag name of the central configuration should be \'config\'.')

    @typechecked
    def _check_status_and_activate(self) -> bool:
        """
        Internal function which checks if the ConfigPort is ready to use and opens it if needed.

        Returns
        -------
        bool
            Returns True if the ConfigPort can be used, False if not.
        """

        if self._m_data_storage is None:
            warnings.warn('ConfigPort can not load data unless a database is connected.')
            return False

        if not self._m_data_base_active:
            self.open_port()

        return True

    @typechecked
    def _check_if_data_exists(self) -> bool:
        """
        Internal function which checks if data exists for the 'config' tag.

        Returns
        -------
        bool
            Returns True if data exists, False if not.
        """

        return 'config' in self._m_data_storage.m_data_bank

    @typechecked
    def _check_error_cases(self) -> bool:
        """
        Internal function which checks the error cases: no connected database, or no data stored
        under the 'config' tag.

        Returns
        -------
        bool
            Returns True if the 'config' data can be read, False if not.
        """

        if not self._check_status_and_activate():
            return False

        if not self._check_if_data_exists():
            warnings.warn('No data under the tag which is linked by the ConfigPort.')
            return False

        return True

    @typechecked
    def get_attribute(self,
                      name: str) -> Optional[StaticAttribute]:
        """
        Returns a static attribute which is connected to the dataset of the ConfigPort.

        Parameters
        ----------
        name : str
            The name of the attribute.

        Returns
        -------
        StaticAttribute, None
            The attribute value. Returns None (after a warning) if the port can not be used or
            the attribute does not exist.
        """

        if not self._check_error_cases():
            return None

        config_attrs = self._m_data_storage.m_data_bank['config'].attrs

        if name not in config_attrs:
            warnings.warn(f'The attribute \'{name}\' was not found.')
            return None

        attr_val = config_attrs[name]

        # Convert numpy types to base types (e.g., np.float64 -> float)
        if isinstance(attr_val, np.generic):
            attr_val = attr_val.item()

        return attr_val
class OutputPort(Port):
    """
    Output ports can be used to save results under a given tag to the HDF5 DataStorage. An instance
    of OutputPort with self.tag=`tag` can store data under the key `tag` by using one of the
    following methods:

        * set_all(...) - replaces and sets the whole dataset
        * append(...) - appends data to the existing data set. For more information see
          function documentation (:func:`~pynpoint.core.dataio.OutputPort.append`).
        * slicing - sets a part of the actual dataset. Example:

          .. code-block:: python

              out_port = OutputPort('Some_tag')
              data = np.ones((200, 200)) # 2D image filled with ones
              out_port[0,:,:] = data # Sets the first 2D image of a 3D image stack

        * add_attribute(...) - modifies or creates an attribute of the dataset
        * del_attribute(...) - deletes an attribute
        * del_all_attributes(...) - deletes all attributes
        * append_attribute_data(...) - appends information to non-static attributes. See
          add_attribute() (:func:`~pynpoint.core.dataio.OutputPort.add_attribute`) for more
          information about static and non-static attributes.
        * check_static_attribute(...) - checks if a static attribute exists and if it is equal to a
          given value
        * other functions listed below

    For more information about how data is organized inside the central database have a look at the
    function documentation of the function :func:`~pynpoint.core.dataio.OutputPort.set_all` and
    :func:`~pynpoint.core.dataio.OutputPort.append`.

    Furthermore it is possible to deactivate an OutputPort to stop it from saving data.
    """

    @typechecked
    def __init__(self,
                 tag: str,
                 data_storage_in: Optional[DataStorage] = None,
                 activate_init: bool = True) -> None:
        """
        Constructor of the OutputPort class which creates an output port instance which can write
        data to the central database under the tag `tag`. If you write a PypelineModule you
        should not create instances manually! Use the add_output_port() function instead.

        Parameters
        ----------
        tag : str
            The tag of the port. The port can be used in order to write data to the dataset with
            the key = `tag`. The names `config` and `fits_header` are reserved.
        data_storage_in : pynpoint.core.dataio.DataStorage, None
            It is possible to give the constructor of an OutputPort a DataStorage instance which
            will link the port to that DataStorage. Otherwise the DataStorage is set later by
            calling :func:`~pynpoint.core.dataio.Port.set_database_connection`.
        activate_init : bool
            Initial activation status of the port. A non-activated port will not save data.

        Returns
        -------
        NoneType
            None

        Raises
        ------
        ValueError
            If ``tag`` is one of the reserved names `config` or `fits_header`.
        """

        super().__init__(tag, data_storage_in)

        self.m_activate = activate_init

        if tag == 'config':
            raise ValueError('The tag name \'config\' is reserved for the central configuration '
                             'of PynPoint.')

        if tag == 'fits_header':
            raise ValueError('The tag name \'fits_header\' is reserved for storage of the FITS '
                             'headers.')

    @typechecked
    def _check_status_and_activate(self) -> bool:
        """
        Internal function which checks if the OutputPort is ready to use and opens it if needed.

        Returns
        -------
        bool
            Returns True if the OutputPort can be used, False if not.
        """

        if not self.m_activate:
            status = False

        elif self._m_data_storage is None:
            warnings.warn('OutputPort can not store data unless a database is connected.')
            status = False

        else:
            if not self._m_data_base_active:
                self.open_port()

            status = True

        return status

    @typechecked
    def _init_dataset(self,
                      first_data: Union[np.ndarray, list],
                      tag: str,
                      data_dim: Optional[int] = None) -> None:
        """
        Internal function which is used to initialize a dataset in the HDF5 database.

        Parameters
        ----------
        first_data : np.ndarray, list
            The initial data.
        tag : str
            Database tag.
        data_dim : int, None
            Number of dimensions. The dimensions of ``first_data`` is used if set to ``None``.

        Returns
        -------
        NoneType
            None

        Raises
        ------
        ValueError
            If the dimensions of the data or the container are outside 1D-5D, or if the data
            can not be stored in a container with ``data_dim`` dimensions.
        """

        @typechecked
        def _ndim_check(data_dim: int,
                        first_dim: int) -> None:

            if first_dim > 5 or first_dim < 1:
                raise ValueError('Output port can only save numpy arrays from 1D to 5D. Use Port '
                                 'attributes to save as int, float, or string.')

            if data_dim > 5 or data_dim < 1:
                raise ValueError('The data dimensions should be 1D, 2D, 3D, 4D, or 5D.')

            if data_dim < first_dim:
                raise ValueError('The dimensions of the data should be equal to or larger than the '
                                 'dimensions of the input data.')

            # Only a single extra dimension can be added (1D -> 2D, 2D -> 3D, 3D -> 4D). Other
            # combinations have no container layout below, so they are rejected here instead
            # of failing later with an IndexError or UnboundLocalError.
            if data_dim - first_dim > 1 or (data_dim == 5 and first_dim != 5):
                raise ValueError(f'Cannot initialize {first_dim}D data in {data_dim}D data '
                                 f'container.')

        first_data = np.asarray(first_data)

        if data_dim is None:
            data_dim = first_data.ndim

        _ndim_check(data_dim, first_data.ndim)

        shape = first_data.shape

        # maxshape of the dataset: None marks the axis along which data can be appended
        if data_dim == first_data.ndim:
            if first_data.ndim == 4:
                # 4D: the second axis is resizable
                data_shape = (shape[0], None) + shape[2:]

            elif first_data.ndim == 5:
                # 5D: fixed size, no resizable axis
                data_shape = shape

            else:
                # 1D, 2D, 3D: the first axis is resizable
                data_shape = (None, ) + shape[1:]

        elif data_dim == 4:
            # 3D -> 4D: insert the resizable axis at the second position
            data_shape = (shape[0], None) + shape[1:]
            first_data = first_data[:, np.newaxis, :, :]

        else:
            # 1D -> 2D or 2D -> 3D: insert the resizable axis at the first position
            data_shape = (None, ) + shape
            first_data = first_data[np.newaxis, ...]

        if first_data.size == 0:
            warnings.warn(f'The new dataset that is stored under the tag name \'{tag}\' is empty.')

        elif isinstance(first_data[0], str):
            # Store a 1D array of strings as fixed-length byte strings
            first_data = np.array(first_data, dtype='|S')

        self._m_data_storage.m_data_bank.create_dataset(tag,
                                                        data=first_data,
                                                        maxshape=data_shape)

    @typechecked
    def _set_all_key(self,
                     tag: str,
                     data: np.ndarray,
                     data_dim: Optional[int] = None,
                     keep_attributes: bool = False) -> None:
        """
        Internal function which sets the values of a dataset under the *tag* name in the database.
        If old data exists it will be overwritten. This function is used by
        :func:`~pynpoint.core.dataio.OutputPort.set_all` and for setting non-static attributes.

        Parameters
        ----------
        tag : str
            Database tag of the data that will be modified.
        data : np.ndarray
            The data that will be stored and replace any old data.
        data_dim : int, None
            Number of dimension of the data.
        keep_attributes : bool
            Keep all static attributes of the dataset if set to True. Non-static attributes are
            stored under a separate key and are not touched by this function.

        Returns
        -------
        NoneType
            None
        """

        bank = self._m_data_storage.m_data_bank
        tmp_attributes = {}

        # check if database entry is new...
        if tag in bank:
            # NO -> database entry exists
            if keep_attributes:
                # the attributes are deleted together with the dataset, so copy them first
                tmp_attributes = dict(bank[tag].attrs.items())

            # remove database entry
            del bank[tag]

        # make new database entry
        self._init_dataset(data, tag, data_dim=data_dim)

        if keep_attributes:
            for key, value in tmp_attributes.items():
                bank[tag].attrs[key] = value

    @typechecked
    def _append_key(self,
                    tag: str,
                    data: Union[np.ndarray, list],
                    data_dim: Optional[int] = None,
                    force: bool = False) -> None:
        """
        Internal function for appending data to a dataset or appending non-static attributes.
        See :func:`~pynpoint.core.dataio.OutputPort.append` for more information.

        Parameters
        ----------
        tag : str
            Database tag where the data will be stored.
        data : np.ndarray, list
            The data that will be appended.
        data_dim : int, None
            Number of dimension of the data. The dimensions of the existing dataset are used if
            set to None.
        force : bool
            The existing data will be overwritten if the shape does not match.

        Returns
        -------
        NoneType
            None

        Raises
        ------
        ValueError
            If the shape does not match the existing dataset and ``force`` is False.
        """

        bank = self._m_data_storage.m_data_bank

        # check if database entry is new...
        if tag not in bank:
            # YES -> database entry is new
            self._init_dataset(data, tag, data_dim=data_dim)
            return None

        # NO -> database entry exists
        # check if the existing data has the same dimensions and shape
        tmp_shape = bank[tag].shape
        tmp_dim = len(tmp_shape)

        if data_dim is None:
            data_dim = tmp_dim

        # convert input data to numpy array
        data = np.asarray(data)

        # if the dimension offset is 1 add that dimension (e.g. save 2D image in 3D image stack)
        if data.ndim + 1 == data_dim:
            if data_dim in (2, 3):
                data = data[np.newaxis, ...]

            elif data_dim == 4:
                data = data[:, np.newaxis, :, :]

        @typechecked
        def _type_check() -> bool:
            check_result = False

            if tmp_dim == data.ndim:
                if tmp_dim == 1:
                    check_result = True

                elif tmp_dim == 2:
                    check_result = tmp_shape[1] == data.shape[1]

                elif tmp_dim == 3:
                    # check if the spatial shape is the same
                    check_result = (tmp_shape[1] == data.shape[1]) and \
                                   (tmp_shape[2] == data.shape[2])

                elif tmp_dim == 4:
                    # check if the spectral and spatial shape is the same
                    check_result = (tmp_shape[0] == data.shape[0]) and \
                                   (tmp_shape[2] == data.shape[2]) and \
                                   (tmp_shape[3] == data.shape[3])

            return check_result

        if _type_check():
            # YES -> dimensions and shape match
            if data.size == 0:
                warnings.warn(f'The dataset that is appended under the tag name \'{tag}\' '
                              f'is empty.')

            else:
                if isinstance(data[0], str):
                    data = np.array(data, dtype='|S')

                if data.ndim == 4:
                    # IFS data: (n_wavelength, n_dit, y_pos, x_pos)
                    bank[tag].resize(tmp_shape[1] + data.shape[1], axis=1)
                    bank[tag][:, tmp_shape[1]:, :, :] = data

                else:
                    # Other data: n_dit is the first dimension
                    bank[tag].resize(tmp_shape[0] + data.shape[0], axis=0)
                    bank[tag][tmp_shape[0]:, ] = data

            return None

        # NO -> shape is different
        # Check force
        if force:
            # YES -> Force is true
            self._set_all_key(tag, data=data)
            return None

        # NO -> Error message
        raise ValueError(f'The port tag \'{self._m_tag}\' is already used with a different data '
                         f'type. The \'force\' parameter can be used to replace the tag.')

    @typechecked
    def __setitem__(self,
                    key: Union[slice, int, tuple],
                    value: Union[np.ndarray, int]) -> None:
        """
        Internal function needed to change data using slicing. See class documentation for an
        example (:class:`~pynpoint.core.dataio.OutputPort`).

        Parameters
        ----------
        key : slice, int, tuple
            Index slice to be changed.
        value : np.ndarray, int
            New data.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            self._m_data_storage.m_data_bank[self._m_tag][key] = value

    @typechecked
    def del_all_data(self) -> None:
        """
        Delete all data belonging to the database tag.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            if self._m_tag in self._m_data_storage.m_data_bank:
                del self._m_data_storage.m_data_bank[self._m_tag]

    @typechecked
    def set_all(self,
                data: Union[np.ndarray, list],
                data_dim: Optional[int] = None,
                keep_attributes: bool = False) -> None:
        """
        Set the data in the database by replacing all old values with the values of the input data.
        If no old values exists the data is just stored. Since it is not possible to change the
        number of dimensions of a data set later in the processing history one can choose a
        dimension different to the input data. The following cases are implemented:

            * (#dimension of the first input data#, #desired data_dim#)
            * (1, 1) 1D input will be stored as list in HDF5
            * (1, 2) 1D input, but 2D array stored inside (i.e. a list of lists with a fixed size).
            * (2, 2) 2D input (single image) and 2D array stored inside (i.e. a list of lists with a
              fixed size).
            * (2, 3) 2D input (single image) but 3D array stored inside (i.e. a stack of images with
              a fixed size).
            * (3, 3) 3D input and 3D array stored inside (i.e. a stack of images with a fixed size).
            * (3, 4) 3D input but 4D array stored inside; the new axis is the second one.
            * (4, 4) 4D input and 4D array stored inside.
            * (5, 5) 5D input and 5D array stored inside with a fixed size.

        For 1D, 2D, and 3D data the first dimension always represents the list / stack (variable
        size) while the other dimensions have a fixed size. For 4D data the second dimension has
        a variable size. After creation it is possible to extend a data set along the variable
        dimension using :func:`~pynpoint.core.dataio.OutputPort.append`.

        **Example 1:**

        Input 2D array with size (200, 200). Desired dimension 3D. The result is a 3D dataset with
        the dimension (1, 200, 200). It is possible to append other images with the size (200, 200)
        or other stacks of images with the size (:, 200, 200).

        **Example 2:**

        Input 2D array with size (200, 200). Desired dimension 2D. The result is a 2D dataset with
        the dimension (200, 200). It is possible to append other list with the length 200
        or other stacks of lines with the size (:, 200). However it is not possible to append other
        2D images along a third dimension.

        Parameters
        ----------
        data : np.ndarray, list
            The data to be saved.
        data_dim : int, None
            Number of data dimensions. The dimension of the ``data`` is used if set to None.
        keep_attributes : bool
            All static attributes of the old dataset will remain the same if set to True.

        Returns
        -------
        NoneType
            None
        """

        data = np.asarray(data)

        if self._check_status_and_activate():
            self._set_all_key(tag=self._m_tag,
                              data=data,
                              data_dim=data_dim,
                              keep_attributes=keep_attributes)

    @typechecked
    def append(self,
               data: Union[np.ndarray, list],
               data_dim: Optional[int] = None,
               force: bool = False) -> None:
        """
        Appends data to an existing dataset along the first dimension (along the second dimension
        in case of 4D data). If no data exists for the
        :class:`~pynpoint.core.dataio.OutputPort`, then a new data set is created. For more
        information about how the dimensions are organized, see the documentation of
        :func:`~pynpoint.core.dataio.OutputPort.set_all`. Note it is not possible to append data
        with a different shape to an existing dataset, unless ``force`` is used.

        **Example:** An internal data set is 3D (storing a stack of 2D images) with shape of
        ``(233, 300, 300)``, that is, it contains 233 images with a resolution of 300 by 300
        pixels. Thus it is only possible to extend along the first dimension by appending new
        images with a shape of ``(300, 300)`` or by appending a stack of images with a shape of
        ``(:, 300, 300)``.

        It is possible to force the function to overwrite the existing data set if the shape of
        the input data does not match the existing data.

        Parameters
        ----------
        data : np.ndarray, list
            The data that will be appended.
        data_dim : int, None
            Number of data dimensions used if a new data set is created. The dimension of the
            ``data`` is used if set to None.
        force : bool
            The existing data will be overwritten if the shape does not match.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            self._append_key(self._m_tag,
                             data=data,
                             data_dim=data_dim,
                             force=force)

    @typechecked
    def activate(self) -> None:
        """
        Activates the port. A non activated port will not save data.

        Returns
        -------
        NoneType
            None
        """

        self.m_activate = True

    @typechecked
    def deactivate(self) -> None:
        """
        Deactivates the port. A non activated port will not save data.

        Returns
        -------
        NoneType
            None
        """

        self.m_activate = False

    @typechecked
    def add_attribute(self,
                      name: str,
                      value: Union[StaticAttribute, NonStaticAttribute],
                      static: bool = True) -> None:
        """
        Adds an attribute to the dataset of the Port with the attribute name = `name` and the
        value = `value`. If the attribute already exists it will be overwritten. Two different
        types of attributes are supported:

            1. **static attributes**:
               Contain a single value or name (e.g. The name of the used Instrument).
            2. **non-static attributes**:
               Contain a dataset which is connected to the actual data set (e.g. Instrument
               temperature). It is possible to append additional information to non-static
               attributes later (:func:`~pynpoint.core.dataio.OutputPort.append_attribute_data`).
               This is not supported by static attributes.

        Static and non-static attributes are stored in a different way using the HDF5 file format.
        Static attributes will be direct attributes while non-static attributes are stored in a
        group with the name *header_* + name of the dataset.

        A warning is issued and nothing is stored if the dataset of the port does not exist.

        Parameters
        ----------
        name : str
            Name of the attribute.
        value : StaticAttribute, NonStaticAttribute
            Value of the attribute.
        static : bool
            Indicate if the attribute is static (True) or non-static (False).

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            if self._m_tag not in self._m_data_storage.m_data_bank:
                warnings.warn(f'Can not store the attribute \'{name}\' because the dataset '
                              f'\'{self._m_tag}\' does not exist.')

            elif static:
                self._m_data_storage.m_data_bank[self._m_tag].attrs[name] = value

            else:
                self._set_all_key(tag=('header_' + self._m_tag + '/' + name),
                                  data=np.asarray(value))

    @typechecked
    def append_attribute_data(self,
                              name: str,
                              value: Union[StaticAttribute, NonStaticAttribute]) -> None:
        """
        Function which appends data (either a single value or an array) to non-static attributes.

        Parameters
        ----------
        name : str
            Name of the attribute.
        value : StaticAttribute, NonStaticAttribute
            Value which will be appended to the attribute dataset.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            self._append_key(tag=('header_' + self._m_tag + '/' + name),
                             data=np.asarray([value, ]))

    @typechecked
    def copy_attributes(self,
                        input_port: InputPort) -> None:
        """
        Copies all static and non-static attributes from a given InputPort. Attributes which already
        exist will be overwritten. Non-static attributes will be linked not copied. If the InputPort
        tag = OutputPort tag (self.tag) nothing will be changed. Use this function in all modules
        to keep the header information.

        Parameters
        ----------
        input_port : pynpoint.core.dataio.InputPort
            The InputPort with the header information.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate() and input_port.tag != self._m_tag:
            bank = self._m_data_storage.m_data_bank
            input_group = 'header_' + input_port.tag + '/'

            # link non-static attributes
            if input_group in bank:
                for attr_name, attr_data in bank[input_group].items():
                    database_name = 'header_' + self._m_tag + '/' + attr_name

                    # overwrite existing header information in the database
                    if database_name in bank:
                        del bank[database_name]

                    bank[database_name] = attr_data

            # copy static attributes
            attributes = input_port.get_all_static_attributes()

            for attr_name, attr_val in attributes.items():
                self.add_attribute(attr_name, attr_val)

            bank.flush()

    @typechecked
    def del_attribute(self,
                      name: str) -> None:
        """
        Deletes the attribute of the dataset with the given name. Finds and removes static and
        non-static attributes. A warning is issued if no attribute with that name exists.

        Parameters
        ----------
        name : str
            Name of the attribute.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            bank = self._m_data_storage.m_data_bank
            header_key = 'header_' + self._m_tag + '/' + name

            # check if attribute is static; the dataset itself may not exist (yet), in which
            # case there are no static attributes to look up
            if self._m_tag in bank and name in bank[self._m_tag].attrs:
                del bank[self._m_tag].attrs[name]

            elif header_key in bank:
                # remove non-static attribute
                del bank[header_key]

            else:
                warnings.warn(f'Attribute \'{name}\' does not exist and could not be deleted.')

    @typechecked
    def del_all_attributes(self) -> None:
        """
        Deletes all static and non-static attributes of the dataset.

        Returns
        -------
        NoneType
            None
        """

        if self._check_status_and_activate():
            bank = self._m_data_storage.m_data_bank
            header_group = 'header_' + self._m_tag + '/'

            # static attributes
            if self._m_tag in bank:
                bank[self._m_tag].attrs.clear()

            # non-static attributes
            if header_group in bank:
                del bank[header_group]

    @typechecked
    def check_static_attribute(self,
                               name: str,
                               comparison_value: StaticAttribute) -> Optional[int]:
        """
        Checks if a static attribute exists and if it is equal to a comparison value.

        Parameters
        ----------
        name : str
            Name of the static attribute.
        comparison_value : StaticAttribute
            Comparison value.

        Returns
        -------
        int, None
            Status: 1 if the static attribute does not exist, 0 if the static attribute exists
            and is equal, and -1 if the static attribute exists but is not equal. None is
            returned if the port can not be used.
        """

        if not self._check_status_and_activate():
            return None

        bank = self._m_data_storage.m_data_bank

        # Without a dataset there can not be a static attribute
        if self._m_tag not in bank:
            return 1

        attrs = bank[self._m_tag].attrs

        if name not in attrs:
            return 1

        if attrs[name] == comparison_value:
            return 0

        return -1

    @typechecked
    def check_non_static_attribute(self,
                                   name: str,
                                   comparison_value: NonStaticAttribute) -> Optional[int]:
        """
        Checks if a non-static attribute exists and if it is equal to a comparison value.

        Parameters
        ----------
        name : str
            Name of the non-static attribute.
        comparison_value : NonStaticAttribute
            Comparison values

        Returns
        -------
        int, None
            Status: 1 if the non-static attribute does not exist, 0 if the non-static attribute
            exists and is equal, and -1 if the non-static attribute exists but is not equal. None
            is returned if the port can not be used.
        """

        if not self._check_status_and_activate():
            return None

        bank = self._m_data_storage.m_data_bank
        group = 'header_' + self._m_tag + '/'

        if group in bank and name in bank[group]:
            if np.array_equal(bank[group + name][:], comparison_value):
                return 0

            return -1

        return 1

    @typechecked
    def add_history(self,
                    module: str,
                    history: str) -> None:
        """
        Adds a static attribute with history information about the pipeline module. The attribute
        name is *History:* followed by the module name.

        Parameters
        ----------
        module : str
            Name of the pipeline module which was executed.
        history : str
            History information.

        Returns
        -------
        NoneType
            None
        """

        self.add_attribute('History: ' + module, history)

    @typechecked
    def flush(self) -> None:
        """
        Forces the :class:`~pynpoint.core.dataio.DataStorage` to save all data from the memory to
        the hard drive without closing the :class:`~pynpoint.core.dataio.OutputPort`.

        Returns
        -------
        NoneType
            None
        """

        self._m_data_storage.m_data_bank.flush()