Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

CJoin.cpp

Go to the documentation of this file.
00001 
00034 #include "stdafx.h"
00035 
00036 using AxPipe::Assert::OutputDebugStringF;
00037 
00038 namespace AxPipe {
00039     // Queue pSeg as the next segment to be picked up through GetSeg().
00040     // OutClose() passes NULL here, which GetSeg() takes as the end of the stream.
00041     void
00042     CJoin::CTSinkJoin::Out(CSeg *pSeg) {
00043         OutputDebugStringF(_T("CJoin::CTSinkJoin::Out()\n"));
00044         if (m_fEmpty) {
00045             // Already marked empty - sending now may lock us up, so report and drop the data.
00046             SetError(ERROR_CODE_GENERIC, _T("CJoin::CTSinkJoin::Out [Unexpected call]"));
00047             if (pSeg != NULL) {
00048                 pSeg->Release();
00049             }
00050             return;
00051         }
00052         // Normal case: publish the segment between WorkStart() and WorkSignal().
00053         m_Sync.WorkStart();
00054         m_pNextSeg = pSeg;
00055         m_Sync.WorkSignal();
00056     }
00057 
00061     bool
00062     CJoin::CTSinkJoin::OutClose() {
00063         OutputDebugString(_T("CJoin::CTSinkJoin::OutPlug()\n"));
00064         Out(NULL);
00065         return false;
00066     }
00067 
00071     bool
00072     CJoin::CTSinkJoin::OutFlush() {
00073         OutputDebugString(_T("CJoin::CTSinkJoin::OutFlush()\n"));
00074         Out(new CSeg);
00075         return false;
00076     }
00077 
00078     // A new sink is not marked empty and has no segment pending.
00079     CJoin::CTSinkJoin::CTSinkJoin() {
00080         m_fEmpty = false;
00081         m_pNextSeg = NULL;
00082     }
00083 
00085     // True once GetSeg() has picked up a NULL segment for this sink.
00086     bool
00087     CJoin::CTSinkJoin::IsEmpty() {
00088         return this->m_fEmpty;
00089     }
00090 
00091     // Return the segment most recently queued by Out() and update the empty
00092     // flag: a NULL segment marks this sink as empty.
00093     CSeg *
00094     CJoin::CTSinkJoin::GetSeg() {
00095         CSeg *pSeg = m_pNextSeg;
00096         m_fEmpty = (pSeg == NULL);
00097         return pSeg;
00098     }
00099 
00101     // Forward to m_Sync.WorkWait(). CJoin::StreamSeg() calls this before
00102     // it fetches the segment with GetSeg().
00103     void
00104     CJoin::CTSinkJoin::SinkWorkWait() {
00105         this->m_Sync.WorkWait();
00106     }
00107 
00110     void
00111     CJoin::CTSinkJoin::SinkWorkEnd() {
00112         m_Sync.WorkEnd();
00113     }
00114 
00115     // Start without a sink table and with zero streams; Init() sets both.
00116     CJoin::CJoin() {
00117         m_ppInSinks = NULL;
00118         m_nMaxStreams = 0;
00119     }
00120 
00121     // Delete every sink created by GetSink(), then the pointer table allocated by Init().
00122     CJoin::~CJoin() {
00123         OutputDebugString(_T("CJoin::~CJoin()\n"));
00124         ASSCHK(m_fExit, _T("CJoin::~CJoin() with worker still active"));
00125         if (m_ppInSinks != NULL) {
00126             for (CTSinkJoin **ppSink = m_ppInSinks; ppSink < m_ppInSinks + m_nMaxStreams; ++ppSink) {
00127                 delete *ppSink;         // Slots never handed out are NULL, which delete accepts.
00128                 *ppSink = NULL;
00129             }
00130             delete[] m_ppInSinks;
00131             m_ppInSinks = NULL;
00132         }
00133     }
00134 
00136     void
00137     CJoin::OutPlug() {
00138         if (GetErrorCode() == ERROR_CODE_SUCCESS) {
00139             for (int i = 0; i < m_nMaxStreams; i++) {
00140                 if (m_ppInSinks[i] && m_ppInSinks[i]->GetErrorCode() != ERROR_CODE_SUCCESS) {
00141                     SetError(m_ppInSinks[i]->GetErrorCode(), m_ppInSinks[i]->GetErrorMsg());
00142                     return;
00143                 }
00144             }
00145         }
00146     }
00147     
00150     CJoin *
00151     CJoin::Init(int nMaxStreams) {
00152         m_ppInSinks = new CTSinkJoin *[m_nMaxStreams = nMaxStreams];
00153         for (int i = 0; i < m_nMaxStreams; i++) {
00154             m_ppInSinks[i] = NULL;
00155         }
00156         return this;
00157     }
00158 
00160     // Get the sink for stream ix, creating it on first request.
00161     // ix must lie in [0, m_nMaxStreams). The lower bound is asserted too,
00162     // because a negative index would otherwise pass the check and address
00163     // memory in front of the table. The sink remains owned by this CJoin,
00164     // whose destructor deletes it.
00165     CSink &
00166     CJoin::GetSink(int ix) {
00167         ASSCHK(ix >= 0 && ix < m_nMaxStreams, _T("CJoin::GetSink() [Invalid stream index]"));
00168         if (!m_ppInSinks[ix]) {
00169             m_ppInSinks[ix] = new CTSinkJoin;
00170         }
00171         return *m_ppInSinks[ix];
00172     }
00173 
00174     // Fetch the next segment from stream ix, or NULL once that stream is marked empty.
00175     // ix must lie in [0, m_nMaxStreams) and its sink must already exist (see GetSink()).
00176     CSeg *
00177     CJoin::StreamSeg(int ix) {
00178         ASSCHK(ix >= 0 && ix < m_nMaxStreams && m_ppInSinks[ix] != NULL, _T("CJoin::StreamSeg() [Invalid stream index]"));
00179         if (m_ppInSinks[ix]->IsEmpty()) {
00180             return NULL;                            // Do not wait on a stream that is already empty.
00181         }
00182         m_ppInSinks[ix]->SinkWorkWait();
00183         CSeg *pSeg = m_ppInSinks[ix]->GetSeg();     // A NULL here marks the stream empty.
00184         m_ppInSinks[ix]->SinkWorkEnd();
00185         return pSeg;
00186     }
00187 
00188     // Wrap any int onto a stream index in [0, m_nMaxStreams); needs at least one stream.
00189     int
00190     CJoin::StreamIx(int ix) {
00191         ASSCHK(m_nMaxStreams > 0, _T("CJoin::StreamIx() [No streams defined]"));
00192         int ixWrapped = ix % m_nMaxStreams;         // Negative when ix is negative.
00193         return ixWrapped < 0 ? ixWrapped + m_nMaxStreams : ixWrapped;
00194     }
00195     
00197     // Number of stream slots as given to Init(); 0 if Init() has not been called.
00198     int
00199     CJoin::StreamNum() {
00200         return this->m_nMaxStreams;
00201     }
00202     
00203     // Tell whether stream ix is marked empty. ix must lie in [0, m_nMaxStreams)
00204     // and its sink must already exist (see GetSink()); both are asserted.
00205     bool
00206     CJoin::StreamEmpty(int ix) {
00207         ASSCHK(ix >= 0 && ix < m_nMaxStreams && m_ppInSinks[ix] != NULL, _T("CJoin::StreamEmpty() [Invalid stream index]"));
00208         return m_ppInSinks[ix]->IsEmpty();
00209     }
00210 };

Generated on Mon Feb 2 13:19:18 2004 for AxPipe by doxygen 1.3.5