From c7202cc6f97d61ffaf7683f00973fd1f1940d928 Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:28:47 +0530 Subject: [PATCH 1/9] Update Directives.g4 --- .../src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6a..a7b9cf724 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -311,3 +311,9 @@ fragment Int fragment Digit : [0-9] ; + +BYTE_SIZE: [0-9]+ ('.' [0-9]+)? BYTE_UNIT; +fragment BYTE_UNIT: ('B' | 'KB' | 'MB' | 'GB'); + +TIME_DURATION: [0-9]+ ('.' [0-9]+)? TIME_UNIT; +fragment TIME_UNIT: ('ms' | 's' | 'min' | 'h'); From d22b36ff747fe327a52e88eaeef988f871877abf Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:33:19 +0530 Subject: [PATCH 2/9] Create ByteSize.java --- .../io/cdap/wrangler/parser/ByteSize.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java new file mode 100644 index 000000000..d870859c2 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java @@ -0,0 +1,23 @@ +package io.cdap.wrangler.api.parser; + +public class ByteSize extends Token { + private long bytes; + + public ByteSize(String value) { + super(value); + this.bytes = parseToBytes(value); + } + + private long parseToBytes(String value) { + value = value.toUpperCase().trim(); + if (value.endsWith("KB")) return (long)(Double.parseDouble(value.replace("KB", "")) * 1024); + if (value.endsWith("MB")) return (long)(Double.parseDouble(value.replace("MB", "")) * 1024 * 1024); + if (value.endsWith("GB")) return (long)(Double.parseDouble(value.replace("GB", "")) * 1024 * 1024 * 1024); + if (value.endsWith("B")) return (long)(Double.parseDouble(value.replace("B", ""))); + return 0; + } + + public long getBytes() { + return bytes; + } +} From 261a18083c5d8938af7317a7c724e3db8c7e75fa Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:34:56 +0530 Subject: [PATCH 3/9] Create TimeDuration.java --- .../io/cdap/wrangler/parser/TimeDuration.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java new file mode 100644 index 000000000..5f7990ecc --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java @@ -0,0 +1,23 @@ +package io.cdap.wrangler.api.parser; + +public class TimeDuration extends Token { + private long millis; + + public TimeDuration(String value) { + super(value); + this.millis = parseToMillis(value); + } + + private long parseToMillis(String value) { + value = value.toLowerCase().trim(); + if (value.endsWith("ms")) return (long)(Double.parseDouble(value.replace("ms", ""))); + if (value.endsWith("s")) return (long)(Double.parseDouble(value.replace("s", "")) * 1000); + if (value.endsWith("min")) return (long)(Double.parseDouble(value.replace("min", "")) * 60 * 1000); + if (value.endsWith("h")) return (long)(Double.parseDouble(value.replace("h", "")) * 60 * 60 * 1000); + return 0; + } + + public long getMillis() { + return millis; + } +} From 8a0a8ec573a5a4be0d354e695c08afcb44d0eb5a Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:40:25 +0530 Subject: [PATCH 4/9] Update RecipeVisitor.java --- .../java/io/cdap/wrangler/parser/RecipeVisitor.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java index ac35e7a5e..e7f56cdc9 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java @@ -326,4 +326,15 @@ private SourceInfo getOriginalSource(ParserRuleContext ctx) { int column = ctx.getStart().getCharPositionInLine(); return new SourceInfo(lineno, column, text); } + + @Override +public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) { + return new ByteSize(ctx.getText()); +} + +@Override +public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) { + return new TimeDuration(ctx.getText()); +} + } From aff42ba5a10e78a2ba46323f2e8f3e6f8b5fedd5 Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:42:59 +0530 Subject: [PATCH 5/9] Create AggregateStats.java --- .../cdap/directives/row/AggregateStats.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java new file mode 100644 index 000000000..754c82b30 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java @@ -0,0 +1,25 @@ +public class AggregateStats implements Directive { + // Fields to store column names and totals + // Implement initialize(), define(), execute(), and finalize() + + @Override + public UsageDefinition define() { + return UsageDefinition.builder("aggregate-stats") + .define("inputByteSizeColumn", TokenType.COLUMN_NAME) + .define("inputTimeColumn", TokenType.COLUMN_NAME) + .define("outputSizeColumn", TokenType.COLUMN_NAME) + .define("outputTimeColumn", TokenType.COLUMN_NAME) + .build(); + } + + @Override + public void initialize(Arguments args) { + // Read and store column names from args + } + + @Override + public List execute(List rows, ExecutorContext context) { + // For each row, read values, convert to bytes/ms, add to totals + // Return a single row with calculated totals + } +} From 8df74dfdd3a4af96ff0d15a2bee9a39b34905db2 Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:47:53 +0530 Subject: [PATCH 6/9] Create ByteSizeTest.java --- .../io/cdap/wrangler/parser/ByteSizeTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java new file mode 100644 index 000000000..1f074b02b --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java @@ -0,0 +1,21 @@ +package io.cdap.wrangler.parser; + +import io.cdap.wrangler.api.parser.ByteSize; +import org.junit.Assert; +import org.junit.Test; + +public class ByteSizeTest { + + @Test + public void testByteSize() { + ByteSize size = new ByteSize("10MB"); + Assert.assertEquals(10 * 1024 * 1024, size.getBytes()); + } + + @Test + public void testDifferentUnits() { + Assert.assertEquals(1024, new ByteSize("1KB").getBytes()); + Assert.assertEquals(1, new ByteSize("1B").getBytes()); + Assert.assertEquals(1024L * 1024 * 1024, new ByteSize("1GB").getBytes()); + } +} From 6acbbf166e860b1a6393233afb912a7d87f1fe54 Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:58:29 +0530 Subject: [PATCH 7/9] Create ByteSize.java --- .../io/cdap/wrangler/api/parser/ByteSize.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java diff --git a/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java new file mode 100644 index 000000000..1f074b02b --- /dev/null +++ b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java @@ -0,0 +1,21 @@ +package io.cdap.wrangler.parser; + +import io.cdap.wrangler.api.parser.ByteSize; +import org.junit.Assert; +import org.junit.Test; + +public class ByteSizeTest { + + @Test + public void testByteSize() { + ByteSize size = new ByteSize("10MB"); + Assert.assertEquals(10 * 1024 * 1024, size.getBytes()); + } + + @Test + public void testDifferentUnits() { + Assert.assertEquals(1024, new ByteSize("1KB").getBytes()); + Assert.assertEquals(1, new ByteSize("1B").getBytes()); + Assert.assertEquals(1024L * 1024 * 1024, new ByteSize("1GB").getBytes()); + } +} From 62f513678910b0a881404b7d8cb305b26e8940d3 Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 15:00:21 +0530 Subject: [PATCH 8/9] Create TimeDuration.java --- .../wrangler/api/parser/TimeDuration.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java diff --git a/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java new file mode 100644 index 000000000..5f7990ecc --- /dev/null +++ b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java @@ -0,0 +1,23 @@ +package io.cdap.wrangler.api.parser; + +public class TimeDuration extends Token { + private long millis; + + public TimeDuration(String value) { + super(value); + this.millis = parseToMillis(value); + } + + private long parseToMillis(String value) { + value = value.toLowerCase().trim(); + if (value.endsWith("ms")) return (long)(Double.parseDouble(value.replace("ms", ""))); + if (value.endsWith("s")) return (long)(Double.parseDouble(value.replace("s", "")) * 1000); + if (value.endsWith("min")) return (long)(Double.parseDouble(value.replace("min", "")) * 60 * 1000); + if (value.endsWith("h")) return (long)(Double.parseDouble(value.replace("h", "")) * 60 * 60 * 1000); + return 0; + } + + public long getMillis() { + return millis; + } +} From 5bfe71bf296aea8758726b94666e183bd7391c24 Mon Sep 17 00:00:00 2001 From: Pranav Jejurkar <95302595+PranavJejurkar@users.noreply.github.com> Date: Fri, 11 Apr 2025 15:14:03 +0530 Subject: [PATCH 9/9] Update README.md --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 4aa6eeb3a..9a5acd893 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,21 @@ # Data Prep +## New Feature: Byte Size & Time Duration Parsers + AggregateStats Directive + +This enhancement adds native support in Wrangler for parsing: + +- **Byte Sizes** like `10KB`, `1.5MB`, `2GB`, `512B` +- **Time Durations** like `500ms`, `2s`, `1min`, `1h` + +### New Directive: `aggregate-stats` + +This directive aggregates byte size and time duration columns and returns total/average results. + +**Usage:** +```text +aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec + + ![cm-available](https://cdap-users.herokuapp.com/assets/cm-available.svg) ![cdap-transform](https://cdap-users.herokuapp.com/assets/cdap-transform.svg) [![Build Status](https://travis-ci.org/cdapio/hydrator-plugins.svg?branch=develop)](https://travis-ci.org/cdapio/hydrator-plugins)