Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cmd/sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"flag"
"io"
"log/slog"
"lukas8219/websocket-operator/cmd/sidecar/proxy"
"lukas8219/websocket-operator/internal/logger"
"lukas8219/websocket-operator/internal/peer_discovery"
"lukas8219/websocket-operator/internal/rendezvous"
"lukas8219/websocket-operator/internal/resolver"
"lukas8219/websocket-operator/internal/transports"
"net"
"net/http"
"os"
Expand All @@ -23,6 +26,7 @@ type ConnectionTracker struct {
downstreamHost string
upstreamConn net.Conn
downstreamConn net.Conn
transports.Transport
}

func (c *ConnectionTracker) Info(message string, args ...any) *ConnectionTracker {
Expand Down Expand Up @@ -51,11 +55,15 @@ var incomingMessageStruct = reflect.StructOf([]reflect.StructField{
func main() {
port := flag.String("port", "3000", "Port to listen on")
targetPort := flag.String("targetPort", "3001", "Port to target")
mode := flag.String("mode", "kubernetes", "Mode to use")
// mode := flag.String("mode", "kubernetes", "Mode to use")
debug := flag.Bool("debug", false, "Debug mode")
flag.Parse()
logger.SetupLogger(*debug)
proxy.InitializeProxy(*mode)
//TODO move to config
peerDiscovery := peer_discovery.NewKubernetes("default", "ws-headless-proxy")
resolver := resolver.New(peerDiscovery, rendezvous.NewDefault())
transport := transports.NewHTTPTransport(*resolver)

slog.Info("Starting server", "port", *port)
// Map to store active WebSocket connections
// Key: user ID, Value: ConnectionTracker
Expand Down Expand Up @@ -115,6 +123,7 @@ func main() {
downstreamHost: r.RemoteAddr,
upstreamConn: proxiedConn,
downstreamConn: clientConn,
Transport: &transport,
}
connections[user] = connectionTracker
if err != nil {
Expand Down Expand Up @@ -143,7 +152,6 @@ func proxySidecarServerToClient(deferClose func(), connectionTracker *Connection
connectionTracker.Error("Failed to read from server", "error", err)
return
}

//TODO: we might need to handle `recipientId` routing messages here also

//Write as client - to the proxied connection
Expand Down Expand Up @@ -190,7 +198,7 @@ func handleIncomingMessagesToProxy(connections map[string]*ConnectionTracker, de
slog.Debug("Message recipient", "recipientId", recipientIdString, "recipientConnection", recipientConnection)
if recipientConnection == nil {
slog.Debug("No recipient found in-memory. Routing message to the correct target.", "recipientId", recipientIdString)
err := proxy.SendProxiedMessage(recipientIdString, msg, op)
err := connectionTracker.Write(rawBytes, op, msg)
if err != nil {
connectionTracker.Error("Failed to route message", "error", err)
}
Expand Down
58 changes: 0 additions & 58 deletions cmd/sidecar/proxy/proxy.go

This file was deleted.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ toolchain go1.23.7
require (
github.com/buraksezer/consistent v0.10.0
github.com/gobwas/ws v1.4.0
github.com/hashicorp/go-set/v3 v3.0.1
github.com/zeebo/xxh3 v1.0.2
k8s.io/api v0.32.3
k8s.io/apimachinery v0.32.3
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgY
github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-set/v3 v3.0.1 h1:ZwO15ZYmIrFYL9zSm2wBuwcRiHxVdp46m/XA/MUlM6I=
github.com/hashicorp/go-set/v3 v3.0.1/go.mod h1:0oPQqhtitglZeT2ZiWnRIfUG6gJAHnn7LzrS7SbgNY4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -78,6 +80,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/shoenig/test v1.12.1 h1:mLHfnMv7gmhhP44WrvT+nKSxKkPDiNkIuHGdIGI9RLU=
github.com/shoenig/test v1.12.1/go.mod h1:UxJ6u/x2v/TNs/LoLxBNJRV9DiwBBKYxXSyczsBHFoI=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
22 changes: 22 additions & 0 deletions internal/diff/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package diff

import "github.com/hashicorp/go-set/v3"

type DifferenceOutput[T comparable] struct {
Added []T
Removed []T
}

// before[1,2,3] + [4] -> currentState[1,2,3,4] = Before|currentState [4]
// before[1,2,3] - [3] -> currentState[1,2,4] = currentState|before [3]
func Difference[T comparable](currentState *set.Set[T], NewEntries []T, ToRemoveEntries []T) DifferenceOutput[T] {
beforeUpdate := currentState.Copy()
currentState.InsertSlice(NewEntries)
currentState.RemoveSlice(ToRemoveEntries)
added := currentState.Difference(beforeUpdate)
removed := beforeUpdate.Difference(currentState)
return DifferenceOutput[T]{
Added: added.Slice(),
Removed: removed.Slice(),
}
}
25 changes: 25 additions & 0 deletions internal/diff/diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package diff_test

import (
"lukas8219/websocket-operator/internal/diff"
"testing"

"github.com/hashicorp/go-set/v3"
)

func TestDifferenceOuputOnAtomicUpsert(t *testing.T) {
initialState := set.New[int](10)
initialState.InsertSlice([]int{1, 2, 3})
output := diff.Difference(initialState, []int{4, 3, 2, 5}, []int{3, 1})
if !set.From(output.Added).EqualSlice([]int{4, 5}) {
t.Error("Output expected to have added int 4,5")
}

if !set.From(output.Removed).EqualSlice([]int{3, 1}) {
t.Error("Output expected to have remove 3,1")
}

if !initialState.EqualSlice([]int{2, 4, 5}) {
t.Error("Final state expected is 2,4,5", initialState.String())
}
}
89 changes: 89 additions & 0 deletions internal/peer_discovery/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package peer_discovery

import (
"context"
"log/slog"
"net"
"os"
"time"
)

type DnsPeerDiscovery struct {
srvRecord string
notificationChannel chan []Peer
PeerDiscovery
}

func (r *DnsPeerDiscovery) Initialize() error {
return nil
}

func NewDNS(srvRecord string) *DnsPeerDiscovery {
return &DnsPeerDiscovery{
srvRecord: srvRecord,
}
}

func (r *DnsPeerDiscovery) NotificationChannel() chan []Peer {
return r.notificationChannel
}

func (r *DnsPeerDiscovery) CurrentHosts() ([]Peer, error) {
resolver := createResolver()
slog.Debug("Getting random SRV host for service", "service", r.srvRecord)
_, addrs, err := resolver.LookupSRV(context.Background(), "", "", r.srvRecord)
if err != nil {
return nil, err
}

if len(addrs) == 0 {
return []Peer{}, nil
}

peers := make([]Peer, len(addrs))
for i, srv := range addrs {
addr, err := resolver.LookupIP(context.Background(), "ip", srv.Target)
if err != nil {
slog.Warn("Failed to resolve ip - skipping", "ip", srv.Target)
continue
}
hostname := addr[0].String()
port := srv.Port
peers[i] = Peer{
hostname,
port,
}
}
return peers, nil
}

func (d *DnsPeerDiscovery) Mode() PeerDiscoveryMode {
return PeerDiscoveryModeDns
}

// TODO review
func createResolver() *net.Resolver {
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" {
return &net.Resolver{}
}
// Create a custom resolver that first tries localhost:53 (for testing)
// and falls back to the system resolver if that fails
r := &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
// First try localhost:53
d := net.Dialer{}
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
slog.Debug("Looking for address on localhost:53", "address", address)
conn, err := d.DialContext(ctx, "udp", "0.0.0.0:53")
if err != nil {
slog.Debug("Failed to connect to localhost:53, falling back to system resolver", "error", err)
return d.DialContext(ctx, network, address)
}
return conn, nil
},
}

return r
}
Loading