!177 truncate/drop时,清理本地盘脏页

Merge pull request !177 from zoujia_yewu/dev_performance
This commit is contained in:
zoujia_yewu 2023-06-30 08:29:55 +00:00 committed by Gitee
commit cbb67ff220
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
6 changed files with 326 additions and 40 deletions

View File

@ -36,6 +36,7 @@
#include "utils/guc.h"
#include "utils/inval.h"
#include "postmaster/bgwriter.h"
#include "postmaster/secondbuffer.h"
/* GUC variables */
int wal_skip_threshold = 2048; /* in kilobytes */
@ -279,6 +280,7 @@ RelationPreserveStorage(RelFileNode rnode, bool atCommit)
void
RelationTruncate(Relation rel, BlockNumber nblocks)
{
int i = 0;
bool fsm;
bool vm;
bool need_fsm_vacuum = false;
@ -287,6 +289,8 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
int nforks = 0;
bool disable_cancel_query = false;
LdPageKey ldKey;
/* Open it at the smgr level if not already done */
RelationOpenSmgr(rel);
@ -400,6 +404,16 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
*/
CacheInvalidateSmgr(rel->rd_smgr->smgr_rnode);
smgrtruncatelsn(rel->rd_smgr, forks, nforks, blocks,lsn);
for ( i = 0; i < nforks; i ++)
{
ldKey.sk.dbid = rel->rd_smgr->smgr_rnode.node.dbNode;
ldKey.sk.relid = rel->rd_smgr->smgr_rnode.node.relNode;
ldKey.sk.forkno = forks[i];
ldKey.sk.blkno = blocks[i];
SendInvalPage(&ldKey);
}
} else {
smgrtruncate(rel->rd_smgr, forks, nforks, blocks);
}
@ -971,6 +985,7 @@ smgr_redo(XLogReaderState *record)
}
else if (info == XLOG_SMGR_TRUNCATE)
{
int i = 0;
xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
SMgrRelation reln;
Relation rel;
@ -979,6 +994,9 @@ smgr_redo(XLogReaderState *record)
int nforks = 0;
bool need_fsm_vacuum = false;
WalLdPageKey walkey;
LdPageKey ldKey;
reln = smgropen(xlrec->rnode, InvalidBackendId);
/*
@ -1067,6 +1085,25 @@ smgr_redo(XLogReaderState *record)
/* Do the real work to truncate relation forks */
if (nforks > 0 && (!EnableHotStandby || *isPromoteIsTriggered || !he3share))
smgrtruncatelsn(reln, forks, nforks, blocks, record->ReadRecPtr);
if (EnableHotStandby && !push_standby)
{
for (i = 0; i < nforks; i ++)
{
ldKey.sk.dbid = reln->smgr_rnode.node.dbNode;
ldKey.sk.relid = reln->smgr_rnode.node.relNode;
ldKey.sk.forkno = forks[i];
ldKey.sk.blkno = blocks[i];
SendInvalPage(&ldKey);
walkey.sk.dbid = reln->smgr_rnode.node.dbNode;
walkey.sk.relid = reln->smgr_rnode.node.relNode;
walkey.sk.forkno = forks[i];
walkey.sk.blkno = blocks[i];
walkey.pageLsn = SwapLsnFromLittleToBig(record->ReadRecPtr);
walkey.partition = 1;
SendInvalWal(&walkey);
}
}
/*
* Update upper-level FSM pages to account for the truncation. This is

View File

@ -5217,6 +5217,11 @@ SubPostmasterMain(int argc, char *argv[])
static void
ExitPostmaster(int status)
{
ClosePageDBEnv();
CloseWalDBEnv();
#ifdef HAVE_PTHREAD_IS_THREADED_NP
/*

View File

@ -33,7 +33,9 @@
#include <sys/un.h>
const char *socketfile = "/tmp/he3cleanwal";
const char *p_socketfile = "/tmp/he3cleanpage";
#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8))
#define SizeOfCleanPage 16
typedef struct SingleKeyArray
{
@ -63,8 +65,10 @@ typedef struct DPageKeyArray
/*
secondbufferhash code
*/
static int IsDirExist(const char* path);
static void CleanWalsByPage(WalLdPageKey *walkey);
static void CleanWalsByTable(WalLdPageKey *walkey);
static void CleanPagesByTable(LdPageKey *lpk);
static HTAB *SecondBufferHash = NULL;
extern bool EnableHotStandby;
@ -95,7 +99,6 @@ SecondBufferShmemSize(void)
{
Size size;
size = mul_size(SNBuffers,BLKSZ);
return size;
}
@ -230,7 +233,10 @@ InitSecondBufferHash(void)
void
InitPageDBEnv()
{
pg_mkdir_p(lmdb_page_directory,0777);
if (!IsDirExist(lmdb_page_directory))
{
pg_mkdir_p(lmdb_page_directory,0777);
}
mdb_env_create(&pageEnv);
mdb_env_set_maxreaders(pageEnv,MAXREADERS);
mdb_env_set_mapsize(pageEnv,MAPSIE);
@ -239,9 +245,13 @@ InitPageDBEnv()
mdb_dbi_open(pageTxn,NULL,MDB_CREATE,&pageDbi);
mdb_txn_commit(pageTxn);
}
void InitWalDBEnv()
{
pg_mkdir_p(lmdb_wal_directory,0777);
if (!IsDirExist(lmdb_wal_directory))
{
pg_mkdir_p(lmdb_wal_directory,0777);
}
mdb_env_create(&walEnv);
mdb_env_set_maxreaders(walEnv,MAXREADERS);
mdb_env_set_mapsize(walEnv,MAPSIE);
@ -252,14 +262,16 @@ void InitWalDBEnv()
}
void ClosePageDBEnv()
{
mdb_dbi_close(pageEnv, pageDbi);
mdb_dbi_close(pageEnv, pageDbi);
mdb_env_close(pageEnv);
ereport(LOG,errmsg("close page success"));
}
void CloseWalEnv()
void CloseWalDBEnv()
{
mdb_dbi_close(pageEnv, pageDbi);
mdb_env_close(pageEnv);
mdb_dbi_close(walEnv, walDbi);
mdb_env_close(walEnv);
ereport(LOG,errmsg("close wal success"));
}
static void
@ -457,8 +469,7 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer)
{
return NULL;
}
MDB_txn *tmptxn;
LdPageKey *lk;
lk = (LdPageKey *)malloc(sizeof(LdPageKey));
@ -867,6 +878,31 @@ void SendInvalWal(WalLdPageKey *walkey) {
return;
}
void SendInvalPage(LdPageKey *ldKey) {
int sock_fd;
struct sockaddr_un un;
un.sun_family = AF_UNIX;
strcpy(un.sun_path, p_socketfile);
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock_fd < 0)
{
elog(WARNING, "request socket failed");
return;
}
if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
{
elog(WARNING, "connect socket failed");
return;
}
// elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
send(sock_fd, ldKey, SizeOfCleanPage, 0);
// elog(LOG, "send success");
close(sock_fd);
return;
}
void *CleanWalsInLmdb(void *arg) {
int fd, new_fd;
struct sockaddr_un un;
@ -895,12 +931,67 @@ void *CleanWalsInLmdb(void *arg) {
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
CleanWalsByPage(wpk);
if (wpk->partition == 0)
{
CleanWalsByPage(wpk);
}
else
{
CleanWalsByTable(wpk);
}
close(new_fd);
}
close(fd);
unlink(socketfile);
}
void *CleanPagesInLmdb(void *arg) {
int fd, new_fd,size;
struct sockaddr_un un;
static char data_buf[SizeOfCleanPage];
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
elog(PANIC, "request cleanwal socket failed");
un.sun_family = AF_UNIX;
unlink(p_socketfile);
strcpy(un.sun_path, p_socketfile);
if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0)
elog(PANIC, "bind cleanwal socket failed");
if (listen(fd, MaxBackends+8) < 0)
elog(PANIC, "listen cleanwal socket failed");
while(1) {
new_fd = accept(fd, NULL, NULL);
if (new_fd < 0)
{
close(fd);
unlink(p_socketfile);
elog(PANIC, "cannot accept client connect request");
}
memset(data_buf, 0, SizeOfCleanPage);
size = read(new_fd, data_buf, SizeOfCleanPage);
LdPageKey *lpk = (LdPageKey *)data_buf;
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
// if (wpk->partition == 0)
// {
// CleanWalsByPage(wpk);
// }
// else
// {
// CleanWalsByTable(wpk);
// }
CleanPagesByTable(lpk);
close(new_fd);
}
close(fd);
unlink(socketfile);
}
static int
IsDirExist(const char* path)
{
return !access(path,F_OK);
}
static void
@ -983,6 +1074,161 @@ CleanWalsByPage(WalLdPageKey *walkey)
return;
}
static void
CleanWalsByTable(WalLdPageKey *walkey)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key, data;
int success = -1;
uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn);
walkey->pageLsn = 0;
walkey->partition = 0;
key.mv_size = SizeOfCleanWal;
key.mv_data = walkey;
data.mv_size = 0;
data.mv_data = NULL;
uint32 dbid,relid,forkno,blkno;
dbid = walkey->sk.dbid;
relid = walkey->sk.relid;
forkno = walkey->sk.forkno;
blkno = walkey->sk.blkno;
success = mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
if (success != 0)
{
elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
if (success != 0)
{
elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success);
return;
}
success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE);
if (success != 0)
{
ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld",
success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn)));
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return;
}
walkey = (WalLdPageKey *)key.mv_data;
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno
// && walkey->sk.blkno == blkno
&& SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
{
// elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
success = mdb_cursor_del(tmpcursor,0);
if (success != 0)
elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld",
success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
break;
}
walkey = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
tmpcursor = NULL;
tmptxn = NULL;
return;
}
static void
CleanPagesByTable(LdPageKey *ldKey)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key, data;
int success = -1;
key.mv_size = SizeOfCleanPage;
key.mv_data = ldKey;
data.mv_size = 0;
data.mv_data = NULL;
uint32 dbid,relid,forkno,blkno;
dbid = ldKey->sk.dbid;
relid = ldKey->sk.relid;
forkno = ldKey->sk.forkno;
blkno = ldKey->sk.blkno;
success = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
// mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
if (success != 0)
{
elog(LOG, "mdb_txn_begin failed when clean pages, err %d", success);
return;
}
success = mdb_cursor_open(tmptxn, pageDbi,&tmpcursor);
if (success != 0)
{
elog(LOG, "mdb_cursor_open failed when clean pages, err %d", success);
return;
}
success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE);
if (success != 0)
{
elog(LOG, "mdb_cursor_get failed when clean pages, err %d, rel %d, fork %d, blk %d",
success, relid, forkno, blkno);
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return;
}
ldKey = (LdPageKey *)key.mv_data;
// elog(LOG, "get page db %d, rel %d, fork %d, blk %d",
// ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno);
while (ldKey->sk.dbid == dbid
&& ldKey->sk.relid == relid
&& ldKey->sk.forkno == forkno)
{
elog(LOG, "del page dbid %d, rel %d, fork %d, blk %d",
ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno);
success = mdb_cursor_del(tmpcursor,0);
if (success != 0)
elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d",
success, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno);
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
break;
}
ldKey = (LdPageKey *)key.mv_data;
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
tmpcursor = NULL;
tmptxn = NULL;
return;
}
static void *
RemovePageOrWalFromCurrentNode()
@ -1114,6 +1360,7 @@ RemovePageOrWalFromCurrentNode()
static void
MovePageFromSecondBufferToLocalBuffer()
{
printf("MovePageFromSecondBufferToLocalBuffer\n");
MDB_txn *tmptxn;
MDB_txn *txn = NULL;
MDB_dbi dbi;
@ -1380,6 +1627,11 @@ SecondBufferMain(void)
*/
PG_SETMASK(&UnBlockSig);
pthread_t ntid;
int err;
err = pthread_create(&ntid, NULL, CleanPagesInLmdb, NULL);
if (err != 0)
elog(PANIC,"pthread_create CleanPagesInLmdb failed %s",strerror(err));
MovePageFromSecondBufferToLocalBuffer();
}

View File

@ -275,14 +275,30 @@ void mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
*/
void mdunlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
{
LdPageKey ldKey;
/* Now do the per-fork work */
if (forkNum == InvalidForkNumber)
{
for (forkNum = 0; forkNum <= MAX_FORKNUM; forkNum++)
{
mdunlinkfork(rnode, forkNum, isRedo);
ldKey.sk.dbid = rnode.node.dbNode;
ldKey.sk.relid = rnode.node.relNode;
ldKey.sk.forkno = forkNum;
ldKey.sk.blkno = 0;
SendInvalPage(&ldKey);
}
}
else
{
mdunlinkfork(rnode, forkNum, isRedo);
ldKey.sk.dbid = rnode.node.dbNode;
ldKey.sk.relid = rnode.node.relNode;
ldKey.sk.forkno = forkNum;
ldKey.sk.blkno = 0;
SendInvalPage(&ldKey);
}
}
/*

View File

@ -393,9 +393,6 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
int i = 0;
RelFileNodeBackend *rnodes;
ForkNumber forknum;
OriginDPageKey odpk;
PageKey pk;
if (nrels == 0)
return;
@ -451,17 +448,7 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
int which = rels[i]->smgr_which;
for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
smgrsw[which].smgr_unlink(rnodes[i], forknum, isRedo);
//remove unused pages and related wals in localdisk cache.
// RemoveBufferFromLocal(rnodes[i].node.dbNode, rnodes[i].node.relNode, MAX_FORKNUM, 0);
pk.relfileNode.dbNode = rnodes[i].node.dbNode;
pk.relfileNode.relNode = rnodes[i].node.relNode;
pk.forkNo = MAX_FORKNUM;
odpk.pk = pk;
odpk.opration = DROP;
AddOneItemToDPArray(odpk);
smgrsw[which].smgr_unlink(rnodes[i], forknum, isRedo);
}
pfree(rnodes);
@ -771,9 +758,6 @@ void
smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nblocks,XLogRecPtr lsn)
{
int i;
PageKey pk;
OriginDPageKey odpk;
//push to truncate
bool flag = false;
/* Do the truncation */
@ -799,17 +783,7 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber
flag = true;
}
}
smgrsw[reln->smgr_which].smgr_truncate(reln, forknum[i], nblocks[i]);
//remove unused pages and related wals in localdisk cache.
// RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]);
pk.relfileNode.dbNode = reln->smgr_rnode.node.dbNode;
pk.relfileNode.relNode = reln->smgr_rnode.node.relNode;
pk.forkNo = forknum[i];
pk.blkNo = nblocks[i];
odpk.pk = pk;
odpk.opration = TRUNCATE;
AddOneItemToDPArray(odpk);
smgrsw[reln->smgr_which].smgr_truncate(reln, forknum[i], nblocks[i]);
/*
* We might as well update the local smgr_cached_nblocks values. The
* smgr cache inval message that this function sent will cause other

View File

@ -124,7 +124,7 @@ extern Bufrd GetWalFromLocalBuffer(WalLdPageKey *pk, uint64_t replyLsn);
extern void AddOneItemToDPArray(OriginDPageKey pk);
extern void SecondBufferMain(void);
extern void ClosePageDBEnv(void);
extern void CloseWalEnv(void);
extern void CloseWalDBEnv(void);
extern void CreateSecondBufferLWLocks(void);
extern Size SecondBufferLWLockShmemSize(void);
@ -132,4 +132,6 @@ extern Size SecondBufferShmemSize(void);
extern uint64_t SwapLsnFromLittleToBig(uint64_t lsn);
extern uint64_t SwapLsnFromBigToLittle(uint64_t lsn);
extern void SendInvalWal(WalLdPageKey *walkey);
extern void SendInvalPage(LdPageKey *ldKey);
extern void *CleanWalsInLmdb(void *arg);
extern void *CleanPagesInLmdb(void *arg);