From 6f377de2365f54734a832240dc5b7e6a47ae69d2 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 18 Nov 2025 22:20:59 +0100 Subject: [PATCH] cluster-bootstrap support TLS requests in client calls (#426) * cluster-bootstrap support TLS requests in client calls Update HttpContactPointBootstrap.scala Update HttpContactPointBootstrap.scala add cert Update BootstrapCoordinatorSpec.scala extra test make TLS version configurable cert unused Update BootstrapCoordinatorSpec.scala Update BootstrapCoordinatorSpec.scala Update HttpContactPointBootstrap.scala Revert "cert unused" This reverts commit 83b45b537de0cef771f6e029f2663c07bbd5ec26. add tests * Update management-cluster-bootstrap/src/main/resources/reference.conf Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- build.sbt | 1 + .../src/main/resources/reference.conf | 7 ++ .../bootstrap/ClusterBootstrapSettings.scala | 7 ++ .../internal/HttpContactPointBootstrap.scala | 39 ++++++++++- .../src/test/files/ca.crt | 18 +++++ .../{reference.conf => application.conf} | 0 .../internal/BootstrapCoordinatorSpec.scala | 9 +-- .../HttpContactPointBootstrapSpec.scala | 67 ++++++++++++++++++- 8 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 management-cluster-bootstrap/src/test/files/ca.crt rename management-cluster-bootstrap/src/test/resources/{reference.conf => application.conf} (100%) diff --git a/build.sbt b/build.sbt index 351e9ef9..c3e76ef5 100644 --- a/build.sbt +++ b/build.sbt @@ -151,6 +151,7 @@ lazy val managementClusterBootstrap = pekkoModule("management-cluster-bootstrap" libraryDependencies := Dependencies.managementClusterBootstrap, mimaPreviousArtifactsSet) .dependsOn(management) + .dependsOn(managementPki) lazy val leaseKubernetes = pekkoModule("lease-kubernetes") .enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin) diff --git a/management-cluster-bootstrap/src/main/resources/reference.conf b/management-cluster-bootstrap/src/main/resources/reference.conf index 5332ae2e..0f4d7844 100644 --- a/management-cluster-bootstrap/src/main/resources/reference.conf +++ b/management-cluster-bootstrap/src/main/resources/reference.conf @@ -134,6 +134,13 @@ pekko.management { # Max amount of jitter to be added on retries probe-interval-jitter = 0.2 + + http-client { + # set this to your HTTPS certificate path if you want to setup a HTTPS trust store + ca-path = "" + # the TLS version to use when connecting to contact points + tls-version = "TLSv1.2" + } } join-decider { diff --git a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala index ca6d47e5..ba77a437 100644 --- a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala +++ b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala @@ -134,6 +134,13 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) { object contactPoint { private val contactPointConfig = bootConfig.getConfig("contact-point") + object httpClient { + private val httpClientConfig = contactPointConfig.getConfig("http-client") + + val caPath: String = httpClientConfig.getString("ca-path") + val tlsVersion: String = httpClientConfig.getString("tls-version") + } + val fallbackPort: Int = contactPointConfig .optDefinedValue("fallback-port") diff --git a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala index cea30b9b..f9a2c62a 100644 --- a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala +++ b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala @@ -14,10 +14,13 @@ package org.apache.pekko.management.cluster.bootstrap.internal import java.time.LocalDateTime +import java.security.{ KeyStore, SecureRandom } import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeoutException +import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager } import scala.concurrent.Future import scala.concurrent.duration._ + import org.apache.pekko import pekko.actor.Actor import pekko.actor.ActorLogging @@ -29,7 +32,9 @@ import pekko.actor.Timers import pekko.annotation.InternalApi import pekko.cluster.Cluster import pekko.discovery.ServiceDiscovery.ResolvedTarget +import pekko.http.scaladsl.ConnectionContext import pekko.http.scaladsl.Http +import pekko.http.scaladsl.HttpsConnectionContext import pekko.http.scaladsl.model.HttpResponse import pekko.http.scaladsl.model.StatusCodes import pekko.http.scaladsl.model.Uri @@ -41,6 +46,7 @@ import pekko.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol import pekko.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol } import pekko.pattern.after import pekko.pattern.pipe +import pekko.pki.kubernetes.PemManagersProvider @InternalApi private[bootstrap] object HttpContactPointBootstrap { @@ -56,6 +62,26 @@ private[bootstrap] object HttpContactPointBootstrap { private case object ProbeTick extends DeadLetterSuppression private val ProbingTimerKey = "probing-key" + + def generateSSLContext(settings: ClusterBootstrapSettings): SSLContext = { + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val caPath = settings.contactPoint.httpClient.caPath.trim + val tm: Array[TrustManager] = if (caPath.isEmpty) { + Array.empty + } else { + val certificates = PemManagersProvider.loadCertificates(caPath) + PemManagersProvider.buildTrustManagers(certificates) + } + val tlsVersion = settings.contactPoint.httpClient.tlsVersion.trim + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance(tlsVersion) + sslContext.init(km, tm, random) + sslContext + } } /** @@ -88,7 +114,12 @@ private[bootstrap] class HttpContactPointBootstrap( } private implicit val sys: ActorSystem = context.system + + private lazy val clientSslContext: HttpsConnectionContext = + ConnectionContext.httpsClient(HttpContactPointBootstrap.generateSSLContext(settings)) + private val http = Http() + private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0) import context.dispatcher @@ -111,7 +142,13 @@ private[bootstrap] class HttpContactPointBootstrap( override def receive = { case ProbeTick => log.debug("Probing [{}] for seed nodes...", probeRequest.uri) - val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse) + val reply = if (probeRequest.uri.scheme == "https") { + http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries, + connectionContext = clientSslContext) + } else { + http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries) + }.flatMap(handleResponse) + val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout) Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self) diff --git a/management-cluster-bootstrap/src/test/files/ca.crt b/management-cluster-bootstrap/src/test/files/ca.crt new file mode 100644 index 00000000..7fc98192 --- /dev/null +++ b/management-cluster-bootstrap/src/test/files/ca.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p +a3ViZUNBMB4XDTE3MTIxMjEzMzY1MVoXDTI3MTIxMDEzMzY1MVowFTETMBEGA1UE +AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMrk +QcE8e2L3Rnm8K51y1Y4CHWwx4XwD0SqPwGq9nnygFaBsibIIrex89Im4f73QaqR5 +h87ypi0dyqlaTZdleZN7Q4hNSpWF1t/zSGanm7QOSl76FlTAFNm/eVNamfuGRf1x +OYWGWRwdct3Six5K+R/qHh6oJ9XDli9LuV4vxHTDB/mr/2Xgyz1MDrIdRDYpiqev +3HNJqnfXFT3eGWXk4ENZsc+I/R5LbSXA+cSQd9xrkrBhbreHLk99pif7eAKwVKNZ +Rcsp9QBgMOUAoFgk+sU6YeVrasXIF1R4BB7g+LpqpM3F6jqmD79j2mREMIU3kjEQ +eXMqi1W31i9ug1VxwTUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW +MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4IBAQAyRchLY4Jhu1EBFlhYebGLrEO/twZCu2NQyM0by5XUoXApJeqf +S00Q7A67CRcQlbtRAH5vqhpCxutlKc26dF5Y1MmJmkGT7WIjujV0UIF/jJDnmwKK ++DRQl1UgA1e4WS6XOwUaSo9ltgJQ+GJfgkg3Xs3pzjjIpX94eF4V9ArJ8npRVO+w +cCxE01P+Nm9U5H24QnlY+1IxNeszitm34SGiRy6SqoKSfYQoNyQadG9KVybs4FAs +7aeYAB10I7FLFt4+Ji93zZjnWcKXjv59vz7NBDPtCsaXhJ82983GsfV2z+WQ3kRZ +R2XVTsdz8yu0rgmyewxVKH7Roo5Ts+qpZFbi +-----END CERTIFICATE----- diff --git a/management-cluster-bootstrap/src/test/resources/reference.conf b/management-cluster-bootstrap/src/test/resources/application.conf similarity index 100% rename from management-cluster-bootstrap/src/test/resources/reference.conf rename to management-cluster-bootstrap/src/test/resources/application.conf diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala index 32c72aae..9995694c 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala @@ -14,22 +14,23 @@ package org.apache.pekko.management.cluster.bootstrap.internal import java.util.concurrent.atomic.AtomicReference + import org.apache.pekko import pekko.actor.{ ActorRef, ActorSystem, Props } import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } import pekko.discovery.{ Lookup, MockDiscovery } import pekko.http.scaladsl.model.Uri -import com.typesafe.config.ConfigFactory import pekko.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping import pekko.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider } -import org.scalatest.concurrent.Eventually +import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers import org.scalatest.time.{ Millis, Seconds, Span } +import org.scalatest.wordspec.AnyWordSpec import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec class BootstrapCoordinatorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually { val serviceName = "bootstrap-coordinator-test-service" diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala index ef80e5be..b3f8892d 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala @@ -13,17 +13,82 @@ package org.apache.pekko.management.cluster.bootstrap.internal +import java.nio.file.NoSuchFileException + import org.apache.pekko -import pekko.actor.ActorPath +import pekko.actor.{ ActorPath, ActorSystem } +import pekko.event.Logging +import pekko.management.cluster.bootstrap.ClusterBootstrapSettings import pekko.http.scaladsl.model.Uri.Host +import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers { + "HttpContactPointBootstrap" should { "use a safe name when connecting over IPv6" in { val name = HttpContactPointBootstrap.name(Host("[fe80::1013:2070:258a:c662]"), 443) ActorPath.isValidPathElement(name) should be(true) } + "generate SSLContext with default config" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val settings = new ClusterBootstrapSettings(sys.settings.config, log) + HttpContactPointBootstrap.generateSSLContext(settings) should not be null + } finally { + sys.terminate() + } + } + "generate SSLContext with cert" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/ca.crt" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + HttpContactPointBootstrap.generateSSLContext(settings) should not be null + } finally { + sys.terminate() + } + } + "fail to generate SSLContext with missing cert" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/non-existent.crt" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + intercept[NoSuchFileException] { + HttpContactPointBootstrap.generateSSLContext(settings) + } + } finally { + sys.terminate() + } + } + "fail to generate SSLContext with bad tls-version" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/ca.crt" + tls-version = "BAD_VERSION" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + val noSuchAlgorithmException = intercept[java.security.NoSuchAlgorithmException] { + HttpContactPointBootstrap.generateSSLContext(settings) + } + noSuchAlgorithmException.getMessage.contains("BAD_VERSION") should be(true) + } finally { + sys.terminate() + } + } + } }