"""
Network - Nodes
===============
Define a collection of :class:`colour.utilities.PortNode` nodes for raw and HDR
image processing.
"""
from __future__ import annotations
import json
import os
import sys
import time
import typing
from dataclasses import dataclass, field
import numpy as np
from colour import (
RGB_COLOURSPACES,
CCT_to_uv,
RGB_Colourspace,
RGB_to_RGB,
UCS_uv_to_xy,
XYZ_to_RGB,
linear_function,
matrix_idt,
sd_blackbody,
sd_CIE_illuminant_D_series,
uv_to_CCT,
xy_to_UCS_uv,
)
from colour.algebra import vecmul
from colour.characterisation import RGB_CameraSensitivities
from colour.hints import Dict
if typing.TYPE_CHECKING:
from colour.hints import Any, NDArrayFloat, Sequence
from colour.io import (
Image_Specification_Attribute,
convert_bit_depth,
read_image_OpenImageIO,
write_image_OpenImageIO,
)
from colour.temperature import CCT_to_xy_CIE_D
from colour.utilities import (
CanonicalMapping,
ExecutionNode,
as_float_array,
batch,
ones,
orient,
required,
slugify,
zeros,
)
from colour_hdri import (
ImageStack,
convert_raw_files_to_dng_files,
double_sigmoid_anchored_function,
image_stack_to_HDRI,
read_dng_files_exif_tags,
read_exif_tags,
tonemapping_operator_Reinhard2004,
)
from colour_hdri.models import (
CCS_ILLUMINANT_ADOBEDNG,
CCT_ILLUMINANTS_ADOBEDNG,
LIGHT_SOURCE_TAG_TO_DNG_ILLUMINANTS,
camera_neutral_to_xy,
matrix_XYZ_to_camera_space,
xy_to_camera_neutral,
)
from colour_hdri.process import DNG_CONVERTER
from colour_hdri.utilities import notify_process_state, vivified_to_dict
# Module authorship, licensing and status metadata.
__author__ = "Colour Developers"
__copyright__ = "Copyright 2015 Colour Developers"
__license__ = "BSD-3-Clause - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "colour-developers@colour-science.org"
__status__ = "Production"

# Names exported by the module.
# NOTE(review): Some of the listed names, e.g., "NodeProcessingMetadata" or
# "NodeReadFileMetadataDNG", are not defined in the visible portion of the
# file, confirm that they are defined elsewhere.
__all__ = [
    "JSONEncoderEXRAttribute",
    "InputTransform",
    "NodeConvertRawFileToDNGFile",
    "NodeReadImage",
    "NodeWriteImage",
    "NodeWritePreviewImage",
    "NodeRemoveFile",
    "NodeOrient",
    "NodeWatermark",
    "NodeProcessingMetadata",
    "NodeReadFileExifData",
    "NodeReadFileMetadataDNG",
    "NodeComputeInputTransformDNG",
    "NodeApplyInputTransformDNG",
    "NodeFetchCameraSensitivities",
    "NodeComputeInputTransformCameraSensitivities",
    "NodeApplyInputTransformCameraSensitivities",
    "NodeProcessRawFileRawpy",
    "NodeCorrectLensAberrationLensFun",
    "NodeDownsample",
    "NodeCreateBatches",
    "NodeCreateImageStack",
    "NodeMergeImageStack",
    "NodeNormaliseExposure",
]
class JSONEncoderEXRAttribute(json.JSONEncoder):
    """
    Define an encoder that serialises *JSON* data for storing inside an *EXR*
    attribute.

    The encoder extends :class:`json.JSONEncoder` with support for *NumPy*
    scalars, *NumPy* arrays and :class:`colour.utilities.CanonicalMapping`
    class instances.
    """

    def default(self, o: Any) -> Any:
        """
        Return a *JSON* serialisable object from given object.

        Parameters
        ----------
        o
            Object to return a *JSON* serialisable object from.

        Returns
        -------
        :class:`object`
            *JSON* serialisable object.

        Raises
        ------
        TypeError
            If given object is not supported, as raised by
            :meth:`json.JSONEncoder.default`.
        """

        # "np.bool_" is neither a "np.integer" nor a "bool" sub-class and
        # must thus be tested explicitly.
        if isinstance(o, np.bool_):
            return bool(o)

        # The abstract scalar types cover every floating point and integer
        # width, e.g., "np.float16" or "np.uint8", rather than only the 32
        # and 64-bit variants.
        if isinstance(o, np.floating):
            return float(o)

        if isinstance(o, np.integer):
            return int(o)

        if isinstance(o, np.ndarray):
            return o.tolist()

        if isinstance(o, CanonicalMapping):
            return dict(o.items())

        return super().default(o)
@dataclass
class InputTransform:
    """
    Define an input transform for a camera.

    Parameters
    ----------
    M
        Matrix :math:`M`, defaults to the 3x3 identity matrix.
    RGB_w
        White balance multipliers :math:`RGB_w`, default to ones.
    """

    M: NDArrayFloat = field(default_factory=lambda: np.identity(3))
    RGB_w: NDArrayFloat = field(default_factory=lambda: ones(3))

    # Instances are mutable and compared by value, they are thus unhashable.
    __hash__ = None  # pyright: ignore

    def __eq__(self, other: object) -> bool:
        """
        Return whether the input transform is equal to given other object.

        Parameters
        ----------
        other
            Object to test whether it is equal to the input transform.

        Returns
        -------
        :class:`bool`
            Whether given object is equal to the input transform, i.e., it is
            an :class:`InputTransform` class instance whose matrix and white
            balance multipliers have the same shape and values.
        """

        if not isinstance(other, InputTransform):
            return False

        # "np.array_equal" compares shapes before values: arrays that are
        # only equal through broadcasting are different and arrays with
        # incompatible shapes do not raise. The result is cast so that a
        # builtin "bool" is returned rather than a "np.bool_".
        return bool(
            np.array_equal(self.M, other.M)
            and np.array_equal(self.RGB_w, other.RGB_w)
        )
[docs]
class NodeConvertRawFileToDNGFile(ExecutionNode):
    """
    Convert given raw file, e.g., *CR2*, *CR3*, *NEF*, to *DNG*.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeConvertRawFileToDNGFile.__init__`
    -   :meth:`~colour_hdri.NodeConvertRawFileToDNGFile.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Raw/Processing", **kwargs})

        self.description = 'Convert given raw file, e.g., "CR2", "CR3", "NEF", to "DNG"'

        for port_name in (
            "raw_file_path",
            "output_directory",
            "dng_converter",
            "dng_converter_arguments",
        ):
            self.add_input_port(port_name)

        self.add_output_port("dng_file_path")

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: convert the raw file at the "raw_file_path" input
        and set the "dng_file_path" output to the resulting *DNG* file path.

        The node returns early, leaving its output unset, if the raw file path
        is undefined or does not exist, or if the converted file is missing.
        """

        source_path = self.get_input("raw_file_path")
        if source_path is None:
            return

        if not os.path.exists(source_path):
            self.log(f'"{source_path}" file does not exist!', "error")
            return

        # The *DNG* file is written next to the raw file by default.
        target_directory = self.get_input("output_directory")
        if target_directory is None:
            target_directory = os.path.dirname(source_path)

        converted_paths = convert_raw_files_to_dng_files(
            [source_path],
            target_directory,
            self.get_input("dng_converter"),
            self.get_input("dng_converter_arguments"),
        )
        # A single raw file is converted, only the first path is relevant.
        converted_path = next(iter(converted_paths))

        if not os.path.exists(converted_path):
            self.log(
                f'"{converted_path}" file does not exist, conversion failed!',
                "error",
            )
            return

        self.set_output("dng_file_path", converted_path)

        self.dirty = False
def _is_linear_file_format(path: str) -> bool:
"""
Return whether the file at specified path is a linear file type.
"""
return os.path.splitext(path)[-1].lower() in (".exr", ".hdr")
[docs]
class NodeReadImage(ExecutionNode):
    """
    Read the image from input path and return its data and metadata. The
    decoding CCTF of the input colourspace is used to linearise the image if it
    is stored using non-linear file format.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeReadImage.__init__`
    -   :meth:`~colour_hdri.NodeReadImage.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Image/IO", **kwargs})

        self.description = (
            "Read the image from input path and return its data and metadata"
        )

        self.add_input_port("path")
        self.add_input_port("input_colourspace", "sRGB")
        self.add_output_port("image")
        self.add_output_port("metadata")
        self.add_output_port("exif_tags")

    @required("OpenImageIO")
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: read the image at the "path" input and set the
        "image", "metadata" and "exif_tags" outputs.

        The node returns early, leaving its outputs unset, if the path is
        undefined or does not exist.
        """

        path = self.get_input("path")
        if path is None:
            return

        if not os.path.exists(path):
            # Reported as an error for consistency with the other nodes
            # validating file paths.
            self.log(f'"{path}" image does not exist!', "error")
            return

        image, metadata = read_image_OpenImageIO(path, additional_data=True)

        # The colourspace is given either by name or as a colourspace object.
        input_colourspace = self.get_input("input_colourspace")
        if isinstance(input_colourspace, str):
            input_colourspace = RGB_COLOURSPACES[input_colourspace]

        # Linear file formats, i.e., "EXR" and "HDR", are used as is.
        if not _is_linear_file_format(path):
            image = input_colourspace.cctf_decoding(image)

        tags = read_exif_tags(path)

        self.set_output("image", image)
        self.set_output("metadata", metadata)
        self.set_output("exif_tags", tags)

        self.dirty = False
[docs]
class NodeWriteImage(ExecutionNode):
    """
    Write the input image to input path using the input metadata. The encoding
    CCTF of the output colourspace is used to non-linearly encode the image if
    it is to be stored using a non-linear file format.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeWriteImage.__init__`
    -   :meth:`~colour_hdri.NodeWriteImage.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input ports.
        """

        super().__init__(*args, **{"category": "Image/IO", **kwargs})

        self.description = (
            "Write the input image to input path using the input metadata"
        )

        self.add_input_port("image")
        self.add_input_port("metadata")
        self.add_input_port("path")
        # NOTE: The "exif_tags" input is declared but not read by the
        # "process" method.
        self.add_input_port("exif_tags")
        # Defaulting to "sRGB", like the "input_colourspace" input of the
        # "NodeReadImage" node, so that writing to a non-linear file format
        # does not fail when the input is left unset.
        self.add_input_port("output_colourspace", "sRGB")
        self.add_input_port("bit_depth", "float32")
        self.add_input_port("bypass", False)

    @required("OpenImageIO")
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: write the "image" input to the "path" input with
        the "metadata" input items stored as *JSON* encoded attributes.

        The node returns early, without writing anything, if it is bypassed,
        if the image or the path are undefined, or if a non-linear file format
        is requested without an output colourspace.
        """

        if self.get_input("bypass"):
            return

        image = self.get_input("image")
        if image is None:
            return

        path = self.get_input("path")
        if path is None:
            return

        metadata = self.get_input("metadata")
        if metadata is None:
            metadata = {}

        # Each metadata item is stored as a *JSON* encoded attribute.
        attributes = [
            Image_Specification_Attribute(
                str(key), json.dumps(value, cls=JSONEncoderEXRAttribute)
            )
            for key, value in metadata.items()
        ]

        # The colourspace is given either by name or as a colourspace object.
        output_colourspace = self.get_input("output_colourspace")
        if isinstance(output_colourspace, str):
            output_colourspace = RGB_COLOURSPACES[output_colourspace]

        # Linear file formats, i.e., "EXR" and "HDR", are written as is.
        if not _is_linear_file_format(path):
            if output_colourspace is None:
                self.log(
                    f'An output colourspace is required to write "{path}" image!',
                    "error",
                )
                return

            image = output_colourspace.cctf_encoding(image)

        bit_depth = self.get_input("bit_depth")

        write_image_OpenImageIO(image, path, bit_depth=bit_depth, attributes=attributes)

        self.dirty = False
def _cctf_encoding_preview(a: NDArrayFloat) -> NDArrayFloat:
    """
    Encode given image :math:`a` for preview purposes using the Reinhard
    (2004) global tonemapping function applied on the image values doubled.
    """

    # Reinhard (2004) rendering is dim, increasing exposure by a stop.
    exposed = a * 2

    return tonemapping_operator_Reinhard2004(exposed)
[docs]
class NodeWritePreviewImage(ExecutionNode):
    """
    Write the image at input image path as a preview image.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeWritePreviewImage.__init__`
    -   :meth:`~colour_hdri.NodeWritePreviewImage.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Image/IO", **kwargs})

        self.description = "Write the image at input image path as a preview image"

        self.add_input_port("path")
        self.add_input_port("cctf_encoding", _cctf_encoding_preview)
        self.add_input_port("bypass", False)
        self.add_output_port("preview_path")

    @required("OpenImageIO")
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: read the image at the "path" input, encode it with
        the "cctf_encoding" input callable, write it next to the original with
        a ".jpg" extension and set the "preview_path" output.

        The node returns early, leaving its output unset, if it is bypassed or
        if the path is undefined or does not exist.
        """

        if self.get_input("bypass"):
            return

        path = self.get_input("path")
        if path is None:
            return

        if not os.path.exists(path):
            # Reported as an error for consistency with the other nodes
            # validating file paths.
            self.log(f'"{path}" image does not exist!', "error")
            return

        cctf_encoding = self.get_input("cctf_encoding")

        # NOTE: If the input path already has a ".jpg" extension, the preview
        # path is the input path and the input image is thus overwritten.
        preview_path = f"{os.path.splitext(path)[0]}.jpg"

        write_image_OpenImageIO(
            cctf_encoding(read_image_OpenImageIO(path)), preview_path
        )

        self.set_output("preview_path", preview_path)

        self.dirty = False
[docs]
class NodeRemoveFile(ExecutionNode):
    """
    Remove the file at input path.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeRemoveFile.__init__`
    -   :meth:`~colour_hdri.NodeRemoveFile.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input ports.
        """

        super().__init__(*args, **{"category": "Os", **kwargs})

        self.description = "Remove the file at input path"

        self.add_input_port("path")
        self.add_input_port("bypass", False)

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: remove the file at the "path" input.

        The node returns early, without removing anything, if it is bypassed
        or if the path is undefined or does not exist.
        """

        if self.get_input("bypass"):
            return

        path = self.get_input("path")
        # The "path" input has no default value: "os.path.exists" raises a
        # "TypeError" exception when passed "None".
        if path is None:
            return

        if not os.path.exists(path):
            self.log(f'"{path}" file does not exist!', "error")
            return

        os.remove(path)

        self.dirty = False
class NodeOrient(ExecutionNode):
    """
    Orient the input image.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeOrient.__init__`
    -   :meth:`~colour_hdri.NodeOrient.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Image/Transform", **kwargs})

        self.description = "Orient the input image"

        for port_name in ("input_image", "orientation"):
            self.add_input_port(port_name)
        self.add_input_port("bypass", False)

        self.add_output_port("output_image")

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: set the "output_image" output to the "input_image"
        input oriented according to the "orientation" input.

        The input image is passed through unchanged if the node is bypassed
        or if the orientation is undefined. Nothing is output if the input
        image is undefined.
        """

        source_image = self.get_input("input_image")
        if source_image is None:
            return

        # Pass-through: the output is valid even if no orientation occurs.
        self.set_output("output_image", source_image)

        if self.get_input("bypass"):
            return

        requested_orientation = self.get_input("orientation")
        if requested_orientation is None:
            return

        self.log(f'Orienting image "{requested_orientation}"...')

        oriented_image = orient(source_image, requested_orientation)
        self.set_output("output_image", oriented_image)

        self.dirty = False
[docs]
class NodeWatermark(ExecutionNode):
    """
    Watermark the input image using given input metadata.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeWatermark.__init__`
    -   :meth:`~colour_hdri.NodeWatermark.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Image/Filter", **kwargs})

        self.description = "Watermark the input image using given input metadata"

        self.add_input_port("input_image")
        self.add_input_port("metadata")
        self.add_input_port("include_exposure_information", True)
        self.add_input_port("bypass", False)
        self.add_output_port("output_image")

    @required("OpenCV")  # pyright: ignore
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: draw a text built from the "EXIF" group of the
        "metadata" input at the bottom-left of the "input_image" input and set
        the "output_image" output.

        The input image is passed through unchanged if the node is bypassed
        or if the "EXIF" group is missing. Nothing is output if the input
        image is undefined.
        """

        source_image = self.get_input("input_image")
        if source_image is None:
            return

        # Pass-through: the output is valid even if no watermarking occurs.
        self.set_output("output_image", source_image)

        if self.get_input("bypass"):
            return

        metadata = self.get_input("metadata")
        exif_group = ({} if metadata is None else metadata).get("EXIF")
        if exif_group is None:
            self.log(
                'Could not read "EXIF" metadata from input metadata!',
                "error",
            )
            return

        import cv2  # noqa: PLC0415

        # The text fragments are joined with a " - " separator.
        fragments = [
            f"{exif_group['Camera Model Name']}",
            f"{exif_group['Lens Model']}",
            f"{exif_group['Focal Length']}mm",
        ]

        if self.get_input("include_exposure_information"):
            fragments.append(
                f'{exif_group["Exposure Time"]:.6f}" '
                f"f{exif_group['F Number']} "
                f"{exif_group['ISO']}"
            )

        text = " - ".join(fragments)

        # The text is drawn on a black image with the input image shape,
        # 10 pixels away from the left and bottom edges.
        overlay = zeros(source_image.shape)
        image_height = source_image.shape[0]
        cv2.putText(
            overlay,
            text,
            (10, image_height - 10),
            cv2.FONT_HERSHEY_PLAIN,
            fontScale=2.5,
            color=(255, 255, 255),
            thickness=1,
            lineType=cv2.LINE_AA,
        )

        # The text is composited by keeping the largest of the two values.
        self.set_output("output_image", np.maximum(source_image, overlay))

        self.dirty = False
class NodeReadFileExifData(ExecutionNode):
    """
    Return the EXIF tags from the input image.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeReadFileExifData.__init__`
    -   :meth:`~colour_hdri.NodeReadFileExifData.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Image/Metadata", **kwargs})

        self.description = "Return the EXIF tags from the input image."

        self.add_input_port("file_path")
        self.add_output_port("exif_tags")

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: read the EXIF tags of the file at the "file_path"
        input and set the "exif_tags" output to their dictionary form.

        The node returns early, leaving its output unset, if the file path is
        undefined or does not exist.
        """

        source_path = self.get_input("file_path")
        if source_path is None:
            return

        if not os.path.exists(source_path):
            self.log(f'"{source_path}" file does not exist!', "error")
            return

        exif_tags = read_exif_tags(source_path)
        self.set_output("exif_tags", vivified_to_dict(exif_tags))

        self.dirty = False
class NodeFetchCameraSensitivities(ExecutionNode):
    """
    Fetch the *Camera Sensitivities* from *colour-datasets*.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeFetchCameraSensitivities.__init__`
    -   :meth:`~colour_hdri.NodeFetchCameraSensitivities.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Raw/InputTransform", **kwargs})

        self.description = "Fetch the *Camera Sensitivities* from *colour-datasets*"

        self.add_input_port("dataset")
        self.add_input_port("keys")
        self.add_input_port("name")
        self.add_output_port("camera_sensitivities")

    @required("colour-datasets")  # pyright: ignore
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: load the dataset named by the "dataset" input,
        traverse it with the "keys" input and set the "camera_sensitivities"
        output to a mapping of the "name" input to the value found.

        The node returns early, leaving its output unset, if the dataset or
        the keys are undefined. The mapped value is *None* if the keys do not
        lead to a value in the dataset.
        """

        import colour_datasets  # noqa: PLC0415

        def get_camera_sensitivities(
            mapping: CanonicalMapping | Dict, keys: Sequence[str]
        ) -> RGB_CameraSensitivities | None:
            """
            Get the *camera sensitivities* from the dataset by successively
            indexing the nested mappings with given keys.
            """

            current = mapping
            for key in keys:
                # The traversal stops as soon as a non-mapping is reached
                # while some keys are left.
                if not isinstance(current, (CanonicalMapping, Dict)):
                    return None

                current = current.get(key, None)

            return current  # pyright: ignore

        dataset_name = self.get_input("dataset")
        keys = self.get_input("keys")
        # The "dataset" and "keys" inputs have no default values and cannot
        # be respectively loaded and iterated when undefined.
        if dataset_name is None or keys is None:
            self.log('The "dataset" and "keys" inputs must be defined!', "error")
            return

        dataset = colour_datasets.load(dataset_name)

        sensitivities = get_camera_sensitivities(dataset, keys)
        if sensitivities is None:
            self.log(
                f'No camera sensitivities were found for "{keys}" keys!',
                "warning",
            )

        camera_sensitivities = CanonicalMapping(
            {self.get_input("name"): sensitivities}
        )

        self.set_output("camera_sensitivities", camera_sensitivities)

        self.dirty = False
[docs]
class NodeProcessRawFileRawpy(ExecutionNode):
    """
    Process given raw file, e.g., *CR2*, *CR3*, *NEF*, using *Rawpy*.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeProcessRawFileRawpy.__init__`
    -   :meth:`~colour_hdri.NodeProcessRawFileRawpy.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Raw/Processing", **kwargs})

        self.description = (
            'Process given raw file, e.g., "CR2", "CR3", "NEF", using "Rawpy"'
        )

        self.add_input_port("raw_file_path")
        self.add_input_port("input_transform", InputTransform())
        self.add_output_port("image")

    @required("rawpy")  # pyright: ignore
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: post-process the raw file at the "raw_file_path"
        input with the white balance multipliers of the "input_transform"
        input and set the "image" output to the result converted to
        "float32".

        The node returns early, leaving its output unset, if the raw file path
        is undefined or does not exist.
        """

        source_path = self.get_input("raw_file_path")
        if source_path is None:
            return

        if not os.path.exists(source_path):
            self.log(f'"{source_path}" file does not exist!', "error")
            return

        import rawpy  # noqa: PLC0415

        transform = self.get_input("input_transform")

        with rawpy.imread(source_path) as raw_file:
            self.log(f'Processing "{source_path}" file...')

            options = {
                "gamma": (1, 1),
                "no_auto_bright": True,
                "demosaic_algorithm": rawpy.DemosaicAlgorithm(12),  # pyright: ignore
                "fbdd_noise_reduction": rawpy.FBDDNoiseReductionMode(2),  # pyright: ignore
                "highlight_mode": rawpy.HighlightMode(5),  # pyright: ignore
                "output_color": rawpy.ColorSpace(0),  # pyright: ignore
                "output_bps": 16,
                # Four multipliers are passed: the three white balance
                # multipliers followed by the second one repeated.
                "user_wb": np.hstack([transform.RGB_w, transform.RGB_w[1]]).tolist(),
            }
            processed = raw_file.postprocess(**options)

        self.set_output("image", convert_bit_depth(processed, "float32"))

        self.dirty = False
[docs]
class NodeCorrectLensAberrationLensFun(ExecutionNode):
    """
    Correct the input image lens aberrations, i.e., vignette, distortion and
    chromatic aberration, using *LensFun*.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeCorrectLensAberrationLensFun.__init__`
    -   :meth:`~colour_hdri.NodeCorrectLensAberrationLensFun.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Raw/Correction", **kwargs})

        self.description = (
            "Correct the input image lens aberrations, i.e., vignette, "
            'distortion and chromatic aberration, using "LensFun"'
        )

        self.add_input_port("input_image")
        # The metadata is expected to be a mapping: an undefined value is
        # handled by the "process" method whereas an integer default is not.
        self.add_input_port("metadata")
        self.add_input_port("correct_vignette", True)
        self.add_input_port("correct_chromatic_aberration", True)
        self.add_input_port("correct_distortion", True)
        self.add_input_port("focus_distance", 100)
        self.add_input_port("bypass", False)
        self.add_output_port("output_image")

    @required("lensfunpy", "OpenCV")  # pyright: ignore
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: search the *LensFun* database for the camera and
        lens described by the "EXIF" group of the "metadata" input, apply the
        enabled corrections and set the "output_image" output.

        The input image is passed through unchanged if the node is bypassed,
        if the metadata or its "EXIF" group are missing, or if no camera or
        lens is found in the database. Nothing is output if the input image is
        undefined.
        """

        input_image = self.get_input("input_image")
        if input_image is None:
            return

        # Pass-through: the output is valid even if no correction occurs.
        self.set_output("output_image", input_image)

        if self.get_input("bypass"):
            return

        metadata = self.get_input("metadata")
        if metadata is None:
            return

        exif_group = metadata.get("EXIF")
        if exif_group is None:
            self.log(
                'Could not read "EXIF" metadata from input metadata!',
                "error",
            )
            return

        import cv2  # noqa: PLC0415
        import lensfunpy  # noqa: PLC0415

        database = lensfunpy.Database()  # pyright: ignore

        camera_make = exif_group["Make"]
        camera_model = exif_group["Camera Model Name"]
        if DNG_CONVERTER == "dnglab":
            self.log(
                f'"dnglab" used, prepending "{camera_make}" camera make.', "warning"
            )
            camera_model = f"{camera_make} {camera_model}"

        self.log(
            f'Searching for "{camera_make}" "{camera_model}" camera model.',
        )
        camera = next(
            iter(
                database.find_cameras(
                    maker=camera_make, model=camera_model, loose_search=True
                )
            ),
            None,
        )
        # The crop factor of the camera is required to build the modifier.
        if camera is None:
            self.log(
                f'Could not find "{camera_make}" "{camera_model}" camera model, '
                "lens aberrations were not corrected!",
                "error",
            )
            return

        self.log(
            f'Using "{camera}" camera for lens aberrations correction.',
        )

        lens_model = exif_group["Lens Model"]
        self.log(
            f'Searching for "{lens_model}" lens model.',
        )
        lens = next(
            iter(database.find_lenses(camera, lens=lens_model, loose_search=True)), None
        )
        if lens is None:
            self.log(
                f'Could not find "{lens_model}" lens model, '
                "lens aberrations were not corrected!",
                "error",
            )
            return

        self.log(
            f'Using "{lens}" lens for lens aberrations correction.',
        )

        focal_length = exif_group["Focal Length"]
        aperture = exif_group["F Number"]
        distance = self.get_input("focus_distance")

        height, width = input_image.shape[0], input_image.shape[1]

        modifier = lensfunpy.Modifier(lens, camera.crop_factor, width, height)  # pyright: ignore
        modifier.initialize(
            focal_length,
            aperture,
            distance,
            pixel_format=np.float32,
            flags=lensfunpy.ModifyFlags.ALL,  # pyright: ignore
        )

        # The vignette and chromatic aberration corrections write in-place:
        # a copy is used so that the input image, which is owned by the
        # upstream node, is left untouched.
        output_image = np.copy(input_image)

        if self.get_input("correct_vignette"):
            self.log("Correcting lens vignette...")
            if modifier.apply_color_modification(output_image):
                self.log("Lens vignette was successfully corrected!")
            else:
                self.log(
                    "Lens vignette was not corrected, the lens might be missing data."
                )

        if self.get_input("correct_chromatic_aberration"):
            self.log("Correcting lens chromatic aberration...")
            coordinates = modifier.apply_subpixel_distortion()
            if coordinates is not None:
                # Each channel is remapped with its own coordinates.
                for channel in range(3):
                    output_image[..., channel] = cv2.remap(  # pyright: ignore
                        output_image[..., channel],
                        coordinates[..., channel, :],
                        None,  # pyright: ignore
                        cv2.INTER_CUBIC,
                    )
                self.log("Lens chromatic aberration was successfully corrected!")
            else:
                self.log(
                    "Lens chromatic aberration was not corrected, "
                    "the lens might be missing data."
                )

        if self.get_input("correct_distortion"):
            self.log("Correcting lens distortion...")
            coordinates = modifier.apply_geometry_distortion()
            if coordinates is not None:
                output_image = cv2.remap(  # pyright: ignore
                    output_image,
                    coordinates,
                    None,  # pyright: ignore
                    cv2.INTER_CUBIC,
                )
                self.log("Lens distortion was successfully corrected!")
            else:
                self.log(
                    "Lens distortion was not corrected, the lens might be missing data."
                )

        self.set_output("output_image", output_image)

        self.dirty = False
[docs]
class NodeDownsample(ExecutionNode):
    """
    Downsample the input image by the input downsampling factor.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeDownsample.__init__`
    -   :meth:`~colour_hdri.NodeDownsample.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Image/Transform", **kwargs})

        self.description = "Downsample the input image by the input downsampling factor"

        self.add_input_port("input_image")
        self.add_input_port("factor", 1)
        self.add_input_port("bypass", False)
        self.add_output_port("output_image")

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: set the "output_image" output to the "input_image"
        input sliced along its first two axes with a step given by the
        "factor" input.

        The input image is passed through unchanged if the node is bypassed.
        Nothing is output if the input image is undefined.
        """

        source_image = self.get_input("input_image")
        if source_image is None:
            return

        # Pass-through: the output is valid even if no downsampling occurs.
        self.set_output("output_image", source_image)

        if self.get_input("bypass"):
            return

        # Every "factor"-th element is kept along the first two axes.
        stride = slice(None, None, self.get_input("factor"))
        self.set_output("output_image", source_image[stride, stride, ...])

        self.dirty = False
[docs]
class NodeCreateBatches(ExecutionNode):
    """
    Create batches from the input array.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeCreateBatches.__init__`
    -   :meth:`~colour_hdri.NodeCreateBatches.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "Utilities", **kwargs})

        self.description = "Create batches from the input array"

        self.add_input_port("array", [])
        self.add_input_port("batch_size", 3)
        self.add_output_port("batches", [])

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: set the "batches" output to the list of batches
        created from the "array" input using the "batch_size" input.

        The node returns early, leaving its output unchanged, if the array is
        empty.
        """

        elements = self.get_input("array")
        if len(elements) == 0:
            return

        batch_size = self.get_input("batch_size")
        batches = list(batch(elements, batch_size))
        self.set_output("batches", batches)

        self.dirty = False
[docs]
class NodeCreateImageStack(ExecutionNode):
    """
    Create an image stack from the input files.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeCreateImageStack.__init__`
    -   :meth:`~colour_hdri.NodeCreateImageStack.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "HDRI", **kwargs})

        self.description = "Create an image stack from the input files"

        self.add_input_port("paths")
        self.add_input_port("cctf_decoding", linear_function)
        self.add_output_port("image_stack")

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: set the "image_stack" output to the image stack
        created from the files at the "paths" input, using the
        "cctf_decoding" input and without reading the image data.

        The node returns early, leaving its output unset, if the paths are
        undefined or empty, or if any of them does not exist.
        """

        paths = self.get_input("paths")
        # The "paths" input has no default value: "len" raises a "TypeError"
        # exception when passed "None".
        if paths is None or len(paths) == 0:
            return

        for path in paths:
            if not os.path.exists(path):
                self.log(f'"{path}" file does not exist!', "error")
                return

        self.set_output(
            "image_stack",
            ImageStack.from_files(
                paths, self.get_input("cctf_decoding"), read_data=False
            ),
        )

        self.dirty = False
[docs]
class NodeMergeImageStack(ExecutionNode):
    """
    Merge to HDRI the input image stack.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeMergeImageStack.__init__`
    -   :meth:`~colour_hdri.NodeMergeImageStack.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input and output ports.
        """

        super().__init__(*args, **{"category": "HDRI", **kwargs})

        self.description = "Merge to HDRI the input image stack"

        self.add_input_port("image_stack")
        self.add_input_port("weighting_function", double_sigmoid_anchored_function)
        self.add_output_port("image")

    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: set the "image" output to the HDRI merged from the
        "image_stack" input using the "weighting_function" input, then clear
        the image stack data.

        The node returns early, leaving its output unset, if the image stack
        is undefined.
        """

        stack = self.get_input("image_stack")
        if stack is None:
            return

        weighting_function = self.get_input("weighting_function")
        merged_image = image_stack_to_HDRI(stack, weighting_function)
        self.set_output("image", merged_image)

        # The image stack data is not needed once merged.
        stack.clear_data()

        self.dirty = False
[docs]
class NodeNormaliseExposure(ExecutionNode):
    """
    Normalise the exposure of the input images by multiplying them by given
    normalisation factor, or, if undefined, by the reciprocal of the median of
    the medians of the images at given paths, and then by the given scaling
    factor. The images are rewritten in-place.

    Methods
    -------
    -   :meth:`~colour_hdri.NodeNormaliseExposure.__init__`
    -   :meth:`~colour_hdri.NodeNormaliseExposure.process`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise the node and declare its input ports.
        """

        super().__init__(*args, **{"category": "Exposure", **kwargs})

        self.description = (
            "Normalise the exposure of the input images by dividing them by given "
            "normalisation factor, or automatically derived from the median of the "
            "images at given paths and multiplying them by the given scaling factor."
        )

        self.add_input_port("image_paths", [])
        self.add_input_port("normalisation_factor", None)
        self.add_input_port("scaling_factor", 0.2)
        self.add_input_port("bypass", False)

    @required("OpenImageIO")
    @notify_process_state
    def process(self, **kwargs: Any) -> None:  # noqa: ARG002
        """
        Process the node: multiply each image at the "image_paths" input by
        the normalisation factor and the "scaling_factor" input, and write it
        back to the same path with its attributes.

        The node returns early, without modifying any image, if it is
        bypassed, if there are no image paths, if any of them does not exist,
        or if no usable normalisation factor can be derived from the images.
        """

        if self.get_input("bypass"):
            return

        image_paths = self.get_input("image_paths")
        if len(image_paths) == 0:
            return

        # All the paths are validated upfront, irrespective of whether the
        # normalisation factor is given, so that no image is rewritten if
        # one of them is missing.
        for image_path in image_paths:
            if not os.path.exists(image_path):
                self.log(f'"{image_path}" image does not exist!', "error")
                return

        normalisation_factor = self.get_input("normalisation_factor")
        if normalisation_factor is None:
            medians = [
                np.median(read_image_OpenImageIO(image_path, additional_data=False))
                for image_path in image_paths
            ]
            median = np.median(medians)

            # A zero or non-finite median would produce an infinite or
            # undefined factor and destroy the images when rewritten.
            if not np.isfinite(median) or median == 0:
                self.log(
                    f'Cannot derive a normalisation factor from "{median}" median!',
                    "error",
                )
                return

            normalisation_factor = 1 / median

        self.log(f"Normalisation factor: {normalisation_factor}")

        scaling_factor = self.get_input("scaling_factor")

        for image_path in image_paths:
            image, attributes = read_image_OpenImageIO(image_path, additional_data=True)
            image *= normalisation_factor
            image *= scaling_factor
            write_image_OpenImageIO(image, image_path, attributes=attributes)

        self.dirty = False