add localbuffer

This commit is contained in:
zoujia 2023-04-02 01:43:49 +08:00
parent 83ba25ac3d
commit 9379e3681f
5 changed files with 482 additions and 144 deletions

View File

@ -2,6 +2,390 @@
#include <signal.h>
#include <unistd.h>
#include <lmdb.h>
#include <postmaster/seconfbuffer.h>
/*
secondbufferhash code
*/
static HTAB *SecondBufferHash;
void
InitSecondBufferMeta(void)
{
bool found;
MultiKeyArrays = (SingleKeyArray *)
ShmemInitStruct("multi page keys arrays",
sizeof(SingleKeyArray) * 32,
&found);
if (!found)
{
MultiKeyArrays = NULL;
}
}
/*
init SecondBufferHash
*/
void
InitSecondBufferHash(void)
{
HASHCTL info;
long init_table_size,
max_table_size;
bool found;
/*
* Compute init/max size to request for lock hashtables. Note these
* calculations must agree with SecondBufferhashShmemSize!
*/
max_table_size = 200;
init_table_size = max_table_size / 2;
info.keysize = sizeof(SdPageKey);
info.entrysize = sizeof(SdPageValue);
info.num_partitions = NUM_LOCK_PARTITIONS;
SecondBufferHash = ShmemInitHash("SecondBuffer hash",
init_table_size,
max_table_size,
&info,
HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
/*
* Allocate wal global structures.
*/
secondBufferGlobalOffset =
ShmemInitStruct("secondbuffer global set",
sizeof(globalOffset), &found);
if (!found){
SpinLockInit(&secondBufferGlobalOffset->mutex);
//initglobaloffset();
}
}
static void
convertKey(SdPageKey *sdkey, PageKey *pk)
{
sdkey = (SdPageKey *)malloc(sizeof(SdPageKey));
sdkey->dbid = pk->relfileNode.dbNode;
sdkey->relid = pk->relfileNode.relNode;
sdkey->forkno = pk->forkNo;
sdkey->blkno = pk->blkNo;
}
static void
convertKeyLd(LdPageKey *ldkey, PageKey *pk)
{
LdPageKey ldkey = (LdPageKey *)malloc(sizeof(LdPageKey));
SdPageKey sdkey;
sdkey->dbid = pk->relfileNode.dbNode;
sdkey->relid = pk->relfileNode.relNode;
sdkey->forkno = pk->forkNo;
sdkey->blkno = pk->blkNo;
ldkey->sk =sdkey;
ldkey->pageLsn = pk->replyLsn;
}
/*
* notification_match: match function to use with notification_hash
*/
static int
secondbuffer_match(const void *key1, const void *key2, Size keysize)
{
const SdPageKey *k1 = (const SdPageKey *)key1;
const SdPageKey *k2 = (const SdPageKey *)key2;
Assert(keysize == sizeof(SdPageKey));
if (k1->dbid == k2->dbid &&
k1->blkno == k2->blkno && k1->forkno == k2->forkno && k1->relid == k2->relid)
return 0; /* equal */
return 1; /* not equal */
}
static uint32
SecondBufferHashCode(const SdPageKey *pk)
{
return get_hash_value(SecondBufferHash, (const void *)pk);
}
static SdPageValue *
SetupSecondBufferInTable(const SdPageKey *pk)
{
SdPageValue *pv;
bool found;
pv = (SdPageValue *)
hash_search(SecondBufferHash, pk, HASH_ENTER, &found);
if (!found)
{
printf("not found \n");
}
return pv;
}
static bool
CleanUpSecondBuffer(const SdPageKey *pk)
{
bool found;
hash_search(SecondBufferHash,
(void *)pk,
HASH_REMOVE,
&found);
return found;
}
static SdPageValue *
FindSecondBufferInTable(const SdPageKey *pk)
{
SdPageValue *pv;
bool found;
pv = (SdPageValue *)
hash_search(SecondBufferHash, pk, HASH_FIND, &found);
return pv;
}
// sb -> ssb
void
ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
{
SdPageKey *sk;
convertKey(sk,pk);
SdPageValue *sdPageValue = SetupSecondBufferInTable(sdKey);
memcpy(sdPageValue->page, buffer,8192);
srand(time(NULL));
int index = rand()%32;
SingleKeyArray sa;
for (;;)
{
sa = MultiKeyArrays[index];
SpinLockAcquire(&sa.oplock);
if (sa.tail != sa.head)
{
sa[tail] = *sk;
sa.head = (sa.head + 1)%1024;
SpinLockRelease(&sa.oplock);
break;
}
else
{
SpinLockRelease(&sa.oplock);
index = (index + 1) % 32;
}
}
}
// ssb -> sb
static uint8_t *
GetPageFromSecondBuffer(PageKey *pk)
{
SdPageKey *sk;
convertKey(sk,pk);
SdPageValue *sv = FindSecondBufferInTable(sk);
if (sv == NULL)
{
return NULL;
}
else
{
return sv.page;
}
}
//lc -> sb
static uint8_t *
GetPageFromLocalBuffer(PageKey *pk)
{
// mdb_env_copy(env, DEFAULTPTH);
SdPageKey *sk;
convertKey(sk,pk);
MDB_val key,*data = NULL;
key.mv_size = sizeof(SdPageKey);
key.mv_data = sk;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &dbi);
mdb_get(txn,dbi,&key,data);
if (data != NULL && data->mv_data != NULL)
{
free(sk);
return data -> mv_data;
}
else
{
free(sk);
return NULL;
}
}
static Bufrd
GetWalFromLocalBuffer(PageKey *pk)
{
Bufrd bufrd;
MDB_cursor *cursor;
MDB_val key,data;
LdPageKey lpk;
int waldatalen = 0, roomlen = 2048;
bufrd.buf = NULL;
bufrd.cap = 0;
bufrd.count = 0;
uint8_t *waldata = (uint8_t *)malloc(roomlen);
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &dbi);
mdb_cursor_open(txn, dbi,&cursor);
if (0 != mdb_cursor_get(txn,&key,&data,MDB_SET))
{
return bufrd;
}
lpk = (LdPageKey*)key.mv_data;
while (lpk.sk.dbid == pk->relfileNode.dbNode
&& lpk.sk.relid == pk->relfileNode.relNode
&& lpk.sk.forkno == pk->forkNo
&& lpk.sk.blkno == pk->blkNo
&& lpk.pageLsn < pk->replyLsn)
{
memcpy(waldata,(uint8_t *)data.mv_data,data.mv_size);
waldatalen += data.mv_size;
if (0 != mdb_cursor_get(cursor,&key,&data,MDB_NEXT))
{
bufrd.buf = waldata;
bufrd.cap = roomlen;
bufrd.count = waldatalen;
return bufrd;
}
else
{
if (walatalen + data.mv_size > roomlen)
{
roomlen += 1024;
waldata = (uint8_t *)realloc(waldata,roomlen);
}
lpk = (LdPageKey *)key.mv_data;
}
}
bufrd.buf = waldata;
bufrd.count = waldatalen;
bufrd.cap = roomlen;
return bufrd;
}
Bufrd
GetPageFromCurrentNode(PageKey *pk)
{
//TODO pk join to a list to drop all the pagedata.
uint8_t *page = GetPageFromSecondBuffer(pk);
page = page != NULL ? page:GetPageFromLocalBuffer(pk);
Bufrd waldata = GetWalFromLocalBuffer(pk);
Bufrd totaldata;
totaldata.buf = (uint8_t *)malloc(8192 + waldata.count);
memcpy(totaldata.buf,page,8192);
memcpy(totaldata.buf + 8192, waldata.buf,waldata.count);
totaldata.cap = totaldata.count = 8192 + waldata.count;
return totaldata;
}
void
RemovePageOrWalFromCurrentNode(PageKey *pk)
{
MDB_cursor *cursor;
MDB_val key,data;
LdPageKey *lpk;
convertKeyLd(lpk,pk);
key.mv_size = sizeof(LdPageKey);
key.mv_data = lpk;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0, &dbi);
mdb_cursor_open(txn, dbi,&cursor);
while (0 == mdb_cursor_get(txn,&key,&data,MDB_PREV))
{
lpk = (LdPageKey*)key.mv_data;
if (lpk->sk.dbid == pk->relfileNode.dbNode
&& lpk->sk.relid == pk->relfileNode.relNode
&& lpk->sk.forkno == pk->forkNo
&& lpk->sk.blkno == pk->forkNo)
{
mdb_cursor_del(cursor,0);
}
}
mdb_cursor_close(cursor);
mdb_txn_commit(txn);
}
void *
MovePageFromSecondBufferToLocalBuffer()
{
int i = 0;
SingleKeyArray ska;
int localHead = 0;
int localTail = 0;
MDB_txn *txn = NULL;
MDB_dbi dbi;
MDB_val key,data;
SdPageKey spk;
LdPageKey lpk;
SdPageValue *spv = NULL;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_dbi_open(txn, NULL, 0 &dbi);
for (;;)
{
for (i = 0; i < 32; i ++)
{
ska = MultiKeyArrays[i];
localHead = ska.head;
localTail = ska.tail;
for (int j = 0; j < (localTail + 1024 - localHead)%1024; j++)
{
spk = ska.SdPageKeyList[(head + j) % 1024];
spv = FindSecondBufferInTable(spk);
lpk.sk = spk;
lpk.pageLsn = PageGetLSN(spv->pagecontent);
key.mv_size = sizeof(lpk);
key.mv_data = &lpk;
data.mv_size = sizeof(spv->pagecontent);
data.mv_data = spv->pagecontent;
mdb_txn_begin(env, NULL, 0, &txn);
mdb_put(txn,dbi,key,data, 0);
mdb_txn_commit(txn);
CleanUpSecondBuffer(&spk);
}
SpinLockAcquire(&ska.oplock);
ska.head = ska.tail;
SpinLockRelease(&ska.oplock);
}
}
}
void
SecondMemoryMain(void)
@ -17,4 +401,19 @@ SecondMemoryMain(void)
pqsignal(SIGCHLD, SIG_DFL);
}
mdb_env_create(&env);
mdb_env_set_maxreaders(env,MAXREADERS);
mdb_env_set_mapsize(env,MAPSIE);
mdb_env_open(env,DEFAULTPTH,0664);
for (;;)
{}
}

View File

@ -286,10 +286,7 @@ static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;
/*
secondbufferhash code
*/
static HTAB *SecondBufferHash;
/*
fs meta code
@ -484,47 +481,6 @@ void InitLocks(void)
HASH_ELEM | HASH_BLOBS);
}
/*
init SecondBufferHash
*/
void
InitSecondBufferHash(void)
{
HASHCTL info;
long init_table_size,
max_table_size;
bool found;
/*
* Compute init/max size to request for lock hashtables. Note these
* calculations must agree with SecondBufferhashShmemSize!
*/
max_table_size = 200;
init_table_size = max_table_size / 2;
info.keysize = sizeof(PageKey);
info.entrysize = sizeof(PageValue);
info.num_partitions = NUM_LOCK_PARTITIONS;
SecondBufferHash = ShmemInitHash("SecondBuffer hash",
init_table_size,
max_table_size,
&info,
HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
/*
* Allocate wal global structures.
*/
secondBbufferglobalOffset =
ShmemInitStruct("secondbuffer global set",
sizeof(globalOffset), &found);
if (!found){
SpinLockInit(&secondBufferGlobalOffset->mutex);
//initglobaloffset();
}
}
void setglobaloffset(uint64 offset,uint64 ino){
SpinLockAcquire(&secondBufferGlobalOffset->mutex);
@ -561,21 +517,6 @@ void InitFSMetaHash(void)
HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
}
/*
* notification_match: match function to use with notification_hash
*/
static int
secondbuffer_match(const void *key1, const void *key2, Size keysize)
{
const pageKey *k1 = (const pageKey *)key1;
const pageKey *k2 = (const pageKey *)key2;
Assert(keysize == sizeof(pageKey));
if (k1->dbid == k2->dbid &&
k1->blkno == k2->blkno && k1->forkno == k2->forkno && k1->relid == k2->relid)
return 0; /* equal */
return 1; /* not equal */
}
/*
* Fetch the lock method table associated with a given lock
@ -4801,71 +4742,6 @@ int LockWaiterCount(const LOCKTAG *locktag)
return waiters;
}
//**************************
//* ops for wal hash code
//*************************
/*
WAL LOG HASH
*/
uint32
SecondBufferHashCode(const pageKey *pk)
{
return get_hash_value(SecondBufferHash, (const void *)pk);
}
/*
* Find or create LOCK and PROCLOCK objects as needed for a new lock
* request.
*
* Returns the PROCLOCK object, or NULL if we failed to create the objects
* for lack of shared memory.
*
* The appropriate partition lock must be held at entry, and will be
* held at exit.
*/
PageValue *
SetupSecondBufferInTable(const pageKey *pk)
{
PageValue *pv;
bool found;
wl = (PageValue *)
hash_search(SecondBufferHash, pk, HASH_ENTER, &found);
if (!found)
{
printf("not found \n");
}
return pv;
}
void CleanUpSecondBuffer(const PageKey *pk, uint32 hashcode)
{
hash_search_with_hash_value(SecondBufferHash,
(void *)pk,
hashcode,
HASH_REMOVE,
NULL);
}
PageValue *
FindSecondBufferInTable(const PageKey *pk)
{
PageValue *pv;
bool found;
pv = (PageValue *)
hash_search(SecondBufferHash, pk, HASH_FIND, &found);
return pv;
}
/*
for fs meta

View File

@ -0,0 +1,62 @@
#include <lmdb.h>
#include "storage/s_lock.h"
#include "storage/spin.h"
#include "storage/shmem.h"
#include "storage/bufpage.h"
#include "utils/dynahash.h"
#include "utils/hfs.h"
#include <c.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#define MAXREADERS 512;
#define MAPSIE 1024*1024*1024;
#define DEFAULTPTH "/tmp/lmdb";
/*
for secondbufferhash code
*/
typedef struct SdPageKey
{
uint32 dbid;
uint32 relid;
uint32 forkno;
uint32 blkno;
} SdPageKey;
typedef struct LdPageKey
{
SdPageKey sk;
uint64 pageLsn;
} LdPageKey;
typedef struct SdPageValue
{
SdPageKey pk;
uint8 pagecontent[BLKSZ];
} SdPageValue;
typedef struct SingleKeyArray
{
SdPageKey SdPageKeyList[1024];
uint16 head;
uint16 tail;
s_lock oplock;
} SingleKeyArray;
SingleKeyArray MultiKeyArrays[32];
MDB_env *env;
// MDB_dbi dbi;
// MDB_txn *txn;
// MDB_stat mst;
// MDB_cursor *cursor;
// MDB_cursor_op op;

View File

@ -214,23 +214,23 @@ typedef struct WalList
slock_t append_lck;
} WalList;
/*
for secondbufferhash code
*/
typedef struct PageKey
{
uint32 dbid;
uint32 relid;
uint32 forkno;
uint32 blkno;
} PageKey;
// /*
// for secondbufferhash code
// */
// typedef struct SdPageKey
// {
// uint32 dbid;
// uint32 relid;
// uint32 forkno;
// uint32 blkno;
// } SdPageKey;
typedef struct PageValue
{
PageKey pk;
uint8_t page[BLKSZ];
uint8_t pageLsn[LSNSZ];
} PageVlue;
// typedef struct SdPageValue
// {
// SdPageKey pk;
// uint8_t page[BLKSZ];
// uint8_t pageLsn[LSNSZ];
// } SdPageValue;
//**************for fs meta************
@ -679,8 +679,8 @@ extern void InitDeadLockChecking(void);
extern int LockWaiterCount(const LOCKTAG *locktag);
extern void InitSecondBufferHash(void);
extern PageValue *SetupSecondBufferInTable(const PageKey *pageKey);
extern PageValue *FindSecondBufferInTable(const PageKey *pageKey);
extern SdPageValue *SetupSecondBufferInTable(const SdPageKey *pageKey);
extern SdPageValue *FindSecondBufferInTable(const SdPageKey *pageKey);
#ifdef LOCK_DEBUG
extern void DumpLocks(PGPROC *proc);

View File

@ -3,6 +3,7 @@
#include <stdint.h>
#include <stdlib.h>
#include "utils/pg_lsn.h"
#include "storage/relfilenode.h"
typedef struct{
uint8_t *buf;