2 changes: 2 additions & 0 deletions src/main/java/de/rub/nds/crawler/constant/JobStatus.java
@@ -15,6 +15,8 @@
public enum JobStatus {
/** Job is waiting to be executed. */
TO_BE_EXECUTED(false),
/** Job is currently being executed. Partial results may be available in DB. */
RUNNING(false),
/** The domain was not resolvable. An empty result was written to DB. */
UNRESOLVABLE(true),
/** An uncaught exception occurred while resolving the host. */
70 changes: 59 additions & 11 deletions src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java
@@ -9,10 +9,11 @@
package de.rub.nds.crawler.core;

import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.data.ScanTarget;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor;
import de.rub.nds.scanner.core.execution.NamedThreadFactory;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -41,6 +42,9 @@ public abstract class BulkScanWorker<T extends ScanConfig> {
/** The scan configuration for this worker */
protected final T scanConfig;

/** The persistence provider for writing partial results */
private IPersistenceProvider persistenceProvider;

/**
* Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into
* a future object such that we can handle timeouts properly.
@@ -74,31 +78,53 @@ protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThread
* Handles a scan target by submitting it to the executor. If init was not called, it will
* initialize itself. In this case it will also clean up itself if all jobs are done.
*
* @param scanTarget The target to scan.
* @return A future that resolves to the scan result once the scan is done.
* <p>Returns a {@link ScheduledScan} that represents the entire scan lifecycle, allowing
* callers to:
*
* <ul>
* <li>Get partial results as the scan progresses
* <li>Register listeners for progress updates
* <li>Wait for the final result
* </ul>
*
* @param jobDescription The job description for this scan.
* @return A ScheduledScan representing the scan lifecycle
*/
public Future<Document> handle(ScanTarget scanTarget) {
public ScheduledScan handle(ScanJobDescription jobDescription) {
// if we initialized ourselves, we also clean up ourselves
shouldCleanupSelf.weakCompareAndSetAcquire(false, init());
activeJobs.incrementAndGet();
return timeoutExecutor.submit(

ScheduledScan scheduledScan = new ScheduledScan();

timeoutExecutor.submit(
() -> {
Document result = scan(scanTarget);
if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) {
cleanup();
try {
Document result = scan(jobDescription, scheduledScan);
scheduledScan.complete(result);
if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) {
cleanup();
}
} catch (Exception e) {
scheduledScan.completeExceptionally(e);
activeJobs.decrementAndGet();
throw e;
}
return result;
});

return scheduledScan;
}

/**
* Scans a target and returns the result as a Document. This is the core scanning functionality
* that must be implemented by subclasses.
*
* @param scanTarget The target to scan
* @param jobDescription The job description containing target and metadata
* @param scheduledScan The scheduled scan for reporting progress via {@link
* ScheduledScan#updateResult}
* @return The scan result as a Document
*/
public abstract Document scan(ScanTarget scanTarget);
public abstract Document scan(ScanJobDescription jobDescription, ScheduledScan scheduledScan);
Comment on lines -101 to +127
Member:
I dislike having an "out parameter". The only alternative I see would be a functional interface to feed the partial results into. But I guess that isn't that much of a difference.
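To make the functional-interface alternative from this comment concrete, here is a minimal sketch (not part of the diff; the class name CallbackScanWorkerSketch and the Consumer-based signature are illustrative assumptions):

package de.rub.nds.crawler.core;

import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import java.util.function.Consumer;
import org.bson.Document;

/** Sketch only: a callback-based variant of the scan contract, avoiding the "out parameter". */
public abstract class CallbackScanWorkerSketch<T extends ScanConfig> {

    /** Subclasses push each intermediate document into onPartialResult as the scan progresses. */
    public abstract Document scan(ScanJobDescription jobDescription, Consumer<Document> onPartialResult);

    /** handle() can still expose a ScheduledScan by bridging the callback to updateResult. */
    public ScheduledScan handle(ScanJobDescription jobDescription) {
        ScheduledScan scheduledScan = new ScheduledScan();
        // Executor submission, timeout and error handling omitted; see the diff above.
        Document result = scan(jobDescription, scheduledScan::updateResult);
        scheduledScan.complete(result);
        return scheduledScan;
    }
}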


/**
* Initializes this worker if it hasn't been initialized yet. This method is thread-safe and
@@ -161,4 +187,26 @@ public final boolean cleanup() {
* specific resources.
*/
protected abstract void cleanupInternal();

/**
* Sets the persistence provider for writing partial results.
*
* @param persistenceProvider The persistence provider to use
*/
public void setPersistenceProvider(IPersistenceProvider persistenceProvider) {
this.persistenceProvider = persistenceProvider;
}
Comment on lines +196 to +198
Member:
Can we instead pass this in the constructor? Is there a use case for having this as a variable instead of a constant?
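A minimal sketch of the constructor-injection variant asked about here (illustrative only; per this diff the real constructor takes bulkScanId, scanConfig and parallelScanThreads, without the provider):

package de.rub.nds.crawler.core;

import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import org.bson.Document;

/** Sketch only: the provider becomes a required, final constructor argument instead of a setter. */
public abstract class ConstructorInjectedWorkerSketch<T extends ScanConfig> {

    protected final T scanConfig;
    private final IPersistenceProvider persistenceProvider;

    protected ConstructorInjectedWorkerSketch(
            String bulkScanId,
            T scanConfig,
            int parallelScanThreads,
            IPersistenceProvider persistenceProvider) {
        this.scanConfig = scanConfig;
        this.persistenceProvider = persistenceProvider;
        // remaining initialization (executor, bulkScanId) as in the existing constructor
    }

    protected void persistPartialResult(ScanJobDescription jobDescription, Document partialResult) {
        // With a required constructor argument, the null check from the diff becomes unnecessary.
        persistenceProvider.upsertPartialResult(jobDescription, partialResult);
    }
}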


/**
* Persists a partial scan result. This method can be called by subclasses during scanning to
* save intermediate results.
*
* @param jobDescription The job description for the scan
* @param partialResult The partial result document to persist
*/
protected void persistPartialResult(ScanJobDescription jobDescription, Document partialResult) {
Member:
Why is this not somehow linked to ScheduledScan.updateResult?

Member:
I see two approaches:
Either this function updates the results in the ScheduledScan object
Or this function is registered as an observer for updates on the ScheduledScan object; i.e. if updateResult is called, it fires an event that causes persistPartialResult to be called (see the sketch after this file's diff).

Member:
Or are there cases where one function is called and not the other?

if (persistenceProvider != null) {
Member:
Do we need this check?

persistenceProvider.upsertPartialResult(jobDescription, partialResult);
}
}
}
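To illustrate the observer approach discussed in the comments on persistPartialResult, here is a minimal sketch; the addUpdateListener method does not exist in this PR and is an assumption:

package de.rub.nds.crawler.core;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.bson.Document;

/** Sketch only: a ScheduledScan variant that notifies listeners whenever a partial result arrives. */
public class ObservableScheduledScanSketch {

    private final List<Consumer<Document>> updateListeners = new CopyOnWriteArrayList<>();
    private volatile Document currentResult;

    /** Register a listener that is invoked for every partial update. */
    public void addUpdateListener(Consumer<Document> listener) {
        updateListeners.add(listener);
    }

    /** Same contract as ScheduledScan.updateResult, but also fires the registered listeners. */
    public void updateResult(Document partialResult) {
        this.currentResult = partialResult;
        for (Consumer<Document> listener : updateListeners) {
            listener.accept(partialResult);
        }
    }
}

// Wiring in BulkScanWorker.handle() could then look like this (illustrative):
//   scheduledScan.addUpdateListener(
//           partial -> persistenceProvider.upsertPartialResult(jobDescription, partial));
// so scan implementations only call updateResult and persistence follows automatically.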
30 changes: 19 additions & 11 deletions src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java
@@ -14,13 +14,12 @@
import de.rub.nds.crawler.data.BulkScanInfo;
import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.UncheckedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;

/**
* Each ScanJob has its own BulkScanWorker. This class manages the BulkScanWorkers and ensures that
@@ -58,21 +57,27 @@ public static BulkScanWorkerManager getInstance() {

/**
* Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription,
* int, int)}.
* int, int, IPersistenceProvider)}.
*
* @param scanJobDescription The scan job to handle
* @param parallelConnectionThreads The number of parallel connection threads to use (used to
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @return A future that returns the scan result when the target is scanned is done
* @param persistenceProvider The persistence provider for writing partial results
* @return A ScheduledScan representing the scan lifecycle
*/
public static Future<Document> handleStatic(
public static ScheduledScan handleStatic(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads) {
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
BulkScanWorkerManager manager = getInstance();
return manager.handle(scanJobDescription, parallelConnectionThreads, parallelScanThreads);
return manager.handle(
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
persistenceProvider);
}

private final Cache<String, BulkScanWorker<?>> bulkScanWorkers;
@@ -135,19 +140,22 @@ public BulkScanWorker<?> getBulkScanWorker(
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @return A future that returns the scan result when the target is scanned is done
* @param persistenceProvider The persistence provider for writing partial results
* @return A ScheduledScan representing the scan lifecycle
*/
public Future<Document> handle(
public ScheduledScan handle(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads) {
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
BulkScanInfo bulkScanInfo = scanJobDescription.getBulkScanInfo();
BulkScanWorker<?> worker =
getBulkScanWorker(
bulkScanInfo.getBulkScanId(),
bulkScanInfo.getScanConfig(),
parallelConnectionThreads,
parallelScanThreads);
return worker.handle(scanJobDescription.getScanTarget());
worker.setPersistenceProvider(persistenceProvider);
return worker.handle(scanJobDescription);
}
}
88 changes: 88 additions & 0 deletions src/main/java/de/rub/nds/crawler/core/ScheduledScan.java
@@ -0,0 +1,88 @@
/*
* TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner
*
* Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH
*
* Licensed under Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0.txt
*/
package de.rub.nds.crawler.core;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.bson.Document;

/**
* Represents a scheduled scan that tracks progress and provides both partial and final results.
*
* <p>This class provides a clean abstraction for the scan lifecycle:
*
* <ul>
* <li>Check if the scan is complete via {@link #isComplete()}
* <li>Get the current result (partial or final) via {@link #getCurrentResult()}
* <li>Wait for the final result via {@link #getFinalResult()}
* </ul>
*/
public class ScheduledScan {

private volatile Document currentResult;
private final CompletableFuture<Document> finalResult = new CompletableFuture<>();
Comment on lines +26 to +29
Member:
What is the reasoning for wrapping the future instead of extending it?

Member:
I also find the name to not capture the functionality.
If this would extend the future, I'd call it something like ProgressableFuture<Document>.


/**
* Check if the scan has completed.
*
* @return true if the scan is complete, false if still in progress
*/
public boolean isComplete() {
return finalResult.isDone();
}

/**
* Get the current result document. If the scan is still in progress, this returns the latest
* partial result. If the scan is complete, this returns the final result.
*
* @return The current result document, or null if no result is available yet
*/
public Document getCurrentResult() {
return currentResult;
}

/**
* Get a Future that will resolve to the final result when the scan completes.
*
* @return A Future containing the final scan result
*/
public Future<Document> getFinalResult() {
return finalResult;
}

/**
* Update the current result. This is called by the scan worker when new partial results are
* available.
*
* @param partialResult The updated partial result document
*/
public void updateResult(Document partialResult) {
this.currentResult = partialResult;
}

/**
* Mark the scan as complete with the final result. This will complete the Future and notify any
* waiting consumers.
*
* @param result The final scan result
*/
void complete(Document result) {
this.currentResult = result;
this.finalResult.complete(result);
}

/**
* Mark the scan as failed with an exception.
*
* @param exception The exception that caused the failure
*/
void completeExceptionally(Throwable exception) {
this.finalResult.completeExceptionally(exception);
}
}
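For orientation, a short usage sketch of the ScheduledScan API introduced above (the followScan helper, its polling interval, and the System.out logging are illustrative; it is assumed to live next to BulkScanWorker in de.rub.nds.crawler.core):

package de.rub.nds.crawler.core;

import de.rub.nds.crawler.data.ScanJobDescription;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.bson.Document;

/** Sketch only: follow a scan's progress and return its final result. */
class ScheduledScanUsageSketch {

    static Document followScan(BulkScanWorker<?> worker, ScanJobDescription jobDescription)
            throws InterruptedException, ExecutionException {
        ScheduledScan scheduledScan = worker.handle(jobDescription);

        // Poll the latest partial result while the scan is still running.
        while (!scheduledScan.isComplete()) {
            Document partial = scheduledScan.getCurrentResult();
            if (partial != null) {
                System.out.println("Partial result so far: " + partial.toJson());
            }
            TimeUnit.SECONDS.sleep(5);
        }

        // getFinalResult() returns a Future<Document>; get() returns once complete() has been called.
        return scheduledScan.getFinalResult().get();
    }
}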
15 changes: 10 additions & 5 deletions src/main/java/de/rub/nds/crawler/core/Worker.java
@@ -70,10 +70,11 @@ public void start() {
}

private ScanResult waitForScanResult(
Future<Document> resultFuture, ScanJobDescription scanJobDescription)
ScheduledScan scheduledScan, ScanJobDescription scanJobDescription)
throws ExecutionException, InterruptedException, TimeoutException {
Document resultDocument;
JobStatus jobStatus;
Future<Document> resultFuture = scheduledScan.getFinalResult();
try {
resultDocument = resultFuture.get(scanTimeout, TimeUnit.MILLISECONDS);
jobStatus = resultDocument != null ? JobStatus.SUCCESS : JobStatus.EMPTY;
@@ -92,15 +93,19 @@

private void handleScanJob(ScanJobDescription scanJobDescription) {
LOGGER.info("Received scan job for {}", scanJobDescription.getScanTarget());
Future<Document> resultFuture =
ScheduledScan scheduledScan =
BulkScanWorkerManager.handleStatic(
scanJobDescription, parallelConnectionThreads, parallelScanThreads);
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
persistenceProvider);

workerExecutor.submit(
() -> {
ScanResult scanResult = null;
boolean persist = true;
try {
scanResult = waitForScanResult(resultFuture, scanJobDescription);
scanResult = waitForScanResult(scheduledScan, scanJobDescription);
} catch (InterruptedException e) {
LOGGER.error("Worker was interrupted - not persisting anything", e);
scanJobDescription.setStatus(JobStatus.INTERNAL_ERROR);
Expand All @@ -118,7 +123,7 @@ private void handleScanJob(ScanJobDescription scanJobDescription) {
"Scan of '{}' did not finish in time and did not cancel gracefully",
scanJobDescription.getScanTarget());
scanJobDescription.setStatus(JobStatus.CANCELLED);
resultFuture.cancel(true);
scheduledScan.getFinalResult().cancel(true);
scanResult = ScanResult.fromException(scanJobDescription, e);
} catch (Exception e) {
LOGGER.error(
src/main/java/de/rub/nds/crawler/persistence/IPersistenceProvider.java
@@ -73,4 +73,13 @@ public interface IPersistenceProvider {
*/
ScanResult getScanResultByScanJobDescriptionId(
String dbName, String collectionName, String scanJobDescriptionId);

/**
* Upsert a partial scan result into the database. Uses the job ID as the document ID, so
* subsequent calls with the same job will overwrite the previous partial result.
*
* @param job The scan job description (provides ID, database name, collection name).
* @param partialResult The partial result document to upsert.
*/
void upsertPartialResult(ScanJobDescription job, org.bson.Document partialResult);
}
src/main/java/de/rub/nds/crawler/persistence/MongoPersistenceProvider.java
@@ -25,6 +25,7 @@
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.lang.NonNull;
import de.rub.nds.crawler.config.delegate.MongoDbDelegate;
import de.rub.nds.crawler.constant.JobStatus;
@@ -392,4 +393,32 @@ public ScanResult getScanResultByScanJobDescriptionId(
e);
}
}

@Override
public void upsertPartialResult(ScanJobDescription job, org.bson.Document partialResult) {
String dbName = job.getDbName();
String collectionName = job.getCollectionName();
String jobId = job.getId().toString();

LOGGER.debug(
"Upserting partial result for job {} into collection: {}.{}",
jobId,
dbName,
collectionName);

try {
// Get raw MongoDB collection (not JacksonMongoCollection) for Document operations
var collection = databaseCache.getUnchecked(dbName).getCollection(collectionName);

// Upsert: replace if exists, insert if not
collection.replaceOne(
new org.bson.Document("_id", jobId),
partialResult,
new ReplaceOptions().upsert(true));

LOGGER.debug("Upserted partial result for job {}", jobId);
} catch (Exception e) {
LOGGER.warn("Failed to upsert partial result for job {}: {}", jobId, e.getMessage());
}
}
}