From bc9a95974a741ff13bf4c6ba46cf98091b6cd730 Mon Sep 17 00:00:00 2001 From: Marcus Henriksson Date: Fri, 17 Apr 2026 15:53:45 +0200 Subject: [PATCH] feat: Add kafka catalog as a payloadbuilder catalog extension --- pom.xml | 2 +- .../catalog/es/ESCatalogExtension.java | 1 + .../catalog/es/ESConnectionsModel.java | 2 + .../fs/FilesystemCatalogExtensionFactory.java | 4 +- .../catalog/kafka/KafkaCatalogExtension.java | 508 ++++++++++++++++++ .../kafka/KafkaCatalogExtensionFactory.java | 44 ++ .../catalog/kafka/KafkaConnection.java | 376 +++++++++++++ .../kafka/KafkaConnectionsConfigurable.java | 172 ++++++ .../catalog/kafka/KafkaConnectionsModel.java | 217 ++++++++ .../catalog/kafka/KafkaConstants.java | 11 + .../catalog/es/ESCatalogExtensionTest.java | 1 + .../kafka/KafkaCatalogExtensionTest.java | 189 +++++++ .../catalog/kafka/KafkaConnectionTest.java | 36 ++ 13 files changed, 1560 insertions(+), 3 deletions(-) create mode 100644 queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java create mode 100644 queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java create mode 100644 queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java create mode 100644 queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java create mode 100644 queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java create mode 100644 queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java create mode 100644 queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java create mode 100644 queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java diff --git a/pom.xml b/pom.xml index 04d12b4d..4bc0a15c 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 5.14.2 1.7.36 2.43.0 - 1.10.1 + 1.10.1-SNAPSHOT diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java index 8bdf24e1..55c80c9c 100644 --- a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java @@ -38,6 +38,7 @@ import se.kuseman.payloadbuilder.api.catalog.CatalogException; import se.kuseman.payloadbuilder.api.execution.IQuerySession; import se.kuseman.payloadbuilder.api.execution.UTF8String; +import se.kuseman.payloadbuilder.catalog.AuthType; import se.kuseman.payloadbuilder.catalog.Common; import se.kuseman.payloadbuilder.catalog.CredentialsException; import se.kuseman.payloadbuilder.catalog.es.ESConnectionsModel.Connection; diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java index faf1691c..4efc4d5d 100644 --- a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java @@ -43,7 +43,9 @@ import se.kuseman.payloadbuilder.api.catalog.CatalogException; import se.kuseman.payloadbuilder.api.execution.IQuerySession; +import se.kuseman.payloadbuilder.catalog.AuthType; import se.kuseman.payloadbuilder.catalog.Common; +import se.kuseman.payloadbuilder.catalog.HttpClientUtils; /** * Model for {@link ESCatalogExtension}'s connections diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java index 03c7da71..111fe0b1 100644 --- a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java @@ -3,8 +3,8 @@ import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension; import com.queryeer.api.extensions.payloadbuilder.ICatalogExtensionFactory; -/** SPI factory for {@link FilesystemCatalogExtension}. */ -public class FilesystemCatalogExtensionFactory implements ICatalogExtensionFactory +/** Extension factory for {@link FilesystemCatalogExtension}. */ +class FilesystemCatalogExtensionFactory implements ICatalogExtensionFactory { @Override public ICatalogExtension create(String catalogAlias) diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java new file mode 100644 index 00000000..c6014221 --- /dev/null +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java @@ -0,0 +1,508 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.ObjectUtils.getIfNull; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.Strings.CI; + +import java.awt.Component; +import java.awt.Dimension; +import java.awt.GridBagConstraints; +import java.awt.GridBagLayout; +import java.awt.Insets; +import java.util.List; + +import javax.swing.DefaultComboBoxModel; +import javax.swing.DefaultListCellRenderer; +import javax.swing.JButton; +import javax.swing.JComboBox; +import javax.swing.JLabel; +import javax.swing.JList; +import javax.swing.JPanel; +import javax.swing.SwingUtilities; +import javax.swing.event.ListDataListener; + +import com.queryeer.api.IQueryFile; +import com.queryeer.api.component.AutoCompletionComboBox; +import com.queryeer.api.extensions.IConfigurable; +import com.queryeer.api.extensions.engine.IQueryEngine.IState.MetaParameter; +import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension; +import com.queryeer.api.extensions.payloadbuilder.ICatalogExtensionView; +import com.queryeer.api.extensions.payloadbuilder.IPayloadbuilderState; +import com.queryeer.api.service.IQueryFileProvider; + +import se.kuseman.payloadbuilder.api.catalog.Catalog; +import se.kuseman.payloadbuilder.api.catalog.CatalogException; +import se.kuseman.payloadbuilder.api.execution.IQuerySession; +import se.kuseman.payloadbuilder.catalog.CredentialsException; + +/** Queryeer extension for {@link KafkaCatalog}. */ +class KafkaCatalogExtension implements ICatalogExtension +{ + static final Catalog CATALOG = new KafkaCatalog(); + + private final IQueryFileProvider queryFileProvider; + private final KafkaConnectionsModel connectionsModel; + private final String catalogAlias; + + private QuickPropertiesPanel quickPropertiesPanel; + + KafkaCatalogExtension(IQueryFileProvider queryFileProvider, KafkaConnectionsModel connectionsModel, String catalogAlias) + { + this.queryFileProvider = requireNonNull(queryFileProvider, "queryFileProvider"); + this.connectionsModel = requireNonNull(connectionsModel, "connectionsModel"); + this.catalogAlias = catalogAlias; + } + + @Override + public String getTitle() + { + return CATALOG.getName(); + } + + @Override + public Catalog getCatalog() + { + return CATALOG; + } + + @Override + public Class getConfigurableClass() + { + return KafkaConnectionsConfigurable.class; + } + + @Override + public boolean hasQuickPropertieComponent() + { + return true; + } + + @Override + public Component getQuickPropertiesComponent() + { + if (quickPropertiesPanel == null) + { + quickPropertiesPanel = new QuickPropertiesPanel(); + } + return quickPropertiesPanel; + } + + @Override + public List getMetaParameters(IQuerySession querySession, boolean testData) + { + String bootstrapServers = "localhost:9092"; + String schemaRegistryUrl = ""; + String securityProtocol = KafkaConnection.SecurityProtocol.PLAINTEXT.name(); + String saslMechanism = KafkaConnection.SaslMechanism.PLAIN.name(); + String topic = "my_topic"; + + if (!testData) + { + bootstrapServers = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.BOOTSTRAP_SERVERS) + .valueAsString(0); + schemaRegistryUrl = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.SCHEMA_REGISTRY_URL) + .valueAsString(0); + securityProtocol = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.SECURITY_PROTOCOL) + .valueAsString(0); + saslMechanism = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.SASL_MECHANISM) + .valueAsString(0); + topic = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.TOPIC) + .valueAsString(0); + } + + return List.of(new MetaParameter(KafkaCatalog.BOOTSTRAP_SERVERS, bootstrapServers, "Kafka bootstrap servers"), + new MetaParameter(KafkaCatalog.SCHEMA_REGISTRY_URL, schemaRegistryUrl, "Schema registry url"), + new MetaParameter(KafkaCatalog.SECURITY_PROTOCOL, securityProtocol, "Kafka security protocol"), new MetaParameter(KafkaCatalog.SASL_MECHANISM, saslMechanism, "Kafka sasl mechanism"), + new MetaParameter(KafkaCatalog.TOPIC, topic, "Default topic")); + } + + @Override + public ExceptionAction handleException(IQuerySession querySession, CatalogException exception) + { + if (!(exception instanceof CredentialsException)) + { + return ExceptionAction.NONE; + } + + KafkaConnection selectedConnection = (KafkaConnection) quickPropertiesPanel.connections.getSelectedItem(); + KafkaConnection connection = connectionsModel.findConnection(querySession, catalogAlias); + + if (connection != null + && selectedConnection != null + && selectedConnection != connection) + { + if (!connectionsModel.prepare(connection, false)) + { + return ExceptionAction.NONE; + } + + connection.setup(querySession, catalogAlias); + return ExceptionAction.RERUN; + } + + if (connection != null) + { + connection.setRuntimeSaslJaasPassword(null); + if (!connectionsModel.prepare(connection, false)) + { + return ExceptionAction.NONE; + } + + connection.setup(querySession, catalogAlias); + quickPropertiesPanel.updateAuthStatus(connection); + return ExceptionAction.RERUN; + } + + return ExceptionAction.NONE; + } + + void setupConnection(KafkaConnection connection) + { + IQueryFile queryFile = queryFileProvider.getCurrentFile(); + if (queryFile == null) + { + return; + } + if (connection != null + && queryFile.getEngineState() instanceof IPayloadbuilderState state) + { + connection.setup(state.getQuerySession(), catalogAlias); + state.getQuerySession() + .setCatalogProperty(catalogAlias, KafkaCatalog.TOPIC, (String) null); + } + } + + void setupTopic(String topic) + { + IQueryFile queryFile = queryFileProvider.getCurrentFile(); + if (queryFile == null) + { + return; + } + if (queryFile.getEngineState() instanceof IPayloadbuilderState state) + { + state.getQuerySession() + .setCatalogProperty(catalogAlias, KafkaCatalog.TOPIC, !isBlank(topic) ? topic + : null); + } + } + + void update(IQueryFile queryFile) + { + if (queryFile == null) + { + return; + } + IPayloadbuilderState state = (IPayloadbuilderState) queryFile.getEngineState(); + if (state == null) + { + return; + } + + IQuerySession session = state.getQuerySession(); + + KafkaConnection connectionToSet = connectionsModel.findConnection(session, catalogAlias); + boolean sessionHasConnection = connectionToSet != null; + String topic = session.getCatalogProperty(catalogAlias, KafkaCatalog.TOPIC) + .valueAsString(0); + + if (connectionToSet == null + && connectionsModel.getSize() > 0) + { + connectionToSet = connectionsModel.getElementAt(0); + } + + if (!sessionHasConnection + && connectionToSet != null) + { + connectionToSet.setup(session, catalogAlias); + } + + String topicToSet = null; + if (connectionToSet != null) + { + List topics = getIfNull(connectionsModel.getTopics(connectionToSet, false, false), emptyList()); + for (String item : topics) + { + if (CI.equals(item, topic)) + { + topicToSet = item; + break; + } + } + } + + final KafkaConnection connectionFinal = connectionToSet; + final String topicFinal = topicToSet; + + SwingUtilities.invokeLater(() -> + { + quickPropertiesPanel.suppressEvents = true; + try + { + quickPropertiesPanel.connections.getModel() + .setSelectedItem(connectionFinal); + quickPropertiesPanel.populateTopics(connectionFinal); + quickPropertiesPanel.topics.getModel() + .setSelectedItem(topicFinal); + quickPropertiesPanel.updateAuthStatus(connectionFinal); + } + finally + { + quickPropertiesPanel.suppressEvents = false; + } + }); + } + + /** Quick properties panel. */ + class QuickPropertiesPanel extends JPanel implements ICatalogExtensionView + { + private static final String PROTOTYPE_TOPIC = "topic-with-a-very-long-name-to-display"; + private final DefaultComboBoxModel topicsModel = new DefaultComboBoxModel<>(); + private final JLabel authStatus = new JLabel(); + private final JButton reload = new JButton("Reload"); + boolean suppressEvents; + + final JComboBox connections; + final JComboBox topics; + + QuickPropertiesPanel() + { + setLayout(new GridBagLayout()); + + connections = new JComboBox<>(new ConnectionsSelectionModel()); + connections.setRenderer(new DefaultListCellRenderer() + { + @Override + public Component getListCellRendererComponent(JList list, Object value, int index, boolean isSelected, boolean cellHasFocus) + { + super.getListCellRendererComponent(list, value, index, isSelected, cellHasFocus); + if (value instanceof KafkaConnection connection) + { + if (!connection.isEnabled()) + { + setText("" + connection + ""); + } + else + { + setText(connection.toString()); + } + } + return this; + } + }); + connections.addItemListener(l -> + { + if (suppressEvents) + { + return; + } + + suppressEvents = true; + try + { + KafkaConnection connection = (KafkaConnection) connections.getSelectedItem(); + topicsModel.removeAllElements(); + if (connection != null) + { + connectionsModel.prepare(connection, true); + setupConnection(connection); + loadTopics(connection, false); + } + updateAuthStatus(connection); + } + finally + { + suppressEvents = false; + } + }); + + topics = new JComboBox<>(topicsModel); + topics.setPrototypeDisplayValue(PROTOTYPE_TOPIC); + topics.setMaximumRowCount(25); + AutoCompletionComboBox.enable(topics); + topics.addItemListener(l -> + { + if (suppressEvents) + { + return; + } + setupTopic((String) topics.getSelectedItem()); + }); + + connectionsModel.registerReloadButton(reload); + reload.addActionListener(l -> + { + KafkaConnection connection = (KafkaConnection) connections.getSelectedItem(); + if (connection == null + || !connection.isEnabled()) + { + return; + } + if (!connectionsModel.prepare(connection, false)) + { + return; + } + + setupConnection(connection); + loadTopics(connection, true); + }); + + add(new JLabel("Connection"), new GridBagConstraints(0, 0, 1, 1, 0.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.NONE, new Insets(0, 0, 1, 3), 0, 0)); + add(connections, new GridBagConstraints(1, 0, 1, 1, 1.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.HORIZONTAL, new Insets(0, 0, 1, 0), 0, 0)); + add(new JLabel("Topic"), new GridBagConstraints(0, 1, 1, 1, 0.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.NONE, new Insets(0, 0, 1, 3), 0, 0)); + add(topics, new GridBagConstraints(1, 1, 1, 1, 1.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.HORIZONTAL, new Insets(0, 0, 1, 0), 0, 0)); + add(authStatus, new GridBagConstraints(0, 2, 1, 1, 0.0, 1.0, GridBagConstraints.NORTH, GridBagConstraints.HORIZONTAL, new Insets(3, 0, 0, 0), 0, 0)); + add(reload, new GridBagConstraints(1, 2, 1, 1, 1.0, 1.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.HORIZONTAL, new Insets(0, 0, 0, 0), 0, 0)); + setPreferredSize(new Dimension(240, 75)); + } + + @Override + public void afterExecute(IQueryFile queryFile) + { + if (queryFileProvider.getCurrentFile() == queryFile) + { + KafkaCatalogExtension.this.update(queryFile); + } + } + + @Override + public void focus(IQueryFile queryFile) + { + KafkaCatalogExtension.this.update(queryFile); + } + + void updateAuthStatus(KafkaConnection connection) + { + if (connection == connections.getSelectedItem()) + { + boolean hasCredentials = connection != null + && connection.hasCredentials(); + if (connection != null + && connection.isSaslEnabled()) + { + authStatus.setText(hasCredentials ? "Authenticated" + : "Credentials required"); + } + else + { + authStatus.setText(""); + } + } + } + + void populateTopics(KafkaConnection connection) + { + topicsModel.removeAllElements(); + if (connection == null + || connection.getTopics() == null) + { + return; + } + for (String topic : connection.getTopics()) + { + topicsModel.addElement(topic); + } + } + + private void loadTopics(KafkaConnection connection, boolean forceReload) + { + if (!connection.hasCredentials()) + { + return; + } + + IQueryFile currentFile = queryFileProvider.getCurrentFile(); + Runnable load = () -> + { + try + { + connectionsModel.getTopics(connection, forceReload, true); + SwingUtilities.invokeLater(() -> + { + Object selected = topics.getSelectedItem(); + populateTopics(connection); + topics.setSelectedItem(selected); + updateAuthStatus(connection); + }); + } + catch (Exception e) + { + if (currentFile != null) + { + e.printStackTrace(currentFile.getMessagesWriter()); + currentFile.focusMessages(); + } + else + { + e.printStackTrace(); + } + } + }; + + new Thread(load).start(); + } + } + + private class ConnectionsSelectionModel extends DefaultComboBoxModel + { + private int selectedItemIndex; + + @Override + public Object getSelectedItem() + { + if (selectedItemIndex >= 0 + && selectedItemIndex < connectionsModel.getConnections() + .size()) + { + return connectionsModel.getElementAt(selectedItemIndex); + } + return null; + } + + @Override + public void setSelectedItem(Object connection) + { + int newIndex = connectionsModel.getConnections() + .indexOf(connection); + if (newIndex == -1) + { + return; + } + + if (selectedItemIndex != newIndex) + { + selectedItemIndex = newIndex; + fireContentsChanged(this, -1, -1); + } + } + + @Override + public int getSize() + { + return connectionsModel.getSize(); + } + + @Override + public KafkaConnection getElementAt(int index) + { + return connectionsModel.getElementAt(index); + } + + @Override + public void addListDataListener(ListDataListener l) + { + super.addListDataListener(l); + connectionsModel.addListDataListener(l); + } + + @Override + public void removeListDataListener(ListDataListener l) + { + super.removeListDataListener(l); + connectionsModel.removeListDataListener(l); + } + } +} diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java new file mode 100644 index 00000000..5f1b0938 --- /dev/null +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java @@ -0,0 +1,44 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static java.util.Objects.requireNonNull; + +import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension; +import com.queryeer.api.extensions.payloadbuilder.ICatalogExtensionFactory; +import com.queryeer.api.service.IQueryFileProvider; + +/** Extension factory for {@link KafkaCatalogExtension}. */ +class KafkaCatalogExtensionFactory implements ICatalogExtensionFactory +{ + private final IQueryFileProvider queryFileProvider; + private final KafkaConnectionsModel connectionsModel; + + KafkaCatalogExtensionFactory(IQueryFileProvider queryFileProvider, KafkaConnectionsModel connectionsModel) + { + this.queryFileProvider = requireNonNull(queryFileProvider, "queryFileProvider"); + this.connectionsModel = requireNonNull(connectionsModel, "connectionsModel"); + } + + @Override + public ICatalogExtension create(String catalogAlias) + { + return new KafkaCatalogExtension(queryFileProvider, connectionsModel, catalogAlias); + } + + @Override + public String getTitle() + { + return KafkaCatalogExtension.CATALOG.getName(); + } + + @Override + public String getDefaultAlias() + { + return "kafka"; + } + + @Override + public int order() + { + return 2; + } +} diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java new file mode 100644 index 00000000..a9085c10 --- /dev/null +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java @@ -0,0 +1,376 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static org.apache.commons.lang3.StringUtils.defaultIfBlank; +import static org.apache.commons.lang3.StringUtils.isBlank; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.queryeer.api.component.IPropertyAware; +import com.queryeer.api.component.Property; + +import se.kuseman.payloadbuilder.api.execution.IQuerySession; + +/** Kafka connection definition. */ +class KafkaConnection implements IPropertyAware +{ + @JsonProperty + private String name; + @JsonProperty + private String bootstrapServers = "localhost:9092"; + @JsonProperty + private String schemaRegistryUrl; + @JsonProperty + private SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + @JsonProperty + private SaslMechanism saslMechanism = SaslMechanism.PLAIN; + @JsonProperty + private String saslJaasLoginModule = "org.apache.kafka.common.security.plain.PlainLoginModule"; + @JsonProperty + private String saslJaasControlFlag = "required"; + @JsonProperty + private String saslJaasUsername; + /** Stored encrypted password if set. */ + @JsonProperty + private String saslJaasPassword; + @JsonProperty + private String saslJaasOptions; + @JsonProperty + private boolean enabled = true; + + @JsonIgnore + private List topics; + /** Runtime decrypted / prompted password. */ + @JsonIgnore + private char[] runtimeSaslJaasPassword; + + KafkaConnection() + { + } + + KafkaConnection(KafkaConnection source) + { + this.name = source.name; + this.bootstrapServers = source.bootstrapServers; + this.schemaRegistryUrl = source.schemaRegistryUrl; + this.securityProtocol = source.securityProtocol; + this.saslMechanism = source.saslMechanism; + this.saslJaasLoginModule = source.saslJaasLoginModule; + this.saslJaasControlFlag = source.saslJaasControlFlag; + this.saslJaasUsername = source.saslJaasUsername; + this.saslJaasPassword = source.saslJaasPassword; + this.saslJaasOptions = source.saslJaasOptions; + this.enabled = source.enabled; + this.topics = source.topics != null ? new ArrayList<>(source.topics) + : null; + this.runtimeSaslJaasPassword = source.runtimeSaslJaasPassword != null ? Arrays.copyOf(source.runtimeSaslJaasPassword, source.runtimeSaslJaasPassword.length) + : null; + } + + void setup(IQuerySession querySession, String catalogAlias) + { + querySession.setCatalogProperty(catalogAlias, KafkaCatalog.BOOTSTRAP_SERVERS, bootstrapServers); + querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SCHEMA_REGISTRY_URL, schemaRegistryUrl); + querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SECURITY_PROTOCOL, securityProtocol.name()); + querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SASL_MECHANISM, saslMechanism.name()); + querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SASL_JAAS_CONFIG, getSaslJaasConfig()); + } + + Properties toAdminClientProperties() + { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("security.protocol", securityProtocol.name()); + + if (isSaslEnabled()) + { + props.put("sasl.mechanism", saslMechanism.name()); + String jaasConfig = getSaslJaasConfig(); + if (!isBlank(jaasConfig)) + { + props.put("sasl.jaas.config", jaasConfig); + } + } + return props; + } + + boolean isSaslEnabled() + { + return securityProtocol == SecurityProtocol.SASL_PLAINTEXT + || securityProtocol == SecurityProtocol.SASL_SSL; + } + + boolean hasCredentials() + { + return !isSaslEnabled() + || (!isBlank(saslJaasUsername) + && runtimeSaslJaasPassword != null + && runtimeSaslJaasPassword.length > 0); + } + + String getSaslJaasConfig() + { + if (!isSaslEnabled()) + { + return null; + } + + String username = defaultIfBlank(saslJaasUsername, null); + String password = runtimeSaslJaasPassword != null ? new String(runtimeSaslJaasPassword) + : null; + + if (isBlank(saslJaasLoginModule) + || isBlank(saslJaasControlFlag) + || isBlank(username) + || isBlank(password)) + { + return null; + } + + StringBuilder sb = new StringBuilder(256); + sb.append(saslJaasLoginModule) + .append(' ') + .append(saslJaasControlFlag) + .append(' ') + .append("username=\"") + .append(escapeJaasValue(username)) + .append("\" ") + .append("password=\"") + .append(escapeJaasValue(password)) + .append('"'); + + if (!isBlank(saslJaasOptions)) + { + sb.append(' ') + .append(saslJaasOptions.trim()); + } + if (!sb.toString() + .endsWith(";")) + { + sb.append(';'); + } + return sb.toString(); + } + + private static String escapeJaasValue(String value) + { + return value.replace("\\", "\\\\") + .replace("\"", "\\\""); + } + + @Property( + order = -1, + title = "Name") + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + @Property( + order = 1, + title = "Bootstrap Servers", + tooltip = "Comma separated values are supported") + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + @Property( + order = 2, + title = "Schema Registry URL", + tooltip = "Comma separated values are supported") + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + @Property( + order = 3, + title = "security_protocol") + public SecurityProtocol getSecurityProtocol() + { + return securityProtocol; + } + + public void setSecurityProtocol(SecurityProtocol securityProtocol) + { + this.securityProtocol = securityProtocol; + } + + @Property( + order = 4, + title = "sasl_mechanism", + visibleAware = true) + public SaslMechanism getSaslMechanism() + { + return saslMechanism; + } + + public void setSaslMechanism(SaslMechanism saslMechanism) + { + this.saslMechanism = saslMechanism; + } + + @Property( + order = 5, + title = "sasl_jaas_login_module", + visibleAware = true) + public String getSaslJaasLoginModule() + { + return saslJaasLoginModule; + } + + public void setSaslJaasLoginModule(String saslJaasLoginModule) + { + this.saslJaasLoginModule = saslJaasLoginModule; + } + + @Property( + order = 6, + title = "sasl_jaas_control_flag", + visibleAware = true) + public String getSaslJaasControlFlag() + { + return saslJaasControlFlag; + } + + public void setSaslJaasControlFlag(String saslJaasControlFlag) + { + this.saslJaasControlFlag = saslJaasControlFlag; + } + + @Property( + order = 7, + title = "sasl_jaas_username", + visibleAware = true) + public String getSaslJaasUsername() + { + return saslJaasUsername; + } + + public void setSaslJaasUsername(String saslJaasUsername) + { + this.saslJaasUsername = saslJaasUsername; + } + + @Property( + order = 8, + title = "sasl_jaas_password", + visibleAware = true, + password = true) + public String getSaslJaasPassword() + { + return saslJaasPassword; + } + + public void setSaslJaasPassword(String saslJaasPassword) + { + this.saslJaasPassword = saslJaasPassword; + } + + @Property( + order = 9, + title = "sasl_jaas_options", + visibleAware = true, + tooltip = "Optional extra JAAS options, e.g. serviceName=\"kafka\"") + public String getSaslJaasOptions() + { + return saslJaasOptions; + } + + public void setSaslJaasOptions(String saslJaasOptions) + { + this.saslJaasOptions = saslJaasOptions; + } + + @Property( + order = 10, + title = "Enabled") + public boolean isEnabled() + { + return enabled; + } + + public void setEnabled(boolean enabled) + { + this.enabled = enabled; + } + + List getTopics() + { + return topics; + } + + void setTopics(List topics) + { + this.topics = topics; + } + + char[] getRuntimeSaslJaasPassword() + { + return runtimeSaslJaasPassword; + } + + void setRuntimeSaslJaasPassword(char[] runtimeSaslJaasPassword) + { + this.runtimeSaslJaasPassword = runtimeSaslJaasPassword; + } + + @Override + public boolean visible(String property) + { + if ("saslMechanism".equals(property) + || "saslJaasLoginModule".equals(property) + || "saslJaasControlFlag".equals(property) + || "saslJaasUsername".equals(property) + || "saslJaasPassword".equals(property) + || "saslJaasOptions".equals(property)) + { + return isSaslEnabled(); + } + return true; + } + + @Override + public String toString() + { + return Objects.toString(name, bootstrapServers); + } + + public enum SecurityProtocol + { + PLAINTEXT, + SSL, + SASL_PLAINTEXT, + SASL_SSL + } + + public enum SaslMechanism + { + PLAIN, + SCRAM_SHA_256, + SCRAM_SHA_512, + GSSAPI, + OAUTHBEARER + } +} diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java new file mode 100644 index 00000000..e33c9cd2 --- /dev/null +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java @@ -0,0 +1,172 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonMap; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.ObjectUtils.getIfNull; +import static org.apache.commons.lang3.StringUtils.isBlank; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.queryeer.api.component.ListPropertiesComponent; +import com.queryeer.api.extensions.IConfigurable; +import com.queryeer.api.service.IConfig; +import com.queryeer.api.service.ICryptoService; + +/** Kafka configurable. */ +class KafkaConnectionsConfigurable implements IConfigurable +{ + private static final String NAME = "se.kuseman.payloadbuilder.catalog.kafka.KafkaCatalogExtension"; + private static final String CONNECTIONS = "connections"; + + private final IConfig config; + private final ICryptoService cryptoService; + private final KafkaConnectionsModel connectionsModel; + private final List> dirtyStateConsumers = new ArrayList<>(); + + private ListPropertiesComponent configComponent; + + KafkaConnectionsConfigurable(IConfig config, ICryptoService cryptoService, KafkaConnectionsModel connectionsModel) + { + this.config = requireNonNull(config, "config"); + this.cryptoService = requireNonNull(cryptoService, "cryptoService"); + this.connectionsModel = requireNonNull(connectionsModel, "connectionsModel"); + loadSettings(); + } + + @Override + public ListPropertiesComponent getComponent() + { + if (configComponent == null) + { + configComponent = new ListPropertiesComponent<>(KafkaConnection.class, this::notifyDirty, this::newConnection, this::cloneConnection); + configComponent.init(connectionsModel.copyConnections()); + } + return configComponent; + } + + @Override + public void revertChanges() + { + getComponent().init(connectionsModel.copyConnections()); + } + + @Override + public boolean commitChanges() + { + connectionsModel.setConnections(getComponent().getResult()); + + for (KafkaConnection con : connectionsModel.getConnections()) + { + String pass = con.getSaslJaasPassword(); + if (isBlank(pass)) + { + continue; + } + + con.setRuntimeSaslJaasPassword(pass.toCharArray()); + + String encryptedPass = cryptoService.encryptString(pass); + if (encryptedPass == null) + { + return false; + } + con.setSaslJaasPassword(encryptedPass); + } + + config.saveExtensionConfig(NAME, singletonMap(CONNECTIONS, connectionsModel.getConnections())); + getComponent().init(connectionsModel.copyConnections()); + return true; + } + + @Override + public EncryptionResult reEncryptSecrets(ICryptoService newCryptoService) + { + boolean change = false; + for (KafkaConnection con : getComponent().getResult()) + { + String pass = con.getSaslJaasPassword(); + if (isBlank(pass)) + { + continue; + } + + if ((pass = cryptoService.decryptString(pass)) == null) + { + return EncryptionResult.ABORT; + } + + con.setRuntimeSaslJaasPassword(pass.toCharArray()); + con.setSaslJaasPassword(newCryptoService.encryptString(pass)); + change = true; + } + return change ? EncryptionResult.SUCCESS + : EncryptionResult.NO_CHANGE; + } + + @Override + public String getTitle() + { + return "Connections"; + } + + @Override + public String getLongTitle() + { + return "Connections to Kafka"; + } + + @Override + public String groupName() + { + return KafkaConstants.TITLE; + } + + @Override + public void addDirtyStateConsumer(Consumer consumer) + { + dirtyStateConsumers.add(consumer); + } + + @Override + public void removeDirtyStateConsumer(Consumer consumer) + { + dirtyStateConsumers.remove(consumer); + } + + private void notifyDirty(boolean dirty) + { + dirtyStateConsumers.forEach(c -> c.accept(dirty)); + } + + private KafkaConnection newConnection() + { + KafkaConnection connection = new KafkaConnection(); + connection.setName("New Kafka Connection"); + return connection; + } + + private KafkaConnection cloneConnection(KafkaConnection connection) + { + KafkaConnection newConnection = new KafkaConnection(connection); + newConnection.setName(connection.getName() + " - Copy"); + return newConnection; + } + + private void loadSettings() + { + Map settings = config.loadExtensionConfig(NAME); + List connections = KafkaConstants.MAPPER.convertValue(getIfNull(settings.get(CONNECTIONS), emptyList()), new TypeReference>() + { + }); + if (connections == null) + { + return; + } + connectionsModel.setConnections(connections); + } +} diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java new file mode 100644 index 00000000..47da8a4b --- /dev/null +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java @@ -0,0 +1,217 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.Strings.CI; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.swing.AbstractListModel; +import javax.swing.JButton; +import javax.swing.SwingUtilities; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; + +import com.queryeer.api.IQueryFile; +import com.queryeer.api.extensions.Inject; +import com.queryeer.api.service.ICryptoService; +import com.queryeer.api.service.IQueryFileProvider; +import com.queryeer.api.utils.CredentialUtils; +import com.queryeer.api.utils.CredentialUtils.Credentials; + +import se.kuseman.payloadbuilder.api.execution.IQuerySession; +import se.kuseman.payloadbuilder.api.execution.ValueVector; + +/** Model for {@link KafkaCatalogExtension}'s connections. */ +@Inject +class KafkaConnectionsModel extends AbstractListModel +{ + private final IQueryFileProvider queryFileProvider; + private final ICryptoService cryptoService; + private final List connections = new ArrayList<>(); + private final List reloadButtons = new ArrayList<>(); + + KafkaConnectionsModel(IQueryFileProvider queryFileProvider, ICryptoService cryptoService) + { + this.queryFileProvider = requireNonNull(queryFileProvider, "queryFileProvider"); + this.cryptoService = requireNonNull(cryptoService, "cryptoService"); + } + + void registerReloadButton(JButton button) + { + reloadButtons.add(button); + } + + @Override + public int getSize() + { + return connections.size(); + } + + @Override + public KafkaConnection getElementAt(int index) + { + return connections.get(index); + } + + List copyConnections() + { + return connections.stream() + .map(KafkaConnection::new) + .collect(toList()); + } + + List getConnections() + { + return connections; + } + + void setConnections(List connections) + { + this.connections.clear(); + this.connections.addAll(connections); + fireContentsChanged(this, 0, getSize() - 1); + } + + KafkaConnection findConnection(IQuerySession querySession, String catalogAlias) + { + ValueVector property = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.BOOTSTRAP_SERVERS); + if (property == null + || property.isNull(0)) + { + return null; + } + String bootstrapServers = property.valueAsString(0); + + int size = getSize(); + for (int i = 0; i < size; i++) + { + KafkaConnection connection = getElementAt(i); + if (CI.equals(connection.getBootstrapServers(), bootstrapServers)) + { + return connection; + } + } + return null; + } + + protected Credentials getCredentials(String connectionDescription, String prefilledUsername, boolean readOnlyUsername) + { + return CredentialUtils.getCredentials(connectionDescription, prefilledUsername, readOnlyUsername); + } + + boolean prepare(KafkaConnection connection, boolean silent) + { + if (connection == null) + { + return false; + } + + if (connection.hasCredentials()) + { + return true; + } + + if (!connection.isSaslEnabled()) + { + return true; + } + + if (isBlank(connection.getSaslJaasPassword())) + { + if (silent) + { + return false; + } + + Credentials credentials = getCredentials(connection.toString(), connection.getSaslJaasUsername(), false); + if (credentials == null) + { + return false; + } + + if (!isBlank(credentials.getUsername())) + { + connection.setSaslJaasUsername(credentials.getUsername()); + } + connection.setRuntimeSaslJaasPassword(credentials.getPassword()); + return true; + } + + if (silent + && !cryptoService.isInitalized()) + { + return false; + } + + String decrypted = cryptoService.decryptString(connection.getSaslJaasPassword()); + if (decrypted == null) + { + return false; + } + connection.setRuntimeSaslJaasPassword(decrypted.toCharArray()); + return true; + } + + List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError) + { + SwingUtilities.invokeLater(() -> reloadButtons.forEach(b -> b.setEnabled(false))); + try + { + synchronized (connection) + { + List topics = connection.getTopics(); + if ((!forceReload + && topics != null) + || (!forceReload + && !connection.hasCredentials())) + { + return topics != null ? topics + : emptyList(); + } + + try (AdminClient admin = AdminClient.create(connection.toAdminClientProperties())) + { + Set names = admin.listTopics(new ListTopicsOptions().listInternal(false)) + .names() + .get(15, TimeUnit.SECONDS); + List result = new ArrayList<>(names); + result.sort(String.CASE_INSENSITIVE_ORDER); + connection.setTopics(result); + } + catch (Exception e) + { + connection.setTopics(emptyList()); + connection.setRuntimeSaslJaasPassword(null); + + IQueryFile queryFile = queryFileProvider.getCurrentFile(); + if (queryFile != null) + { + e.printStackTrace(queryFile.getMessagesWriter()); + queryFile.focusMessages(); + } + else + { + e.printStackTrace(); + } + + if (reThrowError) + { + throw new RuntimeException(e); + } + } + return connection.getTopics(); + } + } + finally + { + SwingUtilities.invokeLater(() -> reloadButtons.forEach(b -> b.setEnabled(true))); + } + } +} diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java new file mode 100644 index 00000000..20cce12b --- /dev/null +++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java @@ -0,0 +1,11 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** Constants for Kafka module. */ +interface KafkaConstants +{ + String TITLE = "Kafka"; + ObjectMapper MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); +} diff --git a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java index c8687a9a..b049ed9f 100644 --- a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java +++ b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java @@ -29,6 +29,7 @@ import se.kuseman.payloadbuilder.api.catalog.CatalogException; import se.kuseman.payloadbuilder.api.catalog.Column.Type; import se.kuseman.payloadbuilder.api.execution.IQuerySession; +import se.kuseman.payloadbuilder.catalog.AuthType; import se.kuseman.payloadbuilder.catalog.CredentialsException; import se.kuseman.payloadbuilder.catalog.es.ESConnectionsModel.Connection; import se.kuseman.payloadbuilder.test.VectorTestUtils; diff --git a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java new file mode 100644 index 00000000..0d2de82b --- /dev/null +++ b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java @@ -0,0 +1,189 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +import javax.swing.SwingUtilities; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.queryeer.api.IQueryFile; +import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension; +import com.queryeer.api.extensions.payloadbuilder.IPayloadbuilderState; +import com.queryeer.api.service.ICryptoService; +import com.queryeer.api.service.IQueryFileProvider; + +import se.kuseman.payloadbuilder.api.catalog.CatalogException; +import se.kuseman.payloadbuilder.api.catalog.Column.Type; +import se.kuseman.payloadbuilder.api.execution.IQuerySession; +import se.kuseman.payloadbuilder.test.VectorTestUtils; + +/** Test of {@link KafkaCatalogExtension}. */ +class KafkaCatalogExtensionTest +{ + @Test + void test_handle_exception_non_credentials() + { + IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class); + ICryptoService cryptoService = Mockito.mock(ICryptoService.class); + IQuerySession session = Mockito.mock(IQuerySession.class); + + KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService); + KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka"); + + assertEquals(ICatalogExtension.ExceptionAction.NONE, extension.handleException(session, new CatalogException("kafka", "boom"))); + } + + @Test + void test_setup() throws InvocationTargetException, InterruptedException + { + IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class); + IQuerySession session = Mockito.mock(IQuerySession.class); + IQueryFile queryFile = Mockito.mock(IQueryFile.class); + ICryptoService cryptoService = Mockito.mock(ICryptoService.class); + IPayloadbuilderState state = Mockito.mock(IPayloadbuilderState.class); + when(state.getQuerySession()).thenReturn(session); + + when(queryFileProvider.getCurrentFile()).thenReturn(queryFile); + when(queryFile.getEngineState()).thenReturn(state); + + KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService) + { + @Override + List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError) + { + return connection.getTopics(); + } + }; + + KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka"); + + KafkaConnection connection = new KafkaConnection(); + connection.setName("dev"); + connection.setBootstrapServers("localhost:9092"); + connection.setSchemaRegistryUrl("http://localhost:8081"); + connection.setSecurityProtocol(KafkaConnection.SecurityProtocol.SASL_SSL); + connection.setSaslMechanism(KafkaConnection.SaslMechanism.SCRAM_SHA_512); + connection.setSaslJaasLoginModule("org.apache.kafka.common.security.scram.ScramLoginModule"); + connection.setSaslJaasControlFlag("required"); + connection.setSaslJaasUsername("user"); + connection.setRuntimeSaslJaasPassword("password".toCharArray()); + connection.setTopics(asList("topic_a", "topic_b")); + model.setConnections(asList(connection)); + + extension.setupConnection(connection); + + Mockito.verify(session) + .setCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS, "localhost:9092"); + Mockito.verify(session) + .setCatalogProperty("kafka", KafkaCatalog.SCHEMA_REGISTRY_URL, "http://localhost:8081"); + Mockito.verify(session) + .setCatalogProperty("kafka", KafkaCatalog.SECURITY_PROTOCOL, KafkaConnection.SecurityProtocol.SASL_SSL.name()); + Mockito.verify(session) + .setCatalogProperty("kafka", KafkaCatalog.SASL_MECHANISM, KafkaConnection.SaslMechanism.SCRAM_SHA_512.name()); + + extension.setupTopic("topic_b"); + Mockito.verify(session) + .setCatalogProperty("kafka", KafkaCatalog.TOPIC, "topic_b"); + } + + @Test + void test_update() throws InvocationTargetException, InterruptedException + { + IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class); + IQuerySession session = Mockito.mock(IQuerySession.class); + IQueryFile queryFile = Mockito.mock(IQueryFile.class); + ICryptoService cryptoService = Mockito.mock(ICryptoService.class); + IPayloadbuilderState state = Mockito.mock(IPayloadbuilderState.class); + when(state.getQuerySession()).thenReturn(session); + when(queryFileProvider.getCurrentFile()).thenReturn(queryFile); + when(queryFile.getEngineState()).thenReturn(state); + + KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService) + { + @Override + List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError) + { + return connection.getTopics(); + } + }; + + KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka"); + + KafkaConnection connection1 = new KafkaConnection(); + connection1.setBootstrapServers("localhost:9092"); + connection1.setTopics(asList("orders", "payments")); + KafkaConnection connection2 = new KafkaConnection(); + connection2.setBootstrapServers("localhost:9093"); + connection2.setTopics(asList("metrics")); + model.setConnections(asList(connection1, connection2)); + + KafkaCatalogExtension.QuickPropertiesPanel qp = (KafkaCatalogExtension.QuickPropertiesPanel) extension.getQuickPropertiesComponent(); + + doReturn(VectorTestUtils.vv(Type.String, "localhost:9092")).when(session) + .getCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS); + doReturn(VectorTestUtils.vv(Type.String, "payments")).when(session) + .getCatalogProperty("kafka", KafkaCatalog.TOPIC); + + SwingUtilities.invokeAndWait(() -> extension.update(queryFile)); + SwingUtilities.invokeAndWait(() -> + { + // Flush queued UI updates from invokeLater in extension.update + }); + + assertEquals("localhost:9092", ((KafkaConnection) qp.connections.getSelectedItem()).getBootstrapServers()); + assertEquals("payments", qp.topics.getSelectedItem()); + } + + @Test + void test_update_sets_default_connection_in_session_when_missing() throws InvocationTargetException, InterruptedException + { + IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class); + IQuerySession session = Mockito.mock(IQuerySession.class); + IQueryFile queryFile = Mockito.mock(IQueryFile.class); + ICryptoService cryptoService = Mockito.mock(ICryptoService.class); + IPayloadbuilderState state = Mockito.mock(IPayloadbuilderState.class); + when(state.getQuerySession()).thenReturn(session); + when(queryFileProvider.getCurrentFile()).thenReturn(queryFile); + when(queryFile.getEngineState()).thenReturn(state); + + KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService) + { + @Override + List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError) + { + return connection.getTopics(); + } + }; + + KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka"); + + KafkaConnection connection1 = new KafkaConnection(); + connection1.setBootstrapServers("localhost:9092"); + connection1.setTopics(asList("orders", "payments")); + model.setConnections(asList(connection1)); + + KafkaCatalogExtension.QuickPropertiesPanel qp = (KafkaCatalogExtension.QuickPropertiesPanel) extension.getQuickPropertiesComponent(); + + doReturn(null).when(session) + .getCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS); + doReturn(VectorTestUtils.vv(Type.String, "payments")).when(session) + .getCatalogProperty("kafka", KafkaCatalog.TOPIC); + + SwingUtilities.invokeAndWait(() -> extension.update(queryFile)); + SwingUtilities.invokeAndWait(() -> + { + // Flush queued UI updates from invokeLater in extension.update + }); + + Mockito.verify(session) + .setCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS, "localhost:9092"); + assertEquals("localhost:9092", ((KafkaConnection) qp.connections.getSelectedItem()).getBootstrapServers()); + } +} diff --git a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java new file mode 100644 index 00000000..54ff973c --- /dev/null +++ b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java @@ -0,0 +1,36 @@ +package se.kuseman.payloadbuilder.catalog.kafka; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +/** Test of {@link KafkaConnection}. */ +class KafkaConnectionTest +{ + @Test + void test_jaas_config_assembly() + { + KafkaConnection connection = new KafkaConnection(); + connection.setSecurityProtocol(KafkaConnection.SecurityProtocol.SASL_SSL); + connection.setSaslMechanism(KafkaConnection.SaslMechanism.SCRAM_SHA_512); + connection.setSaslJaasLoginModule("org.apache.kafka.common.security.scram.ScramLoginModule"); + connection.setSaslJaasControlFlag("required"); + connection.setSaslJaasUsername("user"); + connection.setRuntimeSaslJaasPassword("pa\"ss\\word".toCharArray()); + connection.setSaslJaasOptions("serviceName=\"kafka\""); + + assertEquals("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"pa\\\"ss\\\\word\" serviceName=\"kafka\";", connection.getSaslJaasConfig()); + } + + @Test + void test_jaas_config_null_when_not_sasl() + { + KafkaConnection connection = new KafkaConnection(); + connection.setSecurityProtocol(KafkaConnection.SecurityProtocol.PLAINTEXT); + connection.setSaslJaasUsername("user"); + connection.setRuntimeSaslJaasPassword("password".toCharArray()); + + assertNull(connection.getSaslJaasConfig()); + } +}