PIG-4714: Improve logging across multiple components with callerId
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1745387 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 84299ab..47a6bd1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@
IMPROVEMENTS
+PIG-4714: Improve logging across multiple components with callerId (daijy)
+
PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)
PIG-4894: Add API for StoreFunc to specify if they are write safe from two different vertices (rohini)
diff --git a/conf/pig.properties b/conf/pig.properties
index ee9ae6d..b960be2 100644
--- a/conf/pig.properties
+++ b/conf/pig.properties
@@ -557,6 +557,9 @@
#
hcat.bin=/usr/local/hcat/bin/hcat
+# Enable ATS hook to log the Pig specific ATS entry, disable only when ATS server is not deployed
+pig.ats.enabled=true
+
###########################################################################
#
# Overrides for extreme environments
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
new file mode 100644
index 0000000..07c5f49
--- /dev/null
+++ b/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import org.apache.pig.impl.PigContext;
+
+public class PigATSClient {
+ public static class ATSEvent {
+ public ATSEvent(String pigAuditId, String callerId) {
+ this.pigScriptId = pigAuditId;
+ this.callerId = callerId;
+ }
+ String callerId;
+ String pigScriptId;
+ }
+ private static PigATSClient instance;
+
+ public static synchronized PigATSClient getInstance() {
+ if (instance==null) {
+ instance = new PigATSClient();
+ }
+ return instance;
+ }
+
+ private PigATSClient() {
+ }
+
+ public static String getPigAuditId(PigContext context) {
+ return "";
+ }
+
+ synchronized public void logEvent(final ATSEvent event) {
+ }
+}
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java b/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
new file mode 100644
index 0000000..31ceeb2
--- /dev/null
+++ b/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PigATSClient {
+ public static class ATSEvent {
+ public ATSEvent(String pigAuditId, String callerId) {
+ this.pigScriptId = pigAuditId;
+ this.callerId = callerId;
+ }
+ String callerId;
+ String pigScriptId;
+ }
+ public static final String ENTITY_TYPE = "PIG_SCRIPT_ID";
+ public static final String ENTITY_CALLERID = "callerId";
+ public static final String CALLER_CONTEXT = "PIG";
+ public static final int AUDIT_ID_MAX_LENGTH = 128;
+
+ private static final Log log = LogFactory.getLog(PigATSClient.class.getName());
+ private static PigATSClient instance;
+ private static ExecutorService executor;
+ private TimelineClient timelineClient;
+
+ public static synchronized PigATSClient getInstance() {
+ if (instance==null) {
+ instance = new PigATSClient();
+ }
+ return instance;
+ }
+
+ private PigATSClient() {
+ if (executor == null) {
+ executor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+ YarnConfiguration yarnConf = new YarnConfiguration();
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(yarnConf);
+ timelineClient.start();
+ }
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ timelineClient.stop();
+ executor.shutdownNow();
+ executor = null;
+ }
+ });
+ log.info("Created ATS Hook");
+ }
+
+ public static String getPigAuditId(PigContext context) {
+ String auditId;
+ if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) {
+ auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+ } else {
+ ScriptState ss = ScriptState.get();
+ String filename = ss.getFileName().isEmpty()?"default" : new File(ss.getFileName()).getName();
+ auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+ }
+ return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH));
+ }
+
+ synchronized public void logEvent(final ATSEvent event) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId(event.pigScriptId);
+ entity.setEntityType(ENTITY_TYPE);
+ entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId!=null?event.callerId : "default");
+ try {
+ timelineClient.putEntities(entity);
+ } catch (Exception e) {
+ log.info("Failed to submit plan to ATS: " + e.getMessage());
+ }
+ }
+ });
+ }
+}
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index 3cdbb6d..2b258aa 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -418,6 +418,16 @@
*/
public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = "pig.spill.unused.memory.threshold.size";
+ /**
+ * Log tracing id that upstream clients can use to correlate their logs with this Pig session
+ */
+ public static final String CALLER_ID = "pig.log.trace.id";
+
+ /**
+ * Enable ATS for Pig
+ */
+ public static final String ENABLE_ATS = "pig.ats.enabled";
+
// Deprecated settings of Pig 0.13
/**
diff --git a/src/org/apache/pig/PigServer.java b/src/org/apache/pig/PigServer.java
index 732889a..0946cda 100644
--- a/src/org/apache/pig/PigServer.java
+++ b/src/org/apache/pig/PigServer.java
@@ -24,7 +24,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
+import java.io.PrintWriter;
import java.io.StringReader;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -53,6 +57,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.backend.hadoop.executionengine.HJob;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.classification.InterfaceAudience;
@@ -241,6 +246,54 @@
}
PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+ // log an ATS event that includes the caller context
+ String auditId = PigATSClient.getPigAuditId(pigContext);
+ String callerId = (String)pigContext.getProperties().get(PigConfiguration.CALLER_ID);
+ log.info("Pig Script ID for the session: " + auditId);
+ if (callerId != null) {
+ log.info("Caller ID for session: " + callerId);
+ }
+ if (Boolean.parseBoolean(pigContext.getProperties()
+ .getProperty(PigConfiguration.ENABLE_ATS))) {
+ if (Boolean.parseBoolean(pigContext.getProperties()
+ .getProperty("yarn.timeline-service.enabled", "false"))) {
+ PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId);
+ try {
+ PigATSClient.getInstance().logEvent(event);
+ } catch (Exception e) {
+ log.warn("Error posting to ATS: ", e);
+ }
+ } else {
+ log.warn("ATS is disabled since"
+ + " yarn.timeline-service.enabled set to false");
+ }
+
+ }
+
+ // set hdfs caller context
+ Class callerContextClass = null;
+ try {
+ callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext");
+ } catch (ClassNotFoundException e) {
+ // If pre-Hadoop 2.8.0, skip setting CallerContext
+ }
+ if (callerContextClass != null) {
+ try {
+ // Reflection for the following code since it is only available since hadoop 2.8.0:
+ // CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
+ // CallerContext.setCurrent(hdfsContext);
+ Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
+ Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class);
+ Object builder = callerContextBuilderConstruct.newInstance(auditId);
+ Method builderBuildMethod = builder.getClass().getMethod("build");
+ Object hdfsContext = builderBuildMethod.invoke(builder);
+ Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
+ callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext);
+ } catch (Exception e) {
+ // Should not happen unless the API changes in a future Hadoop Common release
+ throw new ExecException(e);
+ }
+ }
}
private void addHadoopProperties() throws ExecException {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
index 1c3b6cf..4502e3c 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -30,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -107,6 +109,26 @@
}
DAG tezDag = buildDAG(tezPlanNode, localResources);
tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+ // set Tez caller context
+ // Reflection for the following code since it is only available since tez 0.8.1:
+ // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext),
+ // ATSService.EntityType, "");
+ // tezDag.setCallerContext(context);
+ Class callerContextClass = null;
+ try {
+ callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
+ } catch (ClassNotFoundException e) {
+ // If pre-Tez 0.8.1, skip setting CallerContext
+ }
+ if (callerContextClass != null) {
+ Method builderBuildMethod = callerContextClass.getMethod("create", String.class,
+ String.class, String.class, String.class);
+ Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
+ PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
+ Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
+ context.getClass());
+ dagSetCallerContext.invoke(tezDag, context);
+ }
log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
return new TezJob(tezConf, tezDag, localResources, tezPlan);
} catch (Exception e) {
diff --git a/src/org/apache/pig/impl/PigImplConstants.java b/src/org/apache/pig/impl/PigImplConstants.java
index 3baa70e..c665b42 100644
--- a/src/org/apache/pig/impl/PigImplConstants.java
+++ b/src/org/apache/pig/impl/PigImplConstants.java
@@ -79,4 +79,9 @@
* Pig log4j properties
*/
public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
+
+ /**
+ * A unique id for a Pig session, used as the callerId for underlying components
+ */
+ public static final String PIG_AUDIT_ID = "pig.script.id";
}
diff --git a/src/pig-default.properties b/src/pig-default.properties
index ed6475d..a29a4a0 100644
--- a/src/pig-default.properties
+++ b/src/pig-default.properties
@@ -61,4 +61,6 @@
pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+
+pig.ats.enabled=true