The assumption is that the input data is ASCII, just to make it clear.
The first stage changes all spaces (' ') to dashes ('-')
The second stage changes all dashes ('-') to plus ('+')
The third stage changes all plus ('+') to equal ('=')
The join, just simply takes one segment from each source and output it, in a round- robin fashion.
The split finally, will take every other character and pass them to two different sinks, one being standard output, the other being a file.
Not very useful perhaps, but it demonstrates the principles involved. Actually, for regular text streams, the iostream library may be a better bet (although it does not support threading).
class CSourceStdin : public AxPipe::CSource { public: AxPipe::CSeg *In() { if (feof(stdin)) { return new AxPipe::CSeg; // Signal end of file/no more data now } AxPipe::CSeg *pSeg = new AxPipe::CSeg(1024); size_t cb = fread(pSeg->PtrWr(), 1, pSeg->Size(), stdin); pSeg->Len(cb); // Set length of valid data if (ferror(stdin)) { // Report if error reading // This convolution is just because VC does not support _tcserror() _TCHAR *szMsg = astrtch(strerror(errno)); SetError(AxPipe::ERROR_CODE_DERIVED, szMsg); delete[] szMsg; } return pSeg; // The framework will push this down the pipe } }; // CSourceStdin
Here we just override CSource::In() and read in suitable chunks, passing it onwards by returning the a CSeg with the data. That's the basic source. For files, use the included CSourceMemFile.
class CSinkStdout : public AxPipe::CSink { public: void Out(AxPipe::CSeg *pSeg) { // Just write the data to the final destination, stdout if (fwrite(pSeg->PtrRd(), 1, pSeg->Len(), stdout) != pSeg->Len()) { // This convolution is just because VC does not support _tcserror() _TCHAR *szMsg = astrtch(strerror(errno)); SetError(AxPipe::ERROR_CODE_DERIVED, szMsg); delete[] szMsg; } } }; // CSinkStdout
This is almost trivial, override CSink::Out and write the CSeg segment passed.
class CPipeReplace1 : public AxPipe::CPipe { protected: void Out(AxPipe::CSeg *pSeg) { size_t cb = pSeg->Len(); // Allocate a new segment to output to AxPipe::CSeg *pOutSeg = new AxPipe::CSeg(cb); char *s = (char *)pSeg->PtrRd(); // Get ptr to start of source char *d = (char *)pOutSeg->PtrWr(); // Get ptr to start of dest while (cb--) { char c = *s++; if (c == ' ') { c = '-'; } *d++ = c; } pSeg->Release(); // Must release, otherwise leak Pump(pOutSeg); // Send it onwards } }; // CPipeReplace1
class CPipeReplace2 : public AxPipe::CFilter { protected: void InFilter() { // Filters by default need to Open() and close manually, this may // be modified by overriding AxPipe::CFilter::Open() and Close(). Open(); // Must Open() pipe downwards AxPipe::CSeg *pSeg; while (pSeg = Read()) { // In() returns NULL when pipe is empty size_t cb = pSeg->Len(); // Allocate a new segment to output to AxPipe::CSeg *pOutSeg = new AxPipe::CSeg(cb); char *s = (char *)pSeg->PtrRd(); // Get ptr to start of source char *d = (char *)pOutSeg->PtrWr(); // Get ptr to start of dest while (cb--) { char c = *s++; if (c == '-') { c = '+'; } *d++ = c; } pSeg->Release(); // Must release, otherwise leak Pump(pOutSeg); // Send it onwards } Close(); // Should ensure Close() downwards } //InFilter }; // CPipeReplace2
class CPipeReplace3 : public AxPipe::CFilterByte { public: void InFilter() { Open(); int c; while ((c = ReadByte()) != -1) { AxPipe::CSeg *pSeg = new AxPipe::CSeg(1); if (c == '+') { c = '='; } *pSeg->PtrWr() = c; Pump(pSeg); } Close(); } }; // CPipeReplace3
class CJoinInterMix : public AxPipe::CJoin { int i; public: CJoinInterMix() { i = 0; } AxPipe::CSeg *In() { int j = i; do { AxPipe::CSeg *pSeg = StreamSeg(i); i = StreamIx(i+1); if (pSeg) { return pSeg; } } while (i != j); return NULL; } }; // CJoinInterMix
class CEvenOdd : public AxPipe::CPipe { int m_iCurrent; int m_iWhich; public: CEvenOdd() { m_iCurrent = m_iWhich = 0; } CEvenOdd* Init(int iWhich) { m_iWhich = iWhich; return this; } protected: void Out(AxPipe::CSeg *pSeg) { ASSPTR(pSeg); // If the length is odd, and we want the first byte, // Then we must adjust by one to make room for the last byte. int iAdjust = 0; // If we want the first byte... if (m_iCurrent == m_iWhich) { // .. and the length is odd, we need an extra byte in the buffer. iAdjust = (int)(pSeg->Len() & 1); } // Allocate an exactly large enough buffer. AxPipe::CSeg *pSegHalf = new AxPipe::CSeg(pSeg->Len()/2 + iAdjust); for (unsigned i = 0; i < pSeg->Len(); i++) { if (m_iCurrent == m_iWhich) { pSegHalf->PtrWr()[i/2] = pSeg->PtrRd()[i]; } m_iCurrent ^= 1; } pSeg->Release(); Pump(pSegHalf); } }; // CEvenOdd
int _tmain(int argc, _TCHAR* argv[]) { AxPipe::CGlobalInit axpipeInit; // It just has to be there. // Build a partial pipe with a string as the source. const char sz1[] = "Now is the time for all good men"; AxPipe::CThreadSource<AxPipe::CSourceMem> *p1 = new AxPipe::CThreadSource<AxPipe::CSourceMem>; p1->Init(_tcslen(sz1), (void *)sz1); // Some different forms of processing, in separate threads. p1->Append(new AxPipe::CThread<CPipeReplace1>)->Append(new AxPipe::CThread<CPipeReplace2>)->Append(new AxPipe::CThread<CPipeReplace3>); // Build another partial pipe with a string as the source. const char sz2[] = "come to the aid of their country"; AxPipe::CThreadSource<AxPipe::CSourceMem> *p2 = new AxPipe::CThreadSource<AxPipe::CSourceMem>; p2->Init(_tcslen(sz2), (void *)sz2); // Some different forms of processing, in one single thread. p2->Append(new CPipeReplace1)->Append(new CPipeReplace2)->Append(new CPipeReplace3); // Allocate a new AxPipe::CJoin -derived object, which just intermixes it's in streams. CJoinInterMix *pipe = new CJoinInterMix; pipe->Init(2); // Max 2 in streams. p1->Append(pipe->GetSink(0)); // Attach one of the AxPipe::CSinkJoin's p2->Append(pipe->GetSink(1)); // Attach the other of the AxPipe::CSinkJoin's // Now build a partial pipe, which will only pass even bytes to it's sink CEvenOdd *pEven = new CEvenOdd; // The AxPipe::CSink is a file, even.txt. pEven->Init(0)->Append((new AxPipe::CSinkMemFile)->Init(_T("Even.txt"))); // Now build a partial pipe, which will only pass odd bytes to it's sink CEvenOdd *pOdd = new CEvenOdd; // The AxPipe::CSink is standard output. pOdd->Init(1)->Append(new CSinkStdout); // Create a AxPipe::CSplit dynamically, and attach the Even/Odd filters. pipe->Append((new AxPipe::CSplit)->Init(pEven, pOdd)); p1->Run(); // Start the 'left' side of the Join running p2->Run(); // Start the 'right' side of the Join running pipe->Open()->Drain()->Close()->Plug(); // Brace Drain() with Open()/Close() then Plug() p1->Wait(); // Wait for the 'left' thread to exit p2->Wait(); // Wait for the 'right' thread to exit int iError = 0; // Assume no error // The AxPipe::CJoin will interrogate all attached streams for errors, so it's enough // to check pipe here. if (pipe->GetErrorCode() != AxPipe::ERROR_CODE_SUCCESS) { _ftprintf(stderr, _T("Demo error: %s"), pipe->GetErrorMsg()); iError = 1; // Return an error code from the program } delete p1; // Delete the 'left' side of the AxPipe::CJoin delete p2; // Delete the 'right' side of the AxPipe::CJoin delete pipe; // All sections of the pipe will self-delete return iError; } // _tmain
The second thing to note is the Open()->Drain()->Close()->Plug() sequence.
The CSource::Drain() call causes data to be read from the CSource and passed along the pipe to the CSink. But before that, you must call CSource::Open(). This causes a signal to be passed down the line, enabling sources, pipe sections and sinks to prepare. After the source signals end of stream, the CSource::Close() call is necessary to give the different parts a chance to flush final data etc.
Also note how the extra AxPipe::CSource derived objects are setup to drain in threads of their own. This is necessary for the CJoinInterMix to work, as it otherwise will simply wait for data. Do remember thus, that using a AxPipe::CJoin derived class entails many threads by definition.
It's possible to re-open a pipe-line by calling CSource::Open() again, if the sections support it. This is suitable for situations where a single stream contains of separate concatenated parts for example.
The final CSource::Plug call close the pipe-line for good.
A check for errors is done by calling CSource::GetErrorCode, any error signalled with CError::SetError will be passed back to the source and thus be checked here. If there is an error, the CSource::GetErrorMsg will get the text representation of it.
Do follow the links provided for explanation of the different framework calls.
Introduction, Installation, A First Example, A Second Examle, Definitions of Terms, Stock Transformations, Utilities and Overrides