#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Resolve the installation root from this script's own location: the physical
# (symlink-free) parent of the script directory becomes PULSAR_HOME, which is
# exported so child processes can see it.
BINDIR=$(dirname "$0")
# Assign first and export afterwards so that a failing `cd` is not masked by
# the exit status of `export`; quoting keeps paths with whitespace intact.
PULSAR_HOME=$(cd -P "$BINDIR/.." && pwd) || {
  echo "Error: unable to resolve PULSAR_HOME from $BINDIR." 1>&2
  exit 1
}
export PULSAR_HOME
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
# Check for the java to use: $JAVA_HOME/bin/java when JAVA_HOME is set,
# otherwise the first `java` found on PATH.
if [[ -z $JAVA_HOME ]]; then
  # `command -v` is a shell builtin, so the lookup does not depend on the
  # external `which` utility being installed.
  if ! JAVA=$(command -v java); then
    echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
    exit 1
  fi
else
  JAVA=$JAVA_HOME/bin/java
fi
#######################################
# Extract the Java major version from the text printed by `java -version`.
# Only lines containing `version "` are inspected. The first quoted token of
# the form "A.B..." yields B when A is 1 (e.g. "1.8.0_292" -> 8) and A
# otherwise (e.g. "17.0.2" -> 17); a quoted token that starts with digits but
# has no dot (e.g. "17-internal") yields those digits.
# Globals:   JAVA_MAJOR_VERSION (written only when a version is recognised)
# Arguments: $1 - combined stdout/stderr text of `java -version`
# Returns:   0 if a version token was found, 1 otherwise
#######################################
detect_java_major_version() {
  local line token
  local -a tokens
  while IFS= read -r line; do
    [[ $line == *'version "'* ]] || continue
    # read -a splits on whitespace without glob-expanding the words.
    read -r -a tokens <<<"$line"
    for token in "${tokens[@]}"; do
      if [[ $token =~ \"([[:digit:]]+)\.([[:digit:]]+)(.*)\" ]]; then
        if [[ ${BASH_REMATCH[1]} == "1" ]]; then
          JAVA_MAJOR_VERSION=${BASH_REMATCH[2]}
        else
          JAVA_MAJOR_VERSION=${BASH_REMATCH[1]}
        fi
        return 0
      elif [[ $token =~ \"([[:digit:]]+)(.*)\" ]]; then
        # Process the java versions without dots, such as `17-internal`.
        JAVA_MAJOR_VERSION=${BASH_REMATCH[1]}
        return 0
      fi
    done
  done <<<"$1"
  return 1
}

detect_java_major_version "$("$JAVA" -version 2>&1)"
# Heap and direct-memory limits; a PULSAR_MEM supplied by the caller wins.
: "${PULSAR_MEM:=-Xmx128m -XX:MaxDirectMemorySize=128m}"
# Garbage collection options, only computed when PULSAR_GC was not supplied.
if [[ -z $PULSAR_GC ]]; then
  # ZGC everywhere; the generational flag is added on Java 21 and 22 only.
  if ((JAVA_MAJOR_VERSION == 21 || JAVA_MAJOR_VERSION == 22)); then
    PULSAR_GC="-XX:+UseZGC -XX:+ZGenerational"
  else
    PULSAR_GC="-XX:+UseZGC"
  fi
  PULSAR_GC+=" -XX:+PerfDisableSharedMem -XX:+AlwaysPreTouch"
fi
# Garbage collection log: directory defaults to "logs"; the flag set is only
# computed when PULSAR_GC_LOG was not supplied by the caller.
# NOTE(review): PULSAR_GC_LOG is not referenced by the OPTS assembly visible
# later in this file - confirm whether that omission is intentional.
: "${PULSAR_GC_LOG_DIR:=logs}"
if [[ -z $PULSAR_GC_LOG ]]; then
  if ((JAVA_MAJOR_VERSION <= 8)); then
    # Java 8 gc log options
    PULSAR_GC_LOG="-Xloggc:${PULSAR_GC_LOG_DIR}/pulsar_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=20M"
  else
    # Unified logging syntax for Java 9 and later
    PULSAR_GC_LOG="-Xlog:gc*,safepoint:${PULSAR_GC_LOG_DIR}/pulsar_gc_%p.log:time,uptime,tags:filecount=10,filesize=20M"
    if ((JAVA_MAJOR_VERSION >= 17)); then
      # Use async logging on Java 17+ https://bugs.openjdk.java.net/browse/JDK-8264323
      PULSAR_GC_LOG="-Xlog:async ${PULSAR_GC_LOG}"
    fi
  fi
fi
# Fall back to the default log4j2 configuration when none was supplied.
PULSAR_LOG_CONF=${PULSAR_LOG_CONF:-$DEFAULT_LOG_CONF}
#######################################
# Build PULSAR_CLASSPATH from the dependency list that Gradle exports to
# distribution/server/build/classpath.txt. When that file does not exist yet,
# the Gradle wrapper in PULSAR_HOME is run once (quietly) to generate it.
# Globals:   PULSAR_HOME, CLASSPATH (read); PULSAR_CLASSPATH (written)
# Arguments: none
# Outputs:   an error message on stderr when the file cannot be produced
# Returns:   0 on success, 1 if the classpath file is unavailable
#######################################
add_gradle_deps_to_classpath() {
  # Kept local so the helper does not clobber a global named `f`.
  local classpath_file="${PULSAR_HOME}/distribution/server/build/classpath.txt"
  if [[ ! -f ${classpath_file} ]]; then
    (
      # Subshell: the directory change must not leak into the caller.
      cd "${PULSAR_HOME}" || exit 1
      ./gradlew :distribution:pulsar-server-distribution:exportClasspath --no-configuration-cache -q 2> /dev/null
    )
  fi
  if [[ ! -r ${classpath_file} ]]; then
    echo "Error: classpath file ${classpath_file} is missing and could not be generated with gradlew." 1>&2
    # Same value the variable would get from an empty file, so callers that
    # ignore the return status still see a well-formed (if incomplete) path.
    PULSAR_CLASSPATH="${CLASSPATH}:"
    return 1
  fi
  PULSAR_CLASSPATH="${CLASSPATH}:$(<"${classpath_file}")"
}
# Classpath assembly. When PULSAR_HOME/lib exists its jars are appended (as a
# JVM wildcard entry) to whatever PULSAR_CLASSPATH already holds; otherwise
# the dependency list is obtained through add_gradle_deps_to_classpath.
if [[ -d "$PULSAR_HOME/lib" ]]; then
  PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
else
  add_gradle_deps_to_classpath
fi
PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
# The directory of the log4j2 config goes first on the classpath and the
# config is then referenced by its bare file name. The path is quoted so a
# location containing whitespace is handed to dirname/basename as one argument.
PULSAR_CLASSPATH="$(dirname -- "$PULSAR_LOG_CONF"):$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=$(basename -- "$PULSAR_LOG_CONF")"
# Allow Netty (plain and Pulsar-shaded) to use reflection access
OPTS+=" -Dio.netty.tryReflectionSetAccessible=true"
OPTS+=" -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
if ((JAVA_MAJOR_VERSION > 8)); then
  # Required by Pulsar client optimized checksum calculation on other than Linux x86_64 platforms
  # reflection access to java.util.zip.CRC32C
  OPTS+=" --add-opens java.base/java.util.zip=ALL-UNNAMED"
fi
if ((JAVA_MAJOR_VERSION >= 11)); then
  # Required by Netty for optimized direct byte buffer access
  OPTS+=" --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
fi
# Prefer the IPv4 stack and raise the jute buffer limit so that bigger content
# can be read from ZK (rarely needed, e.g. when listing many z-nodes under a
# directory).
OPTS="-Djava.net.preferIPv4Stack=true ${OPTS} -Djute.maxbuffer=10485760"
# Required to allow sun.misc.Unsafe on JDK 24 without warnings
# Also required for enabling unsafe memory access for Netty since 4.1.121.Final
if ((JAVA_MAJOR_VERSION >= 23)); then
  OPTS="--sun-misc-unsafe-memory-access=allow ${OPTS}"
fi
# Put the classpath in front of everything gathered so far, preceded by the
# exit-on-OOM switches. We should exit on OOM for localrun, especially when
# using ThreadRuntime: the two settings together make the Pulsar process exit
# immediately and predictably if it runs out of either Java heap memory or its
# internal off-heap memory, since these are unrecoverable errors that require
# a process restart to clear the faulty state and restore operation.
OPTS="-XX:+ExitOnOutOfMemoryError -Dpulsar.allocator.exit_on_oom=true -cp ${PULSAR_CLASSPATH} ${OPTS}"
# Netty tuning
# These settings are primarily used to modify the Netty allocator configuration,
# improving memory utilization and reducing the frequency of requesting off-heap memory from the OS
#
# Based on the netty source code, the allocator's default chunk size is calculated as:
# io.netty.allocator.pageSize (default: 8192) shifted left by
# io.netty.allocator.maxOrder (default: 9 after Netty 4.1.76.Final version).
# This equals 8192 * 2^9 = 4 MB:
# https://github.com/netty/netty/blob/4.1/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java#L105
#
# Allocations that are larger than chunk size are considered huge allocations and don't use the pool:
# https://github.com/netty/netty/blob/4.1/buffer/src/main/java/io/netty/buffer/PoolArena.java#L141-L142
#
# Currently, Pulsar defaults to a maximum single message size of 5 MB.
# Therefore, when frequently producing messages whose size exceeds the chunk size,
# Netty cannot utilize resources from the memory pool and must frequently allocate native memory.
# This can lead to increased physical memory fragmentation and higher reclamation costs.
# Thus, io.netty.allocator.maxOrder is increased to 10 so that the chunk size (8 MB)
# is larger than a single message, which lets such allocations reuse Netty's memory pool.
# Netty recycler/allocator settings go in front of the options gathered so
# far; user-supplied extra options, memory flags and GC flags go at the end.
OPTS="-Dio.netty.recycler.maxCapacityPerThread=4096 -Dio.netty.allocator.maxOrder=10 ${OPTS} ${PULSAR_EXTRA_OPTS} ${PULSAR_MEM} ${PULSAR_GC}"
# functions related variables: default artifact locations under PULSAR_HOME
FUNCTIONS_HOME="${PULSAR_HOME}/pulsar-functions"
DEFAULT_JAVA_INSTANCE_JAR="${PULSAR_HOME}/instances/java-instance.jar"
DEFAULT_PY_INSTANCE_FILE="${PULSAR_HOME}/instances/python-instance/python_instance_main.py"
# A non-empty PULSAR_JAVA_INSTANCE_JAR / PULSAR_PY_INSTANCE_FILE overrides the
# corresponding default.
if [[ -n ${PULSAR_JAVA_INSTANCE_JAR} ]]; then
  JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR}
else
  JAVA_INSTANCE_JAR=${DEFAULT_JAVA_INSTANCE_JAR}
fi
if [[ -n ${PULSAR_PY_INSTANCE_FILE} ]]; then
  PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE}
else
  PY_INSTANCE_FILE=${DEFAULT_PY_INSTANCE_FILE}
fi
# find the java instance location
if [[ ! -f ${JAVA_INSTANCE_JAR} ]]; then
  # didn't find a released jar, then search the built jar
  BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/build/libs/java-instance.jar"
  if [[ -f ${BUILT_JAVA_INSTANCE_JAR} ]]; then
    JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
  else
    # printf interprets "\n" in its format string, whereas bash's echo prints
    # it literally; the diagnostic is sent to stderr.
    printf "\nCouldn't find pulsar java instance jar.\n" 1>&2
    printf "Make sure you've run './gradlew assemble'\n\n" 1>&2
    exit 1
  fi
fi
# find the python instance location
if [[ ! -f ${PY_INSTANCE_FILE} ]]; then
  # didn't find a released python instance, then search the built python instance
  BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/build/python-instance/python_instance_main.py"
  if [[ -f ${BUILT_PY_INSTANCE_FILE} ]]; then
    PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
  else
    # printf interprets "\n" in its format string, whereas bash's echo prints
    # it literally; the diagnostic is sent to stderr.
    printf "\nCouldn't find pulsar python instance.\n" 1>&2
    printf "Make sure you've run './gradlew assemble'\n\n" 1>&2
    exit 1
  fi
fi
# functions: tell the runner where the java and python instance artifacts are
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
MAINCLASS="org.apache.pulsar.functions.LocalRunner"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME" || {
  echo "Error: cannot change directory to $PULSAR_HOME." 1>&2
  exit 1
}
# $JAVA is quoted so an install path containing whitespace stays one word.
# $OPTS is deliberately left unquoted: it is a space-separated option string
# that has to be word-split into individual JVM arguments.
exec "$JAVA" $OPTS $MAINCLASS "$@"