![]()
Parent Directory
|
Revision Log
forgot to check in some stuff after SIGCOMM deadline
/*
* Structured Stream Transport
* Copyright (C) 2006-2008 Massachusetts Institute of Technology
* Author: Bryan Ford
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <cmath>
#include <QDataStream>
#include <QtDebug>
#include "flow.h"
#include "sock.h"
#include "host.h"
#include "xdr.h"
using namespace SST;
#define RTT_INIT (500*1000) // Initial RTT estimate: 1/2 second
#define RTT_MAX (10*1000*1000) // Max round-trip time: ten seconds
#define CWND_MIN ((unsigned)2) // Min congestion window (packets/RTT)
#define CWND_MAX ((unsigned)1<<20)// Max congestion window (packets/RTT)
#define ACKDELAY (10*1000) // 10 milliseconds (1/100 sec)
#define ACKPACKETS 2 // Max outstanding packets to be ACKed
#define ACKACKPACKETS 4 // Delay before for ACKing only ACKs
////////// FlowArmor //////////
// Destructor for the armor base class.
// There is nothing to release at this level.
FlowArmor::~FlowArmor()
{
	// Intentionally empty.
}
////////// Flow //////////
// Construct a flow attached to 'host'.
// The link status starts as LinkDown and no armor is installed yet;
// an armor must be assigned to 'armr' before start() is called.
Flow::Flow(Host *host, QObject *parent)
:	SocketFlow(parent),
	h(host),
	armr(NULL),
	ccmode(CC_TCP), nocc(false),
	missthresh(3),	// XX make adaptive for robustness to reordering
	rtxtimer(host),
	linkstat(LinkDown),
	delayack(true),
	acktimer(host),
	statstimer(host)
{
	Q_ASSERT(sizeof(txackmask)*8 == maskBits);

	// Initialize transmit congestion control state
	txseq = 1;
	txevts.enqueue(TxEvent(0, false)); Q_ASSERT(txevts.size() == 1);
	txevtseq = 0;
	txackseq = 0;
	txackmask = 1;		// Fictitious packet 0 already "received"
	txfltcnt = txfltsize = 0;
	recovseq = 1;
	markseq = 1;
	marktime = host->currentTime();

	// The remaining RTT-mark fields are normally set by tx()
	// when packet 'markseq' goes out. receive() may run before
	// our first transmission (it accepts ackseq 0 while txseq is 1)
	// and updates markacks, so give them defined initial values.
	markacks = 0;
	markbase = 0;
	marksent = 0;

	// Initialize congestion control state
	ccReset();

	// Initialize retransmit state
	connect(&rtxtimer, SIGNAL(timeout(bool)),
		this, SLOT(rtxTimeout(bool)));

	// Delayed ACK state
	connect(&acktimer, SIGNAL(timeout(bool)),
		this, SLOT(ackTimeout()));

	// Initialize receive sequencing/replay protection state
	rxseq = 0;
	rxmask = 1;		// Fictitious packet 0

	// Initialize receive acknowledgment/congestion control state
	rxackseq = 0;
	//rxackmask = 1;	// Fictitious packet 0
	rxackct = 0;
	rxunacked = 0;

	// Statistics gathering state
	connect(&statstimer, SIGNAL(timeout(bool)),
		this, SLOT(statsTimeout()));
	//statstimer.start(5*1000);
}
// Destroy the flow, releasing the armor object referenced by 'armr'.
Flow::~Flow()
{
	//qDebug() << this << "~Flow()";

	// Deleting a null pointer is a no-op, so no null check is needed.
	delete armr;
}
void
Flow::ccReset()
{
qDebug() << this << "ccReset: mode" << ccmode;
cwnd = CWND_MIN;
cwndlim = true;
ssthresh = CWND_MAX;
sstoggle = true;
ssbase = 0;
cwndinc = 1;
cwndmax = CWND_MIN;
lastrtt = 0;
lastpps = 0;
basertt = 0;
basepps = 0;
cumrtt = RTT_INIT;
cumrttvar = 0;
cumpps = 0;
cumppsvar = 0;
cumpwr = 0;
cumbps = 0;
cumloss = 0;
}
// Activate the flow.
// Starts the underlying socket flow, decides whether our own
// congestion control is needed, arms the retransmission timer,
// signals readiness to transmit, and reports the link as up.
// An armor must already be installed in 'armr'.
void Flow::start(bool initiator)
{
	Q_ASSERT(armr);

	SocketFlow::start(initiator);

	// When the underlying socket already does congestion control,
	// switch ours off (mayTransmit() then defers to the socket).
	const bool socketcc = isSocketCongestionControlled();
	if (socketcc)
		nocc = true;

	// Everything is set up - begin operating.
	rtxstart();
	readyTransmit();
	setLinkStatus(LinkUp);
}
// Deactivate the flow.
// Cancels the retransmit, delayed-ACK, and statistics timers,
// stops the underlying socket flow, and reports the link as down.
void Flow::stop()
{
	// Cancel all of our timers first.
	rtxtimer.stop();
	acktimer.stop();
	statstimer.stop();

	// Then shut down the socket flow and announce the change.
	SocketFlow::stop();
	setLinkStatus(LinkDown);

	// XXX if client: go into TIME-WAIT
	// XXX then auto-delete self?
}
// Microseconds elapsed since 'marktime',
// the timestamp of the current RTT-measurement mark.
inline qint64 Flow::markElapsed()
{
	return host()->currentTime()
			.since(marktime)
			.usecs;
}
// Private low-level transmit routine:
// encrypt, authenticate, and transmit a packet
// whose cleartext header and data are already fully set up,
// with a specified ACK sequence/count word.
// Returns true on success, false on error
// (e.g., no output buffer space for packet)
// Private low-level transmit routine.
// Encrypts, authenticates, and transmits a packet whose cleartext
// header and data are already fully set up.
//   packseq - ACK sequence/count word written into header word 1
//   pktseq  - out: the transmit sequence number assigned to the packet
//   isdata  - true if the packet counts against the congestion window
// Returns the result of udpSend(): true on success, false on error
// (e.g., no output buffer space for the packet).
bool Flow::tx(QByteArray &pkt, quint32 packseq, quint64 &pktseq, bool isdata)
{
	Q_ASSERT(isActive());

	// Assign the next transmit sequence number.
	// Don't allow txseq counter to wrap (XXX re-key before it does!)
	pktseq = txseq;
	Q_ASSERT(txseq < maxPacketSeq);

	// Build the on-the-wire sequence word: the low sequence bits
	// combined with the remote channel number in the top bits.
	const quint32 chanword = (quint32)remoteChannel() << chanShift;
	const quint32 wireseq = ((quint32)pktseq & seqMask) | chanword;

	// Fill in the transmit and ACK sequence number header words.
	Q_ASSERT(pkt.size() >= 8);
	quint32 *hdr = (quint32*)pkt.data();
	hdr[0] = htonl(wireseq);
	hdr[1] = htonl(packseq);

	// Encrypt and compute the MAC for the packet
	QByteArray epkt = armr->txenc(txseq, pkt);

	// Point of no return: from here on the sequence number is consumed,
	// even if the send below fails.
	// Timestamp the packet if it is the one marked for RTT measurement.
	if (txseq == markseq) {
		marktime = host()->currentTime();
		markacks = 0;
		markbase = txackseq;
		marksent = txseq - txackseq;
	}
	txseq++;

	// Record the transmission event for later ACK/loss accounting.
	TxEvent evt(pkt.size(), isdata);
	if (isdata) {
		txfltcnt++;
		txfltsize += evt.size;
	}
	txevts.enqueue(evt);
	Q_ASSERT(txevtseq + txevts.size() == txseq);
	Q_ASSERT(txfltcnt <= (unsigned)txevts.size());

	// Ship it out
	return udpSend(epkt);
}
// Send a standalone ACK packet
// Send a standalone ACK-only packet.
// 'ackseq' is the highest sequence number being acknowledged and 'ackct'
// the contiguous-packet count for the header's ACK field
// (must not exceed ackctMax).
// 'pkt' is grown to at least the flow header size if needed.
// Returns the result of tx().
bool Flow::transmitAck(QByteArray &pkt, quint64 ackseq, unsigned ackct)
{
	//qDebug() << this << "transmitAck" << ackseq << ackct;
	Q_ASSERT(ackct <= ackctMax);

	// Make room for at least the flow header.
	if (pkt.size() < hdrlen)
		pkt.resize(hdrlen);

	const quint32 ackword = (ackct << ackctShift) | (ackseq & ackSeqMask);
	quint64 assignedseq;	// sequence number chosen by tx(); unused here
	return tx(pkt, ackword, assignedseq, false);
}
// High-level public transmit function.
// High-level public transmit function for data packets.
// Piggybacks an acknowledgment of our latest received packet(s),
// sends the packet, and makes sure the retransmission timer is running.
// 'pktseq' receives the sequence number assigned to the packet.
// Returns the result of tx().
bool Flow::flowTransmit(QByteArray &pkt, quint64 &pktseq)
{
	Q_ASSERT(pkt.size() > hdrlen);	// should be a nonempty data packet

	// This packet carries an implicit ACK of what we've received,
	// so any pending delayed ACK is no longer needed.
	const quint32 ackword = (rxackct << ackctShift) | (rxackseq & seqMask);
	if (rxunacked != 0) {
		rxunacked = 0;
		acktimer.stop();
	}

	const bool ok = tx(pkt, ackword, pktseq, true);

	// Arm a fresh retransmission timer if none is running.
	// (If this was a retransmission, rtxTimeout() would have restarted it.)
	if (!rtxtimer.isActive())
		rtxstart();

	return ok;
}
// Return how many more packets may be transmitted right now.
// If the socket provides congestion control, defer to it. Otherwise
// return the unused part of cwnd, and record that we became
// cwnd-limited when the window is exhausted.
int Flow::mayTransmit()
{
	if (nocc)	// socket already provides congestion control
		return SocketFlow::mayTransmit();

	if (cwnd <= txfltcnt) {
		cwndlim = true;
		return 0;
	}
	return cwnd - txfltcnt;
}
// Flow::rtxtimer invokes this slot when the retransmission timer expires.
// XX to be really compliant with TCP do we need to have
// a retransmission timer per packet?
void Flow::rtxTimeout(bool fail)
{
qDebug() << this << "rtxTimeout" << (fail ? "- FAILED" : "")
<< "period" << rtxtimer.interval();
// Restart the retransmission timer
// with an exponentially increased backoff delay.
rtxtimer.restart();
if (!nocc) switch (ccmode) {
default:
// Reset cwnd and go back to slow start
ssthresh = txfltcnt / 2;
ssthresh = qMax(ssthresh, CWND_MIN);
cwnd = CWND_MIN;
//qDebug("rtxTimeout: ssthresh = %d, cwnd = %d",
// ssthresh, cwnd);
case CC_FIXED:
break; // fixed cwnd, no congestion control
}
// Assume that all in-flight data packets have been dropped,
// and notify the upper layer as such.
// Snapshot txseq first, because the missed() calls in the loop
// might cause more packets to be transmitted.
quint64 seqlim = txseq;
for (quint64 seq = txevtseq; seq < seqlim; seq++) {
TxEvent &e = txevts[seq - txevtseq];
if (e.pipe) {
e.pipe = false;
txfltcnt--;
txfltsize -= e.size;
missed(seq, 1);
qDebug() << this << "rto-missed seq" << seq
<< "txfltcnt" << txfltcnt;
}
}
if (seqlim == txseq) {
Q_ASSERT(txfltcnt == 0);
Q_ASSERT(txfltsize == 0);
}
// Force at least one new packet transmission regardless of cwnd.
// This might not actually send a packet
// if there's nothing on our transmit queue -
// i.e., if no reliable sessions have outstanding data.
// In that case, rtxtimer stays disarmed until the next transmit.
readyTransmit();
// If we exceed a threshold timeout, signal a failed connection.
// The subclass has no obligation to do anything about this, however.
setLinkStatus(fail ? LinkDown : LinkStalled);
}
// Process a packet arriving on this flow from the underlying socket.
// In order, this:
//  - reconstructs the full 64-bit sequence number and rejects replays;
//  - authenticates and decrypts the packet via the armor;
//  - processes the piggybacked acknowledgment (acked/missed/expire
//    notifications and the retransmission timer);
//  - runs congestion control and round-trip statistics;
//  - passes the packet up via flowReceive() and acknowledges it.
void Flow::receive(QByteArray &pkt, const SocketEndpoint &)
{
	if (!isActive()) {
		qDebug() << this << "receive: inactive flow";
		return;
	}
	if (pkt.size() < hdrlen) {
		qDebug() << this << "receive: runt packet";
		return;
	}

	// Determine the full 64-bit packet sequence number
	quint32 *pkt32 = (quint32*)pkt.data();
	quint32 ptxseq = ntohl(pkt32[0]);
	quint8 pktchan = ptxseq >> chanShift;
	Q_ASSERT(pktchan == localChannel());	// enforced by sock.cc
	// Sign-extend the difference between the packet's low-order
	// sequence bits and ours. Subtract in unsigned arithmetic,
	// since signed overflow is undefined behavior.
	qint32 seqdiff = (qint32)((ptxseq - (quint32)rxseq) << chanBits)
				>> chanBits;
	quint64 pktseq = rxseq + seqdiff;
	//qDebug() << this << "rx seq" << pktseq << "size" << pkt.size();

	// Immediately drop too-old or already-received packets
	Q_ASSERT(sizeof(rxmask)*8 == maskBits);
	if (seqdiff > 0) {
		if (pktseq < rxseq) {
			qDebug("Flow receive: 64-bit wraparound detected!");
			return;
		}
	} else if (seqdiff <= -maskBits) {
		qDebug("Flow receive: too-old packet dropped");
		return;
	} else if (seqdiff <= 0) {
		if (rxmask & (1 << -seqdiff)) {
			qDebug("Flow receive: duplicate packet dropped");
			return;
		}
	}

	// Authenticate and decrypt the packet
	if (!armr->rxdec(pktseq, pkt)) {
		qDebug() << this << "receive: auth failed on rx" << pktseq;
		return;
	}

	// Record this packet as received for replay protection
	if (seqdiff > 0) {
		// Roll rxseq and rxmask forward appropriately.
		rxseq = pktseq;
		if (seqdiff < maskBits)
			rxmask = (rxmask << seqdiff) + 1;
		else
			rxmask = 1;	// bit 0 = packet just received
	} else {
		// Set appropriate bit in rxmask
		Q_ASSERT(seqdiff < 0 && seqdiff > -maskBits);
		rxmask |= (1 << -seqdiff);
	}

	// Decode the rest of the flow header
	pkt32 = (quint32*)pkt.data();	// might have changed in armr->rxdec()!
	quint32 packseq = ntohl(pkt32[1]);

	// Update our transmit state with the ack info in this packet.
	// As above, sign-extend the low-order difference using
	// unsigned arithmetic to avoid signed overflow.
	unsigned ackct = (packseq >> ackctShift) & ackctMask;
	qint32 ackdiff = (qint32)((packseq - (quint32)txackseq) << chanBits)
				>> chanBits;
	quint64 ackseq = txackseq + ackdiff;
	//qDebug("Flow: recv seq %llu ack %llu(%d) len %d",
	//	pktseq, ackseq, ackct, pkt.size());
	if (ackseq >= txseq) {
		qDebug() << "Flow receive: got ACK for packet " << ackseq
			<< "not transmitted yet";
		return;
	}

	// Account for newly acknowledged packets
	unsigned newpackets = 0;
	if (ackdiff > 0) {
		// Received acknowledgment for one or more new packets.
		// Roll forward txackseq and txackmask.
		txackseq = ackseq;
		if (ackdiff < maskBits)
			txackmask <<= ackdiff;
		else
			txackmask = 0;

		// Determine the number of newly-acknowledged packets
		// since the highest previously acknowledged sequence number.
		// (Out-of-order ACKs are handled separately below.)
		newpackets = qMin((unsigned)ackdiff, ackct+1);
		//qDebug() << this << "advanced" << ackdiff
		//	<< "ackct" << ackct
		//	<< "newpackets" << newpackets
		//	<< "txackseq" << txackseq;

		// Record the new in-sequence packets in txackmask as received.
		// (But note: ackct+1 may also include out-of-sequence pkts.)
		txackmask |= (1 << newpackets) - 1;

		// Notify the upper layer of newly-acknowledged data packets
		for (quint64 seq = txackseq - newpackets + 1;
				seq <= txackseq; seq++) {
			TxEvent &e = txevts[seq - txevtseq];
			if (e.pipe) {
				e.pipe = false;
				txfltcnt--;
				txfltsize -= e.size;
				acked(seq, 1, pktseq);
			}
		}

		// Infer that packets left un-acknowledged sufficiently late
		// have been dropped, and notify the upper layer as such.
		// XX we could avoid some of this arithmetic if we just
		// made sequence numbers start a bit higher.
		quint64 misslim = txackseq - qMin(txackseq, (quint64)
					qMax(missthresh, newpackets));
		for (quint64 missseq = txackseq - qMin(txackseq, (quint64)
					(missthresh+ackdiff-1));
				missseq <= misslim; missseq++) {
			TxEvent &e = txevts[missseq - txevtseq];
			if (e.pipe) {
				//qDebug() << this << "seq" << txevtseq
				//	<< "inferred dropped";
				e.pipe = false;
				txfltcnt--;
				txfltsize -= e.size;
				ccMissed(missseq);
				missed(missseq, 1);
				qDebug() << this << "infer-missed seq"
					<< missseq << "txfltcnt" << txfltcnt;
			}
		}

		// Finally, notice packets as they exit our ack window,
		// and garbage collect their transmit records,
		// since they can never be acknowledged after that.
		if (txackseq > (unsigned)maskBits) {
			while (txevtseq <= txackseq-maskBits) {
				//qDebug() << this << "seq" << txevtseq
				//	<< "expired";
				Q_ASSERT(!txevts.head().pipe);
				txevts.removeFirst();
				txevtseq++;
				expire(txevtseq-1, 1);
			}
		}

		// Reset the retransmission timer, since we've made progress.
		// Only re-arm it if there's still outstanding unACKed data.
		setLinkStatus(LinkUp);
		if (txfltcnt) {
			//qDebug() << this << "receive: rtxstart at time"
			//	<< QDateTime::currentDateTime()
			//		.toString("h:mm:ss:zzz");
			rtxstart();
		} else {
			rtxtimer.stop();
		}

		// Now that we've moved txackseq forward to the packet's ackseq,
		// they're now the same, which is important to the code below.
		ackdiff = 0;
	}
	Q_ASSERT(ackdiff <= 0);

	// Handle acknowledgments for any straggling out-of-order packets
	// (or an out-of-order acknowledgment for in-order packets).
	// This packet acknowledges the ackct+1 packets ending at ackseq,
	// which map to txackmask bits -ackdiff through -ackdiff+ackct.
	// The whole range must be examined: testing only the low bits
	// of txackmask would skip out-of-order ACKs (ackdiff < 0)
	// whenever the newest packets are already acknowledged.
	// Set the bits not already set,
	// and count newly acknowledged packets within our window.
	for (unsigned i = 0; i <= ackct; i++) {
		int bit = -ackdiff + i;
		if (bit >= maskBits)
			break;		// beyond our ack window
		if ((quint64)bit > txackseq - txevtseq)
			break;		// transmit record no longer exists
		if (txackmask & (1 << bit))
			continue;	// already ACKed
		txackmask |= (1 << bit);
		TxEvent &e = txevts[txackseq - bit - txevtseq];
		if (e.pipe) {
			e.pipe = false;
			txfltcnt--;
			txfltsize -= e.size;
			acked(txackseq - bit, 1, pktseq);
		}
		newpackets++;
	}

	// Count the total number of acknowledged packets since the last mark.
	markacks += newpackets;

	if (!nocc) switch (ccmode) {
	case CC_VEGAS:
		sstoggle = !sstoggle;
		if (sstoggle)
			break;	// do slow start only once every two RTTs
		// fall through...
	case CC_TCP: {
		// During standard TCP slow start procedure,
		// increment cwnd for each newly-ACKed packet.
		// XX TCP spec allows this to be <=,
		// which puts us in slow start briefly after each loss...
		if (newpackets && cwndlim && cwnd < ssthresh) {
			cwnd = qMin(cwnd + newpackets, ssthresh);
			qDebug("Slow start: %d new ACKs; boost cwnd to %d "
				"(ssthresh %d)",
				newpackets, cwnd, ssthresh);
		}
		break; }
	case CC_DELAY:
		if (cwndinc < 0)	// Only slow start during up-phase
			break;
		// fall through...
	case CC_AGGRESSIVE: {
		// We're always in slow start, but we only count ACKs received
		// on schedule and after a per-roundtrip baseline.
		if (markacks > ssbase && markElapsed() <= lastrtt) {
			cwnd += qMin(newpackets, markacks - ssbase);
			// qDebug("Slow start: %d new ACKs; boost cwnd to %d",
			//	newpackets, cwnd);
		}
		break; }
	case CC_CTCP:
		Q_ASSERT(0);	// XXX
	case CC_FIXED:
		break;	// fixed cwnd, no congestion control
	}

	// When ackseq passes markseq, we've observed a round-trip,
	// so update our round-trip statistics.
	if (ackseq >= markseq) {
		// 'rtt' is the total round-trip delay in microseconds before
		// we receive an ACK for a packet at or beyond the mark.
		// Fold this into 'rtt' to determine avg round-trip time,
		// and restart the timer to measure the next round-trip.
		int rtt = markElapsed();
		rtt = qMax(1, qMin(RTT_MAX, rtt));
		cumrtt = ((cumrtt * 7.0) + rtt) / 8.0;

		// Compute an RTT variance measure
		float rttvar = fabsf(rtt - cumrtt);
		cumrttvar = ((cumrttvar * 7.0) + rttvar) / 8.0;

		// 'markacks' is the number of unique packets ACKed
		// by the receiver during the time since the last mark.
		// Use this to gauge throughput during this round-trip.
		float pps = (float)markacks * 1000000.0 / rtt;
		cumpps = ((cumpps * 7.0) + pps) / 8.0;

		// "Power" measures network efficiency
		// in the sense of both minimizing rtt and maximizing pps.
		float pwr = pps / rtt;
		cumpwr = ((cumpwr * 7.0) + pwr) / 8.0;

		// Compute a PPS variance measure
		float ppsvar = fabsf(pps - cumpps);
		cumppsvar = ((cumppsvar * 7.0) + ppsvar) / 8.0;

		// Calculate loss rate during this last round-trip,
		// and a cumulative loss ratio.
		// Could go out of (0.0,1.0) range due to out-of-order acks.
		// Subtract in floating point so that markacks > marksent
		// yields a negative value that clamps to 0,
		// rather than an unsigned wraparound that clamps to 1.
		float loss = ((float)marksent - (float)markacks)
				/ (float)marksent;
		loss = qMax(0.0f, qMin(1.0f, loss));
		cumloss = ((cumloss * 7.0) + loss) / 8.0;

		// Reset markseq to be the next packet transmitted.
		// The new timestamp will be taken when that packet is sent.
		markseq = txseq;

		if (!nocc) switch (ccmode) {
		case CC_TCP:
			// Normal TCP congestion control:
			// during congestion avoidance,
			// increment cwnd once each RTT,
			// but only on round-trips that were cwnd-limited.
			if (cwndlim) {
				cwnd++;
				//qDebug("cwnd %d ssthresh %d",
				//	cwnd, ssthresh);
			}
			cwndlim = false;
			break;
		case CC_AGGRESSIVE:
			break;
		case CC_DELAY:
			if (pwr > basepwr) {
				basepwr = pwr;
				basertt = rtt;
				basepps = pps;
				basewnd = markacks;
			} else if (markacks <= basewnd && rtt > basertt) {
				basertt = rtt;
				basepwr = basepps / basertt;
			} else if (markacks >= basewnd && pps < basepps) {
				basepps = pps;
				basepwr = basepps / basertt;
			}
			if (cwndinc > 0) {
				// Window going up.
				// If RTT makes a significant jump, reverse.
				if (rtt > basertt || cwnd >= CWND_MAX) {
					cwndinc = -1;
				} else {
					// Additively increase the window
					cwnd += cwndinc;
				}
			} else {
				// Window going down.
				// If PPS makes a significant dive, reverse.
				if (pps < basepps || cwnd <= CWND_MIN) {
					ssbase = cwnd++;
					cwndinc = +1;
				} else {
					// Additively decrease the window
					cwnd += cwndinc;
				}
			}
			cwnd = qMax(CWND_MIN, cwnd);
			cwnd = qMin(CWND_MAX, cwnd);
			qDebug("RT: pwr %.0f[%.0f/%.0f]@%d "
				"base %.0f[%.0f/%.0f]@%d "
				"cwnd %d%+d",
				pwr*1000.0, pps, (float)rtt, markacks,
				basepwr*1000.0, basepps, basertt, basewnd,
				cwnd, cwndinc);
			break;
		case CC_VEGAS: {
			// Keep track of the lowest RTT ever seen,
			// as per the original Vegas algorithm.
			// This has the known problem that it screws up
			// if the path's actual base RTT changes.
			if (basertt == 0)	// first packet
				basertt = rtt;
			else if (rtt < basertt)
				basertt = rtt;
			//else
			//	basertt = (basertt * 255.0 + rtt) / 256.0;

			float expect = (float)marksent / basertt;
			float actual = (float)marksent / rtt;
			float diffpps = expect - actual;
			Q_ASSERT(diffpps >= 0.0);
			float diffpprt = diffpps * rtt;

			if (diffpprt < 1.0 && cwnd < CWND_MAX && cwndlim) {
				cwnd++;
				// ssthresh = qMax(ssthresh, cwnd / 2); ??
			} else if (diffpprt > 3.0 && cwnd > CWND_MIN) {
				cwnd--;
				ssthresh = qMin(ssthresh, cwnd); // /2??
			}

			qDebug("Round-trip: win %d basertt %.3f rtt %d "
				"exp-pps %f act-pps %f diff-pprt %.3f cwnd %d",
				marksent, basertt, rtt,
				expect*1000000.0, actual*1000000.0,
				diffpprt, cwnd);
			break; }
		case CC_CTCP: {
#if 0
			k = 0.8; a = 1/8; B = 1/2
			if (in-recovery)
				...
			else if (diff < y) {
				dwnd += sqrt(win)/8.0 - 1;
			} else
				dwnd -= C * diff;
#endif
			break; }
		case CC_FIXED:
			break;	// fixed cwnd, no congestion control
		}

		if (nocc)
			qDebug() << "End-to-end rtt" << rtt << "cum" << cumrtt;
		else
			qDebug("Cumulative: rtt %.3f[%.3f] pps %.3f[%.3f] pwr %.3f "
				"loss %.3f",
				cumrtt, cumrttvar, cumpps, cumppsvar, cumpwr, cumloss);

		lastrtt = rtt;
		lastpps = pps;
	}

	// Always clamp cwnd against CWND_MAX.
	cwnd = qMin(cwnd, CWND_MAX);

	// Pass the received packet to the upper layer for processing.
	// It'll return true if it wants us to ack the packet, false otherwise.
	if (flowReceive(pktseq, pkt))
		acknowledge(pktseq, true);
	// XX should still replay-protect even if no ack!

	// Signal upper layer that we can transmit more, if appropriate
	if (newpackets > 0 && mayTransmit())
		readyTransmit();
}
// Update receive-side acknowledgment state for received packet 'pktseq'
// and schedule, send, or defer an ACK for it.
// In-order contiguous packets may have their ACK delayed and combined.
// Discontiguous or out-of-order packets flush any pending ACK and are
// ACKed immediately (when 'sendack' is true).
// With 'sendack' false, the packet is ACKed only occasionally
// (every ACKACKPACKETS packets) and no ACK timer is started.
void Flow::acknowledge(quint16 pktseq, bool sendack)
{
	//qDebug() << this << "acknowledging" << pktseq
	//	<< (sendack ? "(sending)" : "(not sending)")
	//	<< "rxunacked" << rxunacked;

	// NOTE(review): the parameter is declared quint16 (see flow.h), but
	// receive() passes a full 64-bit packet sequence number, so only its
	// low 16 bits arrive here. Reconstruct the full sequence number
	// relative to rxackseq, as receive() does for the wire sequence
	// field. Otherwise rxackseq stops advancing after 65536 packets and
	// ACKs carry truncated sequence numbers.
	// Consider widening the parameter in the header.
	qint32 seqdiff = (qint16)(quint16)(pktseq - (quint16)rxackseq);
	quint64 fullseq = rxackseq + seqdiff;

	// Update our receive state to account for this packet
	if (seqdiff == 1) {
		// Received packet is in-order and contiguous.
		// Roll rxackseq and rxackmask forward appropriately.
		rxackseq = fullseq;
		//rxackmask = (rxackmask << 1) + 1;
		rxackct++;
		if (rxackct > ackctMax)
			rxackct = ackctMax;

		// ACK the received packet if appropriate.
		// Delay our ACK for up to ACKPACKETS
		// received non-ACK-only packets,
		// or up to ACKACKPACKETS continuous ack-only packets.
		++rxunacked;
		if (!sendack && rxunacked < ACKACKPACKETS) {
			// Only ack acks occasionally,
			// and don't start the ack timer for them.
			return;
		}
		if (rxunacked < ACKACKPACKETS) {
			// Schedule an ack for transmission
			// by starting the ack timer.
			// We normally do this even for non-delayed acks,
			// so that we can process any other
			// already-received packets first
			// and have a chance to combine multiple acks into one.
			if (delayack && rxunacked < ACKPACKETS) {
				// Data packet - start delayed ack timer.
				if (!acktimer.isActive())
					acktimer.start(ACKDELAY);
			} else {
				// Start with zero timeout -
				// immediate callback from event loop
				acktimer.start(0);
			}
		} else {
			// But make sure we send an ack every 4 packets
			// no matter what...
			flushack();
		}

	} else if (seqdiff > 1) {
		// Received packet is in-order but discontiguous.
		// One or more packets probably were lost.
		// Flush any delayed ACK immediately,
		// before updating our receive state.
		flushack();

		// Roll rxackseq and rxackmask forward appropriately.
		rxackseq = fullseq;
		//if (seqdiff < maskBits)
		//	rxackmask = (rxackmask << seqdiff) + 1;
		//else
		//	rxackmask = 1;	// bit 0 = packet just received

		// Reset the contiguous packet counter
		rxackct = 0;	// (0 means 1 packet received)

		// ACK this discontiguous packet immediately
		// so that the sender is informed of lost packets ASAP.
		if (sendack)
			txack(rxackseq, 0);

	} else if (seqdiff < 0) {
		// Old packet received out of order.
		// Flush any delayed ACK immediately.
		flushack();

		// Set appropriate bit in rxackmask
		//if (-seqdiff < maskBits)
		//	rxackmask |= (1 << -seqdiff);

		// ACK this out-of-order packet immediately.
		if (sendack)
			txack(fullseq, 0);
	}
}
// Notify congestion control that transmitted packet 'pktseq' has been
// inferred lost, and adjust cwnd/ssthresh according to ccmode.
void Flow::ccMissed(quint64 pktseq)
{
	qDebug() << "Missed seq" << pktseq;

	// Notify congestion control
	if (!nocc) switch (ccmode) {
	case CC_TCP:
	case CC_DELAY:
	case CC_VEGAS: {
		// Packet loss detected -
		// perform standard TCP congestion control.
		// recovseq is the first sequence number transmitted after the
		// last window reduction. Packets sent before it were in
		// flight when we reduced, so their loss is part of the same
		// event. The loss of packet recovseq itself (sent after the
		// reduction) is a new loss event.
		if (pktseq < recovseq) {
			// We're in a fast recovery window:
			// this isn't a new loss event.
			break;
		}

		// new loss event: cut ssthresh and cwnd
		//ssthresh = (txseq - txackseq) / 2;	XXX
		ssthresh = cwnd / 2;
		ssthresh = qMax(ssthresh, CWND_MIN);
		//qDebug("%d PACKETS LOST: cwnd %d -> %d",
		//	ackdiff - newpackets, cwnd, ssthresh);
		cwnd = ssthresh;

		// fast recovery for the rest of this window
		recovseq = txseq;

		break; }
	case CC_AGGRESSIVE: {
		// Number of packets we think have been lost
		// so far during this round-trip.
		int lost = (txackseq - markbase) - markacks;
		lost = qMax(0, lost);

		// Number of packets we expect to receive,
		// assuming the lost ones are really lost
		// and we don't lose any more this round-trip.
		// Clamp at zero: if more packets appear lost than were sent
		// at the mark, an unsigned subtraction would wrap around
		// and disable the clamp below entirely.
		unsigned expected = ((unsigned)lost < (unsigned)marksent)
				? (unsigned)(marksent - lost) : 0;

		// Clamp the congestion window to this value.
		if (expected < cwnd) {
			qDebug("PACKETS LOST: cwnd %d -> %d", cwnd, expected);
			cwnd = ssbase = expected;
			cwnd = qMax(CWND_MIN, cwnd);
		}
		break; }
	case CC_CTCP:
		Q_ASSERT(0);	// XXX
	case CC_FIXED:
		break;	// fixed cwnd, no congestion control
	}
}
// Slot connected to acktimer: the ACK delay has elapsed,
// so send whatever acknowledgment is pending.
void Flow::ackTimeout()
{
	flushack();
}
// Slot connected to statstimer: dump transmit/receive sequence state,
// congestion window, and cumulative statistics to the debug log.
// (The constructor leaves statstimer unstarted by default.)
void Flow::statsTimeout()
{
	// Note the trailing space after "rxackseq %llu": without it the
	// value ran directly into the "txfltcnt" label in the output.
	qDebug("Stats: txseq %llu txackseq %llu "
		"rxseq %llu rxackseq %llu "
		"txfltcnt %d cwnd %d ssthresh %d\n\t"
		"cumrtt %.3f cumpps %.3f cumloss %.3f",
		txseq, txackseq, rxseq, rxackseq, txfltcnt, cwnd, ssthresh,
		cumrtt, cumpps, cumloss);
}
// Hook called by receive() when 'n' transmitted packets starting at
// 'seq' are acknowledged; the third argument is the sequence number of
// the received packet carrying the ACK. The base version does nothing
// (presumably subclasses override it - confirm in flow.h).
void Flow::acked(quint64 /*seq*/, int /*n*/, quint64 /*rxseq*/)
{
	//qDebug() << this << "tx seq" << seq << "-" << seq+n-1 << "acked";
}
// Hook called when transmitted packet(s) are inferred lost,
// either by receive() or by rtxTimeout(). The base version does nothing.
void Flow::missed(quint64 /*seq*/, int /*n*/)
{
	//qDebug() << this << "tx seq" << seq << "missed";
}
// Hook called by receive() when transmit records fall out of the
// acknowledgment window and are discarded. The base version does nothing.
void Flow::expire(quint64 /*seq*/, int /*n*/)
{
	//qDebug() << this << "tx seq" << seq << "expired";
}
////////// ChecksumArmor //////////
// Build a checksum-only armor.
// 'txkey' and 'rxkey' are added into the checksums of outgoing and
// incoming packets respectively. 'armorid' is stored as given.
ChecksumArmor::ChecksumArmor(uint32_t txkey, uint32_t rxkey,
				const QByteArray &armorid)
:	txkey(txkey),
	rxkey(rxkey),
	armorid(armorid)
{
}
// "Encrypt" an outgoing packet by appending a 4-byte keyed checksum.
// The checksum covers a pseudo-header holding the full 64-bit packet
// sequence number, followed by the whole packet.
// Returns a new buffer; 'pkt' itself is not modified.
QByteArray ChecksumArmor::txenc(qint64 pktseq, const QByteArray &pkt)
{
	// Work on a copy so the checksum can be appended.
	QByteArray epkt(pkt);
	const int len = epkt.size();

	Chk32 chk;
	quint32 seqhdr[2] = { htonl(pktseq >> 32), htonl(pktseq) };
	chk.update(seqhdr, 8);
	chk.update(epkt.data(), len);

	// Append the keyed checksum in network byte order.
	const quint32 sum = htonl(chk.final() + txkey);
	epkt.append(QByteArray((const char*)&sum, 4));
	return epkt;
}
// Verify and strip the 4-byte keyed checksum from an incoming packet.
// Returns false if the packet is too short or the checksum mismatches.
// On success, 'pkt' is truncated to exclude the checksum.
bool ChecksumArmor::rxdec(qint64 pktseq, QByteArray &pkt)
{
	// Length of the packet proper, excluding the trailing checksum.
	const int bodylen = pkt.size() - 4;
	if (bodylen < Flow::hdrlen)
		return false;	// too small to contain a full checksum

	// Recompute the checksum over the sequence pseudo-header and body.
	Chk32 chk;
	quint32 seqhdr[2] = { htonl(pktseq >> 32), htonl(pktseq) };
	chk.update(seqhdr, 8);
	chk.update(pkt.data(), bodylen);
	const quint32 expect = htonl(chk.final() + rxkey);

	// Compare against the packet's trailer, then strip it.
	if (pkt.mid(bodylen) != QByteArray((const char*)&expect, 4))
		return false;
	pkt.resize(bodylen);
	return true;
}
////////// AESArmor //////////
#include "aes.h"
#include "hmac.h"
// Build an AES-CTR + HMAC armor from separate transmit and receive
// encryption and MAC keys.
AESArmor::AESArmor(const QByteArray &txenckey, const QByteArray &txmackey,
		const QByteArray &rxenckey, const QByteArray &rxmackey)
:	txaes(txenckey, AES::CtrEncrypt),
	rxaes(rxenckey, AES::CtrDecrypt),
	txmac(txmackey),
	rxmac(rxmackey)
{
	// Key dumps for debugging; disabled by default.
	//qDebug() << this << "txenc" << txenckey.toBase64();
	//qDebug() << this << "rxenc" << rxenckey.toBase64();
	//qDebug() << this << "txmac" << txmackey.toBase64();
	//qDebug() << this << "rxmac" << rxmackey.toBase64();
}
// Encrypt and authenticate an outgoing packet.
// The first 4 bytes (the header word) are copied in the clear and the
// rest is encrypted with AES in CTR mode. An HMAC over the sequence
// pseudo-header and the resulting bytes is then appended.
// Returns a new buffer of pkt.size() + HMACLEN bytes.
QByteArray AESArmor::txenc(qint64 pktseq, const QByteArray &pkt)
{
	const int len = pkt.size();
	const quint8 *src = (const quint8*)pkt.constData();

	// Output buffer for the encrypted packet
	QByteArray epkt;
	epkt.resize(len);
	quint8 *dst = (quint8*)epkt.data();

	// Initialization vector template for encryption.
	// Its first 8 bytes also serve as the MAC pseudo-header.
	ivec.l[0] = htonl(pktseq >> 32);
	ivec.l[1] = htonl(pktseq);
	ivec.l[2] = htonl(0x56584166);	// 'VXAf'
	ivec.l[3] = 0;			// per-packet block counter

	// Copy the unencrypted header (XX hack).
	// Copy byte-wise rather than through a quint32 pointer,
	// which avoids type-punning the QByteArray storage.
	Q_ASSERT(encofs == 4);
	for (int i = 0; i < 4; i++)
		dst[i] = src[i];

	// Encrypt the remainder in CTR mode
	txaes.ctrEncrypt(src + encofs, dst + encofs, len - encofs, &ivec);

	// MAC the sequence pseudo-header plus the encrypted packet.
	HMAC hmac(txmac);
	hmac.update(ivec.b, 8);
	hmac.update(dst, len);
	hmac.finalAppend(epkt);
	Q_ASSERT(epkt.size() == len + HMACLEN);

	//qDebug() << this << "txenc" << pktseq << epkt.size();
	return epkt;
}
// Authenticate and decrypt an incoming packet in place.
// Returns false if the packet is too short to carry an HMAC or
// if HMAC verification fails. Otherwise decrypts everything after
// the first encofs bytes in CTR mode and returns true.
// NOTE(review): finalVerify() presumably strips the HMAC trailer
// from 'pkt' (receive() re-reads pkt.data() afterwards) - confirm in hmac.h.
bool AESArmor::rxdec(qint64 pktseq, QByteArray &pkt)
{
	//qDebug() << this << "rxdec" << pktseq << pkt.size();
	const int len = pkt.size() - HMACLEN;
	if (len < Flow::hdrlen) {
		qDebug() << this << "rxdec: received packet too small";
		return false;	// too small to contain a full HMAC
	}

	// Initialization vector template for decryption.
	// Its first 8 bytes also serve as the MAC pseudo-header.
	ivec.l[0] = htonl(pktseq >> 32);
	ivec.l[1] = htonl(pktseq);
	ivec.l[2] = htonl(0x56584166);	// 'VXAf'
	ivec.l[3] = 0;			// per-packet block counter

	// Check the MAC before touching the payload.
	HMAC hmac(rxmac);
	hmac.update(ivec.b, 8);
	hmac.update(pkt.data(), len);
	if (!hmac.finalVerify(pkt))
		return false;

	// Decrypt in place in CTR mode.
	quint8 *data = (quint8*)pkt.data();
	rxaes.ctrDecrypt(data + encofs, data + encofs, len - encofs, &ivec);
	return true;
}
| Maintained by PDOS | ViewVC Help |
| Powered by ViewVC 1.0.3 |