blob: 994941383c961fc41936b05b0b9f329fd676ac69 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class ThriftServerEventProcessor implements TServerEventHandler {
private static final Logger LOG = LogManager.getLogger(ThriftServerEventProcessor.class);
private ThriftServer thriftServer;
private static ThreadLocal<ThriftServerContext> connectionContext;
public ThriftServerEventProcessor(ThriftServer thriftServer) {
this.thriftServer = thriftServer;
connectionContext = new ThreadLocal<>();
}
public static ThriftServerContext getConnectionContext() {
return connectionContext.get();
}
@Override
public void preServe() {
}
@Override
public ServerContext createContext(TProtocol input, TProtocol output) {
// param input is class org.apache.thrift.protocol.TBinaryProtocol
TSocket tSocket = null;
TTransport transport = input.getTransport();
switch (thriftServer.getType()) {
case THREADED:
// class org.apache.thrift.transport.TFramedTransport
Preconditions.checkState(transport instanceof TFramedTransport);
TFramedTransport framedTransport = (TFramedTransport) transport;
// NOTE: we need patch code in TNonblockingServer, we don't use for now.
// see https://issues.apache.org/jira/browse/THRIFT-1053
break;
case SIMPLE:
case THREAD_POOL:
// org.apache.thrift.transport.TSocket
Preconditions.checkState(transport instanceof TSocket);
tSocket = (TSocket) transport;
break;
}
if (tSocket == null) {
LOG.info("fail to get client socket. server type: {}", thriftServer.getType());
return null;
}
SocketAddress socketAddress = tSocket.getSocket().getRemoteSocketAddress();
InetSocketAddress inetSocketAddress = null;
if (socketAddress instanceof InetSocketAddress) {
inetSocketAddress = (InetSocketAddress) socketAddress;
} else {
LOG.info("fail to get client socket address. server type: {}",
thriftServer.getType());
return null;
}
TNetworkAddress clientAddress = new TNetworkAddress(
inetSocketAddress.getHostString(),
inetSocketAddress.getPort());
thriftServer.addConnect(clientAddress);
LOG.info("create thrift context. client: {}", clientAddress);
return new ThriftServerContext(clientAddress);
}
@Override
public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
if (serverContext == null) {
return;
}
Preconditions.checkState(serverContext instanceof ThriftServerContext);
ThriftServerContext thriftServerContext = (ThriftServerContext) serverContext;
TNetworkAddress clientAddress = thriftServerContext.getClient();
connectionContext.remove();
thriftServer.removeConnect(clientAddress);
LOG.info("delete thrift context. client: {}", clientAddress);
}
@Override
public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
if (serverContext == null) {
return;
}
ThriftServerContext thriftServerContext = (ThriftServerContext) serverContext;
TNetworkAddress clientAddress = thriftServerContext.getClient();
Preconditions.checkState(serverContext instanceof ThriftServerContext);
connectionContext.set(new ThriftServerContext(clientAddress));
}
}