diff --git a/README.md b/README.md
index 4aa6eeb3a..604ecc80f 100644
--- a/README.md
+++ b/README.md
@@ -89,6 +89,7 @@ These directives are currently available:
| [Write JSON Object](wrangler-docs/directives/write-as-json-object.md) | Composes a JSON object based on the fields specified. |
| [Format as Currency](wrangler-docs/directives/format-as-currency.md) | Formats a number as currency as specified by locale. |
| **Transformations** | |
+| [Aggregate Stats](wrangler-docs/directives/aggregate-stats.md) | Analyzes byte size and time duration values, generating statistics |
| [Changing Case](wrangler-docs/directives/changing-case.md) | Changes the case of column values |
| [Cut Character](wrangler-docs/directives/cut-character.md) | Selects parts of a string value |
| [Set Column](wrangler-docs/directives/set-column.md) | Sets the column value to the result of an expression execution |
@@ -175,6 +176,28 @@ rates below are specified as *records/second*.
| High (167 Directives) | 426 | 127,946,398 | 82,677,845,324 | 106,367.27 |
| High (167 Directives) | 426 | 511,785,592 | 330,711,381,296 | 105,768.93 |
+## Byte Size and Time Duration Support
+
+The Wrangler library provides support for parsing and aggregating byte size and time duration values. This feature allows you to work with human-readable size and duration values directly in your recipes.
+
+### Byte Size Units
+
+The following byte size units are supported:
+
+- B: Bytes
+- KB: Kilobytes (1024 bytes)
+- MB: Megabytes (1024 \* 1024 bytes)
+- GB: Gigabytes (1024 _ 1024 _ 1024 bytes)
+- TB: Terabytes (1024 _ 1024 _ 1024 \* 1024 bytes)
+
+### Time Duration Units
+
+The following time duration units are supported:
+
+- ms: Milliseconds
+- s: Seconds
+- m: Minutes
+- h: Hours
## Contact
@@ -214,5 +237,6 @@ and limitations under the License.
Cask is a trademark of Cask Data, Inc. All rights reserved.
+
Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.
diff --git a/README.pdf b/README.pdf
new file mode 100644
index 000000000..5bfd086b6
Binary files /dev/null and b/README.pdf differ
diff --git a/pom.xml b/pom.xml
index a4ee67662..843dfad8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,6 +176,16 @@
${testSourceLocation}
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.22.2
+
+ 1
+ true
+ --add-opens java.base/java.lang=ALL-UNNAMED
+
+
org.apache.maven.plugins
maven-compiler-plugin
diff --git a/prompts.text b/prompts.text
new file mode 100644
index 000000000..f5a273d1b
--- /dev/null
+++ b/prompts.text
@@ -0,0 +1,125 @@
+Prompt 1
+I am modifying an ANTLR grammar for a Java project. Help me write lexer and parser rules in Directives.g4 for
+two new tokens:
+
+BYTE_SIZE → matches values like "10KB", "2.5MB", "1GB".
+
+TIME_DURATION → matches values like "5ms", "3.2s", "1min".
+
+Also, include helpful fragments like BYTE_UNIT, TIME_UNIT. Finally, show how to update the value parser rule
+(or create byteSizeArg, timeDurationArg if needed) so the new tokens are accepted as directive arguments.
+
+
+
+Prompt 2: Create ByteSize and TimeDuration Token Classes
+I am working on a Java project where tokens represent directive arguments. Help me create two new token classes:
+
+ByteSize.java and TimeDuration.java
+
+Each class should:
+
+Extend io.cdap.wrangler.api.parser.Token
+
+Parse strings like "10KB", "2.5MB", "1GB" (for ByteSize) and "500ms", "1.2s", "3min" (for TimeDuration)
+
+Internally store the value in canonical units (bytes for ByteSize, milliseconds or nanoseconds for TimeDuration)
+
+Provide getter methods like getBytes() and getMilliseconds()
+
+
+Prompt 3: Update Token Types and Directive Argument Support
+I am extending a token parsing framework in Java for a data transformation tool. Guide me to:
+
+Add two new token types: BYTE_SIZE and TIME_DURATION in the token registry or enum used (if any).
+
+Update the logic that defines valid argument types in directives,
+so that BYTE_SIZE and TIME_DURATION can be accepted where appropriate.
+
+Mention any necessary updates in registration/configuration files or classes if applicable.
+
+
+
+Prompt 4: Add Visitor Methods for New Parser Rules
+In my ANTLR-based Java parser for a directive language,
+I’ve added two new parser rules: byteSizeArg and timeDurationArg. Help me:
+
+Implement visitor methods visitByteSizeArg and visitTimeDurationArg in the appropriate visitor or parser class.
+
+These methods should return instances of ByteSize and TimeDuration tokens respectively using ctx.getText().
+
+Ensure these token instances are added to the TokenGroup for the directive being parsed.
+
+
+
+Prompt 5: Implement New AggregateStats Directive
+I’m creating a new directive class called AggregateStats in a Java-based data transformation engine. Guide me to:
+
+Implement the Directive interface
+
+Accept at least 4 arguments:
+
+Source column (byte sizes)
+
+Source column (time durations)
+
+Target column for total size
+
+Target column for total/average time
+
+Optionally accept:
+
+Aggregation type (total, avg)
+
+Output unit (MB, GB, seconds, minutes)
+
+In initialize, store the argument values
+
+In execute, use ExecutorContext.getStore() to:
+
+Accumulate byte size and time duration values (convert to canonical units)
+
+In finalize, return a single Row with converted results (e.g., MB, seconds)
+
+
+Prompt 6: Write Unit Tests for ByteSize and TimeDuration
+Help me write JUnit tests for one Java class: ByteSize and TimeDuration.
+ These class parse strings like "10KB" and "500ms" respectively.
+
+Test valid cases: "10KB", "1.5MB", "1GB" for ByteSize and "500ms", "2s", "1min" for TimeDuration.
+
+Verify that getBytes() or getMilliseconds() return the correct canonical values.
+
+Include a few invalid input tests and assert that they throw proper exceptions.
+
+
+
+
+Prompt 7: Write Parser Tests for New Grammar
+I’ve added BYTE_SIZE and TIME_DURATION tokens to an ANTLR grammar. Help me write parser tests in Java to:
+
+Validate that inputs like "10KB", "1.5MB", "5ms", "3min" are accepted in directive recipes.
+
+Use test classes like GrammarBasedParserTest.java or RecipeCompilerTest.java.
+
+Also test invalid values (e.g., "10KBB", "1..5MB", "ms5") and ensure they are rejected.
+
+
+
+
+Prompt 8: Write Integration Test for AggregateStats Directive
+I’ve created an AggregateStats directive that aggregates byte size and time duration columns. Help me write an integration test using TestingRig to:
+
+Create input data: List with columns like data_transfer_size and response_time using values like "1MB", "500KB", "2s", "500ms".
+
+Define recipe like:
+
+java
+
+String[] recipe = new String[] {
+ "aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec"
+};
+Execute with TestingRig.execute(recipe, rows)
+
+Assert that the resulting row contains correct aggregated values (in MB and seconds)
+
+Use a delta tolerance (e.g., 0.001) for comparing float values
\ No newline at end of file
diff --git a/wrangler-api/pom.xml b/wrangler-api/pom.xml
index e97464a64..b595000dd 100644
--- a/wrangler-api/pom.xml
+++ b/wrangler-api/pom.xml
@@ -39,6 +39,35 @@
${cdap.version}
provided
-
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+ rat-excludes.txt
+ 2
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 3.0.0
+
+ checkstyle.xml
+ suppressions.xml
+
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+
+
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java
new file mode 100644
index 000000000..27fd543ab
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package io.cdap.wrangler.api.parser;
+
+ import com.google.gson.JsonElement;
+ import com.google.gson.JsonObject;
+ import io.cdap.wrangler.api.annotations.PublicEvolving;
+
+ /**
+ * Represents a ByteSize token, capable of parsing strings like "10KB", "1.5MB",
+ * and converting them into bytes.
+ */
+ @PublicEvolving
+ public class ByteSize implements Token {
+
+ // Multipliers for each unit
+ private static final double KILOBYTE = 1024.0;
+ private static final double MEGABYTE = KILOBYTE * 1024.0;
+ private static final double GIGABYTE = MEGABYTE * 1024.0;
+ private static final double TERABYTE = GIGABYTE * 1024.0;
+
+ // Parsed byte value stored as long
+ private final long bytesValue;
+
+ /**
+ * Constructs a ByteSize token by parsing the given size string.
+ *
+ * @param sizeString The string to parse (e.g., "10KB", "1.5MB").
+ * @throws IllegalArgumentException If the string format is invalid.
+ */
+ public ByteSize(String sizeString) {
+ this.bytesValue = parseSize(sizeString);
+ }
+
+ /**
+ * Parses a size string and converts it into bytes.
+ *
+ * @param sizeString The input string representing a byte size.
+ * @return The size in bytes.
+ */
+ private long parseSize(String sizeString) {
+ if (sizeString == null || sizeString.trim().isEmpty()) {
+ throw new IllegalArgumentException("Size string must not be null or empty.");
+ }
+
+ sizeString = sizeString.trim().toUpperCase();
+ String numericPart;
+ double multiplier;
+
+ try {
+ if (sizeString.endsWith("KB")) {
+ numericPart = sizeString.substring(0, sizeString.length() - 2);
+ multiplier = KILOBYTE;
+ } else if (sizeString.endsWith("MB")) {
+ numericPart = sizeString.substring(0, sizeString.length() - 2);
+ multiplier = MEGABYTE;
+ } else if (sizeString.endsWith("GB")) {
+ numericPart = sizeString.substring(0, sizeString.length() - 2);
+ multiplier = GIGABYTE;
+ } else if (sizeString.endsWith("TB")) {
+ numericPart = sizeString.substring(0, sizeString.length() - 2);
+ multiplier = TERABYTE;
+ } else if (sizeString.endsWith("B")) {
+ numericPart = sizeString.substring(0, sizeString.length() - 1);
+ multiplier = 1.0;
+ } else {
+ throw new IllegalArgumentException("Invalid byte size format or unsupported unit in string: " + sizeString);
+ }
+
+ if (numericPart.isEmpty()) {
+ throw new IllegalArgumentException("Missing numeric value in size string: " + sizeString);
+ }
+
+ double parsedValue = Double.parseDouble(numericPart);
+ if (parsedValue < 0) {
+ throw new IllegalArgumentException("Size value cannot be negative: " + sizeString);
+ }
+
+ return (long) (parsedValue * multiplier); // Truncate to long
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid numeric value in size string: " + sizeString, e);
+ }
+ }
+
+ /**
+ * @return Size in bytes.
+ */
+ public long getBytes() {
+ return bytesValue;
+ }
+
+ /**
+ * @return Size in kilobytes.
+ */
+ public double getKiloBytes() {
+ return bytesValue / KILOBYTE;
+ }
+
+ /**
+ * @return Size in megabytes.
+ */
+ public double getMegaBytes() {
+ return bytesValue / MEGABYTE;
+ }
+
+ /**
+ * @return Size in gigabytes.
+ */
+ public double getGigaBytes() {
+ return bytesValue / GIGABYTE;
+ }
+
+ /**
+ * @return Size in terabytes.
+ */
+ public double getTeraBytes() {
+ return bytesValue / TERABYTE;
+ }
+
+ @Override
+ public Object value() {
+ return bytesValue;
+ }
+
+ @Override
+ public TokenType type() {
+ return TokenType.BYTE_SIZE;
+ }
+
+ @Override
+ public JsonElement toJson() {
+ JsonObject object = new JsonObject();
+ object.addProperty("type", TokenType.BYTE_SIZE.name());
+ object.addProperty("value", bytesValue);
+ return object;
+ }
+
+ @Override
+ public String toString() {
+ return bytesValue + "B";
+ }
+ }
\ No newline at end of file
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java
new file mode 100644
index 000000000..c86802d20
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ * Copyright © 2023 Google LLC // Update copyright year/holder if needed
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ package io.cdap.wrangler.api.parser;
+
+ import com.google.gson.JsonElement;
+ import com.google.gson.JsonObject;
+ import io.cdap.wrangler.api.annotations.PublicEvolving;
+
+ import java.util.concurrent.TimeUnit;
+
+ /**
+ * Represents a Token for time durations, capable of parsing strings
+ * like "5ms", "2.1s", or "3h" and converting them into milliseconds.
+ */
+ @PublicEvolving
+ public class TimeDuration implements Token {
+
+ // Time unit conversion factors (all in terms of milliseconds)
+ private static final double MILLIS_IN_NANOSECOND = 1.0 / 1_000_000.0;
+ private static final double MILLIS_IN_MICROSECOND = 1.0 / 1_000.0;
+ private static final double MILLIS_IN_SECOND = 1_000.0;
+ private static final double MILLIS_IN_MINUTE = 60.0 * MILLIS_IN_SECOND;
+ private static final double MILLIS_IN_HOUR = 60.0 * MILLIS_IN_MINUTE;
+ private static final double MILLIS_IN_DAY = 24.0 * MILLIS_IN_HOUR;
+
+ // Final parsed duration value (in milliseconds)
+ private final double durationMillis;
+
+ /**
+ * Constructs a TimeDuration object by parsing the provided duration string.
+ *
+ * @param inputDuration A string representing duration (e.g., "5ms", "2.5h").
+ * @throws IllegalArgumentException if the string format is invalid.
+ */
+ public TimeDuration(String inputDuration) {
+ this.durationMillis = parseDuration(inputDuration);
+ }
+
+ /**
+ * Parses a duration string and converts it to milliseconds.
+ *
+ * @param durationStr Input duration string to parse.
+ * @return Duration value in milliseconds.
+ * @throws IllegalArgumentException if format or value is invalid.
+ */
+ private double parseDuration(String durationStr) {
+ if (durationStr == null || durationStr.trim().isEmpty()) {
+ throw new IllegalArgumentException("Duration string must not be null or empty.");
+ }
+
+ durationStr = durationStr.trim().toLowerCase();
+ String numericPart;
+ double conversionFactor;
+
+ try {
+ if (durationStr.endsWith("ns")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 2);
+ conversionFactor = MILLIS_IN_NANOSECOND;
+ } else if (durationStr.endsWith("us")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 2);
+ conversionFactor = MILLIS_IN_MICROSECOND;
+ } else if (durationStr.endsWith("ms")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 2);
+ conversionFactor = 1.0;
+ } else if (durationStr.endsWith("s")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 1);
+ conversionFactor = MILLIS_IN_SECOND;
+ } else if (durationStr.endsWith("min")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 3);
+ conversionFactor = MILLIS_IN_MINUTE;
+ } else if (durationStr.endsWith("m")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 1);
+ conversionFactor = MILLIS_IN_MINUTE;
+ } else if (durationStr.endsWith("h")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 1);
+ conversionFactor = MILLIS_IN_HOUR;
+ } else if (durationStr.endsWith("d")) {
+ numericPart = durationStr.substring(0, durationStr.length() - 1);
+ conversionFactor = MILLIS_IN_DAY;
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid time duration format or unsupported unit in string: " + durationStr);
+ }
+
+ if (numericPart.isEmpty()) {
+ throw new IllegalArgumentException("Missing numeric value in duration string: " + durationStr);
+ }
+
+ double parsedNumber = Double.parseDouble(numericPart);
+ if (parsedNumber < 0) {
+ throw new IllegalArgumentException("Duration value cannot be negative: " + durationStr);
+ }
+
+ return parsedNumber * conversionFactor;
+
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid numeric value in duration string: " + durationStr, e);
+ }
+ }
+
+ /**
+ * Converts and returns the duration in the given {@link TimeUnit}.
+ *
+ * @param unit Time unit to convert to.
+ * @return Converted duration in the specified unit (rounded).
+ */
+ public long getDuration(TimeUnit unit) {
+ return unit.convert((long) this.durationMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return Canonical duration value in milliseconds.
+ */
+ public double getValue() {
+ return durationMillis;
+ }
+
+ @Override
+ public Object value() {
+ return durationMillis;
+ }
+
+ @Override
+ public TokenType type() {
+ return TokenType.TIME_DURATION;
+ }
+
+ @Override
+ public JsonElement toJson() {
+ JsonObject object = new JsonObject();
+ object.addProperty("type", TokenType.TIME_DURATION.name());
+ object.addProperty("value", durationMillis);
+ return object;
+ }
+
+ @Override
+ public String toString() {
+ return durationMillis + "ms";
+ }
+ }
\ No newline at end of file
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java
index 8c93b0e6a..04a796945 100644
--- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java
@@ -40,6 +40,8 @@
* @see Expression
* @see Text
* @see TextList
+ * @see ByteSize
+ * @see TimeDuration
*/
@PublicEvolving
public enum TokenType implements Serializable {
@@ -152,5 +154,22 @@ public enum TokenType implements Serializable {
* Represents the enumerated type for the object of type {@code String} with restrictions
* on characters that can be present in a string.
*/
- IDENTIFIER
+ IDENTIFIER,
+
+
+ /**
+ * Enum representing specialized data types that require unit-based parsing.
+ * Indicates a value of type {@code ByteSize}, which consists of a numeric component
+ * followed by a byte unit (e.g., B, KB, MB, GB, TB, PB).
+ * Examples include: "10KB", "1.5MB", "2GB".
+ */
+ BYTE_SIZE,
+
+ /**
+ * Represents the enumerated type for the object of type {@code TimeDuration} type.
+ * This type is associated with a numeric value followed by a time unit (e.g.,ns, ms, s, m, h, d).
+ * For example: "100ms", "5s", "2.5h".
+ */
+ TIME_DURATION,
+
}
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java
index 78800b7d1..bc4bad9ce 100644
--- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java
@@ -126,6 +126,10 @@ public String toString() {
sb.append("prop:{key:value,[key:value]*");
} else if (token.type().equals(TokenType.RANGES)) {
sb.append("start:end=[bool|text|numeric][,start:end=[bool|text|numeric]*");
+ } else if (token.type().equals(TokenType.BYTE_SIZE)) {
+ sb.append(":").append(token.name());
+ } else if (token.type().equals(TokenType.TIME_DURATION)) {
+ sb.append(":").append(token.name());
}
}
diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml
index e2dcb3c2b..e3e6069e8 100644
--- a/wrangler-core/pom.xml
+++ b/wrangler-core/pom.xml
@@ -45,6 +45,7 @@
io.cdap.wrangler
wrangler-api
${project.version}
+ compile
io.cdap.wrangler
@@ -99,6 +100,12 @@
2.0.2
test
+
+ junit
+ junit
+ 4.13.2
+ test
+
org.powermock
powermock-api-mockito2
@@ -320,6 +327,16 @@
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.22.2
+
+ 1
+ true
+ --add-opens java.base/java.lang=ALL-UNNAMED
+
+
org.antlr
antlr4-maven-plugin
diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
index 7c517ed6a..1a62308ca 100644
--- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
+++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
@@ -57,6 +57,8 @@ directive
| text
| number
| bool
+ | byteSizeArg
+ | timeDurationArg
| column
| colList
| numberList
@@ -140,7 +142,7 @@ numberRange
;
value
- : String | Number | Column | Bool
+ : String | Number | Column | Bool | ByteSize | TimeDuration
;
ecommand
@@ -167,6 +169,14 @@ bool
: Bool
;
+byteSizeArg
+ : BYTE_SIZE
+ ;
+
+timeDurationArg
+ : TIME_DURATION
+ ;
+
condition
: OBrace (~CBrace | condition)* CBrace
;
@@ -253,6 +263,14 @@ Bool
| 'false'
;
+BYTE_SIZE
+ : DecimalNumber BYTE_UNIT
+ ;
+
+TIME_DURATION
+ : DecimalNumber TIME_UNIT
+ ;
+
Number
: Int ('.' Digit*)?
;
@@ -311,3 +329,29 @@ fragment Int
fragment Digit
: [0-9]
;
+
+
+fragment DecimalNumber
+ : Digit+ ('.' Digit+)? // Matches decimal numbers like 123, 1.5, 0.25, etc.
+ ;
+
+
+fragment ByteUnit
+ : [kK][bB] // Kilobytes
+ | [mM][bB] // Megabytes
+ | [gG][bB] // Gigabytes
+ | [tT][bB] // Terabytes
+ | [pP][bB] // Petabytes
+ | [bB] // Bytes
+ ;
+
+
+fragment TimeUnit
+ : [nN][sS] // Nanoseconds
+ | [uU][sS] // Microseconds
+ | [mM][sS] // Milliseconds
+ | [sS] // Seconds
+ | [mM] // Minutes
+ | [hH] // Hours
+ | [dD] // Days
+ ;
\ No newline at end of file
diff --git a/wrangler-core/src/main/java/io/cdap/directives/aggregates/aggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/aggregates/aggregateStats.java
new file mode 100644
index 000000000..cfe5dd76d
--- /dev/null
+++ b/wrangler-core/src/main/java/io/cdap/directives/aggregates/aggregateStats.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.directives.aggregates;
+
+import io.cdap.cdap.api.annotation.Name;
+
+import io.cdap.cdap.api.annotation.Plugin;
+
+import io.cdap.cdap.api.data.schema.Schema;
+
+import io.cdap.wrangler.api.Arguments;
+
+import io.cdap.wrangler.api.Directive;
+
+import io.cdap.wrangler.api.DirectiveExecutionException;
+
+import io.cdap.wrangler.api.DirectiveParseException;
+
+import io.cdap.wrangler.api.ExecutorContext;
+
+import io.cdap.wrangler.api.Row;
+
+import io.cdap.wrangler.api.annotations.Categories;
+
+import io.cdap.wrangler.api.parser.ByteSize;
+
+import io.cdap.wrangler.api.parser.ColumnName;
+
+import io.cdap.wrangler.api.parser.Text;
+
+import io.cdap.wrangler.api.parser.TimeDuration;
+
+import io.cdap.wrangler.api.parser.TokenType;
+
+import io.cdap.wrangler.api.parser.UsageDefinition;
+
+import java.util.ArrayList;
+
+import java.util.List;
+/**
+ * Directive to aggregate statistics over a dataset. It computes
+ * total bytes and total time, then outputs the summary in MB and seconds.
+ */
+@Plugin(type = Directive.TYPE)
+@Name(AggregateStats.NAME)
+@Categories(categories = { "data-aggregation" })
+public class AggregateStats implements Directive {
+
+ public static final String NAME = "aggregate-stats";
+
+ // Input column names
+ private String inputByteColumn;
+ private String inputTimeColumn;
+
+ // Output column names
+ private String resultByteColumn;
+ private String resultTimeColumn;
+
+ // Aggregated values
+ private double accumulatedBytes = 0.0;
+ private double accumulatedTimeMs = 0.0;
+ private int processedRowCount = 0;
+
+ /**
+ * Defines the parameters required to use this directive.
+ *
+ * @return UsageDefinition instance outlining directive inputs.
+ */
+ @Override
+ public UsageDefinition define() {
+ UsageDefinition.Builder builder = UsageDefinition.builder(NAME);
+ builder.define("byteCol", TokenType.COLUMN_NAME);
+ builder.define("timeCol", TokenType.COLUMN_NAME);
+ builder.define("outputSizeCol", TokenType.TEXT);
+ builder.define("outputTimeCol", TokenType.TEXT);
+ return builder.build();
+ }
+
+ /**
+ * Initializes the directive with user-provided arguments.
+ *
+ * @param args The input arguments.
+ * @throws DirectiveParseException if parsing fails.
+ */
+ @Override
+ public void initialize(Arguments args) throws DirectiveParseException {
+ inputByteColumn = ((ColumnName) args.value("byteCol")).value();
+ inputTimeColumn = ((ColumnName) args.value("timeCol")).value();
+ resultByteColumn = ((Text) args.value("outputSizeCol")).value();
+ resultTimeColumn = ((Text) args.value("outputTimeCol")).value();
+ }
+
+ /**
+ * Executes aggregation logic over input rows.
+ *
+ * @param rows List of input rows.
+ * @param ctx Execution context.
+ * @return List containing a single row with aggregated metrics.
+ * @throws DirectiveExecutionException if any error occurs during execution.
+ */
+ @Override
+ public List execute(List rows, ExecutorContext ctx) throws DirectiveExecutionException {
+ try {
+ for (Row row : rows) {
+ if (row.find(inputByteColumn) != -1 && row.find(inputTimeColumn) != -1) {
+ String byteValue = row.getValue(inputByteColumn).toString();
+ String timeValue = row.getValue(inputTimeColumn).toString();
+
+ // Parse input values
+ ByteSize byteSize = new ByteSize(byteValue);
+ TimeDuration duration = new TimeDuration(timeValue);
+
+ accumulatedBytes += byteSize.getBytes();
+ accumulatedTimeMs += duration.getValue();
+ processedRowCount++;
+ }
+ }
+
+ if (processedRowCount == 0) {
+ return new ArrayList<>();
+ }
+
+ // Prepare output
+ List results = new ArrayList<>();
+ Row resultRow = new Row();
+
+ resultRow.add(resultByteColumn, accumulatedBytes / (1024.0 * 1024.0)); // Convert bytes to MB
+ resultRow.add(resultTimeColumn, accumulatedTimeMs / 1000.0); // Convert ms to seconds
+
+ results.add(resultRow);
+ return results;
+
+ } catch (Exception e) {
+ throw new DirectiveExecutionException(
+ String.format("Error aggregating stats: %s", e.getMessage())
+ );
+ }
+ }
+
+ /**
+ * Defines the output schema for the transformed dataset.
+ *
+ * @param inputSchema The schema of incoming data.
+ * @return Schema of the resulting data.
+ */
+ public Schema getOutputSchema(Schema inputSchema) {
+ List fields = new ArrayList<>();
+ fields.add(Schema.Field.of(resultByteColumn, Schema.of(Schema.Type.DOUBLE)));
+ fields.add(Schema.Field.of(resultTimeColumn, Schema.of(Schema.Type.DOUBLE)));
+ return Schema.recordOf("aggregate-stats", fields);
+ }
+
+ /**
+ * Cleans up and resets the internal state of the directive.
+ */
+ @Override
+ public void destroy() {
+ accumulatedBytes = 0.0;
+ accumulatedTimeMs = 0.0;
+ processedRowCount = 0;
+
+ inputByteColumn = null;
+ inputTimeColumn = null;
+ resultByteColumn = null;
+ resultTimeColumn = null;
+ }
+}
\ No newline at end of file
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
index ac35e7a5e..f9503cb59 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java
@@ -22,6 +22,7 @@
import io.cdap.wrangler.api.Triplet;
import io.cdap.wrangler.api.parser.Bool;
import io.cdap.wrangler.api.parser.BoolList;
+import io.cdap.wrangler.api.parser.ByteSize;
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.ColumnNameList;
import io.cdap.wrangler.api.parser.DirectiveName;
@@ -33,6 +34,7 @@
import io.cdap.wrangler.api.parser.Ranges;
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TextList;
+import io.cdap.wrangler.api.parser.TimeDuration;
import io.cdap.wrangler.api.parser.Token;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.misc.Interval;
@@ -317,6 +319,39 @@ public RecipeSymbol.Builder visitStringList(DirectivesParser.StringListContext c
return builder;
}
+ /**
+ * Parses a byte size argument (e.g., "10MB") from the directive script and creates
+ * a {@link ByteSize} token. This token is then added to the directive builder for
+ * further processing during execution.
+ *
+ * @param ctx the parser context representing a BYTE_SIZE argument
+ * @return the updated {@link RecipeSymbol.Builder} instance
+ */
+@Override
+public RecipeSymbol.Builder visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) {
+ String byteSizeValue = ctx.BYTE_SIZE().getText(); // e.g., "10MB"
+ ByteSize byteSize = new ByteSize(byteSizeValue);
+ builder.addToken(byteSize);
+ return builder;
+}
+
+/**
+ * Parses a time duration argument (e.g., "5s", "1h") from the directive script and creates
+ * a {@link TimeDuration} token. This token is added to the directive builder to support
+ * duration-based operations within directives.
+ *
+ * @param ctx the parser context representing a TIME_DURATION argument
+ * @return the updated {@link RecipeSymbol.Builder} instance
+ */
+@Override
+public RecipeSymbol.Builder visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) {
+ String durationValue = ctx.TIME_DURATION().getText(); // e.g., "5s"
+ TimeDuration timeDuration = new TimeDuration(durationValue);
+ builder.addToken(timeDuration);
+ return builder;
+}
+
+
private SourceInfo getOriginalSource(ParserRuleContext ctx) {
int a = ctx.getStart().getStartIndex();
int b = ctx.getStop().getStopIndex();
diff --git a/wrangler-core/src/test/java/io/cdap/directives/aggregates/aggregatestats.java b/wrangler-core/src/test/java/io/cdap/directives/aggregates/aggregatestats.java
new file mode 100644
index 000000000..f7e68814b
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/directives/aggregates/aggregatestats.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.directives.aggregates;
+
+import com.google.gson.JsonElement;
+import io.cdap.wrangler.api.Arguments;
+import io.cdap.wrangler.api.Row;
+import io.cdap.wrangler.api.parser.ColumnName;
+import io.cdap.wrangler.api.parser.Text;
+import io.cdap.wrangler.api.parser.Token;
+import io.cdap.wrangler.api.parser.TokenType;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AggregateStatsTest {
+
+ @Test
+ public void testAggregateStatsDirective() throws Exception {
+ // Prepare sample input rows with size in KB and response time in seconds/milliseconds
+ List rows = new ArrayList<>();
+ rows.add(new Row().add("data_transfer_size", "12KB").add("response_time", "1.2s"));
+ rows.add(new Row().add("data_transfer_size", "8KB").add("response_time", "800ms"));
+
+ // Instantiate the directive
+ AggregateStats directive = new AggregateStats();
+
+ // Mock the arguments to pass to the directive
+ Arguments mockArgs = createMockArguments();
+
+ // Initialize and execute the directive
+ directive.initialize(mockArgs);
+ List result = directive.execute(rows, null);
+
+ // Validate output row count
+ Assert.assertEquals(1, result.size());
+ Row output = result.get(0);
+
+ // Expected: 20KB = 20480 bytes → 20480 / (1024 * 1024)
+ double expectedMB = 20480.0 / (1024 * 1024);
+ double actualMB = (Double) output.getValue("total_size_mb");
+
+ // Expected time: 1.2s + 0.8s = 2.0s
+ double expectedSeconds = 2.0;
+ double actualSeconds = (Double) output.getValue("total_time_sec");
+
+ Assert.assertEquals(expectedMB, actualMB, 0.001);
+ Assert.assertEquals(expectedSeconds, actualSeconds, 0.001);
+ }
+
+ @Test
+ public void testEmptyInputRows() throws Exception {
+ // Test case with no input rows
+ List rows = new ArrayList<>();
+ AggregateStats directive = new AggregateStats();
+ Arguments mockArgs = createMockArguments();
+
+ directive.initialize(mockArgs);
+ List result = directive.execute(rows, null);
+
+ Assert.assertEquals(0, result.size());
+ }
+
+ @Test
+ public void testInvalidData() throws Exception {
+ // Input rows with invalid formats
+ List rows = new ArrayList<>();
+ rows.add(new Row().add("data_transfer_size", "abcKB").add("response_time", "xyzTime"));
+
+ AggregateStats directive = new AggregateStats();
+ Arguments mockArgs = createMockArguments();
+
+ directive.initialize(mockArgs);
+ try {
+ directive.execute(rows, null);
+ Assert.fail("Expected an exception due to invalid data format");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Error aggregating stats"));
+ }
+ }
+
+ @Test
+ public void testLargeData() throws Exception {
+ // Test with large values in MB and time in hours/minutes
+ List rows = new ArrayList<>();
+ rows.add(new Row().add("data_transfer_size", "2048MB").add("response_time", "1h"));
+ rows.add(new Row().add("data_transfer_size", "1024MB").add("response_time", "30m"));
+
+ AggregateStats directive = new AggregateStats();
+ Arguments mockArgs = createMockArguments();
+
+ directive.initialize(mockArgs);
+ List result = directive.execute(rows, null);
+
+ Assert.assertEquals(1, result.size());
+
+ Row output = result.get(0);
+
+ double expectedMB = 3072.0; // 2048MB + 1024MB
+ double actualMB = (Double) output.getValue("total_size_mb");
+
+ double expectedSeconds = 5400.0; // 1h + 30m = 3600 + 1800 = 5400s
+ double actualSeconds = (Double) output.getValue("total_time_sec");
+
+ Assert.assertEquals(expectedMB, actualMB, 0.001);
+ Assert.assertEquals(expectedSeconds, actualSeconds, 0.001);
+ }
+
+ @Test
+ public void testEdgeCases() throws Exception {
+ // Edge case: zero data and zero time
+ List rows = new ArrayList<>();
+ rows.add(new Row().add("data_transfer_size", "0KB").add("response_time", "0s"));
+
+ AggregateStats directive = new AggregateStats();
+ Arguments mockArgs = createMockArguments();
+
+ directive.initialize(mockArgs);
+ List result = directive.execute(rows, null);
+
+ Assert.assertEquals(1, result.size());
+
+ Row output = result.get(0);
+
+ double actualMB = (Double) output.getValue("total_size_mb");
+ double actualSeconds = (Double) output.getValue("total_time_sec");
+
+ Assert.assertEquals(0.0, actualMB, 0.001);
+ Assert.assertEquals(0.0, actualSeconds, 0.001);
+ }
+
+ // Create mock implementation of Arguments for unit testing
+ private Arguments createMockArguments() {
+ return new Arguments() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public T value(String name) {
+ switch (name) {
+ case "byteCol": return (T) new ColumnName("data_transfer_size");
+ case "timeCol": return (T) new ColumnName("response_time");
+ case "outputSizeCol": return (T) new Text("total_size_mb");
+ case "outputTimeCol": return (T) new Text("total_time_sec");
+ }
+ return null;
+ }
+
+ @Override public int size() {
+ return 4;
+ }
+
+ @Override public boolean contains(String name) {
+ return true;
+ }
+
+ @Override public TokenType type(String name) {
+ return null;
+ }
+
+ @Override public int line() {
+ return 1;
+ }
+
+ @Override public int column() {
+ return 0;
+ }
+
+ @Override public String source() {
+ return "aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec";
+ }
+ @Override public JsonElement toJson() {
+ return null;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/wrangler-core/src/test/java/io/cdap/directives/parser/ByteSizeAndTimeDurationTest.java b/wrangler-core/src/test/java/io/cdap/directives/parser/ByteSizeAndTimeDurationTest.java
new file mode 100644
index 000000000..342129498
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/directives/parser/ByteSizeAndTimeDurationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ * Copyright © 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package io.cdap.wrangler.parser;
+
+ import io.cdap.wrangler.api.parser.ByteSize;
+ import io.cdap.wrangler.api.parser.TimeDuration;
+ import org.junit.Assert;
+ import org.junit.Test;
+
+ /**
+ * Unit tests for validating {@link ByteSize} and {@link TimeDuration} parsing and conversions.
+ */
+ public class ByteSizeAndTimeDurationTest {
+
+ @Test
+ public void testByteSizeParsing() {
+ // Validating conversion from string to bytes
+
+ Assert.assertEquals(2048L, new ByteSize("2KB").getBytes()); // 2 KB = 2048 bytes
+ Assert.assertEquals(5242880L, new ByteSize("5MB").getBytes()); // 5 MB = 5 * 1024 * 1024
+ Assert.assertEquals(2147483648L, new ByteSize("2GB").getBytes()); // 2 GB = 2 * 1024^3
+ Assert.assertEquals(256L, new ByteSize("256B").getBytes()); // 256 bytes
+ Assert.assertEquals(7340032L, new ByteSize("7MB").getBytes()); // 7 MB
+ Assert.assertEquals(3145728L, new ByteSize("3MB").getBytes()); // 3 MB
+
+ // Case-insensitive unit support
+ Assert.assertEquals(1572864L, new ByteSize("1.5mb").getBytes()); // 1.5 MB (lowercase)
+ Assert.assertEquals(4096L, new ByteSize("4kB").getBytes()); // 4 KB (mixed case)
+ }
+
+ @Test
+ public void testTimeDurationParsing() {
+ // Validating conversion from string to milliseconds
+ double delta = 0.0001;
+
+ Assert.assertEquals(10.0, new TimeDuration("10ms").getValue(), delta); // 10 milliseconds
+ Assert.assertEquals(1500.0, new TimeDuration("1.5s").getValue(), delta); // 1.5 seconds
+ Assert.assertEquals(7200000.0, new TimeDuration("2h").getValue(), delta); // 2 hours
+ Assert.assertEquals(180000.0, new TimeDuration("3min").getValue(), delta); // 3 minutes
+ Assert.assertEquals(0.5, new TimeDuration("500us").getValue(), delta); // 500 microseconds
+ Assert.assertEquals(2.5, new TimeDuration("2500000ns").getValue(), delta); // 2.5 microseconds
+ Assert.assertEquals(172800000.0, new TimeDuration("2d").getValue(), delta); // 2 days
+ }
+}
+
+
\ No newline at end of file
diff --git a/wrangler-test/pom.xml b/wrangler-test/pom.xml
index 5e4878d86..d1b84789b 100644
--- a/wrangler-test/pom.xml
+++ b/wrangler-test/pom.xml
@@ -27,6 +27,12 @@
Wrangler Testing Framework
+
+ io.cdap.wrangler
+ wrangler-api
+ ${project.version}
+ test
+
io.cdap.wrangler
wrangler-core