blob: 7b04e2c4552fe92444d664ef61d9aec5e5957117 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.ext.LlapDaemonInfo;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
/**
 * Base LLAP input format to handle requesting of splits and communication with LLAP daemon.
 *
 * <p>{@link #getSplits(JobConf, int)} opens a JDBC connection to HiveServer2, runs
 * {@code get_llap_splits} for the configured query and keeps that connection open, keyed by a
 * handle ID, so that resources associated with the query (temp tables, locks) are retained.
 * Applications release it with {@link #close(String)} or {@link #closeAll()}; a shutdown hook
 * registered by this class also calls {@link #closeAll()}.
 *
 * <p>{@link #getRecordReader(InputSplit, JobConf, Reporter)} submits the split's fragment to the
 * first LLAP daemon listed in the split and reads the fragment's output from that daemon's
 * output format port.
 *
 * @param <V> value type produced by the record readers
 */
public class LlapBaseInputFormat<V extends WritableComparable<?>>
    implements InputFormat<NullWritable, V> {

  private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);

  /** JDBC driver class loaded before connecting to HiveServer2. */
  private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

  /** Escape character used when embedding the user query inside {@link #SPLIT_QUERY}. */
  private static final char ESCAPE_CHAR = '\\';

  /** Characters escaped in the user query: the double quote that delimits it and the escape. */
  private static final char[] ESCAPED_CHARS = {'"', ESCAPE_CHAR};

  /** Guards {@link #connectionMap}. */
  private static final Object lock = new Object();

  /** HiveServer2 connections kept open by getSplits(), keyed by handle ID. */
  private static final Map<String, List<Connection>> connectionMap =
      new HashMap<String, List<Connection>>();

  private String url; // "jdbc:hive2://localhost:10000/default"
  private String user; // "hive",
  private String pwd; // ""
  private String query;
  private boolean useArrow;
  private long arrowAllocatorLimit;
  private BufferAllocator allocator;

  /** Job configuration key for the HiveServer2 JDBC URL. */
  public static final String URL_KEY = "llap.if.hs2.connection";
  /** Job configuration key for the query to generate splits for. */
  public static final String QUERY_KEY = "llap.if.query";
  /** Job configuration key for the HiveServer2 user. */
  public static final String USER_KEY = "llap.if.user";
  /** Job configuration key for the HiveServer2 password. */
  public static final String PWD_KEY = "llap.if.pwd";
  /** Job configuration key for the handle ID under which the HS2 connection is kept open. */
  public static final String HANDLE_ID = "llap.if.handleid";
  /** Job configuration key for the database to {@code USE} before generating splits. */
  public static final String DB_KEY = "llap.if.database";
  /** Job configuration key selecting the split layout returned by getSplits(). */
  public static final String USE_NEW_SPLIT_FORMAT = "llap.if.use.new.split.format";
  /** Job configuration key for comma-separated SET statements run before generating splits. */
  public static final String SESSION_QUERIES_FOR_GET_NUM_SPLITS = "llap.session.queries.for.get.num.splits";
  /** Only session queries matching this pattern (SET statements) are executed. */
  public static final Pattern SET_QUERY_PATTERN = Pattern.compile("^\\s*set\\s+.*=.+$", Pattern.CASE_INSENSITIVE);
  /** SQL template: escaped user query and requested number of splits. */
  public static final String SPLIT_QUERY = "select get_llap_splits(\"%s\",%d)";

  /**
   * Creates an input format for the given query with Arrow disabled. Any argument left
   * {@code null} is read from the job configuration when getSplits() is called.
   */
  public LlapBaseInputFormat(String url, String user, String pwd, String query) {
    this.url = url;
    this.user = user;
    this.pwd = pwd;
    this.query = query;
  }

  // Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator) instead
  public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
    this.useArrow = useArrow;
    this.arrowAllocatorLimit = arrowAllocatorLimit;
  }

  /**
   * Creates an input format whose Arrow record readers use the caller-supplied allocator.
   *
   * @param useArrow whether to read output through LlapArrowBatchRecordReader
   * @param allocator allocator handed to the Arrow record reader; may be null to fall back to
   *     the limit-based constructor of the reader
   */
  public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
    this.useArrow = useArrow;
    this.allocator = allocator;
  }

  /** Creates an input format with Arrow disabled; connection settings come from the job conf. */
  public LlapBaseInputFormat() {
    this.useArrow = false;
  }

  /**
   * Submits the split's fragment to the first LLAP daemon in the split, registers for its output
   * on the daemon's output format port, and returns a reader over that output.
   *
   * @throws IOException if the split carries no daemon information, or if submission,
   *     registration or reader construction fails
   */
  @SuppressWarnings("unchecked")
  @Override
  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    LlapInputSplit llapSplit = (LlapInputSplit) split;

    // Set conf to use LLAP user rather than current user for LLAP Zk registry.
    HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());

    SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());

    // GenericUDTFGetSplits validates that the daemon list is populated; fail with a descriptive
    // error instead of an ArrayIndexOutOfBoundsException should that ever not hold.
    LlapDaemonInfo[] daemonInfos = llapSplit.getLlapDaemonInfos();
    if (daemonInfos == null || daemonInfos.length == 0) {
      throw new IOException("LLAP input split " + llapSplit.getSplitNum()
          + " does not contain any LLAP daemon information");
    }
    final LlapDaemonInfo llapDaemonInfo = daemonInfos[0];
    final String host = llapDaemonInfo.getHost();
    final int outputPort = llapDaemonInfo.getOutputFormatPort();
    final int llapSubmitPort = llapDaemonInfo.getRpcPort();

    LOG.info("Will try to submit request to first Llap Daemon in the split - {}", llapDaemonInfo);

    byte[] llapTokenBytes = llapSplit.getTokenBytes();
    Token<LlapTokenIdentifier> llapToken = deserializeLlapToken(llapTokenBytes);

    LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
        new LlapRecordReaderTaskUmbilicalExternalResponder();
    // NOTE(review): llapClient is not released if submission or registration below fails;
    // confirm whether LlapTaskUmbilicalExternalClient needs explicit cleanup on that path.
    LlapTaskUmbilicalExternalClient llapClient =
        new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
            submitWorkInfo.getToken(), umbilicalResponder, llapToken);

    int attemptNum = 0;
    final int taskNum;
    // Use task attempt number, task number from conf if provided
    TaskAttemptID taskAttemptId = TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID));
    if (taskAttemptId != null) {
      attemptNum = taskAttemptId.getId();
      taskNum = taskAttemptId.getTaskID().getId();
      LOG.debug("Setting attempt number to: {}, task number to: {} from given taskAttemptId: {} in conf",
          attemptNum, taskNum, taskAttemptId);
    } else {
      taskNum = llapSplit.getSplitNum();
    }

    SubmitWorkRequestProto request = constructSubmitWorkRequestProto(
        submitWorkInfo, taskNum, attemptNum, llapClient.getAddress(),
        submitWorkInfo.getToken(), llapSplit, job);

    SignableVertexSpec vertex = SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary());
    String fragmentId =
        Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(),
            request.getFragmentNumber(), request.getAttemptNumber()).toString();

    LOG.info("Submitting fragment:{} to llap [host = {}, port = {}] ", fragmentId, host, llapSubmitPort);
    llapClient.submitWork(request, host, llapSubmitPort);

    Socket socket = new Socket(host, outputPort);
    @SuppressWarnings("rawtypes")
    LlapBaseRecordReader recordReader;
    try {
      LOG.info("Registering fragment:{} to llap [host = {}, output port = {}] to read output",
          fragmentId, host, outputPort);
      sendSocketInitMessage(socket, fragmentId, llapTokenBytes);
      LOG.info("Registered id: {}", fragmentId);
      recordReader = createRecordReader(socket, llapSplit, job, llapClient);
    } catch (IOException | RuntimeException e) {
      // Once constructed, the record reader is handed the socket; until then nothing else
      // references it, so close it here rather than leaking the connection to the daemon.
      closeSocketQuietly(socket, e);
      throw e;
    }

    umbilicalResponder.setRecordReader(recordReader);
    return recordReader;
  }

  /**
   * Deserializes the LLAP token carried by a split.
   *
   * @param tokenBytes serialized token, or null when the split has none
   * @return the token, or null when {@code tokenBytes} is null
   */
  private static Token<LlapTokenIdentifier> deserializeLlapToken(byte[] tokenBytes)
      throws IOException {
    if (tokenBytes == null) {
      return null;
    }
    DataInputBuffer in = new DataInputBuffer();
    in.reset(tokenBytes, 0, tokenBytes.length);
    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>();
    token.readFields(in);
    return token;
  }

  /**
   * Writes a length-delimited LlapOutputSocketInitMessage (fragment ID plus the optional LLAP
   * token) to the socket and flushes it.
   */
  private static void sendSocketInitMessage(Socket socket, String fragmentId, byte[] tokenBytes)
      throws IOException {
    LlapOutputSocketInitMessage.Builder builder =
        LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId);
    if (tokenBytes != null) {
      builder.setToken(ByteString.copyFrom(tokenBytes));
    }
    OutputStream socketStream = socket.getOutputStream();
    builder.build().writeDelimitedTo(socketStream);
    socketStream.flush();
  }

  /**
   * Builds the record reader over the socket: an Arrow batch reader when {@code useArrow} is set
   * (using the client allocator if one was given, otherwise the allocator limit), or a
   * BytesWritable-valued {@link LlapBaseRecordReader} otherwise.
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  private LlapBaseRecordReader createRecordReader(Socket socket, LlapInputSplit llapSplit,
      JobConf job, LlapTaskUmbilicalExternalClient llapClient) throws IOException {
    if (!useArrow) {
      return new LlapBaseRecordReader(socket.getInputStream(),
          llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable) socket);
    }
    if (allocator != null) {
      // Client provided their own allocator
      return new LlapArrowBatchRecordReader(
          socket.getInputStream(), llapSplit.getSchema(),
          ArrowWrapperWritable.class, job, llapClient, socket,
          allocator);
    }
    // Client did not provide their own allocator, use constructor for global allocator
    return new LlapArrowBatchRecordReader(
        socket.getInputStream(), llapSplit.getSchema(),
        ArrowWrapperWritable.class, job, llapClient, socket,
        arrowAllocatorLimit);
  }

  /** Closes the socket, recording any close failure as suppressed on {@code primary}. */
  private static void closeSocketQuietly(Socket socket, Throwable primary) {
    try {
      socket.close();
    } catch (IOException closeError) {
      primary.addSuppressed(closeError);
    }
  }

  /**
   * Requests splits for the configured query from HiveServer2.
   *
   * <p>Calling getSplits() will open a HiveServer2 connection which should be closed by the
   * calling application using {@link #close(String)} when the application is done with the
   * splits. The handle ID is taken from {@link #HANDLE_ID}; when absent a random one is generated
   * and logged.
   *
   * @param job job configuration supplying any connection settings not given to the constructor
   * @param numSplits requested number of splits, passed to {@code get_llap_splits}
   * @return the deserialized splits
   * @throws IllegalStateException if no HiveServer2 URL or query is available
   * @throws IOException if the JDBC driver cannot be loaded or split generation fails
   */
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<InputSplit> ins = new ArrayList<InputSplit>();

    if (url == null) {
      url = job.get(URL_KEY);
    }
    if (query == null) {
      query = job.get(QUERY_KEY);
    }
    if (user == null) {
      user = job.get(USER_KEY);
    }
    if (pwd == null) {
      pwd = job.get(PWD_KEY);
    }
    String database = job.get(DB_KEY);

    if (url == null || query == null) {
      throw new IllegalStateException("Both " + URL_KEY + " and " + QUERY_KEY
          + " must be set, either via the constructor or the job configuration");
    }

    String handleId = job.get(HANDLE_ID);
    if (handleId == null) {
      handleId = UUID.randomUUID().toString();
      LOG.info("Handle ID not specified - generated handle ID {}", handleId);
    }

    try {
      Class.forName(DRIVER_NAME);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }

    LOG.info("Handle ID {}: query={}", handleId, query);

    String escapedQuery = StringUtils.escapeString(query, ESCAPE_CHAR, ESCAPED_CHARS);
    String sql = String.format(SPLIT_QUERY, escapedQuery, numSplits);

    try {
      Connection conn = DriverManager.getConnection(url, user, pwd);
      try (Statement stmt = conn.createStatement()) {
        if (database != null && !database.isEmpty()) {
          // NOTE(review): the database name from the job conf is concatenated unquoted.
          stmt.execute("USE " + database);
        }
        executeSessionQueries(stmt, job.get(SESSION_QUERIES_FOR_GET_NUM_SPLITS));

        boolean useNewSplitFormat = job.getBoolean(USE_NEW_SPLIT_FORMAT, false);
        try (ResultSet res = stmt.executeQuery(sql)) {
          collectSplits(res, numSplits, useNewSplitFormat, ins);
        }
      } catch (Exception e) {
        LOG.error("Closing connection due to error", e);
        closeConnectionAfterError(conn, e);
        throw e;
      }
      // Keep connection open to hang on to associated resources (temp tables, locks).
      // Save to connectionMap so it can be closed at user's convenience.
      addConnection(handleId, conn);
    } catch (Exception e) {
      throw new IOException(e);
    }

    return ins.toArray(new InputSplit[0]);
  }

  /**
   * Executes the comma-separated session queries configured under
   * {@link #SESSION_QUERIES_FOR_GET_NUM_SPLITS}. Only statements matching
   * {@link #SET_QUERY_PATTERN} run; any other entry is logged and skipped.
   */
  private static void executeSessionQueries(Statement stmt, String sessionQueries)
      throws SQLException {
    if (sessionQueries == null || sessionQueries.trim().isEmpty()) {
      return;
    }
    for (String q : sessionQueries.trim().split(",")) {
      // allow only set queries
      if (SET_QUERY_PATTERN.matcher(q).matches()) {
        LOG.debug("Executing session query: {}", q);
        stmt.execute(q);
      } else {
        LOG.warn("Only SET queries are allowed, not executing this query: {}", q);
      }
    }
  }

  /**
   * Deserializes the LlapInputSplit stored in column 2 of every result row into {@code splits}.
   *
   * <p>In case of USE_NEW_SPLIT_FORMAT=true every row is returned as-is, i.e.:
   * <pre>
   *   type          split
   *   schema-split  LlapInputSplit -- contains only schema
   *   plan-split    LlapInputSplit -- contains only planBytes[]
   *   0             LlapInputSplit -- actual split 1
   *   1             LlapInputSplit -- actual split 2
   *   ...           ...
   * </pre>
   * Otherwise (old format) the schema and plan rows are folded into every actual split, and the
   * schema split itself is returned only when {@code numSplits == 0}.
   */
  private static void collectSplits(ResultSet res, int numSplits, boolean useNewSplitFormat,
      List<InputSplit> splits) throws SQLException, IOException {
    int count = 0;
    LlapInputSplit schemaSplit = null;
    LlapInputSplit planSplit = null;
    while (res.next()) {
      // deserialize split
      DataInput in = new DataInputStream(res.getBinaryStream(2));
      LlapInputSplit is = new LlapInputSplit();
      is.readFields(in);
      if (useNewSplitFormat) {
        splits.add(is);
        continue;
      }
      // to keep the old format, populate schema and planBytes[] in actual splits
      if (count == 0) {
        schemaSplit = is;
        if (numSplits == 0) {
          splits.add(schemaSplit);
        }
      } else if (count == 1) {
        planSplit = is;
      } else {
        is.setSchema(schemaSplit.getSchema());
        assert planSplit != null;
        is.setPlanBytes(planSplit.getPlanBytes());
        splits.add(is);
      }
      count++;
    }
  }

  /**
   * Closes a connection after a failure, attaching any close error to {@code primary} as a
   * suppressed exception so the original failure is the one reported.
   */
  private static void closeConnectionAfterError(Connection conn, Exception primary) {
    try {
      conn.close();
    } catch (Exception closeError) {
      // Best effort: the primary failure is what the caller needs to see.
      primary.addSuppressed(closeError);
    }
  }

  /** Records an open connection under the given handle ID. */
  private void addConnection(String handleId, Connection connection) {
    synchronized (lock) {
      List<Connection> handleConnections = connectionMap.get(handleId);
      if (handleConnections == null) {
        handleConnections = new ArrayList<Connection>();
        connectionMap.put(handleId, handleConnections);
      }
      handleConnections.add(connection);
    }
  }

  /**
   * Close the connection associated with the handle ID, if getSplits() was configured with a handle ID.
   * Call when the application is done using the splits generated by getSplits().
   * @param handleId Handle ID used in configuration for getSplits()
   * @throws IOException
   */
  public static void close(String handleId) throws IOException {
    List<Connection> handleConnections;
    synchronized (lock) {
      handleConnections = connectionMap.remove(handleId);
    }
    closeConnections(handleId, handleConnections);
  }

  /** Closes every connection in the list, logging (not propagating) individual close failures. */
  private static void closeConnections(String handleId, List<Connection> handleConnections) {
    if (handleConnections == null) {
      LOG.debug("No connection found for handle ID {}", handleId);
      return;
    }
    LOG.debug("Closing {} connections for handle ID {}", handleConnections.size(), handleId);
    for (Connection conn : handleConnections) {
      try {
        conn.close();
      } catch (Exception err) {
        LOG.error("Error while closing connection for {}", handleId, err);
      }
    }
  }

  /**
   * Close all outstanding connections created by getSplits() calls.
   *
   * <p>The registry is emptied under the lock, but the connections themselves are closed outside
   * it so that slow JDBC closes do not block concurrent getSplits()/close() callers.
   */
  public static void closeAll() {
    LOG.debug("Closing all handles");
    Map<String, List<Connection>> snapshot;
    synchronized (lock) {
      snapshot = new HashMap<String, List<Connection>>(connectionMap);
      connectionMap.clear();
    }
    for (Map.Entry<String, List<Connection>> connHandle : snapshot.entrySet()) {
      closeConnections(connHandle.getKey(), connHandle.getValue());
    }
  }

  static {
    // Shutdown hook to clean up resources at process end.
    ShutdownHookManager.addShutdownHook(new Runnable() {
      @Override
      public void run() {
        closeAll();
      }
    });
  }

  /**
   * Builds the SubmitWorkRequestProto sent to the LLAP daemon for one fragment attempt.
   *
   * @param taskNum fragment number; also used as the fake container ID
   * @param attemptNum attempt number; also used for the fake application attempt
   * @param address umbilical client address advertised to the daemon as the AM
   */
  private static SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
      int taskNum, int attemptNum, InetSocketAddress address, Token<JobTokenIdentifier> token,
      LlapInputSplit llapInputSplit, JobConf job) throws IOException {
    byte[] fragmentBytes = llapInputSplit.getFragmentBytes();
    byte[] fragmentBytesSignature = llapInputSplit.getFragmentBytesSignature();

    ApplicationId appId = submitWorkInfo.getFakeAppId();
    ContainerId containerId =
        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, attemptNum), taskNum);

    // Credentials can change across DAGs. Ideally construct only once per DAG.
    Credentials credentials = new Credentials();
    TokenCache.setSessionToken(token, credentials);
    ByteBuffer credentialsBinary = serializeCredentials(credentials);

    FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
    runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
    runtimeInfo.setWithinDagPriority(0);
    runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
    runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
    runtimeInfo.setNumSelfAndUpstreamTasks(submitWorkInfo.getVertexParallelism());
    runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);

    SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();

    VertexOrBinary.Builder vertexBuilder = VertexOrBinary.newBuilder();
    vertexBuilder.setVertexBinary(ByteString.copyFrom(submitWorkInfo.getVertexBinary()));
    if (submitWorkInfo.getVertexSignature() != null) {
      // Only attach a signature when one was produced (presumably absent on unsecured
      // clusters -- confirm).
      builder.setWorkSpecSignature(ByteString.copyFrom(submitWorkInfo.getVertexSignature()));
    }
    builder.setWorkSpec(vertexBuilder.build());
    builder.setFragmentNumber(taskNum);
    builder.setAttemptNumber(attemptNum);
    builder.setContainerIdString(containerId.toString());
    builder.setAmHost(LlapUtil.getAmHostNameFromAddress(address, job));
    builder.setAmPort(address.getPort());
    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
    builder.setFragmentRuntimeInfo(runtimeInfo.build());
    builder.setInitialEventBytes(ByteString.copyFrom(fragmentBytes));
    if (fragmentBytesSignature != null) {
      builder.setInitialEventSignature(ByteString.copyFrom(fragmentBytesSignature));
    }
    // Protobuf setters reject null; leave the field unset when the split carries no JWT.
    if (llapInputSplit.getJwt() != null) {
      builder.setJwt(llapInputSplit.getJwt());
    }
    builder.setIsExternalClientRequest(true);

    return builder.build();
  }

  /** Serializes the credentials' token storage into a buffer wrapping the written bytes. */
  private static ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
    Credentials containerCredentials = new Credentials();
    containerCredentials.addAll(credentials);
    DataOutputBuffer tokenBuffer = new DataOutputBuffer();
    containerCredentials.writeTokenStorageToStream(tokenBuffer);
    return ByteBuffer.wrap(tokenBuffer.getData(), 0, tokenBuffer.getLength());
  }

  /**
   * Umbilical responder that turns task notifications (completion, failure, kill, heartbeat
   * timeout, submission failure) into ReaderEvents for the record reader. Events arriving before
   * a record reader is registered are queued and replayed on registration.
   */
  private static class LlapRecordReaderTaskUmbilicalExternalResponder
      implements LlapTaskUmbilicalExternalResponder {
    private LlapBaseRecordReader<?> recordReader = null;
    // Unbounded queue: add() never rejects an element.
    private final LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();

    @Override
    public void submissionFailed(String fragmentId, Throwable throwable) {
      // String concatenation is null-safe, so a null throwable still yields an error event.
      sendErrorEvent("Received submission failed event for fragment ID " + fragmentId + ": "
          + throwable);
    }

    @Override
    public void heartbeat(TezHeartbeatRequest request) {
      List<TezEvent> inEvents = request.getEvents();
      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
        EventType eventType = tezEvent.getEventType();
        try {
          switch (eventType) {
            case TASK_ATTEMPT_COMPLETED_EVENT:
              sendOrQueueEvent(ReaderEvent.doneEvent());
              break;
            case TASK_ATTEMPT_FAILED_EVENT: {
              TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
              sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
              break;
            }
            case TASK_STATUS_UPDATE_EVENT:
              // If we want to handle counters
              break;
            default:
              LOG.warn("Unhandled event type {}", eventType);
              break;
          }
        } catch (Exception err) {
          LOG.error("Error during heartbeat responder:", err);
        }
      }
    }

    @Override
    public void taskKilled(TezTaskAttemptID taskAttemptId) {
      sendErrorEvent("Received task killed event for task ID " + taskAttemptId);
    }

    @Override
    public void heartbeatTimeout(String taskAttemptId) {
      sendErrorEvent("Timed out waiting for heartbeat for task ID " + taskAttemptId);
    }

    /** Sends or queues an error event, logging rather than propagating any failure. */
    private void sendErrorEvent(String message) {
      try {
        sendOrQueueEvent(ReaderEvent.errorEvent(message));
      } catch (Exception err) {
        LOG.error("Error during heartbeat responder:", err);
      }
    }

    public synchronized LlapBaseRecordReader<?> getRecordReader() {
      return recordReader;
    }

    /**
     * Registers the record reader and hands it every event queued before registration.
     * Passing null unregisters the current reader.
     */
    public synchronized void setRecordReader(LlapBaseRecordReader<?> recordReader) {
      this.recordReader = recordReader;
      if (recordReader == null) {
        return;
      }
      // If any events were queued by the responder, give them to the record reader now.
      ReaderEvent readerEvent;
      while ((readerEvent = queuedEvents.poll()) != null) {
        LOG.debug("Sending queued event to record reader: {}", readerEvent.getEventType());
        recordReader.handleEvent(readerEvent);
      }
    }

    /**
     * Send the ReaderEvents to the record reader, if it is registered to this responder.
     * If there is no registered record reader, add them to a list of pending reader events
     * since we don't want to drop these events.
     * @param readerEvent
     */
    protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
      LlapBaseRecordReader<?> currentReader = getRecordReader();
      if (currentReader != null) {
        currentReader.handleEvent(readerEvent);
        return;
      }
      LOG.debug("No registered record reader, queueing event {} with message {}",
          readerEvent.getEventType(), readerEvent.getMessage());
      // add() on the unbounded queue cannot fail and, unlike put(), is not affected by the
      // calling thread's interrupt status, so the event is never lost.
      queuedEvents.add(readerEvent);
    }

    /**
     * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
     */
    public void clearQueuedEvents() {
      queuedEvents.clear();
    }
  }
}