diff --git a/sqlitestore.go b/sqlitestore.go index 3669a45..ca631b7 100644 --- a/sqlitestore.go +++ b/sqlitestore.go @@ -29,8 +29,11 @@ var ( metricOperationStarted = metrics.NewRegisteredCounter("arkiv_store/operations_started", nil) metricOperationSuccessful = metrics.NewRegisteredCounter("arkiv_store/operations_successful", nil) metricCreates = metrics.NewRegisteredCounter("arkiv_store/creates", nil) + metricCreatesBytes = metrics.NewRegisteredCounter("arkiv_store/creates_bytes", nil) metricUpdates = metrics.NewRegisteredCounter("arkiv_store/updates", nil) + metricUpdatesBytes = metrics.NewRegisteredCounter("arkiv_store/updates_bytes", nil) metricDeletes = metrics.NewRegisteredCounter("arkiv_store/deletes", nil) + metricDeletesBytes = metrics.NewRegisteredCounter("arkiv_store/deletes_bytes", nil) metricExtends = metrics.NewRegisteredCounter("arkiv_store/extends", nil) metricOwnerChanges = metrics.NewRegisteredCounter("arkiv_store/owner_changes", nil) // Tracks operation duration (ms) using an exponential decay sample so the histogram @@ -116,11 +119,14 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) { } type blockStats struct { - creates int - updates int - deletes int - extends int - ownerChanges int + creates int64 + createsBytes int64 + updates int64 + updatesBytes int64 + deletes int64 + deletesBytes int64 + extends int64 + ownerChanges int64 } func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error { @@ -193,6 +199,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat case operation.Create != nil: // expiresAtBlock := blockNumber + operation.Create.BTL blockStat.creates++ + blockStat.createsBytes += int64(len(operation.Create.Content)) key := operation.Create.Key stringAttributes := maps.Clone(operation.Create.StringAttributes) @@ -255,6 +262,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat continue operationLoop } blockStat.updates++ + blockStat.updatesBytes += int64(len(operation.Update.Content)) key := operation.Update.Key.Bytes() @@ -353,6 +361,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat if err != nil { return fmt.Errorf("failed to get latest payload: %w", err) } + blockStat.deletesBytes += int64(len(latestPayload.Payload)) oldStringAttributes := latestPayload.StringAttributes @@ -492,37 +501,52 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat // Calculate batch totals for logging and update metrics PER BLOCK var ( - totalCreates int - totalUpdates int - totalDeletes int - totalExtends int - totalOwnerChanges int + totalCreates int64 + totalCreatesBytes int64 + totalUpdates int64 + totalUpdatesBytes int64 + totalDeletes int64 + totalDeletesBytes int64 + totalExtends int64 + totalOwnerChanges int64 ) // Iterate blocks again to preserve order and update metrics per block for _, block := range batch.Batch.Blocks { if stat, ok := stats[block.Number]; ok { totalCreates += stat.creates + totalCreatesBytes += stat.createsBytes totalUpdates += stat.updates + totalUpdatesBytes += stat.updatesBytes totalDeletes += stat.deletes + totalDeletesBytes += stat.deletesBytes totalExtends += stat.extends totalOwnerChanges += stat.ownerChanges // Update metrics specifically per block if stat.creates > 0 { - metricCreates.Inc(int64(stat.creates)) + metricCreates.Inc(stat.creates) + } + if stat.createsBytes > 0 { + metricCreatesBytes.Inc(stat.createsBytes) } if stat.updates > 0 { - metricUpdates.Inc(int64(stat.updates)) + metricUpdates.Inc(stat.updates) + } + if stat.updatesBytes > 0 { + metricUpdatesBytes.Inc(stat.updatesBytes) } if stat.deletes > 0 { - metricDeletes.Inc(int64(stat.deletes)) + metricDeletes.Inc(stat.deletes) + } + if stat.deletesBytes > 0 { + metricDeletesBytes.Inc(stat.deletesBytes) } if stat.extends > 0 { - metricExtends.Inc(int64(stat.extends)) + metricExtends.Inc(stat.extends) } if stat.ownerChanges > 0 { - metricOwnerChanges.Inc(int64(stat.ownerChanges)) + metricOwnerChanges.Inc(stat.ownerChanges) } } } @@ -535,8 +559,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat "lastBlock", lastBlock, "processingTime", time.Since(startTime).Milliseconds(), "creates", totalCreates, + "createsBytes", totalCreatesBytes, "updates", totalUpdates, + "updatesBytes", totalUpdatesBytes, "deletes", totalDeletes, + "deletesBytes", totalDeletesBytes, "extends", totalExtends, "ownerChanges", totalOwnerChanges)