| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * cdbbufferedappend.c |
| * Write buffers to the end of a file efficiently. |
| * |
| * (See .h file for usage comments) |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "cdb/cdbbufferedappend.h" |
| #include <unistd.h> /* for write() */ |
| #include "utils/guc.h" |
| |
| static void BufferedAppendWrite( |
| BufferedAppend *bufferedAppend); |
| |
| /* |
| * Determines the amount of memory to supply for |
| * BufferedAppend given the desired buffer and |
| * large write lengths. |
| */ |
| int32 BufferedAppendMemoryLen( |
| int32 maxBufferLen, |
| int32 maxLargeWriteLen) |
| { |
| Assert(maxBufferLen > 0); |
| Assert(maxLargeWriteLen >= maxBufferLen); |
| |
| // Large write memory areas plus adjacent extra memory for 1 buffer. |
| return (maxLargeWriteLen + maxBufferLen); |
| } |
| |
| /* |
| * Initialize BufferedAppend. |
| * |
| * Use the BufferedAppendMemoryLen procedure to |
| * determine the amount of memory to supply. |
| */ |
| void BufferedAppendInit( |
| BufferedAppend *bufferedAppend, |
| uint8 *memory, |
| int32 memoryLen, |
| int32 maxBufferLen, |
| int32 maxLargeWriteLen, |
| char *relationName) |
| { |
| int relationNameLen; |
| |
| Assert(bufferedAppend != NULL); |
| Assert(memory != NULL); |
| Assert(maxBufferLen > 0); |
| Assert(maxLargeWriteLen >= maxBufferLen); |
| Assert(memoryLen >= BufferedAppendMemoryLen(maxBufferLen, maxLargeWriteLen)); |
| |
| memset(bufferedAppend, 0, sizeof(BufferedAppend)); |
| |
| /* |
| * Init level. |
| */ |
| relationNameLen = strlen(relationName); |
| bufferedAppend->relationName = (char *) palloc(relationNameLen + 1); |
| memcpy(bufferedAppend->relationName, relationName, relationNameLen + 1); |
| |
| /* |
| * Large-read memory level members. |
| */ |
| bufferedAppend->maxBufferLen = maxBufferLen; |
| bufferedAppend->maxLargeWriteLen = maxLargeWriteLen; |
| |
| bufferedAppend->memory = memory; |
| bufferedAppend->memoryLen = memoryLen; |
| |
| bufferedAppend->largeWriteMemory = memory; |
| bufferedAppend->afterBufferMemory = |
| &memory[maxLargeWriteLen]; |
| |
| bufferedAppend->largeWritePosition = 0; |
| bufferedAppend->largeWriteLen = 0; |
| |
| /* |
| * Buffer level members. |
| */ |
| bufferedAppend->bufferLen = 0; |
| |
| /* |
| * File level members. |
| */ |
| bufferedAppend->file = -1; |
| bufferedAppend->filePathName = NULL; |
| bufferedAppend->fileLen = 0; |
| } |
| |
| /* |
| * Takes an open file handle for the next file. |
| * |
| * Note that eof_uncompressed is used only for storing the |
| * uncompressed file size in the catalog, so that the compression |
| * ratio could be calculated at the user's request. |
| */ |
| void BufferedAppendSetFile( |
| BufferedAppend *bufferedAppend, |
| File file, |
| char *filePathName, |
| int64 eof, |
| int64 eof_uncompressed) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->largeWritePosition == 0); |
| Assert(bufferedAppend->largeWriteLen == 0); |
| Assert(bufferedAppend->bufferLen == 0); |
| Assert(bufferedAppend->file == -1); |
| Assert(bufferedAppend->fileLen == 0); |
| Assert(bufferedAppend->fileLen_uncompressed == 0); |
| Assert(bufferedAppend->initialSetFilePosition == 0); |
| |
| Assert(file >= 0); |
| Assert(eof >= 0); |
| |
| bufferedAppend->largeWritePosition = eof; |
| bufferedAppend->file = file; |
| bufferedAppend->filePathName = filePathName; |
| bufferedAppend->fileLen = eof; |
| bufferedAppend->fileLen_uncompressed = eof_uncompressed; |
| |
| bufferedAppend->initialSetFilePosition = eof; |
| } |
| |
| |
| /* |
| * Perform a large write i/o. |
| */ |
| static void BufferedAppendWrite( |
| BufferedAppend *bufferedAppend) |
| { |
| int32 writeLen; |
| uint8 *largeWriteMemory; |
| int actualLen; |
| |
| writeLen = bufferedAppend->largeWriteLen; |
| Assert(bufferedAppend->largeWriteLen > 0); |
| largeWriteMemory = bufferedAppend->largeWriteMemory; |
| |
| #ifdef USE_ASSERT_CHECKING |
| { |
| int64 currentWritePosition; |
| |
| currentWritePosition = FileNonVirtualTell(bufferedAppend->file); |
| if (currentWritePosition < 0) |
| ereport(ERROR, |
| (errcode_for_file_access(), errmsg("unable to get current position in table \"%s\" for file \"%s\" (errcode %d)", |
| bufferedAppend->relationName, bufferedAppend->filePathName, errno), |
| errdetail("%s", HdfsGetLastError()))); |
| |
| if (currentWritePosition != bufferedAppend->largeWritePosition) |
| ereport(ERROR, (errcode_for_file_access(), |
| errmsg("Current position mismatch actual " INT64_FORMAT ", expected " INT64_FORMAT " in table \"%s\" for file \"%s\"", |
| currentWritePosition, bufferedAppend->largeWritePosition, bufferedAppend->relationName, bufferedAppend->filePathName), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| #endif |
| |
| while (writeLen > 0) |
| { |
| int primaryError; |
| /*bool mirrorDataLossOccurred;*/ |
| |
| MirroredAppendOnly_Append( |
| &bufferedAppend->mirroredOpen, |
| (char*)largeWriteMemory, |
| writeLen, |
| &primaryError/*, |
| &mirrorDataLossOccurred*/); |
| if (primaryError != 0) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("Could not write in table \"%s\" to segment file '%s': %m", |
| bufferedAppend->relationName, bufferedAppend->filePathName), |
| errdetail("%s", HdfsGetLastError()))); |
| |
| if (Debug_appendonly_print_append_block) |
| { |
| elog(LOG, |
| "Append-Only storage write: table '%s', segment file '%s', write position " INT64_FORMAT ", " |
| "writeLen %d (equals large write length %d is %s)", |
| bufferedAppend->relationName, |
| bufferedAppend->filePathName, |
| bufferedAppend->largeWritePosition, |
| writeLen, |
| bufferedAppend->largeWriteLen, |
| (writeLen == bufferedAppend->largeWriteLen ? "true" : "false")); |
| } |
| |
| actualLen = writeLen; |
| |
| writeLen -= actualLen; |
| largeWriteMemory += actualLen; |
| } |
| |
| bufferedAppend->largeWritePosition += bufferedAppend->largeWriteLen; |
| bufferedAppend->largeWriteLen = 0; |
| |
| } |
| |
| /* |
| * Return the position of the current write buffer in bytes. |
| */ |
| int64 BufferedAppendCurrentBufferPosition( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| return bufferedAppend->largeWritePosition; |
| } |
| |
| /* |
| * Return the position of the next write buffer in bytes. |
| */ |
| int64 BufferedAppendNextBufferPosition( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| return bufferedAppend->largeWritePosition + |
| bufferedAppend->largeWriteLen; |
| } |
| |
| /* |
| * Get the next buffer space for appending with a specified length. |
| * |
| * Returns NULL when the current file does not have enough |
| * room for another buffer. |
| */ |
| uint8 *BufferedAppendGetBuffer( |
| BufferedAppend *bufferedAppend, |
| int32 bufferLen) |
| { |
| int32 currentLargeWriteLen; |
| |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| if (bufferLen > bufferedAppend->maxBufferLen) |
| elog(ERROR, |
| "bufferLen %d greater than maxBufferLen %d at position " INT64_FORMAT " in table \"%s\" in file \"%s\"", |
| bufferLen, bufferedAppend->maxBufferLen, bufferedAppend->largeWritePosition, |
| bufferedAppend->relationName, |
| bufferedAppend->filePathName); |
| |
| /* |
| * Let next buffer carry-over into the extra buffer space after |
| * the large write buffer. |
| */ |
| currentLargeWriteLen = bufferedAppend->largeWriteLen; |
| Assert (currentLargeWriteLen + bufferLen <= |
| bufferedAppend->maxLargeWriteLen + |
| bufferedAppend->maxBufferLen); |
| |
| bufferedAppend->bufferLen = bufferLen; |
| |
| return &bufferedAppend->largeWriteMemory[currentLargeWriteLen]; |
| } |
| |
| /* |
| * Get the address of the current buffer space being used appending. |
| */ |
| uint8 *BufferedAppendGetCurrentBuffer( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| return &bufferedAppend->largeWriteMemory[bufferedAppend->largeWriteLen]; |
| } |
| |
| /* |
| * Get the next maximum length buffer space for appending. |
| * |
| * Returns NULL when the current file does not have enough |
| * room for another buffer. |
| */ |
| uint8 *BufferedAppendGetMaxBuffer( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| return BufferedAppendGetBuffer( |
| bufferedAppend, |
| bufferedAppend->maxBufferLen); |
| } |
| |
| void BufferedAppendCancelLastBuffer( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| bufferedAppend->bufferLen = 0; |
| } |
| |
| /* |
| * Indicate the current buffer is finished. |
| */ |
| void BufferedAppendFinishBuffer( |
| BufferedAppend *bufferedAppend, |
| int32 usedLen, |
| int32 usedLen_uncompressed) |
| { |
| int32 newLen; |
| |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| if (usedLen > bufferedAppend->bufferLen) |
| elog(ERROR, |
| "Used length %d greater than bufferLen %d at position " INT64_FORMAT " in table \"%s\" in file \"%s\"", |
| usedLen, bufferedAppend->bufferLen, bufferedAppend->largeWritePosition, |
| bufferedAppend->relationName, |
| bufferedAppend->filePathName); |
| |
| |
| newLen = bufferedAppend->largeWriteLen + usedLen; |
| Assert (newLen <= bufferedAppend->maxLargeWriteLen + |
| bufferedAppend->maxBufferLen); |
| if (newLen >= bufferedAppend->maxLargeWriteLen) |
| { |
| /* |
| * Current large-write memory is full. |
| */ |
| bufferedAppend->largeWriteLen = bufferedAppend->maxLargeWriteLen; |
| BufferedAppendWrite(bufferedAppend); |
| |
| if (newLen > bufferedAppend->maxLargeWriteLen) |
| { |
| int32 excessLen; |
| |
| /* |
| * We have carry-over in the extra buffer. Write and then |
| * copy the extra to the front of the large write buffer. |
| */ |
| excessLen = newLen - bufferedAppend->maxLargeWriteLen; |
| |
| bufferedAppend->largeWriteLen = bufferedAppend->maxLargeWriteLen; |
| |
| memcpy(bufferedAppend->largeWriteMemory, |
| bufferedAppend->afterBufferMemory, |
| excessLen); |
| |
| bufferedAppend->largeWriteLen = excessLen; |
| } |
| else |
| { |
| /* |
| * Exactly fits. |
| */ |
| Assert(newLen == bufferedAppend->maxLargeWriteLen); |
| Assert(bufferedAppend->largeWriteLen == 0); |
| } |
| } |
| else |
| { |
| /* |
| * Normal case -- added more data to current buffer. |
| */ |
| bufferedAppend->largeWriteLen = newLen; |
| } |
| |
| bufferedAppend->fileLen += usedLen; |
| bufferedAppend->fileLen_uncompressed += usedLen_uncompressed; |
| } |
| |
| /* |
| * Finish the current buffer and get the next |
| * buffer for appending. |
| * |
| * Returns NULL when the current file does not have enough |
| * room for another buffer. |
| */ |
| uint8 *BufferedAppendMoveToNextBuffer( |
| BufferedAppend *bufferedAppend, |
| int32 usedLen, |
| int32 usedLen_uncompressed) |
| { |
| Assert(bufferedAppend != NULL); |
| |
| BufferedAppendFinishBuffer( |
| bufferedAppend, |
| usedLen, |
| usedLen_uncompressed); |
| |
| return BufferedAppendGetMaxBuffer(bufferedAppend); |
| } |
| |
| |
| /* |
| * Returns the current file length. |
| */ |
| int64 BufferedAppendFileLen( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| return bufferedAppend->fileLen; |
| } |
| |
| /* |
| * Flushes the current file for append. Caller is resposible for closing |
| * the file afterwards. That close will flush any buffered writes for the |
| * file. |
| * |
| * Returns the file length. |
| */ |
| void BufferedAppendCompleteFile( |
| BufferedAppend *bufferedAppend, |
| int64 *fileLen, |
| int64 *fileLen_uncompressed) |
| { |
| |
| Assert(bufferedAppend != NULL); |
| Assert(bufferedAppend->file >= 0); |
| |
| if (bufferedAppend->largeWriteLen > 0) |
| BufferedAppendWrite(bufferedAppend); |
| |
| *fileLen = bufferedAppend->fileLen; |
| *fileLen_uncompressed = bufferedAppend->fileLen_uncompressed; |
| |
| bufferedAppend->largeWritePosition = 0; |
| bufferedAppend->largeWriteLen = 0; |
| |
| bufferedAppend->bufferLen = 0; |
| |
| bufferedAppend->fileLen = 0; |
| bufferedAppend->fileLen_uncompressed = 0; |
| bufferedAppend->file = -1; |
| bufferedAppend->filePathName = NULL; |
| |
| bufferedAppend->initialSetFilePosition = 0; |
| } |
| |
| /* |
| * Finish with writing all together. |
| */ |
| void BufferedAppendFinish( |
| BufferedAppend *bufferedAppend) |
| { |
| Assert(bufferedAppend != NULL); |
| |
| Assert(bufferedAppend->file == -1); |
| Assert(bufferedAppend->bufferLen == 0); |
| Assert(bufferedAppend->fileLen == 0); |
| Assert(bufferedAppend->fileLen_uncompressed == 0); |
| |
| if (bufferedAppend->relationName != NULL) |
| { |
| pfree(bufferedAppend->relationName); |
| bufferedAppend->relationName = NULL; |
| } |
| } |