blob: 5dd68802ed385c3f7df39e2060bd1f087dbe8ef6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <ostream>
#include <string>
#include <gtest/gtest.h>
#include <process/gtest.hpp>
#include <stout/recordio.hpp>
#include <stout/result.hpp>
#include <stout/strings.hpp>
#include "common/recordio.hpp"
using process::Future;
using std::string;
using namespace mesos;
using namespace mesos::internal;
template <typename T>
bool operator==(const Result<T>& lhs, const Result<T>& rhs)
{
if (lhs.isNone()) {
return rhs.isNone();
}
if (lhs.isError()) {
return rhs.isError() && rhs.error() == lhs.error();
}
return rhs.isSome() && lhs.get() == rhs.get();
}
template <typename T>
std::ostream& operator<<(std::ostream& stream, const Result<T>& r)
{
if (r.isNone()) {
return stream << "none";
}
if (r.isError()) {
return stream << "error(\"" << r.error() << "\")";
}
return stream << r.get();
}
TEST(RecordIOReaderTest, EndOfFile)
{
// Write some data to the pipe so that records
// are available before any reads occur.
::recordio::Encoder<string> encoder(strings::upper);
string data;
data += encoder.encode("hello");
data += encoder.encode("world!");
process::http::Pipe pipe;
pipe.writer().write(data);
mesos::internal::recordio::Reader<string> reader(
::recordio::Decoder<string>(strings::lower),
pipe.reader());
AWAIT_EXPECT_EQ(Result<string>::some("hello"), reader.read());
AWAIT_EXPECT_EQ(Result<string>::some("world!"), reader.read());
// Have multiple outstanding reads before we close the pipe.
Future<Result<string>> read1 = reader.read();
Future<Result<string>> read2 = reader.read();
EXPECT_TRUE(read1.isPending());
EXPECT_TRUE(read2.isPending());
pipe.writer().write(encoder.encode("goodbye"));
pipe.writer().close();
AWAIT_EXPECT_EQ(Result<string>::some("goodbye"), read1);
AWAIT_EXPECT_EQ(Result<string>::none(), read2);
// Subsequent reads should return EOF.
AWAIT_EXPECT_EQ(Result<string>::none(), reader.read());
}
TEST(RecordIOReaderTest, DecodingFailure)
{
::recordio::Encoder<string> encoder(strings::upper);
process::http::Pipe pipe;
mesos::internal::recordio::Reader<string> reader(
::recordio::Decoder<string>(strings::lower),
pipe.reader());
// Have multiple outstanding reads before we fail the decoder.
Future<Result<string>> read1 = reader.read();
Future<Result<string>> read2 = reader.read();
Future<Result<string>> read3 = reader.read();
// Write non-encoded data to the pipe so that the decoder fails.
pipe.writer().write(encoder.encode("encoded"));
pipe.writer().write("not encoded!\n");
AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1);
AWAIT_EXPECT_FAILED(read2);
AWAIT_EXPECT_FAILED(read3);
// The reader is now in a failed state, subsequent
// writes will be dropped and all reads will fail.
pipe.writer().write(encoder.encode("encoded"));
AWAIT_EXPECT_FAILED(reader.read());
}
TEST(RecordIOReaderTest, PipeFailure)
{
::recordio::Encoder<string> encoder(strings::upper);
process::http::Pipe pipe;
mesos::internal::recordio::Reader<string> reader(
::recordio::Decoder<string>(strings::lower),
pipe.reader());
// Have multiple outstanding reads before we fail the writer.
Future<Result<string>> read1 = reader.read();
Future<Result<string>> read2 = reader.read();
Future<Result<string>> read3 = reader.read();
// Write a record, then fail the pipe writer!
pipe.writer().write(encoder.encode("encoded"));
pipe.writer().fail("failure");
AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1);
AWAIT_EXPECT_FAILED(read2);
AWAIT_EXPECT_FAILED(read3);
// Subsequent reads should return a failure.
AWAIT_EXPECT_FAILED(reader.read());
}
// This test verifies that when an EOF is received by the `writer` used
// in `transform`, the future returned to the caller is satisfied.
TEST(RecordIOTransformTest, EndOfFile)
{
// Write some data to the pipe so that records
// are available before any reads occur.
::recordio::Encoder<string> encoder(strings::upper);
string data;
data += encoder.encode("hello ");
data += encoder.encode("world! ");
process::http::Pipe pipeA;
pipeA.writer().write(data);
process::Owned<mesos::internal::recordio::Reader<string>> reader(
new mesos::internal::recordio::Reader<string>(
::recordio::Decoder<string>(strings::lower),
pipeA.reader()));
process::http::Pipe pipeB;
auto trim = [](const string& str) { return strings::trim(str); };
Future<Nothing> transform = mesos::internal::recordio::transform<string>(
std::move(reader), trim, pipeB.writer());
Future<string> future = pipeB.reader().readAll();
pipeA.writer().close();
AWAIT_READY(transform);
pipeB.writer().close();
AWAIT_ASSERT_EQ("helloworld!", future);
}
// This test verifies that when the write end of the `reader` used in
// `transform` fails, a failure is returned to the caller.
TEST(RecordIOTransformTest, ReaderWriterEndFail)
{
// Write some data to the pipe so that records
// are available before any reads occur.
::recordio::Encoder<string> encoder(strings::upper);
string data;
data += encoder.encode("hello ");
data += encoder.encode("world! ");
process::http::Pipe pipeA;
pipeA.writer().write(data);
process::Owned<mesos::internal::recordio::Reader<string>> reader(
new mesos::internal::recordio::Reader<string>(
::recordio::Decoder<string>(strings::lower),
pipeA.reader()));
process::http::Pipe pipeB;
auto trim = [](const string& str) { return strings::trim(str); };
Future<Nothing> transform = mesos::internal::recordio::transform<string>(
std::move(reader), trim, pipeB.writer());
Future<string> future = pipeB.reader().readAll();
pipeA.writer().fail("Writer failure");
AWAIT_FAILED(transform);
ASSERT_TRUE(future.isPending());
}
// This test verifies that when the read end of the `writer` used in
// `transform` is closed, a failure is returned to the caller.
TEST(RecordIOTransformTest, WriterReadEndFail)
{
// Write some data to the pipe so that records
// are available before any reads occur.
::recordio::Encoder<string> encoder(strings::upper);
string data;
data += encoder.encode("hello ");
data += encoder.encode("world! ");
process::http::Pipe pipeA;
pipeA.writer().write(data);
process::Owned<mesos::internal::recordio::Reader<string>> reader(
new mesos::internal::recordio::Reader<string>(
::recordio::Decoder<string>(strings::lower),
pipeA.reader()));
process::http::Pipe pipeB;
auto trim = [](const string& str) { return strings::trim(str); };
pipeB.reader().close();
Future<Nothing> transform = mesos::internal::recordio::transform<string>(
std::move(reader), trim, pipeB.writer());
pipeA.writer().close();
AWAIT_FAILED(transform);
}