Source code for floodlight.transforms.permutation

import warnings

import numpy as np
from scipy.spatial.distance import cdist
from scipy.optimize import linear_sum_assignment

from floodlight import XY


def _assign_roles_once(
    xy_data: np.ndarray, reference: np.ndarray, T: int, N: int
) -> np.ndarray:
    """Single pass of Hungarian role assignment across all frames.

    Parameters
    ----------
    xy_data: np.ndarray
        Raw data array of shape (T, 2*N).
    reference: np.ndarray
        Reference positions of shape (N, 2).
    T: int
        Number of frames.
    N: int
        Number of players.

    Returns
    -------
    assigned_data: np.ndarray
        Role-assigned data array of shape (T, 2*N).
    """
    assigned_data = np.full_like(xy_data, np.nan, dtype=float)

    for t in range(T):
        frame = xy_data[t].reshape(-1, 2).astype(float)  # (N, 2)

        # identify valid (non-NaN) players
        valid_mask = ~np.isnan(frame[:, 0])
        if not np.any(valid_mask):
            continue

        valid_positions = frame[valid_mask]  # (n_valid, 2)

        # filter reference to non-NaN role slots
        ref_mask = ~np.isnan(reference[:, 0])
        valid_ref = reference[ref_mask]  # (n_ref, 2)
        ref_indices = np.where(ref_mask)[0]

        # rectangular cost matrix: valid players vs valid role slots
        cost_matrix = cdist(valid_positions, valid_ref)  # (n_valid, n_ref)
        row, col = linear_sum_assignment(cost_matrix)

        # map valid players into their assigned role slots
        for r, c in zip(row, col):
            role_idx = ref_indices[c]
            assigned_data[t, role_idx * 2 : role_idx * 2 + 2] = valid_positions[r]

    return assigned_data


[docs] def assign_roles(xy: XY, reference: np.ndarray = None, n_iter: int = 1) -> XY: """Assigns consistent roles to players across frames using the Hungarian algorithm. [1]_ For each frame, player positions are matched to reference positions by minimizing the total Euclidean distance. The resulting XY object has columns reordered so that column *i* consistently represents role *i* across all frames. Parameters ---------- xy: XY Spatiotemporal data with shape (T, 2*N). reference: np.ndarray, optional Reference positions of shape (N, 2) used as the assignment target. If None (default), the mean position of each player column across all frames is used. n_iter: int, optional Number of assignment iterations. After each iteration the reference is recomputed as the column-wise mean of the assigned data. More iterations can improve convergence for longer sequences. Defaults to 1, which is sufficient for short sequences as proposed in [1]_. Returns ------- xy_assigned: XY New XY object with columns reordered per frame so that column *i* consistently represents role *i*. Same shape, framerate, and direction as input. Notes ----- Players with NaN positions in a given frame are excluded from the assignment. Their corresponding role slots in the output are filled with NaN. Reference positions that are NaN are likewise excluded from the cost matrix, so the assignment operates only on valid data. References ---------- .. [1] `Bialkowski, A., Lucey, P., Carr, P., Yue, Y., Sridharan, S., & Matthews, I. (2014). Identifying team style in soccer using formations learned from spatiotemporal tracking data. In IEEE International Conference on Data Mining Workshop (pp. 9-14). <https://doi.org/10.1109/ICDMW.2014.167>`_ """ T = len(xy) N = xy.N # compute reference from mean positions if not provided if reference is None: with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) reference = np.nanmean(xy.xy, axis=0).reshape(-1, 2) # (N, 2) if reference.shape != (N, 2): raise ValueError( f"Expected reference of shape ({N}, 2), got {reference.shape}." ) assigned_data = _assign_roles_once(xy.xy, reference, T, N) # iterative refinement: recompute reference from assigned data for _ in range(n_iter - 1): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) reference = np.nanmean(assigned_data.reshape(T, -1, 2), axis=0) # (N, 2) assigned_data = _assign_roles_once(assigned_data, reference, T, N) xy_assigned = XY(xy=assigned_data, framerate=xy.framerate, direction=xy.direction) return xy_assigned