Merge branch 'dev_performance' of gitee.com:he3db/he3pg into dev_performance

This commit is contained in:
shenzhengntu 2023-06-09 16:57:07 +08:00
commit a865a7669d
7 changed files with 299 additions and 39 deletions

View File

@ -0,0 +1,44 @@
#!/bin/bash
export PATH=/home/postgres/psql14/bin:$PATH
export PGDATABASE=postgres
export PGHOST=127.0.0.1
export PGUSER=postgres
export PGPORT=15433
export PGPASSWORD=123456
slaveDataDir=/home/postgres/slavedata/pgdata
slavepushDataDir=/home/postgres/slavepushdata/pgdata
sed -i 's/^he3share/#he3share/g' $slaveDataDir/postgresql.auto.conf
sed -i 's/^he3share/#he3share/g' $slaveDataDir/postgresql.conf
sed -i 's/^hot_standby/#hot_standby/g' $slaveDataDir/postgresql.conf
sed -i 's/^primary_conninfo/#primary_conninfo/g' $slaveDataDir/postgresql.auto.conf
sed -i 's/^primary_conninfo/#primary_conninfo/g' $slaveDataDir/postgresql.conf
echo -e "he3share = on" >> $slaveDataDir/postgresql.conf
echo -e "hot_standby=off" >> $slaveDataDir/postgresql.conf
sed -i 's/^mpush/#mpush/g' $slavepushDataDir/postgresql.conf
sed -i 's/^mpush/#mpush/g' $slavepushDataDir/postgresql.auto.conf
echo -e "mpush=on" >> $slavepushDataDir/postgresql.conf
psql -c 'SELECT pg_promote(true, 30)'
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB slave instance promote failed!"
exit 1
fi
pg_ctl -D $slaveDataDir reload
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB slave instance reload failed!"
exit 1
fi
pg_ctl -D $slavepushDataDir reload
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB push instance reload failed!"
exit 1
fi

View File

@ -5808,7 +5808,7 @@ static void
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
XLogRecPtr lsn,
RepOriginId origin_id)
RepOriginId origin_id, XLogRecPtr startlsn)
{
TransactionId max_xid;
TimestampTz commit_time;
@ -5916,6 +5916,15 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
if (push_standby)
{
pushTikv(0, hashMapSize(), true);
} else {
XLogRecPtr consistPtr;
consistPtr = GetXLogPushToDisk();
while (consistPtr < startlsn)
{
pg_usleep(100000L);
elog(LOG, "standby consist lsn %ld, commit lsn %ld", consistPtr, startlsn);
consistPtr = GetXLogPushToDisk();
}
}
/* Make sure files supposed to be dropped are dropped */
@ -5957,7 +5966,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
*/
static void
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
XLogRecPtr lsn, RepOriginId origin_id)
XLogRecPtr lsn, RepOriginId origin_id, XLogRecPtr startlsn)
{
TransactionId max_xid;
@ -6025,6 +6034,15 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
if (push_standby)
{
pushTikv(0, hashMapSize(), true);
} else {
XLogRecPtr consistPtr;
consistPtr = GetXLogPushToDisk();
while (consistPtr < startlsn)
{
pg_usleep(100000L);
elog(LOG, "standby consist lsn %ld, abort lsn %ld", consistPtr, startlsn);
consistPtr = GetXLogPushToDisk();
}
}
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
@ -6046,7 +6064,7 @@ xact_redo(XLogReaderState *record)
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_commit(&parsed, XLogRecGetXid(record),
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
}
else if (info == XLOG_XACT_COMMIT_PREPARED)
{
@ -6055,7 +6073,7 @@ xact_redo(XLogReaderState *record)
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_commit(&parsed, parsed.twophase_xid,
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
@ -6069,7 +6087,7 @@ xact_redo(XLogReaderState *record)
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_abort(&parsed, XLogRecGetXid(record),
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
@ -6078,7 +6096,7 @@ xact_redo(XLogReaderState *record)
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_abort(&parsed, parsed.twophase_xid,
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

View File

@ -8151,6 +8151,14 @@ void StartupXLOG(void)
startupPid = getpid();
if(IsBootstrapProcessingMode() != true && InitdbSingle != true) {
initPthreadPool();
if ((EnableHotStandby && *isPromoteIsTriggered == false && !push_standby))
{
pthread_t ntid;
int err;
err = pthread_create(&ntid, NULL, CleanWalsInLmdb, NULL);
if (err != 0)
elog(PANIC,"pthread_create CleanWalsInLmdb failed %s",strerror(err));
}
}
/*
@ -9322,6 +9330,20 @@ void StartupXLOG(void)
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
pushTikv(0,hashMapSize(),true);
}
} else {
RmgrId rmgrId = XLogRecGetRmid(xlogreader);
uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
XLogRecPtr consistPtr, startlsn;
consistPtr = GetXLogPushToDisk();
startlsn = record->xl_end - record->xl_tot_len;
while (consistPtr < startlsn)
{
pg_usleep(100000L);
elog(LOG, "standby consist lsn %ld, truncate lsn %ld", consistPtr, startlsn);
consistPtr = GetXLogPushToDisk();
}
}
}
RmgrTable[record->xl_rmid].rm_redo(xlogreader);
hasReplay = true;
@ -14519,15 +14541,15 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
bool pushFlag = false;
do {
if (gRingBufferManger->maxIdx > spaceNum/16 || timeCount > 1000 || pushFlag == true) {
if (push_standby == true || EnableHotStandby == false || *isPromoteIsTriggered) {
if (push_standby == true || EnableHotStandby == false) {
pushTikv(0, hashMapSize(), true);
}
if (gRingBufferManger->maxIdx != 0 ) {
ring_buffer_dequeue_arr(gRingBufferManger,gRingBufferManger->maxIdx);
gRingBufferManger->maxIdx = 0;
if (pushFlag == true) {
return -1;
}
return -1;
}
}
}
if (1 == ring_buffer_peek(gRingBufferManger,&FirstData,gRingBufferManger->maxIdx)) {
@ -14541,15 +14563,15 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
} else if (startPtr > FirstData->startLsn) {
if (FirstData->startLsn == 0) {
if (he3mirror) {
if (FirstData->checkPointLsn != 0) {
SetFileReplayLsn(FirstData->checkPointLsn);
}
if (FirstData->checkPointLsn != 0) {
SetFileReplayLsn(FirstData->checkPointLsn);
}
}
gRingBufferManger->maxIdx++;
pushFlag = true;
}
// for master startup donot know wal endLsn,default failed to get 8 times of lsn is wal file end
if (!EnableHotStandby || *isPromoteIsTriggered) {
if (!EnableHotStandby || LocalPromoteIsTriggered) {
WalTaskImmediateFree();
}
continue;
@ -14693,7 +14715,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
XLogRecPtr maxFlushedUpto = 0;
//storage batch is 16k batch,we has 8 of pthreads
if (!EnableHotStandby || *isPromoteIsTriggered) {
if (!EnableHotStandby || LocalPromoteIsTriggered) {
maxFlushedUpto = readedUpto + 8 * 16 * 1024;
} else {
if (readedUpto < flushedUpto) {

View File

@ -29,7 +29,11 @@
#include <stdlib.h>
#include <time.h>
#include "storage/bufmgr.h"
#include <sys/socket.h>
#include <sys/un.h>
const char *socketfile = "/tmp/he3cleanwal";
#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8))
typedef struct SingleKeyArray
{
@ -59,6 +63,9 @@ typedef struct DPageKeyArray
/*
secondbufferhash code
*/
static void CleanWalsByPage(WalLdPageKey *walkey);
static HTAB *SecondBufferHash = NULL;
extern bool EnableHotStandby;
DPageKeyArray *DPArray = NULL;
@ -478,7 +485,7 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer)
}
uint64_t
SwapLsn(uint64_t lsn) {
SwapLsnFromLittleToBig(uint64_t lsn) {
#ifndef WORDS_BIGENDIAN
/* trans lsn from little endian to big endian in memory
* eg: 0x12345678 ===> 0x78563412
@ -498,6 +505,27 @@ SwapLsn(uint64_t lsn) {
return lsn;
}
uint64_t
SwapLsnFromBigToLittle(uint64_t lsn) {
#ifndef WORDS_BIGENDIAN
/* trans lsn from big endian to little endian in memory
* eg: 0x78563412 ===> 0x12345678
*/
uint32 low, high;
low = (uint32) (lsn);
high = (uint32) ((lsn) >> 32);
low = (low << 16) | (low >> 16);
low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF);
high = (high << 16) | (high >> 16);
high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF);
return ((uint64)(low)) << 32 | (uint64)(high);
#endif
return lsn;
}
Bufrd
GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
{
@ -511,7 +539,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
data.mv_size = 0;
data.mv_data = NULL;
key.mv_size = sizeof(WalLdPageKey);
key.mv_size = SizeOfCleanWal;
key.mv_data = wpk;
int waldatalen = 0, roomlen = 2048;
uint32 dbid,relid,forkno,blkno;
@ -538,11 +566,11 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
ereport(PANIC,errmsg("mdb_txn_open failed,error is:%d",co));
}
// ereport(LOG,errmsg("535 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE)) != 0)
{
ereport(LOG, errmsg("mdb_txn_get failed,error is:%d, dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
cg, dbid, relid, forkno, blkno, SwapLsn(wpk->pageLsn), wpk->partition));
cg, dbid, relid, forkno, blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
bufrd.buf = waldata;
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
@ -552,12 +580,12 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
// ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
while (wpk->sk.dbid == dbid
&& wpk->sk.relid == relid
&& wpk->sk.forkno == forkno
&& wpk->sk.blkno == blkno
&& SwapLsn(wpk->pageLsn) < replyLsn)
&& SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn)
{
memcpy(waldata + waldatalen,data.mv_data,data.mv_size);
waldatalen += data.mv_size;
@ -580,7 +608,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
wpk = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
}
@ -660,7 +688,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
| (uint32_t)(buf[2] << 16)
| (uint32_t)(buf[3] << 24);
key.mv_size = sizeof(WalLdPageKey);
key.mv_size = SizeOfCleanWal;
wlpk.sk = ks[i].lpk.sk;
if (totallen > 511)
@ -672,7 +700,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = 511;
data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(511);
wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
wlpk.partition = part;
memcpy(xlogContent,buf + (part * 511), 511); //502 = 511 - 9
@ -683,8 +711,8 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0)
{
ereport(PANIC,errmsg("651 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsn(wlpk.pageLsn), wlpk.partition));
ereport(LOG,errmsg("mdb_txn_put big wal failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition));
if (cp == MDB_KEYEXIST)
{
break;
@ -698,7 +726,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = totallen;
data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(totallen);
wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
wlpk.partition = part;
key.mv_data = &wlpk;
memcpy(xlogContent,buf + (part * 511),totallen);
@ -706,7 +734,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0)
{
ereport(LOG,errmsg("673 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d",
ereport(LOG,errmsg("mdb_txn_put last of big wal failed,error is:%d, rel %d, forkno %d, blk %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
if (cp == MDB_KEYEXIST)
{
@ -724,7 +752,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = totallen;
data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(totallen);
wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
wlpk.partition = 0;
key.mv_data = &wlpk;
memcpy(xlogContent,buf,totallen);
@ -732,14 +760,14 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0)
{
ereport(LOG,errmsg("730 mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
ereport(LOG,errmsg("mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno,
SwapLsn(wlpk.pageLsn), wlpk.partition));
SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition));
if (cp == MDB_KEYEXIST)
{
cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET);
if (cp != 0)
ereport(LOG,errmsg(" 737 mdb_txn_get failed,error is:%d, rel %d, forkno %d, blk %d",
ereport(LOG,errmsg(" mdb_txn_get failed when put exist,error is:%d, rel %d, forkno %d, blk %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
continue;
}
@ -797,13 +825,15 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
wlpk.sk.relid = pk.relfileNode.relNode;
wlpk.sk.forkno = pk.forkNo;
wlpk.sk.blkno = pk.blkNo;
wlpk.pageLsn = SwapLsn(pk.pageLsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(pk.pageLsn);
wlpk.partition = 0;
Bufrd waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn);
if (waldata.count > 0)
{
bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count);
memcpy(bufrd->buf + 8192, waldata.buf,waldata.count);
wlpk.pageLsn = SwapLsnFromLittleToBig(pk.replyLsn);
SendInvalWal(&wlpk);
}
bufrd->cap = bufrd->count = 8192 + waldata.count;
free(waldata.buf);
@ -811,6 +841,148 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
}
}
void SendInvalWal(WalLdPageKey *walkey) {
int sock_fd;
struct sockaddr_un un;
un.sun_family = AF_UNIX;
strcpy(un.sun_path, socketfile);
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock_fd < 0)
{
elog(WARNING, "request socket failed");
return;
}
if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
{
elog(WARNING, "connect socket failed");
return;
}
// elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
send(sock_fd, walkey, SizeOfCleanWal, 0);
// elog(LOG, "send success");
close(sock_fd);
return;
}
void *CleanWalsInLmdb(void *arg) {
int fd, new_fd;
struct sockaddr_un un;
static char data_buf[SizeOfCleanWal];
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
elog(PANIC, "request cleanwal socket failed");
un.sun_family = AF_UNIX;
unlink(socketfile);
strcpy(un.sun_path, socketfile);
if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0)
elog(PANIC, "bind cleanwal socket failed");
if (listen(fd, MaxBackends+8) < 0)
elog(PANIC, "listen cleanwal socket failed");
while(1) {
new_fd = accept(fd, NULL, NULL);
if (new_fd < 0)
{
close(fd);
unlink(socketfile);
elog(PANIC, "cannot accept client connect request");
}
memset(data_buf, 0, SizeOfCleanWal);
read(new_fd, data_buf, SizeOfCleanWal);
WalLdPageKey *wpk = (WalLdPageKey *)data_buf;
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
CleanWalsByPage(wpk);
close(new_fd);
}
close(fd);
unlink(socketfile);
}
static void
CleanWalsByPage(WalLdPageKey *walkey)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key, data;
int success = -1;
uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn);
walkey->pageLsn = 0;
key.mv_size = SizeOfCleanWal;
key.mv_data = walkey;
data.mv_size = 0;
data.mv_data = NULL;
uint32 dbid,relid,forkno,blkno;
dbid = walkey->sk.dbid;
relid = walkey->sk.relid;
forkno = walkey->sk.forkno;
blkno = walkey->sk.blkno;
success = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
if (success != 0)
{
elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
if (success != 0)
{
elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE);
if (success != 0)
{
ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld",
success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn)));
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return;
}
walkey = (WalLdPageKey *)key.mv_data;
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno
&& walkey->sk.blkno == blkno
&& SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
{
// elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
success = mdb_cursor_del(tmpcursor,0);
if (success != 0)
elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
break;
}
walkey = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
tmpcursor = NULL;
tmptxn = NULL;
return;
}
static void *
RemovePageOrWalFromCurrentNode()
{
@ -1033,8 +1205,7 @@ MovePageFromSecondBufferToLocalBuffer()
data.mv_size = 8192;
data.mv_data = spv->pagecontent;
mp = mdb_put(tmptxn,pageDbi,&key,&data, MDB_NODUPDATA);
mp = mdb_put(tmptxn,pageDbi,&key,&data, 0);
LWLockRelease(partitionLock);
if (mp != 0)
{

View File

@ -5086,7 +5086,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
if (!(IsBootstrapProcessingMode() == true || InitdbSingle == true) && (InRecovery || EnableHotStandby) && set_flag_bits == BM_VALID)
if (!(IsBootstrapProcessingMode() == true || InitdbSingle == true) && (InRecovery || (EnableHotStandby && *isPromoteIsTriggered == false)) && set_flag_bits == BM_VALID)
{
XLogRecPtr pageLsn = BufferGetLSN(buf);
XLogRecPtr replayLsn = GetXLogReplayRecPtr(NULL);

View File

@ -773,7 +773,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
pageKey.relfileNode.relNode = pageTag.rnode.relNode;
pageKey.forkNo = (uint32)pageTag.forkNum;
pageKey.blkNo = pageTag.blockNum;
pageKey.pageLsn = 0;
pageKey.pageLsn = GetXLogPushToDisk();
pageKey.replyLsn = lsn;
odpk.pk = pageKey;
@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
return nbytes;
}
pageKey.pageLsn = PageGetLSN(*buffer);
pageKey.pageLsn = GetXLogPushToDisk();
pageKey.replyLsn = lsn;
@ -842,7 +842,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
wlpk.sk.relid = pageKey.relfileNode.relNode;
wlpk.sk.forkno = pageKey.forkNo;
wlpk.sk.blkno = pageKey.blkNo;
wlpk.pageLsn = SwapLsn(pageKey.pageLsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(pageKey.pageLsn);
wlpk.partition = 0;
result.count = 0;
result = GetWalFromLocalBuffer(&wlpk, lsn);
@ -857,6 +857,8 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
nbytes += result.count;
*buffer = (uint8_t *)realloc(*buffer, BLCKSZ + result.count);
memcpy((*buffer) + BLCKSZ, result.buf, result.count);
wlpk.pageLsn = SwapLsnFromLittleToBig(lsn);
SendInvalWal(&wlpk);
// TODO free result
free_dataRead(result.buf, result.count, result.cap);
}

View File

@ -128,4 +128,7 @@ extern void CloseWalEnv(void);
extern void CreateSecondBufferLWLocks(void);
extern Size SecondBufferLWLockShmemSize(void);
extern Size SecondBufferShmemSize(void);
extern uint64_t SwapLsn(uint64_t lsn);
extern uint64_t SwapLsnFromLittleToBig(uint64_t lsn);
extern uint64_t SwapLsnFromBigToLittle(uint64_t lsn);
extern void SendInvalWal(WalLdPageKey *walkey);
extern void *CleanWalsInLmdb(void *arg);