
Commit fadfeec

Author: Sean Sullivan (committed)

Implement time/space overlaps for geo_shapes

1 parent dd370af · commit fadfeec

File tree

3 files changed: +81 −4 lines changed

elastic_datashader/elastic.py
elastic_datashader/parameters.py
elastic_datashader/tilegen.py

elastic_datashader/elastic.py

Lines changed: 14 additions & 1 deletion

@@ -1,9 +1,9 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional
-
 import copy
 import struct
 import time
+from dateutil.relativedelta import relativedelta

 from datashader.utils import lnglat_to_meters
 from elasticsearch import Elasticsearch
@@ -419,6 +419,19 @@ def get_es_headers(request_headers=None, user=None, x_opaque_id=None):

     return result

+def parse_duration_interval(interval):
+    # Map an Elasticsearch interval string such as "3h" or "10m" to relativedelta kwargs.
+    durations = {"days": "d",
+                 "minutes": "m",
+                 "hours": "h",
+                 "weeks": "w",
+                 "months": "M",
+                 # "quarter": "q" is omitted; dateutil.relativedelta doesn't handle quarters
+                 "years": "y"}
+    kwargs = {}
+    for key, value in durations.items():
+        if interval[-1] == value:  # match on the unit suffix so multi-digit counts like "12h" parse correctly
+            kwargs[key] = int(interval[:-1])
+    return relativedelta(**kwargs)

 def convert(response, category_formatter=str):
     """

elastic_datashader/parameters.py

Lines changed: 5 additions & 1 deletion

@@ -51,7 +51,9 @@ def create_default_params() -> Dict[str, Any]:
         "use_centroid": False,
         "user": None,
         "bucket_min": 0,
-        "bucket_max": 1
+        "bucket_max": 1,
+        "timeOverlap": False,
+        "timeOverlapSize": "auto"
     }


@@ -293,6 +295,8 @@ def extract_parameters(headers: Dict[Any, Any], query_params: Dict[Any, Any]) ->
     params.update(get_time_bounds(now, from_time, to_time))
     params["bucket_min"] = float(query_params.get("bucket_min", 0))
     params["bucket_max"] = float(query_params.get("bucket_max", 1))
+    params["timeOverlap"] = query_params.get("timeOverlap", "false") == "true"
+    params["timeOverlapSize"] = query_params.get("timeOverlapSize", "auto")
     params["debug"] = (query_params.get("debug", False) == 'true')

     if params["geopoint_field"] is None:

elastic_datashader/tilegen.py

Lines changed: 62 additions & 2 deletions

@@ -1,7 +1,7 @@
 from dataclasses import asdict, dataclass
 from functools import lru_cache
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
-
+from datetime import datetime, timezone
 import copy
 import math
 import time
@@ -32,6 +32,7 @@
     gen_overlay,
 )
 from .elastic import (
+    parse_duration_interval,
     get_field_type,
     get_search_base,
     convert_composite,
@@ -1187,7 +1188,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
     resp = max_value_s.execute()
     estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
     span = [0, estimated_points_per_tile]
-    logger.info("EST Points: %s", estimated_points_per_tile)
+    logger.info("EST Points: %s %s", estimated_points_per_tile, category_field)

     searches = []
     composite_agg_size = 65536  # max agg bucket size
@@ -1230,6 +1231,13 @@ def remap_bucket(bucket, search):
     if category_field:
         # bucket_callback = calc_aggregation  # don't run a sub-query; the sub-aggregation worked, but we might want to leave this in for cross-index searches
         bucket_callback = remap_bucket
+
+    if params['timeOverlap']:  # run the scan over date-interval searches to check for overlaps within the same time window
+        subtile_bb_dict = create_bounding_box_for_tile(x, y, z)
+        interval = params['timeOverlapSize']
+        logger.info("CREATING TIMEBUCKETS %s", interval)
+        searches = create_time_interval_searches(base_s, subtile_bb_dict, start_time, stop_time, timestamp_field, geopoint_field, geotile_precision, composite_agg_size, category_field, interval)
+
     resp = Scan(searches, timeout=config.query_timeout_seconds, bucket_callback=bucket_callback)
     df = pd.DataFrame(
         convert_composite(
@@ -1370,3 +1378,55 @@ def remap_bucket(bucket, search):
         "An exception occurred while attempting to generate a tile:"
     )
     raise
+
+
+def create_time_interval_searches(base_s, subtile_bb_dict, start_time, stop_time, timestamp_field, geopoint_field, geotile_precision, composite_agg_size, category_field, interval="auto"):
+    stime = start_time
+    searches = []
+    if interval == "auto":
+        # Ask Elasticsearch to pick the interval via an auto_date_histogram over the tile's bounding box.
+        # NOTE: the histogram field is hard-coded to "lastupdated" rather than using timestamp_field.
+        subtile_s = copy.copy(base_s)
+        subtile_s = subtile_s[0:0]
+        subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
+        subtile_s.aggs.bucket("by_time", "auto_date_histogram", field="lastupdated", buckets=546)
+        resp = subtile_s.execute()
+        interval = resp.aggregations.by_time.interval
+        # Create a search for each bucket using the bucket time plus the interval.
+        logger.info("Doing multiple queries based on interval %s", interval)

+        for bucket in resp.aggregations.by_time:
+            subtile_s = copy.copy(base_s)
+            bucket_start_time = datetime.strptime(bucket.key_as_string, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
+            bucket_stop_time = bucket_start_time + parse_duration_interval(interval)
+
+            time_range = {}  # initialized so the check below is safe when timestamp_field is unset
+            if timestamp_field:
+                time_range = {timestamp_field: {}}
+                if bucket_start_time is not None:
+                    time_range[timestamp_field]["gte"] = bucket_start_time
+                if stop_time is not None:
+                    time_range[timestamp_field]["lte"] = bucket_stop_time
+
+            if time_range and time_range[timestamp_field]:
+                subtile_s = subtile_s.filter("range", **time_range)
+            comp_agg = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=composite_agg_size, bounds=subtile_bb_dict)
+            if category_field:
+                comp_agg.metric("sum", "sum", field=category_field, missing=0)
+            searches.append(subtile_s)
+        return searches
+
+    # Fixed interval: walk from start_time to stop_time in interval-sized steps.
+    while stime < stop_time:
+        subtile_s = copy.copy(base_s)
+        subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
+        subtile_s = subtile_s[0:0]
+        bucket_start_time = stime
+        bucket_stop_time = bucket_start_time + parse_duration_interval(interval)
+        time_range = {timestamp_field: {}}
+        time_range[timestamp_field]["gte"] = bucket_start_time
+        time_range[timestamp_field]["lte"] = bucket_stop_time
+        stime = bucket_stop_time
+        subtile_s = subtile_s.filter("range", **time_range)
+        comp_agg = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=composite_agg_size, bounds=subtile_bb_dict)
+        # Keep only grid cells with at least two documents in the same time bucket (candidate overlaps).
+        comp_agg.pipeline("selector", "bucket_selector", buckets_path={"doc_count": "_count"}, script="params.doc_count >= 2")
+        if category_field:
+            comp_agg.metric("sum", "sum", field=category_field, missing=0)
+        searches.append(subtile_s)
+    return searches
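To make the fixed-interval branch concrete, here is a standalone sketch of the windowing it performs, assuming a timeOverlapSize of "1h"; each (gte, lte) pair would become the "range" filter of one search in the returned list:

    from datetime import datetime, timezone
    from dateutil.relativedelta import relativedelta

    start = datetime(2023, 1, 1, 0, 0, tzinfo=timezone.utc)
    stop = datetime(2023, 1, 1, 4, 0, tzinfo=timezone.utc)
    step = relativedelta(hours=1)  # what parse_duration_interval("1h") returns

    # Walk the time bounds in interval-sized steps, one window per search.
    windows = []
    t = start
    while t < stop:
        windows.append({"gte": t, "lte": t + step})
        t = t + step

    for w in windows:
        print(w["gte"].isoformat(), "->", w["lte"].isoformat())
    # Four one-hour windows; documents that share both a geotile cell and a
    # window (doc_count >= 2 via the bucket_selector) are the time/space overlaps.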
