diff --git a/README.md b/README.md index 5487922..2d680e4 100644 --- a/README.md +++ b/README.md @@ -47,50 +47,43 @@ pip install -e . ## GPU Acceleration -Both `PQEncoder.transform()` and `PQKMeans.predict()` support optional GPU acceleration via the `device` parameter. When a CUDA GPU is available, `device='auto'` (the default) uses the GPU transparently; otherwise it falls back to CPU. +Every step in the pipeline supports optional GPU acceleration via the `device` parameter: encoder training, PQ encoding, cluster training, and label assignment. When a CUDA GPU is available, `device='auto'` (the default) uses the GPU transparently; otherwise it falls back to CPU. **Requirements**: `torch` and `triton` (both installed with `pip install torch`). ```python -encoder = PQEncoder.load('encoder.joblib') -clusterer = PQKMeans.load('clusterer.joblib') +encoder = PQEncoder(k=256, m=6, iterations=20) +encoder.fit(training_fps, device='auto') # GPU KMeans fitting +pq_codes = encoder.transform(fingerprints) # GPU batch assignment -# GPU is used automatically when available -pq_codes = encoder.transform(fingerprints) # device='auto' by default -labels = clusterer.predict(pq_codes) # device='auto' by default +clusterer = PQKMeans(encoder, k=100000) +clusterer.fit(pq_codes) # GPU Triton assign + CPU centroid update +labels = clusterer.predict(pq_codes) # GPU Triton kernel # Or force a specific device labels_cpu = clusterer.predict(pq_codes, device='cpu') labels_gpu = clusterer.predict(pq_codes, device='gpu') ``` -**Benchmarks** (20M molecules, K=100,000 clusters, RTX 4070 Ti 16GB): - -| Step | GPU | CPU | Speedup | -|---:|---:|---:|---:| -| PQ Transform | 7.3s | 45.3s | 6.2x | -| Cluster Assignment | 29.9s | ~879s | 29.4x | - -Extrapolated to **9.6B molecules** (Enamine REAL): +**GPU benchmarks on 1B Enamine REAL molecules** (RTX 4070 Ti SUPER 16GB, K=100,000): -| Step | GPU | CPU | -|---:|---:|---:| -| PQ Transform | 59 min | 6.0 h | -| Cluster Assignment | 4.0 h | 117 h | -| 
**Combined** | **5.0 h** | **123 h** | +| Stage | Description | Time | +|:---|:---|---:| +| Encoder training | Train codebook on 50M MQN fingerprints | 1.8 min | +| PQ encoding | Encode 1B MQN fingerprints → 1B PQ codes (6-dim) | 3.7 min | +| Cluster training | Train PQKMeans on 1B PQ codes (5 iters, tol=0) | 2.3 hrs | +| Label assignment | Assign 1B PQ codes to 100K clusters | 26.2 min | +| **Total** | | **2.9 hrs** | -The GPU implementation uses a custom Triton kernel for cluster assignment that tiles over centers with an online argmin, never materializing the N x K distance matrix. VRAM usage is ~10 bytes/point, so even an 8 GB GPU can process hundreds of millions of points per batch. - -To reproduce the benchmarks: +Cluster training with the default `tol=1e-3` converges earlier (changed centers dropped from 11.8% → 0.2% over 5 iterations), reducing training time further. Extrapolated to 10B molecules: ~1.2 days. +To reproduce (requires 1B MQN fingerprints as `.npy` chunks): ```bash -# Decompress the test SMILES (if using the gzipped version) -gunzip -k data/10M_smiles.txt.gz - -# Run benchmark (pre-computes and caches fingerprints on first run) -python scripts/benchmark_gpu_predict.py +python scripts/benchmark_1B_pipeline.py --chunks /path/to/chunks --n 1000000000 ``` +The GPU implementation uses a custom Triton kernel for cluster assignment that tiles over centers with an online argmin, never materializing the N x K distance matrix. The kernel supports any number of subvectors M via compile-time unrolling (`tl.static_range`). VRAM usage is ~(M+4) bytes/point, so even an 8 GB GPU can process hundreds of millions of points per batch. 
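For readers new to PQ assignment, the tiled online-argmin idea can be sketched in NumPy (an illustrative CPU model only — `assign_tiled` and its names are hypothetical; the shipped implementation is the Triton kernel in `chelombus/clustering/_gpu_predict.py`):

```python
import numpy as np

def assign_tiled(codes, centers, dtables, block_k=128):
    """Nearest-center assignment over PQ codes, tiling over centers.

    codes:   (N, M) uint8 PQ codes
    centers: (K, M) uint8 center codes
    dtables: (M, 256, 256) float32 symmetric distance lookup tables

    Only an (N, block_k) distance tile exists at any time; a running
    (best_dist, best_label) pair replaces the full (N, K) matrix.
    """
    N, M = codes.shape
    K = centers.shape[0]
    best_dist = np.full(N, np.inf, dtype=np.float32)
    best_label = np.zeros(N, dtype=np.int64)
    for c_start in range(0, K, block_k):
        c_end = min(c_start + block_k, K)
        tile = np.zeros((N, c_end - c_start), dtype=np.float32)
        for m in range(M):
            # Table lookup: distance between each point's code and each
            # center's code in subspace m, summed over subspaces.
            tile += dtables[m][codes[:, m][:, None],
                               centers[c_start:c_end, m][None, :]]
        tile_arg = tile.argmin(axis=1)
        tile_min = tile[np.arange(N), tile_arg]
        better = tile_min < best_dist  # online argmin update
        best_dist[better] = tile_min[better]
        best_label[better] = c_start + tile_arg[better]
    return best_label
```

The GPU kernel follows the same loop structure, with `BLOCK_N` points per program and the subspace loop unrolled at compile time.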
+ ## Quick Start ```python @@ -147,21 +140,20 @@ python scripts/select_k.py \ --plot results/k_selection.png ``` -**Results on 100M Enamine REAL molecules** (AMD Ryzen 7, 64GB RAM): +**Results on 100M Enamine REAL molecules** (RTX 4070 Ti SUPER 16GB, AMD Ryzen 7 64GB RAM): -| k | Avg Distance | Empty Clusters | Median Cluster Size | Fit Time | -|---:|---:|---:|---:|---:| -| 10,000 | 3.65 | 6.8% | 8,945 | 1.3 h | -| 25,000 | 2.74 | 13.3% | 3,673 | 3.1 h | -| 50,000 | 2.17 | 19.6% | 1,876 | 6.2 h | -| 100,000 | 1.69 | 26.6% | 956 | 12.6 h | -| 200,000 | 1.30 | 34.7% | 492 | 26.4 h | +| k | Avg Distance | Empty Clusters | Median Cluster Size | Fit Time (GPU) | Fit Time (CPU) | +|---:|---:|---:|---:|---:|---:| +| 10,000 | 3.67 | 7.1% | 9,061 | 1.7 min | 1.3 h | +| 25,000 | 2.74 | 13.5% | 3,680 | 3.3 min | 3.1 h | +| 50,000 | 2.17 | 19.5% | 1,879 | 6.0 min | 6.2 h | +| 100,000 | 1.69 | 26.7% | 960 | 9.1 min | 12.6 h | +| 200,000 | 1.30 | 34.7% | 492 | 17.6 min | 26.4 h | **Guidelines:** - **k = 50,000** is a good default — under 20% empty clusters, median size ~1,900, and the avg distance improvement starts plateauing beyond this point. - **k = 100,000** if you need tighter clusters and can tolerate ~27% empty clusters. - Beyond 200K, over a third of clusters are empty — diminishing returns. -- Fit time scales linearly with both n and k (e.g., 1B molecules at k=50K ≈ 2.6 days). ## Documentation diff --git a/chelombus/clustering/PyQKmeans.py b/chelombus/clustering/PyQKmeans.py index 97b8168..6ad01c1 100644 --- a/chelombus/clustering/PyQKmeans.py +++ b/chelombus/clustering/PyQKmeans.py @@ -64,6 +64,58 @@ def _predict_numba(pq_codes, centers, dtables): return labels +def _update_centers( + pq_codes, + labels, + K, + dtables, + previous_centers=None, + chunk_size=50_000_000, +): + """Recompute PQ-code cluster centers from point assignments. 
+ + For each cluster *j* and subspace *s*, pick the codebook entry that + minimises total symmetric distance to every point in the cluster: + + center[j, s] = argmin_{c'} Σ_c hist[j, c] · dtable[s, c, c'] + + where hist[j, c] counts points in cluster j whose subvector-s code is c. + + Histograms are accumulated in chunks to keep peak memory bounded. + Empty clusters keep their previous center when *previous_centers* is given. + """ + N, m = pq_codes.shape + k_cb = dtables.shape[1] + new_centers = np.zeros((K, m), dtype=np.uint8) + old_centers = None + if previous_centers is not None: + old_centers = np.asarray(previous_centers, dtype=np.uint8) + if old_centers.shape != (K, m): + raise ValueError( + f"previous_centers must have shape {(K, m)}, got {old_centers.shape}" + ) + + for s in range(m): + hist = np.zeros(K * k_cb, dtype=np.int64) + for start in range(0, N, chunk_size): + end = min(start + chunk_size, N) + flat = (labels[start:end] * k_cb + + pq_codes[start:end, s].astype(np.int64)) + hist += np.bincount(flat, minlength=K * k_cb) + + hist_2d = hist.reshape(K, k_cb) + cost = hist_2d.astype(np.float32) @ dtables[s] + best_codes = np.argmin(cost, axis=1).astype(np.uint8) + + if old_centers is not None: + empty = hist_2d.sum(axis=1) == 0 + best_codes[empty] = old_centers[empty, s] + + new_centers[:, s] = best_codes + + return new_centers + + class PQKMeans: """ This class provides a scikit-learn-like interface to the PQk-means algorithm, @@ -72,7 +124,10 @@ class PQKMeans: Args: encoder: A trained PQEncoder instance k: Number of clusters - iteration: Number of k-means iterations (default: 20) + iteration: Maximum number of k-means iterations (default: 20) + tol: Early-stopping tolerance for the GPU path. Training stops when + the fraction of changed center coordinates drops below *tol*. + 0 means stop only on exact convergence. 
(default: 1e-3) verbose: Whether to print progress information (default: False) Example: @@ -88,6 +143,7 @@ def __init__( encoder: PQEncoder, k: int, iteration: int = 20, + tol: float = 1e-3, verbose: bool = False ): if not encoder.encoder_is_trained: # type: ignore @@ -96,16 +152,41 @@ def __init__( self.encoder = encoder self.k = k self.iteration = iteration + self.tol = tol self.verbose = verbose self.trained = False self._dtables = None self._centers_u8 = None + self._fit_labels = None self._cluster = pqkmeans.clustering.PQKMeans( encoder=encoder, k=k, - verbose=verbose + iteration=iteration, + verbose=verbose, ) + def _gpu_support_reason(self) -> str | None: + if not _GPU_AVAILABLE: + return "CUDA/Triton not available" + if self.encoder.k > 256: + return ( + "GPU path currently supports only 8-bit PQ codes " + f"(encoder.k <= 256), got encoder.k={self.encoder.k}" + ) + return None + + def _should_use_gpu(self, device: str) -> bool: + if device not in {"auto", "cpu", "gpu"}: + raise ValueError(f"device must be 'auto', 'cpu', or 'gpu', got {device!r}") + + gpu_reason = self._gpu_support_reason() + if device == "gpu": + if gpu_reason is not None: + raise RuntimeError(f"GPU requested but unavailable: {gpu_reason}") + return True + + return device == "auto" and gpu_reason is None + @property def cluster_centers_(self) -> np.ndarray: """Get cluster centers (PQ codes of shape (k, m)).""" @@ -114,25 +195,119 @@ def cluster_centers_(self) -> np.ndarray: @cluster_centers_.setter def cluster_centers_(self, centers: np.ndarray) -> None: """Set cluster centers and mark as trained.""" - centers_uint8 = np.array(centers).astype(np.uint8) - self._cluster._impl.set_cluster_centers(centers_uint8.tolist()) + centers_codes = np.asarray(centers, dtype=self.encoder.codebook_dtype) + self._cluster._impl.set_cluster_centers(centers_codes.tolist()) self.trained = True + self._centers_u8 = None + self._fit_labels = None - def fit(self, X_train: np.ndarray) -> 'PQKMeans': + def 
fit(self, X_train: np.ndarray, device: str = 'auto') -> 'PQKMeans': """Fit the PQk-means model to training PQ codes. Args: X_train: PQ codes of shape (n_samples, n_subvectors) + device: 'cpu' uses the C++ backend, + 'gpu' uses Triton assignment + CPU centroid update, + 'auto' picks GPU when available (default). Returns: self """ - self._cluster.fit(X_train) + use_gpu = self._should_use_gpu(device) + + if use_gpu: + self._fit_gpu(X_train) + else: + self._cluster.fit(X_train) + self.trained = True self._dtables = None self._centers_u8 = None + self._fit_labels = None return self + def _fit_gpu(self, X_train: np.ndarray, return_labels: bool = False) -> np.ndarray | None: + """GPU-accelerated training: Triton assignment + CPU centroid update.""" + import time + + pq_codes = np.ascontiguousarray(X_train, dtype=np.uint8) + N, m = pq_codes.shape + dtables = _build_distance_tables(self.encoder.codewords) + fit_batch = max(N // 20, 1_000_000) + + # Initialise centres by sampling K data points at random + rng = np.random.default_rng() + indices = rng.choice(N, size=self.k, replace=(N < self.k)) + centers = pq_codes[indices].copy() + + labels = None + prev_centers = None + final_labels_match_centers = False + for it in range(self.iteration): + t0 = time.time() + + # assignment (GPU) + # Force ~20 batches for progress reporting on large N + labels = predict_gpu(pq_codes, centers, dtables, + batch_size=fit_batch, verbose=self.verbose) + t1 = time.time() + + # centroid update (CPU) + new_centers = _update_centers( + pq_codes, + labels, + self.k, + dtables, + previous_centers=centers, + ) + t2 = time.time() + + changed = int(np.sum(new_centers != centers)) + total_coords = self.k * m + frac = changed / total_coords + + if self.verbose: + print( + f" iter {it + 1}/{self.iteration} " + f"assign={t1 - t0:.1f}s update={t2 - t1:.1f}s " + f"changed={changed}/{total_coords} ({frac:.4%})" + ) + + # Early stopping: below tolerance or an exact 2-cycle oscillation. 
+ converged = frac <= self.tol + oscillating = prev_centers is not None and np.array_equal(new_centers, prev_centers) + final_labels_match_centers = changed == 0 + old_centers = centers + centers = new_centers + + if converged or oscillating: + if self.verbose: + reason = "converged" if converged else "oscillation" + print(f" Stopped at iteration {it + 1} ({reason})") + break + prev_centers = old_centers.copy() + + # Store centres in the pqkmeans backend for compatibility + self._cluster._impl.set_cluster_centers(centers.tolist()) + self._fit_labels = None + + if not return_labels: + return None + + if labels is None: + raise RuntimeError("GPU fit did not produce assignments") + + if final_labels_match_centers: + return labels + + return predict_gpu( + pq_codes, + centers, + dtables, + batch_size=fit_batch, + verbose=self.verbose, + ) + def predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray: """Predict cluster labels for PQ codes. @@ -147,34 +322,56 @@ def predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray: raise ValueError("Must be trained before clustering. 
Use `.fit()` first") if self._dtables is None: self._dtables = _build_distance_tables(self.encoder.codewords) - self._centers_u8 = self.cluster_centers_.astype(np.uint8) + self._centers_u8 = self.cluster_centers_.astype(self.encoder.codebook_dtype) - use_gpu = (device == 'gpu') or (device == 'auto' and _GPU_AVAILABLE) - codes = np.asarray(X, dtype=np.uint8) + use_gpu = self._should_use_gpu(device) if use_gpu: - if not _GPU_AVAILABLE: - raise RuntimeError("GPU requested but CUDA/Triton not available") - return predict_gpu(codes, self._centers_u8, self._dtables) + codes = np.asarray(X, dtype=np.uint8) + centers = np.asarray(self._centers_u8, dtype=np.uint8) + return predict_gpu(codes, centers, self._dtables) + codes = np.asarray(X, dtype=self.encoder.codebook_dtype) return _predict_numba(codes, self._centers_u8, self._dtables) - def fit_predict(self, X: np.ndarray) -> np.ndarray: + def fit_predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray: """Fit the model and predict cluster labels in one step. Args: X: PQ codes of shape (n_samples, n_subvectors) + device: 'cpu', 'gpu', or 'auto' (default). 
Returns: Cluster labels of shape (n_samples,) """ + use_gpu = self._should_use_gpu(device) + + if use_gpu: + labels = self._fit_gpu(X, return_labels=True) + self.trained = True + self._dtables = None + self._centers_u8 = None + self._fit_labels = None + return labels + + labels = np.array(self._cluster.fit_predict(X)) self.trained = True - return np.array(self._cluster.fit_predict(X)) + self._dtables = None + self._centers_u8 = None + self._fit_labels = None + return labels @property def is_trained(self) -> bool: return bool(self.trained) # type: ignore + def __getstate__(self): + state = self.__dict__.copy() + state["_dtables"] = None + state["_centers_u8"] = None + state["_fit_labels"] = None + return state + def save(self, path: str | Path) -> None: path = Path(path) joblib.dump(self, path) @@ -187,4 +384,8 @@ def load(cls, path: str | Path) -> "PQKMeans": raise TypeError(f"Expected {cls.__name__}, got {type(obj).__name__}") if not hasattr(obj, '_dtables'): obj._dtables = None + if not hasattr(obj, '_centers_u8'): + obj._centers_u8 = None + if not hasattr(obj, '_fit_labels'): + obj._fit_labels = None return obj diff --git a/chelombus/clustering/_gpu_predict.py b/chelombus/clustering/_gpu_predict.py index a911473..218702f 100644 --- a/chelombus/clustering/_gpu_predict.py +++ b/chelombus/clustering/_gpu_predict.py @@ -4,18 +4,21 @@ The kernel computes symmetric PQ distance via lookup tables and maintains an online argmin, never materializing the N x K distance matrix. +Supports any number of subvectors M via tl.static_range compile-time +unrolling — Triton JIT-compiles a specialized kernel per M value. 
+ VRAM budget per call -------------------- Resident (cached, allocated once): - centers: K × M bytes (100K × 6 = 600 KB) - dtables: M × 256 × 256 × 4 bytes (6 × 256 × 256 × 4 = 1.5 MB) + centers: K x M bytes + dtables: M x 256 x 256 x 4 bytes Per-batch (freed after each batch): - codes: batch_n × M bytes (1 byte per subvector) - labels: batch_n × 4 bytes (int32) - → 10 bytes per point + codes: batch_n x M bytes (1 byte per subvector) + labels: batch_n x 4 bytes (int32) + -> (M + 4) bytes per point -So for a given free VRAM of F bytes, max batch ≈ F / 10. +So for a given free VRAM of F bytes, max batch ~ F / (M + 4). """ import numpy as np @@ -40,51 +43,27 @@ def _pq_assign_kernel( BLOCK_K: tl.constexpr, # number of centers per tile ): pid = tl.program_id(0) - point_offs = pid * BLOCK_N + tl.arange(0, BLOCK_N) + # int64 offsets: point_offs * M overflows int32 when N > ~357M + point_offs = (pid * BLOCK_N + tl.arange(0, BLOCK_N)).to(tl.int64) point_mask = point_offs < N best_dist = tl.full((BLOCK_N,), float('inf'), dtype=tl.float32) best_label = tl.zeros((BLOCK_N,), dtype=tl.int32) - # Pre-load point codes per subvector (M=6, unrolled) - pc0 = tl.load(codes_ptr + point_offs * M + 0, mask=point_mask, other=0).to(tl.int32) - pc1 = tl.load(codes_ptr + point_offs * M + 1, mask=point_mask, other=0).to(tl.int32) - pc2 = tl.load(codes_ptr + point_offs * M + 2, mask=point_mask, other=0).to(tl.int32) - pc3 = tl.load(codes_ptr + point_offs * M + 3, mask=point_mask, other=0).to(tl.int32) - pc4 = tl.load(codes_ptr + point_offs * M + 4, mask=point_mask, other=0).to(tl.int32) - pc5 = tl.load(codes_ptr + point_offs * M + 5, mask=point_mask, other=0).to(tl.int32) + TABLE: tl.constexpr = 256 * 256 # Tile over centers for c_start in range(0, K, BLOCK_K): c_offs = c_start + tl.arange(0, BLOCK_K) c_mask = c_offs < K - cc0 = tl.load(centers_ptr + c_offs * M + 0, mask=c_mask, other=0).to(tl.int32) - cc1 = tl.load(centers_ptr + c_offs * M + 1, mask=c_mask, other=0).to(tl.int32) - cc2 = 
tl.load(centers_ptr + c_offs * M + 2, mask=c_mask, other=0).to(tl.int32) - cc3 = tl.load(centers_ptr + c_offs * M + 3, mask=c_mask, other=0).to(tl.int32) - cc4 = tl.load(centers_ptr + c_offs * M + 4, mask=c_mask, other=0).to(tl.int32) - cc5 = tl.load(centers_ptr + c_offs * M + 5, mask=c_mask, other=0).to(tl.int32) - - TABLE = 256 * 256 - - idx0 = pc0[:, None] * 256 + cc0[None, :] - dist = tl.load(dtables_ptr + 0 * TABLE + idx0) - - idx1 = pc1[:, None] * 256 + cc1[None, :] - dist += tl.load(dtables_ptr + 1 * TABLE + idx1) - - idx2 = pc2[:, None] * 256 + cc2[None, :] - dist += tl.load(dtables_ptr + 2 * TABLE + idx2) + dist = tl.zeros((BLOCK_N, BLOCK_K), dtype=tl.float32) - idx3 = pc3[:, None] * 256 + cc3[None, :] - dist += tl.load(dtables_ptr + 3 * TABLE + idx3) - - idx4 = pc4[:, None] * 256 + cc4[None, :] - dist += tl.load(dtables_ptr + 4 * TABLE + idx4) - - idx5 = pc5[:, None] * 256 + cc5[None, :] - dist += tl.load(dtables_ptr + 5 * TABLE + idx5) + for m in tl.static_range(M): + pc = tl.load(codes_ptr + point_offs * M + m, mask=point_mask, other=0).to(tl.int32) + cc = tl.load(centers_ptr + c_offs * M + m, mask=c_mask, other=0).to(tl.int32) + idx = pc[:, None] * 256 + cc[None, :] + dist += tl.load(dtables_ptr + m * TABLE + idx) dist = tl.where(c_mask[None, :], dist, float('inf')) @@ -99,20 +78,26 @@ def _pq_assign_kernel( tl.store(labels_ptr + point_offs, best_label, mask=point_mask) -# --------------------------------------------------------------------------- # GPU tensor cache (centers + dtables persist across predict calls) -# --------------------------------------------------------------------------- _gpu_cache: dict = {} def _get_or_upload(key: str, array: np.ndarray, dtype: torch.dtype) -> torch.Tensor: - """Upload numpy array to GPU, caching by (key, data_ptr, shape).""" - cache_key = (key, array.ctypes.data, array.shape) - cached = _gpu_cache.get(cache_key) - if cached is not None: - return cached - tensor = 
torch.from_numpy(np.ascontiguousarray(array)).to(dtype=dtype, device='cuda') - _gpu_cache[cache_key] = tensor + """Upload numpy array to GPU, caching by key with content comparison. + + Stores one tensor per key. On a cache hit the stored reference array is + compared element-wise to *array*; a mismatch triggers a re-upload. + This avoids the old scheme of caching by memory address, which silently + returned stale tensors when Python reused a freed address. + """ + arr = np.ascontiguousarray(array) + entry = _gpu_cache.get(key) + if entry is not None: + ref, tensor = entry + if ref.shape == arr.shape and np.array_equal(ref, arr): + return tensor + tensor = torch.from_numpy(arr).to(dtype=dtype, device='cuda') + _gpu_cache[key] = (arr.copy(), tensor) return tensor @@ -122,7 +107,7 @@ def _auto_batch_size(N: int, M: int) -> int: Per-point VRAM: M bytes (codes) + 4 bytes (labels) = M + 4 We leave _VRAM_OVERHEAD_MB for PyTorch/Triton context and cached tensors. """ - free, total = torch.cuda.mem_get_info() + free, _total = torch.cuda.mem_get_info() usable = free - _VRAM_OVERHEAD_MB * 1024 * 1024 if usable < 0: usable = free // 2 @@ -131,15 +116,14 @@ def _auto_batch_size(N: int, M: int) -> int: return min(max_batch, N) -# --------------------------------------------------------------------------- # Public API -# --------------------------------------------------------------------------- def predict_gpu( pq_codes: np.ndarray, centers: np.ndarray, dtables: np.ndarray, batch_size: int = 0, + verbose: bool = False, ) -> np.ndarray: """GPU-accelerated PQ assignment. @@ -149,16 +133,16 @@ def predict_gpu( dtables: (M, 256, 256) float32 distance lookup tables. batch_size: Max points per GPU batch. 0 (default) = auto-detect from free VRAM. + verbose: Print per-batch progress (useful for billion-scale runs). Returns: (N,) int64 cluster labels (same dtype as CPU path). 
""" + import time as _time + N, M = pq_codes.shape K = centers.shape[0] - if M != 6: - raise ValueError(f"Triton kernel is compiled for M=6, got M={M}") - # Kernel assumes dtables are (M, 256, 256). Pad if k_codebook < 256. if dtables.shape[1] != 256 or dtables.shape[2] != 256: padded = np.zeros((M, 256, 256), dtype=np.float32) @@ -175,7 +159,20 @@ def predict_gpu( labels_out = np.empty(N, dtype=np.int64) - for start in range(0, N, batch_size): + # Adaptive BLOCK_K: larger M means more registers per subvector, + # so reduce BLOCK_K to avoid register spill. + if M <= 8: + BLOCK_K = 128 + elif M <= 32: + BLOCK_K = 64 + else: + BLOCK_K = 32 + BLOCK_N = 32 + + n_batches = (N + batch_size - 1) // batch_size + t0 = _time.time() + + for batch_idx, start in enumerate(range(0, N, batch_size)): end = min(start + batch_size, N) chunk = pq_codes[start:end] n_chunk = end - start @@ -183,20 +180,24 @@ def predict_gpu( codes_gpu = torch.from_numpy(np.ascontiguousarray(chunk, dtype=np.uint8)).cuda() labels_gpu = torch.empty(n_chunk, dtype=torch.int32, device='cuda') - _launch_kernel(codes_gpu, centers_gpu, dtables_gpu, labels_gpu, n_chunk, K, M) + grid = ((n_chunk + BLOCK_N - 1) // BLOCK_N,) + _pq_assign_kernel[grid]( + codes_gpu, centers_gpu, dtables_gpu, labels_gpu, + n_chunk, K, M, + BLOCK_N=BLOCK_N, BLOCK_K=BLOCK_K, + ) labels_out[start:end] = labels_gpu.cpu().numpy() del codes_gpu, labels_gpu - return labels_out - + if verbose and n_batches > 1: + elapsed = _time.time() - t0 + rate = end / elapsed + eta = (N - end) / rate if rate > 0 else 0 + print(f" batch {batch_idx+1}/{n_batches} " + f"{end:,}/{N:,} ({end/N*100:.0f}%) " + f"rate={rate:,.0f} pts/s " + f"ETA={int(eta//60)}m{int(eta%60)}s", + flush=True) -def _launch_kernel(codes, centers, dtables, labels, N, K, M): - BLOCK_N = 32 - BLOCK_K = 128 - grid = ((N + BLOCK_N - 1) // BLOCK_N,) - _pq_assign_kernel[grid]( - codes, centers, dtables, labels, - N, K, M, - BLOCK_N=BLOCK_N, BLOCK_K=BLOCK_K, - ) + return labels_out diff 
--git a/chelombus/encoder/encoder.py b/chelombus/encoder/encoder.py index 7208d14..cbfd61b 100644 --- a/chelombus/encoder/encoder.py +++ b/chelombus/encoder/encoder.py @@ -12,7 +12,9 @@ if torch.cuda.is_available(): _GPU_AVAILABLE = True except ImportError: - print('something went wrong') + pass + + class PQEncoder(PQEncoderBase): """ Class to encode high-dimensional vectors into PQ-codes. @@ -50,50 +52,155 @@ def __init__(self, k:int=256, m:int=8, iterations=20): @property def is_trained(self) -> bool: return self.encoder_is_trained - def fit(self, X_train:NDArray, verbose:int=1, **kwargs)->None: - """ KMeans fitting of every subvector matrix from the X_train matrix. Populates + def fit(self, X_train:NDArray, verbose:int=1, device:str='cpu', **kwargs)->None: + """ KMeans fitting of every subvector matrix from the X_train matrix. Populates the codebook by storing the cluster centers of every subvector - X_train is the input matrix. For a vector that has dimension D + X_train is the input matrix. For a vector that has dimension D then X_train is a matrix of size (N, D) where N is the number of rows (vectors) and D the number of columns (dimension of every vector i.e. fingerprint in the case of molecular data) Args: - X_train(np.array): Input matrix to train the encoder. + X_train(np.array): Input matrix to train the encoder. verbose(int): Level of verbosity. Default is 1 - **kwargs: Optional keyword arguments passed to the underlying KMeans `fit()` function. + device: 'cpu' for sklearn KMeans, 'gpu' for torch-based KMeans on CUDA, + 'auto' to pick GPU if available. Default is 'cpu'. + **kwargs: Optional keyword arguments passed to the underlying KMeans `fit()` function + (only used on the CPU path). 
""" - + assert X_train.ndim == 2, "The input can only be a matrix (X.ndim == 2)" N, D = X_train.shape # N number of input vectors, D dimension of the vectors assert self.k < N, "the number of training vectors (N for N,D = X_train.shape) should be more than the number of centroids (K)" - assert D % self.m == 0, f"Vector (fingeprint) dimension should be divisible by the number of subvectors (m). Got {D} / {self.m}" - self.D_subvector = int(D / self.m) # Dimension of the subvector. + assert D % self.m == 0, f"Vector (fingeprint) dimension should be divisible by the number of subvectors (m). Got {D} / {self.m}" + self.D_subvector = int(D / self.m) # Dimension of the subvector. self.og_D = D # We save the original dimensions of the input vector (fingerprint) for later use assert self.encoder_is_trained == False, "Encoder can only be fitted once" - - self.codewords= np.zeros((self.m, self.k, self.D_subvector), dtype=float) - + + self.codewords= np.zeros((self.m, self.k, self.D_subvector), dtype=np.float32) + + use_gpu = (device == 'gpu') or (device == 'auto' and _GPU_AVAILABLE) + if use_gpu: + if not _GPU_AVAILABLE: + raise RuntimeError("GPU requested but CUDA not available") + self._fit_gpu(X_train, verbose) + else: + self._fit_cpu(X_train, verbose, **kwargs) + + self.encoder_is_trained = True + del X_train # remove initial training data from memory + + def _fit_cpu(self, X_train: NDArray, verbose: int = 1, **kwargs) -> None: + subvector_dim = self.D_subvector + iterable = range(self.m) - if verbose > 0: - iterable = tqdm(iterable, desc='Training PQ-encoder', total=self.m) - - # Divide the input vector into m subvectors - subvector_dim = int(D / self.m) + if verbose > 0: + iterable = tqdm(iterable, desc='Training PQ-encoder', total=self.m) for subvector_idx in iterable: - X_train_subvector = X_train[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] - # For every subvector, run KMeans and store the centroids in the codebook + X_train_subvector = 
X_train[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] kmeans = KMeans(n_clusters=self.k, init='k-means++', max_iter=self.iterations, **kwargs).fit(X_train_subvector) self.pq_trained.append(kmeans) + self.codewords[subvector_idx] = kmeans.cluster_centers_ + + @staticmethod + def _gpu_encoder_batch_size( + N: int, + k: int, + reserve_mb: int = 1024, + min_batch: int = 250_000, + max_batch: int = 2_000_000, + ) -> int: + """Choose a GPU assignment batch size from the current free VRAM. + + This is called after the resident tensors for one subvector are already + allocated on the GPU. The main scratch tensors per batch point are: + + - one float32 distance row of length ``k`` + - one int64 label from ``argmin`` + - one float32 ``ones`` entry for ``index_add_`` + + A reserve margin is kept for CUDA context, allocator fragmentation, + and GEMM workspace. If VRAM telemetry is unavailable, the method falls + back to a conservative fixed 1 GiB scratch budget. + """ + if N <= 0: + return 0 + + floor = min(min_batch, N) + ceiling = min(max_batch, N) + bytes_per_point = k * 4 + 8 + 4 + fallback_budget = 1 * 1024**3 + + try: + free, _ = torch.cuda.mem_get_info() + usable = free - reserve_mb * 1024**2 + scratch_budget = usable if usable > 0 else max(free // 2, floor * bytes_per_point) + except Exception: + scratch_budget = fallback_budget + + batch = max(scratch_budget // bytes_per_point, floor) + return int(min(max(batch, floor), ceiling)) + + def _fit_gpu(self, X_train: NDArray, verbose: int = 1) -> None: + """GPU-accelerated KMeans fitting with batched assignment. + + Each subvector's data (N x D_sub) is kept GPU-resident. The + distance matrix is computed in batches to cap VRAM at ~1 GB + scratch regardless of N. The GEMM form + ``||x-c||² = ||x||² + ||c||² - 2·x·cᵀ`` avoids ``torch.cdist`` + workspace overhead and fuses well with cuBLAS. 
+ """ + N = X_train.shape[0] + subvector_dim = self.D_subvector + X_f32 = np.ascontiguousarray(X_train, dtype=np.float32) + rng = np.random.default_rng() - # Results for training 1M 1024 dimensional fingerprints were: 5 min 58 s, with k=256, m=4, iterations=200, this is much faster than using scipy - # Store the cluster_centers coordinates in the codebook - self.codewords[subvector_idx] = kmeans.cluster_centers_ # Store the cluster_centers coordinates in the codebook + iterable = range(self.m) + if verbose > 0: + iterable = tqdm(iterable, desc='Training PQ-encoder (GPU)', total=self.m) - self.encoder_is_trained = True - del X_train # remove initial training data from memory + for subvector_idx in iterable: + sub_slice = X_f32[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] + X_gpu = torch.from_numpy(sub_slice).cuda() + # Precompute ||x||² (stays constant across iterations) + x_sq = (X_gpu * X_gpu).sum(dim=1) # (N,) + B = self._gpu_encoder_batch_size(N, self.k) + + # Random init + idx = rng.choice(N, size=self.k, replace=(N < self.k)) + centroids = X_gpu[torch.from_numpy(idx.astype(np.int64)).cuda()].clone() + + for _ in range(self.iterations): + c_sq = (centroids * centroids).sum(dim=1) # (k,) + counts = torch.zeros(self.k, dtype=torch.float32, device='cuda') + sums = torch.zeros_like(centroids) + + for start in range(0, N, B): + end = min(start + B, N) + x_batch = X_gpu[start:end] # view, no copy + # ||x-c||² = ||x||² + ||c||² - 2·x·cᵀ + dist = (x_sq[start:end, None] + + c_sq[None, :] + - 2.0 * (x_batch @ centroids.T)) + dist.clamp_min_(0.0) + labels = dist.argmin(dim=1) + del dist + + ones = torch.ones(end - start, dtype=torch.float32, device='cuda') + counts.index_add_(0, labels, ones) + sums.index_add_(0, labels, x_batch) + del labels, ones + + empty = counts == 0 + counts.clamp_min_(1.0) + new_centroids = sums / counts.unsqueeze(1) + new_centroids[empty] = centroids[empty] + centroids = new_centroids + + self.codewords[subvector_idx] = 
centroids.cpu().numpy() + del X_gpu, centroids, x_sq def transform(self, X:NDArray, verbose:int=1, device:str='auto', **kwargs) -> NDArray: @@ -131,6 +238,7 @@ def transform(self, X:NDArray, verbose:int=1, device:str='auto', **kwargs) -> ND def _transform_cpu(self, X: NDArray, verbose: int = 1, **kwargs) -> NDArray: N, D = X.shape pq_codes = np.zeros((N, self.m), dtype=self.codebook_dtype) + has_sklearn_models = len(self.pq_trained) == self.m iterable = range(self.m) if verbose > 0: @@ -140,7 +248,22 @@ def _transform_cpu(self, X: NDArray, verbose: int = 1, **kwargs) -> NDArray: for subvector_idx in iterable: X_train_subvector = X[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] - pq_codes[:, subvector_idx] = self.pq_trained[subvector_idx].predict(X_train_subvector, **kwargs) + if has_sklearn_models: + pq_codes[:, subvector_idx] = self.pq_trained[subvector_idx].predict(X_train_subvector, **kwargs) + continue + + codewords = np.ascontiguousarray(self.codewords[subvector_idx], dtype=np.float32) + X_sub = np.ascontiguousarray(X_train_subvector, dtype=np.float32) + bytes_budget = 64 * 1024**2 + bytes_per_row = max(codewords.shape[0] * codewords.shape[1] * 4, 1) + chunk_size = max(1, min(N, bytes_budget // bytes_per_row)) + + for start in range(0, N, chunk_size): + end = min(start + chunk_size, N) + chunk = X_sub[start:end] + diff = chunk[:, None, :] - codewords[None, :, :] + dists = np.sum(diff * diff, axis=2) + pq_codes[start:end, subvector_idx] = np.argmin(dists, axis=1).astype(self.codebook_dtype) del X return pq_codes @@ -148,57 +271,59 @@ def _transform_cpu(self, X: NDArray, verbose: int = 1, **kwargs) -> NDArray: def _transform_gpu(self, X: NDArray) -> NDArray: """GPU-accelerated transform using torch.cdist + argmin. - Batches points to avoid OOM: each batch produces an (batch, k) distance - matrix of k × 4 bytes per point. With k=256 that is 1 KB/point, so - a 2 GB budget ≈ 2M points per batch. + Batches points to avoid OOM. 
Each batch is uploaded to the GPU once
+        and then sliced per subvector on-device, avoiding m separate PCIe
+        transfers and CPU-side contiguous copies.
         """
         N, D = X.shape
         subvector_dim = int(D / self.m)
         pq_codes = np.zeros((N, self.m), dtype=self.codebook_dtype)
 
         cw_gpu = [
-            torch.from_numpy(self.codewords[sub].astype(np.float32)).cuda()
+            torch.from_numpy(np.ascontiguousarray(self.codewords[sub], dtype=np.float32)).cuda()
             for sub in range(self.m)
         ]
 
-        # Budget: keep the (batch, k) distance matrix under ~2 GB
-        max_batch = max((2 * 1024**3) // (self.k * 4), 1024)
-        X_f32 = X.astype(np.float32)
+        # Budget: batch tensor (D floats) + distance matrix (k floats) per point
+        bytes_per_point = (D + self.k) * 4
+        max_batch = max((2 * 1024**3) // bytes_per_point, 1024)
+        X_f32 = np.ascontiguousarray(X, dtype=np.float32)
 
         for start in range(0, N, max_batch):
             end = min(start + max_batch, N)
+            batch_gpu = torch.from_numpy(X_f32[start:end]).cuda()
             for sub in range(self.m):
-                chunk = torch.from_numpy(
-                    np.ascontiguousarray(X_f32[start:end, subvector_dim * sub : subvector_dim * (sub + 1)])
-                ).cuda()
+                chunk = batch_gpu[:, subvector_dim * sub : subvector_dim * (sub + 1)]
                 dists = torch.cdist(chunk, cw_gpu[sub])
                 pq_codes[start:end, sub] = dists.argmin(dim=1).cpu().numpy()
-                del chunk, dists
+                del dists
+            del batch_gpu
 
-        del X, cw_gpu
+        del cw_gpu
         return pq_codes
 
-    def fit_transform(self, X:NDArray, verbose:int=1, **kwargs) -> NDArray:
+    def fit_transform(self, X:NDArray, verbose:int=1, device:str='cpu', **kwargs) -> NDArray:
         """Fit and transforms the input matrix `X` into its PQ-codes
 
         The encoder is trained on the matrix and then for each sample in X,
-        the input vector is split into `m` equal-sized vectors subvectors composed 
-        byt the index of the closest centroid. Returns a compact representation of X, 
+        the input vector is split into `m` equal-sized subvectors, each encoded
+        by the index of the closest centroid. 
Returns a compact representation of X, where each sample is encoded as a sequence of centroid indices (i.e PQcodes) Args: X (np.array): Input data matrix of shape (n_samples, n_features) verbose (int, optional): Level of verbosity. Defaults to 1. + device: 'cpu', 'gpu', or 'auto'. Default is 'cpu'. **kwargs: Optional keyword. These arguments will be passed to the underlying KMeans - predict() function. + predict() function. Returns: np.array: PQ codes of shape (n_samples, m), where each element is the index of the nearest centroid for the corresponding subvector """ - self.fit(X, verbose, **kwargs) - return self.transform(X, verbose) + self.fit(X, verbose, device=device, **kwargs) + return self.transform(X, verbose, device=device) def inverse_transform(self, diff --git a/scripts/benchmark_1B_pipeline.py b/scripts/benchmark_1B_pipeline.py new file mode 100644 index 0000000..b58174b --- /dev/null +++ b/scripts/benchmark_1B_pipeline.py @@ -0,0 +1,191 @@ +"""Benchmark the full GPU pipeline on real MQN fingerprints. + +Streams fingerprint chunks from disk (never loading all into RAM at once), +runs all 4 pipeline steps on GPU, and reports wall-clock times. + +Input: a directory of .npy files, each containing an (N_chunk, 42) array +of MQN fingerprints (any integer dtype). Files are loaded in sorted order +until --n fingerprints have been consumed. 
+ +Usage: + # Full 1B benchmark + python scripts/benchmark_1B_pipeline.py --chunks /path/to/mqn_chunks --n 1000000000 + + # Quick test on 100M + python scripts/benchmark_1B_pipeline.py --chunks /path/to/mqn_chunks --n 100000000 + + # Overnight (recommended) + nohup python -u scripts/benchmark_1B_pipeline.py --chunks /path/to/mqn_chunks > benchmark_results.txt 2>&1 & +""" +import argparse +import sys +import time +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from chelombus import PQEncoder +from chelombus.clustering.PyQKmeans import PQKMeans + + +def fmt(secs): + if secs < 60: + return f"{secs:.1f}s" + if secs < 3600: + return f"{secs/60:.1f} min" + if secs < 86400: + return f"{secs/3600:.1f} hrs" + return f"{secs/86400:.1f} days" + + +def iter_chunks(chunk_dir, n_total): + """Yield (chunk_array, start_idx, end_idx) from .npy files on disk.""" + chunks = sorted(Path(chunk_dir).glob("*.npy")) + if not chunks: + raise FileNotFoundError(f"No .npy files found in {chunk_dir}") + n = 0 + for p in chunks: + if n >= n_total: + break + arr = np.load(p) + need = n_total - n + if arr.shape[0] > need: + arr = arr[:need] + start = n + n += arr.shape[0] + yield arr, start, n + + +def load_training_sample(chunk_dir, n_train): + """Load first n_train fingerprints for encoder training.""" + loaded = [] + n = 0 + for arr, _, end in iter_chunks(chunk_dir, n_train): + loaded.append(arr) + n = end + return np.vstack(loaded)[:n_train] + + +def encode_chunked(encoder, chunk_dir, n_total): + """Encode fingerprints in streaming chunks on GPU, return PQ codes.""" + m = encoder.m + pq_codes = np.empty((n_total, m), dtype=np.uint8) + t0 = time.perf_counter() + actual_end = 0 + + for arr, start, end in iter_chunks(chunk_dir, n_total): + codes = encoder.transform(arr.astype(np.float32), verbose=0, device='gpu') + pq_codes[start:end] = codes + actual_end = end + elapsed = time.perf_counter() - t0 + rate = end / elapsed if 
elapsed > 0 else 0 + eta = (n_total - end) / rate if rate > 0 else 0 + print(f" {end:>13,}/{n_total:,} ({end/n_total*100:5.1f}%) " + f"rate={rate:,.0f} pts/s ETA={fmt(eta)}", flush=True) + + total = time.perf_counter() - t0 + return pq_codes[:actual_end], total + + +def bench(label, fn): + print(f" {label}...", end=" ", flush=True) + t0 = time.perf_counter() + result = fn() + elapsed = time.perf_counter() - t0 + print(f"{elapsed:.2f}s ({fmt(elapsed)})", flush=True) + return elapsed, result + + +def main(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--chunks", type=str, required=True, + help="Directory containing .npy MQN fingerprint chunks (N, 42)") + parser.add_argument("--n", type=int, default=1_000_000_000, + help="Number of fingerprints to use (default: 1B)") + parser.add_argument("--k", type=int, default=100_000, + help="Number of clusters (default: 100K)") + parser.add_argument("--fit-iters", type=int, default=5, + help="Cluster training iterations (default: 5)") + parser.add_argument("--train-sample", type=int, default=50_000_000, + help="Encoder training sample size (default: 50M)") + args = parser.parse_args() + + import torch + gpu_name = torch.cuda.get_device_name(0) + free_mb = torch.cuda.mem_get_info()[0] / 1024**2 + print(f"GPU: {gpu_name} ({free_mb:.0f} MB free)", flush=True) + print(f"N={args.n:,} K={args.k:,} fit_iters={args.fit_iters}", flush=True) + print(f"Chunks: {args.chunks}", flush=True) + print(flush=True) + + sep = "=" * 70 + results = {} + + # ── 1. 
Encoder training ────────────────────────────────────────────── + print(f"{sep}\n STEP 1: Encoder training ({args.train_sample:,} sample)\n{sep}", flush=True) + + print(" Loading training sample...", flush=True) + train_sample = load_training_sample(args.chunks, args.train_sample).astype(np.float32) + print(f" Sample shape: {train_sample.shape}", flush=True) + + encoder = PQEncoder(k=256, m=6, iterations=20) + t_enc, _ = bench("GPU", lambda: encoder.fit(train_sample, verbose=1, device='gpu')) + results["Encoder training"] = t_enc + del train_sample + + # ── 2. PQ encoding ─────────────────────────────────────────────────── + print(f"\n{sep}\n STEP 2: PQ encoding ({args.n:,} fingerprints, streamed)\n{sep}", flush=True) + + # Warmup + dummy = np.random.randint(0, 255, (1000, 42), dtype=np.int16) + encoder.transform(dummy.astype(np.float32), verbose=0, device='gpu') + del dummy + + pq_codes, t_encode = encode_chunked(encoder, args.chunks, args.n) + N = pq_codes.shape[0] + print(f" Total: {fmt(t_encode)} ({N/t_encode:,.0f} pts/s)", flush=True) + results["PQ encoding"] = t_encode + + print(f" PQ codes: {pq_codes.shape} = {pq_codes.nbytes / 1e9:.1f} GB in RAM", flush=True) + + # ── 3. Cluster training ────────────────────────────────────────────── + print(f"\n{sep}\n STEP 3: Cluster training ({N:,}, K={args.k:,}, {args.fit_iters} iters, tol=0)\n{sep}", flush=True) + + clusterer = PQKMeans(encoder, k=args.k, iteration=args.fit_iters, tol=0, verbose=True) + t_fit, _ = bench("GPU", lambda: clusterer.fit(pq_codes, device='gpu')) + results["Cluster training"] = t_fit + + # ── 4. 
Label assignment ────────────────────────────────────────────── + print(f"\n{sep}\n STEP 4: Label assignment ({N:,}, K={args.k:,})\n{sep}", flush=True) + + clusterer.predict(pq_codes[:10000], device='gpu') # warmup + + t_pred, labels = bench("GPU", lambda: clusterer.predict(pq_codes, device='gpu')) + results["Label assignment"] = t_pred + + n_unique = len(np.unique(labels)) + print(f" Unique clusters: {n_unique:,} / {args.k:,}", flush=True) + + # ── Summary ────────────────────────────────────────────────────────── + total = sum(results.values()) + print(f"\n{sep}\n GPU RESULTS AT {N:,}\n{sep}", flush=True) + print(f" {'Stage':<35} {'Time':>12}", flush=True) + print(f" {'-'*49}", flush=True) + for stage, t in results.items(): + print(f" {stage:<35} {fmt(t):>12}", flush=True) + print(f" {'-'*49}", flush=True) + print(f" {'TOTAL':<35} {fmt(total):>12}", flush=True) + + print(f"\n Note: Cluster training uses tol=0 (no early stopping).", flush=True) + print(f" With default tol=1e-3, training typically converges in fewer", flush=True) + print(f" iterations, reducing cluster training time further.", flush=True) + print(f"\n Done.", flush=True) + + +if __name__ == "__main__": + main() diff --git a/scripts/k_selection_gpu.py b/scripts/k_selection_gpu.py new file mode 100644 index 0000000..7867a9a --- /dev/null +++ b/scripts/k_selection_gpu.py @@ -0,0 +1,167 @@ +"""GPU k-selection benchmark: sweep k values on 100M Enamine fingerprints. + +Loads MQN fingerprints from chunks, encodes to PQ codes on GPU, +then fits PQKMeans at each k value on GPU and computes clustering metrics. +All heavy steps (fit + predict) run on GPU. 
+ +Usage: + nohup python -u scripts/k_selection_gpu.py --chunks /path/to/mqn_chunks > k_selection_results.txt 2>&1 & +""" +import argparse +import sys +import time +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from chelombus import PQEncoder +from chelombus.clustering.PyQKmeans import PQKMeans, _build_distance_tables + + +def compute_avg_distance(pq_codes, labels, centers, dtables): + """Compute average distance from each point to its assigned center. + + O(N*M) — just a lookup per point, no sweep over K. + """ + N, m = pq_codes.shape + total = 0.0 + centers_u8 = centers.astype(np.uint8) + # Vectorized: for each subvector, look up distance between point code and center code + for sub in range(m): + point_codes = pq_codes[:, sub] + center_codes = centers_u8[labels, sub] + total += dtables[sub, point_codes, center_codes].sum() + return total / N + + +def fmt(secs): + if secs < 60: + return f"{secs:.1f}s" + if secs < 3600: + return f"{secs/60:.1f} min" + return f"{secs/3600:.1f} hrs" + + +def iter_chunks(chunk_dir, n_total): + chunks = sorted(Path(chunk_dir).glob("*.npy")) + n = 0 + for p in chunks: + if n >= n_total: + break + arr = np.load(p) + need = n_total - n + if arr.shape[0] > need: + arr = arr[:need] + start = n + n += arr.shape[0] + yield arr, start, n + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--chunks", type=str, required=True, + help="Directory with .npy MQN fingerprint chunks") + parser.add_argument("--encoder", type=str, default="models/paper_encoder.joblib") + parser.add_argument("--n-fingerprints", type=int, default=100_000_000) + parser.add_argument("--n-subsample", type=int, default=100_000_000) + parser.add_argument("--k-values", type=int, nargs="+", + default=[10_000, 25_000, 50_000, 100_000, 200_000]) + parser.add_argument("--iterations", type=int, default=10) + 
parser.add_argument("--seed", type=int, default=42) + args = parser.parse_args() + + import torch + print(f"GPU: {torch.cuda.get_device_name(0)}", flush=True) + print(f"n_fingerprints={args.n_fingerprints:,}, n_subsample={args.n_subsample:,}", flush=True) + print(flush=True) + + encoder = PQEncoder.load(args.encoder) + print(f"Encoder: m={encoder.m}, k={encoder.k}", flush=True) + + # Encode fingerprints to PQ codes (streamed from disk) + print(f"Encoding {args.n_fingerprints:,} fingerprints on GPU...", flush=True) + m = encoder.m + pq_codes = np.empty((args.n_fingerprints, m), dtype=np.uint8) + actual_end = 0 + t0 = time.perf_counter() + for arr, start, end in iter_chunks(args.chunks, args.n_fingerprints): + codes = encoder.transform(arr.astype(np.float32), verbose=0, device='gpu') + pq_codes[start:end] = codes + actual_end = end + if end % 10_000_000 == 0: + print(f" {end:,}/{args.n_fingerprints:,}", flush=True) + pq_codes = pq_codes[:actual_end] + print(f" Encoded {actual_end:,} in {fmt(time.perf_counter() - t0)}", flush=True) + + # Subsample if needed + rng = np.random.default_rng(args.seed) + n_total = pq_codes.shape[0] + n_sub = min(args.n_subsample, n_total) + if n_sub < n_total: + idx = rng.choice(n_total, size=n_sub, replace=False) + idx.sort() + pq_codes = pq_codes[idx] + print(f" Using {pq_codes.shape[0]:,} PQ codes\n", flush=True) + + dtables = _build_distance_tables(encoder.codewords) + + # Sweep k values (all GPU) + sep = "=" * 85 + print(f"{sep}", flush=True) + print(f" k-SELECTION: n={n_sub:,}, iters={args.iterations}, device=gpu", flush=True) + print(f"{sep}\n", flush=True) + + print(f"{'k':>10} {'Fit Time':>10} {'Avg Dist':>10} {'Empty%':>8} " + f"{'Med Size':>10}", flush=True) + print("-" * 58, flush=True) + + results = [] + for k in sorted(args.k_values): + if k >= n_sub: + print(f"{k:>10,} SKIPPED (k >= n)", flush=True) + continue + + # Fit on GPU + clusterer = PQKMeans(encoder, k=k, iteration=args.iterations, verbose=False) + t0 = 
time.perf_counter() + clusterer.fit(pq_codes, device='gpu') + t_fit = time.perf_counter() - t0 + + # Predict on GPU + labels = clusterer.predict(pq_codes, device='gpu') + + # Compute metrics (cheap O(N*M) lookups, no K sweep) + centers = clusterer.cluster_centers_ + avg_dist = compute_avg_distance(pq_codes, labels, centers, dtables) + counts = np.bincount(labels.astype(np.intp), minlength=k) + n_empty = int(np.sum(counts == 0)) + pct_empty = 100.0 * n_empty / k + sizes = counts[counts > 0] + med_size = float(np.median(sizes)) + + print(f"{k:>10,} {fmt(t_fit):>10} {avg_dist:>10.2f} {pct_empty:>7.1f}% " + f"{med_size:>10,.0f}", flush=True) + + results.append({ + "k": k, "fit_time": t_fit, "avg_dist": avg_dist, + "pct_empty": pct_empty, "med_size": med_size, + }) + + # Summary table for README + print(f"\n{sep}", flush=True) + print(" README TABLE:", flush=True) + print(f"{sep}\n", flush=True) + print("| k | Avg Distance | Empty Clusters | Median Cluster Size | Fit Time (GPU) |", flush=True) + print("|---:|---:|---:|---:|---:|", flush=True) + for r in results: + print(f"| {r['k']:,} | {r['avg_dist']:.2f} | {r['pct_empty']:.1f}% | " + f"{r['med_size']:,.0f} | {fmt(r['fit_time'])} |", flush=True) + + print(f"\nDone.", flush=True) + + +if __name__ == "__main__": + main() diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 513de20..8d3bfa3 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -9,11 +9,22 @@ from pathlib import Path from chelombus.encoder.encoder import PQEncoder +import chelombus.encoder.encoder as encoder_module +import chelombus.clustering.PyQKmeans as pyqkmeans_module # Skip all tests if pqkmeans is not installed pqkmeans_available = pytest.importorskip("pqkmeans", reason="pqkmeans not installed") -from chelombus.clustering.PyQKmeans import PQKMeans +from chelombus.clustering.PyQKmeans import PQKMeans, _predict_numba, _update_centers + +_GPU_AVAILABLE = False +try: + import torch + _GPU_AVAILABLE = 
torch.cuda.is_available() +except ImportError: + pass + +gpu = pytest.mark.skipif(not _GPU_AVAILABLE, reason="CUDA not available") @pytest.fixture @@ -265,3 +276,304 @@ def test_large_k_clustering(self, trained_encoder): unique_labels = np.unique(labels) assert len(unique_labels) > 1 assert labels.max() < k + + +class TestPQKMeansRegressions: + """Regression tests for GPU control flow that run without CUDA.""" + + def test_update_centers_preserves_empty_clusters(self): + """Empty clusters should keep their previous center instead of collapsing to zero.""" + pq_codes = np.array([[5, 7], [5, 7], [9, 1]], dtype=np.uint8) + labels = np.array([0, 0, 1], dtype=np.int64) + previous_centers = np.array([[5, 7], [9, 1], [4, 3]], dtype=np.uint8) + + dtables = np.ones((2, 256, 256), dtype=np.float32) + for sub in range(2): + np.fill_diagonal(dtables[sub], 0.0) + + new_centers = _update_centers( + pq_codes, + labels, + K=3, + dtables=dtables, + previous_centers=previous_centers, + ) + + np.testing.assert_array_equal(new_centers[0], [5, 7]) + np.testing.assert_array_equal(new_centers[1], [9, 1]) + np.testing.assert_array_equal(new_centers[2], previous_centers[2]) + + def test_mocked_gpu_fit_predict_matches_final_centers(self, monkeypatch, trained_encoder, sample_pq_codes): + """fit_predict on the GPU path must return labels for the final stored centers.""" + monkeypatch.setattr(pyqkmeans_module, "_GPU_AVAILABLE", True) + monkeypatch.setattr( + pyqkmeans_module, + "predict_gpu", + lambda pq_codes, centers, dtables, batch_size=0, verbose=False: + _predict_numba(pq_codes, centers, dtables), + raising=False, + ) + monkeypatch.setattr( + pyqkmeans_module.np.random, + "default_rng", + lambda *args, **kwargs: np.random.Generator(np.random.PCG64(0)), + ) + + clusterer = PQKMeans(trained_encoder, k=10, iteration=20, tol=1e-3, verbose=False) + labels = clusterer.fit_predict(sample_pq_codes, device='gpu') + final_labels = clusterer.predict(sample_pq_codes, device='cpu') + + 
np.testing.assert_array_equal(labels, final_labels) + + def test_mocked_gpu_fit_clears_transient_labels(self, monkeypatch, trained_encoder, sample_pq_codes): + """GPU fit should not leave the last training assignments attached to the model.""" + monkeypatch.setattr(pyqkmeans_module, "_GPU_AVAILABLE", True) + monkeypatch.setattr( + pyqkmeans_module, + "predict_gpu", + lambda pq_codes, centers, dtables, batch_size=0, verbose=False: + _predict_numba(pq_codes, centers, dtables), + raising=False, + ) + monkeypatch.setattr( + pyqkmeans_module.np.random, + "default_rng", + lambda *args, **kwargs: np.random.Generator(np.random.PCG64(1)), + ) + + clusterer = PQKMeans(trained_encoder, k=5, iteration=5, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + + assert clusterer._fit_labels is None + assert clusterer.__getstate__()["_fit_labels"] is None + + def test_auto_falls_back_to_cpu_when_k_exceeds_256(self, monkeypatch, trained_encoder, sample_pq_codes): + """device='auto' should not try the GPU path when encoder.k > 256.""" + monkeypatch.setattr(pyqkmeans_module, "_GPU_AVAILABLE", True) + + def fail_predict_gpu(*args, **kwargs): + raise AssertionError("predict_gpu should not be called for k > 256") + + monkeypatch.setattr(pyqkmeans_module, "predict_gpu", fail_predict_gpu, raising=False) + + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, verbose=False) + # Pretend the encoder has k > 256 to trigger the guard + monkeypatch.setattr(trained_encoder, "k", 512) + + clusterer.fit(sample_pq_codes, device='auto') + labels = clusterer.predict(sample_pq_codes, device='auto') + + assert labels.shape == (len(sample_pq_codes),) + assert labels.min() >= 0 + + +class TestPQEncoderRegressions: + """Regression tests for CPU portability after GPU-oriented training changes.""" + + def test_gpu_encoder_batch_size_uses_free_vram(self, monkeypatch): + """GPU encoder batch sizing should respond to the current free VRAM.""" + torch = pytest.importorskip("torch", reason="torch 
not installed") + monkeypatch.setattr( + torch.cuda, + "mem_get_info", + lambda: (int(1.5 * 1024**3), int(16 * 1024**3)), + ) + + batch = PQEncoder._gpu_encoder_batch_size(N=10_000_000, k=256) + expected = max((int(0.5 * 1024**3)) // (256 * 4 + 8 + 4), 250_000) + + assert batch == expected + + def test_cpu_transform_without_sklearn_models_uses_codewords(self, tmp_path): + """A saved encoder must remain usable on the CPU even without stored sklearn models.""" + np.random.seed(11) + X = np.random.rand(200, 42).astype(np.float32) + + encoder = PQEncoder(k=16, m=6, iterations=5) + encoder.fit(X, verbose=0) + expected = encoder.transform(X[:50], verbose=0, device='cpu') + + encoder.pq_trained = [] + path = tmp_path / "encoder.pkl" + encoder.save(path) + + loaded = PQEncoder.load(path) + actual = loaded.transform(X[:50], verbose=0, device='cpu') + + np.testing.assert_array_equal(actual, expected) + + +# ── GPU-specific tests ──────────────────────────────────────────────────── + + +class TestGPUPredict: + """Tests for GPU prediction path.""" + + @gpu + def test_gpu_predict_matches_cpu(self, trained_encoder, sample_pq_codes): + """GPU and CPU predict must return identical labels.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, verbose=False) + clusterer.fit(sample_pq_codes, device='cpu') + + labels_cpu = clusterer.predict(sample_pq_codes, device='cpu') + labels_gpu = clusterer.predict(sample_pq_codes, device='gpu') + + np.testing.assert_array_equal(labels_cpu, labels_gpu) + + @gpu + def test_gpu_predict_shape_and_range(self, trained_encoder, sample_pq_codes): + """GPU predict returns correct shape and valid label range.""" + k = 5 + clusterer = PQKMeans(trained_encoder, k=k, iteration=3, verbose=False) + clusterer.fit(sample_pq_codes, device='cpu') + + labels = clusterer.predict(sample_pq_codes, device='gpu') + + assert labels.shape == (len(sample_pq_codes),) + assert labels.min() >= 0 + assert labels.max() < k + assert np.issubdtype(labels.dtype, np.integer) 
+ + @gpu + def test_gpu_predict_deterministic(self, trained_encoder, sample_pq_codes): + """Repeated GPU predict calls return the same labels.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, verbose=False) + clusterer.fit(sample_pq_codes, device='cpu') + + labels1 = clusterer.predict(sample_pq_codes, device='gpu') + labels2 = clusterer.predict(sample_pq_codes, device='gpu') + + np.testing.assert_array_equal(labels1, labels2) + + +class TestGPUFit: + """Tests for GPU training path.""" + + @gpu + def test_gpu_fit_produces_valid_clusters(self, trained_encoder, sample_pq_codes): + """GPU fit must produce a trained model with valid labels.""" + k = 5 + clusterer = PQKMeans(trained_encoder, k=k, iteration=3, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + + assert clusterer.is_trained + labels = clusterer.predict(sample_pq_codes) + assert labels.shape == (len(sample_pq_codes),) + assert labels.min() >= 0 + assert labels.max() < k + + @gpu + def test_gpu_fit_returns_self(self, trained_encoder, sample_pq_codes): + """GPU fit returns self for method chaining.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, verbose=False) + result = clusterer.fit(sample_pq_codes, device='gpu') + + assert result is clusterer + + @gpu + def test_gpu_fit_centers_are_valid_codes(self, trained_encoder, sample_pq_codes): + """Cluster centers after GPU fit must be valid codebook indices.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + + centers = clusterer.cluster_centers_ + assert centers.shape == (5, trained_encoder.m) + assert centers.min() >= 0 + assert centers.max() < trained_encoder.k + + @gpu + def test_gpu_fit_predict(self, trained_encoder, sample_pq_codes): + """GPU fit_predict returns labels of correct shape and range.""" + k = 5 + clusterer = PQKMeans(trained_encoder, k=k, iteration=3, verbose=False) + labels = clusterer.fit_predict(sample_pq_codes, device='gpu') + + 
assert labels.shape == (len(sample_pq_codes),) + assert labels.min() >= 0 + assert labels.max() < k + + +class TestGPUSaveLoad: + """Tests for save/load round-trip after GPU training.""" + + @gpu + def test_gpu_save_load_labels_match(self, trained_encoder, sample_pq_codes, tmp_path): + """Labels must match after save/load of a GPU-trained model.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, tol=0, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + labels_before = clusterer.predict(sample_pq_codes) + + path = tmp_path / "gpu_model.pkl" + clusterer.save(path) + loaded = PQKMeans.load(path) + labels_after = loaded.predict(sample_pq_codes) + + np.testing.assert_array_equal(labels_before, labels_after) + + @gpu + def test_gpu_save_load_gpu_predict(self, trained_encoder, sample_pq_codes, tmp_path): + """GPU predict works correctly on a loaded model.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, tol=0, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + labels_before = clusterer.predict(sample_pq_codes, device='gpu') + + path = tmp_path / "gpu_model.pkl" + clusterer.save(path) + loaded = PQKMeans.load(path) + labels_after = loaded.predict(sample_pq_codes, device='gpu') + + np.testing.assert_array_equal(labels_before, labels_after) + + +class TestEarlyStopping: + """Tests for early stopping in GPU training.""" + + @gpu + def test_tol_stops_before_max_iterations(self, trained_encoder, sample_pq_codes): + """With default tol, training should stop before iteration limit.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=50, tol=1e-3, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + + # If early stopping works, it should finish quickly (not run 50 iters). + # We just verify it completes and produces valid output. 
+ assert clusterer.is_trained + labels = clusterer.predict(sample_pq_codes) + assert labels.min() >= 0 + assert labels.max() < 5 + + @gpu + def test_tol_zero_only_stops_on_exact_convergence(self, trained_encoder, sample_pq_codes): + """tol=0 should only stop when no centers change at all.""" + clusterer = PQKMeans(trained_encoder, k=5, iteration=3, tol=0, verbose=False) + clusterer.fit(sample_pq_codes, device='gpu') + + assert clusterer.is_trained + + @gpu + def test_default_tol_value(self, trained_encoder): + """Default tol should be 1e-3.""" + clusterer = PQKMeans(trained_encoder, k=5) + assert clusterer.tol == 1e-3 + + +class TestGPUEncoderTransform: + """Tests for GPU encoder transform path.""" + + @gpu + def test_gpu_transform_matches_cpu(self, trained_encoder): + """GPU and CPU transform must produce identical PQ codes.""" + np.random.seed(99) + X = np.random.rand(200, 42).astype(np.float64) + + codes_cpu = trained_encoder.transform(X, verbose=0, device='cpu') + codes_gpu = trained_encoder.transform(X, verbose=0, device='gpu') + + np.testing.assert_array_equal(codes_cpu, codes_gpu) + + @gpu + def test_gpu_transform_dtype(self, trained_encoder): + """GPU transform must return the correct codebook dtype.""" + X = np.random.rand(50, 42).astype(np.float64) + codes = trained_encoder.transform(X, verbose=0, device='gpu') + + assert codes.dtype == trained_encoder.codebook_dtype
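
The batched nearest-center assignment that recurs throughout this patch (the encoder's GPU k-means loop, the chunked `_transform_cpu` path, and the host-side logic around the Triton kernel) all rest on the same identity, `||x - c||² = ||x||² + ||c||² - 2·x·c`, evaluated batch by batch so the full (N, K) distance matrix never exists at once. A minimal stand-alone NumPy sketch of that pattern (illustrative only, not code from the repository; `assign_batched` is a hypothetical name):

```python
import numpy as np

def assign_batched(X, C, batch=4096):
    """Assign each row of X to its nearest center in C, processing points
    in batches so only a (batch, K) slice of the distance matrix is ever
    materialized. Distances use the expansion
    ||x - c||^2 = ||x||^2 + ||c||^2 - 2 * x.c."""
    x_sq = np.einsum('ij,ij->i', X, X)   # ||x||^2 for every point, shape (N,)
    c_sq = np.einsum('ij,ij->i', C, C)   # ||c||^2 for every center, shape (K,)
    labels = np.empty(X.shape[0], dtype=np.int64)
    for start in range(0, X.shape[0], batch):
        end = min(start + batch, X.shape[0])
        # (batch, K) partial distances; the matmul dominates the cost
        d = x_sq[start:end, None] + c_sq[None, :] - 2.0 * (X[start:end] @ C.T)
        # clamp tiny negatives caused by floating-point cancellation
        np.clip(d, 0.0, None, out=d)
        labels[start:end] = d.argmin(axis=1)
    return labels
```

The GPU variants in the diff are the same loop with the matmul on-device (or tiled inside a Triton kernel with an online argmin); the precomputed `x_sq`/`c_sq` terms are what let the centroid loop reuse norms across iterations.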