diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index e9a21c1..33a8769 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -292,11 +292,10 @@ exit: } func (n *node) shutdown() { - n.core.Stop() n.admin.Stop() n.multicast.Stop() n.tuntap.Stop() - os.Exit(0) + n.core.Stop() } func (n *node) sessionFirewall(pubkey *crypto.BoxPubKey, initiator bool) bool { diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 22ac935..7f2f591 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -7,6 +7,7 @@ import ( "regexp" "time" + "github.com/Arceliar/phony" "github.com/gologme/log" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -19,14 +20,17 @@ import ( // configured multicast interface, Yggdrasil will attempt to peer with that node // automatically. type Multicast struct { - core *yggdrasil.Core - config *config.NodeState - log *log.Logger - sock *ipv6.PacketConn - groupAddr string - listeners map[string]*yggdrasil.TcpListener - listenPort uint16 - isOpen bool + phony.Inbox + core *yggdrasil.Core + config *config.NodeState + log *log.Logger + sock *ipv6.PacketConn + groupAddr string + listeners map[string]*yggdrasil.TcpListener + listenPort uint16 + isOpen bool + announcer *time.Timer + platformhandler *time.Timer } // Init prepares the multicast interface for use. @@ -63,9 +67,9 @@ func (m *Multicast) Start() error { } m.isOpen = true - go m.multicastStarted() go m.listen() - go m.announce() + m.Act(m, m.multicastStarted) + m.Act(m, m.announce) return nil } @@ -73,6 +77,12 @@ func (m *Multicast) Start() error { // Stop is not implemented for multicast yet. func (m *Multicast) Stop() error { m.isOpen = false + if m.announcer != nil { + m.announcer.Stop() + } + if m.platformhandler != nil { + m.platformhandler.Stop() + } m.sock.Close() return nil } @@ -136,108 +146,108 @@ func (m *Multicast) announce() { if err != nil { panic(err) } - for { - interfaces := m.Interfaces() - // There might be interfaces that we configured listeners for but are no - // longer up - if that's the case then we should stop the listeners - for name, listener := range m.listeners { - // Prepare our stop function! - stop := func() { - listener.Stop <- true - delete(m.listeners, name) - m.log.Debugln("No longer multicasting on", name) - } - // If the interface is no longer visible on the system then stop the - // listener, as another one will be started further down - if _, ok := interfaces[name]; !ok { - stop() - continue - } - // It's possible that the link-local listener address has changed so if - // that is the case then we should clean up the interface listener - found := false - listenaddr, err := net.ResolveTCPAddr("tcp6", listener.Listener.Addr().String()) - if err != nil { - stop() - continue - } - // Find the interface that matches the listener - if intf, err := net.InterfaceByName(name); err == nil { - if addrs, err := intf.Addrs(); err == nil { - // Loop through the addresses attached to that listener and see if any - // of them match the current address of the listener - for _, addr := range addrs { - if ip, _, err := net.ParseCIDR(addr.String()); err == nil { - // Does the interface address match our listener address? - if ip.Equal(listenaddr.IP) { - found = true - break - } + interfaces := m.Interfaces() + // There might be interfaces that we configured listeners for but are no + // longer up - if that's the case then we should stop the listeners + for name, listener := range m.listeners { + // Prepare our stop function! + stop := func() { + listener.Stop() + delete(m.listeners, name) + m.log.Debugln("No longer multicasting on", name) + } + // If the interface is no longer visible on the system then stop the + // listener, as another one will be started further down + if _, ok := interfaces[name]; !ok { + stop() + continue + } + // It's possible that the link-local listener address has changed so if + // that is the case then we should clean up the interface listener + found := false + listenaddr, err := net.ResolveTCPAddr("tcp6", listener.Listener.Addr().String()) + if err != nil { + stop() + continue + } + // Find the interface that matches the listener + if intf, err := net.InterfaceByName(name); err == nil { + if addrs, err := intf.Addrs(); err == nil { + // Loop through the addresses attached to that listener and see if any + // of them match the current address of the listener + for _, addr := range addrs { + if ip, _, err := net.ParseCIDR(addr.String()); err == nil { + // Does the interface address match our listener address? + if ip.Equal(listenaddr.IP) { + found = true + break } } } } - // If the address has not been found on the adapter then we should stop - // and clean up the TCP listener. A new one will be created below if a - // suitable link-local address is found - if !found { - stop() - } } - // Now that we have a list of valid interfaces from the operating system, - // we can start checking if we can send multicasts on them - for _, iface := range interfaces { - // Find interface addresses - addrs, err := iface.Addrs() - if err != nil { - panic(err) - } - for _, addr := range addrs { - addrIP, _, _ := net.ParseCIDR(addr.String()) - // Ignore IPv4 addresses - if addrIP.To4() != nil { - continue - } - // Ignore non-link-local addresses - if !addrIP.IsLinkLocalUnicast() { - continue - } - // Join the multicast group - m.sock.JoinGroup(&iface, groupAddr) - // Try and see if we already have a TCP listener for this interface - var listener *yggdrasil.TcpListener - if l, ok := m.listeners[iface.Name]; !ok || l.Listener == nil { - // No listener was found - let's create one - listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) - if li, err := m.core.ListenTCP(listenaddr); err == nil { - m.log.Debugln("Started multicasting on", iface.Name) - // Store the listener so that we can stop it later if needed - m.listeners[iface.Name] = li - listener = li - } else { - m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) - } - } else { - // An existing listener was found - listener = m.listeners[iface.Name] - } - // Make sure nothing above failed for some reason - if listener == nil { - continue - } - // Get the listener details and construct the multicast beacon - lladdr := listener.Listener.Addr().String() - if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { - a.Zone = "" - destAddr.Zone = iface.Name - msg := []byte(a.String()) - m.sock.WriteTo(msg, nil, destAddr) - } - break - } + // If the address has not been found on the adapter then we should stop + // and clean up the TCP listener. A new one will be created below if a + // suitable link-local address is found + if !found { + stop() } - time.Sleep(time.Second * 15) } + // Now that we have a list of valid interfaces from the operating system, + // we can start checking if we can send multicasts on them + for _, iface := range interfaces { + // Find interface addresses + addrs, err := iface.Addrs() + if err != nil { + panic(err) + } + for _, addr := range addrs { + addrIP, _, _ := net.ParseCIDR(addr.String()) + // Ignore IPv4 addresses + if addrIP.To4() != nil { + continue + } + // Ignore non-link-local addresses + if !addrIP.IsLinkLocalUnicast() { + continue + } + // Join the multicast group + m.sock.JoinGroup(&iface, groupAddr) + // Try and see if we already have a TCP listener for this interface + var listener *yggdrasil.TcpListener + if l, ok := m.listeners[iface.Name]; !ok || l.Listener == nil { + // No listener was found - let's create one + listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort) + if li, err := m.core.ListenTCP(listenaddr); err == nil { + m.log.Debugln("Started multicasting on", iface.Name) + // Store the listener so that we can stop it later if needed + m.listeners[iface.Name] = li + listener = li + } else { + m.log.Warnln("Not multicasting on", iface.Name, "due to error:", err) + } + } else { + // An existing listener was found + listener = m.listeners[iface.Name] + } + // Make sure nothing above failed for some reason + if listener == nil { + continue + } + // Get the listener details and construct the multicast beacon + lladdr := listener.Listener.Addr().String() + if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil { + a.Zone = "" + destAddr.Zone = iface.Name + msg := []byte(a.String()) + m.sock.WriteTo(msg, nil, destAddr) + } + break + } + } + m.announcer = time.AfterFunc(time.Second*15, func() { + m.Act(m, m.announce) + }) } func (m *Multicast) listen() { diff --git a/src/multicast/multicast_darwin.go b/src/multicast/multicast_darwin.go index c88b4a8..4cfef9e 100644 --- a/src/multicast/multicast_darwin.go +++ b/src/multicast/multicast_darwin.go @@ -32,21 +32,16 @@ import ( var awdlGoroutineStarted bool func (m *Multicast) multicastStarted() { - if awdlGoroutineStarted { - return - } - awdlGoroutineStarted = true - for { - C.StopAWDLBrowsing() - for intf := range m.Interfaces() { - if intf == "awdl0" { - m.log.Infoln("Multicast discovery is using AWDL discovery") - C.StartAWDLBrowsing() - break - } + C.StopAWDLBrowsing() + for intf := range m.Interfaces() { + if intf == "awdl0" { + C.StartAWDLBrowsing() + break } - time.Sleep(time.Minute) } + m.platformhandler = time.AfterFunc(time.Minute, func() { + m.Act(m, m.multicastStarted) + }) } func (m *Multicast) multicastReuse(network string, address string, c syscall.RawConn) error { diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index cb28a6f..9af5d24 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -102,11 +102,12 @@ func (c *Conn) search() error { if sinfo != nil { // Need to clean up to avoid a session leak sinfo.cancel.Cancel(nil) + sinfo.sessions.removeSession(sinfo) } default: if sinfo != nil { // Finish initializing the session - sinfo.conn = c + sinfo.setConn(nil, c) } c.session = sinfo err = e diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 42910aa..98a5c6e 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -96,7 +96,7 @@ func (c *Core) _addPeerLoop() { if err := c.CallPeer(peer, intf); err != nil { c.log.Errorln("Failed to add peer:", err) } - }(peer, "") + }(peer, "") // TODO: this should be acted and not in a goroutine? } // Add peers from the InterfacePeers section @@ -106,13 +106,12 @@ func (c *Core) _addPeerLoop() { if err := c.CallPeer(peer, intf); err != nil { c.log.Errorln("Failed to add peer:", err) } - }(peer, intf) + }(peer, intf) // TODO: this should be acted and not in a goroutine? } } - // Sit for a while c.addPeerTimer = time.AfterFunc(time.Minute, func() { - c.Act(c, c._addPeerLoop) + c.Act(nil, c._addPeerLoop) }) } @@ -192,5 +191,12 @@ func (c *Core) Stop() { // This function is unsafe and should only be ran by the core actor. func (c *Core) _stop() { c.log.Infoln("Stopping...") - c.addPeerTimer.Stop() + if c.addPeerTimer != nil { + c.addPeerTimer.Stop() + } + c.link.stop() + for _, peer := range c.GetPeers() { + c.DisconnectPeer(peer.Port) + } + c.log.Infoln("Stopped") } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 3686ab9..a4a41e7 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -25,6 +25,7 @@ type link struct { mutex sync.RWMutex // protects interfaces below interfaces map[linkInfo]*linkInterface tcp tcp // TCP interface support + stopped chan struct{} // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -70,6 +71,7 @@ func (l *link) init(c *Core) error { l.mutex.Lock() l.interfaces = make(map[linkInfo]*linkInterface) l.mutex.Unlock() + l.stopped = make(chan struct{}) if err := l.tcp.init(l); err != nil { c.log.Errorln("Failed to start TCP interface") @@ -86,7 +88,7 @@ func (l *link) reconfigure() { func (l *link) call(uri string, sintf string) error { u, err := url.Parse(uri) if err != nil { - return fmt.Errorf("peer %s is not correctly formatted", uri) + return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err) } pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/") switch u.Scheme { @@ -134,6 +136,14 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st return &intf, nil } +func (l *link) stop() error { + close(l.stopped) + if err := l.tcp.stop(); err != nil { + return err + } + return nil +} + func (intf *linkInterface) handler() error { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later myLinkPub, myLinkPriv := crypto.NewBoxKeys() @@ -224,7 +234,18 @@ func (intf *linkInterface) handler() error { go intf.peer.start() intf.reader.Act(nil, intf.reader._read) // Wait for the reader to finish + // TODO find a way to do this without keeping live goroutines around + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-intf.link.stopped: + intf.msgIO.close() + case <-done: + } + }() err = <-intf.reader.err + // TODO don't report an error if it's just a 'use of closed network connection' if err != nil { intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 8a6d16f..2f5e0af 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -414,8 +414,10 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Resets all sessions to an uninitialized state. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. +// Only call this from the router actor. func (ss *sessions) reset() { - for _, sinfo := range ss.sinfos { + for _, _sinfo := range ss.sinfos { + sinfo := _sinfo // So we can safely put it in a closure sinfo.Act(ss.router, func() { sinfo.reset = true }) @@ -470,7 +472,8 @@ func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { callback := func() { util.PutBytes(p.Payload) if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { - // Either we failed to decrypt, or the session was updated, or we received this packet in the mean time + // Either we failed to decrypt, or the session was updated, or we + // received this packet in the mean time util.PutBytes(bs) return } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index f81e49a..ed8f7b9 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -34,6 +34,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcp struct { link *link + waitgroup sync.WaitGroup mutex sync.Mutex // Protecting the below listeners map[string]*TcpListener calls map[string]struct{} @@ -46,7 +47,12 @@ type tcp struct { // multicast interfaces. type TcpListener struct { Listener net.Listener - Stop chan bool + stop chan struct{} +} + +func (l *TcpListener) Stop() { + defer func() { recover() }() + close(l.stop) } // Wrapper function to set additional options for specific connection types. @@ -96,6 +102,16 @@ func (t *tcp) init(l *link) error { return nil } +func (t *tcp) stop() error { + t.mutex.Lock() + for _, listener := range t.listeners { + listener.Stop() + } + t.mutex.Unlock() + t.waitgroup.Wait() + return nil +} + func (t *tcp) reconfigure() { t.link.core.config.Mutex.RLock() added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen) @@ -121,7 +137,7 @@ func (t *tcp) reconfigure() { t.mutex.Lock() if listener, ok := t.listeners[d[6:]]; ok { t.mutex.Unlock() - listener.Stop <- true + listener.Stop() t.link.core.log.Infoln("Stopped TCP listener:", d[6:]) } else { t.mutex.Unlock() @@ -141,8 +157,9 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) { if err == nil { l := TcpListener{ Listener: listener, - Stop: make(chan bool), + stop: make(chan struct{}), } + t.waitgroup.Add(1) go t.listener(&l, listenaddr) return &l, nil } @@ -152,6 +169,7 @@ func (t *tcp) listen(listenaddr string) (*TcpListener, error) { // Runs the listener, which spawns off goroutines for incoming connections. func (t *tcp) listener(l *TcpListener, listenaddr string) { + defer t.waitgroup.Done() if l == nil { return } @@ -166,7 +184,6 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { t.mutex.Unlock() } // And here we go! - accepted := make(chan bool) defer func() { t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String()) l.Listener.Close() @@ -175,36 +192,29 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) { t.mutex.Unlock() }() t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String()) + go func() { + <-l.stop + l.Listener.Close() + }() + defer l.Stop() for { - var sock net.Conn - var err error - // Listen in a separate goroutine, as that way it does not block us from - // receiving "stop" events - go func() { - sock, err = l.Listener.Accept() - accepted <- true - }() - // Wait for either an accepted connection, or a message telling us to stop - // the TCP listener - select { - case <-accepted: - if err != nil { - t.link.core.log.Errorln("Failed to accept connection:", err) - return - } - go t.handler(sock, true, nil) - case <-l.Stop: + sock, err := l.Listener.Accept() + if err != nil { + t.link.core.log.Errorln("Failed to accept connection:", err) return } + t.waitgroup.Add(1) + go t.handler(sock, true, nil) } } // Checks if we already are calling this address -func (t *tcp) isAlreadyCalling(saddr string) bool { +func (t *tcp) startCalling(saddr string) bool { t.mutex.Lock() defer t.mutex.Unlock() _, isIn := t.calls[saddr] - return isIn + t.calls[saddr] = struct{}{} + return !isIn } // Checks if a connection already exists. @@ -218,12 +228,9 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if t.isAlreadyCalling(callname) { + if !t.startCalling(callname) { return } - t.mutex.Lock() - t.calls[callname] = struct{}{} - t.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios time.Sleep(default_timeout) @@ -252,6 +259,7 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { if err != nil { return } + t.waitgroup.Add(1) t.handler(conn, false, saddr) } else { dst, err := net.ResolveTCPAddr("tcp", saddr) @@ -316,12 +324,14 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) { t.link.core.log.Debugln("Failed to dial TCP:", err) return } + t.waitgroup.Add(1) t.handler(conn, false, nil) } }() } func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) { + defer t.waitgroup.Done() // Happens after sock.close defer sock.Close() t.setExtraOptions(sock) stream := stream{}