/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.util.history;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.worker.TaskHistory;
import java.io.Closeable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* History directory structure
* tajo.query-history.path: hdfs
* tajo.task-history.path: local or hdfs
*
* <tajo.query-history.path>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist (TajoMaster's query list, hourly rolling)
* <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist (QueryMaster's query detail)
* <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/<EB_ID>.hist (QueryMaster's stage detail)
* <tajo.task-history.path>/<yyyyMMdd>/tasks/<WORKER_HOST>_<WORKER_PORT>/<WORKER_HOST>_<WORKER_PORT>_<HH>_<seq>.hist
* History files are kept for "tajo.history.expiry-time-day" (default value is 7 days)
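*
* Example paths under hypothetical base directories /tajo/history and /tajo/task-history
* (dates, query id and worker address are made up for illustration):
* /tajo/history/20141005/query-list/query-list-103000.hist
* /tajo/history/20141005/query-detail/q_1412483083972_0005/query.hist
* /tajo/history/20141005/query-detail/q_1412483083972_0005/eb_1412483083972_0005_000001.hist
* /tajo/task-history/20141005/tasks/worker01_28091/worker01_28091_10_0.hist
*
* A minimal usage sketch (hypothetical caller; tajoConf and taskHistory are assumed to exist):
* <pre>
*   HistoryWriter writer = new HistoryWriter("worker01:28091", false);
*   writer.init(tajoConf);              // TajoConf with the history directories configured
*   writer.start();                     // starts the writer thread and the history cleaner
*   writer.appendHistory(taskHistory);  // asynchronous append; returns a WriterFuture
*   writer.stop();                      // closes open history files
* </pre>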
*/
public class HistoryWriter extends AbstractService {
private static final Log LOG = LogFactory.getLog(HistoryWriter.class);
public static final String HISTORY_QUERY_REPLICATION = "tajo.history.query.replication";
public static final String HISTORY_TASK_REPLICATION = "tajo.history.task.replication";
public static final String QUERY_LIST = "query-list";
public static final String QUERY_DETAIL = "query-detail";
public static final String HISTORY_FILE_POSTFIX = ".hist";
private final LinkedBlockingQueue<WriterFuture<WriterHolder>>
historyQueue = new LinkedBlockingQueue<>();
// key: yyyyMMddHH
private Map<String, WriterHolder> taskWriters = new HashMap<>();
// For TajoMaster's query list
private WriterHolder querySummaryWriter = null;
private WriterThread writerThread;
private AtomicBoolean stopped = new AtomicBoolean(false);
private Path historyParentPath;
private Path taskHistoryParentPath;
private String processName;
private TajoConf tajoConf;
private HistoryCleaner historyCleaner;
private boolean isMaster;
private short queryReplication;
private short taskReplication;
public HistoryWriter(String processName, boolean isMaster) {
super(HistoryWriter.class.getName() + ":" + processName);
this.processName = processName.replaceAll(":", "_").toLowerCase();
this.isMaster = isMaster;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
if (!(conf instanceof TajoConf)) {
throw new IllegalArgumentException("conf should be a TajoConf type.");
}
tajoConf = (TajoConf) conf;
historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
writerThread = new WriterThread();
historyCleaner = new HistoryCleaner(tajoConf, isMaster);
queryReplication = (short) tajoConf.getInt(HISTORY_QUERY_REPLICATION,
FileSystem.get(tajoConf).getDefaultReplication(historyParentPath));
taskReplication = (short) tajoConf.getInt(HISTORY_TASK_REPLICATION,
FileSystem.get(tajoConf).getDefaultReplication(taskHistoryParentPath));
super.serviceInit(conf);
}
@Override
public void serviceStop() throws Exception {
if(stopped.getAndSet(true)){
return;
}
for (WriterHolder eachWriter : taskWriters.values()) {
IOUtils.cleanup(LOG, eachWriter);
}
taskWriters.clear();
writerThread.interrupt();
IOUtils.cleanup(LOG, querySummaryWriter);
if (historyCleaner != null) {
historyCleaner.doStop();
}
super.serviceStop();
}
@Override
public void serviceStart() throws Exception {
writerThread.start();
historyCleaner.start();
}
/* asynchronously append to history file */
public WriterFuture<WriterHolder> appendHistory(History history) {
WriterFuture<WriterHolder> future = new WriterFuture<>(history);
historyQueue.add(future);
return future;
}
public void appendHistory(WriterFuture<WriterHolder> future) {
historyQueue.add(future);
synchronized (writerThread) {
writerThread.notifyAll();
}
}
/* asynchronously flush to history file */
public WriterFuture<WriterHolder> appendAndFlush(History history) {
WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) {
public void done(WriterHolder holder) {
try {
if (holder != null) holder.flush();
super.done(holder);
} catch (IOException e) {
super.failed(e);
}
}
};
historyQueue.add(future);
synchronized (writerThread) {
writerThread.notifyAll();
}
return future;
}
/* synchronously flush to history file */
public void appendAndSync(History history)
throws TimeoutException, InterruptedException, IOException {
WriterFuture<WriterHolder> future = appendAndFlush(history);
future.get(5, TimeUnit.SECONDS);
if(!future.isSucceed()){
throw new IOException(future.getError());
}
}
/**
* flush all task histories
*/
public WriterFuture<WriterHolder> flushTaskHistories() {
WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(null) {
public void done(WriterHolder holder) {
for(WriterHolder writerHolder : taskWriters.values()) {
try {
writerHolder.flush();
} catch (IOException e) {
super.failed(e);
}
}
super.done(null);
}
};
historyQueue.add(future);
synchronized (writerThread) {
writerThread.notifyAll();
}
return future;
}
public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
// https://issues.apache.org/jira/browse/HADOOP-7844
// If the FileSystem is a local ChecksumFileSystem, flushing does not update the file length,
// so HistoryReader cannot read the data until the file is closed. Disable checksums for local paths.
FileSystem fs = path.getFileSystem(conf);
if ("file".equals(path.toUri().getScheme())) { // null-safe: a relative path has no URI scheme
fs.setWriteChecksum(false);
}
return fs;
}
public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId, long startTime) {
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
Path datePath = new Path(historyParentPath, df.format(startTime) + "/" + QUERY_DETAIL);
return new Path(datePath, queryId + "/query" + HISTORY_FILE_POSTFIX);
}
public static Path getQueryHistoryFilePath(Path historyParentPath, String queryId) {
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
Path datePath;
try {
String[] tokens = queryId.split("_");
//q_1412483083972_0005 = q_<timestamp>_<seq>
if (tokens.length == 3) {
// tokens[1] is the epoch-millis timestamp embedded in the query id
datePath = new Path(historyParentPath, df.format(new Date(Long.parseLong(tokens[1]))) + "/" + QUERY_DETAIL);
} else {
datePath = new Path(historyParentPath, df.format(new Date(System.currentTimeMillis())) + "/" + QUERY_DETAIL);
}
} catch (Exception e) {
datePath = new Path(historyParentPath, df.format(new Date(System.currentTimeMillis())) + "/" + QUERY_DETAIL);
}
return new Path(datePath, queryId + "/query" + HISTORY_FILE_POSTFIX);
}
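/**
* Single consumer thread: drains queued history records in batches, writes them to the
* appropriate history files, completes their futures, and closes task history writers
* whose hour is more than two hours old (task history files are rolled hourly).
*/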
class WriterThread extends Thread {
public void run() {
LOG.info("HistoryWriter_" + processName + " started.");
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
while (!stopped.get() && !Thread.interrupted()) {
List<WriterFuture<WriterHolder>> histories = Lists.newArrayList();
try {
drainHistory(histories, 100, 1000);
} catch (InterruptedException e) {
if (stopped.get()) break;
}
try {
if (!stopped.get() && !histories.isEmpty()) {
writeHistory(histories);
} else {
continue;
}
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
// Close task history writers whose hour is at least two hours old (task history files roll hourly).
Calendar cal = Calendar.getInstance();
cal.add(Calendar.HOUR_OF_DAY, -2);
String closeTargetTime = df.format(cal.getTime());
List<String> closingTargets = new ArrayList<>();
for (String eachWriterTime : taskWriters.keySet()) {
if (eachWriterTime.compareTo(closeTargetTime) <= 0) {
closingTargets.add(eachWriterTime);
}
}
for (String eachWriterTime : closingTargets) {
WriterHolder writerHolder;
writerHolder = taskWriters.remove(eachWriterTime);
if (writerHolder != null) {
LOG.info("Closing task history file: " + writerHolder.path);
IOUtils.cleanup(LOG, writerHolder);
}
}
}
LOG.info("HistoryWriter_" + processName + " stopped.");
}
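/* Drains up to numElements queued entries into buffer, waiting at most timeoutMillis in total
 * for new entries (the same batching pattern as Guava's Queues#drain). Returns the number drained. */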
private int drainHistory(Collection<WriterFuture<WriterHolder>> buffer, int numElements,
long timeoutMillis) throws InterruptedException {
long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
int added = 0;
while (added < numElements) {
added += historyQueue.drainTo(buffer, numElements - added);
if (added < numElements) { // not enough elements immediately available; will have to wait
WriterFuture<WriterHolder> e = historyQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
if (e == null) {
break; // we already waited enough, and there are no more elements in sight
}
buffer.add(e);
added++;
}
}
return added;
}
private List<WriterFuture<WriterHolder>> writeHistory(List<WriterFuture<WriterHolder>> histories) {
if (histories.isEmpty()) {
return histories;
}
for (WriterFuture<WriterHolder> future : histories) {
History history = future.getHistory();
if(history == null) {
future.done(null);
} else {
switch (history.getHistoryType()) {
case TASK:
try {
future.done(writeTaskHistory((TaskHistory) history));
} catch (Throwable e) {
LOG.error("Error while saving task history: " +
((TaskHistory) history).getTaskAttemptId() + ":" + e.getMessage(), e);
future.failed(e);
}
break;
case QUERY:
try {
writeQueryHistory((QueryHistory) history);
future.done(null);
} catch (Throwable e) {
LOG.error("Error while saving query history: " +
((QueryHistory) history).getQueryId() + ":" + e.getMessage(), e);
future.failed(e);
}
break;
case QUERY_SUMMARY:
try {
future.done(writeQuerySummary((QueryInfo) history));
} catch (Throwable e) {
LOG.error("Error while saving query summary: " +
((QueryInfo) history).getQueryId() + ":" + e.getMessage(), e);
future.failed(e);
}
break;
default:
LOG.warn("Wrong history type: " + history.getHistoryType());
}
}
}
return histories;
}
private void writeQueryHistory(QueryHistory queryHistory) throws Exception {
// QueryMaster's query detail history (json format)
// <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist
// QueryMaster's stage detail history(proto binary format)
// <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/<EB_ID>.hist
Path queryHistoryFile = getQueryHistoryFilePath(historyParentPath, queryHistory.getQueryId());
FileSystem fs = getNonCrcFileSystem(queryHistoryFile, tajoConf);
if (!fs.exists(queryHistoryFile.getParent())) {
if (!fs.mkdirs(queryHistoryFile.getParent())) {
LOG.error("Can't make QueryHistory dir: " + queryHistoryFile.getParent());
return;
}
}
FSDataOutputStream out = null;
try {
LOG.info("Saving query summary: " + queryHistoryFile);
out = fs.create(queryHistoryFile, queryReplication);
out.write(queryHistory.toJson().getBytes(Bytes.UTF8_CHARSET));
} finally {
IOUtils.cleanup(LOG, out);
}
if (queryHistory.getStageHistories() != null) {
for (StageHistory stageHistory : queryHistory.getStageHistories()) {
Path path = new Path(queryHistoryFile.getParent(), stageHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX);
out = null;
try {
out = fs.create(path, queryReplication);
out.write(stageHistory.toTasksJson().getBytes(Bytes.UTF8_CHARSET));
LOG.info("Saving query unit: " + path);
} finally {
IOUtils.cleanup(LOG, out);
}
}
}
}
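/* Appends the query summary (QueryInfo) as a length-prefixed JSON record to the current
 * query-list file, rolling to a new file about once an hour. */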
private WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception {
if(stopped.get()) return null;
// writing to HDFS and rolling hourly
if (querySummaryWriter == null) {
querySummaryWriter = new WriterHolder();
rollingQuerySummaryWriter();
} else {
if (querySummaryWriter.out == null) {
rollingQuerySummaryWriter();
} else if (System.currentTimeMillis() - querySummaryWriter.lastWritingTime >= 60 * 60 * 1000) {
LOG.info("Close query history file: " + querySummaryWriter.path);
IOUtils.cleanup(LOG, querySummaryWriter);
rollingQuerySummaryWriter();
}
}
byte[] jsonBytes = ("\n" + queryInfo.toJson() + "\n").getBytes(Bytes.UTF8_CHARSET);
try {
querySummaryWriter.out.writeInt(jsonBytes.length);
querySummaryWriter.out.write(jsonBytes);
} catch (IOException ie) {
IOUtils.cleanup(LOG, querySummaryWriter);
querySummaryWriter.out = null;
throw ie;
}
return querySummaryWriter;
}
private void rollingQuerySummaryWriter() throws Exception {
// Start a new query-list file named with the current HHmmss under today's query-list directory.
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
String currentDateTime = df.format(new Date(System.currentTimeMillis()));
Path datePath = new Path(historyParentPath, currentDateTime.substring(0, 8) + "/" + QUERY_LIST);
FileSystem fs = getNonCrcFileSystem(datePath, tajoConf);
if (!fs.exists(datePath)) {
if (!fs.mkdirs(datePath)) {
LOG.error("Can't make QueryList history dir: " + datePath.getParent());
return;
}
}
Path historyFile = new Path(datePath, QUERY_LIST + "-" + currentDateTime.substring(8, 14) + HISTORY_FILE_POSTFIX);
querySummaryWriter.path = historyFile;
querySummaryWriter.lastWritingTime = System.currentTimeMillis();
LOG.info("Create query history file: " + historyFile);
querySummaryWriter.out = fs.create(historyFile, queryReplication);
}
private void flushTaskHistories() {
for (WriterHolder holder : taskWriters.values()) {
try {
holder.flush();
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}
}
private WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception {
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
String taskStartTime = df.format(new Date(taskHistory.getStartTime()));
// taskWriters holds at most three writers (the current hour and the two previous hours)
// because task history files are rolled hourly and older writers are closed by the writer thread.
WriterHolder writerHolder = taskWriters.get(taskStartTime);
if (writerHolder == null) {
writerHolder = new WriterHolder();
writerHolder.out = createTaskHistoryFile(taskStartTime, writerHolder);
taskWriters.put(taskStartTime, writerHolder);
}
writerHolder.lastWritingTime = System.currentTimeMillis();
if (writerHolder.out != null) {
try {
byte[] taskHistoryBytes = taskHistory.getProto().toByteArray();
writerHolder.out.writeInt(taskHistoryBytes.length);
writerHolder.out.write(taskHistoryBytes);
} catch (IOException ie) {
taskWriters.remove(taskStartTime);
IOUtils.cleanup(LOG, writerHolder);
throw ie;
}
}
return writerHolder;
}
private FSDataOutputStream createTaskHistoryFile(String taskStartTime, WriterHolder writerHolder) throws IOException {
FileSystem fs = getNonCrcFileSystem(taskHistoryParentPath, tajoConf);
Path path = getQueryTaskHistoryPath(fs, taskHistoryParentPath, processName, taskStartTime);
if (!fs.exists(path)) {
if (!fs.mkdirs(path.getParent())) {
LOG.error("Can't make Query history directory: " + path);
return null;
}
}
writerHolder.path = path;
return fs.create(path, false, 4096, taskReplication, fs.getDefaultBlockSize(path));
}
}
public static Path getQueryTaskHistoryPath(FileSystem fs, Path parent,
String processName, String taskStartTime) throws IOException {
// <tajo.task-history.path>/<yyyyMMdd>/tasks/<WORKER_HOST>_<WORKER_PORT>/<WORKER_HOST>_<WORKER_PORT>_<HH>_<seq>.hist
// finding largest sequence path
Path fileParent = new Path(parent, taskStartTime.substring(0, 8) + "/tasks/" + processName);
String hour = taskStartTime.substring(8, 10);
int maxSeq = -1;
if (!fs.exists(fileParent)) {
maxSeq++;
return new Path(fileParent, processName + "_" + hour + "_" + maxSeq + HISTORY_FILE_POSTFIX);
}
if (!fs.isDirectory(fileParent)) {
throw new IOException("Task history path is not directory: " + fileParent);
}
FileStatus[] files = fs.listStatus(fileParent);
if (files != null) {
for (FileStatus eachFile: files) {
String[] nameTokens = eachFile.getPath().getName().split("_");
if (nameTokens.length != 4) {
continue;
}
if (nameTokens[2].equals(hour)) {
int prefixIndex = nameTokens[3].indexOf(".");
if (prefixIndex > 0) {
try {
int fileSeq = Integer.parseInt(nameTokens[3].substring(0, prefixIndex));
if (fileSeq > maxSeq) {
maxSeq = fileSeq;
}
} catch (NumberFormatException e) {
// ignore files whose sequence suffix is not numeric
}
}
}
}
}
maxSeq++;
return new Path(fileParent, processName + "_" + hour + "_" + maxSeq + HISTORY_FILE_POSTFIX);
}
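/**
* Pairs an open history output stream with its path and the time it was last written;
* closed by the writer thread when it rolls to a new file or when the service stops.
*/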
public static class WriterHolder implements Closeable {
long lastWritingTime;
Path path;
FSDataOutputStream out;
@Override
public void close() throws IOException {
if (out != null) out.close();
}
/*
* Sync buffered data to DataNodes or disks (flush to disk devices).
*/
private void flush() throws IOException {
if (out != null) out.hsync();
}
}
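/**
* A minimal Future completed by the writer thread: done(T) and failed(Throwable) record the
* outcome and release the latch so callers such as appendAndSync() can wait for the record
* to be written. cancel() and isCancelled() are not supported.
*/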
public static class WriterFuture<T> implements Future<T> {
private boolean done = false;
private T result;
private History history;
private Throwable error;
private CountDownLatch latch = new CountDownLatch(1);
public WriterFuture(History history) {
this.history = history;
}
private History getHistory() {
return history;
}
public void done(T t) {
this.result = t;
this.done = true;
this.latch.countDown();
}
public void failed(Throwable e) {
this.error = e;
done(null);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO - to be implemented
throw new UnsupportedOperationException();
}
@Override
public boolean isCancelled() {
// TODO - to be implemented
throw new UnsupportedOperationException();
}
@Override
public boolean isDone() {
return done;
}
public boolean isSucceed() {
return error == null;
}
public Throwable getError() {
return error;
}
@Override
public T get() throws InterruptedException {
this.latch.await();
return result;
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (latch.await(timeout, unit)) {
return result;
} else {
throw new TimeoutException();
}
}
}
}