fix bug about sending wal info by socket

This commit is contained in:
zoujia 2023-07-04 17:14:35 +08:00
parent 7abb6ad663
commit 363eec1217

View File

@ -101,16 +101,14 @@ char *lmdb_page_directory;
char *lmdb_wal_directory;
Size SNBuffers = 1024;
Size
SecondBufferShmemSize(void)
Size SecondBufferShmemSize(void)
{
Size size;
size = mul_size(SNBuffers, BLKSZ);
return size;
}
Size
SecondBufferLWLockShmemSize(void)
Size SecondBufferLWLockShmemSize(void)
{
Size size;
int i;
@ -134,8 +132,7 @@ InitializeSecondBufferLWLocks(void)
LWLockInitialize(&lock->lock, id);
}
void
CreateSecondBufferLWLocks(void)
void CreateSecondBufferLWLocks(void)
{
if (!IsUnderPostmaster)
{
@ -155,8 +152,7 @@ CreateSecondBufferLWLocks(void)
}
}
void
InitSecondBufferMeta(void)
void InitSecondBufferMeta(void)
{
bool found, found1;
int i;
@ -185,8 +181,7 @@ InitSecondBufferMeta(void)
}
}
void
InitDPageKeyArray(void)
void InitDPageKeyArray(void)
{
// ereport(LOG, (errmsg("initdp")));
bool found;
@ -209,8 +204,7 @@ InitDPageKeyArray(void)
/*
init SecondBufferHash
*/
void
InitSecondBufferHash(void)
void InitSecondBufferHash(void)
{
HASHCTL info;
long init_table_size,
@ -234,11 +228,9 @@ InitSecondBufferHash(void)
max_table_size,
&info,
HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
}
void
InitPageDBEnv()
void InitPageDBEnv()
{
if (!IsDirExist(lmdb_page_directory))
{
@ -300,7 +292,6 @@ convertKeyLd(LdPageKey *ldkey, PageKey *pk)
sdkey.blkno = pk->blkNo;
ldkey->sk = sdkey;
}
/*
@ -338,7 +329,6 @@ SetupSecondBufferInTable(const SdPageKey *pk)
return pv;
}
static bool
CleanUpSecondBuffer(const SdPageKey *pk)
{
@ -379,8 +369,7 @@ FindSecondBufferInTable(const SdPageKey *pk)
}
// sb -> ssb
void
ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
{
SdPageKey *sk;
sk = (SdPageKey *)malloc(sizeof(SdPageKey));
@ -453,7 +442,6 @@ GetPageFromSecondBuffer(PageKey *pk, uint8_t *buffer)
if (sk != NULL)
{
free(sk);
}
LWLockRelease(partitionLock);
return NULL;
@ -504,7 +492,8 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer)
}
uint64_t
SwapLsnFromLittleToBig(uint64_t lsn) {
SwapLsnFromLittleToBig(uint64_t lsn)
{
#ifndef WORDS_BIGENDIAN
/* trans lsn from little endian to big endian in memory
* eg: 0x12345678 ===> 0x78563412
@ -525,7 +514,8 @@ SwapLsnFromLittleToBig(uint64_t lsn) {
}
uint64_t
SwapLsnFromBigToLittle(uint64_t lsn) {
SwapLsnFromBigToLittle(uint64_t lsn)
{
#ifndef WORDS_BIGENDIAN
/* trans lsn from big endian to little endian in memory
* eg: 0x78563412 ===> 0x12345678
@ -545,8 +535,7 @@ SwapLsnFromBigToLittle(uint64_t lsn) {
return lsn;
}
Bufrd
GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
Bufrd GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
{
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
@ -597,14 +586,9 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
}
wpk = (WalLdPageKey *)key.mv_data;
// ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
while (wpk->sk.dbid == dbid
&& wpk->sk.relid == relid
&& wpk->sk.forkno == forkno
&& wpk->sk.blkno == blkno
&& SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn)
while (wpk->sk.dbid == dbid && wpk->sk.relid == relid && wpk->sk.forkno == forkno && wpk->sk.blkno == blkno && SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn)
{
memcpy(waldata + waldatalen, data.mv_data, data.mv_size);
waldatalen += data.mv_size;
@ -629,7 +613,6 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
// ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d",
// wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition));
}
}
bufrd.buf = waldata;
bufrd.count = waldatalen;
@ -640,7 +623,6 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn)
tmpcursor = NULL;
tmptxn = NULL;
return bufrd;
}
void AddOneItemToDPArray(OriginDPageKey odpk)
@ -665,13 +647,11 @@ void AddOneItemToDPArray(OriginDPageKey odpk)
// SpinLockRelease(&DPArray->append);
// pg_usleep(1);
// }
// ereport(LOG, (errmsg("AddOneItemToDPArray done")));
}
void
storeWalInLocalBuffer(kvStruct *ks,int32 length)
void storeWalInLocalBuffer(kvStruct *ks, int32 length)
{
// pthread_mutex_lock(&q_lock);
int tb = -1, co = -1, cp = -1, cc = -1, tc = -1;
@ -702,10 +682,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
{
part = 0;
uint8_t *buf = ks[i].buf;
totallen = (uint32_t)buf[0]
| (uint32_t)(buf[1] << 8)
| (uint32_t)(buf[2] << 16)
| (uint32_t)(buf[3] << 24);
totallen = (uint32_t)buf[0] | (uint32_t)(buf[1] << 8) | (uint32_t)(buf[2] << 16) | (uint32_t)(buf[3] << 24);
key.mv_size = SizeOfCleanWal;
wlpk.sk = ks[i].lpk.sk;
@ -803,12 +780,9 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
tmpcursor = NULL;
tmptxn = NULL;
// pthread_mutex_unlock(&q_lock);
}
void
GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
void GetPageFromCurrentNode(PageKey pk, Bufrd *bufrd)
{
uint8_t *page;
page = NULL;
@ -823,7 +797,6 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
page = GetPageFromLocalBuffer(&pk, bufrd->buf);
}
if (page == NULL)
{
bufrd->buf = NULL;
@ -853,6 +826,14 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
memcpy(bufrd->buf + 8192, waldata.buf, waldata.count);
wlpk.pageLsn = SwapLsnFromLittleToBig(pk.replyLsn);
SendInvalWal(&wlpk);
wlpk.sk.dbid = 0;
wlpk.sk.relid = 0;
wlpk.sk.forkno = 32;
wlpk.sk.blkno = 0;
wlpk.pageLsn = 0;
wlpk.partition = 0;
SendInvalWal(&wlpk);
}
bufrd->cap = bufrd->count = 8192 + waldata.count;
free(waldata.buf);
@ -882,7 +863,8 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
// return;
// }
void SendInvalWal(WalLdPageKey *walkey) {
void SendInvalWal(WalLdPageKey *walkey)
{
struct sockaddr_un un;
if (SocFd.walSocketFd < 0)
{
@ -907,7 +889,8 @@ void SendInvalWal(WalLdPageKey *walkey) {
return;
}
void SendInvalPage(LdPageKey *ldKey) {
void SendInvalPage(LdPageKey *ldKey)
{
struct sockaddr_un un;
if (SocFd.pageSocketFd < 0)
{
@ -942,28 +925,25 @@ void *doCleanWalInLmdb(void *fd)
recv(new_fd, data_buf, SizeOfCleanWal, 0);
WalLdPageKey *wpk = (WalLdPageKey *)data_buf;
if (0 == wpk->partition && 0 == wpk->sk.blkno &&
0 == wpk->sk.dbid && 32 == wpk->sk.forkno &&
0 == wpk->sk.relid)
if (0 == wpk->partition && 0 == wpk->pageLsn&&
0 == wpk->sk.blkno && 0 == wpk->sk.dbid &&
32 == wpk->sk.forkno && 0 == wpk->sk.relid)
{
// close(new_fd);
// break;
syncFlag = mdb_env_sync(walEnv, 1);
if (syncFlag != 0)
{
elog(FATAL, "mdb_env_sync failed when clean wal, err %d", syncFlag);
// return;
}else{
elog(LOG, "mdb_env_sync success when clean wal, err %d", syncFlag);
printf("wal mdb_env_sync is failed, errcode is :%d\n",syncFlag);
}
}else if(0 == wpk->partition && 0 == wpk->sk.blkno &&
0 == wpk->sk.dbid && 0 == wpk->sk.forkno &&
0 == wpk->sk.relid){
}
else if (0 == wpk->partition && 0 == wpk->pageLsn &&
0 == wpk->sk.blkno && 0 == wpk->sk.dbid &&
0 == wpk->sk.forkno && 0 == wpk->sk.relid)
{
close(new_fd);
break;
}else{
}
else
{
if (wpk->partition == 0)
{
CleanWalsByPage(wpk);
@ -973,12 +953,12 @@ void *doCleanWalInLmdb(void *fd)
CleanWalsByTable(wpk);
}
}
}
mdb_env_sync(walEnv, 1);
}
void *CleanWalsInLmdb(void *arg) {
void *CleanWalsInLmdb(void *arg)
{
int fd = -1, new_fd;
struct sockaddr_un un;
static char data_buf[SizeOfCleanWal];
@ -1003,7 +983,8 @@ void *CleanWalsInLmdb(void *arg) {
elog(PANIC, "listen cleanwal socket failed");
}
while(1) {
while (1)
{
pthread_t p;
new_fd = accept(fd, NULL, NULL);
if (new_fd < 0)
@ -1025,12 +1006,8 @@ void *doCleanPageInLmdb(void *fd)
while (1)
{
memset(data_buf, 0, SizeOfCleanPage);
elog(LOG, "before recv");
recv(new_fd, data_buf, SizeOfCleanPage, 0);
elog(LOG, "after recv");
LdPageKey *lpk = (LdPageKey *)data_buf;
elog(LOG,"dbid is %d, relid is %d,forkno is %d,blkno is %d",lpk->sk.dbid,lpk->sk.relid,lpk->sk.forkno,
lpk->sk.blkno);
if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 32 == lpk->sk.forkno && 0 == lpk->sk.relid)
{
// close(new_fd);
@ -1038,22 +1015,23 @@ void *doCleanPageInLmdb(void *fd)
syncFlag = mdb_env_sync(pageEnv, 1);
if (syncFlag != 0)
{
elog(FATAL, "mdb_env_sync failed when clean pages, err %d", syncFlag);
// return;
}else{
elog(LOG, "mdb_env_sync success when clean pages, err %d", syncFlag);
printf("page mdb_env_sync is failed,errcode is: %d\n",syncFlag);
}
}else if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 0 == lpk->sk.forkno && 0 == lpk->sk.relid){
}
else if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 0 == lpk->sk.forkno && 0 == lpk->sk.relid)
{
close(new_fd);
break;
}else{
}
else
{
CleanPagesByTable(lpk);
}
}
}
void *CleanPagesInLmdb(void *arg) {
void *CleanPagesInLmdb(void *arg)
{
int fd = -1, new_fd;
struct sockaddr_un un;
static char data_buf[SizeOfCleanPage];
@ -1078,7 +1056,8 @@ void *CleanPagesInLmdb(void *arg) {
elog(PANIC, "listen cleanwal socket failed");
}
while(1) {
while (1)
{
pthread_t p;
new_fd = accept(fd, NULL, NULL);
if (new_fd < 0)
@ -1149,11 +1128,7 @@ CleanWalsByPage(WalLdPageKey *walkey)
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno
&& walkey->sk.blkno == blkno
&& SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
while (walkey->sk.dbid == dbid && walkey->sk.relid == relid && walkey->sk.forkno == forkno && walkey->sk.blkno == blkno && SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
{
// elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
@ -1230,9 +1205,7 @@ CleanWalsByTable(WalLdPageKey *walkey)
// elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d",
// walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition);
while (walkey->sk.dbid == dbid
&& walkey->sk.relid == relid
&& walkey->sk.forkno == forkno
while (walkey->sk.dbid == dbid && walkey->sk.relid == relid && walkey->sk.forkno == forkno
// && walkey->sk.blkno == blkno
&& SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn)
{
@ -1306,9 +1279,7 @@ CleanPagesByTable(LdPageKey *ldKey)
// elog(LOG, "get page db %d, rel %d, fork %d, blk %d",
// ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno);
while (ldKey->sk.dbid == dbid
&& ldKey->sk.relid == relid
&& ldKey->sk.forkno == forkno)
while (ldKey->sk.dbid == dbid && ldKey->sk.relid == relid && ldKey->sk.forkno == forkno)
{
elog(LOG, "del page dbid %d, rel %d, fork %d, blk %d",
ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno);
@ -1333,7 +1304,6 @@ CleanPagesByTable(LdPageKey *ldKey)
return;
}
static void *
RemovePageOrWalFromCurrentNode()
{
@ -1352,7 +1322,6 @@ RemovePageOrWalFromCurrentNode()
LWLock *partitionLock = NULL;
uint32 newHash;
for (;;)
{
if (PageOrWal == (int)PAGE)
@ -1374,7 +1343,6 @@ RemovePageOrWalFromCurrentNode()
continue;
}
pk = &DPArray->dpk[DPArray->walIndex].pk;
}
convertKeyLd(lpk, pk);
@ -1455,10 +1423,8 @@ RemovePageOrWalFromCurrentNode()
SpinLockAcquire(&DPArray->append);
DPArray->unused++;
SpinLockRelease(&DPArray->append);
}
}
}
static void
@ -1470,7 +1436,6 @@ MovePageFromSecondBufferToLocalBuffer()
MDB_dbi dbi;
MDB_val key, data;
SingleKeyArray *ska = NULL;
int localHead = 0;
int localTail = 0;
@ -1486,7 +1451,6 @@ MovePageFromSecondBufferToLocalBuffer()
spkl.head = NULL;
spkl.tail = NULL;
SdPageValue *spv = NULL;
int tb = -1, mp = -1, mtc = -1, mdo = -1;
long sleeptime = 1000L;
@ -1534,7 +1498,7 @@ MovePageFromSecondBufferToLocalBuffer()
localUnused = ska->unused;
SpinLockRelease(&ska->oplock);
if (localUnused == 0)
if (localUnused == 0) //
{
processNum = SDLEN;
}
@ -1565,14 +1529,13 @@ MovePageFromSecondBufferToLocalBuffer()
{
spkl.head = spke;
spkl.tail = spkl.head;
}
else{
else
{
spkl.tail->next = spke;
spkl.tail = spkl.tail->next;
}
newHash = SecondBufferHashCode(&spk);
partitionLock = SecondBufferMappingPartitionLock(newHash);
LWLockAcquire(partitionLock, LW_SHARED);
@ -1613,7 +1576,6 @@ MovePageFromSecondBufferToLocalBuffer()
success = false;
ereport(LOG, errmsg("mdb_txn_commit failed,error is:%d", mtc));
mdb_txn_abort(tmptxn);
}
}
@ -1696,12 +1658,12 @@ MovePageFromSecondBufferToLocalBuffer()
}
}
void SignalStartSecondBuffer(void) {
void SignalStartSecondBuffer(void)
{
SendPostmasterSignal(PMSIGNAL_SECONDBUFFER_WORKER);
}
void
SecondBufferMain(void)
void SecondBufferMain(void)
{
MyBackendType = B_SECONDBUFFER;
@ -1738,7 +1700,3 @@ SecondBufferMain(void)
elog(PANIC, "pthread_create CleanPagesInLmdb failed %s", strerror(err));
MovePageFromSecondBufferToLocalBuffer();
}