From fb10f79f379d56fdc87d033d7a1c64a1094a1a37 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Mon, 17 Mar 2025 18:38:25 -0400 Subject: [PATCH 1/8] EBP-30: Added howtos to demonstrate how to use the Go PubSub+ API with the PubSub+ cache --- howtos/how_to_use_pubsub_cache.go | 240 ++++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 howtos/how_to_use_pubsub_cache.go diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go new file mode 100644 index 0000000..4006def --- /dev/null +++ b/howtos/how_to_use_pubsub_cache.go @@ -0,0 +1,240 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "time" + + "solace.dev/go/messaging" + "solace.dev/go/messaging/pkg/solace" + "solace.dev/go/messaging/pkg/solace/config" + "solace.dev/go/messaging/pkg/solace/message" + "solace.dev/go/messaging/pkg/solace/resource" +) + +const ( + ValidCachedMessageAge int32 = 0 + ValidMaxCachedMessages int32 = 0 + ValidCacheAccessTimeout int32 = 5000 +) + +func getEnv(key, def string) string { + if val, ok := os.LookupEnv(key); ok { + return val + } + return def +} + +// HowToGetCacheRequestConfiguration - example of how to retrieve the cache request +// based on the different supported strategies +func HowToGetCacheRequestConfiguration(cacheRequestStrategy resource.CachedMessageSubscriptionStrategy) resource.CachedMessageSubscriptionRequest { + // specific the topic name + topic := "MaxMsgs3/default/notcached" + // specific the cache name + cacheName := "CacheMessages" + var cacheRequestConfig resource.CachedMessageSubscriptionRequest + + switch cacheRequestStrategy { + // For cache request with strategy AsAvailable + case resource.CacheRequestStrategyAsAvailable: + cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest( + resource.CacheRequestStrategyAsAvailable, + cacheName, + resource.TopicSubscriptionOf(topic), + ValidCacheAccessTimeout, // specific cache access timeout + ValidMaxCachedMessages, // specific max cached messages + ValidCachedMessageAge, // specific max cached message age + ) + // For cache request with strategy CachedFirst + case resource.CacheRequestStrategyCachedFirst: + cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest( + resource.CacheRequestStrategyCachedFirst, + cacheName, + resource.TopicSubscriptionOf(topic), + ValidCacheAccessTimeout, // specific cache access timeout + ValidMaxCachedMessages, // specific max cached messages + ValidCachedMessageAge, // specific max cached message age + ) + // For cache request with strategy CachedOnly + case resource.CacheRequestStrategyCachedOnly: + cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest( + resource.CacheRequestStrategyCachedOnly, + cacheName, + resource.TopicSubscriptionOf(topic), + ValidCacheAccessTimeout, // specific cache access timeout + ValidMaxCachedMessages, // specific max cached messages + ValidCachedMessageAge, // specific max cached message age + ) + // For cache request with strategy LiveCancelsCached + case resource.CacheRequestStrategyLiveCancelsCached: + cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest( + resource.CacheRequestStrategyLiveCancelsCached, + cacheName, + resource.TopicSubscriptionOf(topic), + ValidCacheAccessTimeout, // specific cache access timeout + ValidMaxCachedMessages, // specific max cached messages + ValidCachedMessageAge, // specific max cached message age + ) + } + + // Dump the cache request + fmt.Println(fmt.Sprintf("\nConstructed Cache Request Strategy: %s \n", cacheRequestConfig)) + return cacheRequestConfig +} + +// Message Handler +func CacheResponseCallback(cacheResponse solace.CacheResponse) { + // do something with the cache response + // ... + // ... + + // dump the cache response here + // How to check the cache request ID of a message + fmt.Printf("Received Cache Response; CacheRequestID %s\n", cacheResponse.GetCacheRequestID()) + + // How to check the CacheRequestOutcome of a cache response + fmt.Printf("Received Cache Response; CacheRequestOutcome %s\n", cacheResponse.GetCacheRequestOutcome()) + + // How to check the error of a cache response + fmt.Printf("Received Cache Response; Error %s\n", cacheResponse.GetError()) +} + +// HowToSendCacheRequestWithCallback - example of send the cache request and retrieve the cache response using a callback +func HowToSendCacheRequestWithCallback(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) { + // the cache request ID + cacheRequestID := message.CacheRequestID(1) + + // call the method on the direct receiver to send the cache request and retrieve the cache response using a callback + err := directReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, CacheResponseCallback) + + // we do not expect an error + if err != nil { + panic(err) + } + + // ... +} + +// HowToSendCacheRequestWithChannel - example of how to send the cache request and retrieve the cache response on a channel +func HowToSendCacheRequestWithChannel(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) { + // the cache request ID + cacheRequestID := message.CacheRequestID(1) + + // call the method on the direct receiver to send the cache request and retrieve the cache response on a channel + cacheResponseChannel, err := directReceiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + + // we do not expect an error + if err != nil { + panic(err) + } + + select { + // this will hold the cache response that was received on the channel + case cacheResponse := <-cacheResponseChannel: + // do something with the cache response + // ... + // ... + + // dump the cache response here + // How to check the cache request ID of a message + fmt.Printf("Received Cache Response; CacheRequestID %s\n", cacheResponse.GetCacheRequestID()) + + // How to check the CacheRequestOutcome of a cache response + fmt.Printf("Received Cache Response; CacheRequestOutcome %s\n", cacheResponse.GetCacheRequestOutcome()) + + // How to check the error of a cache response + fmt.Printf("Received Cache Response; Error %s\n", cacheResponse.GetError()) + + case <-time.After(1 * time.Second): + fmt.Printf("timed out waiting for cache response to be recieved") + } + + // ... +} + +// Code examples of how to use the Go PubSub+ cache. +func main() { + // logging.SetLogLevel(logging.LogLevelInfo) + + // Configuration parameters + brokerConfig := config.ServicePropertyMap{ + config.TransportLayerPropertyHost: getEnv("SOLACE_HOST", "tcp://localhost:55555,tcp://localhost:55554"), + config.ServicePropertyVPNName: getEnv("SOLACE_VPN", "default"), + config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD", "default"), + config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USERNAME", "default"), + config.TransportLayerPropertyReconnectionAttempts: 0, + } + + messagingService, err := messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(brokerConfig). + Build() + + if err != nil { + panic(err) + } + + // Connect to the messaging serice + if err := messagingService.Connect(); err != nil { + panic(err) + } + + fmt.Println("Connected to the broker? ", messagingService.IsConnected()) + + // Build a Direct message receivers with given topics + directReceiver, err := messagingService.CreateDirectMessageReceiverBuilder(). + // we are using an abitary value for back pressure (you can configure this based on your use case) + OnBackPressureDropOldest(100100). + Build() + + if err != nil { + panic(err) + } + + // Start Direct Message Receiver + if err := directReceiver.Start(); err != nil { + panic(err) + } + + fmt.Println("Direct Receiver running? ", directReceiver.IsRunning()) + + ///////////////////////////////////////// + // Cache EXAMPLES + ///////////////////////////////////////// + + // For cache request with strategy AsAvailable + var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyAsAvailable) + + // // For cache request with strategy CachedFirst + // var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyCachedFirst) + + // // For cache request with strategy CachedOnly + // var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyCachedOnly) + + // // For cache request with strategy LiveCancelsCached + // var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyLiveCancelsCached) + + // Send the cache request and retrieve the cache response using a callback + HowToSendCacheRequestWithCallback(directReceiver, cachedMessageSubscriptionRequest) + + // Send the cache request and retrieve the cache response using a callback + HowToSendCacheRequestWithChannel(directReceiver, cachedMessageSubscriptionRequest) + + fmt.Println("\n===Interrupt (CTR+C) to handle graceful terminaltion of the messaiging service===") + + // cleanup after the main calling function has finished execution + defer func() { + // Terminate the Direct Receiver + directReceiver.Terminate(1 * time.Second) + fmt.Println("\nDirect Receiver Terminated? ", directReceiver.IsTerminated()) + // Disconnect the Message Service + messagingService.Disconnect() + fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected()) + }() + + // Run forever until an interrupt signal is received + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + // Block until a interrupt signal is received. + <-c +} From c482cef7878b78e6107eec7d252620b5ce0f8ff4 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Tue, 18 Mar 2025 08:51:03 -0400 Subject: [PATCH 2/8] EBP-30: Added howtos to demonstrate how to use the Go PubSub+ API with the PubSub+ cache --- go.mod | 2 +- howtos/how_to_use_pubsub_cache.go | 35 ++++++++++++++++++++----------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 726a6bf..32b6085 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module SolaceSamples.com/PubSub+Go go 1.17 -require solace.dev/go/messaging v1.8.0 +require solace.dev/go/messaging v1.9.0 require solace.dev/go/messaging-trace/opentelemetry v1.0.0 diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index 4006def..26d3281 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -83,14 +83,15 @@ func HowToGetCacheRequestConfiguration(cacheRequestStrategy resource.CachedMessa return cacheRequestConfig } -// Message Handler +// CacheResponseCallback - this is the callback to handle the cache response +// (used by the directReceiver.RequestCachedAsyncWithCallback()) func CacheResponseCallback(cacheResponse solace.CacheResponse) { // do something with the cache response // ... // ... // dump the cache response here - // How to check the cache request ID of a message + // How to correlate the cache request ID of a request with a received message and response fmt.Printf("Received Cache Response; CacheRequestID %s\n", cacheResponse.GetCacheRequestID()) // How to check the CacheRequestOutcome of a cache response @@ -100,9 +101,9 @@ func CacheResponseCallback(cacheResponse solace.CacheResponse) { fmt.Printf("Received Cache Response; Error %s\n", cacheResponse.GetError()) } -// HowToSendCacheRequestWithCallback - example of send the cache request and retrieve the cache response using a callback -func HowToSendCacheRequestWithCallback(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) { - // the cache request ID +// HowToSendCacheRequestAndProcessCacheResponseWithCallback - example of send the cache request and retrieve the cache response using a callback +func HowToSendCacheRequestAndProcessCacheResponseWithCallback(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) { + // submit a cache request with a specific cache request ID (in this case we use 1) cacheRequestID := message.CacheRequestID(1) // call the method on the direct receiver to send the cache request and retrieve the cache response using a callback @@ -116,9 +117,9 @@ func HowToSendCacheRequestWithCallback(directReceiver solace.DirectMessageReceiv // ... } -// HowToSendCacheRequestWithChannel - example of how to send the cache request and retrieve the cache response on a channel -func HowToSendCacheRequestWithChannel(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) { - // the cache request ID +// HowToSendCacheRequestAndProcessCacheResponseWithChannel - example of how to send the cache request and retrieve the cache response on a channel +func HowToSendCacheRequestAndProcessCacheResponseWithChannel(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) { + // submit a cache request with a specific cache request ID (in this case we use 1) cacheRequestID := message.CacheRequestID(1) // call the method on the direct receiver to send the cache request and retrieve the cache response on a channel @@ -137,7 +138,7 @@ func HowToSendCacheRequestWithChannel(directReceiver solace.DirectMessageReceive // ... // dump the cache response here - // How to check the cache request ID of a message + // How to correlate the cache request ID of a request with a received message and response fmt.Printf("Received Cache Response; CacheRequestID %s\n", cacheResponse.GetCacheRequestID()) // How to check the CacheRequestOutcome of a cache response @@ -215,18 +216,28 @@ func main() { // var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyLiveCancelsCached) // Send the cache request and retrieve the cache response using a callback - HowToSendCacheRequestWithCallback(directReceiver, cachedMessageSubscriptionRequest) + HowToSendCacheRequestAndProcessCacheResponseWithCallback(directReceiver, cachedMessageSubscriptionRequest) // Send the cache request and retrieve the cache response using a callback - HowToSendCacheRequestWithChannel(directReceiver, cachedMessageSubscriptionRequest) + HowToSendCacheRequestAndProcessCacheResponseWithChannel(directReceiver, cachedMessageSubscriptionRequest) fmt.Println("\n===Interrupt (CTR+C) to handle graceful terminaltion of the messaiging service===") // cleanup after the main calling function has finished execution defer func() { + // A graceful shutdown of the directReceiver is attempted within the specified + // grace period (in this case, we are using 1 second). + var gracePeriod time.Duration = 1 * time.Second + // Terminate the Direct Receiver - directReceiver.Terminate(1 * time.Second) + // The receiver can be terminated before the cache response has been completed, + // but this is not possible to demonstrate in this HowTo due to infrastructure limitations. + // Setting gracePeriod to 0 implies a non-graceful shutdown that ignores unfinished tasks or in-flight messages. + // This function returns an error if one occurred, or nil if it successfully and gracefully terminated. + // If gracePeriod is set to less than 0, the function waits indefinitely. + directReceiver.Terminate(gracePeriod) fmt.Println("\nDirect Receiver Terminated? ", directReceiver.IsTerminated()) + // Disconnect the Message Service messagingService.Disconnect() fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected()) From 31d59a1bdb979e63c5162561a25eabbaa3a66ec4 Mon Sep 17 00:00:00 2001 From: Marc DiPasquale <1815312+Mrc0113@users.noreply.github.com> Date: Wed, 23 Jul 2025 12:55:21 -0400 Subject: [PATCH 3/8] Update howtos/how_to_use_pubsub_cache.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- howtos/how_to_use_pubsub_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index 26d3281..df4679a 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -29,9 +29,9 @@ func getEnv(key, def string) string { // HowToGetCacheRequestConfiguration - example of how to retrieve the cache request // based on the different supported strategies func HowToGetCacheRequestConfiguration(cacheRequestStrategy resource.CachedMessageSubscriptionStrategy) resource.CachedMessageSubscriptionRequest { - // specific the topic name + // specify the topic name topic := "MaxMsgs3/default/notcached" - // specific the cache name + // specify the cache name cacheName := "CacheMessages" var cacheRequestConfig resource.CachedMessageSubscriptionRequest From bafdb5cb07ceb04b88b2c05d8b4878fd2c1dbce8 Mon Sep 17 00:00:00 2001 From: Marc DiPasquale <1815312+Mrc0113@users.noreply.github.com> Date: Wed, 23 Jul 2025 12:55:38 -0400 Subject: [PATCH 4/8] Update howtos/how_to_use_pubsub_cache.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- howtos/how_to_use_pubsub_cache.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index df4679a..b0fc3aa 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -42,9 +42,9 @@ func HowToGetCacheRequestConfiguration(cacheRequestStrategy resource.CachedMessa resource.CacheRequestStrategyAsAvailable, cacheName, resource.TopicSubscriptionOf(topic), - ValidCacheAccessTimeout, // specific cache access timeout - ValidMaxCachedMessages, // specific max cached messages - ValidCachedMessageAge, // specific max cached message age + ValidCacheAccessTimeout, // specify cache access timeout + ValidMaxCachedMessages, // specify max cached messages + ValidCachedMessageAge, // specify max cached message age ) // For cache request with strategy CachedFirst case resource.CacheRequestStrategyCachedFirst: From fe8f44062b0c97d8f22e7de88efd2b65e240e5ac Mon Sep 17 00:00:00 2001 From: Marc DiPasquale <1815312+Mrc0113@users.noreply.github.com> Date: Wed, 23 Jul 2025 12:56:02 -0400 Subject: [PATCH 5/8] Update howtos/how_to_use_pubsub_cache.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- howtos/how_to_use_pubsub_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index b0fc3aa..490875c 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -148,7 +148,7 @@ func HowToSendCacheRequestAndProcessCacheResponseWithChannel(directReceiver sola fmt.Printf("Received Cache Response; Error %s\n", cacheResponse.GetError()) case <-time.After(1 * time.Second): - fmt.Printf("timed out waiting for cache response to be recieved") + fmt.Printf("timed out waiting for cache response to be received") } // ... From 1745df28bff14db2e08c835f82cb84bf500b0121 Mon Sep 17 00:00:00 2001 From: Marc DiPasquale <1815312+Mrc0113@users.noreply.github.com> Date: Wed, 23 Jul 2025 12:56:09 -0400 Subject: [PATCH 6/8] Update howtos/how_to_use_pubsub_cache.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- howtos/how_to_use_pubsub_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index 490875c..4abc906 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -175,7 +175,7 @@ func main() { panic(err) } - // Connect to the messaging serice + // Connect to the messaging service if err := messagingService.Connect(); err != nil { panic(err) } From 085f55e2b41af575856440f84da9a45c0bc2ecf0 Mon Sep 17 00:00:00 2001 From: Marc DiPasquale <1815312+Mrc0113@users.noreply.github.com> Date: Wed, 23 Jul 2025 12:56:19 -0400 Subject: [PATCH 7/8] Update howtos/how_to_use_pubsub_cache.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- howtos/how_to_use_pubsub_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index 4abc906..1b0eacd 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -184,7 +184,7 @@ func main() { // Build a Direct message receivers with given topics directReceiver, err := messagingService.CreateDirectMessageReceiverBuilder(). - // we are using an abitary value for back pressure (you can configure this based on your use case) + // we are using an arbitrary value for back pressure (you can configure this based on your use case) OnBackPressureDropOldest(100100). Build() From 8016b4c985770c5247ab0338eb42c6377d754e2c Mon Sep 17 00:00:00 2001 From: Marc DiPasquale <1815312+Mrc0113@users.noreply.github.com> Date: Wed, 23 Jul 2025 12:56:27 -0400 Subject: [PATCH 8/8] Update howtos/how_to_use_pubsub_cache.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- howtos/how_to_use_pubsub_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/howtos/how_to_use_pubsub_cache.go b/howtos/how_to_use_pubsub_cache.go index 1b0eacd..4c61d4a 100644 --- a/howtos/how_to_use_pubsub_cache.go +++ b/howtos/how_to_use_pubsub_cache.go @@ -221,7 +221,7 @@ func main() { // Send the cache request and retrieve the cache response using a callback HowToSendCacheRequestAndProcessCacheResponseWithChannel(directReceiver, cachedMessageSubscriptionRequest) - fmt.Println("\n===Interrupt (CTR+C) to handle graceful terminaltion of the messaiging service===") + fmt.Println("\n===Interrupt (CTR+C) to handle graceful termination of the messaging service===") // cleanup after the main calling function has finished execution defer func() {