00001 #ifndef __J2K__Consumer_CPP__
00002 #define __J2K__Consumer_CPP__
00003
00004 #include <j2k/nto/Consumer.hpp>
00005
00006 Consumer::Consumer( NameSpace* n0, int idx0 = 0 )
00007 : Basic_PThread(),
00008 MsgContainer( n0, client ),
00009 idx( idx0 )
00010 {
00011 message->setRequest( MQI_CONSUMER );
00012 message->setIndex( idx );
00013 }
00014
00015 void Consumer::setPThread( pthread_t tid )
00016 {
00017 printf( "Consumer PID[%d] \n", (int)tid );
00018 fflush( stdout );
00019
00020 message->setPThread( tid );
00021 }
00022
00023 void Consumer::run()
00024 {
00025 printf( "Consumer %d run \n", idx );
00026 fflush( stdout );
00027
00028 connect();
00029
00030 printf( "Consumer %d opened a connection \n", idx );
00031 fflush( stdout );
00032 n->waitForThreads();
00033
00034
00035 register int i = 0;
00036
00037
00038 for ( i = 0; i < 5; i++ )
00039 {
00040 sched_yield();
00041
00042 register int data = 1000 * idx + i;
00043 printf("Consumer %d sending %d \n", idx, data );
00044 fflush( stdout );
00045
00046
00047 message->setNumber( data );
00048
00049 if ( message->send( fd, replyMsg ) < 0 )
00050 {
00051 perror( "Couldn't send Message ! \n" );
00052 }
00053
00054
00055 register int status = replyMsg->getStatus();
00056
00057 printf( "Status[%d] \n", status );
00058 fflush( stdout );
00059
00060
00061 if ( replyMsg->isSuicide() )
00062 {
00063 printf( "Consumer %d SUICIDE ! \n", idx );
00064 fflush( stdout );
00065 i = 100;
00066 }
00067
00068 if ( replyMsg->isFull() && replyMsg->isGet() )
00069 {
00070 printf( "Consumer %d retrieved %d \n", idx, replyMsg->getNumber() );
00071 fflush( stdout );
00072 }
00073
00074 printf( "Consumer loop end for %d \n", data );
00075 fflush( stdout );
00076
00077 }
00078
00079 printf( "Consumer disconnect \n" );
00080 disconnect();
00081 }
00082
00083 #endif // End of Consumer.cpp