00001
00034 #include "stdafx.h"
00035
00036 using AxPipe::Assert::OutputDebugStringF;
00037
00038 namespace AxPipe {
00041 void
00042 CJoin::CTSinkJoin::Out(CSeg *pSeg) {
00043 OutputDebugStringF(_T("CJoin::CTSinkJoin::Out()\n"));
00044
00045 if (!m_fEmpty) {
00046 m_Sync.WorkStart();
00047 m_pNextSeg = pSeg;
00048 m_Sync.WorkSignal();
00049 } else {
00050 SetError(ERROR_CODE_GENERIC, _T("CJoin::CTSinkJoin::Out [Unexpected call]"));
00051
00052 if (pSeg) {
00053 pSeg->Release();
00054 }
00055 }
00056 }
00057
00061 bool
00062 CJoin::CTSinkJoin::OutClose() {
00063 OutputDebugString(_T("CJoin::CTSinkJoin::OutPlug()\n"));
00064 Out(NULL);
00065 return false;
00066 }
00067
00071 bool
00072 CJoin::CTSinkJoin::OutFlush() {
00073 OutputDebugString(_T("CJoin::CTSinkJoin::OutFlush()\n"));
00074 Out(new CSeg);
00075 return false;
00076 }
00077
00079 CJoin::CTSinkJoin::CTSinkJoin() {
00080 m_pNextSeg = NULL;
00081 m_fEmpty = false;
00082 }
00083
00086 bool
00087 CJoin::CTSinkJoin::IsEmpty() {
00088 return m_fEmpty;
00089 }
00090
00094 CSeg *
00095 CJoin::CTSinkJoin::GetSeg() {
00096 m_fEmpty = m_pNextSeg == NULL;
00097 return m_pNextSeg;
00098 }
00099
00103 void
00104 CJoin::CTSinkJoin::SinkWorkWait() {
00105 m_Sync.WorkWait();
00106 }
00107
00110 void
00111 CJoin::CTSinkJoin::SinkWorkEnd() {
00112 m_Sync.WorkEnd();
00113 }
00114
00116 CJoin::CJoin() {
00117 m_nMaxStreams = 0;
00118 m_ppInSinks = NULL;
00119 }
00120
00122 CJoin::~CJoin() {
00123 OutputDebugString(_T("CJoin::~CJoin()\n"));
00124 ASSCHK(m_fExit, _T("CJoin::~CJoin() with worker still active"));
00125 if (m_ppInSinks) {
00126 for (int i = 0; i < m_nMaxStreams; i++) {
00127 delete m_ppInSinks[i];
00128 m_ppInSinks[i] = NULL;
00129 }
00130 delete[] m_ppInSinks;
00131 m_ppInSinks = NULL;
00132 }
00133 }
00134
00136 void
00137 CJoin::OutPlug() {
00138 if (GetErrorCode() == ERROR_CODE_SUCCESS) {
00139 for (int i = 0; i < m_nMaxStreams; i++) {
00140 if (m_ppInSinks[i] && m_ppInSinks[i]->GetErrorCode() != ERROR_CODE_SUCCESS) {
00141 SetError(m_ppInSinks[i]->GetErrorCode(), m_ppInSinks[i]->GetErrorMsg());
00142 return;
00143 }
00144 }
00145 }
00146 }
00147
00150 CJoin *
00151 CJoin::Init(int nMaxStreams) {
00152 m_ppInSinks = new CTSinkJoin *[m_nMaxStreams = nMaxStreams];
00153 for (int i = 0; i < m_nMaxStreams; i++) {
00154 m_ppInSinks[i] = NULL;
00155 }
00156 return this;
00157 }
00158
00165 CSink &
00166 CJoin::GetSink(int ix) {
00167 ASSCHK(ix < m_nMaxStreams, _T("CJoin::GetSink() [Invalid stream index]"));
00168 if (!m_ppInSinks[ix]) {
00169 return *(m_ppInSinks[ix] = new CTSinkJoin);
00170 }
00171 return *m_ppInSinks[ix];
00172 }
00173
00176 CSeg *
00177 CJoin::StreamSeg(int ix) {
00178 ASSCHK(ix > m_nMaxStreams || m_ppInSinks[ix] != NULL, _T("CJoin::StreamSeg() [Invalid stream index]"));
00179 if (m_ppInSinks[ix]->IsEmpty()) {
00180 return NULL;
00181 }
00182 m_ppInSinks[ix]->SinkWorkWait();
00183 CSeg *pSeg = m_ppInSinks[ix]->GetSeg();
00184 m_ppInSinks[ix]->SinkWorkEnd();
00185 return pSeg;
00186 }
00187
00191 int
00192 CJoin::StreamIx(int ix) {
00193 return ix % m_nMaxStreams;
00194 }
00195
00198 int
00199 CJoin::StreamNum() {
00200 return m_nMaxStreams;
00201 }
00202
00206 bool
00207 CJoin::StreamEmpty(int ix) {
00208 return m_ppInSinks[ix]->IsEmpty();
00209 }
00210 };