
Commit 39a7861

fix: some accounts missing when creating tasks in get_tasks (paginator issue)

1 parent 9bef250 commit 39a7861

2 files changed: +77 -46 lines changed
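
For context on the "paginator issue" named in the commit message: S3's list_objects returns at most 1,000 keys per call, so reading only the Contents of a single response can silently skip accounts whose objects fall past that limit. Below is a minimal sketch of one way to enumerate every key regardless of count, using a boto3 paginator; the bucket name and prefix are hypothetical placeholders, and this is not the approach the commit itself takes.

import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Walk every page instead of trusting a single, possibly truncated, response.
# "example-bucket" and the prefix are placeholders for illustration.
for page in paginator.paginate(Bucket="example-bucket", Prefix="SPACE_ONE/billing/"):
    for content in page.get("Contents", []):
        print(content["Key"])

The commit instead avoids enumerating objects at all: it lists with a "/" delimiter and reads CommonPrefixes, which yields one entry per account folder rather than one per object.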

src/plugin/connector/aws_s3_connector.py

Lines changed: 65 additions & 39 deletions
@@ -26,76 +26,102 @@ def __init__(self, *args, **kwargs):
     def create_session(self, options: dict, secret_data: dict, schema: str):
         self._check_secret_data(secret_data)
 
-        self.s3_bucket = secret_data['aws_s3_bucket']
-        aws_access_key_id = secret_data['aws_access_key_id']
-        aws_secret_access_key = secret_data['aws_secret_access_key']
-        region_name = secret_data.get('region_name')
-        role_arn = secret_data.get('role_arn')
-        external_id = secret_data.get('external_id')
+        self.s3_bucket = secret_data["aws_s3_bucket"]
+        aws_access_key_id = secret_data["aws_access_key_id"]
+        aws_secret_access_key = secret_data["aws_secret_access_key"]
+        region_name = secret_data.get("region_name")
+        role_arn = secret_data.get("role_arn")
+        external_id = secret_data.get("external_id")
 
         if role_arn:
-            self._create_session_aws_assume_role(aws_access_key_id, aws_secret_access_key, region_name,
-                                                 role_arn, external_id)
+            self._create_session_aws_assume_role(
+                aws_access_key_id,
+                aws_secret_access_key,
+                region_name,
+                role_arn,
+                external_id,
+            )
         else:
-            self._create_session_aws_access_key(aws_access_key_id, aws_secret_access_key, region_name)
+            self._create_session_aws_access_key(
+                aws_access_key_id, aws_secret_access_key, region_name
+            )
 
-        self.s3_client = self.session.client('s3')
+        self.s3_client = self.session.client("s3")
 
-    def list_objects(self, path):
-        return self.s3_client.list_objects(Bucket=self.s3_bucket, Prefix=path)
+    def list_objects(self, path, delimiter=None):
+        params = {"Bucket": self.s3_bucket, "Prefix": path}
+        if delimiter is not None:
+            params["Delimiter"] = delimiter
+
+        return self.s3_client.list_objects(**params)
 
     def get_cost_data(self, key):
         obj = self.s3_client.get_object(Bucket=self.s3_bucket, Key=key)
-        df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
+        df = pd.read_parquet(io.BytesIO(obj["Body"].read()))
         df = df.replace({np.nan: None})
 
-        costs_data = df.to_dict('records')
+        costs_data = df.to_dict("records")
 
-        _LOGGER.debug(f'[get_cost_data] costs count({key}): {len(costs_data)}')
+        _LOGGER.debug(f"[get_cost_data] costs count({key}): {len(costs_data)}")
 
         # Paginate
         page_count = int(len(costs_data) / _PAGE_SIZE) + 1
 
         for page_num in range(page_count):
             offset = _PAGE_SIZE * page_num
-            yield costs_data[offset:offset + _PAGE_SIZE]
+            yield costs_data[offset : offset + _PAGE_SIZE]
 
     @staticmethod
     def _check_secret_data(secret_data):
-        if 'aws_access_key_id' not in secret_data:
-            raise ERROR_REQUIRED_PARAMETER(key='secret_data.aws_access_key_id')
+        if "aws_access_key_id" not in secret_data:
+            raise ERROR_REQUIRED_PARAMETER(key="secret_data.aws_access_key_id")
 
-        if 'aws_secret_access_key' not in secret_data:
-            raise ERROR_REQUIRED_PARAMETER(key='secret_data.aws_secret_access_key')
+        if "aws_secret_access_key" not in secret_data:
+            raise ERROR_REQUIRED_PARAMETER(key="secret_data.aws_secret_access_key")
 
-        if 'aws_s3_bucket' not in secret_data:
-            raise ERROR_REQUIRED_PARAMETER(key='secret_data.aws_s3_bucket')
+        if "aws_s3_bucket" not in secret_data:
+            raise ERROR_REQUIRED_PARAMETER(key="secret_data.aws_s3_bucket")
 
-    def _create_session_aws_access_key(self, aws_access_key_id, aws_secret_access_key, region_name):
-        self.session = boto3.Session(aws_access_key_id=aws_access_key_id,
-                                     aws_secret_access_key=aws_secret_access_key,
-                                     region_name=region_name)
+    def _create_session_aws_access_key(
+        self, aws_access_key_id, aws_secret_access_key, region_name
+    ):
+        self.session = boto3.Session(
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+            region_name=region_name,
+        )
 
-        sts = self.session.client('sts')
+        sts = self.session.client("sts")
         sts.get_caller_identity()
 
-    def _create_session_aws_assume_role(self, aws_access_key_id, aws_secret_access_key, region_name, role_arn, external_id):
-        self._create_session_aws_access_key(aws_access_key_id, aws_secret_access_key, region_name)
+    def _create_session_aws_assume_role(
+        self,
+        aws_access_key_id,
+        aws_secret_access_key,
+        region_name,
+        role_arn,
+        external_id,
+    ):
+        self._create_session_aws_access_key(
+            aws_access_key_id, aws_secret_access_key, region_name
+        )
 
-        sts = self.session.client('sts')
+        sts = self.session.client("sts")
 
         _assume_role_request = {
-            'RoleArn': role_arn,
-            'RoleSessionName': utils.generate_id('AssumeRoleSession'),
+            "RoleArn": role_arn,
+            "RoleSessionName": utils.generate_id("AssumeRoleSession"),
         }
 
         if external_id:
-            _assume_role_request.update({'ExternalId': external_id})
+            _assume_role_request.update({"ExternalId": external_id})
 
         assume_role_object = sts.assume_role(**_assume_role_request)
-        credentials = assume_role_object['Credentials']
-
-        self.session = boto3.Session(aws_access_key_id=credentials['AccessKeyId'],
-                                     aws_secret_access_key=credentials['SecretAccessKey'],
-                                     region_name=region_name,
-                                     aws_session_token=credentials['SessionToken'])
+        credentials = assume_role_object["Credentials"]
+
+        self.session = boto3.Session(
+            aws_access_key_id=credentials["AccessKeyId"],
+            aws_secret_access_key=credentials["SecretAccessKey"],
+            region_name=region_name,
+            aws_session_token=credentials["SessionToken"],
+        )
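
A short usage sketch of the new delimiter parameter, assuming a connector whose session has already been created via create_session; the prefix value below is a hypothetical example. With Delimiter="/", S3 groups keys by their next path segment and reports each group under CommonPrefixes instead of listing every object.

# Hypothetical prefix; in the plugin the path is built from the database name.
response = aws_s3_connector.list_objects(
    "SPACE_ONE/billing/database=example/", delimiter="/"
)
for prefix in response.get("CommonPrefixes", []):
    # e.g. "SPACE_ONE/billing/database=example/account_id=123456789012/"
    print(prefix["Prefix"])

# Called without a delimiter, the method behaves as before: a flat object
# listing returned under "Contents".
response = aws_s3_connector.list_objects("SPACE_ONE/billing/database=example/")
for content in response.get("Contents", []):
    print(content["Key"])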

src/plugin/manager/job_manager.py

Lines changed: 12 additions & 7 deletions
@@ -119,15 +119,20 @@ def get_tasks_directory_type(
         path = f"SPACE_ONE/billing/database={database}/"
 
         if not accounts:
-            response = aws_s3_connector.list_objects(path)
-            for content in response.get("Contents", []):
-                key = content["Key"]
-                account_id = key.split("/")[3].split("=")[-1]
+            response = aws_s3_connector.list_objects(path, delimiter="/")
+            path_length = len(path)
 
-                if account_id and not "":
-                    accounts.append(account_id)
+            for prefix in response.get("CommonPrefixes", []):
+                folder = prefix["Prefix"]
+                relative_path = folder[path_length:].lstrip("/")
+
+                if relative_path:
+                    first_folder = relative_path.split("/")[0]
+                    if first_folder.startswith("account_id="):
+                        account_id = first_folder.split("=", 1)[-1]
+                        if account_id and account_id.strip():
+                            accounts.append(account_id)
 
-        accounts = list(set(accounts))
         for account_id in accounts:
             task_options = {
                 "account_id": account_id,
