From 4ca7f526edddc5a09358d4ebd65e9cc71e991945 Mon Sep 17 00:00:00 2001 From: zoujia Date: Wed, 15 Mar 2023 16:47:25 +0800 Subject: [PATCH 01/10] remove relationcleanup --- src/backend/bootstrap/bootstrap.c | 5 - src/backend/postmaster/Makefile | 1 - src/backend/postmaster/postmaster.c | 38 ------ src/backend/postmaster/relationcleanup.c | 164 ----------------------- src/backend/storage/ipc/ipci.c | 3 - src/backend/storage/lmgr/lock.c | 51 ------- src/backend/utils/init/miscinit.c | 3 - src/include/miscadmin.h | 3 - src/include/postmaster/relationcleanup.h | 13 -- src/include/storage/lock.h | 1 - src/include/utils/wait_event.h | 3 +- 11 files changed, 1 insertion(+), 284 deletions(-) delete mode 100644 src/backend/postmaster/relationcleanup.c delete mode 100644 src/include/postmaster/relationcleanup.h diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 0593777..deb59cf 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -333,9 +333,6 @@ AuxiliaryProcessMain(int argc, char *argv[]) case WalReceiverProcess: MyBackendType = B_WAL_RECEIVER; break; - case RelationCleanupProcess: - MyBackendType = B_RELATIONCLEANUP; - break; default: MyBackendType = B_INVALID; } @@ -471,8 +468,6 @@ AuxiliaryProcessMain(int argc, char *argv[]) case WalReceiverProcess: WalReceiverMain(); proc_exit(1); - case RelationCleanupProcess: - RelationCleanupMain(); default: elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType); diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 3af5bf2..bfdf6a8 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -16,7 +16,6 @@ OBJS = \ autovacuum.o \ bgworker.o \ bgwriter.o \ - relationcleanup.o \ checkpointer.o \ fork_process.o \ interrupt.o \ diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index e3e9d2b..4616e4e 100644 --- a/src/backend/postmaster/postmaster.c +++ 
b/src/backend/postmaster/postmaster.c @@ -565,7 +565,6 @@ static void ShmemBackendArrayRemove(Backend *bn); #define StartCheckpointer() StartChildProcess(CheckpointerProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess) #define StartWalReceiver() StartChildProcess(WalReceiverProcess) -#define StartRelationCleanup() StartChildProcess(RelationCleanupProcess) /* Macros to check exit status of a child process */ #define EXIT_STATUS_0(st) ((st) == 0) #define EXIT_STATUS_1(st) (WIFEXITED(st) && WEXITSTATUS(st) == 1) @@ -1787,10 +1786,6 @@ ServerLoop(void) */ if (WalWriterPID == 0 && pmState == PM_RUN) WalWriterPID = StartWalWriter(); - - - if (RelationCleanupID == 0 && pmState == PM_RUN) - RelationCleanupID = StartRelationCleanup(); /* * If we have lost the autovacuum launcher, try to start a new one. We @@ -2744,10 +2739,6 @@ SIGHUP_handler(SIGNAL_ARGS) if (PgStatPID != 0) signal_child(PgStatPID, SIGHUP); - - if (RelationCleanupID !=0 ) - signal_child(RelationCleanupID, SIGHUP); - /* Reload authentication config files too */ if (!load_hba()) ereport(LOG, @@ -3111,14 +3102,6 @@ reaper(SIGNAL_ARGS) continue; } - if (pid == RelationCleanupID) - { - RelationCleanupID = 0; - if (!EXIT_STATUS_0(exitstatus)) - HandleChildCrash(pid, exitstatus, - _("relationcleanup process")); - continue; - } /* * Was it the checkpointer? @@ -3642,16 +3625,6 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) signal_child(BgWriterPID, (SendStop ? SIGSTOP : SIGQUIT)); } - if (pid == RelationCleanupID) - RelationCleanupID = 0; - else if (RelationCleanupID != 0 && take_action) - { - ereport(DEBUG2, - (errmsg_internal("sending %s to process %d", - (SendStop ? "SIGSTOP" : "SIGQUIT"), - (int) RelationCleanupID))); - signal_child(RelationCleanupID, (SendStop ? 
SIGSTOP : SIGQUIT)); - } /* Take care of the checkpointer too */ if (pid == CheckpointerPID) CheckpointerPID = 0; @@ -3877,9 +3850,6 @@ PostmasterStateMachine(void) signal_child(WalReceiverPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ - if (RelationCleanupID != 0) - signal_child(RelationCleanupID,SIGTERM); - /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ pmState = PM_WAIT_BACKENDS; @@ -3906,7 +3876,6 @@ PostmasterStateMachine(void) StartupPID == 0 && WalReceiverPID == 0 && BgWriterPID == 0 && - RelationCleanupID == 0 && (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && WalWriterPID == 0 && @@ -4001,7 +3970,6 @@ PostmasterStateMachine(void) Assert(StartupPID == 0); Assert(WalReceiverPID == 0); Assert(BgWriterPID == 0); - Assert(RelationCleanupID == 0); Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); @@ -4218,8 +4186,6 @@ TerminateChildren(int signal) signal_child(PgArchPID, signal); if (PgStatPID != 0) signal_child(PgStatPID, signal); - if (RelationCleanupID != 0) - signal_child(RelationCleanupID, signal); } /* @@ -4317,10 +4283,6 @@ BackendStartup(Port *port) report_fork_failure_to_client(port, save_errno); return STATUS_ERROR; } - if (!GetReqFlag()) - { - SetReqFlag(); - } /* in parent, successful fork */ ereport(DEBUG2, (errmsg_internal("forked new backend, pid=%d socket=%d", diff --git a/src/backend/postmaster/relationcleanup.c b/src/backend/postmaster/relationcleanup.c deleted file mode 100644 index 0a4205b..0000000 --- a/src/backend/postmaster/relationcleanup.c +++ /dev/null @@ -1,164 +0,0 @@ -#include "postgres.h" - -#include -#include -#include -#include -#include "miscadmin.h" - -#include "access/xlog.h" -#include "access/pushpage.h" -#include "postmaster/interrupt.h" -#include "postmaster/relationcleanup.h" -#include "storage/procsignal.h" -#include "storage/latch.h" -#include "storage/proc.h" -#include "storage/s_lock.h" -#include 
"storage/lock.h" -#include "storage/spin.h" -#include "utils/guc.h" - - -#include "utils/wait_event.h" - - -typedef struct CleanupInfo -{ - XLogRecPtr applyLSN; - int ReqFlag; -} CleanupInfo; - -static CleanupInfo *cleanupInfo = NULL; -static int ReqFlag; - -void RelationCleanupMain(void) -{ - pqsignal(SIGHUP, SignalHandlerForConfigReload); - pqsignal(SIGINT, SIG_IGN); - pqsignal(SIGTERM, SignalHandlerForShutdownRequest); - - pqsignal(SIGALRM, SIG_IGN); - pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, procsignal_sigusr1_handler); - pqsignal(SIGUSR2, SIG_IGN); - - pqsignal(SIGCHLD, SIG_DFL); - cleanupInfo->ReqFlag = 0; - cleanupInfo->applyLSN = 0; - - for (;;) - { - if ((cleanupInfo->ReqFlag == 1) || push_standby) - { - cleanupInfo->applyLSN = QueryReplyLsn(InvalidXLogRecPtr); - } -// usleep(10L); - } -} - -void Cleanup() -{ - //TODO cut the lists and cleanup shmem -} - -Size RelCutShmemSize(void) -{ - return (Size)sizeof(CleanupInfo); -} - -void InitCleanupInfo() -{ - bool found; - - /* - * Get or create the shared strategy control block - */ - cleanupInfo = (CleanupInfo *) - ShmemInitStruct("PrefixList cut Status", - sizeof(CleanupInfo), - &found); - - if (!found) - { - cleanupInfo->applyLSN = 0; - } -} - -XLogRecPtr getApplyLSN() -{ - return cleanupInfo->applyLSN; -} - -int GetReqFlag() -{ - return cleanupInfo->ReqFlag; -} - -void SetReqFlag() -{ - cleanupInfo->ReqFlag = 1; -} -/* -void CutWalList(WalList *wl) -{ - int size, low, high, mid; - XLogRecPtr applyLSN, lsn, firstLSN; - double rate; - SpinLockAcquire(&(wl->append_lck)); - - rate = (double)wl->len / wl->cap; - while (rate > 0.9) - { - if (localApplyLSN >= getApplyLSN()) - { - SpinLockRelease(&(wl->append_lck)); - while (localApplyLSN >= getApplyLSN()) - { - usleep(5L); - } - SpinLockAcquire(&(wl->append_lck)); - if ((double)wl->len / wl->cap < 0.9) - { - break; - } - } - - firstLSN = ((WalLoc *)(wl->wals))->Lsn; - - if (localApplyLSN < firstLSN) - { - continue; - } - size = wl->len;// / sizeof(WalLoc); 
- low = 0; - high = size - 1; - mid = -1; - - - while (low <= high) - { - mid = (low + high) / 2; - lsn = ((WalLoc *)(wl->wals + sizeof(WalLoc) * mid))->Lsn; - - if (lsn > localApplyLSN) - { - high = mid - 1; - } - else if (lsn < localApplyLSN) - { - low = mid + 1; - } - else - { - break; - } - } - - memcpy(wl->wals, (char *)(wl->wals + (mid + 1) * sizeof(WalLoc)), (wl->len - (mid + 1)) * sizeof(WalLoc)); - wl->len = wl->len - (mid + 1); //* sizeof(WalLoc); - - rate = (double)wl->len / wl->cap; - } - SpinLockRelease(&(wl->append_lck)); -} -*/ \ No newline at end of file diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 95c71a1..5423fc6 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -150,7 +150,6 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); - size = add_size(size, RelCutShmemSize()); size = add_size(size, PageHashQueueShmemSize()); size = add_size(size, PageHashMapSize()); #ifdef EXEC_BACKEND @@ -242,8 +241,6 @@ CreateSharedMemoryAndSemaphores(void) */ InitWalLogHash(); - InitCleanupInfo(); - /* * set up fs meta */ diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 29a92b6..a432b92 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -4953,55 +4953,4 @@ FindFSMetaInTable(const FSKey *fsk) hash_search(FSMetaHash, fsk, HASH_FIND, &found); return fsm; -} - -void CutWalList(WalList *wl) -{ - int size, low, high, mid; - XLogRecPtr applyLSN, lsn, firstLSN; - double rate; - SpinLockAcquire(&(wl->append_lck)); - - rate = (double)wl->len / wl->cap; - while (rate > 0.9) - { - if (localApplyLSN < getApplyLSN()) - localApplyLSN = getApplyLSN(); - firstLSN = ((WalLoc *)(wl->wals))->Lsn; - - if (localApplyLSN < firstLSN) - { - continue; - } - size = wl->len;// / sizeof(WalLoc); - low = 0; - high = size - 1; - mid = -1; - - - 
while (low <= high) - { - mid = (low + high) / 2; - lsn = ((WalLoc *)(wl->wals + sizeof(WalLoc) * mid))->Lsn; - - if (lsn > localApplyLSN) - { - high = mid - 1; - } - else if (lsn < localApplyLSN) - { - low = mid + 1; - } - else - { - break; - } - } - - memcpy(wl->wals, (char *)(wl->wals + (mid + 1) * sizeof(WalLoc)), (wl->len - (mid + 1)) * sizeof(WalLoc)); - wl->len = wl->len - (mid + 1); //* sizeof(WalLoc); - - rate = (double)wl->len / wl->cap; - } - SpinLockRelease(&(wl->append_lck)); } \ No newline at end of file diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 7dd28fb..07c6eb8 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -279,9 +279,6 @@ GetBackendTypeDesc(BackendType backendType) case B_PARALLEL_FLUSH: backendDesc = "parallel flush"; break; - case B_RELATIONCLEANUP: - backendDesc = "relation cleanup"; - break; } return backendDesc; diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 85a9dde..b0c8fe2 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -339,7 +339,6 @@ typedef enum BackendType B_STATS_COLLECTOR, B_LOGGER, B_PARALLEL_FLUSH, - B_RELATIONCLEANUP, } BackendType; extern BackendType MyBackendType; @@ -439,8 +438,6 @@ typedef enum CheckpointerProcess, WalWriterProcess, WalReceiverProcess, - - RelationCleanupProcess, NUM_AUXPROCTYPES /* Must be last! 
*/ } AuxProcType; diff --git a/src/include/postmaster/relationcleanup.h b/src/include/postmaster/relationcleanup.h deleted file mode 100644 index a6aa5d9..0000000 --- a/src/include/postmaster/relationcleanup.h +++ /dev/null @@ -1,13 +0,0 @@ -#include "postgres.h" -#include "access/xlogdefs.h" -#include "storage/lock.h" - - - -extern void RelationCleanupMain(void); -extern Size RelCutShmemSize(void); -extern void InitCleanupInfo(); -extern XLogRecPtr getApplyLSN(); -// extern void CutWalList(WalList *wl); -extern void SetReqFlag(); -extern int GetReqFlag(); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index ce031fa..c25edf0 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -660,7 +660,6 @@ extern void InitDeadLockChecking(void); extern int LockWaiterCount(const LOCKTAG *locktag); extern WalList *SetupWalLOGInTable(const Prefix *wallog); extern WalList *FindWalLOGInTable(const Prefix *wallog); -extern void CutWalList(WalList *wl); #ifdef LOCK_DEBUG extern void DumpLocks(PGPROC *proc); diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 6d88f8b..4cd82f2 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -48,8 +48,7 @@ typedef enum WAIT_EVENT_WAL_RECEIVER_MAIN, WAIT_EVENT_WAL_SENDER_MAIN, WAIT_EVENT_WAL_WRITER_MAIN, - WAIT_EVENT_PAGEFLUSH_MAIN, - WAIT_EVENT_RELATIONCLEANUP_MAIN + WAIT_EVENT_PAGEFLUSH_MAIN } WaitEventActivity; /* ---------- From 1d9359ee85ab32c322784efc0e9b2d6798bbb03b Mon Sep 17 00:00:00 2001 From: zoujia Date: Wed, 15 Mar 2023 17:05:36 +0800 Subject: [PATCH 02/10] remove relationcleanup --- src/backend/access/transam/xlog.c | 1 - src/backend/postmaster/postmaster.c | 10 +--------- src/backend/storage/file/fd.c | 1 - 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 938ff3b..a2e127b 100644 --- a/src/backend/access/transam/xlog.c +++ 
b/src/backend/access/transam/xlog.c @@ -55,7 +55,6 @@ #include "port/pg_iovec.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" -#include "postmaster/relationcleanup.h" #include "postmaster/walwriter.h" #include "replication/basebackup.h" #include "replication/logical.h" diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 4616e4e..a53b473 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -115,7 +115,6 @@ #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" -#include "postmaster/relationcleanup.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -257,8 +256,7 @@ static pid_t StartupPID = 0, AutoVacPID = 0, PgArchPID = 0, PgStatPID = 0, - SysLoggerPID = 0, - RelationCleanupID = 0; + SysLoggerPID = 0; /* Startup process's status */ typedef enum @@ -3058,9 +3056,6 @@ reaper(SIGNAL_ARGS) if (WalWriterPID == 0) WalWriterPID = StartWalWriter(); - // if (RelationCleanupID == 0) - // RelationCleanupID = StartRelationCleanup(); - /* * Likewise, start other special children as needed. In a restart * situation, some of them may be alive already. @@ -5212,9 +5207,6 @@ sigusr1_handler(SIGNAL_ARGS) CheckpointerPID = StartCheckpointer(); Assert(BgWriterPID == 0); BgWriterPID = StartBackgroundWriter(); - // Assert(RelationCleanupID == 0); - // RelationCleanupID = StartRelationCleanup(); - /* * Start the archiver if we're responsible for (re-)archiving received * files. 
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index d6a593c..aff105f 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -96,7 +96,6 @@ #include "pgstat.h" #include "port/pg_iovec.h" #include "portability/mem.h" -#include "postmaster/relationcleanup.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/spin.h" From 24a8be44ea015de6da68c4e3623448a84cf875e5 Mon Sep 17 00:00:00 2001 From: zoujia Date: Fri, 17 Mar 2023 10:56:46 +0800 Subject: [PATCH 03/10] write page to local cache --- src/backend/storage/buffer/bufmgr.c | 22 ++++++++++++++++------ src/include/utils/hfs.h | 13 ++++++++++++- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index aa52154..e932412 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1452,7 +1452,6 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, // if (push_standby == true) { FlushBuffer(buf, NULL); // } else { - // He3DBFlushBuffer(buf, NULL); // } @@ -3228,11 +3227,22 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) /* * bufToWrite is either the shared buffer or a copy, as appropriate. 
*/ - smgrwrite(reln, - buf->tag.forkNum, - buf->tag.blockNum, - bufToWrite, - false); + // smgrwrite(reln, + // buf->tag.forkNum, + // buf->tag.blockNum, + // bufToWrite, + // false); + + PageKey pageKey; + pageKey.relfileNode.dbNode = buf->tag.rnode.dbNode;; + pageKey.relfileNode.relNode = buf->tag.rnode.relNode; + pageKey.relfileNode.spcNode = buf->tag.rnode.spcNode; + + pageKey.blkNo = buf->tag.blockNum; + pageKey.forkNo = buf->tag.forkNum; + pageKey.pageLsn = BufferGetLSN(buf); + //将page放到本地盘 + insertOnePage(pageKey, (char *)BufHdrGetBlock(buf)); if (track_io_timing) { diff --git a/src/include/utils/hfs.h b/src/include/utils/hfs.h index 622df1f..fb4dc6a 100644 --- a/src/include/utils/hfs.h +++ b/src/include/utils/hfs.h @@ -30,6 +30,14 @@ typedef struct XLogItem struct XLogItem *next; } XLogItem; +typedef struct PageKey +{ + RelFileNode relfileNode; + uint32 forkNo; + uint32 blkNo; + uint64 pageLsn; +} PageKey; + extern IOResult openfs(const char *pathname, int flags); @@ -62,4 +70,7 @@ extern Bufrd ReadWalsByPage(uint32_t dbid, uint32_t blkno, uint64_t startlsn, uint64_t endlsn, - uint32_t timeline); \ No newline at end of file + uint32_t timeline); + + +extern int insertOnePage(PageKey pageKey, char *value); \ No newline at end of file From de9047481232dd2462350e923b5209e51864b0d8 Mon Sep 17 00:00:00 2001 From: zoujia Date: Thu, 23 Mar 2023 09:41:49 +0800 Subject: [PATCH 04/10] add new logic when data transmitting between memory and disk --- src/backend/storage/buffer/bufmgr.c | 2 +- src/backend/storage/file/fd.c | 144 +++++++++++++++++++--------- src/include/utils/hfs.h | 6 +- 3 files changed, 107 insertions(+), 45 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e932412..7a92369 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -3242,7 +3242,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) pageKey.forkNo = buf->tag.forkNum; pageKey.pageLsn = 
BufferGetLSN(buf); //将page放到本地盘 - insertOnePage(pageKey, (char *)BufHdrGetBlock(buf)); + EvictOnePageOutOfMemory(pageKey, (char *)BufHdrGetBlock(buf)); if (track_io_timing) { diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index aff105f..44ab3b9 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -193,6 +193,8 @@ int recovery_init_sync_method = RECOVERY_INIT_SYNC_METHOD_FSYNC; #define FD_CLOSE_AT_EOXACT (1 << 1) /* T = close at eoXact */ #define FD_TEMP_FILE_LIMIT (1 << 2) /* T = respect temp_file_limit */ +extern bool PageIsHot(); + typedef struct vfd { int64_t fd; /* current FD, or VFD_CLOSED if none */ @@ -2357,56 +2359,112 @@ He3DBFileRead(File file, char **buffer, off_t offset, uint32 wait_event_info, XLogRecPtr lsn, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno) { - int returnCode; - Vfd *vfdP; + //TODO 先从本地盘读取数据,如果存在则返回 + PageKey pageKey; + Bufrd bufrd; + bufrd.count = 0; + + pageKey.relfileNode.dbNode = dbid; + pageKey.relfileNode.relNode = relid; + pageKey.forkNo = forkno; + pageKey.blkNo = (offset >>13) + segno * (1 << 17); + pageKey.pageLsn = 0; + pageKey.replyLsn = lsn; - Assert(FileIsValid(file)); - - DO_DB(elog(LOG, "FileRead: %d (%s) " INT64_FORMAT " %p", - file, VfdCache[file].fileName, - (int64) offset, - buffer)); - - returnCode = FileAccess(file); - if (returnCode < 0) - return returnCode; - - vfdP = &VfdCache[file]; - -retry: - pgstat_report_wait_start(wait_event_info); - returnCode = he3fs_pread(vfdP->fd, ((void**)buffer), offset, lsn, DataRead, dbid, relid, segno, forkno); - pgstat_report_wait_end(); - - if (returnCode < 0) + bufrd = MoveOnePageToMemory(pageKey); + if (bufrd.count > 0) { - /* - * Windows may run out of kernel buffers and return "Insufficient - * system resources" error. Wait a bit and retry to solve it. - * - * It is rumored that EINTR is also possible on some Unix filesystems, - * in which case immediate retry is indicated. 
- */ -#ifdef WIN32 - DWORD error = GetLastError(); + *buffer = bufrd.buf; + return bufrd.count; + } + else + { + //TODO 如果本地盘不存在,则调用标准接口读取page,再调用tikv的借口获取范围的wal + uint8_t *buf = (uint8_t *)malloc(BLCKSZ); + Bufrd result; + uint64 pageLSN = 0; + - switch (error) + FileRead(file,buf,BLCKSZ,off_t,wait_event_info); + + pageLSN = (buf[0] << 8) | + (buf[1] << 16) | + (buf[1] << 24) | + (buf[1] << 32) | + (buf[1] << 40) | + (buf[1] << 48) | + (buf[1] << 56); + + pageKey.pageLsn = pageLSN; + pageKey.replyLsn = lsn; + //if links exists, the wals about the page must be in the disk. + if (PageIsHot()) { - case ERROR_NO_SYSTEM_RESOURCES: - pg_usleep(1000L); - errno = EINTR; - break; - default: - _dosmaperr(error); - break; + result = GetWalsFromDisk(pageKey); } -#endif - /* OK to retry if interrupted */ - if (errno == EINTR) - goto retry; + else + { + result = ReadWalsByPage(pageKey); + } + + buf = (uint8_t *)realloc(buf, BLCKSZ + bufrd.count); + strcat(buf,bufrd.buf); + *buffer = buf; + return BLCKSZ + bufrd.count; } - return returnCode; + + +// int returnCode; +// Vfd *vfdP; + +// Assert(FileIsValid(file)); + +// DO_DB(elog(LOG, "FileRead: %d (%s) " INT64_FORMAT " %p", +// file, VfdCache[file].fileName, +// (int64) offset, +// buffer)); + +// returnCode = FileAccess(file); +// if (returnCode < 0) +// return returnCode; + +// vfdP = &VfdCache[file]; + +// retry: +// pgstat_report_wait_start(wait_event_info); + returnCode = he3fs_pread(vfdP->fd, ((void**)buffer), offset, lsn, DataRead, dbid, relid, segno, forkno); +// pgstat_report_wait_end(); + +// if (returnCode < 0) +// { +// /* +// * Windows may run out of kernel buffers and return "Insufficient +// * system resources" error. Wait a bit and retry to solve it. +// * +// * It is rumored that EINTR is also possible on some Unix filesystems, +// * in which case immediate retry is indicated. 
+// */ +// #ifdef WIN32 +// DWORD error = GetLastError(); + +// switch (error) +// { +// case ERROR_NO_SYSTEM_RESOURCES: +// pg_usleep(1000L); +// errno = EINTR; +// break; +// default: +// _dosmaperr(error); +// break; +// } +// #endif +// /* OK to retry if interrupted */ +// if (errno == EINTR) +// goto retry; +// } + +// return returnCode; } int diff --git a/src/include/utils/hfs.h b/src/include/utils/hfs.h index fb4dc6a..65f284c 100644 --- a/src/include/utils/hfs.h +++ b/src/include/utils/hfs.h @@ -36,6 +36,7 @@ typedef struct PageKey uint32 forkNo; uint32 blkNo; uint64 pageLsn; + uint64 replyLsn; } PageKey; @@ -73,4 +74,7 @@ extern Bufrd ReadWalsByPage(uint32_t dbid, uint32_t timeline); -extern int insertOnePage(PageKey pageKey, char *value); \ No newline at end of file + +extern uint8_t EvictOnePageOutOfMemory(PageKey pageKey, char *value); +extern Bufrd MoveOnePageToMemory(PageKey pageKey); +extern Bufrd GetWalsFromDisk(PageKey pageKey); \ No newline at end of file From c7401a817fbef7dfb17bd664d3802810106631ec Mon Sep 17 00:00:00 2001 From: shipixian Date: Thu, 23 Mar 2023 11:12:06 +0800 Subject: [PATCH 05/10] add a parameter for batchread api Signed-off-by: shipixian --- src/backend/access/transam/xlog.c | 6 +++++- src/backend/access/transam/xlogreader.c | 2 +- src/backend/storage/smgr/smgr.c | 9 +++++++++ src/include/utils/hfs.h | 5 +++-- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 66f9113..fa1d078 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -14220,7 +14220,11 @@ retry: readOff = targetOff; pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = batchRead((uint8_t *) readBuf, ControlFile->checkPointCopy.ThisTimeLineID, targetOff); + bool walStoreToLocal = false; + if (EnableHotStandby && !push_standby) + walStoreToLocal = true; + + r = batchRead((uint8_t *) readBuf, ControlFile->checkPointCopy.ThisTimeLineID, targetOff, 
walStoreToLocal); pgstat_report_wait_end(); /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 2590c6c..6bc14c8 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1429,7 +1429,7 @@ He3DBWALRead(XLogReaderState *state, pgstat_report_wait_start(WAIT_EVENT_WAL_READ); #endif - nbytes = batchRead((uint8_t *) buf, state->currTLI, startptr); + nbytes = batchRead((uint8_t *) buf, state->currTLI, startptr, false); #ifndef FRONTEND pgstat_report_wait_end(); diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 49dda09..cfa5c5c 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -24,6 +24,7 @@ #include "storage/md.h" #include "storage/smgr.h" #include "storage/filecache.h" +#include "utils/hfs.h" #include "utils/hsearch.h" #include "utils/inval.h" #include "utils/guc.h" @@ -446,6 +447,9 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo) for (forknum = 0; forknum <= MAX_FORKNUM; forknum++) smgrsw[which].smgr_unlink(rnodes[i], forknum, isRedo); + + //remove unused pages and related wals in localdisk cache. + RemoveBufferFromLocal(rnodes[i].node.dbNode, rnodes[i].node.relNode, MAX_FORKNUM, 0); } pfree(rnodes); @@ -674,6 +678,8 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb smgrsw[reln->smgr_which].smgr_truncate(reln, forknum[i], nblocks[i]); + //remove unused pages and related wals in localdisk cache. + RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]); /* * We might as well update the local smgr_cached_nblocks values. 
The * smgr cache inval message that this function sent will cause other @@ -737,6 +743,9 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber } smgrsw[reln->smgr_which].smgr_truncate(reln, forknum[i], nblocks[i]); + //remove unused pages and related wals in localdisk cache. + RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]); + /* * We might as well update the local smgr_cached_nblocks values. The * smgr cache inval message that this function sent will cause other diff --git a/src/include/utils/hfs.h b/src/include/utils/hfs.h index 622df1f..cefc9e4 100644 --- a/src/include/utils/hfs.h +++ b/src/include/utils/hfs.h @@ -52,7 +52,7 @@ extern Bufrd dataRead(int64_t fd, extern void free_dataRead(uint8_t *buf, size_t count, size_t cap); extern Bufrd readfs(int64_t fd, int64_t offset, uint32_t size); -extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr); +extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr, bool needStore); extern uint8_t kvwrite(XLogItem *xlogItem); extern uint8_t flushwals(XLogItem *xlogItem, uint32_t timeline); extern uint8_t kvflush(XLogRecPtr lsn); @@ -62,4 +62,5 @@ extern Bufrd ReadWalsByPage(uint32_t dbid, uint32_t blkno, uint64_t startlsn, uint64_t endlsn, - uint32_t timeline); \ No newline at end of file + uint32_t timeline); +extern void RemoveBufferFromLocal(uint32_t dbid, uint32_t relid, uint32_t forkno, uint32_t blkno); \ No newline at end of file From 44119883b61b9c852b7faf881cd5d953487afdee Mon Sep 17 00:00:00 2001 From: zoujia Date: Thu, 23 Mar 2023 11:36:45 +0800 Subject: [PATCH 06/10] code optimization --- src/backend/storage/file/fd.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index f391692..e0b334b 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -77,6 +77,7 @@ #include 
#include #include +#include #ifndef WIN32 #include #endif @@ -2367,7 +2368,10 @@ He3DBFileRead(File file, char **buffer, off_t offset, bufrd = MoveOnePageToMemory(pageKey); if (bufrd.count > 0) { - *buffer = bufrd.buf; + *buffer = (uint8_t *)malloc(bufrd.count); + memcpy(buffer, bufrd.buf,bufrd.count); + free_dataRead(bufrd.buf, bufrd.count, burfrd.cap); +// *buffer = bufrd.buf; return bufrd.count; } else @@ -2400,10 +2404,12 @@ He3DBFileRead(File file, char **buffer, off_t offset, result = ReadWalsByPage(pageKey); } - buf = (uint8_t *)realloc(buf, BLCKSZ + bufrd.count); - strcat(buf,bufrd.buf); + buf = (uint8_t *)realloc(buf, BLCKSZ + result.count); + strcat(buf,result.buf); + //TODO free result + free_dataRead(result.buf, result.count, result.cap); *buffer = buf; - return BLCKSZ + bufrd.count; + return BLCKSZ + result.count; } @@ -2426,7 +2432,7 @@ He3DBFileRead(File file, char **buffer, off_t offset, // retry: // pgstat_report_wait_start(wait_event_info); - returnCode = he3fs_pread(vfdP->fd, ((void**)buffer), offset, lsn, DataRead, dbid, relid, segno, forkno); +// returnCode = he3fs_pread(vfdP->fd, ((void**)buffer), offset, lsn, DataRead, dbid, relid, segno, forkno); // pgstat_report_wait_end(); // if (returnCode < 0) From bd378a22423f6968b64a85e250a33d2ccd8815bb Mon Sep 17 00:00:00 2001 From: zoujia Date: Thu, 23 Mar 2023 15:39:39 +0800 Subject: [PATCH 07/10] identify branches of master and standby --- src/backend/storage/buffer/bufmgr.c | 27 +++++--- src/backend/storage/file/fd.c | 102 ++++++++++------------------ src/backend/storage/smgr/md.c | 8 ++- src/include/storage/fd.h | 1 + 4 files changed, 63 insertions(+), 75 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 10d4e7c..381354e 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -49,6 +49,7 @@ #include "storage/proc.h" #include "storage/smgr.h" #include "storage/standby.h" +#include "storage/md.h" #include 
"utils/memdebug.h" #include "utils/ps_status.h" #include "utils/rel.h" @@ -856,11 +857,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, bool isExtend; bool isLocalBuf = SmgrIsTemp(smgr); /* he3db: local tem buffer for pageXlog */ - // char *pageXlogBuf; - + char *pageXlogBuf; + int nbytes; *hit = false; - // pageXlogBuf = NULL; + pageXlogBuf = NULL; /* Make sure we will have room to remember the buffer pin */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); @@ -1061,7 +1062,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, INSTR_TIME_SET_CURRENT(io_start); // XLogRecPtr replayLsn = GetXLogWriteRecPtr(); - smgrread(smgr, forkNum, blockNum, (char *) bufBlock); + if (!EnableHotStandby) { + smgrread(smgr, forkNum, blockNum, (char *) bufBlock); + } else { + XLogRecPtr replayLsn = GetXLogWriteRecPtr(); + nbytes = he3db_mdread(smgr,forkNum, blockNum, &pageXlogBuf, true, replayLsn); + memcpy((char *) bufBlock, pageXlogBuf, BLCKSZ); + } if (track_io_timing) @@ -1084,6 +1091,12 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, blockNum, relpath(smgr->smgr_rnode, forkNum)))); MemSet((char *) bufBlock, 0, BLCKSZ); + + if(pageXlogBuf != NULL) + { + free_dataRead(pageXlogBuf, 1, 1); + pageXlogBuf = NULL; + } } else ereport(ERROR, @@ -1122,22 +1135,20 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, else { //todo: read related wals in standby instance. - - /* * He3DB: page-replay. * * apply logs to this old page when read from disk. 
* */ - /*if (pageXlogBuf) + if (pageXlogBuf) { he3db_apply_page(bufHdr, pageXlogBuf + BLCKSZ, nbytes - BLCKSZ); free_dataRead(pageXlogBuf, 1, 1); - }*/ + } /* He3DB end */ /* Set BM_VALID, terminate IO, and wake up any waiters */ TerminateBufferIO(bufHdr, false, BM_VALID); diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index e0b334b..d0c073b 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -2384,25 +2384,25 @@ He3DBFileRead(File file, char **buffer, off_t offset, FileRead(file,buf,BLCKSZ,off_t,wait_event_info); - pageLSN = (buf[0] << 8) | - (buf[1] << 16) | - (buf[1] << 24) | - (buf[1] << 32) | - (buf[1] << 40) | - (buf[1] << 48) | - (buf[1] << 56); + pageLSN = buf[0] | + (buf[1] << 8) | + (buf[2] << 16) | + (buf[3] << 24) | + (buf[4] << 32) | + (buf[5] << 40) | + (buf[6] << 48) | + (buf[7] << 56); pageKey.pageLsn = pageLSN; pageKey.replyLsn = lsn; - //if links exists, the wals about the page must be in the disk. - if (PageIsHot()) - { - result = GetWalsFromDisk(pageKey); - } - else - { - result = ReadWalsByPage(pageKey); + + result = GetWalsFromDisk(pageKey); + if (result.count == 0) { + result = ReadWalsByPage(pageKey); } + + + buf = (uint8_t *)realloc(buf, BLCKSZ + result.count); strcat(buf,result.buf); @@ -2411,59 +2411,29 @@ He3DBFileRead(File file, char **buffer, off_t offset, *buffer = buf; return BLCKSZ + result.count; } +} +int +MasterFileRead(char *buffer,uint32_t dbid, uint32_t relid, uint32_t forkno, uint32_t blockno){ + PageKey pageKey; + Bufrd bufrd; + bufrd.count = 0; + + pageKey.relfileNode.dbNode = dbid; + pageKey.relfileNode.relNode = relid; + pageKey.forkNo = forkno; + pageKey.blkNo = blockno; + pageKey.pageLsn = 0; + pageKey.replyLsn = GetXLogWriteRecPtr(); - -// int returnCode; -// Vfd *vfdP; - -// Assert(FileIsValid(file)); - -// DO_DB(elog(LOG, "FileRead: %d (%s) " INT64_FORMAT " %p", -// file, VfdCache[file].fileName, -// (int64) offset, -// buffer)); - -// returnCode = 
FileAccess(file); -// if (returnCode < 0) -// return returnCode; - -// vfdP = &VfdCache[file]; - -// retry: -// pgstat_report_wait_start(wait_event_info); -// returnCode = he3fs_pread(vfdP->fd, ((void**)buffer), offset, lsn, DataRead, dbid, relid, segno, forkno); -// pgstat_report_wait_end(); - -// if (returnCode < 0) -// { -// /* -// * Windows may run out of kernel buffers and return "Insufficient -// * system resources" error. Wait a bit and retry to solve it. -// * -// * It is rumored that EINTR is also possible on some Unix filesystems, -// * in which case immediate retry is indicated. -// */ -// #ifdef WIN32 -// DWORD error = GetLastError(); - -// switch (error) -// { -// case ERROR_NO_SYSTEM_RESOURCES: -// pg_usleep(1000L); -// errno = EINTR; -// break; -// default: -// _dosmaperr(error); -// break; -// } -// #endif -// /* OK to retry if interrupted */ -// if (errno == EINTR) -// goto retry; -// } - -// return returnCode; + bufrd = MoveOnePageToMemory(pageKey); + if (bufrd.count > 0) + { + memcpy(buffer,bufrd.buf); + free_dataRead(bufrd.buf,bufrd.count,bufrd.cap); + } + + return bufrd.count; } int diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index d161706..0c050ba 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -676,7 +676,13 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ); + //TODO read page from disk + nbytes = MasterFileRead(buffer,reln->smgr_rnode.node.dbNode,reln->smgr_rnode.node.relNode,forknum,blocknum); + + if (nbytes == 0) + { + nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ); + } TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 1fb3b52..1330f65 100644 --- a/src/include/storage/fd.h +++ 
b/src/include/storage/fd.h @@ -94,6 +94,7 @@ extern void FileClose(File file); extern void He3DBFileClose(File file); extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info); extern int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); +extern int MasterFileRead(char *buffer, uint32_t dbid, uint32_t relid, uint32_t forkno, uint32_t blockno); extern int He3DBFileRead(File file, char **buffer, off_t offset, uint32 wait_event_info, XLogRecPtr lsn, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno); extern int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); extern int He3DBFileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); From 3bac5aa6a2b11bbdc226b01576c2b2901beb871b Mon Sep 17 00:00:00 2001 From: zoujia Date: Thu, 23 Mar 2023 16:03:04 +0800 Subject: [PATCH 08/10] modify memory-free function --- src/backend/storage/buffer/bufmgr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 381354e..acd8927 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1094,7 +1094,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if(pageXlogBuf != NULL) { - free_dataRead(pageXlogBuf, 1, 1); + free(pageXlogBuf); pageXlogBuf = NULL; } } From 367e8c0fc04159adc73a498e0dceb7a519529db8 Mon Sep 17 00:00:00 2001 From: zoujia Date: Thu, 23 Mar 2023 17:01:12 +0800 Subject: [PATCH 09/10] modify grpc APIs --- src/backend/access/transam/xlog.c | 21 +- src/backend/replication/walreceiver.c | 2 +- src/backend/storage/buffer/bufmgr.c | 6 +- src/backend/storage/file/fd.c | 812 +++++++++++++------------- src/include/storage/fd.h | 18 +- 5 files changed, 426 insertions(+), 433 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 
3b85dbd..bab06cb 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -3370,8 +3370,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) INSTR_TIME_SET_CURRENT(start); pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE); - // written = pg_pwrite(openLogFile, from, nleft, startoffset); - written = writefs(openLogFile, from, nleft, startoffset); + written = pg_pwrite(openLogFile, from, nleft, startoffset); pgstat_report_wait_end(); /* @@ -4842,8 +4841,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) */ if (*use_existent) { - // fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); - fd = He3DBBasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); + fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); if (fd < 0) { if (errno != ENOENT) @@ -5001,8 +4999,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) *use_existent = false; /* Now open original target segment (might not be file I just made) */ - // fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); - fd = He3DBBasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); + fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); if (fd < 0) ereport(ERROR, (errcode_for_file_access(), @@ -5241,8 +5238,7 @@ XLogFileOpen(XLogSegNo segno) XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size); - // fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); - fd = He3DBBasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); + fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method)); if (fd < 0) ereport(PANIC, (errcode_for_file_access(), @@ -5308,7 +5304,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname); } - fd = He3DBBasicOpenFile(path, O_RDONLY | PG_BINARY); + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); if (fd >= 0) { /* Success! 
*/ @@ -5485,8 +5481,7 @@ XLogFileClose(void) (void)posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED); #endif - // if (close(openLogFile) != 0) - if (closefs(openLogFile) != 0) + if (close(openLogFile) != 0) { char xlogfname[MAXFNAMELEN]; int save_errno = errno; @@ -5527,7 +5522,7 @@ PreallocXlogFiles(XLogRecPtr endptr) _logSegNo++; use_existent = true; lf = XLogFileInit(_logSegNo, &use_existent, true); - closefs(lf); + close(lf); if (!use_existent) CheckpointStats.ckpt_segs_added++; } @@ -7214,7 +7209,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog) fd = XLogFileInit(startLogSegNo, &use_existent, true); - if (closefs(fd) != 0) + if (close(fd) != 0) { char xlogfname[MAXFNAMELEN]; int save_errno = errno; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 126bb50..cbb29be 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -611,7 +611,7 @@ WalReceiverMain(void) XLogWalRcvFlush(false); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); - if (closefs(recvFile) != 0) + if (close(recvFile) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index acd8927..96d25af 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -20,7 +20,7 @@ * is using it. * * ReleaseBuffer() -- unpin a buffer - * + *f * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". * The disk write is delayed until buffer replacement or checkpoint. 
* @@ -1065,7 +1065,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (!EnableHotStandby) { smgrread(smgr, forkNum, blockNum, (char *) bufBlock); } else { - XLogRecPtr replayLsn = GetXLogWriteRecPtr(); + XLogRecPtr replayLsn = GetXLogReplayRecPtr(NULL); nbytes = he3db_mdread(smgr,forkNum, blockNum, &pageXlogBuf, true, replayLsn); memcpy((char *) bufBlock, pageXlogBuf, BLCKSZ); } @@ -1146,7 +1146,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, he3db_apply_page(bufHdr, pageXlogBuf + BLCKSZ, nbytes - BLCKSZ); - free_dataRead(pageXlogBuf, 1, 1); + free(pageXlogBuf); } /* He3DB end */ diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index d0c073b..59b91d2 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -352,8 +352,8 @@ static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel); static int fsync_parent_path(const char *fname, int elevel); /* He3DB: He3FS */ -ssize_t he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno); -ssize_t he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset); +//ssize_t he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno); +//ssize_t he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset); /* @@ -1056,11 +1056,11 @@ BasicOpenFile(const char *fileName, int fileFlags) * * Modified points: He3FS replace OSFS. 
*/ -int64_t -He3DBBasicOpenFile(const char *fileName, int fileFlags) -{ - return He3DBBasicOpenFilePerm(fileName, fileFlags | PG_O_DIRECT, pg_file_create_mode); -} +// int64_t +// He3DBBasicOpenFile(const char *fileName, int fileFlags) +// { +// return He3DBBasicOpenFilePerm(fileName, fileFlags | PG_O_DIRECT, pg_file_create_mode); +// } /* * BasicOpenFilePerm --- same as open(2) except can free other FDs if needed @@ -1111,53 +1111,53 @@ tryAgain: * * Modified points: He3FS replace OSFS. */ -int64_t -He3DBBasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) -{ - IOResult ioResult; +// int64_t +// He3DBBasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) +// { +// IOResult ioResult; -tryAgain: - //fd = open(fileName, fileFlags, fileMode); - /* He3DB: He3FS replace OSFS */ - // if (push_standby) - // { - // /* propeller instance */ - // fi = he3Open(fileName, fileFlags, fileMode, 3); - // } - // else - // { - // /* primary instance */ - // fi = he3Open(fileName, fileFlags, fileMode, 1); - // } - // errno = fi.errNo; - // if (fi.fd >= 0) - // return fi.fd; /* success! */ +// tryAgain: +// //fd = open(fileName, fileFlags, fileMode); +// /* He3DB: He3FS replace OSFS */ +// // if (push_standby) +// // { +// // /* propeller instance */ +// // fi = he3Open(fileName, fileFlags, fileMode, 3); +// // } +// // else +// // { +// // /* primary instance */ +// // fi = he3Open(fileName, fileFlags, fileMode, 1); +// // } +// // errno = fi.errNo; +// // if (fi.fd >= 0) +// // return fi.fd; /* success! 
*/ - ioResult = openfs(fileName, fileFlags); +// ioResult = openfs(fileName, fileFlags); - if (ioResult.fd >= 0){ - return ioResult.fd; - } +// if (ioResult.fd >= 0){ +// return ioResult.fd; +// } - errno = ioResult.error; - if (errno == EMFILE || errno == ENFILE) - { - int save_errno = errno; +// errno = ioResult.error; +// if (errno == EMFILE || errno == ENFILE) +// { +// int save_errno = errno; - ereport(LOG, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("out of file descriptors: %m; release and retry"))); - errno = 0; - if (ReleaseLruFile()) - goto tryAgain; - errno = save_errno; - } +// ereport(LOG, +// (errcode(ERRCODE_INSUFFICIENT_RESOURCES), +// errmsg("out of file descriptors: %m; release and retry"))); +// errno = 0; +// if (ReleaseLruFile()) +// goto tryAgain; +// errno = save_errno; +// } - return -1; /* failure */ -} +// return -1; /* failure */ +// } /* * AcquireExternalFD - attempt to reserve an external file descriptor @@ -1366,49 +1366,49 @@ LruInsert(File file) } /* returns 0 on success, -1 on re-open failure (with errno set) */ -int -He3LruInsert(File file) -{ - Vfd *vfdP; +// int +// He3LruInsert(File file) +// { +// Vfd *vfdP; - Assert(file != 0); +// Assert(file != 0); - DO_DB(elog(LOG, "LruInsert %d (%s)", - file, VfdCache[file].fileName)); +// DO_DB(elog(LOG, "LruInsert %d (%s)", +// file, VfdCache[file].fileName)); - vfdP = &VfdCache[file]; +// vfdP = &VfdCache[file]; - if (FileIsNotOpen(file)) - { - /* Close excess kernel FDs. */ - ReleaseLruFiles(); +// if (FileIsNotOpen(file)) +// { +// /* Close excess kernel FDs. */ +// ReleaseLruFiles(); - /* - * The open could still fail for lack of file descriptors, eg due to - * overall system file table being full. So, be prepared to release - * another FD if necessary... 
- */ - vfdP->fd = He3DBBasicOpenFilePerm(vfdP->fileName, vfdP->fileFlags, - vfdP->fileMode); - if (vfdP->fd < 0) - { - DO_DB(elog(LOG, "re-open failed: %m")); - return -1; - } - else - { - ++nfile; - } - } +// /* +// * The open could still fail for lack of file descriptors, eg due to +// * overall system file table being full. So, be prepared to release +// * another FD if necessary... +// */ +// vfdP->fd = He3DBBasicOpenFilePerm(vfdP->fileName, vfdP->fileFlags, +// vfdP->fileMode); +// if (vfdP->fd < 0) +// { +// DO_DB(elog(LOG, "re-open failed: %m")); +// return -1; +// } +// else +// { +// ++nfile; +// } +// } - /* - * put it at the head of the Lru ring - */ +// /* +// * put it at the head of the Lru ring +// */ - Insert(file); +// Insert(file); - return 0; -} +// return 0; +// } /* * Release one kernel FD by closing the least-recently-used VFD. */ @@ -1556,38 +1556,38 @@ FileAccess(File file) } /* returns 0 on success, -1 on re-open failure (with errno set) */ -int -He3FileAccess(File file) -{ - int returnValue; +// int +// He3FileAccess(File file) +// { +// int returnValue; - DO_DB(elog(LOG, "FileAccess %d (%s)", - file, VfdCache[file].fileName)); +// DO_DB(elog(LOG, "FileAccess %d (%s)", +// file, VfdCache[file].fileName)); - /* - * Is the file open? If not, open it and put it at the head of the LRU - * ring (possibly closing the least recently used file to get an FD). - */ +// /* +// * Is the file open? If not, open it and put it at the head of the LRU +// * ring (possibly closing the least recently used file to get an FD). +// */ - if (FileIsNotOpen(file)) - { - returnValue = He3LruInsert(file); - if (returnValue != 0) - return returnValue; - } - else if (VfdCache[0].lruLessRecently != file) - { - /* - * We now know that the file is open and that it is not the last one - * accessed, so we need to move it to the head of the Lru ring. 
- */ +// if (FileIsNotOpen(file)) +// { +// returnValue = He3LruInsert(file); +// if (returnValue != 0) +// return returnValue; +// } +// else if (VfdCache[0].lruLessRecently != file) +// { +// /* +// * We now know that the file is open and that it is not the last one +// * accessed, so we need to move it to the head of the Lru ring. +// */ - Delete(file); - Insert(file); - } +// Delete(file); +// Insert(file); +// } - return 0; -} +// return 0; +// } /* * Called whenever a temporary file is deleted to report its size. */ @@ -1650,16 +1650,16 @@ PathNameOpenFile(const char *fileName, int fileFlags) * * Modified points: He3FS replace OSFS. */ -File -He3DBPathNameOpenFile(const char *fileName, int fileFlags) -{ - File fd; - // clock_t start1 = clock(); - fd = He3DBPathNameOpenFilePerm(fileName, fileFlags, pg_file_create_mode); - // clock_t end1 = clock(); - // elog(LOG,"===He3DBPathNameOpenFile==fileName=%s=fileFlags=%d===%lu=",fileName,fileFlags,end1-start1); - return fd; -} +// File +// He3DBPathNameOpenFile(const char *fileName, int fileFlags) +// { +// File fd; +// // clock_t start1 = clock(); +// fd = He3DBPathNameOpenFilePerm(fileName, fileFlags, pg_file_create_mode); +// // clock_t end1 = clock(); +// // elog(LOG,"===He3DBPathNameOpenFile==fileName=%s=fileFlags=%d===%lu=",fileName,fileFlags,end1-start1); +// return fd; +// } /* * open a file in an arbitrary directory @@ -1727,59 +1727,59 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) * * Modified points: He3FS replace OSFS. 
*/ -File -He3DBPathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) -{ - char *fnamecopy; - File file; - Vfd *vfdP; +// File +// He3DBPathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode) +// { +// char *fnamecopy; +// File file; +// Vfd *vfdP; - DO_DB(elog(LOG, "PathNameOpenFilePerm: %s %x %o", - fileName, fileFlags, fileMode)); +// DO_DB(elog(LOG, "PathNameOpenFilePerm: %s %x %o", +// fileName, fileFlags, fileMode)); - /* - * We need a malloc'd copy of the file name; fail cleanly if no room. - */ - fnamecopy = strdup(fileName); - if (fnamecopy == NULL) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); +// /* +// * We need a malloc'd copy of the file name; fail cleanly if no room. +// */ +// fnamecopy = strdup(fileName); +// if (fnamecopy == NULL) +// ereport(ERROR, +// (errcode(ERRCODE_OUT_OF_MEMORY), +// errmsg("out of memory"))); - file = AllocateVfd(); - vfdP = &VfdCache[file]; +// file = AllocateVfd(); +// vfdP = &VfdCache[file]; - /* Close excess kernel FDs. */ - ReleaseLruFiles(); +// /* Close excess kernel FDs. 
*/ +// ReleaseLruFiles(); - /* He3DB: He3FS replace OSFS */ - vfdP->fd = He3DBBasicOpenFilePerm(fileName, fileFlags, fileMode); +// /* He3DB: He3FS replace OSFS */ +// vfdP->fd = He3DBBasicOpenFilePerm(fileName, fileFlags, fileMode); - if (vfdP->fd < 0) - { - int save_errno = errno; +// if (vfdP->fd < 0) +// { +// int save_errno = errno; - FreeVfd(file); - free(fnamecopy); - errno = save_errno; - return -1; - } - ++nfile; - DO_DB(elog(LOG, "PathNameOpenFile: success %d", - vfdP->fd)); +// FreeVfd(file); +// free(fnamecopy); +// errno = save_errno; +// return -1; +// } +// ++nfile; +// DO_DB(elog(LOG, "PathNameOpenFile: success %d", +// vfdP->fd)); - vfdP->fileName = fnamecopy; - /* Saved flags are adjusted to be OK for re-opening file */ - vfdP->fileFlags = fileFlags & ~(O_CREAT | O_TRUNC | O_EXCL); - vfdP->fileMode = fileMode; - vfdP->fileSize = 0; - vfdP->fdstate = 0x0; - vfdP->resowner = NULL; +// vfdP->fileName = fnamecopy; +// /* Saved flags are adjusted to be OK for re-opening file */ +// vfdP->fileFlags = fileFlags & ~(O_CREAT | O_TRUNC | O_EXCL); +// vfdP->fileMode = fileMode; +// vfdP->fileSize = 0; +// vfdP->fdstate = 0x0; +// vfdP->resowner = NULL; - Insert(file); +// Insert(file); - return file; -} +// return file; +// } /* * Create directory 'directory'. If necessary, create 'basedir', which must @@ -2195,97 +2195,97 @@ FileClose(File file) FreeVfd(file); } -void -He3DBFileClose(File file) -{ - Vfd *vfdP; +// void +// He3DBFileClose(File file) +// { +// Vfd *vfdP; - Assert(FileIsValid(file)); +// Assert(FileIsValid(file)); - DO_DB(elog(LOG, "FileClose: %d (%s)", - file, VfdCache[file].fileName)); +// DO_DB(elog(LOG, "FileClose: %d (%s)", +// file, VfdCache[file].fileName)); - vfdP = &VfdCache[file]; +// vfdP = &VfdCache[file]; - if (!FileIsNotOpen(file)) - { - /* close the file */ - //if (close(vfdP->fd) != 0) - if (closefs(vfdP->fd) != 0) - { - /* - * We may need to panic on failure to close non-temporary files; - * see LruDelete. 
- */ - elog(vfdP->fdstate & FD_TEMP_FILE_LIMIT ? LOG : data_sync_elevel(LOG), - "could not close file \"%s\": %m", vfdP->fileName); - } +// if (!FileIsNotOpen(file)) +// { +// /* close the file */ +// //if (close(vfdP->fd) != 0) +// if (closefs(vfdP->fd) != 0) +// { +// /* +// * We may need to panic on failure to close non-temporary files; +// * see LruDelete. +// */ +// elog(vfdP->fdstate & FD_TEMP_FILE_LIMIT ? LOG : data_sync_elevel(LOG), +// "could not close file \"%s\": %m", vfdP->fileName); +// } - --nfile; - vfdP->fd = VFD_CLOSED; +// --nfile; +// vfdP->fd = VFD_CLOSED; - /* remove the file from the lru ring */ - Delete(file); - } +// /* remove the file from the lru ring */ +// Delete(file); +// } - if (vfdP->fdstate & FD_TEMP_FILE_LIMIT) - { - /* Subtract its size from current usage (do first in case of error) */ - temporary_files_size -= vfdP->fileSize; - vfdP->fileSize = 0; - } +// if (vfdP->fdstate & FD_TEMP_FILE_LIMIT) +// { +// /* Subtract its size from current usage (do first in case of error) */ +// temporary_files_size -= vfdP->fileSize; +// vfdP->fileSize = 0; +// } - /* - * Delete the file if it was temporary, and make a log entry if wanted - */ - if (vfdP->fdstate & FD_DELETE_AT_CLOSE) - { - struct stat filestats; - int stat_errno; +// /* +// * Delete the file if it was temporary, and make a log entry if wanted +// */ +// if (vfdP->fdstate & FD_DELETE_AT_CLOSE) +// { +// struct stat filestats; +// int stat_errno; - /* - * If we get an error, as could happen within the ereport/elog calls, - * we'll come right back here during transaction abort. Reset the - * flag to ensure that we can't get into an infinite loop. This code - * is arranged to ensure that the worst-case consequence is failing to - * emit log message(s), not failing to attempt the unlink. - */ - vfdP->fdstate &= ~FD_DELETE_AT_CLOSE; +// /* +// * If we get an error, as could happen within the ereport/elog calls, +// * we'll come right back here during transaction abort. 
Reset the +// * flag to ensure that we can't get into an infinite loop. This code +// * is arranged to ensure that the worst-case consequence is failing to +// * emit log message(s), not failing to attempt the unlink. +// */ +// vfdP->fdstate &= ~FD_DELETE_AT_CLOSE; - /* first try the stat() */ - if (stat(vfdP->fileName, &filestats)) - stat_errno = errno; - else - stat_errno = 0; +// /* first try the stat() */ +// if (stat(vfdP->fileName, &filestats)) +// stat_errno = errno; +// else +// stat_errno = 0; - /* in any case do the unlink */ - if (unlink(vfdP->fileName)) - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not delete file \"%s\": %m", vfdP->fileName))); +// /* in any case do the unlink */ +// if (unlink(vfdP->fileName)) +// ereport(LOG, +// (errcode_for_file_access(), +// errmsg("could not delete file \"%s\": %m", vfdP->fileName))); - /* and last report the stat results */ - if (stat_errno == 0) - ReportTemporaryFileUsage(vfdP->fileName, filestats.st_size); - else - { - errno = stat_errno; - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", vfdP->fileName))); - } - } +// /* and last report the stat results */ +// if (stat_errno == 0) +// ReportTemporaryFileUsage(vfdP->fileName, filestats.st_size); +// else +// { +// errno = stat_errno; +// ereport(LOG, +// (errcode_for_file_access(), +// errmsg("could not stat file \"%s\": %m", vfdP->fileName))); +// } +// } - /* Unregister it from the resource owner */ - if (vfdP->resowner) - ResourceOwnerForgetFile(vfdP->resowner, file); +// /* Unregister it from the resource owner */ +// if (vfdP->resowner) +// ResourceOwnerForgetFile(vfdP->resowner, file); - /* - * Return the Vfd slot to the free list - */ - FreeVfd(file); -} +// /* +// * Return the Vfd slot to the free list +// */ +// FreeVfd(file); +// } /* @@ -2370,7 +2370,7 @@ He3DBFileRead(File file, char **buffer, off_t offset, { *buffer = (uint8_t *)malloc(bufrd.count); memcpy(buffer, bufrd.buf,bufrd.count); - 
free_dataRead(bufrd.buf, bufrd.count, burfrd.cap); + free_dataRead(bufrd.buf, bufrd.count, bufrd.cap); // *buffer = bufrd.buf; return bufrd.count; } @@ -2382,7 +2382,7 @@ He3DBFileRead(File file, char **buffer, off_t offset, uint64 pageLSN = 0; - FileRead(file,buf,BLCKSZ,off_t,wait_event_info); + FileRead(file,buf,BLCKSZ,offset,wait_event_info); pageLSN = buf[0] | (buf[1] << 8) | @@ -2398,7 +2398,8 @@ He3DBFileRead(File file, char **buffer, off_t offset, result = GetWalsFromDisk(pageKey); if (result.count == 0) { - result = ReadWalsByPage(pageKey); + result = ReadWalsByPage(pageKey.relfileNode.dbNode,pageKey.relfileNode.relNode, + pageKey.forkNo,pageKey.blkNo,pageKey.pageLsn,pageKey.replyLsn,0); } @@ -2429,7 +2430,7 @@ MasterFileRead(char *buffer,uint32_t dbid, uint32_t relid, uint32_t forkno, uint bufrd = MoveOnePageToMemory(pageKey); if (bufrd.count > 0) { - memcpy(buffer,bufrd.buf); + memcpy(buffer,bufrd.buf,bufrd.count); free_dataRead(bufrd.buf,bufrd.count,bufrd.cap); } @@ -2492,103 +2493,100 @@ retry: return returnCode; } -int -He3DBFileWrite(File file, char *buffer, int amount, off_t offset, - uint32 wait_event_info) -{ - int returnCode; - Vfd *vfdP; +// int +// He3DBFileWrite(File file, char *buffer, int amount, off_t offset, +// uint32 wait_event_info) +// { +// int returnCode; +// Vfd *vfdP; - Assert(FileIsValid(file)); +// Assert(FileIsValid(file)); - DO_DB(elog(LOG, "FileWrite: %d (%s) " INT64_FORMAT " %d %p", - file, VfdCache[file].fileName, - (int64) offset, - amount, buffer)); +// DO_DB(elog(LOG, "FileWrite: %d (%s) " INT64_FORMAT " %d %p", +// file, VfdCache[file].fileName, +// (int64) offset, +// amount, buffer)); - returnCode = FileAccess(file); - if (returnCode < 0) - return returnCode; +// returnCode = FileAccess(file); +// if (returnCode < 0) +// return returnCode; +// /* +// * If enforcing temp_file_limit and it's a temp file, check to see if the +// * write would overrun temp_file_limit, and throw error if so. 
Note: it's +// * really a modularity violation to throw error here; we should set errno +// * and return -1. However, there's no way to report a suitable error +// * message if we do that. All current callers would just throw error +// * immediately anyway, so this is safe at present. +// */ +// if (temp_file_limit >= 0 && (vfdP->fdstate & FD_TEMP_FILE_LIMIT)) +// { +// off_t past_write = offset + amount; - vfdP = &VfdCache[file]; +// if (past_write > vfdP->fileSize) +// { +// uint64 newTotal = temporary_files_size; - /* - * If enforcing temp_file_limit and it's a temp file, check to see if the - * write would overrun temp_file_limit, and throw error if so. Note: it's - * really a modularity violation to throw error here; we should set errno - * and return -1. However, there's no way to report a suitable error - * message if we do that. All current callers would just throw error - * immediately anyway, so this is safe at present. - */ - if (temp_file_limit >= 0 && (vfdP->fdstate & FD_TEMP_FILE_LIMIT)) - { - off_t past_write = offset + amount; +// newTotal += past_write - vfdP->fileSize; +// if (newTotal > (uint64) temp_file_limit * (uint64) 1024) +// ereport(ERROR, +// (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), +// errmsg("temporary file size exceeds temp_file_limit (%dkB)", +// temp_file_limit))); +// } +// } - if (past_write > vfdP->fileSize) - { - uint64 newTotal = temporary_files_size; +// retry: +// errno = 0; +// pgstat_report_wait_start(wait_event_info); +// returnCode = he3fs_pwrite(VfdCache[file].fd, buffer, amount, offset); +// pgstat_report_wait_end(); - newTotal += past_write - vfdP->fileSize; - if (newTotal > (uint64) temp_file_limit * (uint64) 1024) - ereport(ERROR, - (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), - errmsg("temporary file size exceeds temp_file_limit (%dkB)", - temp_file_limit))); - } - } +// /* if write didn't set errno, assume problem is no disk space */ +// if (returnCode != amount && errno == 0) +// errno = ENOSPC; -retry: - 
errno = 0; - pgstat_report_wait_start(wait_event_info); - returnCode = he3fs_pwrite(VfdCache[file].fd, buffer, amount, offset); - pgstat_report_wait_end(); +// if (returnCode >= 0) +// { +// /* +// * Maintain fileSize and temporary_files_size if it's a temp file. +// */ +// if (vfdP->fdstate & FD_TEMP_FILE_LIMIT) +// { +// off_t past_write = offset + amount; - /* if write didn't set errno, assume problem is no disk space */ - if (returnCode != amount && errno == 0) - errno = ENOSPC; +// if (past_write > vfdP->fileSize) +// { +// temporary_files_size += past_write - vfdP->fileSize; +// vfdP->fileSize = past_write; +// } +// } +// } +// else +// { +// /* +// * See comments in FileRead() +// */ +// #ifdef WIN32 +// DWORD error = GetLastError(); - if (returnCode >= 0) - { - /* - * Maintain fileSize and temporary_files_size if it's a temp file. - */ - if (vfdP->fdstate & FD_TEMP_FILE_LIMIT) - { - off_t past_write = offset + amount; +// switch (error) +// { +// case ERROR_NO_SYSTEM_RESOURCES: +// pg_usleep(1000L); +// errno = EINTR; +// break; +// default: +// _dosmaperr(error); +// break; +// } +// #endif +// /* OK to retry if interrupted */ +// if (errno == EINTR) +// goto retry; +// } - if (past_write > vfdP->fileSize) - { - temporary_files_size += past_write - vfdP->fileSize; - vfdP->fileSize = past_write; - } - } - } - else - { - /* - * See comments in FileRead() - */ -#ifdef WIN32 - DWORD error = GetLastError(); - - switch (error) - { - case ERROR_NO_SYSTEM_RESOURCES: - pg_usleep(1000L); - errno = EINTR; - break; - default: - _dosmaperr(error); - break; - } -#endif - /* OK to retry if interrupted */ - if (errno == EINTR) - goto retry; - } - - return returnCode; -} +// return returnCode; +// } int FileWrite(File file, char *buffer, int amount, off_t offset, @@ -2731,22 +2729,22 @@ FileSize(File file) /* * He3DB: He3FS replace OSFS. 
*/ -off_t -He3DBFileSize(File file) -{ - Assert(FileIsValid(file)); +// off_t +// He3DBFileSize(File file) +// { +// Assert(FileIsValid(file)); - DO_DB(elog(LOG, "FileSize %d (%s)", - file, VfdCache[file].fileName)); +// DO_DB(elog(LOG, "FileSize %d (%s)", +// file, VfdCache[file].fileName)); - if (FileIsNotOpen(file)) - { - if (FileAccess(file) < 0) - return (off_t) -1; - } +// if (FileIsNotOpen(file)) +// { +// if (FileAccess(file) < 0) +// return (off_t) -1; +// } - return (off_t)lseekfs(VfdCache[file].fd, 0, SEEK_END); -} +// return (off_t)lseekfs(VfdCache[file].fd, 0, SEEK_END); +// } int FileTruncate(File file, off_t offset, uint32 wait_event_info) @@ -2777,36 +2775,36 @@ FileTruncate(File file, off_t offset, uint32 wait_event_info) return returnCode; } -int -He3FileTruncate(File file, off_t offset, uint32 wait_event_info,bool isTemp) -{ - int returnCode; +// int +// He3FileTruncate(File file, off_t offset, uint32 wait_event_info,bool isTemp) +// { +// int returnCode; - Assert(FileIsValid(file)); +// Assert(FileIsValid(file)); - DO_DB(elog(LOG, "FileTruncate %d (%s)", - file, VfdCache[file].fileName)); +// DO_DB(elog(LOG, "FileTruncate %d (%s)", +// file, VfdCache[file].fileName)); - returnCode = He3FileAccess(file); - if (returnCode < 0) - return returnCode; +// returnCode = He3FileAccess(file); +// if (returnCode < 0) +// return returnCode; - pgstat_report_wait_start(wait_event_info); - if (isTemp == true || push_standby == true) { - returnCode = truncatefs(VfdCache[file].fd, offset); - } - pgstat_report_wait_end(); +// pgstat_report_wait_start(wait_event_info); +// if (isTemp == true || push_standby == true) { +// returnCode = truncatefs(VfdCache[file].fd, offset); +// } +// pgstat_report_wait_end(); - if (returnCode == 0 && VfdCache[file].fileSize > offset) - { - /* adjust our state for truncation of a temp file */ - Assert(VfdCache[file].fdstate & FD_TEMP_FILE_LIMIT); - temporary_files_size -= VfdCache[file].fileSize - offset; - VfdCache[file].fileSize = 
offset; - } +// if (returnCode == 0 && VfdCache[file].fileSize > offset) +// { +// /* adjust our state for truncation of a temp file */ +// Assert(VfdCache[file].fdstate & FD_TEMP_FILE_LIMIT); +// temporary_files_size -= VfdCache[file].fileSize - offset; +// VfdCache[file].fileSize = offset; +// } - return returnCode; -} +// return returnCode; +// } /* * Return the pathname associated with an open file. * @@ -4349,55 +4347,55 @@ pg_pwritev_with_retry(int fd, const struct iovec *iov, int iovcnt, off_t offset) /* * He3DB: He3FS replace OSFS. */ -ssize_t -he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno) -{ - Bufrd result; +// ssize_t +// he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno) +// { +// Bufrd result; - switch (type) - { - case DataRead: - result = dataRead(fd, offset, lsn, dbid, relid, segno, forkno); - *buf = result.buf; - return (ssize_t)result.count; - default: - elog(ERROR, "unrecognized strategy number: %d", type); - } -} +// switch (type) +// { +// case DataRead: +// result = dataRead(fd, offset, lsn, dbid, relid, segno, forkno); +// *buf = result.buf; +// return (ssize_t)result.count; +// default: +// elog(ERROR, "unrecognized strategy number: %d", type); +// } +// } /* * He3DB: He3FS replace OSFS. 
*/ -ssize_t -he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset) -{ - return (ssize_t)writefs(fd, (char *)buf, size, offset); -} +//ssize_t +// he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset) +// { +// return (ssize_t)writefs(fd, (char *)buf, size, offset); +// } -ssize_t -he3fs_xlogread(int64_t fd, void *buf, off_t offset, size_t size) -{ - Bufrd result; - size_t count; - // printf("start read xlog %ld\n", fd); - // pg_usleep(20000000); - // printf("end sleep, offset %d, size %d\n", offset, size); - result = readfs(fd, offset, size); - if (result.count <= 0) - return (ssize_t)result.count; - else if (result.count <= XLOG_BLCKSZ) - { - memcpy(buf, result.buf, result.count); - count = result.count; - } - else - { - memcpy(buf, result.buf, XLOG_BLCKSZ); - count = BLCKSZ; - } +// ssize_t +// he3fs_xlogread(int64_t fd, void *buf, off_t offset, size_t size) +// { +// Bufrd result; +// size_t count; +// // printf("start read xlog %ld\n", fd); +// // pg_usleep(20000000); +// // printf("end sleep, offset %d, size %d\n", offset, size); +// result = readfs(fd, offset, size); +// if (result.count <= 0) +// return (ssize_t)result.count; +// else if (result.count <= XLOG_BLCKSZ) +// { +// memcpy(buf, result.buf, result.count); +// count = result.count; +// } +// else +// { +// memcpy(buf, result.buf, XLOG_BLCKSZ); +// count = BLCKSZ; +// } - free_dataRead(result.buf, 1, 1); - // *buf = result.buf; - return (ssize_t)count; +// free_dataRead(result.buf, 1, 1); +// // *buf = result.buf; +// return (ssize_t)count; -} +// } diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 1330f65..9442332 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -86,12 +86,12 @@ extern int max_safe_fds; /* Operations on virtual Files --- equivalent to Unix kernel file ops */ extern File PathNameOpenFile(const char *fileName, int fileFlags); -extern File He3DBPathNameOpenFile(const char *fileName, int fileFlags); +//extern File 
He3DBPathNameOpenFile(const char *fileName, int fileFlags); extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); -extern File He3DBPathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); +//extern File He3DBPathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); extern File OpenTemporaryFile(bool interXact); extern void FileClose(File file); -extern void He3DBFileClose(File file); +//extern void He3DBFileClose(File file); extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info); extern int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); extern int MasterFileRead(char *buffer, uint32_t dbid, uint32_t relid, uint32_t forkno, uint32_t blockno); @@ -100,9 +100,9 @@ extern int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 w extern int He3DBFileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); extern int FileSync(File file, uint32 wait_event_info); extern off_t FileSize(File file); -extern off_t He3DBFileSize(File file); +//extern off_t He3DBFileSize(File file); extern int FileTruncate(File file, off_t offset, uint32 wait_event_info); -extern int He3FileTruncate(File file, off_t offset, uint32 wait_event_info,bool isTemp); +//extern int He3FileTruncate(File file, off_t offset, uint32 wait_event_info,bool isTemp); extern void FileWriteback(File file, off_t offset, off_t nbytes, uint32 wait_event_info); extern char *FilePathName(File file); extern int FileGetRawDesc(File file); @@ -141,7 +141,7 @@ extern int CloseTransientFile(int fd); extern int BasicOpenFile(const char *fileName, int fileFlags); extern int64_t He3DBBasicOpenFile(const char *fileName, int fileFlags); extern int BasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); -extern int64_t He3DBBasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); +//extern int64_t 
He3DBBasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); /* Use these for other cases, and also for long-lived BasicOpenFile FDs */ extern bool AcquireExternalFD(void); @@ -186,9 +186,9 @@ extern void SyncDataDirectory(void); extern int data_sync_elevel(int elevel); /* He3DB: He3FS */ -extern ssize_t he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno); -extern ssize_t he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset); -extern ssize_t he3fs_xlogread(int64_t fd, void *buf, off_t Offset, size_t size); +//extern ssize_t he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno);// +//extern ssize_t he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset); +//extern ssize_t he3fs_xlogread(int64_t fd, void *buf, off_t Offset, size_t size); /* Filename components */ #define PG_TEMP_FILES_DIR "pgsql_tmp" From f5da1a230cea61c4586948502c2a67514f6769fb Mon Sep 17 00:00:00 2001 From: shipixian Date: Thu, 23 Mar 2023 17:49:26 +0800 Subject: [PATCH 10/10] remove unused codes Signed-off-by: shipixian --- src/backend/access/transam/xlog.c | 176 ++++++++++++------------ src/backend/access/transam/xlogreader.c | 42 +++--- src/backend/replication/walreceiver.c | 4 +- 3 files changed, 111 insertions(+), 111 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index bab06cb..c9221d1 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -3558,102 +3558,102 @@ void freeItemList(XLogItemList *xlogItemList) xlogItemList->tail = xlogItemList->head = NULL; } -static void -He3DBXLogWrite(XLogwrtRqst WriteRqst, bool flexible) -{ - LogwrtResult = XLogCtl->LogwrtResult; - char *from; - uint8 part[4]; - int count; - int stp; - int xlogLength; +// static void +// He3DBXLogWrite(XLogwrtRqst 
WriteRqst, bool flexible) +// { +// LogwrtResult = XLogCtl->LogwrtResult; +// char *from; +// uint8 part[4]; +// int count; +// int stp; +// int xlogLength; - from = XLogCtl->pages + LogwrtResult.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ); - count = LogwrtResult.Write; - // printf("invoke xlog write, upto %ld\n", WriteRqst.Write); - while (count < WriteRqst.Write) - { - //TODO splite the xlog_buffer and construct k-v pairs - if (count < WriteRqst.Write) - { - if (xlogItemList == NULL) - { - xlogItemList = (XLogItemList *)malloc(sizeof(XLogItemList)); - xlogItemList->head = NULL; - xlogItemList->tail = NULL; - } - for(stp = 0; stp < 4; stp ++) - { - part[stp] = *(from + stp); - } - xlogLength = GetXlogLength(part,4); - XLogItem *xlogItem = (XLogItem *)malloc(sizeof(XLogItem)); +// from = XLogCtl->pages + LogwrtResult.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ); +// count = LogwrtResult.Write; +// // printf("invoke xlog write, upto %ld\n", WriteRqst.Write); +// while (count < WriteRqst.Write) +// { +// //TODO splite the xlog_buffer and construct k-v pairs +// if (count < WriteRqst.Write) +// { +// if (xlogItemList == NULL) +// { +// xlogItemList = (XLogItemList *)malloc(sizeof(XLogItemList)); +// xlogItemList->head = NULL; +// xlogItemList->tail = NULL; +// } +// for(stp = 0; stp < 4; stp ++) +// { +// part[stp] = *(from + stp); +// } +// xlogLength = GetXlogLength(part,4); +// XLogItem *xlogItem = (XLogItem *)malloc(sizeof(XLogItem)); - (xlogItem->xlogKey).lsn = count; - xlogItem->begin = from; +// (xlogItem->xlogKey).lsn = count; +// xlogItem->begin = from; - if ((count + xlogLength)/((XLOGbuffers-1) * XLOG_BLCKSZ) > count / ((XLOGbuffers-1) * XLOG_BLCKSZ)) - { - from = XLogCtl->pages + ((count + xlogLength)%((XLOGbuffers-1) * XLOG_BLCKSZ)); - } - else - { - from = from + xlogLength; - } +// if ((count + xlogLength)/((XLOGbuffers-1) * XLOG_BLCKSZ) > count / ((XLOGbuffers-1) * XLOG_BLCKSZ)) +// { +// from = XLogCtl->pages + ((count + xlogLength)%((XLOGbuffers-1) * 
XLOG_BLCKSZ)); +// } +// else +// { +// from = from + xlogLength; +// } - xlogItem->length = xlogLength; - xlogItem->next = NULL; - if (xlogItemList->head == NULL) - { - xlogItemList->head = xlogItem; - xlogItemList->tail = xlogItemList->head; - } - else - { - xlogItemList->tail->next = xlogItem; - xlogItemList->tail = xlogItemList->tail->next; - } - count += xlogLength; - } - } +// xlogItem->length = xlogLength; +// xlogItem->next = NULL; +// if (xlogItemList->head == NULL) +// { +// xlogItemList->head = xlogItem; +// xlogItemList->tail = xlogItemList->head; +// } +// else +// { +// xlogItemList->tail->next = xlogItem; +// xlogItemList->tail = xlogItemList->tail->next; +// } +// count += xlogLength; +// } +// } - //TODO flush into queue - if (xlogItemList != NULL) - { - if (xlogItemList->head != NULL) - { - kvwrite(xlogItemList->head); - freeItemList(xlogItemList); - WalStats.m_wal_write++; - LogwrtResult.Write = WriteRqst.Write; - } - free(xlogItemList); - xlogItemList = NULL; - } +// //TODO flush into queue +// if (xlogItemList != NULL) +// { +// if (xlogItemList->head != NULL) +// { +// kvwrite(xlogItemList->head); +// freeItemList(xlogItemList); +// WalStats.m_wal_write++; +// LogwrtResult.Write = WriteRqst.Write; +// } +// free(xlogItemList); +// xlogItemList = NULL; +// } - if (LogwrtResult.Flush < WriteRqst.Flush && - LogwrtResult.Flush < LogwrtResult.Write) - { - //TODO signal flush - kvflush(LogwrtResult.Write); - LogwrtResult.Flush = LogwrtResult.Write; - } +// if (LogwrtResult.Flush < WriteRqst.Flush && +// LogwrtResult.Flush < LogwrtResult.Write) +// { +// //TODO signal flush +// kvflush(LogwrtResult.Write); +// LogwrtResult.Flush = LogwrtResult.Write; +// } - /* - * Update shared-memory status - */ - { - SpinLockAcquire(&XLogCtl->info_lck); - XLogCtl->LogwrtResult = LogwrtResult; - if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) - XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; - if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) - 
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; - XLogCtl->globalUpto = 0; - SpinLockRelease(&XLogCtl->info_lck); - } -} +// /* +// * Update shared-memory status +// */ +// { +// SpinLockAcquire(&XLogCtl->info_lck); +// XLogCtl->LogwrtResult = LogwrtResult; +// if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) +// XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; +// if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) +// XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; +// XLogCtl->globalUpto = 0; +// SpinLockRelease(&XLogCtl->info_lck); +// } +// } static void FlushWal(XLogwrtRqst WriteRqst) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 6bc14c8..9a4f21c 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -236,29 +236,29 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) // snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir); // } -static int -xlogread(int64_t fd, void *buf, off_t offset, size_t size) -{ - Bufrd result; - size_t count; - result = readfs(fd, offset, size); - if (result.count <= 0) - return (ssize_t)result.count; - else if (result.count <= XLOG_BLCKSZ) - { - memcpy(buf, result.buf, result.count); - count = result.count; - } - else - { - memcpy(buf, result.buf, XLOG_BLCKSZ); - count = BLCKSZ; - } +// static int +// xlogread(int64_t fd, void *buf, off_t offset, size_t size) +// { +// Bufrd result; +// size_t count; +// result = readfs(fd, offset, size); +// if (result.count <= 0) +// return (ssize_t)result.count; +// else if (result.count <= XLOG_BLCKSZ) +// { +// memcpy(buf, result.buf, result.count); +// count = result.count; +// } +// else +// { +// memcpy(buf, result.buf, XLOG_BLCKSZ); +// count = BLCKSZ; +// } - free_dataRead(result.buf, 1, 1); - return (ssize_t)count; +// free_dataRead(result.buf, 1, 1); +// return (ssize_t)count; -} +// } /* * Begin reading WAL at 'RecPtr'. 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index cbb29be..e759330 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -930,8 +930,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* OK to write the logs */ errno = 0; - // byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff); - byteswritten = writefs(recvFile, buf, segbytes, (off_t) startoff); + byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff); + // byteswritten = writefs(recvFile, buf, segbytes, (off_t) startoff); if (byteswritten <= 0) {