00001 #ifndef __J2K__CS5_CPP__
00002 #define __J2K__CS5_CPP__
00003
00004 #include <j2k/nto/cs_def.hpp>
00005 #include <j2k/nto/Server.hpp>
00006
00007
00008 class Consumer
00009 : public Basic_PThread,
00010 public MsgContainer
00011 {
00012 protected:
00013 int idx;
00014 public:
00015 Consumer( NameSpace* n0, int idx0 = 0 )
00016 : Basic_PThread(),
00017 MsgContainer( n0, client ),
00018 idx( idx0 )
00019 {
00020 message->setRequest( MQI_CONSUMER );
00021 message->setIndex( idx );
00022 }
00023
00024 inline void setPThread( pthread_t tid )
00025 {
00026 printf( "Consumer PID[%d] \n", (int)tid );
00027 fflush( stdout );
00028
00029 message->setPThread( tid );
00030 }
00031
00032 virtual ~Consumer() { }
00033
00034
00035 virtual void run()
00036 {
00037 printf( "Consumer %d run \n", idx );
00038 fflush( stdout );
00039
00040 connect();
00041
00042 printf( "Consumer %d opened a connection \n", idx );
00043 fflush( stdout );
00044 n->waitForThreads();
00045
00046
00047 register int i = 0;
00048
00049 for ( i = 0; i < 5; i++ )
00050 {
00051 sched_yield();
00052 printf( "Consumer Semaphore P \n" );
00053
00054
00055 register int data = 1000 * ( idx + 1 ) + i;
00056 printf("Consumer %d sending %d \n", idx, data );
00057 fflush( stdout );
00058
00059 message->setNumber( data );
00060
00061 if ( message->send( fd, replyMsg ) < 0 )
00062 {
00063 perror( "Couldn't send Message ! \n" );
00064 }
00065
00066 register int status = replyMsg->getStatus();
00067
00068 printf( "Status[%d] \n", status );
00069 fflush( stdout );
00070
00071 if ( replyMsg->isSuicide() )
00072 {
00073 printf( "Consumer %d SUICIDE ! \n", idx );
00074 fflush( stdout );
00075 i = 100;
00076 }
00077
00078 if ( replyMsg->isFull() && replyMsg->isGet() )
00079 {
00080 printf( "Consumer %d retrieved %d \n", idx, replyMsg->getNumber() );
00081 fflush( stdout );
00082 }
00083
00084 printf( "Consumer loop end for %d \n", data );
00085 fflush( stdout );
00086
00087 }
00088
00089 printf( "Consumer disconnect \n" );
00090 disconnect();
00091 }
00092 };
00093
00094
00095 class Producer
00096 : public Basic_PThread,
00097 public MsgContainer
00098 {
00099 protected:
00100 int idx;
00101 public:
00102 Producer( NameSpace* n0, int idx0 = 0 )
00103 : Basic_PThread(),
00104 MsgContainer( n0, client ),
00105 idx( idx0 )
00106 {
00107 message->setRequest( MQI_PRODUCER );
00108 message->setIndex( idx );
00109 }
00110
00111 inline void setPThread( pthread_t tid )
00112 {
00113 printf( "Producer PID[%d] \n", (int)tid );
00114 fflush( stdout );
00115
00116 message->setPThread( tid );
00117 }
00118
00119 virtual ~Producer() { }
00120
00121
00122 virtual void run()
00123 {
00124 printf( "Producer %d run \n", idx );
00125 fflush( stdout );
00126
00127 connect();
00128
00129 printf( "Producer %d opened a connection \n", idx );
00130 fflush( stdout );
00131 n->waitForThreads();
00132
00133
00134 register int i = 0;
00135
00136 for ( i = 0; i < 5; i++ )
00137 {
00138 sched_yield();
00139 printf( "Producer Semaphore P \n" );
00140
00141
00142 register int data = 200 * (idx + 1) + i;
00143 printf("Producer %d sending %d \n", idx, data );
00144 fflush( stdout );
00145
00146 message->setNumber( data );
00147
00148 if ( message->send( fd, replyMsg ) < 0 )
00149 {
00150 perror( "Couldn't send Message ! \n" );
00151 }
00152
00153 if ( replyMsg->isSuicide() )
00154 {
00155 printf( "Producer %d SUICIDE ! \n", idx );
00156 fflush( stdout );
00157 i = 100;
00158 }
00159
00160 printf( "Producer loop end for %d \n", data );
00161 fflush( stdout );
00162
00163 }
00164
00165 if ( replyMsg->isEmpty() )
00166 {
00167 printf( "Producer %d was consumed by %d \n", idx, replyMsg->getNumber() );
00168 fflush( stdout );
00169 }
00170
00171 printf( "Producer disconnect \n" );
00172 disconnect();
00173 }
00174 };
00175
00176 int main( int argc, char **argv )
00177 {
00178 const int nbConsumers = 2;
00179 const int nbProducers = 1;
00180 const int nbServers = 1;
00181
00182 NameSpace* n = new NameSpace( "coen320_fred", nbConsumers, nbProducers, nbServers );
00183 Server s( n );
00184
00185 Producer* p[ nbProducers ];
00186 Consumer* c[ nbConsumers ];
00187
00188 s.start();
00189 setprio( s.getPID(), 20 );
00190
00191 n->waitForServer();
00192
00193 setprio( pthread_self(), 19 );
00194
00195 register int i = 0;
00196 for( i = 0; i < nbProducers; i++ )
00197 {
00198 p[i] = new Producer( n, i+100 );
00199 p[i]->start();
00200 p[i]->setPThread( p[i]->getPID() );
00201 setprio( p[i]->getPID(), 18 );
00202 }
00203
00204 for( i = 0; i < nbConsumers; i++ )
00205 {
00206 c[i] = new Consumer( n, i+100 );
00207 c[i]->start();
00208 c[i]->setPThread( c[i]->getPID() );
00209 setprio( c[i]->getPID(), 18 );
00210 }
00211
00212 n->waitForThreads();
00213
00214
00215
00216 n->waitForClient();
00217
00218 printf( "Main Done \n" );
00219 fflush( stdout );
00220
00221
00222 printf( "Shut down the READ-Blocked server... \n" );
00223 fflush( stdout );
00224
00225 pthread_kill( s.getPID(), SIGKILL );
00226
00227 if ( n != NULL ) delete n;
00228
00229 return 0;
00230 }
00231
00232 #endif // End of CS5.cpp