diff --git a/src/backend/postmaster/secondbuffer.c b/src/backend/postmaster/secondbuffer.c index 0a113c3..26392b6 100644 --- a/src/backend/postmaster/secondbuffer.c +++ b/src/backend/postmaster/secondbuffer.c @@ -34,13 +34,13 @@ const char *socketfile = "/tmp/he3cleanwal"; const char *p_socketfile = "/tmp/he3cleanpage"; -#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8)) -#define SizeOfCleanPage 16 +#define SizeOfCleanWal (offsetof(WalLdPageKey, partition) + sizeof(uint8)) +#define SizeOfCleanPage 16 typedef struct SocketFd { int walSocketFd; - int pageSocketFd; + int pageSocketFd; } SocketFd; typedef struct SingleKeyArray @@ -71,14 +71,14 @@ typedef struct DPageKeyArray /* secondbufferhash code */ -static int IsDirExist(const char* path); +static int IsDirExist(const char *path); static void CleanWalsByPage(WalLdPageKey *walkey); static void CleanWalsByTable(WalLdPageKey *walkey); static void CleanPagesByTable(LdPageKey *lpk); static HTAB *SecondBufferHash = NULL; extern bool EnableHotStandby; -SocketFd SocFd = {-1,-1}; +SocketFd SocFd = {-1, -1}; DPageKeyArray *DPArray = NULL; SingleKeyArray *MultiKeyArrays; @@ -93,82 +93,78 @@ MDB_txn *pageTxn = NULL; MDB_txn *walTxn = NULL; MDB_cursor *cursor = NULL; -Statisticnum *statisticnum = NULL; +Statisticnum *statisticnum = NULL; LWLockPadded *SecondBufferMainLWLockArray = NULL; -char *lmdb_page_directory; -char *lmdb_wal_directory; +char *lmdb_page_directory; +char *lmdb_wal_directory; Size SNBuffers = 1024; -Size -SecondBufferShmemSize(void) +Size SecondBufferShmemSize(void) { - Size size; - size = mul_size(SNBuffers,BLKSZ); - return size; + Size size; + size = mul_size(SNBuffers, BLKSZ); + return size; } -Size -SecondBufferLWLockShmemSize(void) +Size SecondBufferLWLockShmemSize(void) { - Size size; - int i; - int numLocks = NUM_LOCK_PARTITIONS; + Size size; + int i; + int numLocks = NUM_LOCK_PARTITIONS; - /* Space for the LWLock array. */ - size = mul_size(numLocks, sizeof(LWLockPadded)); - size = add_size(size,LWLOCK_PADDED_SIZE); + /* Space for the LWLock array. */ + size = mul_size(numLocks, sizeof(LWLockPadded)); + size = add_size(size, LWLOCK_PADDED_SIZE); - return size; + return size; } static void InitializeSecondBufferLWLocks(void) { - int id; - int i; - LWLockPadded *lock; + int id; + int i; + LWLockPadded *lock; - for (id = 0, lock = SecondBufferMainLWLockArray; id < NUM_LOCK_PARTITIONS; id++, lock++) - LWLockInitialize(&lock->lock, id); + for (id = 0, lock = SecondBufferMainLWLockArray; id < NUM_LOCK_PARTITIONS; id++, lock++) + LWLockInitialize(&lock->lock, id); } -void -CreateSecondBufferLWLocks(void) +void CreateSecondBufferLWLocks(void) { - if (!IsUnderPostmaster) - { - Size spaceLocks = SecondBufferLWLockShmemSize(); - char *ptr; + if (!IsUnderPostmaster) + { + Size spaceLocks = SecondBufferLWLockShmemSize(); + char *ptr; - /* Allocate space */ - ptr = (char *) ShmemAlloc(spaceLocks); + /* Allocate space */ + ptr = (char *)ShmemAlloc(spaceLocks); - /* Ensure desired alignment of LWLock array */ - ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE; + /* Ensure desired alignment of LWLock array */ + ptr += LWLOCK_PADDED_SIZE - ((uintptr_t)ptr) % LWLOCK_PADDED_SIZE; - SecondBufferMainLWLockArray = (LWLockPadded *) ptr; + SecondBufferMainLWLockArray = (LWLockPadded *)ptr; - /* Initialize all LWLocks */ - InitializeSecondBufferLWLocks(); - } + /* Initialize all LWLocks */ + InitializeSecondBufferLWLocks(); + } } -void -InitSecondBufferMeta(void) +void InitSecondBufferMeta(void) { - bool found,found1; + bool found, found1; int i; MultiKeyArrays = (SingleKeyArray *) - ShmemInitStruct("multi page keys arrays", - sizeof(SingleKeyArray) * SDNUM, - &found); + ShmemInitStruct("multi page keys arrays", + sizeof(SingleKeyArray) * SDNUM, + &found); statisticnum = (Statisticnum *) - ShmemInitStruct("statistic num", - sizeof(Statisticnum), - &found1); + ShmemInitStruct("statistic num", + sizeof(Statisticnum), + &found1); if (MultiKeyArrays == NULL) { @@ -177,7 +173,7 @@ InitSecondBufferMeta(void) statisticnum->totalunused = SDLEN * SDNUM; SpinLockInit(&statisticnum->change); - for(i = 0; i < SDNUM; i ++) + for (i = 0; i < SDNUM; i++) { SpinLockInit(&MultiKeyArrays[i].oplock); MultiKeyArrays[i].head = MultiKeyArrays[i].tail = 0; @@ -185,17 +181,16 @@ InitSecondBufferMeta(void) } } -void -InitDPageKeyArray(void) +void InitDPageKeyArray(void) { // ereport(LOG, (errmsg("initdp"))); bool found; int i; - DPArray = (DPageKeyArray*) - ShmemInitStruct("deleted page keys arrays", - sizeof(DPageKeyArray), - &found); -// ereport(LOG, (errmsg("initdp doing"))); + DPArray = (DPageKeyArray *) + ShmemInitStruct("deleted page keys arrays", + sizeof(DPageKeyArray), + &found); + // ereport(LOG, (errmsg("initdp doing"))); if (DPArray == NULL) { ereport(PANIC, (errmsg("init DPArray fail"))); @@ -204,52 +199,49 @@ InitDPageKeyArray(void) SpinLockInit(&DPArray->append); DPArray->head = DPArray->tail = DPArray->pageIndex = DPArray->walIndex = 0; DPArray->unused = 1024; - // ereport(LOG, (errmsg("initdp done"))); + // ereport(LOG, (errmsg("initdp done"))); } /* init SecondBufferHash */ -void -InitSecondBufferHash(void) +void InitSecondBufferHash(void) { - HASHCTL info; - long init_table_size, - max_table_size; - bool found; + HASHCTL info; + long init_table_size, + max_table_size; + bool found; - /* - * Compute init/max size to request for lock hashtables. Note these - * calculations must agree with SecondBufferhashShmemSize! - */ - max_table_size = 200; - init_table_size = max_table_size / 2; + /* + * Compute init/max size to request for lock hashtables. Note these + * calculations must agree with SecondBufferhashShmemSize! + */ + max_table_size = 200; + init_table_size = max_table_size / 2; - info.keysize = sizeof(SdPageKey); - info.entrysize = sizeof(SdPageValue); + info.keysize = sizeof(SdPageKey); + info.entrysize = sizeof(SdPageValue); - info.num_partitions = NUM_LOCK_PARTITIONS; - - SecondBufferHash = ShmemInitHash("SecondBuffer hash", - init_table_size, - max_table_size, - &info, - HASH_ELEM | HASH_BLOBS | HASH_PARTITION); + info.num_partitions = NUM_LOCK_PARTITIONS; + SecondBufferHash = ShmemInitHash("SecondBuffer hash", + init_table_size, + max_table_size, + &info, + HASH_ELEM | HASH_BLOBS | HASH_PARTITION); } -void -InitPageDBEnv() +void InitPageDBEnv() { if (!IsDirExist(lmdb_page_directory)) - { - pg_mkdir_p(lmdb_page_directory,0777); + { + pg_mkdir_p(lmdb_page_directory, 0777); } mdb_env_create(&pageEnv); - mdb_env_set_maxreaders(pageEnv,MAXREADERS); - mdb_env_set_mapsize(pageEnv,MAPSIE); - mdb_env_open(pageEnv,lmdb_page_directory,MDB_FIXEDMAP|MDB_NOSYNC,0664); + mdb_env_set_maxreaders(pageEnv, MAXREADERS); + mdb_env_set_mapsize(pageEnv, MAPSIE); + mdb_env_open(pageEnv, lmdb_page_directory, MDB_FIXEDMAP | MDB_NOSYNC, 0664); mdb_txn_begin(pageEnv, NULL, 0, &pageTxn); - mdb_dbi_open(pageTxn,NULL,MDB_CREATE,&pageDbi); + mdb_dbi_open(pageTxn, NULL, MDB_CREATE, &pageDbi); mdb_txn_commit(pageTxn); } @@ -257,31 +249,31 @@ void InitWalDBEnv() { if (!IsDirExist(lmdb_wal_directory)) { - pg_mkdir_p(lmdb_wal_directory,0777); + pg_mkdir_p(lmdb_wal_directory, 0777); } mdb_env_create(&walEnv); - mdb_env_set_maxreaders(walEnv,MAXREADERS); - mdb_env_set_mapsize(walEnv,MAPSIE); - mdb_env_open(walEnv,lmdb_wal_directory,MDB_FIXEDMAP|MDB_NOSYNC,0664); + mdb_env_set_maxreaders(walEnv, MAXREADERS); + mdb_env_set_mapsize(walEnv, MAPSIE); + mdb_env_open(walEnv, lmdb_wal_directory, MDB_FIXEDMAP | MDB_NOSYNC, 0664); mdb_txn_begin(walEnv, NULL, 0, &walTxn); - mdb_dbi_open(walTxn,NULL,MDB_CREATE|MDB_DUPSORT,&walDbi); + mdb_dbi_open(walTxn, NULL, MDB_CREATE | MDB_DUPSORT, &walDbi); mdb_txn_commit(walTxn); } void ClosePageDBEnv() { - mdb_dbi_close(pageEnv, pageDbi); + mdb_dbi_close(pageEnv, pageDbi); mdb_env_close(pageEnv); - ereport(LOG,errmsg("close page success")); + ereport(LOG, errmsg("close page success")); } void CloseWalDBEnv() { mdb_dbi_close(walEnv, walDbi); mdb_env_close(walEnv); - ereport(LOG,errmsg("close wal success")); + ereport(LOG, errmsg("close wal success")); } -static void +static void convertKey(SdPageKey *sdkey, PageKey *pk) { sdkey->dbid = pk->relfileNode.dbNode; @@ -290,7 +282,7 @@ convertKey(SdPageKey *sdkey, PageKey *pk) sdkey->blkno = pk->blkNo; } -static void +static void convertKeyLd(LdPageKey *ldkey, PageKey *pk) { SdPageKey sdkey; @@ -299,8 +291,7 @@ convertKeyLd(LdPageKey *ldkey, PageKey *pk) sdkey.forkno = pk->forkNo; sdkey.blkno = pk->blkNo; - ldkey->sk =sdkey; - + ldkey->sk = sdkey; } /* @@ -309,37 +300,36 @@ convertKeyLd(LdPageKey *ldkey, PageKey *pk) static int secondbuffer_match(const void *key1, const void *key2, Size keysize) { - const SdPageKey *k1 = (const SdPageKey *)key1; - const SdPageKey *k2 = (const SdPageKey *)key2; + const SdPageKey *k1 = (const SdPageKey *)key1; + const SdPageKey *k2 = (const SdPageKey *)key2; - Assert(keysize == sizeof(SdPageKey)); - if (k1->dbid == k2->dbid && - k1->blkno == k2->blkno && k1->forkno == k2->forkno && k1->relid == k2->relid) - return 0; /* equal */ - return 1; /* not equal */ + Assert(keysize == sizeof(SdPageKey)); + if (k1->dbid == k2->dbid && + k1->blkno == k2->blkno && k1->forkno == k2->forkno && k1->relid == k2->relid) + return 0; /* equal */ + return 1; /* not equal */ } static uint32 SecondBufferHashCode(const SdPageKey *pk) { - return get_hash_value(SecondBufferHash, (const void *)pk); + return get_hash_value(SecondBufferHash, (const void *)pk); } static SdPageValue * SetupSecondBufferInTable(const SdPageKey *pk) { - SdPageValue *pv; - bool found; + SdPageValue *pv; + bool found; - pv = (SdPageValue *) - hash_search(SecondBufferHash, pk, HASH_ENTER_NULL, &found); + pv = (SdPageValue *) + hash_search(SecondBufferHash, pk, HASH_ENTER_NULL, &found); - return pv; + return pv; } - -static bool +static bool CleanUpSecondBuffer(const SdPageKey *pk) { @@ -349,10 +339,10 @@ CleanUpSecondBuffer(const SdPageKey *pk) // partitionLock = SecondBufferMappingPartitionLock(newHash); // LWLockAcquire(partitionLock, LW_EXCLUSIVE); bool found; - hash_search(SecondBufferHash, - (void *)pk, - HASH_REMOVE, - &found); + hash_search(SecondBufferHash, + (void *)pk, + HASH_REMOVE, + &found); // LWLockRelease(partitionLock); return found; } @@ -360,34 +350,33 @@ CleanUpSecondBuffer(const SdPageKey *pk) static SdPageValue * FindSecondBufferInTable(const SdPageKey *pk) { - SdPageValue *pv; - bool found; + SdPageValue *pv; + bool found; if (SecondBufferHash == NULL) { return NULL; } - pv = (SdPageValue *) - hash_search(SecondBufferHash, - pk, - HASH_FIND, - &found); + pv = (SdPageValue *) + hash_search(SecondBufferHash, + pk, + HASH_FIND, + &found); if (!found) { return NULL; } - return pv; + return pv; } // sb -> ssb -void -ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer) +void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer) { SdPageKey *sk; sk = (SdPageKey *)malloc(sizeof(SdPageKey)); - convertKey(sk,pk); + convertKey(sk, pk); - LWLock *partitionLock; - uint32 newHash; + LWLock *partitionLock; + uint32 newHash; newHash = SecondBufferHashCode(sk); partitionLock = SecondBufferMappingPartitionLock(newHash); SdPageValue *sdPageValue = NULL; @@ -401,14 +390,14 @@ ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer) continue; } sdPageValue->canDelete = false; - memcpy(sdPageValue->pagecontent, buffer,8192); + memcpy(sdPageValue->pagecontent, buffer, 8192); LWLockRelease(partitionLock); } srand((int)clock()); - int index = rand()%SDNUM; + int index = rand() % SDNUM; SingleKeyArray *sa; - + for (;;) { sa = &MultiKeyArrays[index]; @@ -416,10 +405,10 @@ ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer) if (sa->unused > 0) { sa->SdPageKeyList[sa->tail] = *sk; - sa->tail = (sa->tail + 1)%SDLEN; - sa->unused --; + sa->tail = (sa->tail + 1) % SDLEN; + sa->unused--; SpinLockAcquire(&statisticnum->change); - statisticnum->totalunused --; + statisticnum->totalunused--; SpinLockRelease(&statisticnum->change); SpinLockRelease(&sa->oplock); break; @@ -439,10 +428,10 @@ GetPageFromSecondBuffer(PageKey *pk, uint8_t *buffer) { SdPageKey *sk = NULL; sk = (SdPageKey *)malloc(sizeof(SdPageKey)); - convertKey(sk,pk); + convertKey(sk, pk); - LWLock *partitionLock; - uint32 newHash; + LWLock *partitionLock; + uint32 newHash; newHash = SecondBufferHashCode(sk); partitionLock = SecondBufferMappingPartitionLock(newHash); LWLockAcquire(partitionLock, LW_SHARED); @@ -453,7 +442,6 @@ GetPageFromSecondBuffer(PageKey *pk, uint8_t *buffer) if (sk != NULL) { free(sk); - } LWLockRelease(partitionLock); return NULL; @@ -461,38 +449,38 @@ GetPageFromSecondBuffer(PageKey *pk, uint8_t *buffer) else { free(sk); - memcpy(buffer,sv->pagecontent,BLCKSZ); + memcpy(buffer, sv->pagecontent, BLCKSZ); LWLockRelease(partitionLock); return buffer; } } -//lc -> sb +// lc -> sb static uint8_t * -GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer) +GetPageFromLocalBuffer(PageKey *pk, uint8_t *buffer) { int i = 0; if (pageEnv == NULL) { return NULL; } - + MDB_txn *tmptxn; LdPageKey *lk; lk = (LdPageKey *)malloc(sizeof(LdPageKey)); - convertKeyLd(lk,pk); - MDB_val key,data; + convertKeyLd(lk, pk); + MDB_val key, data; data.mv_data = NULL; data.mv_size = 0; key.mv_size = sizeof(LdPageKey); key.mv_data = lk; mdb_txn_begin(pageEnv, NULL, 0, &tmptxn); - mdb_get(tmptxn,pageDbi,&key,&data); + mdb_get(tmptxn, pageDbi, &key, &data); if (data.mv_data != NULL) { mdb_txn_abort(tmptxn); free(lk); - memcpy(buffer,data.mv_data,data.mv_size); + memcpy(buffer, data.mv_data, data.mv_size); return data.mv_data; } else @@ -503,56 +491,57 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer) } } -uint64_t -SwapLsnFromLittleToBig(uint64_t lsn) { - #ifndef WORDS_BIGENDIAN - /* trans lsn from little endian to big endian in memory - * eg: 0x12345678 ===> 0x78563412 - */ +uint64_t +SwapLsnFromLittleToBig(uint64_t lsn) +{ +#ifndef WORDS_BIGENDIAN + /* trans lsn from little endian to big endian in memory + * eg: 0x12345678 ===> 0x78563412 + */ - uint32 low, high; - low = (uint32) (lsn); - high = (uint32) ((lsn) >> 32); + uint32 low, high; + low = (uint32)(lsn); + high = (uint32)((lsn) >> 32); - low = (low << 16) | (low >> 16); - low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF); + low = (low << 16) | (low >> 16); + low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF); - high = (high << 16) | (high >> 16); - high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF); - return ((uint64)(low)) << 32 | (uint64)(high); - #endif + high = (high << 16) | (high >> 16); + high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF); + return ((uint64)(low)) << 32 | (uint64)(high); +#endif return lsn; } -uint64_t -SwapLsnFromBigToLittle(uint64_t lsn) { - #ifndef WORDS_BIGENDIAN - /* trans lsn from big endian to little endian in memory - * eg: 0x78563412 ===> 0x12345678 - */ +uint64_t +SwapLsnFromBigToLittle(uint64_t lsn) +{ +#ifndef WORDS_BIGENDIAN + /* trans lsn from big endian to little endian in memory + * eg: 0x78563412 ===> 0x12345678 + */ - uint32 low, high; - low = (uint32) (lsn); - high = (uint32) ((lsn) >> 32); + uint32 low, high; + low = (uint32)(lsn); + high = (uint32)((lsn) >> 32); - low = (low << 16) | (low >> 16); - low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF); + low = (low << 16) | (low >> 16); + low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF); - high = (high << 16) | (high >> 16); - high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF); - return ((uint64)(low)) << 32 | (uint64)(high); - #endif + high = (high << 16) | (high >> 16); + high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF); + return ((uint64)(low)) << 32 | (uint64)(high); +#endif return lsn; } -Bufrd -GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) +Bufrd GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) { MDB_txn *tmptxn; MDB_cursor *tmpcursor; Bufrd bufrd; - MDB_val key,data; + MDB_val key, data; int tb = -1, co = -1, cg = -1; data.mv_size = 0; @@ -561,7 +550,7 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) key.mv_size = SizeOfCleanWal; key.mv_data = wpk; int waldatalen = 0, roomlen = 2048; - uint32 dbid,relid,forkno,blkno; + uint32 dbid, relid, forkno, blkno; dbid = wpk->sk.dbid; relid = wpk->sk.relid; forkno = wpk->sk.forkno; @@ -575,14 +564,14 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) tb = mdb_txn_begin(walEnv, NULL, 0, &tmptxn); if (tb != 0) { - //TODO - ereport(PANIC,errmsg("mdb_txn_begin failed,error is:%d",tb)); + // TODO + ereport(PANIC, errmsg("mdb_txn_begin failed,error is:%d", tb)); } - co = mdb_cursor_open(tmptxn, walDbi,&tmpcursor); + co = mdb_cursor_open(tmptxn, walDbi, &tmpcursor); if (co != 0) { - //TODO - ereport(PANIC,errmsg("mdb_txn_open failed,error is:%d",co)); + // TODO + ereport(PANIC, errmsg("mdb_txn_open failed,error is:%d", co)); } // ereport(LOG,errmsg("535 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); @@ -597,18 +586,13 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) } wpk = (WalLdPageKey *)key.mv_data; - // ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); - while (wpk->sk.dbid == dbid - && wpk->sk.relid == relid - && wpk->sk.forkno == forkno - && wpk->sk.blkno == blkno - && SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn) + while (wpk->sk.dbid == dbid && wpk->sk.relid == relid && wpk->sk.forkno == forkno && wpk->sk.blkno == blkno && SwapLsnFromBigToLittle(wpk->pageLsn) < replyLsn) { - memcpy(waldata + waldatalen,data.mv_data,data.mv_size); + memcpy(waldata + waldatalen, data.mv_data, data.mv_size); waldatalen += data.mv_size; - if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT)) + if (0 != mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT)) { bufrd.buf = waldata; bufrd.cap = roomlen; @@ -622,25 +606,23 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) if (waldatalen + data.mv_size > roomlen) { roomlen += 1024; - waldata = (uint8_t *)realloc(waldata,roomlen); + waldata = (uint8_t *)realloc(waldata, roomlen); } wpk = (WalLdPageKey *)key.mv_data; // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); } - } bufrd.buf = waldata; bufrd.count = waldatalen; bufrd.cap = roomlen; - + mdb_cursor_close(tmpcursor); mdb_txn_abort(tmptxn); tmpcursor = NULL; tmptxn = NULL; return bufrd; - } void AddOneItemToDPArray(OriginDPageKey odpk) @@ -650,10 +632,10 @@ void AddOneItemToDPArray(OriginDPageKey odpk) // dpk.pk = odpk.pk; // dpk.operation = odpk.opration; // dpk.pagedeleted = false; - + // while(1) // { - // SpinLockAcquire(&DPArray->append); + // SpinLockAcquire(&DPArray->append); // if (DPArray->unused > 0) // { // DPArray->dpk[DPArray->tail] = dpk; @@ -664,19 +646,17 @@ void AddOneItemToDPArray(OriginDPageKey odpk) // } // SpinLockRelease(&DPArray->append); // pg_usleep(1); - // } // ereport(LOG, (errmsg("AddOneItemToDPArray done"))); } -void -storeWalInLocalBuffer(kvStruct *ks,int32 length) +void storeWalInLocalBuffer(kvStruct *ks, int32 length) { // pthread_mutex_lock(&q_lock); - int tb = -1,co = -1,cp =-1,cc = -1,tc = -1; + int tb = -1, co = -1, cp = -1, cc = -1, tc = -1; MDB_txn *tmptxn = NULL; - MDB_val key,data; + MDB_val key, data; MDB_cursor *tmpcursor = NULL; uint8_t *xlogContent = NULL; @@ -687,26 +667,23 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) tb = mdb_txn_begin(walEnv, NULL, 0, &tmptxn); if (tb != 0) { - //TODO - ereport(LOG,errmsg("put mdb_txn_begin failed,error is:%d",tb)); + // TODO + ereport(LOG, errmsg("put mdb_txn_begin failed,error is:%d", tb)); } - co = mdb_cursor_open(tmptxn,walDbi,&tmpcursor); + co = mdb_cursor_open(tmptxn, walDbi, &tmpcursor); if (co != 0) { - //TODO - ereport(LOG,errmsg("put mdb_txn_open failed,error is:%d",co)); + // TODO + ereport(LOG, errmsg("put mdb_txn_open failed,error is:%d", co)); } - for(int i = 0; i < length; i ++) + for (int i = 0; i < length; i++) { part = 0; uint8_t *buf = ks[i].buf; - totallen = (uint32_t)buf[0] - | (uint32_t)(buf[1] << 8) - | (uint32_t)(buf[2] << 16) - | (uint32_t)(buf[3] << 24); - + totallen = (uint32_t)buf[0] | (uint32_t)(buf[1] << 8) | (uint32_t)(buf[2] << 16) | (uint32_t)(buf[3] << 24); + key.mv_size = SizeOfCleanWal; wlpk.sk = ks[i].lpk.sk; @@ -721,21 +698,21 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) xlogContent = (uint8_t *)malloc(511); wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn); wlpk.partition = part; - - memcpy(xlogContent,buf + (part * 511), 511); //502 = 511 - 9 - part ++; + + memcpy(xlogContent, buf + (part * 511), 511); // 502 = 511 - 9 + part++; totallen -= 511; key.mv_data = &wlpk; data.mv_data = xlogContent; - cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); + cp = mdb_cursor_put(tmpcursor, &key, &data, MDB_NODUPDATA); if (cp != 0) { - ereport(LOG,errmsg("mdb_txn_put big wal failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", - cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition)); + ereport(LOG, errmsg("mdb_txn_put big wal failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition)); if (cp == MDB_KEYEXIST) { break; - } + } } free(xlogContent); xlogContent = NULL; @@ -748,13 +725,13 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn); wlpk.partition = part; key.mv_data = &wlpk; - memcpy(xlogContent,buf + (part * 511),totallen); + memcpy(xlogContent, buf + (part * 511), totallen); data.mv_data = xlogContent; - cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); + cp = mdb_cursor_put(tmpcursor, &key, &data, MDB_NODUPDATA); if (cp != 0) { - ereport(LOG,errmsg("mdb_txn_put last of big wal failed,error is:%d, rel %d, forkno %d, blk %d", - cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); + ereport(LOG, errmsg("mdb_txn_put last of big wal failed,error is:%d, rel %d, forkno %d, blk %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); if (cp == MDB_KEYEXIST) { break; @@ -763,7 +740,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) free(xlogContent); xlogContent = NULL; break; - } + } } } else @@ -774,22 +751,22 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) wlpk.pageLsn = SwapLsnFromLittleToBig(ks[i].lsn); wlpk.partition = 0; key.mv_data = &wlpk; - memcpy(xlogContent,buf,totallen); + memcpy(xlogContent, buf, totallen); data.mv_data = xlogContent; - cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); + cp = mdb_cursor_put(tmpcursor, &key, &data, MDB_NODUPDATA); if (cp != 0) { - ereport(LOG,errmsg("mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", - cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, - SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition)); + ereport(LOG, errmsg("mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", + cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, + SwapLsnFromBigToLittle(wlpk.pageLsn), wlpk.partition)); if (cp == MDB_KEYEXIST) - { - cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET); - if (cp != 0) - ereport(LOG,errmsg(" mdb_txn_get failed when put exist,error is:%d, rel %d, forkno %d, blk %d", - cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); - continue; - } + { + cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET); + if (cp != 0) + ereport(LOG, errmsg(" mdb_txn_get failed when put exist,error is:%d, rel %d, forkno %d, blk %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); + continue; + } } free(xlogContent); xlogContent = NULL; @@ -799,32 +776,28 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) co = mdb_txn_commit(tmptxn); if (co != 0) - ereport(LOG,errmsg("put mdb_txn_commit failed,error is:%d",co)); + ereport(LOG, errmsg("put mdb_txn_commit failed,error is:%d", co)); tmpcursor = NULL; tmptxn = NULL; // pthread_mutex_unlock(&q_lock); - - } -void -GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) +void GetPageFromCurrentNode(PageKey pk, Bufrd *bufrd) { uint8_t *page; - page = NULL; + page = NULL; if (bufrd->buf == NULL) { bufrd->buf = (uint8_t *)malloc(BLKSZ); } - page = GetPageFromSecondBuffer(&pk,bufrd->buf); + page = GetPageFromSecondBuffer(&pk, bufrd->buf); if (page == NULL) { - page = GetPageFromLocalBuffer(&pk, bufrd->buf); + page = GetPageFromLocalBuffer(&pk, bufrd->buf); } - - if ( page == NULL) + if (page == NULL) { bufrd->buf = NULL; bufrd->cap = 0; @@ -849,10 +822,18 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) Bufrd waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn); if (waldata.count > 0) { - bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count); - memcpy(bufrd->buf + 8192, waldata.buf,waldata.count); + bufrd->buf = (uint8_t *)realloc(bufrd->buf, 8192 + waldata.count); + memcpy(bufrd->buf + 8192, waldata.buf, waldata.count); wlpk.pageLsn = SwapLsnFromLittleToBig(pk.replyLsn); SendInvalWal(&wlpk); + + wlpk.sk.dbid = 0; + wlpk.sk.relid = 0; + wlpk.sk.forkno = 32; + wlpk.sk.blkno = 0; + wlpk.pageLsn = 0; + wlpk.partition = 0; + SendInvalWal(&wlpk); } bufrd->cap = bufrd->count = 8192 + waldata.count; free(waldata.buf); @@ -871,7 +852,7 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) // elog(WARNING, "request socket failed"); // return; // } - + // if (connect(sock_fd, (struct sockaddr *)&un, sizeof(un)) < 0) // { // elog(WARNING, "connect socket failed"); @@ -882,8 +863,9 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) // return; // } -void SendInvalWal(WalLdPageKey *walkey) { - struct sockaddr_un un; +void SendInvalWal(WalLdPageKey *walkey) +{ + struct sockaddr_un un; if (SocFd.walSocketFd < 0) { un.sun_family = AF_UNIX; @@ -903,12 +885,13 @@ void SendInvalWal(WalLdPageKey *walkey) { } send(SocFd.walSocketFd, walkey, SizeOfCleanWal, 0); -// close(sock_fd); + // close(sock_fd); return; } -void SendInvalPage(LdPageKey *ldKey) { - struct sockaddr_un un; +void SendInvalPage(LdPageKey *ldKey) +{ + struct sockaddr_un un; if (SocFd.pageSocketFd < 0) { un.sun_family = AF_UNIX; @@ -928,59 +911,56 @@ void SendInvalPage(LdPageKey *ldKey) { } send(SocFd.pageSocketFd, ldKey, SizeOfCleanPage, 0); - return; + return; } void *doCleanWalInLmdb(void *fd) { int syncFlag; int new_fd = *(int *)fd; static char data_buf[SizeOfCleanWal]; - while(1) + while (1) { memset(data_buf, 0, SizeOfCleanWal); - recv(new_fd, data_buf, SizeOfCleanWal,0); + recv(new_fd, data_buf, SizeOfCleanWal, 0); WalLdPageKey *wpk = (WalLdPageKey *)data_buf; - - if (0 == wpk->partition && 0 == wpk->sk.blkno && - 0 == wpk->sk.dbid && 32 == wpk->sk.forkno && - 0 == wpk->sk.relid) + if (0 == wpk->partition && 0 == wpk->pageLsn&& + 0 == wpk->sk.blkno && 0 == wpk->sk.dbid && + 32 == wpk->sk.forkno && 0 == wpk->sk.relid) { - // close(new_fd); - // break; - syncFlag = mdb_env_sync(walEnv,1); + syncFlag = mdb_env_sync(walEnv, 1); if (syncFlag != 0) { - elog(FATAL, "mdb_env_sync failed when clean wal, err %d", syncFlag); - // return; - }else{ - elog(LOG, "mdb_env_sync success when clean wal, err %d", syncFlag); + printf("wal mdb_env_sync is failed, errcode is :%d\n",syncFlag); } - }else if(0 == wpk->partition && 0 == wpk->sk.blkno && - 0 == wpk->sk.dbid && 0 == wpk->sk.forkno && - 0 == wpk->sk.relid){ - close(new_fd); - break; - - }else{ + } + else if (0 == wpk->partition && 0 == wpk->pageLsn && + 0 == wpk->sk.blkno && 0 == wpk->sk.dbid && + 0 == wpk->sk.forkno && 0 == wpk->sk.relid) + { + close(new_fd); + break; + } + else + { if (wpk->partition == 0) { CleanWalsByPage(wpk); } - else + else { CleanWalsByTable(wpk); } } - } - mdb_env_sync(walEnv,1); + mdb_env_sync(walEnv, 1); } -void *CleanWalsInLmdb(void *arg) { - int fd = -1, new_fd; - struct sockaddr_un un; +void *CleanWalsInLmdb(void *arg) +{ + int fd = -1, new_fd; + struct sockaddr_un un; static char data_buf[SizeOfCleanWal]; if (fd < 0) { @@ -998,13 +978,14 @@ void *CleanWalsInLmdb(void *arg) { { elog(PANIC, "bind cleanwal socket failed"); } - if (listen(fd, MaxBackends+8) < 0) + if (listen(fd, MaxBackends + 8) < 0) { elog(PANIC, "listen cleanwal socket failed"); } - while(1) { - pthread_t p; + while (1) + { + pthread_t p; new_fd = accept(fd, NULL, NULL); if (new_fd < 0) { @@ -1013,7 +994,7 @@ void *CleanWalsInLmdb(void *arg) { elog(PANIC, "cannot accept client connect request"); } - pthread_create(&p,NULL,doCleanWalInLmdb,&new_fd); + pthread_create(&p, NULL, doCleanWalInLmdb, &new_fd); } } @@ -1022,40 +1003,37 @@ void *doCleanPageInLmdb(void *fd) int new_fd = *(int *)fd; int syncFlag; static char data_buf[SizeOfCleanPage]; - while(1) + while (1) { memset(data_buf, 0, SizeOfCleanPage); - elog(LOG, "before recv"); - recv(new_fd, data_buf, SizeOfCleanPage,0); - elog(LOG, "after recv"); + recv(new_fd, data_buf, SizeOfCleanPage, 0); LdPageKey *lpk = (LdPageKey *)data_buf; - elog(LOG,"dbid is %d, relid is %d,forkno is %d,blkno is %d",lpk->sk.dbid,lpk->sk.relid,lpk->sk.forkno, - lpk->sk.blkno); if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 32 == lpk->sk.forkno && 0 == lpk->sk.relid) { // close(new_fd); // break; - syncFlag = mdb_env_sync(pageEnv,1); + syncFlag = mdb_env_sync(pageEnv, 1); if (syncFlag != 0) { - elog(FATAL, "mdb_env_sync failed when clean pages, err %d", syncFlag); - // return; - }else{ - elog(LOG, "mdb_env_sync success when clean pages, err %d", syncFlag); + printf("page mdb_env_sync is failed,errcode is: %d\n",syncFlag); } - }else if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 0 == lpk->sk.forkno && 0 == lpk->sk.relid){ + + } + else if (0 == lpk->sk.blkno && 0 == lpk->sk.dbid && 0 == lpk->sk.forkno && 0 == lpk->sk.relid) + { close(new_fd); break; - }else{ + } + else + { CleanPagesByTable(lpk); } - } - } -void *CleanPagesInLmdb(void *arg) { - int fd = -1, new_fd; - struct sockaddr_un un; +void *CleanPagesInLmdb(void *arg) +{ + int fd = -1, new_fd; + struct sockaddr_un un; static char data_buf[SizeOfCleanPage]; if (fd < 0) { @@ -1073,14 +1051,15 @@ void *CleanPagesInLmdb(void *arg) { { elog(PANIC, "bind cleanwal socket failed"); } - if (listen(fd, MaxBackends+8) < 0) + if (listen(fd, MaxBackends + 8) < 0) { elog(PANIC, "listen cleanwal socket failed"); } - while(1) { + while (1) + { pthread_t p; - new_fd = accept(fd, NULL, NULL); + new_fd = accept(fd, NULL, NULL); if (new_fd < 0) { close(fd); @@ -1088,14 +1067,14 @@ void *CleanPagesInLmdb(void *arg) { elog(PANIC, "cannot accept client connect request"); } - pthread_create(&p,NULL,doCleanPageInLmdb,&new_fd); + pthread_create(&p, NULL, doCleanPageInLmdb, &new_fd); } } static int -IsDirExist(const char* path) +IsDirExist(const char *path) { - return !access(path,F_OK); + return !access(path, F_OK); } static void @@ -1105,7 +1084,7 @@ CleanWalsByPage(WalLdPageKey *walkey) MDB_cursor *tmpcursor; MDB_val key, data; int success = -1; - uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn); + uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn); walkey->pageLsn = 0; key.mv_size = SizeOfCleanWal; @@ -1114,7 +1093,7 @@ CleanWalsByPage(WalLdPageKey *walkey) data.mv_size = 0; data.mv_data = NULL; - uint32 dbid,relid,forkno,blkno; + uint32 dbid, relid, forkno, blkno; dbid = walkey->sk.dbid; relid = walkey->sk.relid; forkno = walkey->sk.forkno; @@ -1126,49 +1105,45 @@ CleanWalsByPage(WalLdPageKey *walkey) elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success); return; } - - success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor); + + success = mdb_cursor_open(tmptxn, walDbi, &tmpcursor); if (success != 0) { elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success); return; } - success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE); + success = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE); if (success != 0) { ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld", - success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn))); + success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn))); mdb_cursor_close(tmpcursor); mdb_txn_abort(tmptxn); return; } - + walkey = (WalLdPageKey *)key.mv_data; // elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d", // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition); - while (walkey->sk.dbid == dbid - && walkey->sk.relid == relid - && walkey->sk.forkno == forkno - && walkey->sk.blkno == blkno - && SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn) + while (walkey->sk.dbid == dbid && walkey->sk.relid == relid && walkey->sk.forkno == forkno && walkey->sk.blkno == blkno && SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn) { // elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d", // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition); - success = mdb_cursor_del(tmpcursor,0); + success = mdb_cursor_del(tmpcursor, 0); if (success != 0) elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld", - success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn)); - if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT)) + success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn)); + if (0 != mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT)) { - break; + break; } - + walkey = (WalLdPageKey *)key.mv_data; - // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); + // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); } mdb_cursor_close(tmpcursor); @@ -1185,7 +1160,7 @@ CleanWalsByTable(WalLdPageKey *walkey) MDB_cursor *tmpcursor; MDB_val key, data; int success = -1; - uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn); + uint64 replayLsn = SwapLsnFromBigToLittle(walkey->pageLsn); walkey->pageLsn = 0; walkey->partition = 0; @@ -1195,7 +1170,7 @@ CleanWalsByTable(WalLdPageKey *walkey) data.mv_size = 0; data.mv_data = NULL; - uint32 dbid,relid,forkno,blkno; + uint32 dbid, relid, forkno, blkno; dbid = walkey->sk.dbid; relid = walkey->sk.relid; forkno = walkey->sk.forkno; @@ -1207,49 +1182,47 @@ CleanWalsByTable(WalLdPageKey *walkey) elog(LOG, "mdb_txn_begin failed when clean wals, err %d", success); return; } - - success = mdb_cursor_open(tmptxn, walDbi,&tmpcursor); + + success = mdb_cursor_open(tmptxn, walDbi, &tmpcursor); if (success != 0) { elog(LOG, "mdb_cursor_open failed when clean wals, err %d", success); return; } - success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE); + success = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE); if (success != 0) { ereport(LOG, errmsg("mdb_cursor_get failed when clean wals, err %d, rel %d, fork %d, blk %d, lsn %ld", - success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn))); + success, relid, forkno, blkno, SwapLsnFromBigToLittle(walkey->pageLsn))); mdb_cursor_close(tmpcursor); mdb_txn_abort(tmptxn); return; } - + walkey = (WalLdPageKey *)key.mv_data; // elog(LOG, "get wal rel %d, fork %d, blk %d, lsn %ld, part %d", // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition); - while (walkey->sk.dbid == dbid - && walkey->sk.relid == relid - && walkey->sk.forkno == forkno - // && walkey->sk.blkno == blkno - && SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn) + while (walkey->sk.dbid == dbid && walkey->sk.relid == relid && walkey->sk.forkno == forkno + // && walkey->sk.blkno == blkno + && SwapLsnFromBigToLittle(walkey->pageLsn) < replayLsn) { // elog(LOG, "del wal rel %d, fork %d, blk %d, lsn %ld, part %d", // walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn), walkey->partition); - success = mdb_cursor_del(tmpcursor,0); + success = mdb_cursor_del(tmpcursor, 0); if (success != 0) elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d, lsn %ld", - success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn)); - if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT)) + success, walkey->sk.relid, walkey->sk.forkno, walkey->sk.blkno, SwapLsnFromBigToLittle(walkey->pageLsn)); + if (0 != mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT)) { - break; + break; } - + walkey = (WalLdPageKey *)key.mv_data; - // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); + // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); } mdb_cursor_close(tmpcursor); @@ -1273,57 +1246,55 @@ CleanPagesByTable(LdPageKey *ldKey) data.mv_size = 0; data.mv_data = NULL; - uint32 dbid,relid,forkno,blkno; + uint32 dbid, relid, forkno, blkno; dbid = ldKey->sk.dbid; relid = ldKey->sk.relid; forkno = ldKey->sk.forkno; blkno = ldKey->sk.blkno; success = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn); -// mdb_txn_begin(pageEnv, NULL, 0, &tmptxn); + // mdb_txn_begin(pageEnv, NULL, 0, &tmptxn); if (success != 0) { elog(LOG, "mdb_txn_begin failed when clean pages, err %d", success); return; } - success = mdb_cursor_open(tmptxn, pageDbi,&tmpcursor); + success = mdb_cursor_open(tmptxn, pageDbi, &tmpcursor); if (success != 0) { elog(LOG, "mdb_cursor_open failed when clean pages, err %d", success); return; } - success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET_RANGE); + success = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE); if (success != 0) { elog(LOG, "mdb_cursor_get failed when clean pages, err %d, rel %d, fork %d, blk %d", - success, relid, forkno, blkno); + success, relid, forkno, blkno); mdb_cursor_close(tmpcursor); mdb_txn_abort(tmptxn); return; } - + ldKey = (LdPageKey *)key.mv_data; // elog(LOG, "get page db %d, rel %d, fork %d, blk %d", - // ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno); + // ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno); - while (ldKey->sk.dbid == dbid - && ldKey->sk.relid == relid - && ldKey->sk.forkno == forkno) + while (ldKey->sk.dbid == dbid && ldKey->sk.relid == relid && ldKey->sk.forkno == forkno) { - elog(LOG, "del page dbid %d, rel %d, fork %d, blk %d", - ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno); - success = mdb_cursor_del(tmpcursor,0); + elog(LOG, "del page dbid %d, rel %d, fork %d, blk %d", + ldKey->sk.dbid, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno); + success = mdb_cursor_del(tmpcursor, 0); if (success != 0) elog(WARNING, "del wal failed: err %d, rel %d, fork %d, blk %d", - success, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno); - if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT)) + success, ldKey->sk.relid, ldKey->sk.forkno, ldKey->sk.blkno); + if (0 != mdb_cursor_get(tmpcursor, &key, &data, MDB_NEXT)) { - break; + break; } - + ldKey = (LdPageKey *)key.mv_data; - // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", - // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); + // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsnFromBigToLittle(wpk->pageLsn), wpk->partition)); } mdb_cursor_close(tmpcursor); @@ -1333,14 +1304,13 @@ CleanPagesByTable(LdPageKey *ldKey) return; } - -static void * +static void * RemovePageOrWalFromCurrentNode() { -// ereport(INFO, (errmsg("RemovePageOrWalFromCurrentNode in processing"))); + // ereport(INFO, (errmsg("RemovePageOrWalFromCurrentNode in processing"))); MDB_txn *tmptxn; MDB_cursor *tmpcursor; - MDB_val key,data; + MDB_val key, data; LdPageKey *lpk = NULL; PageKey *pk = NULL; SdPageValue *spv = NULL; @@ -1349,10 +1319,9 @@ RemovePageOrWalFromCurrentNode() lpk = (LdPageKey *)malloc(sizeof(LdPageKey)); - LWLock *partitionLock = NULL; - uint32 newHash; + LWLock *partitionLock = NULL; + uint32 newHash; - for (;;) { if (PageOrWal == (int)PAGE) @@ -1374,35 +1343,34 @@ RemovePageOrWalFromCurrentNode() continue; } pk = &DPArray->dpk[DPArray->walIndex].pk; - } - convertKeyLd(lpk,pk); + convertKeyLd(lpk, pk); key.mv_size = sizeof(LdPageKey); key.mv_data = lpk; if (PageOrWal == 1) { mdb_txn_begin(pageEnv, NULL, 0, &tmptxn); - mdb_cursor_open(tmptxn, pageDbi,&tmpcursor); + mdb_cursor_open(tmptxn, pageDbi, &tmpcursor); } else { mdb_txn_begin(walEnv, NULL, 0, &tmptxn); - mdb_cursor_open(tmptxn, walDbi,&tmpcursor); + mdb_cursor_open(tmptxn, walDbi, &tmpcursor); } - success = mdb_cursor_get(tmpcursor,&key,&data,MDB_SET); + success = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET); if (success == 0) { - mdb_cursor_del(tmpcursor,0); + mdb_cursor_del(tmpcursor, 0); } else { - //DROP - mdb_cursor_get(tmpcursor, &key,&data,MDB_PREV); - mdb_cursor_del(tmpcursor,0); + // DROP + mdb_cursor_get(tmpcursor, &key, &data, MDB_PREV); + mdb_cursor_del(tmpcursor, 0); if (PageOrWal == (int)PAGE) { // CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk); @@ -1416,7 +1384,7 @@ RemovePageOrWalFromCurrentNode() if (PageOrWal == (int)PAGE && success == 0) { - //TRUNCATE + // TRUNCATE if (DPArray->dpk[DPArray->walIndex].operation == (int)TRUNCATE) { // CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk); @@ -1426,9 +1394,9 @@ RemovePageOrWalFromCurrentNode() CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk); LWLockRelease(partitionLock); } - else if (NULL == FindSecondBufferInTable(&lpk->sk)) //EVICT + else if (NULL == FindSecondBufferInTable(&lpk->sk)) // EVICT { - mdb_put(tmptxn,pageDbi,&key,&data,MDB_NODUPDATA); + mdb_put(tmptxn, pageDbi, &key, &data, MDB_NODUPDATA); } } @@ -1443,33 +1411,30 @@ RemovePageOrWalFromCurrentNode() { DPArray->head = (DPArray->head + 1) % 1024; SpinLockAcquire(&DPArray->append); - DPArray->unused ++; + DPArray->unused++; SpinLockRelease(&DPArray->append); } } else { - DPArray->head ++; + DPArray->head++; DPArray->walIndex = (DPArray->walIndex + 1) % 1024; DPArray->head = (DPArray->head + 1) % 1024; SpinLockAcquire(&DPArray->append); - DPArray->unused ++; + DPArray->unused++; SpinLockRelease(&DPArray->append); - } } - } -static void +static void MovePageFromSecondBufferToLocalBuffer() { printf("MovePageFromSecondBufferToLocalBuffer\n"); MDB_txn *tmptxn; MDB_txn *txn = NULL; MDB_dbi dbi; - MDB_val key,data; - + MDB_val key, data; SingleKeyArray *ska = NULL; int localHead = 0; @@ -1486,16 +1451,15 @@ MovePageFromSecondBufferToLocalBuffer() spkl.head = NULL; spkl.tail = NULL; - SdPageValue *spv = NULL; - int tb = -1,mp = -1, mtc=-1,mdo = -1; + int tb = -1, mp = -1, mtc = -1, mdo = -1; long sleeptime = 1000L; bool success; - LWLock *partitionLock = NULL; - uint32 newHash; - bool canShutDown = false; - pid_t ckp_pid; + LWLock *partitionLock = NULL; + uint32 newHash; + bool canShutDown = false; + pid_t ckp_pid; for (;;) { @@ -1519,13 +1483,13 @@ MovePageFromSecondBufferToLocalBuffer() } else ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("could not check the existence of the backend with PID %d: %m", + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not check the existence of the backend with PID %d: %m", ckp_pid))); } } - for (i = 0; i < SDNUM; i ++) + for (i = 0; i < SDNUM; i++) { ska = &MultiKeyArrays[i]; SpinLockAcquire(&ska->oplock); @@ -1533,25 +1497,25 @@ MovePageFromSecondBufferToLocalBuffer() localTail = ska->tail; localUnused = ska->unused; SpinLockRelease(&ska->oplock); - - if (localUnused == 0) + + if (localUnused == 0) // { - processNum = SDLEN; + processNum = SDLEN; } - else if(localUnused == SDLEN) + else if (localUnused == SDLEN) { continue; } else { - processNum = (localTail + SDLEN - localHead)%SDLEN; + processNum = (localTail + SDLEN - localHead) % SDLEN; } success = true; tb = mdb_txn_begin(pageEnv, NULL, 0, &tmptxn); if (tb != 0) { - ereport(LOG,errmsg("mdb_txn_begin failed,error code is %d",tb)); - continue; + ereport(LOG, errmsg("mdb_txn_begin failed,error code is %d", tb)); + continue; } for (j = 0; j < processNum; j++) @@ -1565,13 +1529,12 @@ MovePageFromSecondBufferToLocalBuffer() { spkl.head = spke; spkl.tail = spkl.head; - } - else{ + else + { spkl.tail->next = spke; spkl.tail = spkl.tail->next; } - newHash = SecondBufferHashCode(&spk); partitionLock = SecondBufferMappingPartitionLock(newHash); @@ -1590,16 +1553,16 @@ MovePageFromSecondBufferToLocalBuffer() data.mv_size = 8192; data.mv_data = spv->pagecontent; - mp = mdb_put(tmptxn,pageDbi,&key,&data, 0); + mp = mdb_put(tmptxn, pageDbi, &key, &data, 0); spv->canDelete = true; LWLockRelease(partitionLock); if (mp != 0) { success = false; - ereport(LOG,errmsg("mdb_put failed, mp is %d",mp)); + ereport(LOG, errmsg("mdb_put failed, mp is %d", mp)); break; } - } + } if (!success) { @@ -1611,16 +1574,15 @@ MovePageFromSecondBufferToLocalBuffer() if (mtc != 0) { success = false; - ereport(LOG,errmsg("mdb_txn_commit failed,error is:%d",mtc)); + ereport(LOG, errmsg("mdb_txn_commit failed,error is:%d", mtc)); mdb_txn_abort(tmptxn); - } } SdPageKeyEntity *s = NULL; - SdPageValue *spv = NULL; - while(spkl.head != NULL) - { + SdPageValue *spv = NULL; + while (spkl.head != NULL) + { s = spkl.head; if (success) { @@ -1653,7 +1615,7 @@ MovePageFromSecondBufferToLocalBuffer() { continue; } - + ska->head = localTail; SpinLockAcquire(&ska->oplock); if (ska->unused == 0) @@ -1674,9 +1636,9 @@ MovePageFromSecondBufferToLocalBuffer() SpinLockRelease(&ska->oplock); } - double rate = statisticnum->totalunused/(SDLEN * SDNUM); - if (rate < 0.1) - { + double rate = statisticnum->totalunused / (SDLEN * SDNUM); + if (rate < 0.1) + { sleeptime = 0; } else if (rate > 0.6) @@ -1685,35 +1647,35 @@ MovePageFromSecondBufferToLocalBuffer() } else if (rate < 0.5) { - sleeptime = sleeptime/2; + sleeptime = sleeptime / 2; } pg_usleep(sleeptime); - (void) WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - 50L /* convert to ms */ , - WAIT_EVENT_SECONDBUFFER_MAIN); + (void)WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 50L /* convert to ms */, + WAIT_EVENT_SECONDBUFFER_MAIN); } } -void SignalStartSecondBuffer(void) { - SendPostmasterSignal(PMSIGNAL_SECONDBUFFER_WORKER); +void SignalStartSecondBuffer(void) +{ + SendPostmasterSignal(PMSIGNAL_SECONDBUFFER_WORKER); } -void -SecondBufferMain(void) +void SecondBufferMain(void) { MyBackendType = B_SECONDBUFFER; MemoryContext SecondBuffer_context; - SecondBuffer_context = AllocSetContextCreate(TopMemoryContext, - "SecondBuffer", - ALLOCSET_DEFAULT_SIZES); - MemoryContextSwitchTo(SecondBuffer_context); + SecondBuffer_context = AllocSetContextCreate(TopMemoryContext, + "SecondBuffer", + ALLOCSET_DEFAULT_SIZES); + MemoryContextSwitchTo(SecondBuffer_context); init_ps_display(NULL); - SetProcessingMode(InitProcessing); + SetProcessingMode(InitProcessing); pqsignal(SIGHUP, SIG_IGN); pqsignal(SIGINT, SIG_IGN); @@ -1731,14 +1693,10 @@ SecondBufferMain(void) */ PG_SETMASK(&UnBlockSig); - pthread_t ntid; - int err; + pthread_t ntid; + int err; err = pthread_create(&ntid, NULL, CleanPagesInLmdb, NULL); if (err != 0) - elog(PANIC,"pthread_create CleanPagesInLmdb failed %s",strerror(err)); + elog(PANIC, "pthread_create CleanPagesInLmdb failed %s", strerror(err)); MovePageFromSecondBufferToLocalBuffer(); } - - - -