diff --git a/src/backend/postmaster/secondbuffer.c b/src/backend/postmaster/secondbuffer.c index f8ff691..ffaecb3 100644 --- a/src/backend/postmaster/secondbuffer.c +++ b/src/backend/postmaster/secondbuffer.c @@ -2,6 +2,390 @@ #include #include +#include +#include + + +/* +secondbufferhash code +*/ +static HTAB *SecondBufferHash; + +void +InitSecondBufferMeta(void) +{ + bool found; + MultiKeyArrays = (SingleKeyArray *) + ShmemInitStruct("multi page keys arrays", + sizeof(SingleKeyArray) * 32, + &found); + + if (!found) + { + MultiKeyArrays = NULL; + } +} + +/* + init SecondBufferHash + */ +void +InitSecondBufferHash(void) +{ + HASHCTL info; + long init_table_size, + max_table_size; + bool found; + + /* + * Compute init/max size to request for lock hashtables. Note these + * calculations must agree with SecondBufferhashShmemSize! + */ + max_table_size = 200; + init_table_size = max_table_size / 2; + + info.keysize = sizeof(SdPageKey); + info.entrysize = sizeof(SdPageValue); + + info.num_partitions = NUM_LOCK_PARTITIONS; + + SecondBufferHash = ShmemInitHash("SecondBuffer hash", + init_table_size, + max_table_size, + &info, + HASH_ELEM | HASH_BLOBS | HASH_PARTITION); + + /* + * Allocate wal global structures. + */ + secondBufferGlobalOffset = + ShmemInitStruct("secondbuffer global set", + sizeof(globalOffset), &found); + if (!found){ + SpinLockInit(&secondBufferGlobalOffset->mutex); + //initglobaloffset(); + } + +} + +static void +convertKey(SdPageKey *sdkey, PageKey *pk) +{ + sdkey = (SdPageKey *)malloc(sizeof(SdPageKey)); + sdkey->dbid = pk->relfileNode.dbNode; + sdkey->relid = pk->relfileNode.relNode; + sdkey->forkno = pk->forkNo; + sdkey->blkno = pk->blkNo; +} + +static void +convertKeyLd(LdPageKey *ldkey, PageKey *pk) +{ + LdPageKey ldkey = (LdPageKey *)malloc(sizeof(LdPageKey)); + SdPageKey sdkey; + sdkey->dbid = pk->relfileNode.dbNode; + sdkey->relid = pk->relfileNode.relNode; + sdkey->forkno = pk->forkNo; + sdkey->blkno = pk->blkNo; + + ldkey->sk =sdkey; + ldkey->pageLsn = pk->replyLsn; + +} + +/* + * notification_match: match function to use with notification_hash + */ +static int +secondbuffer_match(const void *key1, const void *key2, Size keysize) +{ + const SdPageKey *k1 = (const SdPageKey *)key1; + const SdPageKey *k2 = (const SdPageKey *)key2; + + Assert(keysize == sizeof(SdPageKey)); + if (k1->dbid == k2->dbid && + k1->blkno == k2->blkno && k1->forkno == k2->forkno && k1->relid == k2->relid) + return 0; /* equal */ + return 1; /* not equal */ +} + +static uint32 +SecondBufferHashCode(const SdPageKey *pk) +{ + return get_hash_value(SecondBufferHash, (const void *)pk); +} + + +static SdPageValue * +SetupSecondBufferInTable(const SdPageKey *pk) +{ + + SdPageValue *pv; + bool found; + + pv = (SdPageValue *) + hash_search(SecondBufferHash, pk, HASH_ENTER, &found); + + if (!found) + { + printf("not found \n"); + } + + return pv; +} + + +static bool +CleanUpSecondBuffer(const SdPageKey *pk) +{ + bool found; + hash_search(SecondBufferHash, + (void *)pk, + HASH_REMOVE, + &found); + return found; +} + +static SdPageValue * +FindSecondBufferInTable(const SdPageKey *pk) +{ + + SdPageValue *pv; + bool found; + pv = (SdPageValue *) + hash_search(SecondBufferHash, pk, HASH_FIND, &found); + return pv; +} + +// sb -> ssb +void +ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer) +{ + SdPageKey *sk; + convertKey(sk,pk); + SdPageValue *sdPageValue = SetupSecondBufferInTable(sdKey); + memcpy(sdPageValue->page, buffer,8192); + + srand(time(NULL)); + int index = rand()%32; + SingleKeyArray sa; + + for (;;) + { + sa = MultiKeyArrays[index]; + SpinLockAcquire(&sa.oplock); + if (sa.tail != sa.head) + { + sa[tail] = *sk; + sa.head = (sa.head + 1)%1024; + SpinLockRelease(&sa.oplock); + break; + } + else + { + SpinLockRelease(&sa.oplock); + index = (index + 1) % 32; + } + } +} + +// ssb -> sb +static uint8_t * +GetPageFromSecondBuffer(PageKey *pk) +{ + SdPageKey *sk; + convertKey(sk,pk); + SdPageValue *sv = FindSecondBufferInTable(sk); + if (sv == NULL) + { + return NULL; + } + else + { + return sv.page; + } +} + +//lc -> sb +static uint8_t * +GetPageFromLocalBuffer(PageKey *pk) +{ +// mdb_env_copy(env, DEFAULTPTH); + SdPageKey *sk; + convertKey(sk,pk); + MDB_val key,*data = NULL; + key.mv_size = sizeof(SdPageKey); + key.mv_data = sk; + mdb_txn_begin(env, NULL, 0, &txn); + mdb_dbi_open(txn, NULL, 0, &dbi); + mdb_get(txn,dbi,&key,data); + if (data != NULL && data->mv_data != NULL) + { + free(sk); + return data -> mv_data; + } + else + { + free(sk); + return NULL; + } +} + +static Bufrd +GetWalFromLocalBuffer(PageKey *pk) +{ + Bufrd bufrd; + MDB_cursor *cursor; + MDB_val key,data; + LdPageKey lpk; + int waldatalen = 0, roomlen = 2048; + + bufrd.buf = NULL; + bufrd.cap = 0; + bufrd.count = 0; + + uint8_t *waldata = (uint8_t *)malloc(roomlen); + mdb_txn_begin(env, NULL, 0, &txn); + mdb_dbi_open(txn, NULL, 0, &dbi); + mdb_cursor_open(txn, dbi,&cursor); + if (0 != mdb_cursor_get(txn,&key,&data,MDB_SET)) + { + return bufrd; + } + + lpk = (LdPageKey*)key.mv_data; + + while (lpk.sk.dbid == pk->relfileNode.dbNode + && lpk.sk.relid == pk->relfileNode.relNode + && lpk.sk.forkno == pk->forkNo + && lpk.sk.blkno == pk->blkNo + && lpk.pageLsn < pk->replyLsn) + { + memcpy(waldata,(uint8_t *)data.mv_data,data.mv_size); + waldatalen += data.mv_size; + if (0 != mdb_cursor_get(cursor,&key,&data,MDB_NEXT)) + { + bufrd.buf = waldata; + bufrd.cap = roomlen; + bufrd.count = waldatalen; + return bufrd; + } + else + { + if (walatalen + data.mv_size > roomlen) + { + roomlen += 1024; + waldata = (uint8_t *)realloc(waldata,roomlen); + } + + lpk = (LdPageKey *)key.mv_data; + } + + } + + bufrd.buf = waldata; + bufrd.count = waldatalen; + bufrd.cap = roomlen; + + return bufrd; + +} + +Bufrd +GetPageFromCurrentNode(PageKey *pk) +{ + + //TODO pk join to a list to drop all the pagedata. + uint8_t *page = GetPageFromSecondBuffer(pk); + page = page != NULL ? page:GetPageFromLocalBuffer(pk); + Bufrd waldata = GetWalFromLocalBuffer(pk); + + Bufrd totaldata; + totaldata.buf = (uint8_t *)malloc(8192 + waldata.count); + memcpy(totaldata.buf,page,8192); + memcpy(totaldata.buf + 8192, waldata.buf,waldata.count); + totaldata.cap = totaldata.count = 8192 + waldata.count; + return totaldata; + +} + +void +RemovePageOrWalFromCurrentNode(PageKey *pk) +{ + MDB_cursor *cursor; + MDB_val key,data; + LdPageKey *lpk; + + convertKeyLd(lpk,pk); + key.mv_size = sizeof(LdPageKey); + key.mv_data = lpk; + + mdb_txn_begin(env, NULL, 0, &txn); + mdb_dbi_open(txn, NULL, 0, &dbi); + mdb_cursor_open(txn, dbi,&cursor); + while (0 == mdb_cursor_get(txn,&key,&data,MDB_PREV)) + { + lpk = (LdPageKey*)key.mv_data; + if (lpk->sk.dbid == pk->relfileNode.dbNode + && lpk->sk.relid == pk->relfileNode.relNode + && lpk->sk.forkno == pk->forkNo + && lpk->sk.blkno == pk->forkNo) + { + mdb_cursor_del(cursor,0); + } + } + mdb_cursor_close(cursor); + mdb_txn_commit(txn); + +} + +void * +MovePageFromSecondBufferToLocalBuffer() +{ + int i = 0; + SingleKeyArray ska; + int localHead = 0; + int localTail = 0; + MDB_txn *txn = NULL; + MDB_dbi dbi; + MDB_val key,data; + SdPageKey spk; + LdPageKey lpk; + SdPageValue *spv = NULL; + mdb_txn_begin(env, NULL, 0, &txn); + mdb_dbi_open(txn, NULL, 0 &dbi); + for (;;) + { + for (i = 0; i < 32; i ++) + { + ska = MultiKeyArrays[i]; + localHead = ska.head; + localTail = ska.tail; + for (int j = 0; j < (localTail + 1024 - localHead)%1024; j++) + { + spk = ska.SdPageKeyList[(head + j) % 1024]; + spv = FindSecondBufferInTable(spk); + lpk.sk = spk; + lpk.pageLsn = PageGetLSN(spv->pagecontent); + + key.mv_size = sizeof(lpk); + key.mv_data = &lpk; + + data.mv_size = sizeof(spv->pagecontent); + data.mv_data = spv->pagecontent; + mdb_txn_begin(env, NULL, 0, &txn); + mdb_put(txn,dbi,key,data, 0); + mdb_txn_commit(txn); + CleanUpSecondBuffer(&spk); + } + + SpinLockAcquire(&ska.oplock); + ska.head = ska.tail; + SpinLockRelease(&ska.oplock); + } + + } +} void SecondMemoryMain(void) @@ -17,4 +401,19 @@ SecondMemoryMain(void) pqsignal(SIGCHLD, SIG_DFL); -} \ No newline at end of file + mdb_env_create(&env); + mdb_env_set_maxreaders(env,MAXREADERS); + mdb_env_set_mapsize(env,MAPSIE); + mdb_env_open(env,DEFAULTPTH,0664); + + + + for (;;) + {} + + +} + + + + diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 6e5a1cb..6775a8b 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -286,10 +286,7 @@ static HTAB *LockMethodLockHash; static HTAB *LockMethodProcLockHash; static HTAB *LockMethodLocalHash; -/* -secondbufferhash code -*/ -static HTAB *SecondBufferHash; + /* fs meta code @@ -484,47 +481,6 @@ void InitLocks(void) HASH_ELEM | HASH_BLOBS); } -/* - init SecondBufferHash - */ -void -InitSecondBufferHash(void) -{ - HASHCTL info; - long init_table_size, - max_table_size; - bool found; - - /* - * Compute init/max size to request for lock hashtables. Note these - * calculations must agree with SecondBufferhashShmemSize! - */ - max_table_size = 200; - init_table_size = max_table_size / 2; - - info.keysize = sizeof(PageKey); - info.entrysize = sizeof(PageValue); - - info.num_partitions = NUM_LOCK_PARTITIONS; - - SecondBufferHash = ShmemInitHash("SecondBuffer hash", - init_table_size, - max_table_size, - &info, - HASH_ELEM | HASH_BLOBS | HASH_PARTITION); - - /* - * Allocate wal global structures. - */ - secondBbufferglobalOffset = - ShmemInitStruct("secondbuffer global set", - sizeof(globalOffset), &found); - if (!found){ - SpinLockInit(&secondBufferGlobalOffset->mutex); - //initglobaloffset(); - } - -} void setglobaloffset(uint64 offset,uint64 ino){ SpinLockAcquire(&secondBufferGlobalOffset->mutex); @@ -561,21 +517,6 @@ void InitFSMetaHash(void) HASH_ELEM | HASH_BLOBS | HASH_PARTITION); } -/* - * notification_match: match function to use with notification_hash - */ -static int -secondbuffer_match(const void *key1, const void *key2, Size keysize) -{ - const pageKey *k1 = (const pageKey *)key1; - const pageKey *k2 = (const pageKey *)key2; - - Assert(keysize == sizeof(pageKey)); - if (k1->dbid == k2->dbid && - k1->blkno == k2->blkno && k1->forkno == k2->forkno && k1->relid == k2->relid) - return 0; /* equal */ - return 1; /* not equal */ -} /* * Fetch the lock method table associated with a given lock @@ -4801,71 +4742,6 @@ int LockWaiterCount(const LOCKTAG *locktag) return waiters; } -//************************** - -//* ops for wal hash code -//************************* - -/* - -WAL LOG HASH - */ -uint32 -SecondBufferHashCode(const pageKey *pk) -{ - return get_hash_value(SecondBufferHash, (const void *)pk); -} - -/* - * Find or create LOCK and PROCLOCK objects as needed for a new lock - * request. - * - * Returns the PROCLOCK object, or NULL if we failed to create the objects - * for lack of shared memory. - * - * The appropriate partition lock must be held at entry, and will be - * held at exit. - */ -PageValue * -SetupSecondBufferInTable(const pageKey *pk) -{ - - PageValue *pv; - bool found; - - wl = (PageValue *) - hash_search(SecondBufferHash, pk, HASH_ENTER, &found); - - if (!found) - { - printf("not found \n"); - } - - return pv; -} - -void CleanUpSecondBuffer(const PageKey *pk, uint32 hashcode) -{ - hash_search_with_hash_value(SecondBufferHash, - (void *)pk, - hashcode, - HASH_REMOVE, - NULL); -} - -PageValue * -FindSecondBufferInTable(const PageKey *pk) -{ - - PageValue *pv; - bool found; - pv = (PageValue *) - hash_search(SecondBufferHash, pk, HASH_FIND, &found); - - return pv; -} - - /* for fs meta diff --git a/src/include/postmaster/seconfbuffer.h b/src/include/postmaster/seconfbuffer.h new file mode 100644 index 0000000..5ab6e9f --- /dev/null +++ b/src/include/postmaster/seconfbuffer.h @@ -0,0 +1,62 @@ +#include +#include "storage/s_lock.h" +#include "storage/spin.h" +#include "storage/shmem.h" +#include "storage/bufpage.h" +#include "utils/dynahash.h" +#include "utils/hfs.h" +#include + +#include +#include +#include + +#define MAXREADERS 512; +#define MAPSIE 1024*1024*1024; +#define DEFAULTPTH "/tmp/lmdb"; + +/* +for secondbufferhash code +*/ +typedef struct SdPageKey +{ + uint32 dbid; + uint32 relid; + uint32 forkno; + uint32 blkno; +} SdPageKey; + +typedef struct LdPageKey +{ + SdPageKey sk; + uint64 pageLsn; +} LdPageKey; + +typedef struct SdPageValue +{ + SdPageKey pk; + uint8 pagecontent[BLKSZ]; +} SdPageValue; + +typedef struct SingleKeyArray +{ + SdPageKey SdPageKeyList[1024]; + uint16 head; + uint16 tail; + s_lock oplock; +} SingleKeyArray; + +SingleKeyArray MultiKeyArrays[32]; + + +MDB_env *env; +// MDB_dbi dbi; +// MDB_txn *txn; + +// MDB_stat mst; + +// MDB_cursor *cursor; +// MDB_cursor_op op; + + + diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index fac2648..507b418 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -214,23 +214,23 @@ typedef struct WalList slock_t append_lck; } WalList; -/* -for secondbufferhash code -*/ -typedef struct PageKey -{ - uint32 dbid; - uint32 relid; - uint32 forkno; - uint32 blkno; -} PageKey; +// /* +// for secondbufferhash code +// */ +// typedef struct SdPageKey +// { +// uint32 dbid; +// uint32 relid; +// uint32 forkno; +// uint32 blkno; +// } SdPageKey; -typedef struct PageValue -{ - PageKey pk; - uint8_t page[BLKSZ]; - uint8_t pageLsn[LSNSZ]; -} PageVlue; +// typedef struct SdPageValue +// { +// SdPageKey pk; +// uint8_t page[BLKSZ]; +// uint8_t pageLsn[LSNSZ]; +// } SdPageValue; //**************for fs meta************ @@ -679,8 +679,8 @@ extern void InitDeadLockChecking(void); extern int LockWaiterCount(const LOCKTAG *locktag); extern void InitSecondBufferHash(void); -extern PageValue *SetupSecondBufferInTable(const PageKey *pageKey); -extern PageValue *FindSecondBufferInTable(const PageKey *pageKey); +extern SdPageValue *SetupSecondBufferInTable(const SdPageKey *pageKey); +extern SdPageValue *FindSecondBufferInTable(const SdPageKey *pageKey); #ifdef LOCK_DEBUG extern void DumpLocks(PGPROC *proc); diff --git a/src/include/utils/hfs.h b/src/include/utils/hfs.h index 53ef4fa..c6f1af2 100644 --- a/src/include/utils/hfs.h +++ b/src/include/utils/hfs.h @@ -3,6 +3,7 @@ #include #include #include "utils/pg_lsn.h" +#include "storage/relfilenode.h" typedef struct{ uint8_t *buf;