blob: 305244108753305d7fcec6efeb06199059931c64 [file] [log] [blame]
package org.apache.helix.tools.commandtools;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.util.HelixUtil;
public class ZkLogCSVFormatter {
private static final ZNRecordSerializer _deserializer = new ZNRecordSerializer();
private static String _fieldDelim = ",";
/**
* @param args
*/
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("USAGE: ZkLogCSVFormatter log_file output_dir");
System.exit(2);
}
File outputDir = new File(args[1]);
if (!outputDir.exists() || !outputDir.isDirectory()) {
System.err.println(outputDir.getAbsolutePath() + " does NOT exist or is NOT a directory");
System.exit(2);
}
format(args[0], args[1]);
}
private static void formatter(BufferedWriter bw, String... args) {
StringBuffer sb = new StringBuffer();
if (args.length == 0) {
return;
} else {
sb.append(args[0]);
for (int i = 1; i < args.length; i++) {
sb.append(_fieldDelim).append(args[i]);
}
}
try {
bw.write(sb.toString());
bw.newLine();
// System.out.println(sb.toString());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static String getAttributeValue(String line, String attribute) {
String[] parts = line.split("\\s");
if (parts != null && parts.length > 0) {
for (int i = 0; i < parts.length; i++) {
if (parts[i].startsWith(attribute)) {
String val = parts[i].substring(attribute.length());
return val;
}
}
}
return null;
}
private static void format(String logfilepath, String outputDir) throws FileNotFoundException {
try {
// input file
FileInputStream fis = new FileInputStream(logfilepath);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
// output files
FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
FileOutputStream smdCntFos =
new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
FileOutputStream smdNextFos =
new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
FileOutputStream hrPerfFos =
new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
FileOutputStream liFos = new FileOutputStream(outputDir + "/" + "liveInstances.csv");
BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
formatter(isBw, "timestamp", "resourceName", "partitionNumber", "mode", "partition",
"instanceName", "priority");
formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
formatter(csBw, "timestamp", "resourceName", "partition", "instanceName", "sessionId",
"state");
formatter(msgBw, "timestamp", "resourceName", "partition", "instanceName", "sessionId",
"from", "to", "messageType", "messageState");
formatter(hrPerfBw, "timestamp", "instanceName", "availableCPUs", "averageSystemLoad",
"freeJvmMemory", "freePhysicalMemory", "totalJvmMemory");
Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
int pos;
String inputLine;
while ((inputLine = br.readLine()) != null) {
if (inputLine.indexOf("CONFIGS") != -1) {
pos = inputLine.indexOf("CONFIGS");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
formatter(cfgBw, timestamp, record.getId(), record.getSimpleField("HOST"),
record.getSimpleField("PORT"), record.getSimpleField("ENABLED"));
}
} else if (inputLine.indexOf("IDEALSTATES") != -1) {
pos = inputLine.indexOf("IDEALSTATES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
// System.out.println("record=" + record);
for (String partition : record.getListFields().keySet()) {
List<String> preferenceList = record.getListFields().get(partition);
for (int i = 0; i < preferenceList.size(); i++) {
String instance = preferenceList.get(i);
formatter(isBw, timestamp, record.getId(),
record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
record.getSimpleField(IdealStateProperty.REBALANCE_MODE.toString()), partition,
instance, Integer.toString(i));
}
}
}
} else if (inputLine.indexOf("LIVEINSTANCES") != -1) {
pos = inputLine.indexOf("LIVEINSTANCES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
String zkSessionId = getAttributeValue(inputLine, "session:");
if (zkSessionId == null) {
System.err.println("no zk session id associated with the adding of live instance: "
+ inputLine);
} else {
liveInstanceSessionMap.put(zkSessionId, record);
}
}
} else if (inputLine.indexOf("EXTERNALVIEW") != -1) {
pos = inputLine.indexOf("EXTERNALVIEW");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
// System.out.println("record=" + record);
for (String partition : record.getMapFields().keySet()) {
Map<String, String> stateMap = record.getMapFields().get(partition);
for (String instance : stateMap.keySet()) {
String state = stateMap.get(instance);
formatter(evBw, timestamp, record.getId(), partition, instance, state);
}
}
}
} else if (inputLine.indexOf("STATEMODELDEFS") != -1) {
pos = inputLine.indexOf("STATEMODELDEFS");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
for (String stateInfo : record.getMapFields().keySet()) {
if (stateInfo.endsWith(".meta")) {
Map<String, String> metaMap = record.getMapFields().get(stateInfo);
formatter(smdCntBw, timestamp, record.getId(),
stateInfo.substring(0, stateInfo.indexOf('.')), metaMap.get("count"));
} else if (stateInfo.endsWith(".next")) {
Map<String, String> nextMap = record.getMapFields().get(stateInfo);
for (String destState : nextMap.keySet()) {
formatter(smdNextBw, timestamp, record.getId(),
stateInfo.substring(0, stateInfo.indexOf('.')), destState,
nextMap.get(destState));
}
}
}
}
} else if (inputLine.indexOf("CURRENTSTATES") != -1) {
pos = inputLine.indexOf("CURRENTSTATES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
// System.out.println("record=" + record);
for (String partition : record.getMapFields().keySet()) {
Map<String, String> stateMap = record.getMapFields().get(partition);
String path = getAttributeValue(inputLine, "path:");
if (path != null) {
String instance = HelixUtil.getInstanceNameFromPath(path);
formatter(csBw, timestamp, record.getId(), partition, instance,
record.getSimpleField("SESSION_ID"), stateMap.get("CURRENT_STATE"));
}
}
}
} else if (inputLine.indexOf("MESSAGES") != -1) {
pos = inputLine.indexOf("MESSAGES");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
formatter(msgBw, timestamp, record.getSimpleField("RESOURCE_NAME"),
record.getSimpleField("PARTITION_NAME"), record.getSimpleField("TGT_NAME"),
record.getSimpleField("TGT_SESSION_ID"), record.getSimpleField("FROM_STATE"),
record.getSimpleField("TO_STATE"), record.getSimpleField("MSG_TYPE"),
record.getSimpleField("MSG_STATE"));
}
} else if (inputLine.indexOf("closeSession") != -1) {
String zkSessionId = getAttributeValue(inputLine, "session:");
if (zkSessionId == null) {
System.err.println("no zk session id associated with the closing of zk session: "
+ inputLine);
} else {
ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
// System.err.println("zkSessionId:" + zkSessionId + ", record:" + record);
if (record != null) {
String timestamp = getAttributeValue(inputLine, "time:");
formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"),
"DELETE");
}
}
} else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1) {
pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
pos = inputLine.indexOf("data:{", pos);
if (pos != -1) {
String timestamp = getAttributeValue(inputLine, "time:");
ZNRecord record =
(ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5).getBytes());
String path = getAttributeValue(inputLine, "path:");
if (path != null) {
String instance = HelixUtil.getInstanceNameFromPath(path);
formatter(hrPerfBw, timestamp, instance, record.getSimpleField("availableCPUs"),
record.getSimpleField("averageSystemLoad"),
record.getSimpleField("freeJvmMemory"),
record.getSimpleField("freePhysicalMemory"),
record.getSimpleField("totalJvmMemory"));
}
}
}
}
br.close();
isBw.close();
cfgBw.close();
evBw.close();
smdCntBw.close();
smdNextBw.close();
csBw.close();
msgBw.close();
liBw.close();
hrPerfBw.close();
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
}
}
}