PDOS

[uia] / trunk / uia / sst / lib / seg.cc  

View of /trunk/uia/sst/lib/seg.cc

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3678 - (download) (as text) (annotate)
Thu Jan 22 16:55:11 2009 UTC (10 months ago) by baford
File size: 7588 byte(s)
reno & vegas-style congestion control now working across multiple segments
/*
 * Structured Stream Transport
 * Copyright (C) 2006-2008 Massachusetts Institute of Technology
 * Author: Bryan Ford
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 * 
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */


#include <string.h>

#include <QDebug>

#include "host.h"
#include "ident.h"
#include "key.h"
#include "seg.h"

using namespace SST;


////////// FlowSegment //////////

FlowSegment::FlowSegment(Host *host, QObject *parent)
:	Flow(host, parent),
	other(NULL), fsock(NULL), qlim(qlim_default)
{
	connect(this, SIGNAL(readyTransmit()), this, SLOT(gotReadyTransmit()));
}

bool FlowSegment::initiateTo(Socket *sock, const Endpoint &remoteep)
{
	// Bind the segment to the specified socket
	if (!bind(sock, remoteep)) {
		qDebug() << "FlowSegment::initiateTo" << remoteep.toString()
			<< "failed";
		return false;
	}

	// Start the connection process
	KeyInitiator *ki = new KeyInitiator(this, flow_seg_magic,
				Ident::fromEndpoint(remoteep).id());
	(void)ki;
	//connect(ki, SIGNAL(completed(bool)),
	//	this, SLOT(keyCompleted(bool)));

	return true;
}

bool FlowSegment::flowReceive(qint64 pktseq, QByteArray &pkt)
{
	qDebug() << this << "flowReceive seq" << pktseq << "size" << pkt.size();

	if (fsock) {
		// Ignore standalone ACK packets.
		if (pkt.size() <= hdrlen) {
			// It's a standalone acknowledgment packet,
			// generated by the default impl of Flow::transmitAck().
			// Count this explicit ack packet as received,
			// but do NOT send another ack just to ack this ack!
			acknowledge(pktseq, false);
			return false;
		}

		// XXX safer to ack first?
		QByteArray data = pkt.mid(hdrlen);
		qDebug() << fsock << "recv size" << data.size();
		fsock->receive(data, fsock->peer);
		return true;	// go ahead and acknowledge
	}

	// We must be a flow middlebox.
	Q_ASSERT(other);

	// Stash the packet in our queue until the next segment takes it.
	// Implement a basic drop-tail policy for the moment.
	if (rxq.size() >= qlim) {
		qDebug("flow middlebox queue overflow; DROP packet");
		return false;
	}
	RxPkt rp;
	rp.rxseq = pktseq;
	rp.pkt = pkt;
	rxq.enqueue(rp);
	qDebug() << this << "queue size" << rxq.size();

	// Notify the downstream segment
	Q_ASSERT(other->other == this);
	other->readyTransmit();

	return false;
}

void FlowSegment::gotReadyTransmit()
{
	if (fsock) {
		// Pass the ready indication on up the stack.
		return fsock->readyTransmitToPeer();
	}

	// We must be a flow middlebox.
	Q_ASSERT(other);

	while (mayTransmit()) {
		if (other->rxq.empty())
			return;		// nothing to transmit

		// Dequeue a packet from the upstream segment
		RxPkt rp = other->rxq.dequeue();

		// Transmit it onto the downstream segment,
		// if it's a data segment to be forwarded.
		bool isdata = rp.pkt.size() > hdrlen;
		if (isdata) {
			quint64 txseq;
			flowTransmit(rp.pkt, txseq);
		}

		// Now we can finally acknowledge it on the previous segment
		other->acknowledge(rp.rxseq, isdata);
	}
}


////////// FlowSocket //////////

FlowSocket::FlowSocket(Host *host, const QHostAddress &peer, QObject *parent)
:	Socket(host, parent),
	h(host),
	fseg(NULL),
	peer(SocketEndpoint(Endpoint(peer, NETSTERIA_DEFAULT_PORT), this))
{
	setActive(true);	// XXX should we already???

	qDebug() << this << "active sockets" << host->activeSockets();
}

FlowSocket::~FlowSocket()
{
	qDebug() << this << "~FlowSocket";
}

FlowSegment *FlowSocket::initiateTo(const Endpoint &remoteep)
{
	Q_ASSERT(fseg == NULL);
	fseg = new FlowSegment(host(), this);
	fseg->fsock = this;
	Q_ASSERT(fseg->other == NULL);

	// Bind the segment to the host's main socket
	if (!fseg->initiateTo(host()->activeSockets().at(0), remoteep))
		qFatal("FlowSocket::initiateTo failed");//XXX

	return fseg;
}

// XXX unused
void FlowSocket::keyCompleted(bool success)
{
	if (!success)
		qFatal("FlowSocket key agreement failed!"); //XXX

	qDebug() << "FlowSocket key agreement succeeded";
}

bool FlowSocket::bindFlow(const Endpoint &remoteep, Channel localchan,
			SocketFlow *flow)
{
	if (remoteep != peer) {
		qDebug() << "Can't bind flow to" << remoteep.toString()
			<< "over FlowSocket to" << peer.toString();
		return false;
	}

	if (!Socket::bindFlow(remoteep, localchan, flow))
		return false;

	// Propagate the underlying flow's readyTransmit() signal upward
	connect(this, SIGNAL(readyTransmitToPeer()),
		flow, SIGNAL(readyTransmit()));

	return true;
}

bool FlowSocket::send(const Endpoint &ep, const char *data, int size)
{
	//qDebug() << this << "send to" << ep << "size" << size;

	Q_ASSERT(ep == peer);
	Q_ASSERT(fseg);

	if (!fseg->isActive()) {
		qDebug() << this << "send: dropping packet for inactive flow";
		return false;
	}

	QByteArray pkt;
	pkt.resize(fseg->hdrlen + size);
	memcpy(pkt.data() + fseg->hdrlen, data, size);

	quint64 txseq;
	bool success = fseg->flowTransmit(pkt, txseq);
	qDebug() << fseg << "flowTransmit seq" << txseq << "size" << size;

	return success;
}

bool FlowSocket::isCongestionControlled(const Endpoint &)
{
	return true;
}

int FlowSocket::mayTransmit(const Endpoint &ep)
{
	Q_ASSERT(ep == peer);

	return fseg->mayTransmit();
}

bool FlowSocket::bind(const QHostAddress &, quint16, QUdpSocket::BindMode)
{
	qFatal("FlowSocket::bind() shouldn't be called");//XXX???
	return false;
}

QList<Endpoint> FlowSocket::localEndpoints()
{
	qFatal("FlowSocket::localEndpoints() shouldn't be called");//XXX???
	return QList<Endpoint>();
}

quint16 FlowSocket::localPort()
{
	qFatal("FlowSocket::localPort() shouldn't be called");//XXX???
	return 0;
}

QString FlowSocket::errorString()
{
	qFatal("FlowSocket::errorString() shouldn't be called");//XXX???
	return QString();
}

////////// FlowResponder //////////

FlowResponder::FlowResponder(Host *h)
:	KeyResponder(h, flow_seg_magic),
	fsock(NULL),
	lastiseg(NULL), lastoseg(NULL)	// XXX
{
}

void FlowResponder::forwardTo(const Endpoint &targetep)
{
	this->targetep = targetep;
}

void FlowResponder::forwardUp(FlowSocket *fsock)
{
	this->fsock = fsock;
}

Flow *FlowResponder::newFlow(const SocketEndpoint &epi, const QByteArray &,
				const QByteArray &, QByteArray &)
{
	FlowSegment *fseg = new FlowSegment(host(), this);

	// Bind the flow to the socket the request came in on.
	if (!fseg->bind(epi)) {
		qDebug("FlowResponder: could not bind new flow");
		delete fseg;
		return NULL;
	}

	lastiseg = fseg;	// XXX hack

	// Set up the onward or upward forwarding path
	if (fsock) {
		fseg->fsock = fsock;	// just forward up to this socket
		fseg->setParent(fsock);

		Q_ASSERT(fsock->fseg == NULL);	// XXX
		fsock->fseg = fseg;

	} else {
		// Initiate a corresponding outgoing flow
		Q_ASSERT(!targetep.isNull());
		FlowSegment *oseg = new FlowSegment(host(), this);
		if (!oseg->initiateTo(epi.sock, targetep)) {
			qDebug() << "FlowResponder: couldn't initiate to next"
				<< targetep.toString();
			delete fseg;
			return NULL;
		}
		oseg->other = fseg;
		fseg->other = oseg;

		lastoseg = oseg;	// XXX hack
	}

	return fseg;
}


Maintained by PDOS
ViewVC Help
Powered by ViewVC 1.0.3