| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <thrift/thrift-config.h> |
| |
| #ifdef HAVE_SYS_TIME_H |
| #include <sys/time.h> |
| #else |
| #include <time.h> |
| #endif |
| #include <fcntl.h> |
| #ifdef HAVE_UNISTD_H |
| #include <unistd.h> |
| #endif |
| #ifdef HAVE_STRINGS_H |
| #include <strings.h> |
| #endif |
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <limits>
#include <memory>
#include <utility>
| #ifdef HAVE_SYS_STAT_H |
| #include <sys/stat.h> |
| #endif |
| |
| #ifdef _WIN32 |
| #include <io.h> |
| #endif |
| |
| #include <thrift/transport/TFileTransport.h> |
| #include <thrift/transport/TTransportUtils.h> |
| #include <thrift/transport/PlatformSocket.h> |
| #include <thrift/concurrency/FunctionRunner.h> |
| |
| namespace apache { |
| namespace thrift { |
| namespace transport { |
| |
| using std::shared_ptr; |
| using std::cerr; |
| using std::string; |
| using namespace apache::thrift::protocol; |
| using namespace apache::thrift::concurrency; |
| |
| TFileTransport::TFileTransport(string path, bool readOnly, std::shared_ptr<TConfiguration> config) |
| : TTransport(config), |
| readState_(), |
| readBuff_(nullptr), |
| currentEvent_(nullptr), |
| readBuffSize_(DEFAULT_READ_BUFF_SIZE), |
| readTimeout_(NO_TAIL_READ_TIMEOUT), |
| chunkSize_(DEFAULT_CHUNK_SIZE), |
| eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE), |
| flushMaxUs_(DEFAULT_FLUSH_MAX_US), |
| flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES), |
| maxEventSize_(DEFAULT_MAX_EVENT_SIZE), |
| maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS), |
| eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US), |
| corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US), |
| writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US), |
| dequeueBuffer_(nullptr), |
| enqueueBuffer_(nullptr), |
| notFull_(&mutex_), |
| notEmpty_(&mutex_), |
| closing_(false), |
| flushed_(&mutex_), |
| forceFlush_(false), |
| filename_(path), |
| fd_(0), |
| bufferAndThreadInitialized_(false), |
| offset_(0), |
| lastBadChunk_(0), |
| numCorruptedEventsInChunk_(0), |
| readOnly_(readOnly) { |
| threadFactory_.setDetached(false); |
| openLogFile(); |
| } |
| |
| void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { |
| filename_ = filename; |
| offset_ = offset; |
| |
| // check if current file is still open |
| if (fd_ > 0) { |
| // flush any events in the queue |
| flush(); |
| GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str()); |
| if (-1 == ::THRIFT_CLOSE(fd_)) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy); |
| throw TTransportException(TTransportException::UNKNOWN, |
| "TFileTransport: error in file close", |
| errno_copy); |
| } else { |
| // successfully closed fd |
| fd_ = 0; |
| } |
| } |
| |
| if (fd) { |
| fd_ = fd; |
| } else { |
| // open file if the input fd is 0 |
| openLogFile(); |
| } |
| } |
| |
| TFileTransport::~TFileTransport() { |
| // flush the buffer if a writer thread is active |
| if (writerThread_.get()) { |
| // set state to closing |
| closing_ = true; |
| |
| // wake up the writer thread |
| // Since closing_ is true, it will attempt to flush all data, then exit. |
| notEmpty_.notify(); |
| |
| writerThread_->join(); |
| writerThread_.reset(); |
| } |
| |
| if (dequeueBuffer_) { |
| delete dequeueBuffer_; |
| dequeueBuffer_ = nullptr; |
| } |
| |
| if (enqueueBuffer_) { |
| delete enqueueBuffer_; |
| enqueueBuffer_ = nullptr; |
| } |
| |
| if (readBuff_) { |
| delete[] readBuff_; |
| readBuff_ = nullptr; |
| } |
| |
| if (currentEvent_) { |
| delete currentEvent_; |
| currentEvent_ = nullptr; |
| } |
| |
| // close logfile |
| if (fd_ > 0) { |
| if (-1 == ::THRIFT_CLOSE(fd_)) { |
| GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO); |
| } else { |
| // successfully closed fd |
| fd_ = 0; |
| } |
| } |
| } |
| |
| bool TFileTransport::initBufferAndWriteThread() { |
| if (bufferAndThreadInitialized_) { |
| T_ERROR("%s", "Trying to double-init TFileTransport"); |
| return false; |
| } |
| |
| if (!writerThread_.get()) { |
| writerThread_ = threadFactory_.newThread( |
| apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this)); |
| writerThread_->start(); |
| } |
| |
| dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); |
| enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); |
| bufferAndThreadInitialized_ = true; |
| |
| return true; |
| } |
| |
| void TFileTransport::write(const uint8_t* buf, uint32_t len) { |
| if (readOnly_) { |
| throw TTransportException("TFileTransport: attempting to write to file opened readonly"); |
| } |
| |
| enqueueEvent(buf, len); |
| } |
| |
/**
 * Deleter functor that destroys the pointee with a plain `delete`.
 * Usable as the deleter argument of std::unique_ptr.
 */
template <class T>
struct uniqueDeleter {
  void operator()(T* target) const {
    delete target;
  }
};
| |
| void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { |
| // can't enqueue more events if file is going to close |
| if (closing_) { |
| return; |
| } |
| |
| // make sure that event size is valid |
| if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) { |
| T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_); |
| return; |
| } |
| |
| if (eventLen == 0) { |
| T_ERROR("%s", "cannot enqueue an empty event"); |
| return; |
| } |
| |
| std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo()); |
| toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4]; |
| |
| // first 4 bytes is the event length |
| memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4); |
| // actual event contents |
| memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); |
| toEnqueue->eventSize_ = eventLen + 4; |
| |
| // lock mutex |
| Guard g(mutex_); |
| |
| // make sure that enqueue buffer is initialized and writer thread is running |
| if (!bufferAndThreadInitialized_) { |
| if (!initBufferAndWriteThread()) { |
| return; |
| } |
| } |
| |
| // Can't enqueue while buffer is full |
| while (enqueueBuffer_->isFull()) { |
| notFull_.wait(); |
| } |
| |
| // We shouldn't be trying to enqueue new data while a forced flush is |
| // requested. (Otherwise the writer thread might not ever be able to finish |
| // the flush if more data keeps being enqueued.) |
| assert(!forceFlush_); |
| |
| // add to the buffer |
| eventInfo* pEvent = toEnqueue.release(); |
| if (!enqueueBuffer_->addEvent(pEvent)) { |
| delete pEvent; |
| return; |
| } |
| |
| // signal anybody who's waiting for the buffer to be non-empty |
| notEmpty_.notify(); |
| |
| // this really should be a loop where it makes sure it got flushed |
| // because condition variables can get triggered by the os for no reason |
| // it is probably a non-factor for the time being |
| } |
| |
| bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) { |
| bool swap; |
| Guard g(mutex_); |
| |
| if (!enqueueBuffer_->isEmpty()) { |
| swap = true; |
| } else if (closing_) { |
| // even though there is no data to write, |
| // return immediately if the transport is closing |
| swap = false; |
| } else { |
| if (deadline != nullptr) { |
| // if we were handed a deadline time struct, do a timed wait |
| notEmpty_.waitForTime(*deadline); |
| } else { |
| // just wait until the buffer gets an item |
| notEmpty_.wait(); |
| } |
| |
| // could be empty if we timed out |
| swap = enqueueBuffer_->isEmpty(); |
| } |
| |
| if (swap) { |
| TFileTransportBuffer* temp = enqueueBuffer_; |
| enqueueBuffer_ = dequeueBuffer_; |
| dequeueBuffer_ = temp; |
| } |
| |
| if (swap) { |
| notFull_.notify(); |
| } |
| |
| return swap; |
| } |
| |
| void TFileTransport::writerThread() { |
| bool hasIOError = false; |
| |
| // open file if it is not open |
| if (!fd_) { |
| try { |
| openLogFile(); |
| } catch (...) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy); |
| fd_ = 0; |
| hasIOError = true; |
| } |
| } |
| |
| // set the offset to the correct value (EOF) |
| if (!hasIOError) { |
| try { |
| seekToEnd(); |
| // throw away any partial events |
| offset_ += readState_.lastDispatchPtr_; |
| if (0 == THRIFT_FTRUNCATE(fd_, offset_)) { |
| readState_.resetAllValues(); |
| } else { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy); |
| hasIOError = true; |
| } |
| } catch (...) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy); |
| hasIOError = true; |
| } |
| } |
| |
| // Figure out the next time by which a flush must take place |
| auto ts_next_flush = getNextFlushTime(); |
| uint32_t unflushed = 0; |
| |
| while (1) { |
| // this will only be true when the destructor is being invoked |
| if (closing_) { |
| if (hasIOError) { |
| return; |
| } |
| |
| // Try to empty buffers before exit |
| if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { |
| ::THRIFT_FSYNC(fd_); |
| if (-1 == ::THRIFT_CLOSE(fd_)) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); |
| } else { |
| // fd successfully closed |
| fd_ = 0; |
| } |
| return; |
| } |
| } |
| |
| if (swapEventBuffers(&ts_next_flush)) { |
| eventInfo* outEvent; |
| while (nullptr != (outEvent = dequeueBuffer_->getNext())) { |
| // Remove an event from the buffer and write it out to disk. If there is any IO error, for |
| // instance, |
| // the output file is unmounted or deleted, then this event is dropped. However, the writer |
| // thread |
| // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then |
| // start writing |
| // from the end. |
| |
| while (hasIOError) { |
| T_ERROR( |
| "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors", |
| writerThreadIOErrorSleepTime_); |
| THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_); |
| if (closing_) { |
| return; |
| } |
| if (!fd_) { |
| ::THRIFT_CLOSE(fd_); |
| fd_ = 0; |
| } |
| try { |
| openLogFile(); |
| seekToEnd(); |
| unflushed = 0; |
| hasIOError = false; |
| T_LOG_OPER( |
| "TFileTransport: log file %s reopened by writer thread during error recovery", |
| filename_.c_str()); |
| } catch (...) { |
| T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", |
| filename_.c_str()); |
| } |
| } |
| |
| // sanity check on event |
| if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { |
| T_ERROR("msg size is greater than max event size: %u > %u\n", |
| outEvent->eventSize_, |
| maxEventSize_); |
| continue; |
| } |
| |
| // If chunking is required, then make sure that msg does not cross chunk boundary |
| if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { |
| // event size must be less than chunk size |
| if (outEvent->eventSize_ > chunkSize_) { |
| T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", |
| outEvent->eventSize_, |
| chunkSize_); |
| continue; |
| } |
| |
| int64_t chunk1 = offset_ / chunkSize_; |
| int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_; |
| |
| // if adding this event will cross a chunk boundary, pad the chunk with zeros |
| if (chunk1 != chunk2) { |
| // refetch the offset to keep in sync |
| offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR); |
| auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_); |
| |
| auto* zeros = new uint8_t[padding]; |
| memset(zeros, '\0', padding); |
| std::unique_ptr<uint8_t[]> array(zeros); |
| if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", |
| errno_copy); |
| hasIOError = true; |
| continue; |
| } |
| unflushed += padding; |
| offset_ += padding; |
| } |
| } |
| |
| // write the dequeued event to the file |
| if (outEvent->eventSize_ > 0) { |
| if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy); |
| hasIOError = true; |
| continue; |
| } |
| unflushed += outEvent->eventSize_; |
| offset_ += outEvent->eventSize_; |
| } |
| } |
| dequeueBuffer_->reset(); |
| } |
| |
| if (hasIOError) { |
| continue; |
| } |
| |
| // Local variable to cache the state of forceFlush_. |
| // |
| // We only want to check the value of forceFlush_ once each time around the |
| // loop. If we check it more than once without holding the lock the entire |
| // time, it could have changed state in between. This will result in us |
| // making inconsistent decisions. |
| bool forced_flush = false; |
| { |
| Guard g(mutex_); |
| if (forceFlush_) { |
| if (!enqueueBuffer_->isEmpty()) { |
| // If forceFlush_ is true, we need to flush all available data. |
| // If enqueueBuffer_ is not empty, go back to the start of the loop to |
| // write it out. |
| // |
| // We know the main thread is waiting on forceFlush_ to be cleared, |
| // so no new events will be added to enqueueBuffer_ until we clear |
| // forceFlush_. Therefore the next time around the loop enqueueBuffer_ |
| // is guaranteed to be empty. (I.e., we're guaranteed to make progress |
| // and clear forceFlush_ the next time around the loop.) |
| continue; |
| } |
| forced_flush = true; |
| } |
| } |
| |
| // determine if we need to perform an fsync |
| bool flush = false; |
| if (forced_flush || unflushed > flushMaxBytes_) { |
| flush = true; |
| } else { |
| if (std::chrono::steady_clock::now() > ts_next_flush) { |
| if (unflushed > 0) { |
| flush = true; |
| } else { |
| // If there is no new data since the last fsync, |
| // don't perform the fsync, but do reset the timer. |
| ts_next_flush = getNextFlushTime(); |
| } |
| } |
| } |
| |
| if (flush) { |
| // sync (force flush) file to disk |
| THRIFT_FSYNC(fd_); |
| unflushed = 0; |
| ts_next_flush = getNextFlushTime(); |
| |
| // notify anybody waiting for flush completion |
| if (forced_flush) { |
| Guard g(mutex_); |
| forceFlush_ = false; |
| assert(enqueueBuffer_->isEmpty()); |
| assert(dequeueBuffer_->isEmpty()); |
| flushed_.notifyAll(); |
| } |
| } |
| } |
| } |
| |
| void TFileTransport::flush() { |
| resetConsumedMessageSize(); |
| // file must be open for writing for any flushing to take place |
| if (!writerThread_.get()) { |
| return; |
| } |
| // wait for flush to take place |
| Guard g(mutex_); |
| |
| // Indicate that we are requesting a flush |
| forceFlush_ = true; |
| // Wake up the writer thread so it will perform the flush immediately |
| notEmpty_.notify(); |
| |
| while (forceFlush_) { |
| flushed_.wait(); |
| } |
| } |
| |
| uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) { |
| checkReadBytesAvailable(len); |
| uint32_t have = 0; |
| uint32_t get = 0; |
| |
| while (have < len) { |
| get = read(buf + have, len - have); |
| if (get <= 0) { |
| throw TEOFException(); |
| } |
| have += get; |
| } |
| |
| return have; |
| } |
| |
| bool TFileTransport::peek() { |
| // check if there is an event ready to be read |
| if (!currentEvent_) { |
| currentEvent_ = readEvent(); |
| } |
| |
| // did not manage to read an event from the file. This could have happened |
| // if the timeout expired or there was some other error |
| if (!currentEvent_) { |
| return false; |
| } |
| |
| // check if there is anything to read |
| return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0; |
| } |
| |
| uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { |
| checkReadBytesAvailable(len); |
| // check if there an event is ready to be read |
| if (!currentEvent_) { |
| currentEvent_ = readEvent(); |
| } |
| |
| // did not manage to read an event from the file. This could have happened |
| // if the timeout expired or there was some other error |
| if (!currentEvent_) { |
| return 0; |
| } |
| |
| // read as much of the current event as possible |
| int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; |
| if (remaining <= (int32_t)len) { |
| // copy over anything thats remaining |
| if (remaining > 0) { |
| memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining); |
| } |
| delete (currentEvent_); |
| currentEvent_ = nullptr; |
| return remaining; |
| } |
| |
| // read as much as possible |
| memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len); |
| currentEvent_->eventBuffPos_ += len; |
| return len; |
| } |
| |
| // note caller is responsible for freeing returned events |
| eventInfo* TFileTransport::readEvent() { |
| int readTries = 0; |
| |
| if (!readBuff_) { |
| readBuff_ = new uint8_t[readBuffSize_]; |
| } |
| |
| while (1) { |
| // read from the file if read buffer is exhausted |
| if (readState_.bufferPtr_ == readState_.bufferLen_) { |
| // advance the offset pointer |
| offset_ += readState_.bufferLen_; |
| readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_)); |
| // if (readState_.bufferLen_) { |
| // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); |
| // } |
| readState_.bufferPtr_ = 0; |
| readState_.lastDispatchPtr_ = 0; |
| |
| // read error |
| if (readState_.bufferLen_ == -1) { |
| readState_.resetAllValues(); |
| GlobalOutput("TFileTransport: error while reading from file"); |
| throw TTransportException("TFileTransport: error while reading from file"); |
| } else if (readState_.bufferLen_ == 0) { // EOF |
| // wait indefinitely if there is no timeout |
| if (readTimeout_ == TAIL_READ_TIMEOUT) { |
| THRIFT_SLEEP_USEC(eofSleepTime_); |
| continue; |
| } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) { |
| // reset state |
| readState_.resetState(0); |
| return nullptr; |
| } else if (readTimeout_ > 0) { |
| // timeout already expired once |
| if (readTries > 0) { |
| readState_.resetState(0); |
| return nullptr; |
| } else { |
| THRIFT_SLEEP_USEC(readTimeout_ * 1000); |
| readTries++; |
| continue; |
| } |
| } |
| } |
| } |
| |
| readTries = 0; |
| |
| // attempt to read an event from the buffer |
| while (readState_.bufferPtr_ < readState_.bufferLen_) { |
| if (readState_.readingSize_) { |
| if (readState_.eventSizeBuffPos_ == 0) { |
| if ((offset_ + readState_.bufferPtr_) / chunkSize_ |
| != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) { |
| // skip one byte towards chunk boundary |
| // T_DEBUG_L(1, "Skipping a byte"); |
| readState_.bufferPtr_++; |
| continue; |
| } |
| } |
| |
| readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] |
| = readBuff_[readState_.bufferPtr_++]; |
| |
| if (readState_.eventSizeBuffPos_ == 4) { |
| if (readState_.getEventSize() == 0) { |
| // 0 length event indicates padding |
| // T_DEBUG_L(1, "Got padding"); |
| readState_.resetState(readState_.lastDispatchPtr_); |
| continue; |
| } |
| // got a valid event |
| readState_.readingSize_ = false; |
| if (readState_.event_) { |
| delete (readState_.event_); |
| } |
| readState_.event_ = new eventInfo(); |
| readState_.event_->eventSize_ = readState_.getEventSize(); |
| |
| // check if the event is corrupted and perform recovery if required |
| if (isEventCorrupted()) { |
| performRecovery(); |
| // start from the top |
| break; |
| } |
| } |
| } else { |
| if (!readState_.event_->eventBuff_) { |
| readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_]; |
| readState_.event_->eventBuffPos_ = 0; |
| } |
| // take either the entire event or the remaining bytes in the buffer |
| int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_), |
| readState_.event_->eventSize_ - readState_.event_->eventBuffPos_); |
| |
| // copy data from read buffer into event buffer |
| memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_, |
| readBuff_ + readState_.bufferPtr_, |
| reclaimBuffer); |
| |
| // increment position ptrs |
| readState_.event_->eventBuffPos_ += reclaimBuffer; |
| readState_.bufferPtr_ += reclaimBuffer; |
| |
| // check if the event has been read in full |
| if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) { |
| // set the completed event to the current event |
| eventInfo* completeEvent = readState_.event_; |
| completeEvent->eventBuffPos_ = 0; |
| |
| readState_.event_ = nullptr; |
| readState_.resetState(readState_.bufferPtr_); |
| |
| // exit criteria |
| return completeEvent; |
| } |
| } |
| } |
| } |
| } |
| |
| bool TFileTransport::isEventCorrupted() { |
| // an error is triggered if: |
| if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) { |
| // 1. Event size is larger than user-speficied max-event size |
| T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)", |
| readState_.event_->eventSize_, |
| maxEventSize_); |
| return true; |
| } else if (readState_.event_->eventSize_ > chunkSize_) { |
| // 2. Event size is larger than chunk size |
| T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)", |
| readState_.event_->eventSize_, |
| chunkSize_); |
| return true; |
| } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_) |
| != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1) |
| / chunkSize_)) { |
| // 3. size indicates that event crosses chunk boundary |
| T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu", |
| readState_.event_->eventSize_, |
| static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4)); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| void TFileTransport::performRecovery() { |
| // perform some kickass recovery |
| uint32_t curChunk = getCurChunk(); |
| if (lastBadChunk_ == curChunk) { |
| numCorruptedEventsInChunk_++; |
| } else { |
| lastBadChunk_ = curChunk; |
| numCorruptedEventsInChunk_ = 1; |
| } |
| |
| if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) { |
| // maybe there was an error in reading the file from disk |
| // seek to the beginning of chunk and try again |
| seekToChunk(curChunk); |
| } else { |
| |
| // just skip ahead to the next chunk if we not already at the last chunk |
| if (curChunk != (getNumChunks() - 1)) { |
| seekToChunk(curChunk + 1); |
| } else if (readTimeout_ == TAIL_READ_TIMEOUT) { |
| // if tailing the file, wait until there is enough data to start |
| // the next chunk |
| while (curChunk == (getNumChunks() - 1)) { |
| THRIFT_SLEEP_USEC(corruptedEventSleepTime_); |
| } |
| seekToChunk(curChunk + 1); |
| } else { |
| // pretty hosed at this stage, rewind the file back to the last successful |
| // point and punt on the error |
| readState_.resetState(readState_.lastDispatchPtr_); |
| currentEvent_ = nullptr; |
| char errorMsg[1024]; |
| sprintf(errorMsg, |
| "TFileTransport: log file corrupted at offset: %lu", |
| static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_)); |
| |
| GlobalOutput(errorMsg); |
| throw TTransportException(errorMsg); |
| } |
| } |
| } |
| |
| void TFileTransport::seekToChunk(int32_t chunk) { |
| if (fd_ <= 0) { |
| throw TTransportException("File not open"); |
| } |
| |
| int32_t numChunks = getNumChunks(); |
| |
| // file is empty, seeking to chunk is pointless |
| if (numChunks == 0) { |
| return; |
| } |
| |
| // negative indicates reverse seek (from the end) |
| if (chunk < 0) { |
| chunk += numChunks; |
| } |
| |
| // too large a value for reverse seek, just seek to beginning |
| if (chunk < 0) { |
| T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning..."); |
| chunk = 0; |
| } |
| |
| // cannot seek past EOF |
| bool seekToEnd = false; |
| off_t minEndOffset = 0; |
| if (chunk >= numChunks) { |
| T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead..."); |
| seekToEnd = true; |
| chunk = numChunks - 1; |
| // this is the min offset to process events till |
| minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END); |
| } |
| |
| off_t newOffset = off_t(chunk) * chunkSize_; |
| offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET); |
| readState_.resetAllValues(); |
| currentEvent_ = nullptr; |
| if (offset_ == -1) { |
| GlobalOutput("TFileTransport: lseek error in seekToChunk"); |
| throw TTransportException("TFileTransport: lseek error in seekToChunk"); |
| } |
| |
| // seek to EOF if user wanted to go to last chunk |
| if (seekToEnd) { |
| uint32_t oldReadTimeout = getReadTimeout(); |
| setReadTimeout(NO_TAIL_READ_TIMEOUT); |
| // keep on reading unti the last event at point of seekChunk call |
| shared_ptr<eventInfo> event; |
| while ((offset_ + readState_.bufferPtr_) < minEndOffset) { |
| event.reset(readEvent()); |
| if (event.get() == nullptr) { |
| break; |
| } |
| } |
| setReadTimeout(oldReadTimeout); |
| } |
| } |
| |
| void TFileTransport::seekToEnd() { |
| seekToChunk(getNumChunks()); |
| } |
| |
| uint32_t TFileTransport::getNumChunks() { |
| if (fd_ <= 0) { |
| return 0; |
| } |
| |
| struct THRIFT_STAT f_info; |
| int rv = ::THRIFT_FSTAT(fd_, &f_info); |
| |
| if (rv < 0) { |
| int errno_copy = THRIFT_ERRNO; |
| throw TTransportException(TTransportException::UNKNOWN, |
| "TFileTransport::getNumChunks() (fstat)", |
| errno_copy); |
| } |
| |
| if (f_info.st_size > 0) { |
| size_t numChunks = ((f_info.st_size) / chunkSize_) + 1; |
| if (numChunks > (std::numeric_limits<uint32_t>::max)()) |
| throw TTransportException("Too many chunks"); |
| return static_cast<uint32_t>(numChunks); |
| } |
| |
| // empty file has no chunks |
| return 0; |
| } |
| |
| uint32_t TFileTransport::getCurChunk() { |
| return static_cast<uint32_t>(offset_ / chunkSize_); |
| } |
| |
| // Utility Functions |
| void TFileTransport::openLogFile() { |
| #ifndef _WIN32 |
| mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; |
| int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND; |
| #else |
| int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE; |
| int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND; |
| #endif |
| fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode); |
| offset_ = 0; |
| |
| // make sure open call was successful |
| if (fd_ == -1) { |
| int errno_copy = THRIFT_ERRNO; |
| GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy); |
| throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy); |
| } |
| } |
| |
| std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() { |
| return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_); |
| } |
| |
| TFileTransportBuffer::TFileTransportBuffer(uint32_t size) |
| : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) { |
| buffer_ = new eventInfo* [size]; |
| } |
| |
| TFileTransportBuffer::~TFileTransportBuffer() { |
| if (buffer_) { |
| for (uint32_t i = 0; i < writePoint_; i++) { |
| delete buffer_[i]; |
| } |
| delete[] buffer_; |
| buffer_ = nullptr; |
| } |
| } |
| |
| bool TFileTransportBuffer::addEvent(eventInfo* event) { |
| if (bufferMode_ == READ) { |
| GlobalOutput("Trying to write to a buffer in read mode"); |
| } |
| if (writePoint_ < size_) { |
| buffer_[writePoint_++] = event; |
| return true; |
| } else { |
| // buffer is full |
| return false; |
| } |
| } |
| |
| eventInfo* TFileTransportBuffer::getNext() { |
| if (bufferMode_ == WRITE) { |
| bufferMode_ = READ; |
| } |
| if (readPoint_ < writePoint_) { |
| return buffer_[readPoint_++]; |
| } else { |
| // no more entries |
| return nullptr; |
| } |
| } |
| |
| void TFileTransportBuffer::reset() { |
| if (bufferMode_ == WRITE || writePoint_ > readPoint_) { |
| T_DEBUG("%s", "Resetting a buffer with unread entries"); |
| } |
| // Clean up the old entries |
| for (uint32_t i = 0; i < writePoint_; i++) { |
| delete buffer_[i]; |
| } |
| bufferMode_ = WRITE; |
| writePoint_ = 0; |
| readPoint_ = 0; |
| } |
| |
| bool TFileTransportBuffer::isFull() { |
| return writePoint_ == size_; |
| } |
| |
| bool TFileTransportBuffer::isEmpty() { |
| return writePoint_ == 0; |
| } |
| |
| TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocolFactory> protocolFactory, |
| shared_ptr<TFileReaderTransport> inputTransport) |
| : processor_(processor), |
| inputProtocolFactory_(protocolFactory), |
| outputProtocolFactory_(protocolFactory), |
| inputTransport_(inputTransport) { |
| |
| // default the output transport to a null transport (common case) |
| outputTransport_ = std::make_shared<TNullTransport>(); |
| } |
| |
| TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocolFactory> inputProtocolFactory, |
| shared_ptr<TProtocolFactory> outputProtocolFactory, |
| shared_ptr<TFileReaderTransport> inputTransport) |
| : processor_(processor), |
| inputProtocolFactory_(inputProtocolFactory), |
| outputProtocolFactory_(outputProtocolFactory), |
| inputTransport_(inputTransport) { |
| |
| // default the output transport to a null transport (common case) |
| outputTransport_ = std::make_shared<TNullTransport>(); |
| } |
| |
| TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocolFactory> protocolFactory, |
| shared_ptr<TFileReaderTransport> inputTransport, |
| shared_ptr<TTransport> outputTransport) |
| : processor_(processor), |
| inputProtocolFactory_(protocolFactory), |
| outputProtocolFactory_(protocolFactory), |
| inputTransport_(inputTransport), |
| outputTransport_(outputTransport) { |
| } |
| |
| void TFileProcessor::process(uint32_t numEvents, bool tail) { |
| shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); |
| shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); |
| |
| // set the read timeout to 0 if tailing is required |
| int32_t oldReadTimeout = inputTransport_->getReadTimeout(); |
| if (tail) { |
| // save old read timeout so it can be restored |
| inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT); |
| } |
| |
| uint32_t numProcessed = 0; |
| while (1) { |
| // bad form to use exceptions for flow control but there is really |
| // no other way around it |
| try { |
| processor_->process(inputProtocol, outputProtocol, nullptr); |
| numProcessed++; |
| if ((numEvents > 0) && (numProcessed == numEvents)) { |
| return; |
| } |
| } catch (TEOFException&) { |
| if (!tail) { |
| break; |
| } |
| } catch (TException& te) { |
| cerr << te.what() << '\n'; |
| break; |
| } |
| } |
| |
| // restore old read timeout |
| if (tail) { |
| inputTransport_->setReadTimeout(oldReadTimeout); |
| } |
| } |
| |
| void TFileProcessor::processChunk() { |
| shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); |
| shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); |
| |
| uint32_t curChunk = inputTransport_->getCurChunk(); |
| |
| while (1) { |
| // bad form to use exceptions for flow control but there is really |
| // no other way around it |
| try { |
| processor_->process(inputProtocol, outputProtocol, nullptr); |
| if (curChunk != inputTransport_->getCurChunk()) { |
| break; |
| } |
| } catch (TEOFException&) { |
| break; |
| } catch (TException& te) { |
| cerr << te.what() << '\n'; |
| break; |
| } |
| } |
| } |
| } |
| } |
| } // apache::thrift::transport |