Skip to content

Commit 58d62d1

Browse files
authored
KAFKA-19634 Formalize nullable and non-nullable type distinctions in protocol specification (#20614)
This patch introduces a clear separation between nullable and non-nullable data structures. The key changes include: 1. Differentiates between nullable and non-nullable versions of `RECORDS`, `COMPACT_RECORDS`, and `Schema` types. 2. Adds explicit nullable type names for `ArrayOf` and `CompactArrayOf`. 3. Introduces a new, concise syntax for representing types: - `{}` for struct, `?{}` for nullable struct - `[T]` for array, `?[T]` for nullable array - `(T)` for compact array, `?(T)` for nullable compact array 4. Declares shared schemas as non-nullable `Schema` by default. A field that references a shared schema and is nullable must be explicitly declared as a new `NullableSchema(X)`. 5. Adds unit tests to verify the consistency between schema and message serialization. Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 7311fe5 commit 58d62d1

File tree

15 files changed

+609
-68
lines changed

15 files changed

+609
-68
lines changed

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import static org.apache.kafka.common.protocol.types.Type.BYTES;
3636
import static org.apache.kafka.common.protocol.types.Type.COMPACT_BYTES;
3737
import static org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_BYTES;
38+
import static org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_RECORDS;
39+
import static org.apache.kafka.common.protocol.types.Type.COMPACT_RECORDS;
3840
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
41+
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_RECORDS;
3942
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
4043

4144
/**
@@ -135,7 +138,6 @@ public enum ApiKeys {
135138
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
136139
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
137140
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
138-
139141

140142
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
141143
new EnumMap<>(ApiMessageType.ListenerType.class);
@@ -342,9 +344,14 @@ private static boolean retainsBufferReference(Schema schema) {
342344
Schema.Visitor detector = new Schema.Visitor() {
343345
@Override
344346
public void visit(Type field) {
345-
if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS ||
346-
field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES)
347+
// avoid BooleanExpressionComplexity checkstyle warning
348+
boolean isBytesType = field == BYTES || field == NULLABLE_BYTES ||
349+
field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES;
350+
boolean isRecordsType = field == RECORDS || field == NULLABLE_RECORDS ||
351+
field == COMPACT_RECORDS || field == COMPACT_NULLABLE_RECORDS;
352+
if (isBytesType || isRecordsType) {
347353
hasBuffer.set(true);
354+
}
348355
}
349356
};
350357
schema.walk(detector);

clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,26 @@ private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSi
4040
final String indentStr = indentString(indentSize);
4141
final Map<String, Type> subTypes = new LinkedHashMap<>();
4242

43+
b.append(schema.leftBracket());
44+
b.append(" ");
4345
// Top level fields
4446
for (BoundField field: schema.fields()) {
4547
Type type = field.def.type;
4648
if (type.isArray()) {
47-
b.append("[");
49+
b.append(type.leftBracket());
4850
b.append(field.def.name);
49-
b.append("] ");
51+
b.append(type.rightBracket());
52+
b.append(" ");
5053
if (!subTypes.containsKey(field.def.name)) {
5154
subTypes.put(field.def.name, type.arrayElementType().get());
5255
}
5356
} else if (type instanceof TaggedFields) {
5457
Map<Integer, Field> taggedFields = new TreeMap<>(((TaggedFields) type).fields());
5558
taggedFields.forEach((tag, taggedField) -> {
5659
if (taggedField.type.isArray()) {
57-
b.append("[");
60+
// Use the tagged field's own array type for the brackets; `type` here is the enclosing TaggedFields type.
b.append(taggedField.type.leftBracket());
6061
b.append(taggedField.name);
59-
b.append("]");
62+
b.append(taggedField.type.rightBracket());
6063
if (!subTypes.containsKey(taggedField.name))
6164
subTypes.put(taggedField.name + "&lt;tag: " + tag.toString() + "&gt;", taggedField.type.arrayElementType().get());
6265
} else {
@@ -75,6 +78,7 @@ private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSi
7578
subTypes.put(field.def.name, type);
7679
}
7780
}
81+
b.append(schema.rightBracket());
7882
b.append("\n");
7983

8084
// Sub Types/Schemas
@@ -227,14 +231,14 @@ public static String toHtml() {
227231
b.append(" Response (Version: ");
228232
b.append(version);
229233
b.append(") => ");
230-
schemaToBnfHtml(responses[version], b, 2);
234+
schemaToBnfHtml(schema, b, 2);
231235
b.append("</pre>");
232236

233237
b.append("<p><b>Response header version:</b> ");
234238
b.append(key.responseHeaderVersion((short) version));
235239
b.append("</p>\n");
236240

237-
schemaToFieldTableHtml(responses[version], b);
241+
schemaToFieldTableHtml(schema, b);
238242
b.append("</div>\n");
239243
}
240244
}

clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class ArrayOf extends DocumentedType {
2828

2929
private static final String ARRAY_TYPE_NAME = "ARRAY";
3030

31+
private static final String NULLABLE_ARRAY_TYPE_NAME = "NULLABLE_ARRAY";
32+
3133
private final Type type;
3234
private final boolean nullable;
3335

@@ -97,9 +99,20 @@ public Optional<Type> arrayElementType() {
9799
return Optional.of(type);
98100
}
99101

102+
@Override
103+
public String leftBracket() {
104+
return nullable ? "?[" : "[";
105+
}
106+
107+
@Override
108+
public String rightBracket() {
109+
return "]";
110+
}
111+
100112
@Override
101113
public String toString() {
102-
return ARRAY_TYPE_NAME + "(" + type + ")";
114+
String name = nullable ? NULLABLE_ARRAY_TYPE_NAME : ARRAY_TYPE_NAME;
115+
return name + "(" + type + ")";
103116
}
104117

105118
@Override
@@ -119,15 +132,27 @@ public Object[] validate(Object item) {
119132

120133
@Override
121134
public String typeName() {
122-
return ARRAY_TYPE_NAME;
135+
return nullable ? NULLABLE_ARRAY_TYPE_NAME : ARRAY_TYPE_NAME;
123136
}
124137

125138
@Override
126139
public String documentation() {
127-
return "Represents a sequence of objects of a given type T. " +
140+
String doc;
141+
if (nullable) {
142+
doc = "Represents a sequence of objects of a given type T. " +
128143
"Type T can be either a primitive type (e.g. " + STRING + ") or a structure. " +
129144
"First, the length N is given as an " + INT32 + ". Then N instances of type T follow. " +
130145
"A null array is represented with a length of -1. " +
131-
"In protocol documentation an array of T instances is referred to as [T].";
146+
"In protocol documentation a nullable array of T instances is referred to as " +
147+
leftBracket() + "T" + rightBracket() + ".";
148+
} else {
149+
doc = "Represents a sequence of objects of a given type T. " +
150+
"Type T can be either a primitive type (e.g. " + STRING + ") or a structure. " +
151+
"First, the length N is given as an " + INT32 + ". Then N instances of type T follow. " +
152+
"In protocol documentation an array of T instances is referred to as " +
153+
leftBracket() + "T" + rightBracket() + ".";
154+
}
155+
156+
return doc;
132157
}
133158
}

clients/src/main/java/org/apache/kafka/common/protocol/types/BoundField.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public BoundField(Field def, Schema schema, int index) {
2929
this.schema = schema;
3030
this.index = index;
3131
}
32-
32+
3333
@Override
3434
public String toString() {
3535
return def.name + ":" + def.type;

clients/src/main/java/org/apache/kafka/common/protocol/types/CompactArrayOf.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
public class CompactArrayOf extends DocumentedType {
3131
private static final String COMPACT_ARRAY_TYPE_NAME = "COMPACT_ARRAY";
3232

33+
private static final String COMPACT_NULLABLE_ARRAY_TYPE_NAME = "COMPACT_NULLABLE_ARRAY";
34+
3335
private final Type type;
3436
private final boolean nullable;
3537

@@ -103,9 +105,20 @@ public Optional<Type> arrayElementType() {
103105
return Optional.of(type);
104106
}
105107

108+
@Override
109+
public String leftBracket() {
110+
return nullable ? "?(" : "(";
111+
}
112+
113+
@Override
114+
public String rightBracket() {
115+
return ")";
116+
}
117+
106118
@Override
107119
public String toString() {
108-
return COMPACT_ARRAY_TYPE_NAME + "(" + type + ")";
120+
String name = nullable ? COMPACT_NULLABLE_ARRAY_TYPE_NAME : COMPACT_ARRAY_TYPE_NAME;
121+
return name + "(" + type + ")";
109122
}
110123

111124
@Override
@@ -125,15 +138,26 @@ public Object[] validate(Object item) {
125138

126139
@Override
127140
public String typeName() {
128-
return COMPACT_ARRAY_TYPE_NAME;
141+
return nullable ? COMPACT_NULLABLE_ARRAY_TYPE_NAME : COMPACT_ARRAY_TYPE_NAME;
129142
}
130143

131144
@Override
132145
public String documentation() {
133-
return "Represents a sequence of objects of a given type T. " +
146+
String doc;
147+
if (nullable) {
148+
doc = "Represents a sequence of objects of a given type T. " +
134149
"Type T can be either a primitive type (e.g. " + STRING + ") or a structure. " +
135150
"First, the length N + 1 is given as an UNSIGNED_VARINT. Then N instances of type T follow. " +
136151
"A null array is represented with a length of 0. " +
137-
"In protocol documentation an array of T instances is referred to as [T].";
152+
"In protocol documentation a compact nullable array of T instances is referred to as " +
153+
leftBracket() + "T" + rightBracket() + ".";
154+
} else {
155+
doc = "Represents a sequence of objects of a given type T. " +
156+
"Type T can be either a primitive type (e.g. " + STRING + ") or a structure. " +
157+
"First, the length N + 1 is given as an UNSIGNED_VARINT. Then N instances of type T follow. " +
158+
"In protocol documentation a compact array of T instances is referred to as " +
159+
leftBracket() + "T" + rightBracket() + ".";
160+
}
161+
return doc;
138162
}
139163
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.protocol.types;
18+
19+
import java.nio.ByteBuffer;
20+
import java.util.Arrays;
21+
22+
/**
23+
* The nullable schema for a compound record definition
24+
*/
25+
public final class NullableSchema extends Schema {
26+
27+
private static final String NULLABLE_STRUCT_TYPE_NAME = "NULLABLE_STRUCT";
28+
29+
public NullableSchema(Schema schema) {
30+
super(schema.tolerateMissingFieldsWithDefaults(), Arrays.stream(schema.fields()).map(field -> field.def).toArray(Field[]::new));
31+
}
32+
33+
@Override
34+
public boolean isNullable() {
35+
return true;
36+
}
37+
38+
/**
39+
* Write a struct to the buffer with special handling for null values
40+
* If the input object is null, writes a byte value of -1 to the buffer as a null indicator.
41+
*/
42+
@Override
43+
public void write(ByteBuffer buffer, Object o) {
44+
if (o == null) {
45+
buffer.put((byte) -1);
46+
return;
47+
}
48+
49+
buffer.put((byte) 1);
50+
super.write(buffer, o);
51+
}
52+
53+
@Override
54+
public Struct read(ByteBuffer buffer) {
55+
byte nullIndicator = buffer.get();
56+
if (nullIndicator < 0)
57+
return null;
58+
59+
return super.read(buffer);
60+
}
61+
62+
@Override
63+
public int sizeOf(Object o) {
64+
if (o == null)
65+
return 1;
66+
67+
return 1 + super.sizeOf(o);
68+
}
69+
70+
@Override
71+
public Struct validate(Object item) {
72+
if (item == null)
73+
return null;
74+
75+
return super.validate(item);
76+
}
77+
78+
@Override
79+
public String typeName() {
80+
return NULLABLE_STRUCT_TYPE_NAME;
81+
}
82+
83+
@Override
84+
public String leftBracket() {
85+
return "?{";
86+
}
87+
88+
@Override
89+
public String rightBracket() {
90+
return "}";
91+
}
92+
93+
@Override
94+
public String documentation() {
95+
return "A nullable struct is named by a string with a capitalized first letter and consists of one or more fields. " +
96+
"It represents a composite object or null. " +
97+
"For non-null values, the first byte has value 1, " +
98+
"followed by the serialization of each field in the order they are defined. " +
99+
"A null value is encoded as a byte with value -1 and there are no following bytes." +
100+
"In protocol documentation a nullable struct containing multiple fields is enclosed by " +
101+
leftBracket() + " and " + rightBracket() + ".";
102+
}
103+
}

clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.kafka.common.protocol.types;
1818

19+
import org.apache.kafka.common.protocol.types.Type.DocumentedType;
20+
1921
import java.nio.ByteBuffer;
2022
import java.util.HashMap;
2123
import java.util.Map;
@@ -24,7 +26,9 @@
2426
/**
2527
* The schema for a compound record definition
2628
*/
27-
public final class Schema extends Type {
29+
public class Schema extends DocumentedType {
30+
private static final String STRUCT_TYPE_NAME = "STRUCT";
31+
2832
private static final Object[] NO_VALUES = new Object[0];
2933

3034
private final BoundField[] fields;
@@ -53,6 +57,7 @@ public Schema(Field... fs) {
5357
*
5458
* @throws SchemaException If the given list have duplicate fields
5559
*/
60+
@SuppressWarnings("this-escape")
5661
public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
5762
this.fields = new BoundField[fs.length];
5863
this.fieldsByName = new HashMap<>();
@@ -173,6 +178,20 @@ public BoundField[] fields() {
173178
return this.fields;
174179
}
175180

181+
protected boolean tolerateMissingFieldsWithDefaults() {
182+
return this.tolerateMissingFieldsWithDefaults;
183+
}
184+
185+
@Override
186+
public String leftBracket() {
187+
return "{";
188+
}
189+
190+
@Override
191+
public String rightBracket() {
192+
return "}";
193+
}
194+
176195
/**
177196
* Display a string representation of the schema
178197
*/
@@ -206,6 +225,19 @@ public Struct validate(Object item) {
206225
}
207226
}
208227

228+
@Override
229+
public String typeName() {
230+
return STRUCT_TYPE_NAME;
231+
}
232+
233+
@Override
234+
public String documentation() {
235+
return "A struct is named by a string with a capitalized first letter and consists of one or more fields. " +
236+
"It represents a composite object encoded as the serialization of each field in the order they are defined." +
237+
"In protocol documentation a struct containing multiple fields is enclosed by " +
238+
leftBracket() + " and " + rightBracket() + ".";
239+
}
240+
209241
public void walk(Visitor visitor) {
210242
Objects.requireNonNull(visitor, "visitor must be non-null");
211243
handleNode(this, visitor);

0 commit comments

Comments
 (0)