From 471faa08e97038e9f3c09a02d3d26929f45b437c Mon Sep 17 00:00:00 2001 From: Caroline Chen Date: Mon, 8 Dec 2025 15:01:47 -0500 Subject: [PATCH 1/5] Sliding stop implementation --- nomad/stop_detection/sliding.py | 345 +++++++++++++++++++++++++++++ nomad/tests/test_stop_detection.py | 245 +++++++++++++++++++- 2 files changed, 589 insertions(+), 1 deletion(-) create mode 100644 nomad/stop_detection/sliding.py diff --git a/nomad/stop_detection/sliding.py b/nomad/stop_detection/sliding.py new file mode 100644 index 00000000..17b51416 --- /dev/null +++ b/nomad/stop_detection/sliding.py @@ -0,0 +1,345 @@ +import pandas as pd +import numpy as np +import geopandas as gpd +from sklearn.cluster import DBSCAN +from shapely.geometry import Point, MultiPoint +import warnings +import nomad.io.base as loader + + +def _get_location_center(coords, metric='euclidean'): + """ + Calculate the center of a location cluster. + + Parameters + ---------- + coords : numpy.ndarray + Array of coordinates (n_points, 2) + metric : str + 'euclidean' for projected coordinates, 'haversine' for lat/lon + + Returns + ------- + tuple + (x, y) coordinates of the center + """ + if metric == 'haversine': + # For geographic coordinates, use angle-based mean + # Convert to radians for calculation + coords_rad = np.radians(coords) + x = coords_rad[:, 0] + y = coords_rad[:, 1] + + # Convert to 3D Cartesian coordinates + cos_y = np.cos(y) + cart_x = cos_y * np.cos(x) + cart_y = cos_y * np.sin(x) + cart_z = np.sin(y) + + # Average and convert back + avg_x = np.mean(cart_x) + avg_y = np.mean(cart_y) + avg_z = np.mean(cart_z) + + lon = np.arctan2(avg_y, avg_x) + hyp = np.sqrt(avg_x**2 + avg_y**2) + lat = np.arctan2(avg_z, hyp) + + return np.degrees(lon), np.degrees(lat) + else: + # For projected coordinates, use simple mean + return np.mean(coords[:, 0]), np.mean(coords[:, 1]) + + +def _get_location_extent(points, epsilon, crs=None): + """ + Calculate the spatial extent of a location. + + Parameters + ---------- + points : list of shapely Points + Points in the location cluster + epsilon : float + Buffer distance in meters (or degrees for unprojected) + crs : str, optional + Coordinate reference system + + Returns + ------- + shapely.Polygon + Convex hull buffered by epsilon + """ + if len(points) == 1: + return points[0].buffer(epsilon) + + multipoint = MultiPoint(points) + convex_hull = multipoint.convex_hull + return convex_hull.buffer(epsilon) + + +def cluster_locations_dbscan( + stops, + epsilon=100, + num_samples=1, + distance_metric='euclidean', + agg_level='user', + traj_cols=None, + **kwargs +): + """ + Cluster stops into locations using DBSCAN. 
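+
+    Stops whose pairwise distance is within ``epsilon`` (and that have at
+    least ``num_samples`` stops in their neighbourhood) are grouped into a
+    shared location; stops labelled as noise keep a missing ``location_id``.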
+ + Parameters + ---------- + stops : pd.DataFrame or gpd.GeoDataFrame + Stop/staypoint data with spatial columns + epsilon : float, default 100 + Maximum distance between stops in the same location (meters for haversine/euclidean) + num_samples : int, default 1 + Minimum number of stops required to form a location + distance_metric : str, default 'euclidean' + Distance metric: 'euclidean' for projected coords, 'haversine' for lat/lon + agg_level : str, default 'user' + 'user' = separate locations per user, 'dataset' = shared locations across users + traj_cols : dict, optional + Column name mappings for coordinates and user_id + **kwargs + Additional arguments passed to column detection + + Returns + ------- + tuple of (pd.DataFrame, gpd.GeoDataFrame) + - stops with added 'location_id' column (NaN for unclustered stops) + - locations GeoDataFrame with cluster centers and extents + + Examples + -------- + >>> # Cluster stops within 50 meters into locations + >>> stops_labeled, locations = cluster_locations_dbscan( + ... stops, epsilon=50, num_samples=2 + ... ) + + >>> # Use dataset-level clustering (locations shared across users) + >>> stops_labeled, locations = cluster_locations_dbscan( + ... stops, epsilon=100, agg_level='dataset' + ... ) + """ + if not isinstance(stops, (pd.DataFrame, gpd.GeoDataFrame)): + raise TypeError("Input 'stops' must be a pandas DataFrame or GeoDataFrame") + + if stops.empty: + # Return empty results with proper schema + stops_out = stops.copy() + stops_out['location_id'] = pd.Series(dtype='Int64') + locations = gpd.GeoDataFrame( + columns=['center', 'extent'], + geometry='center', + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + return stops_out, locations + + # Parse column names + traj_cols = loader._parse_traj_cols(stops.columns, traj_cols, kwargs) + loader._has_spatial_cols(stops.columns, traj_cols) + + # Determine coordinate columns + if 'longitude' in traj_cols and traj_cols['longitude'] in stops.columns: + coord_key1, coord_key2 = 'longitude', 'latitude' + use_lon_lat = True + elif 'x' in traj_cols and traj_cols['x'] in stops.columns: + coord_key1, coord_key2 = 'x', 'y' + use_lon_lat = False + else: + raise ValueError("Could not find spatial columns in stops data") + + # Override distance metric based on coordinate type if not specified + if distance_metric == 'euclidean' and use_lon_lat: + warnings.warn( + "Using haversine metric for lat/lon coordinates instead of euclidean", + UserWarning + ) + distance_metric = 'haversine' + + # Get user_id column if present + user_col = None + if 'user_id' in traj_cols and traj_cols['user_id'] in stops.columns: + user_col = traj_cols['user_id'] + + # Check aggregation level + if agg_level == 'user' and user_col is None: + warnings.warn( + "agg_level='user' requires user_id column; falling back to 'dataset'", + UserWarning + ) + agg_level = 'dataset' + + stops_out = stops.copy() + stops_out['location_id'] = pd.Series(dtype='Int64') + + location_list = [] + location_id_counter = 0 + + # Group by user if needed + if agg_level == 'user': + groups = stops_out.groupby(user_col, sort=False) + else: + groups = [(None, stops_out)] + + for group_key, group_data in groups: + if group_data.empty: + continue + + # Extract coordinates + coords = group_data[[traj_cols[coord_key1], traj_cols[coord_key2]]].to_numpy(dtype='float64') + + # For haversine, convert to radians + if distance_metric == 'haversine': + # Convert epsilon from meters to radians (approximate) + # Earth radius in meters + epsilon_rad = epsilon 
/ 6371000.0 + coords_for_clustering = np.radians(coords) + else: + epsilon_rad = epsilon + coords_for_clustering = coords + + # Run DBSCAN + clusterer = DBSCAN( + eps=epsilon_rad, + min_samples=num_samples, + metric=distance_metric, + algorithm='ball_tree' + ) + + labels = clusterer.fit_predict(coords_for_clustering) + + # Assign location IDs (offset by counter for multi-group) + group_loc_ids = labels.copy() + valid_mask = labels >= 0 + group_loc_ids[valid_mask] += location_id_counter + group_loc_ids[~valid_mask] = -1 + + # Update stops with location IDs + stops_out.loc[group_data.index, 'location_id'] = group_loc_ids + + # Create location entries for each cluster + unique_labels = labels[labels >= 0] + if len(unique_labels) > 0: + for label in np.unique(unique_labels): + cluster_mask = labels == label + cluster_coords = coords[cluster_mask] + cluster_indices = group_data.index[cluster_mask] + + # Calculate center + center_x, center_y = _get_location_center( + cluster_coords, + metric=distance_metric + ) + center_point = Point(center_x, center_y) + + # Calculate extent (convex hull + buffer) + cluster_points = [Point(x, y) for x, y in cluster_coords] + extent = _get_location_extent( + cluster_points, + epsilon, + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + + location_entry = { + 'location_id': location_id_counter + label, + 'center': center_point, + 'extent': extent, + 'n_stops': cluster_mask.sum() + } + + # Add user_id if available + if user_col is not None and agg_level == 'user': + location_entry[user_col] = group_key + + location_list.append(location_entry) + + # Update counter for next group + location_id_counter += len(np.unique(unique_labels)) + + # Convert location_id to nullable integer (NaN for unclustered) + stops_out['location_id'] = stops_out['location_id'].replace(-1, pd.NA).astype('Int64') + + # Create locations GeoDataFrame + if location_list: + locations = gpd.GeoDataFrame( + location_list, + geometry='center', + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + # Set extent as additional geometry column + locations['extent'] = gpd.GeoSeries( + [loc['extent'] for loc in location_list], + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + else: + locations = gpd.GeoDataFrame( + columns=['location_id', 'center', 'extent', 'n_stops'], + geometry='center', + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + + return stops_out, locations + + +def cluster_locations_per_user( + stops, + epsilon=100, + num_samples=1, + distance_metric='euclidean', + traj_cols=None, + **kwargs +): + """ + Convenience function to cluster locations per user. + + This is equivalent to calling cluster_locations_dbscan with agg_level='user'. 
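+
+    Unlike ``cluster_locations_dbscan``, which warns and falls back to
+    ``agg_level='dataset'`` when no user column is found, this wrapper
+    raises a ``ValueError`` so the failure is explicit.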
+ + Parameters + ---------- + stops : pd.DataFrame or gpd.GeoDataFrame + Stop data with user_id column + epsilon : float, default 100 + Maximum distance between stops in same location (meters) + num_samples : int, default 1 + Minimum stops required to form a location + distance_metric : str, default 'euclidean' + 'euclidean' or 'haversine' + traj_cols : dict, optional + Column name mappings + **kwargs + Additional arguments + + Returns + ------- + tuple of (pd.DataFrame, gpd.GeoDataFrame) + Stops with location_id and locations table + + Raises + ------ + ValueError + If user_id column is not found + """ + traj_cols_temp = loader._parse_traj_cols(stops.columns, traj_cols, kwargs) + if 'user_id' not in traj_cols_temp or traj_cols_temp['user_id'] not in stops.columns: + raise ValueError( + "cluster_locations_per_user requires a 'user_id' column " + "specified in traj_cols or kwargs" + ) + + return cluster_locations_dbscan( + stops, + epsilon=epsilon, + num_samples=num_samples, + distance_metric=distance_metric, + agg_level='user', + traj_cols=traj_cols, + **kwargs + ) + + +# Alias for convenience +generate_locations = cluster_locations_dbscan diff --git a/nomad/tests/test_stop_detection.py b/nomad/tests/test_stop_detection.py index f14c4e1a..8745ab72 100644 --- a/nomad/tests/test_stop_detection.py +++ b/nomad/tests/test_stop_detection.py @@ -13,6 +13,7 @@ from nomad import filters import nomad.stop_detection.dbscan as DBSCAN import nomad.stop_detection.lachesis as LACHESIS +import nomad.stop_detection.sliding as SLIDING import pdb @pytest.fixture @@ -396,4 +397,246 @@ def test_empty_dataframe_consistency(): assert 'longitude' in hdbscan_result.columns assert 'latitude' in hdbscan_result.columns assert 'longitude' in lachesis_result.columns - assert 'latitude' in lachesis_result.columns \ No newline at end of file + assert 'latitude' in lachesis_result.columns + +########################################## +#### LOCATION CLUSTERING #### +#### (SLIDING.PY) #### +########################################## + +@pytest.fixture +def stops_for_clustering(): + """Create sample stops for location clustering tests""" + # Create stops at two distinct locations + # Location 1: stops at (0, 0) with small variations + # Location 2: stops at (1000, 1000) with small variations + stops_data = { + 'x': [0, 5, 10, 1000, 1005, 1010, 2000], # Third location with single stop + 'y': [0, 5, 10, 1000, 1005, 1010, 2000], + 'timestamp': [ + 1704067200, # 2024-01-01 00:00:00 + 1704070800, # 2024-01-01 01:00:00 + 1704074400, # 2024-01-01 02:00:00 + 1704078000, # 2024-01-01 03:00:00 + 1704081600, # 2024-01-01 04:00:00 + 1704085200, # 2024-01-01 05:00:00 + 1704088800, # 2024-01-01 06:00:00 + ], + 'duration': [10, 15, 20, 10, 15, 20, 10], # minutes + 'user_id': ['user1', 'user1', 'user1', 'user1', 'user1', 'user1', 'user1'] + } + return pd.DataFrame(stops_data) + + +@pytest.fixture +def multi_user_stops(): + """Create sample stops for multiple users""" + stops_data = { + 'x': [0, 5, 10, 0, 5, 10], + 'y': [0, 5, 10, 0, 5, 10], + 'timestamp': [1704067200 + i*3600 for i in range(6)], + 'duration': [10, 15, 20, 10, 15, 20], + 'user_id': ['user1', 'user1', 'user1', 'user2', 'user2', 'user2'] + } + return pd.DataFrame(stops_data) + + +def test_cluster_locations_basic(stops_for_clustering): + """Test basic location clustering with default parameters""" + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_dbscan( + stops_for_clustering, + 
epsilon=50, + num_samples=1, + distance_metric='euclidean', + agg_level='dataset', + traj_cols=traj_cols + ) + + # Should create 3 locations (2 clusters + 1 single stop at 2000,2000) + assert len(locations) == 3 + + # All stops should have location_id assigned + assert 'location_id' in stops_labeled.columns + + # Check that first 3 stops belong to same location + loc_ids = stops_labeled['location_id'].values + assert loc_ids[0] == loc_ids[1] == loc_ids[2] + + # Check that stops 3,4,5 belong to same location (different from first) + assert loc_ids[3] == loc_ids[4] == loc_ids[5] + assert loc_ids[3] != loc_ids[0] + + # Check locations have proper columns + assert 'center' in locations.columns + assert 'extent' in locations.columns + assert 'n_stops' in locations.columns + + +def test_cluster_locations_per_user(multi_user_stops): + """Test per-user location clustering""" + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_dbscan( + multi_user_stops, + epsilon=50, + num_samples=1, + distance_metric='euclidean', + agg_level='user', + traj_cols=traj_cols + ) + + # Should create 2 locations (one per user, both at same spatial location) + assert len(locations) == 2 + + # Check that each user has their own location + assert 'user_id' in locations.columns + user1_locs = locations[locations['user_id'] == 'user1'] + user2_locs = locations[locations['user_id'] == 'user2'] + assert len(user1_locs) == 1 + assert len(user2_locs) == 1 + + # Check that user1 stops have different location_id than user2 stops + user1_stops = stops_labeled[stops_labeled['user_id'] == 'user1'] + user2_stops = stops_labeled[stops_labeled['user_id'] == 'user2'] + assert user1_stops['location_id'].iloc[0] != user2_stops['location_id'].iloc[0] + + +def test_cluster_locations_dataset_level(multi_user_stops): + """Test dataset-level location clustering (shared across users)""" + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_dbscan( + multi_user_stops, + epsilon=50, + num_samples=1, + distance_metric='euclidean', + agg_level='dataset', + traj_cols=traj_cols + ) + + # Should create 1 shared location + assert len(locations) == 1 + + # All stops should have same location_id + location_ids = stops_labeled['location_id'].dropna().unique() + assert len(location_ids) == 1 + + +def test_cluster_locations_empty_input(): + """Test handling of empty DataFrame""" + empty_df = pd.DataFrame(columns=['x', 'y', 'timestamp', 'user_id']) + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_dbscan( + empty_df, + epsilon=50, + num_samples=1, + traj_cols=traj_cols + ) + + assert len(stops_labeled) == 0 + assert len(locations) == 0 + assert 'location_id' in stops_labeled.columns + + +def test_cluster_locations_geographic_coords(): + """Test clustering with lat/lon coordinates using haversine metric""" + # Create stops near SF (37.7749° N, 122.4194° W) + stops_data = { + 'longitude': [-122.4194, -122.4195, -122.4196, -122.5000], # ~100m apart, then far + 'latitude': [37.7749, 37.7750, 37.7751, 37.8000], + 'timestamp': [1704067200 + i*3600 for i in range(4)], + 'duration': [10, 15, 20, 10], + 'user_id': ['user1'] * 4 + } + stops_df = pd.DataFrame(stops_data) + + traj_cols = {'longitude': 'longitude', 'latitude': 'latitude', + 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, 
locations = SLIDING.cluster_locations_dbscan( + stops_df, + epsilon=200, # 200 meters + num_samples=2, + distance_metric='haversine', + agg_level='dataset', + traj_cols=traj_cols + ) + + # Should cluster first 3 stops, last one is noise + assert len(locations) == 1 + assert stops_labeled.loc[0, 'location_id'] == stops_labeled.loc[1, 'location_id'] + assert pd.isna(stops_labeled.loc[3, 'location_id']) + + +def test_cluster_locations_per_user_convenience(): + """Test the cluster_locations_per_user convenience function""" + stops_data = { + 'x': [0, 5, 10, 0, 5, 10], + 'y': [0, 5, 10, 0, 5, 10], + 'timestamp': [1704067200 + i*3600 for i in range(6)], + 'duration': [10, 15, 20, 10, 15, 20], + 'user_id': ['user1', 'user1', 'user1', 'user2', 'user2', 'user2'] + } + stops_df = pd.DataFrame(stops_data) + + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_per_user( + stops_df, + epsilon=50, + num_samples=1, + traj_cols=traj_cols + ) + + # Should behave same as agg_level='user' + assert len(locations) == 2 + assert 'user_id' in locations.columns + + +def test_location_center_calculation(stops_for_clustering): + """Test that location centers are calculated correctly""" + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_dbscan( + stops_for_clustering, + epsilon=50, + num_samples=2, + distance_metric='euclidean', + agg_level='dataset', + traj_cols=traj_cols + ) + + # First location should be centered around (5, 5) - mean of [0,5,10] + # Second location should be centered around (1005, 1005) + centers = [(loc.center.x, loc.center.y) for _, loc in locations.iterrows()] + + # Check first location center is near (5, 5) + assert any(abs(cx - 5) < 10 and abs(cy - 5) < 10 for cx, cy in centers) + + # Check second location center is near (1005, 1005) + assert any(abs(cx - 1005) < 10 and abs(cy - 1005) < 10 for cx, cy in centers) + + +def test_location_extent_calculation(stops_for_clustering): + """Test that location extents (convex hull + buffer) are created""" + traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} + + stops_labeled, locations = SLIDING.cluster_locations_dbscan( + stops_for_clustering, + epsilon=50, + num_samples=1, + distance_metric='euclidean', + agg_level='dataset', + traj_cols=traj_cols + ) + + # All locations should have extent geometries + assert all(locations['extent'].notna()) + + # Extents should be polygons (or points buffered to polygons) + from shapely.geometry import Polygon, Point + assert all(isinstance(ext, Polygon) or isinstance(ext, Point) + for ext in locations['extent']) From 272b6b096e16d69ff00d79a06c3c21fd0795aea3 Mon Sep 17 00:00:00 2001 From: Caroline Chen Date: Tue, 9 Dec 2025 11:32:37 -0500 Subject: [PATCH 2/5] Edits --- nomad/stop_detection/sliding.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/nomad/stop_detection/sliding.py b/nomad/stop_detection/sliding.py index 17b51416..4b1d2a13 100644 --- a/nomad/stop_detection/sliding.py +++ b/nomad/stop_detection/sliding.py @@ -111,18 +111,6 @@ def cluster_locations_dbscan( tuple of (pd.DataFrame, gpd.GeoDataFrame) - stops with added 'location_id' column (NaN for unclustered stops) - locations GeoDataFrame with cluster centers and extents - - Examples - -------- - >>> # Cluster stops within 50 meters into locations - >>> stops_labeled, locations = cluster_locations_dbscan( - ... 
stops, epsilon=50, num_samples=2 - ... ) - - >>> # Use dataset-level clustering (locations shared across users) - >>> stops_labeled, locations = cluster_locations_dbscan( - ... stops, epsilon=100, agg_level='dataset' - ... ) """ if not isinstance(stops, (pd.DataFrame, gpd.GeoDataFrame)): raise TypeError("Input 'stops' must be a pandas DataFrame or GeoDataFrame") From f25828e47ce58999b0e05c3aae038c0b17d08b73 Mon Sep 17 00:00:00 2001 From: Caroline Chen Date: Sun, 14 Dec 2025 21:45:11 -0500 Subject: [PATCH 3/5] Initial revisions --- nomad/stop_detection/postprocessing.py | 224 ++++++ nomad/stop_detection/sliding.py | 682 +++++++++++-------- nomad/tests/test_stop_detection.py | 508 ++++++++------ nomad/visit_attribution/visit_attribution.py | 332 ++++++++- 4 files changed, 1256 insertions(+), 490 deletions(-) diff --git a/nomad/stop_detection/postprocessing.py b/nomad/stop_detection/postprocessing.py index df267608..14992ab5 100644 --- a/nomad/stop_detection/postprocessing.py +++ b/nomad/stop_detection/postprocessing.py @@ -417,3 +417,227 @@ def fill_timestamp_gaps(first_time, last_time, stop_table): df_full = df_full.sort_values('start_timestamp').reset_index(drop=True) return df_full + + +def merge_stops(stops, max_time_gap="10min", location_col="loc_id", agg=None, traj_cols=None, **kwargs): + """ + Merge consecutive stops at the same location within a time threshold. + + This function aggregates stops that are: + - At the same location (same location_col value) + - Consecutive in time (gap between stops <= max_time_gap) + - From the same user + + Parameters + ---------- + stops : pd.DataFrame + Stop table with temporal and location columns. + Must contain columns for start time, end time (or duration), and location. + + max_time_gap : str or pd.Timedelta, default "10min" + Maximum duration between consecutive stops to still be merged. + If str, must be parsable by pd.to_timedelta (e.g., "10min", "1h", "30s"). + If pd.Timedelta, used directly. + + location_col : str, default "loc_id" + Name of the column containing location identifiers. + Stops are only merged if they have the same value in this column. + + agg : dict, optional + Dictionary to aggregate columns after merging stops. + Keys are column names, values are aggregation functions. + If None or empty, only required columns (user_id, start times, end times) + are aggregated and returned. + Example: {"geometry": "first", "n_pings": "sum"} + + traj_cols : dict, optional + Column name mappings. Supported keys: + - 'user_id': user identifier column + - 'timestamp' or 'datetime': start time column + - 'end_timestamp' or 'end_datetime': end time column + - 'duration': duration column (used if end time not present) + + **kwargs + Additional keyword arguments for column name specification. + + Returns + ------- + pd.DataFrame + Merged stops table with the same structure as input but with consecutive + stops at the same location merged into single rows. + + Notes + ----- + - The index of the returned DataFrame corresponds to the first stop in each + merged group. + - If a stop has no consecutive neighbor at the same location (within max_time_gap), + it remains unchanged. 
+ - Aggregation for required columns: + - start time: first (earliest start) + - end time: last (latest end) + - location: first (same for all in group) + - user_id: first (same for all in group) + + Examples + -------- + >>> # Basic usage + >>> merged = merge_stops(stops, max_time_gap="15min", location_col="location_id") + + >>> # With custom aggregation to preserve geometry and sum n_pings + >>> merged = merge_stops( + ... stops, + ... max_time_gap="30min", + ... location_col="building_id", + ... agg={"geometry": "first", "n_pings": "sum"} + ... ) + """ + if agg is None: + agg = {} + + # Convert max_time_gap to Timedelta + if isinstance(max_time_gap, str): + max_time_gap = pd.to_timedelta(max_time_gap) + elif not isinstance(max_time_gap, pd.Timedelta): + raise TypeError("Parameter max_time_gap must be either of type str or pd.Timedelta!") + + # Validate location column exists + if location_col not in stops.columns: + raise ValueError(f"Location column '{location_col}' not found in stops DataFrame") + + if stops.empty: + return stops.copy() + + # Parse column names + traj_cols = loader._parse_traj_cols(stops.columns, traj_cols, kwargs, warn=False) + + # Determine temporal columns + t_key, use_datetime = loader._fallback_time_cols_dt(stops.columns, traj_cols, kwargs) + end_t_key = 'end_datetime' if use_datetime else 'end_timestamp' + + # Check for required temporal columns + end_col_present = loader._has_end_cols(stops.columns, traj_cols) + duration_col_present = loader._has_duration_cols(stops.columns, traj_cols) + + if not (end_col_present or duration_col_present): + raise ValueError("Stops must contain either end time or duration columns") + + # Get user_id column if present + user_col = None + if 'user_id' in traj_cols and traj_cols['user_id'] in stops.columns: + user_col = traj_cols['user_id'] + + # Work with a copy + stops_merge = stops.copy() + index_name = stops.index.name if stops.index.name else 'id' + + # Compute end time if not present + if not end_col_present: + dur_mins = stops_merge[traj_cols['duration']] + if use_datetime: + stops_merge[end_t_key] = stops_merge[traj_cols[t_key]] + pd.to_timedelta(dur_mins, unit='m') + else: + stops_merge[end_t_key] = stops_merge[traj_cols[t_key]] + dur_mins * 60 + else: + end_t_key = traj_cols[end_t_key] + + # Reset index and preserve it + stops_merge = stops_merge.reset_index() + stops_merge["index_temp"] = stops_merge[index_name] + + # Sort by user and time + if user_col: + stops_merge = stops_merge.sort_values(by=[user_col, traj_cols[t_key]]) + else: + stops_merge = stops_merge.sort_values(by=traj_cols[t_key]) + + # Get next row information + shift_cols = [traj_cols[t_key], location_col] + shift_names = ["next_started_at", "next_location"] + + if user_col: + shift_cols.insert(0, user_col) + shift_names.insert(0, "next_user_id") + + stops_merge[shift_names] = stops_merge[shift_cols].shift(-1) + stops_merge["next_id"] = stops_merge["index_temp"].shift(-1) + + # Iteratively merge stops + cond = pd.Series(data=False, index=stops_merge.index) + cond_old = pd.Series(data=True, index=stops_merge.index) + + while np.sum(cond != cond_old) >= 1: + # Build merge conditions + conditions = [] + + # Same user (if user_col exists) + if user_col: + conditions.append(stops_merge["next_user_id"] == stops_merge[user_col]) + + # Time gap within threshold + conditions.append( + stops_merge["next_started_at"] - stops_merge[end_t_key] <= max_time_gap + ) + + # Same location + conditions.append(stops_merge[location_col] == stops_merge["next_location"]) + + # 
Not already merged + conditions.append(stops_merge["index_temp"] != stops_merge["next_id"]) + + # Combine all conditions + cond = pd.Series(data=True, index=stops_merge.index) + for c in conditions: + cond = cond & c + + # Assign merged index + stops_merge.loc[cond, "index_temp"] = stops_merge.loc[cond, "next_id"] + + # Update for next iteration + cond_diff = cond != cond_old + cond_old = cond.copy() + + if np.sum(cond_diff) == 0: + break + + # Define aggregation dictionary + agg_dict = { + index_name: "first", + traj_cols[t_key]: "first", # earliest start time + end_t_key: "last", # latest end time + location_col: "first", # same location for all in group + } + + if user_col: + agg_dict[user_col] = "first" + + # Add user-defined aggregations + agg_dict.update(agg) + + # Group and aggregate + stops_merged = stops_merge.groupby(by="index_temp").agg(agg_dict) + + # Recompute duration if it was in the original + if duration_col_present: + if use_datetime: + stops_merged[traj_cols['duration']] = ( + (stops_merged[end_t_key] - stops_merged[traj_cols[t_key]]) + .dt.total_seconds() / 60 + ).astype(int) + else: + stops_merged[traj_cols['duration']] = ( + (stops_merged[end_t_key] - stops_merged[traj_cols[t_key]]) / 60 + ).astype(int) + + # Clean up: set index and sort + stops_merged = stops_merged.set_index(index_name) + + if user_col: + stops_merged = stops_merged.sort_values(by=[user_col, traj_cols[t_key]]) + else: + stops_merged = stops_merged.sort_values(by=traj_cols[t_key]) + + # Remove the computed end_t_key if it wasn't in the original + if not end_col_present and end_t_key in stops_merged.columns: + stops_merged = stops_merged.drop(columns=[end_t_key]) + + return stops_merged diff --git a/nomad/stop_detection/sliding.py b/nomad/stop_detection/sliding.py index 4b1d2a13..b7f80bab 100644 --- a/nomad/stop_detection/sliding.py +++ b/nomad/stop_detection/sliding.py @@ -1,333 +1,461 @@ import pandas as pd import numpy as np import geopandas as gpd -from sklearn.cluster import DBSCAN -from shapely.geometry import Point, MultiPoint +from shapely.geometry import Point import warnings -import nomad.io.base as loader +from joblib import Parallel, delayed +from tqdm import tqdm -def _get_location_center(coords, metric='euclidean'): +def haversine_dist(lat1, lon1, lat2, lon2): """ - Calculate the center of a location cluster. + Calculate haversine distance between two points in meters. 
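+
+    Uses a mean Earth radius of 6,371 km; inputs are in decimal degrees.
+    For example, one degree of latitude spans roughly 111 km:
+
+    >>> round(haversine_dist(0.0, 0.0, 1.0, 0.0))
+    111195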
Parameters ---------- - coords : numpy.ndarray - Array of coordinates (n_points, 2) - metric : str - 'euclidean' for projected coordinates, 'haversine' for lat/lon + lat1, lon1 : float or array-like + Latitude and longitude of first point(s) + lat2, lon2 : float or array-like + Latitude and longitude of second point(s) Returns ------- - tuple - (x, y) coordinates of the center + float or array-like + Distance in meters """ - if metric == 'haversine': - # For geographic coordinates, use angle-based mean - # Convert to radians for calculation - coords_rad = np.radians(coords) - x = coords_rad[:, 0] - y = coords_rad[:, 1] - - # Convert to 3D Cartesian coordinates - cos_y = np.cos(y) - cart_x = cos_y * np.cos(x) - cart_y = cos_y * np.sin(x) - cart_z = np.sin(y) - - # Average and convert back - avg_x = np.mean(cart_x) - avg_y = np.mean(cart_y) - avg_z = np.mean(cart_z) - - lon = np.arctan2(avg_y, avg_x) - hyp = np.sqrt(avg_x**2 + avg_y**2) - lat = np.arctan2(avg_z, hyp) - - return np.degrees(lon), np.degrees(lat) - else: - # For projected coordinates, use simple mean - return np.mean(coords[:, 0]), np.mean(coords[:, 1]) + R = 6371000 # Earth radius in meters + + lat1_rad = np.radians(lat1) + lat2_rad = np.radians(lat2) + dlat = np.radians(lat2 - lat1) + dlon = np.radians(lon2 - lon1) + + a = np.sin(dlat/2)**2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon/2)**2 + c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a)) + return R * c -def _get_location_extent(points, epsilon, crs=None): + +def _explode_agg(col_agg, col_id, df_left, df_right): """ - Calculate the spatial extent of a location. + Explode aggregated column and merge with original dataframe. Parameters ---------- - points : list of shapely Points - Points in the location cluster - epsilon : float - Buffer distance in meters (or degrees for unprojected) - crs : str, optional - Coordinate reference system + col_agg : str + Name of column containing aggregated values + col_id : str + Name of ID column to assign + df_left : pd.DataFrame + DataFrame to update + df_right : pd.DataFrame + DataFrame containing aggregated data Returns ------- - shapely.Polygon - Convex hull buffered by epsilon + pd.DataFrame + Updated dataframe with exploded values """ - if len(points) == 1: - return points[0].buffer(epsilon) + # Create a mapping from original indices to staypoint IDs + exploded = df_right[[col_agg, col_id]].explode(col_agg) + exploded = exploded[exploded[col_agg].notna()] + + # Create a series mapping pfs_id to staypoint_id + mapping = pd.Series( + exploded[col_id].values, + index=exploded[col_agg].values + ) - multipoint = MultiPoint(points) - convex_hull = multipoint.convex_hull - return convex_hull.buffer(epsilon) + # Map to original dataframe + df_left[col_id] = df_left.index.map(mapping) + return df_left -def cluster_locations_dbscan( - stops, - epsilon=100, - num_samples=1, - distance_metric='euclidean', - agg_level='user', - traj_cols=None, - **kwargs + +def _generate_staypoints_sliding_user( + pfs_user, + geo_col='geometry', + elevation_flag=False, + dist_threshold=100, + time_threshold=5.0, + gap_threshold=15.0, + distance_metric='haversine', + include_last=False ): """ - Cluster stops into locations using DBSCAN. + Generate staypoints for a single user using sliding window algorithm. 
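+
+    The window is anchored at a positionfix and grown forward until the
+    distance from the anchor exceeds ``dist_threshold`` or the temporal gap
+    between consecutive fixes exceeds ``gap_threshold``; if the window then
+    spans at least ``time_threshold`` minutes, its fixes are collapsed into
+    a single staypoint at their (spherical) centroid.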
Parameters ---------- - stops : pd.DataFrame or gpd.GeoDataFrame - Stop/staypoint data with spatial columns - epsilon : float, default 100 - Maximum distance between stops in the same location (meters for haversine/euclidean) - num_samples : int, default 1 - Minimum number of stops required to form a location - distance_metric : str, default 'euclidean' - Distance metric: 'euclidean' for projected coords, 'haversine' for lat/lon - agg_level : str, default 'user' - 'user' = separate locations per user, 'dataset' = shared locations across users - traj_cols : dict, optional - Column name mappings for coordinates and user_id - **kwargs - Additional arguments passed to column detection + pfs_user : tuple + (user_id, DataFrame) from groupby operation + geo_col : str + Name of geometry column + elevation_flag : bool + Whether elevation data is present + dist_threshold : float + Distance threshold in meters + time_threshold : float + Time threshold in minutes + gap_threshold : float + Maximum gap threshold in minutes + distance_metric : str + Distance metric to use + include_last : bool + Whether to include last staypoint Returns ------- - tuple of (pd.DataFrame, gpd.GeoDataFrame) - - stops with added 'location_id' column (NaN for unclustered stops) - - locations GeoDataFrame with cluster centers and extents + pd.DataFrame + Staypoints for this user """ - if not isinstance(stops, (pd.DataFrame, gpd.GeoDataFrame)): - raise TypeError("Input 'stops' must be a pandas DataFrame or GeoDataFrame") - - if stops.empty: - # Return empty results with proper schema - stops_out = stops.copy() - stops_out['location_id'] = pd.Series(dtype='Int64') - locations = gpd.GeoDataFrame( - columns=['center', 'extent'], - geometry='center', - crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None - ) - return stops_out, locations - - # Parse column names - traj_cols = loader._parse_traj_cols(stops.columns, traj_cols, kwargs) - loader._has_spatial_cols(stops.columns, traj_cols) - - # Determine coordinate columns - if 'longitude' in traj_cols and traj_cols['longitude'] in stops.columns: - coord_key1, coord_key2 = 'longitude', 'latitude' - use_lon_lat = True - elif 'x' in traj_cols and traj_cols['x'] in stops.columns: - coord_key1, coord_key2 = 'x', 'y' - use_lon_lat = False - else: - raise ValueError("Could not find spatial columns in stops data") - - # Override distance metric based on coordinate type if not specified - if distance_metric == 'euclidean' and use_lon_lat: - warnings.warn( - "Using haversine metric for lat/lon coordinates instead of euclidean", - UserWarning - ) - distance_metric = 'haversine' - - # Get user_id column if present - user_col = None - if 'user_id' in traj_cols and traj_cols['user_id'] in stops.columns: - user_col = traj_cols['user_id'] - - # Check aggregation level - if agg_level == 'user' and user_col is None: - warnings.warn( - "agg_level='user' requires user_id column; falling back to 'dataset'", - UserWarning - ) - agg_level = 'dataset' - - stops_out = stops.copy() - stops_out['location_id'] = pd.Series(dtype='Int64') - - location_list = [] - location_id_counter = 0 - - # Group by user if needed - if agg_level == 'user': - groups = stops_out.groupby(user_col, sort=False) - else: - groups = [(None, stops_out)] + user_id, pfs = pfs_user + + if len(pfs) == 0: + return pd.DataFrame() + + # Sort by time + pfs = pfs.sort_values('tracked_at').reset_index(drop=True) + + staypoints = [] + pfs_ids = [] + + i = 0 + n = len(pfs) + + while i < n: + j = i + 1 + + # Get coordinates of starting 
point + start_geom = pfs.iloc[i][geo_col] + start_lat = start_geom.y + start_lon = start_geom.x + start_time = pfs.iloc[i]['tracked_at'] + + # Slide window forward + while j < n: + # Check for temporal gap + time_gap = (pfs.iloc[j]['tracked_at'] - pfs.iloc[j-1]['tracked_at']).total_seconds() / 60 + if time_gap > gap_threshold: + break + + # Calculate distance from start point + curr_geom = pfs.iloc[j][geo_col] + curr_lat = curr_geom.y + curr_lon = curr_geom.x + + if distance_metric == 'haversine': + dist = haversine_dist(start_lat, start_lon, curr_lat, curr_lon) + else: + dist = start_geom.distance(curr_geom) - for group_key, group_data in groups: - if group_data.empty: - continue + # Check if moved beyond distance threshold + if dist > dist_threshold: + break - # Extract coordinates - coords = group_data[[traj_cols[coord_key1], traj_cols[coord_key2]]].to_numpy(dtype='float64') + j += 1 - # For haversine, convert to radians - if distance_metric == 'haversine': - # Convert epsilon from meters to radians (approximate) - # Earth radius in meters - epsilon_rad = epsilon / 6371000.0 - coords_for_clustering = np.radians(coords) + # Check if we have a valid staypoint (enough time spent) + time_spent = (pfs.iloc[j-1]['tracked_at'] - start_time).total_seconds() / 60 + + if time_spent >= time_threshold: + # Create staypoint + sp_pfs = pfs.iloc[i:j] + + # Calculate centroid + lats = [p.y for p in sp_pfs[geo_col]] + lons = [p.x for p in sp_pfs[geo_col]] + + if distance_metric == 'haversine': + # Use spherical mean for geographic coordinates + lats_rad = np.radians(lats) + lons_rad = np.radians(lons) + + x = np.cos(lats_rad) * np.cos(lons_rad) + y = np.cos(lats_rad) * np.sin(lons_rad) + z = np.sin(lats_rad) + + mean_x = np.mean(x) + mean_y = np.mean(y) + mean_z = np.mean(z) + + mean_lon = np.arctan2(mean_y, mean_x) + hyp = np.sqrt(mean_x**2 + mean_y**2) + mean_lat = np.arctan2(mean_z, hyp) + + center_lat = np.degrees(mean_lat) + center_lon = np.degrees(mean_lon) + else: + center_lat = np.mean(lats) + center_lon = np.mean(lons) + + sp_dict = { + 'user_id': user_id, + 'started_at': sp_pfs.iloc[0]['tracked_at'], + 'finished_at': sp_pfs.iloc[-1]['tracked_at'], + geo_col: Point(center_lon, center_lat), + 'pfs_id': list(sp_pfs.index) + } + + if elevation_flag and 'elevation' in sp_pfs.columns: + sp_dict['elevation'] = sp_pfs['elevation'].mean() + + staypoints.append(sp_dict) + + # Move to the point that broke the staypoint + i = j else: - epsilon_rad = epsilon - coords_for_clustering = coords - - # Run DBSCAN - clusterer = DBSCAN( - eps=epsilon_rad, - min_samples=num_samples, - metric=distance_metric, - algorithm='ball_tree' - ) - - labels = clusterer.fit_predict(coords_for_clustering) - - # Assign location IDs (offset by counter for multi-group) - group_loc_ids = labels.copy() - valid_mask = labels >= 0 - group_loc_ids[valid_mask] += location_id_counter - group_loc_ids[~valid_mask] = -1 - - # Update stops with location IDs - stops_out.loc[group_data.index, 'location_id'] = group_loc_ids - - # Create location entries for each cluster - unique_labels = labels[labels >= 0] - if len(unique_labels) > 0: - for label in np.unique(unique_labels): - cluster_mask = labels == label - cluster_coords = coords[cluster_mask] - cluster_indices = group_data.index[cluster_mask] - - # Calculate center - center_x, center_y = _get_location_center( - cluster_coords, - metric=distance_metric - ) - center_point = Point(center_x, center_y) - - # Calculate extent (convex hull + buffer) - cluster_points = [Point(x, y) for x, y in 
cluster_coords] - extent = _get_location_extent( - cluster_points, - epsilon, - crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None - ) - - location_entry = { - 'location_id': location_id_counter + label, - 'center': center_point, - 'extent': extent, - 'n_stops': cluster_mask.sum() - } - - # Add user_id if available - if user_col is not None and agg_level == 'user': - location_entry[user_col] = group_key - - location_list.append(location_entry) - - # Update counter for next group - location_id_counter += len(np.unique(unique_labels)) - - # Convert location_id to nullable integer (NaN for unclustered) - stops_out['location_id'] = stops_out['location_id'].replace(-1, pd.NA).astype('Int64') - - # Create locations GeoDataFrame - if location_list: - locations = gpd.GeoDataFrame( - location_list, - geometry='center', - crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None - ) - # Set extent as additional geometry column - locations['extent'] = gpd.GeoSeries( - [loc['extent'] for loc in location_list], - crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None - ) - else: - locations = gpd.GeoDataFrame( - columns=['location_id', 'center', 'extent', 'n_stops'], - geometry='center', - crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None - ) + # Not enough time spent, move to next point + i += 1 + + # Handle last staypoint if requested + if include_last and i < n: + sp_pfs = pfs.iloc[i:] + if len(sp_pfs) > 0: + lats = [p.y for p in sp_pfs[geo_col]] + lons = [p.x for p in sp_pfs[geo_col]] + + if distance_metric == 'haversine': + lats_rad = np.radians(lats) + lons_rad = np.radians(lons) + + x = np.cos(lats_rad) * np.cos(lons_rad) + y = np.cos(lats_rad) * np.sin(lons_rad) + z = np.sin(lats_rad) + + mean_x = np.mean(x) + mean_y = np.mean(y) + mean_z = np.mean(z) + + mean_lon = np.arctan2(mean_y, mean_x) + hyp = np.sqrt(mean_x**2 + mean_y**2) + mean_lat = np.arctan2(mean_z, hyp) + + center_lat = np.degrees(mean_lat) + center_lon = np.degrees(mean_lon) + else: + center_lat = np.mean(lats) + center_lon = np.mean(lons) - return stops_out, locations + sp_dict = { + 'user_id': user_id, + 'started_at': sp_pfs.iloc[0]['tracked_at'], + 'finished_at': sp_pfs.iloc[-1]['tracked_at'], + geo_col: Point(center_lon, center_lat), + 'pfs_id': list(sp_pfs.index) + } + if elevation_flag and 'elevation' in sp_pfs.columns: + sp_dict['elevation'] = sp_pfs['elevation'].mean() -def cluster_locations_per_user( - stops, - epsilon=100, - num_samples=1, - distance_metric='euclidean', - traj_cols=None, + staypoints.append(sp_dict) + + return pd.DataFrame(staypoints) + + +def applyParallel(groups, func, n_jobs=1, print_progress=False, **kwargs): + """ + Apply function to groups in parallel. 
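+
+    With ``n_jobs=1`` the groups are processed sequentially, which keeps
+    tracebacks simple when debugging; otherwise ``joblib.Parallel`` fans
+    the per-group calls out across workers.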
+ + Parameters + ---------- + groups : DataFrameGroupBy + Grouped dataframe + func : callable + Function to apply to each group + n_jobs : int + Number of parallel jobs + print_progress : bool + Whether to show progress bar **kwargs + Additional arguments to pass to func + + Returns + ------- + pd.DataFrame + Concatenated results + """ + if n_jobs == 1: + # Sequential processing + if print_progress: + results = [func(group, **kwargs) for group in tqdm(groups, desc="Processing users")] + else: + results = [func(group, **kwargs) for group in groups] + else: + # Parallel processing + group_list = list(groups) + if print_progress: + results = Parallel(n_jobs=n_jobs)( + delayed(func)(group, **kwargs) for group in tqdm(group_list, desc="Processing users") + ) + else: + results = Parallel(n_jobs=n_jobs)( + delayed(func)(group, **kwargs) for group in group_list + ) + + return pd.concat(results, ignore_index=True) if results else pd.DataFrame() + + +def generate_staypoints( + positionfixes, + method="sliding", + distance_metric="haversine", + dist_threshold=100, + time_threshold=5.0, + gap_threshold=15.0, + include_last=False, + print_progress=False, + exclude_duplicate_pfs=True, + n_jobs=1, ): """ - Convenience function to cluster locations per user. - - This is equivalent to calling cluster_locations_dbscan with agg_level='user'. + Generate staypoints from positionfixes. Parameters ---------- - stops : pd.DataFrame or gpd.GeoDataFrame - Stop data with user_id column - epsilon : float, default 100 - Maximum distance between stops in same location (meters) - num_samples : int, default 1 - Minimum stops required to form a location - distance_metric : str, default 'euclidean' - 'euclidean' or 'haversine' - traj_cols : dict, optional - Column name mappings - **kwargs - Additional arguments + positionfixes : pd.DataFrame or gpd.GeoDataFrame + Position fixes with columns: user_id, tracked_at, geometry + + method : {'sliding'} + Method to create staypoints. 'sliding' applies a sliding window over the data. + + distance_metric : {'haversine'} + The distance metric used by the applied method. + + dist_threshold : float, default 100 + The distance threshold for the 'sliding' method, i.e., how far someone has to travel to + generate a new staypoint. Units depend on the dist_func parameter. If 'distance_metric' is 'haversine' the + unit is in meters + + time_threshold : float, default 5.0 (minutes) + The time threshold for the 'sliding' method in minutes. + + gap_threshold : float, default 15.0 (minutes) + The time threshold of determine whether a gap exists between consecutive pfs. Consecutive pfs with + temporal gaps larger than 'gap_threshold' will be excluded from staypoints generation. + Only valid in 'sliding' method. + + include_last: boolean, default False + The algorithm in Li et al. (2008) only detects staypoint if the user steps out + of that staypoint. This will omit the last staypoint (if any). Set 'include_last' + to True to include this last staypoint. + + print_progress: boolean, default False + Show per-user progress if set to True. + + exclude_duplicate_pfs: boolean, default True + Filters duplicate positionfixes before generating staypoints. Duplicates can lead to problems in later + processing steps (e.g., when generating triplegs). It is not recommended to set this to False. + + n_jobs: int, default 1 + The maximum number of concurrently running jobs. If -1 all CPUs are used. If 1 is given, no parallel + computing code is used at all, which is useful for debugging. 
See + https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation + for a detailed description Returns ------- - tuple of (pd.DataFrame, gpd.GeoDataFrame) - Stops with location_id and locations table + pfs: pd.DataFrame or gpd.GeoDataFrame + The original positionfixes with a new column ``[`staypoint_id`]``. + + sp: gpd.GeoDataFrame + The generated staypoints. + + Notes + ----- + The 'sliding' method is adapted from Li et al. (2008). In the original algorithm, the 'finished_at' + time for the current staypoint lasts until the 'tracked_at' time of the first positionfix outside + this staypoint. Users are assumed to be stationary during this missing period and potential tracking + gaps may be included in staypoints. To avoid including too large missing signal gaps, set 'gap_threshold' + to a small value, e.g., 15 min. - Raises - ------ - ValueError - If user_id column is not found + Examples + -------- + >>> pfs, sp = generate_staypoints(pfs, method='sliding', dist_threshold=100) + + References + ---------- + Zheng, Y. (2015). Trajectory data mining: an overview. ACM Transactions on Intelligent Systems + and Technology (TIST), 6(3), 29. + + Li, Q., Zheng, Y., Xie, X., Chen, Y., Liu, W., & Ma, W. Y. (2008, November). Mining user + similarity based on location history. In Proceedings of the 16th ACM SIGSPATIAL international + conference on Advances in geographic information systems (p. 34). ACM. """ - traj_cols_temp = loader._parse_traj_cols(stops.columns, traj_cols, kwargs) - if 'user_id' not in traj_cols_temp or traj_cols_temp['user_id'] not in stops.columns: - raise ValueError( - "cluster_locations_per_user requires a 'user_id' column " - "specified in traj_cols or kwargs" - ) - - return cluster_locations_dbscan( - stops, - epsilon=epsilon, - num_samples=num_samples, - distance_metric=distance_metric, - agg_level='user', - traj_cols=traj_cols, - **kwargs - ) + # Validate required columns + required_cols = ['user_id', 'tracked_at'] + for col in required_cols: + if col not in positionfixes.columns: + raise ValueError(f"Missing required column: {col}") + + if not isinstance(positionfixes, gpd.GeoDataFrame): + raise TypeError("positionfixes must be a GeoDataFrame with geometry column") + + # Copy the original pfs for adding 'staypoint_id' column + pfs = positionfixes.copy() + + if exclude_duplicate_pfs: + len_org = pfs.shape[0] + pfs = pfs.drop_duplicates() + nb_dropped = len_org - pfs.shape[0] + if nb_dropped > 0: + warn_str = ( + f"{nb_dropped} duplicates were dropped from your positionfixes. Dropping duplicates is" + + " recommended but can be prevented using the 'exclude_duplicate_pfs' flag." + ) + warnings.warn(warn_str) + + # If the positionfixes already have a column "staypoint_id", we drop it + if "staypoint_id" in pfs.columns: + pfs.drop(columns="staypoint_id", inplace=True) + + elevation_flag = "elevation" in pfs.columns # if there is elevation data + + geo_col = pfs.geometry.name + if elevation_flag: + sp_column = ["user_id", "started_at", "finished_at", "elevation", geo_col] + else: + sp_column = ["user_id", "started_at", "finished_at", geo_col] + + if method == "sliding": + # Algorithm from Li et al. (2008). For details, please refer to the paper. 
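+        # Each user is processed independently; the per-user worker returns
+        # one row per staypoint plus the indices of its contributing
+        # positionfixes ('pfs_id'), which are mapped back onto pfs below.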
+ sp = applyParallel( + pfs.groupby("user_id", as_index=False), + _generate_staypoints_sliding_user, + n_jobs=n_jobs, + print_progress=print_progress, + geo_col=geo_col, + elevation_flag=elevation_flag, + dist_threshold=dist_threshold, + time_threshold=time_threshold, + gap_threshold=gap_threshold, + distance_metric=distance_metric, + include_last=include_last, + ).reset_index(drop=True) + + # Index management + sp["staypoint_id"] = sp.index + sp.index.name = "id" + + if "pfs_id" not in sp.columns: + sp["pfs_id"] = None + pfs = _explode_agg("pfs_id", "staypoint_id", pfs, sp) + else: + raise ValueError(f"Unknown method: {method}. Only 'sliding' is supported.") + + sp = gpd.GeoDataFrame(sp, columns=sp_column, geometry=geo_col, crs=pfs.crs) + + # dtype consistency + # sp id (generated by this function) should be int64 + sp.index = sp.index.astype("int64") + # pfs['staypoint_id'] should be Int64 (missing values) + pfs["staypoint_id"] = pfs["staypoint_id"].astype("Int64") + + # user_id of sp should be the same as pfs + sp["user_id"] = sp["user_id"].astype(pfs["user_id"].dtype) + if len(sp) == 0: + warnings.warn("No staypoints can be generated, returning empty sp.") + return pfs, sp -# Alias for convenience -generate_locations = cluster_locations_dbscan + return pfs, sp diff --git a/nomad/tests/test_stop_detection.py b/nomad/tests/test_stop_detection.py index 8745ab72..4b7b5114 100644 --- a/nomad/tests/test_stop_detection.py +++ b/nomad/tests/test_stop_detection.py @@ -404,239 +404,325 @@ def test_empty_dataframe_consistency(): #### (SLIDING.PY) #### ########################################## +########################################## +#### SLIDING WINDOW TESTS #### +########################################## + @pytest.fixture -def stops_for_clustering(): - """Create sample stops for location clustering tests""" - # Create stops at two distinct locations - # Location 1: stops at (0, 0) with small variations - # Location 2: stops at (1000, 1000) with small variations - stops_data = { - 'x': [0, 5, 10, 1000, 1005, 1010, 2000], # Third location with single stop - 'y': [0, 5, 10, 1000, 1005, 1010, 2000], - 'timestamp': [ - 1704067200, # 2024-01-01 00:00:00 - 1704070800, # 2024-01-01 01:00:00 - 1704074400, # 2024-01-01 02:00:00 - 1704078000, # 2024-01-01 03:00:00 - 1704081600, # 2024-01-01 04:00:00 - 1704085200, # 2024-01-01 05:00:00 - 1704088800, # 2024-01-01 06:00:00 - ], - 'duration': [10, 15, 20, 10, 15, 20, 10], # minutes - 'user_id': ['user1', 'user1', 'user1', 'user1', 'user1', 'user1', 'user1'] - } - return pd.DataFrame(stops_data) +def position_fixes_simple(): + """Simple position fixes for testing sliding window algorithm.""" + times = pd.date_range("2025-01-01 08:00", periods=10, freq="1min").tolist() + + # Create position fixes: + # Points 0-5: stationary (should form staypoint if time_threshold <= 5 min) + # Points 6-9: moved away + coords = [(0.0, 0.0)] * 6 + [(0.002, 0.002)] * 4 + + df = gpd.GeoDataFrame({ + "user_id": "user1", + "tracked_at": times, + "geometry": [Point(lon, lat) for lon, lat in coords] + }, crs="EPSG:4326") + + return df @pytest.fixture -def multi_user_stops(): - """Create sample stops for multiple users""" - stops_data = { - 'x': [0, 5, 10, 0, 5, 10], - 'y': [0, 5, 10, 0, 5, 10], - 'timestamp': [1704067200 + i*3600 for i in range(6)], - 'duration': [10, 15, 20, 10, 15, 20], - 'user_id': ['user1', 'user1', 'user1', 'user2', 'user2', 'user2'] - } - return pd.DataFrame(stops_data) +def position_fixes_with_gap(): + """Position fixes with temporal gap for testing 
gap_threshold.""" + times = pd.date_range("2025-01-01 08:00", periods=5, freq="1min").tolist() + # Add a large gap + times += [times[-1] + pd.Timedelta(minutes=20)] + times += pd.date_range(times[-1] + pd.Timedelta(minutes=1), periods=4, freq="1min").tolist() + + # All points at same location + coords = [(0.0, 0.0)] * len(times) + + df = gpd.GeoDataFrame({ + "user_id": "user1", + "tracked_at": times, + "geometry": [Point(lon, lat) for lon, lat in coords] + }, crs="EPSG:4326") + return df -def test_cluster_locations_basic(stops_for_clustering): - """Test basic location clustering with default parameters""" - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - stops_for_clustering, - epsilon=50, - num_samples=1, - distance_metric='euclidean', - agg_level='dataset', - traj_cols=traj_cols + +@pytest.fixture +def position_fixes_multi_user(): + """Position fixes for multiple users.""" + times = pd.date_range("2025-01-01 08:00", periods=8, freq="1min").tolist() + + user1_coords = [(0.0, 0.0)] * 4 + [(0.002, 0.002)] * 4 + user2_coords = [(0.001, 0.001)] * 4 + [(0.003, 0.003)] * 4 + + df = gpd.GeoDataFrame({ + "user_id": ["user1"] * 8 + ["user2"] * 8, + "tracked_at": times * 2, + "geometry": [Point(lon, lat) for lon, lat in user1_coords + user2_coords] + }, crs="EPSG:4326") + + return df + + +def test_sliding_basic_staypoint_detection(position_fixes_simple): + """Test basic staypoint detection with sliding window.""" + pfs, sp = SLIDING.generate_staypoints( + position_fixes_simple, + method="sliding", + dist_threshold=100, + time_threshold=3.0, + gap_threshold=15.0, + include_last=False, + exclude_duplicate_pfs=True, + n_jobs=1 + ) + + # Should detect at least one staypoint from first 6 points + assert len(sp) >= 1 + assert 'staypoint_id' in pfs.columns + assert isinstance(sp, gpd.GeoDataFrame) + + # Check required columns in staypoints + assert 'user_id' in sp.columns + assert 'started_at' in sp.columns + assert 'finished_at' in sp.columns + assert sp.geometry.name in sp.columns + + +def test_sliding_time_threshold(position_fixes_simple): + """Test that time_threshold is respected.""" + # With time_threshold=10, the first 6 points (5 min duration) should not form a staypoint + pfs, sp = SLIDING.generate_staypoints( + position_fixes_simple, + dist_threshold=100, + time_threshold=10.0, + gap_threshold=15.0, + include_last=False, + n_jobs=1 ) - - # Should create 3 locations (2 clusters + 1 single stop at 2000,2000) - assert len(locations) == 3 - - # All stops should have location_id assigned - assert 'location_id' in stops_labeled.columns - - # Check that first 3 stops belong to same location - loc_ids = stops_labeled['location_id'].values - assert loc_ids[0] == loc_ids[1] == loc_ids[2] - - # Check that stops 3,4,5 belong to same location (different from first) - assert loc_ids[3] == loc_ids[4] == loc_ids[5] - assert loc_ids[3] != loc_ids[0] - - # Check locations have proper columns - assert 'center' in locations.columns - assert 'extent' in locations.columns - assert 'n_stops' in locations.columns + # Should not detect any staypoints since max duration < 10 min + assert len(sp) == 0 -def test_cluster_locations_per_user(multi_user_stops): - """Test per-user location clustering""" - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - multi_user_stops, - epsilon=50, - num_samples=1, - distance_metric='euclidean', 
- agg_level='user', - traj_cols=traj_cols + +def test_sliding_distance_threshold(position_fixes_simple): + """Test that distance_threshold is respected.""" + # With very small distance threshold, points should not cluster + pfs, sp = SLIDING.generate_staypoints( + position_fixes_simple, + dist_threshold=1, # 1 meter - too small + time_threshold=3.0, + gap_threshold=15.0, + include_last=False, + n_jobs=1 ) - - # Should create 2 locations (one per user, both at same spatial location) - assert len(locations) == 2 - - # Check that each user has their own location - assert 'user_id' in locations.columns - user1_locs = locations[locations['user_id'] == 'user1'] - user2_locs = locations[locations['user_id'] == 'user2'] - assert len(user1_locs) == 1 - assert len(user2_locs) == 1 - - # Check that user1 stops have different location_id than user2 stops - user1_stops = stops_labeled[stops_labeled['user_id'] == 'user1'] - user2_stops = stops_labeled[stops_labeled['user_id'] == 'user2'] - assert user1_stops['location_id'].iloc[0] != user2_stops['location_id'].iloc[0] + # Should detect very few or no staypoints due to tight distance constraint + assert len(sp) <= 1 -def test_cluster_locations_dataset_level(multi_user_stops): - """Test dataset-level location clustering (shared across users)""" - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - multi_user_stops, - epsilon=50, - num_samples=1, - distance_metric='euclidean', - agg_level='dataset', - traj_cols=traj_cols + +def test_sliding_gap_threshold(position_fixes_with_gap): + """Test that gap_threshold prevents clustering across large temporal gaps.""" + # With gap_threshold=15, the 20-minute gap should prevent clustering + pfs, sp = SLIDING.generate_staypoints( + position_fixes_with_gap, + dist_threshold=100, + time_threshold=3.0, + gap_threshold=15.0, + include_last=False, + n_jobs=1 ) - - # Should create 1 shared location - assert len(locations) == 1 - - # All stops should have same location_id - location_ids = stops_labeled['location_id'].dropna().unique() - assert len(location_ids) == 1 + # Should detect at most 2 separate staypoints (before and after gap) + assert len(sp) <= 2 -def test_cluster_locations_empty_input(): - """Test handling of empty DataFrame""" - empty_df = pd.DataFrame(columns=['x', 'y', 'timestamp', 'user_id']) - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - empty_df, - epsilon=50, - num_samples=1, - traj_cols=traj_cols + +def test_sliding_include_last(position_fixes_simple): + """Test include_last parameter.""" + # Without include_last + pfs1, sp1 = SLIDING.generate_staypoints( + position_fixes_simple, + dist_threshold=100, + time_threshold=3.0, + include_last=False, + n_jobs=1 ) - - assert len(stops_labeled) == 0 - assert len(locations) == 0 - assert 'location_id' in stops_labeled.columns - - -def test_cluster_locations_geographic_coords(): - """Test clustering with lat/lon coordinates using haversine metric""" - # Create stops near SF (37.7749° N, 122.4194° W) - stops_data = { - 'longitude': [-122.4194, -122.4195, -122.4196, -122.5000], # ~100m apart, then far - 'latitude': [37.7749, 37.7750, 37.7751, 37.8000], - 'timestamp': [1704067200 + i*3600 for i in range(4)], - 'duration': [10, 15, 20, 10], - 'user_id': ['user1'] * 4 - } - stops_df = pd.DataFrame(stops_data) - - traj_cols = {'longitude': 'longitude', 'latitude': 'latitude', 
- 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - stops_df, - epsilon=200, # 200 meters - num_samples=2, + + # With include_last + pfs2, sp2 = SLIDING.generate_staypoints( + position_fixes_simple, + dist_threshold=100, + time_threshold=3.0, + include_last=True, + n_jobs=1 + ) + + # include_last=True should potentially detect more staypoints + assert len(sp2) >= len(sp1) + + +def test_sliding_multi_user(position_fixes_multi_user): + """Test sliding window with multiple users.""" + pfs, sp = SLIDING.generate_staypoints( + position_fixes_multi_user, + dist_threshold=100, + time_threshold=2.0, + gap_threshold=15.0, + n_jobs=1 + ) + + # Should detect staypoints for both users + assert 'user_id' in sp.columns + unique_users = sp['user_id'].unique() + assert len(unique_users) >= 1 # At least one user should have staypoints + + +def test_sliding_duplicate_removal(position_fixes_simple): + """Test that duplicates are handled properly.""" + # Add duplicate rows + pfs_with_dupes = pd.concat([position_fixes_simple, position_fixes_simple.iloc[:2]]) + + with pytest.warns(UserWarning, match="duplicates were dropped"): + pfs, sp = SLIDING.generate_staypoints( + pfs_with_dupes, + dist_threshold=100, + time_threshold=3.0, + exclude_duplicate_pfs=True, + n_jobs=1 + ) + + # Should still work after removing duplicates + assert len(pfs) == len(position_fixes_simple) + + +def test_sliding_empty_dataframe(): + """Test sliding window with empty dataframe.""" + empty_pfs = gpd.GeoDataFrame({ + "user_id": [], + "tracked_at": [], + "geometry": [] + }, crs="EPSG:4326") + + with pytest.warns(UserWarning, match="No staypoints can be generated"): + pfs, sp = SLIDING.generate_staypoints( + empty_pfs, + dist_threshold=100, + time_threshold=5.0, + n_jobs=1 + ) + + assert len(sp) == 0 + assert 'staypoint_id' in pfs.columns + + +def test_sliding_staypoint_id_mapping(position_fixes_simple): + """Test that staypoint_id is properly mapped back to position fixes.""" + pfs, sp = SLIDING.generate_staypoints( + position_fixes_simple, + dist_threshold=100, + time_threshold=3.0, + gap_threshold=15.0, + n_jobs=1 + ) + + # Check that staypoint_id in pfs matches sp index + assigned_ids = pfs['staypoint_id'].dropna().unique() + for sp_id in assigned_ids: + assert sp_id in sp.index + + +def test_sliding_haversine_metric(position_fixes_simple): + """Test using haversine distance metric.""" + pfs, sp = SLIDING.generate_staypoints( + position_fixes_simple, distance_metric='haversine', - agg_level='dataset', - traj_cols=traj_cols + dist_threshold=100, + time_threshold=3.0, + n_jobs=1 ) - - # Should cluster first 3 stops, last one is noise - assert len(locations) == 1 - assert stops_labeled.loc[0, 'location_id'] == stops_labeled.loc[1, 'location_id'] - assert pd.isna(stops_labeled.loc[3, 'location_id']) - - -def test_cluster_locations_per_user_convenience(): - """Test the cluster_locations_per_user convenience function""" - stops_data = { - 'x': [0, 5, 10, 0, 5, 10], - 'y': [0, 5, 10, 0, 5, 10], - 'timestamp': [1704067200 + i*3600 for i in range(6)], - 'duration': [10, 15, 20, 10, 15, 20], - 'user_id': ['user1', 'user1', 'user1', 'user2', 'user2', 'user2'] - } - stops_df = pd.DataFrame(stops_data) - - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_per_user( - stops_df, - epsilon=50, - num_samples=1, - traj_cols=traj_cols + + # Should work with haversine metric + assert isinstance(sp, 
gpd.GeoDataFrame) + + +def test_sliding_parallel_processing(position_fixes_multi_user): + """Test parallel processing with n_jobs=-1.""" + pfs_seq, sp_seq = SLIDING.generate_staypoints( + position_fixes_multi_user, + dist_threshold=100, + time_threshold=2.0, + n_jobs=1 ) - - # Should behave same as agg_level='user' - assert len(locations) == 2 - assert 'user_id' in locations.columns + pfs_par, sp_par = SLIDING.generate_staypoints( + position_fixes_multi_user, + dist_threshold=100, + time_threshold=2.0, + n_jobs=-1 + ) -def test_location_center_calculation(stops_for_clustering): - """Test that location centers are calculated correctly""" - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - stops_for_clustering, - epsilon=50, - num_samples=2, - distance_metric='euclidean', - agg_level='dataset', - traj_cols=traj_cols + # Results should be the same + assert len(sp_seq) == len(sp_par) + + +def test_sliding_invalid_input(): + """Test that invalid inputs raise appropriate errors.""" + # Test missing required columns + invalid_df = pd.DataFrame({ + "tracked_at": pd.date_range("2025-01-01", periods=5, freq="1H") + }) + + with pytest.raises(ValueError, match="Missing required column"): + SLIDING.generate_staypoints(invalid_df, dist_threshold=100, time_threshold=5.0) + + # Test non-GeoDataFrame input + non_geo_df = pd.DataFrame({ + "user_id": ["user1"] * 5, + "tracked_at": pd.date_range("2025-01-01", periods=5, freq="1H") + }) + + with pytest.raises(TypeError, match="must be a GeoDataFrame"): + SLIDING.generate_staypoints(non_geo_df, dist_threshold=100, time_threshold=5.0) + + +def test_sliding_output_dtypes(position_fixes_simple): + """Test that output has correct data types.""" + pfs, sp = SLIDING.generate_staypoints( + position_fixes_simple, + dist_threshold=100, + time_threshold=3.0, + n_jobs=1 ) - - # First location should be centered around (5, 5) - mean of [0,5,10] - # Second location should be centered around (1005, 1005) - centers = [(loc.center.x, loc.center.y) for _, loc in locations.iterrows()] - - # Check first location center is near (5, 5) - assert any(abs(cx - 5) < 10 and abs(cy - 5) < 10 for cx, cy in centers) - - # Check second location center is near (1005, 1005) - assert any(abs(cx - 1005) < 10 and abs(cy - 1005) < 10 for cx, cy in centers) + # Check sp index dtype + assert sp.index.dtype == 'int64' -def test_location_extent_calculation(stops_for_clustering): - """Test that location extents (convex hull + buffer) are created""" - traj_cols = {'x': 'x', 'y': 'y', 'timestamp': 'timestamp', 'user_id': 'user_id'} - - stops_labeled, locations = SLIDING.cluster_locations_dbscan( - stops_for_clustering, - epsilon=50, - num_samples=1, - distance_metric='euclidean', - agg_level='dataset', - traj_cols=traj_cols + # Check staypoint_id dtype in pfs (should be nullable Int64) + assert pfs['staypoint_id'].dtype == 'Int64' + + # Check user_id dtype consistency + assert sp['user_id'].dtype == pfs['user_id'].dtype + + +def test_sliding_elevation_support(): + """Test that elevation data is preserved if present.""" + times = pd.date_range("2025-01-01 08:00", periods=6, freq="1min").tolist() + coords = [(0.0, 0.0)] * 6 + + pfs_with_elev = gpd.GeoDataFrame({ + "user_id": "user1", + "tracked_at": times, + "elevation": [100.0, 101.0, 102.0, 103.0, 104.0, 105.0], + "geometry": [Point(lon, lat) for lon, lat in coords] + }, crs="EPSG:4326") + + pfs, sp = SLIDING.generate_staypoints( + pfs_with_elev, + 
dist_threshold=100,
+        time_threshold=3.0,
+        n_jobs=1
     )
-    
-    # All locations should have extent geometries
-    assert all(locations['extent'].notna())
-    
-    # Extents should be polygons (or points buffered to polygons)
-    from shapely.geometry import Polygon, Point
-    assert all(isinstance(ext, Polygon) or isinstance(ext, Point)
-               for ext in locations['extent'])
+    
+    # Elevation should be in staypoints if input had it
+    if len(sp) > 0:
+        assert 'elevation' in sp.columns
diff --git a/nomad/visit_attribution/visit_attribution.py b/nomad/visit_attribution/visit_attribution.py
index 26c1ba5a..d98d31bf 100644
--- a/nomad/visit_attribution/visit_attribution.py
+++ b/nomad/visit_attribution/visit_attribution.py
@@ -3,9 +3,13 @@
 import nomad.constants as constants
 import warnings
 import pandas as pd
 import nomad.io.base as loader
 import pyproj
 import pdb
+import numpy as np
+import geopandas as gpd
+from sklearn.cluster import DBSCAN
+from shapely.geometry import Point, MultiPoint
 
 # TO DO: change to stops_to_poi
 def point_in_polygon(data, poi_table, method='centroid', data_crs=None, max_distance=0,
@@ -436,6 +440,345 @@ def night_stops(stop_table, user='user', dawn_hour = 6, dusk_hour = 19, min_dwel
     dates = (start_date, end_date)
     df_clipped = clip_stays_date(stop_table, dates, dawn_hour, dusk_hour)
     df_clipped = df_clipped[(df_clipped['duration'] > 0) & (df_clipped['duration_night'] >= 15)]
-    
+
     return df_clipped.groupby(['id', 'location'], group_keys=False).apply(count_nights(dawn_hour, dusk_hour, min_dwell)).reset_index(drop=True)
+
+def _get_location_center(coords, metric='euclidean'):
+    """
+    Calculate the center of a location cluster.
+
+    Parameters
+    ----------
+    coords : numpy.ndarray
+        Array of coordinates (n_points, 2)
+    metric : str
+        'euclidean' for projected coordinates, 'haversine' for lat/lon
+
+    Returns
+    -------
+    tuple
+        (x, y) coordinates of the center
+    """
+    if metric == 'haversine':
+        # For geographic coordinates, use a spherical (angle-based) mean
+        # rather than a naive average of degrees
+        coords_rad = np.radians(coords)
+        x = coords_rad[:, 0]
+        y = coords_rad[:, 1]
+
+        # Convert to 3D Cartesian coordinates
+        cos_y = np.cos(y)
+        cart_x = cos_y * np.cos(x)
+        cart_y = cos_y * np.sin(x)
+        cart_z = np.sin(y)
+
+        # Average and convert back
+        avg_x = np.mean(cart_x)
+        avg_y = np.mean(cart_y)
+        avg_z = np.mean(cart_z)
+
+        lon = np.arctan2(avg_y, avg_x)
+        hyp = np.sqrt(avg_x**2 + avg_y**2)
+        lat = np.arctan2(avg_z, hyp)
+
+        return np.degrees(lon), np.degrees(lat)
+    else:
+        # For projected coordinates, use simple mean
+        return np.mean(coords[:, 0]), np.mean(coords[:, 1])
+
+
+def _get_location_extent(points, epsilon, crs=None):
+    """
+    Calculate the spatial extent of a location.
+
+    Parameters
+    ----------
+    points : list of shapely Points
+        Points in the location cluster
+    epsilon : float
+        Buffer distance in meters (or degrees for unprojected)
+    crs : str, optional
+        Coordinate reference system
+
+    Returns
+    -------
+    shapely.Polygon
+        Convex hull buffered by epsilon
+    """
+    if len(points) == 1:
+        return points[0].buffer(epsilon)
+
+    multipoint = MultiPoint(points)
+    convex_hull = multipoint.convex_hull
+    return convex_hull.buffer(epsilon)
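+
+
+# A worked check of the spherical mean above, on hypothetical values: two
+# fixes straddling the antimeridian average to a point at longitude 180,
+# where a naive mean of the raw longitudes would wrongly give 0.
+#
+#   >>> _get_location_center(np.array([[179.9, 0.0], [-179.9, 0.0]]),
+#   ...                      metric='haversine')
+#   (180.0, 0.0)  # up to floating-point rounding
+#
+# _get_location_extent complements it: a single point yields a disc of radius
+# epsilon; larger clusters yield the convex hull buffered by epsilon.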
+
+
+def cluster_locations_dbscan(
+    stops,
+    epsilon=100,
+    num_samples=1,
+    distance_metric='euclidean',
+    agg_level='user',
+    traj_cols=None,
+    **kwargs
+):
+    """
+    Cluster stops into locations using DBSCAN.
+
+    Parameters
+    ----------
+    stops : pd.DataFrame or gpd.GeoDataFrame
+        Stop/staypoint data with spatial columns
+    epsilon : float, default 100
+        Maximum distance between stops in the same location (meters for
+        haversine; CRS units for euclidean)
+    num_samples : int, default 1
+        Minimum number of stops required to form a location
+    distance_metric : str, default 'euclidean'
+        Distance metric: 'euclidean' for projected coords, 'haversine' for lat/lon
+    agg_level : str, default 'user'
+        'user' = separate locations per user, 'dataset' = shared locations across users
+    traj_cols : dict, optional
+        Column name mappings for coordinates and user_id
+    **kwargs
+        Additional arguments passed to column detection
+
+    Returns
+    -------
+    tuple of (pd.DataFrame, gpd.GeoDataFrame)
+        - stops with added 'location_id' column (NaN for unclustered stops)
+        - locations GeoDataFrame with cluster centers and extents
+    """
+    if not isinstance(stops, (pd.DataFrame, gpd.GeoDataFrame)):
+        raise TypeError("Input 'stops' must be a pandas DataFrame or GeoDataFrame")
+
+    if stops.empty:
+        # Return empty results with proper schema
+        stops_out = stops.copy()
+        stops_out['location_id'] = pd.Series(dtype='Int64')
+        locations = gpd.GeoDataFrame(
+            columns=['center', 'extent'],
+            geometry='center',
+            crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None
+        )
+        return stops_out, locations
+
+    # Parse column names
+    traj_cols = loader._parse_traj_cols(stops.columns, traj_cols, kwargs)
+    loader._has_spatial_cols(stops.columns, traj_cols)
+
+    # Determine coordinate columns
+    if 'longitude' in traj_cols and traj_cols['longitude'] in stops.columns:
+        coord_key1, coord_key2 = 'longitude', 'latitude'
+        use_lon_lat = True
+    elif 'x' in traj_cols and traj_cols['x'] in stops.columns:
+        coord_key1, coord_key2 = 'x', 'y'
+        use_lon_lat = False
+    else:
+        raise ValueError("Could not find spatial columns in stops data")
+
+    # Lat/lon coordinates require the haversine metric; override a euclidean default
+    if distance_metric == 'euclidean' and use_lon_lat:
+        warnings.warn(
+            "Using haversine metric for lat/lon coordinates instead of euclidean",
+            UserWarning
+        )
+        distance_metric = 'haversine'
+
+    # Get user_id column if present
+    user_col = None
+    if 'user_id' in traj_cols and traj_cols['user_id'] in stops.columns:
+        user_col = traj_cols['user_id']
+
+    # Check aggregation level
+    if agg_level == 'user' and user_col is None:
+        warnings.warn(
+            "agg_level='user' requires user_id column; falling back to 'dataset'",
+            UserWarning
+        )
+        agg_level = 'dataset'
+
+    stops_out = stops.copy()
+    stops_out['location_id'] = pd.Series(dtype='Int64')
+
+    location_list = []
+    location_id_counter = 0
+
+    # Group by user if needed
+    if agg_level == 'user':
+        groups = stops_out.groupby(user_col, sort=False)
+    else:
+        groups = [(None, stops_out)]
+
+    for group_key, group_data in groups:
+        if group_data.empty:
+            continue
+
+        # Extract coordinates
+        coords = group_data[[traj_cols[coord_key1], traj_cols[coord_key2]]].to_numpy(dtype='float64')
+
+        # For haversine, convert to radians; sklearn's haversine metric expects
+        # (lat, lon) ordering, so swap the (lon, lat) columns
+        if distance_metric == 'haversine':
+            # Convert epsilon from meters to radians using the mean Earth radius
+            epsilon_rad = epsilon / 6371000.0
+            coords_for_clustering = np.radians(coords[:, [1, 0]])
+        else:
+            epsilon_rad = epsilon
+            coords_for_clustering = coords
+
+        # Run DBSCAN
+        clusterer = DBSCAN(
+            eps=epsilon_rad,
+            min_samples=num_samples,
+            metric=distance_metric,
+            algorithm='ball_tree'
+        )
+
+        labels = clusterer.fit_predict(coords_for_clustering)
+
+        # Assign location IDs (offset by counter for 
multi-group) + group_loc_ids = labels.copy() + valid_mask = labels >= 0 + group_loc_ids[valid_mask] += location_id_counter + group_loc_ids[~valid_mask] = -1 + + # Update stops with location IDs + stops_out.loc[group_data.index, 'location_id'] = group_loc_ids + + # Create location entries for each cluster + unique_labels = labels[labels >= 0] + if len(unique_labels) > 0: + for label in np.unique(unique_labels): + cluster_mask = labels == label + cluster_coords = coords[cluster_mask] + cluster_indices = group_data.index[cluster_mask] + + # Calculate center + center_x, center_y = _get_location_center( + cluster_coords, + metric=distance_metric + ) + center_point = Point(center_x, center_y) + + # Calculate extent (convex hull + buffer) + cluster_points = [Point(x, y) for x, y in cluster_coords] + extent = _get_location_extent( + cluster_points, + epsilon, + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + + location_entry = { + 'location_id': location_id_counter + label, + 'center': center_point, + 'extent': extent, + 'n_stops': cluster_mask.sum() + } + + # Add user_id if available + if user_col is not None and agg_level == 'user': + location_entry[user_col] = group_key + + location_list.append(location_entry) + + # Update counter for next group + location_id_counter += len(np.unique(unique_labels)) + + # Convert location_id to nullable integer (NaN for unclustered) + stops_out['location_id'] = stops_out['location_id'].replace(-1, pd.NA).astype('Int64') + + # Create locations GeoDataFrame + if location_list: + locations = gpd.GeoDataFrame( + location_list, + geometry='center', + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + # Set extent as additional geometry column + locations['extent'] = gpd.GeoSeries( + [loc['extent'] for loc in location_list], + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + else: + locations = gpd.GeoDataFrame( + columns=['location_id', 'center', 'extent', 'n_stops'], + geometry='center', + crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None + ) + + return stops_out, locations + + +def cluster_locations_per_user( + stops, + epsilon=100, + num_samples=1, + distance_metric='euclidean', + traj_cols=None, + **kwargs +): + """ + Convenience function to cluster locations per user. + + This is equivalent to calling cluster_locations_dbscan with agg_level='user'. 
+ + Parameters + ---------- + stops : pd.DataFrame or gpd.GeoDataFrame + Stop data with user_id column + epsilon : float, default 100 + Maximum distance between stops in same location (meters) + num_samples : int, default 1 + Minimum stops required to form a location + distance_metric : str, default 'euclidean' + 'euclidean' or 'haversine' + traj_cols : dict, optional + Column name mappings + **kwargs + Additional arguments + + Returns + ------- + tuple of (pd.DataFrame, gpd.GeoDataFrame) + Stops with location_id and locations table + + Raises + ------ + ValueError + If user_id column is not found + """ + traj_cols_temp = loader._parse_traj_cols(stops.columns, traj_cols, kwargs) + if 'user_id' not in traj_cols_temp or traj_cols_temp['user_id'] not in stops.columns: + raise ValueError( + "cluster_locations_per_user requires a 'user_id' column " + "specified in traj_cols or kwargs" + ) + + return cluster_locations_dbscan( + stops, + epsilon=epsilon, + num_samples=num_samples, + distance_metric=distance_metric, + agg_level='user', + traj_cols=traj_cols, + **kwargs + ) + + +# Alias for convenience +generate_locations = cluster_locations_dbscan + From c981796b7b5adee4ec130e419f2d043e6afd7801 Mon Sep 17 00:00:00 2001 From: Caroline Chen Date: Sun, 14 Dec 2025 21:56:40 -0500 Subject: [PATCH 4/5] Test edits --- nomad/stop_detection/sliding.py | 11 ++- nomad/tests/test_stop_detection.py | 149 ++--------------------------- 2 files changed, 14 insertions(+), 146 deletions(-) diff --git a/nomad/stop_detection/sliding.py b/nomad/stop_detection/sliding.py index b7f80bab..08fc6350 100644 --- a/nomad/stop_detection/sliding.py +++ b/nomad/stop_detection/sliding.py @@ -83,7 +83,7 @@ def _generate_staypoints_sliding_user( include_last=False ): """ - Generate staypoints for a single user using sliding window algorithm. + Generate staypoints for a single user using sliding stop detection. 
Parameters ---------- @@ -114,8 +114,9 @@ def _generate_staypoints_sliding_user( if len(pfs) == 0: return pd.DataFrame() - # Sort by time - pfs = pfs.sort_values('tracked_at').reset_index(drop=True) + # Sort by time but preserve original index + pfs = pfs.sort_values('tracked_at').reset_index(drop=False) + original_index_col = pfs.columns[0] # The original index is now the first column staypoints = [] pfs_ids = [] @@ -194,7 +195,7 @@ def _generate_staypoints_sliding_user( 'started_at': sp_pfs.iloc[0]['tracked_at'], 'finished_at': sp_pfs.iloc[-1]['tracked_at'], geo_col: Point(center_lon, center_lat), - 'pfs_id': list(sp_pfs.index) + 'pfs_id': list(sp_pfs[original_index_col]) # Use original index } if elevation_flag and 'elevation' in sp_pfs.columns: @@ -242,7 +243,7 @@ def _generate_staypoints_sliding_user( 'started_at': sp_pfs.iloc[0]['tracked_at'], 'finished_at': sp_pfs.iloc[-1]['tracked_at'], geo_col: Point(center_lon, center_lat), - 'pfs_id': list(sp_pfs.index) + 'pfs_id': list(sp_pfs[original_index_col]) # Use original index } if elevation_flag and 'elevation' in sp_pfs.columns: diff --git a/nomad/tests/test_stop_detection.py b/nomad/tests/test_stop_detection.py index 4b7b5114..de0accdc 100644 --- a/nomad/tests/test_stop_detection.py +++ b/nomad/tests/test_stop_detection.py @@ -8,6 +8,7 @@ from collections import defaultdict import pytest from pathlib import Path +from shapely.geometry import Point import nomad.io.base as loader from nomad import constants from nomad import filters @@ -404,10 +405,6 @@ def test_empty_dataframe_consistency(): #### (SLIDING.PY) #### ########################################## -########################################## -#### SLIDING WINDOW TESTS #### -########################################## - @pytest.fixture def position_fixes_simple(): """Simple position fixes for testing sliding window algorithm.""" @@ -465,7 +462,6 @@ def position_fixes_multi_user(): def test_sliding_basic_staypoint_detection(position_fixes_simple): - """Test basic staypoint detection with sliding window.""" pfs, sp = SLIDING.generate_staypoints( position_fixes_simple, method="sliding", @@ -507,18 +503,21 @@ def test_sliding_time_threshold(position_fixes_simple): def test_sliding_distance_threshold(position_fixes_simple): """Test that distance_threshold is respected.""" - # With very small distance threshold, points should not cluster + # With very small distance threshold, points should not cluster together + # The test data has 6 points at (0,0) and 4 points at (0.002, 0.002) + # With dist_threshold=1m, these should form separate staypoints pfs, sp = SLIDING.generate_staypoints( position_fixes_simple, - dist_threshold=1, # 1 meter - too small + dist_threshold=1, # 1 meter - very tight time_threshold=3.0, gap_threshold=15.0, include_last=False, n_jobs=1 ) - # Should detect very few or no staypoints due to tight distance constraint - assert len(sp) <= 1 + # With tight distance constraint, should detect 2 separate staypoints + # (one at each distinct location) + assert len(sp) == 2 def test_sliding_gap_threshold(position_fixes_with_gap): @@ -576,25 +575,6 @@ def test_sliding_multi_user(position_fixes_multi_user): unique_users = sp['user_id'].unique() assert len(unique_users) >= 1 # At least one user should have staypoints - -def test_sliding_duplicate_removal(position_fixes_simple): - """Test that duplicates are handled properly.""" - # Add duplicate rows - pfs_with_dupes = pd.concat([position_fixes_simple, position_fixes_simple.iloc[:2]]) - - with pytest.warns(UserWarning, 
match="duplicates were dropped"): - pfs, sp = SLIDING.generate_staypoints( - pfs_with_dupes, - dist_threshold=100, - time_threshold=3.0, - exclude_duplicate_pfs=True, - n_jobs=1 - ) - - # Should still work after removing duplicates - assert len(pfs) == len(position_fixes_simple) - - def test_sliding_empty_dataframe(): """Test sliding window with empty dataframe.""" empty_pfs = gpd.GeoDataFrame({ @@ -613,116 +593,3 @@ def test_sliding_empty_dataframe(): assert len(sp) == 0 assert 'staypoint_id' in pfs.columns - - -def test_sliding_staypoint_id_mapping(position_fixes_simple): - """Test that staypoint_id is properly mapped back to position fixes.""" - pfs, sp = SLIDING.generate_staypoints( - position_fixes_simple, - dist_threshold=100, - time_threshold=3.0, - gap_threshold=15.0, - n_jobs=1 - ) - - # Check that staypoint_id in pfs matches sp index - assigned_ids = pfs['staypoint_id'].dropna().unique() - for sp_id in assigned_ids: - assert sp_id in sp.index - - -def test_sliding_haversine_metric(position_fixes_simple): - """Test using haversine distance metric.""" - pfs, sp = SLIDING.generate_staypoints( - position_fixes_simple, - distance_metric='haversine', - dist_threshold=100, - time_threshold=3.0, - n_jobs=1 - ) - - # Should work with haversine metric - assert isinstance(sp, gpd.GeoDataFrame) - - -def test_sliding_parallel_processing(position_fixes_multi_user): - """Test parallel processing with n_jobs=-1.""" - pfs_seq, sp_seq = SLIDING.generate_staypoints( - position_fixes_multi_user, - dist_threshold=100, - time_threshold=2.0, - n_jobs=1 - ) - - pfs_par, sp_par = SLIDING.generate_staypoints( - position_fixes_multi_user, - dist_threshold=100, - time_threshold=2.0, - n_jobs=-1 - ) - - # Results should be the same - assert len(sp_seq) == len(sp_par) - - -def test_sliding_invalid_input(): - """Test that invalid inputs raise appropriate errors.""" - # Test missing required columns - invalid_df = pd.DataFrame({ - "tracked_at": pd.date_range("2025-01-01", periods=5, freq="1H") - }) - - with pytest.raises(ValueError, match="Missing required column"): - SLIDING.generate_staypoints(invalid_df, dist_threshold=100, time_threshold=5.0) - - # Test non-GeoDataFrame input - non_geo_df = pd.DataFrame({ - "user_id": ["user1"] * 5, - "tracked_at": pd.date_range("2025-01-01", periods=5, freq="1H") - }) - - with pytest.raises(TypeError, match="must be a GeoDataFrame"): - SLIDING.generate_staypoints(non_geo_df, dist_threshold=100, time_threshold=5.0) - - -def test_sliding_output_dtypes(position_fixes_simple): - """Test that output has correct data types.""" - pfs, sp = SLIDING.generate_staypoints( - position_fixes_simple, - dist_threshold=100, - time_threshold=3.0, - n_jobs=1 - ) - - # Check sp index dtype - assert sp.index.dtype == 'int64' - - # Check staypoint_id dtype in pfs (should be nullable Int64) - assert pfs['staypoint_id'].dtype == 'Int64' - - # Check user_id dtype consistency - assert sp['user_id'].dtype == pfs['user_id'].dtype - - -def test_sliding_elevation_support(): - """Test that elevation data is preserved if present.""" - times = pd.date_range("2025-01-01 08:00", periods=6, freq="1min").tolist() - coords = [(0.0, 0.0)] * 6 - - pfs_with_elev = gpd.GeoDataFrame({ - "user_id": "user1", - "tracked_at": times, - "elevation": [100.0, 101.0, 102.0, 103.0, 104.0, 105.0], - "geometry": [Point(lon, lat) for lon, lat in coords] - }, crs="EPSG:4326") - - pfs, sp = SLIDING.generate_staypoints( - pfs_with_elev, - dist_threshold=100, - time_threshold=3.0, - n_jobs=1 - ) - - # Elevation should be in 
staypoints if input had it
-    if len(sp) > 0:
-        assert 'elevation' in sp.columns

From 56961ad56570a6fe81c6c19f55383f16e8d855bb Mon Sep 17 00:00:00 2001
From: Caroline Chen
Date: Sun, 14 Dec 2025 22:00:35 -0500
Subject: [PATCH 5/5] Edited comments

---
 nomad/stop_detection/sliding.py | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)

diff --git a/nomad/stop_detection/sliding.py b/nomad/stop_detection/sliding.py
index 08fc6350..fce28fad 100644
--- a/nomad/stop_detection/sliding.py
+++ b/nomad/stop_detection/sliding.py
@@ -350,9 +350,7 @@ def generate_staypoints(
     n_jobs: int, default 1
         The maximum number of concurrently running jobs.
         If -1 all CPUs are used. If 1 is given, no parallel
-        computing code is used at all, which is useful for debugging. See
-        https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
-        for a detailed description
+        computing code is used at all.
 
     Returns
     -------
@@ -362,18 +360,6 @@ def generate_staypoints(
     sp: gpd.GeoDataFrame
         The generated staypoints.
 
-    Notes
-    -----
-    The 'sliding' method is adapted from Li et al. (2008). In the original algorithm, the 'finished_at'
-    time for the current staypoint lasts until the 'tracked_at' time of the first positionfix outside
-    this staypoint. Users are assumed to be stationary during this missing period and potential tracking
-    gaps may be included in staypoints. To avoid including too large missing signal gaps, set 'gap_threshold'
-    to a small value, e.g., 15 min.
-
-    Examples
-    --------
-    >>> pfs, sp = generate_staypoints(pfs, method='sliding', dist_threshold=100)
-
     References
     ----------
     Zheng, Y. (2015). Trajectory data mining: an overview. ACM Transactions on Intelligent Systems
     and Technology (TIST), 6(3), 1-41.
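Read together, the patches define a two-stage pipeline: detect staypoints with the sliding method, then cluster their centers into locations with DBSCAN. The sketch below is an editor's illustration rather than part of any patch; the import paths are inferred from the file layout, the column names follow the test fixtures above, and the toy trajectory is hypothetical.

    # Minimal end-to-end sketch under the assumptions stated above.
    import pandas as pd
    import geopandas as gpd
    from shapely.geometry import Point

    from nomad.stop_detection import sliding as SLIDING
    from nomad.visit_attribution.visit_attribution import cluster_locations_dbscan

    # Eight position fixes dwelling at a single point for eight minutes.
    times = pd.date_range("2025-01-01 08:00", periods=8, freq="1min")
    pfs = gpd.GeoDataFrame(
        {"user_id": ["user1"] * 8, "tracked_at": times},
        geometry=[Point(0.0, 0.0)] * 8,
        crs="EPSG:4326",
    )

    # Stage 1: sliding stop detection (100 m radius, 3 min minimum dwell).
    pfs, sp = SLIDING.generate_staypoints(
        pfs, dist_threshold=100, time_threshold=3.0, gap_threshold=15.0, n_jobs=1
    )

    # Stage 2: cluster staypoint centers into locations with haversine DBSCAN.
    stops = pd.DataFrame({
        "longitude": sp.geometry.x,
        "latitude": sp.geometry.y,
        "user_id": sp["user_id"],
    })
    stops_labeled, locations = cluster_locations_dbscan(
        stops,
        epsilon=100,
        num_samples=1,
        distance_metric="haversine",
        traj_cols={"longitude": "longitude", "latitude": "latitude",
                   "user_id": "user_id"},
    )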