diff --git a/pom.xml b/pom.xml
index 04d12b4d..4bc0a15c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
5.14.2
1.7.36
2.43.0
- 1.10.1
+ 1.10.1-SNAPSHOT
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java
index 8bdf24e1..55c80c9c 100644
--- a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtension.java
@@ -38,6 +38,7 @@
import se.kuseman.payloadbuilder.api.catalog.CatalogException;
import se.kuseman.payloadbuilder.api.execution.IQuerySession;
import se.kuseman.payloadbuilder.api.execution.UTF8String;
+import se.kuseman.payloadbuilder.catalog.AuthType;
import se.kuseman.payloadbuilder.catalog.Common;
import se.kuseman.payloadbuilder.catalog.CredentialsException;
import se.kuseman.payloadbuilder.catalog.es.ESConnectionsModel.Connection;
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java
index faf1691c..4efc4d5d 100644
--- a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/es/ESConnectionsModel.java
@@ -43,7 +43,9 @@
import se.kuseman.payloadbuilder.api.catalog.CatalogException;
import se.kuseman.payloadbuilder.api.execution.IQuerySession;
+import se.kuseman.payloadbuilder.catalog.AuthType;
import se.kuseman.payloadbuilder.catalog.Common;
+import se.kuseman.payloadbuilder.catalog.HttpClientUtils;
/**
* Model for {@link ESCatalogExtension}'s connections
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java
index 03c7da71..111fe0b1 100644
--- a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogExtensionFactory.java
@@ -3,8 +3,8 @@
import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension;
import com.queryeer.api.extensions.payloadbuilder.ICatalogExtensionFactory;
-/** SPI factory for {@link FilesystemCatalogExtension}. */
-public class FilesystemCatalogExtensionFactory implements ICatalogExtensionFactory
+/** Extension factory for {@link FilesystemCatalogExtension}. */
+class FilesystemCatalogExtensionFactory implements ICatalogExtensionFactory
{
@Override
public ICatalogExtension create(String catalogAlias)
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java
new file mode 100644
index 00000000..c6014221
--- /dev/null
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtension.java
@@ -0,0 +1,508 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static java.util.Collections.emptyList;
+import static java.util.Objects.requireNonNull;
+import static org.apache.commons.lang3.ObjectUtils.getIfNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.Strings.CI;
+
+import java.awt.Component;
+import java.awt.Dimension;
+import java.awt.GridBagConstraints;
+import java.awt.GridBagLayout;
+import java.awt.Insets;
+import java.util.List;
+
+import javax.swing.DefaultComboBoxModel;
+import javax.swing.DefaultListCellRenderer;
+import javax.swing.JButton;
+import javax.swing.JComboBox;
+import javax.swing.JLabel;
+import javax.swing.JList;
+import javax.swing.JPanel;
+import javax.swing.SwingUtilities;
+import javax.swing.event.ListDataListener;
+
+import com.queryeer.api.IQueryFile;
+import com.queryeer.api.component.AutoCompletionComboBox;
+import com.queryeer.api.extensions.IConfigurable;
+import com.queryeer.api.extensions.engine.IQueryEngine.IState.MetaParameter;
+import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension;
+import com.queryeer.api.extensions.payloadbuilder.ICatalogExtensionView;
+import com.queryeer.api.extensions.payloadbuilder.IPayloadbuilderState;
+import com.queryeer.api.service.IQueryFileProvider;
+
+import se.kuseman.payloadbuilder.api.catalog.Catalog;
+import se.kuseman.payloadbuilder.api.catalog.CatalogException;
+import se.kuseman.payloadbuilder.api.execution.IQuerySession;
+import se.kuseman.payloadbuilder.catalog.CredentialsException;
+
+/** Queryeer extension for {@link KafkaCatalog}. */
+class KafkaCatalogExtension implements ICatalogExtension
+{
+ static final Catalog CATALOG = new KafkaCatalog();
+
+ private final IQueryFileProvider queryFileProvider;
+ private final KafkaConnectionsModel connectionsModel;
+ private final String catalogAlias;
+
+ private QuickPropertiesPanel quickPropertiesPanel;
+
+ KafkaCatalogExtension(IQueryFileProvider queryFileProvider, KafkaConnectionsModel connectionsModel, String catalogAlias)
+ {
+ this.queryFileProvider = requireNonNull(queryFileProvider, "queryFileProvider");
+ this.connectionsModel = requireNonNull(connectionsModel, "connectionsModel");
+ this.catalogAlias = catalogAlias;
+ }
+
+ @Override
+ public String getTitle()
+ {
+ return CATALOG.getName();
+ }
+
+ @Override
+ public Catalog getCatalog()
+ {
+ return CATALOG;
+ }
+
+ @Override
+ public Class extends IConfigurable> getConfigurableClass()
+ {
+ return KafkaConnectionsConfigurable.class;
+ }
+
+ @Override
+ public boolean hasQuickPropertieComponent()
+ {
+ return true;
+ }
+
+ @Override
+ public Component getQuickPropertiesComponent()
+ {
+ if (quickPropertiesPanel == null)
+ {
+ quickPropertiesPanel = new QuickPropertiesPanel();
+ }
+ return quickPropertiesPanel;
+ }
+
+ @Override
+ public List getMetaParameters(IQuerySession querySession, boolean testData)
+ {
+ String bootstrapServers = "localhost:9092";
+ String schemaRegistryUrl = "";
+ String securityProtocol = KafkaConnection.SecurityProtocol.PLAINTEXT.name();
+ String saslMechanism = KafkaConnection.SaslMechanism.PLAIN.name();
+ String topic = "my_topic";
+
+ if (!testData)
+ {
+ bootstrapServers = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.BOOTSTRAP_SERVERS)
+ .valueAsString(0);
+ schemaRegistryUrl = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.SCHEMA_REGISTRY_URL)
+ .valueAsString(0);
+ securityProtocol = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.SECURITY_PROTOCOL)
+ .valueAsString(0);
+ saslMechanism = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.SASL_MECHANISM)
+ .valueAsString(0);
+ topic = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.TOPIC)
+ .valueAsString(0);
+ }
+
+ return List.of(new MetaParameter(KafkaCatalog.BOOTSTRAP_SERVERS, bootstrapServers, "Kafka bootstrap servers"),
+ new MetaParameter(KafkaCatalog.SCHEMA_REGISTRY_URL, schemaRegistryUrl, "Schema registry url"),
+ new MetaParameter(KafkaCatalog.SECURITY_PROTOCOL, securityProtocol, "Kafka security protocol"), new MetaParameter(KafkaCatalog.SASL_MECHANISM, saslMechanism, "Kafka sasl mechanism"),
+ new MetaParameter(KafkaCatalog.TOPIC, topic, "Default topic"));
+ }
+
+ @Override
+ public ExceptionAction handleException(IQuerySession querySession, CatalogException exception)
+ {
+ if (!(exception instanceof CredentialsException))
+ {
+ return ExceptionAction.NONE;
+ }
+
+ KafkaConnection selectedConnection = (KafkaConnection) quickPropertiesPanel.connections.getSelectedItem();
+ KafkaConnection connection = connectionsModel.findConnection(querySession, catalogAlias);
+
+ if (connection != null
+ && selectedConnection != null
+ && selectedConnection != connection)
+ {
+ if (!connectionsModel.prepare(connection, false))
+ {
+ return ExceptionAction.NONE;
+ }
+
+ connection.setup(querySession, catalogAlias);
+ return ExceptionAction.RERUN;
+ }
+
+ if (connection != null)
+ {
+ connection.setRuntimeSaslJaasPassword(null);
+ if (!connectionsModel.prepare(connection, false))
+ {
+ return ExceptionAction.NONE;
+ }
+
+ connection.setup(querySession, catalogAlias);
+ quickPropertiesPanel.updateAuthStatus(connection);
+ return ExceptionAction.RERUN;
+ }
+
+ return ExceptionAction.NONE;
+ }
+
+ void setupConnection(KafkaConnection connection)
+ {
+ IQueryFile queryFile = queryFileProvider.getCurrentFile();
+ if (queryFile == null)
+ {
+ return;
+ }
+ if (connection != null
+ && queryFile.getEngineState() instanceof IPayloadbuilderState state)
+ {
+ connection.setup(state.getQuerySession(), catalogAlias);
+ state.getQuerySession()
+ .setCatalogProperty(catalogAlias, KafkaCatalog.TOPIC, (String) null);
+ }
+ }
+
+ void setupTopic(String topic)
+ {
+ IQueryFile queryFile = queryFileProvider.getCurrentFile();
+ if (queryFile == null)
+ {
+ return;
+ }
+ if (queryFile.getEngineState() instanceof IPayloadbuilderState state)
+ {
+ state.getQuerySession()
+ .setCatalogProperty(catalogAlias, KafkaCatalog.TOPIC, !isBlank(topic) ? topic
+ : null);
+ }
+ }
+
+ void update(IQueryFile queryFile)
+ {
+ if (queryFile == null)
+ {
+ return;
+ }
+ IPayloadbuilderState state = (IPayloadbuilderState) queryFile.getEngineState();
+ if (state == null)
+ {
+ return;
+ }
+
+ IQuerySession session = state.getQuerySession();
+
+ KafkaConnection connectionToSet = connectionsModel.findConnection(session, catalogAlias);
+ boolean sessionHasConnection = connectionToSet != null;
+ String topic = session.getCatalogProperty(catalogAlias, KafkaCatalog.TOPIC)
+ .valueAsString(0);
+
+ if (connectionToSet == null
+ && connectionsModel.getSize() > 0)
+ {
+ connectionToSet = connectionsModel.getElementAt(0);
+ }
+
+ if (!sessionHasConnection
+ && connectionToSet != null)
+ {
+ connectionToSet.setup(session, catalogAlias);
+ }
+
+ String topicToSet = null;
+ if (connectionToSet != null)
+ {
+ List topics = getIfNull(connectionsModel.getTopics(connectionToSet, false, false), emptyList());
+ for (String item : topics)
+ {
+ if (CI.equals(item, topic))
+ {
+ topicToSet = item;
+ break;
+ }
+ }
+ }
+
+ final KafkaConnection connectionFinal = connectionToSet;
+ final String topicFinal = topicToSet;
+
+ SwingUtilities.invokeLater(() ->
+ {
+ quickPropertiesPanel.suppressEvents = true;
+ try
+ {
+ quickPropertiesPanel.connections.getModel()
+ .setSelectedItem(connectionFinal);
+ quickPropertiesPanel.populateTopics(connectionFinal);
+ quickPropertiesPanel.topics.getModel()
+ .setSelectedItem(topicFinal);
+ quickPropertiesPanel.updateAuthStatus(connectionFinal);
+ }
+ finally
+ {
+ quickPropertiesPanel.suppressEvents = false;
+ }
+ });
+ }
+
+ /** Quick properties panel. */
+ class QuickPropertiesPanel extends JPanel implements ICatalogExtensionView
+ {
+ private static final String PROTOTYPE_TOPIC = "topic-with-a-very-long-name-to-display";
+ private final DefaultComboBoxModel topicsModel = new DefaultComboBoxModel<>();
+ private final JLabel authStatus = new JLabel();
+ private final JButton reload = new JButton("Reload");
+ boolean suppressEvents;
+
+ final JComboBox connections;
+ final JComboBox topics;
+
+ QuickPropertiesPanel()
+ {
+ setLayout(new GridBagLayout());
+
+ connections = new JComboBox<>(new ConnectionsSelectionModel());
+ connections.setRenderer(new DefaultListCellRenderer()
+ {
+ @Override
+ public Component getListCellRendererComponent(JList> list, Object value, int index, boolean isSelected, boolean cellHasFocus)
+ {
+ super.getListCellRendererComponent(list, value, index, isSelected, cellHasFocus);
+ if (value instanceof KafkaConnection connection)
+ {
+ if (!connection.isEnabled())
+ {
+ setText("" + connection + "");
+ }
+ else
+ {
+ setText(connection.toString());
+ }
+ }
+ return this;
+ }
+ });
+ connections.addItemListener(l ->
+ {
+ if (suppressEvents)
+ {
+ return;
+ }
+
+ suppressEvents = true;
+ try
+ {
+ KafkaConnection connection = (KafkaConnection) connections.getSelectedItem();
+ topicsModel.removeAllElements();
+ if (connection != null)
+ {
+ connectionsModel.prepare(connection, true);
+ setupConnection(connection);
+ loadTopics(connection, false);
+ }
+ updateAuthStatus(connection);
+ }
+ finally
+ {
+ suppressEvents = false;
+ }
+ });
+
+ topics = new JComboBox<>(topicsModel);
+ topics.setPrototypeDisplayValue(PROTOTYPE_TOPIC);
+ topics.setMaximumRowCount(25);
+ AutoCompletionComboBox.enable(topics);
+ topics.addItemListener(l ->
+ {
+ if (suppressEvents)
+ {
+ return;
+ }
+ setupTopic((String) topics.getSelectedItem());
+ });
+
+ connectionsModel.registerReloadButton(reload);
+ reload.addActionListener(l ->
+ {
+ KafkaConnection connection = (KafkaConnection) connections.getSelectedItem();
+ if (connection == null
+ || !connection.isEnabled())
+ {
+ return;
+ }
+ if (!connectionsModel.prepare(connection, false))
+ {
+ return;
+ }
+
+ setupConnection(connection);
+ loadTopics(connection, true);
+ });
+
+ add(new JLabel("Connection"), new GridBagConstraints(0, 0, 1, 1, 0.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.NONE, new Insets(0, 0, 1, 3), 0, 0));
+ add(connections, new GridBagConstraints(1, 0, 1, 1, 1.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.HORIZONTAL, new Insets(0, 0, 1, 0), 0, 0));
+ add(new JLabel("Topic"), new GridBagConstraints(0, 1, 1, 1, 0.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.NONE, new Insets(0, 0, 1, 3), 0, 0));
+ add(topics, new GridBagConstraints(1, 1, 1, 1, 1.0, 0.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.HORIZONTAL, new Insets(0, 0, 1, 0), 0, 0));
+ add(authStatus, new GridBagConstraints(0, 2, 1, 1, 0.0, 1.0, GridBagConstraints.NORTH, GridBagConstraints.HORIZONTAL, new Insets(3, 0, 0, 0), 0, 0));
+ add(reload, new GridBagConstraints(1, 2, 1, 1, 1.0, 1.0, GridBagConstraints.BASELINE_LEADING, GridBagConstraints.HORIZONTAL, new Insets(0, 0, 0, 0), 0, 0));
+ setPreferredSize(new Dimension(240, 75));
+ }
+
+ @Override
+ public void afterExecute(IQueryFile queryFile)
+ {
+ if (queryFileProvider.getCurrentFile() == queryFile)
+ {
+ KafkaCatalogExtension.this.update(queryFile);
+ }
+ }
+
+ @Override
+ public void focus(IQueryFile queryFile)
+ {
+ KafkaCatalogExtension.this.update(queryFile);
+ }
+
+ void updateAuthStatus(KafkaConnection connection)
+ {
+ if (connection == connections.getSelectedItem())
+ {
+ boolean hasCredentials = connection != null
+ && connection.hasCredentials();
+ if (connection != null
+ && connection.isSaslEnabled())
+ {
+ authStatus.setText(hasCredentials ? "Authenticated"
+ : "Credentials required");
+ }
+ else
+ {
+ authStatus.setText("");
+ }
+ }
+ }
+
+ void populateTopics(KafkaConnection connection)
+ {
+ topicsModel.removeAllElements();
+ if (connection == null
+ || connection.getTopics() == null)
+ {
+ return;
+ }
+ for (String topic : connection.getTopics())
+ {
+ topicsModel.addElement(topic);
+ }
+ }
+
+ private void loadTopics(KafkaConnection connection, boolean forceReload)
+ {
+ if (!connection.hasCredentials())
+ {
+ return;
+ }
+
+ IQueryFile currentFile = queryFileProvider.getCurrentFile();
+ Runnable load = () ->
+ {
+ try
+ {
+ connectionsModel.getTopics(connection, forceReload, true);
+ SwingUtilities.invokeLater(() ->
+ {
+ Object selected = topics.getSelectedItem();
+ populateTopics(connection);
+ topics.setSelectedItem(selected);
+ updateAuthStatus(connection);
+ });
+ }
+ catch (Exception e)
+ {
+ if (currentFile != null)
+ {
+ e.printStackTrace(currentFile.getMessagesWriter());
+ currentFile.focusMessages();
+ }
+ else
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ new Thread(load).start();
+ }
+ }
+
+ private class ConnectionsSelectionModel extends DefaultComboBoxModel
+ {
+ private int selectedItemIndex;
+
+ @Override
+ public Object getSelectedItem()
+ {
+ if (selectedItemIndex >= 0
+ && selectedItemIndex < connectionsModel.getConnections()
+ .size())
+ {
+ return connectionsModel.getElementAt(selectedItemIndex);
+ }
+ return null;
+ }
+
+ @Override
+ public void setSelectedItem(Object connection)
+ {
+ int newIndex = connectionsModel.getConnections()
+ .indexOf(connection);
+ if (newIndex == -1)
+ {
+ return;
+ }
+
+ if (selectedItemIndex != newIndex)
+ {
+ selectedItemIndex = newIndex;
+ fireContentsChanged(this, -1, -1);
+ }
+ }
+
+ @Override
+ public int getSize()
+ {
+ return connectionsModel.getSize();
+ }
+
+ @Override
+ public KafkaConnection getElementAt(int index)
+ {
+ return connectionsModel.getElementAt(index);
+ }
+
+ @Override
+ public void addListDataListener(ListDataListener l)
+ {
+ super.addListDataListener(l);
+ connectionsModel.addListDataListener(l);
+ }
+
+ @Override
+ public void removeListDataListener(ListDataListener l)
+ {
+ super.removeListDataListener(l);
+ connectionsModel.removeListDataListener(l);
+ }
+ }
+}
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java
new file mode 100644
index 00000000..5f1b0938
--- /dev/null
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionFactory.java
@@ -0,0 +1,44 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension;
+import com.queryeer.api.extensions.payloadbuilder.ICatalogExtensionFactory;
+import com.queryeer.api.service.IQueryFileProvider;
+
+/** Extension factory for {@link KafkaCatalogExtension}. */
+class KafkaCatalogExtensionFactory implements ICatalogExtensionFactory
+{
+ private final IQueryFileProvider queryFileProvider;
+ private final KafkaConnectionsModel connectionsModel;
+
+ KafkaCatalogExtensionFactory(IQueryFileProvider queryFileProvider, KafkaConnectionsModel connectionsModel)
+ {
+ this.queryFileProvider = requireNonNull(queryFileProvider, "queryFileProvider");
+ this.connectionsModel = requireNonNull(connectionsModel, "connectionsModel");
+ }
+
+ @Override
+ public ICatalogExtension create(String catalogAlias)
+ {
+ return new KafkaCatalogExtension(queryFileProvider, connectionsModel, catalogAlias);
+ }
+
+ @Override
+ public String getTitle()
+ {
+ return KafkaCatalogExtension.CATALOG.getName();
+ }
+
+ @Override
+ public String getDefaultAlias()
+ {
+ return "kafka";
+ }
+
+ @Override
+ public int order()
+ {
+ return 2;
+ }
+}
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java
new file mode 100644
index 00000000..a9085c10
--- /dev/null
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnection.java
@@ -0,0 +1,376 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.queryeer.api.component.IPropertyAware;
+import com.queryeer.api.component.Property;
+
+import se.kuseman.payloadbuilder.api.execution.IQuerySession;
+
+/** Kafka connection definition. */
+class KafkaConnection implements IPropertyAware
+{
+ @JsonProperty
+ private String name;
+ @JsonProperty
+ private String bootstrapServers = "localhost:9092";
+ @JsonProperty
+ private String schemaRegistryUrl;
+ @JsonProperty
+ private SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+ @JsonProperty
+ private SaslMechanism saslMechanism = SaslMechanism.PLAIN;
+ @JsonProperty
+ private String saslJaasLoginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
+ @JsonProperty
+ private String saslJaasControlFlag = "required";
+ @JsonProperty
+ private String saslJaasUsername;
+ /** Stored encrypted password if set. */
+ @JsonProperty
+ private String saslJaasPassword;
+ @JsonProperty
+ private String saslJaasOptions;
+ @JsonProperty
+ private boolean enabled = true;
+
+ @JsonIgnore
+ private List topics;
+ /** Runtime decrypted / prompted password. */
+ @JsonIgnore
+ private char[] runtimeSaslJaasPassword;
+
+ KafkaConnection()
+ {
+ }
+
+ KafkaConnection(KafkaConnection source)
+ {
+ this.name = source.name;
+ this.bootstrapServers = source.bootstrapServers;
+ this.schemaRegistryUrl = source.schemaRegistryUrl;
+ this.securityProtocol = source.securityProtocol;
+ this.saslMechanism = source.saslMechanism;
+ this.saslJaasLoginModule = source.saslJaasLoginModule;
+ this.saslJaasControlFlag = source.saslJaasControlFlag;
+ this.saslJaasUsername = source.saslJaasUsername;
+ this.saslJaasPassword = source.saslJaasPassword;
+ this.saslJaasOptions = source.saslJaasOptions;
+ this.enabled = source.enabled;
+ this.topics = source.topics != null ? new ArrayList<>(source.topics)
+ : null;
+ this.runtimeSaslJaasPassword = source.runtimeSaslJaasPassword != null ? Arrays.copyOf(source.runtimeSaslJaasPassword, source.runtimeSaslJaasPassword.length)
+ : null;
+ }
+
+ void setup(IQuerySession querySession, String catalogAlias)
+ {
+ querySession.setCatalogProperty(catalogAlias, KafkaCatalog.BOOTSTRAP_SERVERS, bootstrapServers);
+ querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SCHEMA_REGISTRY_URL, schemaRegistryUrl);
+ querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SECURITY_PROTOCOL, securityProtocol.name());
+ querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SASL_MECHANISM, saslMechanism.name());
+ querySession.setCatalogProperty(catalogAlias, KafkaCatalog.SASL_JAAS_CONFIG, getSaslJaasConfig());
+ }
+
+ Properties toAdminClientProperties()
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapServers);
+ props.put("security.protocol", securityProtocol.name());
+
+ if (isSaslEnabled())
+ {
+ props.put("sasl.mechanism", saslMechanism.name());
+ String jaasConfig = getSaslJaasConfig();
+ if (!isBlank(jaasConfig))
+ {
+ props.put("sasl.jaas.config", jaasConfig);
+ }
+ }
+ return props;
+ }
+
+ boolean isSaslEnabled()
+ {
+ return securityProtocol == SecurityProtocol.SASL_PLAINTEXT
+ || securityProtocol == SecurityProtocol.SASL_SSL;
+ }
+
+ boolean hasCredentials()
+ {
+ return !isSaslEnabled()
+ || (!isBlank(saslJaasUsername)
+ && runtimeSaslJaasPassword != null
+ && runtimeSaslJaasPassword.length > 0);
+ }
+
+ String getSaslJaasConfig()
+ {
+ if (!isSaslEnabled())
+ {
+ return null;
+ }
+
+ String username = defaultIfBlank(saslJaasUsername, null);
+ String password = runtimeSaslJaasPassword != null ? new String(runtimeSaslJaasPassword)
+ : null;
+
+ if (isBlank(saslJaasLoginModule)
+ || isBlank(saslJaasControlFlag)
+ || isBlank(username)
+ || isBlank(password))
+ {
+ return null;
+ }
+
+ StringBuilder sb = new StringBuilder(256);
+ sb.append(saslJaasLoginModule)
+ .append(' ')
+ .append(saslJaasControlFlag)
+ .append(' ')
+ .append("username=\"")
+ .append(escapeJaasValue(username))
+ .append("\" ")
+ .append("password=\"")
+ .append(escapeJaasValue(password))
+ .append('"');
+
+ if (!isBlank(saslJaasOptions))
+ {
+ sb.append(' ')
+ .append(saslJaasOptions.trim());
+ }
+ if (!sb.toString()
+ .endsWith(";"))
+ {
+ sb.append(';');
+ }
+ return sb.toString();
+ }
+
+ private static String escapeJaasValue(String value)
+ {
+ return value.replace("\\", "\\\\")
+ .replace("\"", "\\\"");
+ }
+
+ @Property(
+ order = -1,
+ title = "Name")
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ @Property(
+ order = 1,
+ title = "Bootstrap Servers",
+ tooltip = "Comma separated values are supported")
+ public String getBootstrapServers()
+ {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers)
+ {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ @Property(
+ order = 2,
+ title = "Schema Registry URL",
+ tooltip = "Comma separated values are supported")
+ public String getSchemaRegistryUrl()
+ {
+ return schemaRegistryUrl;
+ }
+
+ public void setSchemaRegistryUrl(String schemaRegistryUrl)
+ {
+ this.schemaRegistryUrl = schemaRegistryUrl;
+ }
+
+ @Property(
+ order = 3,
+ title = "security_protocol")
+ public SecurityProtocol getSecurityProtocol()
+ {
+ return securityProtocol;
+ }
+
+ public void setSecurityProtocol(SecurityProtocol securityProtocol)
+ {
+ this.securityProtocol = securityProtocol;
+ }
+
+ @Property(
+ order = 4,
+ title = "sasl_mechanism",
+ visibleAware = true)
+ public SaslMechanism getSaslMechanism()
+ {
+ return saslMechanism;
+ }
+
+ public void setSaslMechanism(SaslMechanism saslMechanism)
+ {
+ this.saslMechanism = saslMechanism;
+ }
+
+ @Property(
+ order = 5,
+ title = "sasl_jaas_login_module",
+ visibleAware = true)
+ public String getSaslJaasLoginModule()
+ {
+ return saslJaasLoginModule;
+ }
+
+ public void setSaslJaasLoginModule(String saslJaasLoginModule)
+ {
+ this.saslJaasLoginModule = saslJaasLoginModule;
+ }
+
+ @Property(
+ order = 6,
+ title = "sasl_jaas_control_flag",
+ visibleAware = true)
+ public String getSaslJaasControlFlag()
+ {
+ return saslJaasControlFlag;
+ }
+
+ public void setSaslJaasControlFlag(String saslJaasControlFlag)
+ {
+ this.saslJaasControlFlag = saslJaasControlFlag;
+ }
+
+ @Property(
+ order = 7,
+ title = "sasl_jaas_username",
+ visibleAware = true)
+ public String getSaslJaasUsername()
+ {
+ return saslJaasUsername;
+ }
+
+ public void setSaslJaasUsername(String saslJaasUsername)
+ {
+ this.saslJaasUsername = saslJaasUsername;
+ }
+
+ @Property(
+ order = 8,
+ title = "sasl_jaas_password",
+ visibleAware = true,
+ password = true)
+ public String getSaslJaasPassword()
+ {
+ return saslJaasPassword;
+ }
+
+ public void setSaslJaasPassword(String saslJaasPassword)
+ {
+ this.saslJaasPassword = saslJaasPassword;
+ }
+
+ @Property(
+ order = 9,
+ title = "sasl_jaas_options",
+ visibleAware = true,
+ tooltip = "Optional extra JAAS options, e.g. serviceName=\"kafka\"")
+ public String getSaslJaasOptions()
+ {
+ return saslJaasOptions;
+ }
+
+ public void setSaslJaasOptions(String saslJaasOptions)
+ {
+ this.saslJaasOptions = saslJaasOptions;
+ }
+
+ @Property(
+ order = 10,
+ title = "Enabled")
+ public boolean isEnabled()
+ {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled)
+ {
+ this.enabled = enabled;
+ }
+
+ List getTopics()
+ {
+ return topics;
+ }
+
+ void setTopics(List topics)
+ {
+ this.topics = topics;
+ }
+
+ char[] getRuntimeSaslJaasPassword()
+ {
+ return runtimeSaslJaasPassword;
+ }
+
+ void setRuntimeSaslJaasPassword(char[] runtimeSaslJaasPassword)
+ {
+ this.runtimeSaslJaasPassword = runtimeSaslJaasPassword;
+ }
+
+ @Override
+ public boolean visible(String property)
+ {
+ if ("saslMechanism".equals(property)
+ || "saslJaasLoginModule".equals(property)
+ || "saslJaasControlFlag".equals(property)
+ || "saslJaasUsername".equals(property)
+ || "saslJaasPassword".equals(property)
+ || "saslJaasOptions".equals(property))
+ {
+ return isSaslEnabled();
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return Objects.toString(name, bootstrapServers);
+ }
+
+ public enum SecurityProtocol
+ {
+ PLAINTEXT,
+ SSL,
+ SASL_PLAINTEXT,
+ SASL_SSL
+ }
+
+ public enum SaslMechanism
+ {
+ PLAIN,
+ SCRAM_SHA_256,
+ SCRAM_SHA_512,
+ GSSAPI,
+ OAUTHBEARER
+ }
+}
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java
new file mode 100644
index 00000000..e33c9cd2
--- /dev/null
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsConfigurable.java
@@ -0,0 +1,172 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonMap;
+import static java.util.Objects.requireNonNull;
+import static org.apache.commons.lang3.ObjectUtils.getIfNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.queryeer.api.component.ListPropertiesComponent;
+import com.queryeer.api.extensions.IConfigurable;
+import com.queryeer.api.service.IConfig;
+import com.queryeer.api.service.ICryptoService;
+
+/** Kafka configurable. */
+class KafkaConnectionsConfigurable implements IConfigurable
+{
+ private static final String NAME = "se.kuseman.payloadbuilder.catalog.kafka.KafkaCatalogExtension";
+ private static final String CONNECTIONS = "connections";
+
+ private final IConfig config;
+ private final ICryptoService cryptoService;
+ private final KafkaConnectionsModel connectionsModel;
+ private final List> dirtyStateConsumers = new ArrayList<>();
+
+ private ListPropertiesComponent configComponent;
+
+ KafkaConnectionsConfigurable(IConfig config, ICryptoService cryptoService, KafkaConnectionsModel connectionsModel)
+ {
+ this.config = requireNonNull(config, "config");
+ this.cryptoService = requireNonNull(cryptoService, "cryptoService");
+ this.connectionsModel = requireNonNull(connectionsModel, "connectionsModel");
+ loadSettings();
+ }
+
+ @Override
+ public ListPropertiesComponent getComponent()
+ {
+ if (configComponent == null)
+ {
+ configComponent = new ListPropertiesComponent<>(KafkaConnection.class, this::notifyDirty, this::newConnection, this::cloneConnection);
+ configComponent.init(connectionsModel.copyConnections());
+ }
+ return configComponent;
+ }
+
+ @Override
+ public void revertChanges()
+ {
+ getComponent().init(connectionsModel.copyConnections());
+ }
+
+ @Override
+ public boolean commitChanges()
+ {
+ connectionsModel.setConnections(getComponent().getResult());
+
+ for (KafkaConnection con : connectionsModel.getConnections())
+ {
+ String pass = con.getSaslJaasPassword();
+ if (isBlank(pass))
+ {
+ continue;
+ }
+
+ con.setRuntimeSaslJaasPassword(pass.toCharArray());
+
+ String encryptedPass = cryptoService.encryptString(pass);
+ if (encryptedPass == null)
+ {
+ return false;
+ }
+ con.setSaslJaasPassword(encryptedPass);
+ }
+
+ config.saveExtensionConfig(NAME, singletonMap(CONNECTIONS, connectionsModel.getConnections()));
+ getComponent().init(connectionsModel.copyConnections());
+ return true;
+ }
+
+ @Override
+ public EncryptionResult reEncryptSecrets(ICryptoService newCryptoService)
+ {
+ boolean change = false;
+ for (KafkaConnection con : getComponent().getResult())
+ {
+ String pass = con.getSaslJaasPassword();
+ if (isBlank(pass))
+ {
+ continue;
+ }
+
+ if ((pass = cryptoService.decryptString(pass)) == null)
+ {
+ return EncryptionResult.ABORT;
+ }
+
+ con.setRuntimeSaslJaasPassword(pass.toCharArray());
+ con.setSaslJaasPassword(newCryptoService.encryptString(pass));
+ change = true;
+ }
+ return change ? EncryptionResult.SUCCESS
+ : EncryptionResult.NO_CHANGE;
+ }
+
+ @Override
+ public String getTitle()
+ {
+ return "Connections";
+ }
+
+ @Override
+ public String getLongTitle()
+ {
+ return "Connections to Kafka";
+ }
+
+ @Override
+ public String groupName()
+ {
+ return KafkaConstants.TITLE;
+ }
+
+ @Override
+ public void addDirtyStateConsumer(Consumer consumer)
+ {
+ dirtyStateConsumers.add(consumer);
+ }
+
+ @Override
+ public void removeDirtyStateConsumer(Consumer consumer)
+ {
+ dirtyStateConsumers.remove(consumer);
+ }
+
+ private void notifyDirty(boolean dirty)
+ {
+ dirtyStateConsumers.forEach(c -> c.accept(dirty));
+ }
+
+ private KafkaConnection newConnection()
+ {
+ KafkaConnection connection = new KafkaConnection();
+ connection.setName("New Kafka Connection");
+ return connection;
+ }
+
+ private KafkaConnection cloneConnection(KafkaConnection connection)
+ {
+ KafkaConnection newConnection = new KafkaConnection(connection);
+ newConnection.setName(connection.getName() + " - Copy");
+ return newConnection;
+ }
+
+ private void loadSettings()
+ {
+ Map settings = config.loadExtensionConfig(NAME);
+ List connections = KafkaConstants.MAPPER.convertValue(getIfNull(settings.get(CONNECTIONS), emptyList()), new TypeReference>()
+ {
+ });
+ if (connections == null)
+ {
+ return;
+ }
+ connectionsModel.setConnections(connections);
+ }
+}
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java
new file mode 100644
index 00000000..47da8a4b
--- /dev/null
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionsModel.java
@@ -0,0 +1,217 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static java.util.Collections.emptyList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.Strings.CI;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.swing.AbstractListModel;
+import javax.swing.JButton;
+import javax.swing.SwingUtilities;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+
+import com.queryeer.api.IQueryFile;
+import com.queryeer.api.extensions.Inject;
+import com.queryeer.api.service.ICryptoService;
+import com.queryeer.api.service.IQueryFileProvider;
+import com.queryeer.api.utils.CredentialUtils;
+import com.queryeer.api.utils.CredentialUtils.Credentials;
+
+import se.kuseman.payloadbuilder.api.execution.IQuerySession;
+import se.kuseman.payloadbuilder.api.execution.ValueVector;
+
+/** Model for {@link KafkaCatalogExtension}'s connections. */
+@Inject
+class KafkaConnectionsModel extends AbstractListModel
+{
+ private final IQueryFileProvider queryFileProvider;
+ private final ICryptoService cryptoService;
+ private final List connections = new ArrayList<>();
+ private final List reloadButtons = new ArrayList<>();
+
+ KafkaConnectionsModel(IQueryFileProvider queryFileProvider, ICryptoService cryptoService)
+ {
+ this.queryFileProvider = requireNonNull(queryFileProvider, "queryFileProvider");
+ this.cryptoService = requireNonNull(cryptoService, "cryptoService");
+ }
+
+ void registerReloadButton(JButton button)
+ {
+ reloadButtons.add(button);
+ }
+
+ @Override
+ public int getSize()
+ {
+ return connections.size();
+ }
+
+ @Override
+ public KafkaConnection getElementAt(int index)
+ {
+ return connections.get(index);
+ }
+
+ List copyConnections()
+ {
+ return connections.stream()
+ .map(KafkaConnection::new)
+ .collect(toList());
+ }
+
+ List getConnections()
+ {
+ return connections;
+ }
+
+ void setConnections(List connections)
+ {
+ this.connections.clear();
+ this.connections.addAll(connections);
+ fireContentsChanged(this, 0, getSize() - 1);
+ }
+
+ KafkaConnection findConnection(IQuerySession querySession, String catalogAlias)
+ {
+ ValueVector property = querySession.getCatalogProperty(catalogAlias, KafkaCatalog.BOOTSTRAP_SERVERS);
+ if (property == null
+ || property.isNull(0))
+ {
+ return null;
+ }
+ String bootstrapServers = property.valueAsString(0);
+
+ int size = getSize();
+ for (int i = 0; i < size; i++)
+ {
+ KafkaConnection connection = getElementAt(i);
+ if (CI.equals(connection.getBootstrapServers(), bootstrapServers))
+ {
+ return connection;
+ }
+ }
+ return null;
+ }
+
+ protected Credentials getCredentials(String connectionDescription, String prefilledUsername, boolean readOnlyUsername)
+ {
+ return CredentialUtils.getCredentials(connectionDescription, prefilledUsername, readOnlyUsername);
+ }
+
+ boolean prepare(KafkaConnection connection, boolean silent)
+ {
+ if (connection == null)
+ {
+ return false;
+ }
+
+ if (connection.hasCredentials())
+ {
+ return true;
+ }
+
+ if (!connection.isSaslEnabled())
+ {
+ return true;
+ }
+
+ if (isBlank(connection.getSaslJaasPassword()))
+ {
+ if (silent)
+ {
+ return false;
+ }
+
+ Credentials credentials = getCredentials(connection.toString(), connection.getSaslJaasUsername(), false);
+ if (credentials == null)
+ {
+ return false;
+ }
+
+ if (!isBlank(credentials.getUsername()))
+ {
+ connection.setSaslJaasUsername(credentials.getUsername());
+ }
+ connection.setRuntimeSaslJaasPassword(credentials.getPassword());
+ return true;
+ }
+
+ if (silent
+ && !cryptoService.isInitalized())
+ {
+ return false;
+ }
+
+ String decrypted = cryptoService.decryptString(connection.getSaslJaasPassword());
+ if (decrypted == null)
+ {
+ return false;
+ }
+ connection.setRuntimeSaslJaasPassword(decrypted.toCharArray());
+ return true;
+ }
+
+ List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError)
+ {
+ SwingUtilities.invokeLater(() -> reloadButtons.forEach(b -> b.setEnabled(false)));
+ try
+ {
+ synchronized (connection)
+ {
+ List topics = connection.getTopics();
+ if ((!forceReload
+ && topics != null)
+ || (!forceReload
+ && !connection.hasCredentials()))
+ {
+ return topics != null ? topics
+ : emptyList();
+ }
+
+ try (AdminClient admin = AdminClient.create(connection.toAdminClientProperties()))
+ {
+ Set names = admin.listTopics(new ListTopicsOptions().listInternal(false))
+ .names()
+ .get(15, TimeUnit.SECONDS);
+ List result = new ArrayList<>(names);
+ result.sort(String.CASE_INSENSITIVE_ORDER);
+ connection.setTopics(result);
+ }
+ catch (Exception e)
+ {
+ connection.setTopics(emptyList());
+ connection.setRuntimeSaslJaasPassword(null);
+
+ IQueryFile queryFile = queryFileProvider.getCurrentFile();
+ if (queryFile != null)
+ {
+ e.printStackTrace(queryFile.getMessagesWriter());
+ queryFile.focusMessages();
+ }
+ else
+ {
+ e.printStackTrace();
+ }
+
+ if (reThrowError)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ return connection.getTopics();
+ }
+ }
+ finally
+ {
+ SwingUtilities.invokeLater(() -> reloadButtons.forEach(b -> b.setEnabled(true)));
+ }
+ }
+}
diff --git a/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java
new file mode 100644
index 00000000..20cce12b
--- /dev/null
+++ b/queryeer-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConstants.java
@@ -0,0 +1,11 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Constants for Kafka module. */
+interface KafkaConstants
+{
+ String TITLE = "Kafka";
+ ObjectMapper MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+}
diff --git a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java
index c8687a9a..b049ed9f 100644
--- a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java
+++ b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/es/ESCatalogExtensionTest.java
@@ -29,6 +29,7 @@
import se.kuseman.payloadbuilder.api.catalog.CatalogException;
import se.kuseman.payloadbuilder.api.catalog.Column.Type;
import se.kuseman.payloadbuilder.api.execution.IQuerySession;
+import se.kuseman.payloadbuilder.catalog.AuthType;
import se.kuseman.payloadbuilder.catalog.CredentialsException;
import se.kuseman.payloadbuilder.catalog.es.ESConnectionsModel.Connection;
import se.kuseman.payloadbuilder.test.VectorTestUtils;
diff --git a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java
new file mode 100644
index 00000000..0d2de82b
--- /dev/null
+++ b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalogExtensionTest.java
@@ -0,0 +1,189 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+import javax.swing.SwingUtilities;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import com.queryeer.api.IQueryFile;
+import com.queryeer.api.extensions.payloadbuilder.ICatalogExtension;
+import com.queryeer.api.extensions.payloadbuilder.IPayloadbuilderState;
+import com.queryeer.api.service.ICryptoService;
+import com.queryeer.api.service.IQueryFileProvider;
+
+import se.kuseman.payloadbuilder.api.catalog.CatalogException;
+import se.kuseman.payloadbuilder.api.catalog.Column.Type;
+import se.kuseman.payloadbuilder.api.execution.IQuerySession;
+import se.kuseman.payloadbuilder.test.VectorTestUtils;
+
+/** Test of {@link KafkaCatalogExtension}. */
+class KafkaCatalogExtensionTest
+{
+ @Test
+ void test_handle_exception_non_credentials()
+ {
+ IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class);
+ ICryptoService cryptoService = Mockito.mock(ICryptoService.class);
+ IQuerySession session = Mockito.mock(IQuerySession.class);
+
+ KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService);
+ KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka");
+
+ assertEquals(ICatalogExtension.ExceptionAction.NONE, extension.handleException(session, new CatalogException("kafka", "boom")));
+ }
+
+ @Test
+ void test_setup() throws InvocationTargetException, InterruptedException
+ {
+ IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class);
+ IQuerySession session = Mockito.mock(IQuerySession.class);
+ IQueryFile queryFile = Mockito.mock(IQueryFile.class);
+ ICryptoService cryptoService = Mockito.mock(ICryptoService.class);
+ IPayloadbuilderState state = Mockito.mock(IPayloadbuilderState.class);
+ when(state.getQuerySession()).thenReturn(session);
+
+ when(queryFileProvider.getCurrentFile()).thenReturn(queryFile);
+ when(queryFile.getEngineState()).thenReturn(state);
+
+ KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService)
+ {
+ @Override
+ List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError)
+ {
+ return connection.getTopics();
+ }
+ };
+
+ KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka");
+
+ KafkaConnection connection = new KafkaConnection();
+ connection.setName("dev");
+ connection.setBootstrapServers("localhost:9092");
+ connection.setSchemaRegistryUrl("http://localhost:8081");
+ connection.setSecurityProtocol(KafkaConnection.SecurityProtocol.SASL_SSL);
+ connection.setSaslMechanism(KafkaConnection.SaslMechanism.SCRAM_SHA_512);
+ connection.setSaslJaasLoginModule("org.apache.kafka.common.security.scram.ScramLoginModule");
+ connection.setSaslJaasControlFlag("required");
+ connection.setSaslJaasUsername("user");
+ connection.setRuntimeSaslJaasPassword("password".toCharArray());
+ connection.setTopics(asList("topic_a", "topic_b"));
+ model.setConnections(asList(connection));
+
+ extension.setupConnection(connection);
+
+ Mockito.verify(session)
+ .setCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS, "localhost:9092");
+ Mockito.verify(session)
+ .setCatalogProperty("kafka", KafkaCatalog.SCHEMA_REGISTRY_URL, "http://localhost:8081");
+ Mockito.verify(session)
+ .setCatalogProperty("kafka", KafkaCatalog.SECURITY_PROTOCOL, KafkaConnection.SecurityProtocol.SASL_SSL.name());
+ Mockito.verify(session)
+ .setCatalogProperty("kafka", KafkaCatalog.SASL_MECHANISM, KafkaConnection.SaslMechanism.SCRAM_SHA_512.name());
+
+ extension.setupTopic("topic_b");
+ Mockito.verify(session)
+ .setCatalogProperty("kafka", KafkaCatalog.TOPIC, "topic_b");
+ }
+
+ @Test
+ void test_update() throws InvocationTargetException, InterruptedException
+ {
+ IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class);
+ IQuerySession session = Mockito.mock(IQuerySession.class);
+ IQueryFile queryFile = Mockito.mock(IQueryFile.class);
+ ICryptoService cryptoService = Mockito.mock(ICryptoService.class);
+ IPayloadbuilderState state = Mockito.mock(IPayloadbuilderState.class);
+ when(state.getQuerySession()).thenReturn(session);
+ when(queryFileProvider.getCurrentFile()).thenReturn(queryFile);
+ when(queryFile.getEngineState()).thenReturn(state);
+
+ KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService)
+ {
+ @Override
+ List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError)
+ {
+ return connection.getTopics();
+ }
+ };
+
+ KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka");
+
+ KafkaConnection connection1 = new KafkaConnection();
+ connection1.setBootstrapServers("localhost:9092");
+ connection1.setTopics(asList("orders", "payments"));
+ KafkaConnection connection2 = new KafkaConnection();
+ connection2.setBootstrapServers("localhost:9093");
+ connection2.setTopics(asList("metrics"));
+ model.setConnections(asList(connection1, connection2));
+
+ KafkaCatalogExtension.QuickPropertiesPanel qp = (KafkaCatalogExtension.QuickPropertiesPanel) extension.getQuickPropertiesComponent();
+
+ doReturn(VectorTestUtils.vv(Type.String, "localhost:9092")).when(session)
+ .getCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS);
+ doReturn(VectorTestUtils.vv(Type.String, "payments")).when(session)
+ .getCatalogProperty("kafka", KafkaCatalog.TOPIC);
+
+ SwingUtilities.invokeAndWait(() -> extension.update(queryFile));
+ SwingUtilities.invokeAndWait(() ->
+ {
+ // Flush queued UI updates from invokeLater in extension.update
+ });
+
+ assertEquals("localhost:9092", ((KafkaConnection) qp.connections.getSelectedItem()).getBootstrapServers());
+ assertEquals("payments", qp.topics.getSelectedItem());
+ }
+
+ @Test
+ void test_update_sets_default_connection_in_session_when_missing() throws InvocationTargetException, InterruptedException
+ {
+ IQueryFileProvider queryFileProvider = Mockito.mock(IQueryFileProvider.class);
+ IQuerySession session = Mockito.mock(IQuerySession.class);
+ IQueryFile queryFile = Mockito.mock(IQueryFile.class);
+ ICryptoService cryptoService = Mockito.mock(ICryptoService.class);
+ IPayloadbuilderState state = Mockito.mock(IPayloadbuilderState.class);
+ when(state.getQuerySession()).thenReturn(session);
+ when(queryFileProvider.getCurrentFile()).thenReturn(queryFile);
+ when(queryFile.getEngineState()).thenReturn(state);
+
+ KafkaConnectionsModel model = new KafkaConnectionsModel(queryFileProvider, cryptoService)
+ {
+ @Override
+ List getTopics(KafkaConnection connection, boolean forceReload, boolean reThrowError)
+ {
+ return connection.getTopics();
+ }
+ };
+
+ KafkaCatalogExtension extension = new KafkaCatalogExtension(queryFileProvider, model, "kafka");
+
+ KafkaConnection connection1 = new KafkaConnection();
+ connection1.setBootstrapServers("localhost:9092");
+ connection1.setTopics(asList("orders", "payments"));
+ model.setConnections(asList(connection1));
+
+ KafkaCatalogExtension.QuickPropertiesPanel qp = (KafkaCatalogExtension.QuickPropertiesPanel) extension.getQuickPropertiesComponent();
+
+ doReturn(null).when(session)
+ .getCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS);
+ doReturn(VectorTestUtils.vv(Type.String, "payments")).when(session)
+ .getCatalogProperty("kafka", KafkaCatalog.TOPIC);
+
+ SwingUtilities.invokeAndWait(() -> extension.update(queryFile));
+ SwingUtilities.invokeAndWait(() ->
+ {
+ // Flush queued UI updates from invokeLater in extension.update
+ });
+
+ Mockito.verify(session)
+ .setCatalogProperty("kafka", KafkaCatalog.BOOTSTRAP_SERVERS, "localhost:9092");
+ assertEquals("localhost:9092", ((KafkaConnection) qp.connections.getSelectedItem()).getBootstrapServers());
+ }
+}
diff --git a/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java
new file mode 100644
index 00000000..54ff973c
--- /dev/null
+++ b/queryeer-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaConnectionTest.java
@@ -0,0 +1,36 @@
+package se.kuseman.payloadbuilder.catalog.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+/** Test of {@link KafkaConnection}. */
+class KafkaConnectionTest
+{
+ @Test
+ void test_jaas_config_assembly()
+ {
+ KafkaConnection connection = new KafkaConnection();
+ connection.setSecurityProtocol(KafkaConnection.SecurityProtocol.SASL_SSL);
+ connection.setSaslMechanism(KafkaConnection.SaslMechanism.SCRAM_SHA_512);
+ connection.setSaslJaasLoginModule("org.apache.kafka.common.security.scram.ScramLoginModule");
+ connection.setSaslJaasControlFlag("required");
+ connection.setSaslJaasUsername("user");
+ connection.setRuntimeSaslJaasPassword("pa\"ss\\word".toCharArray());
+ connection.setSaslJaasOptions("serviceName=\"kafka\"");
+
+ assertEquals("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"pa\\\"ss\\\\word\" serviceName=\"kafka\";", connection.getSaslJaasConfig());
+ }
+
+ @Test
+ void test_jaas_config_null_when_not_sasl()
+ {
+ KafkaConnection connection = new KafkaConnection();
+ connection.setSecurityProtocol(KafkaConnection.SecurityProtocol.PLAINTEXT);
+ connection.setSaslJaasUsername("user");
+ connection.setRuntimeSaslJaasPassword("password".toCharArray());
+
+ assertNull(connection.getSaslJaasConfig());
+ }
+}