// Parent Directory | Revision Log
// Revision log: reno & vegas-style congestion control now working across multiple segments
/*
* Structured Stream Transport
* Copyright (C) 2006-2008 Massachusetts Institute of Technology
* Author: Bryan Ford
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <string.h>
#include <QDebug>
#include "host.h"
#include "ident.h"
#include "key.h"
#include "seg.h"
using namespace SST;
////////// FlowSegment //////////
// Construct a flow segment that is not yet attached to anything:
// no paired segment ('other'), no FlowSocket to deliver to ('fsock'),
// and the default receive-queue limit.
// The segment's own readyTransmit() signal is routed to gotReadyTransmit().
FlowSegment::FlowSegment(Host *host, QObject *parent)
:	Flow(host, parent),
	other(NULL),
	fsock(NULL),
	qlim(qlim_default)
{
	connect(this, SIGNAL(readyTransmit()),
		this, SLOT(gotReadyTransmit()));
}
// Bind this segment to 'sock' toward 'remoteep' and start key agreement.
// Returns false (after logging) if the bind fails, true otherwise.
// Nothing is hooked up to the KeyInitiator's completion here,
// so a true return only means the connection process was started.
bool FlowSegment::initiateTo(Socket *sock, const Endpoint &remoteep)
{
	if (bind(sock, remoteep)) {
		// Bound: kick off the connection process.
		KeyInitiator *initiator = new KeyInitiator(
				this, flow_seg_magic,
				Ident::fromEndpoint(remoteep).id());
		(void)initiator;	// only created; its signals are not connected
		return true;
	}

	// Could not bind the segment to the specified socket.
	qDebug() << "FlowSegment::initiateTo" << remoteep.toString()
		<< "failed";
	return false;
}
// Handle one packet received on this segment.
//
// Endpoint case (fsock set): a packet no longer than the header is a
// standalone ack and is only counted; anything longer has its header
// stripped and is handed to the FlowSocket.
// Middlebox case (fsock unset): the packet is queued for the paired
// segment to forward, subject to a drop-tail limit of 'qlim' packets.
//
// Returns true only when a data packet was delivered to the FlowSocket,
// meaning the caller should go ahead and acknowledge it.  In the middlebox
// case the return is always false; queued packets are acknowledged later,
// from gotReadyTransmit() on the paired segment.
bool FlowSegment::flowReceive(qint64 pktseq, QByteArray &pkt)
{
	qDebug() << this << "flowReceive seq" << pktseq << "size" << pkt.size();

	if (!fsock) {
		// No socket above us, so we must be a flow middlebox.
		Q_ASSERT(other);

		// Basic drop-tail policy for the moment.
		if (rxq.size() >= qlim) {
			qDebug("flow middlebox queue overflow; DROP packet");
			return false;
		}

		// Hold the packet until the next segment takes it.
		RxPkt held;
		held.rxseq = pktseq;
		held.pkt = pkt;
		rxq.enqueue(held);
		qDebug() << this << "queue size" << rxq.size();

		// Tell the downstream segment there is something to send.
		Q_ASSERT(other->other == this);
		other->readyTransmit();
		return false;
	}

	// Endpoint: standalone acks carry nothing beyond the header.
	const bool ackOnly = pkt.size() <= hdrlen;
	if (ackOnly) {
		// Generated by the default impl of Flow::transmitAck().
		// Count it as received, but do NOT ack this ack.
		acknowledge(pktseq, false);
		return false;
	}

	// XXX safer to ack first?
	QByteArray payload = pkt.mid(hdrlen);
	qDebug() << fsock << "recv size" << payload.size();
	fsock->receive(payload, fsock->peer);
	return true;	// go ahead and acknowledge
}
void FlowSegment::gotReadyTransmit()
{
if (fsock) {
// Pass the ready indication on up the stack.
return fsock->readyTransmitToPeer();
}
// We must be a flow middlebox.
Q_ASSERT(other);
while (mayTransmit()) {
if (other->rxq.empty())
return; // nothing to transmit
// Dequeue a packet from the upstream segment
RxPkt rp = other->rxq.dequeue();
// Transmit it onto the downstream segment,
// if it's a data segment to be forwarded.
bool isdata = rp.pkt.size() > hdrlen;
if (isdata) {
quint64 txseq;
flowTransmit(rp.pkt, txseq);
}
// Now we can finally acknowledge it on the previous segment
other->acknowledge(rp.rxseq, isdata);
}
}
////////// FlowSocket //////////
// Construct a FlowSocket whose single peer is 'peer' at
// NETSTERIA_DEFAULT_PORT.  No flow segment is attached yet (fseg is NULL).
// The socket marks itself active immediately and logs the host's
// active socket list.
FlowSocket::FlowSocket(Host *host, const QHostAddress &peer, QObject *parent)
:	Socket(host, parent),
	h(host),
	fseg(NULL),
	peer(SocketEndpoint(Endpoint(peer, NETSTERIA_DEFAULT_PORT), this))
{
	setActive(true);	// XXX should we already???

	qDebug() << this << "active sockets" << host->activeSockets();
}
// Destructor: only logs that this socket is going away.
FlowSocket::~FlowSocket()
{
	qDebug() << this
		<< "~FlowSocket";
}
// Create this socket's flow segment and start connecting it to 'remoteep'.
// May be called only while no segment is attached (asserted).
// The new segment is owned by this socket (QObject parent) and delivers
// received data back to it.  Failure to initiate is treated as fatal.
// Returns the new segment.
FlowSegment *FlowSocket::initiateTo(const Endpoint &remoteep)
{
	Q_ASSERT(fseg == NULL);

	fseg = new FlowSegment(host(), this);
	fseg->fsock = this;
	Q_ASSERT(fseg->other == NULL);

	// Bind the segment to the host's main socket.
	// NOTE(review): this takes the first entry of activeSockets(); since
	// the constructor also marks this FlowSocket active, confirm that the
	// first entry is always the intended underlying socket.
	Socket *mainsock = host()->activeSockets().at(0);
	const bool started = fseg->initiateTo(mainsock, remoteep);
	if (!started) {
		qFatal("FlowSocket::initiateTo failed");	//XXX
	}

	return fseg;
}
// XXX unused
// Key agreement completion handler: a failed agreement is fatal,
// a successful one is just logged.
void FlowSocket::keyCompleted(bool success)
{
	if (!success) {
		qFatal("FlowSocket key agreement failed!");	//XXX
	}

	qDebug() << "FlowSocket key agreement succeeded";
}
// Bind 'flow' on channel 'localchan' to this socket.
// A FlowSocket talks to exactly one peer, so any other 'remoteep' is
// rejected (logged, returns false).  Otherwise the base Socket does the
// binding; on success this socket's readyTransmitToPeer() signal is chained
// to the flow's readyTransmit() signal.
// Returns true if the flow was bound.
bool FlowSocket::bindFlow(const Endpoint &remoteep, Channel localchan,
				SocketFlow *flow)
{
	if (remoteep != peer) {
		qDebug() << "Can't bind flow to" << remoteep.toString()
			<< "over FlowSocket to" << peer.toString();
		return false;
	}

	if (Socket::bindFlow(remoteep, localchan, flow)) {
		// Propagate the underlying flow's readyTransmit() signal upward.
		connect(this, SIGNAL(readyTransmitToPeer()),
			flow, SIGNAL(readyTransmit()));
		return true;
	}

	return false;
}
// Send 'size' bytes from 'data' to the peer over the attached flow segment.
//
// 'ep' must be this socket's peer and a segment must be attached (both
// asserted).  If the segment is not active the packet is dropped and false
// is returned.  Otherwise the payload is copied behind fseg->hdrlen bytes of
// header space and handed to flowTransmit(), whose result is returned.
// The header bytes are left unset here — presumably flowTransmit() fills
// them in; confirm against Flow.
bool FlowSocket::send(const Endpoint &ep, const char *data, int size)
{
	//qDebug() << this << "send to" << ep << "size" << size;
	Q_ASSERT(ep == peer);
	Q_ASSERT(fseg);

	if (!fseg->isActive()) {
		qDebug() << this << "send: dropping packet for inactive flow";
		return false;
	}

	// Build the packet: header space first, then the caller's payload.
	QByteArray pkt;
	pkt.resize(fseg->hdrlen + size);
	memcpy(pkt.data() + fseg->hdrlen, data, size);

	// Start from a defined value: the sequence number is logged below
	// even when flowTransmit() reports failure, and it must not be read
	// uninitialized if the callee did not assign it.
	quint64 txseq = 0;
	bool success = fseg->flowTransmit(pkt, txseq);
	qDebug() << fseg << "flowTransmit seq" << txseq << "size" << size;
	return success;
}
// Always reports true, whatever endpoint is asked about.
bool FlowSocket::isCongestionControlled(const Endpoint &)
{
	const bool controlled = true;
	return controlled;
}
// Report how much the attached flow segment may currently transmit,
// by forwarding to FlowSegment::mayTransmit().
// 'ep' must be this socket's peer, and a segment must already be attached;
// both are asserted, matching the checks in send().
int FlowSocket::mayTransmit(const Endpoint &ep)
{
	Q_ASSERT(ep == peer);
	Q_ASSERT(fseg);		// otherwise the call below dereferences NULL
	return fseg->mayTransmit();
}
// Not supported on a FlowSocket: calling this is reported as a fatal error.
// All arguments are ignored.
bool FlowSocket::bind(const QHostAddress &, quint16, QUdpSocket::BindMode)
{
	qFatal("FlowSocket::bind() shouldn't be called");	//XXX???

	// Only reached if qFatal() returns.
	return false;
}
// Not supported on a FlowSocket: calling this is reported as a fatal error.
QList<Endpoint> FlowSocket::localEndpoints()
{
	qFatal("FlowSocket::localEndpoints() shouldn't be called");	//XXX???

	// Only reached if qFatal() returns: an empty list.
	return QList<Endpoint>();
}
// Not supported on a FlowSocket: calling this is reported as a fatal error.
quint16 FlowSocket::localPort()
{
	qFatal("FlowSocket::localPort() shouldn't be called");	//XXX???

	// Only reached if qFatal() returns.
	return 0;
}
// Not supported on a FlowSocket: calling this is reported as a fatal error.
QString FlowSocket::errorString()
{
	qFatal("FlowSocket::errorString() shouldn't be called");	//XXX???

	// Only reached if qFatal() returns: a null string.
	return QString();
}
////////// FlowResponder //////////
// Construct a responder for the flow-segment magic on host 'h'.
// It starts with no FlowSocket to forward up to, and with the
// last-created incoming/outgoing segment pointers cleared.
FlowResponder::FlowResponder(Host *h)
:	KeyResponder(h, flow_seg_magic),
	fsock(NULL),
	lastiseg(NULL),		// XXX
	lastoseg(NULL)		// XXX
{
}
// Record the endpoint toward which newFlow() initiates the outgoing
// segment when no FlowSocket has been set with forwardUp().
void FlowResponder::forwardTo(const Endpoint &targetep)
{
	// The parameter shadows the member, hence the explicit this->.
	this->targetep = targetep;
}
// Record the FlowSocket that newFlow() attaches incoming segments to,
// so their packets are delivered up to it instead of forwarded onward.
void FlowResponder::forwardUp(FlowSocket *fsock)
{
	// The parameter shadows the member, hence the explicit this->.
	this->fsock = fsock;
}
// Create the incoming flow segment for a new flow and wire up where its
// packets go next.
//
// 'epi' is the socket endpoint the request came in on; the new segment is
// bound to it.  The three unnamed QByteArray parameters are not used here.
//
// If a FlowSocket was set with forwardUp(), the segment delivers to that
// socket and is reparented to it.  Otherwise an outgoing segment is
// initiated toward the endpoint set with forwardTo(), on the same socket
// the request arrived on, and the two segments are cross-linked through
// their 'other' pointers.
//
// Returns the incoming segment, or NULL if either segment could not be set
// up.  On failure every segment allocated here is freed again, and
// lastiseg is never left pointing at a freed segment.
Flow *FlowResponder::newFlow(const SocketEndpoint &epi, const QByteArray &,
				const QByteArray &, QByteArray &)
{
	FlowSegment *fseg = new FlowSegment(host(), this);

	// Bind the flow to the socket the request came in on.
	if (!fseg->bind(epi)) {
		qDebug("FlowResponder: could not bind new flow");
		delete fseg;
		return NULL;
	}
	lastiseg = fseg;	// XXX hack

	// Set up the onward or upward forwarding path
	if (fsock) {
		fseg->fsock = fsock;	// just forward up to this socket
		fseg->setParent(fsock);
		Q_ASSERT(fsock->fseg == NULL);	// XXX
		fsock->fseg = fseg;
	} else {
		// Initiate a corresponding outgoing flow
		Q_ASSERT(!targetep.isNull());
		FlowSegment *oseg = new FlowSegment(host(), this);
		if (!oseg->initiateTo(epi.sock, targetep)) {
			qDebug() << "FlowResponder: couldn't initiate to next"
				<< targetep.toString();

			// Free both halves.  initiateTo() only fails before
			// it has created anything, so oseg can simply be
			// deleted; leaving it would keep a dead segment
			// around as a child of this responder.
			delete oseg;
			delete fseg;

			// fseg is gone, so don't leave the hack pointer
			// dangling at it.
			lastiseg = NULL;
			return NULL;
		}
		oseg->other = fseg;
		fseg->other = oseg;
		lastoseg = oseg;	// XXX hack
	}

	return fseg;
}
// Maintained by PDOS | ViewVC Help | Powered by ViewVC 1.0.3