support clean wals in lmdb

Signed-off-by: shipixian <shipixian_yewu@cmss.chinamobile.com>
This commit is contained in:
shipixian 2023-06-08 17:14:00 +08:00
parent cba0c24fb2
commit 463b479c2f
4 changed files with 206 additions and 22 deletions

View File

@ -8151,6 +8151,14 @@ void StartupXLOG(void)
startupPid = getpid();
if(IsBootstrapProcessingMode() != true && InitdbSingle != true) {
initPthreadPool();
if ((EnableHotStandby && *isPromoteIsTriggered == false && !push_standby))
{
pthread_t ntid;
int err;
err = pthread_create(&ntid, NULL, CleanWalsInLmdb, NULL);
if (err != 0)
elog(PANIC,"pthread_create CleanWalsInLmdb failed %s",strerror(err));
}
}
/*

View File

@ -29,7 +29,11 @@
#include <stdlib.h>
#include <time.h>
#include "storage/bufmgr.h"
#include <sys/socket.h>
#include <sys/un.h>
const char *socketfile = "/tmp/he3cleanwal";
#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8))
typedef struct SingleKeyArray
{
@ -59,6 +63,9 @@ typedef struct DPageKeyArray
/*
secondbufferhash code
*/
static void CleanWalsByPage(WalLdPageKey *walkey);
static HTAB *SecondBufferHash = NULL;
extern bool EnableHotStandby;
DPageKeyArray *DPArray = NULL;
@ -478,7 +485,7 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer)
}
uint64_t
SwapLsn(uint64_t lsn) {
SwapLsnFromLittleToBig(uint64_t lsn) {
#ifndef WORDS_BIGENDIAN
/* trans lsn from little endian to big endian in memory
* eg: 0x12345678 ===> 0x78563412
@ -498,6 +505,27 @@ SwapLsn(uint64_t lsn) {
return lsn;
}
uint64_t
SwapLsnFromBigToLittle(uint64_t lsn) {
#ifndef WORDS_BIGENDIAN
/* trans lsn from big endian to little endian in memory
* eg: 0x78563412 ===> 0x12345678
*/
uint32 low, high;
low = (uint32) (lsn);
high = (uint32) ((lsn) >> 32);
low = (low << 16) | (low >> 16);
low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF);
high = (high << 16) | (high >> 16);
high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF);
return ((uint64)(low)) << 32 | (uint64)(high);
#endif
return lsn;
}
Bufrd
GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
{
@ -511,7 +539,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
data.mv_size = 0;
data.mv_data = NULL;
key.mv_size = sizeof(WalLdPageKey);
key.mv_size = SizeOfCleanWal;
key.mv_data = wpk;
int waldatalen = 0, roomlen = 2048;
uint32 dbid,relid,forkno,blkno;
@ -538,11 +566,11 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
ereport(PANIC,errmsg("mdb_txn_open failed,error is:%d",co));
}
// ereport(LOG,errmsg("535 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE)) != 0)
{
ereport(LOG, errmsg("mdb_txn_get failed,error is:%d, dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
cg, dbid, relid, forkno, blkno, SwapLsn(wpk->pageLsn), wpk->partition));
cg, dbid, relid, forkno, blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
bufrd.buf = waldata;
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
@ -552,12 +580,12 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
// ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
while (wpk->sk.dbid == dbid
&& wpk->sk.relid == relid
&& wpk->sk.forkno == forkno
&& wpk->sk.blkno == blkno
&& SwapLsn(wpk->pageLsn) < replyLsn)
&& SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn)
{
memcpy(waldata + waldatalen,data.mv_data,data.mv_size);
waldatalen += data.mv_size;
@ -580,7 +608,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
wpk = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition));
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
}
@ -660,7 +688,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
| (uint32_t)(buf[2] << 16)
| (uint32_t)(buf[3] << 24);
key.mv_size = sizeof(WalLdPageKey);
key.mv_size = SizeOfCleanWal;
wlpk.sk = ks[i].lpk.sk;
if (totallen > 511)
@ -672,7 +700,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = 511;
data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(511);
wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
wlpk.partition = part;
memcpy(xlogContent,buf + (part * 511), 511); //502 = 511 - 9
@ -683,8 +711,8 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0)
{
ereport(PANIC,errmsg("651 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsn(wlpk.pageLsn), wlpk.partition));
ereport(LOG,errmsg("mdb_txn_put big wal failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition));
if (cp == MDB_KEYEXIST)
{
break;
@ -698,7 +726,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = totallen;
data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(totallen);
wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
wlpk.partition = part;
key.mv_data = &wlpk;
memcpy(xlogContent,buf + (part * 511),totallen);
@ -706,7 +734,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0)
{
ereport(LOG,errmsg("673 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d",
ereport(LOG,errmsg("mdb_txn_put last of big wal failed,error is:%d, rel %d, forkno %d, blk %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
if (cp == MDB_KEYEXIST)
{
@ -724,7 +752,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
data.mv_size = totallen;
data.mv_data = NULL;
xlogContent = (uint8_t *)malloc(totallen);
wlpk.pageLsn = SwapLsn(ks[i].lsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn);
wlpk.partition = 0;
key.mv_data = &wlpk;
memcpy(xlogContent,buf,totallen);
@ -732,14 +760,14 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA);
if (cp != 0)
{
ereport(LOG,errmsg("730 mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
ereport(LOG,errmsg("mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d",
cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno,
SwapLsn(wlpk.pageLsn), wlpk.partition));
SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition));
if (cp == MDB_KEYEXIST)
{
cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET);
if (cp != 0)
ereport(LOG,errmsg(" 737 mdb_txn_get failed,error is:%d, rel %d, forkno %d, blk %d",
ereport(LOG,errmsg(" mdb_txn_get failed when put exist,error is:%d, rel %d, forkno %d, blk %d",
cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno));
continue;
}
@ -797,13 +825,15 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
wlpk.sk.relid = pk.relfileNode.relNode;
wlpk.sk.forkno = pk.forkNo;
wlpk.sk.blkno = pk.blkNo;
wlpk.pageLsn = SwapLsn(pk.pageLsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(pk.pageLsn);
wlpk.partition = 0;
Bufrd waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn);
if (waldata.count > 0)
{
bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count);
memcpy(bufrd->buf + 8192, waldata.buf,waldata.count);
wlpk.pageLsn = SwapLsnFromLittleToBig(pk.replyLsn);
SendInvalWal(&wlpk);
}
bufrd->cap = bufrd->count = 8192 + waldata.count;
free(waldata.buf);
@ -811,6 +841,148 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
}
}
void SendInvalWal(WalLdPageKey *walkey) {
int sock_fd;
struct sockaddr_un un;
un.sun_family = AF_UNIX;
strcpy(un.sun_path, socketfile);
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock_fd < 0)
{
elog(WARNING, "request socket failed");
return;
}
if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
{
elog(WARNING, "connect socket failed");
return;
}
// elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
send(sock_fd, walkey, SizeOfCleanWal, 0);
// elog(LOG, "send success");
close(sock_fd);
return;
}
void *CleanWalsInLmdb(void *arg) {
int fd, new_fd;
struct sockaddr_un un;
static char data_buf[SizeOfCleanWal];
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
elog(PANIC, "request cleanwal socket failed");
un.sun_family = AF_UNIX;
unlink(socketfile);
strcpy(un.sun_path, socketfile);
if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0)
elog(PANIC, "bind cleanwal socket failed");
if (listen(fd, MaxBackends+8) < 0)
elog(PANIC, "listen cleanwal socket failed");
while(1) {
new_fd = accept(fd, NULL, NULL);
if (new_fd < 0)
{
close(fd);
unlink(socketfile);
elog(PANIC, "cannot accept client connect request");
}
memset(data_buf, 0, SizeOfCleanWal);
read(new_fd, data_buf, SizeOfCleanWal);
WalLdPageKey *wpk = (WalLdPageKey *)data_buf;
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
CleanWalsByPage(wpk);
close(new_fd);
}
close(fd);
unlink(socketfile);
}
static void
CleanWalsByPage(WalLdPageKey *walkey)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key, data;
int success = -1;
uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn);
walkey->pageLsn = 0;
key.mv_size = SizeOfCleanWal;
key.mv_data = walkey;
data.mv_size = 0;
data.mv_data = NULL;
uint32 dbid,relid,forkno,blkno;
dbid = walkey->sk.dbid;
relid = walkey->sk.relid;
forkno = walkey->sk.forkno;
blkno = walkey->sk.blkno;
success = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
if (success != 0)
{
elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
if (success != 0)
{
elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE);
if (success != 0)
{
ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld",
success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn)));
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return;
}
walkey = (WalLdPageKey *)key.mv_data;
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno
&& walkey->sk.blkno == blkno
&& SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
{
// elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
success = mdb_cursor_del(tmpcursor,0);
if (success != 0)
elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
break;
}
walkey = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
tmpcursor = NULL;
tmptxn = NULL;
return;
}
static void *
RemovePageOrWalFromCurrentNode()
{
@ -1033,8 +1205,7 @@ MovePageFromSecondBufferToLocalBuffer()
data.mv_size = 8192;
data.mv_data = spv->pagecontent;
mp = mdb_put(tmptxn,pageDbi,&key,&data, MDB_NODUPDATA);
mp = mdb_put(tmptxn,pageDbi,&key,&data, 0);
LWLockRelease(partitionLock);
if (mp != 0)
{

View File

@ -842,7 +842,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
wlpk.sk.relid = pageKey.relfileNode.relNode;
wlpk.sk.forkno = pageKey.forkNo;
wlpk.sk.blkno = pageKey.blkNo;
wlpk.pageLsn = SwapLsn(pageKey.pageLsn);
wlpk.pageLsn = SwapLsnFromLittleToBig(pageKey.pageLsn);
wlpk.partition = 0;
result.count = 0;
result = GetWalFromLocalBuffer(&wlpk, lsn);
@ -857,6 +857,8 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
nbytes += result.count;
*buffer = (uint8_t *)realloc(*buffer, BLCKSZ + result.count);
memcpy((*buffer) + BLCKSZ, result.buf, result.count);
wlpk.pageLsn = SwapLsnFromLittleToBig(lsn);
SendInvalWal(&wlpk);
// TODO free result
free_dataRead(result.buf, result.count, result.cap);
}

View File

@ -128,4 +128,7 @@ extern void CloseWalEnv(void);
extern void CreateSecondBufferLWLocks(void);
extern Size SecondBufferLWLockShmemSize(void);
extern Size SecondBufferShmemSize(void);
extern uint64_t SwapLsn(uint64_t lsn);
extern uint64_t SwapLsnFromLittleToBig(uint64_t lsn);
extern uint64_t SwapLsnFromBigToLittle(uint64_t lsn);
extern void SendInvalWal(WalLdPageKey *walkey);
extern void *CleanWalsInLmdb(void *arg);