PIG-4714: Improve logging across multiple components with callerId

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1745387 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 84299ab..47a6bd1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@
 
 IMPROVEMENTS
 
+PIG-4714: Improve logging across multiple components with callerId (daijy)
+
 PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)
 
 PIG-4894: Add API for StoreFunc to specify if they are write safe from two different vertices (rohini)
diff --git a/conf/pig.properties b/conf/pig.properties
index ee9ae6d..b960be2 100644
--- a/conf/pig.properties
+++ b/conf/pig.properties
@@ -557,6 +557,9 @@
 #
 hcat.bin=/usr/local/hcat/bin/hcat
 
+# Enable the ATS hook, which logs a Pig-specific entry to the YARN Application Timeline Server; disable only when no ATS server is deployed
+pig.ats.enabled=true
+
 ###########################################################################
 #
 # Overrides for extreme environments
diff --git a/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java b/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
new file mode 100644
index 0000000..07c5f49
--- /dev/null
+++ b/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import org.apache.pig.impl.PigContext;
+
+public class PigATSClient {
+    public static class ATSEvent {
+        public ATSEvent(String pigAuditId, String callerId) {
+            this.pigScriptId = pigAuditId;
+            this.callerId = callerId;
+        }
+        String callerId;
+        String pigScriptId;
+    }
+    private static PigATSClient instance;
+
+    public static synchronized PigATSClient getInstance() {
+        if (instance==null) {
+            instance = new PigATSClient();
+        }
+        return instance;
+    }
+
+    private PigATSClient() {
+    }
+
+    public static String getPigAuditId(PigContext context) {
+        return "";
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+    }
+}
diff --git a/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java b/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
new file mode 100644
index 0000000..31ceeb2
--- /dev/null
+++ b/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PigATSClient {
+    public static class ATSEvent {
+        public ATSEvent(String pigAuditId, String callerId) {
+            this.pigScriptId = pigAuditId;
+            this.callerId = callerId;
+        }
+        String callerId;
+        String pigScriptId;
+    }
+    public static final String ENTITY_TYPE = "PIG_SCRIPT_ID";
+    public static final String ENTITY_CALLERID = "callerId";
+    public static final String CALLER_CONTEXT = "PIG";
+    public static final int AUDIT_ID_MAX_LENGTH = 128;
+
+    private static final Log log = LogFactory.getLog(PigATSClient.class.getName());
+    private static PigATSClient instance;
+    private static ExecutorService executor;
+    private TimelineClient timelineClient;
+
+    public static synchronized PigATSClient getInstance() {
+        if (instance==null) {
+            instance = new PigATSClient();
+        }
+        return instance;
+    }
+
+    private PigATSClient() {
+        if (executor == null) {
+            executor = Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+            YarnConfiguration yarnConf = new YarnConfiguration();
+            timelineClient = TimelineClient.createTimelineClient();
+            timelineClient.init(yarnConf);
+            timelineClient.start();
+        }
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                timelineClient.stop();
+                executor.shutdownNow();
+                executor = null;
+            }
+        });
+        log.info("Created ATS Hook");
+    }
+
+    public static String getPigAuditId(PigContext context) {
+        String auditId;
+        if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) {
+            auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+        } else {
+            ScriptState ss = ScriptState.get();
+            String filename = ss.getFileName().isEmpty() ? "default" : new File(ss.getFileName()).getName();
+            auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+        }
+        return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH));
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                TimelineEntity entity = new TimelineEntity();
+                entity.setEntityId(event.pigScriptId);
+                entity.setEntityType(ENTITY_TYPE);
+                entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId != null ? event.callerId : "default");
+                try {
+                    timelineClient.putEntities(entity);
+                } catch (Exception e) {
+                    log.info("Failed to submit plan to ATS: " + e.getMessage());
+                }
+            }
+        });
+    }
+}
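
Both shims expose the same surface, so call sites need no Hadoop-version checks: on the hadoop20 shim logEvent() is a no-op, while on hadoop23 it asynchronously puts one TimelineEntity of type PIG_SCRIPT_ID with a callerId primary filter. A minimal usage sketch (the class name and both id strings are illustrative; on hadoop23 it assumes a YARN configuration pointing at a running Timeline Server):

    import org.apache.pig.backend.hadoop.PigATSClient;

    public class AtsHookSketch {
        public static void main(String[] args) {
            // No-op on the hadoop20 shim; an asynchronous ATS put on hadoop23.
            PigATSClient.ATSEvent event =
                    new PigATSClient.ATSEvent("PIG-myscript.pig-0001", "oozie-workflow-1234");
            PigATSClient.getInstance().logEvent(event);
        }
    }
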
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index 3cdbb6d..2b258aa 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -418,6 +418,16 @@
      */
     public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = "pig.spill.unused.memory.threshold.size";
 
+    /**
+     * Log tracing id that upstream clients can set so their own logs can be correlated with this Pig session
+     */
+    public static final String CALLER_ID = "pig.log.trace.id";
+
+    /**
+     * Enable posting of Pig session information to the YARN Application Timeline Server (ATS)
+     */
+    public static final String ENABLE_ATS = "pig.ats.enabled";
+
     // Deprecated settings of Pig 0.13
 
     /**
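
For reference, a minimal and hypothetical client-side sketch of how an embedding tool (for example a workflow scheduler) could pass these new settings when constructing a PigServer; the caller id, query, and paths are illustrative:

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;

    public class CallerIdExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty(PigConfiguration.CALLER_ID, "oozie-workflow-1234"); // pig.log.trace.id
            props.setProperty(PigConfiguration.ENABLE_ATS, "false");              // pig.ats.enabled
            PigServer pig = new PigServer(ExecType.LOCAL, props);                 // logs the script id and caller id
            pig.registerQuery("A = LOAD 'input.txt' AS (line:chararray);");
            pig.store("A", "output");
            pig.shutdown();
        }
    }
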
diff --git a/src/org/apache/pig/PigServer.java b/src/org/apache/pig/PigServer.java
index 732889a..0946cda 100644
--- a/src/org/apache/pig/PigServer.java
+++ b/src/org/apache/pig/PigServer.java
@@ -24,7 +24,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.io.StringReader;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -53,6 +57,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
@@ -241,6 +246,54 @@
         }
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
 
+        // log an ATS event that includes the caller context
+        String auditId = PigATSClient.getPigAuditId(pigContext);
+        String callerId = (String)pigContext.getProperties().get(PigConfiguration.CALLER_ID);
+        log.info("Pig Script ID for the session: " + auditId);
+        if (callerId != null) {
+            log.info("Caller ID for the session: " + callerId);
+        }
+        if (Boolean.parseBoolean(pigContext.getProperties()
+                .getProperty(PigConfiguration.ENABLE_ATS))) {
+            if (Boolean.parseBoolean(pigContext.getProperties()
+                    .getProperty("yarn.timeline-service.enabled", "false"))) {
+                PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId);
+                try {
+                    PigATSClient.getInstance().logEvent(event);
+                } catch (Exception e) {
+                    log.warn("Error posting to ATS: ", e);
+                }
+            } else {
+                log.warn("ATS is disabled since"
+                        + " yarn.timeline-service.enabled is set to false");
+            }
+
+        }
+
+        // set hdfs caller context
+        Class callerContextClass = null;
+        try {
+            callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext");
+        } catch (ClassNotFoundException e) {
+            // If pre-Hadoop 2.8.0, skip setting CallerContext
+        }
+        if (callerContextClass != null) {
+            try {
+                // Use reflection for the following code since CallerContext is only available in Hadoop 2.8.0 and later:
+                // CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
+                // CallerContext.setCurrent(hdfsContext);
+                Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
+                Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class);
+                Object builder = callerContextBuilderConstruct.newInstance(auditId);
+                Method builderBuildMethod = builder.getClass().getMethod("build");
+                Object hdfsContext = builderBuildMethod.invoke(builder);
+                Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
+                callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext);
+            } catch (Exception e) {
+                // Should not happen unless the API changes in a future Hadoop release
+                throw new ExecException(e);
+            }
+        }
     }
 
     private void addHadoopProperties() throws ExecException {
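
For readers who do not want to trace the reflection, a sketch of the direct Hadoop 2.8.0+ equivalent of the caller-context block above; the helper class name is hypothetical, and the patch keeps the reflective form so Pig still runs against older Hadoop releases:

    import org.apache.hadoop.ipc.CallerContext;

    final class HdfsCallerContextHelper {
        static void tagSession(String auditId) {
            // Tags subsequent HDFS calls from this client; the id shows up in hdfs-audit.log
            // when hadoop.caller.context.enabled=true on the NameNode.
            CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
            CallerContext.setCurrent(hdfsContext);
        }
    }
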
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
index 1c3b6cf..4502e3c 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -107,6 +109,26 @@
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            // set Tez caller context
+            // Use reflection for the following code since CallerContext is only available in Tez 0.8.1 and later:
+            // CallerContext context = CallerContext.create(PigATSClient.CALLER_CONTEXT,
+            //     PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
+            // tezDag.setCallerContext(context);
+            Class callerContextClass = null;
+            try {
+                callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
+            } catch (ClassNotFoundException e) {
+                // If pre-Tez 0.8.1, skip setting CallerContext
+            }
+            if (callerContextClass != null) {
+                Method callerContextCreateMethod = callerContextClass.getMethod("create", String.class,
+                        String.class, String.class, String.class);
+                Object context = callerContextCreateMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
+                        PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
+                Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
+                        context.getClass());
+                dagSetCallerContext.invoke(tezDag, context);
+            }
             log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
             return new TezJob(tezConf, tezDag, localResources, tezPlan);
         } catch (Exception e) {
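
Similarly, a sketch of the direct Tez 0.8.1+ calls that the reflection above targets; the helper class name is hypothetical, and the reflective form is kept so Pig still compiles and runs against older Tez releases:

    import org.apache.pig.backend.hadoop.PigATSClient;
    import org.apache.pig.impl.PigContext;
    import org.apache.tez.client.CallerContext;
    import org.apache.tez.dag.api.DAG;

    final class TezCallerContextHelper {
        static void tagDag(DAG tezDag, PigContext pigContext) {
            // Mirrors the reflective calls above: create(context, callerId, callerType, blob).
            CallerContext context = CallerContext.create(PigATSClient.CALLER_CONTEXT,
                    PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
            tezDag.setCallerContext(context);
        }
    }
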
diff --git a/src/org/apache/pig/impl/PigImplConstants.java b/src/org/apache/pig/impl/PigImplConstants.java
index 3baa70e..c665b42 100644
--- a/src/org/apache/pig/impl/PigImplConstants.java
+++ b/src/org/apache/pig/impl/PigImplConstants.java
@@ -79,4 +79,9 @@
      * Pig log4j properties
      */
     public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
+
+    /**
+     * A unique id for a Pig session, used as the callerId for underlying components
+     */
+    public static final String PIG_AUDIT_ID = "pig.script.id";
 }
diff --git a/src/pig-default.properties b/src/pig-default.properties
index ed6475d..a29a4a0 100644
--- a/src/pig-default.properties
+++ b/src/pig-default.properties
@@ -61,4 +61,6 @@
 
 pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
 
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+
+pig.ats.enabled=true