PDOS

[uia] / trunk / uia / sst / lib / flow.h  

View of /trunk/uia/sst/lib/flow.h

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4175 - (download) (as text) (annotate)
Thu Mar 26 16:53:55 2009 UTC (8 months ago) by baford
File size: 13322 byte(s)
forgot to check in some stuff after SIGCOMM deadline
/*
 * Structured Stream Transport
 * Copyright (C) 2006-2008 Massachusetts Institute of Technology
 * Author: Bryan Ford
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 * 
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
#ifndef SST_FLOW_H
#define SST_FLOW_H

#include <QTime>
#include <QTimer>
#include <QQueue>	// XXX FlowSegment

#include "ident.h"
#include "sock.h"
#include "timer.h"

// XXX for specific armor methods - break into separate module
#include "chk32.h"
#include "aes.h"
#include "hmac.h"


namespace SST {

class Host;
class Ident;
class Endpoint;
class Socket;
class FlowCC;
class KeyInitiator;	// XXX


// Packet sequence numbers are 64-bit unsigned integers
typedef quint64 PacketSeq;

static const PacketSeq maxPacketSeq = ~(PacketSeq)0;


enum CCMode {
	CC_TCP,
	CC_AGGRESSIVE,
	CC_DELAY,
	CC_VEGAS,
	CC_CTCP,
	CC_FIXED,
};


// Abstract base class for flow encryption and authentication schemes.
class FlowArmor : public QObject
{
	friend class Flow;
	Q_OBJECT

protected:
	// The pseudo-header is a header logically prepended to each packet
	// for authentication purposes but not actually transmitted.
	// XX probably can't always be static const
	//static const int phlen = 4;

	// The encryption offset defines the offset in the transmitted packet
	// at which encrypted data begins (after authenticate-only data).
	static const int encofs = 4;

	virtual QByteArray txenc(qint64 pktseq, const QByteArray &pkt) = 0;
	virtual bool rxdec(qint64 pktseq, QByteArray &pkt) = 0;

	virtual ~FlowArmor();
};

// Abstract base class representing a flow
// between a local Socket and a remote endpoint.
class Flow : public SocketFlow
{
	friend class KeyInitiator;	// XXX
	Q_OBJECT

private:
	Host *const h;
	FlowArmor *armr;	// Encryption/authentication method
	//FlowCC *cc;		// Congestion control method
	CCMode ccmode;		// Congestion control method
	bool nocc;		// Disable congestion control.  XXX
	unsigned missthresh;	// Threshold at which to infer packets dropped

	// Per-direction unique channel IDs for this channel.
	// Stream layer uses these in assigning USIDs to new streams.
	QByteArray txchanid;	// Transmit channel ID
	QByteArray rxchanid;	// Receive channel ID


public:
	// How many sequence numbers a packet needs to be out of date
	// before we consider it to be "missed", i.e., "probably dropped".
	// This is represents the "standard" Internet convention,
	// although (XXX) it really needs to be dynamically measured
	// to account for variable reordering rates on different paths.
	// XXX static const inst missdelay = 2;

	// Amount of space client must leave at the beginning of a packet
	// to be transmitted with flowTransmit() or received via flowReceive().
	// XX won't always be static const.
	static const int hdrlen = 8;

	// Layout of the first header word: channel number, tx sequence
	// Transmitted in cleartext.
	static const quint32 chanBits = 8;	// 31-24: channel number
	static const quint32 chanMask = (1 << chanBits) - 1;
	static const quint32 chanMax = chanMask;
	static const quint32 chanShift = 24;
	static const quint32 seqBits = 24;	// 23-0: tx sequence number
	static const quint32 seqMask = (1 << seqBits) - 1;

	// Layout of the second header word: ACK count/sequence number
	// Encrypted for transmission.
	static const quint32 resvBits = 4;	// 31-28: reserved field
	static const quint32 ackctBits = 4;	// 27-24: ack count
	static const quint32 ackctMask = (1 << ackctBits) - 1;
	static const quint32 ackctMax = ackctMask;
	static const quint32 ackctShift = 24;
	static const quint32 ackSeqBits = 24;	// 23-0: ack sequence number
	static const quint32 ackSeqMask = (1 << ackSeqBits) - 1;


	Flow(Host *host, QObject *parent = NULL);
	virtual ~Flow();

	inline Host *host() { return h; }

	// Set the encryption/authentication method for this flow.
	// This MUST be set before a new flow can be activated.
	inline void setArmor(FlowArmor *armor) { this->armr = armor; }
	inline FlowArmor *armor() { return armr; }

	// Set the channel IDs for this flow.
	inline QByteArray txChannelId() { return txchanid; }
	inline QByteArray rxChannelId() { return rxchanid; }
	inline void setChannelIds(const QByteArray &tx, const QByteArray &rx)
		{ txchanid = tx; rxchanid = rx; }

	// Set the congestion controller for this flow.
	// This must be set if the client wishes to call mayTransmit().
	//inline void setCongestionController(FlowCC *cc) { this->cc = cc; }
	//inline FlowCC *congestionController() { return cc; }

	// Start and stop the flow.
	virtual void start(bool initiator);
	virtual void stop();

	// Return the current link status as observed by this flow.
	inline LinkStatus linkStatus() const { return linkstat; }


protected:

	// Size of rxmask, rxackmask, and txackmask fields in bits
	static const int maskBits = 32;

	struct TxEvent {
		qint32	size;	// Total size of packet including hdr
		bool	data;	// Was an upper-layer data packet
		bool	pipe;	// Currently counted toward txdatpipe

		inline TxEvent(qint32 size, bool isdata)
			: size(size), data(isdata), pipe(isdata) { }
	};

	// Transmit state
	quint64 txseq;		// Next sequence number to transmit
	QQueue<TxEvent> txevts;	// Record of transmission events (XX data sizes)
	quint64 txevtseq;	// Seqno of oldest recorded tx event
	quint64 txackseq;	// Highest transmit sequence number ACK'd
	quint64 recovseq;	// Sequence at which fast recovery finishes
	quint64 markseq;	// Transmit sequence number of "marked" packet
	quint64 markbase;	// Snapshot of txackseq at time mark was placed
	Time marktime;		// Time at which marked packet was sent
	quint32 txackmask;	// Mask of packets transmitted and ACK'd
	quint32	txfltcnt;	// Data packets currently in flight
	quint32	txfltsize;	// Data bytes currently in flight
	quint32 markacks;	// Number of ACK'd packets since last mark
	quint32 marksent;	// Number of ACKs expected after last mark
	quint32 cwnd;		// Current congestion window
	bool cwndlim;		// We were cwnd-limited this round-trip

	// TCP congestion control
	quint32 ssthresh;	// Slow start threshold
	bool sstoggle;		// Slow start toggle flag for CC_VEGAS

	// Aggressive congestion control
	quint32 ssbase;		// Slow start baseline

	// Low-delay congestion control
	int cwndinc;
	int lastrtt;		// Measured RTT of last round-trip
	float lastpps;		// Measured PPS of last round-trip
	quint32 basewnd;
	float basertt, basepps, basepwr;

	// TCP Vegas-like congestion control
	float cwndmax;

	// Retransmit state
	Timer rtxtimer;		// Retransmit timer
	LinkStatus linkstat;	// Current link status

	// Receive state
	quint64 rxseq;		// Highest sequence number received so far
	quint32 rxmask;		// Mask of packets received so far

	// Receive-side ACK state
	quint64 rxackseq;	// Highest sequence number acknowledged so far
	//quint32 rxackmask;	// Mask of packets received & acknowledged
	quint8 rxackct;		// # contiguous packets received before rxackseq
	quint8 rxunacked;	// # contiguous packets not yet ACKed
	bool delayack;		// Enable delayed acknowledgments
	Timer acktimer;		// Delayed ACK timer

	// Statistics gathering
	float cumrtt;		// Cumulative measured RTT in milliseconds
	float cumrttvar;	// Cumulative variation in RTT
	float cumpps;		// Cumulative measured packets per second
	float cumppsvar;	// Cumulative variation in PPS
	float cumpwr;		// Cumulative measured network power (pps/rtt)
	float cumbps;		// Cumulative measured bytes per second
	float cumloss;		// Cumulative measured packet loss ratio
	Timer statstimer;


	// Transmit a packet across the flow.
	// Caller must leave hdrlen bytes at the beginning for the header.
	// The packet is armored in-place in the provided QByteArray.
	// It is the caller's responsibility to transmit
	// only when flow control says it's OK (mayTransmit())
	// or upon getting a readyTransmit() signal.
	// Provides in 'pktseq' the transmit sequence number
	// that was assigned to the packet.
	// Returns true if the transmit was successful,
	// or false if it failed (e.g., due to lack of buffer space);
	// a sequence number is assigned even on failure however.
	bool flowTransmit(QByteArray &pkt, quint64 &pktseq);


	// Check congestion control state and return the number of new packets,
	// if any, that flow control says we may transmit now.
	virtual int mayTransmit();

	// Compute current number of transmitted but un-acknowledged packets.
	// This count may include raw ACK packets,
	// for which we expect no acknowledgments
	// unless they happen to be piggybacked on data coming back.
	inline qint64 unackedPackets()
		{ return txseq - txackseq; }

	// Compute the time elapsed since the mark in microseconds.
	qint64 markElapsed();

public:
	// May be called by upper-level protocols during receive
	// to indicate that the packet has been received and processed,
	// so that subsequently transmitted packets include this ack info.
	// if 'sendack' is true, make sure an acknowledgment gets sent soon:
	// in the next transmitted packet, or in an ack packet if needed.
	void acknowledge(quint16 pktseq, bool sendack);

	inline bool deleyedAcks() const { return delayack; }
	inline void setDelayedAcks(bool enabled) { delayack = enabled; }

	void ccReset();
	inline CCMode ccMode() const { return ccmode; }
	inline void setCCMode(CCMode mode) { ccmode = mode; ccReset(); }

	// for CC_FIXED: fixed congestion window for reserved-bandwidth links
	inline void setCCWindow(int cwnd) { this->cwnd = cwnd; }

	// Congestion information accessors for flow monitoring purposes
	inline int txCongestionWindow() { return cwnd; }
	inline int txBytesInFlight() { return txfltsize; }
	inline int txPacketsInFlight() { return txfltcnt; }

signals:
	// Indicates when this flow observes a change in link status.
	void linkStatusChanged(LinkStatus newstatus);

protected:
	// Main method for upper-layer subclass to receive a packet on a flow.
	// Should return true if the packet was processed and should be acked,
	// or false to silently pretend we never received the packet.
	virtual bool flowReceive(qint64 pktseq, QByteArray &pkt) = 0;

	// Create and transmit a packet for acknowledgment purposes only.
	// Upper layer may override this if ack packets should contain
	// more than an just an empty flow payload.
	virtual bool transmitAck(QByteArray &pkt,
				quint64 ackseq, unsigned ackct);

	virtual void acked(quint64 txseq, int npackets, quint64 rxackseq);
	virtual void missed(quint64 txseq, int npackets);
	virtual void expire(quint64 txseq, int npackets);


private:
	// Called by Socket to dispatch a received packet to this flow.
	virtual void receive(QByteArray &msg, const SocketEndpoint &src);

	// Internal transmit methods.
	bool tx(QByteArray &pkt, quint32 packseq, quint64 &pktseq, bool isdata);
	inline bool txack(quint64 ackseq, unsigned ackct)
		{ QByteArray pkt; return transmitAck(pkt, ackseq, ackct); }
	inline void flushack()
		{ if (rxunacked) { rxunacked = 0; txack(rxackseq, rxackct); }
		  acktimer.stop(); }

	inline void rtxstart()
		{ rtxtimer.start((int)(cumrtt * 2.0)); }

	// Repeat stall indications but not other link status changes.
	// XXX hack - maybe "stall severity" or "stall time"
	// should be part of status?
	// Or perhaps status should be (up, stalltime)?
	inline void setLinkStatus(LinkStatus newstat)
		{ if (linkstat != newstat || newstat == LinkStalled) {
			linkStatusChanged(linkstat = newstat); } }

	// Congestion control
	void ccMissed(quint64 pktseq);


private slots:
	void rtxTimeout(bool failed);	// Retransmission timeout
	void ackTimeout();	// Delayed ACK timeout
	void statsTimeout();
};



// XX break this stuff into separate module

// Simple 32-bit keyed checksum protection with no encryption,
// to defend only against off-the-path attackers
// who can inject forged packets but not monitor the flow.
class ChecksumArmor : public FlowArmor
{
	const uint32_t txkey, rxkey;	// flow authentication parameters
	const QByteArray armorid;	// for key protocol duplicate detection

public:
	ChecksumArmor(uint32_t txkey, uint32_t rxkey,
			const QByteArray &armorid = QByteArray());

	virtual QByteArray txenc(qint64 pktseq, const QByteArray &pkt);
	virtual bool rxdec(qint64 pktseq, QByteArray &pkt);

	inline QByteArray id() { return armorid; }
};


class AESArmor : public FlowArmor
{
	const AES txaes, rxaes;
	const HMAC txmac, rxmac;

	union ivec {
		quint8 b[AES_BLOCK_SIZE];
		quint32 l[4];
	} ivec;

public:
	AESArmor(const QByteArray &txenckey, const QByteArray &txmackey,
		const QByteArray &rxenckey, const QByteArray &rxmackey);

	virtual QByteArray txenc(qint64 pktseq, const QByteArray &pkt);
	virtual bool rxdec(qint64 pktseq, QByteArray &pkt);
};


} // namespace SST

#endif	// SST_FLOW_H

Maintained by PDOS
ViewVC Help
Powered by ViewVC 1.0.3