Skip to content

Commit b6800e0

Browse files
author
Sean Sullivan
committed
code cleanup and lint
1 parent 29ce910 commit b6800e0

File tree

8 files changed

+14
-165
lines changed

8 files changed

+14
-165
lines changed

elastic_datashader/cache.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ def age_off_cache(cache_path: Path, idx_name: str, max_age: timedelta) -> None:
168168
remove_empty_dirs(cache_path/idx_name)
169169

170170
def remove_empty_dirs(path: Path):
171-
for root,dirs,_ in os.walk(path, topdown=False):
171+
for root, dirs, _ in os.walk(path, topdown=False):
172172
for d in dirs:
173173
with suppress(OSError):
174-
os.rmdir(Path(root,d))
174+
os.rmdir(Path(root, d))
175175

176176
def get_idx_names(cache_path: Path) -> Iterable[str]:
177177
for path in cache_path.glob("*"):

elastic_datashader/config.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ class Config:
1818
cache_cleanup_interval: timedelta
1919
cache_path: Path
2020
cache_timeout: timedelta
21-
csrf_secret_key: str
2221
datashader_headers: Dict[Any, Any]
2322
elastic_hosts: str
2423
ellipse_render_mode: str
@@ -29,8 +28,6 @@ class Config:
2928
max_ellipses_per_tile: int
3029
max_legend_items_per_tile: int
3130
num_ellipse_points: int
32-
proxy_host: Optional[str]
33-
proxy_prefix: str
3431
query_timeout_seconds: int
3532
render_timeout: timedelta
3633
tms_key: Optional[str]
@@ -93,7 +90,6 @@ def config_from_env(env) -> Config:
9390
cache_cleanup_interval=timedelta(seconds=int(env.get("DATASHADER_CACHE_CLEANUP_INTERVAL", 5*60))),
9491
cache_path=Path(env.get("DATASHADER_CACHE_DIRECTORY", "tms-cache")),
9592
cache_timeout=timedelta(seconds=int(env.get("DATASHADER_CACHE_TIMEOUT", 60*60))),
96-
csrf_secret_key=env.get("DATASHADER_CSRF_SECRET_KEY", "CSRFProtectionKey"),
9793
datashader_headers=load_datashader_headers(env.get("DATASHADER_HEADER_FILE", "headers.yaml")),
9894
elastic_hosts=env.get("DATASHADER_ELASTIC", "http://localhost:9200"),
9995
ellipse_render_mode=env.get("DATASHADER_ELLIPSE_RENDER_MODE", "matrix"),
@@ -104,8 +100,6 @@ def config_from_env(env) -> Config:
104100
max_ellipses_per_tile=int(env.get("DATASHADER_MAX_ELLIPSES_PER_TILE", 100_000)),
105101
max_legend_items_per_tile=int(env.get("MAX_LEGEND_ITEMS_PER_TILE", 20)),
106102
num_ellipse_points=int(env.get("DATASHADER_NUM_ELLIPSE_POINTS", 100)),
107-
proxy_host=env.get("DATASHADER_PROXY_HOST", None),
108-
proxy_prefix=env.get("DATASHADER_PROXY_PREFIX", ""),
109103
query_timeout_seconds=int(env.get("DATASHADER_QUERY_TIMEOUT", 0)),
110104
render_timeout=timedelta(seconds=int(env.get("DATASHADER_RENDER_TIMEOUT", 30))),
111105
tms_key=env.get("DATASHADER_TMS_KEY", None),

elastic_datashader/elastic.py

Lines changed: 7 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -158,26 +158,6 @@ def convert_nm_to_ellipse_units(distance: float, units: str) -> float:
158158
# NB. assume "majmin_m" if any others
159159
return distance * 1852
160160

161-
def get_field_type(elastic_hosts: str, headers: Optional[str], params: Dict[str, Any], field: str, idx: str) -> str:
162-
user = params.get("user")
163-
x_opaque_id = params.get("x-opaque-id")
164-
es = Elasticsearch(
165-
elastic_hosts.split(","),
166-
verify_certs=False,
167-
timeout=900,
168-
headers=get_es_headers(headers, user, x_opaque_id),
169-
)
170-
if idx.find("*:") != -1:
171-
idx = idx[idx.find("*:")+2:] # when you query for mappings if it is cross cluster you don't get a mapping
172-
mappings = es.indices.get_field_mapping(fields=field, index=idx)
173-
# {'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
174-
index = list(mappings.keys())[0] # if index is my_index* it comes back as my_index
175-
field_parts = field.split(".")
176-
try:
177-
return mappings[index]['mappings'][field]['mapping'][field_parts[-1]]['type'] # handles 'geo_center' or a nested object {signal:{geo:{location:{}}}}
178-
except AttributeError:
179-
return mappings[index]['mappings'][field]['mapping'][field]['type'] # handles literal string with periods 'signal.geo.location'
180-
181161
def get_search_base(
182162
elastic_hosts: str,
183163
headers: Optional[str],
@@ -271,21 +251,6 @@ def get_search_base(
271251

272252
return base_s
273253

274-
def handle_range_or_exists_filters(filter_input: Dict[Any, Any]) -> Dict[str, Any]:
275-
"""
276-
`range` and `exists` filters can appear either directly under
277-
`filter[]` or under `filter[].query` depending on the version
278-
of Kibana, the former being the old way, so they need special
279-
handling for backward compatibility.
280-
"""
281-
filter_type = filter_input.get("meta").get("type") # "range" or "exists"
282-
283-
# Handle old query structure for backward compatibility
284-
if filter_input.get(filter_type) is not None:
285-
return {filter_type: filter_input.get(filter_type)}
286-
287-
return filter_input.get("query")
288-
289254
def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
290255
"""
291256
@@ -309,7 +274,7 @@ def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
309274
f.get("geo_shape") or
310275
f.get("geo_distance")
311276
)
312-
if f.get("query",None):
277+
if f.get("query", None):
313278
if f.get("meta").get("negate"):
314279
filter_dict["must_not"].append(f.get("query"))
315280
else:
@@ -318,16 +283,16 @@ def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
318283
if not is_spatial_filter:
319284
filt_type = f.get("meta").get("type")
320285
if f.get("meta").get("negate"):
321-
filter_dict["must_not"].append({filt_type:f.get(filt_type)})
286+
filter_dict["must_not"].append({filt_type: f.get(filt_type)})
322287
else:
323-
filter_dict["filter"].append({filt_type:f.get(filt_type)})
288+
filter_dict["filter"].append({filt_type: f.get(filt_type)})
324289
else:
325-
for geo_type in ["geo_polygon","geo_bounding_box","geo_shape","geo_distance"]:
326-
if f.get(geo_type,None):
290+
for geo_type in ["geo_polygon", "geo_bounding_box", "geo_shape", "geo_distance"]:
291+
if f.get(geo_type, None):
327292
if f.get("meta").get("negate"):
328-
filter_dict["must_not"].append({geo_type:f.get(geo_type)})
293+
filter_dict["must_not"].append({geo_type: f.get(geo_type)})
329294
else:
330-
filter_dict["filter"].append({geo_type:f.get(geo_type)})
295+
filter_dict["filter"].append({geo_type: f.get(geo_type)})
331296
logger.info("Filter output %s", filter_dict)
332297
return filter_dict
333298

@@ -396,32 +361,6 @@ def parse_duration_interval(interval):
396361
kwargs[key] = int(interval[0:len(interval)-1])
397362
return relativedelta(**kwargs)
398363

399-
def convert(response, category_formatter=str):
400-
"""
401-
402-
:param response:
403-
:return:
404-
"""
405-
if hasattr(response.aggregations, "categories"):
406-
for category in response.aggregations.categories:
407-
for bucket in category.grids:
408-
x, y = lnglat_to_meters(
409-
bucket.centroid.location.lon, bucket.centroid.location.lat
410-
)
411-
yield {
412-
"lon": bucket.centroid.location.lon,
413-
"lat": bucket.centroid.location.lat,
414-
"x": x,
415-
"y": y,
416-
"c": bucket.centroid.count,
417-
"t": category_formatter(category.key),
418-
}
419-
else:
420-
for bucket in response.aggregations.grids:
421-
lon = bucket.centroid.location.lon
422-
lat = bucket.centroid.location.lat
423-
x, y = lnglat_to_meters(lon, lat)
424-
yield {"lon": lon, "lat": lat, "x": x, "y": y, "c": bucket.centroid.count}
425364

426365
def convert_composite(response, categorical, filter_buckets, histogram_interval, category_type, category_format):
427366
if categorical and filter_buckets is False:
@@ -533,20 +472,6 @@ def get_nested_field_from_hit(hit, field_parts: List[str], default=None):
533472

534473
raise ValueError("field must be provided")
535474

536-
def chunk_iter(iterable, chunk_size):
537-
chunks = [None] * chunk_size
538-
i = -1
539-
for i, v in enumerate(iterable):
540-
idx = i % chunk_size
541-
if idx == 0 and i > 0:
542-
i = -1
543-
yield (True, chunks)
544-
chunks[idx] = v
545-
546-
if i >= 0:
547-
last_written_idx = i % chunk_size
548-
yield (False, chunks[0:last_written_idx+1])
549-
550475
def bucket_noop(bucket, search):
551476
# pylint: disable=unused-argument
552477
return bucket

elastic_datashader/parameters.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from datetime import datetime, timedelta, timezone
22
from hashlib import sha256
33
from json import loads
4-
from socket import gethostname
54
from time import sleep
65
from typing import Any, Dict, Optional, Tuple
76
from urllib.parse import unquote
@@ -351,7 +350,6 @@ def generate_global_params(headers, params, idx):
351350
if category_type == "number":
352351
bounds_s.aggs.metric("field_stats", "stats", field=category_field)
353352

354-
# field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
355353
field_type = params["geofield_type"] # CCS you cannot get mappings so we needed to push the field type from the client side
356354
# Execute and process search
357355
if len(list(bounds_s.aggs)) > 0 and field_type != "geo_shape":
@@ -470,7 +468,7 @@ def generate_global_params(headers, params, idx):
470468

471469

472470
def merge_generated_parameters(headers, params, idx, param_hash):
473-
layer_id = f"{param_hash}_{gethostname()}"
471+
layer_id = f"{param_hash}_{config.hostname}"
474472
es = Elasticsearch(
475473
config.elastic_hosts.split(","),
476474
verify_certs=False,
@@ -488,7 +486,7 @@ def merge_generated_parameters(headers, params, idx, param_hash):
488486
try:
489487
doc = Document(
490488
_id=layer_id,
491-
creating_host=gethostname(),
489+
creating_host=config.hostname,
492490
creating_pid=os.getpid(),
493491
creating_timestamp=datetime.now(timezone.utc),
494492
generated_params=None,
@@ -533,7 +531,7 @@ def merge_generated_parameters(headers, params, idx, param_hash):
533531
generated_params = {
534532
"complete": False,
535533
"generation_start_time": datetime.now(timezone.utc),
536-
"generating_host": gethostname(),
534+
"generating_host": config.hostname,
537535
"generating_pid": os.getpid(),
538536
}
539537

elastic_datashader/routers/tms.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
)
2626
from ..config import config
2727
from ..drawing import generate_x_tile
28-
from ..elastic import get_es_headers,get_search_base
28+
from ..elastic import get_es_headers, get_search_base
2929
from ..logger import logger
3030
from ..parameters import extract_parameters, merge_generated_parameters
3131
from ..tilegen import (
@@ -297,7 +297,7 @@ async def fetch_or_render_tile(already_waited: int, idx: str, x: int, y: int, z:
297297
# try to build the dsl object bad filters cause exceptions that are then retried.
298298
# underlying elasticsearch_dsl doesn't support the elasticsearch 8 api yet so this causes requests to thrash
299299
# If the filters are bad or elasticsearch_dsl cannot build the request will never be completed so serve X tile
300-
get_search_base(config.elastic_hosts,request.headers,params,idx)
300+
get_search_base(config.elastic_hosts, request.headers, params, idx)
301301
except Exception as ex: # pylint: disable=W0703
302302
logger.exception("Error while extracting parameters")
303303
params = {"user": request.headers.get("es-security-runas-user", None)}

elastic_datashader/tilegen.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1099,7 +1099,6 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
10991099

11001100
# the composite needs one bin for 'after_key'
11011101
composite_agg_size = int(max_bins / inner_agg_size) - 1
1102-
# field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
11031102
field_type = params["geofield_type"] # CCS you cannot get mappings so we needed to push the field type from the client side
11041103
partial_data = False # TODO can we get partial data?
11051104
span = None

tests/test_config.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ def test_config_defaults():
1515
assert cfg.cache_path == Path("tms-cache")
1616
assert cfg.cache_timeout == timedelta(seconds=3600)
1717
assert cfg.elastic_hosts == "http://localhost:9200"
18-
assert cfg.proxy_host is None
19-
assert cfg.proxy_prefix == ""
2018
assert cfg.tms_key is None
2119
assert cfg.max_bins == 10000
2220
assert cfg.max_batch == 10000
@@ -56,8 +54,6 @@ def test_config_env():
5654
assert cfg.cache_path == Path("tms-cache-foo")
5755
assert cfg.cache_timeout == timedelta(seconds=60)
5856
assert cfg.elastic_hosts == "http://localhost:9201"
59-
assert cfg.proxy_host == "http://localhost:1337"
60-
assert cfg.proxy_prefix == "foo"
6157
assert cfg.tms_key == "bar"
6258
assert cfg.max_bins == 10
6359
assert cfg.max_batch == 1000

tests/test_elastic.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -152,66 +152,3 @@ def test_split_fieldname_to_list(field, expected):
152152
def test_get_nested_field_from_hit():
153153
pass
154154

155-
@pytest.mark.parametrize(
156-
"filter_input,filter_type,new_way,expected",
157-
(
158-
({"meta": {"type": "exists"}, "query": {"exists": {"field": "foo"}}}, "exists", True, {"exists": {"field": "foo"}}),
159-
({"meta": {"type": "range"}, "range": {"from": "foo", "to": "bar"}}, "range", False, {"range": {"from": "foo", "to": "bar"}}),
160-
)
161-
)
162-
def test_handle_range_or_exists_filters(filter_input, filter_type, new_way, expected):
163-
filter_output = elastic.handle_range_or_exists_filters(filter_input)
164-
assert len(filter_output) == 1
165-
assert filter_type in filter_output
166-
assert type(filter_output[filter_type]) is dict
167-
168-
if new_way:
169-
expected_output = filter_input["query"]
170-
else:
171-
expected_output = {filter_type: filter_input[filter_type]}
172-
173-
for key in expected_output:
174-
assert key in filter_output
175-
176-
for subkey in expected_output[key]:
177-
assert subkey in filter_output[key]
178-
assert expected_output[key][subkey] == filter_output[key][subkey]
179-
180-
def test_chunk_iter():
181-
for has_more, chunk in elastic.chunk_iter([], 1000):
182-
assert True == False
183-
184-
for has_more, chunk in elastic.chunk_iter(range(10), 1000):
185-
assert has_more == False
186-
assert len(chunk) == 10
187-
188-
for has_more, chunk in elastic.chunk_iter(range(1000), 1000):
189-
assert has_more == False
190-
assert len(chunk) == 1000
191-
192-
for ii, (has_more, chunk) in enumerate(elastic.chunk_iter(range(1001), 1000)):
193-
if ii == 0:
194-
assert has_more == True
195-
assert len(chunk) == 1000
196-
elif ii == 1:
197-
assert has_more == False
198-
assert len(chunk) == 1
199-
200-
for ii, (has_more, chunk) in enumerate(elastic.chunk_iter(range(2000), 1000)):
201-
if ii == 0:
202-
assert has_more == True
203-
assert len(chunk) == 1000
204-
elif ii == 1:
205-
assert has_more == False
206-
assert len(chunk) == 1000
207-
208-
for ii, (has_more, chunk) in enumerate(elastic.chunk_iter(range(2010), 1000)):
209-
if ii == 0:
210-
assert has_more == True
211-
assert len(chunk) == 1000
212-
elif ii == 1:
213-
assert has_more == True
214-
assert len(chunk) == 1000
215-
elif ii == 2:
216-
assert has_more == False
217-
assert len(chunk) == 10

0 commit comments

Comments (0)