blob: e91009d5860b742bde3d66f1c9bf8e1e0c096dac [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <vector>
#include "arrow/util/logging.h"
#include "arrow/util/memory.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
namespace internal {
inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
uintptr_t value = reinterpret_cast<uintptr_t>(address);
return reinterpret_cast<uint8_t*>(value & bits);
}
// This function is just for avoiding MinGW-w64 32bit crash.
// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
void* wrap_memcpy(void* dst, const void* src, size_t n) { return memcpy(dst, src, n); }
void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
uintptr_t block_size, int num_threads) {
// XXX This function is really using `num_threads + 1` threads.
auto pool = GetCpuThreadPool();
uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
int64_t num_blocks = (right - left) / block_size;
// Update right address
right = right - (num_blocks % num_threads) * block_size;
// Now we divide these blocks between available threads. The remainder is
// handled separately.
size_t chunk_size = (right - left) / num_threads;
int64_t prefix = left - src;
int64_t suffix = src + nbytes - right;
// Now the data layout is | prefix | k * num_threads * block_size | suffix |.
// We have chunk_size = k * block_size, therefore the data layout is
// | prefix | num_threads * chunk_size | suffix |.
// Each thread gets a "chunk" of k blocks.
// Start all parallel memcpy tasks and handle leftovers while threads run.
std::vector<Future<void*>> futures;
for (int i = 0; i < num_threads; i++) {
futures.push_back(*pool->Submit(wrap_memcpy, dst + prefix + i * chunk_size,
left + i * chunk_size, chunk_size));
}
memcpy(dst, src, prefix);
memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
for (auto& fut : futures) {
ARROW_CHECK_OK(fut.status());
}
}
} // namespace internal
} // namespace arrow