00001 #pragma once
00002 #ifndef AXPIPE_H
00003 #define AXPIPE_H
00004
00040
00041 #include "CError.h"
00042 #include "CSeg.h"
00043 #include "CSync.h"
00044 #include "CThread.h"
00045 #include "CCoContext.h"
00046 #include "AxPipeAssert.h"
00047
00093
00094
00095
00096
00097 #ifndef _T
00098 #define _T(s) s
00099 #define _TCHAR char
00100 #endif
00101
00107 namespace AxPipe {
00115 namespace Stock {
00116
00117 }
00118
00119 class CSeg;
00121 extern DWORD dwTlsIndex;
00123 extern volatile long nGlobalInit;
00124
00125
00126
00127
00128
00129
00130
00132 static const _TCHAR *ERROR_MSG_GENERIC = _T("AxPipe:: %s");
00134 static const _TCHAR *ERROR_MSG_INTERNAL = _T("AxPipe:: Internal error %s");
00136 static const _TCHAR *ERROR_MSG_NOTOPEN = _T("AxPipe:: Pipe not Open");
00137
00139 enum ERROR_CODE {
00140 ERROR_CODE_SUCCESS = 0,
00141 ERROR_CODE_GENERIC,
00142 ERROR_CODE_INTERNAL,
00143 ERROR_CODE_NOTOPEN,
00144 ERROR_CODE_STOCK,
00145 ERROR_CODE_DERIVED = 100,
00146 };
00147
00148
00149 typedef __int64 longlong;
00150 typedef unsigned __int64 ulonglong;
00151
00158 enum eSegType {
00159 eSegTypeOpen = 1,
00160 eSegTypeFlush,
00161 eSegTypeClose,
00162 eSegTypePlug,
00163 eSegTypeDerived = 100,
00164 };
00165
00183 class CGlobalInit {
00184 public:
00186 CGlobalInit() {
00187 if (InterlockedIncrement(&nGlobalInit) == 1) {
00188 dwTlsIndex = TlsAlloc();
00189 }
00190 }
00192 ~CGlobalInit() {
00193 if (InterlockedDecrement(&nGlobalInit) == 0 && dwTlsIndex != TLS_OUT_OF_INDEXES) {
00194 TlsFree(dwTlsIndex);
00195 dwTlsIndex = TLS_OUT_OF_INDEXES;
00196 }
00197 }
00198 };
00199
00214 class CSink : public CNoThread, public CError {
00215 friend class CPipe;
00216 protected:
00217 bool m_fIsOpen;
00218 CSeg *m_pSeg;
00219
00220 private:
00221 bool DoSegWork(CSeg *pSeg);
00222
00223 protected:
00224 void Work();
00225 CSeg *GetSeg(size_t cb);
00226 virtual void Signal(void *vId, void *p);
00227 longlong SizeMax();
00228
00229 public:
00230 virtual void OutPump(CSeg *pSeg);
00231
00232 protected:
00233 virtual longlong OutSizeMax();
00234 virtual CSeg *OutGetSeg(size_t cb);
00235 virtual bool OutSignal(void *vId, void *p);
00236 virtual bool OutOpen();
00237 virtual bool OutFlush();
00238 virtual bool OutClose();
00239 virtual void OutPlug();
00240 virtual void OutSpecial(CSeg *pSeg);
00241
00254 virtual void Out(CSeg *pSeg) = 0;
00255
00256 public:
00257 CSink();
00258 virtual ~CSink();
00259 virtual void AppendSink(CSink *pSink, bool fAutoDelete);
00260 virtual void DestructSink();
00261 virtual void Sync();
00262 };
00270 class CPipe : public CSink {
00271 friend class CSplit;
00272 void DestructSink();
00273
00274 protected:
00275 CSink *m_pSink;
00276
00277 void Work();
00278 void AppendSink(CSink *pSink, bool fAutoDelete);
00279 CSeg *GetSeg(size_t cb);
00280 void Signal(void *vId, void *p);
00281 longlong OutSizeMax();
00282 CSeg *OutGetSeg(size_t cb);
00283 bool OutSignal(void *vId, void *p);
00284 bool OutOpen();
00285 bool OutClose();
00286 void OutSpecial(CSeg *pSeg);
00287
00296 void Out(CSeg *pSeg) = 0;
00297
00298 public:
00299 CPipe();
00300 ~CPipe();
00301 CPipe *Append(CSink *pSink);
00302 CPipe *Append(CSink& sink);
00303 void Sync();
00304 void Open();
00305 void Pump(CSeg *pSeg);
00306 void Flush();
00307 void Close();
00308 };
00309
00313 class CSinkNull : public CSink {
00314 public:
00317 inline void Out(CSeg *pSeg) {
00318 pSeg->Release();
00319 }
00320 };
00321
00327 class CSplit : public CPipe {
00328 CPipe *m_pLeft;
00329 CPipe *m_pRight;
00330
00331 void DestructSink();
00332 void PumpSplit(CSeg *pSeg);
00333 public:
00334 CSplit();
00335 void AppendSink(CSink *pSink, bool fAutoDelete);
00336 void Sync();
00337 CSplit *Init(CPipe *pLeft, CPipe *pRight);
00338 void Out(CSeg *pSeg);
00339 void OutSpecial(CSeg *pSeg);
00340 bool OutFlush();
00341 bool OutClose();
00342 bool OutOpen();
00343 };
00344
00351 class CPipeBlock : public CPipe {
00352 CSeg *m_pBlockPart;
00353 size_t m_cbBlockSize;
00354
00355 public:
00356 CPipeBlock();
00357 virtual ~CPipeBlock();
00358 CPipeBlock *Init(size_t cbBlockSize);
00359 void OutPump(CSeg *pSeg);
00360 CSeg *PartialBlock();
00361 };
00362
00377 class CSource : public CPipe {
00378 public:
00379 virtual ~CSource();
00380 CSource *Append(CSink *pSink);
00381 CSource *Append(CSink& sink);
00382 CSource *Open();
00383 CSource *Close();
00384 CSource *Drain();
00385 CSource *Plug();
00386 void Out(CSeg *pSeg);
00387
00388 protected:
00394 virtual CSeg *In() = 0;
00395 };
00396
00401 class CSourceNull : public CSource {
00402 protected:
00405 CSeg *In() {
00406 return new CSeg;
00407 }
00408 };
00409
00411 class CSourceMem : public CSource {
00412 CSeg *m_pSegSave;
00413 protected:
00416 CSeg *In() {
00417 if (m_pSegSave) {
00418 m_pSeg = m_pSegSave;
00419 m_pSegSave = NULL;
00420 return m_pSeg;
00421 } else {
00422 return new CSeg;
00423 }
00424 }
00425
00426 public:
00428 CSourceMem() {
00429 m_pSegSave = NULL;
00430 }
00435 CSourceMem *Init(size_t cb, void *p) {
00436 m_pSegSave = new CSeg(cb, p);
00437 return this;
00438 }
00439 };
00460 class CFilter : public CPipe {
00461 private:
00462 bool m_fFirstWork;
00463 CCoContext m_ctxFilter;
00464
00465 static void CoFilter(void *pvThis);
00466 void CoStartFilter(void *pvThis);
00467 void Out(CSeg *pSeg);
00468
00469 public:
00470 CFilter();
00471 ~CFilter();
00472
00473 protected:
00474 CCoContext m_ctxWork;
00475
00476 bool OutOpen();
00477 bool OutClose();
00478 bool OutFlush();
00479 void Work();
00480 CSeg *Read();
00481
00491 virtual void InFilter() = 0;
00492 };
00493
00498 class CFilterByte : public CFilter {
00499 protected:
00500 bool GetNextSeg();
00501 protected:
00502 int ReadByte();
00503 CSeg *Read();
00504 size_t Skip(size_t cb);
00505 };
00506
00508 class CFilterBlock : public CFilterByte {
00509 protected:
00510 CSeg *ReadBlock(size_t cb);
00511 };
00533 class CJoin : public CSource {
00539 class CTSinkJoin : public CThread<CSink> {
00540 CThreadSync m_Sync;
00541 CSeg *m_pNextSeg;
00542 bool m_fEmpty;
00543 protected:
00544 void Out(CSeg *pSeg);
00545 bool OutClose();
00546 bool OutFlush();
00547 public:
00548 CTSinkJoin();
00549 CSeg *GetSeg();
00550 bool IsEmpty();
00551 void SinkWorkWait();
00552 void SinkWorkEnd();
00553 };
00554
00555 CTSinkJoin **m_ppInSinks;
00556 int m_nMaxStreams;
00557
00558 public:
00559 CJoin();
00560 virtual ~CJoin();
00561 void OutPlug();
00562 CSink &GetSink(int ix);
00563 CJoin *Init(int nMaxStreams = 2);
00564
00565
00566
00567
00568 CSeg *StreamSeg(int ix);
00569 int StreamIx(int ix);
00570 int StreamNum();
00571 bool StreamEmpty(int ix);
00572 };
00573
00934 }
00935 #endif