Skip to content

Commit dffd031

Browse files
aglinxinyuanCopilotchenlica
authored
feat: allow Multi-Link on Input Ports (#4342)
### What changes were proposed in this PR? Enabling multi-link support for input ports. | Before the change | After the change | | ------------- | ------------- | | <img width="651" height="491" alt="image" src="https://github.com/user-attachments/assets/6ebee08c-89ab-4731-bc4a-cfcd95ea8203" /> | <img width="661" height="498" alt="image" src="https://github.com/user-attachments/assets/bab2ca5c-be72-4643-8d21-5c56156e61a1" /> | ### Any related issues, documentation, discussions? Closes #4329 ### How was this PR tested? Tested manually. ### Was this PR authored or co-authored using generative AI tooling? No. --------- Signed-off-by: Xinyuan Lin <xinyual3@uci.edu> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Chen Li <chenli@gmail.com>
1 parent 353a31e commit dffd031

65 files changed

Lines changed: 162 additions & 234 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/config/src/main/resources/storage.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ storage {
4343
uri-without-scheme = "localhost:5432/texera_iceberg_catalog"
4444
uri-without-scheme = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME}
4545

46-
username = "texera"
46+
username = "postgres"
4747
username = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME}
4848

49-
password = "password"
49+
password = "postgres"
5050
password = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD}
5151
}
5252
}

common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,11 @@ message GlobalPortIdentity{
4242
message InputPort {
4343
PortIdentity id = 1 [(scalapb.field).no_box = true];
4444
string displayName = 2;
45-
bool allowMultiLinks = 3;
45+
bool disallowMultiLinks = 3;
4646
repeated PortIdentity dependencies = 4;
4747
}
4848

4949

50-
5150
message OutputPort {
5251
enum OutputMode {
5352
// outputs complete result set snapshot for each update

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/PortDescriptor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.texera.amber.core.workflow.PartitionInfo
2525
case class PortDescription(
2626
portID: String,
2727
displayName: String,
28-
allowMultiInputs: Boolean,
28+
disallowMultiInputs: Boolean,
2929
isDynamicPort: Boolean,
3030
partitionRequirement: PartitionInfo,
3131
dependencies: List[Int] = List.empty

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/dummy/DummyOpDesc.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ class DummyOpDesc extends LogicalOp with PortDescriptor {
3939
InputPort(
4040
PortIdentity(idx),
4141
displayName = portDesc.displayName,
42-
allowMultiLinks = portDesc.allowMultiInputs,
42+
disallowMultiLinks = portDesc.disallowMultiInputs,
4343
dependencies = portDesc.dependencies.map(idx => PortIdentity(idx))
4444
)
4545
}
4646
} else {
47-
List(InputPort(PortIdentity(), allowMultiLinks = true))
47+
List(InputPort())
4848
}
4949
val outputPortInfo = if (outputPorts != null) {
5050
outputPorts.zipWithIndex.map {

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/metadata/OperatorMetadataGenerator.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType
2525
import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}
2626
import com.kjetland.jackson.jsonSchema.JsonSchemaConfig.html5EnabledSchema
2727
import com.kjetland.jackson.jsonSchema.{JsonSchemaConfig, JsonSchemaDraft, JsonSchemaGenerator}
28+
import org.apache.texera.amber.core.workflow.OutputPort.OutputMode
2829
import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
2930
import org.apache.texera.amber.operator.LogicalOp
3031
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
@@ -45,6 +46,21 @@ case class OperatorInfo(
4546
allowPortCustomization: Boolean = false
4647
)
4748

49+
object OperatorInfo {
50+
def forVisualization(
51+
userFriendlyName: String,
52+
operatorDescription: String,
53+
operatorGroupName: String
54+
): OperatorInfo =
55+
OperatorInfo(
56+
userFriendlyName,
57+
operatorDescription,
58+
operatorGroupName,
59+
inputPorts = List(InputPort(disallowMultiLinks = true)),
60+
outputPorts = List(OutputPort(mode = OutputMode.SINGLE_SNAPSHOT))
61+
)
62+
}
63+
4864
case class OperatorMetadata(
4965
operatorType: String,
5066
jsonSchema: JsonNode,

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ class SklearnTestingOpDesc extends PythonOperatorDescriptor {
9191
InputPort(
9292
PortIdentity(),
9393
"model",
94-
dependencies = List(PortIdentity(1)),
95-
allowMultiLinks = true
94+
dependencies = List(PortIdentity(1))
9695
),
9796
InputPort(PortIdentity(1), "data")
9897
),

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDesc.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,12 @@ class JavaUDFOpDesc extends LogicalOp {
143143
InputPort(
144144
PortIdentity(idx),
145145
displayName = portDesc.displayName,
146-
allowMultiLinks = portDesc.allowMultiInputs,
146+
disallowMultiLinks = portDesc.disallowMultiInputs,
147147
dependencies = portDesc.dependencies.map(idx => PortIdentity(idx))
148148
)
149149
}
150150
} else {
151-
List(InputPort(PortIdentity(), allowMultiLinks = true))
151+
List(InputPort())
152152
}
153153
val outputPortInfo = if (outputPorts != null) {
154154
outputPorts.zipWithIndex.map {

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,10 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
137137
"User-defined function operator in Python script",
138138
OperatorGroupConstants.PYTHON_GROUP,
139139
inputPorts = List(
140-
InputPort(PortIdentity(), displayName = "model", allowMultiLinks = true),
140+
InputPort(PortIdentity(), displayName = "model"),
141141
InputPort(
142142
PortIdentity(1),
143143
displayName = "tuples",
144-
allowMultiLinks = true,
145144
dependencies = List(PortIdentity(0))
146145
)
147146
),

common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,12 @@ class PythonUDFOpDescV2 extends LogicalOp {
149149
InputPort(
150150
PortIdentity(idx),
151151
displayName = portDesc.displayName,
152-
allowMultiLinks = portDesc.allowMultiInputs,
152+
disallowMultiLinks = portDesc.disallowMultiInputs,
153153
dependencies = portDesc.dependencies.map(idx => PortIdentity(idx))
154154
)
155155
}
156156
} else {
157-
List(InputPort(PortIdentity(), allowMultiLinks = true))
157+
List(InputPort())
158158
}
159159
val outputPortInfo = if (outputPorts != null) {
160160
outputPorts.zipWithIndex.map {

0 commit comments

Comments
 (0)