00001
00034 #include "stdafx.h"
00035
00036 namespace AxPipe {
00041 void
00042 CFilter::CoFilter(void *pvThis) {
00043 ((CFilter *)pvThis)->CoStartFilter(pvThis);
00044
00045 ASSCHK(false, _T("CFilter::CoFilter unexpected fiber exit"));
00046 }
00047
00049 CFilter::CFilter() {
00050 m_ctxFilter.Init(this, CoFilter, this);
00051 m_ctxWork.Init(this, NULL, NULL);
00052 m_fFirstWork = true;
00053 }
00054
00055 CFilter::~CFilter() {
00056 m_ctxFilter.Stop();
00057 }
00058
00064 void
00065 CFilter::Out(CSeg *pSeg) {
00066 if (!m_fIsOpen) {
00067 SetError(ERROR_CODE_NOTOPEN, ERROR_MSG_NOTOPEN);
00068 return;
00069 }
00070
00071 m_pSeg = pSeg;
00072
00073
00074 m_ctxFilter.Go();
00075 }
00076
00088 bool
00089 CFilter::OutOpen() {
00090 OutputDebugString(_T("CFilter::OutOpen()\n"));
00091 if (m_fFirstWork) {
00092
00093 OutputDebugString(_T("CFilter::OutOpen() m_ctxWork.Go()\n"));
00094 m_ctxWork.Go();
00095 m_fFirstWork = false;
00096 }
00097
00098 return false;
00099 }
00100
00107 bool
00108 CFilter::OutClose() {
00109 Out(NULL);
00110
00111 return false;
00112 }
00113
00116 bool
00117 CFilter::OutFlush() {
00118 Out(new CSeg);
00119 return true;
00120 }
00121
00126 void
00127 CFilter::Work() {
00128 CPipe::Work();
00129 }
00130
00135 void
00136 CFilter::CoStartFilter(void *pvThis) {
00137
00138
00139 do {
00140 OutputDebugStringF(_T("CFilter::CoStartFilter(void *pvThis) InFilter(), this=%p\n"), this);
00141 InFilter();
00142
00143
00144 do {
00145 OutputDebugStringF(_T("CFilter::CoStartFilter waiting for data, this=%p\n"), this);
00146 m_ctxWork.Go();
00147 } while (m_pSeg == NULL || m_pSeg->Len() == 0);
00148 OutputDebugStringF(_T("CFilter::CoStartFilter found data, this=%p\n"), this);
00149 } while (true);
00150
00151 }
00152
00158 CSeg *
00159 CFilter::Read() {
00160
00161 if (!m_pSeg) {
00162 m_ctxWork.Go();
00163 }
00164
00165 CSeg *pSeg = m_pSeg;
00166 m_pSeg = NULL;
00167 return pSeg;
00168 }
00175 bool
00176 CFilterByte::GetNextSeg() {
00177
00178 if (m_pSeg && !m_pSeg->Len()) {
00179 m_pSeg->Release();
00180 m_pSeg = NULL;
00181 }
00182 if (!m_pSeg) {
00183 m_ctxWork.Go();
00184 }
00185 return m_pSeg != NULL;
00186 }
00189 int
00190 CFilterByte::ReadByte() {
00191 do {
00192 if (!GetNextSeg()) {
00193 return -1;
00194 }
00195 } while (!m_pSeg->Len());
00196
00197
00198 unsigned char c = *m_pSeg->PtrRd();
00199 m_pSeg->Drop(1);
00200 return c;
00201 }
00202
00204 CSeg *
00205 CFilterByte::Read() {
00206 SetError(ERROR_CODE_GENERIC, ERROR_MSG_GENERIC, _T("Attempt to call CFilterByte::Read()"));
00207 return NULL;
00208 }
00209
00213 size_t
00214 CFilterByte::Skip(size_t cb) {
00215 while (cb) {
00216 if (!GetNextSeg()) {
00217 break;
00218 }
00219 size_t cbChunk = m_pSeg->Len();
00220 if (cbChunk > cb) {
00221 cbChunk = cb;
00222 }
00223 m_pSeg->Drop(cbChunk);
00224 cb -= cbChunk;
00225 }
00226 return cb;
00227 }
00228
00239 CSeg *
00240 CFilterBlock::ReadBlock(size_t cb) {
00241
00242 if (!cb) {
00243 while (GetNextSeg()) {
00244 if (m_pSeg->Len()) {
00245 CSeg *pSeg = m_pSeg;
00246 m_pSeg = NULL;
00247 return pSeg;
00248 }
00249 m_pSeg->Release();
00250 m_pSeg = NULL;
00251 }
00252 return NULL;
00253 }
00254
00255 if (!GetNextSeg()) {
00256 return NULL;
00257 }
00258
00259
00260
00261
00262
00263 if (m_pSeg->Len() >= cb) {
00264 CSeg *pSeg = m_pSeg->Clone();
00265 m_pSeg->Drop(cb);
00266 pSeg->Len(cb);
00267 return pSeg;
00268 }
00269
00270
00271 CSeg *pSeg = new CSeg(cb);
00272 if (!pSeg) {
00273 SetError(ERROR_CODE_GENERIC, ERROR_MSG_GENERIC, _T("Out of memory"));
00274 return NULL;
00275 }
00276
00277 size_t cbSeg;
00278 pSeg->Len(cbSeg = 0);
00279
00280 do {
00281 size_t cbChunk = m_pSeg->Len();
00282 if (cbChunk > cb - cbSeg) {
00283 cbChunk = cb - cbSeg;
00284 }
00285 memcpy(&pSeg->PtrWr()[cbSeg], m_pSeg->PtrRd(), cbChunk);
00286 m_pSeg->Drop(cbChunk);
00287 pSeg->Len(cbSeg += cbChunk);
00288
00289
00290 if (cbSeg == cb) {
00291 return pSeg;
00292 }
00293
00294 if (!GetNextSeg()) {
00295 return pSeg;
00296 }
00297 } while (true);
00298
00299 }
00300 };