| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.network.util; |
| |
| import java.lang.reflect.Field; |
| import java.util.concurrent.ThreadFactory; |
| |
| import io.netty.buffer.PooledByteBufAllocator; |
| import io.netty.channel.Channel; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.ServerChannel; |
| import io.netty.channel.epoll.EpollEventLoopGroup; |
| import io.netty.channel.epoll.EpollServerSocketChannel; |
| import io.netty.channel.epoll.EpollSocketChannel; |
| import io.netty.channel.nio.NioEventLoopGroup; |
| import io.netty.channel.socket.nio.NioServerSocketChannel; |
| import io.netty.channel.socket.nio.NioSocketChannel; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| import io.netty.util.internal.PlatformDependent; |
| import io.netty.util.internal.SystemPropertyUtil; |
| |
| import org.apache.uniffle.common.exception.RssException; |
| |
| /** |
| * copy from spark, In order to override the createPooledByteBufAllocator method, the property |
| * DEFAULT_TINY_CACHE_SIZE does not exist in netty>4.1.47. |
| * |
| * <p>Attention: This class is intended for use in the testing phase only and will not be included |
| * in the final packaged artifact for production environment. For production environment, Spark will |
| * use its own NettyUtils instead of this one. This is somewhat of a hack, but given that Spark 2.x |
| * doesn't update frequently, it's not much of an issue to proceed this way. |
| */ |
| public class NettyUtils { |
| |
| private static final int DEFAULT_TINY_CACHE_SIZE = |
| SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); |
| |
| /** Creates a new ThreadFactory which prefixes each thread with the given name. */ |
| public static ThreadFactory createThreadFactory(String threadPoolPrefix) { |
| return new DefaultThreadFactory(threadPoolPrefix, true); |
| } |
| |
| /** Creates a Netty EventLoopGroup based on the IOMode. */ |
| public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) { |
| ThreadFactory threadFactory = createThreadFactory(threadPrefix); |
| |
| switch (mode) { |
| case NIO: |
| return new NioEventLoopGroup(numThreads, threadFactory); |
| case EPOLL: |
| return new EpollEventLoopGroup(numThreads, threadFactory); |
| default: |
| throw new IllegalArgumentException("Unknown io mode: " + mode); |
| } |
| } |
| |
| /** Returns the correct (client) SocketChannel class based on IOMode. */ |
| public static Class<? extends Channel> getClientChannelClass(IOMode mode) { |
| switch (mode) { |
| case NIO: |
| return NioSocketChannel.class; |
| case EPOLL: |
| return EpollSocketChannel.class; |
| default: |
| throw new IllegalArgumentException("Unknown io mode: " + mode); |
| } |
| } |
| |
| /** Returns the correct ServerSocketChannel class based on IOMode. */ |
| public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) { |
| switch (mode) { |
| case NIO: |
| return NioServerSocketChannel.class; |
| case EPOLL: |
| return EpollServerSocketChannel.class; |
| default: |
| throw new IllegalArgumentException("Unknown io mode: " + mode); |
| } |
| } |
| |
| /** |
| * Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame. |
| * This is used before all decoders. |
| */ |
| public static TransportFrameDecoder createFrameDecoder() { |
| return new TransportFrameDecoder(); |
| } |
| |
| /** Returns the remote address on the channel or "<unknown remote>" if none exists. */ |
| public static String getRemoteAddress(Channel channel) { |
| if (channel != null && channel.remoteAddress() != null) { |
| return channel.remoteAddress().toString(); |
| } |
| return "<unknown remote>"; |
| } |
| |
| /** |
| * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches are |
| * disabled for TransportClients because the ByteBufs are allocated by the event loop thread, but |
| * released by the executor thread rather than the event loop thread. Those thread-local caches |
| * actually delay the recycling of buffers, leading to larger memory usage. |
| */ |
| public static PooledByteBufAllocator createPooledByteBufAllocator( |
| boolean allowDirectBufs, boolean allowCache, int numCores) { |
| if (numCores == 0) { |
| numCores = Runtime.getRuntime().availableProcessors(); |
| } |
| return new PooledByteBufAllocator( |
| allowDirectBufs && PlatformDependent.directBufferPreferred(), |
| Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), |
| Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0), |
| getPrivateStaticField("DEFAULT_PAGE_SIZE"), |
| getPrivateStaticField("DEFAULT_MAX_ORDER"), |
| allowCache ? DEFAULT_TINY_CACHE_SIZE : 0, |
| allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, |
| allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0); |
| } |
| |
| /** Used to get defaults from Netty's private static fields. */ |
| private static int getPrivateStaticField(String name) { |
| try { |
| Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); |
| f.setAccessible(true); |
| return f.getInt(null); |
| } catch (Exception e) { |
| throw new RssException(e); |
| } |
| } |
| } |