diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index ff3bbe7..9f7707e 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -556,8 +556,10 @@ func DEBUG_simLinkPeers(p, q *peer) { goWorkers := func(source, dest *peer) { source.linkOut = make(chan []byte, 1) send := make(chan []byte, 1) - source.out = func(bs []byte) { - send <- bs + source.out = func(bss [][]byte) { + for _, bs := range bss { + send <- bs + } } go source.linkLoop() go func() { diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index eca96eb..4ce374b 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -37,7 +37,7 @@ type linkInfo struct { type linkInterfaceMsgIO interface { readMsg() ([]byte, error) - writeMsg([]byte) (int, error) + writeMsgs([][]byte) (int, error) close() error // These are temporary workarounds to stream semantics _sendMetaBytes([]byte) error @@ -207,11 +207,11 @@ func (intf *linkInterface) handler() error { intf.link.core.peers.removePeer(intf.peer.port) }() // Finish setting up the peer struct - out := make(chan []byte, 1) + out := make(chan [][]byte, 1) defer close(out) - intf.peer.out = func(msg []byte) { + intf.peer.out = func(msgs [][]byte) { defer func() { recover() }() - out <- msg + out <- msgs } intf.peer.linkOut = make(chan []byte, 1) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) @@ -234,12 +234,12 @@ func (intf *linkInterface) handler() error { interval := 4 * time.Second tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp defer util.TimerStop(tcpTimer) - send := func(bs []byte) { + send := func(bss [][]byte) { sendBlocked.Reset(time.Second) - intf.msgIO.writeMsg(bs) + size, _ := intf.msgIO.writeMsgs(bss) util.TimerStop(sendBlocked) select { - case signalSent <- len(bs) > 0: + case signalSent <- size > 0: default: } } @@ -247,7 +247,7 @@ func (intf *linkInterface) handler() error { // First try to send any link protocol traffic select { case msg := <-intf.peer.linkOut: - send(msg) + send([][]byte{msg}) continue default: } @@ -259,19 +259,21 @@ func (intf *linkInterface) handler() error { case <-tcpTimer.C: intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send(nil) + send([][]byte{nil}) case <-sendAck: intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send(nil) + send([][]byte{nil}) case msg := <-intf.peer.linkOut: - send(msg) - case msg, ok := <-out: + send([][]byte{msg}) + case msgs, ok := <-out: if !ok { return } - send(msg) - util.PutBytes(msg) + send(msgs) + for _, msg := range msgs { + util.PutBytes(msg) + } select { case signalReady <- struct{}{}: default: diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 06201f9..379ca85 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -109,7 +109,7 @@ type peer struct { linkOut (chan []byte) // used for protocol traffic (to bypass queues) doSend (chan struct{}) // tell the linkLoop to send a switchMsg dinfo (chan *dhtInfo) // used to keep the DHT working - out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes close func() // Called when a peer is removed, to close the underlying connection, or via admin api } @@ -250,11 +250,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { } // This just calls p.out(packet) for now. -func (p *peer) sendPacket(packet []byte) { +func (p *peer) sendPackets(packets [][]byte) { // Is there ever a case where something more complicated is needed? // What if p.out blocks? - atomic.AddUint64(&p.bytesSent, uint64(len(packet))) - p.out(packet) + var size int + for _, packet := range packets { + size += len(packet) + } + atomic.AddUint64(&p.bytesSent, uint64(size)) + p.out(packets) } // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 7e2a325..bdead84 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -39,10 +39,10 @@ type router struct { reconfigure chan chan error addr address.Address subnet address.Subnet - in <-chan []byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff + in <-chan [][]byte // packets we received from the network, link to peer's "out" + out func([]byte) // packets we're sending to the network, link to peer's "in" + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff nodeinfo nodeinfo } @@ -52,7 +52,7 @@ func (r *router) init(core *Core) { r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) - in := make(chan []byte, 1) // TODO something better than this... + in := make(chan [][]byte, 1) // TODO something better than this... self := linkInterface{ name: "(self)", info: linkInfo{ @@ -62,7 +62,7 @@ func (r *router) init(core *Core) { }, } p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - p.out = func(packet []byte) { in <- packet } + p.out = func(packets [][]byte) { in <- packets } r.in = in out := make(chan []byte, 32) go func() { @@ -114,8 +114,10 @@ func (r *router) mainLoop() { defer ticker.Stop() for { select { - case p := <-r.in: - r.handleIn(p) + case ps := <-r.in: + for _, p := range ps { + r.handleIn(p) + } case info := <-r.core.dht.peers: r.core.dht.insertPeer(info) case <-r.reset: diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 56d4754..4ab37c2 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -35,29 +35,19 @@ func (s *stream) init(rwc io.ReadWriteCloser) { } // writeMsg writes a message with stream padding, and is *not* thread safe. -func (s *stream) writeMsg(bs []byte) (int, error) { +func (s *stream) writeMsgs(bss [][]byte) (int, error) { buf := s.outputBuffer[:0] - buf = append(buf, streamMsg[:]) - l := wire_put_uint64(uint64(len(bs)), util.GetBytes()) - defer util.PutBytes(l) - buf = append(buf, l) - padLen := len(buf[0]) + len(buf[1]) - buf = append(buf, bs) - totalLen := padLen + len(bs) - s.outputBuffer = buf[:0] // So we can reuse the same underlying array later - var bn int - for bn < totalLen { - n, err := buf.WriteTo(s.rwc) - bn += int(n) - if err != nil { - l := bn - padLen - if l < 0 { - l = 0 - } - return l, err - } + var written int + for _, bs := range bss { + buf = append(buf, streamMsg[:]) + buf = append(buf, wire_encode_uint64(uint64(len(bs)))) + buf = append(buf, bs) + written += len(bs) } - return len(bs), nil + s.outputBuffer = buf[:0] // So we can reuse the same underlying array later + _, err := buf.WriteTo(s.rwc) + // TODO only include number of bytes from bs *successfully* written? + return written, err } // readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d092625..cc316d1 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -709,7 +709,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo if best != nil { // Send to the best idle next hop delete(idle, best.port) - best.sendPacket(packet) + best.sendPackets([][]byte{packet}) return true } // Didn't find anyone idle to send it to @@ -784,39 +784,49 @@ func (t *switchTable) handleIdle(port switchPort) bool { if to == nil { return true } - var best string - var bestPriority float64 + var packets [][]byte + var psize int t.queues.cleanup(t) now := time.Now() - for streamID, buf := range t.queues.bufs { - // Filter over the streams that this node is closer to - // Keep the one with the smallest queue - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority > bestPriority && t.portIsCloser(coords, port) { - best = streamID - bestPriority = priority + for psize < 65535 { + var best string + var bestPriority float64 + for streamID, buf := range t.queues.bufs { + // Filter over the streams that this node is closer to + // Keep the one with the smallest queue + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + priority := float64(now.Sub(packet.time)) / float64(buf.size) + if priority > bestPriority && t.portIsCloser(coords, port) { + best = streamID + bestPriority = priority + } } - } - if bestPriority != 0 { - buf := t.queues.bufs[best] - var packet switch_packetInfo - // TODO decide if this should be LIFO or FIFO - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - t.queues.size -= uint64(len(packet.bytes)) - if len(buf.packets) == 0 { - delete(t.queues.bufs, best) + if bestPriority != 0 { + buf := t.queues.bufs[best] + var packet switch_packetInfo + // TODO decide if this should be LIFO or FIFO + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + t.queues.size -= uint64(len(packet.bytes)) + if len(buf.packets) == 0 { + delete(t.queues.bufs, best) + } else { + // Need to update the map, since buf was retrieved by value + t.queues.bufs[best] = buf + } + packets = append(packets, packet.bytes) + psize += len(packet.bytes) } else { - // Need to update the map, since buf was retrieved by value - t.queues.bufs[best] = buf + // Finished finding packets + break } - to.sendPacket(packet.bytes) - return true - } else { - return false } + if len(packets) > 0 { + to.sendPackets(packets) + return true + } + return false } // The switch worker does routing lookups and sends packets to where they need to be @@ -826,7 +836,7 @@ func (t *switchTable) doWorker() { // Keep sending packets to the router self := t.core.peers.getPorts()[0] for bs := range sendingToRouter { - self.sendPacket(bs) + self.sendPackets([][]byte{bs}) } }() go func() {