mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-11-29 18:58:35 +08:00
improve: push standby wait slave
This commit is contained in:
parent
dd5777cfc1
commit
77ca8b0aa9
@ -179,24 +179,29 @@ XLogRecPtr QueryMinLsn(XLogRecPtr lsn)
|
||||
}
|
||||
|
||||
initStringInfo(&cmd);
|
||||
appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name <> \'");
|
||||
appendStringInfoString(&cmd, "pushstandby");
|
||||
appendStringInfoString(&cmd, "\' order by t.replay_lsn limit 1");
|
||||
appendStringInfoString(&cmd, "SELECT t.application_name, t.replay_lsn, t.state, t.sync_state FROM pg_catalog.pg_stat_replication t WHERE t.application_name not like \'");
|
||||
appendStringInfoString(&cmd, "push%");
|
||||
appendStringInfoString(&cmd, "\' and t.application_name not like \'priv%\' order by t.replay_lsn limit 1");
|
||||
|
||||
pgres = PQexec(pushconn, cmd.data);
|
||||
if (PQresultStatus(pgres) == PGRES_TUPLES_OK && PQntuples(pgres) == 1) {
|
||||
replyptr = PQgetvalue(pgres, 0, 1);
|
||||
bool flag;
|
||||
replylsn = pg_lsn_in_internal(replyptr,&flag);
|
||||
if (PQresultStatus(pgres) == PGRES_TUPLES_OK) {
|
||||
if (PQntuples(pgres) == 1) {
|
||||
replyptr = PQgetvalue(pgres, 0, 1);
|
||||
bool flag;
|
||||
replylsn = pg_lsn_in_internal(replyptr,&flag);
|
||||
if (replylsn == InvalidXLogRecPtr) {
|
||||
elog(ERROR,"query pg_stat_replication replylsn failed");
|
||||
PQclear(pgres);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (PQresultStatus(pgres) == PGRES_BAD_RESPONSE ||
|
||||
PQresultStatus(pgres) == PGRES_NONFATAL_ERROR ||
|
||||
PQresultStatus(pgres) == PGRES_FATAL_ERROR)
|
||||
else if (PQresultStatus(pgres) != PGRES_COMMAND_OK)
|
||||
{
|
||||
PQfinish(pushconn);
|
||||
pushconn = NULL;
|
||||
PQclear(pgres);
|
||||
return InvalidXLogRecPtr;
|
||||
return 1;
|
||||
}
|
||||
//elog(LOG,"appnamelsn: %x: replylsn %x",lsn,replylsn);
|
||||
if ((lsn !=InvalidXLogRecPtr && lsn < replylsn)||(replylsn == InvalidXLogRecPtr)) {
|
||||
|
@ -11405,7 +11405,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
|
||||
CheckPointMultiXact();
|
||||
CheckPointPredicate();
|
||||
//only CHECKPOINT_IS_SHUTDOWN flush page,master only bufferpool full need to evict page
|
||||
if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY))||
|
||||
if ((flags & CHECKPOINT_IS_SHUTDOWN) ||
|
||||
IsBootstrapProcessingMode() == true || InitdbSingle == true)
|
||||
{
|
||||
CheckPointBuffers(flags);
|
||||
|
@ -752,7 +752,7 @@ restart:
|
||||
#ifndef PG_NOREPLAY
|
||||
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
|
||||
if (getpid() == startupPid) {
|
||||
if (push_standby == true || EnableHotStandby == false || *isPromoteIsTriggered) {
|
||||
if (push_standby == true) {
|
||||
/* Update shared-memory status */
|
||||
XLogRecPtr prevPushPoint = PrePushPtr;
|
||||
if (!XLogRecPtrIsInvalid(CheckPointPtr)) {
|
||||
@ -762,8 +762,23 @@ restart:
|
||||
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
|
||||
CheckPointPtr = InvalidXLogRecPtr;
|
||||
}
|
||||
if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
|
||||
ApplyLsn = InvalidXLogRecPtr;
|
||||
//need to tell push standby has new standby add
|
||||
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
|
||||
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
|
||||
if (!he3mirror) {
|
||||
tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
|
||||
}
|
||||
if (tmpLsn !=InvalidXLogRecPtr) {
|
||||
if (tmpLsn <= RecPtr) {
|
||||
pg_usleep(200000);
|
||||
continue;
|
||||
} else {
|
||||
ApplyLsn = tmpLsn;
|
||||
}
|
||||
} else {
|
||||
ApplyLsn = InvalidXLogRecPtr;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1099,7 +1114,7 @@ restart:
|
||||
#ifndef PG_NOREPLAY
|
||||
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
|
||||
if (getpid() == startupPid) {
|
||||
if (push_standby == true || EnableHotStandby == false || *isPromoteIsTriggered) {
|
||||
if (push_standby == true) {
|
||||
// Update shared-memory status
|
||||
XLogRecPtr prevPushPoint = PrePushPtr;
|
||||
if (!XLogRecPtrIsInvalid(CheckPointPtr)) {
|
||||
@ -1111,8 +1126,23 @@ restart:
|
||||
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
|
||||
CheckPointPtr = InvalidXLogRecPtr;
|
||||
}
|
||||
if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
|
||||
ApplyLsn = InvalidXLogRecPtr;
|
||||
//need to tell push standby has new standby add
|
||||
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
|
||||
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
|
||||
if (!he3mirror) {
|
||||
tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
|
||||
}
|
||||
if (tmpLsn !=InvalidXLogRecPtr) {
|
||||
if (tmpLsn <= RecPtr) {
|
||||
pg_usleep(10000);
|
||||
continue;
|
||||
} else {
|
||||
ApplyLsn = tmpLsn;
|
||||
}
|
||||
} else {
|
||||
ApplyLsn = InvalidXLogRecPtr;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
pageKey.pageLsn = GetXLogPushToDisk();
|
||||
pageKey.pageLsn = Max(GetXLogPushToDisk(), PageGetLSN(*buffer));
|
||||
|
||||
pageKey.replyLsn = lsn;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user