blob: 397a46fde99dc06b2187c0efbf66a6bacef9b72b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.history.parser;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.Preconditions;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.history.logging.proto.HistoryEventProtoJsonConversion;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
/**
* Parser utility to parse data generated by ProtoHistoryLoggingService.
*/
public class ProtoHistoryParser extends SimpleHistoryParser {
private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryParser.class);
private List<File> protoFiles;
public ProtoHistoryParser(List<File> files) {
super(files);
this.protoFiles = files;
}
/**
* Get in-memory representation of DagInfo.
*
* @return DagInfo
* @throws TezException
*/
public DagInfo getDAGData(String dagId) throws TezException {
try {
Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId");
dagId = dagId.trim();
parseContents(dagId);
linkParsedContents();
addRawDataToDagInfo();
return dagInfo;
} catch (IOException | JSONException e) {
LOG.error("Error in reading DAG ", e);
throw new TezException(e);
}
}
private void parseContents(String dagId)
throws JSONException, FileNotFoundException, TezException, IOException {
JSONObjectSource source = getJsonSource();
parse(dagId, source);
}
private JSONObjectSource getJsonSource() throws IOException {
final TezConfiguration conf = new TezConfiguration();
Iterator<File> fileIt = protoFiles.iterator();
JSONObjectSource source = new JSONObjectSource() {
private HistoryEventProto message = null;
private ProtoMessageReader<HistoryEventProto> reader = new ProtoMessageReader<>(conf,
new Path(fileIt.next().getPath()), HistoryEventProto.PARSER);
@Override
public JSONObject next() throws JSONException {
return HistoryEventProtoJsonConversion.convertToJson(message);
}
@Override
public boolean hasNext() throws IOException {
try {
message = (HistoryEventProto) reader.readEvent();
return message != null;
} catch (java.io.EOFException e) {
reader.close();
if (!fileIt.hasNext()) {
return false;
} else {
reader = new ProtoMessageReader<>(conf, new Path(fileIt.next().getPath()),
HistoryEventProto.PARSER);
try {
message = (HistoryEventProto) reader.readEvent();
return message != null;
} catch (java.io.EOFException e2) {
return false;
}
}
}
}
@Override
public void close() {
try {
reader.close();
} catch (IOException e) {
LOG.warn("error while closing ProtoMessageReader", e);
}
}
};
return source;
}
}