blob: c8cd3adef45717dd478ff74328400ea1bb3378c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <jni.h>
#include <algorithm>
#include <cstdint>
#include <filesystem>
#include "compute/Runtime.h"
#include "config/GlutenConfig.h"
#include "jni/JniCommon.h"
#include "jni/JniError.h"
#include <arrow/c/bridge.h>
#include <google/protobuf/stubs/common.h>
#include <optional>
#include <string>
#include "memory/AllocationListener.h"
#include "memory/SplitAwareColumnarBatchIterator.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/Partitioning.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"
#include "utils/ArrowStatus.h"
#include "utils/StringUtil.h"
using namespace gluten;
namespace {
// JNI class references (global references) and method IDs, populated once in JNI_OnLoad and used by the
// JNI entry points and helper classes in this file.

// Java byte[] class.
jclass byteArrayClass;
// org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer: static allocate(long), instance address() and size().
jclass jniUnsafeByteBufferClass;
jmethodID jniUnsafeByteBufferAllocate;
jmethodID jniUnsafeByteBufferAddress;
jmethodID jniUnsafeByteBufferSize;
// org.apache.gluten.vectorized.JniByteInputStream: read(long, long), tell(), close(). Used by JavaInputStreamAdaptor.
jclass jniByteInputStreamClass;
jmethodID jniByteInputStreamRead;
jmethodID jniByteInputStreamTell;
jmethodID jniByteInputStreamClose;
// org.apache.gluten.vectorized.GlutenSplitResult and its constructor.
jclass splitResultClass;
jmethodID splitResultConstructor;
// org.apache.gluten.metrics.Metrics and its constructor. Used by IteratorMetricsJniWrapper.nativeFetchMetrics.
jclass metricsBuilderClass;
jmethodID metricsBuilderConstructor;
// org.apache.gluten.vectorized.NativeColumnarToRowInfo and its (offsets, lengths, address) constructor.
jclass nativeColumnarToRowInfoClass;
jmethodID nativeColumnarToRowInfoConstructor;
// org.apache.gluten.vectorized.ShuffleReaderMetrics and its timing setters.
jclass shuffleReaderMetricsClass;
jmethodID shuffleReaderMetricsSetDecompressTime;
jmethodID shuffleReaderMetricsSetDeserializeTime;
// org.apache.gluten.vectorized.ShuffleStreamReader.nextStream(). Used by the native ShuffleStreamReader class.
jclass shuffleStreamReaderClass;
jmethodID shuffleStreamReaderNextStream;
class JavaInputStreamAdaptor final : public arrow::io::InputStream {
public:
JavaInputStreamAdaptor(JNIEnv* env, arrow::MemoryPool* pool, jobject jniIn) : pool_(pool) {
// IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
if (env->GetJavaVM(&vm_) != JNI_OK) {
std::string errorMessage = "Unable to get JavaVM instance";
throw GlutenException(errorMessage);
}
jniIn_ = env->NewGlobalRef(jniIn);
}
~JavaInputStreamAdaptor() override {
try {
auto status = JavaInputStreamAdaptor::Close();
if (!status.ok()) {
LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() failed, status:" << status.ToString();
}
} catch (std::exception& e) {
LOG(WARNING) << __func__ << " call JavaInputStreamAdaptor::Close() got exception:" << e.what();
}
}
// not thread safe
arrow::Status Close() override {
if (closed_) {
return arrow::Status::OK();
}
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
env->CallVoidMethod(jniIn_, jniByteInputStreamClose);
checkException(env);
env->DeleteGlobalRef(jniIn_);
vm_->DetachCurrentThread();
closed_ = true;
return arrow::Status::OK();
}
arrow::Result<int64_t> Tell() const override {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jlong told = env->CallLongMethod(jniIn_, jniByteInputStreamTell);
checkException(env);
return told;
}
bool closed() const override {
return closed_;
}
arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jlong read = env->CallLongMethod(jniIn_, jniByteInputStreamRead, reinterpret_cast<jlong>(out), nbytes);
checkException(env);
return read;
}
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
GLUTEN_ASSIGN_OR_THROW(auto buffer, arrow::AllocateResizableBuffer(nbytes, pool_))
GLUTEN_ASSIGN_OR_THROW(int64_t bytes_read, Read(nbytes, buffer->mutable_data()))
GLUTEN_THROW_NOT_OK(buffer->Resize(bytes_read, false));
buffer->ZeroPadding();
return std::move(buffer);
}
private:
arrow::MemoryPool* pool_;
JavaVM* vm_;
jobject jniIn_;
bool closed_ = false;
};
/// The internal backend consists of empty implementations of the Runtime API and the MemoryManager API.
/// It is used only for holding contextual objects.
///
/// The implementation could also be extended in the future to handle Arrow-based requests.
///
/// Backend kind under which the internal MemoryManager and Runtime factories are registered in JNI_OnLoad.
inline static const std::string kInternalBackendKind{"internal"};
/// MemoryManager of the internal backend. It manages no memory:
/// - the Arrow pool accessors throw;
/// - usage stats are empty;
/// - shrink() reports 0 bytes;
/// - hold() is a no-op.
class InternalMemoryManager final : public MemoryManager {
 public:
  // explicit: a backend kind string must not silently convert into a memory manager.
  explicit InternalMemoryManager(const std::string& kind) : MemoryManager(kind) {}

  arrow::MemoryPool* defaultArrowMemoryPool() override {
    throw GlutenException("Not implemented");
  }

  std::shared_ptr<arrow::MemoryPool> getOrCreateArrowMemoryPool(const std::string& name) override {
    throw GlutenException("Not yet implemented");
  }

  const MemoryUsageStats collectMemoryUsageStats() const override {
    return MemoryUsageStats();
  }

  const int64_t shrink(int64_t size) override {
    // Nothing is held, so nothing can be released.
    return 0;
  }

  void hold() override {}
};
/// Runtime of the internal backend. It adds no behavior of its own; it only forwards the backend kind,
/// memory manager and configuration to the base Runtime.
class InternalRuntime : public Runtime {
 public:
  InternalRuntime(
      const std::string& runtimeKind,
      MemoryManager* runtimeMemoryManager,
      const std::unordered_map<std::string, std::string>& runtimeConf)
      : Runtime(runtimeKind, runtimeMemoryManager, runtimeConf) {}
};
/// MemoryManager factory registered for kInternalBackendKind.
/// The allocation listener is not used and is destroyed when this function returns.
MemoryManager* internalMemoryManagerFactory(const std::string& kind, std::unique_ptr<AllocationListener> listener) {
  MemoryManager* const manager = new InternalMemoryManager(kind);
  return manager;
}
/// Releaser paired with internalMemoryManagerFactory: takes ownership of the manager and destroys it.
void internalMemoryManagerReleaser(MemoryManager* memoryManager) {
  std::unique_ptr<MemoryManager> owned(memoryManager);
  // `owned` deletes the manager when it goes out of scope.
}
/// Runtime factory registered for kInternalBackendKind.
Runtime* internalRuntimeFactory(
    const std::string& kind,
    MemoryManager* memoryManager,
    const std::unordered_map<std::string, std::string>& sessionConf) {
  Runtime* const runtime = new InternalRuntime(kind, memoryManager, sessionConf);
  return runtime;
}
/// Releaser paired with internalRuntimeFactory: takes ownership of the runtime and destroys it.
void internalRuntimeReleaser(Runtime* runtime) {
  std::unique_ptr<Runtime> owned(runtime);
  // `owned` deletes the runtime when it goes out of scope.
}
class ShuffleStreamReader : public StreamReader {
public:
ShuffleStreamReader(JNIEnv* env, jobject reader) {
if (env->GetJavaVM(&vm_) != JNI_OK) {
throw GlutenException("Unable to get JavaVM instance");
}
ref_ = env->NewGlobalRef(reader);
}
~ShuffleStreamReader() override {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
env->DeleteGlobalRef(ref_);
}
std::shared_ptr<arrow::io::InputStream> readNextStream(arrow::MemoryPool* pool) override {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jobject jniIn = env->CallObjectMethod(ref_, shuffleStreamReaderNextStream);
checkException(env);
if (jniIn == nullptr) {
return nullptr; // No more streams to read
}
std::shared_ptr<arrow::io::InputStream> in = std::make_shared<JavaInputStreamAdaptor>(env, pool, jniIn);
return in;
}
private:
JavaVM* vm_;
jobject ref_;
};
} // namespace
#ifdef __cplusplus
extern "C" {
#endif
/// Library load hook. Initializes the shared JNI common and error state, registers the internal backend's
/// MemoryManager and Runtime factories, and caches the Java class references and method IDs used by this file.
/// Returns the supported JNI version, or JNI_ERR if the environment or a required method cannot be obtained.
jint JNI_OnLoad(JavaVM* vm, void* reserved) {
  JNIEnv* env;
  if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
    return JNI_ERR;
  }
  getJniCommonState()->ensureInitialized(env);
  getJniErrorState()->ensureInitialized(env);
  MemoryManager::registerFactory(kInternalBackendKind, internalMemoryManagerFactory, internalMemoryManagerReleaser);
  Runtime::registerFactory(kInternalBackendKind, internalRuntimeFactory, internalRuntimeReleaser);

  byteArrayClass = createGlobalClassReferenceOrError(env, "[B");

  jniUnsafeByteBufferClass =
      createGlobalClassReferenceOrError(env, "Lorg/apache/spark/sql/execution/unsafe/JniUnsafeByteBuffer;");
  jniUnsafeByteBufferAllocate = env->GetStaticMethodID(
      jniUnsafeByteBufferClass, "allocate", "(J)Lorg/apache/spark/sql/execution/unsafe/JniUnsafeByteBuffer;");
  // GetStaticMethodID reports a missing method only by returning null with a pending exception.
  // Fail the load rather than continue with a null method ID.
  if (jniUnsafeByteBufferAllocate == nullptr) {
    return JNI_ERR;
  }
  // Use the checked helper, consistent with every other instance-method lookup below.
  jniUnsafeByteBufferAddress = getMethodIdOrError(env, jniUnsafeByteBufferClass, "address", "()J");
  jniUnsafeByteBufferSize = getMethodIdOrError(env, jniUnsafeByteBufferClass, "size", "()J");

  jniByteInputStreamClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/JniByteInputStream;");
  jniByteInputStreamRead = getMethodIdOrError(env, jniByteInputStreamClass, "read", "(JJ)J");
  jniByteInputStreamTell = getMethodIdOrError(env, jniByteInputStreamClass, "tell", "()J");
  jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, "close", "()V");

  splitResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/GlutenSplitResult;");
  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J)V");

  metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");
  metricsBuilderConstructor = getMethodIdOrError(
      env,
      metricsBuilderClass,
      "<init>",
      "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");

  nativeColumnarToRowInfoClass =
      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
  nativeColumnarToRowInfoConstructor = getMethodIdOrError(env, nativeColumnarToRowInfoClass, "<init>", "([I[IJ)V");

  shuffleReaderMetricsClass =
      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ShuffleReaderMetrics;");
  shuffleReaderMetricsSetDecompressTime =
      getMethodIdOrError(env, shuffleReaderMetricsClass, "setDecompressTime", "(J)V");
  shuffleReaderMetricsSetDeserializeTime =
      getMethodIdOrError(env, shuffleReaderMetricsClass, "setDeserializeTime", "(J)V");

  shuffleStreamReaderClass =
      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ShuffleStreamReader;");
  shuffleStreamReaderNextStream = getMethodIdOrError(
      env, shuffleStreamReaderClass, "nextStream", "()Lorg/apache/gluten/vectorized/JniByteInputStream;");

  return jniVersion;
}
/// Library unload hook. Releases the global class references created in JNI_OnLoad, then closes the shared JNI
/// error/common state and shuts down the protobuf library.
void JNI_OnUnload(JavaVM* vm, void* reserved) {
  JNIEnv* env = nullptr;
  // Only touch JNI references when a valid environment was obtained; `env` is not usable otherwise.
  if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) == JNI_OK) {
    // Every global class reference created in JNI_OnLoad, including the Metrics and ShuffleStreamReader classes.
    env->DeleteGlobalRef(jniByteInputStreamClass);
    env->DeleteGlobalRef(splitResultClass);
    env->DeleteGlobalRef(nativeColumnarToRowInfoClass);
    env->DeleteGlobalRef(byteArrayClass);
    env->DeleteGlobalRef(jniUnsafeByteBufferClass);
    env->DeleteGlobalRef(shuffleReaderMetricsClass);
    env->DeleteGlobalRef(metricsBuilderClass);
    env->DeleteGlobalRef(shuffleStreamReaderClass);
  }
  getJniErrorState()->close();
  getJniCommonState()->close();
  google::protobuf::ShutdownProtobufLibrary();
}
/// Creates a Runtime of the requested backend kind, bound to the memory manager behind `nmmHandle` and configured
/// from the serialized session conf. Returns the Runtime's address as a handle.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createRuntime( // NOLINT
    JNIEnv* env,
    jclass,
    jstring jBackendType,
    jlong nmmHandle,
    jbyteArray sessionConf) {
  JNI_METHOD_START
  auto* const manager = jniCastOrThrow<MemoryManager>(nmmHandle);
  auto confBytes = getByteArrayElementsSafe(env, sessionConf);
  auto conf = parseConfMap(env, confBytes.elems(), confBytes.length());
  auto kind = jStringToCString(env, jBackendType);
  return reinterpret_cast<jlong>(Runtime::create(kind, manager, conf));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Releases a Runtime previously returned by createRuntime.
JNIEXPORT void JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_releaseRuntime( // NOLINT
    JNIEnv* env,
    jclass,
    jlong ctxHandle) {
  JNI_METHOD_START
  Runtime::release(jniCastOrThrow<Runtime>(ctxHandle));
  JNI_METHOD_END()
}
namespace {
// Session conf key. When its value is exactly "true", NativeMemoryManagerJniWrapper.create wraps the allocation
// listener in a BacktraceAllocationListener.
const std::string kBacktraceAllocation = "spark.gluten.memory.backtrace.allocation";
}
/// Creates a native MemoryManager of the given backend kind that reports allocations to the Java listener.
/// If the session conf sets kBacktraceAllocation to "true", the listener is wrapped in a
/// BacktraceAllocationListener. Returns the manager's address as a handle; -1 is the failure value passed to
/// JNI_METHOD_END.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_create( // NOLINT
    JNIEnv* env,
    jclass,
    jstring jBackendType,
    jobject jListener,
    jbyteArray sessionConf) {
  JNI_METHOD_START
  JavaVM* vm;
  if (env->GetJavaVM(&vm) != JNI_OK) {
    throw GlutenException("Unable to get JavaVM instance");
  }
  auto backendType = jStringToCString(env, jBackendType);
  auto safeArray = getByteArrayElementsSafe(env, sessionConf);
  auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
  std::unique_ptr<AllocationListener> listener = std::make_unique<SparkAllocationListener>(vm, jListener);
  // Treat a missing key as "disabled". map::at would throw std::out_of_range and fail manager creation over an
  // optional debugging flag.
  const auto backtraceIt = sparkConf.find(kBacktraceAllocation);
  const bool backtrace = backtraceIt != sparkConf.end() && backtraceIt->second == "true";
  if (backtrace) {
    listener = std::make_unique<BacktraceAllocationListener>(std::move(listener));
  }
  MemoryManager* mm = MemoryManager::create(backendType, std::move(listener));
  return reinterpret_cast<jlong>(mm);
  JNI_METHOD_END(-1L)
}
/// Serializes the memory usage stats of the manager behind `nmmHandle` (protobuf MemoryUsageStats) into a new Java
/// byte[].
JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_collectUsage( // NOLINT
    JNIEnv* env,
    jclass,
    jlong nmmHandle) {
  JNI_METHOD_START
  auto* const manager = jniCastOrThrow<MemoryManager>(nmmHandle);
  const MemoryUsageStats& stats = manager->collectMemoryUsageStats();
  const auto numBytes = stats.ByteSizeLong();
  std::vector<uint8_t> serialized(numBytes);
  GLUTEN_CHECK(
      stats.SerializeToArray(reinterpret_cast<void*>(serialized.data()), numBytes),
      "Serialization failed when collecting memory usage stats");
  jbyteArray result = env->NewByteArray(numBytes);
  env->SetByteArrayRegion(result, 0, numBytes, reinterpret_cast<jbyte*>(serialized.data()));
  return result;
  JNI_METHOD_END(nullptr)
}
/// Forwards a shrink request of `size` bytes to the manager behind `nmmHandle` and returns the manager's result.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_shrink( // NOLINT
    JNIEnv* env,
    jclass,
    jlong nmmHandle,
    jlong size) {
  JNI_METHOD_START
  auto* const manager = jniCastOrThrow<MemoryManager>(nmmHandle);
  const auto requested = static_cast<int64_t>(size);
  return manager->shrink(requested);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Calls hold() on the manager behind `nmmHandle`.
JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_hold( // NOLINT
    JNIEnv* env,
    jclass,
    jlong nmmHandle) {
  JNI_METHOD_START
  jniCastOrThrow<MemoryManager>(nmmHandle)->hold();
  JNI_METHOD_END()
}
/// Releases the manager behind `nmmHandle` through MemoryManager::release.
JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_release( // NOLINT
    JNIEnv* env,
    jclass,
    jlong nmmHandle) {
  JNI_METHOD_START
  MemoryManager::release(jniCastOrThrow<MemoryManager>(nmmHandle));
  JNI_METHOD_END()
}
/// Parses the serialized plan into the caller's Runtime and returns its textual rendering as a Java string.
JNIEXPORT jstring JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativePlanString( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jbyteArray planArray,
    jboolean details) {
  JNI_METHOD_START
  auto planBytes = getByteArrayElementsSafe(env, planArray);
  const auto planLength = env->GetArrayLength(planArray);
  auto runtime = getRuntime(env, wrapper);
  runtime->parsePlan(planBytes.elems(), planLength);
  const auto rendered = runtime->planString(details, runtime->getConfMap());
  return env->NewStringUTF(rendered.c_str());
  JNI_METHOD_END(nullptr)
}
/// Stores the write-files temp path and file name (raw bytes of the given Java byte[]s) into the Runtime's
/// storage slots returned by localWriteFilesTempPath() / localWriteFileName().
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath( // NOLINT
    JNIEnv* env,
    jclass,
    jbyteArray path,
    jbyteArray fileName) {
  JNI_METHOD_START
  // Copies every byte of a Java byte[] into a std::string.
  const auto bytesToString = [env](jbyteArray bytes) {
    const auto length = env->GetArrayLength(bytes);
    auto elements = getByteArrayElementsSafe(env, bytes);
    return std::string(reinterpret_cast<char*>(elements.elems()), length);
  };
  *Runtime::localWriteFilesTempPath() = bytesToString(path);
  *Runtime::localWriteFileName() = bytesToString(fileName);
  JNI_METHOD_END()
}
/// Builds a result iterator for a serialized plan.
///
/// Steps: record the Spark task info on the caller's Runtime, optionally enable dumping, parse the plan and each
/// split info, and wrap every Java input iterator in a JniColumnarBatchIterator.
/// Returns an ObjectStore handle to the created result iterator.
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithIterator( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jbyteArray planArr,
    jobjectArray splitInfosArr,
    jobjectArray batchItrArray,
    jint stageId,
    jint partitionId,
    jlong taskId,
    jboolean enableDumping,
    jstring spillDir) {
  JNI_METHOD_START
  auto ctx = getRuntime(env, wrapper);
  ctx->setSparkTaskInfo({stageId, partitionId, taskId});
  if (enableDumping) {
    ctx->enableDumping();
  }
  auto spillDirStr = jStringToCString(env, spillDir);
  auto safePlanArray = getByteArrayElementsSafe(env, planArr);
  auto planSize = env->GetArrayLength(planArr);
  ctx->parsePlan(safePlanArray.elems(), planSize);
  if (splitInfosArr != nullptr) {
    const jsize splitInfoArraySize = env->GetArrayLength(splitInfosArr);
    for (jsize i = 0; i < splitInfoArraySize; i++) {
      auto splitInfoArray = static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
      {
        // Scoped so the array elements are released before the local reference below is deleted.
        jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
        auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
        ctx->parseSplitInfo(safeSplitArray.elems(), splitInfoSize, i);
      }
      // One local reference per split would otherwise accumulate for the whole native call.
      env->DeleteLocalRef(splitInfoArray);
    }
  }
  // Handle the Java iters
  std::vector<std::shared_ptr<ResultIterator>> inputIters;
  if (batchItrArray != nullptr) {
    jsize itersLen = env->GetArrayLength(batchItrArray);
    inputIters.reserve(itersLen);
    for (int idx = 0; idx < itersLen; idx++) {
      jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
      auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, ctx, idx);
      // nativeAddIteratorSplits deletes the local ref the same way once the wrapper is built.
      env->DeleteLocalRef(iter);
      inputIters.push_back(std::make_shared<ResultIterator>(std::move(arrayIter)));
    }
  }
  return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Returns whether the result iterator behind `iterHandle` has another batch.
/// Throws if the handle no longer refers to an iterator (for example, after it was closed).
JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle) {
  JNI_METHOD_START
  const auto resultIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  if (resultIter != nullptr) {
    return resultIter->hasNext();
  }
  std::string errorMessage =
      "When hasNext() is called on a closed iterator, an exception is thrown. To prevent this, consider using the protectInvocationFlow() method when creating the iterator in scala side. This will allow the hasNext() method to be called multiple times without issue.";
  throw GlutenException(errorMessage);
  JNI_METHOD_END(false)
}
/// Advances the result iterator behind `iterHandle`. Returns an ObjectStore handle to the next batch, or
/// kInvalidObjectHandle when the iterator is exhausted.
/// Throws if the handle no longer refers to an iterator.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeNext( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle) {
  JNI_METHOD_START
  auto ctx = getRuntime(env, wrapper);
  auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  // Reject unknown or closed handles like the sibling iterator entry points do, instead of dereferencing null.
  if (iter == nullptr) {
    throw GlutenException("Invalid iterator handle for next: " + std::to_string(iterHandle));
  }
  if (!iter->hasNext()) {
    return kInvalidObjectHandle;
  }
  std::shared_ptr<ColumnarBatch> batch = iter->next();
  auto batchHandle = ctx->saveObject(batch);
  iter->setExportNanos(batch->getExportNanos());
  return batchHandle;
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Builds a Java org.apache.gluten.metrics.Metrics object from the metrics of the result iterator behind
/// `iterHandle`.
///
/// A Java long[] of length numMetrics is created for every metric type. numMetrics is 0 when the iterator reports
/// no metrics. The NewObject arguments must stay in exactly the order of the constructor signature cached in
/// JNI_OnLoad for metricsBuilderConstructor.
JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapper_nativeFetchMetrics( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle) {
  JNI_METHOD_START
  // NOTE(review): unlike nativeHasNext/nativeSpill, `iter` is not null-checked before use here.
  auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  auto metrics = iter->getMetrics();
  unsigned int numMetrics = 0;
  if (metrics) {
    numMetrics = metrics->numMetrics;
  }
  // NOTE(review): this creates one local reference per metric type. The JNI spec only guarantees 16 local
  // references without EnsureLocalCapacity; confirm the metric count stays within what the target JVMs provide.
  jlongArray longArray[Metrics::kNum];
  for (auto i = static_cast<int>(Metrics::kBegin); i != static_cast<int>(Metrics::kEnd); ++i) {
    longArray[i] = env->NewLongArray(numMetrics);
    if (metrics) {
      env->SetLongArrayRegion(longArray[i], 0, numMetrics, metrics->get((Metrics::TYPE)i));
    }
  }
  return env->NewObject(
      metricsBuilderClass,
      metricsBuilderConstructor,
      longArray[Metrics::kInputRows],
      longArray[Metrics::kInputVectors],
      longArray[Metrics::kInputBytes],
      longArray[Metrics::kRawInputRows],
      longArray[Metrics::kRawInputBytes],
      longArray[Metrics::kOutputRows],
      longArray[Metrics::kOutputVectors],
      longArray[Metrics::kOutputBytes],
      longArray[Metrics::kCpuCount],
      longArray[Metrics::kWallNanos],
      // Scalar `J` slot in the constructor signature (not an array); -1 when no metrics are available.
      metrics ? metrics->veloxToArrow : -1,
      longArray[Metrics::kPeakMemoryBytes],
      longArray[Metrics::kNumMemoryAllocations],
      longArray[Metrics::kSpilledInputBytes],
      longArray[Metrics::kSpilledBytes],
      longArray[Metrics::kSpilledRows],
      longArray[Metrics::kSpilledPartitions],
      longArray[Metrics::kSpilledFiles],
      longArray[Metrics::kNumDynamicFiltersProduced],
      longArray[Metrics::kNumDynamicFiltersAccepted],
      longArray[Metrics::kNumReplacedWithDynamicFilterRows],
      longArray[Metrics::kNumDynamicFilterInputRows],
      longArray[Metrics::kFlushRowCount],
      longArray[Metrics::kLoadedToValueHook],
      longArray[Metrics::kBloomFilterBlocksByteSize],
      longArray[Metrics::kScanTime],
      longArray[Metrics::kSkippedSplits],
      longArray[Metrics::kProcessedSplits],
      longArray[Metrics::kSkippedStrides],
      longArray[Metrics::kProcessedStrides],
      longArray[Metrics::kRemainingFilterTime],
      longArray[Metrics::kIoWaitTime],
      longArray[Metrics::kStorageReadBytes],
      longArray[Metrics::kLocalReadBytes],
      longArray[Metrics::kRamReadBytes],
      longArray[Metrics::kPreloadSplits],
      longArray[Metrics::kPageLoadTime],
      longArray[Metrics::kDataSourceAddSplitWallNanos],
      longArray[Metrics::kDataSourceReadWallNanos],
      longArray[Metrics::kPhysicalWrittenBytes],
      longArray[Metrics::kWriteIOTime],
      longArray[Metrics::kNumWrittenFiles],
      longArray[Metrics::kLoadLazyVectorTime],
      // Trailing java.lang.String argument: the optional stats text, or null when absent.
      metrics && metrics->stats.has_value() ? env->NewStringUTF(metrics->stats->c_str()) : nullptr);
  JNI_METHOD_END(nullptr)
}
/// Asks the result iterator behind `iterHandle` to spill `size` bytes via spillFixedSize() and returns its result.
/// Throws on an invalid handle.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeSpill( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle,
    jlong size) {
  JNI_METHOD_START
  auto resultIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  if (!resultIter) {
    throw GlutenException("Invalid result iter handle " + std::to_string(iterHandle));
  }
  return resultIter->spillFixedSize(size);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Removes the result iterator behind `iterHandle` from the ObjectStore.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeClose( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle) {
  JNI_METHOD_START
  const jlong handleToRelease = iterHandle;
  ObjectStore::release(handleToRelease);
  JNI_METHOD_END()
}
/// Wraps each Java ColumnarBatchInIterator in `batchItrArray` in a native ResultIterator and adds them as
/// iterator splits to the split-aware input iterator behind `iterHandle`. Null array entries become null splits.
/// Returns true on success.
JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeAddIteratorSplits( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle,
    jobjectArray batchItrArray) {
  JNI_METHOD_START
  auto ctx = getRuntime(env, wrapper);
  auto resultIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  if (!resultIter) {
    throw GlutenException("Invalid iterator handle for addSplits");
  }
  // Only split-aware input iterators accept additional splits.
  auto* splitTarget = dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(resultIter->getInputIter());
  if (!splitTarget) {
    throw GlutenException("Iterator does not support split management");
  }
  GLUTEN_CHECK(batchItrArray != nullptr, "FATAL: Splits to add cannot be null");

  const jsize count = env->GetArrayLength(batchItrArray);
  std::vector<std::shared_ptr<ResultIterator>> splits;
  splits.reserve(count);
  for (jsize pos = 0; pos < count; pos++) {
    jobject javaIter = env->GetObjectArrayElement(batchItrArray, pos);
    std::shared_ptr<ResultIterator> split;
    if (javaIter != nullptr) {
      split = std::make_shared<ResultIterator>(std::make_unique<JniColumnarBatchIterator>(env, javaIter, ctx, pos));
    }
    splits.push_back(std::move(split));
    env->DeleteLocalRef(javaIter);
  }

  if (!splits.empty()) {
    splitTarget->addIteratorSplits(splits);
  }
  return true;
  JNI_METHOD_END(false)
}
/// Tells the caller's Runtime that the result iterator behind `iterHandle` will receive no more splits.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeNoMoreSplits( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto resultIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  if (!resultIter) {
    throw GlutenException("Invalid iterator handle for noMoreSplits");
  }
  runtime->noMoreSplits(resultIter.get());
  JNI_METHOD_END()
}
/// Forwards a barrier request for the result iterator behind `iterHandle` to the caller's Runtime.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeRequestBarrier( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong iterHandle) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto resultIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
  if (!resultIter) {
    throw GlutenException("Invalid iterator handle for requestBarrier");
  }
  runtime->requestBarrier(resultIter.get());
  JNI_METHOD_END()
}
/// Creates a columnar-to-row converter using the memory threshold read from the Runtime config
/// (kColumnarToRowMemoryThreshold, which must be present). Returns an ObjectStore handle to the converter.
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit( // NOLINT
    JNIEnv* env,
    jobject wrapper) {
  JNI_METHOD_START
  auto ctx = getRuntime(env, wrapper);
  const auto& conf = ctx->getConfMap();
  const auto thresholdEntry = conf.find(kColumnarToRowMemoryThreshold);
  GLUTEN_CHECK(
      thresholdEntry != conf.end(), "Required key not found in runtime config: " + kColumnarToRowMemoryThreshold);
  const int64_t column2RowMemThreshold = std::stoll(thresholdEntry->second);
  // Convert the native batch to Spark unsafe row.
  return ctx->saveObject(ctx->createColumnar2RowConverter(column2RowMemThreshold));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Converts the batch behind `batchHandle`, starting at `startRow`, to rows with the converter behind `c2rHandle`.
/// Returns a NativeColumnarToRowInfo holding per-row offsets, per-row lengths and the converter's buffer address.
JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowConvert( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong c2rHandle,
    jlong batchHandle,
    jlong startRow) {
  JNI_METHOD_START
  auto columnarToRowConverter = ObjectStore::retrieve<ColumnarToRowConverter>(c2rHandle);
  auto cb = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  columnarToRowConverter->convert(cb, startRow);
  const auto& offsets = columnarToRowConverter->getOffsets();
  const auto& lengths = columnarToRowConverter->getLengths();
  auto numRows = columnarToRowConverter->numRows();
  auto offsetsArr = env->NewIntArray(numRows);
  auto offsetsSrc = reinterpret_cast<const jint*>(offsets.data());
  env->SetIntArrayRegion(offsetsArr, 0, numRows, offsetsSrc);
  auto lengthsArr = env->NewIntArray(numRows);
  auto lengthsSrc = reinterpret_cast<const jint*>(lengths.data());
  env->SetIntArrayRegion(lengthsArr, 0, numRows, lengthsSrc);
  // NewObject is variadic and the constructor signature "([I[IJ)V" expects a jlong. `long` is 32 bits on LLP64
  // platforms, which would truncate the address and pass an argument of the wrong size.
  const auto address = reinterpret_cast<jlong>(columnarToRowConverter->getBufferAddress());
  jobject nativeColumnarToRowInfo =
      env->NewObject(nativeColumnarToRowInfoClass, nativeColumnarToRowInfoConstructor, offsetsArr, lengthsArr, address);
  return nativeColumnarToRowInfo;
  JNI_METHOD_END(nullptr)
}
/// Removes the columnar-to-row converter behind `c2rHandle` from the ObjectStore.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeClose( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong c2rHandle) {
  JNI_METHOD_START
  const jlong converterHandle = c2rHandle;
  ObjectStore::release(converterHandle);
  JNI_METHOD_END()
}
/// Creates a row-to-columnar converter for the Arrow C schema at address `cSchema`.
/// Returns an ObjectStore handle to the converter.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_NativeRowToColumnarJniWrapper_init( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong cSchema) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto* schema = reinterpret_cast<struct ArrowSchema*>(cSchema);
  return runtime->saveObject(runtime->createRow2ColumnarConverter(schema));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Converts rows at native address `memoryAddress` into a columnar batch using the converter behind `r2cHandle`.
/// `rowLength` holds one length per row and must not be null. Returns an ObjectStore handle to the new batch.
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_NativeRowToColumnarJniWrapper_nativeConvertRowToColumnar( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong r2cHandle,
    jlongArray rowLength,
    jlong memoryAddress) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  if (!rowLength) {
    throw GlutenException("Native convert row to columnar: buf_addrs can't be null");
  }
  const int rowCount = env->GetArrayLength(rowLength);
  auto lengths = getLongArrayElementsSafe(env, rowLength);
  auto* rowData = reinterpret_cast<uint8_t*>(memoryAddress);
  auto converter = ObjectStore::retrieve<RowToColumnarConverter>(r2cHandle);
  return runtime->saveObject(converter->convert(rowCount, lengths.elems(), rowData));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Removes the row-to-columnar converter behind `r2cHandle` from the ObjectStore.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_NativeRowToColumnarJniWrapper_close( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong r2cHandle) {
  JNI_METHOD_START
  const jlong converterHandle = r2cHandle;
  ObjectStore::release(converterHandle);
  JNI_METHOD_END()
}
/// Returns the type name of the batch behind `batchHandle` as a Java string.
JNIEXPORT jstring JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_getType( // NOLINT
    JNIEnv* env,
    jclass,
    jlong batchHandle) {
  JNI_METHOD_START
  const auto typeName = ObjectStore::retrieve<ColumnarBatch>(batchHandle)->getType();
  return env->NewStringUTF(typeName.c_str());
  JNI_METHOD_END(nullptr)
}
/// Returns the byte size reported by the batch behind `batchHandle`.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numBytes( // NOLINT
    JNIEnv* env,
    jclass,
    jlong batchHandle) {
  JNI_METHOD_START
  const auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  const auto byteCount = batch->numBytes();
  return byteCount;
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Returns the column count of the batch behind `batchHandle`.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numColumns( // NOLINT
    JNIEnv* env,
    jclass,
    jlong batchHandle) {
  JNI_METHOD_START
  const auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  const auto columnCount = batch->numColumns();
  return columnCount;
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Returns the row count of the batch behind `batchHandle`.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numRows( // NOLINT
    JNIEnv* env,
    jclass,
    jlong batchHandle) {
  JNI_METHOD_START
  const auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  const auto rowCount = batch->numRows();
  return rowCount;
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Exports the batch behind `batchHandle` through the Arrow C data interface. The exported schema and array are
/// moved into the caller-provided structs at addresses `cSchema` and `cArray`.
JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_exportToArrow( // NOLINT
    JNIEnv* env,
    jclass,
    jlong batchHandle,
    jlong cSchema,
    jlong cArray) {
  JNI_METHOD_START
  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  auto* schemaOut = reinterpret_cast<struct ArrowSchema*>(cSchema);
  auto* arrayOut = reinterpret_cast<struct ArrowArray*>(cArray);
  std::shared_ptr<ArrowSchema> schemaExport = batch->exportArrowSchema();
  std::shared_ptr<ArrowArray> arrayExport = batch->exportArrowArray();
  ArrowSchemaMove(schemaExport.get(), schemaOut);
  ArrowArrayMove(arrayExport.get(), arrayOut);
  JNI_METHOD_END()
}
/// Removes the batch behind `batchHandle` from the ObjectStore.
JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_close( // NOLINT
    JNIEnv* env,
    jclass,
    jlong batchHandle) {
  JNI_METHOD_START
  const jlong handleToRelease = batchHandle;
  ObjectStore::release(handleToRelease);
  JNI_METHOD_END()
}
/// Wraps the Arrow C array and schema at addresses `cArray` and `cSchema` in an ArrowCStructColumnarBatch.
/// Both structs are moved into heap storage owned by the batch. Returns an ObjectStore handle to the batch.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_createWithArrowArray( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong cSchema,
    jlong cArray) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto ownedSchema = std::make_unique<ArrowSchema>();
  auto ownedArray = std::make_unique<ArrowArray>();
  ArrowArrayMove(reinterpret_cast<ArrowArray*>(cArray), ownedArray.get());
  ArrowSchemaMove(reinterpret_cast<ArrowSchema*>(cSchema), ownedSchema.get());
  std::shared_ptr<ColumnarBatch> wrapped =
      std::make_shared<ArrowCStructColumnarBatch>(std::move(ownedSchema), std::move(ownedArray));
  return runtime->saveObject(wrapped);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Returns an ObjectStore handle to the Runtime's empty-schema batch with `numRows` rows.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_getForEmptySchema( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jint numRows) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  const auto rowCount = static_cast<int32_t>(numRows);
  return runtime->saveObject(runtime->createOrGetEmptySchemaBatch(rowCount));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Projects the batch behind `batchHandle` onto the given column indices.
/// Returns an ObjectStore handle to the resulting batch.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_select( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong batchHandle,
    jintArray jcolumnIndices) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto indexElements = getIntArrayElementsSafe(env, jcolumnIndices);
  const int indexCount = env->GetArrayLength(jcolumnIndices);
  const auto* firstIndex = indexElements.elems();
  std::vector<int32_t> columnIndices(firstIndex, firstIndex + indexCount);
  auto source = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  return runtime->saveObject(runtime->select(source, std::move(columnIndices)));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Creates a LocalPartitionWriter that writes `numPartitions` partitions to `dataFileJstr`, using the given
/// compression and buffering options and the local directories in `localDirsJstr` (split by splitPaths).
/// Returns an ObjectStore handle to the writer.
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartitionWriter( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jint numPartitions,
    jstring codecJstr,
    jstring codecBackendJstr,
    jint compressionLevel,
    jint compressionBufferSize,
    jint compressionThreshold,
    jint mergeBufferSize,
    jdouble mergeThreshold,
    jint numSubDirs,
    jint shuffleFileBufferSize,
    jstring dataFileJstr,
    jstring localDirsJstr,
    jboolean enableDictionary) {
  JNI_METHOD_START
  const auto runtime = getRuntime(env, wrapper);
  auto dataFile = jStringToCString(env, dataFileJstr);
  auto localDirs = splitPaths(jStringToCString(env, localDirsJstr));
  auto writerOptions = std::make_shared<LocalPartitionWriterOptions>(
      shuffleFileBufferSize,
      compressionBufferSize,
      compressionThreshold,
      mergeBufferSize,
      mergeThreshold,
      numSubDirs,
      enableDictionary);
  auto codec = createCompressionCodec(
      getCompressionType(env, codecJstr), getCodecBackend(env, codecBackendJstr), compressionLevel);
  auto writer = std::make_shared<LocalPartitionWriter>(
      numPartitions, std::move(codec), runtime->memoryManager(), writerOptions, dataFile, std::move(localDirs));
  return runtime->saveObject(writer);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Creates a hash shuffle writer on top of the partition writer behind `partitionWriterHandle`.
/// That handle is released from the ObjectStore; the local shared_ptr keeps the partition writer alive for the
/// new shuffle writer. Returns an ObjectStore handle to the shuffle writer.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_createHashShuffleWriter(
    JNIEnv* env,
    jobject wrapper,
    jint numPartitions,
    jstring partitioningNameJstr,
    jint startPartitionId,
    jint splitBufferSize,
    jdouble splitBufferReallocThreshold,
    jlong partitionWriterHandle) {
  JNI_METHOD_START
  const auto runtime = getRuntime(env, wrapper);
  auto partitionWriter = ObjectStore::retrieve<PartitionWriter>(partitionWriterHandle);
  if (!partitionWriter) {
    throw GlutenException("Partition writer handle is invalid: " + std::to_string(partitionWriterHandle));
  }
  ObjectStore::release(partitionWriterHandle);
  const auto partitioning = toPartitioning(jStringToCString(env, partitioningNameJstr));
  auto options = std::make_shared<HashShuffleWriterOptions>(
      partitioning, startPartitionId, splitBufferSize, splitBufferReallocThreshold);
  auto shuffleWriter = runtime->createShuffleWriter(numPartitions, partitionWriter, options);
  return runtime->saveObject(shuffleWriter);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Creates a GPU hash shuffle writer on top of the partition writer identified by `partitionWriterHandle`.
/// The partition writer's object-store entry is released here; the retrieved pointer is passed on to
/// createShuffleWriter.
/// @return the new shuffle writer's handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_createGpuHashShuffleWriter(
    JNIEnv* env,
    jobject wrapper,
    jint numPartitions,
    jstring partitioningNameJstr,
    jint startPartitionId,
    jint splitBufferSize,
    jdouble splitBufferReallocThreshold,
    jlong partitionWriterHandle) {
  JNI_METHOD_START
  const auto runtime = getRuntime(env, wrapper);

  auto writer = ObjectStore::retrieve<PartitionWriter>(partitionWriterHandle);
  if (!writer) {
    throw GlutenException("Partition writer handle is invalid: " + std::to_string(partitionWriterHandle));
  }
  // Remove the store entry; `writer` is still held locally and handed to the shuffle writer below.
  ObjectStore::release(partitionWriterHandle);

  auto options = std::make_shared<GpuHashShuffleWriterOptions>(
      toPartitioning(jStringToCString(env, partitioningNameJstr)),
      startPartitionId,
      splitBufferSize,
      splitBufferReallocThreshold);
  auto shuffleWriter = runtime->createShuffleWriter(numPartitions, writer, options);
  return runtime->saveObject(shuffleWriter);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Creates a sort-based shuffle writer on top of the partition writer identified by `partitionWriterHandle`.
/// The partition writer's object-store entry is released here; the retrieved pointer is passed on to
/// createShuffleWriter.
/// @return the new shuffle writer's handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_createSortShuffleWriter(
    JNIEnv* env,
    jobject wrapper,
    jint numPartitions,
    jstring partitioningNameJstr,
    jint startPartitionId,
    jint diskWriteBufferSize,
    jint initialSortBufferSize,
    jboolean useRadixSort,
    jlong partitionWriterHandle) {
  JNI_METHOD_START
  const auto runtime = getRuntime(env, wrapper);

  auto writer = ObjectStore::retrieve<PartitionWriter>(partitionWriterHandle);
  if (!writer) {
    throw GlutenException("Partition writer handle is invalid: " + std::to_string(partitionWriterHandle));
  }
  // Remove the store entry; `writer` is still held locally and handed to the shuffle writer below.
  ObjectStore::release(partitionWriterHandle);

  // NOTE(review): the JNI signature lists diskWriteBufferSize before initialSortBufferSize, but the options
  // receive initialSortBufferSize first - presumably matching the SortShuffleWriterOptions constructor; confirm.
  auto options = std::make_shared<SortShuffleWriterOptions>(
      toPartitioning(jStringToCString(env, partitioningNameJstr)),
      startPartitionId,
      initialSortBufferSize,
      diskWriteBufferSize,
      useRadixSort != JNI_FALSE);
  auto shuffleWriter = runtime->createShuffleWriter(numPartitions, writer, options);
  return runtime->saveObject(shuffleWriter);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Creates an RSS sort shuffle writer on top of the partition writer identified by `partitionWriterHandle`.
/// The partition writer's object-store entry is released here; the retrieved pointer is passed on to
/// createShuffleWriter.
/// @return the new shuffle writer's handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_createRssSortShuffleWriter(
    JNIEnv* env,
    jobject wrapper,
    jint numPartitions,
    jstring partitioningNameJstr,
    jint startPartitionId,
    jint splitBufferSize,
    jlong sortBufferMaxSize,
    jstring codecJstr,
    jlong partitionWriterHandle) {
  JNI_METHOD_START
  const auto runtime = getRuntime(env, wrapper);

  auto writer = ObjectStore::retrieve<PartitionWriter>(partitionWriterHandle);
  if (!writer) {
    throw GlutenException("Partition writer handle is invalid: " + std::to_string(partitionWriterHandle));
  }
  // Remove the store entry; `writer` is still held locally and handed to the shuffle writer below.
  ObjectStore::release(partitionWriterHandle);

  auto options = std::make_shared<RssSortShuffleWriterOptions>(
      toPartitioning(jStringToCString(env, partitioningNameJstr)),
      startPartitionId,
      splitBufferSize,
      sortBufferMaxSize,
      getCompressionType(env, codecJstr));
  auto shuffleWriter = runtime->createShuffleWriter(numPartitions, writer, options);
  return runtime->saveObject(shuffleWriter);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Asks the shuffle writer to reclaim `size` bytes via reclaimFixedSize.
/// @return the evicted byte count reported by the writer, or kInvalidObjectHandle if an exception was raised
///         (invalid handle, or a non-OK status converted by arrowAssertOkOrThrow).
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_reclaim( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleWriterHandle,
    jlong size) {
  JNI_METHOD_START
  auto shuffleWriter = ObjectStore::retrieve<ShuffleWriter>(shuffleWriterHandle);
  if (!shuffleWriter) {
    std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle);
    throw GlutenException(errorMessage);
  }
  // Zero-initialize the out-parameter so a successful call that does not write it reports 0
  // instead of returning an indeterminate value.
  int64_t evictedSize = 0;
  arrowAssertOkOrThrow(shuffleWriter->reclaimFixedSize(size, &evictedSize), "(shuffle) nativeEvict: evict failed");
  return evictedSize;
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Feeds one columnar batch to the shuffle writer.
/// `numRows` is part of the JNI signature but is not used here.
/// @return the writer's bytesWritten() after the write, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_write( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleWriterHandle,
    jint numRows,
    jlong batchHandle,
    jlong memLimit) {
  JNI_METHOD_START
  const auto writer = ObjectStore::retrieve<ShuffleWriter>(shuffleWriterHandle);
  if (writer == nullptr) {
    throw GlutenException("Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle));
  }
  // The batch may be a VeloxColumnarBatch or an ArrowCStructColumnarBatch (FallbackRangeShuffleWriter).
  const auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
  arrowAssertOkOrThrow(writer->write(batch, memLimit), "Native write: shuffle writer failed");
  return writer->bytesWritten();
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Stops the shuffle writer and packages its metrics and per-partition lengths into a new SplitResult object.
/// @return the SplitResult, or nullptr if an exception was raised.
JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_stop( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleWriterHandle) {
  JNI_METHOD_START
  auto shuffleWriter = ObjectStore::retrieve<ShuffleWriter>(shuffleWriterHandle);
  if (!shuffleWriter) {
    std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle);
    throw GlutenException(errorMessage);
  }
  arrowAssertOkOrThrow(shuffleWriter->stop(), "Native shuffle write: ShuffleWriter stop failed");

  // Copies a contiguous container of 64-bit values into a new Java long[].
  // NewLongArray returns null with a pending exception on failure (e.g. OutOfMemoryError). Issuing another
  // JNI call in that state is illegal, so the pending exception is surfaced before SetLongArrayRegion.
  const auto toJLongArray = [env](const auto& values) {
    static_assert(sizeof(*values.data()) == sizeof(jlong), "partition lengths must be 64-bit to copy into long[]");
    const auto length = static_cast<jsize>(values.size());
    jlongArray array = env->NewLongArray(length);
    checkException(env);
    env->SetLongArrayRegion(array, 0, length, reinterpret_cast<const jlong*>(values.data()));
    return array;
  };

  const auto& partitionLengths = shuffleWriter->partitionLengths();
  jlongArray partitionLengthArr = toJLongArray(partitionLengths);
  const auto& rawPartitionLengths = shuffleWriter->rawPartitionLengths();
  jlongArray rawPartitionLengthArr = toJLongArray(rawPartitionLengths);

  // Varargs are passed without conversion, so the leading literal is spelled as an explicit jlong
  // rather than relying on `long` being 64-bit.
  jobject splitResult = env->NewObject(
      splitResultClass,
      splitResultConstructor,
      static_cast<jlong>(0),
      shuffleWriter->totalWriteTime(),
      shuffleWriter->totalEvictTime(),
      shuffleWriter->totalCompressTime(),
      shuffleWriter->totalSortTime(),
      shuffleWriter->totalC2RTime(),
      shuffleWriter->bytesWritten(),
      shuffleWriter->totalBytesEvicted(),
      shuffleWriter->totalBytesToEvict(),
      shuffleWriter->peakBytesAllocated(),
      shuffleWriter->avgDictionaryFields(),
      shuffleWriter->dictionarySize(),
      partitionLengthArr,
      rawPartitionLengthArr);
  return splitResult;
  JNI_METHOD_END(nullptr)
}
/// Releases the object-store entry for the given shuffle writer handle.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_close( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleWriterHandle) {
  JNI_METHOD_START
  const jlong handleToRelease = shuffleWriterHandle;
  ObjectStore::release(handleToRelease);
  JNI_METHOD_END()
}
/// Copies the first `size` bytes of the Java byte[] `source` into native memory at `destAddress`.
/// Throws GlutenException if `size` is negative or larger than the array, rather than letting memcpy read
/// past the end of the source elements. The destination buffer's capacity cannot be checked here.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_OnHeapJniByteInputStream_memCopyFromHeap( // NOLINT
    JNIEnv* env,
    jobject,
    jbyteArray source,
    jlong destAddress,
    jint size) {
  JNI_METHOD_START
  // Query the length before acquiring the elements (same order as the deserialize wrapper).
  const jsize sourceLength = env->GetArrayLength(source);
  if (size < 0 || size > sourceLength) {
    throw GlutenException(
        "memCopyFromHeap: invalid copy size " + std::to_string(size) + " for source array of length " +
        std::to_string(sourceLength));
  }
  auto safeArray = getByteArrayElementsSafe(env, source);
  std::memcpy(reinterpret_cast<void*>(destAddress), safeArray.elems(), static_cast<size_t>(size));
  JNI_METHOD_END()
}
/// Creates a shuffle reader for the given Arrow C schema and reader options, and saves it in the runtime.
/// @return the reader's handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper_make( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong cSchema,
    jstring compressionType,
    jstring compressionBackend,
    jint batchSize,
    jlong readerBufferSize,
    jlong deserializerBufferSize,
    jstring shuffleWriterType) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);

  ShuffleReaderOptions readerOptions{};
  readerOptions.compressionType = getCompressionType(env, compressionType);
  // The codec backend is only resolved when a compression codec name was supplied;
  // otherwise the options' default backend is kept.
  if (compressionType != nullptr) {
    readerOptions.codecBackend = getCodecBackend(env, compressionBackend);
  }
  readerOptions.batchSize = batchSize;
  readerOptions.readerBufferSize = readerBufferSize;
  readerOptions.deserializerBufferSize = deserializerBufferSize;
  readerOptions.shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));

  // `cSchema` is the raw address of an ArrowSchema C struct.
  auto* arrowCSchema = reinterpret_cast<struct ArrowSchema*>(cSchema);
  std::shared_ptr<arrow::Schema> schema = arrowGetOrThrow(arrow::ImportSchema(arrowCSchema));
  return runtime->saveObject(runtime->createShuffleReader(schema, readerOptions));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Starts reading shuffle data from the Java stream reader `jStreamReader` and saves the resulting
/// iterator in the runtime.
/// @return the iterator's handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper_read( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleReaderHandle,
    jobject jStreamReader) {
  JNI_METHOD_START
  auto ctx = getRuntime(env, wrapper);
  auto reader = ObjectStore::retrieve<ShuffleReader>(shuffleReaderHandle);
  // Guard against a stale or invalid handle instead of dereferencing null (matches the shuffle writer wrappers).
  if (!reader) {
    throw GlutenException("Invalid shuffle reader handle " + std::to_string(shuffleReaderHandle));
  }
  auto streamReader = std::make_shared<ShuffleStreamReader>(env, jStreamReader);
  auto outItr = reader->read(streamReader);
  return ctx->saveObject(outItr);
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Copies the shuffle reader's decompress and deserialize times into the Java `metrics` object
/// through its setter methods.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper_populateMetrics( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleReaderHandle,
    jobject metrics) {
  JNI_METHOD_START
  auto reader = ObjectStore::retrieve<ShuffleReader>(shuffleReaderHandle);
  if (!reader) {
    throw GlutenException("Invalid shuffle reader handle " + std::to_string(shuffleReaderHandle));
  }
  env->CallVoidMethod(metrics, shuffleReaderMetricsSetDecompressTime, reader->getDecompressTime());
  // A Java exception thrown by the first setter must be handled before any further JNI call.
  checkException(env);
  env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getDeserializeTime());
  checkException(env);
  JNI_METHOD_END()
}
/// Releases the object-store entry for the given shuffle reader handle.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper_close( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong shuffleReaderHandle) {
  JNI_METHOD_START
  // The reader is retrieved before the release and held in a local until this scope ends.
  // NOTE(review): presumably this keeps the reader alive past ObjectStore::release (so it is destroyed
  // here rather than inside release) - confirm the intent before removing it.
  [[maybe_unused]] const auto retainedReader = ObjectStore::retrieve<ShuffleReader>(shuffleReaderHandle);
  ObjectStore::release(shuffleReaderHandle);
  JNI_METHOD_END()
}
/// Serializes the ColumnarBatch identified by `handle` into a byte buffer allocated through the Java
/// JniUnsafeByteBuffer allocate method.
/// @return the Java byte buffer object, or nullptr if an exception was raised.
JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_serialize( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong handle) {
  JNI_METHOD_START
  auto ctx = getRuntime(env, wrapper);
  auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
  // Checked in all build types: a DCHECK would let a null batch reach append() in release builds.
  if (batch == nullptr) {
    throw GlutenException("Cannot find the ColumnarBatch with handle " + std::to_string(handle));
  }
  auto serializer = ctx->createColumnarBatchSerializer(nullptr);
  serializer->append(batch);
  auto serializedSize = serializer->maxSerializedSize();
  // Each upcall below runs Java code that may throw (e.g. OutOfMemoryError on allocate). JNI forbids
  // further calls while an exception is pending, so check after every upcall.
  auto byteBuffer = env->CallStaticObjectMethod(jniUnsafeByteBufferClass, jniUnsafeByteBufferAllocate, serializedSize);
  checkException(env);
  auto byteBufferAddress = env->CallLongMethod(byteBuffer, jniUnsafeByteBufferAddress);
  checkException(env);
  auto byteBufferSize = env->CallLongMethod(byteBuffer, jniUnsafeByteBufferSize);
  checkException(env);
  serializer->serializeTo(reinterpret_cast<uint8_t*>(byteBufferAddress), byteBufferSize);
  return byteBuffer;
  JNI_METHOD_END(nullptr)
}
/// Creates a ColumnarBatchSerializer for the Arrow C schema at address `cSchema` and saves it in the runtime.
/// @return the serializer's handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_init( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong cSchema) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto* arrowCSchema = reinterpret_cast<struct ArrowSchema*>(cSchema);
  return runtime->saveObject(runtime->createColumnarBatchSerializer(arrowCSchema));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Deserializes the contents of the Java byte[] `data` into a ColumnarBatch and saves it in the runtime.
/// @return the batch handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_deserialize( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong serializerHandle,
    jbyteArray data) {
  JNI_METHOD_START
  auto runtime = getRuntime(env, wrapper);
  auto batchSerializer = ObjectStore::retrieve<ColumnarBatchSerializer>(serializerHandle);
  GLUTEN_DCHECK(batchSerializer != nullptr, "ColumnarBatchSerializer cannot be null");
  // Length is read before the array elements are acquired.
  const int32_t byteCount = env->GetArrayLength(data);
  auto bytes = getByteArrayElementsSafe(env, data);
  return runtime->saveObject(batchSerializer->deserialize(bytes.elems(), byteCount));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Deserializes `size` bytes starting at native address `address` into a ColumnarBatch and saves it
/// in the runtime.
/// @return the batch handle, or kInvalidObjectHandle if an exception was raised.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_deserializeDirect( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong serializerHandle,
    jlong address,
    jint size) {
  JNI_METHOD_START
  auto runtime = gluten::getRuntime(env, wrapper);
  auto batchSerializer = ObjectStore::retrieve<ColumnarBatchSerializer>(serializerHandle);
  GLUTEN_DCHECK(batchSerializer != nullptr, "ColumnarBatchSerializer cannot be null");
  auto* rawBytes = reinterpret_cast<uint8_t*>(address);
  return runtime->saveObject(batchSerializer->deserialize(rawBytes, size));
  JNI_METHOD_END(kInvalidObjectHandle)
}
/// Releases the object-store entry for the given ColumnarBatchSerializer handle.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_close( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlong serializerHandle) {
  JNI_METHOD_START
  const jlong handleToRelease = serializerHandle;
  ObjectStore::release(handleToRelease);
  JNI_METHOD_END()
}
#ifdef __cplusplus
}
#endif