PDOS

[uia] / trunk / uia / sst / lib / sock.cc  

View of /trunk/uia/sst/lib/sock.cc

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3672 - (download) (as text) (annotate)
Wed Jan 21 14:30:25 2009 UTC (10 months ago) by baford
File size: 9253 byte(s)
Add LGPL copyright notices, update licensing info/explanation in README
/*
 * Structured Stream Transport
 * Copyright (C) 2006-2008 Massachusetts Institute of Technology
 * Author: Bryan Ford
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 * 
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */


#include <QDataStream>
#include <QSettings>
#include <QtDebug>

#include "util.h"
#include "sock.h"
#include "xdr.h"
#include "os.h"

using namespace SST;


////////// SocketEndpoint //////////

SocketEndpoint::SocketEndpoint(const Endpoint &ep, Socket *s)
:	Endpoint(ep),
	sock(s)
{
	Q_ASSERT(sock != NULL);
}

bool SocketEndpoint::send(const char *data, int size) const
{
	if (!sock) {
		qDebug("Trying to send on a nonexistent socket!");
		return false;
	}
	return sock->send(*this, data, size);
}

QString SocketEndpoint::toString() const
{
	return Endpoint::toString() + " via "
		+ (sock.isNull() ? "NULL" : sock->toString());
}

uint qHash(const SST::SocketEndpoint &sep)
{
	const Endpoint &ep = sep;
	Socket *sock = sep.sock;
	return qHash(ep) + qHash(sock);
}


////////// Socket //////////

Socket::~Socket()
{
	//qDebug() << this << "~Socket()";

	// Unbind all flows
	foreach (SocketFlow *f, flows.values())
		f->unbind();
	Q_ASSERT(flows.isEmpty());
}

void Socket::setActive(bool newact)
{
	if (newact && !act) {
		h->actsocks.append(this);
	} else if (act && !newact) {
		h->actsocks.removeAll(this);
	}
	act = newact;

	h->activeSocketsChanged();
}

bool
Socket::bindFlow(const Endpoint &remoteep, Channel localchan, SocketFlow *fl)
{
	Q_ASSERT(flow(remoteep, localchan) == NULL);

	QPair<Endpoint,Channel> p(remoteep, localchan);
	flows.insert(p, fl);
	return true;
}

void
Socket::receive(QByteArray &msg, const SocketEndpoint &src)
{
	if (msg.size() < 4) {
		// Message too small to be interesting
		qDebug("Ignoring runt UDP datagram");
		return;
	}

	// First interpret the first byte as a channel number
	// to try to find an endpoint-specific flow.
	Channel chan = msg.at(0);
	SocketFlow *fl = flow(src, chan);
	if (fl != NULL)
		return fl->receive(msg, src);

	// If that doesn't work, it may be a global control packet:
	// if so, pass it to the appropriate SocketReceiver.
	XdrStream rs(&msg, QIODevice::ReadOnly);
	quint32 magic;
	rs >> magic;
	SocketReceiver *rcv = h->receivers.value(magic);
	if (rcv)
		return rcv->receive(msg, rs, src);

	qDebug("Received control message for unknown flow/receiver %08x",
		magic);
}

bool Socket::isCongestionControlled(const Endpoint &)
{
	return false;
}

int Socket::mayTransmit(const Endpoint &)
{
	qFatal("Socket::mayTransmit() called on non-flow-controlled socket");
	return -1;
}

QString Socket::toString() const
{
	return QString("%1(0x%2)")
		.arg(metaObject()->className())
		.arg((ulong)this, 0, 16);
}


////////// UdpSocket //////////

UdpSocket::UdpSocket(SocketHostState *host, QObject *parent)
:	Socket(host, parent)
{
	connect(&usock, SIGNAL(readyRead()), this, SLOT(udpReadyRead()));
}

bool UdpSocket::bind(const QHostAddress &addr, quint16 port,
			QUdpSocket::BindMode mode)
{
	Q_ASSERT(!active());

	if (!usock.bind(addr, port, mode))
		return false;

	setActive(true);
	return true;
}

bool
UdpSocket::send(const Endpoint &ep, const char *data, int size)
{
	// XXX Qt bug?  For some reason, at least under both Linux & Mac,
	// our UDP socket is getting mysteriously "unbound"
	// when we try to send a packet to an IPv6 address...
	// localPort() still reports the correct port,
	// but after our first attempt to send an IPv6 packet,
	// all the subsequent IPv4 packets we send
	// appear on the wire with newly-allocated port numbers.
	if (ep.addr.protocol() != QAbstractSocket::IPv4Protocol)
		return false;

	bool rc = usock.writeDatagram(data, size, ep.addr, ep.port) == size;
	if (!rc)
		qDebug() << "Socket::send:" << errorString();
//	qDebug() << "after writeDatagram: rc" << rc
//		<< "err" << errorString() << "state" << state()
//		<< "valid" << isValid() << "addr" << localAddress().toString()
//		<< "port" << localPort();
	return rc;
}

void
UdpSocket::udpReadyRead()
{
	SocketEndpoint src;
	src.sock = this;
	QByteArray msg;
	int size;
	while ((size = usock.pendingDatagramSize()) >= 0) {

		// Read the datagram
		msg.resize(size);
		if (usock.readDatagram(msg.data(), size, &src.addr, &src.port)
				!= size) {
			qWarning("Error reading %d-byte UDP datagram", size);
			break;
		}

		receive(msg, src);
	}
}

QList<Endpoint> UdpSocket::localEndpoints()
{
	QList<QHostAddress> addrs = localHostAddrs();
	quint16 port = usock.localPort();
	Q_ASSERT(port > 0);

	QList<Endpoint> eps;
	foreach (const QHostAddress &addr, addrs) {
		qDebug() << "Local endpoint"
			<< Endpoint(addr, port).toString();
		eps.append(Endpoint(addr, port));
	}
	return eps;
}


////////// SocketFlow //////////

SocketFlow::SocketFlow(QObject *parent)
:	QObject(parent),
	sock(NULL),
	localchan(localchan),
	remotechan(0),
	active(false)
{
}

SocketFlow::~SocketFlow()
{
	// Stop and unbind this flow
	unbind();
}

Channel SocketFlow::bind(Socket *sock, const Endpoint &dst)
{
	Q_ASSERT(sock);
	Q_ASSERT(!active);	// can't bind while flow is active
	Q_ASSERT(!this->sock);	// can't bind again while already bound

	// Find a free channel number for this remote endpoint.
	// Never assign channel zero - that's reserved for control packets.
	Channel chan = 1;
	while (sock->flow(dst, chan) != NULL) {
		if (++chan == 0)
			return 0;	// wraparound - no channels available
	}

	// Bind to this channel
	if (!bind(sock, dst, chan))
		return 0;

	return chan;
}

bool SocketFlow::bind(Socket *sock, const Endpoint &dst, Channel chan)
{
	Q_ASSERT(sock);
	Q_ASSERT(!active);	// can't bind while flow is active
	Q_ASSERT(!this->sock);	// can't bind again while already bound

	if (sock->flow(dst, chan) != NULL)
		return false;		// Already in use

	//qDebug() << this << "bind" << dst << "chan" << chan << "to" << sock;

	// Bind us to this socket and channel
	remoteep.addr = dst.addr;
	remoteep.port = dst.port;
	localchan = chan;
	if (!sock->bindFlow(remoteep, localchan, this))
		return false;

	this->sock = sock;
	return true;
}

void SocketFlow::start(bool)
{
	Q_ASSERT(remotechan);

	active = true;
}

void SocketFlow::stop()
{
	active = false;
}

void SocketFlow::unbind()
{
	stop();
	Q_ASSERT(!active);

	if (sock) {
		//qDebug() << this << "unbind from sock" << sock;
		QPair<Endpoint,Channel> p(remoteep, localchan);
		Q_ASSERT(sock->flows.value(p) == this);
		sock->flows.remove(p);

		sock = NULL;
		localchan = 0;
	}
}

void SocketFlow::receive(QByteArray &msg, const SocketEndpoint &src)
{
	received(msg, src);
}

int SocketFlow::mayTransmit()
{
	Q_ASSERT(sock);
	return sock->mayTransmit(remoteep);
}


////////// SocketReceiver //////////

SocketReceiver::~SocketReceiver()
{
	unbind();
}

void SocketReceiver::bind(quint32 magic)
{
	Q_ASSERT(!isBound());

	// Receiver's magic value must leave the upper byte 0
	// to distinguish control packets from flow data packets.
	Q_ASSERT(magic <= 0xffffff);

	// Make sure we don't try to enter two conflicting receivers
	Q_ASSERT(!h->receivers.contains(magic));

	h->receivers.insert(mag = magic, this);
}

void SocketReceiver::unbind()
{
	if (isBound())  {
		Q_ASSERT(mag);
		Q_ASSERT(h->receivers.value(mag) == this);
		h->receivers.remove(mag);
		mag = 0;
	}
}


////////// SocketHostState //////////

SocketHostState::~SocketHostState()
{
}

QList<Endpoint> SocketHostState::activeLocalEndpoints()
{
	QList<Endpoint> l;
	foreach (Socket *sock, activeSockets()) {
		Q_ASSERT(sock->active());
		l += sock->localEndpoints();
	}
	return l;
}

Socket *SocketHostState::newSocket(QObject *parent)
{
	return new UdpSocket(this, parent);
}

Socket *
SocketHostState::initSocket(QSettings *settings, int defaultport)
{
	if (mainsock && mainsock->active())
		return mainsock;	// Already initialized

	// See if a port number is recorded in our settings;
	// if so, use that instead of the specified default port.
	if (settings) {
		int port = settings->value("port").toInt();
		if (port > 0 && port <= 65535)
			defaultport = port;
	}

	// Create and bind the main socket.
	mainsock = newSocket(this);
	if (!mainsock->bind(QHostAddress::Any, defaultport,
				QUdpSocket::DontShareAddress)) {
		qWarning("Can't bind to port %d - trying another",
			defaultport);
		if (!mainsock->bind(QHostAddress::Any, 0,
					QUdpSocket::DontShareAddress))
			qFatal("Can't bind main socket: %s",
				mainsock->errorString()
					.toLocal8Bit().data());
		defaultport = mainsock->localPort();
	}

	// Remember the port number we ended up using.
	if (settings)
		settings->setValue("port", defaultport);
	qDebug("Bound to port %d", defaultport);

	return mainsock;
}


Maintained by PDOS
ViewVC Help
Powered by ViewVC 1.0.3