diff --git a/CHANGELOG.md b/CHANGELOG.md index 3aa7d66..172165d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `BlockTimestampGate`: new gate that lets blocks through once a block's timestamp meets or exceeds a given `time.Time`, supporting both inclusive and exclusive gate types. +- `hub.WithLogger`: new `ForkableHub` option to set the logger used by the hub (and, by default, propagated to its inner forkable). Previously the hub always logged under the package-level `bstream` logger, making lines such as `processing block` indistinguishable across components (relayer, firehose, tier1, ...). Callers should pass their component logger. ## 2026-01-02 diff --git a/hub/hub.go b/hub/hub.go index a2f54ac..7b223ca 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -39,6 +39,8 @@ import ( type ForkableHub struct { *shutter.Shutter + logger *zap.Logger + forkable *forkable.Forkable keepFinalBlocks int @@ -74,13 +76,10 @@ func newForkableHub(liveSourceFactory bstream.SourceFactory, keepFinalBlocks int } sourceChanSize = newSize } - zlog.Info("New forkable hub initialized", - zap.Int("source_chan_size", sourceChanSize), - zap.Int("keep_final_blocks", keepFinalBlocks), - ) hub := &ForkableHub{ Shutter: shutter.New(), + logger: zlog, liveSourceFactory: liveSourceFactory, keepFinalBlocks: keepFinalBlocks, sourceChannelSize: sourceChanSize, // number of blocks that can add up before the subscriber processes them @@ -88,19 +87,25 @@ func newForkableHub(liveSourceFactory bstream.SourceFactory, keepFinalBlocks int Ready: make(chan struct{}), } - hub.forkable = forkable.New(bstream.HandlerFunc(hub.broadcastBlock), + // Apply hub-level options first so that a customized logger (via WithLogger) + // is propagated to the inner forkable created below. + for _, opt := range hubOptions { + opt(hub) + } + + forkableOptions := append([]forkable.Option{ + forkable.WithLogger(hub.logger), forkable.HoldBlocksUntilLIB(), forkable.WithKeptFinalBlocks(keepFinalBlocks), forkable.WithFilters(bstream.StepsAllWithPartial), - ) + }, extraForkableOptions...) - for _, opt := range extraForkableOptions { - opt(hub.forkable) - } + hub.forkable = forkable.New(bstream.HandlerFunc(hub.broadcastBlock), forkableOptions...) - for _, opt := range hubOptions { - opt(hub) - } + hub.logger.Info("New forkable hub initialized", + zap.Int("source_chan_size", sourceChanSize), + zap.Int("keep_final_blocks", keepFinalBlocks), + ) hub.OnTerminating(func(err error) { for _, sub := range hub.subscribers { @@ -125,6 +130,17 @@ func WithMaxConsecutiveUnlinkableBlocks(count int) Option { } } +// WithLogger sets the logger used by the hub and, by default, propagated to its +// inner forkable. When unset, the package-level "bstream" logger is used, which +// makes every hub instance log under the same identifier. Callers should pass +// their component logger (e.g. relayer, firehose, tier1) so log lines such as +// "processing block" are attributed to the right component. +func WithLogger(logger *zap.Logger) Option { + return func(h *ForkableHub) { + h.logger = logger + } +} + func (h *ForkableHub) LowestBlockNum() uint64 { if h != nil && h.IsReady() { return h.forkable.LowestBlockNum() @@ -151,7 +167,7 @@ func (h *ForkableHub) GetBlockByHash(id string) (out *pbbstream.Block) { func (h *ForkableHub) HeadInfo() (headNum uint64, headID string, headTime time.Time, libNum uint64, err error) { if h != nil && h.IsReady() { headNum, headID, headTime, libNum, err = h.forkable.HeadInfo() - zlog.Debug("forkable hub head info", zap.Uint64("head_num", headNum), zap.String("head_id", headID), zap.Time("head_time", headTime), zap.Uint64("lib_num", libNum)) + h.logger.Debug("forkable hub head info", zap.Uint64("head_num", headNum), zap.String("head_id", headID), zap.Time("head_time", headTime), zap.Uint64("lib_num", libNum)) return } zlog.Debug("forkable hub not ready") @@ -215,7 +231,7 @@ func (h *ForkableHub) SourceFromBlockNum(num uint64, handler bstream.Handler) (o out = h.subscribe(handler, blocks, true) }, false) if err != nil { - zlog.Debug("error getting source_from_block_num", zap.Error(err)) + h.logger.Debug("error getting source_from_block_num", zap.Error(err)) return nil } return @@ -230,7 +246,7 @@ func (h *ForkableHub) SourceFromBlockNumWithForks(num uint64, handler bstream.Ha out = h.subscribe(handler, blocks, withPartials) }, true) if err != nil { - zlog.Debug("error getting source_from_block_num", zap.Error(err)) + h.logger.Debug("error getting source_from_block_num", zap.Error(err)) return nil } return @@ -245,7 +261,7 @@ func (h *ForkableHub) SourceFromCursor(cursor *bstream.Cursor, handler bstream.H out = h.subscribe(handler, blocks, true) }) if err != nil { - zlog.Debug("error getting source_from_cursor", zap.Error(err)) + h.logger.Debug("error getting source_from_cursor", zap.Error(err)) return nil } return @@ -265,7 +281,7 @@ func (h *ForkableHub) SourceThroughCursor(startBlock uint64, cursor *bstream.Cur out = h.subscribe(handler, blocks, true) }) if err != nil { - zlog.Debug("error getting source_from_cursor", zap.Error(err)) + h.logger.Debug("error getting source_from_cursor", zap.Error(err)) return nil } return @@ -339,9 +355,9 @@ func (h *ForkableHub) Run() { err := h.bootstrap() if err != nil { - zlog.Warn("bootstrapping from one-block-files incomplete. Will bootstrap from incoming live blocks", zap.Error(err)) + h.logger.Warn("bootstrapping from one-block-files incomplete. Will bootstrap from incoming live blocks", zap.Error(err)) } else { - zlog.Info("Hub is ready") + h.logger.Info("Hub is ready") close(h.Ready) } @@ -353,11 +369,11 @@ func (h *ForkableHub) ProcessBlock(blk *pbbstream.Block, obj any) error { return nil // we don't get ready with partial blocks... } - zlog.Info("processing block", zap.Uint64("block_number", blk.Number), zap.String("block_Id", blk.Id), zap.Uint64("block_lib", blk.LibNum), zap.Duration("age", time.Since(blk.Time()))) + h.logger.Info("processing block", zap.Uint64("block_number", blk.Number), zap.String("block_Id", blk.Id), zap.Uint64("block_lib", blk.LibNum), zap.Duration("age", time.Since(blk.Time()))) ctx := context.Background() - zlog.Debug("forkable state", zap.Uint64("forkable_LibNum", h.forkable.LowestBlockNum()), zap.Uint64("forkable_headNum", h.forkable.HeadNum())) + h.logger.Debug("forkable state", zap.Uint64("forkable_LibNum", h.forkable.LowestBlockNum()), zap.Uint64("forkable_headNum", h.forkable.HeadNum())) if h.forkable.ForkDBHasLib() && blk.Number < h.forkable.LowestBlockNum() { // Block is older than the current LIBNum; nothing useful to do. @@ -374,7 +390,7 @@ func (h *ForkableHub) ProcessBlock(blk *pbbstream.Block, obj any) error { if !h.forkable.Linkable(blk) { if h.maxConsecutiveUnlinkableBlocks != 0 { h.consecutiveUnlinkableBlocks++ - zlog.Warn("block not linkable after one-block lookup", + h.logger.Warn("block not linkable after one-block lookup", zap.Uint64("block_num", blk.Number), zap.Int("consecutive_unlinkable", h.consecutiveUnlinkableBlocks), zap.Int("max_consecutive_unlinkable", h.maxConsecutiveUnlinkableBlocks), @@ -386,7 +402,7 @@ func (h *ForkableHub) ProcessBlock(blk *pbbstream.Block, obj any) error { } else { // linkable h.consecutiveUnlinkableBlocks = 0 if !h.IsReady() { - zlog.Info("Hub is ready") + h.logger.Info("Hub is ready") close(h.Ready) } } @@ -405,7 +421,7 @@ func (h *ForkableHub) linkLiveUsingOneBlocks(ctx context.Context, blk *pbbstream lastKnownLib = blk.LibNum } - zlog.Debug("linking live block using one blocks", zap.Uint64("processed_block", blk.Number), zap.Uint64("last_know_lib", lastKnownLib)) + h.logger.Debug("linking live block using one blocks", zap.Uint64("processed_block", blk.Number), zap.Uint64("last_know_lib", lastKnownLib)) sortedOneBlocksFiles, err := h.WalkOneBlocksStoreFrom(ctx, lastKnownLib) if err != nil { @@ -413,7 +429,7 @@ func (h *ForkableHub) linkLiveUsingOneBlocks(ctx context.Context, blk *pbbstream } if len(sortedOneBlocksFiles) == 0 { - zlog.Warn("no one blocks found while trying to link live block", zap.Uint64("processed_block", blk.Number)) + h.logger.Warn("no one blocks found while trying to link live block", zap.Uint64("processed_block", blk.Number)) return nil } @@ -493,7 +509,7 @@ func decodeOneBlockFromFilename(ctx context.Context, filename string, store dsto // Notes: that function is called by the forkable when a block is processed func (h *ForkableHub) broadcastBlock(blk *pbbstream.Block, obj any) error { - zlog.Debug("process_block", zap.Stringer("blk", blk.AsRef()), zap.Any("obj", obj.(*forkable.ForkableObject).Step())) + h.logger.Debug("process_block", zap.Stringer("blk", blk.AsRef()), zap.Any("obj", obj.(*forkable.ForkableObject).Step())) // broadcastBlock is called on LIVE blocks only if liveable, ok := obj.(bstream.Liveable); ok { @@ -521,7 +537,7 @@ func (h *ForkableHub) reconnect(err error) { return } - zlog.Info("reconnecting hub after disconnection. expecting to reconnect", + h.logger.Info("reconnecting hub after disconnection. expecting to reconnect", zap.Error(err)) liveSource := h.liveSourceFactory(h) diff --git a/hub/hub_test.go b/hub/hub_test.go index 82d5307..b826fe6 100644 --- a/hub/hub_test.go +++ b/hub/hub_test.go @@ -628,6 +628,7 @@ func TestForkableHub_SourceFromCursor(t *testing.T) { t.Run(test.name, func(t *testing.T) { fh := &ForkableHub{ Shutter: shutter.New(), + logger: zlog, } fh.forkable = forkable.New(bstream.HandlerFunc(fh.broadcastBlock), forkable.HoldBlocksUntilLIB(), @@ -938,6 +939,7 @@ func TestForkableHub_SourceThroughCursor(t *testing.T) { t.Run(test.name, func(t *testing.T) { fh := &ForkableHub{ Shutter: shutter.New(), + logger: zlog, } fh.forkable = forkable.New(bstream.HandlerFunc(fh.broadcastBlock), forkable.HoldBlocksUntilLIB(),