!158 support private deploy

Merge pull request !158 from shipixian/dev_performance
This commit is contained in:
zoujia_yewu 2023-06-06 01:22:59 +00:00 committed by Gitee
commit 541e850aed
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
17 changed files with 202 additions and 76 deletions

View File

@ -411,6 +411,8 @@ CleanLogIndexMain(int argc, char *argv[])
TagNode *next = tagList->next; TagNode *next = tagList->next;
int pageNum = 0; int pageNum = 0;
while(next!=NULL) { while(next!=NULL) {
// elog(LOG,"add tag rel %d, fork %d, blk %d",
// next->tag.tag.rnode.relNode, next->tag.tag.forkNum, next->tag.tag.blockNum);
addFileKey(&next->tag.tag); addFileKey(&next->tag.tag);
next = next->next; next = next->next;
pageNum++; pageNum++;
@ -419,10 +421,12 @@ CleanLogIndexMain(int argc, char *argv[])
pushSlaveReplayQueue(pageNum); pushSlaveReplayQueue(pageNum);
hasData++; hasData++;
PrevPushPoint = LastPushPoint+1; PrevPushPoint = LastPushPoint+1;
SetXLogPushToDisk(PrevPushPoint);
pushStandbyPoint = GetConsistLsn(PrevPushPoint); pushStandbyPoint = GetConsistLsn(PrevPushPoint);
} else { } else {
LastPushPoint = PrevPushPoint = lastReplPtr; LastPushPoint = PrevPushPoint = lastReplPtr;
if (pushStandbyPrePoint < PrevPushPoint) { if (pushStandbyPrePoint < PrevPushPoint) {
SetXLogPushToDisk(PrevPushPoint);
pushStandbyPoint = GetConsistLsn(PrevPushPoint+1); pushStandbyPoint = GetConsistLsn(PrevPushPoint+1);
} }
} }
@ -438,7 +442,7 @@ CleanLogIndexMain(int argc, char *argv[])
elog(LOG,"start threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPoint)); elog(LOG,"start threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPoint));
CleanLogIndexByPage(pushStandbyPoint); CleanLogIndexByPage(pushStandbyPoint);
//threadCleanLogIndex(LastPushPoint); //threadCleanLogIndex(LastPushPoint);
elog(LOG,"end threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPrePoint)); elog(LOG,"end threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPoint));
} }
if (hasData != 0) { if (hasData != 0) {
continue; continue;
@ -672,10 +676,13 @@ void ProcFlushBufferToDisk(BufferTag*tag) {
RBM_NORMAL); RBM_NORMAL);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
{ {
elog(FATAL,"ProcFlushBufferToDisk is invalid rel %d,flk %d,blk %d",tag->rnode.relNode,tag->forkNum,tag->blockNum); elog(PANIC,"ProcFlushBufferToDisk is invalid rel %d,flk %d,blk %d",tag->rnode.relNode,tag->forkNum,tag->blockNum);
pg_atomic_fetch_add_u32(&PageHashQueueShmem->taskNum,1); pg_atomic_fetch_add_u32(&PageHashQueueShmem->taskNum,1);
return; return;
} }
// elog(LOG, "replay rel %d, fork %d, blkno %d, pagelsn %X/%X", tag->rnode.relNode,
// tag->forkNum,tag->blockNum, LSN_FORMAT_ARGS(PageGetLSN(BufferGetPage(buffer))));
//slave no need to flush disk //slave no need to flush disk
if (push_standby == true) { if (push_standby == true) {
BufferDesc *buf; BufferDesc *buf;

View File

@ -7990,7 +7990,7 @@ void pushTikv(int onePageListLen,int pageNum,bool flag)
if (ApplyLsn < PushPtr) { if (ApplyLsn < PushPtr) {
ApplyLsn = PushPtr; ApplyLsn = PushPtr;
} }
if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn) { if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn && !mpush) {
InsertConsistToKV(ApplyLsn); InsertConsistToKV(ApplyLsn);
} }
XLogCtl->pushToDisk = ApplyLsn; XLogCtl->pushToDisk = ApplyLsn;
@ -8001,7 +8001,7 @@ void pushTikv(int onePageListLen,int pageNum,bool flag)
if (ApplyLsn < PushPtr) { if (ApplyLsn < PushPtr) {
ApplyLsn = PushPtr; ApplyLsn = PushPtr;
} }
if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn) { if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn && !mpush) {
InsertConsistToKV(ApplyLsn); InsertConsistToKV(ApplyLsn);
} }
XLogCtl->pushToDisk = ApplyLsn; XLogCtl->pushToDisk = ApplyLsn;

View File

@ -762,21 +762,8 @@ restart:
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr); printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
CheckPointPtr = InvalidXLogRecPtr; CheckPointPtr = InvalidXLogRecPtr;
} }
//need to tell push standby has new standby add if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { ApplyLsn = InvalidXLogRecPtr;
//XLogRecPtr lsn = GetFlushXlogPtr();
XLogRecPtr tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
if (tmpLsn !=InvalidXLogRecPtr) {
if (tmpLsn <= RecPtr) {
pg_usleep(50000);
continue;
} else {
ApplyLsn = tmpLsn;
}
} else {
ApplyLsn = InvalidXLogRecPtr;
}
break;
} }
} }
} }
@ -1124,21 +1111,8 @@ restart:
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr); printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
CheckPointPtr = InvalidXLogRecPtr; CheckPointPtr = InvalidXLogRecPtr;
} }
//need to tell push standby has new standby add if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { ApplyLsn = InvalidXLogRecPtr;
//XLogRecPtr lsn = GetFlushXlogPtr();
XLogRecPtr tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
if (tmpLsn !=InvalidXLogRecPtr) {
if (tmpLsn <= RecPtr) {
usleep(200);
continue;
} else {
ApplyLsn = tmpLsn;
}
} else {
ApplyLsn = InvalidXLogRecPtr;
}
break;
} }
} }
} }

View File

@ -632,7 +632,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
else else
{ {
/* hm, page doesn't exist in file */ /* hm, page doesn't exist in file */
if(!he3mirror){ if(!he3mirror && he3share){
if (mode == RBM_NORMAL && EnableHotStandby != false && *isPromoteIsTriggered == false) if (mode == RBM_NORMAL && EnableHotStandby != false && *isPromoteIsTriggered == false)
{ {
log_invalid_page(rnode, forknum, blkno, false); log_invalid_page(rnode, forknum, blkno, false);
@ -668,7 +668,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
} }
} }
if (!he3mirror && mode == RBM_NORMAL) if (he3share && !he3mirror && mode == RBM_NORMAL)
{ {
/* check that page has been initialized */ /* check that page has been initialized */
Page page = (Page) BufferGetPage(buffer); Page page = (Page) BufferGetPage(buffer);

View File

@ -925,7 +925,7 @@ smgr_redo(XLogReaderState *record)
reln = smgropen(xlrec->rnode, InvalidBackendId); reln = smgropen(xlrec->rnode, InvalidBackendId);
/* He3DB: propeller instance and He3DB slave instance not create rel file*/ /* He3DB: propeller instance and He3DB slave instance not create rel file*/
if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror) if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror || !he3share)
{ {
smgrcreate(reln, xlrec->forkNum, true); smgrcreate(reln, xlrec->forkNum, true);
} }
@ -949,7 +949,7 @@ smgr_redo(XLogReaderState *record)
* log as best we can until the drop is seen. * log as best we can until the drop is seen.
*/ */
/* He3DB: propeller instance and He3DB slave instance not create rel file*/ /* He3DB: propeller instance and He3DB slave instance not create rel file*/
if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror) if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror || !he3share)
{ {
smgrcreate(reln, MAIN_FORKNUM, true); smgrcreate(reln, MAIN_FORKNUM, true);
} }
@ -1008,8 +1008,8 @@ smgr_redo(XLogReaderState *record)
} }
/* Do the real work to truncate relation forks */ /* Do the real work to truncate relation forks */
if (nforks > 0 && (!EnableHotStandby || *isPromoteIsTriggered)) if (nforks > 0 && (!EnableHotStandby || *isPromoteIsTriggered || !he3share))
smgrtruncate(reln, forks, nforks, blocks); smgrtruncatelsn(reln, forks, nforks, blocks, record->ReadRecPtr);
/* /*
* Update upper-level FSM pages to account for the truncation. This is * Update upper-level FSM pages to account for the truncation. This is

View File

@ -2258,8 +2258,28 @@ dbase_redo(XLogReaderState *record)
* We don't need to copy subdirectories * We don't need to copy subdirectories
*/ */
/* He3DB: propeller instance and He3DB slave instance not create db file*/ /* He3DB: propeller instance and He3DB slave instance not create db file*/
if (!EnableHotStandby || *isPromoteIsTriggered || he3mirror) if (!EnableHotStandby || *isPromoteIsTriggered || he3mirror || !he3share)
{ {
// int count = 0;
// for (;;)
// {
// XLogRecPtr pushlsn;
// XLogRecPtr lastlsn = record->currRecPtr;
// pushlsn = QueryPushChkpointLsn();
// if (pushlsn == InvalidXLogRecPtr)
// ereport(ERROR,
// (errcode(ERRCODE_INTERNAL_ERROR),
// errmsg("push standby's latest apply lsn shouldn't be 0")));
// if (lastlsn <= pushlsn)
// break;
// if (count > 100)
// ereport(ERROR,
// (errcode(ERRCODE_INTERNAL_ERROR),
// errmsg("push standby's latest apply lsn(%X/%X) is still behind primary(%X/%X) after try 100 times.",
// LSN_FORMAT_ARGS(pushlsn), LSN_FORMAT_ARGS(lastlsn))));
// pg_usleep(1000000L);
// count++;
// }
copydir(src_path, dst_path, false); copydir(src_path, dst_path, false);
} }
} }

View File

@ -477,8 +477,29 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer)
} }
} }
/*
 * SwapLsn
 *		Reverse the byte order of a 64-bit LSN on little-endian builds.
 *
 * On builds where WORDS_BIGENDIAN is not defined, the eight bytes of lsn are
 * reversed, turning the in-memory little-endian layout into a big-endian
 * one (e.g. the 32-bit pattern 0x12345678 becomes 0x78563412). On big-endian
 * builds the value is returned unchanged. Applying the function twice
 * yields the original value.
 *
 * NOTE(review): callers store the result in WalLdPageKey.pageLsn, presumably
 * so that byte-wise key comparison orders entries by LSN -- confirm against
 * the key comparator in use.
 */
uint64_t
SwapLsn(uint64_t lsn)
{
#ifndef WORDS_BIGENDIAN
	uint32_t	low = (uint32_t) lsn;
	uint32_t	high = (uint32_t) (lsn >> 32);

	/*
	 * Reverse the four bytes of each half: first swap the two 16-bit halves,
	 * then swap the two bytes inside each 16-bit half.
	 */
	low = (low << 16) | (low >> 16);
	low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF);
	high = (high << 16) | (high >> 16);
	high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF);

	/* The byte-reversed low word becomes the most significant word. */
	return ((uint64_t) low << 32) | (uint64_t) high;
#else
	return lsn;
#endif
}
Bufrd Bufrd
GetWalFromLocalBuffer(WalLdPageKey *wpk) GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
{ {
MDB_txn *tmptxn; MDB_txn *tmptxn;
MDB_cursor *tmpcursor; MDB_cursor *tmpcursor;
@ -508,24 +529,35 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk)
if (tb != 0) if (tb != 0)
{ {
//TODO //TODO
ereport(PANIC,errmsg("mdb_txn_begin failed,error is:%d",tb));
} }
co = mdb_cursor_open(tmptxn, walDbi,&tmpcursor); co = mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
if (tb != 0) if (co != 0)
{ {
//TODO //TODO
ereport(PANIC,errmsg("mdb_txn_open failed,error is:%d",co));
} }
if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET)) != 0) // ereport(LOG,errmsg("535 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE)) != 0)
{ {
ereport(LOG, errmsg("mdb_txn_get failed,error is:%d, dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
cg, dbid, relid, forkno, blkno, SwapLsn(wpk->pageLsn), wpk->partition));
bufrd.buf = waldata; bufrd.buf = waldata;
mdb_cursor_close(tmpcursor); mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn); mdb_txn_abort(tmptxn);
return bufrd; return bufrd;
} }
wpk = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
while (wpk->sk.dbid == dbid while (wpk->sk.dbid == dbid
&& wpk->sk.relid == relid && wpk->sk.relid == relid
&& wpk->sk.forkno == forkno && wpk->sk.forkno == forkno
&& wpk->sk.blkno == blkno) && wpk->sk.blkno == blkno
&& SwapLsn(wpk->pageLsn) < replyLsn)
{ {
memcpy(waldata + waldatalen,data.mv_data,data.mv_size); memcpy(waldata + waldatalen,data.mv_data,data.mv_size);
waldatalen += data.mv_size; waldatalen += data.mv_size;
@ -547,6 +579,8 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk)
} }
wpk = (WalLdPageKey *)key.mv_data; wpk = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
} }
} }
@ -607,12 +641,14 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
if (tb != 0) if (tb != 0)
{ {
//TODO //TODO
ereport(LOG,errmsg("put mdb_txn_begin failed,error is:%d",tb));
} }
co = mdb_cursor_open(tmptxn,walDbi,&tmpcursor); co = mdb_cursor_open(tmptxn,walDbi,&tmpcursor);
if (co != 0) if (co != 0)
{ {
//TODO //TODO
ereport(LOG,errmsg("put mdb_txn_open failed,error is:%d",co));
} }
for(int i = 0; i < length; i ++) for(int i = 0; i < length; i ++)
@ -636,17 +672,19 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = 511; data.mv_size = 511;
data.mv_data = NULL; data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(511); xlogContent = (uint8_t *)malloc(511);
wlpk.pageLsn = ks[i].lsn; wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.partition = part; wlpk.partition = part;
memcpy(xlogContent,buf + (part * 511), 511); //502 = 511 - 9
part ++; part ++;
memcpy(xlogContent,buf, 511); //502 = 511 - 9
totallen -= 511; totallen -= 511;
key.mv_data = &wlpk; key.mv_data = &wlpk;
data.mv_data = xlogContent; data.mv_data = xlogContent;
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0) if (cp != 0)
{ {
ereport(PANIC,errmsg("651 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsn(wlpk.pageLsn), wlpk.partition));
if (cp == MDB_KEYEXIST) if (cp == MDB_KEYEXIST)
{ {
break; break;
@ -660,14 +698,16 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = totallen; data.mv_size = totallen;
data.mv_data = NULL; data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(totallen); xlogContent = (uint8_t *)malloc(totallen);
wlpk.pageLsn = ks[i].lsn; wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.partition = part; wlpk.partition = part;
key.mv_data = &wlpk; key.mv_data = &wlpk;
memcpy(xlogContent,buf,totallen); memcpy(xlogContent,buf + (part * 511),totallen);
data.mv_data = xlogContent; data.mv_data = xlogContent;
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0) if (cp != 0)
{ {
ereport(LOG,errmsg("673 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
if (cp == MDB_KEYEXIST) if (cp == MDB_KEYEXIST)
{ {
break; break;
@ -684,7 +724,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = totallen; data.mv_size = totallen;
data.mv_data = NULL; data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(totallen); xlogContent = (uint8_t *)malloc(totallen);
wlpk.pageLsn = ks[i].lsn; wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.partition = 0; wlpk.partition = 0;
key.mv_data = &wlpk; key.mv_data = &wlpk;
memcpy(xlogContent,buf,totallen); memcpy(xlogContent,buf,totallen);
@ -692,8 +732,15 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0) if (cp != 0)
{ {
ereport(LOG,errmsg("730 mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno,
SwapLsn(wlpk.pageLsn), wlpk.partition));
if (cp == MDB_KEYEXIST) if (cp == MDB_KEYEXIST)
{ {
cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET);
if (cp != 0)
ereport(LOG,errmsg(" 737 mdb_txn_get failed,error is:%d, rel %d, forkno %d, blk %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
continue; continue;
} }
} }
@ -703,7 +750,9 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
} }
mdb_cursor_close(tmpcursor); mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn); co = mdb_txn_commit(tmptxn);
if (co != 0)
ereport(LOG,errmsg("put mdb_txn_commit failed,error is:%d",co));
tmpcursor = NULL; tmpcursor = NULL;
tmptxn = NULL; tmptxn = NULL;
// pthread_mutex_unlock(&q_lock); // pthread_mutex_unlock(&q_lock);
@ -746,9 +795,11 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
WalLdPageKey wlpk; WalLdPageKey wlpk;
wlpk.sk.dbid = pk.relfileNode.dbNode; wlpk.sk.dbid = pk.relfileNode.dbNode;
wlpk.sk.relid = pk.relfileNode.relNode; wlpk.sk.relid = pk.relfileNode.relNode;
wlpk.pageLsn = pk.pageLsn; wlpk.sk.forkno = pk.forkNo;
wlpk.sk.blkno = pk.blkNo;
wlpk.pageLsn = SwapLsn(pk.pageLsn);
wlpk.partition = 0; wlpk.partition = 0;
Bufrd waldata = GetWalFromLocalBuffer(&pk); Bufrd waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn);
if (waldata.count > 0) if (waldata.count > 0)
{ {
bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count); bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count);

View File

@ -245,6 +245,7 @@ static void XLogSendPhysical(void);
static void XLogSendLogical(void); static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data); static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void); static XLogRecPtr GetStandbyFlushRecPtr(void);
static XLogRecPtr GetStandbyReplayRecPtr(void);
static void IdentifySystem(void); static void IdentifySystem(void);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
@ -412,7 +413,7 @@ IdentifySystem(void)
if (am_cascading_walsender) if (am_cascading_walsender)
{ {
/* this also updates ThisTimeLineID */ /* this also updates ThisTimeLineID */
logptr = GetStandbyFlushRecPtr(); logptr = GetStandbyReplayRecPtr();
} }
else else
logptr = GetFlushRecPtr(); logptr = GetFlushRecPtr();
@ -639,7 +640,7 @@ StartReplication(StartReplicationCmd *cmd)
if (am_cascading_walsender) if (am_cascading_walsender)
{ {
/* this also updates ThisTimeLineID */ /* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr(); FlushPtr = GetStandbyReplayRecPtr();
} }
else else
FlushPtr = GetFlushRecPtr(); FlushPtr = GetFlushRecPtr();
@ -2911,7 +2912,7 @@ XLogSendTiKVPhysical(void)
*/ */
bool becameHistoric = false; bool becameHistoric = false;
SendRqstPtr = GetStandbyFlushRecPtr(); SendRqstPtr = GetStandbyReplayRecPtr();
if (!RecoveryInProgress()) if (!RecoveryInProgress())
{ {
@ -2927,7 +2928,7 @@ XLogSendTiKVPhysical(void)
/* /*
* Still a cascading standby. But is the timeline we're sending * Still a cascading standby. But is the timeline we're sending
* still the one recovery is recovering from? ThisTimeLineID was * still the one recovery is recovering from? ThisTimeLineID was
* updated by the GetStandbyFlushRecPtr() call above. * updated by the GetStandbyReplayRecPtr() call above.
*/ */
if (sendTimeLine != ThisTimeLineID) if (sendTimeLine != ThisTimeLineID)
becameHistoric = true; becameHistoric = true;
@ -3220,7 +3221,7 @@ XLogSendPhysical(void)
*/ */
bool becameHistoric = false; bool becameHistoric = false;
SendRqstPtr = GetStandbyFlushRecPtr(); SendRqstPtr = GetStandbyReplayRecPtr();
if (!RecoveryInProgress()) if (!RecoveryInProgress())
{ {
@ -3236,7 +3237,7 @@ XLogSendPhysical(void)
/* /*
* Still a cascading standby. But is the timeline we're sending * Still a cascading standby. But is the timeline we're sending
* still the one recovery is recovering from? ThisTimeLineID was * still the one recovery is recovering from? ThisTimeLineID was
* updated by the GetStandbyFlushRecPtr() call above. * updated by the GetStandbyReplayRecPtr() call above.
*/ */
if (sendTimeLine != ThisTimeLineID) if (sendTimeLine != ThisTimeLineID)
becameHistoric = true; becameHistoric = true;
@ -3627,6 +3628,28 @@ GetStandbyFlushRecPtr(void)
return result; return result;
} }
/*
 * GetStandbyReplayRecPtr
 *		Return the value of GetXLogPushToDisk() for use as a send limit.
 *
 * The current replay position is fetched as well, but only to obtain the
 * replay timeline and to sanity-check the returned value; it is not itself
 * returned.
 *
 * Side effect: ThisTimeLineID is set to the timeline reported by
 * GetXLogReplayRecPtr().
 *
 * NOTE(review): GetXLogPushToDisk() presumably reports the point up to which
 * pages have been pushed/made consistent -- confirm against its definition.
 */
static XLogRecPtr
GetStandbyReplayRecPtr(void)
{
	XLogRecPtr	replayPtr;
	XLogRecPtr	consistPtr;
	TimeLineID	replayTLI;

	replayPtr = GetXLogReplayRecPtr(&replayTLI);
	consistPtr = GetXLogPushToDisk();

	/* The returned point must never be ahead of what has been replayed. */
	Assert(consistPtr <= replayPtr);

	ThisTimeLineID = replayTLI;
	return consistPtr;
}
/* /*
* Request walsenders to reload the currently-open WAL file * Request walsenders to reload the currently-open WAL file
*/ */

View File

@ -1933,7 +1933,8 @@ retry:
/* safety check: should definitely not be our *own* pin */ /* safety check: should definitely not be our *own* pin */
if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
elog(ERROR, "buffer is pinned in InvalidateBuffer"); elog(ERROR, "buffer is pinned in InvalidateBuffer");
WaitIO(buf); // WaitIO(buf);
pg_usleep(1000L);
goto retry; goto retry;
} }
@ -5077,7 +5078,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
buf_state |= set_flag_bits; if (!(IsBootstrapProcessingMode() == true || InitdbSingle == true) && (InRecovery || EnableHotStandby) && set_flag_bits == BM_VALID)
{
XLogRecPtr pageLsn = BufferGetLSN(buf);
XLogRecPtr replayLsn = GetXLogReplayRecPtr(NULL);
LsnNode *head = GetLogIndexByPage(&buf->tag, pageLsn, replayLsn);
if (head->next != NULL)
buf_state &= ~BM_VALID;
FreeLsnNode(head);
} else {
buf_state |= set_flag_bits;
}
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf, buf_state);
if (!bulk_io_is_in_progress) if (!bulk_io_is_in_progress)
{ {

View File

@ -603,7 +603,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
if(mem_tbl->meta.max_lsn < start_lsn) if(mem_tbl->meta.max_lsn < start_lsn)
{ {
continue; continue;
}else if(mem_tbl->meta.min_lsn >= end_lsn) }else if(mem_tbl->meta.min_lsn > end_lsn)
{ {
// there is no suitability lsn_list after this mem table // there is no suitability lsn_list after this mem table
LWLockRelease(LogIndexMemListLock); LWLockRelease(LogIndexMemListLock);
@ -628,7 +628,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]); page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]);
uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1); uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1);
page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]); page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]);
if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn) if(page_min_lsn > end_lsn || page_max_lsn < start_lsn)
{ {
continue; continue;
} }
@ -646,7 +646,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[tbl_index]); LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[tbl_index]);
// current mem table no suitability lsn_list // current mem table no suitability lsn_list
if(!(mem_tbl->meta.max_lsn < start_lsn || mem_tbl->meta.min_lsn >= end_lsn)) if(!(mem_tbl->meta.max_lsn < start_lsn || mem_tbl->meta.min_lsn > end_lsn))
{ {
end_lsn = Min(end_lsn, mem_tbl->meta.max_lsn); end_lsn = Min(end_lsn, mem_tbl->meta.max_lsn);
head_node->tag.lsn = end_lsn; head_node->tag.lsn = end_lsn;
@ -665,7 +665,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]); page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]);
uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1); uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1);
page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]); page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]);
if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn) if(page_min_lsn > end_lsn || page_max_lsn < start_lsn)
{ {
continue; continue;
} }

View File

@ -781,6 +781,8 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
GetPageFromCurrentNode(pageKey, bufrd); GetPageFromCurrentNode(pageKey, bufrd);
count = bufrd->count; count = bufrd->count;
// elog(LOG, "read page from local rel %d, fork %d, blk %d, nbytes %d, replaylsn %X/%X",
// pageTag.rnode.relNode, pageTag.forkNum, pageTag.blockNum, count, LSN_FORMAT_ARGS(lsn));
} }
if (count > 0) if (count > 0)
@ -838,10 +840,12 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
WalLdPageKey wlpk; WalLdPageKey wlpk;
wlpk.sk.dbid = pageKey.relfileNode.dbNode; wlpk.sk.dbid = pageKey.relfileNode.dbNode;
wlpk.sk.relid = pageKey.relfileNode.relNode; wlpk.sk.relid = pageKey.relfileNode.relNode;
wlpk.pageLsn = pageKey.pageLsn; wlpk.sk.forkno = pageKey.forkNo;
wlpk.sk.blkno = pageKey.blkNo;
wlpk.pageLsn = SwapLsn(pageKey.pageLsn);
wlpk.partition = 0; wlpk.partition = 0;
result.count = 0; result.count = 0;
result = GetWalFromLocalBuffer(&wlpk); result = GetWalFromLocalBuffer(&wlpk, lsn);
if (result.count == 0) if (result.count == 0)
{ {

View File

@ -484,7 +484,7 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
// return; // return;
// } // }
if ((push_standby != true && (EnableHotStandby != true || *isPromoteIsTriggered)) || IsBootstrapProcessingMode() || InitdbSingle || he3mirror) { if (!he3share || (push_standby != true && (EnableHotStandby != true || *isPromoteIsTriggered)) || IsBootstrapProcessingMode() || InitdbSingle || he3mirror) {
smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum, smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
buffer, skipFsync); buffer, skipFsync);
// elog(LOG,"smgrextend reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum); // elog(LOG,"smgrextend reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);

View File

@ -379,7 +379,7 @@ ProcessSyncRequests(void)
* all. (We delay checking until this point so that changing fsync on * all. (We delay checking until this point so that changing fsync on
* the fly behaves sensibly.) * the fly behaves sensibly.)
*/ */
if (enableFsync) if (enableFsync && push_standby)
{ {
/* /*
* If in checkpointer, we want to absorb pending requests every so * If in checkpointer, we want to absorb pending requests every so

View File

@ -1155,13 +1155,15 @@ XLogRecPtr He3DBQueryMinLsnFromAllStanby()
{ {
int i; int i;
XLogRecPtr minApplyLsn = 0; XLogRecPtr minApplyLsn = 0;
int procpid = -1; int *procpids;
int maxid = 0;
procpids = (int *) malloc(max_wal_senders * sizeof(int));
for (i = 0; i < NumBackendStatSlots; i++) for (i = 0; i < NumBackendStatSlots; i++)
{ {
if (strcmp(BackendStatusArray[i].st_appname, "pgmirror") == 0) if (strcmp(BackendStatusArray[i].st_appname, "pgmirror") == 0 || memcmp(BackendStatusArray[i].st_appname, "priv", 4) == 0)
{ {
procpid = BackendStatusArray[i].st_procpid; procpids[maxid] = BackendStatusArray[i].st_procpid;
break; maxid++;
} }
} }
Assert(WalSndCtl != NULL); Assert(WalSndCtl != NULL);
@ -1180,12 +1182,23 @@ XLogRecPtr He3DBQueryMinLsnFromAllStanby()
pid = walsnd->pid; pid = walsnd->pid;
apply = walsnd->apply; apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
if (pid != procpid) int j;
bool exist = false;
for (j = 0; j < maxid; j++)
{
if (pid == procpids[j])
{
exist = true;
break;
}
}
if (!exist)
{ {
if (apply < minApplyLsn || minApplyLsn == 0) if (apply < minApplyLsn || minApplyLsn == 0)
minApplyLsn = apply; minApplyLsn = apply;
} }
} }
free(procpids);
return minApplyLsn; return minApplyLsn;
} }

View File

@ -607,6 +607,8 @@ char *pgstat_temp_directory;
char *application_name; char *application_name;
bool push_standby = false; bool push_standby = false;
bool he3share = true;
bool mpush = false;
bool he3_point_in_time_recovery; bool he3_point_in_time_recovery;
bool he3mirror = false; bool he3mirror = false;
bool pgmirror = false; bool pgmirror = false;
@ -2137,6 +2139,24 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL NULL, NULL, NULL
}, },
{
{"he3share", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
gettext_noop("Sets storage is shared if he3share is configured true."),
},
&he3share,
true,
NULL, NULL, NULL
},
{
{"mpush", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
gettext_noop("Sets push_standby is belong to master if mpush is configured true."),
},
&mpush,
false,
NULL, NULL, NULL
},
{ {
{"he3_point_in_time_recovery", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, {"he3_point_in_time_recovery", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
gettext_noop("Sets whether we are in he3 pitr"), gettext_noop("Sets whether we are in he3 pitr"),

View File

@ -119,7 +119,7 @@ extern void storeWalInLocalBuffer(kvStruct *ks,int32 length);
extern void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer); // when evict one page out databuffer, we should call this to store the page. extern void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer); // when evict one page out databuffer, we should call this to store the page.
extern void GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd); // async delete old version page and wal. we should call this when move page from ld/sdb to db. extern void GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd); // async delete old version page and wal. we should call this when move page from ld/sdb to db.
extern Bufrd GetWalFromLd(PageKey *pk); extern Bufrd GetWalFromLd(PageKey *pk);
extern Bufrd GetWalFromLocalBuffer(WalLdPageKey *pk); extern Bufrd GetWalFromLocalBuffer(WalLdPageKey *pk, uint64_t replyLsn);
extern void AddOneItemToDPArray(OriginDPageKey pk); extern void AddOneItemToDPArray(OriginDPageKey pk);
extern void SecondBufferMain(void); extern void SecondBufferMain(void);
extern void ClosePageDBEnv(void); extern void ClosePageDBEnv(void);
@ -128,3 +128,4 @@ extern void CloseWalEnv(void);
extern void CreateSecondBufferLWLocks(void); extern void CreateSecondBufferLWLocks(void);
extern Size SecondBufferLWLockShmemSize(void); extern Size SecondBufferLWLockShmemSize(void);
extern Size SecondBufferShmemSize(void); extern Size SecondBufferShmemSize(void);
extern uint64_t SwapLsn(uint64_t lsn);

View File

@ -277,6 +277,8 @@ extern PGDLLIMPORT bool push_standby;
extern PGDLLIMPORT bool he3_point_in_time_recovery; extern PGDLLIMPORT bool he3_point_in_time_recovery;
extern PGDLLIMPORT bool he3mirror; extern PGDLLIMPORT bool he3mirror;
extern PGDLLIMPORT bool pgmirror; extern PGDLLIMPORT bool pgmirror;
extern PGDLLIMPORT bool he3share;
extern PGDLLIMPORT bool mpush;
extern int tcp_keepalives_idle; extern int tcp_keepalives_idle;