From 15083ff33b8fd2b3f391be746cd57bcce212a9e7 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Fri, 14 Nov 2025 10:39:18 +0400 Subject: [PATCH 1/9] Add partial results persistence feature - Add RUNNING status to JobStatus enum - Add ThreadLocal in BulkScanWorker to pass ScanJobDescription to scan implementations - Add upsertPartialScanResult() method to IPersistenceProvider interface - Implement upsertPartialScanResult() in MongoPersistenceProvider - Update DummyPersistenceProvider test implementation - Update BulkScanWorkerManager to pass ScanJobDescription to worker.handle() This enables real-time persistence of intermediate scan results to MongoDB as probes complete during TLS scans. --- .../rub/nds/crawler/constant/JobStatus.java | 2 + .../rub/nds/crawler/core/BulkScanWorker.java | 30 ++++++++++++--- .../crawler/core/BulkScanWorkerManager.java | 2 +- .../persistence/IPersistenceProvider.java | 11 ++++++ .../persistence/MongoPersistenceProvider.java | 37 +++++++++++++++++++ .../dummy/DummyPersistenceProvider.java | 8 ++++ 6 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/rub/nds/crawler/constant/JobStatus.java b/src/main/java/de/rub/nds/crawler/constant/JobStatus.java index 99c521b..03765fd 100644 --- a/src/main/java/de/rub/nds/crawler/constant/JobStatus.java +++ b/src/main/java/de/rub/nds/crawler/constant/JobStatus.java @@ -15,6 +15,8 @@ public enum JobStatus { /** Job is waiting to be executed. */ TO_BE_EXECUTED(false), + /** Job is currently being executed. Partial results may be available in DB. */ + RUNNING(false), /** The domain was not resolvable. An empty result was written to DB. */ UNRESOLVABLE(true), /** An uncaught exception occurred while resolving the host. 
*/ diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java index 11831cc..37ed4c2 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java @@ -9,6 +9,7 @@ package de.rub.nds.crawler.core; import de.rub.nds.crawler.data.ScanConfig; +import de.rub.nds.crawler.data.ScanJobDescription; import de.rub.nds.crawler.data.ScanTarget; import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor; import de.rub.nds.scanner.core.execution.NamedThreadFactory; @@ -41,6 +42,10 @@ public abstract class BulkScanWorker { /** The scan configuration for this worker */ protected final T scanConfig; + // ThreadLocal to pass ScanJobDescription to scan() implementations + private static final ThreadLocal currentJobDescription = + new ThreadLocal<>(); + /** * Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into * a future object such that we can handle timeouts properly. @@ -75,22 +80,37 @@ protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThread * initialize itself. In this case it will also clean up itself if all jobs are done. * * @param scanTarget The target to scan. + * @param jobDescription The job description for this scan. * @return A future that resolves to the scan result once the scan is done. 
*/ - public Future handle(ScanTarget scanTarget) { + public Future handle(ScanTarget scanTarget, ScanJobDescription jobDescription) { // if we initialized ourself, we also clean up ourself shouldCleanupSelf.weakCompareAndSetAcquire(false, init()); activeJobs.incrementAndGet(); return timeoutExecutor.submit( () -> { - Document result = scan(scanTarget); - if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) { - cleanup(); + try { + currentJobDescription.set(jobDescription); + Document result = scan(scanTarget); + if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) { + cleanup(); + } + return result; + } finally { + currentJobDescription.remove(); } - return result; }); } + /** + * Get the ScanJobDescription for the current scan. Only valid when called from within scan(). + * + * @return The current ScanJobDescription, or null if not in a scan context + */ + protected ScanJobDescription getCurrentJobDescription() { + return currentJobDescription.get(); + } + /** * Scans a target and returns the result as a Document. This is the core scanning functionality * that must be implemented by subclasses. 
diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java index 3e78782..482543c 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java @@ -148,6 +148,6 @@ public Future handle( bulkScanInfo.getScanConfig(), parallelConnectionThreads, parallelScanThreads); - return worker.handle(scanJobDescription.getScanTarget()); + return worker.handle(scanJobDescription.getScanTarget(), scanJobDescription); } } diff --git a/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java b/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java index 30d2ffb..1619ff6 100644 --- a/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java +++ b/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java @@ -27,6 +27,17 @@ public interface IPersistenceProvider { */ void insertScanResult(ScanResult scanResult, ScanJobDescription job); + /** + * Upsert a partial scan result into the database. This method updates an existing document with + * the same ID, or inserts a new one if it doesn't exist. Used during scanning to persist + * intermediate results as probes complete. The document is identified by the job's UUID and + * will be overwritten when insertScanResult() is called with the final result. + * + * @param partialResult The partial scan result to upsert (contains intermediate data). + * @param job The job that is being executed. + */ + void upsertPartialScanResult(ScanResult partialResult, ScanJobDescription job); + /** * Insert a bulk scan into the database. This is used to store metadata about the bulk scan. * This adds an ID to the bulk scan. 
diff --git a/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java b/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java index a1278c1..6308a67 100644 --- a/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java +++ b/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java @@ -275,6 +275,43 @@ public void insertScanResult(ScanResult scanResult, ScanJobDescription scanJobDe } } + @Override + public void upsertPartialScanResult( + ScanResult partialResult, ScanJobDescription scanJobDescription) { + LOGGER.debug( + "Upserting partial scan result for job ID: {} with status: {}", + scanJobDescription.getId(), + partialResult.getResultStatus()); + + // Set the ID to match the job's UUID (same document as final result will use) + partialResult.setId(scanJobDescription.getId().toString()); + + try { + var collection = + resultCollectionCache.getUnchecked( + Pair.of( + scanJobDescription.getDbName(), + scanJobDescription.getCollectionName())); + + // Use replaceOne with upsert option to update existing or insert new + com.mongodb.client.model.ReplaceOptions replaceOptions = + new com.mongodb.client.model.ReplaceOptions().upsert(true); + + org.bson.Document filter = new org.bson.Document("_id", partialResult.getId()); + + collection.replaceOne(filter, partialResult, replaceOptions); + + LOGGER.debug( + "Upserted partial result for job ID: {} to collection: {}.{}", + scanJobDescription.getId(), + scanJobDescription.getDbName(), + scanJobDescription.getCollectionName()); + } catch (Exception e) { + LOGGER.warn("Exception while upserting partial result to MongoDB (non-fatal): ", e); + // Don't throw - partial result persistence should not fail the scan + } + } + @Override public List getScanResultsByTarget( String dbName, String collectionName, String target) { diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java 
b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java index 501b3d4..10ac1f8 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java @@ -25,6 +25,14 @@ public void insertScanResult(ScanResult scanResult, ScanJobDescription job) { results.add(scanResult); } + @Override + public void upsertPartialScanResult(ScanResult partialResult, ScanJobDescription job) { + // Remove existing result with same ID if present + results.removeIf(r -> r.getId() != null && r.getId().equals(partialResult.getId())); + // Add the partial result + results.add(partialResult); + } + @Override public void insertBulkScan(BulkScan bulkScan) { bulkScans.add(bulkScan); From c5ee54f7c6abe527b4d2ad4fb29f25d284a7c9e3 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Wed, 19 Nov 2025 14:25:36 +0400 Subject: [PATCH 2/9] Removed unused code --- .../persistence/IPersistenceProvider.java | 11 ------ .../persistence/MongoPersistenceProvider.java | 37 ------------------- .../dummy/DummyPersistenceProvider.java | 8 ---- 3 files changed, 56 deletions(-) diff --git a/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java b/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java index 1619ff6..30d2ffb 100644 --- a/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java +++ b/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java @@ -27,17 +27,6 @@ public interface IPersistenceProvider { */ void insertScanResult(ScanResult scanResult, ScanJobDescription job); - /** - * Upsert a partial scan result into the database. This method updates an existing document with - * the same ID, or inserts a new one if it doesn't exist. Used during scanning to persist - * intermediate results as probes complete. The document is identified by the job's UUID and - * will be overwritten when insertScanResult() is called with the final result. 
- * - * @param partialResult The partial scan result to upsert (contains intermediate data). - * @param job The job that is being executed. - */ - void upsertPartialScanResult(ScanResult partialResult, ScanJobDescription job); - /** * Insert a bulk scan into the database. This is used to store metadata about the bulk scan. * This adds an ID to the bulk scan. diff --git a/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java b/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java index 6308a67..a1278c1 100644 --- a/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java +++ b/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java @@ -275,43 +275,6 @@ public void insertScanResult(ScanResult scanResult, ScanJobDescription scanJobDe } } - @Override - public void upsertPartialScanResult( - ScanResult partialResult, ScanJobDescription scanJobDescription) { - LOGGER.debug( - "Upserting partial scan result for job ID: {} with status: {}", - scanJobDescription.getId(), - partialResult.getResultStatus()); - - // Set the ID to match the job's UUID (same document as final result will use) - partialResult.setId(scanJobDescription.getId().toString()); - - try { - var collection = - resultCollectionCache.getUnchecked( - Pair.of( - scanJobDescription.getDbName(), - scanJobDescription.getCollectionName())); - - // Use replaceOne with upsert option to update existing or insert new - com.mongodb.client.model.ReplaceOptions replaceOptions = - new com.mongodb.client.model.ReplaceOptions().upsert(true); - - org.bson.Document filter = new org.bson.Document("_id", partialResult.getId()); - - collection.replaceOne(filter, partialResult, replaceOptions); - - LOGGER.debug( - "Upserted partial result for job ID: {} to collection: {}.{}", - scanJobDescription.getId(), - scanJobDescription.getDbName(), - scanJobDescription.getCollectionName()); - } catch (Exception e) { - LOGGER.warn("Exception while upserting 
partial result to MongoDB (non-fatal): ", e); - // Don't throw - partial result persistence should not fail the scan - } - } - @Override public List getScanResultsByTarget( String dbName, String collectionName, String target) { diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java index 10ac1f8..501b3d4 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java @@ -25,14 +25,6 @@ public void insertScanResult(ScanResult scanResult, ScanJobDescription job) { results.add(scanResult); } - @Override - public void upsertPartialScanResult(ScanResult partialResult, ScanJobDescription job) { - // Remove existing result with same ID if present - results.removeIf(r -> r.getId() != null && r.getId().equals(partialResult.getId())); - // Add the partial result - results.add(partialResult); - } - @Override public void insertBulkScan(BulkScan bulkScan) { bulkScans.add(bulkScan); From ca7fa873720563c28d5e44e03671ee0701aaf6c5 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Wed, 19 Nov 2025 18:17:26 +0400 Subject: [PATCH 3/9] Add BulkScanWorker unit tests --- .../nds/crawler/core/BulkScanWorkerTest.java | 323 ++++++++++++++++++ 1 file changed, 323 insertions(+) create mode 100644 src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java new file mode 100644 index 0000000..f98fa3a --- /dev/null +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -0,0 +1,323 @@ +/* + * TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner + * + * Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH + * + * Licensed under Apache License, Version 2.0 + * 
http://www.apache.org/licenses/LICENSE-2.0.txt + */ +package de.rub.nds.crawler.core; + +import static org.junit.jupiter.api.Assertions.*; + +import de.rub.nds.crawler.constant.JobStatus; +import de.rub.nds.crawler.data.BulkScan; +import de.rub.nds.crawler.data.ScanConfig; +import de.rub.nds.crawler.data.ScanJobDescription; +import de.rub.nds.crawler.data.ScanTarget; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import org.bson.Document; +import org.junit.jupiter.api.Test; + +class BulkScanWorkerTest { + + // Test implementation of ScanConfig + static class TestScanConfig extends ScanConfig implements Serializable { + public TestScanConfig() { + super(de.rub.nds.scanner.core.config.ScannerDetail.NORMAL, 0, 60); + } + + @Override + public BulkScanWorker createWorker( + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { + return new TestBulkScanWorker(bulkScanID, this, parallelScanThreads); + } + } + + // Test implementation of BulkScanWorker + static class TestBulkScanWorker extends BulkScanWorker { + private boolean initCalled = false; + private boolean cleanupCalled = false; + private ScanJobDescription capturedJobDescription = null; + + TestBulkScanWorker(String bulkScanId, TestScanConfig scanConfig, int parallelScanThreads) { + super(bulkScanId, scanConfig, parallelScanThreads); + } + + @Override + public Document scan(ScanTarget scanTarget) { + // Capture the job description during scan + capturedJobDescription = getCurrentJobDescription(); + + Document result = new Document(); + result.put("target", scanTarget.getHostname()); + result.put("hasJobDescription", capturedJobDescription != null); + if (capturedJobDescription != null) { + result.put("jobId", capturedJobDescription.getId().toString()); + } + return result; + } + + @Override + protected void initInternal() { + initCalled = true; + } + + @Override + protected void cleanupInternal() { + cleanupCalled = 
true; + } + + public boolean isInitCalled() { + return initCalled; + } + + public boolean isCleanupCalled() { + return cleanupCalled; + } + + public ScanJobDescription getCapturedJobDescription() { + return capturedJobDescription; + } + } + + @Test + void testGetCurrentJobDescriptionReturnsNullOutsideScanContext() { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + // getCurrentJobDescription() is protected, so we can't call it directly from test + // But we can verify through the scan() method that it returns null when not in context + assertNull( + worker.getCapturedJobDescription(), + "Job description should be null before any scan"); + } + + @Test + void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanTarget target = new ScanTarget(); + target.setHostname("example.com"); + target.setPort(443); + + BulkScan bulkScan = + new BulkScan( + BulkScanWorkerTest.class, + BulkScanWorkerTest.class, + "test-db", + config, + System.currentTimeMillis(), + false, + null); + + ScanJobDescription jobDescription = + new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + + // Execute the scan + Future future = worker.handle(target, jobDescription); + Document result = future.get(); + + // Verify the job description was available during scan + assertTrue( + result.getBoolean("hasJobDescription"), + "Job description should be available in scan context"); + assertEquals(jobDescription.getId().toString(), result.getString("jobId")); + + // Verify the captured job description matches + assertNotNull(worker.getCapturedJobDescription()); + assertEquals(jobDescription.getId(), worker.getCapturedJobDescription().getId()); + assertEquals(target, worker.getCapturedJobDescription().getScanTarget()); + } + + @Test + void 
testThreadLocalIsCleanedUpAfterScan() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanTarget target = new ScanTarget(); + target.setHostname("example.com"); + target.setPort(443); + + BulkScan bulkScan = + new BulkScan( + BulkScanWorkerTest.class, + BulkScanWorkerTest.class, + "test-db", + config, + System.currentTimeMillis(), + false, + null); + + ScanJobDescription jobDescription = + new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + + // Execute the scan + Future future = worker.handle(target, jobDescription); + future.get(); // Wait for completion + + // After scan completes, the ThreadLocal should be cleaned up + // We can verify this by running another scan and checking it gets the new job description + ScanTarget newTarget = new ScanTarget(); + newTarget.setHostname("example2.com"); + newTarget.setPort(443); + + ScanJobDescription newJobDescription = + new ScanJobDescription(newTarget, bulkScan, JobStatus.TO_BE_EXECUTED); + + Future future2 = worker.handle(newTarget, newJobDescription); + Document result2 = future2.get(); + + // The second scan should have the second job description, not the first + assertEquals(newJobDescription.getId().toString(), result2.getString("jobId")); + assertEquals(newJobDescription.getId(), worker.getCapturedJobDescription().getId()); + } + + @Test + void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 2); + + BulkScan bulkScan = + new BulkScan( + BulkScanWorkerTest.class, + BulkScanWorkerTest.class, + "test-db", + config, + System.currentTimeMillis(), + false, + null); + + // Create multiple job descriptions + List jobDescriptions = new ArrayList<>(); + List> futures = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + ScanTarget target = new ScanTarget(); 
+ target.setHostname("example" + i + ".com"); + target.setPort(443); + + ScanJobDescription jobDescription = + new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + jobDescriptions.add(jobDescription); + + futures.add(worker.handle(target, jobDescription)); + } + + // Wait for all scans to complete and verify each got the correct job description + for (int i = 0; i < 5; i++) { + Document result = futures.get(i).get(); + assertTrue(result.getBoolean("hasJobDescription")); + assertEquals( + jobDescriptions.get(i).getId().toString(), + result.getString("jobId"), + "Scan " + i + " should have its own job description"); + } + } + + @Test + void testInitializationIsCalledOnFirstHandle() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + assertFalse(worker.isInitCalled(), "Init should not be called before first handle"); + + ScanTarget target = new ScanTarget(); + target.setHostname("example.com"); + target.setPort(443); + + BulkScan bulkScan = + new BulkScan( + BulkScanWorkerTest.class, + BulkScanWorkerTest.class, + "test-db", + config, + System.currentTimeMillis(), + false, + null); + + ScanJobDescription jobDescription = + new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + + Future future = worker.handle(target, jobDescription); + future.get(); + + assertTrue(worker.isInitCalled(), "Init should be called on first handle"); + } + + @Test + void testCleanupIsCalledWhenAllJobsComplete() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanTarget target = new ScanTarget(); + target.setHostname("example.com"); + target.setPort(443); + + BulkScan bulkScan = + new BulkScan( + BulkScanWorkerTest.class, + BulkScanWorkerTest.class, + "test-db", + config, + System.currentTimeMillis(), + false, + null); + + ScanJobDescription jobDescription = + 
new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + + Future future = worker.handle(target, jobDescription); + future.get(); + + // Give cleanup a moment to execute (it runs after job completion) + Thread.sleep(100); + + assertTrue(worker.isCleanupCalled(), "Cleanup should be called when all jobs complete"); + } + + @Test + void testManualInitPreventsSelfCleanup() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + // Call init manually + worker.init(); + assertTrue(worker.isInitCalled(), "Init should be called"); + + ScanTarget target = new ScanTarget(); + target.setHostname("example.com"); + target.setPort(443); + + BulkScan bulkScan = + new BulkScan( + BulkScanWorkerTest.class, + BulkScanWorkerTest.class, + "test-db", + config, + System.currentTimeMillis(), + false, + null); + + ScanJobDescription jobDescription = + new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + + Future future = worker.handle(target, jobDescription); + future.get(); + + // Give cleanup a moment (if it were to execute) + Thread.sleep(100); + + assertFalse( + worker.isCleanupCalled(), + "Cleanup should NOT be called when init was manual (shouldCleanupSelf = false)"); + + // Cleanup should only be called when we explicitly call it + worker.cleanup(); + assertTrue(worker.isCleanupCalled(), "Cleanup should be called when explicitly called"); + } +} From fccfe08d8ed80feb950ab7f127a8910078e62937 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Thu, 20 Nov 2025 04:26:56 +0400 Subject: [PATCH 4/9] changed BulkScanWorker.handle() to take only scanJobDescription as an input. 
scanTarget can be accessed from scanJobDescription, and the UUID in scanJobDescription is needed to write partial results to MongoDB during a scan --- .../de/rub/nds/crawler/core/BulkScanWorker.java | 5 ++--- .../nds/crawler/core/BulkScanWorkerManager.java | 2 +- .../rub/nds/crawler/core/BulkScanWorkerTest.java | 14 +++++++------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java index 37ed4c2..74f9fab 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java @@ -79,11 +79,10 @@ protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThread * Handles a scan target by submitting it to the executor. If init was not called, it will * initialize itself. In this case it will also clean up itself if all jobs are done. * - * @param scanTarget The target to scan. * @param jobDescription The job description for this scan. * @return A future that resolves to the scan result once the scan is done. 
*/ - public Future handle(ScanTarget scanTarget, ScanJobDescription jobDescription) { + public Future handle(ScanJobDescription jobDescription) { // if we initialized ourself, we also clean up ourself shouldCleanupSelf.weakCompareAndSetAcquire(false, init()); activeJobs.incrementAndGet(); @@ -91,7 +90,7 @@ public Future handle(ScanTarget scanTarget, ScanJobDescription jobDesc () -> { try { currentJobDescription.set(jobDescription); - Document result = scan(scanTarget); + Document result = scan(jobDescription.getScanTarget()); if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) { cleanup(); } diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java index 482543c..7f80cd9 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java @@ -148,6 +148,6 @@ public Future handle( bulkScanInfo.getScanConfig(), parallelConnectionThreads, parallelScanThreads); - return worker.handle(scanJobDescription.getScanTarget(), scanJobDescription); + return worker.handle(scanJobDescription); } } diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index f98fa3a..3d71093 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -119,7 +119,7 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); // Execute the scan - Future future = worker.handle(target, jobDescription); + Future future = worker.handle(jobDescription); Document result = future.get(); // Verify the job description was available during scan @@ -157,7 +157,7 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { new ScanJobDescription(target, bulkScan, 
JobStatus.TO_BE_EXECUTED); // Execute the scan - Future future = worker.handle(target, jobDescription); + Future future = worker.handle(jobDescription); future.get(); // Wait for completion // After scan completes, the ThreadLocal should be cleaned up @@ -169,7 +169,7 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { ScanJobDescription newJobDescription = new ScanJobDescription(newTarget, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future2 = worker.handle(newTarget, newJobDescription); + Future future2 = worker.handle(newJobDescription); Document result2 = future2.get(); // The second scan should have the second job description, not the first @@ -205,7 +205,7 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); jobDescriptions.add(jobDescription); - futures.add(worker.handle(target, jobDescription)); + futures.add(worker.handle(jobDescription)); } // Wait for all scans to complete and verify each got the correct job description @@ -243,7 +243,7 @@ void testInitializationIsCalledOnFirstHandle() throws Exception { ScanJobDescription jobDescription = new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future = worker.handle(target, jobDescription); + Future future = worker.handle(jobDescription); future.get(); assertTrue(worker.isInitCalled(), "Init should be called on first handle"); @@ -271,7 +271,7 @@ void testCleanupIsCalledWhenAllJobsComplete() throws Exception { ScanJobDescription jobDescription = new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future = worker.handle(target, jobDescription); + Future future = worker.handle(jobDescription); future.get(); // Give cleanup a moment to execute (it runs after job completion) @@ -306,7 +306,7 @@ void testManualInitPreventsSelfCleanup() throws Exception { ScanJobDescription jobDescription = new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - 
Future future = worker.handle(target, jobDescription); + Future future = worker.handle(jobDescription); future.get(); // Give cleanup a moment (if it were to execute) From 1f63b37763789e31549237393393f76248806311 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Mon, 24 Nov 2025 09:44:29 +0400 Subject: [PATCH 5/9] Modify BulkScanWorkerTest to simulate MongoDB operations using DummyPersistenceProvider --- .../nds/crawler/core/BulkScanWorkerTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index 3d71093..be783c0 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -14,7 +14,9 @@ import de.rub.nds.crawler.data.BulkScan; import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; +import de.rub.nds.crawler.data.ScanResult; import de.rub.nds.crawler.data.ScanTarget; +import de.rub.nds.crawler.dummy.DummyPersistenceProvider; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -132,6 +134,49 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti assertNotNull(worker.getCapturedJobDescription()); assertEquals(jobDescription.getId(), worker.getCapturedJobDescription().getId()); assertEquals(target, worker.getCapturedJobDescription().getScanTarget()); + + // Simulate the partial results persistence flow + DummyPersistenceProvider persistenceProvider = new DummyPersistenceProvider(); + + // Update job status to SUCCESS (required by ScanResult constructor) + jobDescription.setStatus(JobStatus.SUCCESS); + + // Create ScanResult from the scan result Document and job description + ScanResult scanResult = new ScanResult(jobDescription, result); + + // Verify ScanResult has the correct scanJobDescriptionId + assertEquals( + 
jobDescription.getId().toString(), + scanResult.getScanJobDescriptionId(), + "ScanResult should use job description UUID as scanJobDescriptionId"); + + // Simulate persisting to MongoDB + persistenceProvider.insertScanResult(scanResult, jobDescription); + + // Simulate retrieving from MongoDB by scanJobDescriptionId + ScanResult retrievedResult = + persistenceProvider.getScanResultByScanJobDescriptionId( + "test-db", "test-collection", jobDescription.getId().toString()); + + // Verify the retrieved result matches + assertNotNull( + retrievedResult, "Should be able to retrieve ScanResult by job description ID"); + assertEquals( + jobDescription.getId().toString(), + retrievedResult.getScanJobDescriptionId(), + "Retrieved result should have matching scanJobDescriptionId"); + assertEquals( + scanResult.getBulkScan(), + retrievedResult.getBulkScan(), + "Retrieved result should have matching bulk scan ID"); + assertEquals( + scanResult.getScanTarget(), + retrievedResult.getScanTarget(), + "Retrieved result should have matching scan target"); + assertEquals( + scanResult.getResult(), + retrievedResult.getResult(), + "Retrieved result should have matching result document"); } @Test From 60b44eb46fefa1cfcca8c0a7f5828a30d4cd90ab Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Wed, 26 Nov 2025 11:29:35 +0400 Subject: [PATCH 6/9] Replace example.com with example.invalid --- .../rub/nds/crawler/core/BulkScanWorkerTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index be783c0..227fd4b 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -104,7 +104,7 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); 
ScanTarget target = new ScanTarget(); - target.setHostname("example.com"); + target.setHostname("example.invalid"); target.setPort(443); BulkScan bulkScan = @@ -185,7 +185,7 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); ScanTarget target = new ScanTarget(); - target.setHostname("example.com"); + target.setHostname("example.invalid"); target.setPort(443); BulkScan bulkScan = @@ -208,7 +208,7 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { // After scan completes, the ThreadLocal should be cleaned up // We can verify this by running another scan and checking it gets the new job description ScanTarget newTarget = new ScanTarget(); - newTarget.setHostname("example2.com"); + newTarget.setHostname("example2.invalid"); newTarget.setPort(443); ScanJobDescription newJobDescription = @@ -243,7 +243,7 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { for (int i = 0; i < 5; i++) { ScanTarget target = new ScanTarget(); - target.setHostname("example" + i + ".com"); + target.setHostname("example" + i + ".invalid"); target.setPort(443); ScanJobDescription jobDescription = @@ -272,7 +272,7 @@ void testInitializationIsCalledOnFirstHandle() throws Exception { assertFalse(worker.isInitCalled(), "Init should not be called before first handle"); ScanTarget target = new ScanTarget(); - target.setHostname("example.com"); + target.setHostname("example.invalid"); target.setPort(443); BulkScan bulkScan = @@ -300,7 +300,7 @@ void testCleanupIsCalledWhenAllJobsComplete() throws Exception { TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); ScanTarget target = new ScanTarget(); - target.setHostname("example.com"); + target.setHostname("example.invalid"); target.setPort(443); BulkScan bulkScan = @@ -335,7 +335,7 @@ void testManualInitPreventsSelfCleanup() throws Exception { assertTrue(worker.isInitCalled(), "Init should be 
called"); ScanTarget target = new ScanTarget(); - target.setHostname("example.com"); + target.setHostname("example.invalid"); target.setPort(443); BulkScan bulkScan = From 2fc13dd27a66ae90fd9ce3db3c6e6aefce17a9f6 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Wed, 26 Nov 2025 11:40:02 +0400 Subject: [PATCH 7/9] Fix BulkScanWorkerTest --- .../java/de/rub/nds/crawler/core/BulkScanWorkerTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index 227fd4b..b0b8383 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -52,13 +52,14 @@ static class TestBulkScanWorker extends BulkScanWorker { @Override public Document scan(ScanTarget scanTarget) { // Capture the job description during scan - capturedJobDescription = getCurrentJobDescription(); + ScanJobDescription localJobDescription = getCurrentJobDescription(); + capturedJobDescription = localJobDescription; Document result = new Document(); result.put("target", scanTarget.getHostname()); - result.put("hasJobDescription", capturedJobDescription != null); - if (capturedJobDescription != null) { - result.put("jobId", capturedJobDescription.getId().toString()); + result.put("hasJobDescription", localJobDescription != null); + if (localJobDescription != null) { + result.put("jobId", localJobDescription.getId().toString()); } return result; } From 4f437a818c067b6f546d8a72d56fa06eb5a46579 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Wed, 26 Nov 2025 12:05:20 +0400 Subject: [PATCH 8/9] Update BulkScanWorkerTest to use IPs Replace all hostname usages with TEST-NET-1 IPs (RFC 5737) --- .../rub/nds/crawler/core/BulkScanWorkerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git 
a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index b0b8383..52a1a13 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -56,7 +56,7 @@ public Document scan(ScanTarget scanTarget) { capturedJobDescription = localJobDescription; Document result = new Document(); - result.put("target", scanTarget.getHostname()); + result.put("target", scanTarget.getIp()); result.put("hasJobDescription", localJobDescription != null); if (localJobDescription != null) { result.put("jobId", localJobDescription.getId().toString()); @@ -105,7 +105,7 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); ScanTarget target = new ScanTarget(); - target.setHostname("example.invalid"); + target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) target.setPort(443); BulkScan bulkScan = @@ -186,7 +186,7 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); ScanTarget target = new ScanTarget(); - target.setHostname("example.invalid"); + target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) target.setPort(443); BulkScan bulkScan = @@ -209,7 +209,7 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { // After scan completes, the ThreadLocal should be cleaned up // We can verify this by running another scan and checking it gets the new job description ScanTarget newTarget = new ScanTarget(); - newTarget.setHostname("example2.invalid"); + newTarget.setIp("192.0.2.2"); // TEST-NET-1 (RFC 5737) newTarget.setPort(443); ScanJobDescription newJobDescription = @@ -244,7 +244,7 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { for (int i = 0; i < 5; i++) { ScanTarget target = new ScanTarget(); - 
target.setHostname("example" + i + ".invalid"); + target.setIp("192.0.2." + (i + 1)); // TEST-NET-1 (RFC 5737) target.setPort(443); ScanJobDescription jobDescription = @@ -273,7 +273,7 @@ void testInitializationIsCalledOnFirstHandle() throws Exception { assertFalse(worker.isInitCalled(), "Init should not be called before first handle"); ScanTarget target = new ScanTarget(); - target.setHostname("example.invalid"); + target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) target.setPort(443); BulkScan bulkScan = @@ -301,7 +301,7 @@ void testCleanupIsCalledWhenAllJobsComplete() throws Exception { TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); ScanTarget target = new ScanTarget(); - target.setHostname("example.invalid"); + target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) target.setPort(443); BulkScan bulkScan = @@ -336,7 +336,7 @@ void testManualInitPreventsSelfCleanup() throws Exception { assertTrue(worker.isInitCalled(), "Init should be called"); ScanTarget target = new ScanTarget(); - target.setHostname("example.invalid"); + target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) target.setPort(443); BulkScan bulkScan = From 1a71f8de1c07ae94190dcc01667d0e3101abff18 Mon Sep 17 00:00:00 2001 From: mattiaformenti Date: Mon, 8 Dec 2025 09:55:20 +0400 Subject: [PATCH 9/9] Add ScheduledScan, a wrapper around Future designed to handle partial results persistence. Rework the Worker, BulkScanWorker and BulkScanWorkerManager handle methods to use ScheduledScan instead of Future and remove the use of ThreadLocal.
--- .../rub/nds/crawler/core/BulkScanWorker.java | 73 ++++++++++----- .../crawler/core/BulkScanWorkerManager.java | 28 +++--- .../rub/nds/crawler/core/ScheduledScan.java | 88 +++++++++++++++++++ .../java/de/rub/nds/crawler/core/Worker.java | 15 ++-- .../persistence/IPersistenceProvider.java | 9 ++ .../persistence/MongoPersistenceProvider.java | 29 ++++++ .../nds/crawler/core/BulkScanWorkerTest.java | 46 +++++----- .../dummy/DummyPersistenceProvider.java | 7 ++ 8 files changed, 234 insertions(+), 61 deletions(-) create mode 100644 src/main/java/de/rub/nds/crawler/core/ScheduledScan.java diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java index 74f9fab..8fb9b03 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java @@ -11,9 +11,9 @@ import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; import de.rub.nds.crawler.data.ScanTarget; +import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor; import de.rub.nds.scanner.core.execution.NamedThreadFactory; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -42,9 +42,8 @@ public abstract class BulkScanWorker { /** The scan configuration for this worker */ protected final T scanConfig; - // ThreadLocal to pass ScanJobDescription to scan() implementations - private static final ThreadLocal currentJobDescription = - new ThreadLocal<>(); + /** The persistence provider for writing partial results */ + private IPersistenceProvider persistenceProvider; /** * Calls the inner scan function and may handle cleanup. 
This is needed to wrap the scanner into @@ -79,45 +78,53 @@ protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThread * Handles a scan target by submitting it to the executor. If init was not called, it will * initialize itself. In this case it will also clean up itself if all jobs are done. * + *

Returns a {@link ScheduledScan} that represents the entire scan lifecycle, allowing + * callers to: + * + *

    + *
  • Get partial results as the scan progresses + *
  • Check whether the scan has completed + *
  • Wait for the final result + *
+ * * @param jobDescription The job description for this scan. - * @return A future that resolves to the scan result once the scan is done. + * @return A ScheduledScan representing the scan lifecycle */ - public Future handle(ScanJobDescription jobDescription) { + public ScheduledScan handle(ScanJobDescription jobDescription) { // if we initialized ourself, we also clean up ourself shouldCleanupSelf.weakCompareAndSetAcquire(false, init()); activeJobs.incrementAndGet(); - return timeoutExecutor.submit( + + ScheduledScan scheduledScan = new ScheduledScan(); + + timeoutExecutor.submit( () -> { try { - currentJobDescription.set(jobDescription); - Document result = scan(jobDescription.getScanTarget()); + Document result = scan(jobDescription, scheduledScan); + scheduledScan.complete(result); if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) { cleanup(); } - return result; - } finally { - currentJobDescription.remove(); + } catch (Exception e) { + scheduledScan.completeExceptionally(e); + activeJobs.decrementAndGet(); + throw e; } }); - } - /** - * Get the ScanJobDescription for the current scan. Only valid when called from within scan(). - * - * @return The current ScanJobDescription, or null if not in a scan context - */ - protected ScanJobDescription getCurrentJobDescription() { - return currentJobDescription.get(); + return scheduledScan; } /** * Scans a target and returns the result as a Document. This is the core scanning functionality * that must be implemented by subclasses. 
* - * @param scanTarget The target to scan + * @param jobDescription The job description containing target and metadata + * @param scheduledScan The scheduled scan for reporting progress via {@link + * ScheduledScan#updateResult} * @return The scan result as a Document */ - public abstract Document scan(ScanTarget scanTarget); + public abstract Document scan(ScanJobDescription jobDescription, ScheduledScan scheduledScan); /** * Initializes this worker if it hasn't been initialized yet. This method is thread-safe and @@ -180,4 +187,26 @@ public final boolean cleanup() { * specific resources. */ protected abstract void cleanupInternal(); + + /** + * Sets the persistence provider for writing partial results. + * + * @param persistenceProvider The persistence provider to use + */ + public void setPersistenceProvider(IPersistenceProvider persistenceProvider) { + this.persistenceProvider = persistenceProvider; + } + + /** + * Persists a partial scan result. This method can be called by subclasses during scanning to + * save intermediate results. 
+ * + * @param jobDescription The job description for the scan + * @param partialResult The partial result document to persist + */ + protected void persistPartialResult(ScanJobDescription jobDescription, Document partialResult) { + if (persistenceProvider != null) { + persistenceProvider.upsertPartialResult(jobDescription, partialResult); + } + } } diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java index 7f80cd9..415e7e4 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java @@ -14,13 +14,12 @@ import de.rub.nds.crawler.data.BulkScanInfo; import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; +import de.rub.nds.crawler.persistence.IPersistenceProvider; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.exception.UncheckedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.bson.Document; /** * Each ScanJob has its own BulkScanWorker. This class manages the BulkScanWorkers and ensures that @@ -58,21 +57,27 @@ public static BulkScanWorkerManager getInstance() { /** * Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription, - * int, int)}. + * int, int, IPersistenceProvider)}. 
* * @param scanJobDescription The scan job to handle * @param parallelConnectionThreads The number of parallel connection threads to use (used to * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @return A future that returns the scan result when the target is scanned is done + * @param persistenceProvider The persistence provider for writing partial results + * @return A ScheduledScan representing the scan lifecycle */ - public static Future handleStatic( + public static ScheduledScan handleStatic( ScanJobDescription scanJobDescription, int parallelConnectionThreads, - int parallelScanThreads) { + int parallelScanThreads, + IPersistenceProvider persistenceProvider) { BulkScanWorkerManager manager = getInstance(); - return manager.handle(scanJobDescription, parallelConnectionThreads, parallelScanThreads); + return manager.handle( + scanJobDescription, + parallelConnectionThreads, + parallelScanThreads, + persistenceProvider); } private final Cache> bulkScanWorkers; @@ -135,12 +140,14 @@ public BulkScanWorker getBulkScanWorker( * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @return A future that returns the scan result when the target is scanned is done + * @param persistenceProvider The persistence provider for writing partial results + * @return A ScheduledScan representing the scan lifecycle */ - public Future handle( + public ScheduledScan handle( ScanJobDescription scanJobDescription, int parallelConnectionThreads, - int parallelScanThreads) { + int parallelScanThreads, + IPersistenceProvider persistenceProvider) { BulkScanInfo bulkScanInfo = scanJobDescription.getBulkScanInfo(); BulkScanWorker worker = getBulkScanWorker( @@ -148,6 +155,7 @@ public Future handle( bulkScanInfo.getScanConfig(), parallelConnectionThreads, parallelScanThreads); 
+ worker.setPersistenceProvider(persistenceProvider); return worker.handle(scanJobDescription); } } diff --git a/src/main/java/de/rub/nds/crawler/core/ScheduledScan.java b/src/main/java/de/rub/nds/crawler/core/ScheduledScan.java new file mode 100644 index 0000000..b67cd84 --- /dev/null +++ b/src/main/java/de/rub/nds/crawler/core/ScheduledScan.java @@ -0,0 +1,88 @@ +/* + * TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner + * + * Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH + * + * Licensed under Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0.txt + */ +package de.rub.nds.crawler.core; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import org.bson.Document; + +/** + * Represents a scheduled scan that tracks progress and provides both partial and final results. + * + *

This class provides a clean abstraction for the scan lifecycle: + * + *

    + *
  • Check if the scan is complete via {@link #isComplete()} + *
  • Get the current result (partial or final) via {@link #getCurrentResult()} + *
  • Wait for the final result via {@link #getFinalResult()} + *
+ */ +public class ScheduledScan { + + private volatile Document currentResult; + private final CompletableFuture finalResult = new CompletableFuture<>(); + + /** + * Check if the scan has completed. + * + * @return true if the scan is complete, false if still in progress + */ + public boolean isComplete() { + return finalResult.isDone(); + } + + /** + * Get the current result document. If the scan is still in progress, this returns the latest + * partial result. If the scan is complete, this returns the final result. + * + * @return The current result document, or null if no result is available yet + */ + public Document getCurrentResult() { + return currentResult; + } + + /** + * Get a Future that will resolve to the final result when the scan completes. + * + * @return A Future containing the final scan result + */ + public Future getFinalResult() { + return finalResult; + } + + /** + * Update the current result. This is called by the scan worker when new partial results are + * available. + * + * @param partialResult The updated partial result document + */ + public void updateResult(Document partialResult) { + this.currentResult = partialResult; + } + + /** + * Mark the scan as complete with the final result. This will complete the Future and notify any + * waiting consumers. + * + * @param result The final scan result + */ + void complete(Document result) { + this.currentResult = result; + this.finalResult.complete(result); + } + + /** + * Mark the scan as failed with an exception. 
+ * + * @param exception The exception that caused the failure + */ + void completeExceptionally(Throwable exception) { + this.finalResult.completeExceptionally(exception); + } +} diff --git a/src/main/java/de/rub/nds/crawler/core/Worker.java b/src/main/java/de/rub/nds/crawler/core/Worker.java index 1608e10..8509be4 100644 --- a/src/main/java/de/rub/nds/crawler/core/Worker.java +++ b/src/main/java/de/rub/nds/crawler/core/Worker.java @@ -70,10 +70,11 @@ public void start() { } private ScanResult waitForScanResult( - Future resultFuture, ScanJobDescription scanJobDescription) + ScheduledScan scheduledScan, ScanJobDescription scanJobDescription) throws ExecutionException, InterruptedException, TimeoutException { Document resultDocument; JobStatus jobStatus; + Future resultFuture = scheduledScan.getFinalResult(); try { resultDocument = resultFuture.get(scanTimeout, TimeUnit.MILLISECONDS); jobStatus = resultDocument != null ? JobStatus.SUCCESS : JobStatus.EMPTY; @@ -92,15 +93,19 @@ private ScanResult waitForScanResult( private void handleScanJob(ScanJobDescription scanJobDescription) { LOGGER.info("Received scan job for {}", scanJobDescription.getScanTarget()); - Future resultFuture = + ScheduledScan scheduledScan = BulkScanWorkerManager.handleStatic( - scanJobDescription, parallelConnectionThreads, parallelScanThreads); + scanJobDescription, + parallelConnectionThreads, + parallelScanThreads, + persistenceProvider); + workerExecutor.submit( () -> { ScanResult scanResult = null; boolean persist = true; try { - scanResult = waitForScanResult(resultFuture, scanJobDescription); + scanResult = waitForScanResult(scheduledScan, scanJobDescription); } catch (InterruptedException e) { LOGGER.error("Worker was interrupted - not persisting anything", e); scanJobDescription.setStatus(JobStatus.INTERNAL_ERROR); @@ -118,7 +123,7 @@ private void handleScanJob(ScanJobDescription scanJobDescription) { "Scan of '{}' did not finish in time and did not cancel gracefully", 
scanJobDescription.getScanTarget()); scanJobDescription.setStatus(JobStatus.CANCELLED); - resultFuture.cancel(true); + scheduledScan.getFinalResult().cancel(true); scanResult = ScanResult.fromException(scanJobDescription, e); } catch (Exception e) { LOGGER.error( diff --git a/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java b/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java index 30d2ffb..aec842e 100644 --- a/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java +++ b/src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java @@ -73,4 +73,13 @@ public interface IPersistenceProvider { */ ScanResult getScanResultByScanJobDescriptionId( String dbName, String collectionName, String scanJobDescriptionId); + + /** + * Upsert a partial scan result into the database. Uses the job ID as the document ID, so + * subsequent calls with the same job will overwrite the previous partial result. + * + * @param job The scan job description (provides ID, database name, collection name). + * @param partialResult The partial result document to upsert. 
+ */ + void upsertPartialResult(ScanJobDescription job, org.bson.Document partialResult); } diff --git a/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java b/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java index a1278c1..6eab7b6 100644 --- a/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java +++ b/src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java @@ -25,6 +25,7 @@ import com.mongodb.client.MongoClients; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Indexes; +import com.mongodb.client.model.ReplaceOptions; import com.mongodb.lang.NonNull; import de.rub.nds.crawler.config.delegate.MongoDbDelegate; import de.rub.nds.crawler.constant.JobStatus; @@ -392,4 +393,32 @@ public ScanResult getScanResultByScanJobDescriptionId( e); } } + + @Override + public void upsertPartialResult(ScanJobDescription job, org.bson.Document partialResult) { + String dbName = job.getDbName(); + String collectionName = job.getCollectionName(); + String jobId = job.getId().toString(); + + LOGGER.debug( + "Upserting partial result for job {} into collection: {}.{}", + jobId, + dbName, + collectionName); + + try { + // Get raw MongoDB collection (not JacksonMongoCollection) for Document operations + var collection = databaseCache.getUnchecked(dbName).getCollection(collectionName); + + // Upsert: replace if exists, insert if not + collection.replaceOne( + new org.bson.Document("_id", jobId), + partialResult, + new ReplaceOptions().upsert(true)); + + LOGGER.debug("Upserted partial result for job {}", jobId); + } catch (Exception e) { + LOGGER.warn("Failed to upsert partial result for job {}: {}", jobId, e.getMessage()); + } + } } diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index 52a1a13..009ce36 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ 
b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -20,7 +20,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Future; import org.bson.Document; import org.junit.jupiter.api.Test; @@ -50,16 +49,16 @@ static class TestBulkScanWorker extends BulkScanWorker { } @Override - public Document scan(ScanTarget scanTarget) { + public Document scan(ScanJobDescription jobDescription, ScheduledScan scheduledScan) { // Capture the job description during scan - ScanJobDescription localJobDescription = getCurrentJobDescription(); - capturedJobDescription = localJobDescription; + capturedJobDescription = jobDescription; + ScanTarget scanTarget = jobDescription.getScanTarget(); Document result = new Document(); result.put("target", scanTarget.getIp()); - result.put("hasJobDescription", localJobDescription != null); - if (localJobDescription != null) { - result.put("jobId", localJobDescription.getId().toString()); + result.put("hasJobDescription", jobDescription != null); + if (jobDescription != null) { + result.put("jobId", jobDescription.getId().toString()); } return result; } @@ -122,8 +121,8 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); // Execute the scan - Future future = worker.handle(jobDescription); - Document result = future.get(); + ScheduledScan scheduledScan = worker.handle(jobDescription); + Document result = scheduledScan.getFinalResult().get(); // Verify the job description was available during scan assertTrue( @@ -203,11 +202,10 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); // Execute the scan - Future future = worker.handle(jobDescription); - future.get(); // Wait for completion + ScheduledScan scheduledScan = worker.handle(jobDescription); + scheduledScan.getFinalResult().get(); // Wait for 
completion - // After scan completes, the ThreadLocal should be cleaned up - // We can verify this by running another scan and checking it gets the new job description + // After scan completes, verify we can run another scan ScanTarget newTarget = new ScanTarget(); newTarget.setIp("192.0.2.2"); // TEST-NET-1 (RFC 5737) newTarget.setPort(443); @@ -215,8 +213,8 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { ScanJobDescription newJobDescription = new ScanJobDescription(newTarget, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future2 = worker.handle(newJobDescription); - Document result2 = future2.get(); + ScheduledScan scheduledScan2 = worker.handle(newJobDescription); + Document result2 = scheduledScan2.getFinalResult().get(); // The second scan should have the second job description, not the first assertEquals(newJobDescription.getId().toString(), result2.getString("jobId")); @@ -240,7 +238,7 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { // Create multiple job descriptions List jobDescriptions = new ArrayList<>(); - List> futures = new ArrayList<>(); + List scheduledScans = new ArrayList<>(); for (int i = 0; i < 5; i++) { ScanTarget target = new ScanTarget(); @@ -251,12 +249,12 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); jobDescriptions.add(jobDescription); - futures.add(worker.handle(jobDescription)); + scheduledScans.add(worker.handle(jobDescription)); } // Wait for all scans to complete and verify each got the correct job description for (int i = 0; i < 5; i++) { - Document result = futures.get(i).get(); + Document result = scheduledScans.get(i).getFinalResult().get(); assertTrue(result.getBoolean("hasJobDescription")); assertEquals( jobDescriptions.get(i).getId().toString(), @@ -289,8 +287,8 @@ void testInitializationIsCalledOnFirstHandle() throws Exception { ScanJobDescription jobDescription = new 
ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future = worker.handle(jobDescription); - future.get(); + ScheduledScan scheduledScan = worker.handle(jobDescription); + scheduledScan.getFinalResult().get(); assertTrue(worker.isInitCalled(), "Init should be called on first handle"); } @@ -317,8 +315,8 @@ void testCleanupIsCalledWhenAllJobsComplete() throws Exception { ScanJobDescription jobDescription = new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future = worker.handle(jobDescription); - future.get(); + ScheduledScan scheduledScan = worker.handle(jobDescription); + scheduledScan.getFinalResult().get(); // Give cleanup a moment to execute (it runs after job completion) Thread.sleep(100); @@ -352,8 +350,8 @@ void testManualInitPreventsSelfCleanup() throws Exception { ScanJobDescription jobDescription = new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - Future future = worker.handle(jobDescription); - future.get(); + ScheduledScan scheduledScan = worker.handle(jobDescription); + scheduledScan.getFinalResult().get(); // Give cleanup a moment (if it were to execute) Thread.sleep(100); diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java index 501b3d4..1bc3e9a 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProvider.java @@ -15,10 +15,12 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import org.bson.Document; public class DummyPersistenceProvider implements IPersistenceProvider { public final List results = new ArrayList<>(); public final List bulkScans = new ArrayList<>(); + public final List partialResults = new ArrayList<>(); @Override public void insertScanResult(ScanResult scanResult, ScanJobDescription job) { @@ -55,4 +57,9 @@ public ScanResult 
getScanResultByScanJobDescriptionId( .max((r1, r2) -> r1.getTimestamp().compareTo(r2.getTimestamp())) .orElse(null); } + + @Override + public void upsertPartialResult(ScanJobDescription job, Document partialResult) { + partialResults.add(partialResult); + } }