hackorama
hackorama

MessageQ

A pessimistic messsage queue implementation which uses file system persistance through java.util.Properties for one to one message caching between an asynchrounous Publisher and an asynchrounous Consumer.

Provides rollbackable multiple insertions from the Publisher and safe reading with read confirmation using an iterator for the Consumer.

Not thread safe, depends on filesystem integrity by using java for safe persistance and can grow till the file system limits. Instead of doing direct file i/o, we delegate it to the JDK layer through java.util.Properties API. Uses lazy delete with garbage collection done at each new initialization after safe point check.

This ia refactored code from an old project, where it was used successfully to handle secure messages coming at random intervals as groups of related message strings from a remote customer log via a socket connection which were picked up for processing periodically by a client whenever it gets a secure connection to a remote DB where they will be stored after some transform logic rules were applied depending on the dynamic data read from the DB.

Basic operations

get() and put() reads and wries a new message to the queue, reget() and reput() reads and writes the same message again. hasMore() provides a simple iterator for the reads.

Safe Read With Confirm

safeGet() provides a confirmable get() from the queue. Each safeGet() should be follwed by an doneGet() to confirm theread. On getting the doneGet() the message gets marked as read. If there was no call to doneGet() then the next get() will read the same message as the last get().

Rollbackable Multiple Group Inserts

a bunch of messages which forms a group and need to inserted together and in the same order, can be surrounded by a begin() and an end() call, and the group insertion will work like a transaction. If the inserts were interrupted by a system event when ever the MessageQ is next accessed the first operation will be a rollback to the state before the begin() call

Internals

The queue gets marked as empty when a new get() finds out the last read was the last message in queue. And the next initialization will do a garbage collection and create a new Queue.

On each init of the queue, if the queue is marked empty its considered a safe point for garbage collection. The queue file is deleted and a new empty queue is initialised

Usage

Example psuedo Publisher side code:

MessageQ  myCache = new MessageQ("my_message_cache.store");
myCache.put( getSingleLogString() );
myCache.put( getSingleLogString() );
myCache.put( getSingleLogString() );

Int log_count = getGroupLogStringCount();
if( log_count > 0 ){
    myCache.begin();
    for( int i=1; i<= log_count; i++){
	myCache.put(geGroupLogString(i));
    }
    myCache.end();
}
Example psuedo Consumer side code:
MessageQ  myCache = new MessageQ("my_message_cache.store");
while(myCache.hasMore()){
    String log_string = applyLogTransform( myCache.safeGet() );
    if( canAccessDB() ){
	if ( persistToDB(log_string) ){
	    myCache.doneGet();
	    }else{
		log.severe("Failed DB insert");
	    }
    }else{
        log.warning("No DB access");
    }
}

Get MessageQ code

View MessageQ.java
Download MessageQ.java



Monday, 14-Mar-2005 19:32:44 PST kishan at hackorama dot com