Main Page   Packages   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Search  

C:/temp/src/j2k/nto/cs5.cpp

Go to the documentation of this file.
00001 #ifndef __J2K__CS5_CPP__
00002 #define __J2K__CS5_CPP__
00003 
00004 #include <j2k/nto/cs_def.hpp>
00005 #include <j2k/nto/Server.hpp>
00006 
00007 /*** Client Side of the code ***/
00008 class Consumer 
00009   : public Basic_PThread,
00010     public MsgContainer
00011 {
00012 protected:
00013   int idx;
00014 public:
00015   Consumer( NameSpace* n0, int idx0 = 0 ) 
00016     : Basic_PThread(), 
00017       MsgContainer( n0, client ),
00018       idx( idx0 )
00019   { 
00020     message->setRequest( MQI_CONSUMER );
00021     message->setIndex( idx );
00022   }
00023 
00024   inline void setPThread( pthread_t tid )
00025   {
00026     printf( "Consumer PID[%d] \n", (int)tid );
00027     fflush( stdout );
00028 
00029     message->setPThread( tid );
00030   }
00031 
00032   virtual ~Consumer() { }
00033 
00034   /*** Consumer Side of the code ***/
00035   virtual void run()
00036   {
00037     printf( "Consumer %d run \n", idx );
00038     fflush( stdout );
00039 
00040     connect();
00041 
00042     printf( "Consumer %d opened a connection \n", idx );
00043     fflush( stdout );
00044     n->waitForThreads();
00045 
00046     /* Do whatever work you wanted with server connection */
00047     register int i = 0;
00048      
00049     for ( i = 0; i < 5; i++ ) 
00050     {
00051       sched_yield();
00052       printf( "Consumer Semaphore P \n" );
00053 //    m.P();
00054 
00055         register int data = 1000 * ( idx + 1 ) + i;
00056         printf("Consumer %d sending %d \n", idx, data );
00057         fflush( stdout );
00058 
00059         message->setNumber( data );
00060 
00061         if ( message->send( fd, replyMsg ) < 0 )
00062         {
00063           perror( "Couldn't send Message ! \n" );
00064         }
00065   
00066         register int status = replyMsg->getStatus();
00067 
00068         printf( "Status[%d] \n", status );
00069         fflush( stdout );
00070 
00071         if ( replyMsg->isSuicide() )
00072         {
00073           printf( "Consumer %d SUICIDE ! \n", idx );
00074           fflush( stdout );
00075           i = 100;
00076         }
00077 
00078         if ( replyMsg->isFull() && replyMsg->isGet() )
00079         {
00080           printf( "Consumer %d retrieved %d \n", idx, replyMsg->getNumber() );
00081           fflush( stdout );
00082         }
00083 
00084         printf( "Consumer loop end for %d \n", data );
00085         fflush( stdout );
00086 
00087     } // End of for
00088 
00089     printf( "Consumer disconnect \n" );
00090     disconnect();
00091   }
00092 };
00093 
00094 /*** Producer Side of the code ***/
00095 class Producer 
00096   : public Basic_PThread, 
00097     public MsgContainer
00098 {
00099 protected:
00100   int idx;
00101 public:
00102   Producer( NameSpace* n0, int idx0 = 0 )
00103     : Basic_PThread(),
00104       MsgContainer( n0, client ),
00105       idx( idx0 )
00106   { 
00107     message->setRequest( MQI_PRODUCER );
00108     message->setIndex( idx );
00109   }
00110 
00111   inline void setPThread( pthread_t tid )
00112   {
00113     printf( "Producer PID[%d] \n", (int)tid );
00114     fflush( stdout );
00115 
00116     message->setPThread( tid );
00117   }
00118 
00119   virtual ~Producer() { }
00120 
00121   /*** Producer Side of the code ***/
00122   virtual void run()
00123   {
00124     printf( "Producer %d run \n", idx );
00125     fflush( stdout );
00126 
00127     connect();
00128 
00129     printf( "Producer %d opened a connection \n", idx );
00130     fflush( stdout );
00131     n->waitForThreads();
00132 
00133     /* Do whatever work you wanted with server connection */
00134     register int i = 0;
00135      
00136     for ( i = 0; i < 5; i++ ) 
00137     {
00138       sched_yield();
00139       printf( "Producer Semaphore P \n" );
00140 //    m.P();
00141 
00142         register int data = 200 * (idx + 1) + i;
00143         printf("Producer %d sending %d \n", idx, data );
00144         fflush( stdout );
00145 
00146         message->setNumber( data );
00147 
00148         if ( message->send( fd, replyMsg ) < 0 )
00149         {
00150           perror( "Couldn't send Message ! \n" );
00151         }
00152 
00153         if ( replyMsg->isSuicide() )
00154         {
00155           printf( "Producer %d SUICIDE ! \n", idx );
00156           fflush( stdout );
00157           i = 100;
00158         }
00159 
00160         printf( "Producer loop end for %d \n", data );
00161         fflush( stdout );
00162 
00163     } // End of for
00164 
00165     if ( replyMsg->isEmpty() )
00166     {
00167       printf( "Producer %d was consumed by %d \n", idx, replyMsg->getNumber() );
00168       fflush( stdout );
00169     }
00170 
00171     printf( "Producer disconnect \n" );
00172     disconnect();
00173   }
00174 };
00175 
00176 int main( int argc, char **argv ) 
00177 {
00178   const int nbConsumers = 2;
00179   const int nbProducers = 1;
00180   const int nbServers   = 1;
00181 
00182   NameSpace* n = new NameSpace( "coen320_fred", nbConsumers, nbProducers, nbServers  );
00183   Server s( n );
00184 
00185   Producer* p[ nbProducers ];
00186   Consumer* c[ nbConsumers ];
00187 
00188   s.start();
00189   setprio( s.getPID(), 20 );
00190 
00191   n->waitForServer();
00192 
00193   setprio( pthread_self(), 19 );
00194 
00195   register int i = 0;
00196   for( i = 0; i < nbProducers; i++ )
00197   {
00198     p[i] = new Producer( n, i+100 );
00199     p[i]->start();
00200     p[i]->setPThread( p[i]->getPID() );
00201     setprio( p[i]->getPID(), 18 );
00202   }
00203 
00204   for( i = 0; i < nbConsumers; i++ )
00205   {
00206     c[i] = new Consumer( n, i+100 );
00207     c[i]->start();
00208     c[i]->setPThread( c[i]->getPID() );
00209     setprio( c[i]->getPID(), 18 );
00210   }
00211 
00212   n->waitForThreads();
00213 
00214 //  pthread_join( c.getPID(), NULL );
00215 
00216   n->waitForClient();
00217 
00218   printf( "Main Done \n" );
00219   fflush( stdout );
00220 
00221   // Shut down the READ-Blocked server
00222   printf( "Shut down the READ-Blocked server... \n" );
00223   fflush( stdout );
00224 
00225   pthread_kill( s.getPID(), SIGKILL );
00226 
00227   if ( n != NULL ) delete n;
00228 
00229   return 0;
00230 }
00231 
00232 #endif // End of CS4.cpp

Generated on Sun Oct 14 18:46:39 2001 for Standard J2K Library by doxygen1.2.11.1 written by Dimitri van Heesch, © 1997-2001