remove wals and pages in localbuffer when execute drop and truncate

This commit is contained in:
zoujia 2023-06-16 16:09:39 +08:00
parent c46bf1fede
commit c22376f81d
5 changed files with 327 additions and 27 deletions

View File

@ -4081,6 +4081,8 @@ PostmasterStateMachine(void)
* anything here, since the UnlinkLockFiles proc_exit callback * anything here, since the UnlinkLockFiles proc_exit callback
* will do so, and that should be the last user-visible action. * will do so, and that should be the last user-visible action.
*/ */
ClosePageDBEnv();
CloseWalEnv();
ExitPostmaster(0); ExitPostmaster(0);
} }
} }

View File

@ -33,7 +33,9 @@
#include <sys/un.h> #include <sys/un.h>
const char *socketfile = "/tmp/he3cleanwal"; const char *socketfile = "/tmp/he3cleanwal";
#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8)) const char *p_socketfile = "/tmp/he3cleanpage";
#define SizeOfCleanWal (offsetof(WalLdPageKeyWithFlag, flag) + sizeof(uint8))
#define SizeOfCleanPage 16
typedef struct SingleKeyArray typedef struct SingleKeyArray
{ {
@ -63,8 +65,9 @@ typedef struct DPageKeyArray
/* /*
secondbufferhash code secondbufferhash code
*/ */
static void IsDirExist(const char* path);
static void CleanWalsByPage(WalLdPageKey *walkey); static void CleanWalsByPage(WalLdPageKey *walkey);
static void CleanWalsByTable(WalLdPageKey *walkey);
static HTAB *SecondBufferHash = NULL; static HTAB *SecondBufferHash = NULL;
extern bool EnableHotStandby; extern bool EnableHotStandby;
@ -95,7 +98,6 @@ SecondBufferShmemSize(void)
{ {
Size size; Size size;
size = mul_size(SNBuffers,BLKSZ); size = mul_size(SNBuffers,BLKSZ);
return size; return size;
} }
@ -230,7 +232,10 @@ InitSecondBufferHash(void)
void void
InitPageDBEnv() InitPageDBEnv()
{ {
pg_mkdir_p(lmdb_page_directory,0777); if !IsDirExist(lmdb_page_directory)
{
pg_mkdir_p(lmdb_page_directory,0777);
}
mdb_env_create(&pageEnv); mdb_env_create(&pageEnv);
mdb_env_set_maxreaders(pageEnv,MAXREADERS); mdb_env_set_maxreaders(pageEnv,MAXREADERS);
mdb_env_set_mapsize(pageEnv,MAPSIE); mdb_env_set_mapsize(pageEnv,MAPSIE);
@ -239,9 +244,13 @@ InitPageDBEnv()
mdb_dbi_open(pageTxn,NULL,MDB_CREATE,&pageDbi); mdb_dbi_open(pageTxn,NULL,MDB_CREATE,&pageDbi);
mdb_txn_commit(pageTxn); mdb_txn_commit(pageTxn);
} }
void InitWalDBEnv() void InitWalDBEnv()
{ {
pg_mkdir_p(lmdb_wal_directory,0777); if !IsDirExist(lmdb_wal_directory)
{
pg_mkdir_p(lmdb_wal_directory,0777);
}
mdb_env_create(&walEnv); mdb_env_create(&walEnv);
mdb_env_set_maxreaders(walEnv,MAXREADERS); mdb_env_set_maxreaders(walEnv,MAXREADERS);
mdb_env_set_mapsize(walEnv,MAPSIE); mdb_env_set_mapsize(walEnv,MAPSIE);
@ -252,7 +261,7 @@ void InitWalDBEnv()
} }
void ClosePageDBEnv() void ClosePageDBEnv()
{ {
mdb_dbi_close(pageEnv, pageDbi); mdb_dbi_close(pageEnv, pageDbi);
mdb_env_close(pageEnv); mdb_env_close(pageEnv);
} }
@ -456,8 +465,7 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer)
{ {
return NULL; return NULL;
} }
MDB_txn *tmptxn; MDB_txn *tmptxn;
LdPageKey *lk; LdPageKey *lk;
lk = (LdPageKey *)malloc(sizeof(LdPageKey)); lk = (LdPageKey *)malloc(sizeof(LdPageKey));
@ -866,6 +874,31 @@ void SendInvalWal(WalLdPageKey *walkey) {
return; return;
} }
void SendInvalPage(LdPageKey *ldKey) {
int sock_fd;
struct sockaddr_un un;
un.sun_family = AF_UNIX;
strcpy(un.sun_path, p_socketfile);
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock_fd < 0)
{
elog(WARNING, "request socket failed");
return;
}
if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
{
elog(WARNING, "connect socket failed");
return;
}
// elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
send(sock_fd, ldKey, SizeOfCleanPage, 0);
// elog(LOG, "send success");
close(sock_fd);
return;
}
void *CleanWalsInLmdb(void *arg) { void *CleanWalsInLmdb(void *arg) {
int fd, new_fd; int fd, new_fd;
struct sockaddr_un un; struct sockaddr_un un;
@ -894,12 +927,67 @@ void *CleanWalsInLmdb(void *arg) {
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld", // elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn)); // wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
CleanWalsByPage(wpk); if (wpk->partition == 0)
{
CleanWalsByPage(wpk);
}
else
{
CleanWalsByTable(wpk);
}
close(new_fd); close(new_fd);
} }
close(fd); close(fd);
unlink(socketfile); unlink(socketfile);
}
void *CleanPagesInLmdb(void *arg) {
int fd, new_fd;
struct sockaddr_un un;
static char data_buf[SizeOfCleanPage];
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
elog(PANIC, "request cleanwal socket failed");
un.sun_family = AF_UNIX;
unlink(socketfile);
strcpy(un.sun_path, socketfile);
if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0)
elog(PANIC, "bind cleanwal socket failed");
if (listen(fd, MaxBackends+8) < 0)
elog(PANIC, "listen cleanwal socket failed");
while(1) {
new_fd = accept(fd, NULL, NULL);
if (new_fd < 0)
{
close(fd);
unlink(socketfile);
elog(PANIC, "cannot accept client connect request");
}
memset(data_buf, 0, SizeOfCleanPage);
read(new_fd, data_buf, SizeOfCleanPage);
LdPageKey *lpk = (LdPageKey *)data_buf;
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
// if (wpk->partition == 0)
// {
// CleanWalsByPage(wpk);
// }
// else
// {
// CleanWalsByTable(wpk);
// }
CleanPagesByTable(lpk);
close(new_fd);
}
close(fd);
unlink(socketfile);
}
static int
IsDirExist(const char* path)
{
return !access(path,F_OK);
} }
static void static void
@ -982,6 +1070,162 @@ CleanWalsByPage(WalLdPageKey *walkey)
return; return;
} }
static void
CleanWalsByTable(WalLdPageKey *walkey)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key, data;
int success = -1;
uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn);
walkey->pageLsn = 0;
walkey->partition = 0;
key.mv_size = SizeOfCleanWal;
key.mv_data = walkey;
data.mv_size = 0;
data.mv_data = NULL;
uint32 dbid,relid,forkno,blkno;
dbid = walkey->sk.dbid;
relid = walkey->sk.relid;
forkno = walkey->sk.forkno;
blkno = walkey->sk.blkno;
success = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
if (success != 0)
{
elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
if (success != 0)
{
elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE);
if (success != 0)
{
ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld",
success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn)));
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return;
}
walkey = (WalLdPageKey *)key.mv_data;
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno
// && walkey->sk.blkno == blkno
&& SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
{
// elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
success = mdb_cursor_del(tmpcursor,0);
if (success != 0)
elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
break;
}
walkey = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
tmpcursor = NULL;
tmptxn = NULL;
return;
}
static void
CleanPagesByTable(LdPageKey *ldKey)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key, data;
int success = -1;
key.mv_size = SizeOfCleanPage;
key.mv_data = sdKey;
data.mv_size = 0;
data.mv_data = NULL;
uint32 dbid,relid,forkno,blkno;
dbid = ldKey->sk.dbid;
relid = sdKey->sk.relid;
forkno = sdKey->sk.forkno;
blkno = sdKey->sk.blkno;
success = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
// mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
if (success != 0)
{
elog(LOG, "mdb_txn_begin failed when clean pages, err %d", success);
return;
}
success = mdb_cursor_open(tmptxn, pageDbi,&tmpcursor);
if (success != 0)
{
elog(LOG, "mdb_cursor_open failed when clean pages, err %d", success);
return;
}
success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE);
if (success != 0)
{
ereport(LOG, errmsg("mdb_cursor_get failed when clean pages, err %d, rel %d, fork %d, blk %d",
success, relid, forkno, blkno)));
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return;
}
ldKey = (LdPageKey *)key.mv_data;
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno)
{
// elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
success = mdb_cursor_del(tmpcursor,0);
if (success != 0)
elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
break;
}
walkey = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
tmpcursor = NULL;
tmptxn = NULL;
return;
}
static void * static void *
RemovePageOrWalFromCurrentNode() RemovePageOrWalFromCurrentNode()
@ -1363,6 +1607,11 @@ SecondBufferMain(void)
*/ */
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
pthread_t ntid;
int err;
err = pthread_create(&ntid, NULL, CleanPagesInLmdb, NULL);
if (err != 0)
elog(PANIC,"pthread_create CleanPagesInLmdb failed %s",strerror(err));
MovePageFromSecondBufferToLocalBuffer(); MovePageFromSecondBufferToLocalBuffer();
} }

View File

@ -275,14 +275,30 @@ void mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
*/ */
void mdunlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo) void mdunlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
{ {
SdPageKey sdKey;
/* Now do the per-fork work */ /* Now do the per-fork work */
if (forkNum == InvalidForkNumber) if (forkNum == InvalidForkNumber)
{ {
for (forkNum = 0; forkNum <= MAX_FORKNUM; forkNum++) for (forkNum = 0; forkNum <= MAX_FORKNUM; forkNum++)
{
mdunlinkfork(rnode, forkNum, isRedo); mdunlinkfork(rnode, forkNum, isRedo);
sdKey.dbid = rnode.node.dbNode;
sdKey.relid = rnode.node.relNode;
sdKey.forkno = forkNum;
sdKey.blkno = 0;
SendInvalPage(sdKey);
}
} }
else else
{
mdunlinkfork(rnode, forkNum, isRedo); mdunlinkfork(rnode, forkNum, isRedo);
sdKey.dbid = rnode.node.dbNode;
sdKey.relid = rnode.node.relNode;
sdKey.forkno = forkNum;
sdKey.blkno = 0;
SendInvalPage(sdKey);
}
} }
/* /*

View File

@ -392,8 +392,10 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
int i = 0; int i = 0;
RelFileNodeBackend *rnodes; RelFileNodeBackend *rnodes;
ForkNumber forknum; ForkNumber forknum;
OriginDPageKey odpk; // OriginDPageKey odpk;
PageKey pk; // PageKey pk;
LdPageKey lpk;
if (nrels == 0) if (nrels == 0)
@ -450,17 +452,23 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
int which = rels[i]->smgr_which; int which = rels[i]->smgr_which;
for (forknum = 0; forknum <= MAX_FORKNUM; forknum++) for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
smgrsw[which].smgr_unlink(rnodes[i], forknum, isRedo); smgrsw[which].smgr_unlink(rnodes[i], forknum, isRedo);
lpk.sk.dbid = rnodes[i].node.dbNode;
lpk.sk.relid = rnodes[i].node.relNode;
lpk.sk.forkno = forknum;
lpk.sk.blkno = 0;
SendInvalPage(&lpk);
}
//remove unused pages and related wals in localdisk cache. //remove unused pages and related wals in localdisk cache.
// RemoveBufferFromLocal(rnodes[i].node.dbNode, rnodes[i].node.relNode, MAX_FORKNUM, 0); // RemoveBufferFromLocal(rnodes[i].node.dbNode, rnodes[i].node.relNode, MAX_FORKNUM, 0);
pk.relfileNode.dbNode = rnodes[i].node.dbNode; // pk.relfileNode.dbNode = rnodes[i].node.dbNode;
pk.relfileNode.relNode = rnodes[i].node.relNode; // pk.relfileNode.relNode = rnodes[i].node.relNode;
pk.forkNo = MAX_FORKNUM; // pk.forkNo = MAX_FORKNUM;
odpk.pk = pk; // odpk.pk = pk;
odpk.opration = DROP; // odpk.opration = DROP;
AddOneItemToDPArray(odpk); // AddOneItemToDPArray(odpk);
} }
pfree(rnodes); pfree(rnodes);
@ -770,8 +778,10 @@ void
smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nblocks,XLogRecPtr lsn) smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nblocks,XLogRecPtr lsn)
{ {
int i; int i;
PageKey pk; // PageKey pk;
OriginDPageKey odpk; // OriginDPageKey odpk;
WalLdPageKey walkey;
LdPageKey ldKey;
/* /*
* Get rid of any buffers for the about-to-be-deleted blocks. bufmgr will * Get rid of any buffers for the about-to-be-deleted blocks. bufmgr will
* just drop them without bothering to write the contents. * just drop them without bothering to write the contents.
@ -811,13 +821,28 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber
//remove unused pages and related wals in localdisk cache. //remove unused pages and related wals in localdisk cache.
// RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]); // RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]);
pk.relfileNode.dbNode = reln->smgr_rnode.node.dbNode; // pk.relfileNode.dbNode = reln->smgr_rnode.node.dbNode;
pk.relfileNode.relNode = reln->smgr_rnode.node.relNode; // pk.relfileNode.relNode = reln->smgr_rnode.node.relNode;
pk.forkNo = forknum[i]; // pk.forkNo = forknum[i];
pk.blkNo = nblocks[i]; // pk.blkNo = 0;//nblocks[i];
odpk.pk = pk; // odpk.pk = pk;
odpk.opration = TRUNCATE; // odpk.opration = TRUNCATE;
AddOneItemToDPArray(odpk); // AddOneItemToDPArray(odpk);
walkey.sk.dbid = reln->smgr_rnode.node.dbNode;
walkey.sk.relid = reln->smgr_rnode.node.relNode;
walkey.sk.forkno = forknum[i];
walkey.sk.blkno = nblocks[i];
walkey.pageLsn = SwapLsnFromLittleToBig(lsn);
walkey.partition = 1;
SendInvalWal(&walkey);
ldKey.sk.dbid = reln->smgr_rnode.node.dbNode;
ldKey.sk.relid = reln->smgr_rnode.node.relNode;
ldKey.sk.forkno = forknum[i];
ldKey.sk.blkno = nblocks[i];
SendInvalPage(&ldKey);
/* /*
* We might as well update the local smgr_cached_nblocks values. The * We might as well update the local smgr_cached_nblocks values. The
* smgr cache inval message that this function sent will cause other * smgr cache inval message that this function sent will cause other

View File

@ -62,6 +62,12 @@ typedef struct WalLdPageKey
uint8 partition; uint8 partition;
} WalLdPageKey; } WalLdPageKey;
typedef struct WalLdPageKeyWithFlag
{
WalLdPageKey wpk;
uint8 flag;
}
typedef struct OriginDPageKey typedef struct OriginDPageKey
{ {
PageKey pk; PageKey pk;
@ -131,4 +137,6 @@ extern Size SecondBufferShmemSize(void);
extern uint64_t SwapLsnFromLittleToBig(uint64_t lsn); extern uint64_t SwapLsnFromLittleToBig(uint64_t lsn);
extern uint64_t SwapLsnFromBigToLittle(uint64_t lsn); extern uint64_t SwapLsnFromBigToLittle(uint64_t lsn);
extern void SendInvalWal(WalLdPageKey *walkey); extern void SendInvalWal(WalLdPageKey *walkey);
extern void SendInvalPage(SdPageKey *sdKey);
extern void *CleanWalsInLmdb(void *arg); extern void *CleanWalsInLmdb(void *arg);
extern void *CleanPagesInLmdb(void *arg);