add local buffer

This commit is contained in:
zoujia 2023-04-02 21:50:29 +08:00
parent 9379e3681f
commit 1f777a150a
2 changed files with 233 additions and 52 deletions

View File

@ -3,6 +3,7 @@
#include <signal.h>
#include <unistd.h>
#include <lmdb.h>
#include <pthread.h>
#include <postmaster/seconfbuffer.h>
@ -15,6 +16,7 @@ void
InitSecondBufferMeta(void)
{
bool found;
int i;
MultiKeyArrays = (SingleKeyArray *)
ShmemInitStruct("multi page keys arrays",
sizeof(SingleKeyArray) * 32,
@ -24,8 +26,34 @@ InitSecondBufferMeta(void)
{
MultiKeyArrays = NULL;
}
for(i = 0; i < 32; i ++)
{
SpinLockInit(&MultiKeyArrays[i].oplock);
MultiKeyArrays[i].head = MultiKeyArrays[i].tail = 0;
MultiKeyArrays[i].unused = 1024;
}
}
void
InitDPageKeyArray(void)
{
bool found;
int i;
DPArray = ( *)
ShmemInitStruct("deleted page keys arrays",
sizeof(DPageKeyArray),
&found);
if (!found)
{
DPArray = NULL;
}
SpinLockInit(&DPArray.append);
DPArray.head = DPArray.tail = DPArray.pageIndex = DPArray.walIndex = 0;
DPArray.unused = 1024;
}
/*
init SecondBufferHash
*/
@ -68,6 +96,28 @@ InitSecondBufferHash(void)
}
void
InitPageDBEnv()
{
mdb_env_create(&pageEnv);
mdb_env_set_maxreaders(pageEnv,MAXREADERS);
mdb_env_set_mapsize(pageEnv,MAPSIE);
mdb_env_open(pageEnv,DEFAULTPAGEPATH,MDB_FIXEDMAP,0664);
mdb_txn_begin(pageEnv, NULL, 0, &pageTxn);
mdb_dbi_open(pageTxn,NULL,0,&pageDbi);
mdb_txn_commit(pageTxn);
}
void InitWalDBEnv()
{
mdb_env_create(&walEnv);
mdb_env_set_maxreaders(walEnv,MAXREADERS);
mdb_env_set_mapsize(walEnv,MAPSIE);
mdb_env_open(walEnv,DEFAULTWALPATH,MDB_FIXEDMAP,0664);
mdb_txn_begin(walEnv, NULL, 0, &walTxn);
mdb_dbi_open(walTxn,NULL,0,&walDbi);
mdb_txn_commit(walTxn);
}
static void
convertKey(SdPageKey *sdkey, PageKey *pk)
{
@ -174,10 +224,11 @@ ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
{
sa = MultiKeyArrays[index];
SpinLockAcquire(&sa.oplock);
if (sa.tail != sa.head)
if (sa.unused > 0)
{
sa[tail] = *sk;
sa.head = (sa.head + 1)%1024;
sa.tail = (sa.tail + 1)%1024;
sa.unused --;
SpinLockRelease(&sa.oplock);
break;
}
@ -210,22 +261,30 @@ GetPageFromSecondBuffer(PageKey *pk)
static uint8_t *
GetPageFromLocalBuffer(PageKey *pk)
{
// mdb_env_copy(env, DEFAULTPTH);
if (pageEnv == NULL)
{
mdb_env_copy(pageEnv,DEFAULTPAGEPATH);
mdb_txn_begin(pageEnv, NULL, 0, &pageTxn);
mdb_dbi_open(pageTxn, NULL, 0, &pageDbi);
mdb_txn_commit(pageTxn);
}
MDB_txn *tmptxn;
SdPageKey *sk;
convertKey(sk,pk);
MDB_val key,*data = NULL;
key.mv_size = sizeof(SdPageKey);
key.mv_data = sk;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &dbi);
mdb_get(txn,dbi,&key,data);
mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
mdb_get(tmptxn,pageDbi,&key,data);
if (data != NULL && data->mv_data != NULL)
{
mdb_txn_abort(tmptxn);
free(sk);
return data -> mv_data;
}
else
{
mdb_txn_abort(tmptxn);
free(sk);
return NULL;
}
@ -234,6 +293,17 @@ GetPageFromLocalBuffer(PageKey *pk)
static Bufrd
GetWalFromLocalBuffer(PageKey *pk)
{
if (env == NULL)
{
mdb_env_copy(walEnv,DEFAULTWALPATH);
mdb_txn_begin(walEnv, NULL, 0, &walTxn);
mdb_dbi_open(walTxn, NULL, 0, &walDbi);
mdb_txn_commit(walTxn);
}
MDB_txn *tmptxn;
MDB_txn *tmpcursor;
Bufrd bufrd;
MDB_cursor *cursor;
MDB_val key,data;
@ -245,11 +315,11 @@ GetWalFromLocalBuffer(PageKey *pk)
bufrd.count = 0;
uint8_t *waldata = (uint8_t *)malloc(roomlen);
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &dbi);
mdb_cursor_open(txn, dbi,&cursor);
if (0 != mdb_cursor_get(txn,&key,&data,MDB_SET))
mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
if (0 != mdb_cursor_get(tmptxn,&key,&data,MDB_SET))
{
return bufrd;
}
@ -263,11 +333,13 @@ GetWalFromLocalBuffer(PageKey *pk)
{
memcpy(waldata,(uint8_t *)data.mv_data,data.mv_size);
waldatalen += data.mv_size;
if (0 != mdb_cursor_get(cursor,&key,&data,MDB_NEXT))
if (0 != mdb_cursor_get(tmpcursor,&key,&data,MDB_NEXT))
{
bufrd.buf = waldata;
bufrd.cap = roomlen;
bufrd.count = waldatalen;
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return bufrd;
}
else
@ -287,10 +359,33 @@ GetWalFromLocalBuffer(PageKey *pk)
bufrd.count = waldatalen;
bufrd.cap = roomlen;
mdb_cursor_close(tmpcursor);
mdb_txn_abort(tmptxn);
return bufrd;
}
void
storeWalInLocalBuffer(void)
{
MDB_txn *tmptxn;
MDB_val key,data;
if (env == NULL)
{
mdb_env_copy(walEnv,DEFAULTWALPATH);
mdb_txn_begin(walEnv, NULL, 0, &walTxn);
mdb_dbi_open(walTxn, NULL, 0, &walDbi);
mdb_txn_commit(walTxn);
}
mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
mdb_put(tmptxn,pageDbi,&key,&data, 0);
mdb_txn_commit(tmptxn);
}
Bufrd
GetPageFromCurrentNode(PageKey *pk)
{
@ -309,39 +404,90 @@ GetPageFromCurrentNode(PageKey *pk)
}
void
RemovePageOrWalFromCurrentNode(PageKey *pk)
static void *
RemovePageOrWalFromCurrentNode(void *PageOrWal)
{
MDB_cursor *cursor;
MDB_txn *tmptxn;
MDB_cursor *tmpcursor;
MDB_val key,data;
LdPageKey *lpk;
convertKeyLd(lpk,pk);
key.mv_size = sizeof(LdPageKey);
key.mv_data = lpk;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &dbi);
mdb_cursor_open(txn, dbi,&cursor);
while (0 == mdb_cursor_get(txn,&key,&data,MDB_PREV))
PageKey *pk;
for (;;)
{
lpk = (LdPageKey*)key.mv_data;
if (lpk->sk.dbid == pk->relfileNode.dbNode
&& lpk->sk.relid == pk->relfileNode.relNode
&& lpk->sk.forkno == pk->forkNo
&& lpk->sk.blkno == pk->forkNo)
if (*PageOrWal == PAGE)
{
mdb_cursor_del(cursor,0);
if (DPArray.pageIndex >= DPArray.tail)
{
continue;
}
pk = &DPArray.dpk[DPArray.pageIndex].pk;
}
else
{
if (DPArray.walIndex >= DPArray.pageIndex)
{
continue;
}
if (DPArray.dpk[DPArray.walIndex].pagedeleted = false)
{
continue;
}
pk = &DPArray.dpk[DPArray.walIndex].pk;
}
convertKeyLd(lpk,pk);
key.mv_size = sizeof(LdPageKey);
key.mv_data = lpk;
if (*PageOrWal == PAGE)
{
mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
mdb_cursor_open(tmptxn, pageDbi,&tmpcursor);
}
else
{
mdb_txn_begin(walEnv, NULL, 0, &tmptxn);
mdb_cursor_open(tmptxn, walDbi,&tmpcursor);
}
mdb_cursor_open(tmptxn, dbi,&tmpcursor);
while (0 == mdb_cursor_get(tmptxn,&key,&data,MDB_PREV))
{
lpk = (LdPageKey*)key.mv_data;
if (lpk->sk.dbid == pk->relfileNode.dbNode
&& lpk->sk.relid == pk->relfileNode.relNode
&& lpk->sk.forkno == pk->forkNo
&& lpk->sk.blkno == pk->forkNo)
{
mdb_cursor_del(tmpcursor,0);
}
}
mdb_cursor_close(tmpcursor);
mdb_txn_commit(tmptxn);
if (PageOrWal == PAGE)
{
DPArray.dpk[DPArray.pageIndex].pagedeleted = true;
DPArray.pageIndex = (DPArray.pageIndex + 1) % 1024;
}
else
{
DPArray.head ++;
DPArray.walIndex = (DPArray.walIndex + 1) % 1024;
DPArray.head = (DPArray.head + 1) % 1024;
SpinLockAcquire(DPArray.append);
DPArray.unused ++;
SpinLockRelease(DPArray.append);
}
}
mdb_cursor_close(cursor);
mdb_txn_commit(txn);
}
void *
static void *
MovePageFromSecondBufferToLocalBuffer()
{
MDB_txn *tmptxn;
int i = 0;
SingleKeyArray ska;
int localHead = 0;
@ -352,8 +498,6 @@ MovePageFromSecondBufferToLocalBuffer()
SdPageKey spk;
LdPageKey lpk;
SdPageValue *spv = NULL;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0 &dbi);
for (;;)
{
for (i = 0; i < 32; i ++)
@ -373,20 +517,22 @@ MovePageFromSecondBufferToLocalBuffer()
data.mv_size = sizeof(spv->pagecontent);
data.mv_data = spv->pagecontent;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_put(txn,dbi,key,data, 0);
mdb_txn_commit(txn);
mdb_txn_begin(pageEnv, NULL, 0, &tmptxn);
mdb_put(tmptxn,pageDbi,&key,&data, 0);
mdb_txn_commit(tmptxn);
CleanUpSecondBuffer(&spk);
}
SpinLockAcquire(&ska.oplock);
ska.head = ska.tail;
ska.unused = ska.unused + (ska.tail + 1024 - ska.head) % 1024;
SpinLockRelease(&ska.oplock);
}
}
}
void
SecondMemoryMain(void)
{
@ -401,15 +547,13 @@ SecondMemoryMain(void)
pqsignal(SIGCHLD, SIG_DFL);
mdb_env_create(&env);
mdb_env_set_maxreaders(env,MAXREADERS);
mdb_env_set_mapsize(env,MAPSIE);
mdb_env_open(env,DEFAULTPTH,0664);
pthread_t sdbToLb;
pthread_t cleanupLb;
for (;;)
{}
pthread_create(&sdbToLb,NULL,MovePageFromSecondBufferToLocalBuffer,NULL);
pthread_create(&cleanupLb,NULL,RemovePageOrWalFromCurrentNode,(void *)PAGE);
pthread_join(sdbToLb,NULL);
pthread_join(cleanupLb,NULL);
}

View File

@ -13,7 +13,10 @@
#define MAXREADERS 512;
#define MAPSIE 1024*1024*1024;
#define DEFAULTPTH "/tmp/lmdb";
#define DEFAULTPAGEPATH "/tmp/pagedb";
#define DEFAULTWALPATH "/tmp/waldb";
#define PAGE 1;
#define WAL 2;
/*
for secondbufferhash code
@ -43,20 +46,54 @@ typedef struct SingleKeyArray
SdPageKey SdPageKeyList[1024];
uint16 head;
uint16 tail;
s_lock oplock;
uint16 unused;
slock_t oplock;
} SingleKeyArray;
typedef struct DPageKey
{
PageKey pk;
bool pagedeleted;
} DPageKey;
typedef struct DPageKeyArray
{
DPageKey dpk[1024];
uint16 unused;
uint16 head;
uint16 tail;
uint16 pageIndex;
uint16 walIndex;
slock_t append;
} DPageKeyArray;
SingleKeyArray MultiKeyArrays[32];
DPageKeyArray DPArray;
MDB_env *env;
// MDB_dbi dbi;
// MDB_txn *txn;
MDB_env *pageEnv;
MDB_env *walEnv;
MDB_dbi pageDbi;
MDB_dbi walDbi;
MDB_txn *pageTxn;
MDB_txn *walTxn;
MDB_cursor *cursor;
// MDB_stat mst;
// MDB_cursor *cursor;
// MDB_cursor_op op;
extern void InitSecondBufferMeta(void);
extern void InitSecondBufferHash(void);
extern void InitDPageKeyArray(void);
extern void InitPageDBEnv(void);
extern void InitWalDBEnv(void);
extern void storeWalInLocalBuffer(void);
extern void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer); //when evict one page out databuffer, we should call this to store the page.
extern Bufrd GetPageFromCurrentNode(PageKey *pk); //async delete old version page and wal. we should call this when move page from ld/sdb to db.