| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * cdbtmtest.c |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| #include <stdio.h> |
| #include <fcntl.h> |
| #include <unistd.h> |
| #include <signal.h> |
| #include <sys/stat.h> |
| #define PROVIDE_64BIT_CRC |
| |
| |
| #include "utils/elog.h" |
| #include "utils/guc.h" |
| #include "access/xlog_internal.h" |
| #include "miscadmin.h" |
| #include "libpq/libpq.h" |
| #include "libpq/pqformat.h" |
| #include "storage/ipc.h" |
| |
| #include "tcop/dest.h" |
| |
| #include "cdb/cdblogsync.h" |
| #include "catalog/pg_control.h" |
| #include "postmaster/walredoserver.h" |
| #include "storage/fd.h" |
| |
| /* |
| * There is a protocol consisting of 3 messages from primary->mirror: |
| * |
| * Q position_to_end |
| * Q xlog logid seg woffset wlen |
| * Q close |
| */ |
| |
| /* for xlog */ |
| static char xlogfilename[100]; |
| static int xlogfilefd = -1; |
| static uint32 xlogid = -1; |
| static uint32 xseg = -1; |
| static int xlogfileoffset = -1; |
| |
| /* param sent by primary segdb */ |
| static int cmdtype; |
| static uint32 wlogid; |
| static uint32 wseg; |
| static int woffset; |
| static int wlen; |
| |
| static uint32 logidNewCheckpoint; |
| static uint32 segNewCheckpoint; |
| static int offsetNewCheckpoint; |
| |
| /* XXX: file global buffer, called "buf" ??? At least it is allocated and free()ed as needed. */ |
| static void *buf = NULL; |
| static int buflen; |
| |
| static void cdb_sync_xlog(void); |
| static void parseCmd(const char *); |
| static void readLogMessage(void *, int); |
| static void syncWriteLog(int, void *, int, int); |
| static void ensureBufferSize(void); |
| static void write_with_ereport(int fd, void *data, int len); |
| static void cdb_position_to_end(void); |
| static void cdb_new_checkpoint_loc(void); |
| static void cdb_close(void); |
| |
| |
| static void |
| write_with_ereport(int fd, void *data, int len) |
| { |
| int write_len; |
| |
| write_len = write(fd, data, len); |
| |
| if (write_len != len) |
| { |
| close(fd); |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("error writing file: %m"))); |
| } |
| } |
| |
| |
| /* |
| * initialize log sync - create recovery.conf file ready for recovery |
| */ |
| void |
| cdb_init_log_sync(void) |
| { |
| /* create recovery.conf file for recovery */ |
| int rconffd = -1; |
| char path[MAXPGPATH]; |
| |
| /* the first time or file is changed */ |
| snprintf(path, MAXPGPATH, "%s/recovery.conf", DataDir); |
| if ((rconffd = open(path, O_RDWR, 0)) < 0) |
| { |
| char *cmd = "restore_command=''\n"; |
| |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: creating recovery.conf file"); |
| rconffd = open(path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
| if (rconffd < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("QDSYNC: could not create recovery.conf file \"%s\"", |
| path))); |
| } |
| |
| write_with_ereport(rconffd, cmd, strlen(cmd)); |
| } |
| |
| close(rconffd); |
| } |
| |
| /* |
| * cdb_sync_command - process sync command |
| */ |
| bool |
| cdb_sync_command(const char *cmd) |
| { |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: %s", cmd); |
| parseCmd(cmd); |
| |
| switch (cmdtype) |
| { |
| case SYNC_XLOG: |
| cdb_sync_xlog(); |
| break; |
| case SYNC_POSITION_TO_END: |
| cdb_position_to_end(); |
| break; |
| case SYNC_NEW_CHECKPOINT_LOC: |
| cdb_new_checkpoint_loc(); |
| break; |
| case SYNC_SHUTDOWN_TOO_FAR_BEHIND: |
| return true; |
| case SYNC_CLOSE: |
| cdb_close(); |
| elog(LOG,"QDSYNC: master is closing..."); |
| break; |
| } |
| |
| return false; |
| } |
| |
| /* |
| * open xlog file |
| */ |
| static void |
| openXlogEnd(XLogRecPtr *endLocation) |
| { |
| char path[MAXPGPATH]; |
| uint32 logid; |
| uint32 seg; |
| char *xlogDir = NULL; |
| |
| XLByteToSeg(*endLocation, logid, seg); |
| XLogFileName(xlogfilename, ThisTimeLineID, logid, seg); |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: opening logid %d seg %d file %s", |
| logid, seg, xlogfilename); |
| |
| xlogDir = makeRelativeToTxnFilespace(XLOGDIR); |
| /* the first time or file is changed */ |
| if (snprintf(path, MAXPGPATH, "%s/%s", xlogDir, xlogfilename) >= MAXPGPATH) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("QDSYNC: could not create xlog file \"%s/%s\"", |
| xlogDir, xlogfilename))); |
| } |
| |
| if (xlogfilefd >= 0) |
| { |
| close(xlogfilefd); |
| xlogfilefd = -1; |
| xlogfileoffset = -1; |
| } |
| |
| xlogfilefd = open(path, O_RDWR, 0); |
| if (xlogfilefd < 0) |
| { |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: creating xlog file %s", xlogfilename); |
| xlogfilefd = open(path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
| if (xlogfilefd < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("QDSYNC: could not create xlog file \"%s\"", |
| path))); |
| } |
| } |
| |
| xlogid = logid; |
| xseg = seg; |
| xlogfileoffset = endLocation->xrecoff % XLogSegSize; |
| |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5),"QDSYNC: opened '%s' offset 0x%X", |
| xlogfilename, xlogfileoffset); |
| |
| pfree(xlogDir); |
| } |
| |
| static void |
| putEndLocationReply(XLogRecPtr *endLocation) |
| { |
| StringInfoData buf; |
| |
| pq_beginmessage(&buf, 's'); |
| pq_sendint(&buf, endLocation->xlogid, 4); |
| pq_sendint(&buf, endLocation->xrecoff, 4); |
| pq_endmessage(&buf); |
| pq_flush(); |
| } |
| |
| static XLogRecPtr syncRedoLoc; |
| |
| static void |
| cdb_position_to_end(void) |
| { |
| XLogRecPtr redoCheckpointLoc; |
| CheckPoint redoCheckpoint; |
| XLogRecPtr endLocation; |
| |
| // Throw in extra new line to make log more readable. |
| elog(LOG,"--------------------------"); |
| |
| XLogGetRecoveryStart("QDSYNC", "to get initial restart location", &redoCheckpointLoc, &redoCheckpoint); |
| syncRedoLoc = redoCheckpoint.redo; |
| |
| // UNDONE: Minimum of redoCheckpointLoc and redoCheckpoint.redo? |
| |
| XLogScanForStandbyEndLocation(&syncRedoLoc, &endLocation); |
| |
| ereport(LOG, |
| (errmsg("QDSYNC: reporting recovery start location %s and scanned end location %s", |
| XLogLocationToString(&syncRedoLoc), |
| XLogLocationToString2(&endLocation)))); |
| |
| // Throw in extra new line to make log more readable. |
| elog(LOG,"--------------------------"); |
| |
| /* |
| * Open up end location segment and set offset to end. |
| */ |
| openXlogEnd(&endLocation); |
| |
| /* |
| * Extra reply information that gives our standby master XLOG end location |
| * to the primary. |
| */ |
| putEndLocationReply(&endLocation); |
| } |
| |
| static void |
| cdb_new_checkpoint_loc(void) |
| { |
| XLogRecPtr newCheckpointLoc; |
| bool successful; |
| |
| newCheckpointLoc.xlogid = logidNewCheckpoint; |
| newCheckpointLoc.xrecoff = segNewCheckpoint * XLogSegSize |
| + offsetNewCheckpoint; |
| if (newCheckpointLoc.xlogid == 0 && newCheckpointLoc.xrecoff == 0) |
| { |
| elog(ERROR, "QDSYNC: found invalid new checkpoint location"); |
| } |
| |
| successful = WalRedoServerNewCheckpointLocation(&newCheckpointLoc); |
| if (!successful) |
| { |
| elog(ERROR, "QDSYNC: redo error occurred"); |
| } |
| } |
| |
| void |
| cdb_shutdown_too_far_behind(void) |
| { |
| if (kill(PostmasterPid, SIGINT) < 0) |
| ereport(ERROR, |
| (errmsg( |
| "kill(%ld,%d) failed: %m", |
| (long) PostmasterPid, SIGINT))); |
| ereport(LOG, |
| (errmsg( |
| "QDSYNC: standby is too far behind the master to be synchronized -- " |
| "requested the standby to do a fast shutdown by signaling the postmaster (pid %d) with SIGINT", |
| (int)PostmasterPid))); |
| } |
| |
| void |
| cdb_perform_redo(XLogRecPtr *redoCheckPointLoc, CheckPoint *redoCheckPoint, XLogRecPtr *newCheckpointLoc) |
| { |
| CheckPoint oldRedoCheckpoint; |
| uint32 logid; |
| uint32 seg; |
| int nsegsremoved; |
| |
| if (redoCheckPointLoc->xlogid == 0 && redoCheckPointLoc->xrecoff == 0) |
| { |
| XLogGetRecoveryStart("QDSYNC", "for redo apply", redoCheckPointLoc, redoCheckPoint); |
| } |
| |
| XLogStandbyRecoverRange(redoCheckPointLoc, redoCheckPoint, newCheckpointLoc); |
| |
| /* |
| * Sample the recovery start location now to see if appling redo |
| * processed checkpoint records and moved the restart location forward. |
| */ |
| oldRedoCheckpoint = *redoCheckPoint; |
| |
| XLogGetRecoveryStart("QDSYNC", "for redo progress check", redoCheckPointLoc, redoCheckPoint); |
| |
| if (XLByteLT(oldRedoCheckpoint.redo,redoCheckPoint->redo)) |
| { |
| ereport(LOG, |
| (errmsg("QDSYNC: transaction redo moved the restart location from %s to %s", |
| XLogLocationToString(&oldRedoCheckpoint.redo), |
| XLogLocationToString2(&redoCheckPoint->redo)))); |
| } |
| else |
| { |
| Assert(XLByteEQ(oldRedoCheckpoint.redo,redoCheckPoint->redo)); |
| ereport(LOG, |
| (errmsg("QDSYNC: transaction redo did not move the restart location %s forward this pass", |
| XLogLocationToString(&oldRedoCheckpoint.redo)))); |
| return; |
| } |
| |
| XLByteToSeg(redoCheckPoint->redo, logid, seg); |
| |
| /* |
| * Delete offline log files (those no longer needed even for previous |
| * checkpoint). |
| */ |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), |
| "QDSYNC: keep log files as far back as (logid %d, seg %d)", |
| logid, seg); |
| |
| if (logid || seg) |
| { |
| PrevLogSeg(logid, seg); |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), |
| "QDSYNC: delete offline log files up to (logid %d, seg %d)", |
| logid, seg); |
| |
| XLogRemoveStandbyLogs(logid, seg, &nsegsremoved); |
| |
| if (nsegsremoved > 0) |
| { |
| // Throw in extra new line to make log more readable. |
| ereport(LOG, |
| (errmsg("QDSYNC: %d logs removed through logid %d, seg %d\n", |
| nsegsremoved, |
| logid, seg))); |
| } |
| |
| } |
| // Throw in extra new line to make log more readable. |
| elog(LOG,"--------------------------"); |
| } |
| |
| static void |
| cdb_close(void) |
| { |
| WalRedoServerQuiesce(); |
| } |
| |
| /* |
| * Most of this procedure is from XLogFileInit. |
| */ |
| static void |
| createZeroFilledNewFile(char *path) |
| { |
| char tmppath[MAXPGPATH]; |
| int fd; |
| char zbuffer[XLOG_BLCKSZ]; |
| int nbytes; |
| char *xlogDir = NULL; |
| |
| /* |
| * Initialize an empty (all zeroes) segment. |
| */ |
| xlogDir = makeRelativeToTxnFilespace(XLOGDIR); |
| if (snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d", xlogDir, (int) getpid()) > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate dir path %s/xlogtemp.%d", xlogDir, (int) getpid()))); |
| } |
| |
| pfree(xlogDir); |
| unlink(tmppath); |
| |
| /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */ |
| fd = open(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, |
| S_IRUSR | S_IWUSR); |
| if (fd < 0) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not create file \"%s\": %m", tmppath))); |
| |
| /* |
| * Zero-fill the file. We have to do this the hard way to ensure that all |
| * the file space has really been allocated --- on platforms that allow |
| * "holes" in files, just seeking to the end doesn't allocate intermediate |
| * space. This way, we know that we have all the space and (after the |
| * fsync below) that all the indirect blocks are down on disk. Therefore, |
| * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the |
| * log file. |
| */ |
| MemSet(zbuffer, 0, sizeof(zbuffer)); |
| for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer)) |
| { |
| errno = 0; |
| if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer)) |
| { |
| int save_errno = errno; |
| |
| /* |
| * If we fail to make the file, delete it to release disk space |
| */ |
| unlink(tmppath); |
| /* if write didn't set errno, assume problem is no disk space */ |
| errno = save_errno ? save_errno : ENOSPC; |
| |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not write to file \"%s\": %m", tmppath))); |
| } |
| } |
| |
| if (pg_fsync(fd) != 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not fsync file \"%s\": %m", tmppath))); |
| } |
| |
| if (close(fd)) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not close file \"%s\": %m", tmppath))); |
| } |
| |
| if (rename(tmppath, path) < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m", |
| tmppath, path, xlogid, xseg))); |
| } |
| |
| /* |
| * Re-open with different open flags. |
| */ |
| xlogfilefd = open(path, O_RDWR, 0); |
| if (xlogfilefd < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("QDSYNC: could not create xlog file \"%s\"", |
| path))); |
| } |
| |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: created zero-filled xlog file %s", xlogfilename); |
| } |
| |
| /* |
| * open xlog file |
| */ |
| static void |
| openXlogNextFile(void) |
| { |
| char path[MAXPGPATH]; |
| char *xlogDir = NULL; |
| |
| XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xseg); |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: opening next logid %d seg %d file %s", |
| xlogid, xseg, xlogfilename); |
| |
| xlogDir = makeRelativeToTxnFilespace(XLOGDIR); |
| |
| /* the first time or file is changed */ |
| if (snprintf(path, MAXPGPATH, "%s/%s", xlogDir, xlogfilename) >= MAXPGPATH) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("QDSYNC: could not create xlog file \"%s/%s\"", |
| xlogDir, xlogfilename))); |
| } |
| |
| pfree(xlogDir); |
| |
| xlogfilefd = open(path, O_RDWR, 0); |
| if (xlogfilefd < 0) |
| { |
| createZeroFilledNewFile(path); |
| } |
| |
| xlogfileoffset = 0; |
| |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), |
| "QDSYNC: openXlogNextFile: opened '%s' offset 0x%X", |
| xlogfilename, xlogfileoffset); |
| } |
| |
| /* |
| * cdb_sync_xlog - process xlog sync |
| */ |
| static void |
| cdb_sync_xlog(void) |
| { |
| uint32 currentBlockOffset; |
| XLogRecPtr writeLoc; |
| |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: write logid %d seg %d woffset 0x%X, wlen 0x%X", |
| wlogid, wseg, woffset, wlen); |
| if (woffset % XLOG_BLCKSZ != 0) |
| { |
| elog(ERROR,"QDSYNC: not on block boundaries 0x%X", |
| woffset); |
| } |
| |
| if (wlogid != xlogid || wseg != xseg) |
| { |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), "QDSYNC: closing previous file %s", |
| xlogfilename); |
| if (xlogfilefd >= 0) |
| { |
| close(xlogfilefd); |
| xlogfilefd = -1; |
| xlogfileoffset = -1; |
| } |
| xlogid = wlogid; |
| xseg = wseg; |
| openXlogNextFile(); |
| |
| /* |
| * Assume caller knows where to write. |
| */ |
| xlogfileoffset = woffset; |
| } |
| |
| /* |
| * Validate we are appending or overwritting previous block |
| */ |
| currentBlockOffset = (xlogfileoffset / XLOG_BLCKSZ) * XLOG_BLCKSZ; |
| if (woffset != currentBlockOffset && |
| woffset + XLOG_BLCKSZ != currentBlockOffset) |
| { |
| elog(ERROR,"QDSYNC: not appending to end (primary: 0x%X, standby: 0x%X)", |
| woffset, xlogfileoffset); |
| } |
| |
| /* |
| * no validation checking on xlog. xlog sync is by block and may repeat |
| * the same block, so we do not have any way to check it now. we will rely |
| * on tmlog checking for now. |
| */ |
| ensureBufferSize(); |
| readLogMessage(buf, wlen); |
| if ((wlen / XLOG_BLCKSZ) * XLOG_BLCKSZ != wlen) |
| { |
| int roundedUp; |
| int padLen; |
| |
| /* |
| * Pad buffer out with zeros. |
| */ |
| roundedUp = ((wlen + XLOG_BLCKSZ - 1) / XLOG_BLCKSZ) * XLOG_BLCKSZ; |
| Assert(buflen >= roundedUp); |
| padLen = roundedUp - wlen; |
| |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), |
| "QDSYNC: padding buffer with %d zeros (wlen %d, roundedUp %d)", |
| padLen, wlen, roundedUp); |
| memset(&((char*) buf)[wlen], 0, padLen); |
| |
| wlen = roundedUp; |
| } |
| |
| writeLoc.xlogid = wlogid; |
| writeLoc.xrecoff = wseg * XLogSegSize + |
| woffset; |
| syncWriteLog(xlogfilefd, buf, woffset, wlen); |
| elog((Debug_print_qd_mirroring ? LOG : DEBUG5), |
| "QDSYNC: wrote location %s len 0x%X", |
| XLogLocationToString(&writeLoc), |
| wlen); |
| |
| xlogfileoffset = woffset + wlen; |
| } |
| |
| /* |
| * We parse incoming commands into a file-local static set of parameters. |
| */ |
| static void |
| getXlogCmdArgs(const char *cmdArgs) |
| { |
| char *pos; |
| |
| wlogid = atoi(cmdArgs); |
| |
| pos = strchr(cmdArgs, ' '); |
| if (pos == NULL) |
| elog(ERROR, "QDSYNC: Cannot parse cmd args '%s' -- no seg", cmdArgs); |
| |
| pos++; |
| wseg = atoi(pos); |
| |
| pos = strchr(pos, ' '); |
| if (pos == NULL) |
| elog(ERROR, "QDSYNC: Cannot parse cmd args '%s' -- no offset", cmdArgs); |
| |
| pos++; |
| woffset = atoi(pos); |
| |
| pos = strchr(pos, ' '); |
| if (pos == NULL) |
| elog(ERROR, "QDSYNC: Cannot parse cmd args '%s' -- no len", cmdArgs); |
| |
| pos++; |
| wlen = atoi(pos); |
| } |
| |
| /* |
| * We parse incoming commands into a file-local static set of parameters. |
| */ |
| static void |
| getNewCheckpointLocCmdArgs(const char *cmdArgs) |
| { |
| char *pos; |
| |
| logidNewCheckpoint = atoi(cmdArgs); |
| |
| pos = strchr(cmdArgs, ' '); |
| if (pos == NULL) |
| elog(ERROR, "QDSYNC: cannot parse cmd args '%s' -- no seg", cmdArgs); |
| |
| pos++; |
| segNewCheckpoint = atoi(pos); |
| |
| pos = strchr(pos, ' '); |
| if (pos == NULL) |
| elog(ERROR, "QDSYNC: cannot parse cmd args '%s' -- no offset", cmdArgs); |
| |
| pos++; |
| offsetNewCheckpoint = atoi(pos); |
| } |
| |
| /* |
| * parse sync command |
| */ |
| static void |
| parseCmd(const char *cmd) |
| { |
| if (strncmp(cmd, "xlog", 4) == 0) |
| { |
| /* xlog command */ |
| cmdtype = SYNC_XLOG; |
| getXlogCmdArgs(cmd + strlen("xlog") + 1); |
| } |
| else if (strncmp(cmd, "position_to_end", 15) == 0) |
| { |
| cmdtype = SYNC_POSITION_TO_END; |
| } |
| else if (strncmp(cmd, "new_checkpoint_location", 23) == 0) |
| { |
| cmdtype = SYNC_NEW_CHECKPOINT_LOC; |
| getNewCheckpointLocCmdArgs(cmd + strlen("new_checkpoint_location") + 1); |
| } |
| else if (strncmp(cmd, "shutdown_too_far_behind",23) == 0) |
| { |
| cmdtype = SYNC_SHUTDOWN_TOO_FAR_BEHIND; |
| } |
| else if (strncmp(cmd, "close", 5) == 0) |
| { |
| cmdtype = SYNC_CLOSE; |
| } |
| else |
| ereport(FATAL, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg("queries cannot be executed against the standby master"), |
| errhint("Queries should be issued against the activate master. " |
| "If this host is not available, activate the standby so " |
| "that it can process queries."))); |
| } |
| |
| /* |
| * readLogMessage - read log message sent from client |
| */ |
| static void |
| readLogMessage(void *buf, int plen) |
| { |
| int32 len; |
| |
| if (pq_getbytes((char *) &len, 4) == EOF) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg("QDSYNC: incomplete log packet"))); |
| } |
| |
| len = ntohl(len); |
| len -= 4; |
| pq_getbytes(buf, len); |
| |
| elog(DEBUG4, "QDSYNC: readLogMessage: plen %d len: %d", plen, len); |
| } |
| |
| |
| /* |
| * syncWriteLog - sync log |
| */ |
| static void |
| syncWriteLog(int fd, void *buf, int offset, int len) |
| { |
| int loffset = lseek(fd, offset, SEEK_SET); |
| |
| if (loffset != offset) |
| { |
| elog(ERROR, "QDSYNC: error lseek location: %d, offset: %d, filename '%s', errno: %d", loffset, offset, xlogfilename, errno); |
| } |
| write_with_ereport(fd, buf, len); |
| |
| if (pg_fsync(fd) != 0) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("QDSYNC: could not fsync file '%s': %m", xlogfilename))); |
| } |
| |
| /* |
| * ensureBufferSize - increment buffer size to hold log message if necessary |
| */ |
| static void |
| ensureBufferSize(void) |
| { |
| int roundedUp; |
| |
| roundedUp = ((wlen + XLOG_BLCKSZ - 1) / XLOG_BLCKSZ) * XLOG_BLCKSZ; |
| |
| if (buf != NULL && buflen < roundedUp) |
| { |
| /* buf is smaller, reallocate it */ |
| free(buf); |
| buf = NULL; |
| } |
| |
| if (buf == NULL) |
| { |
| buf = malloc(roundedUp); |
| if (buf == NULL) |
| elog(ERROR, "QDSYNC: malloc failed in sync tmlog, possiblly running out of memory"); |
| buflen = roundedUp; |
| } |
| } |
| |