Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -75,6 +76,7 @@ type LocalConnection struct {
finished <-chan struct{} // closed to signal that actorLoop has finished
senders *gossipSenders
logger Logger
mu *sync.RWMutex
}

// If the connection is successful, it will end up in the local peer's
Expand All @@ -94,6 +96,7 @@ func startLocalConnection(connRemote *remoteConnection, tcpConn *net.TCPConn, ro
errorChan: errorChan,
finished: finished,
logger: logger,
mu: &sync.RWMutex{},
}
conn.senders = newGossipSenders(conn, finished)
go conn.run(errorChan, finished, acceptNewPeer)
Expand All @@ -119,6 +122,8 @@ func (conn *LocalConnection) breakTie(dupConn ourConnection) connectionTieBreak
// Established returns true if the connection is established.
// TODO(pb): data race?
func (conn *LocalConnection) isEstablished() bool {
defer conn.mu.RUnlock()
conn.mu.RLock()
return conn.established
}

Expand Down Expand Up @@ -355,7 +360,9 @@ func (conn *LocalConnection) actorLoop(errorChan <-chan error) (err error) {
case <-conn.heartbeatTCP.C:
err = conn.sendSimpleProtocolMsg(ProtocolHeartbeat)
case <-fwdEstablishedChan:
conn.mu.Lock()
conn.established = true
conn.mu.Unlock()
fwdEstablishedChan = nil
conn.router.Ourself.doConnectionEstablished(conn)
case err = <-errorChan:
Expand Down
5 changes: 5 additions & 0 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"strconv"
"sync"
)

// Peer is a local representation of a peer, including connections to other
Expand All @@ -15,6 +16,7 @@ type Peer struct {
peerSummary
localRefCount uint64 // maintained by Peers
connections map[PeerName]Connection
mu *sync.RWMutex
}

type peerSummary struct {
Expand Down Expand Up @@ -42,6 +44,7 @@ func newPeerFromSummary(summary peerSummary) *Peer {
Name: PeerNameFromBin(summary.NameByte),
peerSummary: summary,
connections: make(map[PeerName]Connection),
mu: &sync.RWMutex{},
}
}

Expand All @@ -62,6 +65,8 @@ func newPeerPlaceholder(name PeerName) *Peer {

// String returns the peer name and nickname.
func (peer *Peer) String() string {
defer peer.mu.RUnlock()
peer.mu.RLock()
return fmt.Sprint(peer.Name, "(", peer.NickName, ")")
}

Expand Down
2 changes: 2 additions & 0 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ func (peers *Peers) applyDecodedUpdate(decodedUpdate []*Peer, decodedConns [][]c
(!newPeer.HasShortID || peer.HasShortID)))) {
continue
}
peer.mu.Lock()
peer.Version = newPeer.Version
peer.UID = newPeer.UID
peer.NickName = newPeer.NickName
Expand All @@ -512,6 +513,7 @@ func (peers *Peers) applyDecodedUpdate(decodedUpdate []*Peer, decodedConns [][]c
peer.HasShortID = newPeer.HasShortID
peers.addByShortID(peer, pending)
}
peer.mu.Unlock()
newUpdate[name] = struct{}{}
}
}
Expand Down