blob: a18c4f7ee3418eb7063dd6fb661815d221ef937c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TPipe.h>
#ifdef _WIN32
#include <thrift/windows/OverlappedSubmissionThread.h>
#include <thrift/windows/Sync.h>
#endif
namespace apache {
namespace thrift {
namespace transport {
/**
* TPipe implementation.
*/
#ifdef _WIN32
uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len);
void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len);
uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len);
void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len);
class TPipeImpl : apache::thrift::TNonCopyable {
public:
TPipeImpl() {}
virtual ~TPipeImpl() {}
virtual uint32_t read(uint8_t* buf, uint32_t len) = 0;
virtual void write(const uint8_t* buf, uint32_t len) = 0;
virtual HANDLE getPipeHandle() = 0; // doubles as the read handle for anon pipe
virtual void setPipeHandle(HANDLE pipehandle) = 0;
virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
virtual void setWrtPipeHandle(HANDLE) {}
virtual bool isBufferedDataAvailable() { return false; }
virtual HANDLE getNativeWaitHandle() { return INVALID_HANDLE_VALUE; }
};
class TNamedPipeImpl : public TPipeImpl {
public:
explicit TNamedPipeImpl(TAutoHandle &pipehandle) : Pipe_(pipehandle.release()) {}
virtual ~TNamedPipeImpl() {}
virtual uint32_t read(uint8_t* buf, uint32_t len) {
return pseudo_sync_read(Pipe_.h, read_event_.h, buf, len);
}
virtual void write(const uint8_t* buf, uint32_t len) {
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
virtual HANDLE getPipeHandle() { return Pipe_.h; }
virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
private:
TManualResetEvent read_event_;
TManualResetEvent write_event_;
TAutoHandle Pipe_;
};
class TAnonPipeImpl : public TPipeImpl {
public:
TAnonPipeImpl(HANDLE PipeRd, HANDLE PipeWrt) : PipeRd_(PipeRd), PipeWrt_(PipeWrt) {}
virtual ~TAnonPipeImpl() {}
virtual uint32_t read(uint8_t* buf, uint32_t len) { return pipe_read(PipeRd_.h, buf, len); }
virtual void write(const uint8_t* buf, uint32_t len) { pipe_write(PipeWrt_.h, buf, len); }
virtual HANDLE getPipeHandle() { return PipeRd_.h; }
virtual void setPipeHandle(HANDLE PipeRd) { PipeRd_.reset(PipeRd); }
virtual HANDLE getWrtPipeHandle() { return PipeWrt_.h; }
virtual void setWrtPipeHandle(HANDLE PipeWrt) { PipeWrt_.reset(PipeWrt); }
private:
TAutoHandle PipeRd_;
TAutoHandle PipeWrt_;
};
// If you want a select-like loop to work, use this subclass. Be warned...
// the read implementation has several context switches, so this is slower
// than using the regular named pipe implementation
class TWaitableNamedPipeImpl : public TPipeImpl {
public:
explicit TWaitableNamedPipeImpl(TAutoHandle &pipehandle)
: begin_unread_idx_(0), end_unread_idx_(0) {
readOverlap_.action = TOverlappedWorkItem::READ;
readOverlap_.h = pipehandle.h;
cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
cancelOverlap_.h = pipehandle.h;
buffer_.resize(1024 /*arbitrary buffer size*/, '\0');
beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
Pipe_.reset(pipehandle.release());
}
virtual ~TWaitableNamedPipeImpl() {
// see if there is an outstanding read request
if (begin_unread_idx_ == end_unread_idx_) {
// if so, cancel it, and wait for the dead completion
thread_->addWorkItem(&cancelOverlap_);
readOverlap_.overlappedResults(false /*ignore errors*/);
}
}
virtual uint32_t read(uint8_t* buf, uint32_t len);
virtual void write(const uint8_t* buf, uint32_t len) {
pseudo_sync_write(Pipe_.h, write_event_.h, buf, len);
}
virtual HANDLE getPipeHandle() { return Pipe_.h; }
virtual void setPipeHandle(HANDLE pipehandle) { Pipe_.reset(pipehandle); }
virtual bool isBufferedDataAvailable() { return begin_unread_idx_ < end_unread_idx_; }
virtual HANDLE getNativeWaitHandle() { return ready_event_.h; }
private:
void beginAsyncRead(uint8_t* buf, uint32_t len);
uint32_t endAsyncRead();
TAutoOverlapThread thread_;
TAutoHandle Pipe_;
TOverlappedWorkItem readOverlap_;
TOverlappedWorkItem cancelOverlap_;
TManualResetEvent ready_event_;
TManualResetEvent write_event_;
std::vector<uint8_t> buffer_;
uint32_t begin_unread_idx_;
uint32_t end_unread_idx_;
};
void TWaitableNamedPipeImpl::beginAsyncRead(uint8_t* buf, uint32_t len) {
begin_unread_idx_ = end_unread_idx_ = 0;
readOverlap_.reset(buf, len, ready_event_.h);
thread_->addWorkItem(&readOverlap_);
if (readOverlap_.success == FALSE && readOverlap_.last_error != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", readOverlap_.last_error);
throw TTransportException(TTransportException::UNKNOWN, "TPipe: ReadFile failed");
}
}
uint32_t TWaitableNamedPipeImpl::endAsyncRead() {
return readOverlap_.overlappedResults();
}
uint32_t TWaitableNamedPipeImpl::read(uint8_t* buf, uint32_t len) {
if (begin_unread_idx_ == end_unread_idx_) {
end_unread_idx_ = endAsyncRead();
}
uint32_t __idxsize = end_unread_idx_ - begin_unread_idx_;
uint32_t bytes_to_copy = (len < __idxsize) ? len : __idxsize;
memcpy(buf, &buffer_[begin_unread_idx_], bytes_to_copy);
begin_unread_idx_ += bytes_to_copy;
if (begin_unread_idx_ != end_unread_idx_) {
assert(len == bytes_to_copy);
// we were able to fulfill the read with just the bytes in our
// buffer, and we still have buffer left
return bytes_to_copy;
}
uint32_t bytes_copied = bytes_to_copy;
// all of the requested data has been read. Kick off an async read for the next round.
beginAsyncRead(&buffer_[0], static_cast<uint32_t>(buffer_.size()));
return bytes_copied;
}
void pseudo_sync_write(HANDLE pipe, HANDLE event, const uint8_t* buf, uint32_t len) {
OVERLAPPED tempOverlap;
memset(&tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
uint32_t written = 0;
while (written < len) {
BOOL result = ::WriteFile(pipe, buf + written, len - written, nullptr, &tempOverlap);
if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::WriteFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: write failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
if (!result) {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
written += bytes;
}
}
uint32_t pseudo_sync_read(HANDLE pipe, HANDLE event, uint8_t* buf, uint32_t len) {
OVERLAPPED tempOverlap;
memset(&tempOverlap, 0, sizeof(tempOverlap));
tempOverlap.hEvent = event;
BOOL result = ::ReadFile(pipe, buf, len, nullptr, &tempOverlap);
if (result == FALSE && ::GetLastError() != ERROR_IO_PENDING) {
GlobalOutput.perror("TPipe ::ReadFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: read failed");
}
DWORD bytes = 0;
result = ::GetOverlappedResult(pipe, &tempOverlap, &bytes, TRUE);
if (!result) {
GlobalOutput.perror("TPipe ::GetOverlappedResult errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::UNKNOWN, "TPipe: GetOverlappedResult failed");
}
return bytes;
}
//---- Constructors ----
TPipe::TPipe(TAutoHandle &Pipe, std::shared_ptr<TConfiguration> config)
: impl_(new TWaitableNamedPipeImpl(Pipe)), TimeoutSeconds_(3),
isAnonymous_(false), TVirtualTransport(config) {
}
TPipe::TPipe(HANDLE Pipe, std::shared_ptr<TConfiguration> config)
: TimeoutSeconds_(3), isAnonymous_(false), TVirtualTransport(config)
{
TAutoHandle pipeHandle(Pipe);
impl_.reset(new TWaitableNamedPipeImpl(pipeHandle));
}
TPipe::TPipe(const char* pipename, std::shared_ptr<TConfiguration> config) : TimeoutSeconds_(3),
isAnonymous_(false), TVirtualTransport(config) {
setPipename(pipename);
}
TPipe::TPipe(const std::string& pipename, std::shared_ptr<TConfiguration> config) : TimeoutSeconds_(3),
isAnonymous_(false), TVirtualTransport(config) {
setPipename(pipename);
}
TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt, std::shared_ptr<TConfiguration> config)
: impl_(new TAnonPipeImpl(PipeRd, PipeWrt)), TimeoutSeconds_(3), isAnonymous_(true),
TVirtualTransport(config) {
}
TPipe::TPipe(std::shared_ptr<TConfiguration> config) : TimeoutSeconds_(3), isAnonymous_(false),
TVirtualTransport(config) {
}
TPipe::~TPipe() {
}
//---------------------------------------------------------
// Transport callbacks
//---------------------------------------------------------
bool TPipe::isOpen() const {
return impl_.get() != nullptr;
}
bool TPipe::peek() {
return isOpen();
}
void TPipe::open() {
if (isOpen())
return;
TAutoHandle hPipe;
do {
DWORD flags = FILE_FLAG_OVERLAPPED; // async mode, so we can do reads at the same time as writes
hPipe.reset(CreateFileA(pipename_.c_str(),
GENERIC_READ | GENERIC_WRITE,
0, // no sharing
nullptr, // default security attributes
OPEN_EXISTING, // opens existing pipe
flags,
nullptr)); // no template file
if (hPipe.h != INVALID_HANDLE_VALUE)
break; // success!
if (::GetLastError() != ERROR_PIPE_BUSY) {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
} while (::WaitNamedPipeA(pipename_.c_str(), TimeoutSeconds_ * 1000));
if (hPipe.h == INVALID_HANDLE_VALUE) {
GlobalOutput.perror("TPipe::open ::CreateFile errored GLE=", ::GetLastError());
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
}
impl_.reset(new TNamedPipeImpl(hPipe));
}
void TPipe::close() {
impl_.reset();
}
uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
checkReadBytesAvailable(len);
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open pipe");
return impl_->read(buf, len);
}
uint32_t pipe_read(HANDLE pipe, uint8_t* buf, uint32_t len) {
DWORD cbRead;
int fSuccess = ReadFile(pipe, // pipe handle
buf, // buffer to receive reply
len, // size of buffer
&cbRead, // number of bytes read
nullptr); // not overlapped
if (!fSuccess && GetLastError() != ERROR_MORE_DATA)
return 0; // No more data, possibly because client disconnected.
return cbRead;
}
void TPipe::write(const uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open pipe");
impl_->write(buf, len);
}
void pipe_write(HANDLE pipe, const uint8_t* buf, uint32_t len) {
DWORD cbWritten;
int fSuccess = WriteFile(pipe, // pipe handle
buf, // message
len, // message length
&cbWritten, // bytes written
nullptr); // not overlapped
if (!fSuccess)
throw TTransportException(TTransportException::NOT_OPEN, "Write to pipe failed");
}
//---------------------------------------------------------
// Accessors
//---------------------------------------------------------
std::string TPipe::getPipename() {
return pipename_;
}
void TPipe::setPipename(const std::string& pipename) {
if (pipename.find("\\\\") == std::string::npos)
pipename_ = "\\\\.\\pipe\\" + pipename;
else
pipename_ = pipename;
}
HANDLE TPipe::getPipeHandle() {
if (impl_)
return impl_->getPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setPipeHandle(HANDLE pipehandle) {
if (isAnonymous_)
impl_->setPipeHandle(pipehandle);
else
{
TAutoHandle pipe(pipehandle);
impl_.reset(new TNamedPipeImpl(pipe));
}
}
HANDLE TPipe::getWrtPipeHandle() {
if (impl_)
return impl_->getWrtPipeHandle();
return INVALID_HANDLE_VALUE;
}
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
if (impl_)
impl_->setWrtPipeHandle(pipehandle);
}
HANDLE TPipe::getNativeWaitHandle() {
if (impl_)
return impl_->getNativeWaitHandle();
return INVALID_HANDLE_VALUE;
}
long TPipe::getConnTimeout() {
return TimeoutSeconds_;
}
void TPipe::setConnTimeout(long seconds) {
TimeoutSeconds_ = seconds;
}
#endif //_WIN32
}
}
} // apache::thrift::transport