Skip to content
Open

HEY #969

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Data Prep

## New Feature: Byte Size & Time Duration Parsers + AggregateStats Directive

This enhancement adds native support in Wrangler for parsing:

- **Byte Sizes** like `10KB`, `1.5MB`, `2GB`, `512B`
- **Time Durations** like `500ms`, `2s`, `1min`, `1h`

### New Directive: `aggregate-stats`

This directive aggregates byte size and time duration columns and returns total/average results.

**Usage:**
```text
aggregate-stats :data_transfer_size :response_time total_size_mb total_time_sec


![cm-available](https://cdap-users.herokuapp.com/assets/cm-available.svg)
![cdap-transform](https://cdap-users.herokuapp.com/assets/cdap-transform.svg)
[![Build Status](https://travis-ci.org/cdapio/hydrator-plugins.svg?branch=develop)](https://travis-ci.org/cdapio/hydrator-plugins)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.cdap.wrangler.parser;

import io.cdap.wrangler.api.parser.ByteSize;
import org.junit.Assert;
import org.junit.Test;

public class ByteSizeTest {

@Test
public void testByteSize() {
ByteSize size = new ByteSize("10MB");
Assert.assertEquals(10 * 1024 * 1024, size.getBytes());
}

@Test
public void testDifferentUnits() {
Assert.assertEquals(1024, new ByteSize("1KB").getBytes());
Assert.assertEquals(1, new ByteSize("1B").getBytes());
Assert.assertEquals(1024L * 1024 * 1024, new ByteSize("1GB").getBytes());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.cdap.wrangler.api.parser;

public class TimeDuration extends Token {
private long millis;

public TimeDuration(String value) {
super(value);
this.millis = parseToMillis(value);
}

private long parseToMillis(String value) {
value = value.toLowerCase().trim();
if (value.endsWith("ms")) return (long)(Double.parseDouble(value.replace("ms", "")));
if (value.endsWith("s")) return (long)(Double.parseDouble(value.replace("s", "")) * 1000);
if (value.endsWith("min")) return (long)(Double.parseDouble(value.replace("min", "")) * 60 * 1000);
if (value.endsWith("h")) return (long)(Double.parseDouble(value.replace("h", "")) * 60 * 60 * 1000);
return 0;
}

public long getMillis() {
return millis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,9 @@ fragment Int
fragment Digit
: [0-9]
;

BYTE_SIZE: [0-9]+ ('.' [0-9]+)? BYTE_UNIT;
fragment BYTE_UNIT: ('B' | 'KB' | 'MB' | 'GB');

TIME_DURATION: [0-9]+ ('.' [0-9]+)? TIME_UNIT;
fragment TIME_UNIT: ('ms' | 's' | 'min' | 'h');
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
public class AggregateStats implements Directive {
// Fields to store column names and totals
// Implement initialize(), define(), execute(), and finalize()

@Override
public UsageDefinition define() {
return UsageDefinition.builder("aggregate-stats")
.define("inputByteSizeColumn", TokenType.COLUMN_NAME)
.define("inputTimeColumn", TokenType.COLUMN_NAME)
.define("outputSizeColumn", TokenType.COLUMN_NAME)
.define("outputTimeColumn", TokenType.COLUMN_NAME)
.build();
}

@Override
public void initialize(Arguments args) {
// Read and store column names from args
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) {
// For each row, read values, convert to bytes/ms, add to totals
// Return a single row with calculated totals
}
}
23 changes: 23 additions & 0 deletions wrangler-core/src/main/java/io/cdap/wrangler/parser/ByteSize.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.cdap.wrangler.api.parser;

public class ByteSize extends Token {
private long bytes;

public ByteSize(String value) {
super(value);
this.bytes = parseToBytes(value);
}

private long parseToBytes(String value) {
value = value.toUpperCase().trim();
if (value.endsWith("KB")) return (long)(Double.parseDouble(value.replace("KB", "")) * 1024);
if (value.endsWith("MB")) return (long)(Double.parseDouble(value.replace("MB", "")) * 1024 * 1024);
if (value.endsWith("GB")) return (long)(Double.parseDouble(value.replace("GB", "")) * 1024 * 1024 * 1024);
if (value.endsWith("B")) return (long)(Double.parseDouble(value.replace("B", "")));
return 0;
}

public long getBytes() {
return bytes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.cdap.wrangler.parser;

import io.cdap.wrangler.api.parser.ByteSize;
import org.junit.Assert;
import org.junit.Test;

public class ByteSizeTest {

@Test
public void testByteSize() {
ByteSize size = new ByteSize("10MB");
Assert.assertEquals(10 * 1024 * 1024, size.getBytes());
}

@Test
public void testDifferentUnits() {
Assert.assertEquals(1024, new ByteSize("1KB").getBytes());
Assert.assertEquals(1, new ByteSize("1B").getBytes());
Assert.assertEquals(1024L * 1024 * 1024, new ByteSize("1GB").getBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,15 @@ private SourceInfo getOriginalSource(ParserRuleContext ctx) {
int column = ctx.getStart().getCharPositionInLine();
return new SourceInfo(lineno, column, text);
}

@Override
public Token visitByteSizeArg(DirectivesParser.ByteSizeArgContext ctx) {
return new ByteSize(ctx.getText());
}

@Override
public Token visitTimeDurationArg(DirectivesParser.TimeDurationArgContext ctx) {
return new TimeDuration(ctx.getText());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.cdap.wrangler.api.parser;

public class TimeDuration extends Token {
private long millis;

public TimeDuration(String value) {
super(value);
this.millis = parseToMillis(value);
}

private long parseToMillis(String value) {
value = value.toLowerCase().trim();
if (value.endsWith("ms")) return (long)(Double.parseDouble(value.replace("ms", "")));
if (value.endsWith("s")) return (long)(Double.parseDouble(value.replace("s", "")) * 1000);
if (value.endsWith("min")) return (long)(Double.parseDouble(value.replace("min", "")) * 60 * 1000);
if (value.endsWith("h")) return (long)(Double.parseDouble(value.replace("h", "")) * 60 * 60 * 1000);
return 0;
}

public long getMillis() {
return millis;
}
}