/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.log4j.db;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Hashtable;
import java.util.Properties;
import java.util.StringTokenizer;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.plugins.Pauseable;
import org.apache.log4j.plugins.Receiver;
import org.apache.log4j.scheduler.Job;
import org.apache.log4j.scheduler.Scheduler;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggerRepositoryEx;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.log4j.xml.UnrecognizedElementHandler;
import org.w3c.dom.Element;

/**
 * Converts log data stored in a database into LoggingEvents.
 * <p>
 * <b>NOTE:</b> This receiver cannot yet be created through Chainsaw's receiver panel.  
 * It must be created through an XML configuration file.
 * <p>
 * This receiver supports database configuration via ConnectionSource, in the
 * org.apache.log4j.db package: DriverManagerConnectionSource,
 * DataSourceConnectionSource, JNDIConnectionSource
 * <p>
 * This database receiver differs from DBReceiver in that this receiver relies
 * on custom SQL to retrieve logging event data, where DBReceiver requires the
 * use of a log4j-defined schema.
 * <p>
 * A 'refreshMillis' int parameter controls SQL execution. If 'refreshMillis' is
 * zero (the default), the receiver will run only one time. If it is set to any
 * other numeric value, the SQL will be executed on a recurring basis every
 * 'refreshMillis' milliseconds.
 * <p>
 * The receiver closes the connection and acquires a new connection on each 
 * execution of the SQL (use pooled connections if possible).
 * <p>
 * If the SQL will be executing on a recurring basis, specify the IDField param -
 * the column name holding the unique identifier (int) representing the logging
 * event.
 * <p>
 * As events are retrieved, the column represented by IDField is examined and
 * the largest value is held and used by the next execution of the SQL statement
 * to avoid retrieving previously processed events.
 * <p>
 * As an example, the IDField references a 'COUNTER' (int, auto-increment,
 * unique) column. The first execution of the SQL statement returns 500 rows,
 * with a final value in the COUNTER field of 500.
 * <p>
 * The SQL statement is manipulated prior to the next execution, adding ' WHERE
 * COUNTER &gt; 500' to the statement to avoid retrieval of previously processed
 * events.
 * <p>
 * The select statement must provide ALL fields which define a LoggingEvent.
 * <p>
 * The SQL statement MUST include the columns: LOGGER, TIMESTAMP, LEVEL, THREAD,
 * MESSAGE, NDC, MDC, CLASS, METHOD, FILE, LINE, PROPERTIES, THROWABLE
 * <p>
 * Use ' AS ' in the SQL statement to alias the SQL's column names to match your
 * database schema. (see example below).
 * <p>
 * Include all fields in the SQL statement, even if you don't have data for the
 * field (specify an empty string as the value for columns which you don't have
 * data).
 * <p>
 * The TIMESTAMP column must be a datetime.
 * <p>
 * Both a PROPERTIES column and an MDC column are supported. These fields
 * represent Maps on the logging event, but require the use of string
 * concatenation database functions to hold the (possibly multiple) name/value
 * pairs in the column.
 * <p>
 * For example, to include both 'userid' and 'lastname' properties in the
 * logging event (from either the PROPERTIES or MDC columns), the name/value
 * pairs must be concatenated together by your database.
 * <p>
 * The resulting PROPERTIES or MDC column must have data in this format: {{name,
 * value, name2, value2}}
 * <p>
 * The resulting PROPERTIES column would contain this text: {{userid, someone,
 * lastname, mylastname}}
 * <p>
 * Here is an example of concatenating a PROPERTIES or MDC column using MySQL's
 * concat function, where the 'application' and 'hostname' parameters were fixed
 * text, but the 'log4jid' key's value is the value of the COUNTER column:
 * <p>
 * concat("{{application,databaselogs,hostname,mymachine,log4jid,", COUNTER,
 * "}}") as PROPERTIES
 * <p>
 * log4jid is a special property that is used by Chainsaw to represent an 'ID'
 * field. Specify this property to ensure you can map events in Chainsaw to
 * events in the database if you need to go back and view events at a later time
 * or save the events to XML for later analysis.
 * <p>
 * Here is a complete MySQL SQL statement which can be used to provide events to
 * Chainsaw (note how in the example below, there is no column in logtable representing the throwable, so an
 * empty string is passed in and an ALIAS is still defined):
 * <p>
 * select myloggercolumn as LOGGER, mytimestampcolumn as TIMESTAMP, mylevelcolumn as LEVEL, mythreadcolumn as
 * THREAD, mymessagecolumn as MESSAGE, myndccolumn as NDC, mymdccolumn as MDC, myclasscolumn as CLASS, mymethodcolumn as
 * METHOD, myfilecolumn as FILE, mylinecolumn as LINE,
 * concat("{{application,databaselogs,hostname,mymachine, log4jid,",
 * COUNTER,"}}") as PROPERTIES, "" as THROWABLE from logtable
 * <p>
 * @author Scott Deboy &lt;sdeboy@apache.org&gt;
 * <p>
 */
public class CustomSQLDBReceiver extends Receiver implements Pauseable, UnrecognizedElementHandler {

    protected volatile Connection connection = null;

    protected String sqlStatement = "";

    /**
     * By default we refresh data every 1000 milliseconds.
     * 
     * @see #setRefreshMillis
     */
    static int DEFAULT_REFRESH_MILLIS = 1000;

    int refreshMillis = DEFAULT_REFRESH_MILLIS;

    protected String idField = null;

    int lastID = -1;

    private static final String WHERE_CLAUSE = " WHERE ";

    private static final String AND_CLAUSE = " AND ";

    private boolean whereExists = false;

    private boolean paused = false;

    private ConnectionSource connectionSource;

    public static final String LOG4J_ID_KEY = "log4jid";

    private Job customReceiverJob;

    public void activateOptions() {
      
      if(connectionSource == null)  {
        throw new IllegalStateException(
          "CustomSQLDBReceiver cannot function without a connection source");
      }
      whereExists = (sqlStatement.toUpperCase().contains(WHERE_CLAUSE));
    
      customReceiverJob = new CustomReceiverJob();
        
      if(this.repository == null) {
        throw new IllegalStateException(
        "CustomSQLDBReceiver cannot function without a reference to its owning repository");
      }
     
    

      if (repository instanceof LoggerRepositoryEx) {
        Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
      
        scheduler.schedule(
          customReceiverJob, System.currentTimeMillis() + 500, refreshMillis);
      }

    }

    void closeConnection() {
        if (connection != null) {
            try {
                // LogLog.warn("closing the connection. ", new Exception("x"));
                connection.close();
            } catch (SQLException sqle) {
                // nothing we can do here
            }
        }
    }

    public void setRefreshMillis(int refreshMillis) {
        this.refreshMillis = refreshMillis;
    }

    public int getRefreshMillis() {
        return refreshMillis;
    }

    /**
     * @return Returns the connectionSource.
     */
    public ConnectionSource getConnectionSource() {
        return connectionSource;
    }

    /**
     * @param connectionSource
     *            The connectionSource to set.
     */
    public void setConnectionSource(ConnectionSource connectionSource) {
        this.connectionSource = connectionSource;
    }

    public void close() {
        try {
            if ((connection != null) && !connection.isClosed()) {
                connection.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            connection = null;
        }
    }

    public void finalize() throws Throwable {
        super.finalize();
        close();
    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.log4j.plugins.Plugin#shutdown()
     */
    public void shutdown() {
        getLogger().info("removing receiverJob from the Scheduler.");

        if(this.repository instanceof LoggerRepositoryEx) {
          Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
          scheduler.delete(customReceiverJob);
        }

        lastID = -1;
    }

    public void setSql(String s) {
        sqlStatement = s;
    }

    public String getSql() {
        return sqlStatement;
    }

    public void setIDField(String id) {
        idField = id;
    }

    public String getIDField() {
        return idField;
    }

    public synchronized void setPaused(boolean p) {
        paused = p;
    }

    public synchronized boolean isPaused() {
        return paused;
    }

    class CustomReceiverJob implements Job {
        public void execute() {
            int oldLastID = lastID;
            try {
                connection = connectionSource.getConnection();
                Statement statement = connection.createStatement();

                Logger eventLogger;
                long timeStamp;
                String level;
                String threadName;
                Object message;
                String ndc;
                Hashtable<String, String> mdc;
                String[] throwable;
                String className;
                String methodName;
                String fileName;
                String lineNumber;
                Hashtable<String, String> properties;

                String currentSQLStatement;
                if (whereExists) {
                    currentSQLStatement = sqlStatement + AND_CLAUSE + idField
                            + " > " + lastID;
                } else {
                    currentSQLStatement = sqlStatement + WHERE_CLAUSE + idField
                            + " > " + lastID;
                }

                ResultSet rs = statement.executeQuery(currentSQLStatement);

                int i = 0;
                while (rs.next()) {
                    // add a small break every 1000 received events
                    if (++i == 1000) {
                        synchronized (this) {
                            try {
                                // add a delay
                                wait(300);
                            } catch (InterruptedException ie) {
                            }
                            i = 0;
                        }
                    }
                    eventLogger = Logger.getLogger(rs.getString("LOGGER"));
                    timeStamp = rs.getTimestamp("TIMESTAMP").getTime();

                    level = rs.getString("LEVEL");
                    threadName = rs.getString("THREAD");
                    message = rs.getString("MESSAGE");
                    ndc = rs.getString("NDC");

                    String mdcString = rs.getString("MDC");
                    mdc = new Hashtable<>();

                    if (mdcString != null) {
                        // support MDC being wrapped in {{name, value}}
                        // or
                        // just name, value
                        if ((mdcString.contains("{{"))
                                && (mdcString.contains("}}"))) {
                            mdcString = mdcString
                                    .substring(mdcString.indexOf("{{") + 2,
                                            mdcString.indexOf("}}"));
                        }

                        StringTokenizer tok = new StringTokenizer(mdcString,
                                ",");

                        while (tok.countTokens() > 1) {
                            mdc.put(tok.nextToken(), tok.nextToken());
                        }
                    }

                    throwable = new String[] { rs.getString("THROWABLE") };
                    className = rs.getString("CLASS");
                    methodName = rs.getString("METHOD");
                    fileName = rs.getString("FILE");
                    lineNumber = rs.getString("LINE");

                    // if properties are provided in the
                    // SQL they can be used here (for example, to route
                    // events to a unique tab in
                    // Chainsaw if the machinename and/or appname
                    // property
                    // are set)
                    String propertiesString = rs.getString("PROPERTIES");
                    properties = new Hashtable<>();

                    if (propertiesString != null) {
                        // support properties being wrapped in {{name,
                        // value}} or just name, value
                        if ((propertiesString.contains("{{"))
                                && (propertiesString.contains("}}"))) {
                            propertiesString = propertiesString.substring(
                                    propertiesString.indexOf("{{") + 2,
                                    propertiesString.indexOf("}}"));
                        }

                        StringTokenizer tok2 = new StringTokenizer(
                                propertiesString, ",");
                        while (tok2.countTokens() > 1) {
                            String tokenName = tok2.nextToken();
                            String value = tok2.nextToken();
                            if (tokenName.equals(LOG4J_ID_KEY)) {
                                try {
                                    int thisInt = Integer.parseInt(value);
                                    value = String.valueOf(thisInt);
                                    if (thisInt > lastID) {
                                        lastID = thisInt;
                                    }
                                } catch (Exception e) {
                                }
                            }
                            properties.put(tokenName, value);
                        }
                    }

                    Level levelImpl = Level.toLevel(level);


					LocationInfo locationInfo = new LocationInfo(fileName,
	                            className, methodName, lineNumber);
	
					ThrowableInformation throwableInfo =  new ThrowableInformation(
		                            throwable);
	
					properties.putAll(mdc);
		
				    LoggingEvent event = new LoggingEvent(eventLogger.getName(),
				            eventLogger, timeStamp, levelImpl, message,
				            threadName,
				            throwableInfo,
				            ndc,
				            locationInfo,
				            properties);

                    doPost(event);
                }
                //log when rows are retrieved
                if (lastID != oldLastID) {
                    getLogger().debug("lastID: " + lastID);
                }

                statement.close();
            } catch (SQLException sqle) {
                getLogger()
                        .error("*************Problem receiving events", sqle);
            } finally {
                closeConnection();
            }

            // if paused, loop prior to executing sql query
            synchronized (this) {
                while (isPaused()) {
                    try {
                        wait(1000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
  public boolean parseUnrecognizedElement(Element element, Properties props) throws Exception {
        if ("connectionSource".equals(element.getNodeName())) {
            Object instance =
                    DOMConfigurator.parseElement(element, props, ConnectionSource.class);
            if (instance instanceof ConnectionSource) {
               ConnectionSource source = (ConnectionSource) instance;
               source.activateOptions();
               setConnectionSource(source);
            }
            return true;
        }
        return false;
  }
    
}
