blob: 380a053f487fed9aa12108fbafa0a57926878392 [file] [log] [blame]
From 5f79f2a059534eaa63c0479a2142a250fa78c1e3 Mon Sep 17 00:00:00 2001
From: amaruthappan <alagappan.maruthappan@yahooinc.com>
Date: Mon, 8 May 2023 17:00:46 -0700
Subject: [PATCH 1/2] =?UTF-8?q?HIVE-23190:=20LLAP:=20modify=20IndexCache?=
=?UTF-8?q?=20to=20pass=20filesystem=20object=20to=20TezSpillRecord=20(L?=
=?UTF-8?q?=C3=A1szl=C3=B3=20Bodor=20reviewed=20by=20Rajesh=20Balamohan)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../hadoop/hive/llap/shufflehandler/IndexCache.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
index 4de03f232d70..c7b986469f4a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -42,11 +43,21 @@ class IndexCache {
private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>();
+ private FileSystem fs;
public IndexCache(Configuration conf) {
this.conf = conf;
totalMemoryAllowed = 10 * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+ initLocalFs();
+ }
+
+ private void initLocalFs() {
+ try {
+ this.fs = FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
From 1edbe403ff424f91ed0cd1ae91eb39290b5beb7f Mon Sep 17 00:00:00 2001
From: amaruthappan <alagappan.maruthappan@yahooinc.com>
Date: Mon, 8 May 2023 16:58:51 -0700
Subject: [PATCH 2/2] HIVE-27336: Upgrade Tez to 0.10.2 in Hive-3.X
---
.../hive/llap/daemon/impl/ContainerRunnerImpl.java | 4 ++--
.../hadoop/hive/llap/daemon/impl/LlapTaskReporter.java | 2 +-
.../hive/llap/daemon/impl/TaskRunnerCallable.java | 2 +-
.../hadoop/hive/llap/shufflehandler/IndexCache.java | 2 +-
.../hive/llap/tezplugins/LlapTaskCommunicator.java | 10 +++++-----
.../hive/llap/tezplugins/LlapTaskSchedulerService.java | 8 ++++----
.../hive/llap/tezplugins/TestLlapTaskCommunicator.java | 1 +
pom.xml | 2 +-
.../org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 +++++++
9 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index ef5922ef41b6..95d601a8e2f3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
// This is the start of container-annotated logging.
- final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+ final String dagId = attemptId.getDAGID().toString();
final String queryId = vertex.getHiveQueryId();
final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
MDC.put("dagId", dagId);
@@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser());
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString);
- int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
+ int dagIdentifier = taskAttemptId.getDAGID().getId();
QueryIdentifier queryIdentifier = new QueryIdentifier(
qIdProto.getApplicationIdString(), dagIdentifier);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 33ade55ee1f5..cc7879cdecea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
int fromPreRoutedEventId = task.getNextPreRoutedEventId();
int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
- containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
+ containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 7f436e23264b..66f7c330f786 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) {
StringBuilder sb = new StringBuilder();
TezTaskID taskId = taskAttemptId.getTaskID();
TezVertexID vertexId = taskId.getVertexID();
- TezDAGID dagId = vertexId.getDAGId();
+ TezDAGID dagId = vertexId.getDAGID();
ApplicationId appId = dagId.getApplicationId();
long clusterTs = appId.getClusterTimestamp();
long clusterTsShort = clusterTs % 1_000_000L;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
index c7b986469f4a..cc5019a64d84 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
@@ -129,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
TezSpillRecord tmp = null;
try {
- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
} catch (Throwable e) {
tmp = new TezSpillRecord(0);
cache.remove(mapId);
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5d4ce223d9e9..5eebe10ac9a3 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig
UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder()
.setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString())
.setQueryIdentifier(constructQueryIdentifierProto(
- attemptId.getTaskID().getVertexID().getDAGId().getId())).build();
+ attemptId.getDAGID().getId())).build();
communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() {
@@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
int priority) {
super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
credentialsChanged, priority);
- int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+ int dagId = taskSpec.getTaskAttemptID().getDAGID().getId();
if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) {
// TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved
// once TEZ-2672 is fixed.
@@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
TerminateFragmentRequestProto request =
TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
constructQueryIdentifierProto(
- taskAttemptId.getTaskID().getVertexID().getDAGId().getId()))
+ taskAttemptId.getDAGID().getId()))
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co
private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
final boolean isDone, final String nmAddress) {
- String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
+ String dagId = attemptID.getDAGID().toString();
String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
"?nm.id=", nmAddress);
String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
@@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI
builder.setAmPort(getAddress().getPort());
Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
- taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+ taskSpec.getTaskAttemptID().getDAGID().getId());
ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
if (credentialsBinary == null) {
credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 82179645da00..99038cd49542 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin
writeLock.lock();
try {
if (!dagRunning && metrics != null && id != null) {
- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
+ metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
dagStats.registerTaskRequest(hosts, racks);
@@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
writeLock.lock();
try {
if (!dagRunning && metrics != null && id != null) {
- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
+ metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
dagStats.registerTaskRequest(null, null);
@@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
protected TezTaskAttemptID getTaskAttemptId(Object task) {
// TODO: why does Tez API use "Object" for this?
if (task instanceof TaskAttempt) {
- return ((TaskAttempt)task).getID();
+ return ((TaskAttempt)task).getTaskAttemptID();
}
throw new AssertionError("LLAP plugin can only schedule task attempts");
}
@@ -2030,7 +2030,7 @@ private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r
continue; // Not the right host.
}
Map<Integer,Set<Integer>> depInfo = getDependencyInfo(
- taskInfo.attemptId.getTaskID().getVertexID().getDAGId());
+ taskInfo.attemptId.getDAGID());
Set<Integer> vertexDepInfo = null;
if (depInfo != null) {
vertexDepInfo = depInfo.get(forVertex);
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index 5efe7c677ce6..2fa2487a74d7 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
TezTaskID.getInstance(vertexId, taskIdx), 0);
doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
+ doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID();
doReturn(DAG_NAME).when(taskSpec).getDAGName();
doReturn(vertexName).when(taskSpec).getVertexName();
ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload);
diff --git a/pom.xml b/pom.xml
index cb54806ef5ca..053ccc059f3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,7 @@
<slf4j.version>1.7.10</slf4j.version>
<ST4.version>4.0.4</ST4.version>
<storage-api.version>2.7.0</storage-api.version>
- <tez.version>0.9.1</tez.version>
+ <tez.version>0.10.2</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<spark.version>2.3.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a15482f19c43..288341a2b229 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
}
}
+
+ @Override
+ public String getWebUIAddress() throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getWebUIAddress();
+ }
+ }
}
}