package com.hackorama.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Logger;
/*
* kishan at hackorama dot com www.hackorama.com 2004/2005
*
* A pessimistic messsage queue implementation which uses
* file system persistance through java.util.properties for
* one to one message caching between an asynchrounous publisher
* and an asynchrounous consumer.
*
* Provides rollbackable multiple insertions from the publisher
* and safe reading with read confirmation for the consumer.
*
* Not thread safe, depends on filesystem integrity for safe
* persistance and can grow till the file system limits.
* Uses lazy delete with garbage collection done at each new
* initialization after safe point check
*
* kishan at hackorama dot com www.hackorama.com 2004/2005
*
*/
public class MessageQ {
private Logger log = Logger.getLogger("com.hackorama.util");
private String header = "MESSAGEQ PERSISTANT DATA DO NOT MODIFY";
private String filename = null;
private int length = 0;
private int cursor = 0;
private Properties prop = null;
private FileOutputStream outfile = null;
private boolean read_ok = false;
private static final String LENGTHKEY = "LEN";
private static final String CURSORKEY = "CUR";
private static final String KEY = "KEY";
private static final String LASTGOTKEY = "GOT";
private static final String LASTPUTKEY = "PUT";
/*---------------------------------------------*/
/* Public Interface */
/*---------------------------------------------*/
public MessageQ(String filename) {
this.filename = filename;
prop = new Properties();
checkStatus();
}
public boolean hasMore() {
if (length > 0) return true;
return false;
}
public String get() {
String result = safeGet();
doneGet();
return result;
}
public String reget() {
String value = null;
if (cursor == 0)
value = readStringProp(LASTGOTKEY);
else
value = readStringProp(KEY + Integer.toString(cursor));
return value;
}
public String safeGet() {
read_ok = false;
if (isEmpty()) return null;
if (emptyQueue()) {
log.fine("Finished all messages from queue");
return null;
}
String value = readStringProp(KEY + Integer.toString(cursor + 1));
read_ok = true;
return value;
}
public void doneGet() {
if (read_ok) cursorPlus();
}
public boolean put(String message) {
if (message == null) return false;
if (insertProp(KEY + Integer.toString(length + 1), message)) {
lengthPlus();
insertProp(LASTGOTKEY, message);
return true;
}
return false;
}
public boolean reput(String message) {
if (message == null) return false;
if (length <= 0) return false;
return insertProp(KEY + Integer.toString(length), message);
}
// Call this method to start the insertion
// of a group of messages that can be rollbacked
// if all the messages were not inserted
public boolean begin() {
return insertProp(LASTPUTKEY, length);
}
// Call this method to end the insertion
// of a group of messages that gets rollbacked
// like a transaction
public boolean end() {
return insertProp(LASTPUTKEY, 0);
}
public void setHeader(String header) {
if(header == null){
log.warning("Cannot set header to null value");
}else{
this.header = header;
}
}
/*---------------------------------------------*/
/* Private Implementation */
/*---------------------------------------------*/
private boolean lengthPlus() {
length++;
return insertProp(LENGTHKEY, length);
}
private boolean cursorPlus() {
cursor++;
return insertProp(CURSORKEY, cursor);
}
private boolean isEmpty() {
if (length == 0) return true;
return false;
}
private boolean emptyQueue() {
if (removeQueue()) {
length = 0;
cursor = 0;
return true;
}
return false;
}
private boolean removeQueue() {
if (!shouldEmpty()) return false;
if (insertProp(LENGTHKEY, 0)) {
if (insertProp(CURSORKEY, 0)) {
return true;
}
}
return false;
}
private boolean shouldEmpty() {
//do not clean an empty queue
if (length == 0) {
return false;
}
// ok to clean if all marked as read
if (cursor >= length) return true;
//by default do not clean
return false;
}
// Will return null on failure
private String readStringProp(String key) {
if (key == null) return null;
String value = null;
try {
FileInputStream infile = new FileInputStream(filename);
prop.load(infile);
value = prop.getProperty(key);
infile.close();
} catch (IOException e) {
log.severe("IO Error reading queue properties from " +
filename + " " + e);
return null;
} catch (Exception e) {
log.severe("Exception reading queue properties from " +
filename + " " + e);
return null;
}
if (value != null) {
value = value.trim();
}
return value;
}
// Will return -1 on failure
private int readIntProp(String key) {
if (key == null) return -1;
String value = readStringProp(key);
if (value != null) {
return Integer.parseInt(value);
}
return -1;
}
// Only allows non null strings and non negetive numbers
private boolean insertProp(String key, int value) {
if (key == null || value < 0) return false;
return insertProp(key, Integer.toString(value));
}
// Only allows non null strings and non negetive numbers
private boolean insertProp(String key, String value) {
if (key == null || value == null) return false;
try {
openProp();
prop.setProperty(key, value);
prop.store(outfile, header);
closeProp();
} catch (IOException e) {
log.severe("IO Error writing queue properties to " +
filename + " " + e);
return false;
} catch (Exception e) {
log.severe("Exception writing queue properties to " +
filename + " " + e);
return false;
}
return true;
}
private boolean openProp() {
try {
outfile = new FileOutputStream(filename);
} catch (IOException e) {
log.severe("IO Error writing queue properties to " +
filename + " " + e);
return false;
}
return true;
}
private boolean closeProp() {
try {
outfile.close();
} catch (IOException e) {
log.warning("IO Error writing queue properties to " +
filename + " " + e);
return false;
}
return true;
}
private boolean checkStatus() {
File file = new File(filename);
if (file.exists()) {
return initQueue();
} else {
return createQueue();
}
}
private boolean createQueue() {
if (!insertProp(CURSORKEY, cursor))
return false;
if (!insertProp(LENGTHKEY, length))
return false;
return true;
}
private boolean initQueue() {
fileStatus();
initValues();
checkSanity();
checkRollback();
return garbageCollect();
}
private void fileStatus() {
boolean result = true;
File file = new File(filename);
if (file.exists()) {
if (!file.canRead()) {
log.severe("Cannot read queue file " + filename);
result = false;
}
if (!file.canWrite()) {
log.severe("Cannot write queue file " + filename);
result = false;
}
} else {
log.severe("Queue file " + filename + " does not exst");
result = false;
}
if (result == false) {
log.severe("Please fix the above error. Exiting ..");
System.exit(1);
}
}
private void initValues() {
length = readIntProp(LENGTHKEY);
cursor = readIntProp(CURSORKEY);
}
private void checkSanity() {
if (length < 0) length = 0;
if (cursor < 0) cursor = 0;
}
private void checkRollback() {
int lastput = readIntProp(LASTPUTKEY);
/**
* See comments at begin() and end()
* A non zero value for LASTPUTKEY
* indicate we have an incomplete
* group insertion to the queue.
* And the value of LASPUTKEY points to
* the last confirmed group insertion.
* So we rollback the queue to that
* point for data integrity
*/
if (lastput > 0) {
log.fine("Queue rollbacked");
length = lastput;
}
}
private boolean garbageCollect() {
// Check for safe point first
if (length > 0 || cursor > 0) return true;
File file = new File(filename);
file.delete(); //Delete existing
if(!createQueue()) return false; //Create new
log.fine("Queue garbage collected");
return true;
}
}
syntax highlighted by Code2HTML, v. 0.9