package com.hackorama.util;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Logger;

/*
 * kishan at hackorama dot com www.hackorama.com 2004/2005
 *
 * A pessimistic message queue implementation which uses
 * file system persistence through java.util.Properties for
 * one-to-one message caching between an asynchronous publisher
 * and an asynchronous consumer.
 *
 * Provides multiple insertions from the publisher that can be
 * rolled back, and safe reading with read confirmation for the consumer.
 *
 * Not thread safe, depends on file system integrity for safe
 * persistence and can grow up to the file system limits.
 * Uses lazy delete, with garbage collection done at each new
 * initialization after a safe point check.
 *
 * kishan at hackorama dot com www.hackorama.com 2004/2005
 *
 */

/**
 * A queue of string messages persisted in a single properties file.
 *
 * <p>Layout of the file: message number {@code n} (1-based) is stored under
 * the key {@code KEYn}; {@code LEN} holds the number of messages inserted,
 * {@code CUR} the number of messages confirmed as read, {@code PUT} the
 * group-insertion marker written by {@link #begin()} / {@link #end()} and
 * {@code GOT} the message most recently passed to {@link #put(String)}.
 *
 * <p>Every mutation rewrites the whole file from the in-memory
 * {@link Properties} copy. The file is truncated before it is rewritten, so
 * a failure in the middle of a write can leave it incomplete. Instances are
 * not thread safe.
 */
public class MessageQ {

    private static final Logger log = Logger.getLogger("com.hackorama.util");

    private static final String LENGTHKEY  = "LEN";
    private static final String CURSORKEY  = "CUR";
    private static final String KEY        = "KEY";
    private static final String LASTGOTKEY = "GOT";
    private static final String LASTPUTKEY = "PUT";

    // Comment line written at the top of the properties file on every store
    private String header = "MESSAGEQ PERSISTANT DATA DO NOT MODIFY";
    private final String filename;
    // In-memory mirror of the queue file, refreshed on every read
    private final Properties prop;
    // Number of messages inserted so far (index of the last message)
    private int length = 0;
    // Number of messages confirmed as read (index of the last read message)
    private int cursor = 0;
    // Stream used for the write in progress; null between writes
    private FileOutputStream outfile = null;
    // True when the last safeGet() handed out a message awaiting doneGet()
    private boolean readOk = false;

    /*---------------------------------------------*/
    /* Public Interface                            */
    /*---------------------------------------------*/

    /**
     * Opens the queue stored in the given file, creating the file when it
     * does not exist yet. For an existing file the counters are loaded, an
     * interrupted group insertion is rolled back and, when the queue is
     * completely empty, the file is garbage collected.
     *
     * <p>If an existing file is not readable or not writable the error is
     * logged and the JVM is terminated with {@code System.exit(1)}.
     *
     * @param filename path of the queue file
     */
    public MessageQ(String filename) {
        this.filename = filename;
        prop = new Properties();
        checkStatus();
    }

    /**
     * Tells whether there is at least one message that has not been read yet.
     *
     * <p>When every inserted message has already been read, the queue
     * counters are reset (the same lazy reset {@link #safeGet()} performs)
     * and {@code false} is returned, so a {@code while (hasMore())} loop
     * never receives a spurious {@code null} from {@link #get()}.
     *
     * @return {@code true} if a following {@link #get()} has a message to read
     */
    public boolean hasMore() {
        if (cursor < length) return true;
        // Nothing left to read: reclaim the queue if it was fully consumed.
        // emptyQueue() does nothing when the queue is already empty.
        emptyQueue();
        return false;
    }

    /**
     * Reads the next message and immediately confirms it as read.
     *
     * @return the next message, or {@code null} if the queue has no unread
     *         message or the message could not be read
     */
    public String get() {
        String result = safeGet();
        doneGet();
        return result;
    }

    /**
     * Reads again the message at the current cursor position. When the
     * cursor is 0 (nothing read since creation or since the last reset) the
     * value stored under {@code GOT} is returned instead.
     *
     * @return the stored message, or {@code null} if none is stored or the
     *         file could not be read
     */
    public String reget() {
        if (cursor == 0) {
            return readStringProp(LASTGOTKEY);
        }
        return readStringProp(KEY + Integer.toString(cursor));
    }

    /**
     * Reads the next message without advancing the cursor. Call
     * {@link #doneGet()} afterwards to confirm the read. If all messages
     * have already been read the queue counters are reset and {@code null}
     * is returned.
     *
     * @return the next message, or {@code null} if there is none or it could
     *         not be read
     */
    public String safeGet() {
        readOk = false;
        if (isEmpty()) return null;
        if (emptyQueue()) {
            log.fine("Finished all messages from queue");
            return null;
        }
        String value = readStringProp(KEY + Integer.toString(cursor + 1));
        readOk = true;
        return value;
    }

    /**
     * Confirms the message handed out by the last {@link #safeGet()} by
     * advancing the cursor. Does nothing if that call returned early
     * without attempting a read.
     */
    public void doneGet() {
        if (readOk) cursorPlus();
    }

    /**
     * Appends a message to the queue. On success the message is also
     * recorded under the {@code GOT} key.
     *
     * @param message the message to append; {@code null} is rejected
     * @return {@code true} if the message was written to the queue file
     */
    public boolean put(String message) {
        if (message == null) return false;
        if (insertProp(KEY + Integer.toString(length + 1), message)) {
            lengthPlus();
            insertProp(LASTGOTKEY, message);
            return true;
        }
        return false;
    }

    /**
     * Overwrites the most recently appended message.
     *
     * @param message the replacement message; {@code null} is rejected
     * @return {@code true} if the message was written; {@code false} if the
     *         queue is empty or the write failed
     */
    public boolean reput(String message) {
        if (message == null) return false;
        if (length <= 0) return false;
        return insertProp(KEY + Integer.toString(length), message);
    }

    /**
     * Starts the insertion of a group of messages that is rolled back on
     * the next initialization if {@link #end()} was never reached. The
     * current length is stored as the rollback point.
     *
     * <p>Note: when the queue length is 0 the stored marker is 0, the same
     * value {@link #end()} writes, so an interrupted group that was started
     * on an empty queue is not detected.
     *
     * @return {@code true} if the marker was written
     */
    public boolean begin() {
        return insertProp(LASTPUTKEY, length);
    }

    /**
     * Ends the insertion of a group of messages started with
     * {@link #begin()}, like committing a transaction, by resetting the
     * rollback marker to 0.
     *
     * @return {@code true} if the marker was written
     */
    public boolean end() {
        return insertProp(LASTPUTKEY, 0);
    }

    /**
     * Sets the comment line written at the top of the queue file.
     *
     * @param header the new header; {@code null} is ignored with a warning
     */
    public void setHeader(String header) {
        if (header == null) {
            log.warning("Cannot set header to null value");
        } else {
            this.header = header;
        }
    }

    /*---------------------------------------------*/
    /* Private Implementation                      */
    /*---------------------------------------------*/

    // Increments the length and persists it
    private boolean lengthPlus() {
        length++;
        return insertProp(LENGTHKEY, length);
    }

    // Increments the cursor and persists it
    private boolean cursorPlus() {
        cursor++;
        return insertProp(CURSORKEY, cursor);
    }

    private boolean isEmpty() {
        return length == 0;
    }

    // Resets the in-memory counters once the persisted ones were reset
    private boolean emptyQueue() {
        if (removeQueue()) {
            length = 0;
            cursor = 0;
            return true;
        }
        return false;
    }

    // Persists zeroed counters if the queue was fully consumed. The
    // message entries themselves stay in the file (lazy delete).
    private boolean removeQueue() {
        if (!shouldEmpty()) return false;
        return insertProp(LENGTHKEY, 0) && insertProp(CURSORKEY, 0);
    }

    private boolean shouldEmpty() {
        // do not clean an empty queue
        if (length == 0) return false;
        // ok to clean only if all messages are marked as read
        return cursor >= length;
    }

    // Reloads the queue file and returns the trimmed value of the key.
    // Will return null on failure or when the key is not present.
    private String readStringProp(String key) {
        if (key == null) return null;
        String value = null;
        FileInputStream infile = null;
        try {
            infile = new FileInputStream(filename);
            prop.load(infile);
            value = prop.getProperty(key);
        } catch (IOException e) {
            log.severe("IO Error reading queue properties from " +
                    filename + " " + e);
            return null;
        } catch (Exception e) {
            log.severe("Exception reading queue properties from " +
                    filename + " " + e);
            return null;
        } finally {
            // Always release the file handle, also when loading failed
            if (infile != null) {
                try {
                    infile.close();
                } catch (IOException ignored) {
                    // the data was already read; a failed close loses nothing
                }
            }
        }
        if (value != null) {
            value = value.trim();
        }
        return value;
    }

    // Will return -1 on failure, including a value that is not a number
    private int readIntProp(String key) {
        if (key == null) return -1;
        String value = readStringProp(key);
        if (value == null) return -1;
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            log.warning("Invalid number for " + key + " in " +
                    filename + " " + e);
            return -1;
        }
    }

    // Only allows non null keys and non negative numbers
    private boolean insertProp(String key, int value) {
        if (key == null || value < 0) return false;
        return insertProp(key, Integer.toString(value));
    }

    // Only allows non null keys and values. Sets the property in memory
    // and rewrites the whole queue file.
    private boolean insertProp(String key, String value) {
        if (key == null || value == null) return false;

        // The in-memory copy is updated even if the write below fails,
        // so the value goes out with the next successful write.
        prop.setProperty(key, value);
        if (!openProp()) return false;
        try {
            prop.store(outfile, header);
        } catch (IOException e) {
            log.severe("IO Error writing queue properties to " +
                    filename + " " + e);
            return false;
        } catch (Exception e) {
            log.severe("Exception writing queue properties to " +
                    filename + " " + e);
            return false;
        } finally {
            // Release the file handle on success and on failure alike
            closeProp();
        }
        return true;
    }

    // Opens the queue file for writing, truncating its current content
    private boolean openProp() {
        try {
            outfile = new FileOutputStream(filename);
        } catch (IOException e) {
            log.severe("IO Error writing queue properties to " +
                    filename + " " + e);
            return false;
        }
        return true;
    }

    // Closes the stream opened by openProp(), if any
    private boolean closeProp() {
        if (outfile == null) return true;
        try {
            outfile.close();
        } catch (IOException e) {
            log.warning("IO Error writing queue properties to " +
                    filename + " " + e);
            return false;
        } finally {
            outfile = null;
        }
        return true;
    }

    // Loads an existing queue file or creates a new one
    private boolean checkStatus() {
        File file = new File(filename);
        if (file.exists()) {
            return initQueue();
        }
        return createQueue();
    }

    // Writes the current cursor and length to the queue file
    private boolean createQueue() {
        return insertProp(CURSORKEY, cursor) && insertProp(LENGTHKEY, length);
    }

    private boolean initQueue() {
        fileStatus();
        initValues();
        checkSanity();
        checkRollback();
        return garbageCollect();
    }

    // Verifies the queue file is usable; terminates the JVM if it is not
    private void fileStatus() {
        boolean result = true;
        File file = new File(filename);
        if (file.exists()) {
            if (!file.canRead()) {
                log.severe("Cannot read queue file " + filename);
                result = false;
            }
            if (!file.canWrite()) {
                log.severe("Cannot write queue file " + filename);
                result = false;
            }
        } else {
            log.severe("Queue file " + filename + " does not exst");
            result = false;
        }
        if (!result) {
            log.severe("Please fix the above error. Exiting ..");
            System.exit(1);
        }
    }

    private void initValues() {
        length = readIntProp(LENGTHKEY);
        cursor = readIntProp(CURSORKEY);
    }

    // Missing or unreadable counters come back as -1; treat them as 0
    private void checkSanity() {
        if (length < 0) length = 0;
        if (cursor < 0) cursor = 0;
    }

    private void checkRollback() {
        int lastput = readIntProp(LASTPUTKEY);
        /*
         * See comments at begin() and end().
         * A non zero value for LASTPUTKEY indicates an incomplete
         * group insertion to the queue, and the value points to the
         * last confirmed insertion. So the queue length is rolled back
         * to that point for data integrity. Only the in-memory length
         * is changed here; nothing is written to the file.
         */
        if (lastput > 0) {
            log.fine("Queue rollbacked");
            length = lastput;
        }
    }

    // Replaces the queue file with a fresh one when the queue is empty,
    // dropping the message entries left behind by the lazy delete
    private boolean garbageCollect() {
        // Check for safe point first
        if (length > 0 || cursor > 0) return true;
        // The in-memory copy was filled from the file while reading the
        // counters; unless it is cleared the stale entries would be
        // written straight back. The GOT entry is kept because reget()
        // reads it while the cursor is 0.
        String lastGot = prop.getProperty(LASTGOTKEY);
        prop.clear();
        if (lastGot != null) {
            prop.setProperty(LASTGOTKEY, lastGot);
        }
        File file = new File(filename);
        file.delete(); //Delete existing; a leftover file is truncated on create
        if (!createQueue()) return false; //Create new
        log.fine("Queue garbage collected");
        return true;
    }

}


// syntax highlighted by Code2HTML, v. 0.9