diff --git a/sqlitestore.go b/sqlitestore.go index 3669a45..5589bc4 100644 --- a/sqlitestore.go +++ b/sqlitestore.go @@ -29,8 +29,11 @@ var ( metricOperationStarted = metrics.NewRegisteredCounter("arkiv_store/operations_started", nil) metricOperationSuccessful = metrics.NewRegisteredCounter("arkiv_store/operations_successful", nil) metricCreates = metrics.NewRegisteredCounter("arkiv_store/creates", nil) + metricCreatesBytes = metrics.NewRegisteredCounter("arkiv_store/creates_bytes", nil) metricUpdates = metrics.NewRegisteredCounter("arkiv_store/updates", nil) + metricUpdatesBytes = metrics.NewRegisteredCounter("arkiv_store/updates_bytes", nil) metricDeletes = metrics.NewRegisteredCounter("arkiv_store/deletes", nil) + metricDeletesBytes = metrics.NewRegisteredCounter("arkiv_store/deletes_bytes", nil) metricExtends = metrics.NewRegisteredCounter("arkiv_store/extends", nil) metricOwnerChanges = metrics.NewRegisteredCounter("arkiv_store/owner_changes", nil) // Tracks operation duration (ms) using an exponential decay sample so the histogram @@ -117,8 +120,11 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) { type blockStats struct { creates int + createsBytes int updates int + updatesBytes int deletes int + deletesBytes int extends int ownerChanges int } @@ -193,6 +199,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat case operation.Create != nil: // expiresAtBlock := blockNumber + operation.Create.BTL blockStat.creates++ + blockStat.createsBytes += len(operation.Create.Content) key := operation.Create.Key stringAttributes := maps.Clone(operation.Create.StringAttributes) @@ -255,6 +262,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat continue operationLoop } blockStat.updates++ + blockStat.updatesBytes += len(operation.Update.Content) key := operation.Update.Key.Bytes() @@ -353,6 +361,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat if err != nil { return fmt.Errorf("failed to get latest payload: %w", err) } + blockStat.deletesBytes += len(latestPayload.Payload) oldStringAttributes := latestPayload.StringAttributes @@ -493,8 +502,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat // Calculate batch totals for logging and update metrics PER BLOCK var ( totalCreates int + totalCreatesBytes int totalUpdates int + totalUpdatesBytes int totalDeletes int + totalDeletesBytes int totalExtends int totalOwnerChanges int ) @@ -503,8 +515,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat for _, block := range batch.Batch.Blocks { if stat, ok := stats[block.Number]; ok { totalCreates += stat.creates + totalCreatesBytes += stat.createsBytes totalUpdates += stat.updates + totalUpdatesBytes += stat.updatesBytes totalDeletes += stat.deletes + totalDeletesBytes += stat.deletesBytes totalExtends += stat.extends totalOwnerChanges += stat.ownerChanges @@ -512,12 +527,21 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat if stat.creates > 0 { metricCreates.Inc(int64(stat.creates)) } + if stat.createsBytes > 0 { + metricCreatesBytes.Inc(int64(stat.createsBytes)) + } if stat.updates > 0 { metricUpdates.Inc(int64(stat.updates)) } + if stat.updatesBytes > 0 { + metricUpdatesBytes.Inc(int64(stat.updatesBytes)) + } if stat.deletes > 0 { metricDeletes.Inc(int64(stat.deletes)) } + if stat.deletesBytes > 0 { + metricDeletesBytes.Inc(int64(stat.deletesBytes)) + } if stat.extends > 0 { metricExtends.Inc(int64(stat.extends)) } @@ -535,8 +559,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat "lastBlock", lastBlock, "processingTime", time.Since(startTime).Milliseconds(), "creates", totalCreates, + "createsBytes", totalCreatesBytes, "updates", totalUpdates, + "updatesBytes", totalUpdatesBytes, "deletes", totalDeletes, + "deletesBytes", totalDeletesBytes, "extends", totalExtends, "ownerChanges", totalOwnerChanges) diff --git a/sqlitestore_test.go b/sqlitestore_test.go index bf0c8f0..b48320a 100644 --- a/sqlitestore_test.go +++ b/sqlitestore_test.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -19,6 +20,11 @@ import ( "github.com/Arkiv-Network/sqlite-bitmap-store/store" ) +func counterValue(name string) int64 { + counter := metrics.GetOrRegisterCounter(name, nil) + return counter.Snapshot().Count() +} + var _ = Describe("SQLiteStore", func() { var ( sqlStore *sqlitebitmapstore.SQLiteStore @@ -436,6 +442,108 @@ var _ = Describe("SQLiteStore", func() { }) }) + Describe("FollowEvents byte metrics", func() { + It("should track create, update, and delete payload sizes in bytes", func() { + key := common.HexToHash("0xabababababababababababababababababababababababababababababababab") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + createContent := []byte("abc") + updateContent := []byte("abcdef") + deletedContentSize := int64(len(updateContent)) + + createsBefore := counterValue("arkiv_store/creates_bytes") + updatesBefore := counterValue("arkiv_store/updates_bytes") + deletesBefore := counterValue("arkiv_store/deletes_bytes") + + createIterator := pusher.NewPushIterator() + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: createContent, + StringAttributes: map[string]string{"status": "created"}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + }) + createIterator.Close() + }() + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + updateIterator := pusher.NewPushIterator() + go func() { + defer GinkgoRecover() + updateIterator.Push(ctx, events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Update: &events.OPUpdate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: updateContent, + StringAttributes: map[string]string{"status": "updated"}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + }) + updateIterator.Close() + }() + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + deleteIterator := pusher.NewPushIterator() + deleteKey := events.OPDelete(key) + go func() { + defer GinkgoRecover() + deleteIterator.Push(ctx, events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 102, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Delete: &deleteKey, + }, + }, + }, + }, + }) + deleteIterator.Close() + }() + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(deleteIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + Expect(counterValue("arkiv_store/creates_bytes") - createsBefore).To(Equal(int64(len(createContent)))) + Expect(counterValue("arkiv_store/updates_bytes") - updatesBefore).To(Equal(int64(len(updateContent)))) + Expect(counterValue("arkiv_store/deletes_bytes") - deletesBefore).To(Equal(deletedContentSize)) + }) + }) + Describe("FollowEvents Expire operation", func() { It("should expire payload and remove bitmap indexes", func() { key := common.HexToHash("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")