|
MessageQA pessimistic messsage queue implementation which uses file system persistance through java.util.Properties for one to one message caching between an asynchrounous Publisher and an asynchrounous Consumer. Provides rollbackable multiple insertions from the Publisher and safe reading with read confirmation using an iterator for the Consumer. Not thread safe, depends on filesystem integrity by using java for safe persistance and can grow till the file system limits. Instead of doing direct file i/o, we delegate it to the JDK layer through java.util.Properties API. Uses lazy delete with garbage collection done at each new initialization after safe point check. This ia refactored code from an old project, where it was used successfully to handle secure messages coming at random intervals as groups of related message strings from a remote customer log via a socket connection which were picked up for processing periodically by a client whenever it gets a secure connection to a remote DB where they will be stored after some transform logic rules were applied depending on the dynamic data read from the DB.
Basic operationsget() and put() reads and wries a new message to the queue, reget() and reput() reads and writes the same message again. hasMore() provides a simple iterator for the reads.
Safe Read With ConfirmsafeGet() provides a confirmable get() from the queue. Each safeGet() should be follwed by an doneGet() to confirm theread. On getting the doneGet() the message gets marked as read. If there was no call to doneGet() then the next get() will read the same message as the last get().
Rollbackable Multiple Group Insertsa bunch of messages which forms a group and need to inserted together and in the same order, can be surrounded by a begin() and an end() call, and the group insertion will work like a transaction. If the inserts were interrupted by a system event when ever the MessageQ is next accessed the first operation will be a rollback to the state before the begin() call
InternalsThe queue gets marked as empty when a new get() finds out the last read was the last message in queue. And the next initialization will do a garbage collection and create a new Queue.On each init of the queue, if the queue is marked empty its considered a safe point for garbage collection. The queue file is deleted and a new empty queue is initialised
UsageExample psuedo Publisher side code:
MessageQ myCache = new MessageQ("my_message_cache.store");
myCache.put( getSingleLogString() );
myCache.put( getSingleLogString() );
myCache.put( getSingleLogString() );
Int log_count = getGroupLogStringCount();
if( log_count > 0 ){
myCache.begin();
for( int i=1; i<= log_count; i++){
myCache.put(geGroupLogString(i));
}
myCache.end();
}
MessageQ myCache = new MessageQ("my_message_cache.store");
while(myCache.hasMore()){
String log_string = applyLogTransform( myCache.safeGet() );
if( canAccessDB() ){
if ( persistToDB(log_string) ){
myCache.doneGet();
}else{
log.severe("Failed DB insert");
}
}else{
log.warning("No DB access");
}
}
Get MessageQ code
View MessageQ.java
|
| Monday, 14-Mar-2005 19:32:44 PST | kishan at hackorama dot com |

