Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

AxPipe.h

Go to the documentation of this file.
00001 #pragma once
00002 #ifndef AXPIPE_H
00003 #define AXPIPE_H
00004 
00040 // Common base classes
00041 #include "CError.h"
00042 #include "CSeg.h"
00043 #include "CSync.h"
00044 #include "CThread.h"
00045 #include "CCoContext.h"
00046 #include "AxPipeAssert.h"
00047 
00093 //
00094 //  Use the Windows convention for Unicode/Ansi switchable code, but if nothing
00095 //  is defined, we default to Ansi here. Change as appropriate if necessary.
00096 //
00097 #ifndef _T
00098 #define _T(s) s                             // Ansi fallback: the literal is used unchanged
00099 #define _TCHAR char                         // Ansi fallback: text characters are plain char
00100 #endif
00101 
00107 namespace AxPipe {
00115     namespace Stock {
00116         // Dummy, just to get the description of the namespace documented.
00117     }
00118 
00119 class CSeg;                                 // Forward declaration of the segment class used in the signatures below
00121 extern DWORD dwTlsIndex;                    // TLS index; allocated and freed by CGlobalInit, defined in another translation unit
00123 extern volatile long nGlobalInit;           // Counter adjusted by CGlobalInit's constructor/destructor via Interlocked* calls
00124 //
00125 //  The error class is a virtual base class to all the others,
00126 //  and is used to record errors. Normally, errors are not passed
00127 //  by return value, nor are any exceptions thrown.
00128 //
00129 //  Use SetError() to indicate an error, and GetError(), to check for one.
00130 
// Message texts whose names pair with ERROR_CODE_GENERIC, ERROR_CODE_INTERNAL
// and ERROR_CODE_NOTOPEN. The first two contain one %s placeholder
// (presumably filled in by the error-reporting code - confirm against SetError()).
// Being 'static' in a header, every translation unit gets its own copy.
00132 static const _TCHAR *ERROR_MSG_GENERIC = _T("AxPipe:: %s");
00134 static const _TCHAR *ERROR_MSG_INTERNAL = _T("AxPipe:: Internal error %s");
00136 static const _TCHAR *ERROR_MSG_NOTOPEN = _T("AxPipe:: Pipe not Open");
00137 
// Error codes recorded through the error class. Values are consecutive from 0
// up to ERROR_CODE_STOCK (4); ERROR_CODE_DERIVED is pinned at 100.
00139 enum ERROR_CODE {
00140     ERROR_CODE_SUCCESS = 0,                 ///< No error
00141     ERROR_CODE_GENERIC,                     ///< Generic error; name pairs with ERROR_MSG_GENERIC
00142     ERROR_CODE_INTERNAL,                    ///< Internal error; name pairs with ERROR_MSG_INTERNAL
00143     ERROR_CODE_NOTOPEN,                     ///< Pipe not open; name pairs with ERROR_MSG_NOTOPEN
00144     ERROR_CODE_STOCK,                       ///< Presumably for errors from the AxPipe::Stock classes - confirm
00145     ERROR_CODE_DERIVED = 100,               ///< Presumably the first value free for user-derived classes - confirm
00146 };
00147 
00148 // Modify according to compiler version. Include appropriate #ifdef's.
// NOTE(review): __int64 is a compiler-specific type name; no #ifdef is present
// in this listing, so other compilers must provide __int64 themselves.
00149 typedef __int64 longlong;                   ///< Signed 64-bit integer; return type of SizeMax()/OutSizeMax()
00150 typedef unsigned __int64 ulonglong;         ///< Unsigned 64-bit integer
00151 
// Segment type tags. The names mirror the CSink hooks OutOpen(), OutFlush(),
// OutClose() and OutPlug(); presumably a segment carrying one of these tags is
// dispatched to the matching hook - confirm in the CSink implementation.
// Values start at 1; eSegTypeDerived is pinned at 100.
00158 enum eSegType {
00159     eSegTypeOpen = 1,                       ///< Name matches CSink::OutOpen()
00160     eSegTypeFlush,                          ///< Name matches CSink::OutFlush()
00161     eSegTypeClose,                          ///< Name matches CSink::OutClose()
00162     eSegTypePlug,                           ///< Name matches CSink::OutPlug()
00163     eSegTypeDerived = 100,                  ///< Presumably the first value free for user-defined types - confirm
00164 };
00165 
// CGlobalInit: counts its live instances in nGlobalInit. The instance that
// raises the count to 1 allocates the TLS index into dwTlsIndex; the instance
// that drops it back to 0 frees the index and resets dwTlsIndex to
// TLS_OUT_OF_INDEXES.
//
// Copies are counted as well. With the compiler-generated copy constructor a
// copy would decrement the counter in its destructor without ever having
// incremented it, releasing the TLS index while other instances still use it.
00183 class CGlobalInit {
00184 public:
          // Register this instance; the first one allocates the TLS index.
00186     CGlobalInit() {
00187         if (InterlockedIncrement(&nGlobalInit) == 1) {
00188             dwTlsIndex = TlsAlloc();
00189         }
00190     }
          // A copy is one more live instance, so it is registered exactly like
          // a default-constructed object to keep the count balanced.
          CGlobalInit(const CGlobalInit &) {
              if (InterlockedIncrement(&nGlobalInit) == 1) {
                  dwTlsIndex = TlsAlloc();
              }
          }
          // Assignment neither creates nor destroys an instance: nothing to do.
          CGlobalInit &operator=(const CGlobalInit &) {
              return *this;
          }
          // Unregister this instance; the last one frees the TLS index, unless
          // the allocation had failed (TLS_OUT_OF_INDEXES).
00192     ~CGlobalInit() {
00193         if (InterlockedDecrement(&nGlobalInit) == 0 && dwTlsIndex != TLS_OUT_OF_INDEXES) {
00194             TlsFree(dwTlsIndex);
00195             dwTlsIndex = TLS_OUT_OF_INDEXES;
00196         }
00197     }
00198 };
00199 
// CSink: abstract base class of the pipeline classes in this header (CPipe and
// CSinkNull derive from it directly). It inherits CNoThread and CError and
// grants CPipe friend access. Only declarations are visible here; the member
// functions are defined elsewhere. A concrete class must implement Out().
00214 class CSink : public CNoThread, public CError {
00215     friend class CPipe;                     ///< CPipe may access the private and protected members
00216 protected:
00217     bool m_fIsOpen;                         ///< Open-state flag (presumably true between open and close - confirm)
00218     CSeg *m_pSeg;                           ///< Segment pointer; CSourceMem::In() stores the segment it hands out here
00219 
00220 private:
00221     bool DoSegWork(CSeg *pSeg);             ///< Private helper taking a segment; the meaning of the bool result is not visible here
00222 
00223 protected:
00224     void Work();                            
00225     CSeg *GetSeg(size_t cb);                ///< cb is a byte count judging by its name and type - confirm
00226     virtual void Signal(void *vId, void *p);
00227     longlong SizeMax();                     
00228 
00229 public:
00230     virtual void OutPump(CSeg *pSeg);       ///< Public and virtual; CPipeBlock overrides it
00231 
          // Overridable hooks. CPipe, CSplit, CFilter and CJoin::CTSinkJoin
          // redeclare subsets of these.
00232 protected:
00233     virtual longlong OutSizeMax();          
00234     virtual CSeg *OutGetSeg(size_t cb);     
00235     virtual bool OutSignal(void *vId, void *p); 
00236     virtual bool OutOpen();                 
00237     virtual bool OutFlush();                
00238     virtual bool OutClose();                
00239     virtual void OutPlug();                 
00240     virtual void OutSpecial(CSeg *pSeg);    
00241 
          // Pure virtual: every concrete sink must implement the handling of
          // a segment.
00254     virtual void Out(CSeg *pSeg) = 0;
00255 
00256 public:
00257     CSink();                                
00258     virtual ~CSink();                       ///< Virtual, so derived objects can be deleted through a CSink pointer
00259     virtual void AppendSink(CSink *pSink, bool fAutoDelete); 
00260     virtual void DestructSink();            
00261     virtual void Sync();                    
00262 };
// CPipe: a CSink that additionally holds a pointer to another CSink
// (m_pSink). It redeclares most of the CSink hooks and keeps Out() pure
// virtual, so CPipe itself stays abstract. CSplit is a friend. Only
// declarations are visible here.
00270 class CPipe : public CSink {
00271     friend class CSplit;                    ///< CSplit may access the private and protected members
00272     void DestructSink();                    ///< Private here (class default access), although public in CSink
00273 
00274 protected:
00275     CSink *m_pSink;                         ///< The attached sink (presumably set through Append()/AppendSink() - confirm)
00276 
00277     void Work();                            
00278     void AppendSink(CSink *pSink, bool fAutoDelete); 
00279     CSeg *GetSeg(size_t cb);                
00280     void Signal(void *vId, void *p);        
00281     longlong OutSizeMax();                  
00282     CSeg *OutGetSeg(size_t cb);             
00283     bool OutSignal(void *vId, void *p);     
00284     bool OutOpen();                         
00285     bool OutClose();                        
00286     void OutSpecial(CSeg *pSeg);            
00287 
          // Still pure virtual: classes derived from CPipe must supply Out().
00296     void Out(CSeg *pSeg) = 0;
00297 
00298 public:
00299     CPipe();                                
00300     ~CPipe();                               ///< Virtual through CSink's virtual destructor
00301     CPipe *Append(CSink *pSink);            ///< Pointer overload
00302     CPipe *Append(CSink& sink);             ///< Reference overload
00303     void Sync();                            
00304     void Open();                            
00305     void Pump(CSeg *pSeg);                  
00306     void Flush();                           
00307     void Close();                           
00308 };
00309 
// CSinkNull: a concrete CSink whose Out() does nothing with the segment
// except release it.
00313 class CSinkNull : public CSink {
00314 public:
          // Release the received segment right away. A member function defined
          // inside the class body is implicitly inline.
00317     void Out(CSeg *pSeg) { pSeg->Release(); }
00320 };
00321 
// CSplit: a CPipe holding two further CPipe pointers, m_pLeft and m_pRight
// (presumably supplied through Init(pLeft, pRight) - confirm). It declares
// its own Out(), so the pure virtual inherited from CPipe is overridden.
// Only declarations are visible here.
00327 class CSplit : public CPipe {
00328     CPipe *m_pLeft;                         ///< First of the two pipes
00329     CPipe *m_pRight;                        ///< Second of the two pipes
00330     
00331     void DestructSink();                    ///< Private, as in CPipe
00332     void PumpSplit(CSeg *pSeg);             ///< Private helper taking a segment
00333 public:
00334     CSplit();                               
00335     void AppendSink(CSink *pSink, bool fAutoDelete); 
00336     void Sync();                            
00337     CSplit *Init(CPipe *pLeft, CPipe *pRight); ///< Returns CSplit* (presumably this, for call chaining - confirm)
00338     void Out(CSeg *pSeg);                   
00339     void OutSpecial(CSeg *pSeg);            
00340     bool OutFlush();                        
00341     bool OutClose();                        
00342     bool OutOpen();                         
00343 };
00344 
// CPipeBlock: a CPipe with a block size (m_cbBlockSize) and a segment pointer
// for a partial block (m_pBlockPart). It overrides OutPump() but does not
// declare Out(), so it remains abstract like CPipe. Only declarations are
// visible here.
00351 class CPipeBlock : public CPipe {
00352     CSeg *m_pBlockPart;                     ///< Presumably data not yet filling a whole block - confirm
00353     size_t m_cbBlockSize;                   ///< Block size (presumably in bytes, set by Init() - confirm)
00354 
00355 public:
00356     CPipeBlock();                           
00357     virtual ~CPipeBlock();                  
00358     CPipeBlock *Init(size_t cbBlockSize);   ///< Returns CPipeBlock* (presumably this, for call chaining - confirm)
00359     void OutPump(CSeg *pSeg);               ///< Overrides the virtual CSink::OutPump()
00360     CSeg *PartialBlock();                   
00361 };
00362 
// CSource: a CPipe that declares Out() and introduces the pure virtual In(),
// which concrete sources (CSourceNull, CSourceMem) implement by returning a
// segment. Append(), Open() and Close() hide the CPipe members of the same
// name and return CSource* instead. Only declarations are visible here.
00377 class CSource : public CPipe {
00378 public:
00379     virtual ~CSource();                     
00380     CSource *Append(CSink *pSink);          ///< Pointer overload; hides CPipe::Append()
00381     CSource *Append(CSink& sink);           ///< Reference overload; hides CPipe::Append()
00382     CSource *Open();                        ///< Hides CPipe::Open(), which returns void
00383     CSource *Close();                       ///< Hides CPipe::Close(), which returns void
00384     CSource *Drain();                       
00385     CSource *Plug();                        
00386     void Out(CSeg *pSeg);                   ///< Overrides the pure virtual CPipe::Out()
00387 
00388 protected:
          // Pure virtual: a concrete source returns its next segment here.
00394     virtual CSeg *In() = 0;
00395 };
00396 
// CSourceNull: a concrete CSource whose In() always answers with a newly
// default-constructed segment.
00401 class CSourceNull : public CSource {
00402 protected:
          // Allocate a default-constructed CSeg and hand it to the caller.
00405     CSeg *In() {
00406         CSeg *pFresh = new CSeg;
              return pFresh;
00407     }
00408 };
00409 
// CSourceMem: a concrete CSource fed from a caller-supplied memory buffer.
// Init() wraps the buffer in a segment; the first In() call hands that
// segment out, and every later call returns a default-constructed segment.
//
// A segment that In() has not yet handed out is owned by this object. It is
// released when Init() is called again and when the object is destroyed,
// instead of being leaked. Release() is the same disposal call that
// CSinkNull::Out() applies to a segment it consumes.
00411 class CSourceMem : public CSource {
00412     CSeg *m_pSegSave;                       ///< Segment created by Init() and not yet returned by In(); NULL otherwise
00413 protected:
          // Return the pending segment once (also recorded in m_pSeg), then
          // default-constructed segments on all further calls.
00416     CSeg *In() {
00417         if (m_pSegSave) {
00418             m_pSeg = m_pSegSave;
00419             m_pSegSave = NULL;
00420             return m_pSeg;
00421         } else {
00422             return new CSeg;
00423         }
00424     }
00425 
00426 public:
          // Start without a pending segment.
00428     CSourceMem() {
00429         m_pSegSave = NULL;
00430     }
          // Dispose of a segment that was set up by Init() but never fetched.
          ~CSourceMem() {
              if (m_pSegSave) {
                  m_pSegSave->Release();
                  m_pSegSave = NULL;
              }
          }
          // Wrap the cb-sized buffer at p in a segment to be returned by the
          // next In() call. Returns this.
00435     CSourceMem *Init(size_t cb, void *p) {
              // A second Init() before In() must not leak the earlier segment.
              if (m_pSegSave) {
                  m_pSegSave->Release();
              }
00436         m_pSegSave = new CSeg(cb, p);
00437         return this;
00438     }
00439 };
// CFilter: a CPipe built around two CCoContext members (m_ctxFilter, private,
// and m_ctxWork, protected). Out() is overridden privately; derived classes
// implement the pure virtual InFilter() and have the protected Read()
// available. Only declarations are visible here.
00460 class CFilter : public CPipe {
00461 private:
00462     bool m_fFirstWork;                      ///< Presumably marks that Work() has not run yet - confirm
00463     CCoContext m_ctxFilter;                 ///< Context object (presumably the one InFilter() runs in - confirm)
00464 
00465     static void CoFilter(void *pvThis);     ///< Static entry point taking the object as void*
00466     void CoStartFilter(void *pvThis);       
00467     void Out(CSeg *pSeg);                   ///< Overrides the pure virtual CPipe::Out(); private in CFilter
00468 
00469 public:
00470     CFilter();                              
00471     ~CFilter();
00472 
00473 protected:
00474     CCoContext m_ctxWork;                   ///< Context object (presumably that of the caller of Work() - confirm)
00475 
00476     bool OutOpen();                         
00477     bool OutClose();                        
00478     bool OutFlush();                        
00479     void Work();                            
00480     CSeg *Read();                           ///< Returns a segment pointer; ownership rules are not visible here
00481 
          // Pure virtual: the filter logic supplied by the derived class.
00491     virtual void InFilter() = 0;
00492 };
00493 
// CFilterByte: a CFilter extended with byte-oriented input helpers. All
// members are protected, i.e. meant for use by derived filters. InFilter()
// is not declared, so the class is still abstract. Only declarations are
// visible here.
00498 class CFilterByte : public CFilter {
00499 protected:
00500     bool GetNextSeg();                      ///< The meaning of the bool result is not visible here
00502     int ReadByte();                         ///< Returns int rather than a byte type (presumably to signal end of data - confirm)
00503     CSeg *Read();                           ///< Hides CFilter::Read()
00504     size_t Skip(size_t cb);                 ///< Takes and returns a size_t count
00505 };
00506 
// CFilterBlock: a CFilterByte that adds the protected ReadBlock() helper.
// InFilter() is not declared, so the class is still abstract.
00508 class CFilterBlock : public CFilterByte {
00509 protected:
00510     CSeg *ReadBlock(size_t cb);             ///< Returns a segment for a requested size cb (presumably in bytes - confirm)
00511 };
// CJoin: a CSource that keeps an array of pointers to CTSinkJoin objects
// (m_ppInSinks) and a stream limit (m_nMaxStreams). In() is not declared, so
// CJoin remains abstract; the Stream*() members are helpers for an In()
// written by a derived class. Only declarations are visible here.
00533 class CJoin : public CSource {
          // Private nested sink type, based on CThread<CSink>, with a
          // CThreadSync member and a pointer to one pending segment.
00539     class CTSinkJoin : public CThread<CSink> {
00540         CThreadSync m_Sync;                 ///< Synchronization object (exact protocol not visible here)
00541         CSeg *m_pNextSeg;                   ///< Presumably the segment handed over by Out() and fetched by GetSeg() - confirm
00542         bool m_fEmpty;                      ///< Presumably the state reported by IsEmpty() - confirm
00543     protected:
00544         void Out(CSeg *pSeg);               ///< Overrides the pure virtual CSink::Out()
00545         bool OutClose();                    
00546         bool OutFlush();                    
00547     public:
00548         CTSinkJoin();                       
00549         CSeg *GetSeg();                     ///< No-argument form; CSink::GetSeg takes a size
00550         bool IsEmpty();                     
00551         void SinkWorkWait();                
00552         void SinkWorkEnd();                 
00553     };
00554 
00555     CTSinkJoin **m_ppInSinks;               ///< Array of CTSinkJoin pointers (presumably m_nMaxStreams entries - confirm)
00556     int m_nMaxStreams;                      ///< Stream limit (presumably set by Init() - confirm)
00557 
00558 public:
00559     CJoin();                                
00560     virtual ~CJoin();                       
00561     void OutPlug();
00562     CSink &GetSink(int ix);                 ///< Returns a sink by index, as a reference
00563     CJoin *Init(int nMaxStreams = 2);       ///< nMaxStreams defaults to 2
00564 
00565     //
00566     // Utility routines for In()
00567     //
00568     CSeg *StreamSeg(int ix);                
00569     int StreamIx(int ix);                   
00570     int StreamNum();                        
00571     bool StreamEmpty(int ix);               
00572 };
00573 
00934 } // namespace AxPipe
00935 #endif

Generated on Mon Feb 2 13:19:18 2004 for AxPipe by doxygen 1.3.5