STORM-3541 cleanup use of executor metric registry
diff --git a/docs/metrics_v2.md b/docs/metrics_v2.md
index dbb4b85..e05ee96 100644
--- a/docs/metrics_v2.md
+++ b/docs/metrics_v2.md
@@ -145,6 +145,5 @@
 }
 ```
 
-V2 metrics can also be reported to the Metrics Consumers registered with topology.metrics.consumer.register by enabling the 
-topology.enable.v2.metrics.tick configuration.
+V2 metrics can also be reported to the Metrics Consumers registered with topology.metrics.consumer.register by enabling the topology.enable.v2.metrics.tick configuration.
 
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 7fa892f..2f2d53b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -232,7 +232,7 @@
 
     private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
         Map<String, Object> conf = workerData.getConf();
-        TopologyContext topologyContext = new TopologyContext(
+        return new TopologyContext(
             topology,
             workerData.getTopologyConf(),
             workerData.getTaskToComponent(),
@@ -252,8 +252,6 @@
             executor.getIntervalToTaskToMetricToRegistry(),
             executor.getOpenOrPrepareWasCalled(),
             workerData.getMetricRegistry());
-        executor.setMetricRegistry(workerData.getMetricRegistry());
-        return topologyContext;
     }
 
     private Object mkTaskObject() {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 89fa6e9..b404a2d 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -124,7 +124,6 @@
     protected ArrayList<Task> idToTask;
     protected int idToTaskBase;
     protected String hostname;
-    private StormMetricRegistry stormMetricRegistry;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
 
     protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
@@ -335,13 +334,11 @@
 
     // updates v1 metric dataPoints with v2 metric API data
     private void addV2Metrics(List<IMetricsConsumer.DataPoint> dataPoints) {
-        if (stormMetricRegistry == null) {
-            return;
-        }
         boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
         if (!enableV2MetricsDataPoints) {
             return;
         }
+        StormMetricRegistry stormMetricRegistry = workerData.getMetricRegistry();
         for (Map.Entry<String, Gauge> entry : stormMetricRegistry.registry().getGauges().entrySet()) {
             String name = entry.getKey();
             Object v = entry.getValue().getValue();
@@ -349,7 +346,7 @@
                 IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(name, v);
                 dataPoints.add(dataPoint);
             } else {
-                LOG.warn("Cannot report {}, it's value is not a Number {}", name, v);
+                LOG.warn("Cannot report {}, its value is not a Number {}", name, v);
             }
         }
         for (Map.Entry<String, Counter> entry : stormMetricRegistry.registry().getCounters().entrySet()) {
@@ -646,9 +643,4 @@
     public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
         this.executorTransfer = executorTransfer;
     }
-
-    public void setMetricRegistry(StormMetricRegistry stormMetricRegistry) {
-        this.stormMetricRegistry = stormMetricRegistry;
-    }
-
 }