diff --git a/apps/bitcoin/rpc.go b/apps/bitcoin/rpc.go index 05dd0712..c59c10c5 100644 --- a/apps/bitcoin/rpc.go +++ b/apps/bitcoin/rpc.go @@ -9,10 +9,10 @@ import ( "math/big" "net/http" "sort" - "strings" "time" "github.com/MixinNetwork/mixin/logger" + "github.com/MixinNetwork/safe/util" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/txscript" "github.com/shopspring/decimal" @@ -367,15 +367,11 @@ func callBitcoinRPCUntilSufficient(rpc, method string, params []any) ([]byte, er return res, nil } logger.Printf("callBitcoinRPC(%s, %s, %v) => %v", rpc, method, params, err) - reason := strings.ToLower(err.Error()) - switch { - case strings.Contains(reason, "timeout"): - case strings.Contains(reason, "eof"): - case strings.Contains(reason, "handshake"): - default: - return res, err + if util.CheckRetryableError(err) { + time.Sleep(7 * time.Second) + continue } - time.Sleep(7 * time.Second) + return res, err } } diff --git a/apps/ethereum/account.go b/apps/ethereum/account.go index 78fbe0f7..2087ded5 100644 --- a/apps/ethereum/account.go +++ b/apps/ethereum/account.go @@ -319,6 +319,7 @@ func FetchBalanceFromKey(ctx context.Context, rpc, key string) (*big.Int, error) if err != nil { return nil, err } + defer client.Close() balance, err := client.BalanceAt(ctx, *addr, nil) if err != nil { diff --git a/apps/ethereum/rpc.go b/apps/ethereum/rpc.go index b5ed3b54..2272f828 100644 --- a/apps/ethereum/rpc.go +++ b/apps/ethereum/rpc.go @@ -16,9 +16,11 @@ import ( "github.com/MixinNetwork/mixin/logger" "github.com/MixinNetwork/safe/apps/ethereum/abi" + "github.com/MixinNetwork/safe/util" "github.com/ethereum/go-ethereum" ga "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" ) @@ -141,10 +143,10 @@ func RPCGetBlockWithTransactions(rpc string, height int64) (*RPCBlockWithTransac if err != nil { return nil, err } - var b RPCBlockWithTransactions + var b *RPCBlockWithTransactions err = json.Unmarshal(res, &b) - if err != nil { - return nil, err + if err != nil || b == nil { + return nil, fmt.Errorf("RPCGetBlockWithTransactions(%d) => Unmarshal() => %v, %v", height, err, b) } blockHeight, err := ethereumNumberToUint64(b.Number) if err != nil { @@ -154,7 +156,7 @@ func RPCGetBlockWithTransactions(rpc string, height int64) (*RPCBlockWithTransac for _, tx := range b.Tx { tx.BlockHash = b.Hash } - return &b, err + return b, err } func RPCGetGasPrice(rpc string) (*big.Int, error) { @@ -284,19 +286,14 @@ func rpcGetTokenBalanceAtBlock(rpc, address, tokenAddress string, blockNumber ui } func GetERC20TransferLogFromBlock(ctx context.Context, rpc string, chain, height int64) ([]*Transfer, error) { - client, err := ethclient.Dial(rpc) - if err != nil { - return nil, err - } - query := ethereum.FilterQuery{ - FromBlock: big.NewInt(height), - ToBlock: big.NewInt(height), - } contractAbi, err := ga.JSON(strings.NewReader(abi.AssetABI)) if err != nil { log.Fatal(err) } - logs, err := client.FilterLogs(ctx, query) + logs, err := filterLogsUntilSufficient(ctx, rpc, ethereum.FilterQuery{ + FromBlock: big.NewInt(height), + ToBlock: big.NewInt(height), + }) if err != nil { return nil, err } @@ -337,15 +334,31 @@ func callEthereumRPCUntilSufficient(rpc, method string, params []any) ([]byte, e return res, nil } logger.Printf("callEthereumRPC(%s, %s, %v) => %v", rpc, method, params, err) - reason := strings.ToLower(err.Error()) - switch { - case strings.Contains(reason, "timeout"): - case strings.Contains(reason, "eof"): - case strings.Contains(reason, "handshake"): - default: - return res, err + if util.CheckRetryableError(err) { + time.Sleep(7 * time.Second) + continue + } + return res, err + } +} + +func filterLogsUntilSufficient(ctx context.Context, rpc string, query ethereum.FilterQuery) ([]types.Log, error) { + client, err := ethclient.Dial(rpc) + if err != nil { + return nil, err + } + defer client.Close() + + for { + logs, err := client.FilterLogs(ctx, query) + if ctx.Err() != nil { + return nil, ctx.Err() + } + if util.CheckRetryableError(err) { + time.Sleep(7 * time.Second) + continue } - time.Sleep(7 * time.Second) + return logs, err } } diff --git a/apps/mixin/rpc.go b/apps/mixin/rpc.go index 7b43915b..04e918c2 100644 --- a/apps/mixin/rpc.go +++ b/apps/mixin/rpc.go @@ -7,10 +7,10 @@ import ( "fmt" "io" "net/http" - "strings" "time" "github.com/MixinNetwork/mixin/logger" + "github.com/MixinNetwork/safe/util" ) type WithdrawalData struct { @@ -69,16 +69,11 @@ func callMixinRPCUntilSufficient(rpc, method string, params []any) ([]byte, erro return res, nil } logger.Printf("callMixinRPC(%s, %s, %v) => %v", rpc, method, params, err) - reason := strings.ToLower(err.Error()) - switch { - case strings.Contains(reason, "timeout"): - case strings.Contains(reason, "eof"): - case strings.Contains(reason, "handshake"): - case strings.Contains(reason, "invalid character"): - default: - return res, err + if util.CheckRetryableError(err) { + time.Sleep(7 * time.Second) + continue } - time.Sleep(7 * time.Second) + return res, err } } diff --git a/cmd/transaction.go b/cmd/transaction.go index a9cbcb3c..54cb479f 100644 --- a/cmd/transaction.go +++ b/cmd/transaction.go @@ -1,13 +1,11 @@ package cmd import ( + "context" "encoding/base64" "encoding/hex" - "encoding/json" "fmt" - "net/http" "strings" - "time" "github.com/MixinNetwork/safe/apps/bitcoin" "github.com/MixinNetwork/safe/apps/ethereum" @@ -22,6 +20,7 @@ import ( ) func GenerateTestTransactionProposal(c *cli.Context) error { + ctx := context.Background() chain := c.Int("chain") switch chain { case common.SafeChainBitcoin: @@ -43,7 +42,7 @@ func GenerateTestTransactionProposal(c *cli.Context) error { addr := abi.GetFactoryAssetAddress("0x11EC02748116A983deeD59235302C3139D6e8cdD", common.SafeBitcoinChainId, "BTC", "Bitcoin", holder) assetKey := strings.ToLower(addr.String()) - bondId := fetchAssetId(ethereum.GenerateAssetId(common.SafeChainPolygon, assetKey)) + bondId := fetchAssetId(ctx, ethereum.GenerateAssetId(common.SafeChainPolygon, assetKey)) extra := []byte(receiver) sid := uuid.Must(uuid.NewV4()).String() @@ -94,24 +93,13 @@ func GenerateTestTransactionApproval(c *cli.Context) error { return nil } -func fetchAssetId(mixinId string) string { - client := &http.Client{Timeout: 10 * time.Second} - path := "https://api.mixin.one/network/assets/" + mixinId - resp, err := client.Get(path) - if err != nil { +func fetchAssetId(ctx context.Context, mixinId string) string { + asset, err := common.SafeReadAssetUntilSufficient(ctx, mixinId) + if err != nil || asset == nil { panic(mixinId) } - defer resp.Body.Close() - - var body struct { - Data struct { - AssetId string `json:"asset_id"` - MixinId string `json:"mixin_id"` - } `json:"data"` - } - json.NewDecoder(resp.Body).Decode(&body) - if body.Data.MixinId != mixinId { + if asset.KernelAssetID != mixinId { panic(mixinId) } - return body.Data.AssetId + return asset.AssetID } diff --git a/common/util.go b/common/util.go index e6a7de03..935c939e 100644 --- a/common/util.go +++ b/common/util.go @@ -12,7 +12,6 @@ import ( "strings" "github.com/MixinNetwork/mixin/crypto" - "github.com/MixinNetwork/safe/mtg" "github.com/MixinNetwork/safe/util" "github.com/fox-one/mixin-sdk-go/v2/mixinnet" ) @@ -79,7 +78,7 @@ func ExpandTilde(path string) string { } func CheckRetryableError(err error) bool { - return mtg.CheckRetryableError(err) || CheckTransactionLockedError(err) + return util.CheckRetryableError(err) || CheckTransactionLockedError(err) } func CheckTransactionLockedError(err error) bool { diff --git a/common/wallet.go b/common/wallet.go index a8ecaeaa..2ad2c92a 100644 --- a/common/wallet.go +++ b/common/wallet.go @@ -8,6 +8,7 @@ import ( "time" "github.com/MixinNetwork/bot-api-go-client/v3" + "github.com/MixinNetwork/mixin/logger" "github.com/fox-one/mixin-sdk-go/v2" "github.com/fox-one/mixin-sdk-go/v2/mixinnet" "github.com/shopspring/decimal" @@ -36,6 +37,10 @@ func (mw *MixinWallet) drainOutputsFromNetwork(ctx context.Context) { } members := []string{mw.client.ClientID} utxos, err := listUnspentUTXOsUntilSufficient(ctx, mw.client, members, 1, "", checkpoint) + if CheckRetryableError(err) { + logger.Verbosef("listUnspentUTXOsUntilSufficient() => %v", err) + continue + } if err != nil { panic(err) } diff --git a/keeper/network.go b/keeper/network.go index 8cb78b3f..d915997c 100644 --- a/keeper/network.go +++ b/keeper/network.go @@ -4,11 +4,8 @@ import ( "context" "encoding/binary" "encoding/hex" - "encoding/json" "fmt" "math/big" - "net/http" - "strings" "time" "github.com/MixinNetwork/mixin/crypto" @@ -18,6 +15,7 @@ import ( "github.com/MixinNetwork/safe/common" "github.com/MixinNetwork/safe/keeper/store" "github.com/MixinNetwork/safe/mtg" + "github.com/MixinNetwork/safe/util" "github.com/gofrs/uuid/v5" "github.com/shopspring/decimal" ) @@ -229,40 +227,19 @@ func (node *Node) fetchAssetMetaFromMessengerOrEthereum(ctx context.Context, id, return asset, node.store.WriteAssetMeta(ctx, asset) } -func (node *Node) fetchMixinAsset(_ context.Context, id string) (*store.Asset, error) { - client := &http.Client{Timeout: 10 * time.Second} - path := node.conf.MixinMessengerAPI + "/network/assets/" + id - resp, err := client.Get(path) - if err != nil { - return nil, err - } - defer common.CloseOrPanic(resp.Body) - - var body struct { - Data *struct { - AssetId string `json:"asset_id"` - MixinId crypto.Hash `json:"mixin_id"` - AssetKey string `json:"asset_key"` - Symbol string `json:"symbol"` - Name string `json:"name"` - Precision uint32 `json:"precision"` - ChainId string `json:"chain_id"` - } `json:"data"` - } - err = json.NewDecoder(resp.Body).Decode(&body) - if err != nil { +func (node *Node) fetchMixinAsset(ctx context.Context, id string) (*store.Asset, error) { + asset, err := common.SafeReadAssetUntilSufficient(ctx, id) + if err != nil || asset == nil { return nil, err } - asset := body.Data - return &store.Asset{ - AssetId: asset.AssetId, - MixinId: asset.MixinId.String(), + AssetId: asset.AssetID, + MixinId: asset.KernelAssetID, AssetKey: asset.AssetKey, Symbol: asset.Symbol, Name: asset.Name, - Decimals: asset.Precision, - Chain: common.SafeAssetIdChain(asset.ChainId), + Decimals: uint32(asset.Precision), + Chain: common.SafeAssetIdChainNoPanic(asset.ChainID), CreatedAt: time.Now().UTC(), }, nil } @@ -276,16 +253,15 @@ func (node *Node) fetchAssetMeta(ctx context.Context, id string) (*store.Asset, for { meta, err = node.fetchMixinAsset(ctx, id) if err == nil { + if meta == nil { + return nil, fmt.Errorf("fetchAssetMeta(%s) => nil", id) + } return meta, node.store.WriteAssetMeta(ctx, meta) } - reason := strings.ToLower(err.Error()) - switch { - case strings.Contains(reason, "timeout"): - case strings.Contains(reason, "eof"): - case strings.Contains(reason, "handshake"): - default: - return nil, err + if util.CheckRetryableError(err) { + time.Sleep(2 * time.Second) + continue } - time.Sleep(2 * time.Second) + return nil, err } } diff --git a/messenger/mixin.go b/messenger/mixin.go index 26b3affd..2bbd44eb 100644 --- a/messenger/mixin.go +++ b/messenger/mixin.go @@ -13,7 +13,7 @@ import ( "github.com/MixinNetwork/bot-api-go-client/v3" "github.com/MixinNetwork/mixin/logger" "github.com/MixinNetwork/safe/common" - "github.com/MixinNetwork/safe/mtg" + "github.com/MixinNetwork/safe/util" "github.com/fox-one/mixin-sdk-go/v2" "github.com/gofrs/uuid/v5" ) @@ -216,7 +216,7 @@ func (mm *MixinMessenger) SyncAck() bool { func (mm *MixinMessenger) sendMessagesWithoutTimeout(ctx context.Context, batch []*mixin.MessageRequest) error { for { err := mm.client.SendMessages(ctx, batch) - if err != nil && mtg.CheckRetryableError(err) { + if err != nil && util.CheckRetryableError(err) { logger.Printf("messenger.sendMessagesWithoutTimeout(retry, %d) => %v", len(batch), err) time.Sleep(3 * time.Second) continue diff --git a/mtg/deposit.go b/mtg/deposit.go index 0bea22bb..34d23685 100644 --- a/mtg/deposit.go +++ b/mtg/deposit.go @@ -8,6 +8,7 @@ import ( "time" "github.com/MixinNetwork/mixin/logger" + "github.com/MixinNetwork/safe/util" ) type DepositEntry struct { @@ -61,7 +62,7 @@ func (grp *Group) readOutputDepositUntilSufficientImpl(ctx context.Context, id s var deposit *SafeDepositView err := grp.mixin.Get(ctx, fmt.Sprintf("/safe/outputs/%s/deposit", id), nil, &deposit) logger.Verbosef("Group.readOutputDeposit(%s) => %v %v\n", id, deposit, err) - if CheckRetryableError(err) { + if util.CheckRetryableError(err) { time.Sleep(3 * time.Second) continue } diff --git a/mtg/transaction.go b/mtg/transaction.go index 51deef0b..d404c214 100644 --- a/mtg/transaction.go +++ b/mtg/transaction.go @@ -505,7 +505,7 @@ func (grp *Group) createMultisigUntilSufficient(ctx context.Context, id, raw str RawTransaction: raw, }) logger.Verbosef("Group.SafeCreateTransactionRequest(%s, %s) => %v %v\n", id, raw, req, err) - if err != nil && CheckRetryableError(err) { + if err != nil && util.CheckRetryableError(err) { time.Sleep(3 * time.Second) continue } @@ -558,7 +558,7 @@ func (grp *Group) signMultisigUntilSufficient(ctx context.Context, input *mixin. RawTransaction: signedRaw, }) logger.Verbosef("Group.SafeSignMultisigRequest(%s %s) => %v %v\n", input.RequestID, signedRaw, req, err) - if err != nil && CheckRetryableError(err) { + if err != nil && util.CheckRetryableError(err) { time.Sleep(3 * time.Second) continue } diff --git a/mtg/utils.go b/mtg/utils.go index b31a4d5f..0e39180e 100644 --- a/mtg/utils.go +++ b/mtg/utils.go @@ -25,26 +25,6 @@ func UniqueId(a, b string) string { return util.UniqueId(a, b) } -func CheckRetryableError(err error) bool { - if err == nil { - return false - } - es := err.Error() - switch { - case strings.Contains(es, "EOF"): - case strings.Contains(es, "context deadline exceeded"): - case strings.Contains(es, "connection reset by peer"): - case strings.Contains(es, "Client.Timeout exceeded"): - case strings.Contains(es, "Bad Gateway"): - case strings.Contains(es, "Internal Server Error"): - case strings.Contains(es, "invalid character '<' looking for beginning of value"): - case strings.Contains(es, "TLS handshake timeout"): - default: - return false - } - return true -} - func NewMixAddress(ctx context.Context, members []string, threshold byte) (*mixin.MixAddress, bool, error) { if len(members) == 0 || threshold == 0 { panic(len(members)) @@ -104,7 +84,7 @@ func (grp *Group) getSpendPublicKeyUntilSufficient(ctx context.Context) (string, for { me, err := grp.mixin.UserMe(ctx) logger.Verbosef("Group.UserMe() => %v\n", err) - if CheckRetryableError(err) { + if util.CheckRetryableError(err) { time.Sleep(3 * time.Second) continue } @@ -160,7 +140,7 @@ func (grp *Group) readKernelTransactionUntilSufficientImpl(ctx context.Context, } for { ver, snapshot, err := GetKernelTransaction(grp.kernelRPC, txHash) - if CheckRetryableError(err) || snapshot == "" { + if util.CheckRetryableError(err) || snapshot == "" { time.Sleep(time.Second) continue } @@ -254,7 +234,7 @@ func (grp *Group) readTransactionUntilSufficientImpl(ctx context.Context, id str if err == nil { return &req, nil } - if CheckRetryableError(err) { + if util.CheckRetryableError(err) { time.Sleep(time.Second) continue } @@ -388,7 +368,7 @@ func (grp *Group) createGhostKeysUntilSufficient(ctx context.Context, tx *Transa for { keys, err := grp.mixin.SafeCreateGhostKeys(ctx, uuidGkrs, grp.GetMembers()...) logger.Verbosef("Group.SafeCreateGhostKeys(%s) => %v %v\n", tx.TraceId, keys, err) - if CheckRetryableError(err) { + if util.CheckRetryableError(err) { time.Sleep(3 * time.Second) continue } diff --git a/observer/bond.go b/observer/bond.go index 4c7d5d7a..a32ea859 100644 --- a/observer/bond.go +++ b/observer/bond.go @@ -2,9 +2,7 @@ package observer import ( "context" - "encoding/json" "fmt" - "net/http" "strings" "time" @@ -15,6 +13,7 @@ import ( "github.com/MixinNetwork/safe/apps/ethereum" "github.com/MixinNetwork/safe/common" "github.com/MixinNetwork/safe/common/abi" + "github.com/MixinNetwork/safe/util" ) type MixinNetworkAsset struct { @@ -126,32 +125,19 @@ func (node *Node) fetchAssetMetaFromMessengerOrEthereum(ctx context.Context, id, return asset, node.store.WriteAssetMeta(ctx, asset) } -func (node *Node) fetchMixinAsset(_ context.Context, id string) (*Asset, error) { - client := &http.Client{Timeout: 10 * time.Second} - path := node.conf.MixinMessengerAPI + "/network/assets/" + id - resp, err := client.Get(path) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var body struct { - Data *MixinNetworkAsset `json:"data"` - } - err = json.NewDecoder(resp.Body).Decode(&body) - if err != nil || body.Data == nil { +func (node *Node) fetchMixinAsset(ctx context.Context, id string) (*Asset, error) { + asset, err := common.SafeReadAssetUntilSufficient(ctx, id) + if err != nil || asset == nil { return nil, err } - asset := body.Data - return &Asset{ - AssetId: asset.AssetId, - MixinId: asset.MixinId.String(), + AssetId: asset.AssetID, + MixinId: asset.KernelAssetID, AssetKey: asset.AssetKey, Symbol: asset.Symbol, Name: asset.Name, - Decimals: asset.Precision, - Chain: common.SafeAssetIdChainNoPanic(asset.ChainId), + Decimals: uint32(asset.Precision), + Chain: common.SafeAssetIdChainNoPanic(asset.ChainID), CreatedAt: time.Now().UTC(), }, nil } @@ -164,24 +150,20 @@ func (node *Node) fetchAssetMeta(ctx context.Context, id string) (*Asset, error) for { meta, err = node.fetchMixinAsset(ctx, id) - if err == nil { - if meta == nil { - return nil, nil - } - if meta.Chain == 0 { - panic(id) - } - return meta, node.store.WriteAssetMeta(ctx, meta) + if util.CheckRetryableError(err) { + time.Sleep(2 * time.Second) + continue } - reason := strings.ToLower(err.Error()) - switch { - case strings.Contains(reason, "timeout"): - case strings.Contains(reason, "eof"): - case strings.Contains(reason, "handshake"): - default: + if err != nil { return nil, err } - time.Sleep(2 * time.Second) + if meta == nil { + return nil, nil + } + if meta.Chain == 0 { + panic(id) + } + return meta, node.store.WriteAssetMeta(ctx, meta) } } diff --git a/util/error.go b/util/error.go index a9c83ccb..8b67e42d 100644 --- a/util/error.go +++ b/util/error.go @@ -1,24 +1,54 @@ package util import ( + "context" "errors" + "io" + "net" + "slices" + "strings" "github.com/MixinNetwork/bot-api-go-client/v3" "github.com/fox-one/mixin-sdk-go/v2" ) +func CheckRetryableError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + es := strings.ToLower(err.Error()) + switch { + case strings.Contains(es, "timeout"): + case strings.Contains(es, "timed out"): + case strings.Contains(es, "handshake"): + case strings.Contains(es, "context deadline exceeded"): + case strings.Contains(es, "connection reset by peer"): + case strings.Contains(es, "upstream connect error or disconnect/reset before headers"): + case strings.Contains(es, "bad gateway"): + case strings.Contains(es, "internal server error"): + case strings.Contains(es, "invalid character '<' looking for beginning of value"): + case strings.Contains(es, "unexpected end of json input"): + default: + return false + } + return true +} + func IsErrorCodes(err error, codes ...int) bool { if mixin.IsErrorCodes(err, codes...) { return true } e := &bot.Error{} - if errors.As(err, e) { - for _, code := range codes { - if e.Code == code { - return true - } - } + if errors.As(err, e) && slices.Contains(codes, e.Code) { + return true } return false }