[GOBBLIN-1989] Temporal: Implementation of GTE for GaaS Observability Event in MR alternative for distcp (#3843)
[GOBBLIN-1989] Temporal: Implementation of GTE for GaaS Observability Event in MR deprecation for distcp
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 2dfb4b7..220d95a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -50,6 +50,7 @@
import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -143,7 +144,7 @@
this.jobsMapping = jobsMapping;
this.locks = locks;
this.metricContext = metricContext;
- eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.runtime").build();
+ eventSubmitter = new EventSubmitter.Builder(this.metricContext, JobMetrics.NAMESPACE).build();
}
private boolean isRetriggeringEnabled() {
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
index 33f4b56..b84acb1 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
@@ -20,6 +20,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -113,7 +114,7 @@
* @param tags Additional tags to add to the returned context.
* @return A {@link org.apache.gobblin.metrics.MetricContext} with the appropriate tags and parent.
*/
- public static MetricContext getMetricContext(State state, Class<?> klazz, List<Tag<?>> tags) {
+ public static MetricContext getMetricContext(State state, Class<?> klazz, Collection<Tag<?>> tags) {
int randomId = RAND.nextInt(Integer.MAX_VALUE);
List<Tag<?>> generatedTags = Lists.newArrayList();
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
index 3ce4c79..b836a56 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
@@ -21,14 +21,15 @@
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+
/**
* A class representing a dimension or property associated with a {@link Taggable}.
@@ -66,6 +67,7 @@
super(key, value);
}
+ @JsonCreator
public Tag(Map.Entry<? extends String, ? extends T> entry) {
super(entry);
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 4805425..0fc4cba 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -17,16 +17,18 @@
package org.apache.gobblin.metrics.event;
+import java.util.List;
import java.util.Map;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import lombok.Getter;
+
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
-
-import lombok.Getter;
+import org.apache.gobblin.metrics.Tag;
/**
@@ -204,4 +206,11 @@
public TimingEvent getTimingEvent(String name) {
return new TimingEvent(this, name);
}
+
+ public List<Tag<?>> getTags() {
+ return this.metricContext.isPresent() ?
+ this.metricContext.get().getTags() :
+ null;
+ }
+
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index a3c0a40..8a626f1 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -19,6 +19,8 @@
import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -55,9 +57,17 @@
}
public GobblinEventBuilder(String name, String namespace) {
+ this(name, namespace, Maps.newHashMap());
+ }
+
+ @JsonCreator
+ private GobblinEventBuilder(
+ @JsonProperty("name") String name,
+ @JsonProperty("namespace") String namespace,
+ @JsonProperty("metadata") Map<String, String> metadata) {
this.name = name;
this.namespace = namespace;
- metadata = Maps.newHashMap();
+ this.metadata = metadata;
}
public ImmutableMap<String, String> getMetadata() {
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index ff79122..7a0d4cf 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -128,7 +128,7 @@
super(name);
this.stopped = false;
this.submitter = submitter;
- this.startTime = System.currentTimeMillis();
+ this.startTime = getStartTime();
}
/**
@@ -138,6 +138,15 @@
stop(Maps.<String, String>newHashMap());
}
+
+ public Long getStartTime() {
+ return System.currentTimeMillis();
+ }
+
+ public Long getEndTime() {
+ return System.currentTimeMillis();
+ }
+
/**
* Stop the timer and submit the event, along with the additional metadata specified. If the timer was already stopped
* before, this is a no-op.
@@ -162,7 +171,7 @@
}
this.stopped = true;
- this.endTime = System.currentTimeMillis();
+ this.endTime = getEndTime();
this.duration = this.endTime - this.startTime;
this.metadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
index ac03e52..eb20d53 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
@@ -17,9 +17,16 @@
package org.apache.gobblin.metrics;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* Unit tests for {@link Tag}.
@@ -34,6 +41,8 @@
private static final String PROJECT_VERSION_KEY = "project.version";
private static final int PROJECT_VERSION = 1;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
@Test
public void testTags() {
Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
@@ -44,4 +53,16 @@
Assert.assertEquals(projectVersionTag.getKey(), PROJECT_VERSION_KEY);
Assert.assertEquals(projectVersionTag.getValue().intValue(), PROJECT_VERSION);
}
+
+ @Test
+ public void testSerde() throws IOException {
+ Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
+ Tag<Integer> projectVersionTag = new Tag<Integer>(PROJECT_VERSION_KEY, PROJECT_VERSION);
+ List<Tag<?>> tags = Arrays.asList(jobIdTag, projectVersionTag);
+ String bytes = objectMapper.writeValueAsString(tags);
+
+ JavaType type = objectMapper.getTypeFactory().constructCollectionType(List.class, Tag.class);
+ List<Tag<?>> deserTags = objectMapper.readValue(bytes, type);
+ Assert.assertEquals(deserTags, tags);
+ }
}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 8fe09e5..db27935 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -135,10 +135,6 @@
@Override
public void cancel() throws Exception {
- try {
- this.gobblinYarnAppLauncher.stop();
- } finally {
- super.cancel();
- }
+ this.gobblinYarnAppLauncher.stop();
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index fbd4294..690dc11 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -1027,7 +1027,7 @@
* Build the {@link EventSubmitter} for this class.
*/
private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> tags) {
- return new EventSubmitter.Builder(this.runtimeMetricContext, "gobblin.runtime")
+ return new EventSubmitter.Builder(this.runtimeMetricContext, JobMetrics.NAMESPACE)
.addMetadata(Tag.toMap(Tag.tagValuesToString(tags))).build();
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 7ebb77d..d8ce6f1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -502,7 +502,7 @@
}
EventSubmitter.Builder eventSubmitterBuilder = new EventSubmitter.Builder(JobMetrics.get(this.jobId, new JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
- "gobblin.runtime");
+ JobMetrics.NAMESPACE);
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState, JobEvent.TASKS_SUBMITTED));
eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, "tasksCount", Integer.toString(tasks.size()));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 03146e1..bb8bac9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -42,6 +42,7 @@
*/
@Slf4j
public class JobMetrics extends GobblinMetrics {
+ public static final String NAMESPACE = "gobblin.runtime";
public static final CreatorTag DEFAULT_CREATOR_TAG = new CreatorTag( "driver");
protected final String jobName;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index f238ffc..182ce7d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -44,6 +44,18 @@
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
/**
+ * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs
+ * during testing
+ *
+ */
+ String GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = PREFIX + "job.metrics.suffix";
+ /**
+ * Default suffix for metrics emitted by GobblinTemporalJobLauncher. It is intentionally non-empty: Temporal is
+ * still in alpha, so its metrics should not accidentally collide with (and affect) those of a prod job
+ */
+ String DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = "-temporal";
+
+ /**
* Number of worker processes to spin up per task runner
* NOTE: If this size is too large, your container can OOM and halt execution unexpectedly. It's recommended not to touch
* this parameter
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 14a4bac..f7040f7 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,20 +17,24 @@
package org.apache.gobblin.temporal.ddm.launcher;
-import io.temporal.client.WorkflowOptions;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import lombok.extern.slf4j.Slf4j;
-
-import com.typesafe.config.ConfigFactory;
import org.apache.hadoop.fs.Path;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.client.WorkflowOptions;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
@@ -84,6 +88,12 @@
wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
}
Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec)));
+
+ wuSpec.setTags(GobblinMetrics.getCustomTagsFromState(new State(jobProps)));
+ wuSpec.setMetricsSuffix(this.jobProps.getProperty(
+ GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
+ GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));
+
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 3b25971..5e3ba3d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,15 +18,23 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.hadoop.fs.Path;
+
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -44,6 +52,8 @@
@NonNull private URI fileSystemUri;
@NonNull private String workUnitsDir;
@NonNull private Tuning tuning = Tuning.DEFAULT;
+ @NonNull private List<Tag<?>> tags = new ArrayList<>();
+ @NonNull private String metricsSuffix = GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
@JsonIgnore // (because no-arg method resembles 'java bean property')
@Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index d425bbc..99c4d05 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -29,6 +29,7 @@
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
@@ -47,7 +48,7 @@
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() };
+ return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl(), new SubmitGTEActivityImpl() };
}
@Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index ba2ccf9..eccad9b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -19,6 +19,7 @@
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
+
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 141f220..531a018 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,7 +16,18 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.jetbrains.annotations.NotNull;
import com.typesafe.config.ConfigFactory;
@@ -25,17 +36,26 @@
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
-import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
@Slf4j
@@ -45,6 +65,32 @@
@Override
public int process(WUProcessingSpec workSpec) {
+ try {
+ EventSubmitterContext eventSubmitterContext = getEventSubmitterContext(workSpec);
+ TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+ try (EventTimer timer = timerFactory.createJobTimer()) {
+ return performWork(workSpec);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @NotNull
+ private EventSubmitterContext getEventSubmitterContext(WUProcessingSpec workSpec)
+ throws IOException {
+ // NOTE: We are using the metrics tags from Job Props to create the metric context for the timer and NOT
+ // the deserialized jobState from HDFS that is created by the real distcp job. This is because the AZ runtime
+ // settings we want are for the job launcher that launched this Yarn job.
+ FileSystem fs = Help.loadFileSystemForce(workSpec);
+ JobState jobState = Help.loadJobStateUncached(workSpec, fs);
+ List<Tag<?>> tagsFromCurrentJob = workSpec.getTags();
+ String metricsSuffix = workSpec.getMetricsSuffix();
+ List<Tag<?>> tags = getTags(tagsFromCurrentJob, metricsSuffix, jobState);
+ return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+ }
+
+ private int performWork(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
int workunitsProcessed = processingWorkflow.performWorkload(
@@ -85,4 +131,34 @@
return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
}
+
+ private List<Tag<?>> getTags(List<Tag<?>> tagsFromCurJob, String metricsSuffix, JobState jobStateFromHdfs) {
+ // Construct new tags list by combining subset of tags on HDFS job state and the rest of the fields from the current job
+ Map<String, Tag<?>> tagsMap = new HashMap<>();
+ Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+ TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+ TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+ // Step 1: Add tags from the AZ props using the original job (the one that launched this yarn app)
+ tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+ // Step 2: Add tags from the jobState (the original MR job on HDFS)
+ List<String> targetKeysToAddSuffix = Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+ .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+ .forEach(tag -> {
+ // Step 2a (optional): Add a suffix to FLOW_NAME_FIELD and FLOW_GROUP_FIELD to prevent collisions when testing
+ String value = targetKeysToAddSuffix.contains(tag.getKey())
+ ? tag.getValue() + metricsSuffix
+ : String.valueOf(tag.getValue());
+ tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+ });
+
+ // Step 3: Overwrite any pre-existing metadata with name of the current caller
+ tagsMap.put(GobblinMetricsKeys.CLASS_META, new Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+ return new ArrayList<>(tagsMap.values());
+ }
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index c6478b9..f7d7255 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -23,10 +23,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
-import lombok.extern.slf4j.Slf4j;
+import io.temporal.workflow.Workflow;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -55,8 +56,9 @@
* </p>
*/
@Alpha
-@Slf4j
public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
+ private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
+
protected WorkflowServiceStubs workflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 0dcf19a..6bef7a6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -25,7 +25,6 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -33,6 +32,7 @@
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -109,7 +109,7 @@
String thisWorkflowId = Workflow.getInfo().getWorkflowId();
String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + childAddr;
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
- .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setWorkflowId(childWorkflowId)
.build();
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
index 2bcf421..c653ce3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
@@ -20,13 +20,19 @@
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
@WorkflowInterface
public interface GreetingWorkflow {
/**
* This is the method that is executed when the Workflow Execution is started. The Workflow
* Execution completes when this method finishes execution.
+ *
+ * This method also shows an example of metrics emission using the
+ * {@link org.apache.gobblin.metrics.event.EventSubmitter} seen in non-Temporal Gobblin code.
*/
@WorkflowMethod
- String getGreeting(String name);
+ String getGreeting(String name, EventSubmitterContext eventSubmitterContext);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 774b77e..9d26360 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -19,11 +19,19 @@
import java.time.Duration;
+import org.slf4j.Logger;
+
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
public class GreetingWorkflowImpl implements GreetingWorkflow {
+ private final Logger LOG = Workflow.getLogger(GreetingWorkflowImpl.class);
+
/*
* At least one of the following options needs to be defined:
* - setStartToCloseTimeout
@@ -41,16 +49,19 @@
*
* The activity options that were defined above are passed in as a parameter.
*/
- private final FormatActivity activity = Workflow.newActivityStub(FormatActivity.class, options);
+ private final FormatActivity formatActivity = Workflow.newActivityStub(FormatActivity.class, options);
// This is the entry point to the Workflow.
@Override
- public String getGreeting(String name) {
-
+ public String getGreeting(String name, EventSubmitterContext eventSubmitterContext) {
/**
- * If there were other Activity methods they would be orchestrated here or from within other Activities.
- * This is a blocking call that returns only after the activity has completed.
+ * Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
*/
- return activity.composeGreeting(name);
+ TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
+ try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
+ LOG.info("Executing getGreeting");
+ timer.addMetadata("name", name);
+ return formatActivity.composeGreeting(name);
+ }
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 33183b6..5d196fa 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -33,6 +33,7 @@
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
/**
@@ -56,6 +57,9 @@
public void submitJob(List<WorkUnit> workunits) {
WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(queueName).build();
GreetingWorkflow greetingWorkflow = this.client.newWorkflowStub(GreetingWorkflow.class, options);
- greetingWorkflow.getGreeting("Gobblin");
+ EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(this.eventSubmitter);
+
+ String greeting = greetingWorkflow.getGreeting("Gobblin", eventSubmitterContext);
+ log.info(greeting);
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
index 274daee..d5d01cc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
@@ -22,6 +22,7 @@
import io.temporal.client.WorkflowClient;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
public class HelloWorldWorker extends AbstractTemporalWorker {
@@ -31,11 +32,16 @@
@Override
protected Class<?>[] getWorkflowImplClasses() {
- return new Class[] { GreetingWorkflowImpl.class };
+ return new Class[] {
+ GreetingWorkflowImpl.class,
+ };
}
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new FormatActivityImpl() };
+ return new Object[] {
+ new FormatActivityImpl(),
+ new SubmitGTEActivityImpl()
+ };
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
new file mode 100644
index 0000000..f0db7e7
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+
+/**
+ * Wrapper for sending the core essence of an {@link EventSubmitter} over the wire (e.g. metadata tags, namespace)
+ * This is in lieu of sending the entire {@link EventSubmitter} object over the wire, which is not serializable without
+ * losing some information, such as the gauges
+ */
+@Getter
+public class EventSubmitterContext {
+  private final List<Tag<?>> tags;
+  private final String namespace;
+  private final Class<?> callerClass;
+
+  @JsonCreator
+  private EventSubmitterContext(@JsonProperty("tags") List<Tag<?>> tags,
+      @JsonProperty("namespace") String namespace, @JsonProperty("callerClass") Class<?> callerClass) {
+    this.tags = tags;
+    this.namespace = namespace;
+    this.callerClass = callerClass;
+  }
+
+  /** Resolves the caller class from the {@code CLASS_META} tag, falling back to this class when no such tag exists. */
+  public EventSubmitterContext(List<Tag<?>> tags, String namespace) {
+    // Explicitly send class over the wire to avoid any classloader issues
+    this(tags, namespace, tags.stream()
+        .filter(tag -> tag.getKey().equals(CLASS_META))
+        .findAny()
+        .map(tag -> (String) tag.getValue())
+        .<Class<?>>map(EventSubmitterContext::resolveClass)
+        .orElse(EventSubmitterContext.class));
+  }
+
+  /** Captures the tags and namespace of an existing {@link EventSubmitter}. */
+  public EventSubmitterContext(EventSubmitter eventSubmitter) {
+    this(eventSubmitter.getTags(), eventSubmitter.getNamespace());
+  }
+
+  /** Builds a new {@link EventSubmitter} on a metric context created from the stored caller class and tags. */
+  public EventSubmitter create() {
+    MetricContext metricContext = Instrumented.getMetricContext(new State(), callerClass, tags);
+    return new EventSubmitter.Builder(metricContext, namespace).build();
+  }
+
+  private static Class<?> resolveClass(String canonicalClassName) {
+    try {
+      return Class.forName(canonicalClassName);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Cannot load class " + canonicalClassName + " named by tag " + CLASS_META, e);
+    }
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
new file mode 100644
index 0000000..677baf4
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.Closeable;
+
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * <p> A timer that can be used to track the duration of an event. This timer differs from the {@link TimingEvent} in that
+ * this class is not meant to be used outside of {@link io.temporal.workflow.Workflow} code. We cannot use {@link TimingEvent}
+ * because it is not serializable and cannot be passed to {@link io.temporal.workflow.Workflow} code due to the
+ * {@link EventSubmitter} field. It also relies on {@link System#currentTimeMillis()} which is not compatible with {@link io.temporal.workflow.Workflow}
+ * since {@link System#currentTimeMillis()} is not deterministic.</p>
+ *
+ * <p> {@link EventSubmitter} is not easily serializable because the {@link MetricContext} field
+ * contains bi-directional relationships via the {@link org.apache.gobblin.metrics.InnerGauge}. Although it's possible
+ * to write a custom serializer for {@link EventSubmitter}, it creates a non-obvious sleight of hand where the EventSubmitter
+ * metadata will change when crossing {@link io.temporal.workflow.Workflow} or {@link io.temporal.activity.Activity} boundaries. </p>
+ *
+ * <p> It differs from {@link Closeable} because the close method does not throw {@link java.io.IOException}. {@link TimingEvent}
+ * does this but the issue is it does not implement an interface. Inheritance is not a good solution either because of the
+ * {@link EventSubmitter} member variable. </p>
+ */
+public interface EventTimer extends Closeable {
+ /**
+ * Add additional metadata that will be used for post-processing when the timer is stopped via {@link #stop()}
+ * @param key name under which the metadata value is recorded
+ * @param metadata value to record for {@code key}
+ */
+ void addMetadata(String key, String metadata);
+
+ /**
+ * Stops the timer and executes any post-processing (e.g. event submission)
+ */
+ void stop();
+ /** Delegates to {@link #stop()}; declares no checked exception, so try-with-resources needs no catch block. */
+ default void close() {
+ stop();
+ }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
new file mode 100644
index 0000000..aa975d1
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+/** Temporal activity for submitting a Gobblin tracking event (GTE) described by a {@link GobblinEventBuilder}. */
+@ActivityInterface
+public interface SubmitGTEActivity {
+ @ActivityMethod
+ void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
new file mode 100644
index 0000000..63bce42
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import org.slf4j.Logger;
+
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+public class SubmitGTEActivityImpl implements SubmitGTEActivity {
+  // Currently unused. NOTE(review): Workflow.getLogger targets workflow code; confirm it suits an activity before logging
+  private static final Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
+
+  @Override
+  public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
+    eventSubmitterContext.create().submit(eventBuilder);
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
new file mode 100644
index 0000000..0f68f11
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.workflow.Workflow;
+import lombok.RequiredArgsConstructor;
+
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * Boiler plate for tracking elapsed time of events that is compatible with {@link Workflow}
+ * by using activities to record time
+ *
+ * This class is very similar to {@link TimingEvent} but uses {@link Workflow} compatible APIs. It's possible to refactor
+ * this class to inherit the {@link TimingEvent} but extra care would be needed to remove the {@link EventSubmitter} field
+ * since that class is not serializable without losing some information
+ */
+@RequiredArgsConstructor
+public class TemporalEventTimer implements EventTimer {
+  private final SubmitGTEActivity trackingEventActivity;
+  private final GobblinEventBuilder eventBuilder;
+  private final EventSubmitterContext eventSubmitterContext;
+  private final Instant startTime;
+  private boolean stopped; // set on the first stop so that stop() followed by close() submits the event only once
+
+  @Override
+  public void stop() {
+    stop(getCurrentTime());
+  }
+
+  @Override
+  public void addMetadata(String key, String metadata) {
+    this.eventBuilder.addMetadata(key, metadata);
+  }
+
+  private void stop(Instant endTime) {
+    if (!this.stopped) {
+      this.stopped = true;
+      this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, TimingEvent.METADATA_TIMING_EVENT);
+      this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, Long.toString(this.startTime.toEpochMilli()));
+      this.eventBuilder.addMetadata(TimingEvent.METADATA_END_TIME, Long.toString(endTime.toEpochMilli()));
+      Duration duration = Duration.between(this.startTime, endTime);
+      this.eventBuilder.addMetadata(TimingEvent.METADATA_DURATION, Long.toString(duration.toMillis()));
+      trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
+    }
+  }
+
+  private static Instant getCurrentTime() {
+    return Instant.ofEpochMilli(Workflow.currentTimeMillis());
+  }
+
+  public static class Factory {
+    private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder().build();
+    private final SubmitGTEActivity submitGTEActivity;
+    private final EventSubmitterContext eventSubmitterContext;
+    // NOTE(review): DEFAULT_OPTS sets no start-to-close/schedule-to-close timeout; confirm a default is supplied elsewhere
+    public Factory(EventSubmitterContext eventSubmitterContext) {
+      this(eventSubmitterContext, DEFAULT_OPTS);
+    }
+
+    public Factory(EventSubmitterContext eventSubmitterContext, ActivityOptions opts) {
+      this.submitGTEActivity = Workflow.newActivityStub(SubmitGTEActivity.class, opts);
+      this.eventSubmitterContext = eventSubmitterContext;
+    }
+
+    public TemporalEventTimer create(String eventName, Instant startTime) {
+      GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName, eventSubmitterContext.getNamespace());
+      return new TemporalEventTimer(submitGTEActivity, eventBuilder, this.eventSubmitterContext, startTime);
+    }
+
+    public TemporalEventTimer create(String eventName) {
+      return create(eventName, getCurrentTime());
+    }
+
+    /**
+     * Creates a timer emitting separate job start/end events like {@link org.apache.gobblin.runtime.AbstractJobLauncher},
+     * compatible with {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to update GaaS flow statuses
+     * @return a timer that emits an event at the beginning of the job and a completion event at the end of the job
+     */
+    public TemporalEventTimer createJobTimer() {
+      TemporalEventTimer startTimer = create(TimingEvent.LauncherTimings.JOB_START);
+      startTimer.stop(Instant.EPOCH); // Emit start job event containing a stub end time
+      return create(TimingEvent.LauncherTimings.JOB_COMPLETE, startTimer.startTime);
+    }
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 829b97e..ab69545 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -17,21 +17,6 @@
package org.apache.gobblin.temporal.yarn;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -50,32 +35,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import lombok.AccessLevel;
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
-import org.apache.gobblin.metrics.MultiReporterException;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
-import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
-import org.apache.gobblin.yarn.GobblinYarnEventConstants;
-import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
-import org.apache.gobblin.yarn.YarnHelixUtils;
-import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
-import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
-import org.apache.gobblin.yarn.event.NewContainerRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -107,6 +66,49 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
/**
* This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
@@ -683,18 +685,10 @@
LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
- if (this.eventSubmitter.isPresent()) {
- this.eventSubmitter.get()
- .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
- }
+ // NOTE: logic for handling container failure is removed because original implementation relies on the auto scaling manager
+ // to control the number of containers by polling helix for the current number of tasks
+ // Without that integration, that code requests too many containers when there are exceptions and overloads yarn
}
- Optional<Resource> newContainerResource = completedContainerInfo != null ?
- Optional.of(completedContainerInfo.getContainer().getResource()) : Optional.absent();
- LOGGER.info("Requesting a new container to replace {} to run Helix instance {} with helix tag {} and resource {}",
- containerStatus.getContainerId(), completedInstanceName, helixTag, newContainerResource.orNull());
- this.eventBus.post(new NewContainerRequest(
- shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerInfo != null ?
- Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
}
private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo,
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
new file mode 100644
index 0000000..91c960e
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+public class EventSubmitterContextTest {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String NAMESPACE = "test-namespace";
+
+  @Test
+  public void testCreateEventSubmitter() throws IOException {
+    List<Tag<?>> tags = Arrays.asList(new Tag<>("jobId", "stub"));
+    State state = new State();
+    state.setProp("someState", "stub");
+    MetricContext metricContext = Instrumented.getMetricContext(state, getClass(), tags);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, NAMESPACE).build();
+    EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(eventSubmitter);
+    byte[] asBytes = OBJECT_MAPPER.writeValueAsBytes(eventSubmitterContext);
+
+    EventSubmitterContext deserEventMetadata = OBJECT_MAPPER.readValue(asBytes, EventSubmitterContext.class);
+    EventSubmitter deserEventSubmitter = deserEventMetadata.create();
+    Assert.assertTrue(deserEventSubmitter.getTags().contains(tags.get(0)));
+    Assert.assertEquals(deserEventSubmitter.getNamespace(), NAMESPACE);
+    // The remaining checks inspect the original submitter's tags, i.e. the tags the context was built from
+    Map<? extends String, String> tagMap = Tag.toMap(Tag.tagValuesToString(eventSubmitter.getTags()));
+    Assert.assertEquals(tagMap.get(CLASS_META), this.getClass().getName());
+    Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_ID_TAG_NAME));
+    Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_NAME_TAG_NAME));
+  }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 391b160..d76405a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -104,11 +104,14 @@
import org.apache.gobblin.cluster.GobblinHelixMessagingService;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.rest.JobExecutionInfoServer;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.util.AzkabanTags;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.EmailUtils;
@@ -121,6 +124,7 @@
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
+import static org.apache.gobblin.metrics.GobblinMetrics.METRICS_STATE_CUSTOM_TAGS;
import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
@@ -1051,8 +1055,15 @@
Schema schema = KafkaReporterUtils.getMetricReportSchema();
String schemaId = registry.register(schema, KafkaReporterUtils.getMetricsTopic(properties).get());
LOGGER.info("Adding schemaId {} for MetricReport to the config", schemaId);
- config = config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
- ConfigValueFactory.fromAnyRef(schemaId));
+ List<Tag<?>> tags = Lists.newArrayList();
+ tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+ GobblinMetrics.addCustomTagsToProperties(properties, tags);
+
+ config = config
+ .withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+ ConfigValueFactory.fromAnyRef(schemaId))
+ .withValue(METRICS_STATE_CUSTOM_TAGS,
+ ConfigValueFactory.fromAnyRef(properties.getProperty(METRICS_STATE_CUSTOM_TAGS)));
}
return config;
}