import warnings
import cv2
import numpy as np
import pandas as pd
# Default Lowe's ratio test threshold
_DEFAULT_LOWE_RATIO = 0.7
# Built-in methods that use OpenCV only
_BUILTIN_METHODS = ["akaze", "sift"]
# Lightweight methods that can handle full-resolution images without auto-resize.
# Matched by suffix/keyword: methods containing these patterns are considered lightweight.
_LIGHTWEIGHT_PATTERNS = ["lightglue", "-nn", "superglue", "xfeat", "sphereglue", "-subpx"]
# Default resize for memory-intensive methods
_DEFAULT_RESIZE_HEAVY = 640
def _opencv_match(im_org, im_sim, detector_type="akaze", ratio=_DEFAULT_LOWE_RATIO):
"""
OpenCV feature matching (no Homography RANSAC).
Parameters
----------
im_org : numpy.ndarray
Original image (BGR).
im_sim : numpy.ndarray
Simulated image (BGR).
detector_type : str, default "akaze"
Feature detector type: "akaze" or "sift".
ratio : float, default 0.7
Lowe's ratio test threshold.
Returns
-------
pts1, pts2 : numpy.ndarray
Matched point pairs, shape (N, 2).
"""
if detector_type.lower() == "akaze":
detector = cv2.AKAZE_create()
elif detector_type.lower() == "sift":
detector = cv2.SIFT_create()
else:
raise ValueError(f"Unknown detector_type: {detector_type}")
kp1, des1 = detector.detectAndCompute(im_org, None)
kp2, des2 = detector.detectAndCompute(im_sim, None)
if des1 is None or des2 is None or len(kp1) < 2 or len(kp2) < 2:
return np.array([]).reshape(0, 2).astype("int32"), np.array([]).reshape(0, 2).astype("int32")
bf = cv2.BFMatcher()
matches = bf.knnMatch(des1, des2, k=2)
# Lowe's ratio test
good = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < ratio * n.distance:
good.append(m)
if len(good) == 0:
return np.array([]).reshape(0, 2).astype("int32"), np.array([]).reshape(0, 2).astype("int32")
pts1 = np.float32([kp1[m.queryIdx].pt for m in good]).astype("int32")
pts2 = np.float32([kp2[m.trainIdx].pt for m in good]).astype("int32")
return pts1, pts2
def _vismatch_match(path_org, path_sim, method, device, resize=None, **kwargs):
"""
Internal vismatch-based matching implementation (no Homography RANSAC).
Parameters
----------
path_org : str
Path to original image.
path_sim : str
Path to simulated image.
method : str
Vismatch matching method name.
device : str
Device for computation ("cpu" or "cuda").
resize : int, optional
Resize images to this size.
**kwargs
Additional arguments for vismatch's get_matcher.
Returns
-------
pts1, pts2 : numpy.ndarray
Matched point pairs, shape (N, 2).
"""
try:
from vismatch import get_matcher
except ImportError:
raise ImportError(
f"Method '{method}' requires the 'vismatch' package. "
"Install with: pip install alproj[vismatch]"
)
# Get original image sizes for coordinate scaling
im_org_cv = cv2.imread(path_org)
im_sim_cv = cv2.imread(path_sim)
h_org, w_org = im_org_cv.shape[:2]
h_sim, w_sim = im_sim_cv.shape[:2]
# Suppress torchvision deprecation warnings about 'pretrained' parameter
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="torchvision")
matcher = get_matcher(method, device=device, **kwargs)
# Use matcher's load_image for proper preprocessing (may resize)
img0 = matcher.load_image(path_org, resize=resize)
img1 = matcher.load_image(path_sim, resize=resize)
# Get the size after loading (to compute scale factors)
# img shape is (C, H, W) or (1, C, H, W)
if img0.dim() == 4:
_, _, h0_loaded, w0_loaded = img0.shape
_, _, h1_loaded, w1_loaded = img1.shape
else:
_, h0_loaded, w0_loaded = img0.shape
_, h1_loaded, w1_loaded = img1.shape
result = matcher(img0, img1)
pts1 = result["matched_kpts0"]
pts2 = result["matched_kpts1"]
# Handle both torch tensors and numpy arrays
if hasattr(pts1, 'cpu'):
pts1 = pts1.cpu().numpy()
pts2 = pts2.cpu().numpy()
if len(pts1) == 0:
return np.array([]).reshape(0, 2).astype("int32"), np.array([]).reshape(0, 2).astype("int32")
# Scale keypoints back to original image coordinates
scale_x0 = w_org / w0_loaded
scale_y0 = h_org / h0_loaded
scale_x1 = w_sim / w1_loaded
scale_y1 = h_sim / h1_loaded
pts1[:, 0] *= scale_x0
pts1[:, 1] *= scale_y0
pts2[:, 0] *= scale_x1
pts2[:, 1] *= scale_y1
pts1 = pts1.astype("int32")
pts2 = pts2.astype("int32")
return pts1, pts2
def _filter_geometric(pts1, pts2, method, focal_length, principal_point,
threshold, image_size, ransac_method):
"""
Internal geometric outlier filtering using Essential or Fundamental Matrix.
Parameters
----------
pts1 : numpy.ndarray
Points from image 1, shape (N, 2).
pts2 : numpy.ndarray
Points from image 2, shape (N, 2).
method : str
Filtering method: "essential", "fundamental", or "none".
focal_length : float or None
Focal length in pixels.
principal_point : tuple or None
Principal point (cx, cy) in pixels.
threshold : float
Maximum allowed error in pixels.
image_size : tuple or None
Image size (width, height).
ransac_method : str
RANSAC variant.
Returns
-------
mask : numpy.ndarray
Boolean array where True indicates inliers.
"""
pts1 = np.asarray(pts1, dtype=np.float64)
pts2 = np.asarray(pts2, dtype=np.float64)
if len(pts1) == 0:
return np.array([], dtype=bool)
method_lower = method.lower()
if method_lower == "none":
return np.ones(len(pts1), dtype=bool)
elif method_lower == "essential":
# Essential matrix requires at least 5 points
if len(pts1) < 5:
return np.ones(len(pts1), dtype=bool)
# Estimate focal length if not provided
if focal_length is None:
if image_size is not None:
focal_length = float(image_size[0])
else:
focal_length = max(
pts1[:, 0].max() - pts1[:, 0].min(),
pts1[:, 1].max() - pts1[:, 1].min()
)
warnings.warn(
f"focal_length not provided for Essential Matrix filtering. "
f"Estimated as {focal_length:.0f} pixels. "
f"For better results, provide the actual focal length.",
UserWarning,
stacklevel=3
)
# Estimate principal point if not provided
if principal_point is None:
if image_size is not None:
principal_point = (image_size[0] / 2, image_size[1] / 2)
else:
cx = (pts1[:, 0].max() + pts1[:, 0].min()) / 2
cy = (pts1[:, 1].max() + pts1[:, 1].min()) / 2
principal_point = (cx, cy)
# Build camera matrix
K = np.array([
[focal_length, 0, principal_point[0]],
[0, focal_length, principal_point[1]],
[0, 0, 1]
], dtype=np.float64)
# Map method string to OpenCV flag
method_map = {
"RANSAC": cv2.RANSAC,
"LMEDS": cv2.LMEDS,
"USAC_MAGSAC": cv2.USAC_MAGSAC,
"USAC_DEFAULT": cv2.USAC_DEFAULT,
}
flag = method_map.get(ransac_method.upper(), cv2.USAC_MAGSAC)
_, mask = cv2.findEssentialMat(pts1, pts2, K, method=flag, threshold=threshold)
if mask is None:
return np.ones(len(pts1), dtype=bool)
return mask.ravel().astype(bool)
elif method_lower == "fundamental":
# Fundamental matrix requires at least 8 points
if len(pts1) < 8:
return np.ones(len(pts1), dtype=bool)
# Map method string to OpenCV flag
method_map = {
"RANSAC": cv2.FM_RANSAC,
"LMEDS": cv2.FM_LMEDS,
"USAC_MAGSAC": cv2.USAC_MAGSAC,
"USAC_DEFAULT": cv2.USAC_DEFAULT,
}
flag = method_map.get(ransac_method.upper(), cv2.USAC_MAGSAC)
_, mask = cv2.findFundamentalMat(pts1, pts2, flag, threshold)
if mask is None:
return np.ones(len(pts1), dtype=bool)
return mask.ravel().astype(bool)
else:
raise ValueError(
f"Unknown outlier_filter '{method}'. "
"Available: 'essential', 'fundamental', 'none'"
)
def _filter_spatial(pts, grid_size, image_size, selection="first", random_state=None):
"""
Spatial thinning of matched points using grid-based sampling.
Parameters
----------
pts : numpy.ndarray
Points to thin, shape (N, 2). Typically pts1 (original image coords).
grid_size : int
Grid cell size in pixels (e.g., 100 for 100x100 px cells).
Must be positive integer.
image_size : tuple of int
Image size as (width, height) in pixels. Required for proper grid coverage.
selection : str, default "first"
Point selection method within each grid cell:
- "first": First point by input order (deterministic, fastest)
- "random": Random selection (requires random_state for reproducibility)
- "center": Point closest to cell center (ties: first by index)
random_state : int or None, default None
Seed for random selection. Required when selection="random".
Returns
-------
mask : numpy.ndarray
Boolean array where True indicates selected points, shape (N,).
Raises
------
ValueError
If grid_size <= 0 or selection is unknown.
"""
if grid_size <= 0:
raise ValueError("grid_size must be positive")
if len(pts) == 0:
return np.array([], dtype=bool)
# Calculate cell indices (vectorized)
cell_col = (pts[:, 0] // grid_size).astype(int)
cell_row = (pts[:, 1] // grid_size).astype(int)
n_cols = int(np.ceil(image_size[0] / grid_size))
cell_id = cell_row * n_cols + cell_col
# Use pandas for efficient groupby
df = pd.DataFrame({'idx': np.arange(len(pts)), 'cell': cell_id})
if selection == "first":
selected = df.groupby('cell')['idx'].first().values
elif selection == "random":
rng = np.random.default_rng(random_state)
selected = df.groupby('cell')['idx'].apply(
lambda x: rng.choice(x.values)
).values
elif selection == "center":
# Compute cell centers
cell_center_x = (cell_col + 0.5) * grid_size
cell_center_y = (cell_row + 0.5) * grid_size
# Compute distance to cell center
dist_to_center = np.sqrt(
(pts[:, 0] - cell_center_x) ** 2 +
(pts[:, 1] - cell_center_y) ** 2
)
df['dist'] = dist_to_center
# Select point with minimum distance (ties: first by index)
selected = df.loc[df.groupby('cell')['dist'].idxmin(), 'idx'].values
else:
raise ValueError(
f"Unknown selection '{selection}'. "
"Available: 'first', 'random', 'center'"
)
mask = np.zeros(len(pts), dtype=bool)
mask[selected] = True
return mask
[docs]
def image_match(path_org, path_sim, method="akaze", outlier_filter="fundamental",
params=None, threshold=10.0, ransac_method="USAC_MAGSAC",
spatial_thin_grid=None, spatial_thin_selection="first",
spatial_thin_random_state=None,
plot_result=False, device="cpu", resize=None, **kwargs):
"""
Feature matching between the original (real) photograph and a simulated landscape image.
The workflow is:
- Local feature detection (method-dependent)
- Feature matching with Lowe's ratio test
- Geometric outlier filtering (Essential/Fundamental Matrix)
- Optional spatial thinning (grid-based sampling)
Parameters
----------
path_org : str
Path for original photograph.
path_sim : str
Path for simulated landscape image.
method : str, default "akaze"
Matching method to use. Built-in methods: "akaze", "sift".
With vismatch package installed, any method supported by vismatch
can be used (e.g., "superpoint-lightglue", "roma", "minima-roma",
"loftr", "dedode-lightglue", "xfeat", etc.). See
https://github.com/gmberton/vismatch for the full list of 70+ methods.
outlier_filter : str, default "fundamental"
Outlier filtering method:
- "fundamental": Fundamental Matrix filtering (default). Use when camera
intrinsics are unknown. Requires at least 8 matches.
- "essential": Essential Matrix filtering. Recommended when params
with fov is provided. Requires at least 5 matches.
- "none": No geometric filtering. Use this when you plan to
apply custom filtering later.
params : dict, optional
Camera parameters dict containing fov, w, h, and optionally cx, cy.
Used to compute focal_length and principal_point for "essential" filter.
focal_length is calculated as: (w / 2) / tan(fov * pi / 180 / 2)
threshold : float, default 10.0
Outlier threshold in pixels for Essential/Fundamental filtering.
Higher values are more permissive (allow more matches through).
ransac_method : str, default "USAC_MAGSAC"
RANSAC variant for geometric filtering:
- "RANSAC": Standard RANSAC
- "LMEDS": Least-Median of Squares
- "USAC_MAGSAC": MAGSAC++ (most robust, recommended)
- "USAC_DEFAULT": Default USAC configuration
spatial_thin_grid : int, optional
Grid cell size in pixels for spatial thinning. If specified, keeps at most
one match per grid cell. Example: 100 keeps ~1 point per 100x100 px region.
Applied AFTER geometric outlier filtering.
spatial_thin_selection : str, default "first"
Selection method for spatial thinning:
- "first": First point by input order (deterministic, fastest)
- "random": Random selection (use spatial_thin_random_state for reproducibility)
- "center": Point closest to cell center
spatial_thin_random_state : int, optional
Random seed when spatial_thin_selection="random".
plot_result : bool, default False
Whether to return a result plot.
device : str, default "cpu"
Device to use for vismatch methods ("cpu" or "cuda"). Ignored for built-in methods.
resize : int, optional
Resize images to this size for vismatch methods.
Keypoints are automatically scaled back to original coordinates.
.. note::
For memory-intensive methods (roma, tiny-roma, minima-roma, loftr,
minima-loftr, ufm, rdd, master), if resize is None, images are
automatically resized to 640px to prevent out-of-memory errors.
LightGlue-based methods (sift-lightglue, superpoint-lightglue,
minima-superpoint-lightglue) can handle full-resolution images
and do not auto-resize.
**kwargs
Additional keyword arguments passed to vismatch's get_matcher (e.g., max_num_keypoints).
Returns
-------
points : pd.DataFrame
The coordinates of matched points. (Left-Top origin)
plot : np.array or None
An OpenCV image of result plot if plot_result=True, otherwise None.
Examples
--------
Basic matching with Fundamental Matrix filtering (default):
>>> match, _ = image_match(path_org, path_sim, method="akaze")
Matching with Essential Matrix filtering using params:
>>> match, _ = image_match(path_org, path_sim, method="minima-roma",
... outlier_filter="essential", params=params,
... device="cuda")
"""
import math
# Compute focal_length and principal_point from params if provided
focal_length = None
principal_point = None
if params is not None:
if "fov" in params and "w" in params:
focal_length = (params["w"] / 2) / math.tan(params["fov"] * math.pi / 180 / 2)
if "cx" in params and "cy" in params:
principal_point = (params["cx"], params["cy"])
elif "w" in params and "h" in params:
principal_point = (params["w"] / 2, params["h"] / 2)
method_lower = method.lower()
im_org = None
if method_lower == "akaze":
im_org = cv2.imread(path_org)
im_sim = cv2.imread(path_sim)
pts1, pts2 = _opencv_match(im_org, im_sim, detector_type="akaze")
elif method_lower == "sift":
im_org = cv2.imread(path_org)
im_sim = cv2.imread(path_sim)
pts1, pts2 = _opencv_match(im_org, im_sim, detector_type="sift")
else:
# Delegate to vismatch for all non-builtin methods
is_lightweight = any(p in method_lower for p in _LIGHTWEIGHT_PATTERNS)
effective_resize = resize
if not is_lightweight and resize is None:
effective_resize = _DEFAULT_RESIZE_HEAVY
warnings.warn(
f"Method '{method}' may be memory-intensive. "
f"Automatically resizing images to {_DEFAULT_RESIZE_HEAVY}px "
f"to prevent out-of-memory errors. "
f"Set 'resize' explicitly to override this behavior.",
UserWarning,
stacklevel=2
)
pts1, pts2 = _vismatch_match(path_org, path_sim, method, device, resize=effective_resize, **kwargs)
im_org = cv2.imread(path_org)
# Get image size for filtering
image_size = (im_org.shape[1], im_org.shape[0]) if im_org is not None else None
# Apply geometric outlier filtering if requested
if outlier_filter != "none" and len(pts1) > 0:
mask = _filter_geometric(
pts1, pts2,
method=outlier_filter,
focal_length=focal_length,
principal_point=principal_point,
threshold=threshold,
image_size=image_size,
ransac_method=ransac_method
)
pts1 = pts1[mask]
pts2 = pts2[mask]
# Apply spatial thinning AFTER geometric filtering
if spatial_thin_grid is not None and len(pts1) > 0:
if image_size is None:
raise ValueError(
"image_size could not be determined. "
"Ensure path_org is a valid image file."
)
mask = _filter_spatial(
pts1,
grid_size=spatial_thin_grid,
image_size=image_size,
selection=spatial_thin_selection,
random_state=spatial_thin_random_state
)
pts1 = pts1[mask]
pts2 = pts2[mask]
# Ensure pts1/pts2 are 2D arrays with shape (N, 2)
if len(pts1) == 0:
pts = pd.DataFrame(columns=["u_org", "v_org", "u_sim", "v_sim"])
else:
pts1 = np.atleast_2d(pts1)
pts2 = np.atleast_2d(pts2)
pts = pd.DataFrame(np.hstack((pts1, pts2)), columns=["u_org", "v_org", "u_sim", "v_sim"])
if plot_result:
if im_org is None:
im_org = cv2.imread(path_org)
plot_img = plot_matches(im_org, pts)
return pts, plot_img
else:
return pts, None
[docs]
def plot_matches(image, matches, color=(180, 105, 255), thickness=None,
tip_length=0.3, font_scale=None, font_thickness=None):
"""
Plot feature matches on an image.
Parameters
----------
image : numpy.ndarray
Image to draw on (BGR format).
matches : pd.DataFrame
Match coordinates with columns u_org, v_org, u_sim, v_sim.
color : tuple, default (180, 105, 255)
Arrow color in BGR.
thickness : int or None, default None
Arrow thickness. If None, automatically scaled based on image size.
tip_length : float, default 0.3
Arrow tip length as fraction of arrow length.
font_scale : float or None, default None
Font size for the label text. If None, automatically scaled based on image size.
font_thickness : int or None, default None
Font stroke thickness. If None, automatically scaled based on image size.
Returns
-------
plot : numpy.ndarray
Image with arrows drawn.
"""
plot_img = image.copy()
if len(matches) == 0:
return plot_img
# Scale drawing parameters relative to a 3744px reference height
scale = min(image.shape[:2]) / 3744
if thickness is None:
thickness = max(1, int(20 * scale))
if font_scale is None:
font_scale = max(0.5, 5 * scale)
if font_thickness is None:
font_thickness = max(1, int(5 * scale))
pts1 = matches[["u_org", "v_org"]].to_numpy().astype(np.int32)
pts2 = matches[["u_sim", "v_sim"]].to_numpy().astype(np.int32)
for i in range(len(pts1)):
plot_img = cv2.arrowedLine(
plot_img, tuple(pts1[i]), tuple(pts2[i]),
color=color, thickness=thickness, tipLength=tip_length
)
plot_img = cv2.putText(
plot_img, f"simulated <- original ({len(pts1)} matches)",
(int(plot_img.shape[1] * 0.15), int(plot_img.shape[0] * 0.05)),
cv2.FONT_HERSHEY_TRIPLEX, font_scale, (0, 0, 0), font_thickness, cv2.LINE_AA
)
return plot_img
[docs]
def set_gcp(match, rev_proj):
"""
Adds geographic coordinates to the matched point pairs.
The result of this function will be used as the Ground Control Points (GCPs)
during camera parameter estimation.
Parameters
----------
match : pd.DataFrame
Result of alproj.gcp.image_match()
rev_proj : pd.DataFrame
Result of alproj.project.reverse_proj
Returns
-------
gcp : pd.DataFrame
A dataframe with 5 columns
u : int
x_axis coordinates of the Ground Control Points on the original photograph. Left-Top origin.
v : int
y_axis coordinates of the GCPs.
x : float
X coordinates of GCPs in a projected coordinate system (e.g., UTM in meters).
y : float
Y coordinates of GCPs in the same projected coordinate system.
z : float
Z coordinates (elevation) of GCPs in the same unit (e.g., meters).
"""
gcp = pd.merge(match, rev_proj, how="left",\
left_on=["u_sim", "v_sim"], right_on=["u", "v"]) \
[["u_org","v_org","x","y","z"]] \
.rename(columns={"u_org":"u", "v_org":"v"})
return gcp.dropna(how="any", axis=0)
[docs]
def filter_gcp_distance(gcp, params, min_distance=None, max_distance=None):
"""
Filter GCPs based on 3D distance from the camera position.
Parameters
----------
gcp : pd.DataFrame
GCP data with columns u, v, x, y, z (geographic coordinates in
projected CRS, e.g., meters). Typically the result of set_gcp().
params : dict
Camera parameters containing 'x', 'y', 'z' keys (camera position
in the same projected CRS as gcp).
min_distance : float, optional
Minimum distance threshold in coordinate units (e.g., meters).
Points closer than this are excluded. Must be non-negative if specified.
max_distance : float, optional
Maximum distance threshold. Points farther than this are excluded.
Must be >= min_distance if both are specified.
Returns
-------
gcp_filtered : pd.DataFrame
Filtered GCP data with reset index.
Raises
------
KeyError
If params lacks 'x', 'y', or 'z' keys.
ValueError
If min_distance < 0 or max_distance < min_distance.
Notes
-----
- Coordinates must be in a projected CRS (e.g., UTM) for accurate
Euclidean distance. Using lat/lon directly will produce incorrect results.
- Rows with NaN in x, y, or z are excluded from the result.
Examples
--------
>>> gcps = set_gcp(match, df)
>>> gcps_filtered = filter_gcp_distance(gcps, params, min_distance=100)
"""
# Validate params keys
for key in ('x', 'y', 'z'):
if key not in params:
raise KeyError(f"params must contain '{key}' key")
# Validate distance parameters
if min_distance is not None and min_distance < 0:
raise ValueError("min_distance must be non-negative")
if min_distance is not None and max_distance is not None:
if max_distance < min_distance:
raise ValueError("max_distance must be >= min_distance")
if len(gcp) == 0:
return gcp.copy()
if min_distance is None and max_distance is None:
return gcp.copy()
# Drop rows with NaN coordinates
gcp_valid = gcp.dropna(subset=['x', 'y', 'z'])
# Calculate 3D Euclidean distance (vectorized)
dx = gcp_valid["x"].values - params["x"]
dy = gcp_valid["y"].values - params["y"]
dz = gcp_valid["z"].values - params["z"]
distance = np.sqrt(dx**2 + dy**2 + dz**2)
# Apply filters
mask = np.ones(len(gcp_valid), dtype=bool)
if min_distance is not None:
mask &= (distance >= min_distance)
if max_distance is not None:
mask &= (distance <= max_distance)
return gcp_valid[mask].reset_index(drop=True)