mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-12-01 19:58:06 +08:00
!189 lmdb ansync commit
Merge pull request !189 from zoujia_yewu/dev_performance
This commit is contained in:
commit
36c56be4b2
@ -414,6 +414,12 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
|
||||
SendInvalPage(&ldKey);
|
||||
}
|
||||
|
||||
ldKey.sk.dbid = 0;
|
||||
ldKey.sk.relid = 0;
|
||||
ldKey.sk.forkno = 32;
|
||||
ldKey.sk.blkno = 0;
|
||||
SendInvalPage(&ldKey);
|
||||
|
||||
} else {
|
||||
smgrtruncate(rel->rd_smgr, forks, nforks, blocks);
|
||||
}
|
||||
@ -1085,24 +1091,38 @@ smgr_redo(XLogReaderState *record)
|
||||
/* Do the real work to truncate relation forks */
|
||||
if (nforks > 0 && (!EnableHotStandby || *isPromoteIsTriggered || !he3share))
|
||||
smgrtruncatelsn(reln, forks, nforks, blocks, record->ReadRecPtr);
|
||||
if (EnableHotStandby && !push_standby)
|
||||
if (EnableHotStandby && !push_standby)
|
||||
{
|
||||
for (i = 0; i < nforks; i ++)
|
||||
{
|
||||
for (i = 0; i < nforks; i ++)
|
||||
{
|
||||
ldKey.sk.dbid = reln->smgr_rnode.node.dbNode;
|
||||
ldKey.sk.relid = reln->smgr_rnode.node.relNode;
|
||||
ldKey.sk.forkno = forks[i];
|
||||
ldKey.sk.blkno = blocks[i];
|
||||
SendInvalPage(&ldKey);
|
||||
ldKey.sk.dbid = reln->smgr_rnode.node.dbNode;
|
||||
ldKey.sk.relid = reln->smgr_rnode.node.relNode;
|
||||
ldKey.sk.forkno = forks[i];
|
||||
ldKey.sk.blkno = blocks[i];
|
||||
SendInvalPage(&ldKey);
|
||||
|
||||
walkey.sk.dbid = reln->smgr_rnode.node.dbNode;
|
||||
walkey.sk.relid = reln->smgr_rnode.node.relNode;
|
||||
walkey.sk.forkno = forks[i];
|
||||
walkey.sk.blkno = blocks[i];
|
||||
walkey.pageLsn = SwapLsnFromLittleToBig(record->ReadRecPtr);
|
||||
walkey.partition = 1;
|
||||
SendInvalWal(&walkey);
|
||||
}
|
||||
walkey.sk.dbid = reln->smgr_rnode.node.dbNode;
|
||||
walkey.sk.relid = reln->smgr_rnode.node.relNode;
|
||||
walkey.sk.forkno = forks[i];
|
||||
walkey.sk.blkno = blocks[i];
|
||||
walkey.pageLsn = SwapLsnFromLittleToBig(record->ReadRecPtr);
|
||||
walkey.partition = 1;
|
||||
SendInvalWal(&walkey);
|
||||
}
|
||||
|
||||
ldKey.sk.dbid = 0;
|
||||
ldKey.sk.relid = 0;
|
||||
ldKey.sk.forkno = 32;
|
||||
ldKey.sk.blkno = 0;
|
||||
SendInvalPage(&ldKey);
|
||||
|
||||
walkey.sk.dbid = 0;
|
||||
walkey.sk.relid = 0;
|
||||
walkey.sk.forkno = 32;
|
||||
walkey.sk.blkno = 0;
|
||||
walkey.pageLsn = 0;
|
||||
walkey.partition = 0;
|
||||
SendInvalWal(&walkey);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -37,6 +37,12 @@ const char *p_socketfile = "/tmp/he3cleanpage";
|
||||
#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8))
|
||||
#define SizeOfCleanPage 16
|
||||
|
||||
typedef struct SocketFd
|
||||
{
|
||||
int walSocketFd;
|
||||
int pageSocketFd;
|
||||
} SocketFd;
|
||||
|
||||
typedef struct SingleKeyArray
|
||||
{
|
||||
SdPageKey SdPageKeyList[SDLEN];
|
||||
@ -72,6 +78,7 @@ static void CleanPagesByTable(LdPageKey *lpk);
|
||||
|
||||
static HTAB *SecondBufferHash = NULL;
|
||||
extern bool EnableHotStandby;
|
||||
SocketFd SocFd = {-1,-1};
|
||||
DPageKeyArray *DPArray = NULL;
|
||||
|
||||
SingleKeyArray *MultiKeyArrays;
|
||||
@ -853,71 +860,151 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
|
||||
}
|
||||
}
|
||||
|
||||
void SendInvalWal(WalLdPageKey *walkey) {
|
||||
int sock_fd;
|
||||
struct sockaddr_un un;
|
||||
un.sun_family = AF_UNIX;
|
||||
strcpy(un.sun_path, socketfile);
|
||||
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (sock_fd < 0)
|
||||
{
|
||||
elog(WARNING, "request socket failed");
|
||||
return;
|
||||
}
|
||||
// void SendInvalWal(WalLdPageKey *walkey) {
|
||||
// int sock_fd;
|
||||
// struct sockaddr_un un;
|
||||
// un.sun_family = AF_UNIX;
|
||||
// strcpy(un.sun_path, socketfile);
|
||||
// sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
// if (sock_fd < 0)
|
||||
// {
|
||||
// elog(WARNING, "request socket failed");
|
||||
// return;
|
||||
// }
|
||||
|
||||
if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
// if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
// {
|
||||
// elog(WARNING, "connect socket failed");
|
||||
// return;
|
||||
// }
|
||||
// send(sock_fd, walkey, SizeOfCleanWal, 0);
|
||||
// close(sock_fd);
|
||||
// return;
|
||||
// }
|
||||
|
||||
void SendInvalWal(WalLdPageKey *walkey) {
|
||||
struct sockaddr_un un;
|
||||
if (SocFd.walSocketFd < 0)
|
||||
{
|
||||
elog(WARNING, "connect socket failed");
|
||||
return;
|
||||
un.sun_family = AF_UNIX;
|
||||
strcpy(un.sun_path, socketfile);
|
||||
SocFd.walSocketFd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (SocFd.walSocketFd < 0)
|
||||
{
|
||||
elog(WARNING, "request socket failed");
|
||||
return;
|
||||
}
|
||||
|
||||
if (connect(SocFd.walSocketFd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
{
|
||||
elog(WARNING, "connect socket failed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
// elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld",
|
||||
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
|
||||
send(sock_fd, walkey, SizeOfCleanWal, 0);
|
||||
// elog(LOG, "send success");
|
||||
close(sock_fd);
|
||||
|
||||
send(SocFd.walSocketFd, walkey, SizeOfCleanWal, 0);
|
||||
// close(sock_fd);
|
||||
return;
|
||||
}
|
||||
|
||||
void SendInvalPage(LdPageKey *ldKey) {
|
||||
int sock_fd;
|
||||
struct sockaddr_un un;
|
||||
un.sun_family = AF_UNIX;
|
||||
strcpy(un.sun_path, p_socketfile);
|
||||
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (sock_fd < 0)
|
||||
if (SocFd.pageSocketFd < 0)
|
||||
{
|
||||
elog(WARNING, "request socket failed");
|
||||
return;
|
||||
un.sun_family = AF_UNIX;
|
||||
strcpy(un.sun_path, p_socketfile);
|
||||
SocFd.pageSocketFd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (SocFd.pageSocketFd < 0)
|
||||
{
|
||||
elog(WARNING, "request socket failed");
|
||||
return;
|
||||
}
|
||||
|
||||
if (connect(SocFd.pageSocketFd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
{
|
||||
elog(WARNING, "connect socket failed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
send(SocFd.pageSocketFd, ldKey, SizeOfCleanPage, 0);
|
||||
return;
|
||||
}
|
||||
void *doCleanWalInLmdb(void *fd)
|
||||
{
|
||||
int syncFlag;
|
||||
int new_fd = *(int *)fd;
|
||||
static char data_buf[SizeOfCleanWal];
|
||||
while(1)
|
||||
{
|
||||
|
||||
memset(data_buf, 0, SizeOfCleanWal);
|
||||
recv(new_fd, data_buf, SizeOfCleanWal,0);
|
||||
WalLdPageKey *wpk = (WalLdPageKey *)data_buf;
|
||||
|
||||
|
||||
if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
{
|
||||
elog(WARNING, "connect socket failed");
|
||||
return;
|
||||
if (0 == wpk->partition && 0 == wpk->sk.blkno &&
|
||||
0 == wpk->sk.dbid && 32 == wpk->sk.forkno &&
|
||||
0 == wpk->sk.relid)
|
||||
{
|
||||
// close(new_fd);
|
||||
// break;
|
||||
syncFlag = mdb_env_sync(walEnv,1);
|
||||
if (syncFlag != 0)
|
||||
{
|
||||
elog(FATAL, "mdb_env_sync failed when clean wal, err %d", syncFlag);
|
||||
// return;
|
||||
}else{
|
||||
elog(LOG, "mdb_env_sync success when clean wal, err %d", syncFlag);
|
||||
}
|
||||
}else if(0 == wpk->partition && 0 == wpk->sk.blkno &&
|
||||
0 == wpk->sk.dbid && 0 == wpk->sk.forkno &&
|
||||
0 == wpk->sk.relid){
|
||||
close(new_fd);
|
||||
break;
|
||||
|
||||
}else{
|
||||
if (wpk->partition == 0)
|
||||
{
|
||||
CleanWalsByPage(wpk);
|
||||
}
|
||||
else
|
||||
{
|
||||
CleanWalsByTable(wpk);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
// elog(LOG, "send request: rel %d, fork %d, blk %d, lsn %ld",
|
||||
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn));
|
||||
send(sock_fd, ldKey, SizeOfCleanPage, 0);
|
||||
// elog(LOG, "send success");
|
||||
close(sock_fd);
|
||||
return;
|
||||
mdb_env_sync(walEnv,1);
|
||||
}
|
||||
|
||||
void *CleanWalsInLmdb(void *arg) {
|
||||
int fd, new_fd;
|
||||
int fd = -1, new_fd;
|
||||
struct sockaddr_un un;
|
||||
static char data_buf[SizeOfCleanWal];
|
||||
fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (fd < 0)
|
||||
elog(PANIC, "request cleanwal socket failed");
|
||||
{
|
||||
fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(PANIC, "request cleanwal socket failed");
|
||||
}
|
||||
}
|
||||
un.sun_family = AF_UNIX;
|
||||
unlink(socketfile);
|
||||
strcpy(un.sun_path, socketfile);
|
||||
|
||||
if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
{
|
||||
elog(PANIC, "bind cleanwal socket failed");
|
||||
}
|
||||
if (listen(fd, MaxBackends+8) < 0)
|
||||
{
|
||||
elog(PANIC, "listen cleanwal socket failed");
|
||||
while(1) {
|
||||
}
|
||||
|
||||
while(1) {
|
||||
pthread_t p;
|
||||
new_fd = accept(fd, NULL, NULL);
|
||||
if (new_fd < 0)
|
||||
{
|
||||
@ -925,67 +1012,84 @@ void *CleanWalsInLmdb(void *arg) {
|
||||
unlink(socketfile);
|
||||
elog(PANIC, "cannot accept client connect request");
|
||||
}
|
||||
memset(data_buf, 0, SizeOfCleanWal);
|
||||
read(new_fd, data_buf, SizeOfCleanWal);
|
||||
WalLdPageKey *wpk = (WalLdPageKey *)data_buf;
|
||||
|
||||
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
|
||||
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
|
||||
if (wpk->partition == 0)
|
||||
{
|
||||
CleanWalsByPage(wpk);
|
||||
}
|
||||
else
|
||||
{
|
||||
CleanWalsByTable(wpk);
|
||||
}
|
||||
close(new_fd);
|
||||
pthread_create(&p,NULL,doCleanWalInLmdb,&new_fd);
|
||||
}
|
||||
close(fd);
|
||||
unlink(socketfile);
|
||||
}
|
||||
|
||||
void *doCleanPageInLmdb(void *fd)
|
||||
{
|
||||
int new_fd = *(int *)fd;
|
||||
int syncFlag;
|
||||
static char data_buf[SizeOfCleanPage];
|
||||
while(1)
|
||||
{
|
||||
memset(data_buf, 0, SizeOfCleanPage);
|
||||
elog(LOG, "before recv");
|
||||
recv(new_fd, data_buf, SizeOfCleanPage,0);
|
||||
elog(LOG, "after recv");
|
||||
LdPageKey *lpk = (LdPageKey *)data_buf;
|
||||
elog(LOG,"dbid is %d, relid is %d,forkno is %d,blkno is %d",lpk->sk.dbid,lpk->sk.relid,lpk->sk.forkno,
|
||||
lpk->sk.blkno);
|
||||
if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 32 == lpk->sk.forkno && 0 == lpk->sk.relid)
|
||||
{
|
||||
// close(new_fd);
|
||||
// break;
|
||||
syncFlag = mdb_env_sync(pageEnv,1);
|
||||
if (syncFlag != 0)
|
||||
{
|
||||
elog(FATAL, "mdb_env_sync failed when clean pages, err %d", syncFlag);
|
||||
// return;
|
||||
}else{
|
||||
elog(LOG, "mdb_env_sync success when clean pages, err %d", syncFlag);
|
||||
}
|
||||
}else if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 0 == lpk->sk.forkno && 0 == lpk->sk.relid){
|
||||
close(new_fd);
|
||||
break;
|
||||
}else{
|
||||
CleanPagesByTable(lpk);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
void *CleanPagesInLmdb(void *arg) {
|
||||
int fd, new_fd,size;
|
||||
int fd = -1, new_fd;
|
||||
struct sockaddr_un un;
|
||||
static char data_buf[SizeOfCleanPage];
|
||||
fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (fd < 0)
|
||||
elog(PANIC, "request cleanwal socket failed");
|
||||
{
|
||||
fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(PANIC, "request cleanwal socket failed");
|
||||
}
|
||||
}
|
||||
un.sun_family = AF_UNIX;
|
||||
unlink(p_socketfile);
|
||||
strcpy(un.sun_path, p_socketfile);
|
||||
|
||||
if (bind(fd, (struct sockaddr *)&un, sizeof(un)) < 0)
|
||||
{
|
||||
elog(PANIC, "bind cleanwal socket failed");
|
||||
}
|
||||
if (listen(fd, MaxBackends+8) < 0)
|
||||
{
|
||||
elog(PANIC, "listen cleanwal socket failed");
|
||||
while(1) {
|
||||
new_fd = accept(fd, NULL, NULL);
|
||||
}
|
||||
|
||||
while(1) {
|
||||
pthread_t p;
|
||||
new_fd = accept(fd, NULL, NULL);
|
||||
if (new_fd < 0)
|
||||
{
|
||||
close(fd);
|
||||
unlink(p_socketfile);
|
||||
elog(PANIC, "cannot accept client connect request");
|
||||
}
|
||||
memset(data_buf, 0, SizeOfCleanPage);
|
||||
size = read(new_fd, data_buf, SizeOfCleanPage);
|
||||
LdPageKey *lpk = (LdPageKey *)data_buf;
|
||||
|
||||
// elog(LOG, "receive request: rel %d, fork %d, blk %d, lsn %ld",
|
||||
// wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn));
|
||||
// if (wpk->partition == 0)
|
||||
// {
|
||||
// CleanWalsByPage(wpk);
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// CleanWalsByTable(wpk);
|
||||
// }
|
||||
CleanPagesByTable(lpk);
|
||||
close(new_fd);
|
||||
|
||||
pthread_create(&p,NULL,doCleanPageInLmdb,&new_fd);
|
||||
}
|
||||
close(fd);
|
||||
unlink(socketfile);
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -299,6 +299,14 @@ void mdunlink(RelFileNodeBackend rnode, ForkNumber forkNum, bool isRedo)
|
||||
SendInvalPage(&ldKey);
|
||||
}
|
||||
|
||||
ldKey.sk.dbid = 0;
|
||||
ldKey.sk.relid = 0;
|
||||
ldKey.sk.forkno = 32;
|
||||
ldKey.sk.blkno = 0;
|
||||
SendInvalPage(&ldKey);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
@ -875,6 +883,14 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
||||
memcpy((*buffer) + BLCKSZ, result.buf, result.count);
|
||||
wlpk.pageLsn = SwapLsnFromLittleToBig(lsn);
|
||||
SendInvalWal(&wlpk);
|
||||
|
||||
wlpk.sk.dbid = 0;
|
||||
wlpk.sk.relid = 0;
|
||||
wlpk.sk.forkno = 32;
|
||||
wlpk.sk.blkno = 0;
|
||||
wlpk.pageLsn = 0;
|
||||
wlpk.partition = 0;
|
||||
SendInvalWal(&wlpk);
|
||||
// TODO free result
|
||||
free_dataRead(result.buf, result.count, result.cap);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user