| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.end2end; |
| |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; |
| import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.net.InetAddress; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.phoenix.compile.StatementContext; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDriver; |
| import org.apache.phoenix.jdbc.PhoenixResultSet; |
| import org.apache.phoenix.log.LogLevel; |
| import org.apache.phoenix.log.QueryStatus; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.util.EnvironmentEdge; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.QueryUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import com.google.common.collect.Maps; |
| |
| public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { |
| |
| |
| @BeforeClass |
| public static void doSetup() throws Exception { |
| Map<String, String> props = Maps.newHashMapWithExpectedSize(1); |
| // Enable request metric collection at the driver level |
| props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); |
| // disable renewing leases as this will force spooling to happen. |
| props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); |
| setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); |
| // need the non-test driver for some tests that check number of hconnections, etc. |
| DriverManager.registerDriver(PhoenixDriver.INSTANCE); |
| } |
| |
| private static class MyClock extends EnvironmentEdge { |
| public volatile long time; |
| |
| public MyClock (long time) { |
| this.time = time; |
| } |
| |
| @Override |
| public long currentTime() { |
| return time; |
| } |
| } |
| |
| |
| @Test |
| public void testDebugLogs() throws Exception { |
| String tableName = generateUniqueName(); |
| createTableAndInsertValues(tableName, true); |
| Properties props= new Properties(); |
| props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); |
| Connection conn = DriverManager.getConnection(getUrl(),props); |
| assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG); |
| String query = "SELECT * FROM " + tableName; |
| ResultSet rs = conn.createStatement().executeQuery(query); |
| StatementContext context = ((PhoenixResultSet)rs).getContext(); |
| String queryId = context.getQueryLogger().getQueryId(); |
| while (rs.next()) { |
| rs.getString(1); |
| rs.getString(2); |
| } |
| ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query); |
| |
| String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; |
| int delay = 5000; |
| |
| // sleep for sometime to let query log committed |
| Thread.sleep(delay); |
| rs = conn.createStatement().executeQuery(logQuery); |
| boolean foundQueryLog = false; |
| |
| while (rs.next()) { |
| if (rs.getString(QUERY_ID).equals(queryId)) { |
| foundQueryLog = true; |
| assertEquals(rs.getString(BIND_PARAMETERS), null); |
| assertEquals(rs.getString(USER), System.getProperty("user.name")); |
| assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); |
| assertEquals(rs.getString(EXPLAIN_PLAN), QueryUtil.getExplainPlan(explainRS)); |
| assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); |
| assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); |
| assertEquals(rs.getString(QUERY), query); |
| assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString()); |
| assertEquals(rs.getString(TENANT_ID), null); |
| assertTrue(rs.getString(SCAN_METRICS_JSON)==null); |
| assertEquals(rs.getString(EXCEPTION_TRACE),null); |
| }else{ |
| //confirm we are not logging system queries |
| assertFalse(rs.getString(QUERY).toString().contains(SYSTEM_CATALOG_SCHEMA)); |
| } |
| } |
| assertTrue(foundQueryLog); |
| conn.close(); |
| } |
| |
| @Test |
| public void testLogSampling() throws Exception { |
| String tableName = generateUniqueName(); |
| createTableAndInsertValues(tableName, true); |
| Properties props= new Properties(); |
| props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); |
| props.setProperty(QueryServices.LOG_SAMPLE_RATE, "0.5"); |
| Connection conn = DriverManager.getConnection(getUrl(),props); |
| assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG); |
| String query = "SELECT * FROM " + tableName; |
| int count=100; |
| for (int i = 0; i < count; i++) { |
| ResultSet rs = conn.createStatement().executeQuery(query); |
| while(rs.next()){ |
| |
| } |
| } |
| |
| String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; |
| ResultSet rs = conn.createStatement().executeQuery(logQuery); |
| int delay = 5000; |
| |
| // sleep for sometime to let query log committed |
| Thread.sleep(delay); |
| int logCount=0; |
| while (rs.next()) { |
| logCount++; |
| } |
| |
| //sampling rate is 0.5 , but with lesser count, uniformity of thread random may not be perfect, so taking 0.75 for comparison |
| assertTrue(logCount != 0 && logCount < count * 0.75); |
| conn.close(); |
| } |
| |
| @Test |
| public void testInfoLogs() throws Exception{ |
| String tableName = generateUniqueName(); |
| createTableAndInsertValues(tableName, true); |
| Properties props= new Properties(); |
| props.setProperty(QueryServices.LOG_LEVEL, LogLevel.INFO.name()); |
| Connection conn = DriverManager.getConnection(getUrl(),props); |
| assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.INFO); |
| String query = "SELECT * FROM " + tableName; |
| |
| ResultSet rs = conn.createStatement().executeQuery(query); |
| StatementContext context = ((PhoenixResultSet)rs).getContext(); |
| String queryId = context.getQueryLogger().getQueryId(); |
| while (rs.next()) { |
| rs.getString(1); |
| rs.getString(2); |
| } |
| |
| String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; |
| int delay = 5000; |
| |
| // sleep for sometime to let query log committed |
| Thread.sleep(delay); |
| rs = conn.createStatement().executeQuery(logQuery); |
| boolean foundQueryLog = false; |
| while (rs.next()) { |
| if (rs.getString(QUERY_ID).equals(queryId)) { |
| foundQueryLog = true; |
| assertEquals(rs.getString(USER), System.getProperty("user.name")); |
| assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); |
| assertEquals(rs.getString(EXPLAIN_PLAN), null); |
| assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null); |
| assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); |
| assertEquals(rs.getString(QUERY), query); |
| assertEquals(rs.getString(QUERY_STATUS),QueryStatus.COMPLETED.toString()); |
| assertEquals(rs.getString(TENANT_ID), null); |
| } |
| } |
| assertTrue(foundQueryLog); |
| conn.close(); |
| } |
| |
| @Test |
| public void testWithLoggingOFF() throws Exception{ |
| String tableName = generateUniqueName(); |
| createTableAndInsertValues(tableName, true); |
| Properties props= new Properties(); |
| props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name()); |
| Connection conn = DriverManager.getConnection(getUrl(),props); |
| assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.OFF); |
| String query = "SELECT * FROM " + tableName; |
| |
| ResultSet rs = conn.createStatement().executeQuery(query); |
| StatementContext context = ((PhoenixResultSet)rs).getContext(); |
| String queryId = context.getQueryLogger().getQueryId(); |
| while (rs.next()) { |
| rs.getString(1); |
| rs.getString(2); |
| } |
| |
| String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; |
| int delay = 5000; |
| |
| // sleep for sometime to let query log committed |
| Thread.sleep(delay); |
| rs = conn.createStatement().executeQuery(logQuery); |
| boolean foundQueryLog = false; |
| while (rs.next()) { |
| if (rs.getString(QUERY_ID).equals(queryId)) { |
| foundQueryLog = true; |
| } |
| } |
| assertFalse(foundQueryLog); |
| conn.close(); |
| } |
| |
| |
| @Test |
| public void testPreparedStatementWithTrace() throws Exception{ |
| testPreparedStatement(LogLevel.TRACE); |
| } |
| |
| @Test |
| public void testPreparedStatementWithDebug() throws Exception{ |
| testPreparedStatement(LogLevel.DEBUG); |
| } |
| |
| private void testPreparedStatement(LogLevel loglevel) throws Exception{ |
| String tableName = generateUniqueName(); |
| createTableAndInsertValues(tableName, true); |
| Properties props= new Properties(); |
| props.setProperty(QueryServices.LOG_LEVEL, loglevel.name()); |
| Connection conn = DriverManager.getConnection(getUrl(),props); |
| assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel); |
| final MyClock clock = new MyClock(100); |
| EnvironmentEdgeManager.injectEdge(clock); |
| try{ |
| String query = "SELECT * FROM " + tableName +" where V = ?"; |
| |
| PreparedStatement pstmt = conn.prepareStatement(query); |
| pstmt.setString(1, "value5"); |
| ResultSet rs = pstmt.executeQuery(); |
| StatementContext context = ((PhoenixResultSet)rs).getContext(); |
| String queryId = context.getQueryLogger().getQueryId(); |
| while (rs.next()) { |
| rs.getString(1); |
| rs.getString(2); |
| } |
| ResultSet explainRS = conn.createStatement() |
| .executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'"); |
| String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; |
| int delay = 5000; |
| |
| // sleep for sometime to let query log committed |
| Thread.sleep(delay); |
| rs = conn.createStatement().executeQuery(logQuery); |
| boolean foundQueryLog = false; |
| while (rs.next()) { |
| if (rs.getString(QUERY_ID).equals(queryId)) { |
| foundQueryLog = true; |
| assertEquals(rs.getString(BIND_PARAMETERS), loglevel == LogLevel.TRACE ? "value5" : null); |
| assertEquals(rs.getString(USER), System.getProperty("user.name")); |
| assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); |
| assertEquals(rs.getString(EXPLAIN_PLAN), QueryUtil.getExplainPlan(explainRS)); |
| assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); |
| assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1); |
| assertEquals(rs.getString(QUERY), query); |
| assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString()); |
| assertTrue(LogLevel.TRACE == loglevel ? rs.getString(SCAN_METRICS_JSON).contains("scanMetrics") |
| : rs.getString(SCAN_METRICS_JSON) == null); |
| assertEquals(rs.getTimestamp(START_TIME).getTime(),100); |
| assertEquals(rs.getString(TENANT_ID), null); |
| } |
| } |
| assertTrue(foundQueryLog); |
| conn.close(); |
| }finally{ |
| EnvironmentEdgeManager.injectEdge(null); |
| } |
| } |
| |
| |
| |
| @Test |
| public void testFailedQuery() throws Exception { |
| String tableName = generateUniqueName(); |
| Properties props = new Properties(); |
| props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); |
| Connection conn = DriverManager.getConnection(getUrl(), props); |
| assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(), LogLevel.DEBUG); |
| // Table does not exists |
| String query = "SELECT * FROM " + tableName; |
| |
| try { |
| conn.createStatement().executeQuery(query); |
| fail(); |
| } catch (SQLException e) { |
| assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()); |
| } |
| String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; |
| int delay = 5000; |
| |
| // sleep for sometime to let query log committed |
| Thread.sleep(delay); |
| ResultSet rs = conn.createStatement().executeQuery(logQuery); |
| boolean foundQueryLog = false; |
| while (rs.next()) { |
| if (QueryStatus.FAILED.name().equals(rs.getString(QUERY_STATUS))) { |
| foundQueryLog = true; |
| assertEquals(rs.getString(USER), System.getProperty("user.name")); |
| assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); |
| assertEquals(rs.getString(EXPLAIN_PLAN), null); |
| assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), null); |
| assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); |
| assertEquals(rs.getString(QUERY), query); |
| assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage())); |
| } |
| } |
| assertTrue(foundQueryLog); |
| conn.close(); |
| } |
| |
| private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate) |
| throws Exception { |
| String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; |
| Connection conn = DriverManager.getConnection(getUrl()); |
| conn.createStatement().execute(ddl); |
| // executing 10 upserts/mutations. |
| String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; |
| PreparedStatement stmt = conn.prepareStatement(dml); |
| for (int i = 1; i <= 10; i++) { |
| stmt.setString(1, "key" + i); |
| stmt.setString(2, "value" + i); |
| stmt.executeUpdate(); |
| } |
| conn.commit(); |
| } |
| |
| } |