Main Page   Packages   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Search  

C:/temp/src/j2k/nto/Server.cpp

Go to the documentation of this file.
00001 #ifndef __J2K__Server_CPP__
00002 #define __J2K__Server_CPP__
00003 
00004 #include <j2k/nto/Server.hpp>
00005 
00006   Server::Server( NameSpace* n0 )
00007     : Basic_PThread(), 
00008       MsgContainer( n0, server ), 
00009       nbProducer( 0 ),
00010       nbConsumer( 0 )
00011   { 
00012   }
00013 
00014   Server::~Server() 
00015   { 
00016     printf( "Killing server... \n" );
00017     register size_t  i   = 0;
00018     register size_t  max = timeout.size();
00019 
00020     iter = timeout.begin();
00021     for( i = 0; i < max; i++ )
00022     {
00023       if ( timeout[i] != NULL )
00024       {
00025         deleteEntry( i );
00026       }
00027       iter++;
00028     }
00029   }
00030 
00031   void Server::deleteEntry( register int index )
00032   {
00033 //     printf( "Delete server entry [%d] \n", index );
00034      fflush( stdout );
00035 
00036      // Delete that entry
00037      timeout[ index ]->stop();
00038      timeout[ index ]->reset();
00039 
00040 //     Timeout* t = timeout[ index ];
00041      timeout[ index ] = NULL;
00042 //     timeout.erase( iter );
00043 //     delete t;
00044 
00045 //    printf( "New server container size [%d] \n", timeout.size() );
00046     fflush( stdout );
00047   }
00048 
00049   void Server::createEntry( register int rcvid, 
00050                             register Message* message, 
00051                             register int startTimer = 0 )
00052   {
00053 
00054 //    printf( "Create server entry for [%d] \n", message->getNumber() );
00055 //    fflush( stdout );
00056 
00057     Timeout* t = new Timeout( MsgContainer::n, 0, 1 ); // 5 sec
00058     t->copy( message );
00059 
00060     if ( startTimer > 0 )
00061     {
00062       t->start();
00063     }
00064 
00065     t->setReceiver( rcvid );
00066     timeout.push_back( t );
00067 
00068 //    printf( "New server container size [%d] \n", timeout.size() );
00069   }
00070 
00071   /*** Server Side of the code ***/
00072   void Server::run()
00073   {
00074     printf( "Server run \n" );
00075     fflush( stdout );
00076 
00077     // Create a namespace channel for Producer-Server-Consumer
00078     connect();
00079 
00080     printf( "Server attached \n" );
00081     fflush( stdout );
00082 
00083     /* Do your MsgReceive's here now with the chid */
00084     while( 1 ) 
00085     {
00086       printf( "Wait for receive...\n" );
00087       fflush( stdout );
00088 
00089       int rcvid = message->receive( n->getChannel() ); 
00090 
00091       printf( "Message received from ID#%d \n", rcvid );
00092       fflush( stdout );
00093 
00094       // Check for local Message
00095       if ( rcvid == 0 ) continue;
00096        
00097       // Check for Kernel Message
00098       if ( message->checkHeader( rcvid ) < 0 ) break;
00099 
00100       printf("Server receive %d \n", message->getNumber() );
00101       fflush( stdout );
00102 
00103       message->print();
00104       message->setStatus( message->getRequest() );
00105    
00106       // Handling Consumer Message       
00107       if ( message->isConsumer() )  
00108       {
00109         printf( "Consumer message \n" );
00110         fflush( stdout );
00111 
00112         // Feed the Consumer
00113         if ( nbProducer > 0 ) 
00114         {
00115      // There is some Producer waiting to feed consumer so FEED IT!
00116           register size_t  i   = 0;
00117           register size_t  max = timeout.size();
00118           iter = timeout.begin();
00119           for( i = 0; i < max; i++, iter++ )
00120           {
00121            if ( timeout[i] != NULL )
00122            {
00123             if ( timeout[ i ]->getNumber() != -1  &&  timeout[ i ]->isProducer() )
00124             {
00125               printf( "Consumer fed by entry %d, producer %d \n", i, timeout[ i ]->getNumber() );
00126               fflush( stdout );
00127 
00128               timeout[ i ]->stop(); 
00129               timeout[ i ]->reply( rcvid );                   // Reply Consumer
00130               message->reply( timeout[ i ]->getReceiver() );  // Reply Producer
00131 
00132               // Delete that entry
00133               deleteEntry( i );
00134 
00135               nbProducer--;
00136               i = max;
00137 
00138               printf( "Done with case 1 ! \n" );
00139               fflush( stdout );
00140             }
00141            }
00142           }
00143         }
00144         else 
00145         {
00146      // Store the consumer request with timeout
00147           printf( "Storing Consumer %d \n", message->getNumber() );
00148           fflush( stdout );
00149 
00150           createEntry( rcvid, message, 1 );
00151           nbConsumer++;
00152 
00153           printf( "Done with case 2 ! \n" );
00154           fflush( stdout );
00155         }
00156       }
00157       else if (  message->isProducer() )   // Handling Producer
00158       {
00159         printf( "Producer message \n" );
00160         fflush( stdout );
00161 
00162         if ( nbConsumer > 0 )
00163         {
00164           // Feed the Consumer from the current Producer
00165           register size_t  i   = 0;
00166           register size_t  max = timeout.size();
00167           iter = timeout.begin();
00168           for( i = 0; i < max; i++, iter++ )
00169           {
00170            if ( timeout[ i ] != NULL )
00171            {
00172             if ( timeout[ i ]->getNumber() != -1  &&  timeout[ i ]->isConsumer() )
00173             {
00174               printf( "Consumer fed by entry %d, producer %d \n", i, timeout[ i ]->getNumber() );
00175               fflush( stdout );
00176 
00177               timeout[ i ]->stop();
00178               timeout[ i ]->reply( rcvid );                   // Reply Consumer
00179               message->reply( timeout[ i ]->getReceiver() );  // Reply Producer
00180 
00181               // Delete that entry
00182               deleteEntry( i );
00183 
00184               nbConsumer--;
00185               i = max;
00186 
00187               printf( "Done with case 3 ! \n" );
00188               fflush( stdout ); 
00189             }
00190            }
00191           }
00192         }
00193         else
00194         {
00195           // Storing Producer, No consumer Waiting.
00196           printf( "Storing Producer %d \n", message->getNumber() );
00197           fflush( stdout );
00198 
00199           createEntry( rcvid, message, 0 );
00200           nbProducer++;
00201 
00202           printf( "Done with case 4 ! \n" );
00203           fflush( stdout );
00204         }
00205       }
00206       else  // Generic reply
00207       {
00208         printf( "Generic reply \n" );
00209         fflush( stdout );
00210 
00211           int data = 100;
00212           message->reply( rcvid, data );
00213 
00214         printf( "Done with case 5 ! \n" );
00215         fflush( stdout );
00216 
00217      }
00218 
00219       printf( "End of message response. \n" );
00220       printf( "****************************************************************\n" );
00221       fflush( stdout );
00222 
00223     } // End of while loop
00224 
00225     printf( "Server Disconnect \n" );
00226     fflush( stdout );
00227 
00228     disconnect();
00229   }
00230 
00231 #endif // End of Server.cpp

Generated on Sun Oct 14 18:46:40 2001 for Standard J2K Library by doxygen 1.2.11.1 written by Dimitri van Heesch, © 1997-2001