repl user no permission visit pg_stat_replication

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzhengntu 2023-06-13 21:21:51 +08:00
parent af65be0892
commit 087df8478c
4 changed files with 56 additions and 21 deletions

View File

@ -179,25 +179,30 @@ XLogRecPtr QueryMinLsn(XLogRecPtr lsn)
} }
initStringInfo(&cmd); initStringInfo(&cmd);
appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name <> \'"); appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name not like \'");
appendStringInfoString(&cmd, "pushstandby"); appendStringInfoString(&cmd, "push%");
appendStringInfoString(&cmd, "\' order by t.replay_lsn limit 1"); appendStringInfoString(&cmd, "\' and t.application_name not like \'priv%\' order by t.replay_lsn limit 1");
pgres = PQexec(pushconn, cmd.data); pgres = PQexec(pushconn, cmd.data);
if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) { if (PQresultStatus(pgres) == PGRES_TUPLES_OK) {
replyptr = PQgetvalue(pgres, 0, 1); if (PQntuples(pgres) == 1) {
bool flag; replyptr = PQgetvalue(pgres, 0, 1);
replylsn = pg_lsn_in_internal(replyptr,&flag); bool flag;
} replylsn = pg_lsn_in_internal(replyptr,&flag);
else if (PQresultStatus(pgres) == PGRES_BAD_RESPONSE || if (replylsn == InvalidXLogRecPtr) {
PQresultStatus(pgres) == PGRES_NONFATAL_ERROR || elog(ERROR,"query pg_stat_replication replylsn failed");
PQresultStatus(pgres) == PGRES_FATAL_ERROR) PQclear(pgres);
return 1;
}
}
}
else if (PQresultStatus(pgres) != PGRES_COMMAND_OK)
{ {
PQfinish(pushconn); PQfinish(pushconn);
pushconn = NULL; pushconn = NULL;
PQclear(pgres); PQclear(pgres);
return InvalidXLogRecPtr; return 1;
} }
//elog(LOG,"appnamelsn: %x: replylsn %x",lsn,replylsn); //elog(LOG,"appnamelsn: %x: replylsn %x",lsn,replylsn);
if ((lsn !=InvalidXLogRecPtr && lsn < replylsn)||(replylsn == InvalidXLogRecPtr)) { if ((lsn !=InvalidXLogRecPtr && lsn < replylsn)||(replylsn == InvalidXLogRecPtr)) {
replylsn = lsn; replylsn = lsn;

View File

@ -11403,7 +11403,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointMultiXact(); CheckPointMultiXact();
CheckPointPredicate(); CheckPointPredicate();
//only CHECKPOINT_IS_SHUTDOWN flush page,master only bufferpool full need to evict page //only CHECKPOINT_IS_SHUTDOWN flush page,master only bufferpool full need to evict page
if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))|| if ((flags & CHECKPOINT_IS_SHUTDOWN) ||
IsBootstrapProcessingMode() == true || InitdbSingle == true) IsBootstrapProcessingMode() == true || InitdbSingle == true)
{ {
CheckPointBuffers(flags); CheckPointBuffers(flags);

View File

@ -752,7 +752,7 @@ restart:
#ifndef PG_NOREPLAY #ifndef PG_NOREPLAY
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) { if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
if (getpid() == startupPid) { if (getpid() == startupPid) {
if (push_standby == true || EnableHotStandby == false || *isPromoteIsTriggered) { if (push_standby == true) {
/* Update shared-memory status */ /* Update shared-memory status */
XLogRecPtr prevPushPoint = PrePushPtr; XLogRecPtr prevPushPoint = PrePushPtr;
if (!XLogRecPtrIsInvalid(CheckPointPtr)) { if (!XLogRecPtrIsInvalid(CheckPointPtr)) {
@ -762,8 +762,23 @@ restart:
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr); printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
CheckPointPtr = InvalidXLogRecPtr; CheckPointPtr = InvalidXLogRecPtr;
} }
if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { //need to tell push standby has new standby add
ApplyLsn = InvalidXLogRecPtr; while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
if (!he3mirror) {
tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
}
if (tmpLsn !=InvalidXLogRecPtr) {
if (tmpLsn <= RecPtr) {
pg_usleep(200000);
continue;
} else {
ApplyLsn = tmpLsn;
}
} else {
ApplyLsn = InvalidXLogRecPtr;
}
break;
} }
} }
} }
@ -1099,7 +1114,7 @@ restart:
#ifndef PG_NOREPLAY #ifndef PG_NOREPLAY
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) { if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
if (getpid() == startupPid) { if (getpid() == startupPid) {
if (push_standby == true || EnableHotStandby == false || *isPromoteIsTriggered) { if (push_standby == true) {
// Update shared-memory status // Update shared-memory status
XLogRecPtr prevPushPoint = PrePushPtr; XLogRecPtr prevPushPoint = PrePushPtr;
if (!XLogRecPtrIsInvalid(CheckPointPtr)) { if (!XLogRecPtrIsInvalid(CheckPointPtr)) {
@ -1111,8 +1126,23 @@ restart:
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr); printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
CheckPointPtr = InvalidXLogRecPtr; CheckPointPtr = InvalidXLogRecPtr;
} }
if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { //need to tell push standby has new standby add
ApplyLsn = InvalidXLogRecPtr; while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
if (!he3mirror) {
tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
}
if (tmpLsn !=InvalidXLogRecPtr) {
if (tmpLsn <= RecPtr) {
pg_usleep(10000);
continue;
} else {
ApplyLsn = tmpLsn;
}
} else {
ApplyLsn = InvalidXLogRecPtr;
}
break;
} }
} }
} }

View File

@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
return nbytes; return nbytes;
} }
pageKey.pageLsn = GetXLogPushToDisk(); pageKey.pageLsn = Max(GetXLogPushToDisk(), PageGetLSN(*buffer));
pageKey.replyLsn = lsn; pageKey.replyLsn = lsn;