Skip to content

Commit f97628d

Browse files
committed
feat: pass-through control ws messages
1 parent addba63 commit f97628d

File tree

6 files changed

+125
-20
lines changed

6 files changed

+125
-20
lines changed

cmd/serve.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,15 @@ func CommandServe(cfg *config.Config) *cli.Command {
415415
Value: 16,
416416
},
417417

418+
&cli.DurationFlag{ // --flashblocks-timeout
419+
Category: strings.ToUpper(categoryFlashblocks),
420+
Destination: &cfg.Flashblocks.Timeout,
421+
EnvVars: []string{envPrefix + strings.ToUpper(categoryFlashblocks) + "_TIMEOUT"},
422+
Name: categoryFlashblocks + "-timeout",
423+
Usage: "max `duration` for flashblocks websocket reads or writes",
424+
Value: 5 * time.Second,
425+
},
426+
418427
&cli.StringFlag{ // --flashblocks-tls-crt
419428
Category: strings.ToUpper(categoryFlashblocks),
420429
Destination: &cfg.Flashblocks.TLSCertificate,
File renamed without changes.

config/websocket_proxy.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,23 @@ import (
88
"net"
99
"net/url"
1010
"os"
11+
"time"
1112

1213
"github.com/flashbots/bproxy/utils"
1314
)
1415

1516
type WebsocketProxy struct {
16-
BackendURL string `yaml:"backend_url"`
17-
Enabled bool `yaml:"enabled"`
18-
Healthcheck *Healthcheck `yaml:"healthcheck"`
19-
ListenAddress string `yaml:"listen_address"`
20-
LogMessages bool `yaml:"log_messages"`
21-
LogMessagesMaxSize int `yaml:"log_messages_max_size"`
22-
ReadBufferSize int `yaml:"read_buffer_size_mb"`
23-
TLSCertificate string `yaml:"tls_crt"`
24-
TLSKey string `yaml:"tls_key"`
25-
WriteBufferSize int `yaml:"write_buffer_size_mb"`
17+
BackendURL string `yaml:"backend_url"`
18+
Enabled bool `yaml:"enabled"`
19+
Healthcheck *Healthcheck `yaml:"healthcheck"`
20+
ListenAddress string `yaml:"listen_address"`
21+
LogMessages bool `yaml:"log_messages"`
22+
LogMessagesMaxSize int `yaml:"log_messages_max_size"`
23+
ReadBufferSize int `yaml:"read_buffer_size_mb"`
24+
Timeout time.Duration `yaml:"backend_timeout"`
25+
TLSCertificate string `yaml:"tls_crt"`
26+
TLSKey string `yaml:"tls_key"`
27+
WriteBufferSize int `yaml:"write_buffer_size_mb"`
2628
}
2729

2830
var (

proxy/websocket.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package proxy
33
import (
44
"context"
55
"crypto/tls"
6+
"errors"
67
"net"
78
"net/http"
89
"strings"
@@ -285,17 +286,21 @@ func (p *Websocket) websocket(frontend *websocket.Conn) {
285286
p.pumps[addr] = pump
286287
p.mxConnections.Unlock()
287288

288-
if err := pump.run(); err != nil {
289+
reason := pump.run()
290+
if reason != nil {
289291
metrics.ProxyFailureCount.Add(context.TODO(), 1, otelapi.WithAttributes(
290292
attribute.KeyValue{Key: "proxy", Value: attribute.StringValue(p.cfg.name)},
291293
))
292294
l.Error("Websocket connection failed",
293-
zap.Error(err),
295+
zap.Error(reason),
294296
)
295-
p.mxConnections.Lock()
296-
delete(p.pumps, addr)
297-
p.mxConnections.Unlock()
298297
}
298+
_ = p.closeWebsocket(pump.backend, reason)
299+
_ = p.closeWebsocket(pump.frontend, reason)
300+
301+
p.mxConnections.Lock()
302+
delete(p.pumps, addr)
303+
p.mxConnections.Unlock()
299304
}
300305

301306
func (p *Websocket) upstreamConnectionChanged(conn net.Conn, state fasthttp.ConnState) {
@@ -351,3 +356,21 @@ func (p *Websocket) backendUnhealthy(ctx context.Context) {
351356
p.ResetConnections()
352357
}
353358
}
359+
360+
func (p *Websocket) closeWebsocket(conn *websocket.Conn, reason error) error {
361+
if reason == nil {
362+
return errors.Join(
363+
conn.WriteControl(
364+
websocket.CloseMessage, nil, time.Now().Add(p.cfg.proxy.Timeout),
365+
),
366+
conn.Close(),
367+
)
368+
}
369+
370+
return errors.Join(
371+
conn.WriteControl(
372+
websocket.CloseMessage, []byte(reason.Error()), time.Now().Add(p.cfg.proxy.Timeout),
373+
),
374+
conn.Close(),
375+
)
376+
}

proxy/websocket_pump.go

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"context"
5+
"encoding/hex"
56
"encoding/json"
67
"errors"
78
"sync/atomic"
@@ -42,9 +43,19 @@ func newWebsocketPump(
4243

4344
func (w *websocketPump) run() error {
4445
failure := make(chan error, 2)
45-
done := make(chan struct{}, 1)
46+
done := make(chan struct{}, 2)
4647

47-
go w.pump(w.backend, w.frontend, "b->f", done, failure)
48+
w.frontend.SetPingHandler(w.pumpPings(w.frontend, w.backend, "f->b"))
49+
w.backend.SetPongHandler(w.pumpPongs(w.backend, w.frontend, "b->f"))
50+
51+
w.backend.SetPingHandler(w.pumpPings(w.backend, w.frontend, "b->f"))
52+
w.frontend.SetPongHandler(w.pumpPongs(w.frontend, w.backend, "f->b"))
53+
54+
w.frontend.SetCloseHandler(w.pumpCloseMessages(w.frontend, w.backend, "f->b"))
55+
w.backend.SetCloseHandler(w.pumpCloseMessages(w.backend, w.frontend, "b->f"))
56+
57+
go w.pumpMessages(w.backend, w.frontend, "b->f", done, failure)
58+
go w.pumpMessages(w.frontend, w.backend, "f->b", done, failure)
4859

4960
w.active.Store(true)
5061

@@ -86,7 +97,7 @@ func (w *websocketPump) stop() error {
8697
return utils.FlattenErrors(errs)
8798
}
8899

89-
func (w *websocketPump) pump(
100+
func (w *websocketPump) pumpMessages(
90101
from, to *websocket.Conn,
91102
direction string,
92103
done chan struct{},
@@ -105,7 +116,7 @@ loop:
105116
return
106117

107118
default:
108-
if err := from.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
119+
if err := from.SetReadDeadline(time.Now().Add(w.cfg.proxy.Timeout)); err != nil {
109120
failure <- err
110121
continue loop
111122
}
@@ -117,7 +128,7 @@ loop:
117128

118129
ts := time.Now()
119130

120-
if err := to.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
131+
if err := to.SetWriteDeadline(time.Now().Add(w.cfg.proxy.Timeout)); err != nil {
121132
failure <- err
122133
continue loop
123134
}
@@ -130,6 +141,7 @@ loop:
130141
loggedFields := make([]zap.Field, 0, 6)
131142
loggedFields = append(loggedFields,
132143
zap.Time("ts_message_received", ts),
144+
zap.Int("message_type", msgType),
133145
zap.Int("message_size", len(bytes)),
134146
)
135147

@@ -155,3 +167,61 @@ loop:
155167
}
156168
}
157169
}
170+
171+
func (w *websocketPump) pumpPings(
172+
from, to *websocket.Conn,
173+
direction string,
174+
) func(message string) error {
175+
l := w.logger.With(
176+
zap.String("from_addr", from.RemoteAddr().String()),
177+
zap.String("to_addr", to.RemoteAddr().String()),
178+
zap.String("direction", direction),
179+
)
180+
return func(message string) error {
181+
l.Debug("Passing ping through...",
182+
zap.String("message", hex.EncodeToString([]byte(message))),
183+
)
184+
return to.WriteControl(
185+
websocket.PingMessage, []byte(message), time.Now().Add(w.cfg.proxy.Timeout),
186+
)
187+
}
188+
}
189+
190+
func (w *websocketPump) pumpPongs(
191+
from, to *websocket.Conn,
192+
direction string,
193+
) func(message string) error {
194+
l := w.logger.With(
195+
zap.String("from_addr", from.RemoteAddr().String()),
196+
zap.String("to_addr", to.RemoteAddr().String()),
197+
zap.String("direction", direction),
198+
)
199+
return func(message string) error {
200+
l.Debug("Passing pong through...",
201+
zap.String("message", hex.EncodeToString([]byte(message))),
202+
)
203+
return to.WriteControl(
204+
websocket.PongMessage, []byte(message), time.Now().Add(w.cfg.proxy.Timeout),
205+
)
206+
}
207+
}
208+
209+
func (w *websocketPump) pumpCloseMessages(
210+
from, to *websocket.Conn,
211+
direction string,
212+
) func(code int, message string) error {
213+
l := w.logger.With(
214+
zap.String("from_addr", from.RemoteAddr().String()),
215+
zap.String("to_addr", to.RemoteAddr().String()),
216+
zap.String("direction", direction),
217+
)
218+
return func(code int, message string) error {
219+
l.Info("Passing close message through...",
220+
zap.Int("code", code),
221+
zap.String("message", hex.EncodeToString([]byte(message))),
222+
)
223+
return to.WriteControl(
224+
code, []byte(message), time.Now().Add(w.cfg.proxy.Timeout),
225+
)
226+
}
227+
}

readme.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ OPTIONS:
6060
--flashblocks-log-messages whether to log flashblocks messages (default: false) [$BPROXY_FLASHBLOCKS_LOG_MESSAGES]
6161
--flashblocks-log-messages-max-size size do not log flashblocks messages larger than size (default: 4096) [$BPROXY_FLASHBLOCKS_LOG_MESSAGES_MAX_SIZE]
6262
--flashblocks-read-buffer-size megabytes flashblocks read buffer size in megabytes (messages from client) (default: 16) [$BPROXY_FLASHBLOCKS_READ_BUFFER_SIZE]
63+
--flashblocks-timeout duration max duration for flashblocks websocket reads or writes (default: 5s) [$BPROXY_FLASHBLOCKS_TIMEOUT]
6364
--flashblocks-tls-crt path path to flashblocks tls certificate (default: uses plain-text http) [$BPROXY_FLASHBLOCKS_TLS_CRT]
6465
--flashblocks-tls-key path path to flashblocks tls key (default: uses plain-text http) [$BPROXY_FLASHBLOCKS_TLS_KEY]
6566
--flashblocks-write-buffer-size megabytes flashblocks write buffer size in megabytes (messages from backend) (default: 16) [$BPROXY_FLASHBLOCKS_WRITE_BUFFER_SIZE]

0 commit comments

Comments
 (0)