// @ZBS { // *MODULE_NAME Hash-based Message Dispatcher // *MASTER_FILE 1 // +DESCRIPTION { // A hashtable based method of passing messages around. Allows for // a header-free method of passing a message from one module to another // via direct and indirect dispatching. Includes features for queing messages // } // *PORTABILITY win32 unix // *REQUIRED_FILES zmsg.cpp zmsg.h // *VERSION 2.0 // +HISTORY { // 2.0 New naming convention // } // +TODO { // } // *SELF_TEST yes console // *PUBLISH yes // } // OPERATING SYSTEM specific includes: // SDK includes: // STDLIB includes: #include "stdio.h" #include "stdlib.h" #include "assert.h" #include "stdarg.h" #include "string.h" // MODULE includes: #include "zmsg.h" // ZBSLIB includes: static char zMsgTempString[512]; ZHashTable *zMsgDispatchInstructions = NULL; ZMsg *zMsgs[ZMSG_QUEUE_SIZE]; // TODO: dynamic allocate queue with realloc int zMsgRead = 0; int zMsgWrite = 0; ZMsgRegister::ZMsgRegister( char *typeName, ZMsgHandler handlerPtr ) { if( !zMsgDispatchInstructions ) { zMsgDispatchInstructions = new ZHashTable(); } zMsgDispatchInstructions->putI( typeName, (int)handlerPtr ); } void zMsgQueue( ZMsg *msg ) { zMsgs[ zMsgWrite ] = msg; zMsgWrite = (zMsgWrite + 1) % ZMSG_QUEUE_SIZE; msg->putS( "__queued__", "1" ); static int serialNum = 0; msg->putI( "__serial__", serialNum++ ); assert( zMsgWrite != zMsgRead ); } void zMsgQueue( char *msg, ... ) { { va_list argptr; va_start( argptr, msg ); vsprintf( zMsgTempString, msg, argptr ); va_end( argptr ); assert( strlen(zMsgTempString) < 512 ); } // Search for semi-colon separated strings // Must be outside of quote contexts char *msgStart = zMsgTempString; int sQuote = 0, dQuote = 0; for( char *c=zMsgTempString; *c; c++ ) { if( *c == '\'' && !sQuote ) sQuote++; else if( *c == '\'' && sQuote ) sQuote--; else if( *c == '\"' && !dQuote ) dQuote++; else if( *c == '\"' && dQuote ) dQuote--; if( !dQuote && !sQuote && *c == ';' ) { *c = 0; zMsgQueue( zHashTable( msgStart ) ); msgStart = c+1; } } ZHashTable *hash = zHashTable( msgStart ); zMsgQueue( hash ); } ZMsg *zMsgDequeue() { if( zMsgRead != zMsgWrite ) { ZHashTable *q = zMsgs[ zMsgRead ]; zMsgRead = (zMsgRead + 1) % ZMSG_QUEUE_SIZE; q->del( "__queued__" ); return q; } return NULL; } void zMsgSetHandler( char *type, ZMsgHandler d ) { zMsgDispatchInstructions->putI( type, (int)( d ) ); } void zMsgDispatch( double time ) { #define ZMSG_REQUEUE_SIZE (30) ZMsg *requeue[ZMSG_REQUEUE_SIZE]; int requeueCount = 0; ZMsgHandler defH = (ZMsgHandler)zMsgDispatchInstructions->getI( "default" ); // To prevent infinite loops, the dispatcher notes the last of // the msgs and only goes to that one. Anything that gets added // on to the end of the queue will no be processed until next time. int zMsgWriteAtStart = zMsgWrite; for( ZMsg *msg = zMsgDequeue(); msg; ) { if( time ) { if( zmsgF(__timer__) ) { // Convert the timer field into a time field msg->putD( "__time__", zmsgF(__timer__) + time ); msg->putS( "__timer__", NULL ); } // Using the __time__ field float _time = zmsgF(__time__); if( time < _time ) { // REQUEUE this one requeue[requeueCount++] = msg; assert( requeueCount < ZMSG_REQUEUE_SIZE ); msg = zMsgDequeue(); continue; } } int countDown = zmsgI(__countdown__); if( countDown > 0 ) { // REQUEUE this one with countdown decremented msg->putI( "__countdown__", countDown-1 ); requeue[requeueCount++] = msg; assert( requeueCount < ZMSG_REQUEUE_SIZE ); msg = zMsgDequeue(); continue; } // LOOKUP the dispatching instructions ZMsgHandler h = (ZMsgHandler)zMsgDispatchInstructions->getI( zmsgS(type) ); if( defH ) { (*defH)( msg ); } if( h && !zmsgI(__used__) ) { (*h)( msg ); } // It is possible that this message has been pushed back // into the queue in which case we cannot delete it! if( ! zmsgI(__queued__) ) { delete msg; } // FETCH the next one as long as we haven't gotten to the last // one as of the start of the function msg = zMsgDequeue(); } for( int i=0; igetI( "default" ); // LOOKUP the dispatching instructions ZMsgHandler h = (ZMsgHandler)zMsgDispatchInstructions->getI( zmsgS(type) ); if( defH ) { (*defH)( msg ); } if( h && !zmsgI(__used__) ) { (*h)( msg ); } // It is possible that this message has been pushed back // into the queue in which case we cannot delete it! if( ! zmsgI(__queued__) ) { delete msg; } } char zMsgTranslateCharacter( char *character ) { if( !strcmp(character,"escape") ) { return 27; } if( !strcmp(character,"quote") ) { return '\''; } if( !strcmp(character,"doublequote") ) { return '\"'; } if( !strcmp(character,"backspace") ) { return '\b'; } if( !strcmp(character,"tab") ) { return '\t'; } return *character; } #if SELF_TEST #define ZMSG_HANDLER( Oink ) { printf( "Received type=%s, flags=%d\n", msg->get("type"), zMsgGetInt(msg,"flags") ); } void main() { zMsgQueue( zMsg( "type=Oink flags=1" ) ); zMsgQueue( zMsg( "type=Oink flags=2" ) ); zMsgDispatch(); } #endif