Source code for pygeoprocessing.kernels

"""A library of kernels for use in ``pygeoprocessing.convolve_2d``.

``pygeoprocessing.convolve_2d`` has some stringent requirements for its kernels
that require a thorough understanding of GDAL and GeoTiffs to be able to use,
including:

    * Kernels must be GDAL-readable rasters, preferably GeoTiffs
    * Kernels must not be striped, and, for more efficient disk I/O, should be
      tiled.
    * The pixel values of a kernel may not represent nodata (GDAL's internal
      way of indicating invalid pixel data)
    * Pixel values must be finite, real numbers.  That is, they may not
      represent positive or negative infinity or NaN.

The functions in this module represent kernels that we have found to be
commonly used, including:

    * Dichotomous kernel: :meth:`dichotomous_kernel`
    * Gaussian (normal) decay kernel: :meth:`normal_distribution_kernel`
    * Linear decay kernel: :meth:`linear_decay_kernel`
    * Exponential decay kernel: :meth:`exponential_decay_kernel`

Additionally, the user may define their own kernel using the helper functions
included here:

    * :meth:`kernel_from_numpy_array`, for kernels where the kernel array is
      already available as a ``numpy`` array, such as those used in common
      image processing operations like sharpening and edge detection.
      https://en.wikipedia.org/wiki/Kernel_(image_processing)
    * :meth:`create_distance_decay_kernel`, for kernels that are a function of
      distance from the center pixel
"""
import logging
import math
from typing import Callable
from typing import Text
from typing import Union

import numpy
import pygeoprocessing
from numpy.typing import ArrayLike
from osgeo import gdal

FLOAT32_NODATA = float(numpy.finfo(numpy.float32).min)
LOGGER = logging.getLogger(__name__)


[docs]def kernel_from_numpy_array( numpy_array: ArrayLike, target_kernel_path: Text) -> None: """Create a convolution kernel from a numpy array. Args: numpy_array: A 2-dimensional numpy array to convert into a raster. target_kernel_path: The path to where the kernel should be written. Returns: ``None`` """ n_dimensions = numpy.array(numpy_array).ndim if n_dimensions != 2: raise ValueError( "The kernel array must have exactly 2 dimensions, not " f"{n_dimensions}") # The kernel is technically a TIFF, not a GeoTiff. pygeoprocessing.numpy_array_to_raster( numpy_array, target_nodata=None, pixel_size=None, origin=None, projection_wkt=None, target_path=target_kernel_path)
[docs]def dichotomous_kernel( target_kernel_path: Text, max_distance: Union[int, float], normalize: bool = True) -> None: """Create a binary kernel indicating presence/absence within a distance. This is equivalent to ``int(dist <= max_distance)`` for each pixel in the kernel, where ``dist`` is the euclidean distance of the pixel's centerpoint to the centerpoint of the center pixel. Args: target_kernel_path: The path to where the kernel will be written. Must have a file extension of ``.tif``. max_distance: The distance threshold, in pixels. Kernel pixels that are greater than ``max_distance`` from the centerpoint of the kernel will have values of ``0.0``. normalize: Whether to normalize the kernel. Returns: ``None`` """ def _dichotomous(dist): return dist <= max_distance create_distance_decay_kernel( target_kernel_path=target_kernel_path, distance_decay_function=_dichotomous, max_distance=max_distance, normalize=normalize )
[docs]def exponential_decay_kernel( target_kernel_path: Text, max_distance: Union[int, float], expected_distance: Union[int, float], normalize: bool = True) -> None: """Create an exponential decay kernel. This is equivalent to ``e**(-dist / expected_distance)`` for each pixel in the kernel, where ``dist`` is the euclidean distance of the pixel's centerpoint to the centerpoint of the center pixel and ``expected_distance`` represents the distance at which the kernel's values reach ``1/e``. The kernel will continue to have nonzero values out to ``max_distance``. Args: target_kernel_path: The path to where the kernel will be written. Must have a file extension of ``.tif``. max_distance: The maximum distance of the kernel, in pixels. Kernel pixels that are greater than ``max_distance`` from the centerpoint of the kernel will have values of ``0.0``. expected_distance: The distance, in pixels, from the centerpoint at which decayed values will equal ``1/e``. normalize: Whether to normalize the kernel. Returns: ``None`` """ def _exponential_decay(dist): return numpy.exp(-dist / expected_distance) create_distance_decay_kernel( target_kernel_path=target_kernel_path, distance_decay_function=_exponential_decay, max_distance=max_distance, normalize=normalize )
[docs]def linear_decay_kernel( target_kernel_path: Text, max_distance: Union[int, float], normalize: bool = True) -> None: """Create a linear decay kernel. This is equivalent to ``(max_distance - dist) / max_distance`` for each pixel in the kernel, where ``dist`` is the euclidean distance between the centerpoint of the current pixel and the centerpoint of the center pixel in the kernel. Args: target_kernel_path: The path to where the kernel will be written. Must have a file extension of ``.tif``. max_distance: The maximum distance of the kernel, in pixels. Kernel pixels that are greater than ``max_distance`` from the centerpoint of the kernel will have values of ``0.0``. normalize: Whether to normalize the kernel. Returns: ``None`` """ def _linear_decay(dist): return (max_distance - dist) / max_distance create_distance_decay_kernel( target_kernel_path=target_kernel_path, distance_decay_function=_linear_decay, max_distance=max_distance, normalize=normalize )
[docs]def normal_distribution_kernel( target_kernel_path: Text, sigma: Union[int, float], n_std_dev: Union[int, float] = 3, normalize: bool = True): """Create an decay kernel following a normal distribution. This is equivalent to ``(1/(2*pi*sigma**2))*(e**((-dist**2)/(2*sigma**2)))`` for each pixel, where ``dist`` is the euclidean distance between the current pixel and the centerpoint of the center pixel. Args: target_kernel_path: The path to where the kernel will be written. Must have a file extension of ``.tif``. sigma: The width (in pixels) of a standard deviation. n_std_dev: The number of standard deviations to include in the kernel. The kernel will have values of 0 when at a distance of ``(sigma * n_std_dev)`` away from the centerpoint. normalize: Whether to normalize the kernel. Returns: ``None`` """ def _normal_decay(dist): return (1 / (2 * numpy.pi * sigma ** 2)) * numpy.exp( (-dist ** 2) / (2 * sigma ** 2)) create_distance_decay_kernel( target_kernel_path=target_kernel_path, distance_decay_function=_normal_decay, max_distance=(sigma * n_std_dev), normalize=normalize )
[docs]def create_distance_decay_kernel( target_kernel_path: str, distance_decay_function: Union[str, Callable], max_distance: Union[int, float], normalize=True): """ Create a kernel raster based on pixel distance from the centerpoint. Args: target_kernel_path (string): The path to where the target kernel should be written on disk. If this file does not have the suffix ``.tif``, it will be added to the filepath. distance_decay_function (callable or str): A python callable that takes as input a single 1D numpy array and returns a 1D numpy array. The input array will contain float32 distances to the centerpoint pixel of the kernel, in units of pixels. If a ``str``, then it must be a python expression using the local variables: * ``dist`` - a 1D numpy array of distances from the centerpoint. * ``max_dist`` - a float indicating the max distance. max_distance (float): The maximum distance of kernel values from the center point. Values outside of this distance will be set to ``0.0``. normalize=False (bool): Whether to normalize the resulting kernel. Returns: ``None`` """ apothem = math.floor(max_distance) kernel_size = apothem * 2 + 1 # allow for a center pixel assert kernel_size % 2 == 1 driver = gdal.GetDriverByName('GTiff') kernel_dataset = driver.Create( target_kernel_path.encode('utf-8'), kernel_size, kernel_size, 1, gdal.GDT_Float32, options=[ 'BIGTIFF=IF_SAFER', 'TILED=YES', 'BLOCKXSIZE=256', 'BLOCKYSIZE=256']) # NOTE: We are deliberately NOT setting a coordinate system because it # isn't needed. By omitting this, we're telling GDAL to just create a # TIFF. kernel_band = kernel_dataset.GetRasterBand(1) kernel_nodata = FLOAT32_NODATA kernel_band.SetNoDataValue(kernel_nodata) kernel_band = None kernel_dataset = None kernel_raster = gdal.OpenEx(target_kernel_path, gdal.GA_Update) kernel_band = kernel_raster.GetRasterBand(1) band_x_size = kernel_band.XSize band_y_size = kernel_band.YSize running_sum = 0 # If the user provided a string rather than a callable, assume it's a # python expression appropriate for evaling. if isinstance(distance_decay_function, str): # Avoid recompiling on each iteration. code = compile(distance_decay_function, '<string>', 'eval') numpy_namespace = {name: getattr(numpy, name) for name in dir(numpy)} def distance_decay_function(d): result = eval( code, numpy_namespace, # globals {'dist': d, 'max_dist': max_distance}) # locals return result for block_data in pygeoprocessing.iterblocks( (target_kernel_path, 1), offset_only=True): array_xmin = block_data['xoff'] - apothem array_xmax = min( array_xmin + block_data['win_xsize'], band_x_size - apothem) array_ymin = block_data['yoff'] - apothem array_ymax = min( array_ymin + block_data['win_ysize'], band_y_size - apothem) pixel_dist_from_center = numpy.hypot( *numpy.mgrid[ array_ymin:array_ymax, array_xmin:array_xmax]) valid_pixels = (pixel_dist_from_center <= max_distance) kernel = numpy.zeros(pixel_dist_from_center.shape, dtype=numpy.float32) kernel[valid_pixels] = distance_decay_function( pixel_dist_from_center[valid_pixels]) if normalize: running_sum += kernel.sum() kernel_band.WriteArray( kernel, yoff=block_data['yoff'], xoff=block_data['xoff']) kernel_raster.FlushCache() kernel_band = None kernel_raster = None if normalize: kernel_raster = gdal.OpenEx(target_kernel_path, gdal.GA_Update) kernel_band = kernel_raster.GetRasterBand(1) for block_data, kernel_block in pygeoprocessing.iterblocks( (target_kernel_path, 1)): # divide by sum to normalize kernel_block /= running_sum kernel_band.WriteArray( kernel_block, xoff=block_data['xoff'], yoff=block_data['yoff']) kernel_raster.FlushCache() kernel_band = None kernel_raster = None