Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cloudinary_cli/core/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from cloudinary_cli.utils.json_utils import write_json_to_file, print_json
from cloudinary_cli.utils.utils import write_json_list_to_csv, confirm_action, whitelist_keys, \
normalize_list_params
from cloudinary_cli.utils.search_utils import parse_aggregate

DEFAULT_MAX_RESULTS = 500

Expand Down Expand Up @@ -100,7 +101,7 @@ def _perform_search(query, with_field, fields, sort_by, aggregate, max_results,
if sort_by:
search.sort_by(*sort_by)
if aggregate:
search.aggregate(aggregate)
search.aggregate(parse_aggregate(aggregate))
if next_cursor:
search.next_cursor(next_cursor)
if ttl:
Expand Down
163 changes: 163 additions & 0 deletions cloudinary_cli/utils/search_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import json
from cloudinary.utils import build_array


def parse_aggregate(agg_input):
"""
Parses an aggregator definition or list of definitions into structured aggregator objects.

Accepts:
- Full JSON (if a string starts with '{')
- Transformation-style string (if a string contains ':')
- Simple aggregate string
- A list (or tuple) of any of the above

:param agg_input: Aggregator definition(s) as a string or list of strings.
:type agg_input: str or list or dict
:return: List of parsed aggregator objects.
:rtype: list
"""
agg_list = build_array(agg_input)
parsed_aggregators = []

for agg in agg_list:
if isinstance(agg, str):
s = agg.strip()

if s.startswith("{"):
parsed = parse_json_aggregate(s)
else:
parsed = parse_aggregate_string(s)

parsed_aggregators.append(parsed)
else:
parsed_aggregators.append(agg)

return parsed_aggregators


def parse_json_aggregate(s):
"""
Parses a JSON aggregator string.

:param s: JSON aggregator string.
:type s: str
:return: Parsed aggregator object.
:rtype: dict
:raises: ValueError if JSON is invalid or missing the required 'type' key.
"""
try:
agg_obj = json.loads(s)
except json.JSONDecodeError as e:
raise ValueError("Invalid JSON provided for aggregate: " + str(e))

if not (isinstance(agg_obj, dict) and "type" in agg_obj):
raise ValueError("Full JSON aggregate must be an object with a 'type' key.")

return agg_obj


def parse_aggregate_string(s):
"""
Parses a transformation-style aggregator string into a structured aggregator.

Expected format:
"agg_type:range1,range2,..."
where each range is in the format "<key>_<from>-<to>".

If the string does not contain a colon, it is returned as-is.

:param s: Aggregator string.
:type s: str
:return: Aggregator object (dict) if colon is present, else the original string.
"""
if ":" not in s:
return s

try:
agg_type, range_str = s.split(":", 1)
except ValueError:
raise ValueError("Aggregator string must contain a colon separating type and ranges.")

agg_type = agg_type.strip()
ranges = []

for part in range_str.split(","):
part = part.strip()
if not part:
continue

range_dict = parse_range_definition(part)
ranges.append(range_dict)

result = {"type": agg_type, "ranges": ranges}
return result


def parse_range_definition(part):
"""
Parses a single range definition in the format "<key>_<range_value>".

:param part: Range definition string.
:type part: str
:return: Dict with 'key' and parsed 'from' and/or 'to' values.
"""
if "_" not in part:
raise ValueError("Range definition '{}' must contain an underscore separating key and value.".format(part))

key, value = part.split("_", 1)
key = key.strip()
value = value.strip()

if "-" not in value:
raise ValueError("Range value in '{}' must contain a dash (-) separating from and to values.".format(part))

from_val, to_val = parse_range_bounds(value, part)
range_dict = {"key": key}

if from_val is not None:
range_dict["from"] = from_val

if to_val is not None:
range_dict["to"] = to_val

return range_dict


def parse_range_bounds(value, part):
"""
Parses a range value in the format "from-to", where either may be omitted.
Returns numeric values (int if whole number, else float) or None.

:param value: Range value string.
:type value: str
:param part: Original range definition string.
:type part: str
:return: Tuple (from_val, to_val) as numbers or None.
"""
parts = value.split("-", 1)
from_val = parse_numeric_value(parts[0], "from", part)
to_val = parse_numeric_value(parts[1], "to", part)

return from_val, to_val

def parse_numeric_value(value, label, part):
"""
Parses a numeric value (int or float) or returns None if the value is empty.

:param value: The string to parse.
:type value: str
:param label: The label ('from' or 'to') for error messages.
:type label: str
:param part: The original range definition string for error context.
:type part: str
:return: Parsed numeric value (int or float) or None.
:rtype: int, float, or None
:raises ValueError: If the value is not a valid number.
"""
value = value.strip() if value else value
try:
num = float(value) if value else None
return int(num) if num is not None and num.is_integer() else num
except ValueError:
raise ValueError(f"Invalid numeric value for '{label}' in range '{part}'.")
170 changes: 170 additions & 0 deletions test/test_search_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import unittest
from cloudinary_cli.utils.search_utils import (
parse_aggregate,
parse_json_aggregate,
parse_aggregate_string,
parse_range_definition,
parse_range_bounds
)


class TestAggregateParsing(unittest.TestCase):

# --- Tests for parse_json_aggregate ---

def test_parse_json_aggregate_valid(self):
s = '{"type": "bytes", "ranges": [{"key": "tiny", "to": 500}]}'
result = parse_json_aggregate(s)
expected = {"type": "bytes", "ranges": [{"key": "tiny", "to": 500}]}
self.assertEqual(expected, result)

def test_parse_json_aggregate_invalid_json(self):
s = '{"type": "bytes", "ranges": [{"key": "tiny", "to": 500}' # missing closing ]
with self.assertRaises(ValueError):
parse_json_aggregate(s)

def test_parse_json_aggregate_missing_type(self):
s = '{"ranges": [{"key": "tiny", "to": 500}]}'
with self.assertRaises(ValueError):
parse_json_aggregate(s)

# --- Tests for parse_aggregate_string ---

def test_parse_aggregate_string_valid(self):
s = "bytes:tiny_-500,medium_501-1999,big_2000-"
result = parse_aggregate_string(s)
expected = {
"type": "bytes",
"ranges": [
{"key": "tiny", "to": 500},
{"key": "medium", "from": 501, "to": 1999},
{"key": "big", "from": 2000}
]
}
self.assertEqual(expected, result)

def test_parse_aggregate_string_no_colon(self):
s = "format"
result = parse_aggregate_string(s)
self.assertEqual(s, result)

# --- Tests for parse_aggregate (supports list and non-string inputs) ---

def test_parse_aggregate_simple_string(self):
s = "format"
result = parse_aggregate(s)
self.assertEqual([s], result)

def test_parse_aggregate_json(self):
s = '{"type": "bytes", "ranges": [{"key": "tiny", "to": 500}]}'
result = parse_aggregate(s)
expected = [{"type": "bytes", "ranges": [{"key": "tiny", "to": 500}]}]
self.assertEqual(expected, result)

def test_parse_aggregate_transformation_string(self):
s = "bytes:tiny_-500,medium_501-1999,big_2000-"
result = parse_aggregate(s)
expected = [{
"type": "bytes",
"ranges": [
{"key": "tiny", "to": 500},
{"key": "medium", "from": 501, "to": 1999},
{"key": "big", "from": 2000}
]
}]
self.assertEqual(expected, result)

def test_parse_aggregate_list_input(self):
input_list = [
"format",
"bytes:tiny_-500,medium_501-1999,big_2000-"
]
result = parse_aggregate(input_list)
expected = [
"format",
{
"type": "bytes",
"ranges": [
{"key": "tiny", "to": 500},
{"key": "medium", "from": 501, "to": 1999},
{"key": "big", "from": 2000}
]
}
]
self.assertEqual(expected, result)

def test_parse_aggregate_non_string(self):
# If a non-string (e.g. dict) is passed, build_array wraps it, and it is returned as is.
d = {"type": "custom", "value": 123}
result = parse_aggregate(d)
self.assertEqual([d], result)

# --- Tests for parse_range_definition ---

def test_parse_range_definition_valid_tiny(self):
part = "tiny_-500"
result = parse_range_definition(part)
expected = {"key": "tiny", "to": 500}
self.assertEqual(expected, result)

def test_parse_range_definition_valid_medium(self):
part = "medium_501-1999"
result = parse_range_definition(part)
expected = {"key": "medium", "from": 501, "to": 1999}
self.assertEqual(expected, result)

def test_parse_range_definition_valid_big(self):
part = "big_2000-"
result = parse_range_definition(part)
expected = {"key": "big", "from": 2000}
self.assertEqual(expected, result)

def test_parse_range_definition_missing_underscore(self):
part = "big2000-"
with self.assertRaises(ValueError):
parse_range_definition(part)

def test_parse_range_definition_missing_dash(self):
part = "big_2000"
with self.assertRaises(ValueError):
parse_range_definition(part)

# --- Tests for parse_range_bounds ---

def test_parse_range_bounds_whole_numbers(self):
value = "501-1999"
result = parse_range_bounds(value, "test")
expected = (501, 1999)
self.assertEqual(expected, result)

def test_parse_range_bounds_floats(self):
value = "24.5-29.97"
result = parse_range_bounds(value, "test")
expected = (24.5, 29.97)
self.assertEqual(expected, result)

def test_parse_range_bounds_empty_from(self):
value = "-500"
result = parse_range_bounds(value, "test")
expected = (None, 500)
self.assertEqual(expected, result)

def test_parse_range_bounds_empty_to(self):
value = "2000-"
result = parse_range_bounds(value, "test")
expected = (2000, None)
self.assertEqual(expected, result)

def test_parse_range_bounds_invalid_from(self):
value = "abc-100"
with self.assertRaises(ValueError):
parse_range_bounds(value, "test")

def test_parse_range_bounds_invalid_to(self):
value = "100-abc"
with self.assertRaises(ValueError):
parse_range_bounds(value, "test")


if __name__ == '__main__':
unittest.main()