diff --git a/README.md b/README.md index 4aa6eeb3a..604ecc80f 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ These directives are currently available: | [Write JSON Object](wrangler-docs/directives/write-as-json-object.md) | Composes a JSON object based on the fields specified. | | [Format as Currency](wrangler-docs/directives/format-as-currency.md) | Formats a number as currency as specified by locale. | | **Transformations** | | +| [Aggregate Stats](wrangler-docs/directives/aggregate-stats.md) | Analyzes byte size and time duration values, generating statistics | | [Changing Case](wrangler-docs/directives/changing-case.md) | Changes the case of column values | | [Cut Character](wrangler-docs/directives/cut-character.md) | Selects parts of a string value | | [Set Column](wrangler-docs/directives/set-column.md) | Sets the column value to the result of an expression execution | @@ -175,6 +176,28 @@ rates below are specified as *records/second*. | High (167 Directives) | 426 | 127,946,398 | 82,677,845,324 | 106,367.27 | | High (167 Directives) | 426 | 511,785,592 | 330,711,381,296 | 105,768.93 | +## Byte Size and Time Duration Support + +The Wrangler library provides support for parsing and aggregating byte size and time duration values. This feature allows you to work with human-readable size and duration values directly in your recipes. + +### Byte Size Units + +The following byte size units are supported: + +- B: Bytes +- KB: Kilobytes (1024 bytes) +- MB: Megabytes (1024 \* 1024 bytes) +- GB: Gigabytes (1024 _ 1024 _ 1024 bytes) +- TB: Terabytes (1024 _ 1024 _ 1024 \* 1024 bytes) + +### Time Duration Units + +The following time duration units are supported: + +- ms: Milliseconds +- s: Seconds +- m: Minutes +- h: Hours ## Contact @@ -214,5 +237,6 @@ and limitations under the License. Cask is a trademark of Cask Data, Inc. All rights reserved. + Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. diff --git a/README.pdf b/README.pdf new file mode 100644 index 000000000..5bfd086b6 Binary files /dev/null and b/README.pdf differ diff --git a/pom.xml b/pom.xml index a4ee67662..843dfad8c 100644 --- a/pom.xml +++ b/pom.xml @@ -176,6 +176,16 @@ ${testSourceLocation} + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + 1 + true + --add-opens java.base/java.lang=ALL-UNNAMED + + org.apache.maven.plugins maven-compiler-plugin diff --git a/prompts.text b/prompts.text new file mode 100644 index 000000000..f5a273d1b --- /dev/null +++ b/prompts.text @@ -0,0 +1,125 @@ +Prompt 1 +I am modifying an ANTLR grammar for a Java project. Help me write lexer and parser rules in Directives.g4 for +two new tokens: + +BYTE_SIZE → matches values like "10KB", "2.5MB", "1GB". + +TIME_DURATION → matches values like "5ms", "3.2s", "1min". + +Also, include helpful fragments like BYTE_UNIT, TIME_UNIT. Finally, show how to update the value parser rule +(or create byteSizeArg, timeDurationArg if needed) so the new tokens are accepted as directive arguments. + + + +Prompt 2: Create ByteSize and TimeDuration Token Classes +I am working on a Java project where tokens represent directive arguments. Help me create two new token classes: + +ByteSize.java and TimeDuration.java + +Each class should: + +Extend io.cdap.wrangler.api.parser.Token + +Parse strings like "10KB", "2.5MB", "1GB" (for ByteSize) and "500ms", "1.2s", "3min" (for TimeDuration) + +Internally store the value in canonical units (bytes for ByteSize, milliseconds or nanoseconds for TimeDuration) + +Provide getter methods like getBytes() and getMilliseconds() + + +Prompt 3: Update Token Types and Directive Argument Support +I am extending a token parsing framework in Java for a data transformation tool. Guide me to: + +Add two new token types: BYTE_SIZE and TIME_DURATION in the token registry or enum used (if any). + +Update the logic that defines valid argument types in directives, +so that BYTE_SIZE and TIME_DURATION can be accepted where appropriate. + +Mention any necessary updates in registration/configuration files or classes if applicable. + + + +Prompt 4: Add Visitor Methods for New Parser Rules +In my ANTLR-based Java parser for a directive language, +I’ve added two new parser rules: byteSizeArg and timeDurationArg. Help me: + +Implement visitor methods visitByteSizeArg and visitTimeDurationArg in the appropriate visitor or parser class. + +These methods should return instances of ByteSize and TimeDuration tokens respectively using ctx.getText(). + +Ensure these token instances are added to the TokenGroup for the directive being parsed. + + + +Prompt 5: Implement New AggregateStats Directive +I’m creating a new directive class called AggregateStats in a Java-based data transformation engine. Guide me to: + +Implement the Directive interface + +Accept at least 4 arguments: + +Source column (byte sizes) + +Source column (time durations) + +Target column for total size + +Target column for total/average time + +Optionally accept: + +Aggregation type (total, avg) + +Output unit (MB, GB, seconds, minutes) + +In initialize, store the argument values + +In execute, use ExecutorContext.getStore() to: + +Accumulate byte size and time duration values (convert to canonical units) + +In finalize, return a single Row with converted results (e.g., MB, seconds) + + +Prompt 6: Write Unit Tests for ByteSize and TimeDuration +Help me write JUnit tests for one Java class: ByteSize and TimeDuration. + These class parse strings like "10KB" and "500ms" respectively. + +Test valid cases: "10KB", "1.5MB", "1GB" for ByteSize and "500ms", "2s", "1min" for TimeDuration. + +Verify that getBytes() or getMilliseconds() return the correct canonical values. + +Include a few invalid input tests and assert that they throw proper exceptions. + + + + +Prompt 7: Write Parser Tests for New Grammar +I’ve added BYTE_SIZE and TIME_DURATION tokens to an ANTLR grammar. Help me write parser tests in Java to: + +Validate that inputs like "10KB", "1.5MB", "5ms", "3min" are accepted in directive recipes. + +Use test classes like GrammarBasedParserTest.java or RecipeCompilerTest.java. + +Also test invalid values (e.g., "10KBB", "1..5MB", "ms5") and ensure they are rejected. + + + + +Prompt 8: Write Integration Test for AggregateStats Directive +I’ve created an AggregateStats directive that aggregates byte size and time duration columns. Help me write an integration test using TestingRig to: + +Create input data: List with columns like data_transfer_size and response_time using values like "1MB", "500KB", "2s", "500ms". + +Define recipe like: + +java + +String[] recipe = new String[] { + "aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec" +}; +Execute with TestingRig.execute(recipe, rows) + +Assert that the resulting row contains correct aggregated values (in MB and seconds) + +Use a delta tolerance (e.g., 0.001) for comparing float values \ No newline at end of file diff --git a/wrangler-api/pom.xml b/wrangler-api/pom.xml index e97464a64..b595000dd 100644 --- a/wrangler-api/pom.xml +++ b/wrangler-api/pom.xml @@ -39,6 +39,35 @@ ${cdap.version} provided - + + + + org.apache.rat + apache-rat-plugin + + rat-excludes.txt + 2 + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.0.0 + + checkstyle.xml + suppressions.xml + + + + validate + validate + + check + + + + + + diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java new file mode 100644 index 000000000..27fd543ab --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/ByteSize.java @@ -0,0 +1,156 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + package io.cdap.wrangler.api.parser; + + import com.google.gson.JsonElement; + import com.google.gson.JsonObject; + import io.cdap.wrangler.api.annotations.PublicEvolving; + + /** + * Represents a ByteSize token, capable of parsing strings like "10KB", "1.5MB", + * and converting them into bytes. + */ + @PublicEvolving + public class ByteSize implements Token { + + // Multipliers for each unit + private static final double KILOBYTE = 1024.0; + private static final double MEGABYTE = KILOBYTE * 1024.0; + private static final double GIGABYTE = MEGABYTE * 1024.0; + private static final double TERABYTE = GIGABYTE * 1024.0; + + // Parsed byte value stored as long + private final long bytesValue; + + /** + * Constructs a ByteSize token by parsing the given size string. + * + * @param sizeString The string to parse (e.g., "10KB", "1.5MB"). + * @throws IllegalArgumentException If the string format is invalid. + */ + public ByteSize(String sizeString) { + this.bytesValue = parseSize(sizeString); + } + + /** + * Parses a size string and converts it into bytes. + * + * @param sizeString The input string representing a byte size. + * @return The size in bytes. + */ + private long parseSize(String sizeString) { + if (sizeString == null || sizeString.trim().isEmpty()) { + throw new IllegalArgumentException("Size string must not be null or empty."); + } + + sizeString = sizeString.trim().toUpperCase(); + String numericPart; + double multiplier; + + try { + if (sizeString.endsWith("KB")) { + numericPart = sizeString.substring(0, sizeString.length() - 2); + multiplier = KILOBYTE; + } else if (sizeString.endsWith("MB")) { + numericPart = sizeString.substring(0, sizeString.length() - 2); + multiplier = MEGABYTE; + } else if (sizeString.endsWith("GB")) { + numericPart = sizeString.substring(0, sizeString.length() - 2); + multiplier = GIGABYTE; + } else if (sizeString.endsWith("TB")) { + numericPart = sizeString.substring(0, sizeString.length() - 2); + multiplier = TERABYTE; + } else if (sizeString.endsWith("B")) { + numericPart = sizeString.substring(0, sizeString.length() - 1); + multiplier = 1.0; + } else { + throw new IllegalArgumentException("Invalid byte size format or unsupported unit in string: " + sizeString); + } + + if (numericPart.isEmpty()) { + throw new IllegalArgumentException("Missing numeric value in size string: " + sizeString); + } + + double parsedValue = Double.parseDouble(numericPart); + if (parsedValue < 0) { + throw new IllegalArgumentException("Size value cannot be negative: " + sizeString); + } + + return (long) (parsedValue * multiplier); // Truncate to long + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid numeric value in size string: " + sizeString, e); + } + } + + /** + * @return Size in bytes. + */ + public long getBytes() { + return bytesValue; + } + + /** + * @return Size in kilobytes. + */ + public double getKiloBytes() { + return bytesValue / KILOBYTE; + } + + /** + * @return Size in megabytes. + */ + public double getMegaBytes() { + return bytesValue / MEGABYTE; + } + + /** + * @return Size in gigabytes. + */ + public double getGigaBytes() { + return bytesValue / GIGABYTE; + } + + /** + * @return Size in terabytes. + */ + public double getTeraBytes() { + return bytesValue / TERABYTE; + } + + @Override + public Object value() { + return bytesValue; + } + + @Override + public TokenType type() { + return TokenType.BYTE_SIZE; + } + + @Override + public JsonElement toJson() { + JsonObject object = new JsonObject(); + object.addProperty("type", TokenType.BYTE_SIZE.name()); + object.addProperty("value", bytesValue); + return object; + } + + @Override + public String toString() { + return bytesValue + "B"; + } + } \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java new file mode 100644 index 000000000..c86802d20 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TimeDuration.java @@ -0,0 +1,155 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * Copyright © 2023 Google LLC // Update copyright year/holder if needed + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + package io.cdap.wrangler.api.parser; + + import com.google.gson.JsonElement; + import com.google.gson.JsonObject; + import io.cdap.wrangler.api.annotations.PublicEvolving; + + import java.util.concurrent.TimeUnit; + + /** + * Represents a Token for time durations, capable of parsing strings + * like "5ms", "2.1s", or "3h" and converting them into milliseconds. + */ + @PublicEvolving + public class TimeDuration implements Token { + + // Time unit conversion factors (all in terms of milliseconds) + private static final double MILLIS_IN_NANOSECOND = 1.0 / 1_000_000.0; + private static final double MILLIS_IN_MICROSECOND = 1.0 / 1_000.0; + private static final double MILLIS_IN_SECOND = 1_000.0; + private static final double MILLIS_IN_MINUTE = 60.0 * MILLIS_IN_SECOND; + private static final double MILLIS_IN_HOUR = 60.0 * MILLIS_IN_MINUTE; + private static final double MILLIS_IN_DAY = 24.0 * MILLIS_IN_HOUR; + + // Final parsed duration value (in milliseconds) + private final double durationMillis; + + /** + * Constructs a TimeDuration object by parsing the provided duration string. + * + * @param inputDuration A string representing duration (e.g., "5ms", "2.5h"). + * @throws IllegalArgumentException if the string format is invalid. + */ + public TimeDuration(String inputDuration) { + this.durationMillis = parseDuration(inputDuration); + } + + /** + * Parses a duration string and converts it to milliseconds. + * + * @param durationStr Input duration string to parse. + * @return Duration value in milliseconds. + * @throws IllegalArgumentException if format or value is invalid. + */ + private double parseDuration(String durationStr) { + if (durationStr == null || durationStr.trim().isEmpty()) { + throw new IllegalArgumentException("Duration string must not be null or empty."); + } + + durationStr = durationStr.trim().toLowerCase(); + String numericPart; + double conversionFactor; + + try { + if (durationStr.endsWith("ns")) { + numericPart = durationStr.substring(0, durationStr.length() - 2); + conversionFactor = MILLIS_IN_NANOSECOND; + } else if (durationStr.endsWith("us")) { + numericPart = durationStr.substring(0, durationStr.length() - 2); + conversionFactor = MILLIS_IN_MICROSECOND; + } else if (durationStr.endsWith("ms")) { + numericPart = durationStr.substring(0, durationStr.length() - 2); + conversionFactor = 1.0; + } else if (durationStr.endsWith("s")) { + numericPart = durationStr.substring(0, durationStr.length() - 1); + conversionFactor = MILLIS_IN_SECOND; + } else if (durationStr.endsWith("min")) { + numericPart = durationStr.substring(0, durationStr.length() - 3); + conversionFactor = MILLIS_IN_MINUTE; + } else if (durationStr.endsWith("m")) { + numericPart = durationStr.substring(0, durationStr.length() - 1); + conversionFactor = MILLIS_IN_MINUTE; + } else if (durationStr.endsWith("h")) { + numericPart = durationStr.substring(0, durationStr.length() - 1); + conversionFactor = MILLIS_IN_HOUR; + } else if (durationStr.endsWith("d")) { + numericPart = durationStr.substring(0, durationStr.length() - 1); + conversionFactor = MILLIS_IN_DAY; + } else { + throw new IllegalArgumentException( + "Invalid time duration format or unsupported unit in string: " + durationStr); + } + + if (numericPart.isEmpty()) { + throw new IllegalArgumentException("Missing numeric value in duration string: " + durationStr); + } + + double parsedNumber = Double.parseDouble(numericPart); + if (parsedNumber < 0) { + throw new IllegalArgumentException("Duration value cannot be negative: " + durationStr); + } + + return parsedNumber * conversionFactor; + + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid numeric value in duration string: " + durationStr, e); + } + } + + /** + * Converts and returns the duration in the given {@link TimeUnit}. + * + * @param unit Time unit to convert to. + * @return Converted duration in the specified unit (rounded). + */ + public long getDuration(TimeUnit unit) { + return unit.convert((long) this.durationMillis, TimeUnit.MILLISECONDS); + } + + /** + * @return Canonical duration value in milliseconds. + */ + public double getValue() { + return durationMillis; + } + + @Override + public Object value() { + return durationMillis; + } + + @Override + public TokenType type() { + return TokenType.TIME_DURATION; + } + + @Override + public JsonElement toJson() { + JsonObject object = new JsonObject(); + object.addProperty("type", TokenType.TIME_DURATION.name()); + object.addProperty("value", durationMillis); + return object; + } + + @Override + public String toString() { + return durationMillis + "ms"; + } + } \ No newline at end of file diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java index 8c93b0e6a..04a796945 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/TokenType.java @@ -40,6 +40,8 @@ * @see Expression * @see Text * @see TextList + * @see ByteSize + * @see TimeDuration */ @PublicEvolving public enum TokenType implements Serializable { @@ -152,5 +154,22 @@ public enum TokenType implements Serializable { * Represents the enumerated type for the object of type {@code String} with restrictions * on characters that can be present in a string. */ - IDENTIFIER + IDENTIFIER, + + + /** + * Enum representing specialized data types that require unit-based parsing. + * Indicates a value of type {@code ByteSize}, which consists of a numeric component + * followed by a byte unit (e.g., B, KB, MB, GB, TB, PB). + * Examples include: "10KB", "1.5MB", "2GB". + */ + BYTE_SIZE, + + /** + * Represents the enumerated type for the object of type {@code TimeDuration} type. + * This type is associated with a numeric value followed by a time unit (e.g.,ns, ms, s, m, h, d). + * For example: "100ms", "5s", "2.5h". + */ + TIME_DURATION, + } diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java index 78800b7d1..bc4bad9ce 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/parser/UsageDefinition.java @@ -126,6 +126,10 @@ public String toString() { sb.append("prop:{key:value,[key:value]*"); } else if (token.type().equals(TokenType.RANGES)) { sb.append("start:end=[bool|text|numeric][,start:end=[bool|text|numeric]*"); + } else if (token.type().equals(TokenType.BYTE_SIZE)) { + sb.append(":").append(token.name()); + } else if (token.type().equals(TokenType.TIME_DURATION)) { + sb.append(":").append(token.name()); } } diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml index e2dcb3c2b..e3e6069e8 100644 --- a/wrangler-core/pom.xml +++ b/wrangler-core/pom.xml @@ -45,6 +45,7 @@ io.cdap.wrangler wrangler-api ${project.version} + compile io.cdap.wrangler @@ -99,6 +100,12 @@ 2.0.2 test + + junit + junit + 4.13.2 + test + org.powermock powermock-api-mockito2 @@ -320,6 +327,16 @@ + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + 1 + true + --add-opens java.base/java.lang=ALL-UNNAMED + + org.antlr antlr4-maven-plugin diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index 7c517ed6a..1a62308ca 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -57,6 +57,8 @@ directive | text | number | bool + | byteSizeArg + | timeDurationArg | column | colList | numberList @@ -140,7 +142,7 @@ numberRange ; value - : String | Number | Column | Bool + : String | Number | Column | Bool | ByteSize | TimeDuration ; ecommand @@ -167,6 +169,14 @@ bool : Bool ; +byteSizeArg + : BYTE_SIZE + ; + +timeDurationArg + : TIME_DURATION + ; + condition : OBrace (~CBrace | condition)* CBrace ; @@ -253,6 +263,14 @@ Bool | 'false' ; +BYTE_SIZE + : DecimalNumber BYTE_UNIT + ; + +TIME_DURATION + : DecimalNumber TIME_UNIT + ; + Number : Int ('.' Digit*)? ; @@ -311,3 +329,29 @@ fragment Int fragment Digit : [0-9] ; + + +fragment DecimalNumber + : Digit+ ('.' Digit+)? // Matches decimal numbers like 123, 1.5, 0.25, etc. + ; + + +fragment ByteUnit + : [kK][bB] // Kilobytes + | [mM][bB] // Megabytes + | [gG][bB] // Gigabytes + | [tT][bB] // Terabytes + | [pP][bB] // Petabytes + | [bB] // Bytes + ; + + +fragment TimeUnit + : [nN][sS] // Nanoseconds + | [uU][sS] // Microseconds + | [mM][sS] // Milliseconds + | [sS] // Seconds + | [mM] // Minutes + | [hH] // Hours + | [dD] // Days + ; \ No newline at end of file diff --git a/wrangler-core/src/main/java/io/cdap/directives/aggregates/aggregateStats.java b/wrangler-core/src/main/java/io/cdap/directives/aggregates/aggregateStats.java new file mode 100644 index 000000000..cfe5dd76d --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/directives/aggregates/aggregateStats.java @@ -0,0 +1,180 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.directives.aggregates; + +import io.cdap.cdap.api.annotation.Name; + +import io.cdap.cdap.api.annotation.Plugin; + +import io.cdap.cdap.api.data.schema.Schema; + +import io.cdap.wrangler.api.Arguments; + +import io.cdap.wrangler.api.Directive; + +import io.cdap.wrangler.api.DirectiveExecutionException; + +import io.cdap.wrangler.api.DirectiveParseException; + +import io.cdap.wrangler.api.ExecutorContext; + +import io.cdap.wrangler.api.Row; + +import io.cdap.wrangler.api.annotations.Categories; + +import io.cdap.wrangler.api.parser.ByteSize; + +import io.cdap.wrangler.api.parser.ColumnName; + +import io.cdap.wrangler.api.parser.Text; + +import io.cdap.wrangler.api.parser.TimeDuration; + +import io.cdap.wrangler.api.parser.TokenType; + +import io.cdap.wrangler.api.parser.UsageDefinition; + +import java.util.ArrayList; + +import java.util.List; +/** + * Directive to aggregate statistics over a dataset. It computes + * total bytes and total time, then outputs the summary in MB and seconds. + */ +@Plugin(type = Directive.TYPE) +@Name(AggregateStats.NAME) +@Categories(categories = { "data-aggregation" }) +public class AggregateStats implements Directive { + + public static final String NAME = "aggregate-stats"; + + // Input column names + private String inputByteColumn; + private String inputTimeColumn; + + // Output column names + private String resultByteColumn; + private String resultTimeColumn; + + // Aggregated values + private double accumulatedBytes = 0.0; + private double accumulatedTimeMs = 0.0; + private int processedRowCount = 0; + + /** + * Defines the parameters required to use this directive. + * + * @return UsageDefinition instance outlining directive inputs. + */ + @Override + public UsageDefinition define() { + UsageDefinition.Builder builder = UsageDefinition.builder(NAME); + builder.define("byteCol", TokenType.COLUMN_NAME); + builder.define("timeCol", TokenType.COLUMN_NAME); + builder.define("outputSizeCol", TokenType.TEXT); + builder.define("outputTimeCol", TokenType.TEXT); + return builder.build(); + } + + /** + * Initializes the directive with user-provided arguments. + * + * @param args The input arguments. + * @throws DirectiveParseException if parsing fails. + */ + @Override + public void initialize(Arguments args) throws DirectiveParseException { + inputByteColumn = ((ColumnName) args.value("byteCol")).value(); + inputTimeColumn = ((ColumnName) args.value("timeCol")).value(); + resultByteColumn = ((Text) args.value("outputSizeCol")).value(); + resultTimeColumn = ((Text) args.value("outputTimeCol")).value(); + } + + /** + * Executes aggregation logic over input rows. + * + * @param rows List of input rows. + * @param ctx Execution context. + * @return List containing a single row with aggregated metrics. + * @throws DirectiveExecutionException if any error occurs during execution. + */ + @Override + public List execute(List rows, ExecutorContext ctx) throws DirectiveExecutionException { + try { + for (Row row : rows) { + if (row.find(inputByteColumn) != -1 && row.find(inputTimeColumn) != -1) { + String byteValue = row.getValue(inputByteColumn).toString(); + String timeValue = row.getValue(inputTimeColumn).toString(); + + // Parse input values + ByteSize byteSize = new ByteSize(byteValue); + TimeDuration duration = new TimeDuration(timeValue); + + accumulatedBytes += byteSize.getBytes(); + accumulatedTimeMs += duration.getValue(); + processedRowCount++; + } + } + + if (processedRowCount == 0) { + return new ArrayList<>(); + } + + // Prepare output + List results = new ArrayList<>(); + Row resultRow = new Row(); + + resultRow.add(resultByteColumn, accumulatedBytes / (1024.0 * 1024.0)); // Convert bytes to MB + resultRow.add(resultTimeColumn, accumulatedTimeMs / 1000.0); // Convert ms to seconds + + results.add(resultRow); + return results; + + } catch (Exception e) { + throw new DirectiveExecutionException( + String.format("Error aggregating stats: %s", e.getMessage()) + ); + } + } + + /** + * Defines the output schema for the transformed dataset. + * + * @param inputSchema The schema of incoming data. + * @return Schema of the resulting data. + */ + public Schema getOutputSchema(Schema inputSchema) { + List fields = new ArrayList<>(); + fields.add(Schema.Field.of(resultByteColumn, Schema.of(Schema.Type.DOUBLE))); + fields.add(Schema.Field.of(resultTimeColumn, Schema.of(Schema.Type.DOUBLE))); + return Schema.recordOf("aggregate-stats", fields); + } + + /** + * Cleans up and resets the internal state of the directive. + */ + @Override + public void destroy() { + accumulatedBytes = 0.0; + accumulatedTimeMs = 0.0; + processedRowCount = 0; + + inputByteColumn = null; + inputTimeColumn = null; + resultByteColumn = null; + resultTimeColumn = null; + } +} \ No newline at end of file diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java index ac35e7a5e..f9503cb59 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/RecipeVisitor.java @@ -22,6 +22,7 @@ import io.cdap.wrangler.api.Triplet; import io.cdap.wrangler.api.parser.Bool; import io.cdap.wrangler.api.parser.BoolList; +import io.cdap.wrangler.api.parser.ByteSize; import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.DirectiveName; @@ -33,6 +34,7 @@ import io.cdap.wrangler.api.parser.Ranges; import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TextList; +import io.cdap.wrangler.api.parser.TimeDuration; import io.cdap.wrangler.api.parser.Token; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.misc.Interval; @@ -317,6 +319,39 @@ public RecipeSymbol.Builder visitStringList(DirectivesParser.StringListContext c return builder; } + /** + * Parses a byte size argument (e.g., "10MB") from the directive script and creates + * a {@link ByteSize} token. This token is then added to the directive builder for + * further processing during execution. + * + * @param ctx the parser context representing a BYTE_SIZE argument + * @return the updated {@link RecipeSymbol.Builder} instance + */ +@Override +public RecipeSymbol.Builder visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) { + String byteSizeValue = ctx.BYTE_SIZE().getText(); // e.g., "10MB" + ByteSize byteSize = new ByteSize(byteSizeValue); + builder.addToken(byteSize); + return builder; +} + +/** + * Parses a time duration argument (e.g., "5s", "1h") from the directive script and creates + * a {@link TimeDuration} token. This token is added to the directive builder to support + * duration-based operations within directives. + * + * @param ctx the parser context representing a TIME_DURATION argument + * @return the updated {@link RecipeSymbol.Builder} instance + */ +@Override +public RecipeSymbol.Builder visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) { + String durationValue = ctx.TIME_DURATION().getText(); // e.g., "5s" + TimeDuration timeDuration = new TimeDuration(durationValue); + builder.addToken(timeDuration); + return builder; +} + + private SourceInfo getOriginalSource(ParserRuleContext ctx) { int a = ctx.getStart().getStartIndex(); int b = ctx.getStop().getStopIndex(); diff --git a/wrangler-core/src/test/java/io/cdap/directives/aggregates/aggregatestats.java b/wrangler-core/src/test/java/io/cdap/directives/aggregates/aggregatestats.java new file mode 100644 index 000000000..f7e68814b --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/directives/aggregates/aggregatestats.java @@ -0,0 +1,189 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.directives.aggregates; + +import com.google.gson.JsonElement; +import io.cdap.wrangler.api.Arguments; +import io.cdap.wrangler.api.Row; +import io.cdap.wrangler.api.parser.ColumnName; +import io.cdap.wrangler.api.parser.Text; +import io.cdap.wrangler.api.parser.Token; +import io.cdap.wrangler.api.parser.TokenType; +import org.junit.Assert; +import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + +public class AggregateStatsTest { + + @Test + public void testAggregateStatsDirective() throws Exception { + // Prepare sample input rows with size in KB and response time in seconds/milliseconds + List rows = new ArrayList<>(); + rows.add(new Row().add("data_transfer_size", "12KB").add("response_time", "1.2s")); + rows.add(new Row().add("data_transfer_size", "8KB").add("response_time", "800ms")); + + // Instantiate the directive + AggregateStats directive = new AggregateStats(); + + // Mock the arguments to pass to the directive + Arguments mockArgs = createMockArguments(); + + // Initialize and execute the directive + directive.initialize(mockArgs); + List result = directive.execute(rows, null); + + // Validate output row count + Assert.assertEquals(1, result.size()); + Row output = result.get(0); + + // Expected: 20KB = 20480 bytes → 20480 / (1024 * 1024) + double expectedMB = 20480.0 / (1024 * 1024); + double actualMB = (Double) output.getValue("total_size_mb"); + + // Expected time: 1.2s + 0.8s = 2.0s + double expectedSeconds = 2.0; + double actualSeconds = (Double) output.getValue("total_time_sec"); + + Assert.assertEquals(expectedMB, actualMB, 0.001); + Assert.assertEquals(expectedSeconds, actualSeconds, 0.001); + } + + @Test + public void testEmptyInputRows() throws Exception { + // Test case with no input rows + List rows = new ArrayList<>(); + AggregateStats directive = new AggregateStats(); + Arguments mockArgs = createMockArguments(); + + directive.initialize(mockArgs); + List result = directive.execute(rows, null); + + Assert.assertEquals(0, result.size()); + } + + @Test + public void testInvalidData() throws Exception { + // Input rows with invalid formats + List rows = new ArrayList<>(); + rows.add(new Row().add("data_transfer_size", "abcKB").add("response_time", "xyzTime")); + + AggregateStats directive = new AggregateStats(); + Arguments mockArgs = createMockArguments(); + + directive.initialize(mockArgs); + try { + directive.execute(rows, null); + Assert.fail("Expected an exception due to invalid data format"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Error aggregating stats")); + } + } + + @Test + public void testLargeData() throws Exception { + // Test with large values in MB and time in hours/minutes + List rows = new ArrayList<>(); + rows.add(new Row().add("data_transfer_size", "2048MB").add("response_time", "1h")); + rows.add(new Row().add("data_transfer_size", "1024MB").add("response_time", "30m")); + + AggregateStats directive = new AggregateStats(); + Arguments mockArgs = createMockArguments(); + + directive.initialize(mockArgs); + List result = directive.execute(rows, null); + + Assert.assertEquals(1, result.size()); + + Row output = result.get(0); + + double expectedMB = 3072.0; // 2048MB + 1024MB + double actualMB = (Double) output.getValue("total_size_mb"); + + double expectedSeconds = 5400.0; // 1h + 30m = 3600 + 1800 = 5400s + double actualSeconds = (Double) output.getValue("total_time_sec"); + + Assert.assertEquals(expectedMB, actualMB, 0.001); + Assert.assertEquals(expectedSeconds, actualSeconds, 0.001); + } + + @Test + public void testEdgeCases() throws Exception { + // Edge case: zero data and zero time + List rows = new ArrayList<>(); + rows.add(new Row().add("data_transfer_size", "0KB").add("response_time", "0s")); + + AggregateStats directive = new AggregateStats(); + Arguments mockArgs = createMockArguments(); + + directive.initialize(mockArgs); + List result = directive.execute(rows, null); + + Assert.assertEquals(1, result.size()); + + Row output = result.get(0); + + double actualMB = (Double) output.getValue("total_size_mb"); + double actualSeconds = (Double) output.getValue("total_time_sec"); + + Assert.assertEquals(0.0, actualMB, 0.001); + Assert.assertEquals(0.0, actualSeconds, 0.001); + } + + // Create mock implementation of Arguments for unit testing + private Arguments createMockArguments() { + return new Arguments() { + @SuppressWarnings("unchecked") + @Override + public T value(String name) { + switch (name) { + case "byteCol": return (T) new ColumnName("data_transfer_size"); + case "timeCol": return (T) new ColumnName("response_time"); + case "outputSizeCol": return (T) new Text("total_size_mb"); + case "outputTimeCol": return (T) new Text("total_time_sec"); + } + return null; + } + + @Override public int size() { + return 4; + } + + @Override public boolean contains(String name) { + return true; + } + + @Override public TokenType type(String name) { + return null; + } + + @Override public int line() { + return 1; + } + + @Override public int column() { + return 0; + } + + @Override public String source() { + return "aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec"; + } + @Override public JsonElement toJson() { + return null; + } + }; + } +} \ No newline at end of file diff --git a/wrangler-core/src/test/java/io/cdap/directives/parser/ByteSizeAndTimeDurationTest.java b/wrangler-core/src/test/java/io/cdap/directives/parser/ByteSizeAndTimeDurationTest.java new file mode 100644 index 000000000..342129498 --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/directives/parser/ByteSizeAndTimeDurationTest.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2017-2019 Cask Data, Inc. + * Copyright © 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package io.cdap.wrangler.parser; + + import io.cdap.wrangler.api.parser.ByteSize; + import io.cdap.wrangler.api.parser.TimeDuration; + import org.junit.Assert; + import org.junit.Test; + + /** + * Unit tests for validating {@link ByteSize} and {@link TimeDuration} parsing and conversions. + */ + public class ByteSizeAndTimeDurationTest { + + @Test + public void testByteSizeParsing() { + // Validating conversion from string to bytes + + Assert.assertEquals(2048L, new ByteSize("2KB").getBytes()); // 2 KB = 2048 bytes + Assert.assertEquals(5242880L, new ByteSize("5MB").getBytes()); // 5 MB = 5 * 1024 * 1024 + Assert.assertEquals(2147483648L, new ByteSize("2GB").getBytes()); // 2 GB = 2 * 1024^3 + Assert.assertEquals(256L, new ByteSize("256B").getBytes()); // 256 bytes + Assert.assertEquals(7340032L, new ByteSize("7MB").getBytes()); // 7 MB + Assert.assertEquals(3145728L, new ByteSize("3MB").getBytes()); // 3 MB + + // Case-insensitive unit support + Assert.assertEquals(1572864L, new ByteSize("1.5mb").getBytes()); // 1.5 MB (lowercase) + Assert.assertEquals(4096L, new ByteSize("4kB").getBytes()); // 4 KB (mixed case) + } + + @Test + public void testTimeDurationParsing() { + // Validating conversion from string to milliseconds + double delta = 0.0001; + + Assert.assertEquals(10.0, new TimeDuration("10ms").getValue(), delta); // 10 milliseconds + Assert.assertEquals(1500.0, new TimeDuration("1.5s").getValue(), delta); // 1.5 seconds + Assert.assertEquals(7200000.0, new TimeDuration("2h").getValue(), delta); // 2 hours + Assert.assertEquals(180000.0, new TimeDuration("3min").getValue(), delta); // 3 minutes + Assert.assertEquals(0.5, new TimeDuration("500us").getValue(), delta); // 500 microseconds + Assert.assertEquals(2.5, new TimeDuration("2500000ns").getValue(), delta); // 2.5 microseconds + Assert.assertEquals(172800000.0, new TimeDuration("2d").getValue(), delta); // 2 days + } +} + + \ No newline at end of file diff --git a/wrangler-test/pom.xml b/wrangler-test/pom.xml index 5e4878d86..d1b84789b 100644 --- a/wrangler-test/pom.xml +++ b/wrangler-test/pom.xml @@ -27,6 +27,12 @@ Wrangler Testing Framework + + io.cdap.wrangler + wrangler-api + ${project.version} + test + io.cdap.wrangler wrangler-core