Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ads/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ const (
)

// A SubscriptionHandler will receive notifications for the cache entries it has subscribed to using
// RawCache.Subscribe. Note that it is imperative that implementations be hashable as it will be
// [Cache.Subscribe]. Note that it is imperative that implementations be hashable as it will be
// stored as the key to a map (unhashable types include slices and functions).
type SubscriptionHandler[T proto.Message] interface {
// Notify is invoked when the given entry is modified. A deletion is denoted with a nil resource. The given time
Expand Down
74 changes: 15 additions & 59 deletions cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package diderot

import (
"fmt"
"iter"
"sync"
"time"
Expand All @@ -16,14 +15,19 @@ import (
// For example, it can be used to store the set of "envoy.config.listener.v3.Listener" available to
// clients.
type Cache[T proto.Message] interface {
RawCache
// Type returns the corresponding [Type] for this cache.
Type() Type
// EntryNames returns an [iter.Seq] that will iterate over all the current entry names in the cache.
EntryNames() iter.Seq[string]
// EstimateSubscriptionSize estimates the number of resources targeted by the given list of
// subscriptions. This is only an estimation since the resource count is dynamic, and repeated
// invocations of this function with the same parameters may not yield the same results.
EstimateSubscriptionSize(resourceNamesSubscribe []string) int
// Set stores the given resource in the cache. If the resource name corresponds to a resource URN, it
// will also be stored in the corresponding glob collection (see [TP1 proposal] for additional
// details on the format). See Subscribe for more details on how the resources added by this method
// can be subscribed to. Invoking Set whenever possible is preferred to RawCache.SetRaw, since it can
// return an error if the given resource's type does not match the expected type while Set validates
// at compile time that the given value matches the desired type. A zero [time.Time] can be used to
// represent that the time at which the resource was created or modified is unknown (or ignored).
// can be subscribed to. A zero [time.Time] can be used to represent that the time at which the
// resource was created or modified is unknown (or ignored).
//
// WARNING: It is imperative that the Resource and the underlying [proto.Message] not be modified
// after insertion! This resource will be read by subscribers to the cache and callers of Get, and
Expand All @@ -38,6 +42,11 @@ type Cache[T proto.Message] interface {
SetResource(r *ads.Resource[T], modifiedAt time.Time)
// Get fetches the entry, or nil if it's not present and/or has been deleted.
Get(name string) *ads.Resource[T]
// Clear clears the entry (if present) and notifies all subscribers that the entry has been deleted.
// A zero [time.Time] can be used to represent that the time at which the resource was cleared is
// unknown (or ignored). For example, when watching a directory, the filesystem does not keep track
// of when the file was deleted.
Clear(name string, clearedAt time.Time)
// IsSubscribedTo checks whether the given handler is subscribed to the given named entry.
IsSubscribedTo(name string, handler ads.SubscriptionHandler[T]) bool
// Subscribe registers the handler as a subscriber of the given named resource. The handler is always
Expand Down Expand Up @@ -107,35 +116,6 @@ type Cache[T proto.Message] interface {
Unsubscribe(name string, handler ads.SubscriptionHandler[T])
}

// RawCache is a subset of the [Cache] interface and provides a number of methods to interact with
// the [Cache] without needing to know the underlying resource type at compile time. All RawCache
// implementations *must* also implement [Cache] for the underlying resource type.
type RawCache interface {
// Type returns the corresponding [Type] for this cache.
Type() Type
// EntryNames returns an [iter.Seq] that will iterate over all the current entry names in the cache.
EntryNames() iter.Seq[string]
// GetRaw is the untyped equivalent of Cache.Get. There are uses for this method, but the preferred
// way is to use Cache.Get because this function incurs the cost of marshaling the resource. Returns
// an error if the resource cannot be marshaled.
GetRaw(name string) (*ads.RawResource, error)
// SetRaw is the untyped equivalent of Cache.Set. There are uses for this method, but the preferred
// way is to use Cache.Set since it offers a typed API instead of the untyped ads.RawResource parameter.
// Subscribers will be notified of the new version of this resource. See Cache.Set for additional
// details on how the resources are stored. Returns an error if the given resource's type URL does
// not match the expected type URL, or the resource cannot be unmarshaled.
SetRaw(r *ads.RawResource, modifiedAt time.Time) error
// Clear clears the entry (if present) and notifies all subscribers that the entry has been deleted.
// A zero [time.Time] can be used to represent that the time at which the resource was cleared is
// unknown (or ignored). For example, when watching a directory, the filesystem does not keep track
// of when the file was deleted.
Clear(name string, clearedAt time.Time)
// EstimateSubscriptionSize estimates the number of resources targeted by the given list of
// subscriptions. This is only an estimation since the resource count is dynamic, and repeated
// invocations of this function with the same parameters may not yield the same results.
EstimateSubscriptionSize(resourceNamesSubscribe []string) int
}

// NewCache returns a simple Cache with only 1 priority (see NewPrioritizedCache).
func NewCache[T proto.Message]() Cache[T] {
return NewPrioritizedCache[T](1)[0]
Expand Down Expand Up @@ -379,14 +359,6 @@ func (c *cache[T]) Get(name string) (r *ads.Resource[T]) {
return r
}

func (c *cache[T]) GetRaw(name string) (*ads.RawResource, error) {
r := c.Get(name)
if r == nil {
return nil, nil
}
return r.Marshal()
}

func (c *cache[T]) EntryNames() iter.Seq[string] {
return func(yield func(string) bool) {
c.resources.Range()(func(k string, v *internal.WatchableValue[T]) bool {
Expand Down Expand Up @@ -448,19 +420,3 @@ func (c *cacheWithPriority[T]) SetResource(r *ads.Resource[T], modifiedAt time.T
v.Set(c.p, r, modifiedAt)
})
}

func (c *cacheWithPriority[T]) SetRaw(raw *ads.RawResource, modifiedAt time.Time) error {
// Ensure that the given resource's type URL is correct.
if u := raw.GetResource().GetTypeUrl(); u != c.typeReference.URL() {
return fmt.Errorf("diderot: invalid type URL, expected %q got %q", c.typeReference, u)
}

r, err := ads.UnmarshalRawResource[T](raw)
if err != nil {
return err
}

c.SetResource(r, modifiedAt)

return nil
}
14 changes: 7 additions & 7 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func TestCacheCollections(t *testing.T) {
// subscribers are wrapped in a wrappedHandler, which is then used as the key in the subscriber map.
// It's important to check that subscribing then unsubscribing works as expected.
func TestCacheRaw(t *testing.T) {
c := diderot.RawCache(newCache())
c := diderot.ToRawCache(newCache())
r := newResource(name1, "42")

ch := make(chan *ads.RawResource, 1)
Expand All @@ -521,16 +521,16 @@ func TestCacheRaw(t *testing.T) {
},
)

diderot.Subscribe(c, name1, h)
c.Subscribe(name1, h)
<-ch
require.NoError(t, c.SetRaw(testutils.MustMarshal(t, r), noTime))
raw, err := c.GetRaw(r.Name)
require.NoError(t, c.Set(testutils.MustMarshal(t, r), noTime))
raw, err := c.Get(r.Name)
require.NoError(t, err)
require.Same(t, testutils.MustMarshal(t, r), raw)
<-ch
c.Clear(name1, noTime)
<-ch
diderot.Unsubscribe(c, name1, h)
c.Unsubscribe(name1, h)
select {
case raw := <-ch:
require.Fail(t, "Received unexpected update after unsubscription", raw)
Expand Down Expand Up @@ -847,7 +847,7 @@ func TestGlobRace(t *testing.T) {

const (
entries = 100
writers = 100
writers = 10
count = 100
readers = 100

Expand All @@ -862,7 +862,7 @@ func TestGlobRace(t *testing.T) {

var writesDone, readsDone sync.WaitGroup
writesDone.Add(writers)
readsDone.Add(writers * readers)
readsDone.Add(entries * readers)

for range readers {
h := testutils.NewSubscriptionHandler(func(name string, r *ads.Resource[*Timestamp], _ ads.SubscriptionMetadata) {
Expand Down
6 changes: 6 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func Watch[T proto.Message](c *ADSClient, name string, watcher Watcher[T]) {
}
}

// Watch is the equivalent of the top-level [Watch] function, except that it can be used to watch
// resources without knowing the hard type [T] at runtime. Useful when writing type-agnostic code.
func (c *ADSClient) Watch(t Type, name string, watcher Watcher[proto.Message]) {
t.watch(c, name, watcher)
}

// getResourceHandler gets or initializes the [internal.ResourceHandler] for the specified type in
// the given client.
func getResourceHandler[T proto.Message](c *ADSClient) *internal.ResourceHandler[T] {
Expand Down
6 changes: 3 additions & 3 deletions examples/quickstart/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ func (sl SimpleResourceLocator) Subscribe(
// Do nothing if the given type is not supported
return func() {}
}
diderot.Subscribe(c, resourceName, handler)
c.Subscribe(resourceName, handler)
return func() {
diderot.Unsubscribe(c, resourceName, handler)
c.Unsubscribe(resourceName, handler)
}
}

// getCache extracts a typed [diderot.Cache] from the given [SimpleResourceLocator].
func getCache[T proto.Message](sl SimpleResourceLocator) diderot.Cache[T] {
return sl[diderot.TypeOf[T]().URL()].(diderot.Cache[T])
return diderot.MustUnwrapRawCache[T](sl[diderot.TypeOf[T]().URL()])
}

func (sl SimpleResourceLocator) GetListenerCache() diderot.Cache[*ads.Listener] {
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/subscription_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package internal
type subscriptionType byte

// The following subscriptionType constants define the ways a client can subscribe to a resource. See
// RawCache.Subscribe for additional details.
// [Cache.Subscribe] for additional details.
const (
// An ExplicitSubscription means the client subscribed to a resource by explicit providing its name.
ExplicitSubscription = subscriptionType(iota)
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func WithControlPlane(controlPlane *corev3.ControlPlane) ADSServerOption {
// estimator will not be invoked, as it may result in pre-allocating a very large map that will likely
// not be fully utilized.
//
// For convenience, this is trivially implemented by [RawCache.EstimateSubscriptionSize].
// For convenience, this is trivially implemented by [Cache.EstimateSubscriptionSize].
//
// [initial resource versions]: https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/discovery/v3/discovery.proto#service-discovery-v3-deltadiscoveryrequest
type SendBufferSizeEstimator interface {
Expand Down
18 changes: 9 additions & 9 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func (tl *testLocator) Subscribe(
handler ads.RawSubscriptionHandler,
) (unsubscribe func()) {
c := tl.caches[typeURL]
Subscribe(c, resourceName, handler)
c.Subscribe(resourceName, handler)
return func() {
Unsubscribe(c, resourceName, handler)
c.Unsubscribe(resourceName, handler)
}
}

Expand All @@ -118,7 +118,7 @@ func newTestLocator(t *testing.T, node *ads.Node, types ...Type) *testLocator {
}

func getCache[T proto.Message](tl *testLocator) Cache[T] {
return tl.caches[TypeOf[T]().URL()].(Cache[T])
return MustUnwrapRawCache[T](tl.caches[TypeOf[T]().URL()])
}

type mockSizeEstimator struct {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestEndToEnd(t *testing.T) {
for _, r := range resources {
c, ok := locator.caches[r.Resource.TypeUrl]
require.Truef(t, ok, "Unknown type loaded from test config %q: %+v", r.Resource.TypeUrl, r)
require.NoError(t, c.SetRaw(r, time.Now()))
require.NoError(t, c.Set(r, time.Now()))
}

addr := ts.Addr().(*net.TCPAddr)
Expand Down Expand Up @@ -733,12 +733,12 @@ func TestSubscriptionManagerSubscriptions(t *testing.T) {
)
checkSubs := func(t *testing.T, c RawCache, h ads.RawSubscriptionHandler, wildcard, r1Sub, r2Sub bool) {
t.Helper()
require.Equal(t, wildcard, IsSubscribedTo(c, ads.WildcardSubscription, h), "wildcard")
require.Equal(t, r1Sub, IsSubscribedTo(c, r1, h), r1)
require.Equal(t, r2Sub, IsSubscribedTo(c, r2, h), r2)
require.Equal(t, wildcard, c.IsSubscribedTo(ads.WildcardSubscription, h), "wildcard")
require.Equal(t, r1Sub, c.IsSubscribedTo(r1, h), r1)
require.Equal(t, r2Sub, c.IsSubscribedTo(r2, h), r2)
}

newCacheAndHandler := func(t *testing.T) (Cache[*wrapperspb.BoolValue], ResourceLocator, *simpleBatchHandler) {
newCacheAndHandler := func(t *testing.T) (RawCache, ResourceLocator, *simpleBatchHandler) {
tl := newTestLocator(t, nil, TypeOf[*wrapperspb.BoolValue]())
c := getCache[*wrapperspb.BoolValue](tl)
expected := ads.NewResource(r1, "0", wrapperspb.Bool(true))
Expand All @@ -758,7 +758,7 @@ func TestSubscriptionManagerSubscriptions(t *testing.T) {
},
}

return c, tl, h
return ToRawCache(c), tl, h
}

for _, streamType := range []ads.StreamType{ads.DeltaStreamType, ads.SotWStreamType} {
Expand Down
Loading