blob: 42907354d281d7d81e6f08442af7300f05411cf3 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* walredoserver.c
* Process under QD postmaster that manages redo of the WAL by the standby.
*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
#include "utils/elog.h"
#include "miscadmin.h"
#include "libpq/pqsignal.h"
#include "cdb/cdbvars.h"
#include "commands/sequence.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "catalog/pg_control.h"
#include "postmaster/service.h"
#include "postmaster/walredoserver.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/pmsignal.h" /* PostmasterIsAlive */
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/ps_status.h"
#include "cdb/cdbutil.h"
#include "cdb/cdblink.h"
#include "access/xlog_internal.h"
#include "utils/memutils.h"
#include "storage/ipc.h"
#include "storage/spin.h"
#include "cdb/cdblogsync.h"
/*=========================================================================
 * FUNCTION PROTOTYPES
 */
/*
 * Forward declarations for the file-local helpers defined further down.
 * walredoserver_forkexec() only exists in EXEC_BACKEND builds.
 */
#ifdef EXEC_BACKEND
static pid_t walredoserver_forkexec(void);
#endif
/* Resets the process-local redo state (see the Redo* statics below). */
static void WalRedoServerInitContext(void);
/*
 * The following are the standard procedure signatures needed by
 * the Service shared library. They are all registered through
 * WalRedoServer_ServiceConfig.
 */
static void WalRedoServer_ServiceRequestShutdown(SIGNAL_ARGS);
static void WalRedoServer_ServiceEarlyInit(void);
static void WalRedoServer_ServicePostgresInit(void);
static void WalRedoServer_ServiceInit(int listenerPort);
static bool WalRedoServer_ServiceShutdownRequested(void);
static bool WalRedoServer_ServiceRequest(ServiceCtrl *serviceCtrl, int sockfd, uint8 *request);
static void WalRedoServer_ServiceShutdown(void);
/*=========================================================================
* GLOBAL STATE VARIABLES
*/
/*
 * Set to true by WalRedoServer_ServiceInit(); it is not read anywhere else
 * in this file.
 */
static bool am_walredoserver = false;
/*
 * Static description of the WAL Redo service handed to the Service library:
 * identifying strings, the fixed request/response message sizes, and the
 * callbacks defined in this file. The initializer is positional, so the
 * entries must stay in the order the ServiceConfig struct declares them
 * (the struct is declared in postmaster/service.h, not here).
 */
static ServiceConfig WalRedoServer_ServiceConfig =
{"WALRDO",
"WAL Redo Server", "WAL Redo Server process", "walredoserver",
sizeof(WalRedoRequest), sizeof(WalRedoResponse),
NULL, // Default client timeout.
WalRedoServer_ServiceRequestShutdown,
WalRedoServer_ServiceEarlyInit,
WalRedoServer_ServicePostgresInit,
WalRedoServer_ServiceInit,
WalRedoServer_ServiceShutdownRequested,
WalRedoServer_ServiceRequest,
WalRedoServer_ServiceShutdown,
NULL}; // No PostmasterDied callback.
/*
 * Run-time control block for the service; passed to ServiceInit() and
 * ServiceMain() in walredoserver_start().
 */
static ServiceCtrl WalRedoServer_ServiceCtrl;
/*=========================================================================
 * SHARED MEMORY STATE
 */
/*
 * Shared-memory area for the WAL Redo server.
 *
 * listenerPort is initialized to -1 by WalRedoServerShmemInit(), overwritten
 * with the real port by WalRedoServer_ServiceInit(), and read by
 * WalRedoServerClientConnect() when a client connects.
 */
typedef struct WalRedoServerShmem
{
int listenerPort;
} WalRedoServerShmem;
/* Pointer to the shared area; NULL until WalRedoServerShmemInit() has run. */
static WalRedoServerShmem *WalRedoServerShared = NULL;
/*=========================================================================
* VISIBLE FUNCTIONS
*/
/*
 * WalRedoServerShmemSize
 *
 * Report how many bytes of shared memory the WAL Redo server needs,
 * i.e. the size of one WalRedoServerShmem struct.
 */
int
WalRedoServerShmemSize(void)
{
	const int	shmemBytes = (int) sizeof(WalRedoServerShmem);

	return shmemBytes;
}
/*
 * WalRedoServerShmemInit
 *
 * Create or attach to the "WAL Redo Server" shared structure and remember
 * its address in WalRedoServerShared. When the structure is newly created,
 * the listener port is set to -1 until WalRedoServer_ServiceInit() stores
 * the real one.
 *
 * NOTE(review): the pointer returned by ShmemInitStruct() is used without a
 * NULL check -- confirm that it cannot return NULL in this source tree.
 */
void
WalRedoServerShmemInit(void)
{
	bool		alreadyExists;

	WalRedoServerShared = (WalRedoServerShmem *)
		ShmemInitStruct("WAL Redo Server", WalRedoServerShmemSize(), &alreadyExists);

	/* Somebody else already set the area up; leave its contents alone. */
	if (alreadyExists)
		return;

	WalRedoServerShared->listenerPort = -1;
}
/*
 * WalRedoServerNewCheckpointLocation
 *
 * Client-side call: tell the WAL Redo server about a new checkpoint
 * location.
 *
 * Connects to the server, sends a NewCheckpointLocation request carrying
 * *newCheckpointLocation, and then polls for the optional response. Any
 * response that does arrive is treated as a redo failure report.
 *
 * Returns true when the request was sent and no error response was seen;
 * false when connecting, sending or polling failed, or when the server
 * reported an error.
 */
bool
WalRedoServerNewCheckpointLocation(XLogRecPtr *newCheckpointLocation)
{
	WalRedoRequest walRedoRequest;
	WalRedoResponse walRedoResponse;
	bool		successful;

	/*
	 * Start from a defined value so that the test below never reads an
	 * indeterminate flag if the poll call fails before storing into it.
	 */
	bool		pollResponseReceived = false;

	MemSet(&walRedoRequest, 0, sizeof(walRedoRequest));
	walRedoRequest.command = NewCheckpointLocation;
	walRedoRequest.newCheckpointLocation = *newCheckpointLocation;

	if (!WalRedoServerClientConnect())
	{
		elog(LOG, "cannot connect to WAL Redo server");
		return false;
	}

	successful = WalRedoServerClientSendRequest(&walRedoRequest);
	if (successful)
	{
		/*
		 * See if the optional redo server error message was sent...
		 */
		successful = WalRedoServerClientPollResponse(&walRedoResponse, &pollResponseReceived);
		if (pollResponseReceived)
		{
			Assert(walRedoResponse.response == RedoUnexpectedError);
			elog(LOG,"redo encountered an unexpected error");
			successful = false;
		}
	}

	return successful;
}
/*
 * WalRedoServerQuiesce
 *
 * Client-side call: ask the WAL Redo server to quiesce and wait for its
 * reply.
 *
 * Connects to the server, sends a Quiesce request and then waits for a
 * response, using the client timeout obtained from the service
 * configuration.
 *
 * Returns true when a response was received; false when connecting,
 * sending or receiving failed. The content of the response is not
 * inspected.
 */
bool
WalRedoServerQuiesce(void)
{
	WalRedoRequest walRedoRequest;
	WalRedoResponse walRedoResponse;
	struct timeval clientTimeout;

	elog(LOG,"quiesce");

	MemSet(&walRedoRequest, 0, sizeof(walRedoRequest));
	walRedoRequest.command = Quiesce;

	if (!WalRedoServerClientConnect())
	{
		elog(LOG, "cannot connect to WAL Redo server");
		return false;
	}

	if (!WalRedoServerClientSendRequest(&walRedoRequest))
		return false;

	ServiceGetClientTimeout(&WalRedoServer_ServiceConfig, &clientTimeout);

	/*
	 * See if we received a quiesced response.
	 */
	return WalRedoServerClientReceiveResponse(&walRedoResponse, &clientTimeout);
}
/*
 * walredoserver_start
 *
 * Main entry point for the walredoserver controller process: fork (or, in
 * EXEC_BACKEND builds, fork and exec) the WAL Redo server child.
 *
 * In the parent this returns the child's PID, or 0 if the fork failed (the
 * failure is logged). In a non-EXEC_BACKEND child it closes the postmaster's
 * sockets and runs the service; 0 is returned only if ServiceMain() ever
 * comes back, which is not expected.
 *
 * This code is heavily based on pgarch.c, q.v.
 */
int
walredoserver_start(void)
{
	pid_t		childPid;

#ifdef EXEC_BACKEND
	childPid = walredoserver_forkexec();
#else
	childPid = fork_process();
#endif

	if (childPid == -1)
	{
		ereport(LOG,
				(errmsg("could not fork walredoserver process: %m")));
		return 0;
	}

#ifndef EXEC_BACKEND
	if (childPid == 0)
	{
		/* in postmaster child ... */
		/* Close the postmaster's sockets */
		ClosePostmasterPorts(false);

		ServiceInit(&WalRedoServer_ServiceConfig, &WalRedoServer_ServiceCtrl);
		ServiceMain(&WalRedoServer_ServiceCtrl);

		/* shouldn't get here */
		return 0;
	}
#endif

	return (int) childPid;
}
/*=========================================================================
* HELPER FUNCTIONS
*/
#ifdef EXEC_BACKEND
/*
 * walredoserver_forkexec()
 *
 * Build the argument list for the walredoserver process, then hand it to
 * postmaster_forkexec() to fork and exec. Returns whatever
 * postmaster_forkexec() returns.
 */
static pid_t
walredoserver_forkexec(void)
{
	char	   *argv[10];
	int			argc;

	argv[0] = "postgres";
	argv[1] = "--forkwalredoserver";
	argv[2] = NULL;				/* filled in by postmaster_forkexec */
	argc = 3;

	/* Terminate the list; the terminator is not counted in argc. */
	argv[argc] = NULL;

	Assert(argc < lengthof(argv));

	return postmaster_forkexec(argc, argv);
}
#endif   /* EXEC_BACKEND */
/*
 * Shutdown flag: raised by WalRedoServer_ServiceRequestShutdown() and
 * reported by WalRedoServer_ServiceShutdownRequested().
 *
 * NOTE(review): this is written from a function with a signal-handler
 * signature but is a plain, externally visible bool. If it really is set
 * from signal context, "volatile sig_atomic_t" would be the usual type --
 * confirm against any extern declaration before changing it.
 */
bool walredo_shutdown_requested=false;
/*
 * Shutdown-request callback registered in WalRedoServer_ServiceConfig.
 * It only records that a shutdown was requested; the signal argument
 * is not used.
 */
static void
WalRedoServer_ServiceRequestShutdown(SIGNAL_ARGS)
{
walredo_shutdown_requested = true;
}
/*
 * Early-initialization callback registered in WalRedoServer_ServiceConfig.
 * Puts the process into BootstrapProcessing mode.
 */
static void
WalRedoServer_ServiceEarlyInit(void)
{
// UNDONE: Kludge that allows initialization of components for recovery purposes.
SetProcessingMode(BootstrapProcessing);
}
/*
 * Postgres-initialization callback registered in WalRedoServer_ServiceConfig.
 * Performs the per-process, buffer-pool and XLOG access initialization
 * calls below, in that order.
 */
static void
WalRedoServer_ServicePostgresInit(void)
{
/* See InitPostgres()... */
InitProcess();
InitBufferPoolBackend();
InitXLOGAccess();
}
/*
 * WalRedoServer_ServiceInit
 *
 * Service-initialization callback registered in WalRedoServer_ServiceConfig.
 * Publishes the listener port in shared memory (where
 * WalRedoServerClientConnect() reads it), marks this process as the WAL Redo
 * server, and resets the process-local redo state.
 *
 * Reports at FATAL level if the shared-memory area has not been set up.
 */
static void
WalRedoServer_ServiceInit(int listenerPort)
{
	if (WalRedoServerShared == NULL)
	{
		elog(FATAL,"WAL Redo server shared memory not initialized");
	}

	am_walredoserver = true;
	WalRedoServerShared->listenerPort = listenerPort;

	WalRedoServerInitContext();
}
/*
 * WalRedoServer_ServiceShutdownRequested
 *
 * Shutdown-query callback registered in WalRedoServer_ServiceConfig.
 * Returns the current value of walredo_shutdown_requested. When the flag
 * is set, a message is logged at LOG level if Debug_print_qd_mirroring is
 * on, and at DEBUG5 otherwise.
 */
static bool
WalRedoServer_ServiceShutdownRequested(void)
{
	if (walredo_shutdown_requested)
	{
		if (Debug_print_qd_mirroring)
			elog(LOG,"shutdown requested is true");
		else
			elog(DEBUG5,"shutdown requested is true");
	}

	return walredo_shutdown_requested;
}
/*
 * Shutdown callback registered in WalRedoServer_ServiceConfig.
 * The WAL Redo server has nothing to clean up here.
 */
static void
WalRedoServer_ServiceShutdown(void)
{
// Empty.
}
/*
 * Process-local static that holds this process's client connection to the
 * WAL Redo server. It is shared by the WalRedoServerClient*() functions
 * below.
 *
 * NOTE(review): the initializer {NULL,-1} presumably means "not connected";
 * confirm against the ServiceClient declaration.
 */
static ServiceClient WalRedoServerClient = {NULL,-1};
/*
 * WalRedoServerClientConnect
 *
 * Connect this process's WalRedoServerClient to the WAL Redo server, using
 * the listener port published in shared memory.
 *
 * Returns the result of ServiceClientConnect(). Reports at FATAL level if
 * the shared-memory area has not been set up.
 */
bool
WalRedoServerClientConnect(void)
{
	if (WalRedoServerShared == NULL)
	{
		elog(FATAL,"WAL Redo server shared memory not initialized");
	}

	return ServiceClientConnect(&WalRedoServer_ServiceConfig,
								WalRedoServerShared->listenerPort,
								&WalRedoServerClient,
								true /* complain */);
}
/*
 * WalRedoServerClientSendRequest
 *
 * Send one fixed-size WalRedoRequest over this process's connection to the
 * WAL Redo server. Returns the result of ServiceClientSendRequest().
 */
bool
WalRedoServerClientSendRequest(WalRedoRequest* walRedoRequest)
{
	return ServiceClientSendRequest(&WalRedoServerClient,
									walRedoRequest,
									sizeof(WalRedoRequest));
}
/*
 * WalRedoServerClientReceiveResponse
 *
 * Receive one fixed-size WalRedoResponse from the WAL Redo server into
 * *walRedoResponse, passing the caller's timeout through. Returns the
 * result of ServiceClientReceiveResponse().
 */
bool
WalRedoServerClientReceiveResponse(WalRedoResponse* walRedoResponse, struct timeval *timeout)
{
	return ServiceClientReceiveResponse(&WalRedoServerClient,
										walRedoResponse,
										sizeof(WalRedoResponse),
										timeout);
}
/*
 * WalRedoServerClientPollResponse
 *
 * Poll this process's connection for a fixed-size WalRedoResponse.
 * *pollResponseReceived is passed through to ServiceClientPollResponse()
 * so it can report whether a response was stored in *walRedoResponse.
 * Returns the result of ServiceClientPollResponse().
 */
bool
WalRedoServerClientPollResponse(WalRedoResponse* walRedoResponse, bool *pollResponseReceived)
{
	return ServiceClientPollResponse(&WalRedoServerClient,
									 walRedoResponse,
									 sizeof(WalRedoResponse),
									 pollResponseReceived);
}
/*
 * Process-local redo state of the WAL Redo server.
 *
 * RedoQuiesce       - set once a Quiesce request has been handled.
 * RedoServerError   - set once a redo attempt has raised an error.
 * RedoCheckPointLoc,
 * RedoCheckpoint    - passed by address to cdb_perform_redo() on every
 *                     NewCheckpointLocation request.
 */
static bool RedoQuiesce;
static bool RedoServerError;
static XLogRecPtr RedoCheckPointLoc;
static CheckPoint RedoCheckpoint;

/*
 * WalRedoServerInitContext
 *
 * Reset the redo state above: clear both flags and zero both checkpoint
 * structures.
 */
static void
WalRedoServerInitContext(void)
{
	memset(&RedoCheckpoint, 0, sizeof(RedoCheckpoint));
	memset(&RedoCheckPointLoc, 0, sizeof(RedoCheckPointLoc));

	RedoServerError = false;
	RedoQuiesce = false;
}
/*
 * WalRedoServer_ServiceRequest
 *
 * Request callback registered in WalRedoServer_ServiceConfig; handles one
 * WalRedoRequest received on sockfd.
 *
 *	NewCheckpointLocation: unless the server is quiesced or has already hit
 *		a redo error, run cdb_perform_redo() up to the new location. Nothing
 *		is sent back on success; on error a RedoUnexpectedError response is
 *		sent and all later NewCheckpointLocation requests are ignored.
 *	Quiesce: remember that we are quiesced and answer with Quiesced.
 *	anything else: raise an ERROR, which is caught below.
 *
 * Returns true normally. Returns false when an unexpected error reaches
 * the outer handler, and otherwise whatever ServiceProcessRespond() returned
 * for the response that was sent.
 *
 * NOTE(review): neither catch block switches back to the memory context
 * that was current on entry before continuing -- confirm whether the
 * caller resets CurrentMemoryContext after an error was caught here.
 */
static bool WalRedoServer_ServiceRequest(ServiceCtrl *serviceCtrl, int sockfd, uint8 *request)
{
	WalRedoRequest *walRedoRequest = (WalRedoRequest*)request;
	WalRedoResponse walRedoResponse;
	bool result;

	/*
	 * The whole struct goes out on the socket, but only .response is ever
	 * assigned below. Zero it first so no uninitialized stack bytes are
	 * sent, mirroring how the client side prepares its requests.
	 */
	memset(&walRedoResponse, 0, sizeof(walRedoResponse));

	//*** Missing: On normal shutdown, disable and disconnect.

	result = true;		// Assume.

	/*
	 * Use a TRY block to catch unexpected errors that bubble up to this level
	 * and disable QD mirroring.
	 */
	PG_TRY();
	{
		if (Debug_print_qd_mirroring)
			elog(LOG, "request command %d = '%s'",
				 walRedoRequest->command,
				 WalRedoRequestCommandToString(walRedoRequest->command));

		switch (walRedoRequest->command)
		{
			case NewCheckpointLocation:
			{
				if (!RedoQuiesce && !RedoServerError)
				{
					elog((Debug_print_qd_mirroring ? LOG : DEBUG1),"new checkpoint location %s",
						 XLogLocationToString(&walRedoRequest->newCheckpointLocation));

					PG_TRY();
					{
						cdb_perform_redo(&RedoCheckPointLoc, &RedoCheckpoint, &walRedoRequest->newCheckpointLocation);
					}
					PG_CATCH();
					{
						/*
						 * Report the error related to reading the primary's WAL
						 * to the server log
						 */
						EmitErrorReport();
						FlushErrorState();

						elog(NOTICE,"error occurred during redo");

						/* Remember the failure so later requests skip redo. */
						RedoServerError = true;

						walRedoResponse.response = RedoUnexpectedError;
						result = ServiceProcessRespond(serviceCtrl, sockfd, (uint8*)&walRedoResponse, sizeof(walRedoResponse));
					}
					PG_END_TRY();
				}
				break;
			}

			case Quiesce:
				RedoQuiesce = true;
				walRedoResponse.response = Quiesced;
				result = ServiceProcessRespond(serviceCtrl, sockfd, (uint8*)&walRedoResponse, sizeof(walRedoResponse));
				break;

			default:
				elog(ERROR, "Unknown WalRedoRequestCommand %d", walRedoRequest->command);
		}
	}
	PG_CATCH();
	{
		/*
		 * Report the unexpected error.
		 */
		EmitErrorReport();
		FlushErrorState();

		elog(NOTICE,"disabling redo (unexpected error encountered)");
		result = false;
	}
	PG_END_TRY();

	return result;
}
/*
 * WalRedoRequestCommandToString
 *
 * Map a WalRedoRequestCommand to its name for log messages. Values that
 * are not recognized yield "Unknown". The returned string is a literal
 * and must not be modified or freed.
 */
char*
WalRedoRequestCommandToString(WalRedoRequestCommand command)
{
	if (command == NewCheckpointLocation)
		return "NewCheckpointLocation";

	if (command == Quiesce)
		return "Quiesce";

	return "Unknown";
}
/*
 * WalRedoResponseCommandToString
 *
 * Map a WalRedoResponseCommand to its name for log messages. Values that
 * are not recognized yield "Unknown". The returned string is a literal
 * and must not be modified or freed.
 */
char*
WalRedoResponseCommandToString(WalRedoResponseCommand command)
{
	if (command == Quiesced)
		return "Quiesced";

	if (command == RedoUnexpectedError)
		return "RedoUnexpectedError";

	return "Unknown";
}