
Commit 20a9dd5

Merge pull request #37 from spectriclabs/flake8_whitespace
Added flake8 whitespace checks and fixed whitespace issues.
2 parents 5b7ecfe + ecb61b9 commit 20a9dd5
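This page shows only the whitespace fixes that resulted from the new checks; the flake8 configuration and CI wiring added by the pull request are not part of the section below. As a rough, hypothetical sketch (the selected error codes and target path are assumptions, not taken from this commit), the whitespace-only subset of the flake8/pycodestyle rules can be run like this:

    # E1xx/E2xx/E3xx cover indentation and whitespace around operators, commas and brackets;
    # W2xx/W3xx cover trailing-whitespace and blank-line issues. Other rule families stay off.
    flake8 --select=E1,E2,E3,W2,W3 elastic_datashader/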

7 files changed: +157 -153 lines changed


elastic_datashader/elastic.py

Lines changed: 44 additions & 42 deletions
@@ -158,18 +158,18 @@ def convert_nm_to_ellipse_units(distance: float, units: str) -> float:
     # NB. assume "majmin_m" if any others
     return distance * 1852

-def get_field_type(elastic_hosts: str,headers: Optional[str],params: Dict[str, Any],field:str,idx: str) -> str:
+def get_field_type(elastic_hosts: str, headers: Optional[str], params: Dict[str, Any], field: str, idx: str) -> str:
     user = params.get("user")
     x_opaque_id = params.get("x-opaque-id")
     es = Elasticsearch(
         elastic_hosts.split(","),
         verify_certs=False,
         timeout=900,
-        headers=get_es_headers(headers, user,x_opaque_id),
+        headers=get_es_headers(headers, user, x_opaque_id),
     )
-    mappings = es.indices.get_field_mapping(fields=field,index=idx)
-    #{'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
-    index = list(mappings.keys())[0] #if index is my_index* it comes back as my_index
+    mappings = es.indices.get_field_mapping(fields=field, index=idx)
+    # {'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
+    index = list(mappings.keys())[0] # if index is my_index* it comes back as my_index
     return mappings[index]['mappings'][field]['mapping'][field]['type']

 def get_search_base(
@@ -199,7 +199,7 @@ def get_search_base(
         elastic_hosts.split(","),
         verify_certs=False,
         timeout=900,
-        headers=get_es_headers(headers, user,x_opaque_id),
+        headers=get_es_headers(headers, user, x_opaque_id),
     )

     # Create base search
@@ -219,13 +219,13 @@ def get_search_base(
     if time_range and time_range[timestamp_field]:
         base_s = base_s.filter("range", **time_range)

-    #filter the ellipse search range in the data base query so the legen matches the tiles
-    if params.get('render_mode',"") =="ellipses":
-        units = convert_nm_to_ellipse_units(params['search_nautical_miles'],params['ellipse_units'])
-        search_range = {params["ellipse_major"]:{"lte":units}}
-        base_s = base_s.filter("range",**search_range)
-        search_range = {params["ellipse_minor"]:{"lte":units}}
-        base_s = base_s.filter("range",**search_range)
+    # filter the ellipse search range in the data base query so the legen matches the tiles
+    if params.get('render_mode', "") =="ellipses":
+        units = convert_nm_to_ellipse_units(params['search_nautical_miles'], params['ellipse_units'])
+        search_range = {params["ellipse_major"]: {"lte": units}}
+        base_s = base_s.filter("range", **search_range)
+        search_range = {params["ellipse_minor"]: {"lte": units}}
+        base_s = base_s.filter("range", **search_range)

     # Add lucene query
     if lucene_query:
@@ -355,14 +355,14 @@ def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
             filter_key = f.get("meta", {}).get("key")
             if f.get("meta", {}).get("negate"):
                 if filter_key == "query":
-                    filter_dict["must_not"].append( { "bool": f.get(filter_key).get("bool") } )
+                    filter_dict["must_not"].append({"bool": f.get(filter_key).get("bool")})
                 else:
-                    filter_dict["must_not"].append( { filter_key: f.get(filter_key) } )
+                    filter_dict["must_not"].append({filter_key: f.get(filter_key)})
             else:
                 if filter_key == "query":
-                    filter_dict["filter"].append( { "bool": f.get(filter_key).get("bool") } )
+                    filter_dict["filter"].append({"bool": f.get(filter_key).get("bool")})
                 else:
-                    filter_dict["filter"].append( { filter_key: f.get(filter_key) } )
+                    filter_dict["filter"].append({filter_key: f.get(filter_key)})

         else:
             raise ValueError("unsupported filter type {}".format(f.get("meta").get("type"))) # pylint: disable=C0209
@@ -389,7 +389,7 @@ def load_datashader_headers(header_file_path_str: Optional[str]) -> Dict[Any, An

     return loaded_yaml

-def get_es_headers(request_headers=None, user=None,x_opaque_id=None):
+def get_es_headers(request_headers=None, user=None, x_opaque_id=None):
     """

     :param request_headers:
@@ -420,15 +420,17 @@ def get_es_headers(request_headers=None, user=None,x_opaque_id=None):
     return result

 def parse_duration_interval(interval):
-    durations = {"days":"d",
-                 "minutes":"m",
-                 "hours":"h",
-                 "weeks":"w",
-                 "months":"M",
-                 #"quarter":"q", dateutil.relativedelta doesn't handle quarters
-                 "years":"y"}
+    durations = {
+        "days": "d",
+        "minutes": "m",
+        "hours": "h",
+        "weeks": "w",
+        "months": "M",
+        # "quarter": "q", dateutil.relativedelta doesn't handle quarters
+        "years": "y",
+    }
     kwargs = {}
-    for key,value in durations.items():
+    for key, value in durations.items():
         if interval[len(interval)-1] == value:
             kwargs[key] = int(interval[0:len(interval)-1])
     return relativedelta(**kwargs)
@@ -524,11 +526,11 @@ def geotile_bucket_to_lonlat(bucket):
     if hasattr(bucket, "centroid"):
         lon = bucket.centroid.location.lon
         lat = bucket.centroid.location.lat
-    elif hasattr(bucket.key,'grids'):
-        z, x, y = [ int(x) for x in bucket.key.grids.split("/") ]
+    elif hasattr(bucket.key, 'grids'):
+        z, x, y = [int(x) for x in bucket.key.grids.split("/")]
         lon, lat = mu.center(x, y, z)
     else:
-        z, x, y = [ int(x) for x in bucket.key.split("/") ]
+        z, x, y = [int(x) for x in bucket.key.split("/")]
         lon, lat = mu.center(x, y, z)
     return lon, lat

@@ -571,7 +573,7 @@ def get_nested_field_from_hit(hit, field_parts: List[str], default=None):
     raise ValueError("field must be provided")

 def chunk_iter(iterable, chunk_size):
-    chunks = [ None ] * chunk_size
+    chunks = [None] * chunk_size
     i = -1
     for i, v in enumerate(iterable):
         idx = (i % chunk_size)
@@ -581,14 +583,14 @@ def chunk_iter(iterable, chunk_size):
         chunks[idx] = v

     if i >= 0:
-        last_written_idx =( i % chunk_size)
+        last_written_idx = (i % chunk_size)
         yield (False, chunks[0:last_written_idx+1])

-def bucket_noop(bucket,search):
+def bucket_noop(bucket, search):
     # pylint: disable=unused-argument
     return bucket
 class Scan:
-    def __init__(self, searches, inner_aggs=None,field=None,precision=None, size=10, timeout=None,bucket_callback=bucket_noop):
+    def __init__(self, searches, inner_aggs=None, field=None, precision=None, size=10, timeout=None, bucket_callback=bucket_noop):
         self.field = field
         self.precision = precision
         self.searches = searches
@@ -616,29 +618,29 @@ def execute(self):
         self.total_took = 0
         self.aborted = False

-        def run_search(s,**kwargs):
+        def run_search(s, **kwargs):
             _timeout_at = kwargs.pop("timeout_at", None)
             if _timeout_at:
                 _time_remaining = _timeout_at - int(time.time())
                 s = s.params(timeout=f"{_time_remaining}s")
             if self.field and self.precision:
-                s.aggs.bucket("comp", "geotile_grid", field=self.field,precision=self.precision,size=self.size)
-            #logger.info(json.dumps(s.to_dict(),indent=2,default=str))
+                s.aggs.bucket("comp", "geotile_grid", field=self.field, precision=self.precision, size=self.size)
+            # logger.info(json.dumps(s.to_dict(), indent=2, default=str))
             return s.execute()

         timeout_at = None
         if self.timeout:
             timeout_at = int(time.time()) + self.timeout
         for search in self.searches:
-            response = run_search(search,timeout_at=timeout_at)
+            response = run_search(search, timeout_at=timeout_at)
             self.num_searches += 1
             self.total_took += response.took
             self.total_shards += response._shards.total # pylint: disable=W0212
             self.total_skipped += response._shards.skipped # pylint: disable=W0212
             self.total_successful += response._shards.successful # pylint: disable=W0212
             self.total_failed += response._shards.failed # pylint: disable=W0212
             for b in response.aggregations.comp.buckets:
-                b = self.bucket_callback(b,self)
+                b = self.bucket_callback(b, self)
                 yield b

@@ -735,19 +737,19 @@ def get_tile_categories(base_s, x, y, z, geopoint_field, category_field, size):
     cat_s = cat_s.params(size=0)
     cat_s = cat_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
     cat_s.aggs.bucket("categories", "terms", field=category_field, size=size)
-    cat_s.aggs.bucket("missing", "filter", bool={ "must_not" : { "exists": { "field": category_field } } })
+    cat_s.aggs.bucket("missing", "filter", bool={"must_not" : {"exists": {"field": category_field}}})
     response = cat_s.execute()
     if hasattr(response.aggregations, "categories"):
         for category in response.aggregations.categories:
             # this if prevents bools from using 0/1 instead of true/false
             if hasattr(category, "key_as_string"):
-                category_filters[str(category.key)] = { "term": {category_field: category.key_as_string} }
+                category_filters[str(category.key)] = {"term": {category_field: category.key_as_string}}
             else:
-                category_filters[str(category.key)] = { "term": {category_field: category.key} }
+                category_filters[str(category.key)] = {"term": {category_field: category.key}}
             category_legend[str(category.key)] = category.doc_count
         category_legend["Other"] = response.aggregations.categories.sum_other_doc_count
     if hasattr(response.aggregations, "missing") and response.aggregations.missing.doc_count > 0:
-        category_filters["N/A"] = { "bool": { "must_not" : { "exists": { "field": category_field } } } }
+        category_filters["N/A"] = {"bool": {"must_not" : {"exists": {"field": category_field}}}}
         category_legend["N/A"] = response.aggregations.missing.doc_count

     return category_filters, category_legend
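Aside from spacing, the helpers above behave as before; parse_duration_interval, for example, still turns an interval string into a dateutil relativedelta by matching the trailing unit character. A minimal standalone sketch of that idea, for illustration only (this code is not part of the commit and the names here are hypothetical):

    from dateutil.relativedelta import relativedelta

    # Trailing character selects the unit, leading digits give the magnitude,
    # mirroring the durations mapping in parse_duration_interval above.
    SUFFIXES = {"days": "d", "minutes": "m", "hours": "h", "weeks": "w", "months": "M", "years": "y"}

    def parse_interval(interval: str) -> relativedelta:
        kwargs = {}
        for unit, suffix in SUFFIXES.items():
            if interval.endswith(suffix):  # case-sensitive: "m" is minutes, "M" is months
                kwargs[unit] = int(interval[:-1])
        return relativedelta(**kwargs)

    assert parse_interval("30m") == relativedelta(minutes=30)
    assert parse_interval("2M") == relativedelta(months=2)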

elastic_datashader/parameters.py

Lines changed: 25 additions & 25 deletions
@@ -50,10 +50,10 @@ def create_default_params() -> Dict[str, Any]:
         "track_connection": None,
         "use_centroid": False,
         "user": None,
-        "bucket_min":0,
-        "bucket_max":1,
-        "timeOverlap":False,
-        "timeOverlapSize":"auto"
+        "bucket_min": 0,
+        "bucket_max": 1,
+        "timeOverlap": False,
+        "timeOverlapSize": "auto"
     }


@@ -293,10 +293,10 @@ def extract_parameters(headers: Dict[Any, Any], query_params: Dict[Any, Any]) ->
     params["geopoint_field"] = query_params.get("geopoint_field", params["geopoint_field"])
     params["timestamp_field"] = query_params.get("timestamp_field", params["timestamp_field"])
     params.update(get_time_bounds(now, from_time, to_time))
-    params["bucket_min"] = float(query_params.get("bucket_min",0))
-    params["bucket_max"] = float(query_params.get("bucket_max",1))
-    params["timeOverlap"] = query_params.get("timeOverlap","false") == "true"
-    params["timeOverlapSize"] = query_params.get("timeOverlapSize","auto")
+    params["bucket_min"] = float(query_params.get("bucket_min", 0))
+    params["bucket_max"] = float(query_params.get("bucket_max", 1))
+    params["timeOverlap"] = query_params.get("timeOverlap", "false") == "true"
+    params["timeOverlapSize"] = query_params.get("timeOverlapSize", "auto")
     params["debug"] = (query_params.get("debug", False) == 'true')

     if params["geopoint_field"] is None:
@@ -346,7 +346,7 @@ def generate_global_params(headers, params, idx):
         if category_type == "number":
             bounds_s.aggs.metric("field_stats", "stats", field=category_field)

-    field_type = get_field_type(config.elastic_hosts, headers, params,geopoint_field, idx)
+    field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
     # Execute and process search
     if len(list(bounds_s.aggs)) > 0 and field_type != "geo_shape":
         logger.info(bounds_s.to_dict())
@@ -427,14 +427,14 @@ def generate_global_params(headers, params, idx):


         if category_field:
             max_value_s = copy.copy(base_s)
-            bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
-            bucket.metric("sum","sum",field=category_field,missing=0)
+            bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
+            bucket.metric("sum", "sum", field=category_field, missing=0)
             resp = max_value_s.execute()
             estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
             histogram_range = estimated_points_per_tile
         else:
             max_value_s = copy.copy(base_s)
-            max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
+            max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
             resp = max_value_s.execute()
             estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
             histogram_range = estimated_points_per_tile
@@ -471,14 +471,14 @@ def merge_generated_parameters(headers, params, idx, param_hash):
         timeout=120
     )

-    #See if the hash exists
+    # See if the hash exists
     try:
         doc = Document.get(id=layer_id, using=es, index=".datashader_layers")
     except NotFoundError:
         doc = None

     if not doc:
-        #if not, create the hash in the db but only if it does not already exist
+        # if not, create the hash in the db but only if it does not already exist
         try:
             doc = Document(
                 _id=layer_id,
@@ -493,13 +493,13 @@ def merge_generated_parameters(headers, params, idx, param_hash):
         except ConflictError:
             logger.debug("Hash document now exists, continuing")

-        #re-fetch to get sequence number correct
+        # re-fetch to get sequence number correct
         doc = Document.get(id=layer_id, using=es, index=".datashader_layers")

-    #Check for generator timeouts:
+    # Check for generator timeouts:
     if doc.to_dict().get("generated_params", {}).get("generation_start_time") and \
-        datetime.now(timezone.utc) > datetime.strptime(doc.to_dict().get("generated_params", {}).get("generation_start_time"),"%Y-%m-%dT%H:%M:%S.%f%z")+timedelta(seconds=5*60):
-        #Something caused the worker generating the params to time out so clear that entry
+        datetime.now(timezone.utc) > datetime.strptime(doc.to_dict().get("generated_params", {}).get("generation_start_time"), "%Y-%m-%dT%H:%M:%S.%f%z")+timedelta(seconds=5*60):
+        # Something caused the worker generating the params to time out so clear that entry
         try:
             doc.update(
                 using=es,
@@ -511,18 +511,18 @@ def merge_generated_parameters(headers, params, idx, param_hash):
         except ConflictError:
             logger.debug("Abandoned resetting parameters due to conflict, other process has completed.")

-    #Loop-check if the generated params are in missing/in-process/complete
+    # Loop-check if the generated params are in missing/in-process/complete
     timeout_at = datetime.now(timezone.utc)+timedelta(seconds=45)

     while doc.to_dict().get("generated_params", {}).get("complete", False) is False:
         if datetime.now(timezone.utc) > timeout_at:
             logger.info("Hit timeout waiting for generated parameters to be placed into database")
             break

-        #If missing, mark them as in generation
+        # If missing, mark them as in generation
         if not doc.to_dict().get("generated_params", None):
-            #Mark them as being generated but do so with concurrenty control
-            #https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
+            # Mark them as being generated but do so with concurrenty control
+            # https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
             logger.info("Discovering generated parameters")
             generated_params = {
                 "complete": False,
@@ -543,12 +543,12 @@ def merge_generated_parameters(headers, params, idx, param_hash):
                 logger.debug("Abandoned generating parameters due to conflict, will wait for other process to complete.")
                 break

-            #Generate and save off parameters
+            # Generate and save off parameters
             logger.warning("Discovering generated params")
             generated_params.update(generate_global_params(headers, params, idx))
             generated_params["generation_complete_time"] = datetime.now(timezone.utc)
             generated_params["complete"] = True
-            #Store off generated params
+            # Store off generated params
             doc.update(
                 using=es,
                 index=".datashader_layers",
@@ -561,6 +561,6 @@ def merge_generated_parameters(headers, params, idx, param_hash):
             sleep(1)
         doc = Document.get(id=layer_id, using=es, index=".datashader_layers")

-    #We now have params so use them
+    # We now have params so use them
     params["generated_params"] = doc.to_dict().get("generated_params")
     return params

elastic_datashader/routers/data.py

Lines changed: 5 additions & 5 deletions
@@ -32,7 +32,7 @@ async def get_data(idx: str, lat: float, lon: float, radius: float, request: Req
     lat = float(lat)
     lon = float(lon)
     radius = float(radius)
-    #Check for paging args
+    # Check for paging args
     from_arg = int(request.args.get("from", 0))
     size_arg = int(request.args.get("size", 100))

@@ -70,20 +70,20 @@

     # Build and execute search
     base_s = get_search_base(config.elastic_hosts, request.headers, params, idx)
-    distance_filter_dict = {"distance": f"{radius}m", geopoint_field: {"lat":lat, "lon":lon}}
+    distance_filter_dict = {"distance": f"{radius}m", geopoint_field: {"lat": lat, "lon": lon}}
     base_s = base_s.filter("geo_distance", **distance_filter_dict)
-    distance_sort_dict = {geopoint_field:{"lat":lat, "lon":lon}, "order":"asc", "ignore_unmapped":True}
+    distance_sort_dict = {geopoint_field: {"lat": lat, "lon": lon}, "order": "asc", "ignore_unmapped": True}
     base_s = base_s.sort({"_geo_distance": distance_sort_dict})
     # Paginate
-    base_s = base_s[from_arg:from_arg+size_arg]
+    base_s = base_s[from_arg: from_arg+size_arg]

     search_resp = base_s.execute()
     hits = []
     hit_count = 0

     for hit in search_resp:
         if includes_list:
-            #Only include named fields
+            # Only include named fields
             named = {}

             for f in includes_list: