From 39a7861a70451c1106f8e5fe382079dc3faed8a9 Mon Sep 17 00:00:00 2001 From: jinyoungmoonDEV Date: Wed, 24 Sep 2025 14:33:49 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20get=5Ftasks=EC=97=90=EC=84=9C=20task=20?= =?UTF-8?q?=EC=83=9D=EC=84=B1=EC=8B=9C=20=EC=9D=BC=EB=B6=80=20=EC=96=B4?= =?UTF-8?q?=EC=B9=B4=EC=9A=B4=ED=8A=B8=20=EB=88=84=EB=9D=BD=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=20(paginator=20issue)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugin/connector/aws_s3_connector.py | 104 ++++++++++++++--------- src/plugin/manager/job_manager.py | 19 +++-- 2 files changed, 77 insertions(+), 46 deletions(-) diff --git a/src/plugin/connector/aws_s3_connector.py b/src/plugin/connector/aws_s3_connector.py index 25cca59..a834636 100644 --- a/src/plugin/connector/aws_s3_connector.py +++ b/src/plugin/connector/aws_s3_connector.py @@ -26,76 +26,102 @@ def __init__(self, *args, **kwargs): def create_session(self, options: dict, secret_data: dict, schema: str): self._check_secret_data(secret_data) - self.s3_bucket = secret_data['aws_s3_bucket'] - aws_access_key_id = secret_data['aws_access_key_id'] - aws_secret_access_key = secret_data['aws_secret_access_key'] - region_name = secret_data.get('region_name') - role_arn = secret_data.get('role_arn') - external_id = secret_data.get('external_id') + self.s3_bucket = secret_data["aws_s3_bucket"] + aws_access_key_id = secret_data["aws_access_key_id"] + aws_secret_access_key = secret_data["aws_secret_access_key"] + region_name = secret_data.get("region_name") + role_arn = secret_data.get("role_arn") + external_id = secret_data.get("external_id") if role_arn: - self._create_session_aws_assume_role(aws_access_key_id, aws_secret_access_key, region_name, - role_arn, external_id) + self._create_session_aws_assume_role( + aws_access_key_id, + aws_secret_access_key, + region_name, + role_arn, + external_id, + ) else: - self._create_session_aws_access_key(aws_access_key_id, aws_secret_access_key, region_name) + self._create_session_aws_access_key( + aws_access_key_id, aws_secret_access_key, region_name + ) - self.s3_client = self.session.client('s3') + self.s3_client = self.session.client("s3") - def list_objects(self, path): - return self.s3_client.list_objects(Bucket=self.s3_bucket, Prefix=path) + def list_objects(self, path, delimiter=None): + params = {"Bucket": self.s3_bucket, "Prefix": path} + if delimiter is not None: + params["Delimiter"] = delimiter + + return self.s3_client.list_objects(**params) def get_cost_data(self, key): obj = self.s3_client.get_object(Bucket=self.s3_bucket, Key=key) - df = pd.read_parquet(io.BytesIO(obj['Body'].read())) + df = pd.read_parquet(io.BytesIO(obj["Body"].read())) df = df.replace({np.nan: None}) - costs_data = df.to_dict('records') + costs_data = df.to_dict("records") - _LOGGER.debug(f'[get_cost_data] costs count({key}): {len(costs_data)}') + _LOGGER.debug(f"[get_cost_data] costs count({key}): {len(costs_data)}") # Paginate page_count = int(len(costs_data) / _PAGE_SIZE) + 1 for page_num in range(page_count): offset = _PAGE_SIZE * page_num - yield costs_data[offset:offset + _PAGE_SIZE] + yield costs_data[offset : offset + _PAGE_SIZE] @staticmethod def _check_secret_data(secret_data): - if 'aws_access_key_id' not in secret_data: - raise ERROR_REQUIRED_PARAMETER(key='secret_data.aws_access_key_id') + if "aws_access_key_id" not in secret_data: + raise ERROR_REQUIRED_PARAMETER(key="secret_data.aws_access_key_id") - if 'aws_secret_access_key' not in secret_data: - raise ERROR_REQUIRED_PARAMETER(key='secret_data.aws_secret_access_key') + if "aws_secret_access_key" not in secret_data: + raise ERROR_REQUIRED_PARAMETER(key="secret_data.aws_secret_access_key") - if 'aws_s3_bucket' not in secret_data: - raise ERROR_REQUIRED_PARAMETER(key='secret_data.aws_s3_bucket') + if "aws_s3_bucket" not in secret_data: + raise ERROR_REQUIRED_PARAMETER(key="secret_data.aws_s3_bucket") - def _create_session_aws_access_key(self, aws_access_key_id, aws_secret_access_key, region_name): - self.session = boto3.Session(aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - region_name=region_name) + def _create_session_aws_access_key( + self, aws_access_key_id, aws_secret_access_key, region_name + ): + self.session = boto3.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name, + ) - sts = self.session.client('sts') + sts = self.session.client("sts") sts.get_caller_identity() - def _create_session_aws_assume_role(self, aws_access_key_id, aws_secret_access_key, region_name, role_arn, external_id): - self._create_session_aws_access_key(aws_access_key_id, aws_secret_access_key, region_name) + def _create_session_aws_assume_role( + self, + aws_access_key_id, + aws_secret_access_key, + region_name, + role_arn, + external_id, + ): + self._create_session_aws_access_key( + aws_access_key_id, aws_secret_access_key, region_name + ) - sts = self.session.client('sts') + sts = self.session.client("sts") _assume_role_request = { - 'RoleArn': role_arn, - 'RoleSessionName': utils.generate_id('AssumeRoleSession'), + "RoleArn": role_arn, + "RoleSessionName": utils.generate_id("AssumeRoleSession"), } if external_id: - _assume_role_request.update({'ExternalId': external_id}) + _assume_role_request.update({"ExternalId": external_id}) assume_role_object = sts.assume_role(**_assume_role_request) - credentials = assume_role_object['Credentials'] - - self.session = boto3.Session(aws_access_key_id=credentials['AccessKeyId'], - aws_secret_access_key=credentials['SecretAccessKey'], - region_name=region_name, - aws_session_token=credentials['SessionToken']) + credentials = assume_role_object["Credentials"] + + self.session = boto3.Session( + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + region_name=region_name, + aws_session_token=credentials["SessionToken"], + ) diff --git a/src/plugin/manager/job_manager.py b/src/plugin/manager/job_manager.py index 26a2e7c..7b65e8c 100644 --- a/src/plugin/manager/job_manager.py +++ b/src/plugin/manager/job_manager.py @@ -119,15 +119,20 @@ def get_tasks_directory_type( path = f"SPACE_ONE/billing/database={database}/" if not accounts: - response = aws_s3_connector.list_objects(path) - for content in response.get("Contents", []): - key = content["Key"] - account_id = key.split("/")[3].split("=")[-1] + response = aws_s3_connector.list_objects(path, delimiter="/") + path_length = len(path) - if account_id and not "": - accounts.append(account_id) + for prefix in response.get("CommonPrefixes", []): + folder = prefix["Prefix"] + relative_path = folder[path_length:].lstrip("/") + + if relative_path: + first_folder = relative_path.split("/")[0] + if first_folder.startswith("account_id="): + account_id = first_folder.split("=", 1)[-1] + if account_id and account_id.strip(): + accounts.append(account_id) - accounts = list(set(accounts)) for account_id in accounts: task_options = { "account_id": account_id,