@@ -548,7 +548,7 @@ CONSOLE_LOG=new`
548548 })
549549 })
550550
551- When ("(web) MQTT, STOMP are enabled" , func () {
551+ When ("(web) MQTT, STOMP and stream are enabled" , func () {
552552 var (
553553 cluster * rabbitmqv1beta1.RabbitmqCluster
554554 hostname string
@@ -591,48 +591,17 @@ CONSOLE_LOG=new`
591591 By ("STOMP" )
592592 publishAndConsumeSTOMPMsg (hostname , rabbitmqNodePort (ctx , clientSet , cluster , "stomp" ), username , password , nil )
593593
594- })
595-
596- })
597-
598- When ("stream plugin is enabled" , func () {
599- var (
600- cluster * rabbitmqv1beta1.RabbitmqCluster
601- hostname string
602- username string
603- password string
604- )
605-
606- BeforeEach (func () {
607- instanceName := "stream"
608- cluster = newRabbitmqCluster (namespace , instanceName )
609- cluster .Spec .Service .Type = "NodePort"
610- cluster .Spec .Rabbitmq .AdditionalPlugins = []rabbitmqv1beta1.Plugin {
611- "rabbitmq_stream" ,
612- }
613- Expect (createRabbitmqCluster (ctx , rmqClusterClient , cluster )).To (Succeed ())
614- waitForRabbitmqRunning (cluster )
615-
616- hostname = kubernetesNodeIp (ctx , clientSet )
617- var err error
618- username , password , err = getUsernameAndPassword (ctx , clientSet , "rabbitmq-system" , instanceName )
619- Expect (err ).NotTo (HaveOccurred ())
620- })
621-
622- AfterEach (func () {
623- Expect (rmqClusterClient .Delete (context .TODO (), cluster )).To (Succeed ())
624- })
625-
626- It ("publishes and consumes a message" , func () {
594+ By ("Streams" )
627595 if ! hasFeatureEnabled (cluster , "stream_queue" ) {
628596 Skip ("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster .Spec .Image )
629597 }else {
630- fmt .Println ("Stream feature is enabled " )
631598 waitForPortConnectivity (cluster )
632599 waitForPortReadiness (cluster , 5552 ) // stream
633600 publishAndConsumeStreamMsg (hostname , rabbitmqNodePort (ctx , clientSet , cluster , "stream" ), username , password )
634601 }
635602 })
636603
604+
637605 })
606+
638607})
0 commit comments