// blob: 8cb2e73f16c753a85074fbf2ae9fd92ae0d2aeea [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __COMMON_RECORDIO_HPP__
#define __COMMON_RECORDIO_HPP__
#include <queue>
#include <string>
#include <utility>
#include <mesos/mesos.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/recordio.hpp>
#include <stout/result.hpp>
namespace mesos {
namespace internal {
namespace recordio {
namespace internal {
// Forward declaration: `Reader` below refers to `ReaderProcess<T>`,
// whose full definition only appears later in this file.
template <typename T>
class ReaderProcess;
} // namespace internal {
/**
* Provides RecordIO decoding on top of an http::Pipe::Reader.
* The caller is responsible for closing the http::Pipe::Reader
* when a failure is encountered or end-of-file is reached.
*
* TODO(bmahler): Since we currently do not have a generalized
* abstraction in libprocess for "streams" of asynchronous data
* (e.g. process::Stream<T>), we have to create a one-off wrapper
* here. In the future, this would be better expressed as "piping"
* data from a stream of raw bytes into a decoder, which yields a
* stream of typed data.
*/
template <typename T>
class Reader
{
public:
// We spawn `ReaderProcess` as a managed process to guarantee
// that it does not wait on itself (this would cause a deadlock!).
// See comments in `Connection::Data` for further details.
Reader(::recordio::Decoder<T>&& decoder,
process::http::Pipe::Reader reader)
: process(process::spawn(
new internal::ReaderProcess<T>(std::move(decoder), reader),
true)) {}
virtual ~Reader()
{
// Note that we pass 'false' here to avoid injecting the
// termination event at the front of the queue. This is
// to ensure we don't drop any queued request dispatches
// which would leave the caller with a future stuck in
// a pending state.
process::terminate(process, false);
}
/**
* Returns the next piece of decoded data from the pipe.
* Returns error if an individual record could not be decoded.
* Returns none when end-of-file is reached.
* Returns failure when the pipe or decoder has failed.
*/
process::Future<Result<T>> read()
{
return process::dispatch(process, &internal::ReaderProcess<T>::read);
}
private:
process::PID<internal::ReaderProcess<T>> process;
};
/**
 * Helper that pulls records out of a `Reader`, converts each of them
 * with `func`, and writes the converted string to the given pipe.
 *
 * The returned future is failed if reading or writing runs into an
 * error, and is satisfied once an EOF is read.
 *
 * TODO(vinod): Split this method into primitives that can transform a
 * stream of bytes to a stream of typed records that can be further transformed.
 * See the TODO above in `Reader` for further details.
 */
template <typename T>
process::Future<Nothing> transform(
    process::Owned<Reader<T>>&& reader,
    const std::function<std::string(const T&)>& func,
    process::http::Pipe::Writer writer)
{
  return process::loop(
      None(),
      [reader]() {
        // Each iteration starts by requesting the next record.
        return reader->read();
      },
      [func, writer](const Result<T>& record) mutable
          -> process::Future<process::ControlFlow<Nothing>> {
        // An error here means a record could not be de-serialized.
        if (record.isError()) {
          return process::Failure(record.error());
        }

        // None means the writer side of the input sent an EOF.
        if (record.isNone()) {
          return process::Break();
        }

        // TODO(vinod): Instead of detecting that the reader went away only
        // after attempting a write, leverage `writer.readerClosed` future.
        const bool written = writer.write(func(record.get()));

        if (written) {
          return process::Continue();
        }

        return process::Failure("Write failed to the pipe");
      });
}
namespace internal {
/**
 * Process backing `Reader`: it repeatedly reads raw chunks from an
 * http::Pipe::Reader, runs them through a RecordIO decoder and hands
 * the decoded records out, in order, to callers of `read()`.
 *
 * Records decoded while nobody is waiting are buffered in `records`;
 * reads issued while no record is buffered are parked in `waiters`
 * until a record arrives, EOF is reached, or a failure occurs.
 */
template <typename T>
class ReaderProcess : public process::Process<ReaderProcess<T>>
{
public:
  ReaderProcess(
      ::recordio::Decoder<T>&& _decoder,
      process::http::Pipe::Reader _reader)
    : process::ProcessBase(process::ID::generate("__reader__")),
      // `_decoder` is a *named* rvalue reference and therefore an
      // lvalue in this expression; without the explicit move the
      // decoder would be copied rather than moved into the member.
      decoder(std::move(_decoder)),
      reader(_reader),
      done(false) {}

  ~ReaderProcess() override {}

  /**
   * Returns the next record.
   *
   * Buffered records are always handed out first, even if the pipe
   * has failed or reached EOF in the meantime. Once the buffer is
   * empty the result is a failure (if an error was recorded), none
   * (if EOF was reached), or otherwise a pending future that gets
   * completed by `_consume`, `complete` or `fail`.
   */
  process::Future<Result<T>> read()
  {
    if (!records.empty()) {
      Result<T> record = std::move(records.front());
      records.pop();
      return record;
    }

    if (error.isSome()) {
      return process::Failure(error->message);
    }

    if (done) {
      return None();
    }

    // Nothing available yet: park a promise until more data arrives.
    auto waiter = process::Owned<process::Promise<Result<T>>>(
        new process::Promise<Result<T>>());
    waiters.push(std::move(waiter));
    return waiters.back()->future();
  }

protected:
  void initialize() override
  {
    // Kick off the read loop on the pipe.
    consume();
  }

  void finalize() override
  {
    // Fail any remaining waiters.
    fail("Reader is terminating");
  }

private:
  // Records `message` as the error and fails every parked waiter
  // with it. Subsequent `read()` calls fail once `records` is drained.
  void fail(const std::string& message)
  {
    error = Error(message);

    while (!waiters.empty()) {
      waiters.front()->fail(message);
      waiters.pop();
    }
  }

  // Marks EOF and completes every parked waiter with none.
  void complete()
  {
    done = true;

    while (!waiters.empty()) {
      waiters.front()->set(Result<T>::none());
      waiters.pop();
    }
  }

  // Keep the base class `consume` overloads visible; declaring
  // `consume()` below would otherwise hide them.
  using process::Process<ReaderProcess<T>>::consume;

  // Issues one read on the pipe; the outcome is delivered back to
  // this process via `_consume`.
  void consume()
  {
    reader.read()
      .onAny(process::defer(this, &ReaderProcess::_consume, lambda::_1));
  }

  // Handles the outcome of a single pipe read: fails on a failed or
  // discarded read, completes on EOF (an empty chunk), and otherwise
  // decodes the chunk, distributes the records and reads again.
  void _consume(const process::Future<std::string>& read)
  {
    if (!read.isReady()) {
      fail("Pipe::Reader failure: " +
           (read.isFailed() ? read.failure() : "discarded"));
      return;
    }

    // Have we reached EOF?
    if (read->empty()) {
      complete();
      return;
    }

    Try<std::deque<Try<T>>> decode = decoder.decode(read.get());

    if (decode.isError()) {
      fail("Decoder failure: " + decode.error());
      return;
    }

    // Satisfy parked waiters first (oldest first), buffer the rest.
    // Note: `record` is const, so wrapping it in `std::move` would
    // still copy; the copy is therefore spelled out plainly.
    foreach (const Try<T>& record, decode.get()) {
      if (!waiters.empty()) {
        waiters.front()->set(Result<T>(record));
        waiters.pop();
      } else {
        records.push(record);
      }
    }

    // Continue with the next chunk.
    consume();
  }

  ::recordio::Decoder<T> decoder;
  process::http::Pipe::Reader reader;

  // Pending `read()` calls, oldest first.
  std::queue<process::Owned<process::Promise<Result<T>>>> waiters;

  // Decoded records not yet handed out, oldest first.
  std::queue<Result<T>> records;

  // Set once EOF has been read from the pipe.
  bool done;

  // Set once the pipe or decoder failed, or the process is terminating.
  Option<Error> error;
};
} // namespace internal {
} // namespace recordio {
} // namespace internal {
} // namespace mesos {
#endif // __COMMON_RECORDIO_HPP__