Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/loadbalancer/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ func (c *Connection) Handle() {

c.ProxyUpstreamToDownstream()
}

func (c *Connection) Close() {
c.Proxier.Close()
}
17 changes: 15 additions & 2 deletions cmd/loadbalancer/connection/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type Proxier interface {
ProxyUpstreamToDownstream()
ProxyDownstreamToUpstream() (net.Conn, error)
Close()
}

type WSDialer interface {
Expand All @@ -20,14 +21,26 @@ type WSDialer interface {

// WSProxier implements Proxier for WebSocket connections
type WSProxier struct {
tracker ConnectionTracker
tracker *Tracker
dialer WSDialer
}

// NewWSProxier creates a new WebSocket proxier
func NewWSProxier(tracker ConnectionTracker, dialer WSDialer) *WSProxier {
func NewWSProxier(tracker *Tracker, dialer WSDialer) *WSProxier {
return &WSProxier{
tracker: tracker,
dialer: dialer,
}
}

func (p *WSProxier) Close() {
upstreamConn := p.tracker.UpstreamConn()
downstreamConn := p.tracker.DownstreamConn()

if upstreamConn != nil {
upstreamConn.Close()
}
if downstreamConn != nil {
downstreamConn.Close()
}
}
14 changes: 0 additions & 14 deletions cmd/loadbalancer/connection/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,6 @@ func (t *Tracker) Debug(message string, args ...any) Logger {
return t
}

func (t *Tracker) Close() {
t.mu.Lock()
upstreamConn := t.upstreamConn
downstreamConn := t.downstreamConn
t.mu.Unlock()

if upstreamConn != nil {
upstreamConn.Close()
}
if downstreamConn != nil {
downstreamConn.Close()
}
}

// NewTracker creates a new connection tracker
func NewTracker(user, upstreamHost, downstreamHost string, downstreamConn net.Conn) *Tracker {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
83 changes: 76 additions & 7 deletions cmd/loadbalancer/server/rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package server
import (
"bufio"
"context"
"io"
"log"
"log/slog"
"lukas8219/websocket-operator/cmd/loadbalancer/connection"
"net"
Expand Down Expand Up @@ -31,23 +33,56 @@ func (m *MockRouter) InitializeHosts() error { return nil }

type NetConnectionMock struct {
net.Conn
remoteAddr net.Addr
isClosed bool
remoteAddr net.Addr
isClosed bool
name string
isServer bool
mu sync.Mutex
writtenBytes int
readBytes int
}

func (m *NetConnectionMock) Read(b []byte) (int, error) {
return 0, nil
m.mu.Lock()
defer m.mu.Unlock()
if m.isClosed {
return 0, io.EOF
}
message := []byte(m.name)

frame := ws.NewBinaryFrame(message)

// If masked, apply masking
if !m.isServer {
frame = ws.MaskFrameInPlace(frame)
}

compiledFrame, err := ws.CompileFrame(frame)
if err != nil {
return 0, err
}
r := copy(b, compiledFrame)
m.readBytes += r
return r, nil
}

func (m *NetConnectionMock) Write(b []byte) (int, error) {
return 0, nil
m.mu.Lock()
if m.isClosed {
return 0, io.EOF
}
defer m.mu.Unlock()
m.writtenBytes += len(b)
return len(b), nil
}

func (m *NetConnectionMock) RemoteAddr() net.Addr {
return m.remoteAddr
}

func (m *NetConnectionMock) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
m.isClosed = true
return nil
}
Expand All @@ -59,12 +94,19 @@ type MockWSDialer struct {
}

func (m *MockWSDialer) Dial(ctx context.Context, urlstr string) (net.Conn, *bufio.Reader, ws.Handshake, error) {
log.Println("Dialing", urlstr)
m.mu.Lock()
defer m.mu.Unlock()
m.dialCalls = append(m.dialCalls, urlstr)
mockConn := &NetConnectionMock{remoteAddr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8080}}
mockConn := &NetConnectionMock{
remoteAddr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8080},
name: urlstr,
isServer: true,
writtenBytes: 0,
}
m.connections = append(m.connections, mockConn)
return mockConn, nil, ws.Handshake{}, nil
reader := bufio.NewReader(mockConn)
return mockConn, reader, ws.Handshake{}, nil
}

func NewMockConnection(user, upstreamHost string, downstreamConn net.Conn, wsDialer *MockWSDialer) *connection.Connection {
Expand All @@ -84,12 +126,19 @@ func TestHandleRebalanceLoop(t *testing.T) {
go handleRebalanceLoop(mockRouter, connections)

t.Run("Sucessfully rebalanced", func(t *testing.T) {
mockDownstreamConn := &NetConnectionMock{remoteAddr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8080}}
mockDownstreamConn := &NetConnectionMock{
remoteAddr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8080},
name: "downstream",
isServer: false,
writtenBytes: 0,
}
mockWSDialer := &MockWSDialer{}
mockConn := NewMockConnection("user1", "old-host:3000", mockDownstreamConn, mockWSDialer)
go mockConn.Handle()
connections[mockConn.Tracker.User()] = mockConn

mockConn.Tracker.UpstreamCancelChan() <- 1
time.Sleep(100 * time.Millisecond)
mockRouter.rebalanceChan <- [][2]string{{mockConn.Tracker.User(), "new-host:3000"}}
time.Sleep(100 * time.Millisecond)

Expand All @@ -103,7 +152,27 @@ func TestHandleRebalanceLoop(t *testing.T) {
if mockWSDialer.dialCalls[1] != "ws://new-host:3000" {
t.Errorf("Expected dial to be called with ws://new-host:3000, got %s", mockWSDialer.dialCalls[1])
}
mockConn.Close()

writtenBytes := 0
for _, clientConnections := range mockWSDialer.connections {
writtenBytes += clientConnections.writtenBytes
}

if writtenBytes == 0 {
t.Errorf("Expected written bytes to be greater than 0, got %d", writtenBytes)
return
}

if mockDownstreamConn.readBytes == 0 {
t.Errorf("Expected read bytes to be greater than 0, got %d", mockDownstreamConn.readBytes)
return
}

ratio := float64(writtenBytes) / float64(mockDownstreamConn.readBytes)
if ratio < 0.99 {
t.Errorf("Expected ratio of written bytes to read bytes to be greater than 0.99, got %f", ratio)
}
})

}