blob: 132def727d39d12e1ef8f13803d7a5c55b194005 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.util.history;
import com.google.common.io.Files;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.util.TajoIdUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class TestHistoryWriterReader extends QueryTestCaseBase {
public static final String HISTORY_DIR = "/tmp/tajo-test-history";
TajoConf tajoConf;
@Before
public void setUp() throws Exception {
tajoConf = new TajoConf(testingCluster.getConfiguration());
tajoConf.setVar(ConfVars.HISTORY_QUERY_DIR, HISTORY_DIR);
}
@After
public void tearDown() throws Exception {
Path path = TajoConf.getQueryHistoryDir(tajoConf);
FileSystem fs = path.getFileSystem(tajoConf);
fs.delete(path, true);
}
@Test
public void testQueryInfoReadAndWrite() throws Exception {
HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
try {
writer.init(tajoConf);
writer.start();
long startTime = System.currentTimeMillis();
QueryInfo queryInfo1 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 1));
queryInfo1.setStartTime(startTime);
queryInfo1.setProgress(1.0f);
queryInfo1.setQueryState(QueryState.QUERY_SUCCEEDED);
writer.appendHistory(queryInfo1);
QueryInfo queryInfo2 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 2));
queryInfo2.setStartTime(startTime);
queryInfo2.setProgress(0.5f);
queryInfo2.setQueryState(QueryState.QUERY_FAILED);
writer.appendAndSync(queryInfo2);
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
FileSystem fs = path.getFileSystem(tajoConf);
Path parentPath = new Path(path, df.format(startTime) + "/query-list");
FileStatus[] histFiles = fs.listStatus(parentPath);
assertNotNull(histFiles);
assertEquals(1, histFiles.length);
assertTrue(histFiles[0].isFile());
assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, 2);
assertNotNull(queryInfos);
assertEquals(2, queryInfos.size());
QueryInfo foundQueryInfo = queryInfos.get(0);
assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
foundQueryInfo = reader.getQueryByQueryId(queryInfo2.getQueryId());
assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);
foundQueryInfo = queryInfos.get(1);
assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
foundQueryInfo = reader.getQueryByQueryId(queryInfo1.getQueryId());
assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
} finally {
writer.stop();
}
}
@Test
public void testQueryInfoPagination() throws Exception {
HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
try {
writer.init(tajoConf);
writer.start();
long startTime = System.currentTimeMillis();
int testSize = 10;
QueryInfo queryInfo;
for (int i = 1; i < testSize + 1; i++) {
queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i));
queryInfo.setStartTime(startTime);
queryInfo.setProgress(1.0f);
queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED);
if (testSize == i) {
writer.appendAndSync(queryInfo);
} else {
writer.appendHistory(queryInfo);
}
}
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
FileSystem fs = path.getFileSystem(tajoConf);
Path parentPath = new Path(path, df.format(startTime) + "/query-list");
FileStatus[] histFiles = fs.listStatus(parentPath);
assertNotNull(histFiles);
assertEquals(1, histFiles.length);
assertTrue(histFiles[0].isFile());
assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize);
assertNotNull(queryInfos);
assertEquals(testSize, queryInfos.size());
// the pagination api returns a descending ordered list
for (int i = 0; i < testSize; i++) {
assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq());
}
int pages = 5;
int pageSize = testSize / pages;
int expectIdSequence = testSize;
//min startIndex of page is 1
for (int i = 1; i < pages + 1; i++) {
queryInfos = reader.getQueriesInHistory(i, pageSize);
assertNotNull(queryInfos);
assertEquals(pageSize, queryInfos.size());
for (QueryInfo qInfo : queryInfos) {
assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq());
}
}
} finally {
writer.stop();
}
}
@Test
public void testQueryHistoryReadAndWrite() throws Exception {
HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
writer.init(tajoConf);
writer.start();
try {
long startTime = System.currentTimeMillis();
QueryHistory queryHistory = new QueryHistory();
QueryId queryId = QueryIdFactory.newQueryId(startTime, 1);
queryHistory.setQueryId(queryId.toString());
queryHistory.setLogicalPlan("LogicalPlan");
List<StageHistory> stages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId, i);
StageHistory stageHistory = new StageHistory();
stageHistory.setExecutionBlockId(ebId.toString());
stageHistory.setStartTime(startTime + i);
List<TaskHistory> taskHistories = new ArrayList<>();
for (int j = 0; j < 5; j++) {
TaskHistory taskHistory = new TaskHistory();
taskHistory.setId(QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId), 1).toString());
taskHistories.add(taskHistory);
}
stageHistory.setTasks(taskHistories);
stages.add(stageHistory);
}
queryHistory.setStageHistories(stages);
writer.appendAndSync(queryHistory);
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
FileSystem fs = path.getFileSystem(tajoConf);
assertTrue(fs.exists(new Path(path,
df.format(startTime) + "/query-detail/" + queryId.toString() + "/query.hist")));
for (int i = 0; i < 3; i++) {
String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
assertTrue(fs.exists(new Path(path,
df.format(startTime) + "/query-detail/" + queryId.toString() + "/" + ebId + ".hist")));
}
HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
QueryHistory foundQueryHistory = reader.getQueryHistory(queryId.toString());
assertNotNull(foundQueryHistory);
assertEquals(queryId.toString(), foundQueryHistory.getQueryId());
assertEquals(3, foundQueryHistory.getStageHistories().size());
for (int i = 0; i < 3; i++) {
String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
StageHistory stageHistory = foundQueryHistory.getStageHistories().get(i);
assertEquals(ebId, stageHistory.getExecutionBlockId());
assertEquals(startTime + i, stageHistory.getStartTime());
// TaskHistory is stored in the other file.
assertNull(stageHistory.getTasks());
List<TaskHistory> tasks = reader.getTaskHistory(queryId.toString(), ebId);
assertNotNull(tasks);
assertEquals(5, tasks.size());
for (int j = 0; j < 5; j++) {
TaskHistory taskHistory = tasks.get(j);
assertEquals(stages.get(i).getTasks().get(j).getId(), taskHistory.getId());
}
}
} finally {
writer.stop();
}
}
@Test
public void testTaskHistoryReadAndWrite() throws Exception {
TajoConf tajoConf = new TajoConf();
File historyParentDir = Files.createTempDir();
historyParentDir.deleteOnExit();
tajoConf.setVar(ConfVars.HISTORY_TASK_DIR, "file://" + historyParentDir.getCanonicalPath());
HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", false);
writer.init(tajoConf);
writer.start();
try {
// Write TaskHistory
TableStatsProto tableStats = TableStatsProto.newBuilder()
.setNumRows(10)
.setNumBytes(100)
.build();
long startTime = System.currentTimeMillis() - 2000;
TaskAttemptId id1 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000001_00");
org.apache.tajo.worker.TaskHistory taskHistory1 = new org.apache.tajo.worker.TaskHistory(
id1, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis(), tableStats);
writer.appendHistory(taskHistory1);
TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
writer.appendHistory(taskHistory2);
HistoryWriter.WriterFuture future = writer.flushTaskHistories();
future.get(10, TimeUnit.SECONDS);
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
String startDate = df.format(new Date(startTime));
Path taskParentPath = new Path(tajoConf.getVar(ConfVars.HISTORY_TASK_DIR),
startDate.substring(0, 8) + "/tasks/127.0.0.1_28090");
FileSystem fs = taskParentPath.getFileSystem(tajoConf);
assertTrue(fs.exists(taskParentPath));
HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
org.apache.tajo.worker.TaskHistory foundTaskHistory = reader.getTaskHistory(id1.toString(), startTime);
assertNotNull(foundTaskHistory);
assertEquals(id1, foundTaskHistory.getTaskAttemptId());
assertEquals(taskHistory1, foundTaskHistory);
foundTaskHistory = reader.getTaskHistory(id2.toString(), startTime);
assertNotNull(foundTaskHistory);
assertEquals(id2, foundTaskHistory.getTaskAttemptId());
assertEquals(taskHistory2, foundTaskHistory);
foundTaskHistory = reader.getTaskHistory("ta_1412326813565_0001_000001_000003_00", startTime);
assertNull(foundTaskHistory);
} finally {
writer.stop();
}
}
}