00001 #ifndef __J2K__CS4_CPP__
00002 #define __J2K__CS4_CPP__
00003
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <errno.h>
00007 #include <string.h>
00008 #include <pthread.h>
00009 #include <sys/neutrino.h>
00010 #include <sys/dispatch.h>
00011 #include <sync.h>
00012 #include <time.h>
00013
00014 #include <j2k/nto/Basic_PThread.hpp>
00015 #include <j2k/nto/Basic_PThread.cpp>
00016 #include <j2k/nto/Message.hpp>
00017 #include <j2k/nto/Message.cpp>
00018 #include <j2k/nto/NameSpace.hpp>
00019 #include <j2k/nto/NameSpace.cpp>
00020
00021 #include <j2k/nto/MsgContainer.hpp>
00022 #include <j2k/nto/MsgContainer.cpp>
00023
00024 #include <j2k/nto/TimerPulse.hpp>
00025 #include <j2k/nto/TimerPulse.cpp>
00026
00027 #define MAX_CLIENTS 3
00028
00029 class Server;
00030 class Client;
00031 class Timeout;
00032 class Timeout : public TimerPulse, MsgContainer
00033 {
00034 protected:
00035 ULONG delay_nsec;
00036 ULONG delay_sec;
00037 public:
00038
00039 Timeout( NameSpace* n0, ULONG nsec0, ULONG sec0 )
00040 : TimerPulse( nsec0, sec0 ), MsgContainer( n0, client ),
00041 delay_nsec( nsec0 ), delay_sec( sec0 )
00042 {
00043 message->setRequest( MQI_TIMEOUT );
00044 message->setStatus( MQI_TIMEOUT );
00045 }
00046
00047 virtual void tick( int signo )
00048 {
00049 printf( "#" );
00050 fflush( stdout );
00051
00052 if ( message->send( fd, replyMsg ) < 0 )
00053 {
00054 perror( "Couldn't send Timeout Message ! \n" );
00055 }
00056 }
00057
00058 virtual void start()
00059 {
00060 connect();
00061
00062 printf( "Timer started.\n" );
00063 fflush( stdout );
00064
00065 TimerPulse::start();
00066 }
00067
00068 virtual ~Timeout()
00069 {
00070 }
00071
00072 virtual void run()
00073 {
00074 }
00075
00076 void reset()
00077 {
00078 set( delay_nsec, delay_sec );
00079 }
00080
00081 inline int Timeout::getNumber() { return message->getNumber(); }
00082 inline void Timeout::setNumber( int nb ) { message->setNumber( nb ); }
00083 inline int Timeout::getRequest() { return message->getRequest(); }
00084 inline void Timeout::setRequest( int r ) { message->setRequest( r ); }
00085 inline int Timeout::getStatus() { return message->getStatus(); }
00086 inline void Timeout::setStatus( int s ) { message->setStatus( s ); }
00087 inline int Timeout::getIndex() { return message->getIndex(); }
00088 inline void Timeout::setIndex( int i ) { message->setIndex( i ); }
00089
00090 };
00091
00092
00093 class Server
00094 : public Basic_PThread, MsgContainer
00095 {
00096 private:
00097 Timeout** t;
00098
00099 public:
00100 Server( NameSpace* n0 )
00101 : Basic_PThread(), MsgContainer( n0, server )
00102 {
00103 t = new Timeout*[ MAX_CLIENTS ];
00104 register int i = 0;
00105 for( i = 0; i < MAX_CLIENTS; i++ )
00106 {
00107 t[i] = new Timeout( n0, 0, 5 );
00108 }
00109 }
00110
00111 virtual ~Server()
00112 {
00113 register int i = 0;
00114 for( i = 0; i < MAX_CLIENTS; i++ )
00115 {
00116 if ( t[i] != NULL)
00117 {
00118 delete t[i];
00119 t[i] = NULL;
00120 }
00121 }
00122 delete[] t;
00123 }
00124
00125
00126 virtual void run()
00127 {
00128 printf( "Server run \n" );
00129
00130 connect();
00131
00132 printf( "Server attached \n" );
00133
00134
00135 while( 1 )
00136 {
00137 int rcvid = message->receive( n->getChannel() );
00138
00139 if ( message->checkHeader( rcvid ) < 0 ) break;
00140
00141
00142 printf("Server receive %d \n", message->getNumber() );
00143
00144 int data = 100;
00145 message->reply( rcvid, data );
00146 }
00147
00148 disconnect();
00149 }
00150 };
00151
00152
00153 class Consumer
00154 : public Basic_PThread, MsgContainer
00155 {
00156 public:
00157 Consumer( NameSpace* n0, int idx = 0 )
00158 : Basic_PThread(), MsgContainer( n0, client )
00159 {
00160 message->setRequest( MQI_CONSUMER );
00161 message->setIndex( idx );
00162 }
00163
00164 virtual ~Consumer() { }
00165
00166
00167 virtual void run()
00168 {
00169 printf( "Consumer run \n" );
00170
00171 connect();
00172
00173 printf( "Consumer opened a connection \n" );
00174
00175
00176 register int i = 0;
00177
00178 for ( i = 0; i < 5; i++ )
00179 {
00180 printf("Consumer sending %d \n", i );
00181
00182 message->setNumber( i );
00183
00184 if ( message->send( fd, replyMsg ) < 0 )
00185 {
00186 perror( "Couldn't send Message ! \n" );
00187 break;
00188 }
00189 }
00190
00191 disconnect();
00192 }
00193 };
00194
00195
00196 class Producer
00197 : public Basic_PThread, MsgContainer
00198 {
00199 public:
00200 Producer( NameSpace* n0, int idx = 0 )
00201 : Basic_PThread(), MsgContainer( n0, client )
00202 {
00203 message->setRequest( MQI_PRODUCER );
00204 message->setIndex( idx );
00205 }
00206
00207 virtual ~Producer() { }
00208
00209
00210 virtual void run()
00211 {
00212 printf( "Producer run \n" );
00213
00214 connect();
00215
00216 printf( "Producer opened a connection \n" );
00217
00218
00219 register int i = 0;
00220
00221 for ( i = 0; i < 5; i++ )
00222 {
00223 printf("Producer sending %d \n", i );
00224
00225 message->setNumber( i );
00226
00227 if ( message->send( fd, replyMsg ) < 0 )
00228 {
00229 perror( "Couldn't send Message ! \n" );
00230 break;
00231 }
00232 }
00233
00234 disconnect();
00235 }
00236 };
00237
00238 int main( int argc, char **argv )
00239 {
00240 NameSpace* n = new NameSpace( "myname", 3 );
00241
00242 Server s( n );
00243 Producer p( n );
00244 Consumer c( n );
00245
00246 s.start();
00247
00248 n->waitForServer();
00249
00250 p.start();
00251 c.start();
00252
00253 pthread_join( c.getPID(), NULL );
00254
00255 n->waitForClient();
00256
00257 printf( "Main Done \n" );
00258
00259
00260 pthread_kill( s.getPID(), SIGKILL );
00261
00262 if ( n != NULL ) delete n;
00263
00264 return 0;
00265 }
00266
00267 #endif // End of CS4.cpp