Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
Expand All @@ -42,6 +44,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.faker.FakerUtils;
import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
Expand All @@ -65,6 +68,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand All @@ -84,9 +88,12 @@
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
})
@CapabilityDescription("This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful " +
"for testing, configuration, and simulation. It uses either user-defined properties to define a record schema or a provided schema and generates the specified number of records using " +
"random data for the fields in the schema.")
@CapabilityDescription("""
This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful
for testing, configuration, and simulation. It uses one of three methods to define a record schema: (1) a provided Avro Schema Text,
(2) a Predefined Schema template such as Person, Order, Event, Sensor, Product, Stock Trade, or Complete Example covering all data types,
or (3) user-defined dynamic properties. The processor generates the specified number of records using random data for the fields in the schema.
""")
@DynamicProperties({
@DynamicProperty(
name = "Field name in generated record",
Expand Down Expand Up @@ -122,16 +129,21 @@ public class GenerateRecord extends AbstractProcessor {

static final PropertyDescriptor NULLABLE_FIELDS = new PropertyDescriptor.Builder()
.name("Nullable Fields")
.description("Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set. Also it only affects the schema of the generated data, " +
"not whether any values will be null. If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null.")
.description("""
Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set.
Also it only affects the schema of the generated data, not whether any values will be null.
If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null.
""")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
static final PropertyDescriptor NULL_PERCENTAGE = new PropertyDescriptor.Builder()
.name("Null Value Percentage")
.description("The percent probability (0-100%) that a generated value for any nullable field will be null. Set this property to zero to have no null values, or 100 to have all " +
"null values.")
.description("""
The percent probability (0-100%) that a generated value for any nullable field will be null.
Set this property to zero to have no null values, or 100 to have all null values.
""")
.addValidator(StandardValidators.createLongValidator(0L, 100L, true))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(true)
Expand All @@ -141,18 +153,34 @@ public class GenerateRecord extends AbstractProcessor {

static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
.name("Schema Text")
.description("The text of an Avro-formatted Schema used to generate record data. If this property is set, any user-defined properties are ignored.")
.description("""
The text of an Avro-formatted Schema used to generate record data.
Only one of Schema Text, Predefined Schema, or user-defined dynamic properties should be configured.
""")
.addValidator(new AvroSchemaValidator())
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.build();

static final PropertyDescriptor PREDEFINED_SCHEMA = new PropertyDescriptor.Builder()
.name("Predefined Schema")
.description("""
Select a predefined schema template for quick record generation. Predefined schemas provide ready-to-use
templates with multiple fields covering various data types including nested records, arrays, maps, dates, timestamps, etc.
Only one of Schema Text, Predefined Schema, or user-defined dynamic properties should be configured.
Note: This feature is intended for quick testing purposes only. Predefined schemas may change between NiFi versions.
""")
.allowableValues(PredefinedRecordSchema.class)
.required(false)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
RECORD_WRITER,
NUM_RECORDS,
NULLABLE_FIELDS,
NULL_PERCENTAGE,
SCHEMA_TEXT
SCHEMA_TEXT,
PREDEFINED_SCHEMA
);

static final Relationship REL_SUCCESS = new Relationship.Builder()
Expand Down Expand Up @@ -188,6 +216,46 @@ public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}

@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();

final boolean hasSchemaText = validationContext.getProperty(SCHEMA_TEXT).isSet();
final boolean hasPredefinedSchema = validationContext.getProperty(PREDEFINED_SCHEMA).isSet();
final boolean hasDynamicProperties = validationContext.getProperties().keySet().stream()
.anyMatch(PropertyDescriptor::isDynamic);

int configuredCount = 0;
if (hasSchemaText) {
configuredCount++;
}
if (hasPredefinedSchema) {
configuredCount++;
}
if (hasDynamicProperties) {
configuredCount++;
}

if (configuredCount == 0) {
results.add(new ValidationResult.Builder()
.subject("Schema Configuration")
.valid(false)
.explanation("At least one schema configuration must be provided: Schema Text, Predefined Schema, or user-defined dynamic properties")
.build());
} else if (configuredCount > 1) {
results.add(new ValidationResult.Builder()
.subject("Schema Configuration")
.valid(false)
.explanation("Only one schema configuration should be provided. Found multiple configurations: "
+ (hasSchemaText ? "Schema Text, " : "")
+ (hasPredefinedSchema ? "Predefined Schema, " : "")
+ (hasDynamicProperties ? "Dynamic Properties" : ""))
.build());
}

return results;
}

@OnScheduled
public void onScheduled(final ProcessContext context) {
// Force the en-US Locale for more predictable results
Expand All @@ -198,6 +266,8 @@ public void onScheduled(final ProcessContext context) {
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions().getValue();
final String predefinedSchemaName = context.getProperty(PREDEFINED_SCHEMA).getValue();
final PredefinedRecordSchema predefinedSchema = PredefinedRecordSchema.fromName(predefinedSchemaName);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final int numRecords = context.getProperty(NUM_RECORDS).evaluateAttributeExpressions().asInteger();
final boolean nullable = context.getProperty(NULLABLE_FIELDS).asBoolean();
Expand All @@ -210,46 +280,57 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
try {
flowFile = session.write(flowFile, out -> {
final RecordSchema recordSchema;
final boolean usingSchema;
final SchemaSource schemaSource;
if (StringUtils.isNotEmpty(schemaText)) {
// Schema Text takes highest precedence
final Schema avroSchema = new Schema.Parser().parse(schemaText);
recordSchema = AvroTypeUtil.createSchema(avroSchema);
usingSchema = true;
schemaSource = SchemaSource.SCHEMA_TEXT;
} else if (predefinedSchema != null) {
// Predefined schema takes second precedence
recordSchema = predefinedSchema.getSchema(nullable);
schemaSource = SchemaSource.PREDEFINED;
} else {
// Generate RecordSchema from user-defined properties
final Map<String, String> fields = getFields(context);
recordSchema = generateRecordSchema(fields, nullable);
usingSchema = false;
schemaSource = SchemaSource.DYNAMIC_PROPERTIES;
}
try {
final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema);
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, attributes)) {
writer.beginRecordSet();

Record record;
List<RecordField> writeFieldNames = writeSchema.getFields();
Map<String, Object> recordEntries = new HashMap<>();
for (int i = 0; i < numRecords; i++) {
for (RecordField writeRecordField : writeFieldNames) {
final String writeFieldName = writeRecordField.getFieldName();
final Object writeFieldValue;
if (usingSchema) {
writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage);
} else {
final boolean nullValue =
nullPercentage > 0 && faker.number().numberBetween(0, 100) <= nullPercentage;

if (nullValue) {
writeFieldValue = null;
if (schemaSource == SchemaSource.PREDEFINED) {
// Use the predefined schema's optimized value generation
final Map<String, Object> recordEntries = predefinedSchema.generateValues(faker, recordSchema, nullPercentage);
record = new MapRecord(recordSchema, recordEntries);
} else {
// Use original logic for Schema Text or dynamic properties
List<RecordField> writeFieldNames = writeSchema.getFields();
Map<String, Object> recordEntries = new HashMap<>();
for (RecordField writeRecordField : writeFieldNames) {
final String writeFieldName = writeRecordField.getFieldName();
final Object writeFieldValue;
if (schemaSource == SchemaSource.SCHEMA_TEXT) {
writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage);
} else {
final String propertyValue = context.getProperty(writeFieldName).getValue();
writeFieldValue = FakerUtils.getFakeData(propertyValue, faker);
final boolean nullValue =
nullPercentage > 0 && faker.number().numberBetween(0, 100) <= nullPercentage;

if (nullValue) {
writeFieldValue = null;
} else {
final String propertyValue = context.getProperty(writeFieldName).getValue();
writeFieldValue = FakerUtils.getFakeData(propertyValue, faker);
}
}
recordEntries.put(writeFieldName, writeFieldValue);
}

recordEntries.put(writeFieldName, writeFieldValue);
record = new MapRecord(recordSchema, recordEntries);
}
record = new MapRecord(recordSchema, recordEntries);
writer.write(record);
}

Expand Down Expand Up @@ -403,4 +484,13 @@ protected RecordSchema generateRecordSchema(final Map<String, String> fields, fi
}
return new SimpleRecordSchema(recordFields);
}

/**
* Enum to track which source is being used for the record schema.
*/
private enum SchemaSource {
SCHEMA_TEXT,
PREDEFINED,
DYNAMIC_PROPERTIES
}
}
Loading
Loading