Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 85 additions & 28 deletions chainnotifier_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ import (
"google.golang.org/grpc"
)

// notifierOptions is a set of functional options that allow callers to further
// NotifierOptions is a set of functional options that allow callers to further
// modify the type of chain even notifications they receive.
type notifierOptions struct {
// includeBlock if true, then the dispatched confirmation notification
type NotifierOptions struct {
// IncludeBlock if true, then the dispatched confirmation notification
// will include the block that mined the transaction.
includeBlock bool
IncludeBlock bool

// reOrgChan if set, will be sent on if the transaction is re-organized
// ReOrgChan if set, will be sent on if the transaction is re-organized
// out of the chain. This channel being set will also imply that we
// don't cancel the notification listener after having received one
// confirmation event. That means the caller manually needs to cancel
// the passed in context to cancel being notified once the required
// number of confirmations have been reached.
reOrgChan chan struct{}
ReOrgChan chan struct{}
}

// defaultNotifierOptions returns the set of default options for the notifier.
func defaultNotifierOptions() *notifierOptions {
return &notifierOptions{}
func DefaultNotifierOptions() *NotifierOptions {
return &NotifierOptions{}
}

// NotifierOption is a functional option that allows a caller to modify the
// events received from the notifier.
type NotifierOption func(*notifierOptions)
type NotifierOption func(*NotifierOptions)

// WithIncludeBlock is an optional argument that allows the caller to specify
// that the block that mined a transaction should be included in the response.
func WithIncludeBlock() NotifierOption {
return func(o *notifierOptions) {
o.includeBlock = true
return func(o *NotifierOptions) {
o.IncludeBlock = true
}
}

Expand All @@ -53,8 +53,8 @@ func WithIncludeBlock() NotifierOption {
// to cancel being notified once the required number of confirmations have been
// reached.
func WithReOrgChan(reOrgChan chan struct{}) NotifierOption {
return func(o *notifierOptions) {
o.reOrgChan = reOrgChan
return func(o *NotifierOptions) {
o.ReOrgChan = reOrgChan
}
}

Expand All @@ -71,8 +71,9 @@ type ChainNotifierClient interface {
chan error, error)

RegisterSpendNtfn(ctx context.Context,
outpoint *wire.OutPoint, pkScript []byte, heightHint int32) (
chan *chainntnfs.SpendDetail, chan error, error)
outpoint *wire.OutPoint, pkScript []byte, heightHint int32,
optFuncs ...NotifierOption) (chan *chainntnfs.SpendDetail,
chan error, error)
}

type chainNotifierClient struct {
Expand Down Expand Up @@ -111,8 +112,18 @@ func (s *chainNotifierClient) RawClientWithMacAuth(
}

func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
outpoint *wire.OutPoint, pkScript []byte, heightHint int32) (
chan *chainntnfs.SpendDetail, chan error, error) {
outpoint *wire.OutPoint, pkScript []byte, heightHint int32,
optFuncs ...NotifierOption) (chan *chainntnfs.SpendDetail, chan error,
error) {

opts := DefaultNotifierOptions()
for _, optFunc := range optFuncs {
optFunc(opts)
}
if opts.IncludeBlock {
return nil, nil, fmt.Errorf("option IncludeBlock is not " +
"supported by RegisterSpendNtfn")
}

var rpcOutpoint *chainrpc.Outpoint
if outpoint != nil {
Expand Down Expand Up @@ -148,7 +159,7 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
if err != nil {
return err
}
spendChan <- &chainntnfs.SpendDetail{
spend := &chainntnfs.SpendDetail{
SpentOutPoint: &wire.OutPoint{
Hash: *outpointHash,
Index: d.SpendingOutpoint.Index,
Expand All @@ -159,7 +170,24 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
SpendingHeight: int32(d.SpendingHeight),
}

return nil
select {
case spendChan <- spend:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

processReorg := func() {
if opts.ReOrgChan == nil {
return
}

select {
case opts.ReOrgChan <- struct{}{}:
case <-ctx.Done():
return
}
}

s.wg.Add(1)
Expand All @@ -172,12 +200,35 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
return
}

c, ok := spendEvent.Event.(*chainrpc.SpendEvent_Spend)
if ok {
switch c := spendEvent.Event.(type) {
case *chainrpc.SpendEvent_Spend:
err := processSpendDetail(c.Spend)
if err != nil {
errChan <- err

return
}

// If we're running in re-org aware mode, then
// we don't return here, since we might want to
// be informed about the new block we got
// confirmed in after a re-org.
if opts.ReOrgChan == nil {
return
}

case *chainrpc.SpendEvent_Reorg:
processReorg()

// Nil event, should never happen.
case nil:
errChan <- fmt.Errorf("spend event empty")
return

// Unexpected type.
default:
errChan <- fmt.Errorf("spend event has " +
"unexpected type")
return
}
}
Expand All @@ -191,7 +242,7 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context,
optFuncs ...NotifierOption) (chan *chainntnfs.TxConfirmation,
chan error, error) {

opts := defaultNotifierOptions()
opts := DefaultNotifierOptions()
for _, optFunc := range optFuncs {
optFunc(opts)
}
Expand All @@ -206,7 +257,7 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context,
NumConfs: uint32(numConfs),
HeightHint: uint32(heightHint),
Txid: txidSlice,
IncludeBlock: opts.includeBlock,
IncludeBlock: opts.IncludeBlock,
},
)
if err != nil {
Expand Down Expand Up @@ -238,7 +289,7 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context,
}

var block *wire.MsgBlock
if opts.includeBlock {
if opts.IncludeBlock {
block, err = decodeBlock(
c.Conf.RawBlock,
)
Expand All @@ -256,29 +307,35 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context,
return
}

confChan <- &chainntnfs.TxConfirmation{
conf := &chainntnfs.TxConfirmation{
BlockHeight: c.Conf.BlockHeight,
BlockHash: blockHash,
Tx: tx,
TxIndex: c.Conf.TxIndex,
Block: block,
}

select {
case confChan <- conf:
case <-ctx.Done():
return
}

// If we're running in re-org aware mode, then
// we don't return here, since we might want to
// be informed about the new block we got
// confirmed in after a re-org.
if opts.reOrgChan == nil {
if opts.ReOrgChan == nil {
return
}

// On a re-org, we just need to signal, we don't have
// any additional information. But we only signal if the
// caller requested to be notified about re-orgs.
case *chainrpc.ConfEvent_Reorg:
if opts.reOrgChan != nil {
if opts.ReOrgChan != nil {
select {
case opts.reOrgChan <- struct{}{}:
case opts.ReOrgChan <- struct{}{}:
case <-ctx.Done():
return
}
Expand Down