Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `BlockTimestampGate`: new gate that lets blocks through once a block's timestamp meets or exceeds a given `time.Time`, supporting both inclusive and exclusive gate types.
- `hub.WithLogger`: new `ForkableHub` option to set the logger used by the hub (and, by default, propagated to its inner forkable). Previously the hub always logged under the package-level `bstream` logger, making lines such as `processing block` indistinguishable across components (relayer, firehose, tier1, ...). Callers should pass their component logger.

## 2026-01-02

Expand Down
70 changes: 43 additions & 27 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
type ForkableHub struct {
*shutter.Shutter

logger *zap.Logger

forkable *forkable.Forkable

keepFinalBlocks int
Expand Down Expand Up @@ -74,33 +76,36 @@ func newForkableHub(liveSourceFactory bstream.SourceFactory, keepFinalBlocks int
}
sourceChanSize = newSize
}
zlog.Info("New forkable hub initialized",
zap.Int("source_chan_size", sourceChanSize),
zap.Int("keep_final_blocks", keepFinalBlocks),
)

hub := &ForkableHub{
Shutter: shutter.New(),
logger: zlog,
liveSourceFactory: liveSourceFactory,
keepFinalBlocks: keepFinalBlocks,
sourceChannelSize: sourceChanSize, // number of blocks that can add up before the subscriber processes them
oneBlocksStore: oneBlocksStore,
Ready: make(chan struct{}),
}

hub.forkable = forkable.New(bstream.HandlerFunc(hub.broadcastBlock),
// Apply hub-level options first so that a customized logger (via WithLogger)
// is propagated to the inner forkable created below.
for _, opt := range hubOptions {
opt(hub)
}

forkableOptions := append([]forkable.Option{
forkable.WithLogger(hub.logger),
forkable.HoldBlocksUntilLIB(),
forkable.WithKeptFinalBlocks(keepFinalBlocks),
forkable.WithFilters(bstream.StepsAllWithPartial),
)
}, extraForkableOptions...)

for _, opt := range extraForkableOptions {
opt(hub.forkable)
}
hub.forkable = forkable.New(bstream.HandlerFunc(hub.broadcastBlock), forkableOptions...)

for _, opt := range hubOptions {
opt(hub)
}
hub.logger.Info("New forkable hub initialized",
zap.Int("source_chan_size", sourceChanSize),
zap.Int("keep_final_blocks", keepFinalBlocks),
)

hub.OnTerminating(func(err error) {
for _, sub := range hub.subscribers {
Expand All @@ -125,6 +130,17 @@ func WithMaxConsecutiveUnlinkableBlocks(count int) Option {
}
}

// WithLogger sets the logger used by the hub and, by default, propagated to its
// inner forkable. When unset, the package-level "bstream" logger is used, which
// makes every hub instance log under the same identifier. Callers should pass
// their component logger (e.g. relayer, firehose, tier1) so log lines such as
// "processing block" are attributed to the right component.
func WithLogger(logger *zap.Logger) Option {
return func(h *ForkableHub) {
h.logger = logger
}
}

func (h *ForkableHub) LowestBlockNum() uint64 {
if h != nil && h.IsReady() {
return h.forkable.LowestBlockNum()
Expand All @@ -151,7 +167,7 @@ func (h *ForkableHub) GetBlockByHash(id string) (out *pbbstream.Block) {
func (h *ForkableHub) HeadInfo() (headNum uint64, headID string, headTime time.Time, libNum uint64, err error) {
if h != nil && h.IsReady() {
headNum, headID, headTime, libNum, err = h.forkable.HeadInfo()
zlog.Debug("forkable hub head info", zap.Uint64("head_num", headNum), zap.String("head_id", headID), zap.Time("head_time", headTime), zap.Uint64("lib_num", libNum))
h.logger.Debug("forkable hub head info", zap.Uint64("head_num", headNum), zap.String("head_id", headID), zap.Time("head_time", headTime), zap.Uint64("lib_num", libNum))
return
}
zlog.Debug("forkable hub not ready")
Expand Down Expand Up @@ -215,7 +231,7 @@ func (h *ForkableHub) SourceFromBlockNum(num uint64, handler bstream.Handler) (o
out = h.subscribe(handler, blocks, true)
}, false)
if err != nil {
zlog.Debug("error getting source_from_block_num", zap.Error(err))
h.logger.Debug("error getting source_from_block_num", zap.Error(err))
return nil
}
return
Expand All @@ -230,7 +246,7 @@ func (h *ForkableHub) SourceFromBlockNumWithForks(num uint64, handler bstream.Ha
out = h.subscribe(handler, blocks, withPartials)
}, true)
if err != nil {
zlog.Debug("error getting source_from_block_num", zap.Error(err))
h.logger.Debug("error getting source_from_block_num", zap.Error(err))
return nil
}
return
Expand All @@ -245,7 +261,7 @@ func (h *ForkableHub) SourceFromCursor(cursor *bstream.Cursor, handler bstream.H
out = h.subscribe(handler, blocks, true)
})
if err != nil {
zlog.Debug("error getting source_from_cursor", zap.Error(err))
h.logger.Debug("error getting source_from_cursor", zap.Error(err))
return nil
}
return
Expand All @@ -265,7 +281,7 @@ func (h *ForkableHub) SourceThroughCursor(startBlock uint64, cursor *bstream.Cur
out = h.subscribe(handler, blocks, true)
})
if err != nil {
zlog.Debug("error getting source_from_cursor", zap.Error(err))
h.logger.Debug("error getting source_from_cursor", zap.Error(err))
return nil
}
return
Expand Down Expand Up @@ -339,9 +355,9 @@ func (h *ForkableHub) Run() {

err := h.bootstrap()
if err != nil {
zlog.Warn("bootstrapping from one-block-files incomplete. Will bootstrap from incoming live blocks", zap.Error(err))
h.logger.Warn("bootstrapping from one-block-files incomplete. Will bootstrap from incoming live blocks", zap.Error(err))
} else {
zlog.Info("Hub is ready")
h.logger.Info("Hub is ready")
close(h.Ready)
}

Expand All @@ -353,11 +369,11 @@ func (h *ForkableHub) ProcessBlock(blk *pbbstream.Block, obj any) error {
return nil // we don't get ready with partial blocks...
}

zlog.Info("processing block", zap.Uint64("block_number", blk.Number), zap.String("block_Id", blk.Id), zap.Uint64("block_lib", blk.LibNum), zap.Duration("age", time.Since(blk.Time())))
h.logger.Info("processing block", zap.Uint64("block_number", blk.Number), zap.String("block_Id", blk.Id), zap.Uint64("block_lib", blk.LibNum), zap.Duration("age", time.Since(blk.Time())))

ctx := context.Background()

zlog.Debug("forkable state", zap.Uint64("forkable_LibNum", h.forkable.LowestBlockNum()), zap.Uint64("forkable_headNum", h.forkable.HeadNum()))
h.logger.Debug("forkable state", zap.Uint64("forkable_LibNum", h.forkable.LowestBlockNum()), zap.Uint64("forkable_headNum", h.forkable.HeadNum()))

if h.forkable.ForkDBHasLib() && blk.Number < h.forkable.LowestBlockNum() {
// Block is older than the current LIBNum; nothing useful to do.
Expand All @@ -374,7 +390,7 @@ func (h *ForkableHub) ProcessBlock(blk *pbbstream.Block, obj any) error {
if !h.forkable.Linkable(blk) {
if h.maxConsecutiveUnlinkableBlocks != 0 {
h.consecutiveUnlinkableBlocks++
zlog.Warn("block not linkable after one-block lookup",
h.logger.Warn("block not linkable after one-block lookup",
zap.Uint64("block_num", blk.Number),
zap.Int("consecutive_unlinkable", h.consecutiveUnlinkableBlocks),
zap.Int("max_consecutive_unlinkable", h.maxConsecutiveUnlinkableBlocks),
Expand All @@ -386,7 +402,7 @@ func (h *ForkableHub) ProcessBlock(blk *pbbstream.Block, obj any) error {
} else { // linkable
h.consecutiveUnlinkableBlocks = 0
if !h.IsReady() {
zlog.Info("Hub is ready")
h.logger.Info("Hub is ready")
close(h.Ready)
}
}
Expand All @@ -405,15 +421,15 @@ func (h *ForkableHub) linkLiveUsingOneBlocks(ctx context.Context, blk *pbbstream
lastKnownLib = blk.LibNum
}

zlog.Debug("linking live block using one blocks", zap.Uint64("processed_block", blk.Number), zap.Uint64("last_know_lib", lastKnownLib))
h.logger.Debug("linking live block using one blocks", zap.Uint64("processed_block", blk.Number), zap.Uint64("last_know_lib", lastKnownLib))

sortedOneBlocksFiles, err := h.WalkOneBlocksStoreFrom(ctx, lastKnownLib)
if err != nil {
return fmt.Errorf("walking through one blocks files: %w", err)
}

if len(sortedOneBlocksFiles) == 0 {
zlog.Warn("no one blocks found while trying to link live block", zap.Uint64("processed_block", blk.Number))
h.logger.Warn("no one blocks found while trying to link live block", zap.Uint64("processed_block", blk.Number))
return nil
}

Expand Down Expand Up @@ -493,7 +509,7 @@ func decodeOneBlockFromFilename(ctx context.Context, filename string, store dsto

// Notes: that function is called by the forkable when a block is processed
func (h *ForkableHub) broadcastBlock(blk *pbbstream.Block, obj any) error {
zlog.Debug("process_block", zap.Stringer("blk", blk.AsRef()), zap.Any("obj", obj.(*forkable.ForkableObject).Step()))
h.logger.Debug("process_block", zap.Stringer("blk", blk.AsRef()), zap.Any("obj", obj.(*forkable.ForkableObject).Step()))

// broadcastBlock is called on LIVE blocks only
if liveable, ok := obj.(bstream.Liveable); ok {
Expand Down Expand Up @@ -521,7 +537,7 @@ func (h *ForkableHub) reconnect(err error) {
return
}

zlog.Info("reconnecting hub after disconnection. expecting to reconnect",
h.logger.Info("reconnecting hub after disconnection. expecting to reconnect",
zap.Error(err))

liveSource := h.liveSourceFactory(h)
Expand Down
2 changes: 2 additions & 0 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func TestForkableHub_SourceFromCursor(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fh := &ForkableHub{
Shutter: shutter.New(),
logger: zlog,
}
fh.forkable = forkable.New(bstream.HandlerFunc(fh.broadcastBlock),
forkable.HoldBlocksUntilLIB(),
Expand Down Expand Up @@ -938,6 +939,7 @@ func TestForkableHub_SourceThroughCursor(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fh := &ForkableHub{
Shutter: shutter.New(),
logger: zlog,
}
fh.forkable = forkable.New(bstream.HandlerFunc(fh.broadcastBlock),
forkable.HoldBlocksUntilLIB(),
Expand Down
Loading