From 14c7c66885afbb271cf38905e5e94dbdfc0d1ac6 Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Fri, 28 Jun 2024 11:50:02 -0400 Subject: [PATCH 1/7] minor corrections in README --- tools/blobstorage-backupdata/README.MD | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/blobstorage-backupdata/README.MD b/tools/blobstorage-backupdata/README.MD index 54994f1..34b06e3 100644 --- a/tools/blobstorage-backupdata/README.MD +++ b/tools/blobstorage-backupdata/README.MD @@ -15,7 +15,7 @@ parties to maintain the complete mirror of ClearlyDefined definitions, and doing ## Published data structure The changes published into a storage account named `clearlydefinedprod`, in a container named -`production-snapshots`. This container doesn't allow for listing of objects, because of costs +`changes-notifications`. This container doesn't allow for listing of objects, because of costs considerations. Instead, the following mechanism is used to notify users of new changes. The files contained in the storage container can be split into two categories: @@ -25,7 +25,7 @@ The files contained in the storage container can be split into two categories: ### Definition files Definition files are named after ClearlyDefined coordinates with a `.json` extension, e.g. `pypi/pypi/-/opentelemetry-api/1.21.0.json`. They are contained in the root directory of the -`production-snapshots` storage container. They can be read individually, but not listed. +`changes-notifications` storage container. They can be read individually, but not listed. Each definition file contain a JSON representation of a ClearlyDefined package definition, excluding the `files` part. @@ -110,7 +110,7 @@ excluding the `files` part. ### Indexing files -Indexing files are contained in the `changes` sub-directory of the `production-snapshots` container. +Indexing files are contained in the `changes` sub-directory of the `changes-notifications` container. There are further two categories of these files: 1. Changeset files, 2. `index` file. @@ -222,7 +222,7 @@ interval from the main database, and publishes the resulting definitions and cha updates the `changes\index` file by adding the published changeset entries. ### Operations -`blobstorage-backupdata` is built on every merge to main branch. It done using GitHub action defined in +`blobstorage-backupdata` is built on every merge to main branch. It is done using a GitHub action defined in `.github/workflows/backup-data-docker-image.yml` file. The produced Docker image is published to `ghcr.io``, and is public. The image is tagged with SHA of the commit. From 35568859ee182ca5dbb7a75d041893eb1efec934 Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Mon, 1 Jul 2024 14:22:54 -0400 Subject: [PATCH 2/7] ignore .env, python virtual .venv, and tool output files --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitignore b/.gitignore index 5615687..94ee5a1 100644 --- a/.gitignore +++ b/.gitignore @@ -309,6 +309,11 @@ paket-files/ # Python Tools for Visual Studio (PTVS) __pycache__/ *.pyc +**/.env +**/.venv + +# tools/analyze_data_synchronization +tools/analyze_data_synchronization/*.json # Cake - Uncomment if you are using it # tools/** From 339d18fd8bdb6db07283fcc0ce5527898db1234f Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Mon, 1 Jul 2024 14:27:44 -0400 Subject: [PATCH 3/7] add script to check if DB and production-definitions have the same data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This performs a check one month at a time hardcoded for all months in 2024. Output file is hardcoded to "2024-invalid_data.json”. --- tools/analyze_data_synchronization/analyze.py | 164 ++++++++++++++++++ .../requirements.txt | 12 ++ 2 files changed, 176 insertions(+) create mode 100644 tools/analyze_data_synchronization/analyze.py create mode 100644 tools/analyze_data_synchronization/requirements.txt diff --git a/tools/analyze_data_synchronization/analyze.py b/tools/analyze_data_synchronization/analyze.py new file mode 100644 index 0000000..28335c7 --- /dev/null +++ b/tools/analyze_data_synchronization/analyze.py @@ -0,0 +1,164 @@ +import json +import os +import urllib.parse + +import pymongo +import requests +from azure.cosmos import cosmos_client +from dotenv import load_dotenv + +load_dotenv() + +MONGO_CONNECTION_STRING = str(os.environ.get("MONGO_CONNECTION_STRING")) +DB_NAME = "clearlydefined" +COLLECTION_NAME = "definitions-trimmed" +BASE_AZURE_BLOB_URL = str(os.environ.get("BASE_AZURE_BLOB_URL")) + + +# Example coordinates: composer/packagist/00f100/fcphp-cache/revision/0.1.0.json + +# Mongo document with unused fields removed +# { +# "_id": "composer/packagist/00f100/fcphp-cache/0.1.0", +# "_meta": { +# "schemaVersion": "1.6.1", +# "updated": "2019-08-29T02:06:54.498Z" +# }, +# "coordinates": {# "type": "composer", +# "provider": "packagist", +# "namespace": "00f100", +# "name": "fcphp-cache", +# "revision": "0.1.0" +# }, +# "licensed": { +# "declared": "MIT",# "toolScore": { +# "total": 17, +# "declared": 0, +# "discovered": 2, +# "consistency": 0, +# "spdx": 0, +# "texts": 15 +# }, +# "score": { +# "total": 17, +# "declared": 0, +# "discovered": 2, +# "consistency": 0, +# "spdx": 0, +# "texts": 15 +# } +# } +# } + + +def fetch_blob(base_url, type, provider, namespace, name, revision): + """Fetch the blob from the azure blob storage""" + # need to encode the url for the %2f characters + url = urllib.parse.quote( + f"{type}/{provider}/{namespace}/{name}/revision/{revision}.json".lower() + ) + url = f"{base_url}/{url}" + # Fetch the data from the blob storage + res = requests.get(url) + if res.status_code != 200: + return {} + return res.json() + + +def dump_data(data, filename): + with open(filename, "w") as f: + json.dump(data, f) + + +client = pymongo.MongoClient(MONGO_CONNECTION_STRING) + +db = client[DB_NAME] +if DB_NAME not in client.list_database_names(): + print(f"Database '{DB_NAME}' not found.") +else: + print(f"Using database: '{DB_NAME}'.") + +collection = db[COLLECTION_NAME] +if COLLECTION_NAME not in db.list_collection_names(): + print(f"Collection '{COLLECTION_NAME}' not found.") +else: + print(f"Using collection: '{COLLECTION_NAME}'.") + + +months = ["2024-01", "2024-02", "2024-03", "2024-04", "2024-05", "2024-06"] + +invalid_data = {} + +for month in months: + docs = collection.find( + { + "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, + "licensed.declared": {"$exists": False}, + }, + max_time_ms=10000000, + ).limit(5000) + + doc_count = collection.count_documents( + { + "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, + "licensed.declared": {"$exists": False}, + }, + max_time_ms=10000000, + ) + + invalid_data[month] = { + "stats": { + "sample_total": 0, + "sample_invalid": 0, + } + } + count = 0 + + for doc in docs: + count += 1 + blob = fetch_blob( + BASE_AZURE_BLOB_URL, + doc["coordinates"]["type"], + doc["coordinates"]["provider"], + doc["coordinates"].get("namespace", "-"), + doc["coordinates"]["name"], + doc["coordinates"]["revision"], + ) + db_licensed = doc.get("licensed", {}) + blob_licensed = blob.get("licensed", {}) + + if db_licensed.get("declared") != blob_licensed.get("declared"): + # only adding the licensed and meta fields to the invalid data + invalid_data[month][doc["_id"]] = { + "db": { + "licensed": (db_licensed.get("declared")), + "_meta": doc.get("_meta", {}), + }, + "blob": { + "licensed": (blob_licensed.get("declared")), + "_meta": blob.get("_meta", {}), + }, + } + + # Checkpoint in case mongo dies + if count % 100 == 0: + print( + f"Checkpoint: total number of invalid data: {len(invalid_data[month])}, total items {count} ({len(invalid_data[month])/count * 100}%)" + ) + invalid_data[month]["stats"]["sample_total"] = count + invalid_data[month]["stats"]["sample_invalid"] = len(invalid_data[month]) + dump_data(invalid_data, f"2024-invalid_data.json") + + invalid_data[month]["stats"]["total_documents"] = doc_count + invalid_data[month]["stats"]["total_estimated_invalid"] = doc_count * ( + len(invalid_data[month]) / count + ) + invalid_data[month]["stats"]["sample_percent_of_total"] = doc_count * ( + count / doc_count + ) + dump_data(invalid_data, f"2024-invalid_data.json") + print("Done") + print( + f"Total number of invalid data: {len(invalid_data[month])}, total items {count} ({len(invalid_data[month])/count * 100}%)" + ) + dump_data(invalid_data, f"2024-invalid_data.json") \ No newline at end of file diff --git a/tools/analyze_data_synchronization/requirements.txt b/tools/analyze_data_synchronization/requirements.txt new file mode 100644 index 0000000..3d63693 --- /dev/null +++ b/tools/analyze_data_synchronization/requirements.txt @@ -0,0 +1,12 @@ +azure-core==1.30.2 +azure-cosmos==4.7.0 +certifi==2024.6.2 +charset-normalizer==3.3.2 +dnspython==2.6.1 +idna==3.7 +pymongo==4.7.3 +python-dotenv==1.0.1 +requests==2.32.3 +six==1.16.0 +typing_extensions==4.12.2 +urllib3==2.2.1 \ No newline at end of file From cba85deb978a8f793c5d685436329d55f507deba Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Mon, 1 Jul 2024 18:20:42 -0400 Subject: [PATCH 4/7] add ability to have start and stop dates * allows for a check of a single week * continues to support processing a month at a time * expands support for controlling function through .env file * provides example .env file --- .../analyze_data_synchronization/.env_example | 8 + tools/analyze_data_synchronization/README.md | 124 ++++++++++ tools/analyze_data_synchronization/analyze.py | 223 +++++++++++++----- 3 files changed, 294 insertions(+), 61 deletions(-) create mode 100644 tools/analyze_data_synchronization/.env_example create mode 100644 tools/analyze_data_synchronization/README.md diff --git a/tools/analyze_data_synchronization/.env_example b/tools/analyze_data_synchronization/.env_example new file mode 100644 index 0000000..d9399fc --- /dev/null +++ b/tools/analyze_data_synchronization/.env_example @@ -0,0 +1,8 @@ +MONGO_CONNECTION_STRING="mongodb://localhost:27017/" +BASE_AZURE_BLOB_URL = "https://storageaccount.blob.core.windows.net/container_name" +OUTPUT_FILE = "invalid-data.json" +# START_DATE = "2024-06-21" +# END_DATE = "2024-06-28" +START_MONTH = str(os.environ.get("START_MONTH", "2024-06")) +END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) +MAX_DOCS = 500 \ No newline at end of file diff --git a/tools/analyze_data_synchronization/README.md b/tools/analyze_data_synchronization/README.md new file mode 100644 index 0000000..70ff1f5 --- /dev/null +++ b/tools/analyze_data_synchronization/README.md @@ -0,0 +1,124 @@ +# analyze_data_synchronization tool + +This script is used to quantify the level of out-of-sync data between the Cosmos DB and the production-definitions data (source of truth). +It is a diagnostic tool intended to be run on localhost if a problem is suspected. It is not run on a regular basis, at least at this +time. + +## Usage + +### Prerequisites + +Set up environment variables that drive how the tool runs. This can be set as system env vars. They can also be set in a `.env` You can +rename `.env-example` to `.env` and modify as desired. + +- MONGO_CONNECTION_STRING (required) - the connection string to the MongoDB database +- BASE_AZURE_BLOB_URL (required) - the base path including the container +- START_DATE (optional) - the first date to include in the query (default: `""`) +- END_DATE (optional) - the last date to include in the query (default: `""`) +- START_MONTH (optional) - the first month to include in the query (default: `"2024-01"`) +- END_MONTH (optional) - the last month to include in the query (default: `"2024-06"`) +- MAX_DOCS (optional) - the max number of documents that will be processed for each month or during the custom date range (default: 5000) +- OUTPUT_FILE (optional) - the file to write the output to (default: `"invalid_data.json"`) + +_NOTE: Limiting MAX_DOCS to no more than 5000 allows the script to complete in a reasonable length of time and is a +sample of sufficient size to provide an understanding of the scope of the problem._ + +### Set up virtual environment + +This is best run in a Python virtual environment. Set up the .venv and install the required dependencies. + +```bash +python3 -m venv .venv +source .venv/bin/activate +python3 -m pip install -r requirements.txt +``` + +### Run the script + +```bash +python3 analyze.py +``` + +## Example + +### Example coordinates + +```text +composer/packagist/00f100/fcphp-cache/revision/0.1.0.json +``` + +### Example Mongo document with unused fields removed + +```json +{ + "_id": "composer/packagist/00f100/fcphp-cache/0.1.0", + "_meta": { + "schemaVersion": "1.6.1", + "updated": "2019-08-29T02:06:54.498Z" + }, + "coordinates": { + "type": "composer", + "provider": "packagist", + "namespace": "00f100", + "name": "fcphp-cache", + "revision": "0.1.0" + }, + "licensed": { + "declared": "MIT", + "toolScore": { + "total": 17, + "declared": 0, + "discovered": 2, + "consistency": 0, + "spdx": 0, + "texts": 15 + }, + "score": { + "total": 17, + "declared": 0, + "discovered": 2, + "consistency": 0, + "spdx": 0, + "texts": 15 + } + } +} +``` + +### Example Output + +The following shows the summary stats and an example of one of the invalid samples. The actual results will contain +all the invalid samples. + +```json +{ + "2024-06": { + "stats": { + "sample_total": 500, + "sample_invalid": 6, + "percent_invalid": "1.2%", + "total_documents": 86576, + "total_estimated_invalid": 1039, + "sample_percent_of_total": "0.58%" + }, + "sourcearchive/mavencentral/org.apache.kerby/kerby-util/1.0.1": { + "db": { + "licensed": null, + "_meta": { + "schemaVersion": "1.6.1", + "updated": "2024-06-13T12:59:21.981Z" + } + }, + "blob": { + "licensed": "Apache-2.0", + "_meta": { + "schemaVersion": "1.6.1", + "updated": "2024-06-13T12:59:31.368Z" + } + } + }, + ... + } + ... +} +``` diff --git a/tools/analyze_data_synchronization/analyze.py b/tools/analyze_data_synchronization/analyze.py index 28335c7..708ace6 100644 --- a/tools/analyze_data_synchronization/analyze.py +++ b/tools/analyze_data_synchronization/analyze.py @@ -1,5 +1,6 @@ import json import os +import sys import urllib.parse import pymongo @@ -7,6 +8,20 @@ from azure.cosmos import cosmos_client from dotenv import load_dotenv +# To run this script, you need to have the following environment variables set: +# +# * MONGO_CONNECTION_STRING: The connection string to the MongoDB database +# * START_MONTH: The first month to include in the query +# * END_MONTH: The last month to include in the query +# * OUTPUT_FILE: The file to write the output to +# +# Command to run this script on localhost from this directory after setting the environment variables: +# +# python3 -m venv .venv +# source .venv/bin/activate +# python3 -m pip install -r requirements.txt +# python3 analyze.py + load_dotenv() MONGO_CONNECTION_STRING = str(os.environ.get("MONGO_CONNECTION_STRING")) @@ -14,24 +29,34 @@ COLLECTION_NAME = "definitions-trimmed" BASE_AZURE_BLOB_URL = str(os.environ.get("BASE_AZURE_BLOB_URL")) +START_MONTH = str(os.environ.get("START_MONTH", "2024-01")) +END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) + +START_DATE = str(os.environ.get("START_DATE", "")) +END_DATE = str(os.environ.get("END_DATE", "")) + +MAX_DOCS = int(os.environ.get("MAX_DOCS", 5000)) +OUTPUT_FILE = str(os.environ.get("OUTPUT_FILE", "invalid_data.json")) # Example coordinates: composer/packagist/00f100/fcphp-cache/revision/0.1.0.json -# Mongo document with unused fields removed +# Example Mongo document with unused fields removed # { # "_id": "composer/packagist/00f100/fcphp-cache/0.1.0", # "_meta": { # "schemaVersion": "1.6.1", # "updated": "2019-08-29T02:06:54.498Z" # }, -# "coordinates": {# "type": "composer", +# "coordinates": { +# "type": "composer", # "provider": "packagist", # "namespace": "00f100", # "name": "fcphp-cache", # "revision": "0.1.0" # }, # "licensed": { -# "declared": "MIT",# "toolScore": { +# "declared": "MIT", +# "toolScore": { # "total": 17, # "declared": 0, # "discovered": 2, @@ -50,6 +75,21 @@ # } # } +# Example Output +# [ +# "2024-01": { +# "stats": { +# "sample_total": 5000, +# "sample_invalid": 657, +# "percent_invalid": 13.10% +# "total_documents": 84167, +# "total_estimated_invalid": 11059.5438, +# "sample_percent_of_total": 5.94% +# } +# }, +# ... +# ] + def fetch_blob(base_url, type, provider, namespace, name, revision): """Fetch the blob from the azure blob storage""" @@ -69,51 +109,56 @@ def dump_data(data, filename): with open(filename, "w") as f: json.dump(data, f) +def initialize_stats(range_label, total_docs_count): + invalid_data[range_label] = { + "stats": { + "sample_total": 0, + "sample_invalid": 0, + "percent_invalid": "0%", + "total_documents": total_docs_count, + "total_estimated_invalid": 0, + "sample_percent_of_total": "0%", + } + } + return invalid_data -client = pymongo.MongoClient(MONGO_CONNECTION_STRING) - -db = client[DB_NAME] -if DB_NAME not in client.list_database_names(): - print(f"Database '{DB_NAME}' not found.") -else: - print(f"Using database: '{DB_NAME}'.") - -collection = db[COLLECTION_NAME] -if COLLECTION_NAME not in db.list_collection_names(): - print(f"Collection '{COLLECTION_NAME}' not found.") -else: - print(f"Using collection: '{COLLECTION_NAME}'.") - +def update_stats(invalid_data, range_label, sample_count, checkpoint=False): + invalid_count = len(invalid_data[range_label]) - 1 + invalid_data[range_label]["stats"]["sample_total"] = sample_count + invalid_data[range_label]["stats"]["sample_invalid"] = invalid_count -months = ["2024-01", "2024-02", "2024-03", "2024-04", "2024-05", "2024-06"] + percent_invalid = invalid_count / sample_count * 100 + invalid_data[range_label]["stats"]["percent_invalid"] = str(round(percent_invalid, 2)) + "%" -invalid_data = {} + total_count = invalid_data[range_label]["stats"]["total_documents"] + invalid_data[range_label]["stats"]["total_estimated_invalid"] = round((total_count * percent_invalid) / 100) + invalid_data[range_label]["stats"]["sample_percent_of_total"] = str(round((sample_count / total_count * 100), 2)) + "%" -for month in months: - docs = collection.find( - { - "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, - "licensed.declared": {"$exists": False}, - }, - max_time_ms=10000000, - ).limit(5000) + if checkpoint: + print( + f"Checkpoint: total number of invalid data: {invalid_count}, total items {sample_count} ({percent_invalid}%)" + ) + else: + print( + f"Total number of invalid data: {invalid_count}, total items {sample_count} ({percent_invalid}%)" + ) + dump_data(invalid_data, OUTPUT_FILE) - doc_count = collection.count_documents( - { - "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, - "licensed.declared": {"$exists": False}, - }, - max_time_ms=10000000, - ) +def create_months(start_month, end_month): + start_year, start_month = start_month.split("-") + end_year, end_month = end_month.split("-") + months = [] + for year in range(int(start_year), int(end_year) + 1): + for month in range(1, 13): + if year == int(start_year) and month < int(start_month): + continue + if year == int(end_year) and month > int(end_month): + continue + months.append(f"{year}-{str(month).zfill(2)}") + return months - invalid_data[month] = { - "stats": { - "sample_total": 0, - "sample_invalid": 0, - } - } +def analyze_docs(docs, range_label, invalid_data): count = 0 - for doc in docs: count += 1 blob = fetch_blob( @@ -129,7 +174,7 @@ def dump_data(data, filename): if db_licensed.get("declared") != blob_licensed.get("declared"): # only adding the licensed and meta fields to the invalid data - invalid_data[month][doc["_id"]] = { + invalid_data[range_label][doc["_id"]] = { "db": { "licensed": (db_licensed.get("declared")), "_meta": doc.get("_meta", {}), @@ -142,23 +187,79 @@ def dump_data(data, filename): # Checkpoint in case mongo dies if count % 100 == 0: - print( - f"Checkpoint: total number of invalid data: {len(invalid_data[month])}, total items {count} ({len(invalid_data[month])/count * 100}%)" - ) - invalid_data[month]["stats"]["sample_total"] = count - invalid_data[month]["stats"]["sample_invalid"] = len(invalid_data[month]) - dump_data(invalid_data, f"2024-invalid_data.json") - - invalid_data[month]["stats"]["total_documents"] = doc_count - invalid_data[month]["stats"]["total_estimated_invalid"] = doc_count * ( - len(invalid_data[month]) / count - ) - invalid_data[month]["stats"]["sample_percent_of_total"] = doc_count * ( - count / doc_count - ) - dump_data(invalid_data, f"2024-invalid_data.json") - print("Done") - print( - f"Total number of invalid data: {len(invalid_data[month])}, total items {count} ({len(invalid_data[month])/count * 100}%)" + update_stats(invalid_data, range_label, count, True) + + update_stats(invalid_data, range_label, count) + return count + + +client = pymongo.MongoClient(MONGO_CONNECTION_STRING) + +db = client[DB_NAME] +if DB_NAME not in client.list_database_names(): + print(f"Database '{DB_NAME}' not found.") +else: + print(f"Using database: '{DB_NAME}'.") + +collection = db[COLLECTION_NAME] +if COLLECTION_NAME not in db.list_collection_names(): + print(f"Collection '{COLLECTION_NAME}' not found.") +else: + print(f"Using collection: '{COLLECTION_NAME}'.") + +print(f"OUTPUT_FILE: '{OUTPUT_FILE}'") +print(f"MAX_DOCS: {MAX_DOCS}") + +invalid_data = {} + +if START_DATE and END_DATE: + print("Processing custom date range") + print(f" START_DATE: {START_DATE}") + print(f" END_DATE: {END_DATE}") + docs = collection.find( + { + "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, + "licensed.declared": {"$exists": False}, + }, + max_time_ms=10000000, + ).limit(MAX_DOCS) + + all_docs_count = collection.count_documents( + { + "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, + "licensed.declared": {"$exists": False}, + }, + max_time_ms=10000000, ) - dump_data(invalid_data, f"2024-invalid_data.json") \ No newline at end of file + + invalid_data = initialize_stats("custom_range", all_docs_count) + analyze_docs(docs, "custom_range", invalid_data) + +else: + print("Processing by months") + print(f" START_MONTH: {START_MONTH}") + print(f" END_MONTH: {END_MONTH}") + months = create_months(START_MONTH, END_MONTH) + print(f" {months}") + + for month in months: + docs = collection.find( + { + "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, + "licensed.declared": {"$exists": False}, + }, + max_time_ms=10000000, + ).limit(MAX_DOCS) + + all_docs_count = collection.count_documents( + { + "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, + "licensed.declared": {"$exists": False}, + }, + max_time_ms=10000000, + ) + + invalid_data = initialize_stats(month, all_docs_count) + analyze_docs(docs, month, invalid_data) + +dump_data(invalid_data, OUTPUT_FILE) From 615c940ca57253a572cbaf31bbcb0acee5c19159 Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Tue, 23 Jul 2024 08:40:11 -0400 Subject: [PATCH 5/7] process using pagination and optionally repair out-of-sync data --- .../analyze_data_synchronization/.env_example | 8 +- tools/analyze_data_synchronization/analyze.py | 227 ++++++++++++------ 2 files changed, 163 insertions(+), 72 deletions(-) diff --git a/tools/analyze_data_synchronization/.env_example b/tools/analyze_data_synchronization/.env_example index d9399fc..c084ec5 100644 --- a/tools/analyze_data_synchronization/.env_example +++ b/tools/analyze_data_synchronization/.env_example @@ -1,8 +1,12 @@ MONGO_CONNECTION_STRING="mongodb://localhost:27017/" -BASE_AZURE_BLOB_URL = "https://storageaccount.blob.core.windows.net/container_name" +BASE_AZURE_BLOB_URL = "https://clearlydefineddev.blob.core.windows.net" +AZURE_CONTAINER_NAME = "develop-definition" +SERVICE_API_URL = "http://dev-api.clearlydefined.io/" OUTPUT_FILE = "invalid-data.json" # START_DATE = "2024-06-21" # END_DATE = "2024-06-28" START_MONTH = str(os.environ.get("START_MONTH", "2024-06")) END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) -MAX_DOCS = 500 \ No newline at end of file +INITIAL_SKIP = 0 +PAGE_SIZE = 1000 +REPAIR = false diff --git a/tools/analyze_data_synchronization/analyze.py b/tools/analyze_data_synchronization/analyze.py index 708ace6..c6ad021 100644 --- a/tools/analyze_data_synchronization/analyze.py +++ b/tools/analyze_data_synchronization/analyze.py @@ -8,26 +8,43 @@ from azure.cosmos import cosmos_client from dotenv import load_dotenv -# To run this script, you need to have the following environment variables set: +# All environment variables can be defined in tools/analyze_data_synchronization/.env # +# Required environment variables: # * MONGO_CONNECTION_STRING: The connection string to the MongoDB database -# * START_MONTH: The first month to include in the query -# * END_MONTH: The last month to include in the query -# * OUTPUT_FILE: The file to write the output to +# * BASE_AZURE_BLOB_URL The base URL for the Azure Blob Storage +# * SERVICE_API_URL: The URL for the service API +# +# Optional environment variables: +# * AZURE_CONTAINER_NAME: The name of the Azure Blob Storage container (default: "develop-definition") +# * OUTPUT_FILE: The file to write the output to (default: "invalid_data.json") +# * REPAIR: If set to "true", the script will attempt to repair the data (default: false) +# * PAGE_SIZE: The number of documents to process at a time (default: 1000) +# * INITIAL_SKIP: The number of documents to skip before starting the analysis (default: 0) - used with START_DATE and END_DATE +# * START_MONTH: The first month to include in the query (default: 2024-01) +# * END_MONTH: The last month to include in the query (default: 2024-06) +# * START_DATE: The first date to include in the query (default: "") +# * END_DATE: The last date to include in the query (default: "") +# * VERBOSE: If set to "true", the script will output more information (default: false) + +# Commands to run this script on localhost from this directory after setting the environment variables: # -# Command to run this script on localhost from this directory after setting the environment variables: -# -# python3 -m venv .venv -# source .venv/bin/activate -# python3 -m pip install -r requirements.txt -# python3 analyze.py +# cd tools/analyze_data_synchronization +# python3 -m venv .venv +# source .venv/bin/activate +# python3 -m pip install -r requirements.txt +# python3 analyze.py load_dotenv() MONGO_CONNECTION_STRING = str(os.environ.get("MONGO_CONNECTION_STRING")) DB_NAME = "clearlydefined" COLLECTION_NAME = "definitions-trimmed" + BASE_AZURE_BLOB_URL = str(os.environ.get("BASE_AZURE_BLOB_URL")) +AZURE_CONTAINER_NAME = str(os.environ.get("AZURE_CONTAINER_NAME", "develop-definition")) + +SERVICE_API_URL = str(os.environ.get("SERVICE_API_URL")) START_MONTH = str(os.environ.get("START_MONTH", "2024-01")) END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) @@ -35,8 +52,11 @@ START_DATE = str(os.environ.get("START_DATE", "")) END_DATE = str(os.environ.get("END_DATE", "")) -MAX_DOCS = int(os.environ.get("MAX_DOCS", 5000)) +REPAIR = str(os.environ.get("REPAIR", "false")).lower() == "true" +PAGE_SIZE = int(os.environ.get("PAGE_SIZE", 1000)) +INITIAL_SKIP = int(os.environ.get("INITIAL_SKIP", 0)) OUTPUT_FILE = str(os.environ.get("OUTPUT_FILE", "invalid_data.json")) +VERBOSE = str(os.environ.get("VERBOSE", "false")).lower() == "true" # Example coordinates: composer/packagist/00f100/fcphp-cache/revision/0.1.0.json @@ -91,25 +111,35 @@ # ] -def fetch_blob(base_url, type, provider, namespace, name, revision): +def fetch_blob(base_url, container_name, type, provider, namespace, name, revision): """Fetch the blob from the azure blob storage""" # need to encode the url for the %2f characters url = urllib.parse.quote( f"{type}/{provider}/{namespace}/{name}/revision/{revision}.json".lower() ) - url = f"{base_url}/{url}" + url = f"{base_url}/{container_name}/{url}" # Fetch the data from the blob storage res = requests.get(url) if res.status_code != 200: return {} return res.json() +def repair_data(service_url, type, provider, namespace, name, revision): + """Repair the data by requesting the definition from the service with the force parameter""" + if VERBOSE: + print(f" Repairing data for {type}/{provider}/{namespace}/{name}/{revision}") + url = f"{service_url}/definitions/{type}/{provider}/{namespace}/{name}/{revision}?force=true" + res = requests.get(url) + if res.status_code != 200: + return {} + return res.json() def dump_data(data, filename): with open(filename, "w") as f: json.dump(data, f) -def initialize_stats(range_label, total_docs_count): +def initialize_stats(range_label, total_docs_count, invalid_data): + # TODO: Add container name to summary stats. invalid_data[range_label] = { "stats": { "sample_total": 0, @@ -120,10 +150,8 @@ def initialize_stats(range_label, total_docs_count): "sample_percent_of_total": "0%", } } - return invalid_data -def update_stats(invalid_data, range_label, sample_count, checkpoint=False): - invalid_count = len(invalid_data[range_label]) - 1 +def update_stats(invalid_data, invalid_count, range_label, sample_count, checkpoint=False): invalid_data[range_label]["stats"]["sample_total"] = sample_count invalid_data[range_label]["stats"]["sample_invalid"] = invalid_count @@ -134,13 +162,16 @@ def update_stats(invalid_data, range_label, sample_count, checkpoint=False): invalid_data[range_label]["stats"]["total_estimated_invalid"] = round((total_count * percent_invalid) / 100) invalid_data[range_label]["stats"]["sample_percent_of_total"] = str(round((sample_count / total_count * 100), 2)) + "%" + repaired = "" + if REPAIR: + repaired = " (repaired)" if checkpoint: print( - f"Checkpoint: total number of invalid data: {invalid_count}, total items {sample_count} ({percent_invalid}%)" + f"Checkpoint: total invalid data: {invalid_count}{repaired}, total items {sample_count} ({percent_invalid}%)" ) else: print( - f"Total number of invalid data: {invalid_count}, total items {sample_count} ({percent_invalid}%)" + f"Total invalid data: {invalid_count}{repaired}, total items {sample_count} ({percent_invalid}%)" ) dump_data(invalid_data, OUTPUT_FILE) @@ -157,12 +188,55 @@ def create_months(start_month, end_month): months.append(f"{year}-{str(month).zfill(2)}") return months -def analyze_docs(docs, range_label, invalid_data): - count = 0 +def page_count(collection, query, range_label, invalid_data): + all_docs_count = collection.count_documents(query, + max_time_ms=10000000) + + initialize_stats(range_label, all_docs_count, invalid_data) + if all_docs_count == 0: + print("No documents found missing licenses in {range_label}.") + return 0 + + if INITIAL_SKIP > 0: + print(f"Skipping {INITIAL_SKIP} documents") + all_docs_count -= INITIAL_SKIP + + pages = all_docs_count // PAGE_SIZE + if all_docs_count % PAGE_SIZE: + pages += 1 + return pages + + +def analyze_docs(collection, query, range_label, invalid_data): + pages = page_count(collection, query, range_label, invalid_data) + if pages == 0: + return + + running_count_docs = 0 + running_count_invalid = 0 + page = 0 + skip = 0 + if INITIAL_SKIP > 0: + skip = INITIAL_SKIP + while True: + print(f"Processing page {page+1} of {pages} in {range_label}") + docs = collection.find(query).skip(skip).limit(PAGE_SIZE).max_time_ms(10000000) + new_docs_count, new_invalid_count = analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_label, invalid_data) + running_count_invalid += new_invalid_count + running_count_docs += new_docs_count + if new_docs_count == 0: + break + page += 1 + skip = page * PAGE_SIZE + INITIAL_SKIP + +def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_label, invalid_data): + count_docs = 0 + count_invalid = 0 for doc in docs: - count += 1 + count_docs += 1 blob = fetch_blob( BASE_AZURE_BLOB_URL, + AZURE_CONTAINER_NAME, doc["coordinates"]["type"], doc["coordinates"]["provider"], doc["coordinates"].get("namespace", "-"), @@ -173,42 +247,69 @@ def analyze_docs(docs, range_label, invalid_data): blob_licensed = blob.get("licensed", {}) if db_licensed.get("declared") != blob_licensed.get("declared"): - # only adding the licensed and meta fields to the invalid data - invalid_data[range_label][doc["_id"]] = { - "db": { - "licensed": (db_licensed.get("declared")), - "_meta": doc.get("_meta", {}), - }, - "blob": { - "licensed": (blob_licensed.get("declared")), - "_meta": blob.get("_meta", {}), - }, - } + count_invalid += 1 + if VERBOSE: + # only adding the licensed and meta fields to the invalid data + invalid_data[range_label][doc["_id"]] = { + "db": { + "licensed": (db_licensed.get("declared")), + "_meta": doc.get("_meta", {}), + }, + "blob": { + "licensed": (blob_licensed.get("declared")), + "_meta": blob.get("_meta", {}), + }, + } + if REPAIR: + # request definition with the force parameter to update the licensed.declared field in the database + collection.update_one( + {"_id": doc["_id"]}, + {"$set": {"licensed.declared": blob_licensed.get("declared")}}, + ) + blob = repair_data( + SERVICE_API_URL, + doc["coordinates"]["type"], + doc["coordinates"]["provider"], + doc["coordinates"].get("namespace", "-"), + doc["coordinates"]["name"], + doc["coordinates"]["revision"], + ) + # Checkpoint in case mongo dies - if count % 100 == 0: - update_stats(invalid_data, range_label, count, True) + if count_docs % 100 == 0: + total_invalid = running_count_invalid + count_invalid + total_docs = running_count_docs + count_docs + update_stats(invalid_data, total_invalid, range_label, total_docs, True) - update_stats(invalid_data, range_label, count) - return count + total_invalid = running_count_invalid + count_invalid + total_docs = running_count_docs + count_docs + update_stats(invalid_data, total_invalid, range_label, total_docs) + return count_docs, count_invalid +### Main ### +print("Starting data synchronization analysis") client = pymongo.MongoClient(MONGO_CONNECTION_STRING) db = client[DB_NAME] if DB_NAME not in client.list_database_names(): - print(f"Database '{DB_NAME}' not found.") + print(f" Database '{DB_NAME}' not found.") else: - print(f"Using database: '{DB_NAME}'.") + print(f" Using database: '{DB_NAME}'.") collection = db[COLLECTION_NAME] if COLLECTION_NAME not in db.list_collection_names(): - print(f"Collection '{COLLECTION_NAME}' not found.") + print(f" Collection '{COLLECTION_NAME}' not found.") else: - print(f"Using collection: '{COLLECTION_NAME}'.") + print(f" Using collection: '{COLLECTION_NAME}'.") + +print(f" Using blob container: '{AZURE_CONTAINER_NAME}'") +print(f"PAGE_SIZE: {PAGE_SIZE}") +print(f"INITIAL_SKIP: {INITIAL_SKIP}") +print(f"REPAIR: {REPAIR}") print(f"OUTPUT_FILE: '{OUTPUT_FILE}'") -print(f"MAX_DOCS: {MAX_DOCS}") invalid_data = {} @@ -216,26 +317,19 @@ def analyze_docs(docs, range_label, invalid_data): print("Processing custom date range") print(f" START_DATE: {START_DATE}") print(f" END_DATE: {END_DATE}") - docs = collection.find( + + analyze_docs( + collection, { "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, "licensed.declared": {"$exists": False}, }, - max_time_ms=10000000, - ).limit(MAX_DOCS) - - all_docs_count = collection.count_documents( - { - "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, - "licensed.declared": {"$exists": False}, - }, - max_time_ms=10000000, - ) - - invalid_data = initialize_stats("custom_range", all_docs_count) - analyze_docs(docs, "custom_range", invalid_data) + "custom_range", + invalid_data, + ) else: + INITIAL_SKIP = 0 print("Processing by months") print(f" START_MONTH: {START_MONTH}") print(f" END_MONTH: {END_MONTH}") @@ -243,23 +337,16 @@ def analyze_docs(docs, range_label, invalid_data): print(f" {months}") for month in months: - docs = collection.find( - { - "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, - "licensed.declared": {"$exists": False}, - }, - max_time_ms=10000000, - ).limit(MAX_DOCS) + print(f"Processing {month}") - all_docs_count = collection.count_documents( + analyze_docs( + collection, { - "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, + "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, "licensed.declared": {"$exists": False}, }, - max_time_ms=10000000, - ) - - invalid_data = initialize_stats(month, all_docs_count) - analyze_docs(docs, month, invalid_data) + month, + invalid_data, + ) dump_data(invalid_data, OUTPUT_FILE) From 291234c549565967b87555d9c2bde955cf365f49 Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Tue, 23 Jul 2024 14:00:04 -0400 Subject: [PATCH 6/7] process updates in batches of 500 Batch processing: * updates just the declared license in the DB documents using `collection.bulk_write()` * updates denitions using service API `POST /definitions?force=true` _NOTE: Updating the DB makes the fix of the declared license immediately available. When the `POST /definitions` request completes, the full DB document will be updated to be in sync with the blob definition._ Additional changes: * moves global variable definitions based on .env to the initialize() function * adds DRYRUN flag to check what would run and how many records would be evaluated * add estimated time to complete * adds script and function level documentation * includes timestamps to make it easier to estimate how long it will take to complete a run * generate filename based on date range and offset to avoid overwriting output files _NOTE: Azure only supports fetching one blob at a time. Not able to optimize that part of the code. _ _NOTE: Batch size of 500 was selected because that is the max number of coordinates supported in calls to service API `POST /definitions`._ --- .../analyze_data_synchronization/.env_example | 3 +- tools/analyze_data_synchronization/analyze.py | 250 ++++++++++++------ 2 files changed, 169 insertions(+), 84 deletions(-) diff --git a/tools/analyze_data_synchronization/.env_example b/tools/analyze_data_synchronization/.env_example index c084ec5..6094a48 100644 --- a/tools/analyze_data_synchronization/.env_example +++ b/tools/analyze_data_synchronization/.env_example @@ -2,7 +2,7 @@ MONGO_CONNECTION_STRING="mongodb://localhost:27017/" BASE_AZURE_BLOB_URL = "https://clearlydefineddev.blob.core.windows.net" AZURE_CONTAINER_NAME = "develop-definition" SERVICE_API_URL = "http://dev-api.clearlydefined.io/" -OUTPUT_FILE = "invalid-data.json" +BASE_OUTPUT_FILENAME = "invalid-data" # START_DATE = "2024-06-21" # END_DATE = "2024-06-28" START_MONTH = str(os.environ.get("START_MONTH", "2024-06")) @@ -10,3 +10,4 @@ END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) INITIAL_SKIP = 0 PAGE_SIZE = 1000 REPAIR = false +DRYRUN = false \ No newline at end of file diff --git a/tools/analyze_data_synchronization/analyze.py b/tools/analyze_data_synchronization/analyze.py index c6ad021..d75c82c 100644 --- a/tools/analyze_data_synchronization/analyze.py +++ b/tools/analyze_data_synchronization/analyze.py @@ -7,27 +7,9 @@ import requests from azure.cosmos import cosmos_client from dotenv import load_dotenv +from datetime import datetime, timedelta -# All environment variables can be defined in tools/analyze_data_synchronization/.env -# -# Required environment variables: -# * MONGO_CONNECTION_STRING: The connection string to the MongoDB database -# * BASE_AZURE_BLOB_URL The base URL for the Azure Blob Storage -# * SERVICE_API_URL: The URL for the service API -# -# Optional environment variables: -# * AZURE_CONTAINER_NAME: The name of the Azure Blob Storage container (default: "develop-definition") -# * OUTPUT_FILE: The file to write the output to (default: "invalid_data.json") -# * REPAIR: If set to "true", the script will attempt to repair the data (default: false) -# * PAGE_SIZE: The number of documents to process at a time (default: 1000) -# * INITIAL_SKIP: The number of documents to skip before starting the analysis (default: 0) - used with START_DATE and END_DATE -# * START_MONTH: The first month to include in the query (default: 2024-01) -# * END_MONTH: The last month to include in the query (default: 2024-06) -# * START_DATE: The first date to include in the query (default: "") -# * END_DATE: The last date to include in the query (default: "") -# * VERBOSE: If set to "true", the script will output more information (default: false) - -# Commands to run this script on localhost from this directory after setting the environment variables: +# Commands to run this script on localhost from this directory after setting the environment variables (see initialize() below): # # cd tools/analyze_data_synchronization # python3 -m venv .venv @@ -35,28 +17,20 @@ # python3 -m pip install -r requirements.txt # python3 analyze.py -load_dotenv() - -MONGO_CONNECTION_STRING = str(os.environ.get("MONGO_CONNECTION_STRING")) -DB_NAME = "clearlydefined" -COLLECTION_NAME = "definitions-trimmed" - -BASE_AZURE_BLOB_URL = str(os.environ.get("BASE_AZURE_BLOB_URL")) -AZURE_CONTAINER_NAME = str(os.environ.get("AZURE_CONTAINER_NAME", "develop-definition")) - -SERVICE_API_URL = str(os.environ.get("SERVICE_API_URL")) - -START_MONTH = str(os.environ.get("START_MONTH", "2024-01")) -END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) - -START_DATE = str(os.environ.get("START_DATE", "")) -END_DATE = str(os.environ.get("END_DATE", "")) - -REPAIR = str(os.environ.get("REPAIR", "false")).lower() == "true" -PAGE_SIZE = int(os.environ.get("PAGE_SIZE", 1000)) -INITIAL_SKIP = int(os.environ.get("INITIAL_SKIP", 0)) -OUTPUT_FILE = str(os.environ.get("OUTPUT_FILE", "invalid_data.json")) -VERBOSE = str(os.environ.get("VERBOSE", "false")).lower() == "true" +# This script analyzes the data synchronization between the CosmosDB and the Azure Blob Storage. +# It compares the licensed.declared field in the CosmosDB with the licensed.declared field in the +# Azure Blob Storage. If the fields do not match, the document is considered invalid. The script +# outputs a JSON file with summary statistics. +# +# The script can also repair the data by updating the licensed.declared field in the CosmosDB with +# the value from the Azure Blob Storage and makes a request to the service API to force the +# definitions to be re-processed. This insures that any other data in the DB document is also +# in sync with the source of truth in the blob store. +# +# The script can be run for a date range or for a range of months. The repair option is only +# supported for the date range option. The range of months is used to estimate the total number +# of invalid documents in the database by evaluating a sample of the data for each month in the +# range. # Example coordinates: composer/packagist/00f100/fcphp-cache/revision/0.1.0.json @@ -111,6 +85,83 @@ # ] +# All environment variables can be defined in `tools/analyze_data_synchronization/.env` +# See `tools/analyze_data_synchronization/.env.example` for an example. +# +# Required environment variables: +# * MONGO_CONNECTION_STRING: The connection string to the CosmosDB database +# * BASE_AZURE_BLOB_URL The base URL for the Azure Blob Storage +# * SERVICE_API_URL: The URL for the service API +# +# Optional environment variables: +# * AZURE_CONTAINER_NAME: The name of the Azure Blob Storage container (default: "develop-definition") +# * OUTPUT_FILE: The file to write the output to (default: "invalid_data.json") +# * REPAIR: If set to "true", the script will attempt to repair the data (default: false) +# * PAGE_SIZE: The number of documents to process at a time (default: 1000) +# * INITIAL_SKIP: The number of documents to skip before starting the analysis (default: 0) - used with START_DATE and END_DATE +# * START_DATE: The first date to include in the query (default: "") +# * END_DATE: The last date to include in the query (default: "") +# * START_MONTH: The first month to include in the query (default: 2024-01) - ignored if START_DATE is set +# * END_MONTH: The last month to include in the query (default: 2024-06) - ignored if START_DATE is set +# * VERBOSE: If set to "true", the script will output more information (default: false) +def initialize(): + """Set up global variables based on the environment variables""" + global MONGO_CONNECTION_STRING, DB_NAME, COLLECTION_NAME, BASE_AZURE_BLOB_URL + global AZURE_CONTAINER_NAME, SERVICE_API_URL, START_DATE, END_DATE, START_MONTH, END_MONTH + global INITIAL_SKIP, PAGE_SIZE, OUTPUT_FILE, REPAIR, VERBOSE, DRYRUN + + load_dotenv() + + MONGO_CONNECTION_STRING = str(os.environ.get("MONGO_CONNECTION_STRING")) + DB_NAME = "clearlydefined" + COLLECTION_NAME = "definitions-trimmed" + + BASE_AZURE_BLOB_URL = str(os.environ.get("BASE_AZURE_BLOB_URL")) + AZURE_CONTAINER_NAME = str(os.environ.get("AZURE_CONTAINER_NAME", "develop-definition")) + + SERVICE_API_URL = str(os.environ.get("SERVICE_API_URL")) + + START_DATE = str(os.environ.get("START_DATE", "")) # used to repair the data + END_DATE = str(os.environ.get("END_DATE", "")) + END_DATE = START_DATE if END_DATE < START_DATE else END_DATE + + START_MONTH = str(os.environ.get("START_MONTH", "2024-01")) # used to spot check the data for out-of-sync licenses + END_MONTH = str(os.environ.get("END_MONTH", "2024-06")) + if START_DATE and END_DATE: # if the date range is set, ignore the month range + START_MONTH = "" + END_MONTH = "" + + MAX_PAGE_SIZE = 500 # The service API can only process 500 definitions at a time, so it is limiting the max page size + PAGE_SIZE = int(os.environ.get("PAGE_SIZE", MAX_PAGE_SIZE)) + PAGE_SIZE = PAGE_SIZE if 0 < PAGE_SIZE < MAX_PAGE_SIZE else MAX_PAGE_SIZE + + INITIAL_SKIP = int(os.environ.get("INITIAL_SKIP", 0)) # used if processing fails part way through + INITIAL_SKIP = 0 if INITIAL_SKIP < 0 or START_MONTH else INITIAL_SKIP + + base_filename = str(os.environ.get("BASE_OUTPUT_FILENAME", "invalid-data")) + OUTPUT_FILE = filename(base_filename, START_DATE, END_DATE, START_MONTH, END_MONTH, INITIAL_SKIP) + + REPAIR = str(os.environ.get("REPAIR", "false")).lower() == "true" + REPAIR = False if START_MONTH else REPAIR # repair is not supported for spot checking multiple months + + VERBOSE = str(os.environ.get("VERBOSE", "false")).lower() == "true" + DRYRUN = str(os.environ.get("DRYRUN", "false")).lower() == "true" + +def filename(base_filename, start_date, end_date, start_month, end_month, initial_skip): + """Generate output filename based on the parameters passed in (e.g. 2024-01-01_thru_2024-01-31_invalid-data_offset-1200.json)""" + start_dt = start_date if start_date else start_month + end_dt = end_date if end_date else end_month + offset = '' + if initial_skip > 0: + offset = f"_offset-{initial_skip}" + + return start_dt + '_thru_' + end_dt + '_' + base_filename + offset + ".json" + +def custom_range_label(): + """Generate a range label for the custom date range""" + return f"{START_DATE}_thru_{END_DATE}_offset-{INITIAL_SKIP}" + + def fetch_blob(base_url, container_name, type, provider, namespace, name, revision): """Fetch the blob from the azure blob storage""" # need to encode the url for the %2f characters @@ -124,22 +175,23 @@ def fetch_blob(base_url, container_name, type, provider, namespace, name, revisi return {} return res.json() -def repair_data(service_url, type, provider, namespace, name, revision): - """Repair the data by requesting the definition from the service with the force parameter""" +def repair_all(service_url, coordinates): + """Repair the data by requesting the definitions from the service with the force parameter""" if VERBOSE: - print(f" Repairing data for {type}/{provider}/{namespace}/{name}/{revision}") - url = f"{service_url}/definitions/{type}/{provider}/{namespace}/{name}/{revision}?force=true" - res = requests.get(url) + print(f" Repairing data for {len(coordinates)} coordinates") + url = f"{service_url}/definitions?force=true" + res = requests.post(url, '\n'.join(coordinates)) if res.status_code != 200: return {} return res.json() def dump_data(data, filename): + """Write the data to a file""" with open(filename, "w") as f: json.dump(data, f) def initialize_stats(range_label, total_docs_count, invalid_data): - # TODO: Add container name to summary stats. + """Initialize stats for the range""" invalid_data[range_label] = { "stats": { "sample_total": 0, @@ -152,30 +204,32 @@ def initialize_stats(range_label, total_docs_count, invalid_data): } def update_stats(invalid_data, invalid_count, range_label, sample_count, checkpoint=False): + """Update the stats for the range at each checkpoint or at the end of a page of data""" invalid_data[range_label]["stats"]["sample_total"] = sample_count invalid_data[range_label]["stats"]["sample_invalid"] = invalid_count - percent_invalid = invalid_count / sample_count * 100 - invalid_data[range_label]["stats"]["percent_invalid"] = str(round(percent_invalid, 2)) + "%" + percent_invalid = round(invalid_count / sample_count * 100, 2) + invalid_data[range_label]["stats"]["percent_invalid"] = str(percent_invalid) + "%" total_count = invalid_data[range_label]["stats"]["total_documents"] invalid_data[range_label]["stats"]["total_estimated_invalid"] = round((total_count * percent_invalid) / 100) invalid_data[range_label]["stats"]["sample_percent_of_total"] = str(round((sample_count / total_count * 100), 2)) + "%" repaired = "" - if REPAIR: - repaired = " (repaired)" - if checkpoint: + if checkpoint and VERBOSE: print( - f"Checkpoint: total invalid data: {invalid_count}{repaired}, total items {sample_count} ({percent_invalid}%)" + f" Checkpoint: total invalid data: {invalid_count}, total items {sample_count} ({percent_invalid}%) - {datetime.now()}" ) else: + if REPAIR: + repaired = " (repaired)" print( - f"Total invalid data: {invalid_count}{repaired}, total items {sample_count} ({percent_invalid}%)" + f" Total invalid data: {invalid_count}{repaired}, total items {sample_count} ({percent_invalid}%) - {datetime.now()}" ) dump_data(invalid_data, OUTPUT_FILE) def create_months(start_month, end_month): + """Create a list of months between the start and end months""" start_year, start_month = start_month.split("-") end_year, end_month = end_month.split("-") months = [] @@ -188,28 +242,33 @@ def create_months(start_month, end_month): months.append(f"{year}-{str(month).zfill(2)}") return months -def page_count(collection, query, range_label, invalid_data): +def page_count_and_setup(collection, query, range_label, invalid_data): + """Get the count of pages and set up the stats for the range""" all_docs_count = collection.count_documents(query, max_time_ms=10000000) - initialize_stats(range_label, all_docs_count, invalid_data) if all_docs_count == 0: - print("No documents found missing licenses in {range_label}.") + print(f"No documents found with missing licenses in {range_label}.") return 0 if INITIAL_SKIP > 0: print(f"Skipping {INITIAL_SKIP} documents") all_docs_count -= INITIAL_SKIP - pages = all_docs_count // PAGE_SIZE + page_count = all_docs_count // PAGE_SIZE if all_docs_count % PAGE_SIZE: - pages += 1 - return pages + page_count += 1 + + est_hours_to_complete = round(page_count * 2.5 / 60) + est_completion_time = datetime.now() + timedelta(hours=est_hours_to_complete) + print(f"Found {all_docs_count} documents missing licenses in {range_label}. Estimated time to complete is {est_hours_to_complete} hours ending at {est_completion_time}.") + return page_count -def analyze_docs(collection, query, range_label, invalid_data): - pages = page_count(collection, query, range_label, invalid_data) - if pages == 0: +def analyze_docs(collection, query, range_label, invalid_data, one_pass=False): + """Analyze the documents in the collection for the given query""" + page_count = page_count_and_setup(collection, query, range_label, invalid_data) + if page_count == 0 or DRYRUN: return running_count_docs = 0 @@ -219,21 +278,27 @@ def analyze_docs(collection, query, range_label, invalid_data): if INITIAL_SKIP > 0: skip = INITIAL_SKIP while True: - print(f"Processing page {page+1} of {pages} in {range_label}") + print(f"Processing page {page+1} of {page_count} in {range_label} starting at offset {skip} - {datetime.now()}") docs = collection.find(query).skip(skip).limit(PAGE_SIZE).max_time_ms(10000000) new_docs_count, new_invalid_count = analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_label, invalid_data) running_count_invalid += new_invalid_count running_count_docs += new_docs_count - if new_docs_count == 0: + if new_docs_count == 0 or one_pass: break page += 1 skip = page * PAGE_SIZE + INITIAL_SKIP def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_label, invalid_data): + """Analyze a page of documents""" count_docs = 0 count_invalid = 0 + + repair_list = [] + bulk_operations = [] + for doc in docs: count_docs += 1 + blob = fetch_blob( BASE_AZURE_BLOB_URL, AZURE_CONTAINER_NAME, @@ -261,27 +326,38 @@ def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_ }, } if REPAIR: - # request definition with the force parameter to update the licensed.declared field in the database - collection.update_one( - {"_id": doc["_id"]}, - {"$set": {"licensed.declared": blob_licensed.get("declared")}}, - ) - blob = repair_data( - SERVICE_API_URL, - doc["coordinates"]["type"], - doc["coordinates"]["provider"], - doc["coordinates"].get("namespace", "-"), - doc["coordinates"]["name"], - doc["coordinates"]["revision"], + # add the coordinates to the repair list + coordinates = urllib.parse.quote( + doc["coordinates"]["type"] + \ + doc["coordinates"]["provider"] + \ + doc["coordinates"].get("namespace", "-") + \ + doc["coordinates"]["name"] + \ + doc["coordinates"]["revision"] ) + repair_list.append(coordinates) - + bulk_operations.append( + pymongo.UpdateOne( + {"_id": doc["_id"]}, + {"$set": {"licensed.declared": blob_licensed.get("declared")}}, + ) + ) + # Checkpoint in case mongo dies if count_docs % 100 == 0: total_invalid = running_count_invalid + count_invalid total_docs = running_count_docs + count_docs update_stats(invalid_data, total_invalid, range_label, total_docs, True) + if REPAIR and len(repair_list) > 0: + collection.bulk_write(bulk_operations) + blob = repair_all( + SERVICE_API_URL, + repair_list + ) + if VERBOSE: + print(f"Repaired {len(repair_list)} items") + total_invalid = running_count_invalid + count_invalid total_docs = running_count_docs + count_docs update_stats(invalid_data, total_invalid, range_label, total_docs) @@ -289,7 +365,9 @@ def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_ ### Main ### -print("Starting data synchronization analysis") +initialize() + +print(f"Starting data synchronization analysis at {datetime.now()}") client = pymongo.MongoClient(MONGO_CONNECTION_STRING) db = client[DB_NAME] @@ -318,13 +396,14 @@ def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_ print(f" START_DATE: {START_DATE}") print(f" END_DATE: {END_DATE}") + label = custom_range_label() if not DRYRUN else f"{custom_range_label()}_dryrun" analyze_docs( collection, { "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, "licensed.declared": {"$exists": False}, }, - "custom_range", + label, invalid_data, ) @@ -339,14 +418,19 @@ def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_ for month in months: print(f"Processing {month}") + label = month if not DRYRUN else f"{month}_dryrun" analyze_docs( collection, { - "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, + "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, "licensed.declared": {"$exists": False}, }, - month, + label, invalid_data, + one_pass=True, ) dump_data(invalid_data, OUTPUT_FILE) + +if DRYRUN: + print("Dry run completed. No data was modified.") From d5cd05e141e61927c0cf73600b2da8dc7c90fae2 Mon Sep 17 00:00:00 2001 From: "E. Lynette Rayle" Date: Wed, 24 Jul 2024 09:40:44 -0400 Subject: [PATCH 7/7] print as CSV when DRYRUN; add total count during range --- tools/analyze_data_synchronization/analyze.py | 55 +++++++++++++------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/tools/analyze_data_synchronization/analyze.py b/tools/analyze_data_synchronization/analyze.py index d75c82c..ca392a6 100644 --- a/tools/analyze_data_synchronization/analyze.py +++ b/tools/analyze_data_synchronization/analyze.py @@ -242,32 +242,47 @@ def create_months(start_month, end_month): months.append(f"{year}-{str(month).zfill(2)}") return months -def page_count_and_setup(collection, query, range_label, invalid_data): +def page_count_and_setup(collection, all_query, missing_query, range_label, invalid_data): """Get the count of pages and set up the stats for the range""" - all_docs_count = collection.count_documents(query, - max_time_ms=10000000) - initialize_stats(range_label, all_docs_count, invalid_data) - if all_docs_count == 0: - print(f"No documents found with missing licenses in {range_label}.") + all_docs_count = collection.count_documents( + all_query, + max_time_ms=10000000 + ) + + docs_with_missing_count = collection.count_documents( + missing_query, + max_time_ms=10000000 + ) + initialize_stats(range_label, docs_with_missing_count, invalid_data) + if docs_with_missing_count == 0: + if DRYRUN: + print(f"{range_label}, {all_docs_count}, 0%, 0, 0, 0") + else: + print(f"No documents found with missing licenses out of {all_docs_count} total in {range_label}.") return 0 if INITIAL_SKIP > 0: print(f"Skipping {INITIAL_SKIP} documents") - all_docs_count -= INITIAL_SKIP + docs_with_missing_count -= INITIAL_SKIP - page_count = all_docs_count // PAGE_SIZE - if all_docs_count % PAGE_SIZE: + page_count = docs_with_missing_count // PAGE_SIZE + if docs_with_missing_count % PAGE_SIZE: page_count += 1 est_hours_to_complete = round(page_count * 2.5 / 60) est_completion_time = datetime.now() + timedelta(hours=est_hours_to_complete) - print(f"Found {all_docs_count} documents missing licenses in {range_label}. Estimated time to complete is {est_hours_to_complete} hours ending at {est_completion_time}.") + + if DRYRUN: + print(f"{range_label}, {all_docs_count}, {docs_with_missing_count}, {round(docs_with_missing_count/all_docs_count, 4)*100}%, {est_hours_to_complete}, {round(est_hours_to_complete / 24, 2)}") + else: + print(f"Found {docs_with_missing_count} documents missing licenses out of {all_docs_count} total in {range_label}. Estimated time to complete is {est_hours_to_complete} hours ending at {est_completion_time}.") return page_count def analyze_docs(collection, query, range_label, invalid_data, one_pass=False): """Analyze the documents in the collection for the given query""" - page_count = page_count_and_setup(collection, query, range_label, invalid_data) + missing_query = {**query, "licensed.declared": {"$exists": False}} + page_count = page_count_and_setup(collection, query, missing_query, range_label, invalid_data) if page_count == 0 or DRYRUN: return @@ -279,7 +294,7 @@ def analyze_docs(collection, query, range_label, invalid_data, one_pass=False): skip = INITIAL_SKIP while True: print(f"Processing page {page+1} of {page_count} in {range_label} starting at offset {skip} - {datetime.now()}") - docs = collection.find(query).skip(skip).limit(PAGE_SIZE).max_time_ms(10000000) + docs = collection.find(missing_query).skip(skip).limit(PAGE_SIZE).max_time_ms(10000000) new_docs_count, new_invalid_count = analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_label, invalid_data) running_count_invalid += new_invalid_count running_count_docs += new_docs_count @@ -395,13 +410,15 @@ def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_ print("Processing custom date range") print(f" START_DATE: {START_DATE}") print(f" END_DATE: {END_DATE}") - + + if DRYRUN: + print("Range, # all docs, # missing, % missing, est hours to complete, est days to complete") + label = custom_range_label() if not DRYRUN else f"{custom_range_label()}_dryrun" analyze_docs( collection, { "_meta.updated": {"$gte": START_DATE, "$lte": END_DATE}, - "licensed.declared": {"$exists": False}, }, label, invalid_data, @@ -415,17 +432,19 @@ def analyze_page_of_docs(docs, running_count_docs, running_count_invalid, range_ months = create_months(START_MONTH, END_MONTH) print(f" {months}") + if DRYRUN: + print("Range, # all docs, # missing, % missing, est hours to complete, est days to complete") + for month in months: - print(f"Processing {month}") + if not DRYRUN: + print(f"Processing {month}") - label = month if not DRYRUN else f"{month}_dryrun" analyze_docs( collection, { "_meta.updated": {"$gte": f"{month}-01", "$lte": f"{month}-31"}, - "licensed.declared": {"$exists": False}, }, - label, + month, invalid_data, one_pass=True, )