Main Page | Class Hierarchy | Class List | File List | Class Members

/Users/baford/proj/netsteria/sst/lib/stream.h

Go to the documentation of this file.
00001 
00006 #ifndef SST_STREAM_H
00007 #define SST_STREAM_H
00008 
00009 #include <QSet>
00010 #include <QQueue>
00011 #include <QIODevice>
00012 
00013 #include "key.h"
00014 #include "strm/proto.h"
00015 
00016 namespace SST {
00017 
00018 class Host;
00019 class Flow;
00020 class Ident;
00021 class AbstractStream;
00022 class BaseStream;
00023 class StreamPeer;
00024 class Endpoint;
00025 class RegInfo;
00026 class RegClient;
00027 
00028 
00070 class Stream : public QIODevice
00071 {
              // A QIODevice subclass that adds message-oriented, datagram and
              // substream operations, plus link-status signals, on top of the
              // inherited byte-stream interface.
              // NOTE(review): the gaps in this listing's line numbering presumably
              // mark documentation comments omitted by the source browser; treat
              // the original stream.h as the authoritative API description.

              // These classes get access to the private members declared below.
00072         friend class AbstractStream;
00073         friend class BaseStream;
00074         friend class StreamServer;
00075         Q_OBJECT
00076 
00077 public:
              // Mode values accepted by shutdown() through the ShutdownMode
              // flags type declared right after the enum.
00078         enum ShutdownModeFlag {
00079                 Read    = 1,    // Read (incoming data) direction
00080                 Write   = 2,    // Write (outgoing data) direction
00081                 Close   = 3,    // Both directions (Read|Write)
00082                 Reset   = 4,    // Forceful reset
00083         };
00084         Q_DECLARE_FLAGS(ShutdownMode, ShutdownModeFlag)
00085 
00086 private:
00087         Host            *host;          // Per-host SST state
00088         AbstractStream  *as;            // Internal stream control object
00089 
00090 
00091 public:
              // Public constructor: takes the Host and an optional QObject parent
              // (defaults to NULL).
00100         Stream(Host *host, QObject *parent = NULL);
00101         virtual ~Stream();
00102 
00103 
00105 
              // Two overloads that differ only in how the destination identity is
              // passed (raw QByteArray vs. Ident). dstep defaults to a
              // default-constructed Endpoint.
              // NOTE(review): the bool result presumably reports whether the
              // connection attempt could be started -- confirm in stream.cpp.
00149         bool connectTo(const QByteArray &dstid,
00150                         const QString &service, const QString &protocol,
00151                         const Endpoint &dstep = Endpoint());
00152 
00155         bool connectTo(const Ident &dstid,
00156                         const QString &service, const QString &protocol,
00157                         const Endpoint &dstep = Endpoint());
00158 
              // NOTE(review): declaring disconnect() here hides the inherited
              // QObject::disconnect() overloads inside Stream's scope; callers
              // that want those must qualify the call (QObject::disconnect(...)).
00167         void disconnect();
00168 
              // Not const-qualified, unlike bytesAvailable() and atEnd() below.
00177         bool isConnected();
00178 
00179 
00181 
              // Byte-stream reading. bytesAvailable(), readData() and atEnd() use
              // the same names and signatures as the corresponding QIODevice
              // members.
00188         qint64 bytesAvailable() const;
00189 
              // True exactly when bytesAvailable() reports a positive count.
00191         inline bool hasBytesAvailable() const { return bytesAvailable() > 0; }
00192 
00211         virtual qint64 readData(char *data, qint64 maxSize);
00212 
00213 
              // Message-oriented reading: count and size queries, then two
              // readMessage() overloads (caller-supplied buffer, or a returned
              // QByteArray).
00216         int pendingMessages() const;
00217 
              // True exactly when pendingMessages() reports a positive count.
00220         inline bool hasPendingMessages() const
00221                 { return pendingMessages() > 0; }
00222 
00230         qint64 pendingMessageSize() const;
00231 
00244         qint64 readMessage(char *data, int maxSize);
00245 
              // maxSize defaults to 1 << 30 (1073741824).
00254         QByteArray readMessage(int maxSize = 1 << 30);
00255 
00256 
00261         bool atEnd() const;
00262 
00263 
00265 
              // Writing: writeData() uses the name and signature of QIODevice's
              // writeData(); writeMessage() is the message-oriented counterpart.
00274         qint64 writeData(const char *data, qint64 size);
00275 
00292         qint64 writeMessage(const char *data, qint64 size);
00293 
              // Convenience overload: forwards msg.data() and msg.size() to the
              // pointer/size overload above.
00298         inline qint64 writeMessage(const QByteArray &msg)
00299                 { return writeMessage(msg.data(), msg.size()); }
00300 
00301         // Send and receive unordered, unreliable datagrams on this stream.
00302         int readDatagram(char *data, int maxSize);
00303         QByteArray readDatagram(int maxSize = 1 << 30);
00304         int writeDatagram(const char *data, int size);
              // Convenience overload: forwards dgram.data() and dgram.size().
00305         inline int writeDatagram(const QByteArray &dgram)
00306                 { return writeDatagram(dgram.data(), dgram.size()); }
00307 
00308 
00309         // Check for pending datagrams
00310         bool hasPendingDatagrams() const;
              // Returns qint32, whereas pendingMessageSize() returns qint64.
00311         qint32 pendingDatagramSize() const;
00312 
00313         // XX bytesToWrite()
00314 
00315 
00317 
              // Substreams: openSubstream() and acceptSubstream() both return a
              // Stream pointer; listen()/isListening() sit between them.
              // NOTE(review): ownership of the returned Stream objects is not
              // visible in this header -- confirm in stream.cpp.
00336         Stream *openSubstream();
00337 
00339         void listen();
00340 
00342         bool isListening() const;
00343 
00356         Stream *acceptSubstream();
00357 
00358 
00360 
              // Identity, link status and priority accessors (none of them
              // const-qualified).
00364         QByteArray localHostId();
00365 
00368         QByteArray remoteHostId();
00369 
00374         bool isLinkUp();
00375 
00388         void setPriority(int pri);
00389 
00391         int priority();
00392 
              // The receive-window accessors below are commented out and therefore
              // not part of the compiled interface.
00395         // void setReceiveWindow(int size);
00396 
00398         // inline int receiveWindow();
00399 
              // Takes a ShutdownMode, i.e. a combination of ShutdownModeFlag values.
00415         void shutdown(ShutdownMode mode);
00416 
              // Same name and signature as QIODevice::close(); implemented as
              // shutdown(Close), i.e. both directions per the enum above.
00420         inline void close() { shutdown(Close); }
00421 
00422 
              // dump() is declared only when QT_NO_DEBUG is not defined.
00423 #ifndef QT_NO_DEBUG
00424 
00425         void dump();
00426 #endif
00427 
00428 
              // Qt signals: declarations only, no definitions appear in this header.
00430 signals:
00431 
00443         void readyReadMessage();
00444 
00445 #if 0   // XXX not sure if this is actually useful, maybe just readyReadMessage...
00446 
00453         void readyReadComplete();
00454 #endif
00455 
00460         void newSubstream();
00461 
00473         void readyReadDatagram();
00474 
00477         void linkUp();
00478 
00485         //void linkStalled();   XX implement
00486 
00497         void linkDown();
00498 
00508         void receiveBlocked();
00509 
              // NOTE(review): this signal shares its name with QIODevice::reset()
              // and hides it inside Stream's scope.
00511         void reset(const QString &errorString);
00512 
00515         void error(const QString &errorString);
00516 
00517 
00518 protected:
00519         // Set an error condition on this Stream and emit the error() signal.
00520         void setError(const QString &errorString);
00521 
00522 private:
00523         // Private constructor used internally
00524         // to create a Stream wrapper for an existing BaseStream.
              // NOTE(review): the parameter type is AbstractStream*, not
              // BaseStream* as the comment above says -- confirm which is intended.
00525         Stream(AbstractStream *as, QObject *parent);
00526 };
00527 
      // Declares the global operator|() for combining Stream::ShutdownModeFlag
      // values into a Stream::ShutdownMode.
00528 Q_DECLARE_OPERATORS_FOR_FLAGS(Stream::ShutdownMode)
00529 
00530 
00531 
00538 class StreamServer : public QObject, public StreamProtocol
00539 {
              // Server-side counterpart of Stream: listen() takes the service and
              // protocol names/descriptions, accept() returns Stream objects, and
              // newConnection() is the accompanying signal.
              // The read-only inline accessors are const-qualified so they can be
              // called through a const StreamServer reference or pointer.
              // NOTE(review): the gaps in this listing's line numbering presumably
              // mark documentation comments omitted by the source browser; treat
              // the original stream.h as the authoritative API description.
00540         friend class BaseStream;
00541         Q_OBJECT
00542 
00543 private:
00544         Host *const h;                  // Our per-host state
00545         QQueue<BaseStream*> rconns;     // Received connection stream queue
00546         QString svname;                 // Service name
00547         QString svdesc;                 // Longer service description
00548         QString prname;                 // Protocol name
00549         QString prdesc;                 // Longer protocol description
00550         QString err;                    // Text returned by errorString()
00551         bool active;                    // Value returned by isListening()
00552 
00553 
00554 public:
              // Takes the Host and an optional QObject parent (defaults to NULL).
00564         StreamServer(Host *host, QObject *parent = NULL);
00565 
              // NOTE(review): the bool result presumably reports whether the
              // service/protocol pair could be registered -- confirm in stream.cpp.
00586         bool listen(const QString &serviceName, const QString &serviceDesc,
00587                 const QString &protocolName, const QString &protocolDesc);
00588 
              // Returns the current value of the 'active' flag.
00590         inline bool isListening() const { return active; }
00591 
              // Returns a Stream pointer; the overload below treats a null result
              // as "nothing accepted".
00606         Stream *accept();
00607 
              // Convenience overload: calls accept() and, only when it returned a
              // non-null Stream, stores that stream's remoteHostId() in
              // originHostId. originHostId is left untouched otherwise.
00612         inline Stream *accept(QByteArray &originHostId) {
00613                 Stream *strm = accept();
00614                 if (strm) originHostId = strm->remoteHostId();
00615                 return strm; }
00616 
              // Read-only accessors for the name/description members above.
00618         inline QString serviceName() const { return svname; }
00619 
00621         inline QString serviceDescription() const { return svdesc; }
00622 
00624         inline QString protocolName() const { return prname; }
00625 
00627         inline QString protocolDescription() const { return prdesc; }
00628 
              // Returns the text last stored by setErrorString().
00630         inline QString errorString() const { return err; }
00631 
00632 signals:
00634         void newConnection();
00635 
00636 protected:
              // The parameter shadows the 'err' member, hence the explicit this->.
00638         inline void setErrorString(const QString &err) { this->err = err; }
00639 };
00640 
00641 
00642 // Private helper class,
00643 // to register with Socket layer to receive key exchange packets.
00644 // Only one instance ever created per host.
      // Inherits both KeyResponder and StreamProtocol. Everything declared before
      // the "private slots:" label sits in the class's default (private) section,
      // so only the friend classes listed below can construct or use it.
00645 class StreamResponder : public KeyResponder, public StreamProtocol
00646 {
00647         friend class StreamPeer;
00648         friend class StreamServer;
00649         friend class StreamHostState;
00650         Q_OBJECT
00651 
              // Private constructor taking the Host.
00652         StreamResponder(Host *h);
00653 
00654         // Set of RegClients we've connected to so far
00655         QPointerSet<RegClient> connrcs;
00656 
00657 
              // NOTE(review): presumably hooks up the given RegClient and records
              // it in connrcs -- confirm in stream.cpp.
00658         void conncli(RegClient *cli);
00659 
              // Virtual hook returning a Flow pointer. ulpr is the only non-const
              // reference parameter.
              // NOTE(review): presumably overrides a KeyResponder virtual, with
              // ulpr as an output -- confirm against key.h.
00660         virtual Flow *newFlow(const SocketEndpoint &epi, const QByteArray &idi,
00661                                 const QByteArray &ulpi, QByteArray &ulpr);
00662 
              // Slots for RegClient-related notifications; their signal connections
              // are not visible in this header.
00663 private slots:
00664         void clientCreate(RegClient *rc);
00665         void clientStateChanged();
00666         void lookupNotify(const QByteArray &id, const Endpoint &loc,
00667                         const RegInfo &info);
00668 };
00669 
00670 
00671 // Per-host state for the Stream module.
      // Abstract QObject subclass: host() is pure virtual, so a derived class must
      // supply it before the type can be instantiated.
00672 class StreamHostState : public QObject
00673 {
00674         friend class BaseStream;
00675         friend class StreamServer;
00676         friend class StreamPeer;
00677         friend class StreamResponder;
00678         Q_OBJECT
00679 
00680 private:
              // The responder pointer starts out NULL (see the constructor below).
00681         StreamResponder *rpndr;
              // StreamServer pointers keyed by StreamProtocol::ServicePair.
00682         QHash<StreamProtocol::ServicePair,StreamServer*> listeners;
              // StreamPeer pointers keyed by QByteArray.
00683         QHash<QByteArray,StreamPeer*> peers;
00684 
00685 
              // Private accessor for the responder.
              // NOTE(review): presumably creates rpndr on first use, given that it
              // starts out NULL -- confirm in stream.cpp.
00686         StreamResponder *streamResponder();
00687 
00688 public:
              // Initialises rpndr to NULL; the two hashes are default-constructed.
00689         inline StreamHostState() : rpndr(NULL) { }
00690         virtual ~StreamHostState();
00691 
              // Takes a QByteArray id, the same key type as the 'peers' hash.
              // NOTE(review): presumably looks up or creates the entry in 'peers'
              // -- confirm in stream.cpp.
00692         StreamPeer *streamPeer(const QByteArray &id);
00693 
              // Pure virtual: the derived class supplies the Host.
00694         virtual Host *host() = 0;
00695 };
00696 
00697 } // namespace SST
00698 
00699 #endif  // SST_STREAM_H

Generated on Wed Mar 28 11:48:05 2007 for SST by doxygen 1.3.4