00001 #ifndef __J2K__Server_CPP__
00002 #define __J2K__Server_CPP__
00003
00004 #include <j2k/nto/Server.hpp>
00005
00006 Server::Server( NameSpace* n0 )
00007 : Basic_PThread(),
00008 MsgContainer( n0, server ),
00009 nbProducer( 0 ),
00010 nbConsumer( 0 )
00011 {
00012 }
00013
00014 Server::~Server()
00015 {
00016 printf( "Killing server... \n" );
00017 register size_t i = 0;
00018 register size_t max = timeout.size();
00019
00020 iter = timeout.begin();
00021 for( i = 0; i < max; i++ )
00022 {
00023 if ( timeout[i] != NULL )
00024 {
00025 deleteEntry( i );
00026 }
00027 iter++;
00028 }
00029 }
00030
00031 void Server::deleteEntry( register int index )
00032 {
00033
00034 fflush( stdout );
00035
00036
00037 timeout[ index ]->stop();
00038 timeout[ index ]->reset();
00039
00040
00041 timeout[ index ] = NULL;
00042
00043
00044
00045
00046 fflush( stdout );
00047 }
00048
00049 void Server::createEntry( register int rcvid,
00050 register Message* message,
00051 register int startTimer = 0 )
00052 {
00053
00054
00055
00056
00057 Timeout* t = new Timeout( MsgContainer::n, 0, 1 );
00058 t->copy( message );
00059
00060 if ( startTimer > 0 )
00061 {
00062 t->start();
00063 }
00064
00065 t->setReceiver( rcvid );
00066 timeout.push_back( t );
00067
00068
00069 }
00070
00071
00072 void Server::run()
00073 {
00074 printf( "Server run \n" );
00075 fflush( stdout );
00076
00077
00078 connect();
00079
00080 printf( "Server attached \n" );
00081 fflush( stdout );
00082
00083
00084 while( 1 )
00085 {
00086 printf( "Wait for receive...\n" );
00087 fflush( stdout );
00088
00089 int rcvid = message->receive( n->getChannel() );
00090
00091 printf( "Message received from ID#%d \n", rcvid );
00092 fflush( stdout );
00093
00094
00095 if ( rcvid == 0 ) continue;
00096
00097
00098 if ( message->checkHeader( rcvid ) < 0 ) break;
00099
00100 printf("Server receive %d \n", message->getNumber() );
00101 fflush( stdout );
00102
00103 message->print();
00104 message->setStatus( message->getRequest() );
00105
00106
00107 if ( message->isConsumer() )
00108 {
00109 printf( "Consumer message \n" );
00110 fflush( stdout );
00111
00112
00113 if ( nbProducer > 0 )
00114 {
00115
00116 register size_t i = 0;
00117 register size_t max = timeout.size();
00118 iter = timeout.begin();
00119 for( i = 0; i < max; i++, iter++ )
00120 {
00121 if ( timeout[i] != NULL )
00122 {
00123 if ( timeout[ i ]->getNumber() != -1 && timeout[ i ]->isProducer() )
00124 {
00125 printf( "Consumer fed by entry %d, producer %d \n", i, timeout[ i ]->getNumber() );
00126 fflush( stdout );
00127
00128 timeout[ i ]->stop();
00129 timeout[ i ]->reply( rcvid );
00130 message->reply( timeout[ i ]->getReceiver() );
00131
00132
00133 deleteEntry( i );
00134
00135 nbProducer--;
00136 i = max;
00137
00138 printf( "Done with case 1 ! \n" );
00139 fflush( stdout );
00140 }
00141 }
00142 }
00143 }
00144 else
00145 {
00146
00147 printf( "Storing Consumer %d \n", message->getNumber() );
00148 fflush( stdout );
00149
00150 createEntry( rcvid, message, 1 );
00151 nbConsumer++;
00152
00153 printf( "Done with case 2 ! \n" );
00154 fflush( stdout );
00155 }
00156 }
00157 else if ( message->isProducer() )
00158 {
00159 printf( "Producer message \n" );
00160 fflush( stdout );
00161
00162 if ( nbConsumer > 0 )
00163 {
00164
00165 register size_t i = 0;
00166 register size_t max = timeout.size();
00167 iter = timeout.begin();
00168 for( i = 0; i < max; i++, iter++ )
00169 {
00170 if ( timeout[ i ] != NULL )
00171 {
00172 if ( timeout[ i ]->getNumber() != -1 && timeout[ i ]->isConsumer() )
00173 {
00174 printf( "Consumer fed by entry %d, producer %d \n", i, timeout[ i ]->getNumber() );
00175 fflush( stdout );
00176
00177 timeout[ i ]->stop();
00178 timeout[ i ]->reply( rcvid );
00179 message->reply( timeout[ i ]->getReceiver() );
00180
00181
00182 deleteEntry( i );
00183
00184 nbConsumer--;
00185 i = max;
00186
00187 printf( "Done with case 3 ! \n" );
00188 fflush( stdout );
00189 }
00190 }
00191 }
00192 }
00193 else
00194 {
00195
00196 printf( "Storing Producer %d \n", message->getNumber() );
00197 fflush( stdout );
00198
00199 createEntry( rcvid, message, 0 );
00200 nbProducer++;
00201
00202 printf( "Done with case 4 ! \n" );
00203 fflush( stdout );
00204 }
00205 }
00206 else
00207 {
00208 printf( "Generic reply \n" );
00209 fflush( stdout );
00210
00211 int data = 100;
00212 message->reply( rcvid, data );
00213
00214 printf( "Done with case 5 ! \n" );
00215 fflush( stdout );
00216
00217 }
00218
00219 printf( "End of message response. \n" );
00220 printf( "****************************************************************\n" );
00221 fflush( stdout );
00222
00223 }
00224
00225 printf( "Server Disconnect \n" );
00226 fflush( stdout );
00227
00228 disconnect();
00229 }
00230
00231 #endif // End of Server.cpp