update quering min apply lsn

Signed-off-by: shipixian <shipixian_yewu@cmss.chinamobile.com>
This commit is contained in:
shipixian 2023-05-04 14:24:23 +08:00
parent 758de265bf
commit 3d471a8982
5 changed files with 163 additions and 117 deletions

View File

@ -29,29 +29,29 @@ static PGconn *connToPushStandby = NULL;
pid_t startupPid = 0;
static bool ConnectPushStandbyDB() {
char *err;
const char *keys[] = {"dbname","user","password","host","port",NULL};
const char *vals[] = {"postgres","repl","123456","100.73.36.123","15431",NULL};
connToPushStandby = PQconnectdbParams(keys, vals, false);
if (PQstatus(connToPushStandby) == CONNECTION_BAD)
{
err = pchomp(PQerrorMessage(connToPushStandby));
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("push standby could not connect to the push standby server: %s", err)));
return false;
}
return true;
// static bool ConnectPushStandbyDB() {
// char *err;
// const char *keys[] = {"dbname","user","password","host","port",NULL};
// const char *vals[] = {"postgres","repl","123456","100.73.36.123","15431",NULL};
// connToPushStandby = PQconnectdbParams(keys, vals, false);
// if (PQstatus(connToPushStandby) == CONNECTION_BAD)
// {
// err = pchomp(PQerrorMessage(connToPushStandby));
// ereport(ERROR,
// (errcode(ERRCODE_CONNECTION_FAILURE),
// errmsg("push standby could not connect to the push standby server: %s", err)));
// return false;
// }
// return true;
}
// }
static bool ConnectPrimaryDB() {
char *err;
char conninfo[maxconnlen];
const char *keys[] = {"dbname","user","password","host","port",NULL};
const char *vals[] = {"postgres","repl","123456","100.73.36.123","15432",NULL};
// const char *keys[] = {"dbname","user","password","host","port",NULL};
// const char *vals[] = {"postgres","repl","123456","100.73.36.123","15432",NULL};
strlcpy(conninfo, (char *) PrimaryConnInfo, maxconnlen);
/* Establish the connection to the primary for query Min Lsn*/
/*
@ -59,7 +59,8 @@ static bool ConnectPrimaryDB() {
* URI), and pass some extra options.
*/
/* Note we do not want libpq to re-expand the dbname parameter */
pushconn = PQconnectdbParams(keys, vals, true);
pushconn = PQconnectdb(conninfo);
// pushconn = PQconnectdbParams(keys, vals, true);
if (PQstatus(pushconn) == CONNECTION_BAD)
{
err = pchomp(PQerrorMessage(pushconn));
@ -71,57 +72,58 @@ static bool ConnectPrimaryDB() {
return true;
}
static bool ConnectPrimaryDB4ReplyLSN() {
char *err;
char conninfo[maxconnlen];
const char *keys[] = {"dbname","user","password","host","port",NULL};
const char *vals[] = {"postgres","postgres","","100.73.36.123","15432",NULL};
strlcpy(conninfo, (char *) PrimaryConnInfo, maxconnlen);
/* Establish the connection to the primary for query Min Lsn*/
/*
* We use the expand_dbname parameter to process the connection string (or
* URI), and pass some extra options.
*/
/* Note we do not want libpq to re-expand the dbname parameter */
pushconn = PQconnectdbParams(keys, vals, true);
if (PQstatus(pushconn) == CONNECTION_BAD)
{
err = pchomp(PQerrorMessage(pushconn));
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("push standby could not connect to the primary server: %s", err)));
return false;
}
return true;
}
// static bool ConnectPrimaryDB4ReplyLSN() {
// char *err;
// char conninfo[maxconnlen];
// const char *keys[] = {"dbname","user","password","host","port",NULL};
// const char *vals[] = {"postgres","postgres","","100.73.36.123","15432",NULL};
// strlcpy(conninfo, (char *) PrimaryConnInfo, maxconnlen);
// /* Establish the connection to the primary for query Min Lsn*/
// /*
// * We use the expand_dbname parameter to process the connection string (or
// * URI), and pass some extra options.
// */
// /* Note we do not want libpq to re-expand the dbname parameter */
// pushconn = PQconnectdbParams(keys, vals, true);
// if (PQstatus(pushconn) == CONNECTION_BAD)
// {
// err = pchomp(PQerrorMessage(pushconn));
// ereport(WARNING,
// (errcode(ERRCODE_CONNECTION_FAILURE),
// errmsg("push standby could not connect to the primary server: %s", err)));
// return false;
// }
// return true;
// }
XLogRecPtr QueryPushLsn()
{
StringInfoData cmd;
XLogRecPtr replylsn = InvalidXLogRecPtr;
char *replyptr;
initStringInfo(&cmd);
appendStringInfoString(&cmd,"select pg_last_wal_replay_lsn()");
replylsn = InvalidXLogRecPtr;
if (connToPushStandby == NULL) {
if (ConnectPushStandbyDB() == false) {
return InvalidXLogRecPtr;
}
}
PGresult *pgres = NULL;
pgres = PQexec(connToPushStandby, cmd.data);
if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
replyptr = PQgetvalue(pgres, 0, 0);
bool flag;
replylsn = pg_lsn_in_internal(replyptr,&flag);
// XLogRecPtr QueryPushLsn()
// {
// StringInfoData cmd;
// XLogRecPtr replylsn = InvalidXLogRecPtr;
// char *replyptr;
// initStringInfo(&cmd);
// appendStringInfoString(&cmd,"select pg_last_wal_replay_lsn()");
// replylsn = InvalidXLogRecPtr;
// if (connToPushStandby == NULL) {
// if (ConnectPushStandbyDB() == false) {
// return InvalidXLogRecPtr;
// }
// }
// PGresult *pgres = NULL;
// pgres = PQexec(connToPushStandby, cmd.data);
// if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
// replyptr = PQgetvalue(pgres, 0, 0);
// bool flag;
// replylsn = pg_lsn_in_internal(replyptr,&flag);
}
PQfinish(connToPushStandby);
connToPushStandby = NULL;
PQclear(pgres);
return replylsn;
// }
// PQfinish(connToPushStandby);
// connToPushStandby = NULL;
// PQclear(pgres);
// return replylsn;
}
// }
XLogRecPtr QueryPushChkpointLsn(void)
{
@ -202,50 +204,50 @@ XLogRecPtr QueryMinLsn(XLogRecPtr lsn)
return replylsn;
}
XLogRecPtr QueryReplyLsn(XLogRecPtr lsn)
{
StringInfoData cmd;
XLogRecPtr replylsn;
PGresult *pgres = NULL;
char *appname;
char *state;
char *syncstate;
char *replyptr;
replylsn = InvalidXLogRecPtr;
if (pushconn == NULL) {
if (ConnectPrimaryDB4ReplyLSN() == false) {
return InvalidXLogRecPtr;
}
}
// XLogRecPtr QueryReplyLsn(XLogRecPtr lsn)
// {
// StringInfoData cmd;
// XLogRecPtr replylsn;
// PGresult *pgres = NULL;
// char *appname;
// char *state;
// char *syncstate;
// char *replyptr;
// replylsn = InvalidXLogRecPtr;
// if (pushconn == NULL) {
// if (ConnectPrimaryDB4ReplyLSN() == false) {
// return InvalidXLogRecPtr;
// }
// }
initStringInfo(&cmd);
appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name <> \'");
appendStringInfoString(&cmd, "pushstandby");
appendStringInfoString(&cmd, "\' order by t.replay_lsn limit 1");
// initStringInfo(&cmd);
// appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name <> \'");
// appendStringInfoString(&cmd, "pushstandby");
// appendStringInfoString(&cmd, "\' order by t.replay_lsn limit 1");
pgres = PQexec(pushconn, cmd.data);
if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
appname = PQgetvalue(pgres, 0, 0);
replyptr = PQgetvalue(pgres, 0, 1);
bool flag;
replylsn = pg_lsn_in_internal(replyptr,&flag);
//replylsn = atol(replyptr);
state = PQgetvalue(pgres, 0, 2);
syncstate = PQgetvalue(pgres, 0, 3);
}
else if (PQresultStatus(pgres) == PGRES_BAD_RESPONSE ||
PQresultStatus(pgres) == PGRES_NONFATAL_ERROR ||
PQresultStatus(pgres) == PGRES_FATAL_ERROR)
{
PQfinish(pushconn);
pushconn = NULL;
PQclear(pgres);
return InvalidXLogRecPtr;
}
//elog(LOG,"appnamelsn: %x: replylsn %x",lsn,replylsn);
if (lsn !=InvalidXLogRecPtr && lsn < replylsn||replylsn == InvalidXLogRecPtr) {
replylsn = lsn;
}
PQclear(pgres);
return replylsn;
}
// pgres = PQexec(pushconn, cmd.data);
// if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
// appname = PQgetvalue(pgres, 0, 0);
// replyptr = PQgetvalue(pgres, 0, 1);
// bool flag;
// replylsn = pg_lsn_in_internal(replyptr,&flag);
// //replylsn = atol(replyptr);
// state = PQgetvalue(pgres, 0, 2);
// syncstate = PQgetvalue(pgres, 0, 3);
// }
// else if (PQresultStatus(pgres) == PGRES_BAD_RESPONSE ||
// PQresultStatus(pgres) == PGRES_NONFATAL_ERROR ||
// PQresultStatus(pgres) == PGRES_FATAL_ERROR)
// {
// PQfinish(pushconn);
// pushconn = NULL;
// PQclear(pgres);
// return InvalidXLogRecPtr;
// }
// //elog(LOG,"appnamelsn: %x: replylsn %x",lsn,replylsn);
// if (lsn !=InvalidXLogRecPtr && lsn < replylsn||replylsn == InvalidXLogRecPtr) {
// replylsn = lsn;
// }
// PQclear(pgres);
// return replylsn;
// }

View File

@ -26,6 +26,7 @@
#include "storage/filecache.h"
#include "postmaster/secondbuffer.h"
//#include "utils/hfs.h"
#include "utils/backend_status.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/guc.h"
@ -769,12 +770,12 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber
reln->smgr_cached_nblocks[forknum[i]] = InvalidBlockNumber;
if(!SmgrIsTemp(reln)) {
if (false == flag) {
XLogRecPtr pushLsn;
XLogRecPtr minApplyLsn;
do {
sleep(1);
pushLsn = QueryPushLsn();
printf("====pushlsn=%lx==lsn==%lx==\n",pushLsn,lsn);
} while(pushLsn!=InvalidXLogRecPtr && pushLsn<lsn);
minApplyLsn = He3DBQueryMinLsnFromAllStanby();
printf("====pushlsn=%lx==lsn==%lx==\n",minApplyLsn,lsn);
} while(minApplyLsn!=InvalidXLogRecPtr && minApplyLsn<lsn);
flag = true;
}
}

View File

@ -17,6 +17,8 @@
#include "pg_trace.h"
#include "pgstat.h"
#include "port/atomics.h" /* for memory barriers */
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "storage/proc.h" /* for MyProc */
#include "storage/sinvaladt.h"
@ -1148,3 +1150,42 @@ pgstat_clip_activity(const char *raw_activity)
return activity;
}
XLogRecPtr He3DBQueryMinLsnFromAllStanby()
{
int i;
XLogRecPtr minApplyLsn;
int procpid = -1;
for (i = 0; i < NumBackendStatSlots; i++)
{
if (strcmp(BackendStatusArray[i].st_appname, "pg_mirror") == 0)
{
procpid = BackendStatusArray[i].st_procpid;
break;
}
}
Assert(WalSndCtl != NULL);
for (i = 0; i < max_wal_senders; i++)
{
int pid;
XLogRecPtr apply;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
{
SpinLockRelease(&walsnd->mutex);
continue;
}
pid = walsnd->pid;
apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
if (pid != procpid)
{
if (apply < minApplyLsn)
minApplyLsn = apply;
}
}
return minApplyLsn;
}

View File

@ -25,10 +25,10 @@ extern XLogRecPtr LastPushPoint;
extern XLogRecPtr QueryMinLsn(XLogRecPtr lsn);
extern XLogRecPtr QueryPushLsn();
// extern XLogRecPtr QueryPushLsn();
extern XLogRecPtr QueryPushChkpointLsn();
extern XLogRecPtr QueryReplyLsn(XLogRecPtr lsn);
// extern XLogRecPtr QueryReplyLsn(XLogRecPtr lsn);
typedef struct DirtyPage {
XLogRecPtr startlsn;

View File

@ -317,5 +317,7 @@ extern PgBackendStatus *pgstat_fetch_stat_beentry(int beid);
extern LocalPgBackendStatus *pgstat_fetch_stat_local_beentry(int beid);
extern char *pgstat_clip_activity(const char *raw_activity);
extern XLogRecPtr He3DBQueryMinLsnFromAllStanby();
#endif /* BACKEND_STATUS_H */