Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ baseType,category,subCategory,relatedUniques
"Cobalt Jewel","any jewel","base jewel","Grand Spectrum|Forbidden Flesh|The Balance of Terror"
"Viridian Jewel","any jewel","base jewel","Impossible Escape|Grand Spectrum"
"Prismatic Jewel","any jewel","base jewel","Watcher's Eye|Sublime Vision|The Light of Meaning|Bound By Destiny"
"Timeless Jewel","any jewel","base jewel","Glorious Vanity|Lethal Pride|Brutal Restraint|Militant Faith|Elegant Hubris"
"Timeless Jewel","any jewel","base jewel","Glorious Vanity|Lethal Pride|Brutal Restraint|Militant Faith|Elegant Hubris|Heroic Tragedy"
"Large Cluster Jewel","any jewel","cluster jewel","Voices"
"Medium Cluster Jewel","any jewel","cluster jewel",
"Small Cluster Jewel","any jewel","cluster jewel",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,118 @@
from pandas import DataFrame
from io import StringIO

import pandas as pd
import requests

from data_retrieval_app.data_deposit.data_depositor_base import DataDepositorBase
from data_retrieval_app.logs.logger import data_deposit_logger as logger


class ItemBaseTypeDataDepositor(DataDepositorBase):
    """Depositor for item base types that also refreshes their related uniques.

    On construction the previously deposited base types are fetched. During
    processing, rows whose ``baseType`` is already deposited are not returned
    for deposit; instead any related uniques they introduce are merged into
    the existing record with a PUT request.
    """

    def __init__(self) -> None:
        super().__init__(data_type="item_base_type")

        # Keep the plain endpoint for per-record PUT requests before the query
        # string is appended to `data_url`.
        self.update_url = str(self.data_url)
        self.data_url += "?on_duplicate_pkey_do_nothing=true"
        self.current_basetypes_df = self._get_current_base_types()

    def _get_current_base_types(self) -> pd.DataFrame:
        """Fetch the item base types that have already been deposited.

        Returns:
            The deposited records with every column read as ``str``, or an
            empty frame with the expected columns when nothing could be
            retrieved.
        """
        logger.info("Retrieving previously deposited data.")

        response = requests.get(self.data_url, headers=self.pom_auth_headers)

        df = pd.DataFrame()
        if response.status_code == 200:
            # Load the JSON payload into a pandas DataFrame.
            json_io = StringIO(response.content.decode("utf-8"))
            df = pd.read_json(json_io, dtype=str)
        else:
            # Make the failure visible instead of silently treating it as
            # "nothing deposited yet"; processing still continues best-effort.
            logger.error(
                f"Retrieving previously deposited data failed with status code {response.status_code}."
            )

        if df.empty:
            logger.info("Found no previously deposited data.")
            return pd.DataFrame(
                columns=[
                    "itemBaseTypeId",
                    "baseType",
                    "category",
                    "subCategory",
                    "relatedUniques",
                ]
            )

        logger.info("Successfully retrieved previously deposited data.")
        return df

    @staticmethod
    def _split_related_uniques(value) -> list:
        """Split a ``|``-separated string of uniques; non-strings give ``[]``."""
        # Missing values arrive as NaN/None rather than as strings.
        if not isinstance(value, str):
            return []
        return [name for name in value.split("|") if name]

    @classmethod
    def _merge_related_uniques(cls, current, new) -> str:
        """Return `current` extended by the uniques in `new`, without duplicates.

        Names keep their first-seen order (existing names first), so the
        result is deterministic and unchanged when `new` adds nothing.
        """
        names = cls._split_related_uniques(current) + cls._split_related_uniques(new)
        return "|".join(dict.fromkeys(names))

    def _update_duplicates(self, duplicate_df: pd.DataFrame) -> None:
        """Add related uniques from the file to already deposited base types.

        Note that this method does not remove uniques which are not present in
        the file.

        Args:
            duplicate_df: Merged rows whose base type is already deposited.
                ``relatedUniques`` holds the value from the file and
                ``relatedUniques_y`` the currently deposited value.

        Raises:
            Exception: Whatever the PUT request raises (including HTTP error
                statuses via ``raise_for_status``), after it has been logged.
        """
        # Rows without related uniques in the file have nothing to contribute.
        candidates = duplicate_df[duplicate_df["relatedUniques"].notna()]

        pending_updates = []
        for _, row in candidates.iterrows():
            current = row.get("relatedUniques_y")
            current_text = current if isinstance(current, str) else ""
            merged = self._merge_related_uniques(current, row["relatedUniques"])
            # Only records that actually gain (or deduplicate) uniques are
            # updated, so a pure reordering does not cause repeated PUTs.
            if merged != current_text:
                pending_updates.append((row, merged))

        if not pending_updates:
            return

        logger.info(
            "Found changes in related uniques for some item base types. Updating these changes."
        )
        headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
        }
        headers.update(self.pom_auth_headers)

        for row, merged in pending_updates:
            item_base_type_id = row["itemBaseTypeId"]
            data = {
                "baseType": row["baseType"],
                "category": row["category"],
                "subCategory": row["subCategory"],
                "relatedUniques": merged,
            }

            try:
                response = requests.put(
                    self.update_url + str(item_base_type_id),
                    json=data,
                    headers=headers,
                )
                response.raise_for_status()
            except Exception as e:
                logger.error(
                    f"The following error occurred while updating related uniques of item base type {item_base_type_id}: {e}"
                )
                raise

    def _process_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Update already deposited base types and return only the new ones.

        Args:
            df: Item base types read from the source file; must contain a
                ``baseType`` column.

        Returns:
            The rows of `df` whose ``baseType`` has not been deposited yet,
            restricted to the columns of `df`.
        """
        merged_df = pd.merge(
            df,
            self.current_basetypes_df,
            how="left",
            on="baseType",
            suffixes=("", "_y"),
        )
        # A missing id after the left join means the base type is not
        # deposited yet.
        non_duplicate_mask = merged_df["itemBaseTypeId"].isna()

        self._update_duplicates(merged_df[~non_duplicate_mask])

        # Keep only the input columns so that columns brought in by the merge
        # (the all-NaN `itemBaseTypeId` and the `_y` duplicates) are not
        # deposited.
        return merged_df.loc[non_duplicate_mask, list(df.columns)]
Loading