
Commit 5ff146d

Author: Sean Sullivan
Commit message: fix category spread, timeout, and epoch_millis time index
Parent: c4f334f

2 files changed: +24, -37 lines


elastic_datashader/elastic.py

Lines changed: 2 additions & 2 deletions
@@ -216,7 +216,7 @@ def get_search_base(
     time_range = None

     if timestamp_field:
-        time_range = {timestamp_field: {}}
+        time_range = {timestamp_field: {"format": "strict_date_optional_time_nanos"}}
         if start_time is not None:
             time_range[timestamp_field]["gte"] = start_time
         if stop_time is not None:
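
Context for the epoch_millis part of the fix: without an explicit format, Elasticsearch parses range bounds using the field mapping's own date format, so an index mapped as epoch_millis misreads or rejects ISO-8601 bounds. Pinning the format makes the bounds self-describing. A minimal sketch of the resulting filter; the field name and bounds are hypothetical, only the "format" key comes from the commit:

# Hypothetical field and bounds, for illustration only
timestamp_field = "@timestamp"
start_time = "2023-01-01T00:00:00.000Z"
stop_time = "2023-01-01T06:00:00.000Z"

time_range = {timestamp_field: {"format": "strict_date_optional_time_nanos"}}
if start_time is not None:
    time_range[timestamp_field]["gte"] = start_time
if stop_time is not None:
    time_range[timestamp_field]["lte"] = stop_time

# Yields a range clause that tells Elasticsearch how to parse the bounds,
# regardless of the index mapping's date format (e.g. epoch_millis):
# {"range": {"@timestamp": {"format": "strict_date_optional_time_nanos",
#                           "gte": "...", "lte": "..."}}}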
@@ -680,7 +680,7 @@ def run_search(**kwargs):
    s = self.search[:0]

    if _timeout_at:
-        _time_remaining = _timeout_at - time.time()
+        _time_remaining = int(_timeout_at - time.time())
        s = s.params(timeout=f"{_time_remaining}s")

    s.aggs.bucket("comp", "composite", sources=self.source_aggs, size=self.size, **kwargs)

elastic_datashader/tilegen.py

Lines changed: 22 additions & 35 deletions
@@ -39,6 +39,7 @@
     get_nested_field_from_hit,
     to_32bit_float,
     Scan,
+    ScanAggs,
     get_tile_categories,
     scan
 )
@@ -852,23 +853,14 @@ def generate_nonaggregated_tile(
         raise

 @lru_cache
-def calculate_pixel_spread(geotile_precision: int) -> int:
+def calculate_pixel_spread(max_zoom: int, agg_zooms: int) -> int:
     '''
     Pixel spread is the number of pixels to put around each
     data point.
     '''
-    logger.debug('calculate_pixel_spread(%d)', geotile_precision)
+    logger.debug('calculate_pixel_spread(%d,%d)', max_zoom, agg_zooms)
+    return max(int(abs(max_zoom - agg_zooms)), 0)

-    if geotile_precision >= 20:
-        return geotile_precision // 4
-
-    if geotile_precision >= 15:
-        return 2
-
-    if geotile_precision >= 12:
-        return 1
-
-    return 0

 def apply_spread(img, spread):
     '''
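
Under the new rule the spread is simply the zoom gap between the maximum aggregation zoom and the zoom the data was actually aggregated at, so coarser aggregations get proportionally more padding. Illustrative calls with hypothetical zoom values, assuming the function above is in scope:

# spread = max(int(abs(max_zoom - agg_zooms)), 0)
print(calculate_pixel_spread(29, 25))  # 4: aggregated 4 zoom levels coarser, pad 4 px
print(calculate_pixel_spread(22, 22))  # 0: aggregated at full precision, no padding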
@@ -987,7 +979,6 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
     # Now find out how many documents
     count_s = copy.copy(base_s)[0:0] # slice of array sets from/size since we are aggregating the data we don't need the hits
     count_s = count_s.filter("geo_bounding_box", **{geopoint_field: bb_dict})
-
     doc_cnt = count_s.count()
     logger.info("Document Count: %s", doc_cnt)
     metrics['doc_cnt'] = doc_cnt
@@ -1118,38 +1109,33 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11181109
span = None
11191110
if field_type == "geo_point":
11201111
geo_tile_grid = A("geotile_grid", field=geopoint_field, precision=geotile_precision, size=max_bins)
1121-
estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
1112+
11221113
if params['bucket_min']>0 or params['bucket_max']<1:
1123-
if estimated_points_per_tile is None:
1114+
if global_doc_cnt is None:
11241115
# this isn't good we need a real number so lets query the max aggregation ammount
11251116
max_value_s = copy.copy(base_s)
11261117
bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field, precision=geotile_precision, size=1)
11271118
if category_field:
11281119
bucket.metric("sum", "sum", field=category_field, missing=0)
11291120
resp = max_value_s.execute()
11301121
if category_field:
1131-
estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
1122+
global_doc_cnt = resp.aggregations.comp.buckets[0].sum['value']
11321123
else:
1133-
estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
1124+
global_doc_cnt = resp.aggregations.comp.buckets[0].doc_count
11341125

1135-
min_bucket = math.floor(math.exp(math.log(estimated_points_per_tile)*params['bucket_min']))
1136-
max_bucket = math.ceil(math.exp(math.log(estimated_points_per_tile)*params['bucket_max']))
1126+
min_bucket = math.floor(math.exp(math.log(global_doc_cnt)*params['bucket_min']))
1127+
max_bucket = math.ceil(math.exp(math.log(global_doc_cnt)*params['bucket_max']))
11371128
geo_tile_grid.pipeline("selector", "bucket_selector", buckets_path={"doc_count": "_count"}, script=f"params.doc_count >= {min_bucket} && params.doc_count <= {max_bucket}")
1138-
1139-
if inner_aggs is not None:
1140-
for agg_name, agg in inner_aggs.items():
1141-
geo_tile_grid.aggs[agg_name] = agg
1142-
tile_s.aggs["comp"] = geo_tile_grid
1143-
resp = Scan([tile_s], timeout=config.query_timeout_seconds)
1144-
# resp = ScanAggs(
1145-
# tile_s,
1146-
# {"grids": geo_tile_grid},
1147-
# inner_aggs,
1148-
# size=composite_agg_size,
1149-
# timeout=config.query_timeout_seconds
1150-
# ) # Dont use composite aggregator because you cannot use a bucket selector
1151-
1152-
1129+
if category_field:
1130+
geo_tile_grid = A("geotile_grid", field=geopoint_field, precision=geotile_precision)
1131+
resp = ScanAggs(tile_s,{"grids": geo_tile_grid},inner_aggs,size=composite_agg_size,timeout=config.query_timeout_seconds)
1132+
else:
1133+
if inner_aggs is not None:
1134+
for agg_name, agg in inner_aggs.items():
1135+
geo_tile_grid.aggs[agg_name] = agg
1136+
tile_s.aggs["comp"] = geo_tile_grid
1137+
resp = Scan([tile_s], timeout=config.query_timeout_seconds)
1138+
estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
11531139
df = pd.DataFrame(
11541140
convert_composite(
11551141
resp.execute(),
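
The new branch exists because a bucket_selector pipeline cannot run under the composite aggregation that ScanAggs drives (as the removed comment noted), so category tiles take the composite path while the bucket_min/bucket_max selector stays on the plain Scan path. A minimal elasticsearch_dsl sketch of that selector, with a hypothetical field and bounds:

from elasticsearch_dsl import A

# Hypothetical values; the real bounds come from params['bucket_min'/'bucket_max']
min_bucket, max_bucket = 10, 100_000
grid = A("geotile_grid", field="location", precision=12, size=65536)
grid.pipeline(
    "selector",
    "bucket_selector",
    buckets_path={"doc_count": "_count"},
    script=f"params.doc_count >= {min_bucket} && params.doc_count <= {max_bucket}",
)
# Buckets whose doc_count falls outside [min_bucket, max_bucket] are dropped
# server-side before the tile is rendered.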
@@ -1359,7 +1345,8 @@ def remap_bucket(bucket, search):

    ###############################################################
    # Common
-    img = apply_spread(img, spread or calculate_pixel_spread(geotile_precision))
+    spread = spread or calculate_pixel_spread(max_agg_zooms, agg_zooms)
+    img = apply_spread(img, spread)
    img = img.to_bytesio().read()

    if partial_data:
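
apply_spread then pads each rendered datapoint by that many pixels. A hedged sketch of the call pattern, assuming apply_spread wraps datashader's transfer_functions.spread (the name apply_spread_sketch is illustrative, not the project's code):

import datashader.transfer_functions as tf

def apply_spread_sketch(img, spread):
    # Pad every non-empty pixel by `spread` pixels in each direction;
    # spread == 0 leaves the image untouched.
    if spread and spread > 0:
        return tf.spread(img, px=spread)
    return img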
