| From 5f79f2a059534eaa63c0479a2142a250fa78c1e3 Mon Sep 17 00:00:00 2001 |
| From: amaruthappan <alagappan.maruthappan@yahooinc.com> |
| Date: Mon, 8 May 2023 17:00:46 -0700 |
| Subject: [PATCH 1/2] =?UTF-8?q?HIVE-23190:=20LLAP:=20modify=20IndexCache?= |
| =?UTF-8?q?=20to=20pass=20filesystem=20object=20to=20TezSpillRecord=20(L?= |
| =?UTF-8?q?=C3=A1szl=C3=B3=20Bodor=20reviewed=20by=20Rajesh=20Balamohan)?= |
| MIME-Version: 1.0 |
| Content-Type: text/plain; charset=UTF-8 |
| Content-Transfer-Encoding: 8bit |
| |
| --- |
| .../hadoop/hive/llap/shufflehandler/IndexCache.java | 11 +++++++++++ |
| 1 file changed, 11 insertions(+) |
| |
| diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java |
| index 4de03f232d70..c7b986469f4a 100644 |
| --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java |
| +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java |
| @@ -25,6 +25,7 @@ |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| +import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.tez.runtime.library.common.Constants; |
| import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; |
| @@ -42,11 +43,21 @@ class IndexCache { |
| |
| private final LinkedBlockingQueue<String> queue = |
| new LinkedBlockingQueue<String>(); |
| + private FileSystem fs; |
| |
| public IndexCache(Configuration conf) { |
| this.conf = conf; |
| totalMemoryAllowed = 10 * 1024 * 1024; |
| LOG.info("IndexCache created with max memory = " + totalMemoryAllowed); |
| + initLocalFs(); |
| + } |
| + |
| + private void initLocalFs() { |
| + try { |
| + this.fs = FileSystem.getLocal(conf).getRaw(); |
| + } catch (IOException e) { |
| + throw new RuntimeException(e); |
| + } |
| } |
| |
| /** |
| |
| From 1edbe403ff424f91ed0cd1ae91eb39290b5beb7f Mon Sep 17 00:00:00 2001 |
| From: amaruthappan <alagappan.maruthappan@yahooinc.com> |
| Date: Mon, 8 May 2023 16:58:51 -0700 |
| Subject: [PATCH 2/2] HIVE-27336: Upgrade Tez to 0.10.2 in Hive-3.X |
| |
| --- |
| .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 4 ++-- |
| .../hadoop/hive/llap/daemon/impl/LlapTaskReporter.java | 2 +- |
| .../hive/llap/daemon/impl/TaskRunnerCallable.java | 2 +- |
| .../hadoop/hive/llap/shufflehandler/IndexCache.java | 2 +- |
| .../hive/llap/tezplugins/LlapTaskCommunicator.java | 10 +++++----- |
| .../hive/llap/tezplugins/LlapTaskSchedulerService.java | 8 ++++---- |
| .../hive/llap/tezplugins/TestLlapTaskCommunicator.java | 1 + |
| pom.xml | 2 +- |
| .../org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 +++++++ |
| 9 files changed, 23 insertions(+), 15 deletions(-) |
| |
| diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java |
| index ef5922ef41b6..95d601a8e2f3 100644 |
| --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java |
| +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java |
| @@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws |
| vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); |
| |
| // This is the start of container-annotated logging. |
| - final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString(); |
| + final String dagId = attemptId.getDAGID().toString(); |
| final String queryId = vertex.getHiveQueryId(); |
| final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString); |
| MDC.put("dagId", dagId); |
| @@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws |
| env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser()); |
| |
| TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString); |
| - int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); |
| + int dagIdentifier = taskAttemptId.getDAGID().getId(); |
| |
| QueryIdentifier queryIdentifier = new QueryIdentifier( |
| qIdProto.getApplicationIdString(), dagIdentifier); |
| diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java |
| index 33ade55ee1f5..cc7879cdecea 100644 |
| --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java |
| +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java |
| @@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t |
| int fromPreRoutedEventId = task.getNextPreRoutedEventId(); |
| int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle()); |
| TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, |
| - containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents); |
| + containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending heartbeat to AM, request=" + request); |
| } |
| diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java |
| index 7f436e23264b..66f7c330f786 100644 |
| --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java |
| +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java |
| @@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) { |
| StringBuilder sb = new StringBuilder(); |
| TezTaskID taskId = taskAttemptId.getTaskID(); |
| TezVertexID vertexId = taskId.getVertexID(); |
| - TezDAGID dagId = vertexId.getDAGId(); |
| + TezDAGID dagId = vertexId.getDAGID(); |
| ApplicationId appId = dagId.getApplicationId(); |
| long clusterTs = appId.getClusterTimestamp(); |
| long clusterTsShort = clusterTs % 1_000_000L; |
| diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java |
| index c7b986469f4a..cc5019a64d84 100644 |
| --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java |
| +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java |
| @@ -129,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName, |
| LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ; |
| TezSpillRecord tmp = null; |
| try { |
| - tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner); |
| + tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner); |
| } catch (Throwable e) { |
| tmp = new TezSpillRecord(0); |
| cache.remove(mapId); |
| diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java |
| index 5d4ce223d9e9..5eebe10ac9a3 100644 |
| --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java |
| +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java |
| @@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig |
| UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder() |
| .setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString()) |
| .setQueryIdentifier(constructQueryIdentifierProto( |
| - attemptId.getTaskID().getVertexID().getDAGId().getId())).build(); |
| + attemptId.getDAGID().getId())).build(); |
| |
| communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(), |
| new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() { |
| @@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task |
| int priority) { |
| super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, |
| credentialsChanged, priority); |
| - int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); |
| + int dagId = taskSpec.getTaskAttemptID().getDAGID().getId(); |
| if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) { |
| // TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved |
| // once TEZ-2672 is fixed. |
| @@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, |
| TerminateFragmentRequestProto request = |
| TerminateFragmentRequestProto.newBuilder().setQueryIdentifier( |
| constructQueryIdentifierProto( |
| - taskAttemptId.getTaskID().getVertexID().getDAGId().getId())) |
| + taskAttemptId.getDAGID().getId())) |
| .setFragmentIdentifierString(taskAttemptId.toString()).build(); |
| communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), |
| new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() { |
| @@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co |
| |
| private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString, |
| final boolean isDone, final String nmAddress) { |
| - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); |
| + String dagId = attemptID.getDAGID().toString(); |
| String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""), |
| "?nm.id=", nmAddress); |
| String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers", |
| @@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI |
| builder.setAmPort(getAddress().getPort()); |
| |
| Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() == |
| - taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); |
| + taskSpec.getTaskAttemptID().getDAGID().getId()); |
| ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); |
| if (credentialsBinary == null) { |
| credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials()); |
| diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java |
| index 82179645da00..99038cd49542 100644 |
| --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java |
| +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java |
| @@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin |
| writeLock.lock(); |
| try { |
| if (!dagRunning && metrics != null && id != null) { |
| - metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); |
| + metrics.setDagId(id.getDAGID().toString()); |
| } |
| dagRunning = true; |
| dagStats.registerTaskRequest(hosts, racks); |
| @@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container |
| writeLock.lock(); |
| try { |
| if (!dagRunning && metrics != null && id != null) { |
| - metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); |
| + metrics.setDagId(id.getDAGID().toString()); |
| } |
| dagRunning = true; |
| dagStats.registerTaskRequest(null, null); |
| @@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container |
| protected TezTaskAttemptID getTaskAttemptId(Object task) { |
| // TODO: why does Tez API use "Object" for this? |
| if (task instanceof TaskAttempt) { |
| - return ((TaskAttempt)task).getID(); |
| + return ((TaskAttempt)task).getTaskAttemptID(); |
| } |
| throw new AssertionError("LLAP plugin can only schedule task attempts"); |
| } |
| @@ -2030,7 +2030,7 @@ private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r |
| continue; // Not the right host. |
| } |
| Map<Integer,Set<Integer>> depInfo = getDependencyInfo( |
| - taskInfo.attemptId.getTaskID().getVertexID().getDAGId()); |
| + taskInfo.attemptId.getDAGID()); |
| Set<Integer> vertexDepInfo = null; |
| if (depInfo != null) { |
| vertexDepInfo = depInfo.get(forVertex); |
| diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java |
| index 5efe7c677ce6..2fa2487a74d7 100644 |
| --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java |
| +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java |
| @@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int |
| TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance( |
| TezTaskID.getInstance(vertexId, taskIdx), 0); |
| doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); |
| + doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID(); |
| doReturn(DAG_NAME).when(taskSpec).getDAGName(); |
| doReturn(vertexName).when(taskSpec).getVertexName(); |
| ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload); |
| diff --git a/pom.xml b/pom.xml |
| index cb54806ef5ca..053ccc059f3f 100644 |
| --- a/pom.xml |
| +++ b/pom.xml |
| @@ -196,7 +196,7 @@ |
| <slf4j.version>1.7.10</slf4j.version> |
| <ST4.version>4.0.4</ST4.version> |
| <storage-api.version>2.7.0</storage-api.version> |
| - <tez.version>0.9.1</tez.version> |
| + <tez.version>0.10.2</tez.version> |
| <super-csv.version>2.2.0</super-csv.version> |
| <spark.version>2.3.0</spark.version> |
| <scala.binary.version>2.11</scala.binary.version> |
| diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java |
| index a15482f19c43..288341a2b229 100644 |
| --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java |
| +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java |
| @@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> |
| return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts); |
| } |
| } |
| + |
| + @Override |
| + public String getWebUIAddress() throws IOException, TezException { |
| + synchronized (dagClient) { |
| + return dagClient.getWebUIAddress(); |
| + } |
| + } |
| } |
| } |