"""
Interfaces for pipeline modules.
"""
import math
import os
import time
import warnings
from abc import ABCMeta, abstractmethod
from typing import Callable, Optional
import numpy as np
from typeguard import typechecked
from pynpoint.core.dataio import ConfigPort, DataStorage, InputPort, OutputPort
from pynpoint.util.module import progress, update_arguments
from pynpoint.util.multiline import LineProcessingCapsule
from pynpoint.util.multiproc import apply_function
from pynpoint.util.multistack import StackProcessingCapsule
class PypelineModule(metaclass=ABCMeta):
"""
Abstract interface for the PypelineModule:
* Reading module (:class:`pynpoint.core.processing.ReadingModule`)
* Writing module (:class:`pynpoint.core.processing.WritingModule`)
* Processing module (:class:`pynpoint.core.processing.ProcessingModule`)
Each :class:`~pynpoint.core.processing.PypelineModule` has a name as a unique identifier in the
:class:`~pynpoint.core.pypeline.Pypeline` and requires the ``connect_database`` and ``run``
methods.
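    A minimal sketch of a custom module, here subclassing
    :class:`~pynpoint.core.processing.ProcessingModule` (the class name and body are
    illustrative, not part of PynPoint): ::

        class ExampleModule(ProcessingModule):

            def __init__(self, name_in: str) -> None:
                super().__init__(name_in)

            def run(self) -> None:
                pass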
"""
@typechecked
def __init__(self,
name_in: str) -> None:
"""
Abstract constructor of a :class:`~pynpoint.core.processing.PypelineModule`.
Parameters
----------
name_in : str
The name of the :class:`~pynpoint.core.processing.PypelineModule`.
Returns
-------
NoneType
None
"""
self._m_name = name_in
self._m_data_base = None
self._m_config_port = ConfigPort('config')
@property
@typechecked
def name(self) -> str:
"""
        Returns the name of the :class:`~pynpoint.core.processing.PypelineModule`. This property
        makes sure that the internal module name cannot be changed.
Returns
-------
str
            The name of the :class:`~pynpoint.core.processing.PypelineModule`.
"""
return self._m_name
    @abstractmethod
@typechecked
def connect_database(self,
data_base_in: DataStorage) -> None:
"""
Abstract interface for the function ``connect_database`` which is needed to connect a
:class:`~pynpoint.core.dataio.Port` of a :class:`~pynpoint.core.processing.PypelineModule`
with the :class:`~pynpoint.core.dataio.DataStorage`.
Parameters
----------
data_base_in : pynpoint.core.dataio.DataStorage
The central database.
"""
    @abstractmethod
@typechecked
def run(self) -> None:
"""
Abstract interface for the run method of :class:`~pynpoint.core.processing.PypelineModule`
        which contains the actual algorithm of the module.
"""
class ReadingModule(PypelineModule, metaclass=ABCMeta):
"""
    The abstract class ReadingModule is an interface for processing steps in the Pypeline which
    have only write access to the central data storage, for example to import data from the hard
    drive into the database. One can specify a directory on the hard drive where the input data
    for the module is located. If no input directory is given, the default Pypeline input
    directory is used. Reading modules have a dictionary of output ports
    (self._m_output_ports) but no input ports.
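    A minimal sketch of a reading module (the class name, tag, and data are illustrative): ::

        class ExampleReader(ReadingModule):

            def __init__(self, name_in, input_dir=None):
                super().__init__(name_in, input_dir=input_dir)
                self.m_out_port = self.add_output_port('example')

            def run(self):
                self.m_out_port.set_all(np.zeros((10, 100, 100)))
                self.m_out_port.close_port()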
"""
@typechecked
def __init__(self,
name_in: str,
input_dir: Optional[str] = None) -> None:
"""
        Abstract constructor of ReadingModule which needs the unique name identifier as input
        (more information: :class:`pynpoint.core.processing.PypelineModule`). An input directory
        can be specified for the location of the data; otherwise the Pypeline default directory
        is used. This function is called in all *__init__()* functions inheriting from this
        class.
Parameters
----------
name_in : str
The name of the ReadingModule.
        input_dir : str, None
            Directory where the input files are located. If set to None, the default input
            location of the Pypeline is used.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
        assert input_dir is None or os.path.isdir(input_dir), \
            f'Input directory for reading module does not exist - input requested: {input_dir}.'
self.m_input_location = input_dir
self._m_output_ports = {}
    @typechecked
def add_output_port(self,
tag: str,
activation: bool = True) -> OutputPort:
"""
Function which creates an OutputPort for a ReadingModule and appends it to the internal
OutputPort dictionary. This function should be used by classes inheriting from
ReadingModule to make sure that only output ports with unique tags are added. The new
        port can be used as: ::

            port = self._m_output_ports[tag]

        or by using the returned Port.
Parameters
----------
tag : str
Tag of the new output port.
activation : bool
Activation status of the Port after creation. Deactivated ports will not save their
results until they are activated.
Returns
-------
pynpoint.core.dataio.OutputPort
The new OutputPort for the ReadingModule.
"""
port = OutputPort(tag, activate_init=activation)
if tag in self._m_output_ports:
warnings.warn(f'Tag \'{tag}\' of ReadingModule \'{self._m_name}\' is already used.')
if self._m_data_base is not None:
port.set_database_connection(self._m_data_base)
self._m_output_ports[tag] = port
return port
    @typechecked
def connect_database(self,
data_base_in: DataStorage) -> None:
"""
        Function used by a ReadingModule to connect all ports in the internal output port
        dictionary to the database. The function is called by Pypeline and connects the
        DataStorage object to all module ports.
Parameters
----------
data_base_in : pynpoint.core.dataio.DataStorage
The central database.
Returns
-------
NoneType
None
"""
for port in self._m_output_ports.values():
port.set_database_connection(data_base_in)
self._m_config_port.set_database_connection(data_base_in)
self._m_data_base = data_base_in
    @abstractmethod
@typechecked
def run(self) -> None:
"""
        Abstract interface for the run method of a ReadingModule which contains the actual
        algorithm of the module.
"""
class WritingModule(PypelineModule, metaclass=ABCMeta):
"""
    The abstract class WritingModule is an interface for processing steps in the Pypeline which
    do not change the content of the internal DataStorage. They have only read access to the
    central database and can be used, for example, to export data from the HDF5 database.
    WritingModules know the directory on the hard drive where the output of the module can be
    saved. If no output directory is given, the default Pypeline output directory is used.
    WritingModules have a dictionary of input ports (self._m_input_ports) but no output ports.
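    A minimal sketch of a writing module (the class name and tag are illustrative, and the
    ``add_input_port`` helper is assumed to be available for creating the
    :class:`~pynpoint.core.dataio.InputPort`): ::

        class ExampleWriter(WritingModule):

            def __init__(self, name_in, output_dir=None):
                super().__init__(name_in, output_dir=output_dir)
                self.m_in_port = self.add_input_port('example')

            def run(self):
                data = self.m_in_port.get_all()
                np.save(os.path.join(self.m_output_location, 'example.npy'), data)
                self.m_in_port.close_port()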
"""
@typechecked
def __init__(self,
name_in: str,
output_dir: Optional[str] = None) -> None:
"""
        Abstract constructor of a WritingModule which needs the unique name identifier as input
        (more information: :class:`pynpoint.core.processing.PypelineModule`). In addition, an
        output directory can be specified where the module will save its results. If no output
        directory is given, the Pypeline default directory is used. This function is called in
        all *__init__()* functions inheriting from this class.
Parameters
----------
name_in : str
The name of the WritingModule.
        output_dir : str, None
            Directory where the results will be saved. If set to None, the default output
            location of the Pypeline is used.
Returns
-------
NoneType
None
"""
super().__init__(name_in)
        assert output_dir is None or os.path.isdir(output_dir), \
            f'Output directory for writing module does not exist - input requested: {output_dir}.'
self.m_output_location = output_dir
self._m_input_ports = {}
    @typechecked
def connect_database(self,
data_base_in: DataStorage) -> None:
"""
        Function used by a WritingModule to connect all ports in the internal input port
        dictionary to the database. The function is called by Pypeline and connects the
        DataStorage object to all module ports.
Parameters
----------
data_base_in : pynpoint.core.dataio.DataStorage
The central database.
Returns
-------
NoneType
None
"""
for port in self._m_input_ports.values():
port.set_database_connection(data_base_in)
self._m_config_port.set_database_connection(data_base_in)
self._m_data_base = data_base_in
    @abstractmethod
@typechecked
def run(self) -> None:
"""
        Abstract interface for the run method of a WritingModule which contains the actual
        algorithm of the module.
"""
class ProcessingModule(PypelineModule, metaclass=ABCMeta):
"""
The abstract class ProcessingModule is an interface for all processing steps in the pipeline
which read, process, and store data. Hence processing modules have read and write access to the
central database through a dictionary of output ports (self._m_output_ports) and a dictionary
of input ports (self._m_input_ports).
"""
@typechecked
def __init__(self,
name_in: str) -> None:
"""
Abstract constructor of a ProcessingModule which needs the unique name identifier as input
(more information: :class:`pynpoint.core.processing.PypelineModule`). Call this function in
all __init__() functions inheriting from this class.
Parameters
----------
name_in : str
The name of the ProcessingModule.
"""
super().__init__(name_in)
self._m_input_ports = {}
self._m_output_ports = {}
    @typechecked
def add_output_port(self,
tag: str,
activation: bool = True) -> OutputPort:
"""
Function which creates an :class:`~pynpoint.core.dataio.OutputPort` for a
:class:`~pynpoint.core.processing.ProcessingModule` and appends it to the internal
:class:`~pynpoint.core.dataio.OutputPort` dictionary. This function should be used by
classes inheriting from :class:`~pynpoint.core.processing.ProcessingModule` to make sure
        that only output ports with unique tags are added. The new port can be used as:

        .. code-block:: python

            port = self._m_output_ports[tag]

        or by using the returned :class:`~pynpoint.core.dataio.Port`.
Parameters
----------
tag : str
Tag of the new output port.
activation : bool
Activation status of the :class:`~pynpoint.core.dataio.Port` after creation.
Deactivated ports will not save their results until they are activated.
Returns
-------
pynpoint.core.dataio.OutputPort
The new :class:`~pynpoint.core.dataio.OutputPort` for the
:class:`~pynpoint.core.processing.ProcessingModule`.
"""
port = OutputPort(tag, activate_init=activation)
if tag in self._m_output_ports:
warnings.warn(f'Tag \'{tag}\' of ProcessingModule \'{self._m_name}\' is already used.')
if self._m_data_base is not None:
port.set_database_connection(self._m_data_base)
self._m_output_ports[tag] = port
return port
    @typechecked
def connect_database(self,
data_base_in: DataStorage) -> None:
"""
Function used by a ProcessingModule to connect all ports in the internal input and output
port dictionaries to the database. The function is called by Pypeline and connects the
DataStorage object to all module ports.
Parameters
----------
data_base_in : pynpoint.core.dataio.DataStorage
The central database.
Returns
-------
NoneType
None
"""
for port in self._m_input_ports.values():
port.set_database_connection(data_base_in)
for port in self._m_output_ports.values():
port.set_database_connection(data_base_in)
self._m_config_port.set_database_connection(data_base_in)
self._m_data_base = data_base_in
    @typechecked
def apply_function_in_time(self,
func: Callable,
image_in_port: InputPort,
image_out_port: OutputPort,
func_args: Optional[tuple] = None) -> None:
"""
        Applies a function to each pixel line in time, that is, to the time series of each
        pixel position.
Parameters
----------
func : function
            The input function which is applied to each pixel line in time.
image_in_port : pynpoint.core.dataio.InputPort
Input port which is linked to the input data.
image_out_port : pynpoint.core.dataio.OutputPort
Output port which is linked to the results.
func_args : tuple, None
Additional arguments which are required by the input function. Not used if set to None.
Returns
-------
NoneType
None
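        A hypothetical example (``subtract_mean`` and the port names are illustrative),
        assuming that the function receives a pixel line in time together with its index,
        analogous to
        :func:`~pynpoint.core.processing.ProcessingModule.apply_function_to_images`: ::

            def subtract_mean(line_in, line_index):
                # subtract the temporal mean from a single pixel line
                return line_in - np.mean(line_in)

            self.apply_function_in_time(subtract_mean,
                                        self.m_image_in_port,
                                        self.m_image_out_port)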
"""
cpu = self._m_config_port.get_attribute('CPU')
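        # apply the function once to the line of the first pixel in
        # order to infer the length of the output lines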
init_line = image_in_port[:, 0, 0]
im_shape = image_in_port.get_shape()
size = apply_function(init_line, 0, func, func_args).shape[0]
image_out_port.set_all(data=np.zeros((size, im_shape[1], im_shape[2])),
data_dim=3,
keep_attributes=False)
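        # close the ports such that the worker processes of the
        # processing capsule can open their own database connections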
image_in_port.close_port()
image_out_port.close_port()
capsule = LineProcessingCapsule(image_in_port=image_in_port,
image_out_port=image_out_port,
num_proc=cpu,
function=func,
function_args=func_args,
data_length=size)
capsule.run()
    @typechecked
def apply_function_to_images(self,
func: Callable[..., np.ndarray],
image_in_port: InputPort,
image_out_port: OutputPort,
message: str,
func_args: Optional[tuple] = None) -> None:
"""
        Function which applies a function to all images of an input port. Stacks of images are
        processed in parallel if the CPU and MEMORY attributes are set in the central
        configuration. The number of images per process is equal to the value of MEMORY divided
        by the value of CPU (e.g., ``MEMORY=100`` and ``CPU=4`` gives stacks of 25 images per
        process). Note that the function *func* is not allowed to change the shape of the
        images if the input and output port have the same tag and ``MEMORY`` is not set to None.
Parameters
----------
        func : function
            The function which is applied to all images. It receives an image and the image
            index, followed by any additional arguments from ``func_args``, so its definition
            should be similar to: ::

                def function(image_in,
                             im_index,
                             parameter1,
                             parameter2):

            The function must return a numpy array.
image_in_port : pynpoint.core.dataio.InputPort
Input port which is linked to the input data.
image_out_port : pynpoint.core.dataio.OutputPort
Output port which is linked to the results.
message : str
Progress message.
        func_args : tuple, None
            Additional arguments which are required by the input function. Not used if set to
            None.
Returns
-------
NoneType
None
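        A hypothetical example (``subtract_value`` and the port names are illustrative) that
        subtracts a constant value from each image: ::

            def subtract_value(image_in, im_index, value):
                # subtract a constant value from a single image
                return image_in - value

            self.apply_function_to_images(subtract_value,
                                          self.m_image_in_port,
                                          self.m_image_out_port,
                                          'Subtracting value',
                                          func_args=(10., ))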
"""
memory = self._m_config_port.get_attribute('MEMORY')
cpu = self._m_config_port.get_attribute('CPU')
nimages = image_in_port.get_shape()[0]
        if memory == 0 or image_out_port.tag == image_in_port.tag:
            # load all images in the memory at once if the input and output tag are the
            # same or if the MEMORY attribute is set to None in the configuration file
            images = image_in_port.get_all()
result = []
start_time = time.time()
for i in range(nimages):
progress(i, nimages, message+'...', start_time)
args = update_arguments(i, nimages, func_args)
if args is None:
result.append(func(images[i, ], i))
else:
result.append(func(images[i, ], i, *args))
image_out_port.set_all(np.asarray(result), keep_attributes=True)
elif cpu == 1:
# process images one-by-one with a single process if CPU is set to 1
start_time = time.time()
for i in range(nimages):
progress(i, nimages, message+'...', start_time)
args = update_arguments(i, nimages, func_args)
if args is None:
result = func(image_in_port[i, ], i)
else:
result = func(image_in_port[i, ], i, *args)
if result.ndim == 1:
image_out_port.append(result, data_dim=2)
elif result.ndim == 2:
image_out_port.append(result, data_dim=3)
else:
# process images in parallel in stacks of MEMORY/CPU images
print(message, end='')
args = update_arguments(0, nimages, func_args)
result = apply_function(image_in_port[0, :, :], 0, func, args)
result_shape = result.shape
            out_shape = [nimages, *result_shape]
image_out_port.set_all(data=np.zeros(out_shape),
data_dim=len(result_shape)+1,
keep_attributes=False)
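            # close the ports such that the worker processes of the
            # processing capsule can open their own database connections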
image_in_port.close_port()
image_out_port.close_port()
capsule = StackProcessingCapsule(image_in_port=image_in_port,
image_out_port=image_out_port,
num_proc=cpu,
function=func,
function_args=func_args,
stack_size=math.ceil(memory/cpu),
result_shape=result_shape,
nimages=nimages)
capsule.run()
print(' [DONE]')
    @abstractmethod
@typechecked
def run(self) -> None:
"""
Abstract interface for the run method of a
        :class:`~pynpoint.core.processing.ProcessingModule` which contains the actual
        algorithm of the module.
"""