diff --git a/nomad/stop_detection/postprocessing.py b/nomad/stop_detection/postprocessing.py
index df267608..14992ab5 100644
--- a/nomad/stop_detection/postprocessing.py
+++ b/nomad/stop_detection/postprocessing.py
@@ -417,3 +417,227 @@ def fill_timestamp_gaps(first_time, last_time, stop_table):
     df_full = df_full.sort_values('start_timestamp').reset_index(drop=True)
 
     return df_full
+
+
+def merge_stops(stops, max_time_gap="10min", location_col="loc_id", agg=None, traj_cols=None, **kwargs):
+    """
+    Merge consecutive stops at the same location within a time threshold.
+
+    This function aggregates stops that are:
+    - at the same location (same ``location_col`` value),
+    - consecutive in time (gap between stops <= ``max_time_gap``), and
+    - from the same user.
+
+    Parameters
+    ----------
+    stops : pd.DataFrame
+        Stop table with temporal and location columns.
+        Must contain columns for start time, end time (or duration), and location.
+    max_time_gap : str or pd.Timedelta, default "10min"
+        Maximum time gap between consecutive stops for them to be merged.
+        If str, must be parsable by pd.to_timedelta (e.g., "10min", "1h", "30s").
+        If pd.Timedelta, used directly.
+    location_col : str, default "loc_id"
+        Name of the column containing location identifiers.
+        Stops are only merged if they have the same value in this column.
+    agg : dict, optional
+        Dictionary to aggregate columns after merging stops.
+        Keys are column names, values are aggregation functions.
+        If None or empty, only required columns (user_id, start times, end times)
+        are aggregated and returned.
+        Example: {"geometry": "first", "n_pings": "sum"}
+    traj_cols : dict, optional
+        Column name mappings. Supported keys:
+        - 'user_id': user identifier column
+        - 'timestamp' or 'datetime': start time column
+        - 'end_timestamp' or 'end_datetime': end time column
+        - 'duration': duration column (used if no end time column is present)
+    **kwargs
+        Additional keyword arguments for column name specification.
+
+    Returns
+    -------
+    pd.DataFrame
+        Merged stops table with the same structure as the input, but with
+        consecutive stops at the same location merged into single rows.
+
+    Notes
+    -----
+    - The index of the returned DataFrame corresponds to the first stop in each
+      merged group.
+    - If a stop has no consecutive neighbor at the same location (within
+      max_time_gap), it remains unchanged.
+    - Aggregation for required columns:
+        - start time: first (earliest start)
+        - end time: last (latest end)
+        - location: first (same for all in group)
+        - user_id: first (same for all in group)
+
+    Examples
+    --------
+    >>> # Basic usage
+    >>> merged = merge_stops(stops, max_time_gap="15min", location_col="location_id")
+
+    >>> # With custom aggregation to preserve geometry and sum n_pings
+    >>> merged = merge_stops(
+    ...     stops,
+    ...     max_time_gap="30min",
+    ...     location_col="building_id",
+    ...     agg={"geometry": "first", "n_pings": "sum"}
+    ... )
+    """
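+    # Merging proceeds in three steps (see the inline comments below):
+    #   1. ensure an end-time column exists (derive it from duration if needed),
+    #   2. iteratively point each stop at its mergeable successor via "index_temp",
+    #   3. group by "index_temp" and aggregate start/end times and user columns.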
+    if agg is None:
+        agg = {}
+
+    # Convert max_time_gap to Timedelta
+    if isinstance(max_time_gap, str):
+        max_time_gap = pd.to_timedelta(max_time_gap)
+    elif not isinstance(max_time_gap, pd.Timedelta):
+        raise TypeError("Parameter max_time_gap must be either of type str or pd.Timedelta!")
+
+    # Validate that the location column exists
+    if location_col not in stops.columns:
+        raise ValueError(f"Location column '{location_col}' not found in stops DataFrame")
+
+    if stops.empty:
+        return stops.copy()
+
+    # Parse column names
+    traj_cols = loader._parse_traj_cols(stops.columns, traj_cols, kwargs, warn=False)
+
+    # Determine temporal columns
+    t_key, use_datetime = loader._fallback_time_cols_dt(stops.columns, traj_cols, kwargs)
+    end_t_key = 'end_datetime' if use_datetime else 'end_timestamp'
+
+    # Check for required temporal columns
+    end_col_present = loader._has_end_cols(stops.columns, traj_cols)
+    duration_col_present = loader._has_duration_cols(stops.columns, traj_cols)
+
+    if not (end_col_present or duration_col_present):
+        raise ValueError("Stops must contain either end time or duration columns")
+
+    # Get the user_id column if present
+    user_col = None
+    if 'user_id' in traj_cols and traj_cols['user_id'] in stops.columns:
+        user_col = traj_cols['user_id']
+
+    # Work with a copy
+    stops_merge = stops.copy()
+    index_name = stops.index.name if stops.index.name else 'id'
+
+    # Compute the end time if not present
+    if not end_col_present:
+        dur_mins = stops_merge[traj_cols['duration']]
+        if use_datetime:
+            stops_merge[end_t_key] = stops_merge[traj_cols[t_key]] + pd.to_timedelta(dur_mins, unit='m')
+        else:
+            stops_merge[end_t_key] = stops_merge[traj_cols[t_key]] + dur_mins * 60
+    else:
+        end_t_key = traj_cols[end_t_key]
+
+    # Reset the index and preserve it; naming the axis first guarantees the
+    # reset column is called `index_name` even when the index is unnamed
+    stops_merge = stops_merge.rename_axis(index_name).reset_index()
+    stops_merge["index_temp"] = stops_merge[index_name]
+
+    # Sort by user and time
+    if user_col:
+        stops_merge = stops_merge.sort_values(by=[user_col, traj_cols[t_key]])
+    else:
+        stops_merge = stops_merge.sort_values(by=traj_cols[t_key])
+
+    # Get next-row information
+    shift_cols = [traj_cols[t_key], location_col]
+    shift_names = ["next_started_at", "next_location"]
+
+    if user_col:
+        shift_cols.insert(0, user_col)
+        shift_names.insert(0, "next_user_id")
+
+    stops_merge[shift_names] = stops_merge[shift_cols].shift(-1)
+
+    # Iteratively merge stops: repeat until the merge condition stabilizes so
+    # that chains of consecutive stops (A -> B -> C) collapse into one group
+    cond = pd.Series(data=False, index=stops_merge.index)
+    cond_old = pd.Series(data=True, index=stops_merge.index)
+
+    while np.sum(cond != cond_old) >= 1:
+        cond_old = cond.copy()
+
+        # Refresh the successor id from the current assignment on every pass
+        # (.values avoids an index-aligned join)
+        stops_merge["next_id"] = stops_merge["index_temp"].shift(-1).values
+
+        # Build merge conditions
+        conditions = []
+
+        # Same user (if user_col exists)
+        if user_col:
+            conditions.append(stops_merge["next_user_id"] == stops_merge[user_col])
+
+        # Time gap within threshold
+        conditions.append(
+            stops_merge["next_started_at"] - stops_merge[end_t_key] <= max_time_gap
+        )
+
+        # Same location
+        conditions.append(stops_merge[location_col] == stops_merge["next_location"])
+
+        # Not already merged
+        conditions.append(stops_merge["index_temp"] != stops_merge["next_id"])
+
+        # Combine all conditions
+        cond = pd.Series(data=True, index=stops_merge.index)
+        for c in conditions:
+            cond = cond & c
+
+        # Assign the merged index
+        stops_merge.loc[cond, "index_temp"] = stops_merge.loc[cond, "next_id"].values
+
+    # Define the aggregation dictionary
+    agg_dict = {
+        index_name: "first",
+        traj_cols[t_key]: "first",   # earliest start time
+        end_t_key: "last",           # latest end time
+        location_col: "first",       # same location for all in group
+    }
+
+    if user_col:
+        agg_dict[user_col] = "first"
+
+    # Add user-defined aggregations
+    agg_dict.update(agg)
+
+    # Group and aggregate
+    stops_merged = stops_merge.groupby(by="index_temp").agg(agg_dict)
+
+    # Recompute the duration if it was in the original
+    if duration_col_present:
+        if use_datetime:
+            stops_merged[traj_cols['duration']] = (
+                (stops_merged[end_t_key] - stops_merged[traj_cols[t_key]])
+                .dt.total_seconds() / 60
+            ).astype(int)
+        else:
+            stops_merged[traj_cols['duration']] = (
+                (stops_merged[end_t_key] - stops_merged[traj_cols[t_key]]) / 60
+            ).astype(int)
+
+    # Clean up: set the index and sort
+    stops_merged = stops_merged.set_index(index_name)
+
+    if user_col:
+        stops_merged = stops_merged.sort_values(by=[user_col, traj_cols[t_key]])
+    else:
+        stops_merged = stops_merged.sort_values(by=traj_cols[t_key])
+
+    # Remove the computed end_t_key if it wasn't in the original
+    if not end_col_present and end_t_key in stops_merged.columns:
+        stops_merged = stops_merged.drop(columns=[end_t_key])
+
+    return stops_merged
diff --git a/nomad/stop_detection/sliding.py b/nomad/stop_detection/sliding.py
new file mode 100644
index 00000000..fce28fad
--- /dev/null
+++ b/nomad/stop_detection/sliding.py
@@ -0,0 +1,448 @@
+import pandas as pd
+import numpy as np
+import geopandas as gpd
+from shapely.geometry import Point
+import warnings
+from joblib import Parallel, delayed
+from tqdm import tqdm
+
+
+def haversine_dist(lat1, lon1, lat2, lon2):
+    """
+    Calculate the haversine distance between two points in meters.
+
+    Parameters
+    ----------
+    lat1, lon1 : float or array-like
+        Latitude and longitude of the first point(s)
+    lat2, lon2 : float or array-like
+        Latitude and longitude of the second point(s)
+
+    Returns
+    -------
+    float or array-like
+        Distance in meters
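+
+    Examples
+    --------
+    Illustrative value (one degree of latitude is roughly 111.2 km):
+
+    >>> round(haversine_dist(0.0, 0.0, 1.0, 0.0))
+    111195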
+    """
+    R = 6371000  # Earth radius in meters
+
+    lat1_rad = np.radians(lat1)
+    lat2_rad = np.radians(lat2)
+    dlat = np.radians(lat2 - lat1)
+    dlon = np.radians(lon2 - lon1)
+
+    a = np.sin(dlat / 2) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2
+    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
+
+    return R * c
+
+
+def _explode_agg(col_agg, col_id, df_left, df_right):
+    """
+    Explode an aggregated column and merge it with the original dataframe.
+
+    Parameters
+    ----------
+    col_agg : str
+        Name of the column containing aggregated values
+    col_id : str
+        Name of the ID column to assign
+    df_left : pd.DataFrame
+        DataFrame to update
+    df_right : pd.DataFrame
+        DataFrame containing the aggregated data
+
+    Returns
+    -------
+    pd.DataFrame
+        Updated dataframe with exploded values
+    """
+    # Create a mapping from original indices to staypoint IDs
+    exploded = df_right[[col_agg, col_id]].explode(col_agg)
+    exploded = exploded[exploded[col_agg].notna()]
+
+    # Create a series mapping pfs_id to staypoint_id
+    mapping = pd.Series(
+        exploded[col_id].values,
+        index=exploded[col_agg].values
+    )
+
+    # Map onto the original dataframe
+    df_left[col_id] = df_left.index.map(mapping)
+
+    return df_left
+
+
+def _generate_staypoints_sliding_user(
+    pfs_user,
+    geo_col='geometry',
+    elevation_flag=False,
+    dist_threshold=100,
+    time_threshold=5.0,
+    gap_threshold=15.0,
+    distance_metric='haversine',
+    include_last=False
+):
+    """
+    Generate staypoints for a single user using sliding-window stop detection.
+
+    Parameters
+    ----------
+    pfs_user : tuple
+        (user_id, DataFrame) from a groupby operation
+    geo_col : str
+        Name of the geometry column
+    elevation_flag : bool
+        Whether elevation data is present
+    dist_threshold : float
+        Distance threshold in meters
+    time_threshold : float
+        Time threshold in minutes
+    gap_threshold : float
+        Maximum gap threshold in minutes
+    distance_metric : str
+        Distance metric to use
+    include_last : bool
+        Whether to include the last staypoint
+
+    Returns
+    -------
+    pd.DataFrame
+        Staypoints for this user
+    """
+    user_id, pfs = pfs_user
+
+    if len(pfs) == 0:
+        return pd.DataFrame()
+
+    # Sort by time but preserve the original index
+    pfs = pfs.sort_values('tracked_at').reset_index(drop=False)
+    original_index_col = pfs.columns[0]  # The original index is now the first column
+
+    staypoints = []
+
+    i = 0
+    n = len(pfs)
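+    # Two-pointer sliding window: i anchors a candidate staypoint and j advances
+    # until the user moves more than dist_threshold from the anchor, or a
+    # temporal gap larger than gap_threshold appears. Points i..j-1 form a
+    # staypoint if they span at least time_threshold minutes.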
+    while i < n:
+        j = i + 1
+
+        # Get the coordinates of the starting point
+        start_geom = pfs.iloc[i][geo_col]
+        start_lat = start_geom.y
+        start_lon = start_geom.x
+        start_time = pfs.iloc[i]['tracked_at']
+
+        # Slide the window forward
+        while j < n:
+            # Check for a temporal gap
+            time_gap = (pfs.iloc[j]['tracked_at'] - pfs.iloc[j - 1]['tracked_at']).total_seconds() / 60
+            if time_gap > gap_threshold:
+                break
+
+            # Calculate the distance from the start point
+            curr_geom = pfs.iloc[j][geo_col]
+            curr_lat = curr_geom.y
+            curr_lon = curr_geom.x
+
+            if distance_metric == 'haversine':
+                dist = haversine_dist(start_lat, start_lon, curr_lat, curr_lon)
+            else:
+                dist = start_geom.distance(curr_geom)
+
+            # Check whether we moved beyond the distance threshold
+            if dist > dist_threshold:
+                break
+
+            j += 1
+
+        # Check if we have a valid staypoint (enough time spent)
+        time_spent = (pfs.iloc[j - 1]['tracked_at'] - start_time).total_seconds() / 60
+
+        if time_spent >= time_threshold:
+            # Create a staypoint
+            sp_pfs = pfs.iloc[i:j]
+
+            # Calculate the centroid
+            lats = [p.y for p in sp_pfs[geo_col]]
+            lons = [p.x for p in sp_pfs[geo_col]]
+
+            if distance_metric == 'haversine':
+                # Use the spherical mean for geographic coordinates
+                lats_rad = np.radians(lats)
+                lons_rad = np.radians(lons)
+
+                x = np.cos(lats_rad) * np.cos(lons_rad)
+                y = np.cos(lats_rad) * np.sin(lons_rad)
+                z = np.sin(lats_rad)
+
+                mean_x = np.mean(x)
+                mean_y = np.mean(y)
+                mean_z = np.mean(z)
+
+                mean_lon = np.arctan2(mean_y, mean_x)
+                hyp = np.sqrt(mean_x**2 + mean_y**2)
+                mean_lat = np.arctan2(mean_z, hyp)
+
+                center_lat = np.degrees(mean_lat)
+                center_lon = np.degrees(mean_lon)
+            else:
+                center_lat = np.mean(lats)
+                center_lon = np.mean(lons)
+
+            sp_dict = {
+                'user_id': user_id,
+                'started_at': sp_pfs.iloc[0]['tracked_at'],
+                'finished_at': sp_pfs.iloc[-1]['tracked_at'],
+                geo_col: Point(center_lon, center_lat),
+                'pfs_id': list(sp_pfs[original_index_col])  # Use the original index
+            }
+
+            if elevation_flag and 'elevation' in sp_pfs.columns:
+                sp_dict['elevation'] = sp_pfs['elevation'].mean()
+
+            staypoints.append(sp_dict)
+
+            # Move to the point that broke the staypoint
+            i = j
+        else:
+            # Not enough time spent, move to the next point
+            i += 1
+
+    # Handle the last staypoint if requested
+    if include_last and i < n:
+        sp_pfs = pfs.iloc[i:]
+        if len(sp_pfs) > 0:
+            lats = [p.y for p in sp_pfs[geo_col]]
+            lons = [p.x for p in sp_pfs[geo_col]]
+
+            if distance_metric == 'haversine':
+                lats_rad = np.radians(lats)
+                lons_rad = np.radians(lons)
+
+                x = np.cos(lats_rad) * np.cos(lons_rad)
+                y = np.cos(lats_rad) * np.sin(lons_rad)
+                z = np.sin(lats_rad)
+
+                mean_x = np.mean(x)
+                mean_y = np.mean(y)
+                mean_z = np.mean(z)
+
+                mean_lon = np.arctan2(mean_y, mean_x)
+                hyp = np.sqrt(mean_x**2 + mean_y**2)
+                mean_lat = np.arctan2(mean_z, hyp)
+
+                center_lat = np.degrees(mean_lat)
+                center_lon = np.degrees(mean_lon)
+            else:
+                center_lat = np.mean(lats)
+                center_lon = np.mean(lons)
+
+            sp_dict = {
+                'user_id': user_id,
+                'started_at': sp_pfs.iloc[0]['tracked_at'],
+                'finished_at': sp_pfs.iloc[-1]['tracked_at'],
+                geo_col: Point(center_lon, center_lat),
+                'pfs_id': list(sp_pfs[original_index_col])  # Use the original index
+            }
+
+            if elevation_flag and 'elevation' in sp_pfs.columns:
+                sp_dict['elevation'] = sp_pfs['elevation'].mean()
+
+            staypoints.append(sp_dict)
+
+    return pd.DataFrame(staypoints)
+
+
+def applyParallel(groups, func, n_jobs=1, print_progress=False, **kwargs):
+    """
+    Apply a function to groups in parallel.
+
+    Parameters
+    ----------
+    groups : DataFrameGroupBy
+        Grouped dataframe
+    func : callable
+        Function to apply to each group
+    n_jobs : int
+        Number of parallel jobs
+    print_progress : bool
+        Whether to show a progress bar
+    **kwargs
+        Additional arguments to pass to func
+
+    Returns
+    -------
+    pd.DataFrame
+        Concatenated results
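+
+    Examples
+    --------
+    Sketch with a hypothetical per-group function ``my_func(group, **kwargs)``:
+
+    >>> groups = pfs.groupby("user_id", as_index=False)
+    >>> result = applyParallel(groups, my_func, n_jobs=2)  # doctest: +SKIP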
+    """
+    if n_jobs == 1:
+        # Sequential processing
+        if print_progress:
+            results = [func(group, **kwargs) for group in tqdm(groups, desc="Processing users")]
+        else:
+            results = [func(group, **kwargs) for group in groups]
+    else:
+        # Parallel processing
+        group_list = list(groups)
+        if print_progress:
+            results = Parallel(n_jobs=n_jobs)(
+                delayed(func)(group, **kwargs) for group in tqdm(group_list, desc="Processing users")
+            )
+        else:
+            results = Parallel(n_jobs=n_jobs)(
+                delayed(func)(group, **kwargs) for group in group_list
+            )
+
+    return pd.concat(results, ignore_index=True) if results else pd.DataFrame()
+
+
+def generate_staypoints(
+    positionfixes,
+    method="sliding",
+    distance_metric="haversine",
+    dist_threshold=100,
+    time_threshold=5.0,
+    gap_threshold=15.0,
+    include_last=False,
+    print_progress=False,
+    exclude_duplicate_pfs=True,
+    n_jobs=1,
+):
+    """
+    Generate staypoints from positionfixes.
+
+    Parameters
+    ----------
+    positionfixes : pd.DataFrame or gpd.GeoDataFrame
+        Position fixes with columns: user_id, tracked_at, geometry
+    method : {'sliding'}
+        Method to create staypoints. 'sliding' applies a sliding window over the data.
+    distance_metric : {'haversine'}
+        The distance metric used by the applied method.
+    dist_threshold : float, default 100
+        The distance threshold for the 'sliding' method, i.e., how far someone has to
+        travel to generate a new staypoint. Units depend on the distance_metric
+        parameter; if 'distance_metric' is 'haversine', the unit is meters.
+    time_threshold : float, default 5.0 (minutes)
+        The time threshold for the 'sliding' method in minutes.
+    gap_threshold : float, default 15.0 (minutes)
+        The time threshold for determining whether a gap exists between consecutive pfs.
+        Consecutive pfs with temporal gaps larger than 'gap_threshold' are excluded from
+        staypoint generation. Only valid for the 'sliding' method.
+    include_last : boolean, default False
+        The algorithm in Li et al. (2008) only detects a staypoint if the user steps out
+        of that staypoint, which omits the last staypoint (if any). Set 'include_last'
+        to True to include this last staypoint.
+    print_progress : boolean, default False
+        Show per-user progress if set to True.
+    exclude_duplicate_pfs : boolean, default True
+        Filters duplicate positionfixes before generating staypoints. Duplicates can
+        lead to problems in later processing steps (e.g., when generating triplegs).
+        It is not recommended to set this to False.
+    n_jobs : int, default 1
+        The maximum number of concurrently running jobs. If -1, all CPUs are used.
+        If 1 is given, no parallel computing code is used at all.
+
+    Returns
+    -------
+    pfs : pd.DataFrame or gpd.GeoDataFrame
+        The original positionfixes with a new column ``staypoint_id``.
+    sp : gpd.GeoDataFrame
+        The generated staypoints.
+
+    References
+    ----------
+    Zheng, Y. (2015). Trajectory data mining: an overview. ACM Transactions on
+    Intelligent Systems and Technology (TIST), 6(3), 29.
+
+    Li, Q., Zheng, Y., Xie, X., Chen, Y., Liu, W., & Ma, W. Y. (2008, November). Mining
+    user similarity based on location history. In Proceedings of the 16th ACM SIGSPATIAL
+    international conference on Advances in geographic information systems (p. 34). ACM.
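+
+    Examples
+    --------
+    Sketch, assuming pfs is a GeoDataFrame with user_id, tracked_at, and geometry:
+
+    >>> pfs, sp = generate_staypoints(pfs, method="sliding", dist_threshold=100,
+    ...                               time_threshold=5.0)  # doctest: +SKIP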
+    """
+    # Validate required columns
+    required_cols = ['user_id', 'tracked_at']
+    for col in required_cols:
+        if col not in positionfixes.columns:
+            raise ValueError(f"Missing required column: {col}")
+
+    if not isinstance(positionfixes, gpd.GeoDataFrame):
+        raise TypeError("positionfixes must be a GeoDataFrame with a geometry column")
+
+    # Copy the original pfs for adding the 'staypoint_id' column
+    pfs = positionfixes.copy()
+
+    if exclude_duplicate_pfs:
+        len_org = pfs.shape[0]
+        pfs = pfs.drop_duplicates()
+        nb_dropped = len_org - pfs.shape[0]
+        if nb_dropped > 0:
+            warn_str = (
+                f"{nb_dropped} duplicates were dropped from your positionfixes. Dropping duplicates is"
+                + " recommended but can be prevented using the 'exclude_duplicate_pfs' flag."
+            )
+            warnings.warn(warn_str)
+
+    # If the positionfixes already have a column "staypoint_id", we drop it
+    if "staypoint_id" in pfs.columns:
+        pfs.drop(columns="staypoint_id", inplace=True)
+
+    elevation_flag = "elevation" in pfs.columns  # if there is elevation data
+
+    geo_col = pfs.geometry.name
+    if elevation_flag:
+        sp_column = ["user_id", "started_at", "finished_at", "elevation", geo_col]
+    else:
+        sp_column = ["user_id", "started_at", "finished_at", geo_col]
+
+    if method == "sliding":
+        # Algorithm from Li et al. (2008). For details, please refer to the paper.
+        sp = applyParallel(
+            pfs.groupby("user_id", as_index=False),
+            _generate_staypoints_sliding_user,
+            n_jobs=n_jobs,
+            print_progress=print_progress,
+            geo_col=geo_col,
+            elevation_flag=elevation_flag,
+            dist_threshold=dist_threshold,
+            time_threshold=time_threshold,
+            gap_threshold=gap_threshold,
+            distance_metric=distance_metric,
+            include_last=include_last,
+        ).reset_index(drop=True)
+
+        # Index management
+        sp["staypoint_id"] = sp.index
+        sp.index.name = "id"
+
+        if "pfs_id" not in sp.columns:
+            sp["pfs_id"] = None
+        pfs = _explode_agg("pfs_id", "staypoint_id", pfs, sp)
+    else:
+        raise ValueError(f"Unknown method: {method}. Only 'sliding' is supported.")
+
+    sp = gpd.GeoDataFrame(sp, columns=sp_column, geometry=geo_col, crs=pfs.crs)
+
+    # dtype consistency
+    # sp id (generated by this function) should be int64
+    sp.index = sp.index.astype("int64")
+    # pfs['staypoint_id'] should be Int64 (contains missing values)
+    pfs["staypoint_id"] = pfs["staypoint_id"].astype("Int64")
+
+    # user_id of sp should be the same as pfs
+    sp["user_id"] = sp["user_id"].astype(pfs["user_id"].dtype)
+
+    if len(sp) == 0:
+        warnings.warn("No staypoints can be generated, returning empty sp.")
+
+    return pfs, sp
diff --git a/nomad/tests/test_stop_detection.py b/nomad/tests/test_stop_detection.py
index f14c4e1a..de0accdc 100644
--- a/nomad/tests/test_stop_detection.py
+++ b/nomad/tests/test_stop_detection.py
@@ -8,11 +8,13 @@
 from collections import defaultdict
 import pytest
 from pathlib import Path
+from shapely.geometry import Point
 import nomad.io.base as loader
 from nomad import constants
 from nomad import filters
 import nomad.stop_detection.dbscan as DBSCAN
 import nomad.stop_detection.lachesis as LACHESIS
+import nomad.stop_detection.sliding as SLIDING
 import pdb
 
 @pytest.fixture
@@ -396,4 +398,198 @@ def test_empty_dataframe_consistency():
     assert 'longitude' in hdbscan_result.columns
     assert 'latitude' in hdbscan_result.columns
     assert 'longitude' in lachesis_result.columns
-    assert 'latitude' in lachesis_result.columns
\ No newline at end of file
+    assert 'latitude' in lachesis_result.columns
+
+##########################################
+####     SLIDING STOP DETECTION       ####
+####          (SLIDING.PY)            ####
+##########################################
+
+@pytest.fixture
+def position_fixes_simple():
+    """Simple position fixes for testing the sliding window algorithm."""
+    times = pd.date_range("2025-01-01 08:00", periods=10, freq="1min").tolist()
+
+    # Create position fixes:
+    # Points 0-5: stationary (should form a staypoint if time_threshold <= 5 min)
+    # Points 6-9: moved away
+    coords = [(0.0, 0.0)] * 6 + [(0.002, 0.002)] * 4
+
+    df = gpd.GeoDataFrame({
+        "user_id": "user1",
+        "tracked_at": times,
+        "geometry": [Point(lon, lat) for lon, lat in coords]
+    }, crs="EPSG:4326")
+
+    return df
+
+
+@pytest.fixture
+def position_fixes_with_gap():
+    """Position fixes with a temporal gap for testing gap_threshold."""
+    times = pd.date_range("2025-01-01 08:00", periods=5, freq="1min").tolist()
+    # Add a large gap
+    times += [times[-1] + pd.Timedelta(minutes=20)]
+    times += pd.date_range(times[-1] + pd.Timedelta(minutes=1), periods=4, freq="1min").tolist()
+
+    # All points at the same location
+    coords = [(0.0, 0.0)] * len(times)
+
+    df = gpd.GeoDataFrame({
+        "user_id": "user1",
+        "tracked_at": times,
+        "geometry": [Point(lon, lat) for lon, lat in coords]
+    }, crs="EPSG:4326")
+
+    return df
+
+
+@pytest.fixture
+def position_fixes_multi_user():
+    """Position fixes for multiple users."""
+    times = pd.date_range("2025-01-01 08:00", periods=8, freq="1min").tolist()
+
+    user1_coords = [(0.0, 0.0)] * 4 + [(0.002, 0.002)] * 4
+    user2_coords = [(0.001, 0.001)] * 4 + [(0.003, 0.003)] * 4
+
+    df = gpd.GeoDataFrame({
+        "user_id": ["user1"] * 8 + ["user2"] * 8,
+        "tracked_at": times * 2,
+        "geometry": [Point(lon, lat) for lon, lat in user1_coords + user2_coords]
+    }, crs="EPSG:4326")
+
+    return df
+
+
+def test_sliding_basic_staypoint_detection(position_fixes_simple):
+    pfs, sp = SLIDING.generate_staypoints(
+        position_fixes_simple,
+        method="sliding",
+        dist_threshold=100,
+        time_threshold=3.0,
+        gap_threshold=15.0,
+        include_last=False,
+        exclude_duplicate_pfs=True,
+        n_jobs=1
+    )
+
+    # Should detect at least one staypoint from the first 6 points
+    assert len(sp) >= 1
+    assert 'staypoint_id' in pfs.columns
+    assert isinstance(sp, gpd.GeoDataFrame)
+
+    # Check required columns in staypoints
+    assert 'user_id' in sp.columns
+    assert 'started_at' in sp.columns
+    assert 'finished_at' in sp.columns
+    assert sp.geometry.name in sp.columns
+
+
+def test_sliding_time_threshold(position_fixes_simple):
+    """Test that time_threshold is respected."""
+    # With time_threshold=10, the first 6 points (5 min duration) should not form a staypoint
+    pfs, sp = SLIDING.generate_staypoints(
+        position_fixes_simple,
+        dist_threshold=100,
+        time_threshold=10.0,
+        gap_threshold=15.0,
+        include_last=False,
+        n_jobs=1
+    )
+
+    # Should not detect any staypoints since the maximum duration < 10 min
+    assert len(sp) == 0
+
+
+def test_sliding_distance_threshold(position_fixes_simple):
+    """Test that dist_threshold is respected."""
+    # With a very small distance threshold, the two groups should not cluster together.
+    # The test data has 6 points at (0, 0) and 4 points at (0.002, 0.002);
+    # with dist_threshold=1 m, these should form separate staypoints.
+    pfs, sp = SLIDING.generate_staypoints(
+        position_fixes_simple,
+        dist_threshold=1,  # 1 meter - very tight
+        time_threshold=3.0,
+        gap_threshold=15.0,
+        include_last=False,
+        n_jobs=1
+    )
+
+    # With the tight distance constraint, should detect 2 separate staypoints
+    # (one at each distinct location)
+    assert len(sp) == 2
+
+
+def test_sliding_gap_threshold(position_fixes_with_gap):
+    """Test that gap_threshold prevents clustering across large temporal gaps."""
+    # With gap_threshold=15, the 20-minute gap should prevent clustering
+    pfs, sp = SLIDING.generate_staypoints(
+        position_fixes_with_gap,
+        dist_threshold=100,
+        time_threshold=3.0,
+        gap_threshold=15.0,
+        include_last=False,
+        n_jobs=1
+    )
+
+    # Should detect at most 2 separate staypoints (before and after the gap)
+    assert len(sp) <= 2
+
+
+def test_sliding_include_last(position_fixes_simple):
+    """Test the include_last parameter."""
+    # Without include_last
+    pfs1, sp1 = SLIDING.generate_staypoints(
+        position_fixes_simple,
+        dist_threshold=100,
+        time_threshold=3.0,
+        include_last=False,
+        n_jobs=1
+    )
+
+    # With include_last
+    pfs2, sp2 = SLIDING.generate_staypoints(
+        position_fixes_simple,
+        dist_threshold=100,
+        time_threshold=3.0,
+        include_last=True,
+        n_jobs=1
+    )
+
+    # include_last=True should potentially detect more staypoints
+    assert len(sp2) >= len(sp1)
+
+
+def test_sliding_multi_user(position_fixes_multi_user):
+    """Test the sliding window with multiple users."""
+    pfs, sp = SLIDING.generate_staypoints(
+        position_fixes_multi_user,
+        dist_threshold=100,
+        time_threshold=2.0,
+        gap_threshold=15.0,
+        n_jobs=1
+    )
+
+    # Staypoints should carry user attribution
+    assert 'user_id' in sp.columns
+    unique_users = sp['user_id'].unique()
+    assert len(unique_users) >= 1  # At least one user should have staypoints
+
+
+def test_sliding_empty_dataframe():
+    """Test the sliding window with an empty dataframe."""
+    empty_pfs = gpd.GeoDataFrame({
+        "user_id": [],
+        "tracked_at": [],
+        "geometry": []
+    }, crs="EPSG:4326")
+
+    with pytest.warns(UserWarning, match="No staypoints can be generated"):
+        pfs, sp = SLIDING.generate_staypoints(
+            empty_pfs,
+            dist_threshold=100,
+            time_threshold=5.0,
+            n_jobs=1
+        )
+
+    assert len(sp) == 0
+    assert 'staypoint_id' in pfs.columns
diff --git a/nomad/visit_attribution/visit_attribution.py b/nomad/visit_attribution/visit_attribution.py
index 26c1ba5a..d98d31bf 100644
--- a/nomad/visit_attribution/visit_attribution.py
+++ b/nomad/visit_attribution/visit_attribution.py
@@ -3,9 +3,12 @@
 import nomad.constants as constants
 import warnings
 import pandas as pd
 import nomad.io.base as loader
 import pyproj
 import pdb
+import numpy as np
+from sklearn.cluster import DBSCAN
+from shapely.geometry import Point, MultiPoint
 
 # TO DO: change to stops_to_poi
 def point_in_polygon(data, poi_table, method='centroid', data_crs=None, max_distance=0,
@@ -436,6 +439,332 @@ def night_stops(stop_table, user='user', dawn_hour = 6, dusk_hour = 19, min_dwel
     dates = (start_date, end_date)
     df_clipped = clip_stays_date(stop_table, dates, dawn_hour, dusk_hour)
     df_clipped = df_clipped[(df_clipped['duration'] > 0) & (df_clipped['duration_night'] >= 15)]
-    
+
     return df_clipped.groupby(['id', 'location'], group_keys=False).apply(count_nights(dawn_hour, dusk_hour, min_dwell)).reset_index(drop=True)
+
+
+def _get_location_center(coords, metric='euclidean'):
+    """
+    Calculate the center of a location cluster.
+
+    Parameters
+    ----------
+    coords : numpy.ndarray
+        Array of coordinates with shape (n_points, 2)
+    metric : str
+        'euclidean' for projected coordinates, 'haversine' for geographic
+        coordinates in (lon, lat) order
+
+    Returns
+    -------
+    tuple
+        (x, y) coordinates of the center
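+
+    Examples
+    --------
+    Illustrative doctest (assumes coords in (lon, lat) order):
+
+    >>> import numpy as np
+    >>> lon, lat = _get_location_center(np.array([[0.0, 0.0], [2.0, 0.0]]), metric='haversine')
+    >>> (round(float(lon), 6), round(float(lat), 6))
+    (1.0, 0.0)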
+    """
+    if metric == 'haversine':
+        # For geographic coordinates, use the spherical mean.
+        # Convert to radians for the calculation
+        coords_rad = np.radians(coords)
+        x = coords_rad[:, 0]  # longitude
+        y = coords_rad[:, 1]  # latitude
+
+        # Convert to 3D Cartesian coordinates
+        cos_y = np.cos(y)
+        cart_x = cos_y * np.cos(x)
+        cart_y = cos_y * np.sin(x)
+        cart_z = np.sin(y)
+
+        # Average and convert back
+        avg_x = np.mean(cart_x)
+        avg_y = np.mean(cart_y)
+        avg_z = np.mean(cart_z)
+
+        lon = np.arctan2(avg_y, avg_x)
+        hyp = np.sqrt(avg_x**2 + avg_y**2)
+        lat = np.arctan2(avg_z, hyp)
+
+        return np.degrees(lon), np.degrees(lat)
+    else:
+        # For projected coordinates, use the simple mean
+        return np.mean(coords[:, 0]), np.mean(coords[:, 1])
+
+
+def _get_location_extent(points, epsilon, crs=None):
+    """
+    Calculate the spatial extent of a location.
+
+    Parameters
+    ----------
+    points : list of shapely Points
+        Points in the location cluster
+    epsilon : float
+        Buffer distance in the units of the coordinates (meters for projected
+        coordinates, degrees for unprojected lon/lat)
+    crs : str, optional
+        Coordinate reference system
+
+    Returns
+    -------
+    shapely.Polygon
+        Convex hull buffered by epsilon
+    """
+    if len(points) == 1:
+        return points[0].buffer(epsilon)
+
+    multipoint = MultiPoint(points)
+    convex_hull = multipoint.convex_hull
+    return convex_hull.buffer(epsilon)
+
+
+def cluster_locations_dbscan(
+    stops,
+    epsilon=100,
+    num_samples=1,
+    distance_metric='euclidean',
+    agg_level='user',
+    traj_cols=None,
+    **kwargs
+):
+    """
+    Cluster stops into locations using DBSCAN.
+
+    Parameters
+    ----------
+    stops : pd.DataFrame or gpd.GeoDataFrame
+        Stop/staypoint data with spatial columns
+    epsilon : float, default 100
+        Maximum distance between stops in the same location (meters for
+        haversine/euclidean)
+    num_samples : int, default 1
+        Minimum number of stops required to form a location
+    distance_metric : str, default 'euclidean'
+        Distance metric: 'euclidean' for projected coords, 'haversine' for lat/lon
+    agg_level : str, default 'user'
+        'user' = separate locations per user, 'dataset' = shared locations across users
+    traj_cols : dict, optional
+        Column name mappings for coordinates and user_id
+    **kwargs
+        Additional arguments passed to column detection
+
+    Returns
+    -------
+    tuple of (pd.DataFrame, gpd.GeoDataFrame)
+        - stops with an added 'location_id' column (NaN for unclustered stops)
+        - locations GeoDataFrame with cluster centers and extents
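+
+    Examples
+    --------
+    Sketch, assuming stops has latitude/longitude and user_id columns:
+
+    >>> stops, locations = cluster_locations_dbscan(
+    ...     stops, epsilon=50, num_samples=2, distance_metric='haversine'
+    ... )  # doctest: +SKIP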
+    """
+    if not isinstance(stops, (pd.DataFrame, gpd.GeoDataFrame)):
+        raise TypeError("Input 'stops' must be a pandas DataFrame or GeoDataFrame")
+
+    if stops.empty:
+        # Return empty results with the proper schema
+        stops_out = stops.copy()
+        stops_out['location_id'] = pd.Series(dtype='Int64')
+        locations = gpd.GeoDataFrame(
+            columns=['center', 'extent'],
+            geometry='center',
+            crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None
+        )
+        return stops_out, locations
+
+    # Parse column names
+    traj_cols = loader._parse_traj_cols(stops.columns, traj_cols, kwargs)
+    loader._has_spatial_cols(stops.columns, traj_cols)
+
+    # Determine coordinate columns
+    if 'longitude' in traj_cols and traj_cols['longitude'] in stops.columns:
+        coord_key1, coord_key2 = 'longitude', 'latitude'
+        use_lon_lat = True
+    elif 'x' in traj_cols and traj_cols['x'] in stops.columns:
+        coord_key1, coord_key2 = 'x', 'y'
+        use_lon_lat = False
+    else:
+        raise ValueError("Could not find spatial columns in stops data")
+
+    # Override the distance metric based on the coordinate type if not specified
+    if distance_metric == 'euclidean' and use_lon_lat:
+        warnings.warn(
+            "Using haversine metric for lat/lon coordinates instead of euclidean",
+            UserWarning
+        )
+        distance_metric = 'haversine'
+
+    # Get the user_id column if present
+    user_col = None
+    if 'user_id' in traj_cols and traj_cols['user_id'] in stops.columns:
+        user_col = traj_cols['user_id']
+
+    # Check the aggregation level
+    if agg_level == 'user' and user_col is None:
+        warnings.warn(
+            "agg_level='user' requires a user_id column; falling back to 'dataset'",
+            UserWarning
+        )
+        agg_level = 'dataset'
+
+    stops_out = stops.copy()
+    stops_out['location_id'] = pd.Series(dtype='Int64')
+
+    location_list = []
+    location_id_counter = 0
+
+    # Group by user if needed
+    if agg_level == 'user':
+        groups = stops_out.groupby(user_col, sort=False)
+    else:
+        groups = [(None, stops_out)]
+
+    for group_key, group_data in groups:
+        if group_data.empty:
+            continue
+
+        # Extract coordinates in (lon, lat) or (x, y) order
+        coords = group_data[[traj_cols[coord_key1], traj_cols[coord_key2]]].to_numpy(dtype='float64')
+
+        # For haversine, convert to radians
+        if distance_metric == 'haversine':
+            # Convert epsilon from meters to radians (Earth radius in meters)
+            epsilon_rad = epsilon / 6371000.0
+            # sklearn's haversine metric expects (lat, lon) in radians,
+            # so swap the (lon, lat) columns before clustering
+            coords_for_clustering = np.radians(coords[:, [1, 0]])
+        else:
+            epsilon_rad = epsilon
+            coords_for_clustering = coords
+
+        # Run DBSCAN
+        clusterer = DBSCAN(
+            eps=epsilon_rad,
+            min_samples=num_samples,
+            metric=distance_metric,
+            algorithm='ball_tree'
+        )
+
+        labels = clusterer.fit_predict(coords_for_clustering)
+
+        # Assign location IDs (offset by the counter for multi-group runs)
+        group_loc_ids = labels.copy()
+        valid_mask = labels >= 0
+        group_loc_ids[valid_mask] += location_id_counter
+        group_loc_ids[~valid_mask] = -1
+
+        # Update stops with the location IDs
+        stops_out.loc[group_data.index, 'location_id'] = group_loc_ids
+
+        # Create a location entry for each cluster
+        unique_labels = labels[labels >= 0]
+        if len(unique_labels) > 0:
+            for label in np.unique(unique_labels):
+                cluster_mask = labels == label
+                cluster_coords = coords[cluster_mask]
+
+                # Calculate the center
+                center_x, center_y = _get_location_center(
+                    cluster_coords,
+                    metric=distance_metric
+                )
+                center_point = Point(center_x, center_y)
+
+                # Calculate the extent (convex hull + buffer)
+                cluster_points = [Point(x, y) for x, y in cluster_coords]
+                extent = _get_location_extent(
+                    cluster_points,
+                    epsilon,
+                    crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None
+                )
+
+                location_entry = {
+                    'location_id': location_id_counter + label,
+                    'center': center_point,
+                    'extent': extent,
+                    'n_stops': cluster_mask.sum()
+                }
+
+                # Add user_id if available
+                if user_col is not None and agg_level == 'user':
+                    location_entry[user_col] = group_key
+
+                location_list.append(location_entry)
+
+            # Update the counter for the next group
+            location_id_counter += len(np.unique(unique_labels))
+
+    # Convert location_id to a nullable integer (NaN for unclustered stops)
+    stops_out['location_id'] = stops_out['location_id'].replace(-1, pd.NA).astype('Int64')
+
+    # Create the locations GeoDataFrame
+    if location_list:
+        locations = gpd.GeoDataFrame(
+            location_list,
+            geometry='center',
+            crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None
+        )
+        # Set the extent as an additional geometry column
+        locations['extent'] = gpd.GeoSeries(
+            [loc['extent'] for loc in location_list],
+            crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None
+        )
+    else:
+        locations = gpd.GeoDataFrame(
+            columns=['location_id', 'center', 'extent', 'n_stops'],
+            geometry='center',
+            crs=stops.crs if isinstance(stops, gpd.GeoDataFrame) else None
+        )
+
+    return stops_out, locations
+
+
+def cluster_locations_per_user(
+    stops,
+    epsilon=100,
+    num_samples=1,
+    distance_metric='euclidean',
+    traj_cols=None,
+    **kwargs
+):
+    """
+    Convenience function to cluster locations per user.
+
+    This is equivalent to calling cluster_locations_dbscan with agg_level='user'.
+
+    Parameters
+    ----------
+    stops : pd.DataFrame or gpd.GeoDataFrame
+        Stop data with a user_id column
+    epsilon : float, default 100
+        Maximum distance between stops in the same location (meters)
+    num_samples : int, default 1
+        Minimum stops required to form a location
+    distance_metric : str, default 'euclidean'
+        'euclidean' or 'haversine'
+    traj_cols : dict, optional
+        Column name mappings
+    **kwargs
+        Additional arguments
+
+    Returns
+    -------
+    tuple of (pd.DataFrame, gpd.GeoDataFrame)
+        Stops with location_id and the locations table
+
+    Raises
+    ------
+    ValueError
+        If the user_id column is not found
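+
+    Examples
+    --------
+    Sketch, assuming stops carries a user_id column:
+
+    >>> stops, locations = cluster_locations_per_user(stops, epsilon=100)  # doctest: +SKIP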
+    """
+    traj_cols_temp = loader._parse_traj_cols(stops.columns, traj_cols, kwargs)
+    if 'user_id' not in traj_cols_temp or traj_cols_temp['user_id'] not in stops.columns:
+        raise ValueError(
+            "cluster_locations_per_user requires a 'user_id' column "
+            "specified in traj_cols or kwargs"
+        )
+
+    return cluster_locations_dbscan(
+        stops,
+        epsilon=epsilon,
+        num_samples=num_samples,
+        distance_metric=distance_metric,
+        agg_level='user',
+        traj_cols=traj_cols,
+        **kwargs
+    )
+
+
+# Alias for convenience
+generate_locations = cluster_locations_dbscan