Skip to content

Commit 3e2a4cf

Browse files
authored
Merge pull request #11 from ImMin5/master
Add task_type option for job tasks
2 parents 7419e61 + ea46baf commit 3e2a4cf

File tree

4 files changed

+307
-195
lines changed

4 files changed

+307
-195
lines changed

src/plugin/main.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
app = DataSourcePluginServer()
88

99

10-
@app.route('DataSource.init')
10+
@app.route("DataSource.init")
1111
def data_source_init(params: dict) -> dict:
12-
""" init plugin by options
12+
"""init plugin by options
1313
1414
Args:
1515
params (DataSourceInitRequest): {
@@ -22,15 +22,15 @@ def data_source_init(params: dict) -> dict:
2222
'metadata': 'dict'
2323
}
2424
"""
25-
options = params['options']
25+
options = params["options"]
2626

2727
data_source_mgr = DataSourceManager()
2828
return data_source_mgr.init_response(options)
2929

3030

31-
@app.route('DataSource.verify')
31+
@app.route("DataSource.verify")
3232
def data_source_verify(params: dict) -> None:
33-
""" Verifying data source plugin
33+
"""Verifying data source plugin
3434
3535
Args:
3636
params (CollectorVerifyRequest): {
@@ -44,18 +44,18 @@ def data_source_verify(params: dict) -> None:
4444
None
4545
"""
4646

47-
options = params['options']
48-
secret_data = params['secret_data']
49-
domain_id = params.get('domain_id')
50-
schema = params.get('schema')
47+
options = params["options"]
48+
secret_data = params["secret_data"]
49+
domain_id = params.get("domain_id")
50+
schema = params.get("schema")
5151

5252
data_source_mgr = DataSourceManager()
5353
data_source_mgr.verify_plugin(options, secret_data, domain_id, schema)
5454

5555

56-
@app.route('Job.get_tasks')
56+
@app.route("Job.get_tasks")
5757
def job_get_tasks(params: dict) -> dict:
58-
""" Get job tasks
58+
"""Get job tasks
5959
6060
Args:
6161
params (JobGetTaskRequest): {
@@ -75,20 +75,29 @@ def job_get_tasks(params: dict) -> dict:
7575
7676
"""
7777

78-
domain_id = params['domain_id']
79-
options = params['options']
80-
secret_data = params['secret_data']
81-
schema = params.get('schema')
82-
start = params.get('start')
83-
last_synchronized_at = params.get('last_synchronized_at')
78+
domain_id = params["domain_id"]
79+
options = params["options"]
80+
secret_data = params["secret_data"]
81+
schema = params.get("schema")
82+
start = params.get("start")
83+
last_synchronized_at = params.get("last_synchronized_at")
8484

8585
job_mgr = JobManager()
86-
return job_mgr.get_tasks(domain_id, options, secret_data, schema, start, last_synchronized_at)
86+
task_type = options.get("task_type", "identity")
8787

88+
if task_type == "identity":
89+
return job_mgr.get_tasks(
90+
domain_id, options, secret_data, schema, start, last_synchronized_at
91+
)
92+
else:
93+
return job_mgr.get_tasks_directory_type(
94+
domain_id, options, secret_data, schema, start, last_synchronized_at
95+
)
8896

89-
@app.route('Cost.get_data')
97+
98+
@app.route("Cost.get_data")
9099
def cost_get_data(params: dict) -> Generator[dict, None, None]:
91-
""" Get external cost data
100+
"""Get external cost data
92101
93102
Args:
94103
params (CostGetDataRequest): {
@@ -117,11 +126,11 @@ def cost_get_data(params: dict) -> Generator[dict, None, None]:
117126
}
118127
"""
119128

120-
options = params['options']
121-
secret_data = params['secret_data']
129+
options = params["options"]
130+
secret_data = params["secret_data"]
122131

123-
task_options = params.get('task_options', {})
124-
schema = params.get('schema')
132+
task_options = params.get("task_options", {})
133+
schema = params.get("schema")
125134

126135
cost_mgr = CostManager()
127136
return cost_mgr.get_data(options, secret_data, task_options, schema)

src/plugin/manager/cost_manager.py

Lines changed: 109 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -12,38 +12,38 @@
1212
_LOGGER = logging.getLogger(__name__)
1313

1414
_REGION_MAP = {
15-
'APE1': 'ap-east-1',
16-
'APN1': 'ap-northeast-1',
17-
'APN2': 'ap-northeast-2',
18-
'APN3': 'ap-northeast-3',
19-
'APS1': 'ap-southeast-1',
20-
'APS2': 'ap-southeast-2',
21-
'APS3': 'ap-south-1',
22-
'CAN1': 'ca-central-1',
23-
'CPT': 'af-south-1',
24-
'EUN1': 'eu-north-1',
25-
'EUC1': 'eu-central-1',
26-
'EU': 'eu-west-1',
27-
'EUW2': 'eu-west-2',
28-
'EUW3': 'eu-west-3',
29-
'MES1': 'me-south-1',
30-
'SAE1': 'sa-east-1',
31-
'UGW1': 'AWS GovCloud (US-West)',
32-
'UGE1': 'AWS GovCloud (US-East)',
33-
'USE1': 'us-east-1',
34-
'USE2': 'us-east-2',
35-
'USW1': 'us-west-1',
36-
'USW2': 'us-west-2',
37-
'AP': 'Asia Pacific',
38-
'AU': 'Australia',
39-
'CA': 'Canada',
15+
"APE1": "ap-east-1",
16+
"APN1": "ap-northeast-1",
17+
"APN2": "ap-northeast-2",
18+
"APN3": "ap-northeast-3",
19+
"APS1": "ap-southeast-1",
20+
"APS2": "ap-southeast-2",
21+
"APS3": "ap-south-1",
22+
"CAN1": "ca-central-1",
23+
"CPT": "af-south-1",
24+
"EUN1": "eu-north-1",
25+
"EUC1": "eu-central-1",
26+
"EU": "eu-west-1",
27+
"EUW2": "eu-west-2",
28+
"EUW3": "eu-west-3",
29+
"MES1": "me-south-1",
30+
"SAE1": "sa-east-1",
31+
"UGW1": "AWS GovCloud (US-West)",
32+
"UGE1": "AWS GovCloud (US-East)",
33+
"USE1": "us-east-1",
34+
"USE2": "us-east-2",
35+
"USW1": "us-west-1",
36+
"USW2": "us-west-2",
37+
"AP": "Asia Pacific",
38+
"AU": "Australia",
39+
"CA": "Canada",
4040
# 'EU': 'Europe and Israel',
41-
'IN': 'India',
42-
'JP': 'Japan',
43-
'ME': 'Middle East',
44-
'SA': 'South America',
45-
'US': 'United States',
46-
'ZA': 'South Africa',
41+
"IN": "India",
42+
"JP": "Japan",
43+
"ME": "Middle East",
44+
"SA": "South America",
45+
"US": "United States",
46+
"ZA": "South Africa",
4747
}
4848

4949

@@ -54,41 +54,47 @@ def __init__(self, *args, **kwargs):
5454
self.aws_s3_connector = AWSS3Connector()
5555
self.space_connector = SpaceONEConnector()
5656

57-
def get_data(self, options: dict, secret_data: dict, task_options: dict, schema: str = None) \
58-
-> Generator[dict, None, None]:
57+
def get_data(
58+
self, options: dict, secret_data: dict, task_options: dict, schema: str = None
59+
) -> Generator[dict, None, None]:
60+
task_type = task_options.get("task_type", "identity")
61+
5962
self.aws_s3_connector.create_session(options, secret_data, schema)
6063
self._check_task_options(task_options)
6164

62-
start = task_options['start']
63-
account_id = task_options['account_id']
64-
service_account_id = task_options['service_account_id']
65-
database = task_options['database']
66-
is_sync = task_options['is_sync']
65+
start = task_options["start"]
66+
account_id = task_options["account_id"]
67+
database = task_options["database"]
6768

68-
if is_sync == 'false':
69-
self._update_sync_state(options, secret_data, schema, service_account_id)
69+
if task_type == "identity":
70+
service_account_id = task_options["service_account_id"]
71+
is_sync = task_options["is_sync"]
72+
if is_sync == "false":
73+
self._update_sync_state(
74+
options, secret_data, schema, service_account_id
75+
)
7076

7177
date_ranges = self._get_date_range(start)
7278

7379
for date in date_ranges:
74-
year, month = date.split('-')
75-
path = f'SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}'
80+
year, month = date.split("-")
81+
path = f"SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}"
7682
response = self.aws_s3_connector.list_objects(path)
77-
contents = response.get('Contents', [])
83+
contents = response.get("Contents", [])
7884
for content in contents:
79-
response_stream = self.aws_s3_connector.get_cost_data(content['Key'])
85+
response_stream = self.aws_s3_connector.get_cost_data(content["Key"])
8086
for results in response_stream:
8187
yield self._make_cost_data(results, account_id)
8288

83-
yield {
84-
'results': []
85-
}
89+
yield {"results": []}
8690

8791
def _update_sync_state(self, options, secret_data, schema, service_account_id):
8892
self.space_connector.init_client(options, secret_data, schema)
89-
service_account_info = self.space_connector.get_service_account(service_account_id)
90-
tags = service_account_info.get('tags', {})
91-
tags['is_sync'] = 'true'
93+
service_account_info = self.space_connector.get_service_account(
94+
service_account_id
95+
)
96+
tags = service_account_info.get("tags", {})
97+
tags["is_sync"] = "true"
9298
self.space_connector.update_service_account(service_account_id, tags)
9399

94100
def _make_cost_data(self, results, account_id):
@@ -113,95 +119,97 @@ class CostSummaryItem(BaseModel):
113119

114120
for result in results:
115121
try:
116-
region = result['region'] or 'USE1'
117-
service_code = result['service_code']
118-
usage_type = result['usage_type']
122+
region = result["region"] or "USE1"
123+
service_code = result["service_code"]
124+
usage_type = result["usage_type"]
119125
data = {
120-
'cost': result.get('usage_cost', 0.0) or 0.0,
121-
'usage_quantity': result['usage_quantity'],
122-
'usage_unit': None,
123-
'provider': 'aws',
124-
'region_code': _REGION_MAP.get(region, region),
125-
'product': service_code,
126-
'usage_type': usage_type,
127-
'billed_date': result['usage_date'],
128-
'additional_info': {
129-
'Instance Type': result['instance_type'],
130-
'Account ID': account_id
126+
"cost": result.get("usage_cost", 0.0) or 0.0,
127+
"usage_quantity": result["usage_quantity"],
128+
"usage_unit": None,
129+
"provider": "aws",
130+
"region_code": _REGION_MAP.get(region, region),
131+
"product": service_code,
132+
"usage_type": usage_type,
133+
"billed_date": result["usage_date"],
134+
"additional_info": {
135+
"Instance Type": result["instance_type"],
136+
"Account ID": account_id,
131137
},
132-
'tags': self._get_tags_from_cost_data(result)
138+
"tags": self._get_tags_from_cost_data(result),
133139
}
134140

135-
if service_code == 'AWSDataTransfer':
136-
data['usage_unit'] = 'Bytes'
137-
if usage_type.find('-In-Bytes') > 0:
138-
data['additional_info']['Usage Type Details'] = 'Transfer In'
139-
elif usage_type.find('-Out-Bytes') > 0:
140-
data['additional_info']['Usage Type Details'] = 'Transfer Out'
141+
if service_code == "AWSDataTransfer":
142+
data["usage_unit"] = "Bytes"
143+
if usage_type.find("-In-Bytes") > 0:
144+
data["additional_info"]["Usage Type Details"] = "Transfer In"
145+
elif usage_type.find("-Out-Bytes") > 0:
146+
data["additional_info"]["Usage Type Details"] = "Transfer Out"
141147
else:
142-
data['additional_info']['Usage Type Details'] = 'Transfer Etc'
143-
elif service_code == 'AmazonCloudFront':
144-
if usage_type.find('-HTTPS') > 0:
145-
data['usage_unit'] = 'Count'
146-
data['additional_info']['Usage Type Details'] = 'HTTPS Requests'
147-
elif usage_type.find('-Out-Bytes') > 0:
148-
data['usage_unit'] = 'GB'
149-
data['additional_info']['Usage Type Details'] = 'Transfer Out'
148+
data["additional_info"]["Usage Type Details"] = "Transfer Etc"
149+
elif service_code == "AmazonCloudFront":
150+
if usage_type.find("-HTTPS") > 0:
151+
data["usage_unit"] = "Count"
152+
data["additional_info"]["Usage Type Details"] = "HTTPS Requests"
153+
elif usage_type.find("-Out-Bytes") > 0:
154+
data["usage_unit"] = "GB"
155+
data["additional_info"]["Usage Type Details"] = "Transfer Out"
150156
else:
151-
data['usage_unit'] = 'Count'
152-
data['additional_info']['Usage Type Details'] = 'HTTP Requests'
157+
data["usage_unit"] = "Count"
158+
data["additional_info"]["Usage Type Details"] = "HTTP Requests"
153159
else:
154-
data['additional_info']['Usage Type Details'] = None
160+
data["additional_info"]["Usage Type Details"] = None
155161

156162
except Exception as e:
157-
_LOGGER.error(f'[_make_cost_data] make data error: {e}', exc_info=True)
163+
_LOGGER.error(f"[_make_cost_data] make data error: {e}", exc_info=True)
158164
raise e
159165

160166
costs_data.append(data)
161167

162-
return {
163-
'results': costs_data
164-
}
168+
return {"results": costs_data}
165169

166170
@staticmethod
167171
def _get_tags_from_cost_data(cost_data: dict) -> dict:
168172
tags = {}
169173

170-
if tags_str := cost_data.get('tags'):
174+
if tags_str := cost_data.get("tags"):
171175
try:
172176
tags_dict: dict = utils.load_json(tags_str)
173177
for key, value in tags_dict.items():
174-
key = key.replace('user:', '')
178+
key = key.replace("user:", "")
175179
tags[key] = value
176180
except Exception as e:
177181
_LOGGER.debug(e)
178182

179183
return tags
180184

181185
@staticmethod
182-
def _check_task_options(task_options):
183-
if 'start' not in task_options:
184-
raise ERROR_REQUIRED_PARAMETER(key='task_options.start')
186+
def _check_task_options(task_options: dict):
187+
task_type = task_options.get("task_type", "identity")
188+
189+
if "start" not in task_options:
190+
raise ERROR_REQUIRED_PARAMETER(key="task_options.start")
191+
192+
if "database" not in task_options:
193+
raise ERROR_REQUIRED_PARAMETER(key="task_options.database")
185194

186-
if 'account_id' not in task_options:
187-
raise ERROR_REQUIRED_PARAMETER(key='task_options.account_id')
195+
if "is_sync" not in task_options:
196+
raise ERROR_REQUIRED_PARAMETER(key="task_options.is_sync")
188197

189-
if 'service_account_id' not in task_options:
190-
raise ERROR_REQUIRED_PARAMETER(key='task_options.service_account_id')
198+
if task_type == "identity":
191199

192-
if 'database' not in task_options:
193-
raise ERROR_REQUIRED_PARAMETER(key='task_options.database')
200+
if "account_id" not in task_options:
201+
raise ERROR_REQUIRED_PARAMETER(key="task_options.account_id")
194202

195-
if 'is_sync' not in task_options:
196-
raise ERROR_REQUIRED_PARAMETER(key='task_options.is_sync')
203+
if "service_account_id" not in task_options:
204+
raise ERROR_REQUIRED_PARAMETER(key="task_options.service_account_id")
197205

198206
@staticmethod
199207
def _get_date_range(start):
200208
date_ranges = []
201-
start_time = datetime.strptime(start, '%Y-%m')
209+
start_time = datetime.strptime(start, "%Y-%m")
202210
now = datetime.utcnow()
203211
for dt in rrule.rrule(rrule.MONTHLY, dtstart=start_time, until=now):
204-
billed_month = dt.strftime('%Y-%m')
212+
billed_month = dt.strftime("%Y-%m")
205213
date_ranges.append(billed_month)
206214

207215
return date_ranges

0 commit comments

Comments
 (0)