Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

A Second Example

The following is a sample program with some patterns to re-use. It will build a pipe reading from a file, and process the input in three stages, demonstrating three different basic models. It will also join two sources, and then split them up again, just to demonstrate the use of that functionality.

The assumption is that the input data is ASCII, just to make it clear.

The first stage changes all spaces (' ') to dashes ('-')
The second stage changes all dashes ('-') to plus ('+')
The third stage changes all plus ('+') to equal ('=')

The join, just simply takes one segment from each source and output it, in a round- robin fashion.

The split finally, will take every other character and pass them to two different sinks, one being standard output, the other being a file.

Not very useful perhaps, but it demonstrates the principles involved. Actually, for regular text streams, the iostream library may be a better bet (although it does not support threading).


First we define a new type of source, reading from standard input.
class CSourceStdin : public AxPipe::CSource {
public:
    AxPipe::CSeg *In() {
        if (feof(stdin)) {
            return new AxPipe::CSeg;        // Signal end of file/no more data now
        }
        AxPipe::CSeg *pSeg = new AxPipe::CSeg(1024);
        size_t cb = fread(pSeg->PtrWr(), 1, pSeg->Size(), stdin);
        pSeg->Len(cb);                      // Set length of valid data
        if (ferror(stdin)) {                // Report if error reading
            // This convolution is just because VC does not support _tcserror()
            _TCHAR *szMsg = astrtch(strerror(errno));
            SetError(AxPipe::ERROR_CODE_DERIVED, szMsg);
            delete[] szMsg;
        }
        return pSeg;                        // The framework will push this down the pipe
    }
}; // CSourceStdin

Here we just override CSource::In() and read in suitable chunks, passing it onwards by returning the a CSeg with the data. That's the basic source. For files, use the included CSourceMemFile.


Then we define a new type of sink, writing to standard output.
class CSinkStdout : public AxPipe::CSink {
public:
    void Out(AxPipe::CSeg *pSeg) {
        // Just write the data to the final destination, stdout
        if (fwrite(pSeg->PtrRd(), 1, pSeg->Len(), stdout) != pSeg->Len()) {
            // This convolution is just because VC does not support _tcserror()
            _TCHAR *szMsg = astrtch(strerror(errno));
            SetError(AxPipe::ERROR_CODE_DERIVED, szMsg);
            delete[] szMsg;
        }
    }
}; // CSinkStdout

This is almost trivial, override CSink::Out and write the CSeg segment passed.


The first sample uses a segment oriented push model. It demonstrate how to build the basic type of AxPipe::CPipe derived push-model stream processor.
class CPipeReplace1 : public AxPipe::CPipe {
protected:
    void Out(AxPipe::CSeg *pSeg) {
        size_t cb = pSeg->Len();
        // Allocate a new segment to output to
        AxPipe::CSeg *pOutSeg = new AxPipe::CSeg(cb);
        char *s = (char *)pSeg->PtrRd();    // Get ptr to start of source
        char *d = (char *)pOutSeg->PtrWr(); // Get ptr to start of dest
        while (cb--) {
            char c = *s++;
            if (c == ' ') {
                c = '-';
            }
            *d++ = c;
        }
        pSeg->Release();                // Must release, otherwise leak
        Pump(pOutSeg);                  // Send it onwards
    }
}; // CPipeReplace1
The heart of the example is in the override of CPipe::Out. Segments are passed to it as they arrive, a new segment is allocated, and is used to create the processed result. This is then sent onwards with CPipe::Pump. The input segment, which now is no longer needed, is CSeg::Release'd.


The second examle uses the pull-model instead, where the code requests segments instead until the end of stream is detected.
class CPipeReplace2 : public AxPipe::CFilter {
protected:
    void InFilter() {
        // Filters by default need to Open() and close manually, this may
        // be modified by overriding AxPipe::CFilter::Open() and Close().
        Open();                             // Must Open() pipe downwards
        AxPipe::CSeg *pSeg;
        while (pSeg = Read()) {               // In() returns NULL when pipe is empty
            size_t cb = pSeg->Len();
            // Allocate a new segment to output to
            AxPipe::CSeg *pOutSeg = new AxPipe::CSeg(cb);
            char *s = (char *)pSeg->PtrRd();    // Get ptr to start of source
            char *d = (char *)pOutSeg->PtrWr(); // Get ptr to start of dest
            while (cb--) {
                char c = *s++;
                if (c == '-') {
                    c = '+';
                }
                *d++ = c;
            }
            pSeg->Release();                // Must release, otherwise leak
            Pump(pOutSeg);                  // Send it onwards
        }
        Close();                            // Should ensure Close() downwards
    } //InFilter
}; // CPipeReplace2
Here we override CFilter::InFilter, and request segments with CFilter::Read. In other respects, it's the same as the previous example.
Do note that a CFilter-derived class in it's CFilter::InFilter, must explicitly call CFilter::Open and CFilter::Close. This can be changed by overriding the default CFilter::OutOpen and CFilter::OutClose, which do nothing but stop the propagation of the open and close signals from the source in the default versions. More about open and close in the description of the main code.


The final example here illustrates the use of a further derivation of the filter model, providing a byte at a time.
class CPipeReplace3 : public AxPipe::CFilterByte {
public:
    void InFilter() {
        Open();
        int c;
        while ((c = ReadByte()) != -1) {
            AxPipe::CSeg *pSeg = new AxPipe::CSeg(1);
            if (c == '+') {
                c = '=';
            }
            *pSeg->PtrWr() = c;
            Pump(pSeg);
        }
        Close();
    }
}; // CPipeReplace3
It's the same override of CFilterByte::InFilter, but now the CFilterByte::ReadByte function is called, providing a byte at a time, or -1 at the end of the stream. Here also we need to call CFilterByte::Open and CFilterByte::Close.


A rudimentary join, which will just intermix any number of streams, on a segment by segment basis, round-robin fashion. This is not very useful either, as the segmentation will depend on the previous stages and is not known here.
class CJoinInterMix : public AxPipe::CJoin {
    int i;
public:
    CJoinInterMix() {
        i = 0;
    }
    AxPipe::CSeg *In() {
        int j = i;
        do {
            AxPipe::CSeg *pSeg = StreamSeg(i);
            i = StreamIx(i+1);
            if (pSeg) {
                return pSeg;
            }
        } while (i != j);
        return NULL;
    }
}; // CJoinInterMix
Note that to actually get anything to join, you must use the CJoin::GetSink() member and CSource::Append() that to a pipe.


A class used in the splitting, but also serves as yet another example of a simple push-model processing stage. This takes either the odd-numbered or the even-numbered bytes of a stream and passes them on, dropping the other bytes.
class CEvenOdd : public AxPipe::CPipe {
    int m_iCurrent;                         
    int m_iWhich;                           
public:
    CEvenOdd() {
        m_iCurrent = m_iWhich = 0;
    }

    CEvenOdd* Init(int iWhich) {
        m_iWhich = iWhich;
        return this;
    }
protected:
    void Out(AxPipe::CSeg *pSeg) {
        ASSPTR(pSeg);
        // If the length is odd, and we want the first byte,
        // Then we must adjust by one to make room for the last byte.
        int iAdjust = 0;
        // If we want the first byte...
        if (m_iCurrent == m_iWhich) {
            // .. and the length is odd, we need an extra byte in the buffer.
            iAdjust = (int)(pSeg->Len() & 1);
        }
        // Allocate an exactly large enough buffer.
        AxPipe::CSeg *pSegHalf = new AxPipe::CSeg(pSeg->Len()/2 + iAdjust);
        for (unsigned i = 0; i < pSeg->Len(); i++) {
            if (m_iCurrent == m_iWhich) {
                pSegHalf->PtrWr()[i/2] = pSeg->PtrRd()[i];
            }
            m_iCurrent ^= 1;
        }
        pSeg->Release();
        Pump(pSegHalf);
    }
}; // CEvenOdd
See the code in _tmain() below for an example of how to use AxPipe::CSplit together with this kind of class.


Finally, the main program tying it all together.
int _tmain(int argc, _TCHAR* argv[]) {
    AxPipe::CGlobalInit axpipeInit;         // It just has to be there.

    // Build a partial pipe with a string as the source.
    const char sz1[] = "Now is the time for all good men";
    AxPipe::CThreadSource<AxPipe::CSourceMem> *p1 = new AxPipe::CThreadSource<AxPipe::CSourceMem>;
    p1->Init(_tcslen(sz1), (void *)sz1);
    // Some different forms of processing, in separate threads.
    p1->Append(new AxPipe::CThread<CPipeReplace1>)->Append(new AxPipe::CThread<CPipeReplace2>)->Append(new AxPipe::CThread<CPipeReplace3>);

    // Build another partial pipe with a string as the source.
    const char sz2[] = "come to the aid of their country";
    AxPipe::CThreadSource<AxPipe::CSourceMem> *p2 = new AxPipe::CThreadSource<AxPipe::CSourceMem>;
    p2->Init(_tcslen(sz2), (void *)sz2);
    // Some different forms of processing, in one single thread.
    p2->Append(new CPipeReplace1)->Append(new CPipeReplace2)->Append(new CPipeReplace3);

    // Allocate a new AxPipe::CJoin -derived object, which just intermixes it's in streams.
    CJoinInterMix *pipe = new CJoinInterMix;
    pipe->Init(2);                          // Max 2 in streams.

    p1->Append(pipe->GetSink(0));           // Attach one of the AxPipe::CSinkJoin's
    p2->Append(pipe->GetSink(1));           // Attach the other of the AxPipe::CSinkJoin's

    // Now build a partial pipe, which will only pass even bytes to it's sink
    CEvenOdd *pEven = new CEvenOdd;
    // The AxPipe::CSink is a file, even.txt.
    pEven->Init(0)->Append((new AxPipe::CSinkMemFile)->Init(_T("Even.txt")));

    // Now build a partial pipe, which will only pass odd bytes to it's sink
    CEvenOdd *pOdd = new CEvenOdd;
    // The AxPipe::CSink is standard output.
    pOdd->Init(1)->Append(new CSinkStdout);

    // Create a AxPipe::CSplit dynamically, and attach the Even/Odd filters.
    pipe->Append((new AxPipe::CSplit)->Init(pEven, pOdd));

    p1->Run();                              // Start the 'left' side of the Join running
    p2->Run();                              // Start the 'right' side of the Join running
    pipe->Open()->Drain()->Close()->Plug(); // Brace Drain() with Open()/Close() then Plug()
    p1->Wait();                             // Wait for the 'left' thread to exit
    p2->Wait();                             // Wait for the 'right' thread to exit
    
    int iError = 0;                         // Assume no error
    // The AxPipe::CJoin will interrogate all attached streams for errors, so it's enough
    // to check pipe here.
    if (pipe->GetErrorCode() != AxPipe::ERROR_CODE_SUCCESS) {
        _ftprintf(stderr, _T("Demo error: %s"), pipe->GetErrorMsg());
        iError = 1;                         // Return an error code from the program
    }

    delete p1;                              // Delete the 'left' side of the AxPipe::CJoin
    delete p2;                              // Delete the 'right' side of the AxPipe::CJoin
    delete pipe;                            // All sections of the pipe will self-delete
    return iError;
} // _tmain
First, note the definition of a CGlobalInit object. You need one, and only one, such object to be defined in your program before using AxPipe. The constructor of this object will initialize various global data.

The second thing to note is the Open()->Drain()->Close()->Plug() sequence.
The CSource::Drain() call causes data to be read from the CSource and passed along the pipe to the CSink. But before that, you must call CSource::Open(). This causes a signal to be passed down the line, enabling sources, pipe sections and sinks to prepare. After the source signals end of stream, the CSource::Close() call is necessary to give the different parts a chance to flush final data etc.

Also note how the extra AxPipe::CSource derived objects are setup to drain in threads of their own. This is necessary for the CJoinInterMix to work, as it otherwise will simply wait for data. Do remember thus, that using a AxPipe::CJoin derived class entails many threads by definition.

It's possible to re-open a pipe-line by calling CSource::Open() again, if the sections support it. This is suitable for situations where a single stream contains of separate concatenated parts for example.

The final CSource::Plug call close the pipe-line for good.

A check for errors is done by calling CSource::GetErrorCode, any error signalled with CError::SetError will be passed back to the source and thus be checked here. If there is an error, the CSource::GetErrorMsg will get the text representation of it.


Do follow the links provided for explanation of the different framework calls.

See also:
CSeg, CThread, CSource, CSink, CPipe, CFilter, CFilterByte, CFilterBlock CPipeBlock, CJoin, CSplit, AxPipe::CSourceFileMap, AxPipe::CSinkFileMap

Introduction, Installation, A First Example, A Second Examle, Definitions of Terms, Stock Transformations, Utilities and Overrides


Generated on Mon Feb 2 13:19:21 2004 for AxPipe by doxygen 1.3.5