Source code for floodlight.transforms.temporal

from copy import deepcopy
from typing import Union

import numpy as np
import scipy.interpolate

from floodlight import Code, XY
from floodlight.core.property import (
    BaseProperty,
    DyadicProperty,
    PlayerProperty,
    TeamProperty,
)


def _resample_array(
    values: np.ndarray,
    src_framerate: int,
    tgt_framerate: int,
    interp_method: Union[str, None],
    order: int,
    k: int,
) -> np.ndarray:
    """Core numerical resample routine.

    Parameters
    ----------
    values: np.ndarray
        Two-dimensional source array of shape ``(T, M)``. Callers flatten
        higher-rank payloads to ``(T, M)`` before calling.
    src_framerate, tgt_framerate: int
        Source and target framerates in Hz.
    interp_method: str or None
        Interpolation method for the upsample path; ignored on the identity
        and downsample paths.
    order, k: int
        Polynomial order / spline degree used by the respective methods.

    Returns
    -------
    out: np.ndarray
        Two-dimensional array of shape ``(N_out, M)``. Identity and
        downsample paths preserve ``values.dtype``; the upsample path
        returns ``float64`` to accommodate NaN.
    """
    # Empty-input short-circuit: bypass framerate math for T == 0 arrays.
    if values.shape[0] == 0:
        return values.copy()

    if tgt_framerate == src_framerate:
        resampled = deepcopy(values)
    elif tgt_framerate < src_framerate:
        resampled = _downsample(values, tgt_framerate, src_framerate)
    else:
        resampled = _upsample_columns(
            values, src_framerate, tgt_framerate, interp_method, order, k
        )

    return resampled


def _upsample_columns(
    values: np.ndarray,
    src_framerate: int,
    tgt_framerate: int,
    interp_method: Union[str, None],
    order: int,
    k: int,
) -> np.ndarray:
    """Per-column upsample helper.

    Parameters
    ----------
    values: np.ndarray
        Two-dimensional source array of shape ``(T, M)``.
    src_framerate, tgt_framerate: int
        Source and target framerates in Hz; ``tgt_framerate > src_framerate``.
    interp_method: str or None
        Interpolation method. ``None`` fills only target positions whose
        time coincides with a source time and leaves the rest NaN.
    order, k: int
        Polynomial order / spline degree used by the respective methods.

    Returns
    -------
    out: np.ndarray
        Two-dimensional array of shape ``(N_out, M)`` and dtype ``float64``.
    """
    N_src = values.shape[0]
    # Endpoint-inclusive length: last source sample aligns with last target sample.
    N_out = (N_src - 1) * tgt_framerate // src_framerate + 1
    M = values.shape[1]

    min_required = {
        None: 1,
        "linear": 2,
        "nearest": 1,
        "polynomial": order + 1,
        "spline": k + 1,
    }[interp_method]
    if N_src < min_required:
        raise ValueError(
            f"Method {interp_method!r} requires at least {min_required} source "
            f"samples, got {N_src}."
        )

    src_times = np.arange(N_src, dtype=np.int64) / src_framerate
    tgt_times = np.arange(N_out, dtype=np.int64) / tgt_framerate
    out = np.full((N_out, M), np.nan, dtype=np.float64)

    if interp_method is None:
        # Fill only targets coinciding with source times; leave rest NaN.
        aligned_targets = (np.arange(N_out) * src_framerate) % tgt_framerate == 0
        aligned_src_idx = (np.arange(N_out) * src_framerate) // tgt_framerate

    for j in range(M):
        column_f = np.asarray(values[:, j], dtype=np.float64)
        valid = ~np.isnan(column_f)
        if not np.any(valid):
            continue

        xp = src_times[valid]
        fp = column_f[valid]

        if interp_method is None:
            fill = aligned_targets & valid[aligned_src_idx]
            out[fill, j] = column_f[aligned_src_idx[fill]]
        elif interp_method == "linear":
            out[:, j] = np.interp(tgt_times, xp, fp, left=np.nan, right=np.nan)
        elif interp_method == "polynomial":
            if xp.size < order + 1:
                continue
            spl = scipy.interpolate.make_interp_spline(xp, fp, k=order)
            out[:, j] = spl(tgt_times, extrapolate=False)
        elif interp_method == "spline":
            if xp.size < k + 1:
                continue
            spl = scipy.interpolate.make_interp_spline(xp, fp, k=k)
            out[:, j] = spl(tgt_times, extrapolate=False)
        elif interp_method == "nearest":
            # Earlier-source tie-break at midpoints.
            idx_right = np.searchsorted(xp, tgt_times, side="left")
            idx_right = np.clip(idx_right, 1, xp.size - 1)
            idx_left = idx_right - 1
            d_left = tgt_times - xp[idx_left]
            d_right = xp[idx_right] - tgt_times
            choose_left = d_left <= d_right
            chosen = np.where(choose_left, idx_left, idx_right)
            in_range = (tgt_times >= xp[0]) & (tgt_times <= xp[-1])
            out[:, j] = np.where(in_range, fp[chosen], np.nan)

    return out


def _downsample(
    values: np.ndarray,
    tgt_framerate: int,
    src_framerate: int,
) -> np.ndarray:
    """Nearest-sample downsample helper.

    Parameters
    ----------
    values: np.ndarray
        Two-dimensional source array of shape ``(T, M)``.
    tgt_framerate, src_framerate: int
        Target and source framerates in Hz; ``tgt_framerate < src_framerate``.

    Returns
    -------
    out: np.ndarray
        Two-dimensional array of shape ``(N_out, M)``; preserves ``values.dtype``.
    """
    N_src = values.shape[0]
    N_out = (N_src - 1) * tgt_framerate // src_framerate + 1
    # +tgt_framerate//2 rounds to the nearest source index instead of flooring.
    src_idx = (np.arange(N_out) * src_framerate + tgt_framerate // 2) // tgt_framerate
    src_idx = np.clip(src_idx, 0, N_src - 1)
    resampled = values[src_idx].copy()
    return resampled


[docs] def resample( obj: Union[XY, Code, TeamProperty, PlayerProperty, DyadicProperty], target_framerate: int, interp_method: Union[str, None] = None, order: int = 3, k: int = 3, ) -> Union[XY, Code, TeamProperty, PlayerProperty, DyadicProperty]: """Resample a floodlight core object to a new framerate. Rescales any framerate-bearing core object (``XY``, ``Code``, ``TeamProperty``, ``PlayerProperty``, or ``DyadicProperty``) to a target framerate. Parameters ---------- obj: XY | Code | TeamProperty | PlayerProperty | DyadicProperty Framerate-bearing core object to resample. target_framerate: int Target framerate in Hz. Must be a positive integer. interp_method : str or None, optional Interpolation method for the upsample path. Must be one of ``"linear"``, ``"polynomial"``, ``"spline"``, ``"nearest"`` or ``None``. Default ``None``: new rows are left NaN, only target samples that coincide with source samples take source values. Ignored on the identity and downsample paths. order: int, optional Polynomial order for ``interp_method="polynomial"``. Default 3. k: int, optional Spline degree for ``interp_method="spline"``. Default 3. Returns ------- obj_resampled: same type as ``obj`` New object with ``framerate`` rescaled to ``target_framerate``. Notes ----- In case of ``target_framerate == obj.framerate`` ``resample()`` returns a deep copy of ``obj``. Pre-existing NaN gaps in the source pass through unchanged on both the upsample and downsample paths. The downsample path uses an integer-math nearest-sample index formula; ``interp_method`` is ignored there. No anti-aliasing filter is applied. """ ALLOWED_METHODS = ("linear", "polynomial", "spline", "nearest") if not isinstance(obj, (XY, Code, BaseProperty)): raise ValueError( f"Expected obj to be one of (XY, Code, TeamProperty, " f"PlayerProperty, DyadicProperty), got {type(obj).__name__}." ) if obj.framerate is None: raise ValueError("Expected obj.framerate to be set, got None.") if ( not isinstance(target_framerate, (int, np.integer)) or isinstance(target_framerate, bool) or target_framerate <= 0 ): raise ValueError( f"Expected target_framerate to be a positive integer, " f"got {target_framerate!r}." ) if interp_method is not None and interp_method not in ALLOWED_METHODS: raise ValueError( f"Expected interp_method to be None or one of {ALLOWED_METHODS}, " f"got {interp_method!r}." ) if isinstance(obj, XY): resampled_array = _resample_array( obj.xy, obj.framerate, target_framerate, interp_method, order, k ) resampled = XY( xy=resampled_array, framerate=target_framerate, direction=deepcopy(obj.direction), ) elif isinstance(obj, Code): code_2d = np.asarray(obj.code).reshape(-1, 1) resampled_2d = _resample_array( code_2d, obj.framerate, target_framerate, interp_method, order, k ) resampled = Code( code=resampled_2d.reshape(-1), name=deepcopy(obj.name), definitions=deepcopy(obj.definitions), framerate=target_framerate, ) elif isinstance(obj, TeamProperty): prop_2d = np.asarray(obj.property).reshape(-1, 1) resampled_2d = _resample_array( prop_2d, obj.framerate, target_framerate, interp_method, order, k ) resampled = TeamProperty( property=resampled_2d.reshape(-1), name=deepcopy(obj.name), framerate=target_framerate, ) elif isinstance(obj, PlayerProperty): resampled_array = _resample_array( obj.property, obj.framerate, target_framerate, interp_method, order, k ) resampled = PlayerProperty( property=resampled_array, name=deepcopy(obj.name), framerate=target_framerate, ) elif isinstance(obj, DyadicProperty): prop = np.asarray(obj.property) T, N1, N2 = prop.shape prop_2d = prop.reshape(T, N1 * N2) resampled_2d = _resample_array( prop_2d, obj.framerate, target_framerate, interp_method, order, k ) N_out = resampled_2d.shape[0] resampled = DyadicProperty( property=resampled_2d.reshape(N_out, N1, N2), name=deepcopy(obj.name), framerate=target_framerate, ) return resampled