00001
00034 #include "stdafx.h"
00035
00036 namespace AxPipe {
00040 CSource::~CSource() {
00041 Plug();
00042 }
00047 CSource *
00048 CSource::Append(CSink *pSink) {
00049 CPipe::Append(pSink);
00050 return this;
00051 }
00052
00057 CSource *
00058 CSource::Append(CSink& sink) {
00059 AppendSink(&sink, false);
00060 return this;
00061 }
00062
00070 CSource *
00071 CSource::Open() {
00072 if (!m_fIsOpen) {
00073 OutPump((new CSeg)->SetType(eSegTypeOpen));
00074 m_fIsOpen = true;
00075 }
00076 return this;
00077 }
00078
00084 CSource *
00085 CSource::Close() {
00086 if (m_fIsOpen) {
00087 OutPump((new CSeg)->SetType(eSegTypeClose));
00088 m_fIsOpen = false;
00089 }
00090 return this;
00091 }
00092
00095 CSource *
00096 CSource::Drain() {
00097 ASSCHK(nGlobalInit != 0, _T("AxPipe::CGlobalInit object must exist"));
00098
00099 if (!m_fIsOpen) {
00100 SetError(ERROR_CODE_NOTOPEN, ERROR_MSG_NOTOPEN);
00101 return this;
00102 }
00103 while (WorkStart(), !GetErrorCode()) {
00104 if (((m_pSeg = In()) != NULL) && (m_pSeg->Len() != 0)) {
00105 WorkSignal();
00106
00107 } else {
00108 break;
00109 }
00110 }
00111
00112 if (m_pSeg) {
00113 m_pSeg->Release();
00114 m_pSeg = NULL;
00115 }
00116
00117 WorkEnd();
00118 return this;
00119 }
00120
00124 CSource *
00125 CSource::Plug() {
00126 if (!m_fExit) {
00127 OutPump((new CSeg)->SetType(eSegTypePlug));
00128 }
00129 return this;
00130 }
00131
00134 void
00135 CSource::Out(CSeg *pSeg) {
00136 if (m_pSink) {
00137 m_pSink->OutPump(pSeg);
00138 } else {
00139 pSeg->Release();
00140 }
00141 }
00142 };