From 7402081d603107f0a1c901242c94846b269d47e7 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 9 Jan 2026 13:57:52 +0100 Subject: [PATCH 1/3] NIFI-15448 - Add option for using predefined schemas in GenerateRecord --- .../processors/standard/GenerateRecord.java | 150 +++- .../faker/PredefinedRecordSchema.java | 742 ++++++++++++++++++ .../standard/TestGenerateRecord.java | 190 ++++- 3 files changed, 1049 insertions(+), 33 deletions(-) create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java index d355ee58aea9..818d7679c8a0 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java @@ -31,6 +31,8 @@ import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -42,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.faker.FakerUtils; +import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema; import 
org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; @@ -65,6 +68,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -84,9 +88,12 @@ @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"), }) -@CapabilityDescription("This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful " + - "for testing, configuration, and simulation. It uses either user-defined properties to define a record schema or a provided schema and generates the specified number of records using " + - "random data for the fields in the schema.") +@CapabilityDescription(""" + This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful + for testing, configuration, and simulation. It uses one of three methods to define a record schema: (1) a provided Avro Schema Text, + (2) a Predefined Schema template such as Person, Order, Event, Sensor, Product, Stock Trade, or Complete Example covering all data types, + or (3) user-defined dynamic properties. The processor generates the specified number of records using random data for the fields in the schema. + """) @DynamicProperties({ @DynamicProperty( name = "Field name in generated record", @@ -122,16 +129,21 @@ public class GenerateRecord extends AbstractProcessor { static final PropertyDescriptor NULLABLE_FIELDS = new PropertyDescriptor.Builder() .name("Nullable Fields") - .description("Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set. 
Also it only affects the schema of the generated data, " + - "not whether any values will be null. If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null.") + .description(""" + Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set. + Also it only affects the schema of the generated data, not whether any values will be null. + If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null. + """) .allowableValues("true", "false") .defaultValue("true") .required(true) .build(); static final PropertyDescriptor NULL_PERCENTAGE = new PropertyDescriptor.Builder() .name("Null Value Percentage") - .description("The percent probability (0-100%) that a generated value for any nullable field will be null. Set this property to zero to have no null values, or 100 to have all " + - "null values.") + .description(""" + The percent probability (0-100%) that a generated value for any nullable field will be null. + Set this property to zero to have no null values, or 100 to have all null values. + """) .addValidator(StandardValidators.createLongValidator(0L, 100L, true)) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(true) @@ -141,18 +153,34 @@ public class GenerateRecord extends AbstractProcessor { static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder() .name("Schema Text") - .description("The text of an Avro-formatted Schema used to generate record data. If this property is set, any user-defined properties are ignored.") + .description(""" + The text of an Avro-formatted Schema used to generate record data. + Only one of Schema Text, Predefined Schema, or user-defined dynamic properties should be configured. 
+ """) .addValidator(new AvroSchemaValidator()) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(false) .build(); + static final PropertyDescriptor PREDEFINED_SCHEMA = new PropertyDescriptor.Builder() + .name("Predefined Schema") + .description(""" + Select a predefined schema template for quick record generation. Predefined schemas provide ready-to-use + templates with multiple fields covering various data types including nested records, arrays, maps, dates, timestamps, etc. + Only one of Schema Text, Predefined Schema, or user-defined dynamic properties should be configured. + Note: This feature is intended for quick testing purposes only. Predefined schemas may change between NiFi versions. + """) + .allowableValues(PredefinedRecordSchema.class) + .required(false) + .build(); + private static final List PROPERTY_DESCRIPTORS = List.of( RECORD_WRITER, NUM_RECORDS, NULLABLE_FIELDS, NULL_PERCENTAGE, - SCHEMA_TEXT + SCHEMA_TEXT, + PREDEFINED_SCHEMA ); static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -188,6 +216,46 @@ public Set getRelationships() { return RELATIONSHIPS; } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(); + + final boolean hasSchemaText = validationContext.getProperty(SCHEMA_TEXT).isSet(); + final boolean hasPredefinedSchema = validationContext.getProperty(PREDEFINED_SCHEMA).isSet(); + final boolean hasDynamicProperties = validationContext.getProperties().keySet().stream() + .anyMatch(PropertyDescriptor::isDynamic); + + int configuredCount = 0; + if (hasSchemaText) { + configuredCount++; + } + if (hasPredefinedSchema) { + configuredCount++; + } + if (hasDynamicProperties) { + configuredCount++; + } + + if (configuredCount == 0) { + results.add(new ValidationResult.Builder() + .subject("Schema Configuration") + .valid(false) + .explanation("At least one schema configuration must be provided: Schema Text, Predefined 
Schema, or user-defined dynamic properties") + .build()); + } else if (configuredCount > 1) { + results.add(new ValidationResult.Builder() + .subject("Schema Configuration") + .valid(false) + .explanation("Only one schema configuration should be provided. Found multiple configurations: " + + (hasSchemaText ? "Schema Text, " : "") + + (hasPredefinedSchema ? "Predefined Schema, " : "") + + (hasDynamicProperties ? "Dynamic Properties" : "")) + .build()); + } + + return results; + } + @OnScheduled public void onScheduled(final ProcessContext context) { // Force the en-US Locale for more predictable results @@ -198,6 +266,8 @@ public void onScheduled(final ProcessContext context) { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions().getValue(); + final String predefinedSchemaName = context.getProperty(PREDEFINED_SCHEMA).getValue(); + final PredefinedRecordSchema predefinedSchema = PredefinedRecordSchema.fromName(predefinedSchemaName); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final int numRecords = context.getProperty(NUM_RECORDS).evaluateAttributeExpressions().asInteger(); final boolean nullable = context.getProperty(NULLABLE_FIELDS).asBoolean(); @@ -210,16 +280,21 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro try { flowFile = session.write(flowFile, out -> { final RecordSchema recordSchema; - final boolean usingSchema; + final SchemaSource schemaSource; if (StringUtils.isNotEmpty(schemaText)) { + // Schema Text takes highest precedence final Schema avroSchema = new Schema.Parser().parse(schemaText); recordSchema = AvroTypeUtil.createSchema(avroSchema); - usingSchema = true; + schemaSource = SchemaSource.SCHEMA_TEXT; + } else if (predefinedSchema != null) { + // Predefined schema takes second precedence + 
recordSchema = predefinedSchema.getSchema(nullable); + schemaSource = SchemaSource.PREDEFINED; } else { // Generate RecordSchema from user-defined properties final Map fields = getFields(context); recordSchema = generateRecordSchema(fields, nullable); - usingSchema = false; + schemaSource = SchemaSource.DYNAMIC_PROPERTIES; } try { final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema); @@ -227,29 +302,35 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro writer.beginRecordSet(); Record record; - List writeFieldNames = writeSchema.getFields(); - Map recordEntries = new HashMap<>(); for (int i = 0; i < numRecords; i++) { - for (RecordField writeRecordField : writeFieldNames) { - final String writeFieldName = writeRecordField.getFieldName(); - final Object writeFieldValue; - if (usingSchema) { - writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage); - } else { - final boolean nullValue = - nullPercentage > 0 && faker.number().numberBetween(0, 100) <= nullPercentage; - - if (nullValue) { - writeFieldValue = null; + if (schemaSource == SchemaSource.PREDEFINED) { + // Use the predefined schema's optimized value generation + final Map recordEntries = predefinedSchema.generateValues(faker, recordSchema, nullPercentage); + record = new MapRecord(recordSchema, recordEntries); + } else { + // Use original logic for Schema Text or dynamic properties + List writeFieldNames = writeSchema.getFields(); + Map recordEntries = new HashMap<>(); + for (RecordField writeRecordField : writeFieldNames) { + final String writeFieldName = writeRecordField.getFieldName(); + final Object writeFieldValue; + if (schemaSource == SchemaSource.SCHEMA_TEXT) { + writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage); } else { - final String propertyValue = context.getProperty(writeFieldName).getValue(); - writeFieldValue = FakerUtils.getFakeData(propertyValue, faker); + 
/**
 * Enum to track which source is being used for the record schema.
 * <p>
 * onTrigger picks exactly one of these per invocation and uses it to decide how field values are
 * generated for each record.
 */
private enum SchemaSource {
    // Chosen when the evaluated Schema Text property is not empty; the Avro schema is parsed from that text
    SCHEMA_TEXT,
    // Chosen when Schema Text is empty and a predefined schema was resolved from the Predefined Schema property
    PREDEFINED,
    // Fallback: the schema is generated from the user-defined dynamic properties
    DYNAMIC_PROPERTIES
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.faker; + +import net.datafaker.Faker; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * Predefined record schemas for the GenerateRecord processor. + * These schemas provide ready-to-use templates for generating fake data + * without requiring manual configuration of dynamic properties. 
+ */ +public enum PredefinedRecordSchema implements DescribedValue { + + PERSON("Person", "A person with name, contact information, and address") { + @Override + public RecordSchema getSchema(boolean nullable) { + List addressFields = Arrays.asList( + new RecordField("street", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("city", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("state", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("zipCode", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("country", RecordFieldType.STRING.getDataType(), nullable) + ); + RecordSchema addressSchema = new SimpleRecordSchema(addressFields); + + List fields = Arrays.asList( + new RecordField("id", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("firstName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("lastName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("email", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("phoneNumber", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("dateOfBirth", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("age", RecordFieldType.INT.getDataType(), nullable), + new RecordField("active", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("address", RecordFieldType.RECORD.getRecordDataType(addressSchema), nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new LinkedHashMap<>(); + values.put("id", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("firstName", generateNullableValue(nullPercentage, faker, f -> f.name().firstName())); + values.put("lastName", generateNullableValue(nullPercentage, faker, f -> f.name().lastName())); + values.put("email", generateNullableValue(nullPercentage, faker, f -> 
f.internet().emailAddress())); + values.put("phoneNumber", generateNullableValue(nullPercentage, faker, f -> f.phoneNumber().phoneNumber())); + values.put("dateOfBirth", generateNullableValue(nullPercentage, faker, f -> { + LocalDate birthday = f.timeAndDate().birthday(18, 80); + return Date.valueOf(birthday); + })); + values.put("age", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(18, 80))); + values.put("active", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + + // Generate nested address record + Map addressValues = new LinkedHashMap<>(); + addressValues.put("street", generateNullableValue(nullPercentage, faker, f -> f.address().streetAddress())); + addressValues.put("city", generateNullableValue(nullPercentage, faker, f -> f.address().city())); + addressValues.put("state", generateNullableValue(nullPercentage, faker, f -> f.address().state())); + addressValues.put("zipCode", generateNullableValue(nullPercentage, faker, f -> f.address().zipCode())); + addressValues.put("country", generateNullableValue(nullPercentage, faker, f -> f.address().country())); + + RecordSchema addressSchema = schema.getField("address").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? 
((RecordDataType) schema.getField("address").get().getDataType()).getChildSchema() + : null; + + if (addressSchema != null) { + values.put("address", generateNullableValue(nullPercentage, faker, f -> new MapRecord(addressSchema, addressValues))); + } + + return values; + } + }, + + ORDER("Order", "An e-commerce order with line items, amounts, and timestamps") { + @Override + public RecordSchema getSchema(boolean nullable) { + List lineItemFields = Arrays.asList( + new RecordField("productId", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("productName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("quantity", RecordFieldType.INT.getDataType(), nullable), + new RecordField("unitPrice", RecordFieldType.DOUBLE.getDataType(), nullable) + ); + RecordSchema lineItemSchema = new SimpleRecordSchema(lineItemFields); + + List fields = Arrays.asList( + new RecordField("orderId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("customerId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("customerName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("customerEmail", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("orderDate", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("orderTime", RecordFieldType.TIME.getDataType(), nullable), + new RecordField("orderTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("totalAmount", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable), + new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("status", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("shipped", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("itemCount", RecordFieldType.INT.getDataType(), nullable), + new RecordField("lineItems", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(lineItemSchema)), 
nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new LinkedHashMap<>(); + values.put("orderId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("customerId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("customerName", generateNullableValue(nullPercentage, faker, f -> f.name().fullName())); + values.put("customerEmail", generateNullableValue(nullPercentage, faker, f -> f.internet().emailAddress())); + + Instant orderInstant = faker.timeAndDate().past(365, TimeUnit.DAYS); + values.put("orderDate", generateNullableValue(nullPercentage, faker, f -> new Date(orderInstant.toEpochMilli()))); + values.put("orderTime", generateNullableValue(nullPercentage, faker, f -> new Time(orderInstant.toEpochMilli()))); + values.put("orderTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(orderInstant.toEpochMilli()))); + + String[] statuses = {"PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"}; + String status = statuses[faker.number().numberBetween(0, statuses.length)]; + values.put("status", generateNullableValue(nullPercentage, faker, f -> status)); + values.put("shipped", generateNullableValue(nullPercentage, faker, f -> "SHIPPED".equals(status) || "DELIVERED".equals(status))); + + String[] currencies = {"USD", "EUR", "GBP", "JPY", "CAD"}; + values.put("currency", generateNullableValue(nullPercentage, faker, f -> currencies[f.number().numberBetween(0, currencies.length)])); + + // Generate line items + int itemCount = faker.number().numberBetween(1, 5); + values.put("itemCount", generateNullableValue(nullPercentage, faker, f -> itemCount)); + + RecordSchema lineItemSchema = null; + if (schema.getField("lineItems").get().getDataType().getFieldType() == RecordFieldType.ARRAY) { + DataType elementType = ((ArrayDataType) 
schema.getField("lineItems").get().getDataType()).getElementType(); + if (elementType.getFieldType() == RecordFieldType.RECORD) { + lineItemSchema = ((RecordDataType) elementType).getChildSchema(); + } + } + + double totalAmount = 0.0; + Object[] lineItems = new Object[itemCount]; + for (int i = 0; i < itemCount; i++) { + Map lineItemValues = new LinkedHashMap<>(); + lineItemValues.put("productId", "PRD-" + faker.number().digits(8)); + lineItemValues.put("productName", faker.commerce().productName()); + int quantity = faker.number().numberBetween(1, 10); + double unitPrice = faker.number().randomDouble(2, 10, 500); + lineItemValues.put("quantity", quantity); + lineItemValues.put("unitPrice", unitPrice); + totalAmount += quantity * unitPrice; + + if (lineItemSchema != null) { + lineItems[i] = new MapRecord(lineItemSchema, lineItemValues); + } + } + values.put("lineItems", generateNullableValue(nullPercentage, faker, f -> lineItems)); + final double finalTotal = totalAmount; + values.put("totalAmount", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(finalTotal).setScale(2, RoundingMode.HALF_UP))); + + return values; + } + }, + + EVENT("Event", "A timestamped event with metadata and tags") { + @Override + public RecordSchema getSchema(boolean nullable) { + List fields = Arrays.asList( + new RecordField("eventId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("eventType", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("eventDate", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("eventTime", RecordFieldType.TIME.getDataType(), nullable), + new RecordField("eventTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("source", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("severity", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("message", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("processed", 
RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("retryCount", RecordFieldType.INT.getDataType(), nullable), + new RecordField("durationMs", RecordFieldType.LONG.getDataType(), nullable), + new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("metadata", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new LinkedHashMap<>(); + values.put("eventId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + + String[] eventTypes = {"USER_LOGIN", "USER_LOGOUT", "PURCHASE", "PAGE_VIEW", "API_CALL", "ERROR", "WARNING", "AUDIT"}; + values.put("eventType", generateNullableValue(nullPercentage, faker, f -> eventTypes[f.number().numberBetween(0, eventTypes.length)])); + + Instant eventInstant = faker.timeAndDate().past(30, TimeUnit.DAYS); + values.put("eventDate", generateNullableValue(nullPercentage, faker, f -> new Date(eventInstant.toEpochMilli()))); + values.put("eventTime", generateNullableValue(nullPercentage, faker, f -> new Time(eventInstant.toEpochMilli()))); + values.put("eventTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(eventInstant.toEpochMilli()))); + + String[] sources = {"web-app", "mobile-app", "api-gateway", "batch-processor", "stream-processor"}; + values.put("source", generateNullableValue(nullPercentage, faker, f -> sources[f.number().numberBetween(0, sources.length)])); + + String[] severities = {"DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"}; + values.put("severity", generateNullableValue(nullPercentage, faker, f -> severities[f.number().numberBetween(0, severities.length)])); + values.put("message", generateNullableValue(nullPercentage, faker, f -> f.lorem().sentence())); + values.put("processed", 
generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + values.put("retryCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5))); + values.put("durationMs", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(1L, 10000L))); + + // Generate tags array + String[] possibleTags = {"important", "urgent", "reviewed", "automated", "manual", "verified", "pending"}; + int tagCount = faker.number().numberBetween(1, 4); + String[] tags = new String[tagCount]; + for (int i = 0; i < tagCount; i++) { + tags[i] = possibleTags[faker.number().numberBetween(0, possibleTags.length)]; + } + values.put("tags", generateNullableValue(nullPercentage, faker, f -> tags)); + + // Generate metadata map + Map metadata = new HashMap<>(); + metadata.put("version", "1." + faker.number().numberBetween(0, 10)); + metadata.put("environment", faker.options().option("dev", "staging", "prod")); + metadata.put("region", faker.options().option("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1")); + metadata.put("correlationId", UUID.randomUUID().toString()); + values.put("metadata", generateNullableValue(nullPercentage, faker, f -> metadata)); + + return values; + } + }, + + SENSOR("Sensor", "An IoT sensor reading with location and measurements") { + @Override + public RecordSchema getSchema(boolean nullable) { + List locationFields = Arrays.asList( + new RecordField("latitude", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("longitude", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("altitude", RecordFieldType.DOUBLE.getDataType(), nullable) + ); + RecordSchema locationSchema = new SimpleRecordSchema(locationFields); + + List fields = Arrays.asList( + new RecordField("sensorId", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("deviceType", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("manufacturer", RecordFieldType.STRING.getDataType(), nullable), 
+ new RecordField("readingTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("temperature", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("humidity", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("pressure", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("batteryLevel", RecordFieldType.INT.getDataType(), nullable), + new RecordField("signalStrength", RecordFieldType.INT.getDataType(), nullable), + new RecordField("online", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("location", RecordFieldType.RECORD.getRecordDataType(locationSchema), nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new LinkedHashMap<>(); + values.put("sensorId", generateNullableValue(nullPercentage, faker, f -> "SNS-" + f.number().digits(10))); + + String[] deviceTypes = {"TEMPERATURE", "HUMIDITY", "PRESSURE", "MOTION", "LIGHT", "AIR_QUALITY", "MULTI"}; + values.put("deviceType", generateNullableValue(nullPercentage, faker, f -> deviceTypes[f.number().numberBetween(0, deviceTypes.length)])); + + String[] manufacturers = {"SensorCorp", "IoTech", "SmartDevices", "DataSense", "EnviroMonitor"}; + values.put("manufacturer", generateNullableValue(nullPercentage, faker, f -> manufacturers[f.number().numberBetween(0, manufacturers.length)])); + + values.put("readingTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 3600000)))); + values.put("temperature", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, -20, 45))); + values.put("humidity", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 100))); + values.put("pressure", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 980, 1050))); + 
values.put("batteryLevel", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100))); + values.put("signalStrength", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(-100, -30))); + values.put("online", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + + // Generate nested location record + Map locationValues = new LinkedHashMap<>(); + locationValues.put("latitude", faker.number().randomDouble(6, -90, 90)); + locationValues.put("longitude", faker.number().randomDouble(6, -180, 180)); + locationValues.put("altitude", faker.number().randomDouble(2, 0, 3000)); + + RecordSchema locationSchema = schema.getField("location").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) schema.getField("location").get().getDataType()).getChildSchema() + : null; + + if (locationSchema != null) { + values.put("location", generateNullableValue(nullPercentage, faker, f -> new MapRecord(locationSchema, locationValues))); + } + + return values; + } + }, + + PRODUCT("Product", "A product catalog entry with pricing and inventory") { + @Override + public RecordSchema getSchema(boolean nullable) { + List dimensionFields = Arrays.asList( + new RecordField("length", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("width", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("height", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("weight", RecordFieldType.DOUBLE.getDataType(), nullable) + ); + RecordSchema dimensionSchema = new SimpleRecordSchema(dimensionFields); + + List fields = Arrays.asList( + new RecordField("productId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("sku", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("name", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("description", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("category", 
RecordFieldType.STRING.getDataType(), nullable), + new RecordField("brand", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable), + new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("inStock", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("quantity", RecordFieldType.INT.getDataType(), nullable), + new RecordField("rating", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("reviewCount", RecordFieldType.INT.getDataType(), nullable), + new RecordField("createdDate", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("lastUpdated", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("dimensions", RecordFieldType.RECORD.getRecordDataType(dimensionSchema), nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new LinkedHashMap<>(); + values.put("productId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("sku", generateNullableValue(nullPercentage, faker, f -> "SKU-" + f.number().digits(8))); + values.put("name", generateNullableValue(nullPercentage, faker, f -> f.commerce().productName())); + values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().paragraph())); + values.put("category", generateNullableValue(nullPercentage, faker, f -> f.commerce().department())); + values.put("brand", generateNullableValue(nullPercentage, faker, f -> f.company().name())); + values.put("price", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(f.number().randomDouble(2, 5, 2000)).setScale(2, RoundingMode.HALF_UP))); + + String[] currencies = {"USD", "EUR", "GBP"}; + 
values.put("currency", generateNullableValue(nullPercentage, faker, f -> currencies[f.number().numberBetween(0, currencies.length)])); + + int quantity = faker.number().numberBetween(0, 500); + values.put("inStock", generateNullableValue(nullPercentage, faker, f -> quantity > 0)); + values.put("quantity", generateNullableValue(nullPercentage, faker, f -> quantity)); + values.put("rating", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(1, 1, 5))); + values.put("reviewCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5000))); + values.put("createdDate", generateNullableValue(nullPercentage, faker, f -> new Date(f.timeAndDate().past(365, TimeUnit.DAYS).toEpochMilli()))); + values.put("lastUpdated", generateNullableValue(nullPercentage, faker, f -> new Timestamp(f.timeAndDate().past(30, TimeUnit.DAYS).toEpochMilli()))); + + // Generate tags array + String[] possibleTags = {"new", "sale", "bestseller", "limited", "exclusive", "eco-friendly", "premium"}; + int tagCount = faker.number().numberBetween(0, 4); + String[] tags = new String[tagCount]; + for (int i = 0; i < tagCount; i++) { + tags[i] = possibleTags[faker.number().numberBetween(0, possibleTags.length)]; + } + values.put("tags", generateNullableValue(nullPercentage, faker, f -> tags)); + + // Generate nested dimensions record + Map dimensionValues = new LinkedHashMap<>(); + dimensionValues.put("length", faker.number().randomDouble(2, 1, 100)); + dimensionValues.put("width", faker.number().randomDouble(2, 1, 100)); + dimensionValues.put("height", faker.number().randomDouble(2, 1, 100)); + dimensionValues.put("weight", faker.number().randomDouble(2, 1, 50)); + + RecordSchema dimensionSchema = schema.getField("dimensions").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? 
((RecordDataType) schema.getField("dimensions").get().getDataType()).getChildSchema() + : null; + + if (dimensionSchema != null) { + values.put("dimensions", generateNullableValue(nullPercentage, faker, f -> new MapRecord(dimensionSchema, dimensionValues))); + } + + return values; + } + }, + + STOCK_TRADE("Stock Trade", "A stock market trade with pricing and volume") { + @Override + public RecordSchema getSchema(boolean nullable) { + List fields = Arrays.asList( + new RecordField("tradeId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("symbol", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("companyName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("exchange", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("tradeType", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("tradeTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("quantity", RecordFieldType.LONG.getDataType(), nullable), + new RecordField("totalValue", RecordFieldType.DECIMAL.getDecimalDataType(16, 2), nullable), + new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("bidPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("askPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("high52Week", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("low52Week", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("marketCap", RecordFieldType.LONG.getDataType(), nullable), + new RecordField("settled", RecordFieldType.BOOLEAN.getDataType(), nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new 
LinkedHashMap<>(); + values.put("tradeId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + + String[] symbols = {"AAPL", "GOOGL", "MSFT", "AMZN", "META", "TSLA", "NVDA", "JPM", "V", "JNJ"}; + String[] companies = {"Apple Inc.", "Alphabet Inc.", "Microsoft Corp.", "Amazon.com Inc.", "Meta Platforms Inc.", + "Tesla Inc.", "NVIDIA Corp.", "JPMorgan Chase & Co.", "Visa Inc.", "Johnson & Johnson"}; + int symbolIdx = faker.number().numberBetween(0, symbols.length); + values.put("symbol", generateNullableValue(nullPercentage, faker, f -> symbols[symbolIdx])); + values.put("companyName", generateNullableValue(nullPercentage, faker, f -> companies[symbolIdx])); + + String[] exchanges = {"NYSE", "NASDAQ", "LSE", "TSE"}; + values.put("exchange", generateNullableValue(nullPercentage, faker, f -> exchanges[f.number().numberBetween(0, exchanges.length)])); + + String[] tradeTypes = {"BUY", "SELL"}; + values.put("tradeType", generateNullableValue(nullPercentage, faker, f -> tradeTypes[f.number().numberBetween(0, tradeTypes.length)])); + values.put("tradeTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 86400000)))); + + double price = faker.number().randomDouble(4, 10, 3000); + long quantity = faker.number().numberBetween(1, 10000); + values.put("price", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price).setScale(4, RoundingMode.HALF_UP))); + values.put("quantity", generateNullableValue(nullPercentage, faker, f -> quantity)); + values.put("totalValue", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * quantity).setScale(2, RoundingMode.HALF_UP))); + values.put("currency", generateNullableValue(nullPercentage, faker, f -> "USD")); + values.put("bidPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 0.999).setScale(4, RoundingMode.HALF_UP))); + values.put("askPrice", 
generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 1.001).setScale(4, RoundingMode.HALF_UP))); + values.put("high52Week", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 1.5).setScale(4, RoundingMode.HALF_UP))); + values.put("low52Week", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 0.6).setScale(4, RoundingMode.HALF_UP))); + values.put("marketCap", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(1_000_000_000L, 3_000_000_000_000L))); + values.put("settled", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + + return values; + } + }, + + COMPLETE_EXAMPLE("Complete Example", "A comprehensive schema demonstrating all supported data types including nested records, arrays, and maps") { + @Override + public RecordSchema getSchema(boolean nullable) { + // Deepest nested record: Coordinates + List coordinateFields = Arrays.asList( + new RecordField("latitude", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("longitude", RecordFieldType.DOUBLE.getDataType(), nullable) + ); + RecordSchema coordinateSchema = new SimpleRecordSchema(coordinateFields); + + // Address record with nested coordinates + List addressFields = Arrays.asList( + new RecordField("street", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("city", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("state", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("zipCode", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("country", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("coordinates", RecordFieldType.RECORD.getRecordDataType(coordinateSchema), nullable) + ); + RecordSchema addressSchema = new SimpleRecordSchema(addressFields); + + // Profile record with nested address + List profileFields = Arrays.asList( + new RecordField("firstName", 
RecordFieldType.STRING.getDataType(), nullable), + new RecordField("lastName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("email", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("age", RecordFieldType.INT.getDataType(), nullable), + new RecordField("verified", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("address", RecordFieldType.RECORD.getRecordDataType(addressSchema), nullable) + ); + RecordSchema profileSchema = new SimpleRecordSchema(profileFields); + + // Order record for array of records + List orderFields = Arrays.asList( + new RecordField("orderId", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("amount", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("placed", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("shipped", RecordFieldType.BOOLEAN.getDataType(), nullable) + ); + RecordSchema orderSchema = new SimpleRecordSchema(orderFields); + + // Main schema with all types + List fields = Arrays.asList( + // Basic types + new RecordField("id", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("active", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("score", RecordFieldType.INT.getDataType(), nullable), + new RecordField("count", RecordFieldType.LONG.getDataType(), nullable), + new RecordField("rating", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("price", RecordFieldType.FLOAT.getDataType(), nullable), + new RecordField("balance", RecordFieldType.DECIMAL.getDecimalDataType(12, 2), nullable), + new RecordField("initial", RecordFieldType.CHAR.getDataType(), nullable), + new RecordField("flags", RecordFieldType.BYTE.getDataType(), nullable), + new RecordField("rank", RecordFieldType.SHORT.getDataType(), nullable), + + // Date/Time types + new RecordField("createdDate", RecordFieldType.DATE.getDataType(), 
nullable), + new RecordField("lastLoginTime", RecordFieldType.TIME.getDataType(), nullable), + new RecordField("lastModified", RecordFieldType.TIMESTAMP.getDataType(), nullable), + + // Complex types + new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("scores", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()), nullable), + new RecordField("metadata", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("profile", RecordFieldType.RECORD.getRecordDataType(profileSchema), nullable), + new RecordField("orders", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(orderSchema)), nullable) + ); + return new SimpleRecordSchema(fields); + } + + @Override + public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { + Map values = new LinkedHashMap<>(); + + // Basic types + values.put("id", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("active", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + values.put("score", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100))); + values.put("count", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0L, 1_000_000L))); + values.put("rating", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 5))); + values.put("price", generateNullableValue(nullPercentage, faker, f -> (float) f.number().randomDouble(2, 1, 1000))); + values.put("balance", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(f.number().randomDouble(2, -10000, 50000)).setScale(2, RoundingMode.HALF_UP))); + values.put("initial", generateNullableValue(nullPercentage, faker, f -> (char) ('A' + f.number().numberBetween(0, 26)))); + values.put("flags", generateNullableValue(nullPercentage, faker, f -> (byte) 
f.number().numberBetween(0, 127))); + values.put("rank", generateNullableValue(nullPercentage, faker, f -> (short) f.number().numberBetween(1, 1000))); + + // Date/Time types + Instant pastInstant = faker.timeAndDate().past(365, TimeUnit.DAYS); + values.put("createdDate", generateNullableValue(nullPercentage, faker, f -> new Date(pastInstant.toEpochMilli()))); + values.put("lastLoginTime", generateNullableValue(nullPercentage, faker, f -> new Time(f.timeAndDate().past(1, TimeUnit.DAYS).toEpochMilli()))); + values.put("lastModified", generateNullableValue(nullPercentage, faker, f -> new Timestamp(f.timeAndDate().past(7, TimeUnit.DAYS).toEpochMilli()))); + + // Array of strings + String[] possibleTags = {"important", "urgent", "reviewed", "automated", "verified", "pending", "approved"}; + int tagCount = faker.number().numberBetween(1, 5); + String[] tags = new String[tagCount]; + for (int i = 0; i < tagCount; i++) { + tags[i] = possibleTags[faker.number().numberBetween(0, possibleTags.length)]; + } + values.put("tags", generateNullableValue(nullPercentage, faker, f -> tags)); + + // Array of integers + int scoreCount = faker.number().numberBetween(3, 8); + Integer[] scores = new Integer[scoreCount]; + for (int i = 0; i < scoreCount; i++) { + scores[i] = faker.number().numberBetween(50, 100); + } + values.put("scores", generateNullableValue(nullPercentage, faker, f -> scores)); + + // Map + Map metadata = new HashMap<>(); + metadata.put("source", faker.options().option("web", "mobile", "api", "batch")); + metadata.put("version", "1." 
+ faker.number().numberBetween(0, 10)); + metadata.put("environment", faker.options().option("dev", "staging", "prod")); + metadata.put("region", faker.options().option("us-east-1", "us-west-2", "eu-west-1")); + values.put("metadata", generateNullableValue(nullPercentage, faker, f -> metadata)); + + // Nested profile record (3 levels deep: profile -> address -> coordinates) + RecordSchema profileSchema = schema.getField("profile").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) schema.getField("profile").get().getDataType()).getChildSchema() + : null; + + if (profileSchema != null) { + Map profileValues = new LinkedHashMap<>(); + profileValues.put("firstName", faker.name().firstName()); + profileValues.put("lastName", faker.name().lastName()); + profileValues.put("email", faker.internet().emailAddress()); + profileValues.put("age", faker.number().numberBetween(18, 80)); + profileValues.put("verified", faker.bool().bool()); + + RecordSchema addressSchema = profileSchema.getField("address").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) profileSchema.getField("address").get().getDataType()).getChildSchema() + : null; + + if (addressSchema != null) { + Map addressValues = new LinkedHashMap<>(); + addressValues.put("street", faker.address().streetAddress()); + addressValues.put("city", faker.address().city()); + addressValues.put("state", faker.address().state()); + addressValues.put("zipCode", faker.address().zipCode()); + addressValues.put("country", faker.address().country()); + + RecordSchema coordinateSchema = addressSchema.getField("coordinates").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? 
((RecordDataType) addressSchema.getField("coordinates").get().getDataType()).getChildSchema() + : null; + + if (coordinateSchema != null) { + Map coordValues = new LinkedHashMap<>(); + coordValues.put("latitude", faker.number().randomDouble(6, -90, 90)); + coordValues.put("longitude", faker.number().randomDouble(6, -180, 180)); + addressValues.put("coordinates", new MapRecord(coordinateSchema, coordValues)); + } + + profileValues.put("address", new MapRecord(addressSchema, addressValues)); + } + + values.put("profile", generateNullableValue(nullPercentage, faker, f -> new MapRecord(profileSchema, profileValues))); + } + + // Array of order records + RecordSchema orderSchema = null; + if (schema.getField("orders").get().getDataType().getFieldType() == RecordFieldType.ARRAY) { + DataType elementType = ((ArrayDataType) schema.getField("orders").get().getDataType()).getElementType(); + if (elementType.getFieldType() == RecordFieldType.RECORD) { + orderSchema = ((RecordDataType) elementType).getChildSchema(); + } + } + + if (orderSchema != null) { + final RecordSchema finalOrderSchema = orderSchema; + int orderCount = faker.number().numberBetween(1, 4); + Object[] orders = new Object[orderCount]; + for (int i = 0; i < orderCount; i++) { + Map orderValues = new LinkedHashMap<>(); + orderValues.put("orderId", "ORD-" + faker.number().digits(8)); + orderValues.put("amount", faker.number().randomDouble(2, 10, 500)); + String[] currencies = {"USD", "EUR", "GBP"}; + orderValues.put("currency", currencies[faker.number().numberBetween(0, currencies.length)]); + orderValues.put("placed", new Date(faker.timeAndDate().past(90, TimeUnit.DAYS).toEpochMilli())); + orderValues.put("shipped", faker.bool().bool()); + orders[i] = new MapRecord(finalOrderSchema, orderValues); + } + values.put("orders", generateNullableValue(nullPercentage, faker, f -> orders)); + } + + return values; + } + }; + + private final String displayName; + private final String description; + + 
PredefinedRecordSchema(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + /** + * Get the record schema for this predefined schema type. + * + * @param nullable whether fields should be nullable + * @return the RecordSchema + */ + public abstract RecordSchema getSchema(boolean nullable); + + /** + * Generate random values for all fields in this schema. + * + * @param faker the Faker instance to use for generating random data + * @param schema the RecordSchema to generate values for + * @param nullPercentage the percentage chance (0-100) that nullable fields will be null + * @return a Map of field names to generated values + */ + public abstract Map generateValues(Faker faker, RecordSchema schema, int nullPercentage); + + /** + * Generate a record with random values. + * + * @param faker the Faker instance to use for generating random data + * @param nullable whether fields should be nullable + * @param nullPercentage the percentage chance (0-100) that nullable fields will be null + * @return a Record with generated values + */ + public Record generateRecord(Faker faker, boolean nullable, int nullPercentage) { + RecordSchema schema = getSchema(nullable); + Map values = generateValues(faker, schema, nullPercentage); + return new MapRecord(schema, values); + } + + /** + * Helper method to generate a value with a chance of being null. + */ + protected static T generateNullableValue(int nullPercentage, Faker faker, Function generator) { + if (nullPercentage > 0 && faker.number().numberBetween(0, 100) < nullPercentage) { + return null; + } + return generator.apply(faker); + } + + /** + * Get a predefined schema by name, or null if not found or empty. 
+ * + * @param name the name of the predefined schema + * @return the PredefinedRecordSchema, or null if not found or name is null/empty + */ + public static PredefinedRecordSchema fromName(String name) { + if (name == null || name.isEmpty()) { + return null; + } + try { + return valueOf(name); + } catch (IllegalArgumentException e) { + return null; + } + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java index 399a394cd65e..b61b5eaf9e86 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java @@ -21,8 +21,13 @@ import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processors.standard.faker.FakerMethodHolder; import org.apache.nifi.processors.standard.faker.FakerUtils; +import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; @@ -149,13 +154,15 @@ public void testGenerateNullableFieldsOneHundredNullPercentage() throws Exceptio flowFile.assertContentEquals(String.join("", Collections.nCopies(FakerUtils.getDatatypeFunctionMap().size() - 1, ",")) + "\n"); } - // Tests that the 
remaining fields are supported by the processor. + // Tests that the special FT_* types in FakerUtils are supported by the processor. + // Note: FT_BOOL is defined but not added to createFakerPropertyList(), so we exclude it. @Test public void testFieldsReturnValue() throws Exception { - List fieldTypeFields = Arrays.stream(GenerateRecord.class.getFields()).filter((field) -> field.getName().startsWith("FT_")).toList(); +List fieldTypeFields = Arrays.stream(GenerateRecord.class.getFields()).filter((field) -> field.getName().startsWith("FT_")).toList(); for (Field field : fieldTypeFields) { - testRunner.setProperty(field.getName().toLowerCase(Locale.ROOT), ((AllowableValue) field.get(processor)).getValue()); + field.setAccessible(true); + testRunner.setProperty(field.getName().toLowerCase(Locale.ROOT), ((AllowableValue) field.get(null)).getValue()); } final Map recordFields = processor.getFields(testRunner.getProcessContext()); @@ -301,4 +308,181 @@ void testMigrateProperties() { final PropertyMigrationResult propertyMigrationResult = testRunner.migrateProperties(); assertEquals(expectedRenamed, propertyMigrationResult.getPropertiesRenamed()); } + + @Test + public void testValidationFailsWithNoSchemaConfiguration() throws InitializationException { + final MockRecordWriter recordWriter = new MockRecordWriter(null, true); + testRunner.addControllerService("record-writer", recordWriter); + testRunner.enableControllerService(recordWriter); + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer"); + testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1"); + + testRunner.assertNotValid(); + } + + @Test + public void testValidationFailsWithMultipleSchemaConfigurations() throws Exception { + String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc"))); + + final MockRecordWriter recordWriter = new MockRecordWriter(null, true); + testRunner.addControllerService("record-writer", recordWriter); + 
testRunner.enableControllerService(recordWriter); + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer"); + testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1"); + + // Set both Schema Text and Predefined Schema - should be invalid + testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText); + testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, PredefinedRecordSchema.PERSON.name()); + + testRunner.assertNotValid(); + } + + @Test + public void testValidationFailsWithPredefinedSchemaAndDynamicProperties() throws Exception { + final MockRecordWriter recordWriter = new MockRecordWriter(null, true); + testRunner.addControllerService("record-writer", recordWriter); + testRunner.enableControllerService(recordWriter); + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer"); + testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1"); + + // Set both Predefined Schema and dynamic property - should be invalid + testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, PredefinedRecordSchema.PERSON.name()); + testRunner.setProperty("myField", "Address.fullAddress"); + + testRunner.assertNotValid(); + } + + @Test + public void testPredefinedSchemaPerson() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.PERSON, 5, + "id", "firstName", "lastName", "email", "phoneNumber", "dateOfBirth", "age", "active", "address"); + } + + @Test + public void testPredefinedSchemaOrder() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.ORDER, 5, + "orderId", "customerId", "customerName", "customerEmail", "orderDate", "orderTime", + "orderTimestamp", "totalAmount", "currency", "status", "shipped", "itemCount", "lineItems"); + } + + @Test + public void testPredefinedSchemaEvent() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.EVENT, 5, + "eventId", "eventType", "eventDate", "eventTime", "eventTimestamp", "source", + "severity", "message", "processed", "retryCount", "durationMs", "tags", "metadata"); + } + + 
@Test + public void testPredefinedSchemaSensor() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.SENSOR, 5, + "sensorId", "deviceType", "manufacturer", "readingTimestamp", "temperature", + "humidity", "pressure", "batteryLevel", "signalStrength", "online", "location"); + } + + @Test + public void testPredefinedSchemaProduct() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.PRODUCT, 5, + "productId", "sku", "name", "description", "category", "brand", "price", + "currency", "inStock", "quantity", "rating", "reviewCount", "createdDate", + "lastUpdated", "tags", "dimensions"); + } + + @Test + public void testPredefinedSchemaStockTrade() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.STOCK_TRADE, 5, + "tradeId", "symbol", "companyName", "exchange", "tradeType", "tradeTimestamp", + "price", "quantity", "totalValue", "currency", "bidPrice", "askPrice", + "high52Week", "low52Week", "marketCap", "settled"); + } + + @Test + public void testPredefinedSchemaCompleteExample() throws Exception { + testPredefinedSchema(PredefinedRecordSchema.COMPLETE_EXAMPLE, 3, + "id", "active", "score", "count", "rating", "price", "balance", "initial", + "flags", "rank", "createdDate", "lastLoginTime", "lastModified", "tags", + "scores", "metadata", "profile", "orders"); + } + + private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int numRecords, String... 
expectedFields) throws Exception { + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + testRunner.addControllerService("json-writer", jsonWriter); + testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + testRunner.enableControllerService(jsonWriter); + + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "json-writer"); + testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, predefinedSchema.name()); + testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); + testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0"); + testRunner.setProperty(GenerateRecord.NUM_RECORDS, String.valueOf(numRecords)); + + testRunner.assertValid(); + testRunner.run(); + + testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0); + + // Verify record count attribute + flowFile.assertAttributeEquals("record.count", String.valueOf(numRecords)); + flowFile.assertAttributeEquals("mime.type", "application/json"); + + // Verify content is valid JSON and contains expected fields + final String content = flowFile.getContent(); + assertNotNull(content); + assertTrue(content.startsWith("["), "Content should be a JSON array"); + + // Verify all expected fields are present in the output + for (String field : expectedFields) { + assertTrue(content.contains("\"" + field + "\""), + "Expected field '" + field + "' not found in output for schema " + predefinedSchema.name()); + } + + // Parse and verify records using JsonTreeReader + final JsonTreeReader jsonReader = new JsonTreeReader(); + testRunner.addControllerService("json-reader", jsonReader); + testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema"); + testRunner.enableControllerService(jsonReader); + + final byte[] contentBytes = flowFile.toByteArray(); + try (final ByteArrayInputStream inputStream = new 
ByteArrayInputStream(contentBytes); + final RecordReader recordReader = jsonReader.createRecordReader(flowFile.getAttributes(), inputStream, contentBytes.length, testRunner.getLogger())) { + + int recordCount = 0; + Record record; + while ((record = recordReader.nextRecord()) != null) { + recordCount++; + // Verify each expected field exists in the record + for (String field : expectedFields) { + assertTrue(record.getSchema().getFieldNames().contains(field), + "Record schema should contain field '" + field + "' for schema " + predefinedSchema.name()); + } + } + assertEquals(numRecords, recordCount, "Should have generated " + numRecords + " records"); + } + } + + @Test + public void testPredefinedSchemaWithNullPercentage() throws Exception { + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + testRunner.addControllerService("json-writer", jsonWriter); + testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + testRunner.enableControllerService(jsonWriter); + + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "json-writer"); + testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, PredefinedRecordSchema.PERSON.name()); + testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); + testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100"); + testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1"); + + testRunner.assertValid(); + testRunner.run(); + + testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0); + + // With 100% null percentage, all nullable fields should be null + final String content = flowFile.getContent(); + assertNotNull(content); + // The content should contain null values + assertTrue(content.contains("null"), "With 100% null percentage, output should contain null values"); + } } From dfd949f9919ca35e94cedf100915ab5e57901282 Mon Sep 17 00:00:00 2001 
From: Pierre Villard Date: Tue, 13 Jan 2026 12:53:23 +0100 Subject: [PATCH 2/3] review --- .../faker/PredefinedRecordSchema.java | 601 +++++++++--------- .../standard/TestGenerateRecord.java | 39 +- 2 files changed, 331 insertions(+), 309 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java index fd4bdef3c63f..965d9ce4e315 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/PredefinedRecordSchema.java @@ -51,25 +51,27 @@ */ public enum PredefinedRecordSchema implements DescribedValue { - PERSON("Person", "A person with name, contact information, and address") { + PERSON("Person", "A person with name, contact information, and address (schema.org/Person)") { @Override public RecordSchema getSchema(boolean nullable) { + // PostalAddress fields per schema.org/PostalAddress List addressFields = Arrays.asList( - new RecordField("street", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("city", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("state", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("zipCode", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("country", RecordFieldType.STRING.getDataType(), nullable) + new RecordField("streetAddress", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("addressLocality", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("addressRegion", RecordFieldType.STRING.getDataType(), nullable), + new 
RecordField("postalCode", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("addressCountry", RecordFieldType.STRING.getDataType(), nullable) ); RecordSchema addressSchema = new SimpleRecordSchema(addressFields); + // Person fields per schema.org/Person List fields = Arrays.asList( - new RecordField("id", RecordFieldType.UUID.getDataType(), nullable), - new RecordField("firstName", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("lastName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("givenName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("familyName", RecordFieldType.STRING.getDataType(), nullable), new RecordField("email", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("phoneNumber", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("dateOfBirth", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("telephone", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("birthDate", RecordFieldType.DATE.getDataType(), nullable), new RecordField("age", RecordFieldType.INT.getDataType(), nullable), new RecordField("active", RecordFieldType.BOOLEAN.getDataType(), nullable), new RecordField("address", RecordFieldType.RECORD.getRecordDataType(addressSchema), nullable) @@ -80,25 +82,25 @@ public RecordSchema getSchema(boolean nullable) { @Override public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { Map values = new LinkedHashMap<>(); - values.put("id", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); - values.put("firstName", generateNullableValue(nullPercentage, faker, f -> f.name().firstName())); - values.put("lastName", generateNullableValue(nullPercentage, faker, f -> f.name().lastName())); + values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + 
values.put("givenName", generateNullableValue(nullPercentage, faker, f -> f.name().firstName())); + values.put("familyName", generateNullableValue(nullPercentage, faker, f -> f.name().lastName())); values.put("email", generateNullableValue(nullPercentage, faker, f -> f.internet().emailAddress())); - values.put("phoneNumber", generateNullableValue(nullPercentage, faker, f -> f.phoneNumber().phoneNumber())); - values.put("dateOfBirth", generateNullableValue(nullPercentage, faker, f -> { + values.put("telephone", generateNullableValue(nullPercentage, faker, f -> f.phoneNumber().phoneNumber())); + values.put("birthDate", generateNullableValue(nullPercentage, faker, f -> { LocalDate birthday = f.timeAndDate().birthday(18, 80); return Date.valueOf(birthday); })); values.put("age", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(18, 80))); values.put("active", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); - // Generate nested address record + // Generate nested address record (PostalAddress) Map addressValues = new LinkedHashMap<>(); - addressValues.put("street", generateNullableValue(nullPercentage, faker, f -> f.address().streetAddress())); - addressValues.put("city", generateNullableValue(nullPercentage, faker, f -> f.address().city())); - addressValues.put("state", generateNullableValue(nullPercentage, faker, f -> f.address().state())); - addressValues.put("zipCode", generateNullableValue(nullPercentage, faker, f -> f.address().zipCode())); - addressValues.put("country", generateNullableValue(nullPercentage, faker, f -> f.address().country())); + addressValues.put("streetAddress", generateNullableValue(nullPercentage, faker, f -> f.address().streetAddress())); + addressValues.put("addressLocality", generateNullableValue(nullPercentage, faker, f -> f.address().city())); + addressValues.put("addressRegion", generateNullableValue(nullPercentage, faker, f -> f.address().state())); + addressValues.put("postalCode", 
generateNullableValue(nullPercentage, faker, f -> f.address().zipCode())); + addressValues.put("addressCountry", generateNullableValue(nullPercentage, faker, f -> f.address().country())); RecordSchema addressSchema = schema.getField("address").get().getDataType().getFieldType() == RecordFieldType.RECORD ? ((RecordDataType) schema.getField("address").get().getDataType()).getChildSchema() @@ -112,31 +114,33 @@ public Map generateValues(Faker faker, RecordSchema schema, int } }, - ORDER("Order", "An e-commerce order with line items, amounts, and timestamps") { + ORDER("Order", "An e-commerce order with line items, amounts, and timestamps (schema.org/Order)") { @Override public RecordSchema getSchema(boolean nullable) { - List lineItemFields = Arrays.asList( - new RecordField("productId", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("productName", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("quantity", RecordFieldType.INT.getDataType(), nullable), - new RecordField("unitPrice", RecordFieldType.DOUBLE.getDataType(), nullable) + // OrderItem fields per schema.org/OrderItem + List orderedItemFields = Arrays.asList( + new RecordField("identifier", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("name", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("orderQuantity", RecordFieldType.INT.getDataType(), nullable), + new RecordField("price", RecordFieldType.DOUBLE.getDataType(), nullable) ); - RecordSchema lineItemSchema = new SimpleRecordSchema(lineItemFields); + RecordSchema orderedItemSchema = new SimpleRecordSchema(orderedItemFields); + // Order fields per schema.org/Order List fields = Arrays.asList( - new RecordField("orderId", RecordFieldType.UUID.getDataType(), nullable), - new RecordField("customerId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("orderNumber", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("customer", 
RecordFieldType.UUID.getDataType(), nullable), new RecordField("customerName", RecordFieldType.STRING.getDataType(), nullable), new RecordField("customerEmail", RecordFieldType.STRING.getDataType(), nullable), new RecordField("orderDate", RecordFieldType.DATE.getDataType(), nullable), new RecordField("orderTime", RecordFieldType.TIME.getDataType(), nullable), - new RecordField("orderTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), - new RecordField("totalAmount", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable), - new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("status", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("shipped", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("orderDelivery", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("totalPrice", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable), + new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("orderStatus", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("isGift", RecordFieldType.BOOLEAN.getDataType(), nullable), new RecordField("itemCount", RecordFieldType.INT.getDataType(), nullable), - new RecordField("lineItems", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(lineItemSchema)), nullable) + new RecordField("orderedItem", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(orderedItemSchema)), nullable) ); return new SimpleRecordSchema(fields); } @@ -144,77 +148,80 @@ public RecordSchema getSchema(boolean nullable) { @Override public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { Map values = new LinkedHashMap<>(); - values.put("orderId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); - values.put("customerId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); 
+ values.put("orderNumber", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("customer", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); values.put("customerName", generateNullableValue(nullPercentage, faker, f -> f.name().fullName())); values.put("customerEmail", generateNullableValue(nullPercentage, faker, f -> f.internet().emailAddress())); Instant orderInstant = faker.timeAndDate().past(365, TimeUnit.DAYS); values.put("orderDate", generateNullableValue(nullPercentage, faker, f -> new Date(orderInstant.toEpochMilli()))); values.put("orderTime", generateNullableValue(nullPercentage, faker, f -> new Time(orderInstant.toEpochMilli()))); - values.put("orderTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(orderInstant.toEpochMilli()))); + values.put("orderDelivery", generateNullableValue(nullPercentage, faker, f -> new Timestamp(orderInstant.toEpochMilli()))); - String[] statuses = {"PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"}; + // OrderStatus values per schema.org/OrderStatus + String[] statuses = {"OrderCancelled", "OrderDelivered", "OrderInTransit", "OrderPaymentDue", + "OrderPickupAvailable", "OrderProblem", "OrderProcessing", "OrderReturned"}; String status = statuses[faker.number().numberBetween(0, statuses.length)]; - values.put("status", generateNullableValue(nullPercentage, faker, f -> status)); - values.put("shipped", generateNullableValue(nullPercentage, faker, f -> "SHIPPED".equals(status) || "DELIVERED".equals(status))); + values.put("orderStatus", generateNullableValue(nullPercentage, faker, f -> status)); + values.put("isGift", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); - String[] currencies = {"USD", "EUR", "GBP", "JPY", "CAD"}; - values.put("currency", generateNullableValue(nullPercentage, faker, f -> currencies[f.number().numberBetween(0, currencies.length)])); + // Use Faker's money provider for currency codes + 
values.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode())); - // Generate line items + // Generate ordered items int itemCount = faker.number().numberBetween(1, 5); values.put("itemCount", generateNullableValue(nullPercentage, faker, f -> itemCount)); - RecordSchema lineItemSchema = null; - if (schema.getField("lineItems").get().getDataType().getFieldType() == RecordFieldType.ARRAY) { - DataType elementType = ((ArrayDataType) schema.getField("lineItems").get().getDataType()).getElementType(); + RecordSchema orderedItemSchema = null; + if (schema.getField("orderedItem").get().getDataType().getFieldType() == RecordFieldType.ARRAY) { + DataType elementType = ((ArrayDataType) schema.getField("orderedItem").get().getDataType()).getElementType(); if (elementType.getFieldType() == RecordFieldType.RECORD) { - lineItemSchema = ((RecordDataType) elementType).getChildSchema(); + orderedItemSchema = ((RecordDataType) elementType).getChildSchema(); } } - double totalAmount = 0.0; - Object[] lineItems = new Object[itemCount]; + double totalPrice = 0.0; + Object[] orderedItems = new Object[itemCount]; for (int i = 0; i < itemCount; i++) { - Map lineItemValues = new LinkedHashMap<>(); - lineItemValues.put("productId", "PRD-" + faker.number().digits(8)); - lineItemValues.put("productName", faker.commerce().productName()); + Map orderedItemValues = new LinkedHashMap<>(); + orderedItemValues.put("identifier", "PRD-" + faker.number().digits(8)); + orderedItemValues.put("name", faker.commerce().productName()); int quantity = faker.number().numberBetween(1, 10); - double unitPrice = faker.number().randomDouble(2, 10, 500); - lineItemValues.put("quantity", quantity); - lineItemValues.put("unitPrice", unitPrice); - totalAmount += quantity * unitPrice; + double price = faker.number().randomDouble(2, 10, 500); + orderedItemValues.put("orderQuantity", quantity); + orderedItemValues.put("price", price); + totalPrice += quantity * price; - if 
(lineItemSchema != null) { - lineItems[i] = new MapRecord(lineItemSchema, lineItemValues); + if (orderedItemSchema != null) { + orderedItems[i] = new MapRecord(orderedItemSchema, orderedItemValues); } } - values.put("lineItems", generateNullableValue(nullPercentage, faker, f -> lineItems)); - final double finalTotal = totalAmount; - values.put("totalAmount", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(finalTotal).setScale(2, RoundingMode.HALF_UP))); + values.put("orderedItem", generateNullableValue(nullPercentage, faker, f -> orderedItems)); + final double finalTotal = totalPrice; + values.put("totalPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(finalTotal).setScale(2, RoundingMode.HALF_UP))); return values; } }, - EVENT("Event", "A timestamped event with metadata and tags") { + EVENT("Event", "A timestamped event with metadata and keywords (schema.org/Event)") { @Override public RecordSchema getSchema(boolean nullable) { + // Event fields per schema.org/Event List fields = Arrays.asList( - new RecordField("eventId", RecordFieldType.UUID.getDataType(), nullable), - new RecordField("eventType", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("eventDate", RecordFieldType.DATE.getDataType(), nullable), - new RecordField("eventTime", RecordFieldType.TIME.getDataType(), nullable), - new RecordField("eventTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), - new RecordField("source", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("severity", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("message", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("processed", RecordFieldType.BOOLEAN.getDataType(), nullable), - new RecordField("retryCount", RecordFieldType.INT.getDataType(), nullable), - new RecordField("durationMs", RecordFieldType.LONG.getDataType(), nullable), - new RecordField("tags", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), - new RecordField("metadata", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable) + new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("additionalType", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("startDate", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("startTime", RecordFieldType.TIME.getDataType(), nullable), + new RecordField("endDate", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("organizer", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("eventStatus", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("description", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("isAccessibleForFree", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("attendeeCount", RecordFieldType.INT.getDataType(), nullable), + new RecordField("duration", RecordFieldType.LONG.getDataType(), nullable), + new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("additionalProperty", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable) ); return new SimpleRecordSchema(fields); } @@ -222,69 +229,72 @@ public RecordSchema getSchema(boolean nullable) { @Override public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { Map values = new LinkedHashMap<>(); - values.put("eventId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); - String[] eventTypes = {"USER_LOGIN", "USER_LOGOUT", "PURCHASE", "PAGE_VIEW", "API_CALL", "ERROR", "WARNING", "AUDIT"}; - values.put("eventType", generateNullableValue(nullPercentage, faker, f -> 
eventTypes[f.number().numberBetween(0, eventTypes.length)])); + // Use Faker's hacker provider for event type naming + values.put("additionalType", generateNullableValue(nullPercentage, faker, f -> + f.hacker().verb().toUpperCase() + "_" + f.hacker().noun().toUpperCase())); Instant eventInstant = faker.timeAndDate().past(30, TimeUnit.DAYS); - values.put("eventDate", generateNullableValue(nullPercentage, faker, f -> new Date(eventInstant.toEpochMilli()))); - values.put("eventTime", generateNullableValue(nullPercentage, faker, f -> new Time(eventInstant.toEpochMilli()))); - values.put("eventTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(eventInstant.toEpochMilli()))); - - String[] sources = {"web-app", "mobile-app", "api-gateway", "batch-processor", "stream-processor"}; - values.put("source", generateNullableValue(nullPercentage, faker, f -> sources[f.number().numberBetween(0, sources.length)])); - - String[] severities = {"DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"}; - values.put("severity", generateNullableValue(nullPercentage, faker, f -> severities[f.number().numberBetween(0, severities.length)])); - values.put("message", generateNullableValue(nullPercentage, faker, f -> f.lorem().sentence())); - values.put("processed", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); - values.put("retryCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5))); - values.put("durationMs", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(1L, 10000L))); - - // Generate tags array - String[] possibleTags = {"important", "urgent", "reviewed", "automated", "manual", "verified", "pending"}; - int tagCount = faker.number().numberBetween(1, 4); - String[] tags = new String[tagCount]; - for (int i = 0; i < tagCount; i++) { - tags[i] = possibleTags[faker.number().numberBetween(0, possibleTags.length)]; + values.put("startDate", generateNullableValue(nullPercentage, faker, f -> new 
Date(eventInstant.toEpochMilli()))); + values.put("startTime", generateNullableValue(nullPercentage, faker, f -> new Time(eventInstant.toEpochMilli()))); + values.put("endDate", generateNullableValue(nullPercentage, faker, f -> new Timestamp(eventInstant.toEpochMilli()))); + + // Use Faker's app provider for organizer names + values.put("organizer", generateNullableValue(nullPercentage, faker, f -> f.app().name().toLowerCase().replace(" ", "-"))); + + // EventStatus values per schema.org/EventStatusType + String[] statuses = {"EventCancelled", "EventMovedOnline", "EventPostponed", "EventRescheduled", "EventScheduled"}; + values.put("eventStatus", generateNullableValue(nullPercentage, faker, f -> statuses[f.number().numberBetween(0, statuses.length)])); + values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().sentence())); + values.put("isAccessibleForFree", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + values.put("attendeeCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5))); + values.put("duration", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(1L, 10000L))); + + // Generate keywords array using Faker's marketing buzzwords + int keywordCount = faker.number().numberBetween(1, 4); + String[] keywords = new String[keywordCount]; + for (int i = 0; i < keywordCount; i++) { + keywords[i] = faker.marketing().buzzwords().toLowerCase(); } - values.put("tags", generateNullableValue(nullPercentage, faker, f -> tags)); + values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords)); - // Generate metadata map - Map metadata = new HashMap<>(); - metadata.put("version", "1." 
+ faker.number().numberBetween(0, 10)); - metadata.put("environment", faker.options().option("dev", "staging", "prod")); - metadata.put("region", faker.options().option("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1")); - metadata.put("correlationId", UUID.randomUUID().toString()); - values.put("metadata", generateNullableValue(nullPercentage, faker, f -> metadata)); + // Generate additionalProperty map using Faker providers + Map additionalProperty = new HashMap<>(); + additionalProperty.put("version", faker.app().version()); + additionalProperty.put("environment", faker.options().option("dev", "staging", "prod")); + // Use Faker's AWS provider for region + additionalProperty.put("region", faker.aws().region()); + additionalProperty.put("correlationId", UUID.randomUUID().toString()); + values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> additionalProperty)); return values; } }, - SENSOR("Sensor", "An IoT sensor reading with location and measurements") { + SENSOR("Sensor", "An IoT sensor reading with geo coordinates and measurements") { @Override public RecordSchema getSchema(boolean nullable) { - List locationFields = Arrays.asList( + // GeoCoordinates fields per schema.org/GeoCoordinates + List geoFields = Arrays.asList( new RecordField("latitude", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("longitude", RecordFieldType.DOUBLE.getDataType(), nullable), - new RecordField("altitude", RecordFieldType.DOUBLE.getDataType(), nullable) + new RecordField("elevation", RecordFieldType.DOUBLE.getDataType(), nullable) ); - RecordSchema locationSchema = new SimpleRecordSchema(locationFields); + RecordSchema geoSchema = new SimpleRecordSchema(geoFields); List fields = Arrays.asList( - new RecordField("sensorId", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("deviceType", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("identifier", RecordFieldType.STRING.getDataType(), 
nullable), + new RecordField("additionalType", RecordFieldType.STRING.getDataType(), nullable), new RecordField("manufacturer", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("readingTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("dateCreated", RecordFieldType.TIMESTAMP.getDataType(), nullable), new RecordField("temperature", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("humidity", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("pressure", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("batteryLevel", RecordFieldType.INT.getDataType(), nullable), new RecordField("signalStrength", RecordFieldType.INT.getDataType(), nullable), - new RecordField("online", RecordFieldType.BOOLEAN.getDataType(), nullable), - new RecordField("location", RecordFieldType.RECORD.getRecordDataType(locationSchema), nullable) + new RecordField("isActive", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(geoSchema), nullable) ); return new SimpleRecordSchema(fields); } @@ -292,68 +302,71 @@ public RecordSchema getSchema(boolean nullable) { @Override public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { Map values = new LinkedHashMap<>(); - values.put("sensorId", generateNullableValue(nullPercentage, faker, f -> "SNS-" + f.number().digits(10))); + // Use Faker's device provider for serial number + values.put("identifier", generateNullableValue(nullPercentage, faker, f -> f.device().serial())); - String[] deviceTypes = {"TEMPERATURE", "HUMIDITY", "PRESSURE", "MOTION", "LIGHT", "AIR_QUALITY", "MULTI"}; - values.put("deviceType", generateNullableValue(nullPercentage, faker, f -> deviceTypes[f.number().numberBetween(0, deviceTypes.length)])); + // Use Faker's device provider for platform/type + values.put("additionalType", generateNullableValue(nullPercentage, faker, f -> f.device().platform())); 
- String[] manufacturers = {"SensorCorp", "IoTech", "SmartDevices", "DataSense", "EnviroMonitor"}; - values.put("manufacturer", generateNullableValue(nullPercentage, faker, f -> manufacturers[f.number().numberBetween(0, manufacturers.length)])); + // Use Faker's device provider for manufacturer + values.put("manufacturer", generateNullableValue(nullPercentage, faker, f -> f.device().manufacturer())); - values.put("readingTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 3600000)))); + values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 3600000)))); values.put("temperature", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, -20, 45))); values.put("humidity", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 100))); values.put("pressure", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 980, 1050))); values.put("batteryLevel", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100))); values.put("signalStrength", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(-100, -30))); - values.put("online", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + values.put("isActive", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); - // Generate nested location record - Map locationValues = new LinkedHashMap<>(); - locationValues.put("latitude", faker.number().randomDouble(6, -90, 90)); - locationValues.put("longitude", faker.number().randomDouble(6, -180, 180)); - locationValues.put("altitude", faker.number().randomDouble(2, 0, 3000)); + // Generate nested geo record (GeoCoordinates) + Map geoValues = new LinkedHashMap<>(); + geoValues.put("latitude", faker.number().randomDouble(6, -90, 90)); + geoValues.put("longitude", 
faker.number().randomDouble(6, -180, 180)); + geoValues.put("elevation", faker.number().randomDouble(2, 0, 3000)); - RecordSchema locationSchema = schema.getField("location").get().getDataType().getFieldType() == RecordFieldType.RECORD - ? ((RecordDataType) schema.getField("location").get().getDataType()).getChildSchema() + RecordSchema geoSchema = schema.getField("geo").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) schema.getField("geo").get().getDataType()).getChildSchema() : null; - if (locationSchema != null) { - values.put("location", generateNullableValue(nullPercentage, faker, f -> new MapRecord(locationSchema, locationValues))); + if (geoSchema != null) { + values.put("geo", generateNullableValue(nullPercentage, faker, f -> new MapRecord(geoSchema, geoValues))); } return values; } }, - PRODUCT("Product", "A product catalog entry with pricing and inventory") { + PRODUCT("Product", "A product catalog entry with pricing and inventory (schema.org/Product)") { @Override public RecordSchema getSchema(boolean nullable) { + // QuantitativeValue for dimensions per schema.org/QuantitativeValue List dimensionFields = Arrays.asList( - new RecordField("length", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("depth", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("width", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("height", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("weight", RecordFieldType.DOUBLE.getDataType(), nullable) ); RecordSchema dimensionSchema = new SimpleRecordSchema(dimensionFields); + // Product fields per schema.org/Product List fields = Arrays.asList( - new RecordField("productId", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable), new RecordField("sku", RecordFieldType.STRING.getDataType(), nullable), new RecordField("name", RecordFieldType.STRING.getDataType(), 
nullable), new RecordField("description", RecordFieldType.STRING.getDataType(), nullable), new RecordField("category", RecordFieldType.STRING.getDataType(), nullable), new RecordField("brand", RecordFieldType.STRING.getDataType(), nullable), new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(10, 2), nullable), - new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("inStock", RecordFieldType.BOOLEAN.getDataType(), nullable), - new RecordField("quantity", RecordFieldType.INT.getDataType(), nullable), - new RecordField("rating", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("availability", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("inventoryLevel", RecordFieldType.INT.getDataType(), nullable), + new RecordField("ratingValue", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("reviewCount", RecordFieldType.INT.getDataType(), nullable), - new RecordField("createdDate", RecordFieldType.DATE.getDataType(), nullable), - new RecordField("lastUpdated", RecordFieldType.TIMESTAMP.getDataType(), nullable), - new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), - new RecordField("dimensions", RecordFieldType.RECORD.getRecordDataType(dimensionSchema), nullable) + new RecordField("dateCreated", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("dateModified", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("additionalProperty", RecordFieldType.RECORD.getRecordDataType(dimensionSchema), nullable) ); return new SimpleRecordSchema(fields); } @@ -361,47 +374,47 @@ public RecordSchema getSchema(boolean nullable) { @Override public Map generateValues(Faker faker, 
RecordSchema schema, int nullPercentage) { Map values = new LinkedHashMap<>(); - values.put("productId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); values.put("sku", generateNullableValue(nullPercentage, faker, f -> "SKU-" + f.number().digits(8))); values.put("name", generateNullableValue(nullPercentage, faker, f -> f.commerce().productName())); values.put("description", generateNullableValue(nullPercentage, faker, f -> f.lorem().paragraph())); values.put("category", generateNullableValue(nullPercentage, faker, f -> f.commerce().department())); - values.put("brand", generateNullableValue(nullPercentage, faker, f -> f.company().name())); + // Use Faker's commerce provider for brand + values.put("brand", generateNullableValue(nullPercentage, faker, f -> f.commerce().brand())); values.put("price", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(f.number().randomDouble(2, 5, 2000)).setScale(2, RoundingMode.HALF_UP))); - String[] currencies = {"USD", "EUR", "GBP"}; - values.put("currency", generateNullableValue(nullPercentage, faker, f -> currencies[f.number().numberBetween(0, currencies.length)])); + // Use Faker's money provider for currency codes + values.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode())); - int quantity = faker.number().numberBetween(0, 500); - values.put("inStock", generateNullableValue(nullPercentage, faker, f -> quantity > 0)); - values.put("quantity", generateNullableValue(nullPercentage, faker, f -> quantity)); - values.put("rating", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(1, 1, 5))); + int inventoryLevel = faker.number().numberBetween(0, 500); + values.put("availability", generateNullableValue(nullPercentage, faker, f -> inventoryLevel > 0)); + values.put("inventoryLevel", generateNullableValue(nullPercentage, faker, f 
-> inventoryLevel)); + values.put("ratingValue", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(1, 1, 5))); values.put("reviewCount", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 5000))); - values.put("createdDate", generateNullableValue(nullPercentage, faker, f -> new Date(f.timeAndDate().past(365, TimeUnit.DAYS).toEpochMilli()))); - values.put("lastUpdated", generateNullableValue(nullPercentage, faker, f -> new Timestamp(f.timeAndDate().past(30, TimeUnit.DAYS).toEpochMilli()))); - - // Generate tags array - String[] possibleTags = {"new", "sale", "bestseller", "limited", "exclusive", "eco-friendly", "premium"}; - int tagCount = faker.number().numberBetween(0, 4); - String[] tags = new String[tagCount]; - for (int i = 0; i < tagCount; i++) { - tags[i] = possibleTags[faker.number().numberBetween(0, possibleTags.length)]; + values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Date(f.timeAndDate().past(365, TimeUnit.DAYS).toEpochMilli()))); + values.put("dateModified", generateNullableValue(nullPercentage, faker, f -> new Timestamp(f.timeAndDate().past(30, TimeUnit.DAYS).toEpochMilli()))); + + // Generate keywords array using Faker's marketing buzzwords + int keywordCount = faker.number().numberBetween(0, 4); + String[] keywords = new String[keywordCount]; + for (int i = 0; i < keywordCount; i++) { + keywords[i] = faker.marketing().buzzwords().toLowerCase(); } - values.put("tags", generateNullableValue(nullPercentage, faker, f -> tags)); + values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords)); - // Generate nested dimensions record + // Generate nested additionalProperty record (dimensions) Map dimensionValues = new LinkedHashMap<>(); - dimensionValues.put("length", faker.number().randomDouble(2, 1, 100)); + dimensionValues.put("depth", faker.number().randomDouble(2, 1, 100)); dimensionValues.put("width", faker.number().randomDouble(2, 1, 100)); 
dimensionValues.put("height", faker.number().randomDouble(2, 1, 100)); dimensionValues.put("weight", faker.number().randomDouble(2, 1, 50)); - RecordSchema dimensionSchema = schema.getField("dimensions").get().getDataType().getFieldType() == RecordFieldType.RECORD - ? ((RecordDataType) schema.getField("dimensions").get().getDataType()).getChildSchema() + RecordSchema dimensionSchema = schema.getField("additionalProperty").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) schema.getField("additionalProperty").get().getDataType()).getChildSchema() : null; if (dimensionSchema != null) { - values.put("dimensions", generateNullableValue(nullPercentage, faker, f -> new MapRecord(dimensionSchema, dimensionValues))); + values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> new MapRecord(dimensionSchema, dimensionValues))); } return values; @@ -411,23 +424,24 @@ public Map generateValues(Faker faker, RecordSchema schema, int STOCK_TRADE("Stock Trade", "A stock market trade with pricing and volume") { @Override public RecordSchema getSchema(boolean nullable) { + // Using schema.org naming conventions where applicable List fields = Arrays.asList( - new RecordField("tradeId", RecordFieldType.UUID.getDataType(), nullable), - new RecordField("symbol", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("companyName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("tickerSymbol", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("name", RecordFieldType.STRING.getDataType(), nullable), new RecordField("exchange", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("tradeType", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("tradeTimestamp", RecordFieldType.TIMESTAMP.getDataType(), nullable), + new RecordField("actionType", 
RecordFieldType.STRING.getDataType(), nullable), + new RecordField("dateCreated", RecordFieldType.TIMESTAMP.getDataType(), nullable), new RecordField("price", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), - new RecordField("quantity", RecordFieldType.LONG.getDataType(), nullable), - new RecordField("totalValue", RecordFieldType.DECIMAL.getDecimalDataType(16, 2), nullable), - new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("orderQuantity", RecordFieldType.LONG.getDataType(), nullable), + new RecordField("totalPrice", RecordFieldType.DECIMAL.getDecimalDataType(16, 2), nullable), + new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable), new RecordField("bidPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), new RecordField("askPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), - new RecordField("high52Week", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), - new RecordField("low52Week", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("highPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), + new RecordField("lowPrice", RecordFieldType.DECIMAL.getDecimalDataType(12, 4), nullable), new RecordField("marketCap", RecordFieldType.LONG.getDataType(), nullable), - new RecordField("settled", RecordFieldType.BOOLEAN.getDataType(), nullable) + new RecordField("isSettled", RecordFieldType.BOOLEAN.getDataType(), nullable) ); return new SimpleRecordSchema(fields); } @@ -435,34 +449,35 @@ public RecordSchema getSchema(boolean nullable) { @Override public Map generateValues(Faker faker, RecordSchema schema, int nullPercentage) { Map values = new LinkedHashMap<>(); - values.put("tradeId", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); - String[] symbols = {"AAPL", "GOOGL", 
"MSFT", "AMZN", "META", "TSLA", "NVDA", "JPM", "V", "JNJ"}; - String[] companies = {"Apple Inc.", "Alphabet Inc.", "Microsoft Corp.", "Amazon.com Inc.", "Meta Platforms Inc.", - "Tesla Inc.", "NVIDIA Corp.", "JPMorgan Chase & Co.", "Visa Inc.", "Johnson & Johnson"}; - int symbolIdx = faker.number().numberBetween(0, symbols.length); - values.put("symbol", generateNullableValue(nullPercentage, faker, f -> symbols[symbolIdx])); - values.put("companyName", generateNullableValue(nullPercentage, faker, f -> companies[symbolIdx])); + // Use Faker's stock provider for symbols (randomly choose between NASDAQ and NYSE) + values.put("tickerSymbol", generateNullableValue(nullPercentage, faker, f -> + f.bool().bool() ? f.stock().nsdqSymbol() : f.stock().nyseSymbol())); + // Use Faker's company provider for company names + values.put("name", generateNullableValue(nullPercentage, faker, f -> f.company().name())); - String[] exchanges = {"NYSE", "NASDAQ", "LSE", "TSE"}; - values.put("exchange", generateNullableValue(nullPercentage, faker, f -> exchanges[f.number().numberBetween(0, exchanges.length)])); + // Use Faker's stock provider for exchanges + values.put("exchange", generateNullableValue(nullPercentage, faker, f -> f.stock().exchanges())); - String[] tradeTypes = {"BUY", "SELL"}; - values.put("tradeType", generateNullableValue(nullPercentage, faker, f -> tradeTypes[f.number().numberBetween(0, tradeTypes.length)])); - values.put("tradeTimestamp", generateNullableValue(nullPercentage, faker, f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 86400000)))); + // Trade types are fundamental financial terms (BUY/SELL) + String[] actionTypes = {"BuyAction", "SellAction"}; + values.put("actionType", generateNullableValue(nullPercentage, faker, f -> actionTypes[f.number().numberBetween(0, actionTypes.length)])); + values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Timestamp(System.currentTimeMillis() - f.number().numberBetween(0, 
86400000)))); double price = faker.number().randomDouble(4, 10, 3000); - long quantity = faker.number().numberBetween(1, 10000); + long orderQuantity = faker.number().numberBetween(1, 10000); values.put("price", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price).setScale(4, RoundingMode.HALF_UP))); - values.put("quantity", generateNullableValue(nullPercentage, faker, f -> quantity)); - values.put("totalValue", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * quantity).setScale(2, RoundingMode.HALF_UP))); - values.put("currency", generateNullableValue(nullPercentage, faker, f -> "USD")); + values.put("orderQuantity", generateNullableValue(nullPercentage, faker, f -> orderQuantity)); + values.put("totalPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * orderQuantity).setScale(2, RoundingMode.HALF_UP))); + // Use Faker's money provider for currency codes + values.put("priceCurrency", generateNullableValue(nullPercentage, faker, f -> f.money().currencyCode())); values.put("bidPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 0.999).setScale(4, RoundingMode.HALF_UP))); values.put("askPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 1.001).setScale(4, RoundingMode.HALF_UP))); - values.put("high52Week", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 1.5).setScale(4, RoundingMode.HALF_UP))); - values.put("low52Week", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 0.6).setScale(4, RoundingMode.HALF_UP))); + values.put("highPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 1.5).setScale(4, RoundingMode.HALF_UP))); + values.put("lowPrice", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(price * 0.6).setScale(4, RoundingMode.HALF_UP))); values.put("marketCap", generateNullableValue(nullPercentage, faker, f 
-> f.number().numberBetween(1_000_000_000L, 3_000_000_000_000L))); - values.put("settled", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + values.put("isSettled", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); return values; } @@ -471,70 +486,70 @@ public Map generateValues(Faker faker, RecordSchema schema, int COMPLETE_EXAMPLE("Complete Example", "A comprehensive schema demonstrating all supported data types including nested records, arrays, and maps") { @Override public RecordSchema getSchema(boolean nullable) { - // Deepest nested record: Coordinates - List coordinateFields = Arrays.asList( + // GeoCoordinates per schema.org/GeoCoordinates + List geoFields = Arrays.asList( new RecordField("latitude", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("longitude", RecordFieldType.DOUBLE.getDataType(), nullable) ); - RecordSchema coordinateSchema = new SimpleRecordSchema(coordinateFields); + RecordSchema geoSchema = new SimpleRecordSchema(geoFields); - // Address record with nested coordinates + // PostalAddress per schema.org/PostalAddress with nested geo List addressFields = Arrays.asList( - new RecordField("street", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("city", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("state", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("zipCode", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("country", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("coordinates", RecordFieldType.RECORD.getRecordDataType(coordinateSchema), nullable) + new RecordField("streetAddress", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("addressLocality", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("addressRegion", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("postalCode", RecordFieldType.STRING.getDataType(), nullable), + new 
RecordField("addressCountry", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(geoSchema), nullable) ); RecordSchema addressSchema = new SimpleRecordSchema(addressFields); - // Profile record with nested address - List profileFields = Arrays.asList( - new RecordField("firstName", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("lastName", RecordFieldType.STRING.getDataType(), nullable), + // Person per schema.org/Person with nested address + List personFields = Arrays.asList( + new RecordField("givenName", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("familyName", RecordFieldType.STRING.getDataType(), nullable), new RecordField("email", RecordFieldType.STRING.getDataType(), nullable), new RecordField("age", RecordFieldType.INT.getDataType(), nullable), new RecordField("verified", RecordFieldType.BOOLEAN.getDataType(), nullable), new RecordField("address", RecordFieldType.RECORD.getRecordDataType(addressSchema), nullable) ); - RecordSchema profileSchema = new SimpleRecordSchema(profileFields); + RecordSchema personSchema = new SimpleRecordSchema(personFields); - // Order record for array of records + // Order per schema.org/Order for array of records List orderFields = Arrays.asList( - new RecordField("orderId", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("amount", RecordFieldType.DOUBLE.getDataType(), nullable), - new RecordField("currency", RecordFieldType.STRING.getDataType(), nullable), - new RecordField("placed", RecordFieldType.DATE.getDataType(), nullable), - new RecordField("shipped", RecordFieldType.BOOLEAN.getDataType(), nullable) + new RecordField("orderNumber", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("totalPrice", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("priceCurrency", RecordFieldType.STRING.getDataType(), nullable), + new RecordField("orderDate", 
RecordFieldType.DATE.getDataType(), nullable), + new RecordField("isGift", RecordFieldType.BOOLEAN.getDataType(), nullable) ); RecordSchema orderSchema = new SimpleRecordSchema(orderFields); - // Main schema with all types + // Main schema with all types using schema.org naming List fields = Arrays.asList( // Basic types - new RecordField("id", RecordFieldType.UUID.getDataType(), nullable), - new RecordField("active", RecordFieldType.BOOLEAN.getDataType(), nullable), + new RecordField("identifier", RecordFieldType.UUID.getDataType(), nullable), + new RecordField("isActive", RecordFieldType.BOOLEAN.getDataType(), nullable), new RecordField("score", RecordFieldType.INT.getDataType(), nullable), new RecordField("count", RecordFieldType.LONG.getDataType(), nullable), - new RecordField("rating", RecordFieldType.DOUBLE.getDataType(), nullable), + new RecordField("ratingValue", RecordFieldType.DOUBLE.getDataType(), nullable), new RecordField("price", RecordFieldType.FLOAT.getDataType(), nullable), new RecordField("balance", RecordFieldType.DECIMAL.getDecimalDataType(12, 2), nullable), new RecordField("initial", RecordFieldType.CHAR.getDataType(), nullable), new RecordField("flags", RecordFieldType.BYTE.getDataType(), nullable), - new RecordField("rank", RecordFieldType.SHORT.getDataType(), nullable), + new RecordField("position", RecordFieldType.SHORT.getDataType(), nullable), - // Date/Time types - new RecordField("createdDate", RecordFieldType.DATE.getDataType(), nullable), - new RecordField("lastLoginTime", RecordFieldType.TIME.getDataType(), nullable), - new RecordField("lastModified", RecordFieldType.TIMESTAMP.getDataType(), nullable), + // Date/Time types per schema.org + new RecordField("dateCreated", RecordFieldType.DATE.getDataType(), nullable), + new RecordField("lastLogin", RecordFieldType.TIME.getDataType(), nullable), + new RecordField("dateModified", RecordFieldType.TIMESTAMP.getDataType(), nullable), // Complex types - new RecordField("tags", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("keywords", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), nullable), new RecordField("scores", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()), nullable), - new RecordField("metadata", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable), - new RecordField("profile", RecordFieldType.RECORD.getRecordDataType(profileSchema), nullable), - new RecordField("orders", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(orderSchema)), nullable) + new RecordField("additionalProperty", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), nullable), + new RecordField("person", RecordFieldType.RECORD.getRecordDataType(personSchema), nullable), + new RecordField("orderedItem", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(orderSchema)), nullable) ); return new SimpleRecordSchema(fields); } @@ -544,31 +559,30 @@ public Map generateValues(Faker faker, RecordSchema schema, int Map values = new LinkedHashMap<>(); // Basic types - values.put("id", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); - values.put("active", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); + values.put("identifier", generateNullableValue(nullPercentage, faker, f -> UUID.randomUUID())); + values.put("isActive", generateNullableValue(nullPercentage, faker, f -> f.bool().bool())); values.put("score", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0, 100))); values.put("count", generateNullableValue(nullPercentage, faker, f -> f.number().numberBetween(0L, 1_000_000L))); - values.put("rating", generateNullableValue(nullPercentage, faker, f -> f.number().randomDouble(2, 0, 5))); + values.put("ratingValue", generateNullableValue(nullPercentage, faker, f -> 
f.number().randomDouble(2, 0, 5))); values.put("price", generateNullableValue(nullPercentage, faker, f -> (float) f.number().randomDouble(2, 1, 1000))); values.put("balance", generateNullableValue(nullPercentage, faker, f -> BigDecimal.valueOf(f.number().randomDouble(2, -10000, 50000)).setScale(2, RoundingMode.HALF_UP))); values.put("initial", generateNullableValue(nullPercentage, faker, f -> (char) ('A' + f.number().numberBetween(0, 26)))); values.put("flags", generateNullableValue(nullPercentage, faker, f -> (byte) f.number().numberBetween(0, 127))); - values.put("rank", generateNullableValue(nullPercentage, faker, f -> (short) f.number().numberBetween(1, 1000))); + values.put("position", generateNullableValue(nullPercentage, faker, f -> (short) f.number().numberBetween(1, 1000))); - // Date/Time types + // Date/Time types per schema.org Instant pastInstant = faker.timeAndDate().past(365, TimeUnit.DAYS); - values.put("createdDate", generateNullableValue(nullPercentage, faker, f -> new Date(pastInstant.toEpochMilli()))); - values.put("lastLoginTime", generateNullableValue(nullPercentage, faker, f -> new Time(f.timeAndDate().past(1, TimeUnit.DAYS).toEpochMilli()))); - values.put("lastModified", generateNullableValue(nullPercentage, faker, f -> new Timestamp(f.timeAndDate().past(7, TimeUnit.DAYS).toEpochMilli()))); - - // Array of strings - String[] possibleTags = {"important", "urgent", "reviewed", "automated", "verified", "pending", "approved"}; - int tagCount = faker.number().numberBetween(1, 5); - String[] tags = new String[tagCount]; - for (int i = 0; i < tagCount; i++) { - tags[i] = possibleTags[faker.number().numberBetween(0, possibleTags.length)]; + values.put("dateCreated", generateNullableValue(nullPercentage, faker, f -> new Date(pastInstant.toEpochMilli()))); + values.put("lastLogin", generateNullableValue(nullPercentage, faker, f -> new Time(f.timeAndDate().past(1, TimeUnit.DAYS).toEpochMilli()))); + values.put("dateModified", 
generateNullableValue(nullPercentage, faker, f -> new Timestamp(f.timeAndDate().past(7, TimeUnit.DAYS).toEpochMilli()))); + + // Array of strings (keywords) using Faker's word provider + int keywordCount = faker.number().numberBetween(1, 5); + String[] keywords = new String[keywordCount]; + for (int i = 0; i < keywordCount; i++) { + keywords[i] = faker.word().adjective(); } - values.put("tags", generateNullableValue(nullPercentage, faker, f -> tags)); + values.put("keywords", generateNullableValue(nullPercentage, faker, f -> keywords)); // Array of integers int scoreCount = faker.number().numberBetween(3, 8); @@ -578,60 +592,61 @@ public Map generateValues(Faker faker, RecordSchema schema, int } values.put("scores", generateNullableValue(nullPercentage, faker, f -> scores)); - // Map - Map metadata = new HashMap<>(); - metadata.put("source", faker.options().option("web", "mobile", "api", "batch")); - metadata.put("version", "1." + faker.number().numberBetween(0, 10)); - metadata.put("environment", faker.options().option("dev", "staging", "prod")); - metadata.put("region", faker.options().option("us-east-1", "us-west-2", "eu-west-1")); - values.put("metadata", generateNullableValue(nullPercentage, faker, f -> metadata)); - - // Nested profile record (3 levels deep: profile -> address -> coordinates) - RecordSchema profileSchema = schema.getField("profile").get().getDataType().getFieldType() == RecordFieldType.RECORD - ? 
((RecordDataType) schema.getField("profile").get().getDataType()).getChildSchema() + // Map (additionalProperty) using Faker providers + Map additionalProperty = new HashMap<>(); + additionalProperty.put("source", faker.app().name().toLowerCase().replace(" ", "-")); + additionalProperty.put("version", faker.app().version()); + additionalProperty.put("environment", faker.options().option("dev", "staging", "prod")); + // Use Faker's AWS provider for region + additionalProperty.put("region", faker.aws().region()); + values.put("additionalProperty", generateNullableValue(nullPercentage, faker, f -> additionalProperty)); + + // Nested person record (3 levels deep: person -> address -> geo) + RecordSchema personSchema = schema.getField("person").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) schema.getField("person").get().getDataType()).getChildSchema() : null; - if (profileSchema != null) { - Map profileValues = new LinkedHashMap<>(); - profileValues.put("firstName", faker.name().firstName()); - profileValues.put("lastName", faker.name().lastName()); - profileValues.put("email", faker.internet().emailAddress()); - profileValues.put("age", faker.number().numberBetween(18, 80)); - profileValues.put("verified", faker.bool().bool()); + if (personSchema != null) { + Map personValues = new LinkedHashMap<>(); + personValues.put("givenName", faker.name().firstName()); + personValues.put("familyName", faker.name().lastName()); + personValues.put("email", faker.internet().emailAddress()); + personValues.put("age", faker.number().numberBetween(18, 80)); + personValues.put("verified", faker.bool().bool()); - RecordSchema addressSchema = profileSchema.getField("address").get().getDataType().getFieldType() == RecordFieldType.RECORD - ? 
((RecordDataType) profileSchema.getField("address").get().getDataType()).getChildSchema() + RecordSchema addressSchema = personSchema.getField("address").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? ((RecordDataType) personSchema.getField("address").get().getDataType()).getChildSchema() : null; if (addressSchema != null) { Map addressValues = new LinkedHashMap<>(); - addressValues.put("street", faker.address().streetAddress()); - addressValues.put("city", faker.address().city()); - addressValues.put("state", faker.address().state()); - addressValues.put("zipCode", faker.address().zipCode()); - addressValues.put("country", faker.address().country()); - - RecordSchema coordinateSchema = addressSchema.getField("coordinates").get().getDataType().getFieldType() == RecordFieldType.RECORD - ? ((RecordDataType) addressSchema.getField("coordinates").get().getDataType()).getChildSchema() + addressValues.put("streetAddress", faker.address().streetAddress()); + addressValues.put("addressLocality", faker.address().city()); + addressValues.put("addressRegion", faker.address().state()); + addressValues.put("postalCode", faker.address().zipCode()); + addressValues.put("addressCountry", faker.address().country()); + + RecordSchema geoSchema = addressSchema.getField("geo").get().getDataType().getFieldType() == RecordFieldType.RECORD + ? 
((RecordDataType) addressSchema.getField("geo").get().getDataType()).getChildSchema() : null; - if (coordinateSchema != null) { - Map coordValues = new LinkedHashMap<>(); - coordValues.put("latitude", faker.number().randomDouble(6, -90, 90)); - coordValues.put("longitude", faker.number().randomDouble(6, -180, 180)); - addressValues.put("coordinates", new MapRecord(coordinateSchema, coordValues)); + if (geoSchema != null) { + Map geoValues = new LinkedHashMap<>(); + geoValues.put("latitude", faker.number().randomDouble(6, -90, 90)); + geoValues.put("longitude", faker.number().randomDouble(6, -180, 180)); + addressValues.put("geo", new MapRecord(geoSchema, geoValues)); } - profileValues.put("address", new MapRecord(addressSchema, addressValues)); + personValues.put("address", new MapRecord(addressSchema, addressValues)); } - values.put("profile", generateNullableValue(nullPercentage, faker, f -> new MapRecord(profileSchema, profileValues))); + values.put("person", generateNullableValue(nullPercentage, faker, f -> new MapRecord(personSchema, personValues))); } - // Array of order records + // Array of order records (orderedItem) RecordSchema orderSchema = null; - if (schema.getField("orders").get().getDataType().getFieldType() == RecordFieldType.ARRAY) { - DataType elementType = ((ArrayDataType) schema.getField("orders").get().getDataType()).getElementType(); + if (schema.getField("orderedItem").get().getDataType().getFieldType() == RecordFieldType.ARRAY) { + DataType elementType = ((ArrayDataType) schema.getField("orderedItem").get().getDataType()).getElementType(); if (elementType.getFieldType() == RecordFieldType.RECORD) { orderSchema = ((RecordDataType) elementType).getChildSchema(); } @@ -643,15 +658,15 @@ public Map generateValues(Faker faker, RecordSchema schema, int Object[] orders = new Object[orderCount]; for (int i = 0; i < orderCount; i++) { Map orderValues = new LinkedHashMap<>(); - orderValues.put("orderId", "ORD-" + faker.number().digits(8)); - 
orderValues.put("amount", faker.number().randomDouble(2, 10, 500)); - String[] currencies = {"USD", "EUR", "GBP"}; - orderValues.put("currency", currencies[faker.number().numberBetween(0, currencies.length)]); - orderValues.put("placed", new Date(faker.timeAndDate().past(90, TimeUnit.DAYS).toEpochMilli())); - orderValues.put("shipped", faker.bool().bool()); + orderValues.put("orderNumber", "ORD-" + faker.number().digits(8)); + orderValues.put("totalPrice", faker.number().randomDouble(2, 10, 500)); + // Use Faker's money provider for currency codes + orderValues.put("priceCurrency", faker.money().currencyCode()); + orderValues.put("orderDate", new Date(faker.timeAndDate().past(90, TimeUnit.DAYS).toEpochMilli())); + orderValues.put("isGift", faker.bool().bool()); orders[i] = new MapRecord(finalOrderSchema, orderValues); } - values.put("orders", generateNullableValue(nullPercentage, faker, f -> orders)); + values.put("orderedItem", generateNullableValue(nullPercentage, faker, f -> orders)); } return values; diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java index b61b5eaf9e86..a82888cadff8 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java @@ -354,53 +354,60 @@ public void testValidationFailsWithPredefinedSchemaAndDynamicProperties() throws @Test public void testPredefinedSchemaPerson() throws Exception { + // Field names aligned with schema.org/Person and schema.org/PostalAddress testPredefinedSchema(PredefinedRecordSchema.PERSON, 5, - "id", "firstName", "lastName", 
"email", "phoneNumber", "dateOfBirth", "age", "active", "address"); + "identifier", "givenName", "familyName", "email", "telephone", "birthDate", "age", "active", "address"); } @Test public void testPredefinedSchemaOrder() throws Exception { + // Field names aligned with schema.org/Order testPredefinedSchema(PredefinedRecordSchema.ORDER, 5, - "orderId", "customerId", "customerName", "customerEmail", "orderDate", "orderTime", - "orderTimestamp", "totalAmount", "currency", "status", "shipped", "itemCount", "lineItems"); + "orderNumber", "customer", "customerName", "customerEmail", "orderDate", "orderTime", + "orderDelivery", "totalPrice", "priceCurrency", "orderStatus", "isGift", "itemCount", "orderedItem"); } @Test public void testPredefinedSchemaEvent() throws Exception { + // Field names aligned with schema.org/Event testPredefinedSchema(PredefinedRecordSchema.EVENT, 5, - "eventId", "eventType", "eventDate", "eventTime", "eventTimestamp", "source", - "severity", "message", "processed", "retryCount", "durationMs", "tags", "metadata"); + "identifier", "additionalType", "startDate", "startTime", "endDate", "organizer", + "eventStatus", "description", "isAccessibleForFree", "attendeeCount", "duration", "keywords", "additionalProperty"); } @Test public void testPredefinedSchemaSensor() throws Exception { + // Field names aligned with schema.org conventions (GeoCoordinates for location) testPredefinedSchema(PredefinedRecordSchema.SENSOR, 5, - "sensorId", "deviceType", "manufacturer", "readingTimestamp", "temperature", - "humidity", "pressure", "batteryLevel", "signalStrength", "online", "location"); + "identifier", "additionalType", "manufacturer", "dateCreated", "temperature", + "humidity", "pressure", "batteryLevel", "signalStrength", "isActive", "geo"); } @Test public void testPredefinedSchemaProduct() throws Exception { + // Field names aligned with schema.org/Product testPredefinedSchema(PredefinedRecordSchema.PRODUCT, 5, - "productId", "sku", "name", 
"description", "category", "brand", "price", - "currency", "inStock", "quantity", "rating", "reviewCount", "createdDate", - "lastUpdated", "tags", "dimensions"); + "identifier", "sku", "name", "description", "category", "brand", "price", + "priceCurrency", "availability", "inventoryLevel", "ratingValue", "reviewCount", "dateCreated", + "dateModified", "keywords", "additionalProperty"); } @Test public void testPredefinedSchemaStockTrade() throws Exception { + // Field names aligned with schema.org conventions testPredefinedSchema(PredefinedRecordSchema.STOCK_TRADE, 5, - "tradeId", "symbol", "companyName", "exchange", "tradeType", "tradeTimestamp", - "price", "quantity", "totalValue", "currency", "bidPrice", "askPrice", - "high52Week", "low52Week", "marketCap", "settled"); + "identifier", "tickerSymbol", "name", "exchange", "actionType", "dateCreated", + "price", "orderQuantity", "totalPrice", "priceCurrency", "bidPrice", "askPrice", + "highPrice", "lowPrice", "marketCap", "isSettled"); } @Test public void testPredefinedSchemaCompleteExample() throws Exception { + // Field names aligned with schema.org conventions testPredefinedSchema(PredefinedRecordSchema.COMPLETE_EXAMPLE, 3, - "id", "active", "score", "count", "rating", "price", "balance", "initial", - "flags", "rank", "createdDate", "lastLoginTime", "lastModified", "tags", - "scores", "metadata", "profile", "orders"); + "identifier", "isActive", "score", "count", "ratingValue", "price", "balance", "initial", + "flags", "position", "dateCreated", "lastLogin", "dateModified", "keywords", + "scores", "additionalProperty", "person", "orderedItem"); } private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int numRecords, String... 
expectedFields) throws Exception { From 344d6653b266791d844b33eb086dbd6585bbde00 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sat, 31 Jan 2026 01:01:28 +0100 Subject: [PATCH 3/3] review --- .../standard/TestGenerateRecord.java | 119 +++++------------- 1 file changed, 28 insertions(+), 91 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java index a82888cadff8..bd111cf8566b 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java @@ -21,13 +21,10 @@ import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.json.JsonRecordSetWriter; -import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processors.standard.faker.FakerMethodHolder; import org.apache.nifi.processors.standard.faker.FakerUtils; import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; @@ -154,17 +151,18 @@ public void testGenerateNullableFieldsOneHundredNullPercentage() throws Exceptio flowFile.assertContentEquals(String.join("", Collections.nCopies(FakerUtils.getDatatypeFunctionMap().size() - 1, ",")) + "\n"); } - // Tests that the special FT_* types in FakerUtils are supported by the 
processor. - // Note: FT_BOOL is defined but not added to createFakerPropertyList(), so we exclude it. + // Tests that the remaining fields are supported by the processor. @Test public void testFieldsReturnValue() throws Exception { -List fieldTypeFields = Arrays.stream(GenerateRecord.class.getFields()).filter((field) -> field.getName().startsWith("FT_")).toList(); + List fieldTypeFields = Arrays.stream(GenerateRecord.class.getFields()).filter((field) -> field.getName().startsWith("FT_")).toList(); for (Field field : fieldTypeFields) { - field.setAccessible(true); - testRunner.setProperty(field.getName().toLowerCase(Locale.ROOT), ((AllowableValue) field.get(null)).getValue()); + testRunner.setProperty(field.getName().toLowerCase(Locale.ROOT), ((AllowableValue) field.get(processor)).getValue()); } + // Add at least one dynamic property to satisfy schema configuration validation + testRunner.setProperty("testField", "Name.fullName"); + final Map recordFields = processor.getFields(testRunner.getProcessContext()); final RecordSchema outputSchema = processor.generateRecordSchema(recordFields, true); final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, outputSchema); @@ -354,69 +352,46 @@ public void testValidationFailsWithPredefinedSchemaAndDynamicProperties() throws @Test public void testPredefinedSchemaPerson() throws Exception { - // Field names aligned with schema.org/Person and schema.org/PostalAddress - testPredefinedSchema(PredefinedRecordSchema.PERSON, 5, - "identifier", "givenName", "familyName", "email", "telephone", "birthDate", "age", "active", "address"); + testPredefinedSchema(PredefinedRecordSchema.PERSON, 5); } @Test public void testPredefinedSchemaOrder() throws Exception { - // Field names aligned with schema.org/Order - testPredefinedSchema(PredefinedRecordSchema.ORDER, 5, - "orderNumber", "customer", "customerName", "customerEmail", "orderDate", "orderTime", - "orderDelivery", "totalPrice", "priceCurrency", "orderStatus", 
"isGift", "itemCount", "orderedItem"); + testPredefinedSchema(PredefinedRecordSchema.ORDER, 5); } @Test public void testPredefinedSchemaEvent() throws Exception { - // Field names aligned with schema.org/Event - testPredefinedSchema(PredefinedRecordSchema.EVENT, 5, - "identifier", "additionalType", "startDate", "startTime", "endDate", "organizer", - "eventStatus", "description", "isAccessibleForFree", "attendeeCount", "duration", "keywords", "additionalProperty"); + testPredefinedSchema(PredefinedRecordSchema.EVENT, 5); } @Test public void testPredefinedSchemaSensor() throws Exception { - // Field names aligned with schema.org conventions (GeoCoordinates for location) - testPredefinedSchema(PredefinedRecordSchema.SENSOR, 5, - "identifier", "additionalType", "manufacturer", "dateCreated", "temperature", - "humidity", "pressure", "batteryLevel", "signalStrength", "isActive", "geo"); + testPredefinedSchema(PredefinedRecordSchema.SENSOR, 5); } @Test public void testPredefinedSchemaProduct() throws Exception { - // Field names aligned with schema.org/Product - testPredefinedSchema(PredefinedRecordSchema.PRODUCT, 5, - "identifier", "sku", "name", "description", "category", "brand", "price", - "priceCurrency", "availability", "inventoryLevel", "ratingValue", "reviewCount", "dateCreated", - "dateModified", "keywords", "additionalProperty"); + testPredefinedSchema(PredefinedRecordSchema.PRODUCT, 5); } @Test public void testPredefinedSchemaStockTrade() throws Exception { - // Field names aligned with schema.org conventions - testPredefinedSchema(PredefinedRecordSchema.STOCK_TRADE, 5, - "identifier", "tickerSymbol", "name", "exchange", "actionType", "dateCreated", - "price", "orderQuantity", "totalPrice", "priceCurrency", "bidPrice", "askPrice", - "highPrice", "lowPrice", "marketCap", "isSettled"); + testPredefinedSchema(PredefinedRecordSchema.STOCK_TRADE, 5); } @Test public void testPredefinedSchemaCompleteExample() throws Exception { - // Field names aligned with schema.org 
conventions - testPredefinedSchema(PredefinedRecordSchema.COMPLETE_EXAMPLE, 3, - "identifier", "isActive", "score", "count", "ratingValue", "price", "balance", "initial", - "flags", "position", "dateCreated", "lastLogin", "dateModified", "keywords", - "scores", "additionalProperty", "person", "orderedItem"); + testPredefinedSchema(PredefinedRecordSchema.COMPLETE_EXAMPLE, 3); } - private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int numRecords, String... expectedFields) throws Exception { - final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); - testRunner.addControllerService("json-writer", jsonWriter); - testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); - testRunner.enableControllerService(jsonWriter); + private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int numRecords) throws Exception { + final RecordSchema schema = predefinedSchema.getSchema(true); + final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, schema); + testRunner.addControllerService("record-writer", recordWriter); + testRunner.enableControllerService(recordWriter); - testRunner.setProperty(GenerateRecord.RECORD_WRITER, "json-writer"); + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer"); testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, predefinedSchema.name()); testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0"); @@ -426,55 +401,20 @@ private void testPredefinedSchema(PredefinedRecordSchema predefinedSchema, int n testRunner.run(); testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1); - MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst(); // Verify record count attribute 
flowFile.assertAttributeEquals("record.count", String.valueOf(numRecords)); - flowFile.assertAttributeEquals("mime.type", "application/json"); - - // Verify content is valid JSON and contains expected fields - final String content = flowFile.getContent(); - assertNotNull(content); - assertTrue(content.startsWith("["), "Content should be a JSON array"); - - // Verify all expected fields are present in the output - for (String field : expectedFields) { - assertTrue(content.contains("\"" + field + "\""), - "Expected field '" + field + "' not found in output for schema " + predefinedSchema.name()); - } - - // Parse and verify records using JsonTreeReader - final JsonTreeReader jsonReader = new JsonTreeReader(); - testRunner.addControllerService("json-reader", jsonReader); - testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema"); - testRunner.enableControllerService(jsonReader); - - final byte[] contentBytes = flowFile.toByteArray(); - try (final ByteArrayInputStream inputStream = new ByteArrayInputStream(contentBytes); - final RecordReader recordReader = jsonReader.createRecordReader(flowFile.getAttributes(), inputStream, contentBytes.length, testRunner.getLogger())) { - - int recordCount = 0; - Record record; - while ((record = recordReader.nextRecord()) != null) { - recordCount++; - // Verify each expected field exists in the record - for (String field : expectedFields) { - assertTrue(record.getSchema().getFieldNames().contains(field), - "Record schema should contain field '" + field + "' for schema " + predefinedSchema.name()); - } - } - assertEquals(numRecords, recordCount, "Should have generated " + numRecords + " records"); - } } @Test public void testPredefinedSchemaWithNullPercentage() throws Exception { - final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); - testRunner.addControllerService("json-writer", jsonWriter); - testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA); - testRunner.enableControllerService(jsonWriter); + final RecordSchema schema = PredefinedRecordSchema.PERSON.getSchema(true); + final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, schema); + testRunner.addControllerService("record-writer", recordWriter); + testRunner.enableControllerService(recordWriter); - testRunner.setProperty(GenerateRecord.RECORD_WRITER, "json-writer"); + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer"); testRunner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, PredefinedRecordSchema.PERSON.name()); testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100"); @@ -484,12 +424,9 @@ public void testPredefinedSchemaWithNullPercentage() throws Exception { testRunner.run(); testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1); - MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst(); - // With 100% null percentage, all nullable fields should be null - final String content = flowFile.getContent(); - assertNotNull(content); - // The content should contain null values - assertTrue(content.contains("null"), "With 100% null percentage, output should contain null values"); + // Verify record count attribute + flowFile.assertAttributeEquals("record.count", "1"); } }