| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "ShuffleWriter.h" |
| #include <Compression/CompressedWriteBuffer.h> |
| #include <Compression/CompressionFactory.h> |
| #include <Shuffle/WriteBufferFromJavaOutputStream.h> |
| #include <boost/algorithm/string/case_conv.hpp> |
| |
| using namespace DB; |
| |
| namespace local_engine |
| { |
| ShuffleWriter::ShuffleWriter( |
| jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size) |
| { |
| compression_enable = enable_compression; |
| write_buffer = std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size); |
| if (compression_enable) |
| { |
| auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), level < 0 ? std::nullopt : std::optional<int>(level)); |
| compressed_out = std::make_unique<CompressedWriteBuffer>(*write_buffer, codec); |
| } |
| } |
| void ShuffleWriter::write(const Block & block) |
| { |
| if (!native_writer) |
| { |
| if (compression_enable) |
| { |
| native_writer = std::make_unique<NativeWriter>(*compressed_out, block.cloneEmpty()); |
| } |
| else |
| { |
| native_writer = std::make_unique<NativeWriter>(*write_buffer, block.cloneEmpty()); |
| } |
| } |
| if (block.rows() > 0) |
| { |
| native_writer->write(block); |
| } |
| } |
| void ShuffleWriter::flush() const |
| { |
| if (native_writer) |
| native_writer->flush(); |
| } |
| |
| ShuffleWriter::~ShuffleWriter() |
| { |
| if (native_writer) |
| native_writer->flush(); |
| |
| if (compression_enable) |
| compressed_out->finalize(); |
| |
| write_buffer->finalize(); |
| } |
| } |