Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
76 changes: 32 additions & 44 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,54 @@ const (
// EngineTypeART
)

// matchKey is a composite key combining evalID and groupID
type matchKey struct {
evalID uuid.UUID
groupID groupID
}

var matchResultPool = sync.Pool{
New: func() any {
return &MatchResult{
Result: make(map[matchKey]int),
}
},
}

func NewMatchResult() *MatchResult {
return &MatchResult{
Result: map[uuid.UUID]map[groupID]int{},
Lock: &sync.Mutex{},
}
return matchResultPool.Get().(*MatchResult)
}

// MatchResult is a map of evaluable IDs to the groups found, and the number of elements
// found matching that group.
// MatchResult stores matches as a pooled map
type MatchResult struct {
Result map[uuid.UUID]map[groupID]int
Lock *sync.Mutex
Result map[matchKey]int // (evalID, groupID) -> count
}

// Release returns the MatchResult to the pool
func (m *MatchResult) Release() {
clear(m.Result)
matchResultPool.Put(m)
}

func (m *MatchResult) Len() int {
m.Lock.Lock()
defer m.Lock.Unlock()
return len(m.Result)
}

// AddExprs increments the matched counter for the given eval's group ID
// Add increments the count for the given (evalID, groupID)
func (m *MatchResult) Add(evalID uuid.UUID, gID groupID) {
m.Lock.Lock()
defer m.Lock.Unlock()
if _, ok := m.Result[evalID]; !ok {
m.Result[evalID] = map[groupID]int{}
}
m.Result[evalID][gID]++
m.Result[matchKey{evalID: evalID, groupID: gID}]++
}

// AddExprs increments the matched counter for each stored expression part.
// AddExprs increments counts for each expression part
func (m *MatchResult) AddExprs(exprs ...*StoredExpressionPart) {
m.Lock.Lock()
defer m.Lock.Unlock()
for _, expr := range exprs {
if _, ok := m.Result[expr.EvaluableID]; !ok {
m.Result[expr.EvaluableID] = map[groupID]int{}
}
m.Result[expr.EvaluableID][expr.GroupID]++
m.Result[matchKey{evalID: expr.EvaluableID, groupID: expr.GroupID}]++
}
}

// GroupMatches returns the total lenght of all matches for a given eval's group ID.
// GroupMatches returns the count for a given eval's group ID
func (m *MatchResult) GroupMatches(evalID uuid.UUID, gID groupID) int {
m.Lock.Lock()
defer m.Lock.Unlock()
if _, ok := m.Result[evalID]; !ok {
return 0
}
return m.Result[evalID][gID]
return m.Result[matchKey{evalID: evalID, groupID: gID}]
}

// MatchingEngine represents an engine (such as a b-tree, radix trie, or
Expand All @@ -90,9 +88,9 @@ type MatchingEngine interface {

// Add adds a new expression part to the matching engine for future matches.
Add(ctx context.Context, p ExpressionPart) error
// Remove removes an expression part from the matching engine, ensuring that the
// ExpressionPart will not be matched in the future.
Remove(ctx context.Context, p ExpressionPart) error
// Remove removes multiple expression parts in a single batch operation.
// Returns the number of parts successfully processed before any timeout/cancellation.
Remove(ctx context.Context, parts []ExpressionPart) (int, error)

// Search searches for a given variable<>value match, returning any expression
// parts that match.
Expand Down Expand Up @@ -145,16 +143,6 @@ func (p ExpressionPart) EqualsStored(n *StoredExpressionPart) bool {
return p.Hash() == n.PredicateID
}

func (p ExpressionPart) Equals(n ExpressionPart) bool {
if p.GroupID != n.GroupID {
return false
}
if p.Predicate.String() != n.Predicate.String() {
return false
}
return p.Parsed.EvaluableID == n.Parsed.EvaluableID
}

func (p ExpressionPart) ToStored() *StoredExpressionPart {
var id uuid.UUID
if p.Parsed != nil {
Expand Down
97 changes: 47 additions & 50 deletions engine_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"github.com/ohler55/ojg/jp"
)

func newNullMatcher(concurrency int64) MatchingEngine {
func newNullMatcher() MatchingEngine {
return &nullLookup{
lock: &sync.RWMutex{},
paths: map[string]struct{}{},
null: map[string][]*StoredExpressionPart{},
not: map[string][]*StoredExpressionPart{},
concurrency: concurrency,
lock: &sync.RWMutex{},
paths: map[string]struct{}{},
null: map[string][]*StoredExpressionPart{},
not: map[string][]*StoredExpressionPart{},
}
}

Expand All @@ -26,41 +25,32 @@ type nullLookup struct {

null map[string][]*StoredExpressionPart
not map[string][]*StoredExpressionPart

concurrency int64
}

func (n *nullLookup) Type() EngineType {
return EngineTypeNullMatch
}

func (n *nullLookup) Match(ctx context.Context, data map[string]any, result *MatchResult) (err error) {
pool := newErrPool(errPoolOpts{concurrency: n.concurrency})

for item := range n.paths {
path := item
pool.Go(func() error {
x, err := jp.ParseString(path)
if err != nil {
return err
}

res := x.Get(data)
if len(res) == 0 {
// This isn't present, which matches null in our overloads. Set the
// value to nil.
res = []any{nil}
}
for path := range n.paths {
x, err := jp.ParseString(path)
if err != nil {
return err
}

// XXX: This engine hasn't been updated with denied items for !=. It needs consideration
// in how to handle these cases appropriately.
n.Search(ctx, path, res[0], result)
res := x.Get(data)
if len(res) == 0 {
// This isn't present, which matches null in our overloads. Set the
// value to nil.
res = []any{nil}
}

return nil
})
// XXX: This engine hasn't been updated with denied items for !=. It needs consideration
// in how to handle these cases appropriately.
n.Search(ctx, path, res[0], result)
}

return pool.Wait()
return nil
}

func (n *nullLookup) Search(ctx context.Context, variable string, input any, result *MatchResult) {
Expand Down Expand Up @@ -103,33 +93,40 @@ func (n *nullLookup) Add(ctx context.Context, p ExpressionPart) error {
return nil
}

func (n *nullLookup) Remove(ctx context.Context, p ExpressionPart) error {
func (n *nullLookup) Remove(ctx context.Context, parts []ExpressionPart) (int, error) {
n.lock.Lock()
defer n.lock.Unlock()

coll, ok := n.not[p.Predicate.Ident]
if p.Predicate.Operator == operators.Equals {
coll, ok = n.null[p.Predicate.Ident]
}
processedCount := 0
for _, p := range parts {
// Check for context cancellation/timeout
if ctx.Err() != nil {
return processedCount, ctx.Err()
}

if !ok {
// This could not exist as there's nothing mapping this variable for
// the given event name.
return ErrExpressionPartNotFound
}
coll, ok := n.not[p.Predicate.Ident]
if p.Predicate.Operator == operators.Equals {
coll, ok = n.null[p.Predicate.Ident]
}

if !ok {
processedCount++
continue
}

// Remove the expression part from the leaf.
for i, eval := range coll {
if p.EqualsStored(eval) {
coll = append(coll[:i], coll[i+1:]...)
if p.Predicate.Operator == operators.Equals {
n.null[p.Predicate.Ident] = coll
} else {
n.not[p.Predicate.Ident] = coll
for i, eval := range coll {
if p.EqualsStored(eval) {
coll = append(coll[:i], coll[i+1:]...)
if p.Predicate.Operator == operators.Equals {
n.null[p.Predicate.Ident] = coll
} else {
n.not[p.Predicate.Ident] = coll
}
break
}
return nil
}
processedCount++
}

return ErrExpressionPartNotFound
return processedCount, nil
}
Loading