blob: 675bef8311a9cb7f9a251f7348b166a45811dfce [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <iterator>
#include <thread>
#include <vector>
#include <benchmark/benchmark.h>
#include "arrow/buffer.h"
#include "arrow/util/logging.h"
#include "arrow/util/queue.h"
namespace arrow {
namespace util {
static constexpr int64_t kSize = 100000;
void SpscQueueThroughput(benchmark::State& state) {
SpscQueue<std::shared_ptr<Buffer>> queue(16);
std::vector<std::shared_ptr<Buffer>> source;
std::vector<std::shared_ptr<Buffer>> sink;
source.reserve(kSize);
sink.resize(kSize);
const uint8_t data[1] = {0};
for (int64_t i = 0; i < kSize; i++) {
source.push_back(std::make_shared<Buffer>(data, 1));
}
for (auto _ : state) {
std::thread producer([&] {
auto itr = std::make_move_iterator(source.begin());
auto end = std::make_move_iterator(source.end());
while (itr != end) {
while (!queue.Write(*itr)) {
}
itr++;
}
});
std::thread consumer([&] {
auto itr = sink.begin();
auto end = sink.end();
while (itr != end) {
auto next = queue.FrontPtr();
if (next != nullptr) {
(*itr).swap(*next);
queue.PopFront();
itr++;
}
}
});
producer.join();
consumer.join();
std::swap(source, sink);
}
for (const auto& buf : source) {
ARROW_CHECK(buf && buf->size() == 1);
}
state.SetItemsProcessed(state.iterations() * kSize);
}
BENCHMARK(SpscQueueThroughput)->UseRealTime();
} // namespace util
} // namespace arrow