Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions sqlitestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ var (
metricOperationStarted = metrics.NewRegisteredCounter("arkiv_store/operations_started", nil)
metricOperationSuccessful = metrics.NewRegisteredCounter("arkiv_store/operations_successful", nil)
metricCreates = metrics.NewRegisteredCounter("arkiv_store/creates", nil)
metricCreatesBytes = metrics.NewRegisteredCounter("arkiv_store/creates_bytes", nil)
metricUpdates = metrics.NewRegisteredCounter("arkiv_store/updates", nil)
metricUpdatesBytes = metrics.NewRegisteredCounter("arkiv_store/updates_bytes", nil)
metricDeletes = metrics.NewRegisteredCounter("arkiv_store/deletes", nil)
metricDeletesBytes = metrics.NewRegisteredCounter("arkiv_store/deletes_bytes", nil)
metricExtends = metrics.NewRegisteredCounter("arkiv_store/extends", nil)
metricOwnerChanges = metrics.NewRegisteredCounter("arkiv_store/owner_changes", nil)
// Tracks operation duration (ms) using an exponential decay sample so the histogram
Expand Down Expand Up @@ -116,11 +119,14 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) {
}

type blockStats struct {
creates int
updates int
deletes int
extends int
ownerChanges int
creates int64
createsBytes int64
updates int64
updatesBytes int64
deletes int64
deletesBytes int64
extends int64
ownerChanges int64
}

func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.BatchIterator) error {
Expand Down Expand Up @@ -193,6 +199,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
case operation.Create != nil:
// expiresAtBlock := blockNumber + operation.Create.BTL
blockStat.creates++
blockStat.createsBytes += int64(len(operation.Create.Content))
key := operation.Create.Key

stringAttributes := maps.Clone(operation.Create.StringAttributes)
Expand Down Expand Up @@ -255,6 +262,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
continue operationLoop
}
blockStat.updates++
blockStat.updatesBytes += int64(len(operation.Update.Content))

key := operation.Update.Key.Bytes()

Expand Down Expand Up @@ -353,6 +361,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
if err != nil {
return fmt.Errorf("failed to get latest payload: %w", err)
}
blockStat.deletesBytes += int64(len(latestPayload.Payload))

oldStringAttributes := latestPayload.StringAttributes

Expand Down Expand Up @@ -492,37 +501,52 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat

// Calculate batch totals for logging and update metrics PER BLOCK
var (
totalCreates int
totalUpdates int
totalDeletes int
totalExtends int
totalOwnerChanges int
totalCreates int64
totalCreatesBytes int64
totalUpdates int64
totalUpdatesBytes int64
totalDeletes int64
totalDeletesBytes int64
totalExtends int64
totalOwnerChanges int64
)

// Iterate blocks again to preserve order and update metrics per block
for _, block := range batch.Batch.Blocks {
if stat, ok := stats[block.Number]; ok {
totalCreates += stat.creates
totalCreatesBytes += stat.createsBytes
totalUpdates += stat.updates
totalUpdatesBytes += stat.updatesBytes
totalDeletes += stat.deletes
totalDeletesBytes += stat.deletesBytes
totalExtends += stat.extends
totalOwnerChanges += stat.ownerChanges

// Update metrics specifically per block
if stat.creates > 0 {
metricCreates.Inc(int64(stat.creates))
metricCreates.Inc(stat.creates)
}
if stat.createsBytes > 0 {
metricCreatesBytes.Inc(stat.createsBytes)
}
if stat.updates > 0 {
metricUpdates.Inc(int64(stat.updates))
metricUpdates.Inc(stat.updates)
}
if stat.updatesBytes > 0 {
metricUpdatesBytes.Inc(stat.updatesBytes)
}
if stat.deletes > 0 {
metricDeletes.Inc(int64(stat.deletes))
metricDeletes.Inc(stat.deletes)
}
if stat.deletesBytes > 0 {
metricDeletesBytes.Inc(stat.deletesBytes)
}
if stat.extends > 0 {
metricExtends.Inc(int64(stat.extends))
metricExtends.Inc(stat.extends)
}
if stat.ownerChanges > 0 {
metricOwnerChanges.Inc(int64(stat.ownerChanges))
metricOwnerChanges.Inc(stat.ownerChanges)
}
}
}
Expand All @@ -535,8 +559,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
"lastBlock", lastBlock,
"processingTime", time.Since(startTime).Milliseconds(),
"creates", totalCreates,
"createsBytes", totalCreatesBytes,
"updates", totalUpdates,
"updatesBytes", totalUpdatesBytes,
"deletes", totalDeletes,
"deletesBytes", totalDeletesBytes,
"extends", totalExtends,
"ownerChanges", totalOwnerChanges)

Expand Down