blob: 6a7c0b6075aa6fbe21bc34f1d4ab0b147e7187f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.log;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
/**
 * Writes RingBuffer log events into the table named by
 * {@code SYSTEM_CATALOG_SCHEMA} and {@code SYSTEM_LOG_TABLE}.
 *
 * <p>The JDBC connection and the UPSERT statement are created lazily on the
 * first call to {@link #write(RingBufferEvent)}. Lazy initialisation is guarded
 * by the instance lock; binding parameters and executing the statement are not
 * synchronized.
 */
public class TableLogWriter implements LogWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(TableLogWriter.class);

    // Written last during lazy initialisation; volatile so that a thread which
    // observes a non-null connection also observes upsertStatement.
    private volatile Connection connection;
    // Set by close() and read by write().
    private volatile boolean isClosed;
    private PreparedStatement upsertStatement;
    private final Configuration config;
    // Maps each metric that has a column in the UPSERT to its 1-based parameter index.
    private final Map<MetricType, Integer> metricOrdinals = new HashMap<MetricType, Integer>();

    /**
     * @param configuration configuration handed to
     *        {@code QueryUtil.getConnectionForQueryLog} when the connection is opened
     */
    public TableLogWriter(Configuration configuration) {
        this.config = configuration;
    }

    /**
     * Builds the UPSERT statement: one column per {@link QueryLogInfo} value,
     * followed by one column per {@link MetricType} whose log level is not
     * {@code OFF}. The parameter index of every metric column is recorded in
     * {@link #metricOrdinals}.
     *
     * @param conn connection used to prepare the statement
     * @return the prepared UPSERT statement
     * @throws SQLException if the statement cannot be prepared
     */
    private PreparedStatement buildUpsertStatement(Connection conn) throws SQLException {
        StringBuilder columns = new StringBuilder(
                "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(");
        StringBuilder placeholders = new StringBuilder();
        int paramIndex = 0;
        for (QueryLogInfo info : QueryLogInfo.values()) {
            columns.append(info.columnName).append(',');
            placeholders.append("?,");
            paramIndex++;
        }
        for (MetricType metric : MetricType.values()) {
            if (metric.logLevel() != LogLevel.OFF) {
                metricOrdinals.put(metric, ++paramIndex);
                columns.append(metric.columnName()).append(',');
                placeholders.append("?,");
            }
        }
        // Drop the trailing comma from each list before joining them.
        columns.setLength(columns.length() - 1);
        columns.append(") VALUES (").append(placeholders);
        columns.setLength(columns.length() - 1);
        columns.append(")");
        return conn.prepareStatement(columns.toString());
    }

    /**
     * Opens the connection and prepares the UPSERT statement if that has not
     * happened yet. If preparing the statement fails, the freshly opened
     * connection is closed and nothing is stored, so a later call retries.
     */
    private void ensureConnection() throws SQLException, IOException, ClassNotFoundException {
        if (connection != null) {
            return;
        }
        synchronized (this) {
            if (connection != null) {
                return;
            }
            Connection conn = QueryUtil.getConnectionForQueryLog(this.config);
            boolean prepared = false;
            try {
                this.upsertStatement = buildUpsertStatement(conn);
                prepared = true;
            } finally {
                if (!prepared) {
                    closeQuietly(conn);
                }
            }
            // Publish the connection only after the statement is in place.
            this.connection = conn;
        }
    }

    /** Closes the connection, logging (rather than propagating) any failure. */
    private static void closeQuietly(Connection conn) {
        try {
            conn.close();
        } catch (SQLException e) {
            LOGGER.warn("Unable to close query log connection", e);
        }
    }

    /**
     * Upserts the event and commits. When the event carries per-table read
     * metrics, one row is upserted per table; otherwise a single row is upserted.
     * If the writer is already closed a warning is logged and nothing is written.
     *
     * @param event the log event to persist
     * @throws SQLException if binding, executing or committing fails
     */
    @Override
    public void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException {
        if (isClosed()) {
            LOGGER.warn("Unable to commit query log as Log committer is already closed");
            return;
        }
        ensureConnection();
        PreparedStatement statement = this.upsertStatement;

        // Query info columns: bind the value when present and permitted by the
        // connection's log level, otherwise bind null.
        ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo();
        for (QueryLogInfo info : QueryLogInfo.values()) {
            boolean loggable = queryInfoMap.containsKey(info)
                    && info.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal();
            statement.setObject(info.ordinal() + 1, loggable ? queryInfoMap.get(info) : null);
        }

        // Overall metric columns.
        Map<MetricType, Long> overAllMetrics = event.getOverAllMetrics();
        for (MetricType metric : MetricType.values()) {
            Integer paramIndex = metricOrdinals.get(metric);
            if (paramIndex == null) {
                // Metric has no column in the statement (log level OFF).
                continue;
            }
            boolean loggable = overAllMetrics != null && overAllMetrics.containsKey(metric)
                    && metric.isLoggingEnabled(event.getConnectionLogLevel());
            statement.setObject(paramIndex, loggable ? overAllMetrics.get(metric) : null);
        }

        Map<String, Map<MetricType, Long>> readMetrics = event.getReadMetrics();
        if (readMetrics != null && !readMetrics.isEmpty()) {
            // NOTE(review): a metric bound for one table is not reset before the
            // next table's row, so a table lacking that metric inherits the
            // previous value - confirm whether that is intended.
            for (Map.Entry<String, Map<MetricType, Long>> tableEntry : readMetrics.entrySet()) {
                statement.setObject(QueryLogInfo.TABLE_NAME_I.ordinal() + 1, tableEntry.getKey());
                for (Map.Entry<MetricType, Long> metricEntry : tableEntry.getValue().entrySet()) {
                    MetricType metric = metricEntry.getKey();
                    Integer paramIndex = metricOrdinals.get(metric);
                    if (paramIndex != null && metric.isLoggingEnabled(event.getConnectionLogLevel())) {
                        statement.setObject(paramIndex, metricEntry.getValue());
                    }
                }
                statement.executeUpdate();
            }
        } else {
            statement.executeUpdate();
        }
        connection.commit();
    }

    /**
     * Marks the writer closed and closes the connection if one was opened.
     * A failure to close the connection is logged and not rethrown.
     */
    @Override
    public void close() throws IOException {
        if (isClosed()) {
            return;
        }
        isClosed = true;
        Connection conn = connection;
        if (conn != null) {
            // It should internally close all the statements
            closeQuietly(conn);
        }
    }

    /** @return {@code true} once {@link #close()} has been called */
    public boolean isClosed() {
        return isClosed;
    }
}