Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions sqlitestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ var (
metricOperationStarted = metrics.NewRegisteredCounter("arkiv_store/operations_started", nil)
metricOperationSuccessful = metrics.NewRegisteredCounter("arkiv_store/operations_successful", nil)
metricCreates = metrics.NewRegisteredCounter("arkiv_store/creates", nil)
metricCreatesBytes = metrics.NewRegisteredCounter("arkiv_store/creates_bytes", nil)
metricUpdates = metrics.NewRegisteredCounter("arkiv_store/updates", nil)
metricUpdatesBytes = metrics.NewRegisteredCounter("arkiv_store/updates_bytes", nil)
metricDeletes = metrics.NewRegisteredCounter("arkiv_store/deletes", nil)
metricDeletesBytes = metrics.NewRegisteredCounter("arkiv_store/deletes_bytes", nil)
metricExtends = metrics.NewRegisteredCounter("arkiv_store/extends", nil)
metricOwnerChanges = metrics.NewRegisteredCounter("arkiv_store/owner_changes", nil)
// Tracks operation duration (ms) using an exponential decay sample so the histogram
Expand Down Expand Up @@ -117,8 +120,11 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) {

// blockStats accumulates per-block operation counts and payload byte totals
// while FollowEvents processes a batch. One blockStats is kept per block
// number; the accumulated values are added to the registered metrics
// counters (metricCreates, metricCreatesBytes, …) once per block and summed
// into batch totals for logging.
type blockStats struct {
	creates      int // number of Create operations in the block
	createsBytes int // total len(Create.Content) across the block's creates
	updates      int // number of Update operations in the block
	updatesBytes int // total len(Update.Content) across the block's updates
	deletes      int // number of Delete operations in the block
	deletesBytes int // total size of the latest stored payload for each deleted key
	extends      int // number of Extend operations in the block
	ownerChanges int // number of owner-change operations in the block
}
Expand Down Expand Up @@ -193,6 +199,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
case operation.Create != nil:
// expiresAtBlock := blockNumber + operation.Create.BTL
blockStat.creates++
blockStat.createsBytes += len(operation.Create.Content)
key := operation.Create.Key

stringAttributes := maps.Clone(operation.Create.StringAttributes)
Expand Down Expand Up @@ -255,6 +262,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
continue operationLoop
}
blockStat.updates++
blockStat.updatesBytes += len(operation.Update.Content)

key := operation.Update.Key.Bytes()

Expand Down Expand Up @@ -353,6 +361,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
if err != nil {
return fmt.Errorf("failed to get latest payload: %w", err)
}
blockStat.deletesBytes += len(latestPayload.Payload)

oldStringAttributes := latestPayload.StringAttributes

Expand Down Expand Up @@ -493,8 +502,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
// Calculate batch totals for logging and update metrics PER BLOCK
var (
totalCreates int
totalCreatesBytes int
totalUpdates int
totalUpdatesBytes int
totalDeletes int
totalDeletesBytes int
totalExtends int
totalOwnerChanges int
)
Expand All @@ -503,21 +515,33 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
for _, block := range batch.Batch.Blocks {
if stat, ok := stats[block.Number]; ok {
totalCreates += stat.creates
totalCreatesBytes += stat.createsBytes
totalUpdates += stat.updates
totalUpdatesBytes += stat.updatesBytes
totalDeletes += stat.deletes
totalDeletesBytes += stat.deletesBytes
totalExtends += stat.extends
totalOwnerChanges += stat.ownerChanges

// Update metrics specifically per block
if stat.creates > 0 {
metricCreates.Inc(int64(stat.creates))
}
if stat.createsBytes > 0 {
metricCreatesBytes.Inc(int64(stat.createsBytes))
}
if stat.updates > 0 {
metricUpdates.Inc(int64(stat.updates))
}
if stat.updatesBytes > 0 {
metricUpdatesBytes.Inc(int64(stat.updatesBytes))
}
if stat.deletes > 0 {
metricDeletes.Inc(int64(stat.deletes))
}
if stat.deletesBytes > 0 {
metricDeletesBytes.Inc(int64(stat.deletesBytes))
}
if stat.extends > 0 {
metricExtends.Inc(int64(stat.extends))
}
Expand All @@ -535,8 +559,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
"lastBlock", lastBlock,
"processingTime", time.Since(startTime).Milliseconds(),
"creates", totalCreates,
"createsBytes", totalCreatesBytes,
"updates", totalUpdates,
"updatesBytes", totalUpdatesBytes,
"deletes", totalDeletes,
"deletesBytes", totalDeletesBytes,
"extends", totalExtends,
"ownerChanges", totalOwnerChanges)

Expand Down
108 changes: 108 additions & 0 deletions sqlitestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand All @@ -19,6 +20,11 @@ import (
"github.com/Arkiv-Network/sqlite-bitmap-store/store"
)

// counterValue returns the current count of the named metrics counter,
// registering a fresh (zero-valued) counter first if none exists yet.
// Used by specs to capture a baseline before exercising the store, so
// assertions can be made on deltas of the process-global counters.
func counterValue(name string) int64 {
	return metrics.GetOrRegisterCounter(name, nil).Snapshot().Count()
}

var _ = Describe("SQLiteStore", func() {
var (
sqlStore *sqlitebitmapstore.SQLiteStore
Expand Down Expand Up @@ -436,6 +442,108 @@ var _ = Describe("SQLiteStore", func() {
})
})

// Verifies that FollowEvents feeds payload sizes into the byte counters
// (arkiv_store/creates_bytes, updates_bytes, deletes_bytes) registered in
// sqlitestore.go. Drives three single-operation blocks (create, update,
// delete) through the store and asserts on counter deltas.
Describe("FollowEvents byte metrics", func() {
	It("should track create, update, and delete payload sizes in bytes", func() {
		key := common.HexToHash("0xabababababababababababababababababababababababababababababababab")
		owner := common.HexToAddress("0x1234567890123456789012345678901234567890")

		createContent := []byte("abc")
		updateContent := []byte("abcdef")
		// A delete records the size of the payload it removes, i.e. the
		// most recent (updated) content — not the original create content.
		deletedContentSize := int64(len(updateContent))

		// The metrics counters are process-global, so capture baselines
		// and assert on deltas to stay independent of other specs.
		createsBefore := counterValue("arkiv_store/creates_bytes")
		updatesBefore := counterValue("arkiv_store/updates_bytes")
		deletesBefore := counterValue("arkiv_store/deletes_bytes")

		// Block 100: a single Create carrying createContent (3 bytes).
		// Push runs in a goroutine because the push iterator blocks until
		// FollowEvents consumes the batch; Close ends the iteration.
		createIterator := pusher.NewPushIterator()
		go func() {
			defer GinkgoRecover()
			createIterator.Push(ctx, events.BlockBatch{
				Blocks: []events.Block{
					{
						Number: 100,
						Operations: []events.Operation{
							{
								TxIndex: 0,
								OpIndex: 0,
								Create: &events.OPCreate{
									Key:               key,
									ContentType:       "text/plain",
									BTL:               500,
									Owner:             owner,
									Content:           createContent,
									StringAttributes:  map[string]string{"status": "created"},
									NumericAttributes: map[string]uint64{},
								},
							},
						},
					},
				},
			})
			createIterator.Close()
		}()
		err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator()))
		Expect(err).NotTo(HaveOccurred())

		// Block 101: an Update of the same key with updateContent (6 bytes).
		updateIterator := pusher.NewPushIterator()
		go func() {
			defer GinkgoRecover()
			updateIterator.Push(ctx, events.BlockBatch{
				Blocks: []events.Block{
					{
						Number: 101,
						Operations: []events.Operation{
							{
								TxIndex: 0,
								OpIndex: 0,
								Update: &events.OPUpdate{
									Key:               key,
									ContentType:       "text/plain",
									BTL:               500,
									Owner:             owner,
									Content:           updateContent,
									StringAttributes:  map[string]string{"status": "updated"},
									NumericAttributes: map[string]uint64{},
								},
							},
						},
					},
				},
			})
			updateIterator.Close()
		}()
		err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator()))
		Expect(err).NotTo(HaveOccurred())

		// Block 102: a Delete of the key. The delete carries no payload of
		// its own; the store looks up the latest stored payload to size it.
		deleteIterator := pusher.NewPushIterator()
		deleteKey := events.OPDelete(key)
		go func() {
			defer GinkgoRecover()
			deleteIterator.Push(ctx, events.BlockBatch{
				Blocks: []events.Block{
					{
						Number: 102,
						Operations: []events.Operation{
							{
								TxIndex: 0,
								OpIndex: 0,
								Delete:  &deleteKey,
							},
						},
					},
				},
			})
			deleteIterator.Close()
		}()
		err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(deleteIterator.Iterator()))
		Expect(err).NotTo(HaveOccurred())

		// Each counter must have advanced by exactly the byte size of the
		// corresponding operation's payload.
		Expect(counterValue("arkiv_store/creates_bytes") - createsBefore).To(Equal(int64(len(createContent))))
		Expect(counterValue("arkiv_store/updates_bytes") - updatesBefore).To(Equal(int64(len(updateContent))))
		Expect(counterValue("arkiv_store/deletes_bytes") - deletesBefore).To(Equal(deletedContentSize))
	})
})

Describe("FollowEvents Expire operation", func() {
It("should expire payload and remove bitmap indexes", func() {
key := common.HexToHash("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
Expand Down