diff --git a/script/activehe3pgfromhe3pgforprivate.sh b/script/activehe3pgfromhe3pgforprivate.sh new file mode 100644 index 0000000..b78f566 --- /dev/null +++ b/script/activehe3pgfromhe3pgforprivate.sh @@ -0,0 +1,44 @@ +#!/bin/bash +export PATH=/home/postgres/psql14/bin:$PATH +export PGDATABASE=postgres +export PGHOST=127.0.0.1 +export PGUSER=postgres +export PGPORT=15433 +export PGPASSWORD=123456 +slaveDataDir=/home/postgres/slavedata/pgdata +slavepushDataDir=/home/postgres/slavepushdata/pgdata + +sed -i 's/^he3share/#he3share/g' $slaveDataDir/postgresql.auto.conf +sed -i 's/^he3share/#he3share/g' $slaveDataDir/postgresql.conf +sed -i 's/^hot_standby/#hot_standby/g' $slaveDataDir/postgresql.conf +sed -i 's/^primary_conninfo/#primary_conninfo/g' $slaveDataDir/postgresql.auto.conf +sed -i 's/^primary_conninfo/#primary_conninfo/g' $slaveDataDir/postgresql.conf + +echo -e "he3share = on" >> $slaveDataDir/postgresql.conf +echo -e "hot_standby=off" >> $slaveDataDir/postgresql.conf + +sed -i 's/^mpush/#mpush/g' $slavepushDataDir/postgresql.conf +sed -i 's/^mpush/#mpush/g' $slavepushDataDir/postgresql.auto.conf + +echo -e "mpush=on" >> $slavepushDataDir/postgresql.conf + +psql -c 'SELECT pg_promote(true, 30)' +if [ $? -ne 0 ] +then + echo "$(date "+%F %T"): He3DB slave instance promote failed!" + exit 1 +fi + +pg_ctl -D $slaveDataDir reload +if [ $? -ne 0 ] +then + echo "$(date "+%F %T"): He3DB slave instance reload failed!" + exit 1 +fi + +pg_ctl -D $slavepushDataDir reload +if [ $? -ne 0 ] +then + echo "$(date "+%F %T"): He3DB push instance reload failed!" + exit 1 +fi \ No newline at end of file diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 00f3a63..0d096de 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5808,7 +5808,7 @@ static void xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionId xid, XLogRecPtr lsn, - RepOriginId origin_id) + RepOriginId origin_id, XLogRecPtr startlsn) { TransactionId max_xid; TimestampTz commit_time; @@ -5916,6 +5916,15 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (push_standby) { pushTikv(0, hashMapSize(), true); + } else { + XLogRecPtr consistPtr; + consistPtr = GetXLogPushToDisk(); + while (consistPtr < startlsn) + { + pg_usleep(100000L); + elog(LOG, "standby consist lsn %ld, commit lsn %ld", consistPtr, startlsn); + consistPtr = GetXLogPushToDisk(); + } } /* Make sure files supposed to be dropped are dropped */ @@ -5957,7 +5966,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, */ static void xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, - XLogRecPtr lsn, RepOriginId origin_id) + XLogRecPtr lsn, RepOriginId origin_id, XLogRecPtr startlsn) { TransactionId max_xid; @@ -6025,6 +6034,15 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, if (push_standby) { pushTikv(0, hashMapSize(), true); + } else { + XLogRecPtr consistPtr; + consistPtr = GetXLogPushToDisk(); + while (consistPtr < startlsn) + { + pg_usleep(100000L); + elog(LOG, "standby consist lsn %ld, abort lsn %ld", consistPtr, startlsn); + consistPtr = GetXLogPushToDisk(); + } } DropRelationFiles(parsed->xnodes, parsed->nrels, true); @@ -6046,7 +6064,7 @@ xact_redo(XLogReaderState *record) ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); } else if (info == XLOG_XACT_COMMIT_PREPARED) { @@ -6055,7 +6073,7 @@ xact_redo(XLogReaderState *record) ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); @@ -6069,7 +6087,7 @@ xact_redo(XLogReaderState *record) ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_abort(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); } else if (info == XLOG_XACT_ABORT_PREPARED) { @@ -6078,7 +6096,7 @@ xact_redo(XLogReaderState *record) ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_abort(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a5d5898..af31ce9 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8151,6 +8151,14 @@ void StartupXLOG(void) startupPid = getpid(); if(IsBootstrapProcessingMode() != true && InitdbSingle != true) { initPthreadPool(); + if ((EnableHotStandby && *isPromoteIsTriggered == false && !push_standby)) + { + pthread_t ntid; + int err; + err = pthread_create(&ntid, NULL, CleanWalsInLmdb, NULL); + if (err != 0) + elog(PANIC,"pthread_create CleanWalsInLmdb failed %s",strerror(err)); + } } /* @@ -9322,6 +9330,20 @@ void StartupXLOG(void) if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) { pushTikv(0,hashMapSize(),true); } + } else { + RmgrId rmgrId = XLogRecGetRmid(xlogreader); + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) { + XLogRecPtr consistPtr, startlsn; + consistPtr = GetXLogPushToDisk(); + startlsn = record->xl_end - record->xl_tot_len; + while (consistPtr < startlsn) + { + pg_usleep(100000L); + elog(LOG, "standby consist lsn %ld, truncate lsn %ld", consistPtr, startlsn); + consistPtr = GetXLogPushToDisk(); + } + } } RmgrTable[record->xl_rmid].rm_redo(xlogreader); hasReplay = true; @@ -14519,15 +14541,15 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL bool pushFlag = false; do { if (gRingBufferManger->maxIdx > spaceNum/16 || timeCount > 1000 || pushFlag == true) { - if (push_standby == true || EnableHotStandby == false || *isPromoteIsTriggered) { + if (push_standby == true || EnableHotStandby == false) { pushTikv(0, hashMapSize(), true); } if (gRingBufferManger->maxIdx != 0 ) { ring_buffer_dequeue_arr(gRingBufferManger,gRingBufferManger->maxIdx); gRingBufferManger->maxIdx = 0; if (pushFlag == true) { - return -1; - } + return -1; + } } } if (1 == ring_buffer_peek(gRingBufferManger,&FirstData,gRingBufferManger->maxIdx)) { @@ -14541,15 +14563,15 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL } else if (startPtr > FirstData->startLsn) { if (FirstData->startLsn == 0) { if (he3mirror) { - if (FirstData->checkPointLsn != 0) { - SetFileReplayLsn(FirstData->checkPointLsn); - } + if (FirstData->checkPointLsn != 0) { + SetFileReplayLsn(FirstData->checkPointLsn); + } } gRingBufferManger->maxIdx++; pushFlag = true; } // for master startup donot know wal endLsn,default failed to get 8 times of lsn is wal file end - if (!EnableHotStandby || *isPromoteIsTriggered) { + if (!EnableHotStandby || LocalPromoteIsTriggered) { WalTaskImmediateFree(); } continue; @@ -14693,7 +14715,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, XLogRecPtr maxFlushedUpto = 0; //storage batch is 16k batch,we has 8 of pthreads - if (!EnableHotStandby || *isPromoteIsTriggered) { + if (!EnableHotStandby || LocalPromoteIsTriggered) { maxFlushedUpto = readedUpto + 8 * 16 * 1024; } else { if (readedUpto < flushedUpto) { diff --git a/src/backend/postmaster/secondbuffer.c b/src/backend/postmaster/secondbuffer.c index 04894fe..5bb031e 100644 --- a/src/backend/postmaster/secondbuffer.c +++ b/src/backend/postmaster/secondbuffer.c @@ -29,7 +29,11 @@ #include #include #include "storage/bufmgr.h" +#include +#include +const char *socketfile = "/tmp/he3cleanwal"; +#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8)) typedef struct SingleKeyArray { @@ -59,6 +63,9 @@ typedef struct DPageKeyArray /* secondbufferhash code */ + +static void CleanWalsByPage(WalLdPageKey *walkey); + static HTAB *SecondBufferHash = NULL; extern bool EnableHotStandby; DPageKeyArray *DPArray = NULL; @@ -478,7 +485,7 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer) } uint64_t -SwapLsn(uint64_t lsn) { +SwapLsnFromLittleToBig(uint64_t lsn) { #ifndef WORDS_BIGENDIAN /* trans lsn from little endian to big endian in memory * eg: 0x12345678 ===> 0x78563412 @@ -498,6 +505,27 @@ SwapLsn(uint64_t lsn) { return lsn; } +uint64_t +SwapLsnFromBigToLittle(uint64_t lsn) { + #ifndef WORDS_BIGENDIAN + /* trans lsn from big endian to little endian in memory + * eg: 0x78563412 ===> 0x12345678 + */ + + uint32 low, high; + low = (uint32) (lsn); + high = (uint32) ((lsn) >> 32); + + low = (low << 16) | (low >> 16); + low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF); + + high = (high << 16) | (high >> 16); + high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF); + return ((uint64)(low)) << 32 | (uint64)(high); + #endif + return lsn; +} + Bufrd GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) { @@ -511,7 +539,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) data.mv_size = 0; data.mv_data = NULL; - key.mv_size = sizeof(WalLdPageKey); + key.mv_size = SizeOfCleanWal; key.mv_data = wpk; int waldatalen = 0, roomlen = 2048; uint32 dbid,relid,forkno,blkno; @@ -538,11 +566,11 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) ereport(PANIC,errmsg("mdb_txn_open failed,error is:%d",co)); } // ereport(LOG,errmsg("535 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition)); + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE)) != 0) { ereport(LOG, errmsg("mdb_txn_get failed,error is:%d, dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - cg, dbid, relid, forkno, blkno, SwapLsn(wpk->pageLsn), wpk->partition)); + cg, dbid, relid, forkno, blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); bufrd.buf = waldata; mdb_cursor_close(tmpcursor); mdb_txn_abort(tmptxn); @@ -552,12 +580,12 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) // ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition)); + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); while (wpk->sk.dbid == dbid && wpk->sk.relid == relid && wpk->sk.forkno == forkno && wpk->sk.blkno == blkno - && SwapLsn(wpk->pageLsn) < replyLsn) + && SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn) { memcpy(waldata + waldatalen,data.mv_data,data.mv_size); waldatalen += data.mv_size; @@ -580,7 +608,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) wpk = (WalLdPageKey *)key.mv_data; // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition)); + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); } } @@ -660,7 +688,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) | (uint32_t)(buf[2] << 16) | (uint32_t)(buf[3] << 24); - key.mv_size = sizeof(WalLdPageKey); + key.mv_size = SizeOfCleanWal; wlpk.sk = ks[i].lpk.sk; if (totallen > 511) @@ -672,7 +700,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) data.mv_size = 511; data.mv_data = NULL; xlogContent = (uint8_t *)malloc(511); - wlpk.pageLsn = SwapLsn(ks[i].lsn); + wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn); wlpk.partition = part; memcpy(xlogContent,buf + (part * 511), 511); //502 = 511 - 9 @@ -683,8 +711,8 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); if (cp != 0) { - ereport(PANIC,errmsg("651 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", - cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsn(wlpk.pageLsn), wlpk.partition)); + ereport(LOG,errmsg("mdb_txn_put big wal failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition)); if (cp == MDB_KEYEXIST) { break; @@ -698,7 +726,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) data.mv_size = totallen; data.mv_data = NULL; xlogContent = (uint8_t *)malloc(totallen); - wlpk.pageLsn = SwapLsn(ks[i].lsn); + wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn); wlpk.partition = part; key.mv_data = &wlpk; memcpy(xlogContent,buf + (part * 511),totallen); @@ -706,7 +734,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); if (cp != 0) { - ereport(LOG,errmsg("673 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d", + ereport(LOG,errmsg("mdb_txn_put last of big wal failed,error is:%d, rel %d, forkno %d, blk %d", cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); if (cp == MDB_KEYEXIST) { @@ -724,7 +752,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) data.mv_size = totallen; data.mv_data = NULL; xlogContent = (uint8_t *)malloc(totallen); - wlpk.pageLsn = SwapLsn(ks[i].lsn); + wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn); wlpk.partition = 0; key.mv_data = &wlpk; memcpy(xlogContent,buf,totallen); @@ -732,14 +760,14 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); if (cp != 0) { - ereport(LOG,errmsg("730 mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", + ereport(LOG,errmsg("mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, - SwapLsn(wlpk.pageLsn), wlpk.partition)); + SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition)); if (cp == MDB_KEYEXIST) { cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET); if (cp != 0) - ereport(LOG,errmsg(" 737 mdb_txn_get failed,error is:%d, rel %d, forkno %d, blk %d", + ereport(LOG,errmsg(" mdb_txn_get failed when put exist,error is:%d, rel %d, forkno %d, blk %d", cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); continue; } @@ -797,13 +825,15 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) wlpk.sk.relid = pk.relfileNode.relNode; wlpk.sk.forkno = pk.forkNo; wlpk.sk.blkno = pk.blkNo; - wlpk.pageLsn = SwapLsn(pk.pageLsn); + wlpk.pageLsn = SwapLsnFromLittleToBig(pk.pageLsn); wlpk.partition = 0; Bufrd waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn); if (waldata.count > 0) { bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count); memcpy(bufrd->buf + 8192, waldata.buf,waldata.count); + wlpk.pageLsn = SwapLsnFromLittleToBig(pk.replyLsn); + SendInvalWal(&wlpk); } bufrd->cap = bufrd->count = 8192 + waldata.count; free(waldata.buf); @@ -811,6 +841,148 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) } } +void SendInvalWal(WalLdPageKey *walkey) { + int sock_fd; + struct sockaddr_un un; + un.sun_family = AF_UNIX; + strcpy(un.sun_path, socketfile); + sock_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock_fd < 0) + { + elog(WARNING, "request socket failed"); + return; + } + + if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0) + { + elog(WARNING, "connect socket failed"); + return; + } + // elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld", + // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn)); + send(sock_fd, walkey, SizeOfCleanWal, 0); + // elog(LOG, "send success"); + close(sock_fd); + return; +} + +void *CleanWalsInLmdb(void *arg) { + int fd, new_fd; + struct sockaddr_un un; + static char data_buf[SizeOfCleanWal]; + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + elog(PANIC, "request cleanwal socket failed"); + un.sun_family = AF_UNIX; + unlink(socketfile); + strcpy(un.sun_path, socketfile); + if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0) + elog(PANIC, "bind cleanwal socket failed"); + if (listen(fd, MaxBackends+8) < 0) + elog(PANIC, "listen cleanwal socket failed"); + while(1) { + new_fd = accept(fd, NULL, NULL); + if (new_fd < 0) + { + close(fd); + unlink(socketfile); + elog(PANIC, "cannot accept client connect request"); + } + memset(data_buf, 0, SizeOfCleanWal); + read(new_fd, data_buf, SizeOfCleanWal); + WalLdPageKey *wpk = (WalLdPageKey *)data_buf; + + // elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld", + // wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn)); + CleanWalsByPage(wpk); + close(new_fd); + } + close(fd); + unlink(socketfile); + +} + +static void +CleanWalsByPage(WalLdPageKey *walkey) +{ + MDB_txn *tmptxn; + MDB_cursor *tmpcursor; + MDB_val key, data; + int success = -1; + uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn); + + walkey->pageLsn = 0; + key.mv_size = SizeOfCleanWal; + key.mv_data = walkey; + + data.mv_size = 0; + data.mv_data = NULL; + + uint32 dbid,relid,forkno,blkno; + dbid = walkey->sk.dbid; + relid = walkey->sk.relid; + forkno = walkey->sk.forkno; + blkno = walkey->sk.blkno; + + success = mdb_txn_begin(walEnv, NULL, 0, &tmptxn); + if (success != 0) + { + elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success); + return; + } + + success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor); + if (success != 0) + { + elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success); + return; + } + + success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE); + + if (success != 0) + { + ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld", + success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn))); + mdb_cursor_close(tmpcursor); + mdb_txn_abort(tmptxn); + return; + } + + walkey = (WalLdPageKey *)key.mv_data; + // elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d", + // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition); + + while (walkey->sk.dbid == dbid + && walkey->sk.relid == relid + && walkey->sk.forkno == forkno + && walkey->sk.blkno == blkno + && SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn) + { + // elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d", + // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition); + success = mdb_cursor_del(tmpcursor,0); + if (success != 0) + elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld", + success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn)); + if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT)) + { + break; + } + + walkey = (WalLdPageKey *)key.mv_data; + // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); + } + + mdb_cursor_close(tmpcursor); + mdb_txn_commit(tmptxn); + tmpcursor = NULL; + tmptxn = NULL; + return; +} + + static void * RemovePageOrWalFromCurrentNode() { @@ -1033,8 +1205,7 @@ MovePageFromSecondBufferToLocalBuffer() data.mv_size = 8192; data.mv_data = spv->pagecontent; - - mp = mdb_put(tmptxn,pageDbi,&key,&data, MDB_NODUPDATA); + mp = mdb_put(tmptxn,pageDbi,&key,&data, 0); LWLockRelease(partitionLock); if (mp != 0) { diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 113b765..319fb19 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5086,7 +5086,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - if (!(IsBootstrapProcessingMode() == true || InitdbSingle == true) && (InRecovery || EnableHotStandby) && set_flag_bits == BM_VALID) + if (!(IsBootstrapProcessingMode() == true || InitdbSingle == true) && (InRecovery || (EnableHotStandby && *isPromoteIsTriggered == false)) && set_flag_bits == BM_VALID) { XLogRecPtr pageLsn = BufferGetLSN(buf); XLogRecPtr replayLsn = GetXLogReplayRecPtr(NULL); diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index e168ffd..a4d836b 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -773,7 +773,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo pageKey.relfileNode.relNode = pageTag.rnode.relNode; pageKey.forkNo = (uint32)pageTag.forkNum; pageKey.blkNo = pageTag.blockNum; - pageKey.pageLsn = 0; + pageKey.pageLsn = GetXLogPushToDisk(); pageKey.replyLsn = lsn; odpk.pk = pageKey; @@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo return nbytes; } - pageKey.pageLsn = PageGetLSN(*buffer); + pageKey.pageLsn = GetXLogPushToDisk(); pageKey.replyLsn = lsn; @@ -842,7 +842,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo wlpk.sk.relid = pageKey.relfileNode.relNode; wlpk.sk.forkno = pageKey.forkNo; wlpk.sk.blkno = pageKey.blkNo; - wlpk.pageLsn = SwapLsn(pageKey.pageLsn); + wlpk.pageLsn = SwapLsnFromLittleToBig(pageKey.pageLsn); wlpk.partition = 0; result.count = 0; result = GetWalFromLocalBuffer(&wlpk, lsn); @@ -857,6 +857,8 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo nbytes += result.count; *buffer = (uint8_t *)realloc(*buffer, BLCKSZ + result.count); memcpy((*buffer) + BLCKSZ, result.buf, result.count); + wlpk.pageLsn = SwapLsnFromLittleToBig(lsn); + SendInvalWal(&wlpk); // TODO free result free_dataRead(result.buf, result.count, result.cap); } diff --git a/src/include/postmaster/secondbuffer.h b/src/include/postmaster/secondbuffer.h index d2964fc..ae6b0ac 100644 --- a/src/include/postmaster/secondbuffer.h +++ b/src/include/postmaster/secondbuffer.h @@ -128,4 +128,7 @@ extern void CloseWalEnv(void); extern void CreateSecondBufferLWLocks(void); extern Size SecondBufferLWLockShmemSize(void); extern Size SecondBufferShmemSize(void); -extern uint64_t SwapLsn(uint64_t lsn); +extern uint64_t SwapLsnFromLittleToBig(uint64_t lsn); +extern uint64_t SwapLsnFromBigToLittle(uint64_t lsn); +extern void SendInvalWal(WalLdPageKey *walkey); +extern void *CleanWalsInLmdb(void *arg);