package com.hackorama.util; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.Properties; import java.util.logging.Logger; /* * kishan at hackorama dot com www.hackorama.com 2004/2005 * * A pessimistic messsage queue implementation which uses * file system persistance through java.util.properties for * one to one message caching between an asynchrounous publisher * and an asynchrounous consumer. * * Provides rollbackable multiple insertions from the publisher * and safe reading with read confirmation for the consumer. * * Not thread safe, depends on filesystem integrity for safe * persistance and can grow till the file system limits. * Uses lazy delete with garbage collection done at each new * initialization after safe point check * * kishan at hackorama dot com www.hackorama.com 2004/2005 * */ public class MessageQ { private Logger log = Logger.getLogger("com.hackorama.util"); private String header = "MESSAGEQ PERSISTANT DATA DO NOT MODIFY"; private String filename = null; private int length = 0; private int cursor = 0; private Properties prop = null; private FileOutputStream outfile = null; private boolean read_ok = false; private static final String LENGTHKEY = "LEN"; private static final String CURSORKEY = "CUR"; private static final String KEY = "KEY"; private static final String LASTGOTKEY = "GOT"; private static final String LASTPUTKEY = "PUT"; /*---------------------------------------------*/ /* Public Interface */ /*---------------------------------------------*/ public MessageQ(String filename) { this.filename = filename; prop = new Properties(); checkStatus(); } public boolean hasMore() { if (length > 0) return true; return false; } public String get() { String result = safeGet(); doneGet(); return result; } public String reget() { String value = null; if (cursor == 0) value = readStringProp(LASTGOTKEY); else value = readStringProp(KEY + Integer.toString(cursor)); return value; } public String safeGet() { read_ok = false; if (isEmpty()) return null; if (emptyQueue()) { log.fine("Finished all messages from queue"); return null; } String value = readStringProp(KEY + Integer.toString(cursor + 1)); read_ok = true; return value; } public void doneGet() { if (read_ok) cursorPlus(); } public boolean put(String message) { if (message == null) return false; if (insertProp(KEY + Integer.toString(length + 1), message)) { lengthPlus(); insertProp(LASTGOTKEY, message); return true; } return false; } public boolean reput(String message) { if (message == null) return false; if (length <= 0) return false; return insertProp(KEY + Integer.toString(length), message); } // Call this method to start the insertion // of a group of messages that can be rollbacked // if all the messages were not inserted public boolean begin() { return insertProp(LASTPUTKEY, length); } // Call this method to end the insertion // of a group of messages that gets rollbacked // like a transaction public boolean end() { return insertProp(LASTPUTKEY, 0); } public void setHeader(String header) { if(header == null){ log.warning("Cannot set header to null value"); }else{ this.header = header; } } /*---------------------------------------------*/ /* Private Implementation */ /*---------------------------------------------*/ private boolean lengthPlus() { length++; return insertProp(LENGTHKEY, length); } private boolean cursorPlus() { cursor++; return insertProp(CURSORKEY, cursor); } private boolean isEmpty() { if (length == 0) return true; return false; } private boolean emptyQueue() { if (removeQueue()) { length = 0; cursor = 0; return true; } return false; } private boolean removeQueue() { if (!shouldEmpty()) return false; if (insertProp(LENGTHKEY, 0)) { if (insertProp(CURSORKEY, 0)) { return true; } } return false; } private boolean shouldEmpty() { //do not clean an empty queue if (length == 0) { return false; } // ok to clean if all marked as read if (cursor >= length) return true; //by default do not clean return false; } // Will return null on failure private String readStringProp(String key) { if (key == null) return null; String value = null; try { FileInputStream infile = new FileInputStream(filename); prop.load(infile); value = prop.getProperty(key); infile.close(); } catch (IOException e) { log.severe("IO Error reading queue properties from " + filename + " " + e); return null; } catch (Exception e) { log.severe("Exception reading queue properties from " + filename + " " + e); return null; } if (value != null) { value = value.trim(); } return value; } // Will return -1 on failure private int readIntProp(String key) { if (key == null) return -1; String value = readStringProp(key); if (value != null) { return Integer.parseInt(value); } return -1; } // Only allows non null strings and non negetive numbers private boolean insertProp(String key, int value) { if (key == null || value < 0) return false; return insertProp(key, Integer.toString(value)); } // Only allows non null strings and non negetive numbers private boolean insertProp(String key, String value) { if (key == null || value == null) return false; try { openProp(); prop.setProperty(key, value); prop.store(outfile, header); closeProp(); } catch (IOException e) { log.severe("IO Error writing queue properties to " + filename + " " + e); return false; } catch (Exception e) { log.severe("Exception writing queue properties to " + filename + " " + e); return false; } return true; } private boolean openProp() { try { outfile = new FileOutputStream(filename); } catch (IOException e) { log.severe("IO Error writing queue properties to " + filename + " " + e); return false; } return true; } private boolean closeProp() { try { outfile.close(); } catch (IOException e) { log.warning("IO Error writing queue properties to " + filename + " " + e); return false; } return true; } private boolean checkStatus() { File file = new File(filename); if (file.exists()) { return initQueue(); } else { return createQueue(); } } private boolean createQueue() { if (!insertProp(CURSORKEY, cursor)) return false; if (!insertProp(LENGTHKEY, length)) return false; return true; } private boolean initQueue() { fileStatus(); initValues(); checkSanity(); checkRollback(); return garbageCollect(); } private void fileStatus() { boolean result = true; File file = new File(filename); if (file.exists()) { if (!file.canRead()) { log.severe("Cannot read queue file " + filename); result = false; } if (!file.canWrite()) { log.severe("Cannot write queue file " + filename); result = false; } } else { log.severe("Queue file " + filename + " does not exst"); result = false; } if (result == false) { log.severe("Please fix the above error. Exiting .."); System.exit(1); } } private void initValues() { length = readIntProp(LENGTHKEY); cursor = readIntProp(CURSORKEY); } private void checkSanity() { if (length < 0) length = 0; if (cursor < 0) cursor = 0; } private void checkRollback() { int lastput = readIntProp(LASTPUTKEY); /** * See comments at begin() and end() * A non zero value for LASTPUTKEY * indicate we have an incomplete * group insertion to the queue. * And the value of LASPUTKEY points to * the last confirmed group insertion. * So we rollback the queue to that * point for data integrity */ if (lastput > 0) { log.fine("Queue rollbacked"); length = lastput; } } private boolean garbageCollect() { // Check for safe point first if (length > 0 || cursor > 0) return true; File file = new File(filename); file.delete(); //Delete existing if(!createQueue()) return false; //Create new log.fine("Queue garbage collected"); return true; } }