| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifdef HAVE_CONFIG_H |
| #include "config.h" |
| #endif |
| |
| #include "TFileTransport.h" |
| #include "TTransportUtils.h" |
| |
| #include <pthread.h> |
| #ifdef HAVE_SYS_TIME_H |
| #include <sys/time.h> |
| #else |
| #include <time.h> |
| #endif |
| #include <fcntl.h> |
| #include <errno.h> |
| #include <unistd.h> |
| #ifdef HAVE_STRINGS_H |
| #include <strings.h> |
| #endif |
| #include <cstdlib> |
| #include <cstring> |
| #include <iostream> |
| #include <sys/stat.h> |
| |
| namespace apache { namespace thrift { namespace transport { |
| |
| using boost::shared_ptr; |
| using namespace std; |
| using namespace apache::thrift::protocol; |
| |
| #ifndef HAVE_CLOCK_GETTIME |
| |
| /** |
| * Fake clock_gettime for systems like darwin |
| * |
| */ |
| #define CLOCK_REALTIME 0 |
| static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) { |
| struct timeval now; |
| |
| int rv = gettimeofday(&now, NULL); |
| if (rv != 0) { |
| return rv; |
| } |
| |
| tp->tv_sec = now.tv_sec; |
| tp->tv_nsec = now.tv_usec * 1000; |
| return 0; |
| } |
| #endif |
| |
| TFileTransport::TFileTransport(string path, bool readOnly) |
| : readState_() |
| , readBuff_(NULL) |
| , currentEvent_(NULL) |
| , readBuffSize_(DEFAULT_READ_BUFF_SIZE) |
| , readTimeout_(NO_TAIL_READ_TIMEOUT) |
| , chunkSize_(DEFAULT_CHUNK_SIZE) |
| , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE) |
| , flushMaxUs_(DEFAULT_FLUSH_MAX_US) |
| , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES) |
| , maxEventSize_(DEFAULT_MAX_EVENT_SIZE) |
| , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS) |
| , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US) |
| , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US) |
| , writerThreadId_(0) |
| , dequeueBuffer_(NULL) |
| , enqueueBuffer_(NULL) |
| , closing_(false) |
| , forceFlush_(false) |
| , filename_(path) |
| , fd_(0) |
| , bufferAndThreadInitialized_(false) |
| , offset_(0) |
| , lastBadChunk_(0) |
| , numCorruptedEventsInChunk_(0) |
| , readOnly_(readOnly) |
| { |
| // initialize all the condition vars/mutexes |
| pthread_mutex_init(&mutex_, NULL); |
| pthread_cond_init(¬Full_, NULL); |
| pthread_cond_init(¬Empty_, NULL); |
| pthread_cond_init(&flushed_, NULL); |
| |
| openLogFile(); |
| } |
| |
| void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) { |
| filename_ = filename; |
| offset_ = offset; |
| |
| // check if current file is still open |
| if (fd_ > 0) { |
| // flush any events in the queue |
| flush(); |
| GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str()); |
| if (-1 == ::close(fd_)) { |
| int errno_copy = errno; |
| GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy); |
| throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy); |
| } |
| } |
| |
| if (fd) { |
| fd_ = fd; |
| } else { |
| // open file if the input fd is 0 |
| openLogFile(); |
| } |
| } |
| |
| |
| TFileTransport::~TFileTransport() { |
| // flush the buffer if a writer thread is active |
| if (writerThreadId_ > 0) { |
| // reduce the flush timeout so that closing is quicker |
| setFlushMaxUs(300*1000); |
| |
| // flush output buffer |
| flush(); |
| |
| // set state to closing |
| closing_ = true; |
| |
| // TODO: make sure event queue is empty |
| // currently only the write buffer is flushed |
| // we dont actually wait until the queue is empty. This shouldn't be a big |
| // deal in the common case because writing is quick |
| |
| pthread_join(writerThreadId_, NULL); |
| writerThreadId_ = 0; |
| } |
| |
| if (dequeueBuffer_) { |
| delete dequeueBuffer_; |
| dequeueBuffer_ = NULL; |
| } |
| |
| if (enqueueBuffer_) { |
| delete enqueueBuffer_; |
| enqueueBuffer_ = NULL; |
| } |
| |
| if (readBuff_) { |
| delete[] readBuff_; |
| readBuff_ = NULL; |
| } |
| |
| if (currentEvent_) { |
| delete currentEvent_; |
| currentEvent_ = NULL; |
| } |
| |
| // close logfile |
| if (fd_ > 0) { |
| if(-1 == ::close(fd_)) { |
| GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno); |
| } |
| } |
| } |
| |
| bool TFileTransport::initBufferAndWriteThread() { |
| if (bufferAndThreadInitialized_) { |
| T_ERROR("Trying to double-init TFileTransport"); |
| return false; |
| } |
| |
| if (writerThreadId_ == 0) { |
| if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) { |
| T_ERROR("Could not create writer thread"); |
| return false; |
| } |
| } |
| |
| dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); |
| enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); |
| bufferAndThreadInitialized_ = true; |
| |
| return true; |
| } |
| |
| void TFileTransport::write(const uint8_t* buf, uint32_t len) { |
| if (readOnly_) { |
| throw TTransportException("TFileTransport: attempting to write to file opened readonly"); |
| } |
| |
| enqueueEvent(buf, len, false); |
| } |
| |
| void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { |
| // can't enqueue more events if file is going to close |
| if (closing_) { |
| return; |
| } |
| |
| // make sure that event size is valid |
| if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) { |
| T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_); |
| return; |
| } |
| |
| if (eventLen == 0) { |
| T_ERROR("cannot enqueue an empty event"); |
| return; |
| } |
| |
| eventInfo* toEnqueue = new eventInfo(); |
| toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4); |
| // first 4 bytes is the event length |
| memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4); |
| // actual event contents |
| memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); |
| toEnqueue->eventSize_ = eventLen + 4; |
| |
| // lock mutex |
| pthread_mutex_lock(&mutex_); |
| |
| // make sure that enqueue buffer is initialized and writer thread is running |
| if (!bufferAndThreadInitialized_) { |
| if (!initBufferAndWriteThread()) { |
| delete toEnqueue; |
| pthread_mutex_unlock(&mutex_); |
| return; |
| } |
| } |
| |
| // Can't enqueue while buffer is full |
| while (enqueueBuffer_->isFull()) { |
| pthread_cond_wait(¬Full_, &mutex_); |
| } |
| |
| // add to the buffer |
| if (!enqueueBuffer_->addEvent(toEnqueue)) { |
| delete toEnqueue; |
| pthread_mutex_unlock(&mutex_); |
| return; |
| } |
| |
| // signal anybody who's waiting for the buffer to be non-empty |
| pthread_cond_signal(¬Empty_); |
| |
| if (blockUntilFlush) { |
| pthread_cond_wait(&flushed_, &mutex_); |
| } |
| |
| // this really should be a loop where it makes sure it got flushed |
| // because condition variables can get triggered by the os for no reason |
| // it is probably a non-factor for the time being |
| pthread_mutex_unlock(&mutex_); |
| } |
| |
| bool TFileTransport::swapEventBuffers(struct timespec* deadline) { |
| pthread_mutex_lock(&mutex_); |
| if (deadline != NULL) { |
| // if we were handed a deadline time struct, do a timed wait |
| pthread_cond_timedwait(¬Empty_, &mutex_, deadline); |
| } else { |
| // just wait until the buffer gets an item |
| pthread_cond_wait(¬Empty_, &mutex_); |
| } |
| |
| bool swapped = false; |
| |
| // could be empty if we timed out |
| if (!enqueueBuffer_->isEmpty()) { |
| TFileTransportBuffer *temp = enqueueBuffer_; |
| enqueueBuffer_ = dequeueBuffer_; |
| dequeueBuffer_ = temp; |
| |
| swapped = true; |
| } |
| |
| // unlock the mutex and signal if required |
| pthread_mutex_unlock(&mutex_); |
| |
| if (swapped) { |
| pthread_cond_signal(¬Full_); |
| } |
| |
| return swapped; |
| } |
| |
| |
| void TFileTransport::writerThread() { |
| // open file if it is not open |
| if(!fd_) { |
| openLogFile(); |
| } |
| |
| // set the offset to the correct value (EOF) |
| try { |
| seekToEnd(); |
| } catch (TException &te) { |
| } |
| |
| // throw away any partial events |
| offset_ += readState_.lastDispatchPtr_; |
| ftruncate(fd_, offset_); |
| readState_.resetAllValues(); |
| |
| // Figure out the next time by which a flush must take place |
| |
| struct timespec ts_next_flush; |
| getNextFlushTime(&ts_next_flush); |
| uint32_t unflushed = 0; |
| |
| while(1) { |
| // this will only be true when the destructor is being invoked |
| if(closing_) { |
| // empty out both the buffers |
| if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { |
| if (-1 == ::close(fd_)) { |
| int errno_copy = errno; |
| GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); |
| throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy); |
| } |
| // just be safe and sync to disk |
| fsync(fd_); |
| fd_ = 0; |
| pthread_exit(NULL); |
| return; |
| } |
| } |
| |
| if (swapEventBuffers(&ts_next_flush)) { |
| eventInfo* outEvent; |
| while (NULL != (outEvent = dequeueBuffer_->getNext())) { |
| if (!outEvent) { |
| T_DEBUG_L(1, "Got an empty event"); |
| return; |
| } |
| |
| // sanity check on event |
| if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { |
| T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_); |
| continue; |
| } |
| |
| // If chunking is required, then make sure that msg does not cross chunk boundary |
| if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { |
| |
| // event size must be less than chunk size |
| if(outEvent->eventSize_ > chunkSize_) { |
| T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event", |
| outEvent->eventSize_, chunkSize_); |
| continue; |
| } |
| |
| int64_t chunk1 = offset_/chunkSize_; |
| int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_; |
| |
| // if adding this event will cross a chunk boundary, pad the chunk with zeros |
| if (chunk1 != chunk2) { |
| // refetch the offset to keep in sync |
| offset_ = lseek(fd_, 0, SEEK_CUR); |
| int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_); |
| |
| uint8_t zeros[padding]; |
| bzero(zeros, padding); |
| if (-1 == ::write(fd_, zeros, padding)) { |
| int errno_copy = errno; |
| GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy); |
| throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy); |
| } |
| unflushed += padding; |
| offset_ += padding; |
| } |
| } |
| |
| // write the dequeued event to the file |
| if (outEvent->eventSize_ > 0) { |
| if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { |
| int errno_copy = errno; |
| GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy); |
| throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy); |
| } |
| |
| unflushed += outEvent->eventSize_; |
| offset_ += outEvent->eventSize_; |
| } |
| } |
| dequeueBuffer_->reset(); |
| } |
| |
| bool flushTimeElapsed = false; |
| struct timespec current_time; |
| clock_gettime(CLOCK_REALTIME, ¤t_time); |
| |
| if (current_time.tv_sec > ts_next_flush.tv_sec || |
| (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) { |
| flushTimeElapsed = true; |
| getNextFlushTime(&ts_next_flush); |
| } |
| |
| // couple of cases from which a flush could be triggered |
| if ((flushTimeElapsed && unflushed > 0) || |
| unflushed > flushMaxBytes_ || |
| forceFlush_) { |
| |
| // sync (force flush) file to disk |
| fsync(fd_); |
| unflushed = 0; |
| |
| // notify anybody waiting for flush completion |
| forceFlush_ = false; |
| pthread_cond_broadcast(&flushed_); |
| } |
| } |
| } |
| |
| void TFileTransport::flush() { |
| // file must be open for writing for any flushing to take place |
| if (writerThreadId_ <= 0) { |
| return; |
| } |
| // wait for flush to take place |
| pthread_mutex_lock(&mutex_); |
| |
| forceFlush_ = true; |
| |
| while (forceFlush_) { |
| pthread_cond_wait(&flushed_, &mutex_); |
| } |
| |
| pthread_mutex_unlock(&mutex_); |
| } |
| |
| |
| uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) { |
| uint32_t have = 0; |
| uint32_t get = 0; |
| |
| while (have < len) { |
| get = read(buf+have, len-have); |
| if (get <= 0) { |
| throw TEOFException(); |
| } |
| have += get; |
| } |
| |
| return have; |
| } |
| |
| uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { |
| // check if there an event is ready to be read |
| if (!currentEvent_) { |
| currentEvent_ = readEvent(); |
| } |
| |
| // did not manage to read an event from the file. This could have happened |
| // if the timeout expired or there was some other error |
| if (!currentEvent_) { |
| return 0; |
| } |
| |
| // read as much of the current event as possible |
| int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; |
| if (remaining <= (int32_t)len) { |
| // copy over anything thats remaining |
| if (remaining > 0) { |
| memcpy(buf, |
| currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, |
| remaining); |
| } |
| delete(currentEvent_); |
| currentEvent_ = NULL; |
| return remaining; |
| } |
| |
| // read as much as possible |
| memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len); |
| currentEvent_->eventBuffPos_ += len; |
| return len; |
| } |
| |
| eventInfo* TFileTransport::readEvent() { |
| int readTries = 0; |
| |
| if (!readBuff_) { |
| readBuff_ = new uint8_t[readBuffSize_]; |
| } |
| |
| while (1) { |
| // read from the file if read buffer is exhausted |
| if (readState_.bufferPtr_ == readState_.bufferLen_) { |
| // advance the offset pointer |
| offset_ += readState_.bufferLen_; |
| readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_); |
| // if (readState_.bufferLen_) { |
| // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); |
| // } |
| readState_.bufferPtr_ = 0; |
| readState_.lastDispatchPtr_ = 0; |
| |
| // read error |
| if (readState_.bufferLen_ == -1) { |
| readState_.resetAllValues(); |
| GlobalOutput("TFileTransport: error while reading from file"); |
| throw TTransportException("TFileTransport: error while reading from file"); |
| } else if (readState_.bufferLen_ == 0) { // EOF |
| // wait indefinitely if there is no timeout |
| if (readTimeout_ == TAIL_READ_TIMEOUT) { |
| usleep(eofSleepTime_); |
| continue; |
| } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) { |
| // reset state |
| readState_.resetState(0); |
| return NULL; |
| } else if (readTimeout_ > 0) { |
| // timeout already expired once |
| if (readTries > 0) { |
| readState_.resetState(0); |
| return NULL; |
| } else { |
| usleep(readTimeout_ * 1000); |
| readTries++; |
| continue; |
| } |
| } |
| } |
| } |
| |
| readTries = 0; |
| |
| // attempt to read an event from the buffer |
| while(readState_.bufferPtr_ < readState_.bufferLen_) { |
| if (readState_.readingSize_) { |
| if(readState_.eventSizeBuffPos_ == 0) { |
| if ( (offset_ + readState_.bufferPtr_)/chunkSize_ != |
| ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) { |
| // skip one byte towards chunk boundary |
| // T_DEBUG_L(1, "Skipping a byte"); |
| readState_.bufferPtr_++; |
| continue; |
| } |
| } |
| |
| readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = |
| readBuff_[readState_.bufferPtr_++]; |
| if (readState_.eventSizeBuffPos_ == 4) { |
| // 0 length event indicates padding |
| if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) { |
| // T_DEBUG_L(1, "Got padding"); |
| readState_.resetState(readState_.lastDispatchPtr_); |
| continue; |
| } |
| // got a valid event |
| readState_.readingSize_ = false; |
| if (readState_.event_) { |
| delete(readState_.event_); |
| } |
| readState_.event_ = new eventInfo(); |
| readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_)); |
| |
| // check if the event is corrupted and perform recovery if required |
| if (isEventCorrupted()) { |
| performRecovery(); |
| // start from the top |
| break; |
| } |
| } |
| } else { |
| if (!readState_.event_->eventBuff_) { |
| readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_]; |
| readState_.event_->eventBuffPos_ = 0; |
| } |
| // take either the entire event or the remaining bytes in the buffer |
| int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_), |
| readState_.event_->eventSize_ - readState_.event_->eventBuffPos_); |
| |
| // copy data from read buffer into event buffer |
| memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_, |
| readBuff_ + readState_.bufferPtr_, |
| reclaimBuffer); |
| |
| // increment position ptrs |
| readState_.event_->eventBuffPos_ += reclaimBuffer; |
| readState_.bufferPtr_ += reclaimBuffer; |
| |
| // check if the event has been read in full |
| if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) { |
| // set the completed event to the current event |
| eventInfo* completeEvent = readState_.event_; |
| completeEvent->eventBuffPos_ = 0; |
| |
| readState_.event_ = NULL; |
| readState_.resetState(readState_.bufferPtr_); |
| |
| // exit criteria |
| return completeEvent; |
| } |
| } |
| } |
| |
| } |
| } |
| |
| bool TFileTransport::isEventCorrupted() { |
| // an error is triggered if: |
| if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) { |
| // 1. Event size is larger than user-speficied max-event size |
| T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)", |
| readState_.event_->eventSize_, maxEventSize_); |
| return true; |
| } else if (readState_.event_->eventSize_ > chunkSize_) { |
| // 2. Event size is larger than chunk size |
| T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)", |
| readState_.event_->eventSize_, chunkSize_); |
| return true; |
| } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) != |
| ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) { |
| // 3. size indicates that event crosses chunk boundary |
| T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld", |
| readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| void TFileTransport::performRecovery() { |
| // perform some kickass recovery |
| uint32_t curChunk = getCurChunk(); |
| if (lastBadChunk_ == curChunk) { |
| numCorruptedEventsInChunk_++; |
| } else { |
| lastBadChunk_ = curChunk; |
| numCorruptedEventsInChunk_ = 1; |
| } |
| |
| if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) { |
| // maybe there was an error in reading the file from disk |
| // seek to the beginning of chunk and try again |
| seekToChunk(curChunk); |
| } else { |
| |
| // just skip ahead to the next chunk if we not already at the last chunk |
| if (curChunk != (getNumChunks() - 1)) { |
| seekToChunk(curChunk + 1); |
| } else if (readTimeout_ == TAIL_READ_TIMEOUT) { |
| // if tailing the file, wait until there is enough data to start |
| // the next chunk |
| while(curChunk == (getNumChunks() - 1)) { |
| usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US); |
| } |
| seekToChunk(curChunk + 1); |
| } else { |
| // pretty hosed at this stage, rewind the file back to the last successful |
| // point and punt on the error |
| readState_.resetState(readState_.lastDispatchPtr_); |
| currentEvent_ = NULL; |
| char errorMsg[1024]; |
| sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", |
| offset_ + readState_.lastDispatchPtr_); |
| GlobalOutput(errorMsg); |
| throw TTransportException(errorMsg); |
| } |
| } |
| |
| } |
| |
| void TFileTransport::seekToChunk(int32_t chunk) { |
| if (fd_ <= 0) { |
| throw TTransportException("File not open"); |
| } |
| |
| int32_t numChunks = getNumChunks(); |
| |
| // file is empty, seeking to chunk is pointless |
| if (numChunks == 0) { |
| return; |
| } |
| |
| // negative indicates reverse seek (from the end) |
| if (chunk < 0) { |
| chunk += numChunks; |
| } |
| |
| // too large a value for reverse seek, just seek to beginning |
| if (chunk < 0) { |
| T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk) |
| chunk = 0; |
| } |
| |
| // cannot seek past EOF |
| bool seekToEnd = false; |
| uint32_t minEndOffset = 0; |
| if (chunk >= numChunks) { |
| T_DEBUG("Trying to seek past EOF. Seeking to EOF instead..."); |
| seekToEnd = true; |
| chunk = numChunks - 1; |
| // this is the min offset to process events till |
| minEndOffset = lseek(fd_, 0, SEEK_END); |
| } |
| |
| off_t newOffset = off_t(chunk) * chunkSize_; |
| offset_ = lseek(fd_, newOffset, SEEK_SET); |
| readState_.resetAllValues(); |
| currentEvent_ = NULL; |
| if (offset_ == -1) { |
| GlobalOutput("TFileTransport: lseek error in seekToChunk"); |
| throw TTransportException("TFileTransport: lseek error in seekToChunk"); |
| } |
| |
| // seek to EOF if user wanted to go to last chunk |
| if (seekToEnd) { |
| uint32_t oldReadTimeout = getReadTimeout(); |
| setReadTimeout(NO_TAIL_READ_TIMEOUT); |
| // keep on reading unti the last event at point of seekChunk call |
| while (readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {}; |
| setReadTimeout(oldReadTimeout); |
| } |
| |
| } |
| |
| void TFileTransport::seekToEnd() { |
| seekToChunk(getNumChunks()); |
| } |
| |
| uint32_t TFileTransport::getNumChunks() { |
| if (fd_ <= 0) { |
| return 0; |
| } |
| |
| struct stat f_info; |
| int rv = fstat(fd_, &f_info); |
| |
| if (rv < 0) { |
| int errno_copy = errno; |
| throw TTransportException(TTransportException::UNKNOWN, |
| "TFileTransport::getNumChunks() (fstat)", |
| errno_copy); |
| } |
| |
| if (f_info.st_size > 0) { |
| return ((f_info.st_size)/chunkSize_) + 1; |
| } |
| |
| // empty file has no chunks |
| return 0; |
| } |
| |
| uint32_t TFileTransport::getCurChunk() { |
| return offset_/chunkSize_; |
| } |
| |
| // Utility Functions |
| void TFileTransport::openLogFile() { |
| mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH; |
| int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND; |
| fd_ = ::open(filename_.c_str(), flags, mode); |
| offset_ = 0; |
| |
| // make sure open call was successful |
| if(fd_ == -1) { |
| int errno_copy = errno; |
| GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy); |
| throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy); |
| } |
| |
| } |
| |
| void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) { |
| clock_gettime(CLOCK_REALTIME, ts_next_flush); |
| ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000; |
| if (ts_next_flush->tv_nsec > 1000000000) { |
| ts_next_flush->tv_nsec -= 1000000000; |
| ts_next_flush->tv_sec += 1; |
| } |
| ts_next_flush->tv_sec += flushMaxUs_ / 1000000; |
| } |
| |
| TFileTransportBuffer::TFileTransportBuffer(uint32_t size) |
| : bufferMode_(WRITE) |
| , writePoint_(0) |
| , readPoint_(0) |
| , size_(size) |
| { |
| buffer_ = new eventInfo*[size]; |
| } |
| |
| TFileTransportBuffer::~TFileTransportBuffer() { |
| if (buffer_) { |
| for (uint32_t i = 0; i < writePoint_; i++) { |
| delete buffer_[i]; |
| } |
| delete[] buffer_; |
| buffer_ = NULL; |
| } |
| } |
| |
| bool TFileTransportBuffer::addEvent(eventInfo *event) { |
| if (bufferMode_ == READ) { |
| GlobalOutput("Trying to write to a buffer in read mode"); |
| } |
| if (writePoint_ < size_) { |
| buffer_[writePoint_++] = event; |
| return true; |
| } else { |
| // buffer is full |
| return false; |
| } |
| } |
| |
| eventInfo* TFileTransportBuffer::getNext() { |
| if (bufferMode_ == WRITE) { |
| bufferMode_ = READ; |
| } |
| if (readPoint_ < writePoint_) { |
| return buffer_[readPoint_++]; |
| } else { |
| // no more entries |
| return NULL; |
| } |
| } |
| |
| void TFileTransportBuffer::reset() { |
| if (bufferMode_ == WRITE || writePoint_ > readPoint_) { |
| T_DEBUG("Resetting a buffer with unread entries"); |
| } |
| // Clean up the old entries |
| for (uint32_t i = 0; i < writePoint_; i++) { |
| delete buffer_[i]; |
| } |
| bufferMode_ = WRITE; |
| writePoint_ = 0; |
| readPoint_ = 0; |
| } |
| |
| bool TFileTransportBuffer::isFull() { |
| return writePoint_ == size_; |
| } |
| |
| bool TFileTransportBuffer::isEmpty() { |
| return writePoint_ == 0; |
| } |
| |
| TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocolFactory> protocolFactory, |
| shared_ptr<TFileReaderTransport> inputTransport): |
| processor_(processor), |
| inputProtocolFactory_(protocolFactory), |
| outputProtocolFactory_(protocolFactory), |
| inputTransport_(inputTransport) { |
| |
| // default the output transport to a null transport (common case) |
| outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport()); |
| } |
| |
| TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocolFactory> inputProtocolFactory, |
| shared_ptr<TProtocolFactory> outputProtocolFactory, |
| shared_ptr<TFileReaderTransport> inputTransport): |
| processor_(processor), |
| inputProtocolFactory_(inputProtocolFactory), |
| outputProtocolFactory_(outputProtocolFactory), |
| inputTransport_(inputTransport) { |
| |
| // default the output transport to a null transport (common case) |
| outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport()); |
| } |
| |
| TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocolFactory> protocolFactory, |
| shared_ptr<TFileReaderTransport> inputTransport, |
| shared_ptr<TTransport> outputTransport): |
| processor_(processor), |
| inputProtocolFactory_(protocolFactory), |
| outputProtocolFactory_(protocolFactory), |
| inputTransport_(inputTransport), |
| outputTransport_(outputTransport) {}; |
| |
| void TFileProcessor::process(uint32_t numEvents, bool tail) { |
| shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); |
| shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); |
| |
| // set the read timeout to 0 if tailing is required |
| int32_t oldReadTimeout = inputTransport_->getReadTimeout(); |
| if (tail) { |
| // save old read timeout so it can be restored |
| inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT); |
| } |
| |
| uint32_t numProcessed = 0; |
| while(1) { |
| // bad form to use exceptions for flow control but there is really |
| // no other way around it |
| try { |
| processor_->process(inputProtocol, outputProtocol); |
| numProcessed++; |
| if ( (numEvents > 0) && (numProcessed == numEvents)) { |
| return; |
| } |
| } catch (TEOFException& teof) { |
| if (!tail) { |
| break; |
| } |
| } catch (TException &te) { |
| cerr << te.what() << endl; |
| break; |
| } |
| } |
| |
| // restore old read timeout |
| if (tail) { |
| inputTransport_->setReadTimeout(oldReadTimeout); |
| } |
| |
| } |
| |
| void TFileProcessor::processChunk() { |
| shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); |
| shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); |
| |
| uint32_t curChunk = inputTransport_->getCurChunk(); |
| |
| while(1) { |
| // bad form to use exceptions for flow control but there is really |
| // no other way around it |
| try { |
| processor_->process(inputProtocol, outputProtocol); |
| if (curChunk != inputTransport_->getCurChunk()) { |
| break; |
| } |
| } catch (TEOFException& teof) { |
| break; |
| } catch (TException &te) { |
| cerr << te.what() << endl; |
| break; |
| } |
| } |
| } |
| |
| }}} // apache::thrift::transport |