
Commit 75f080d

Author: Sean Sullivan

Add ability to filter buckets using pipeline bucket_selector
1 parent 8379456 commit 75f080d
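
The core of this change is Elasticsearch's bucket_selector pipeline aggregation: a Painless script runs once per parent bucket and any bucket for which it returns false is dropped from the response. A minimal sketch of the technique, independent of this commit (the index name "my-index", the field "location", the precision, and the thresholds are all placeholders):

    from elasticsearch_dsl import A, Search

    # Grid aggregation over a geo field; precision is illustrative.
    grid = A("geotile_grid", field="location", precision=10)

    # Keep only grid cells whose document count falls inside a window.
    grid.pipeline(
        "selector",
        "bucket_selector",
        buckets_path={"doc_count": "_count"},  # expose each bucket's count to the script
        script="params.doc_count > 10 && params.doc_count < 1000",
    )

    s = Search(index="my-index")[0:0]  # size 0: we only want aggregations, not hits
    s.aggs["comp"] = grid
    response = s.execute()

This mirrors how tilegen.py below attaches the selector to its geotile_grid aggregation.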

File tree: 2 files changed (+83, -14 lines)


elastic_datashader/parameters.py

Lines changed: 50 additions & 4 deletions
@@ -15,7 +15,7 @@
 from elasticsearch_dsl import Document

 from .config import config
-from .elastic import get_search_base, build_dsl_filter
+from .elastic import get_field_type, get_search_base, build_dsl_filter
 from .logger import logger
 from .timeutil import quantize_time_range, convert_kibana_time

@@ -50,6 +50,8 @@ def create_default_params() -> Dict[str, Any]:
         "track_connection": None,
         "use_centroid": False,
         "user": None,
+        "bucket_min": 0,
+        "bucket_max": 1
     }


@@ -289,6 +291,8 @@ def extract_parameters(headers: Dict[Any, Any], query_params: Dict[Any, Any]) ->
     params["geopoint_field"] = query_params.get("geopoint_field", params["geopoint_field"])
     params["timestamp_field"] = query_params.get("timestamp_field", params["timestamp_field"])
     params.update(get_time_bounds(now, from_time, to_time))
+    params["bucket_min"] = float(query_params.get("bucket_min", 0))
+    params["bucket_max"] = float(query_params.get("bucket_max", 1))
     params["debug"] = (query_params.get("debug", False) == 'true')

     if params["geopoint_field"] is None:
@@ -305,8 +309,9 @@ def generate_global_params(headers, params, idx):
     category_field = params["category_field"]
     category_type = params["category_type"]
     category_histogram = params["category_histogram"]
+    current_zoom = params["mapZoom"]
     span_range = params["span_range"]
-
+    resolution = params["resolution"]
     histogram_range = 0
     histogram_interval = None
     histogram_cnt = None
@@ -316,7 +321,7 @@ def generate_global_params(headers, params, idx):

     # Create base search
     base_s = get_search_base(config.elastic_hosts, headers, params, idx)
-
+    base_s = base_s[0:0]
     # west, south, east, north
     global_bounds = [-180, -90, 180, 90]
     global_doc_cnt = 0
@@ -337,8 +342,10 @@ def generate_global_params(headers, params, idx):
         if category_type == "number":
             bounds_s.aggs.metric("field_stats", "stats", field=category_field)

+    field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
     # Execute and process search
-    if len(list(bounds_s.aggs)) > 0:
+    if len(list(bounds_s.aggs)) > 0 and field_type != "geo_shape":
+        logger.info(bounds_s.to_dict())
         bounds_resp = bounds_s.execute()
         assert len(bounds_resp.hits) == 0

@@ -403,6 +410,45 @@ def generate_global_params(headers, params, idx):
                 )
             else:
                 histogram_range = 0
+    elif field_type == "geo_shape":
+        zoom = 0
+        if resolution == "coarse":
+            zoom = 5
+        elif resolution == "fine":
+            zoom = 6
+        elif resolution == "finest":
+            zoom = 7
+        geotile_precision = current_zoom + zoom
+        max_value_s = copy.copy(base_s)
+        bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=0, size=1)
+        resp = max_value_s.execute()
+        global_doc_cnt = resp.aggregations.comp.buckets[0].doc_count
+        if global_doc_cnt > 100000:
+            histogram_cnt = 200
+        else:
+            histogram_cnt = 500
+
+        if category_field:
+            max_value_s = copy.copy(base_s)
+            bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
+            bucket.metric("sum", "sum", field=category_field, missing=0)
+            resp = max_value_s.execute()
+            estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
+            histogram_range = estimated_points_per_tile
+        else:
+            max_value_s = copy.copy(base_s)
+            max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
+            resp = max_value_s.execute()
+            estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
+            histogram_range = estimated_points_per_tile
+        if histogram_range > 0:
+            # round up to the nearest larger power of 10
+            histogram_range = math.pow(
+                10, math.ceil(math.log10(histogram_range))
+            )
+            histogram_interval = histogram_range / histogram_cnt
+            field_min = 0
+            field_max = estimated_points_per_tile
     else:
         logger.debug("Skipping global query")

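In the new geo_shape branch, histogram_range is rounded up to a power of 10 before deriving the histogram interval. A worked example with illustrative numbers:

    import math

    estimated_points_per_tile = 5400  # illustrative result of the max-bucket query
    histogram_cnt = 500               # chosen because global_doc_cnt <= 100000

    # round up to the nearest power of 10: log10(5400) ~= 3.73 -> ceil -> 4 -> 10**4
    histogram_range = math.pow(10, math.ceil(math.log10(estimated_points_per_tile)))  # 10000.0
    histogram_interval = histogram_range / histogram_cnt  # 10000.0 / 500 = 20.0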

elastic_datashader/tilegen.py

Lines changed: 33 additions & 10 deletions
@@ -39,7 +39,6 @@
     get_nested_field_from_hit,
     to_32bit_float,
     Scan,
-    ScanAggs,
     get_tile_categories,
     scan
 )
@@ -984,7 +983,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):

     # Create base search
     base_s = get_search_base(config.elastic_hosts, headers, params, idx)
-
+    base_s = base_s[0:0]
     # Now find out how many documents
     count_s = copy.copy(base_s)[0:0]  # slicing to [0:0] sets from/size to 0; since we are aggregating, we don't need the hits
     count_s = count_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
@@ -1118,13 +1117,37 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):
     partial_data = False  # TODO can we get partial data?
     span = None
     if field_type == "geo_point":
-        resp = ScanAggs(
-            tile_s,
-            {"grids": A("geotile_grid", field=geopoint_field, precision=geotile_precision)},
-            inner_aggs,
-            size=composite_agg_size,
-            timeout=config.query_timeout_seconds
-        )
+        geo_tile_grid = A("geotile_grid", field=geopoint_field, precision=geotile_precision)
+        logger.info(params)
+        estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
+        if params['bucket_min'] > 0 or params['bucket_max'] < 1:
+            if estimated_points_per_tile is None:
+                # This isn't good: we need a real number, so query for the max aggregation amount
+                max_value_s = copy.copy(base_s)
+                bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
+                if category_field:
+                    bucket.metric("sum", "sum", field=category_field, missing=0)
+                resp = max_value_s.execute()
+                if category_field:
+                    estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
+                else:
+                    estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
+            min_bucket = estimated_points_per_tile * params['bucket_min']
+            max_bucket = estimated_points_per_tile * params['bucket_max']
+            geo_tile_grid.pipeline("selector", "bucket_selector", buckets_path={"doc_count": "_count"}, script=f"params.doc_count > {min_bucket} && params.doc_count < {max_bucket}")
+        logger.info(geo_tile_grid.to_dict())
+        if inner_aggs is not None:
+            for agg_name, agg in inner_aggs.items():
+                geo_tile_grid.aggs[agg_name] = agg
+        tile_s.aggs["comp"] = geo_tile_grid
+        resp = Scan([tile_s], timeout=config.query_timeout_seconds)
+        # resp = ScanAggs(
+        #     tile_s,
+        #     {"grids": geo_tile_grid},
+        #     inner_aggs,
+        #     size=composite_agg_size,
+        #     timeout=config.query_timeout_seconds
+        # )  # Don't use the composite aggregator, because you cannot use a bucket_selector with it


     df = pd.DataFrame(
@@ -1137,7 +1160,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_px=256):
                 category_format
             )
         )
-        estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
+
     elif field_type == "geo_shape":
         zoom = 0
         if resolution == "coarse":
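
For reference, when bucket_min/bucket_max are active, the geo_point branch above should produce an aggregation body of roughly this shape (values illustrative: with estimated_points_per_tile = 1000, bucket_min = 0.1, and bucket_max = 0.9, the computed min_bucket = 100.0 and max_bucket = 900.0 are baked into the Painless script; the field name and precision are placeholders):

    {
        "comp": {
            "geotile_grid": {"field": "location", "precision": 14},
            "aggs": {
                "selector": {
                    "bucket_selector": {
                        "buckets_path": {"doc_count": "_count"},
                        "script": "params.doc_count > 100.0 && params.doc_count < 900.0"
                    }
                }
            }
        }
    }

Note the commented-out ScanAggs call: the composite aggregation it relied on cannot host a bucket_selector (Elasticsearch does not allow pipeline aggregations under a composite aggregation), which is why the code switches to a plain geotile_grid scanned via Scan.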
