00001
00034 #include "stdafx.h"
00035
00036 namespace AxPipe {
00037 using Assert::OutputDebugStringF;
00038
00045 void
00046 CPipe::DestructSink() {
00047 OutputDebugStringF(_T("CPipe::DestructSink() m_fAutoDeleteSink=%d, m_pSink=%p\n"), m_fAutoDeleteSink, m_pSink);
00048 if (m_pSink) {
00049 m_pSink->WaitForIdle();
00050 ASSCHK(m_pSink->m_fExit, _T("CPipe::DestructSink() without proper Plug()"));
00051 if (m_fAutoDeleteSink) {
00052 delete m_pSink;
00053 } else {
00054 m_pSink->DestructSink();
00055 }
00056 m_pSink = NULL;
00057 }
00058 }
00059
00065 void
00066 CPipe::Work() {
00067 CSeg *pSeg = m_pSeg;
00068 m_pSeg = NULL;
00069
00070 if (DoSegWork(pSeg->AddRef()) && m_pSink) {
00071
00072 m_pSink->OutPump(pSeg);
00073
00074
00075
00076 if (m_fExit) {
00077 m_pSink->WorkExitWait();
00078 }
00079 } else {
00080 pSeg->Release();
00081 }
00082 }
00083
00088 void
00089 CPipe::AppendSink(CSink *pSink, bool fAutoDeleteSink) {
00090 if (m_pSink) {
00091 m_pSink->AppendSink(pSink, fAutoDeleteSink);
00092 } else {
00093 m_fAutoDeleteSink = fAutoDeleteSink;
00094 (m_pSink = pSink)->CError::Init(this);
00095 }
00096 }
00102 CPipe *
00103 CPipe::Append(CSink *pSink) {
00104 AppendSink(pSink, true);
00105 return this;
00106 }
00107
00112 CPipe *
00113 CPipe::Append(CSink& sink) {
00114 AppendSink(&sink, false);
00115 return this;
00116 }
00117
00120 void
00121 CPipe::Sync() {
00122 WaitForIdle();
00123 m_pSink->Sync();
00124 }
00125
00127 void
00128 CPipe::Open() {
00129 if (m_pSink) {
00130 m_pSink->OutPump((new CSeg)->SetType(eSegTypeOpen));
00131 }
00132 }
00133
00136 void
00137 CPipe::Pump(CSeg *pSeg) {
00138 if (m_pSink) {
00139 m_pSink->OutPump(pSeg);
00140 } else {
00141 pSeg->Release();
00142 }
00143 }
00144
00146 void
00147 CPipe::Flush() {
00148 if (m_pSink) {
00149 m_pSink->OutPump((new CSeg)->SetType(eSegTypeFlush));
00150 }
00151 }
00152
00154 void
00155 CPipe::Close() {
00156 if (m_pSink) {
00157 m_pSink->OutPump((new CSeg)->SetType(eSegTypeClose));
00158 }
00159 }
00160
00161
00162
00163 CSeg *
00164 CPipe::GetSeg(size_t cb) {
00165 return m_pSink ? m_pSink->OutGetSeg(cb) : new CSeg(cb);
00166 }
00167
00178 void
00179 CPipe::Signal(void *vId, void *p) {
00180 if (m_pSink) {
00181 m_pSink->WaitForIdle();
00182 if (m_pSink->OutSignal(vId, p)) {
00183 m_pSink->Signal(vId, p);
00184 }
00185 }
00186 }
00187
00194 longlong
00195 CPipe::OutSizeMax() {
00196 return m_pSink ? m_pSink->OutSizeMax() : -1;
00197 }
00198
00204 CSeg *
00205 CPipe::OutGetSeg(size_t cb) {
00206 return new CSeg(cb);
00207 }
00208
00218 bool
00219 CPipe::OutSignal(void *vId, void *p) {
00220 return true;
00221 }
00222
00231 bool
00232 CPipe::OutOpen() {
00233 return true;
00234 };
00235
00240 bool
00241 CPipe::OutClose() {
00242 return true;
00243 };
00244
00252 void
00253 CPipe::OutSpecial(CSeg *pSeg) {
00254 Pump(pSeg);
00255 }
00256
00260 CPipe::CPipe() {
00261 m_pSink = NULL;
00262 }
00263
00265 CPipe::~CPipe() {
00266 DestructSink();
00267 }
00268 };