00001
00006 #ifndef SST_STREAM_H
00007 #define SST_STREAM_H
00008
00009 #include <QSet>
00010 #include <QQueue>
00011 #include <QIODevice>
00012
00013 #include "key.h"
00014 #include "strm/proto.h"
00015
00016 namespace SST {
00017
00018 class Host;
00019 class Flow;
00020 class Ident;
00021 class AbstractStream;
00022 class BaseStream;
00023 class StreamPeer;
00024 class Endpoint;
00025 class RegInfo;
00026 class RegClient;
00027
00028
00070 class Stream : public QIODevice
00071 {
00072 friend class AbstractStream;
00073 friend class BaseStream;
00074 friend class StreamServer;
00075 Q_OBJECT
00076
00077 public:
00078 enum ShutdownModeFlag {
00079 Read = 1,
00080 Write = 2,
00081 Close = 3,
00082 Reset = 4,
00083 };
00084 Q_DECLARE_FLAGS(ShutdownMode, ShutdownModeFlag)
00085
00086 private:
00087 Host *host;
00088 AbstractStream *as;
00089
00090
00091 public:
00100 Stream(Host *host, QObject *parent = NULL);
00101 virtual ~Stream();
00102
00103
00105
00149 bool connectTo(const QByteArray &dstid,
00150 const QString &service, const QString &protocol,
00151 const Endpoint &dstep = Endpoint());
00152
00155 bool connectTo(const Ident &dstid,
00156 const QString &service, const QString &protocol,
00157 const Endpoint &dstep = Endpoint());
00158
00167 void disconnect();
00168
00177 bool isConnected();
00178
00179
00181
00188 qint64 bytesAvailable() const;
00189
00191 inline bool hasBytesAvailable() const { return bytesAvailable() > 0; }
00192
00211 virtual qint64 readData(char *data, qint64 maxSize);
00212
00213
00216 int pendingMessages() const;
00217
00220 inline bool hasPendingMessages() const
00221 { return pendingMessages() > 0; }
00222
00230 qint64 pendingMessageSize() const;
00231
00244 qint64 readMessage(char *data, int maxSize);
00245
00254 QByteArray readMessage(int maxSize = 1 << 30);
00255
00256
00261 bool atEnd() const;
00262
00263
00265
00274 qint64 writeData(const char *data, qint64 size);
00275
00292 qint64 writeMessage(const char *data, qint64 size);
00293
00298 inline qint64 writeMessage(const QByteArray &msg)
00299 { return writeMessage(msg.data(), msg.size()); }
00300
00301
00302 int readDatagram(char *data, int maxSize);
00303 QByteArray readDatagram(int maxSize = 1 << 30);
00304 int writeDatagram(const char *data, int size);
00305 inline int writeDatagram(const QByteArray &dgram)
00306 { return writeDatagram(dgram.data(), dgram.size()); }
00307
00308
00309
00310 bool hasPendingDatagrams() const;
00311 qint32 pendingDatagramSize() const;
00312
00313
00314
00315
00317
00336 Stream *openSubstream();
00337
00339 void listen();
00340
00342 bool isListening() const;
00343
00356 Stream *acceptSubstream();
00357
00358
00360
00364 QByteArray localHostId();
00365
00368 QByteArray remoteHostId();
00369
00374 bool isLinkUp();
00375
00388 void setPriority(int pri);
00389
00391 int priority();
00392
00395
00396
00398
00399
00415 void shutdown(ShutdownMode mode);
00416
00420 inline void close() { shutdown(Close); }
00421
00422
00423 #ifndef QT_NO_DEBUG
00424
00425 void dump();
00426 #endif
00427
00428
00430 signals:
00431
00443 void readyReadMessage();
00444
00445 #if 0 // XXX not sure if this actually useful, maybe just readyReadMessage...
00446
00453 void readyReadComplete();
00454 #endif
00455
00460 void newSubstream();
00461
00473 void readyReadDatagram();
00474
00477 void linkUp();
00478
00485
00486
00497 void linkDown();
00498
00508 void receiveBlocked();
00509
00511 void reset(const QString &errorString);
00512
00515 void error(const QString &errorString);
00516
00517
00518 protected:
00519
00520 void setError(const QString &errorString);
00521
00522 private:
00523
00524
00525 Stream(AbstractStream *as, QObject *parent);
00526 };
00527
00528 Q_DECLARE_OPERATORS_FOR_FLAGS(Stream::ShutdownMode)
00529
00530
00531
00538 class StreamServer : public QObject, public StreamProtocol
00539 {
00540 friend class BaseStream;
00541 Q_OBJECT
00542
00543 private:
00544 Host *const h;
00545 QQueue<BaseStream*> rconns;
00546 QString svname;
00547 QString svdesc;
00548 QString prname;
00549 QString prdesc;
00550 QString err;
00551 bool active;
00552
00553
00554 public:
00564 StreamServer(Host *host, QObject *parent = NULL);
00565
00586 bool listen(const QString &serviceName, const QString &serviceDesc,
00587 const QString &protocolName, const QString &protocolDesc);
00588
00590 inline bool isListening() { return active; }
00591
00606 Stream *accept();
00607
00612 inline Stream *accept(QByteArray &originHostId) {
00613 Stream *strm = accept();
00614 if (strm) originHostId = strm->remoteHostId();
00615 return strm; }
00616
00618 inline QString serviceName() { return svname; }
00619
00621 inline QString serviceDescription() { return svdesc; }
00622
00624 inline QString protocolName() { return prname; }
00625
00627 inline QString protocolDescription() { return prdesc; }
00628
00630 inline QString errorString() { return err; }
00631
00632 signals:
00634 void newConnection();
00635
00636 protected:
00638 inline void setErrorString(const QString &err) { this->err = err; }
00639 };
00640
00641
00642
00643
00644
00645 class StreamResponder : public KeyResponder, public StreamProtocol
00646 {
00647 friend class StreamPeer;
00648 friend class StreamServer;
00649 friend class StreamHostState;
00650 Q_OBJECT
00651
00652 StreamResponder(Host *h);
00653
00654
00655 QPointerSet<RegClient> connrcs;
00656
00657
00658 void conncli(RegClient *cli);
00659
00660 virtual Flow *newFlow(const SocketEndpoint &epi, const QByteArray &idi,
00661 const QByteArray &ulpi, QByteArray &ulpr);
00662
00663 private slots:
00664 void clientCreate(RegClient *rc);
00665 void clientStateChanged();
00666 void lookupNotify(const QByteArray &id, const Endpoint &loc,
00667 const RegInfo &info);
00668 };
00669
00670
00671
00672 class StreamHostState : public QObject
00673 {
00674 friend class BaseStream;
00675 friend class StreamServer;
00676 friend class StreamPeer;
00677 friend class StreamResponder;
00678 Q_OBJECT
00679
00680 private:
00681 StreamResponder *rpndr;
00682 QHash<StreamProtocol::ServicePair,StreamServer*> listeners;
00683 QHash<QByteArray,StreamPeer*> peers;
00684
00685
00686 StreamResponder *streamResponder();
00687
00688 public:
00689 inline StreamHostState() : rpndr(NULL) { }
00690 virtual ~StreamHostState();
00691
00692 StreamPeer *streamPeer(const QByteArray &id);
00693
00694 virtual Host *host() = 0;
00695 };
00696
00697 }
00698
00699 #endif // SST_STREAM_H