00001 #ifndef __J2K__Producer_CPP__
00002 #define __J2K__Producer_CPP__
00003
00004 #include <j2k/nto/Producer.hpp>
00005
00006
00007 Producer::Producer( NameSpace* n0, int idx0 = 0 )
00008 : Basic_PThread(),
00009 MsgContainer( n0, client ),
00010 idx( idx0 )
00011 {
00012 message->setRequest( MQI_PRODUCER );
00013 message->setIndex( idx );
00014 }
00015
00016 void Producer::setPThread( pthread_t tid )
00017 {
00018 printf( "Producer PID[%d] \n", (int)tid );
00019 fflush( stdout );
00020
00021 message->setPThread( tid );
00022 }
00023
00024
00025 void Producer::run()
00026 {
00027 printf( "Producer %d run \n", idx );
00028 fflush( stdout );
00029
00030 connect();
00031
00032 printf( "Producer %d opened a connection \n", idx );
00033 fflush( stdout );
00034 n->waitForThreads();
00035
00036
00037 register int i = 0;
00038
00039
00040 for ( i = 0; i < 5; i++ )
00041 {
00042 sched_yield();
00043
00044
00045 unsigned seed = time( NULL );
00046 srand( seed );
00047 int sec1 = rand()/100%3 + 1;
00048
00049 unsigned seed2 = time( NULL );
00050 srand( seed2 );
00051 int sec2 = ( rand()/100%1 + 1 ) * sec1;
00052
00053 printf( "Sleep for %d seconds.\n", sec2 );
00054 sleep( sec2 );
00055
00056
00057
00058 register int data = 100 * idx + i;
00059 printf("Producer %d sending %d \n", idx, data );
00060 fflush( stdout );
00061
00062 message->setNumber( data );
00063
00064 if ( message->send( fd, replyMsg ) < 0 )
00065 {
00066 perror( "Couldn't send Message ! \n" );
00067 }
00068
00069
00070 if ( replyMsg->isSuicide() )
00071 {
00072 printf( "Producer %d SUICIDE ! \n", idx );
00073 fflush( stdout );
00074 i = 100;
00075 }
00076
00077 printf( "Producer loop end for %d \n", data );
00078 fflush( stdout );
00079
00080 }
00081
00082 if ( replyMsg->isEmpty() )
00083 {
00084 printf( "Producer %d was consumed by %d \n", idx, replyMsg->getNumber() );
00085 fflush( stdout );
00086 }
00087
00088
00089 printf( "Producer disconnect \n" );
00090 disconnect();
00091 }
00092
00093 #endif // End of Producer.cpp