Async IO and network fetching with libferris

Mr. Ben "monkeyiq" Martin

Abstract

Details on how to perform async IO with libferris, with explicit mention of using async IO to fetch information from an HTTP site, where header information is gleaned by libferris before the body of the response is read or transferred over the network.


Table of Contents

Overview
Obtaining network headers
GLib2, ferris and async io
Monitoring children
Putting it all together
Future trends

Overview

Modern computing environments make network programming at least an implicit part of any substantial software development. This paper aims to show how to connect the various abstractions in libferris to achieve async network IO in a GUI tool.

This paper aims to assist both those who develop and extend libferris and authors who only use libferris. These goals will be met by providing an introduction or refresher course on the key abstractions in libferris for async IO and network IO.

Note that ferris tar.bz2 files and xerces-c and xalan-c rpms are available at the ferris distribution website.

The key topics are

  • When obtaining the body of a remote context, how can one access the header information before the entire document is sent over the network?

  • How can ferris network IO be integrated into the GLib2 main loop so that a tool can present a responsive GUI to the user while transferring document(s) over the network?

  • Getting access to network header information as well as byte content using async IO.

Locations of the referenced code and abstractions.

  • fcat is in libferris-xxx.tar.bz2/ferris/apps/cat/

  • Runner, AsyncIOHandler and their handles fh_runner and fh_aiohandler respectively are in libferris-xxx.tar.bz2/ferris/Ferris/Runner.cpp

  • ChildStreamServer and its handle fh_childserv are in libferris-xxx.tar.bz2/ferris/Ferris/ChildStreamServer.cpp

  • fnews is in fnews-XXX.tar.bz2 which is available on the distro site.

See the ferris distribution site for all the above code.

Obtaining network headers

Access to the raw network headers with libferris is currently not allowed. If such raw headers were made available to client code then the filesystem abstraction would no longer exist and client tools would have to endure the burden of the specifics of each transfer protocol.

The information gleaned from these headers is made available in a sanitized format, so that any network protocol's headers can be exposed to clients without breaking the abstraction. Each libferris plugin module records which EAs have been updated by the network headers and fires a libsigc++ signal once all headers are read. Clients wishing to see network headers can attach to this signal and will receive a std::set< string > with the names of each EA that the headers updated.

Currently this is only done in the HTTP module and only for the getIStream() method. If the client attempts to obtain the size EA from an HTTP context before any network IO has occurred, the HTTP ferris module will ask the server for just the HTTP headers that contain the size information (using an HTTP request that returns no body).
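The header mechanism described above can be pictured with a minimal plain C++ sketch: a protocol module turns raw headers into EA updates and, once the headers are complete, notifies listeners with the set of EA names that changed. This is not libferris code. HeaderCollector, the std::function slots (standing in for the libsigc++ signal) and the header-to-EA mapping (Content-Length to size, Last-Modified to mtime) are all hypothetical.

```cpp
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using stringset_t = std::set<std::string>;

// Hypothetical stand-in for a protocol module plus its "headers received" signal.
class HeaderCollector
{
public:
    using HeadersReceivedSlot = std::function<void(const stringset_t&)>;

    // Plays the role of connecting to a Headers_Received_Sig.
    void connect(HeadersReceivedSlot slot) { m_slots.push_back(std::move(slot)); }

    // Record one raw header. Only headers with an EA mapping are kept;
    // the raw header itself is never exposed to clients.
    void onHeader(const std::string& name, const std::string& value)
    {
        // Assumed mapping. Matching is exact here, although real HTTP
        // header names are case-insensitive.
        static const std::map<std::string, std::string> headerToEA = {
            { "Content-Length", "size"  },
            { "Last-Modified",  "mtime" },  // stored raw; a real module would convert it
        };
        auto it = headerToEA.find(name);
        if (it == headerToEA.end())
            return;
        m_ea[it->second] = value;
        m_updated.insert(it->second);
    }

    // Called once, after the last header line has been read.
    void onHeadersComplete()
    {
        for (const auto& slot : m_slots)
            slot(m_updated);
    }

    // Current value of an EA, or "" if the headers never set it.
    std::string ea(const std::string& name) const
    {
        auto it = m_ea.find(name);
        return it == m_ea.end() ? std::string() : it->second;
    }

private:
    std::vector<HeadersReceivedSlot> m_slots;
    std::map<std::string, std::string> m_ea;
    stringset_t m_updated;
};
```

A listener connected this way sees only sanitized EA names such as "size", never the raw "Content-Length" header, which is the abstraction the paper argues for.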

The flow of control in the client is thus:

  • 
    fh_context   c = Resolve( "http://127.0.0.1/index.html" );
    c->getContextEvent_Headers_Received_Sig().connect( slot( my_header_function ) );
    fh_istream iss = c->getIStream();
    
    

  • Within the getIStream() call an HTTP request is made and the network headers are received. The HTTP module collects all the headers and updates the context's internal state to reflect any information gleaned from them. Such data typically includes the modification time and size of the remote content.

  • Once all headers are received, the HTTP module emits Headers_Received_Sig. This calls the connected user code:

    Example 1. my_header_function

    
    void my_header_function( 
      fh_context c,               // who received header info
      const stringset_t& strset ) // EA names that were updated
    {
      for( stringset_t::const_iterator iter = strset.begin(); iter != strset.end(); ++iter )
      {
        string k = *iter;
        try {
          // Read the freshly updated EA value from the context
          string v = getStrAttr( c, k, "", true, true );
          cerr << "k:" << k << " v:" << v << endl;
        }
        catch( ... )
        {
          // An EA that cannot be read is simply skipped
        }
      }
    }
    
    
    The handler can then pick out whichever updated EAs are of interest to it.

  • After all signal handlers for Headers_Received_Sig have returned, the HTTP module begins transferring the body of the HTTP response and returns to the caller of getIStream().

See the fcat program for a working example of reading and displaying network headers. fcat can be invoked from bash to show network headers as follows:

fcat --output-headers-on-fd=5 http://127.0.0.1/index.rss 5>|/tmp/hdr
fcat /tmp/hdr
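On the writing side, a tool like fcat has to serialize the updated EAs and write them to the numbered fd it was given. The following is a sketch under assumptions: the <headerinfo> element layout is only inferred from the way fnewsconfig later reads "headerinfo", "size" and "mtime", and headerInfoXML() and writeAll() are hypothetical helpers, not fcat's code. Values are not XML-escaped, so EA names must be valid element names and values must not contain markup characters.

```cpp
#include <cerrno>
#include <map>
#include <string>
#include <unistd.h>

// Serialize EA name/value pairs as a small XML document (hypothetical layout).
std::string headerInfoXML(const std::map<std::string, std::string>& eas)
{
    std::string xml = "<headerinfo>";
    for (const auto& kv : eas)
        xml += "<" + kv.first + ">" + kv.second + "</" + kv.first + ">";
    xml += "</headerinfo>\n";
    return xml;
}

// Write the whole buffer to fd, retrying on short writes and EINTR.
bool writeAll(int fd, const std::string& data)
{
    const char* p = data.data();
    size_t left = data.size();
    while (left > 0)
    {
        ssize_t n = ::write(fd, p, left);
        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            return false;
        }
        p += n;
        left -= static_cast<size_t>(n);
    }
    return true;
}
```

With --output-headers-on-fd=5, the fd passed to writeAll() would simply be 5; the shell redirection 5>|/tmp/hdr then decides where those bytes end up.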

GLib2, ferris and async io

libferris is based around the GLib2 main loop, so any form of IO should be exposed to GLib2. I created the AsyncIOHandler abstraction to make it easy to integrate data arriving on an fd with the GLib2 main loop.

The main class for async IO is the AsyncIOHandler class and its handle fh_aiohandler. AsyncIOHandler objects will call a functor with the following signature when data arrives on their given fd.


 fh_istream async_cb_template( fh_aiohandler a, fh_istream iss );

Once the fd for async IO is known, programmers can connect an AsyncIOHandler as follows:

Example 2.


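  struct MyHandler
  {
    // Called from the GLib2 main loop whenever data is available on fd
    fh_istream method_name( fh_aiohandler a, fh_istream iss )
    {
      // Read the newly arrived data from iss here
      return iss;
    }
  };

  fh_aiohandler aio = new AsyncIOHandler( fd );
  // The handler object must outlive aio, since the functor keeps a raw pointer to it
  MyHandler* MyHandlerPtr = new MyHandler();
  aio->setFunctor( 
    AsyncIOHandler::AsyncIOFunctor_t( 
      MyHandlerPtr, &MyHandler::method_name ));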

A common pattern for method_name above is to collect all the async IO data into an fh_stringstream and, when the child exits, read the child's data from that fh_stringstream. This way the parent can ask a child to produce some data and forget about the child until that data is produced.

Note that in Example 2 the functor will be called automatically and the fd will be watched by the GLib2 main loop.
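Conceptually, an AsyncIOHandler is a watch on an fd inside a main loop that calls a functor whenever data is available. The sketch below imitates that idea with POSIX poll() instead of GLib2, purely for illustration. MiniLoop and DataFunctor are hypothetical and much simpler than the real AsyncIOHandler; for example, the callback receives raw chunks rather than an fh_istream.

```cpp
#include <functional>
#include <poll.h>
#include <sstream>
#include <string>
#include <unistd.h>
#include <utility>
#include <vector>

// Hypothetical, much-reduced main loop with AsyncIOHandler-style fd watches.
class MiniLoop
{
public:
    // Called with each chunk of bytes read from a watched fd.
    using DataFunctor = std::function<void(int fd, const std::string& chunk)>;

    void watch(int fd, DataFunctor f) { m_watches.push_back({ fd, std::move(f) }); }

    // Dispatch data to the functors until every watched fd reaches EOF.
    void run()
    {
        while (!m_watches.empty())
        {
            std::vector<pollfd> pfds;
            for (const auto& w : m_watches)
                pfds.push_back({ w.fd, POLLIN, 0 });
            if (::poll(pfds.data(), pfds.size(), -1) < 0)
                return;  // a real loop would retry on EINTR
            // Walk backwards so erasing a watch keeps lower indexes aligned.
            for (size_t i = pfds.size(); i-- > 0; )
            {
                if (!(pfds[i].revents & (POLLIN | POLLHUP | POLLERR)))
                    continue;
                char buf[1024];
                ssize_t n = ::read(pfds[i].fd, buf, sizeof buf);
                if (n > 0)
                    m_watches[i].f(pfds[i].fd, std::string(buf, static_cast<size_t>(n)));
                else
                    m_watches.erase(m_watches.begin() + i);  // EOF or error: drop the watch
            }
        }
    }

private:
    struct Watch { int fd; DataFunctor f; };
    std::vector<Watch> m_watches;
};
```

For example, watching the read end of a pipe with a callback that does collected << chunk (where collected is a std::ostringstream) leaves the complete output in collected once run() returns, which is the collect-into-a-stringstream pattern described above.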

The Runner class uses AsyncIOHandler to monitor the stdout of child processes if the Runner::setAsyncStdOutFunctor method has been called with a custom functor.

Monitoring children

The ChildStreamServer class was introduced to help with async IO. Its main purpose is to monitor child processes and call a functor when one of them dies.

Note that one ChildStreamServer can monitor many Runner objects. Programmers should not call wait() or related functions on child processes when using ChildStreamServer, or the termination functors may not be called properly.

The following attaches some code that will be informed when a child has completed.



struct ChildWatcher
{
    bool m_childComplete;
    bool m_childWasCanceled;

    ChildWatcher()
        :
        m_childComplete( false ),
        m_childWasCanceled( false )
    {
    }

    void child_complete( fh_childserv serv, fh_runner r, int status )
    {
        if( WIFEXITED( status ) )
        {
            int s = WEXITSTATUS( status );
            if( s != 0 )
                m_childWasCanceled = true;
        }
        
        cerr << "ChildWatcher::child_complete() " << endl;
        m_childComplete = true;

        /*
         * Make sure that all async IO calls have been accepted.
         */
        Main::processAllPendingEvents();
        serv->remDiedFunctor(
            ChildStreamServer::ChildDiedFunctor_t(
                this, &ChildWatcher::child_complete ));
    }

    void attach( fh_childserv serv )
    {
        serv->addDiedFunctor(
            ChildStreamServer::ChildDiedFunctor_t(
                this, &ChildWatcher::child_complete ));
    }

};

 fh_runner r = ...;
 fh_childserv serv = new ChildStreamServer();
 serv->addChild( r );
 ChildWatcher childwatch;   // must stay alive until the child has died
 childwatch.attach( serv );
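The core job of a child-monitoring server can be sketched with waitpid() and WNOHANG. ChildReaper is a hypothetical illustration, not the ChildStreamServer implementation. It also shows why client code must not reap children itself: if a child has already been collected elsewhere, waitpid() fails with ECHILD and the died functors for that child never run.

```cpp
#include <functional>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <utility>
#include <vector>

// Hypothetical reduction of the ChildStreamServer idea.
class ChildReaper
{
public:
    using DiedFunctor = std::function<void(pid_t pid, int status)>;

    void addChild(pid_t pid) { m_children.push_back(pid); }
    void addDiedFunctor(DiedFunctor f) { m_functors.push_back(std::move(f)); }

    // Check each child without blocking and notify the functors for every
    // child that has exited. This is the only place waitpid() is called; a
    // child reaped by someone else makes waitpid() return -1 (ECHILD), so it
    // stays listed and its functors are never called.
    void poll()
    {
        for (size_t i = m_children.size(); i-- > 0; )
        {
            int status = 0;
            pid_t r = ::waitpid(m_children[i], &status, WNOHANG);
            if (r == m_children[i])
            {
                pid_t pid = m_children[i];
                m_children.erase(m_children.begin() + i);
                for (auto& f : m_functors)
                    f(pid, status);
            }
        }
    }

    bool empty() const { return m_children.empty(); }

private:
    std::vector<pid_t> m_children;
    std::vector<DiedFunctor> m_functors;
};
```

In a GUI program poll() would be driven periodically from the main loop (or from a SIGCHLD notification) rather than from a busy loop.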

Putting it all together

The fnewsconfig tool is a GTK2 tool for configuring and testing RSS news feeds. When testing a feed, a network request is made by calling getIStream() on an HTTP address. Fetching over the Internet can take some time, and the GUI remains responsive during this period.

fnewsconfig remains responsive by spawning a child process that uses fcat to transfer the data over the network. The spawning is done using the Runner abstraction, and the child's stdout is connected for async IO in fnewsconfig.

Note that the snippets below ignore error conditions and are somewhat simplified. See fnewsconfig for the full code.

Example 3.


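            int          pnt_hdrfd = 0;   // parent's read end of the header pipe
            int          chd_hdrfd = 0;   // child's write end, passed to fcat
            int          filedes[2];

            pipe( filedes );
            pnt_hdrfd = filedes[0];
            chd_hdrfd = filedes[1];

            // Build "fcat --output-headers-on-fd=N URL"; rssc is the feed being tested
            fh_stringstream commandss;
            fh_runner r = new Runner();
            commandss << "fcat ";
            if( chd_hdrfd )
                commandss << " --output-headers-on-fd=" << chd_hdrfd << " ";
            commandss << rssc->getURL();
            string command = tostr(commandss);
            r->setCommandLine( command );
            // Keep chd_hdrfd open in the child, discard its stderr,
            // and look up fcat on $PATH
            r->setSpawnFlags( GSpawnFlags( r->getSpawnFlags()
                                           | G_SPAWN_LEAVE_DESCRIPTORS_OPEN
                                           | G_SPAWN_STDERR_TO_DEV_NULL
                                           | G_SPAWN_SEARCH_PATH) );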
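Example 3 relies on GLib2's spawn flags for the fd plumbing. The same plumbing can be sketched with fork() and execl(): the child's stdout is redirected into one pipe, and the write end of a second pipe stays open so the command can write header data to it by number. spawnWithHeaderFd() and SpawnedChild are hypothetical names, and the sketch runs the command through /bin/sh. Note that the parent closes its own copies of both write ends; without that it would never see end-of-file on either pipe.

```cpp
#include <functional>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>   // waitpid(), for reaping the child afterwards
#include <unistd.h>

struct SpawnedChild
{
    pid_t pid   = -1;
    int   outFd = -1;  // parent's read end of the child's stdout
    int   hdrFd = -1;  // parent's read end of the header pipe (cf. pnt_hdrfd)
};

// Run "/bin/sh -c <command>", where buildCommand receives the number of the
// header fd that stays open in the child (cf. chd_hdrfd).
SpawnedChild spawnWithHeaderFd(const std::function<std::string(int)>& buildCommand)
{
    SpawnedChild sc;
    int outPipe[2];
    int hdrPipe[2];
    if (::pipe(outPipe) != 0)
        return sc;
    if (::pipe(hdrPipe) != 0)
    {
        ::close(outPipe[0]);
        ::close(outPipe[1]);
        return sc;
    }

    // Build the command before fork() so the child only has to exec.
    const std::string command = buildCommand(hdrPipe[1]);

    pid_t pid = ::fork();
    if (pid == 0)
    {
        // Child: stdout goes into outPipe; hdrPipe[1] is inherited as-is,
        // much like G_SPAWN_LEAVE_DESCRIPTORS_OPEN in Example 3.
        ::dup2(outPipe[1], STDOUT_FILENO);
        ::close(outPipe[0]);
        if (outPipe[1] != STDOUT_FILENO)
            ::close(outPipe[1]);
        ::close(hdrPipe[0]);
        ::execl("/bin/sh", "sh", "-c", command.c_str(), static_cast<char*>(nullptr));
        ::_exit(127);  // only reached if exec failed
    }

    // Parent: drop the write ends so EOF is seen once the child exits.
    ::close(outPipe[1]);
    ::close(hdrPipe[1]);
    if (pid < 0)
    {
        ::close(outPipe[0]);
        ::close(hdrPipe[0]);
        return sc;
    }
    sc.pid   = pid;
    sc.outFd = outPipe[0];
    sc.hdrFd = hdrPipe[0];
    return sc;
}
```

Mirroring Example 3, buildCommand could return "fcat --output-headers-on-fd=" + std::to_string(fd) + " " + url, where url is a hypothetical variable holding the feed address.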


Three events in the child fcat are of interest to fnewsconfig: the child has data on stdout, the child has header data, and the child process completes.

Data on stdout can be collected in the background using a stringstream. We assume that r is the fh_runner set up in Example 3.



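struct ObjectClass
{
  fh_stringstream ss;      // body bytes collected so far
  streamsize      donesz;  // number of body bytes received

  ObjectClass()
    : donesz( 0 )
  {
  }

  fh_istream async_io_cb( fh_runner r, fh_istream iss )
  {
        streamsize readsz = 0;
        const int bufsz = 1024;
        char buf[ bufsz + 1 ];
        // Drain everything currently available on the child's stdout
        while( true )
        {
            iss.read( buf, bufsz );
            readsz = iss.gcount();
            if( !readsz )
                break;
            
            ss.write( buf, readsz );
            donesz += readsz;
        }
        
        return iss;
  }
};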

... 

  fh_runner r = ...;
  ObjectClass* objectptr = new ObjectClass();
  r->setAsyncStdOutFunctor(
      Runner::AsyncIOFunctor_t(
          objectptr, &ObjectClass::async_io_cb ));

Data that the child writes to chd_hdrfd will arrive on pnt_hdrfd. This is a chance to use the raw AsyncIOHandler class directly. The following adds to the same ObjectClass and objectptr used in the previous example.

struct ObjectClass
{
  fh_stringstream hdrss;

  fh_istream header_cb( fh_aiohandler a, fh_istream iss )
        {
            std::copy( std::istreambuf_iterator<char>(iss),
                       std::istreambuf_iterator<char>(),
                       std::ostreambuf_iterator<char>(hdrss));
            hdrss << flush;
            return iss;
        }
    
  void attachHeader( fh_aiohandler aio )
    {
      aio->setFunctor( AsyncIOHandler::AsyncIOFunctor_t( this, &ObjectClass::header_cb ));
    }
};

... 

  fh_aiohandler hdrAIO = new AsyncIOHandler( pnt_hdrfd );
  objectptr->attachHeader( hdrAIO );


Monitoring a child process for completion can be done using ChildStreamServer.


struct ChildWatcher
{
    bool m_childComplete;
    bool m_childWasCanceled;
    bool m_childContentNotModified;

    ChildWatcher()
        :
        m_childComplete( false ),
        m_childWasCanceled( false ),
        m_childContentNotModified( false )
    {
    }

    void child_complete( fh_childserv serv, fh_runner r, int status )
    {
        if( WIFEXITED( status ) )
        {
            int s = WEXITSTATUS( status );
            if( s == 5 )
                m_childContentNotModified = true;
            else if( s != 0 )
                m_childWasCanceled = true;
        }
        
        cerr << "ChildWatcher::child_complete() " << endl;
        m_childComplete = true;

        /*
         * Make sure that all async IO calls have been accepted.
         */
        Main::processAllPendingEvents();
        serv->remDiedFunctor(
            ChildStreamServer::ChildDiedFunctor_t(
                this, &ChildWatcher::child_complete ));
    }

    void attach( fh_childserv serv )
    {
        serv->addDiedFunctor(
            ChildStreamServer::ChildDiedFunctor_t(
                this, &ChildWatcher::child_complete ));
    }

};

 fh_childserv serv = new ChildStreamServer();
 serv->addChild( r );
 ChildWatcher childwatch;   // must stay alive until the child has died
 childwatch.attach( serv );
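The exit-status handling in child_complete() can be factored into a small function. This sketch keeps the convention used above, where exit status 5 means the content was not modified. classifyChildStatus() and FetchOutcome are hypothetical names, and treating a child killed by a signal as a failure is an assumption that goes beyond the ChildWatcher code, which leaves m_childWasCanceled unset in that case.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>   // fork(), _exit(), used when trying this out

// Outcomes that the ChildWatcher above distinguishes.
enum class FetchOutcome { Ok, NotModified, Failed };

// Map a raw wait status to an outcome; exit status 5 means "not modified".
FetchOutcome classifyChildStatus(int status)
{
    if (WIFEXITED(status))
    {
        switch (WEXITSTATUS(status))
        {
        case 0:  return FetchOutcome::Ok;
        case 5:  return FetchOutcome::NotModified;
        default: return FetchOutcome::Failed;
        }
    }
    return FetchOutcome::Failed;  // killed by a signal (assumed to be a failure)
}
```

Keeping the decoding in one function means child_complete() only has to store the result, and any later change to fcat's exit codes touches a single place.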

Now all that is left is to call

r->Run();

and then keep the GUI running while waiting for the child to complete.

The sequence of events is thus

  • user clicks test button for a news feed in fnewsconfig.

  • fnewsconfig prepares a fh_runner.

  • fnewsconfig attaches child death, header fd, and stdout callbacks.

  • fnewsconfig calls Run() on its Runner.

  • fnewsconfig presents a GTK2 dialog showing a status message and a progress meter for the download.

  • fnewsconfig hands control over to GTK.

  • fcat: Resolve()s the URL and attaches a header libsigc++ handler.

  • fcat: calls getIStream() on the network address.

  • fcat: the header signal handler in fcat is called with the names of EA that the network request header has been able to update. fcat creates an XML document and writes it to the fd that was specified in its argv[].

  • fcat: the getIStream() call completes after transferring the body of the context. fcat dumps this data to its stdout and exits.

  • Some time after handing control to GTK, fnewsconfig will have its header callback invoked, possibly many times, as data arrives on pnt_hdrfd. After these invocations fnewsconfig will have its stdout functor called, possibly many times, and then its child death functor.

    Note that the child death functor may be called before all the data has been read from the child's stdout. For this reason the child death signal handler in fnewsconfig processes GLib2's pending event queue before returning.

    To process the header info, fnewsconfig collects all the data on pnt_hdrfd and parses the header XML document the first time its stdout async IO functor is called.

    
        virtual fh_istream async_io_cb( fh_runner r, fh_istream iss )
            {
                if( m_firstBodyCall )
                {
                    m_firstBodyCall = false;
                    try
                    {
                        DOM_Document dom   = Factory::StreamToDOM( hdrss );
                        fh_context    dc   = Factory::mountDOM( dom );
                        fh_context    hdrc = dc->getSubContext( "headerinfo" );
                    
                        totalsz       = toType<streamsize>( getStrSubCtx( hdrc, "size",  "0" ));
                        m_remoteMTime = toType<time_t>    ( getStrSubCtx( hdrc, "mtime", "0" ));
    
                        cerr << "m_firstBodyCall sz:" << totalsz << " mtime:" << m_remoteMTime << endl;
                    }
                    catch( exception& e )
                    {
                        cerr << "Error parsing header info e:" << e.what() << endl;
                    }
                }
             ...
            }
    
    
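The ordering problem noted above, where the child death functor can run before all of the child's stdout has been read, can also be handled by reading the pipe to end-of-file once the child is known to have exited. This is a different technique from fnewsconfig's call to Main::processAllPendingEvents(), shown only as a plain POSIX sketch. drainToEOF() is a hypothetical helper, and it only terminates once every write end of the pipe (including the parent's own copy) has been closed.

```cpp
#include <cerrno>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>   // waitpid(), used when trying this out
#include <unistd.h>

// Read whatever is left in the pipe, up to EOF. Safe to call after the child
// has exited, provided no other process still holds a write end open.
std::string drainToEOF(int fd)
{
    std::string out;
    char buf[4096];
    for (;;)
    {
        ssize_t n = ::read(fd, buf, sizeof buf);
        if (n > 0)
            out.append(buf, static_cast<size_t>(n));
        else if (n < 0 && errno == EINTR)
            continue;
        else
            break;  // 0 means EOF; any other negative value is an error
    }
    return out;
}
```

Data written by a child just before it exits stays buffered in the pipe after the child is reaped, so calling drainToEOF() from the death handler recovers it instead of losing it.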

Future trends

The header information transmitted by fcat could be a SOAP message, and fnewsconfig could register an object to handle this message. That would reduce the complexity of the async IO in fnewsconfig, because it would no longer have to collect the header info itself.

Source code for ferris 0.9.60+ and its dependencies for a Red Hat 8.0 machine can be found at Ferris downloads. Note that not all of the dependencies are required. For example, if the a52dec rpm is not detected at configure time, Ferris will not build support for a52 audio.

For discussion about this document please use the Ferris mailing list or the #ferris channel on irc.openprojects.net.
