Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

CFilter.cpp

Go to the documentation of this file.
00001 
00034 #include "stdafx.h"
00035 
00036 namespace AxPipe {
00041     void
00042     CFilter::CoFilter(void *pvThis) {
00043         ((CFilter *)pvThis)->CoStartFilter(pvThis);
00044         // Should never get here!
00045         ASSCHK(false, _T("CFilter::CoFilter unexpected fiber exit"));
00046     }
00047 
00049     CFilter::CFilter() {
00050         m_ctxFilter.Init(this, CoFilter, this);
00051         m_ctxWork.Init(this, NULL, NULL);   // The Work context starts as the 'current'
00052         m_fFirstWork = true;                // Ensure that we know when Work() is called first time.
00053     }
00054 
00055     CFilter::~CFilter() {
00056         m_ctxFilter.Stop();
00057     }
00058 
00064     void
00065     CFilter::Out(CSeg *pSeg) {
00066         if (!m_fIsOpen) {
00067             SetError(ERROR_CODE_NOTOPEN, ERROR_MSG_NOTOPEN);
00068             return;
00069         }
00070 
00071         m_pSeg = pSeg;
00072         // Switch to Read() and InFilter()
00073         //OutputDebugString(_T("CFilter::Out(CSeg *pSeg) m_ctxFilter.Go()\n"));
00074         m_ctxFilter.Go();
00075     }
00076 
00088     bool
00089     CFilter::OutOpen() {
00090         OutputDebugString(_T("CFilter::OutOpen()\n"));
00091         if (m_fFirstWork) {
00092             // If first time, ensure we are executing in Work coroutine context now
00093             OutputDebugString(_T("CFilter::OutOpen() m_ctxWork.Go()\n"));
00094             m_ctxWork.Go();                 // This only initializes the context...
00095             m_fFirstWork = false;           // ...we get here immediately
00096         }
00097         // Default for filters is to not propagate
00098         return false;
00099     }
00100     
00107     bool
00108     CFilter::OutClose() {
00109         Out(NULL);
00110         // Default for filters is to not propagate
00111         return false;
00112     }
00113     
00116     bool
00117     CFilter::OutFlush() {
00118         Out(new CSeg);
00119         return true;
00120     }
00121 
00126     void
00127     CFilter::Work() {
00128         CPipe::Work();
00129     }
00130 
00135     void
00136     CFilter::CoStartFilter(void *pvThis) {
00137         // The filter may be called multiple times. It should exit when receiving
00138         // a eSegTypeClose segment, ready to be called again when more arrives.
00139         do {
00140             OutputDebugStringF(_T("CFilter::CoStartFilter(void *pvThis) InFilter(), this=%p\n"), this);
00141             InFilter();                     // Shold return when eof/empty is signalled.
00142 
00143             // Drive the sender until we either get a valid data segment, or we're killed
00144             do {
00145                 OutputDebugStringF(_T("CFilter::CoStartFilter waiting for data, this=%p\n"), this);
00146                 m_ctxWork.Go();
00147             } while (m_pSeg == NULL || m_pSeg->Len() == 0);
00148             OutputDebugStringF(_T("CFilter::CoStartFilter found data, this=%p\n"), this);
00149         } while (true);
00150         // Never get here!
00151     }
00152 
00158     CSeg *
00159     CFilter::Read() {
00160         // We may already have a segment waiting, at first call.
00161         if (!m_pSeg) {
00162             m_ctxWork.Go();
00163         }
00164         // m_pSeg can be valid, zero-length or NULL here. Nothing else.
00165         CSeg *pSeg = m_pSeg;
00166         m_pSeg = NULL;
00167         return pSeg;
00168     }
00175     bool
00176     CFilterByte::GetNextSeg() {
00177         // Release if empty.
00178         if (m_pSeg && !m_pSeg->Len()) {
00179             m_pSeg->Release();
00180             m_pSeg = NULL;
00181         }
00182         if (!m_pSeg) {
00183             m_ctxWork.Go();
00184         }
00185         return m_pSeg != NULL;              // Success as long as we get a segment, but it might be zero-len
00186     }
00189     int
00190     CFilterByte::ReadByte() {
00191         do {
00192             if (!GetNextSeg()) {
00193                 return -1;
00194             }
00195         } while (!m_pSeg->Len());       // Ignore flush requests, just wait for data.
00196 
00197         // Now we now we have at least one byte.
00198         unsigned char c = *m_pSeg->PtrRd();
00199         m_pSeg->Drop(1);
00200         return c;
00201     }
00202 
00204     CSeg *
00205     CFilterByte::Read() {
00206         SetError(ERROR_CODE_GENERIC, ERROR_MSG_GENERIC, _T("Attempt to call CFilterByte::Read()"));
00207         return NULL;
00208     }
00209 
00213     size_t
00214     CFilterByte::Skip(size_t cb) {
00215         while (cb) {
00216             if (!GetNextSeg()) {
00217                 break;
00218             }
00219             size_t cbChunk = m_pSeg->Len();
00220             if (cbChunk > cb) {
00221                 cbChunk = cb;
00222             }
00223             m_pSeg->Drop(cbChunk);
00224             cb -= cbChunk;
00225         }
00226         return cb;
00227     }
00228 
00239     CSeg *
00240     CFilterBlock::ReadBlock(size_t cb) {
00241         // Zero means take what we get
00242         if (!cb) {
00243             while (GetNextSeg()) {
00244                 if (m_pSeg->Len()) {
00245                     CSeg *pSeg = m_pSeg;
00246                     m_pSeg = NULL;
00247                     return pSeg;
00248                 }
00249                 m_pSeg->Release();
00250                 m_pSeg = NULL;
00251             }
00252             return NULL;                    // No data - this is shown with NULL
00253         }
00254         // If no buffered data - we must get more in any case, so let's do it.
00255         if (!GetNextSeg()) {
00256             return NULL;                    // No data to get.
00257         }
00258         // This is a slight optimization to try to keep chunks in the original
00259         // segment as much as possible.
00260         // If the buffer contains the proper number of bytes already, clone it,
00261         // drop the bytes off the original, and set the length of the copy
00262         // returned.
00263         if (m_pSeg->Len() >= cb) {
00264             CSeg *pSeg = m_pSeg->Clone();
00265             m_pSeg->Drop(cb);
00266             pSeg->Len(cb);
00267             return pSeg;
00268         }
00269 
00270         // Now we know we must merge two or more buffers. Let's allocate a segment to return.
00271         CSeg *pSeg = new CSeg(cb);
00272         if (!pSeg) {
00273             SetError(ERROR_CODE_GENERIC, ERROR_MSG_GENERIC, _T("Out of memory"));
00274             return NULL;
00275         }
00276 
00277         size_t cbSeg;
00278         pSeg->Len(cbSeg = 0);           // Set the length of valid data in the segment to zero.
00279         // We also know at this point that we have valid data in the buffer.
00280         do {
00281             size_t cbChunk = m_pSeg->Len();
00282             if (cbChunk > cb - cbSeg) {
00283                 cbChunk = cb - cbSeg;
00284             }
00285             memcpy(&pSeg->PtrWr()[cbSeg], m_pSeg->PtrRd(), cbChunk);
00286             m_pSeg->Drop(cbChunk);
00287             pSeg->Len(cbSeg += cbChunk);// Update the length of valid data in the segment.
00288 
00289             // If we've gotten all we need...
00290             if (cbSeg == cb) {
00291                 return pSeg;
00292             }
00293 
00294             if (!GetNextSeg()) {
00295                 return pSeg;        // Return what data we have.
00296             }
00297         } while (true);
00298         // Can't get here!
00299     }
00300 };

Generated on Mon Feb 2 13:19:18 2004 for AxPipe by doxygen 1.3.5