diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index abdfa0c..d7a0b37 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -89,7 +89,7 @@ type peer struct { firstSeen time.Time // To track uptime for getPeers linkOut (chan []byte) // used for protocol traffic (to bypass queues) doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo *dhtInfo // used to keep the DHT working + dinfo (chan *dhtInfo) // used to keep the DHT working out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes close func() // Called when a peer is removed, to close the underlying connection, or via admin api } @@ -104,6 +104,7 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKe endpoint: endpoint, firstSeen: now, doSend: make(chan struct{}, 1), + dinfo: make(chan *dhtInfo, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -176,6 +177,8 @@ func (p *peer) doSendSwitchMsgs() { func (p *peer) linkLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() + p.doSendSwitchMsgs() + var dinfo *dhtInfo for { select { case _, ok := <-p.doSend: @@ -183,12 +186,10 @@ func (p *peer) linkLoop() { return } p.sendSwitchMsg() + case dinfo = <-p.dinfo: case _ = <-tick.C: - //break // FIXME disabled the below completely to test something - pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line - if pdinfo != nil { - dinfo := *pdinfo - p.core.dht.peers <- &dinfo + if dinfo != nil { + p.core.dht.peers <- dinfo } } } @@ -218,8 +219,9 @@ func (p *peer) handlePacket(packet []byte) { // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.dinfo == nil { - // Drop traffic until the peer manages to send us at least one good switchMsg + table := p.core.switchTable.getTable() + if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { + // Drop traffic if the peer isn't in the switch return } p.core.switchTable.packetIn <- packet @@ -323,9 +325,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.core.switchTable.handleMsg(&msg, p.port) if !p.core.switchTable.checkRoot(&msg) { // Bad switch message - // Stop forwarding traffic from it - // Stop refreshing it in the DHT - p.dinfo = nil + p.dinfo <- nil return } // Pass a mesage to the dht informing it that this peer (still) exists @@ -334,8 +334,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { key: p.box, coords: loc.getCoords(), } - //p.core.dht.peers <- &dinfo - p.dinfo = &dinfo + p.dinfo <- &dinfo } // This generates the bytes that we sign or check the signature of for a switchMsg. diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 5ca6630..a794166 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -31,14 +31,6 @@ const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense const default_tcp_timeout = 6 * time.Second const tcp_ping_interval = (default_tcp_timeout * 2 / 3) -// Wrapper function for non tcp/ip connections. -func setNoDelay(c net.Conn, delay bool) { - tcp, ok := c.(*net.TCPConn) - if ok { - tcp.SetNoDelay(delay) - } -} - // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core @@ -58,6 +50,18 @@ type tcpInfo struct { remoteAddr string } +// Wrapper function to set additional options for specific connection types. +func (iface *tcpInterface) setExtraOptions(c net.Conn) { + switch sock := c.(type) { + case *net.TCPConn: + sock.SetNoDelay(true) + sock.SetKeepAlive(true) + sock.SetKeepAlivePeriod(iface.tcp_timeout) + // TODO something for socks5 + default: + } +} + // Returns the address of the listener. func (iface *tcpInterface) getAddr() *net.TCPAddr { return iface.serv.Addr().(*net.TCPAddr) @@ -205,6 +209,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { // It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer sock.Close() + iface.setExtraOptions(sock) // Get our keys myLinkPub, myLinkPriv := newBoxKeys() // ephemeral link keys meta := version_getBaseMetadata() @@ -342,7 +347,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { out <- msg } p.close = func() { sock.Close() } - setNoDelay(sock, true) go p.linkLoop() defer func() { // Put all of our cleanup here...