initdb for he3db

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzheng 2022-09-07 15:22:32 +08:00
parent 1c09175a89
commit 17a197c0b2
13 changed files with 99 additions and 69 deletions

View File

@ -5025,7 +5025,9 @@ ReadControlFile(void)
void
UpdateControlFile()
{
return;
if (push_standby != true) {
return;
}
update_controlfile(DataDir, ControlFile, true);
}
@ -10339,11 +10341,13 @@ xlog_redo(XLogReaderState *record)
}
else if (info == XLOG_CHECKPOINT_SHUTDOWN)
{
#ifndef PG_NOREPLAY
if (startupPid == getpid() && PushPtr != record->currRecPtr) {
PushPtr = record->currRecPtr;
memcpy(&GlobalCheckPoint,XLogRecGetData(record), sizeof(CheckPoint));
GlobalState = DB_SHUTDOWNED;
}
#endif
CheckPoint checkPoint;
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
@ -10439,11 +10443,13 @@ xlog_redo(XLogReaderState *record)
}
else if (info == XLOG_CHECKPOINT_ONLINE)
{
#ifndef PG_NOREPLAY
if (startupPid == getpid() && PushPtr != record->currRecPtr) {
PushPtr = record->currRecPtr;
memcpy(&GlobalCheckPoint,XLogRecGetData(record), sizeof(CheckPoint));
GlobalState = DB_IN_ARCHIVE_RECOVERY;
}
#endif
CheckPoint checkPoint;
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));

View File

@ -330,63 +330,65 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
restart:
#ifndef PG_NOREPLAY
if (push_standby == true && getpid() == startupPid) {
/* Update shared-memory status */
if (PushPtr != PrePushPtr) {
PushCheckPointGuts(PushPtr,GlobalState);
int count = 0;
char buffer[20480];
int len1 = snprintf(buffer,sizeof(buffer),"sadd boost_%ld_%ld ",PushPtr,PrePushPtr);
char oneV[128];
int pos = len1;
while(!QueueEmpty() && QueueHeadEndLsn() <= PushPtr) {
QDataType data = QueuePop();
if (data.startlsn != PrePushPtr||data.endlsn > PushPtr) {
printf("Error lsn =====================%ld,%ld,%ld,%ld\n",data.startlsn,PrePushPtr,data.endlsn,PushPtr);
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
if (push_standby == true && getpid() == startupPid) {
/* Update shared-memory status */
if (PushPtr != PrePushPtr) {
PushCheckPointGuts(PushPtr,GlobalState);
int count = 0;
char buffer[20480];
int len1 = snprintf(buffer,sizeof(buffer),"sadd boost_%ld_%ld ",PushPtr,PrePushPtr);
char oneV[128];
int pos = len1;
while(!QueueEmpty() && QueueHeadEndLsn() <= PushPtr) {
QDataType data = QueuePop();
if (data.startlsn != PrePushPtr||data.endlsn > PushPtr) {
printf("Error lsn =====================%ld,%ld,%ld,%ld\n",data.startlsn,PrePushPtr,data.endlsn,PushPtr);
}
count++;
int len2 = snprintf(oneV,sizeof(oneV),"%d_%d_%d_%d ",data.dbNode,data.relNode,data.forkNum,data.blockNum);
memcpy(buffer + pos,oneV,len2);
pos += len2;
if (count % 100 == 0) {
buffer[pos] = '\0';
pushRedisList(buffer);
pos = len1;
}
}
count++;
int len2 = snprintf(oneV,sizeof(oneV),"%d_%d_%d_%d ",data.dbNode,data.relNode,data.forkNum,data.blockNum);
memcpy(buffer + pos,oneV,len2);
pos += len2;
if (count % 100 == 0) {
if (count == 0) {
int len2 = snprintf(oneV,sizeof(oneV),"%s ","null");
memcpy(buffer + pos,oneV,len2);
pos += len2;
}
if (count % 100 != 0 || count == 0) {
buffer[pos] = '\0';
pushRedisList(buffer);
pos = len1;
}
}
if (count == 0) {
int len2 = snprintf(oneV,sizeof(oneV),"%s ","null");
memcpy(buffer + pos,oneV,len2);
pos += len2;
}
if (count % 100 != 0 || count == 0) {
buffer[pos] = '\0';
pushRedisList(buffer);
}
FlushNewRecoveryPoint(PushPtr);
printf("curlsn==%x==pre check point==%x==%ld==last check point lsn==%x==%ld\n",RecPtr,PrePushPtr,PrePushPtr,PushPtr,PushPtr);
PrePushPtr = PushPtr;
if (ApplyLsn < PushPtr) {
ApplyLsn = PushPtr;
}
}
//need to tell push standby has new standby add
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
//XLogRecPtr lsn = GetFlushXlogPtr();
XLogRecPtr tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
if (tmpLsn !=InvalidXLogRecPtr) {
if (tmpLsn <= RecPtr) {
sleep(3);
continue;
} else {
ApplyLsn = tmpLsn;
}
FlushNewRecoveryPoint(PushPtr);
printf("curlsn==%x==pre check point==%x==%ld==last check point lsn==%x==%ld\n",RecPtr,PrePushPtr,PrePushPtr,PushPtr,PushPtr);
PrePushPtr = PushPtr;
if (ApplyLsn < PushPtr) {
ApplyLsn = PushPtr;
}
} else {
ApplyLsn = InvalidXLogRecPtr;
}
break;
}
}
//need to tell push standby has new standby add
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
//XLogRecPtr lsn = GetFlushXlogPtr();
XLogRecPtr tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
if (tmpLsn !=InvalidXLogRecPtr) {
if (tmpLsn <= RecPtr) {
sleep(3);
continue;
} else {
ApplyLsn = tmpLsn;
}
} else {
ApplyLsn = InvalidXLogRecPtr;
}
break;
}
}
}
#endif
state->currRecPtr = RecPtr;

View File

@ -378,17 +378,21 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
/* Caller specified a bogus block_id */
elog(PANIC, "failed to locate backup block with ID %d", block_id);
}
if (getpid() == startupPid && push_standby == true) {
QDataType x;
x.endlsn = record->EndRecPtr;
x.startlsn = PrePushPtr;
x.blockNum = blkno;
x.forkNum = forknum;
x.relNode = rnode.relNode;
x.dbNode = rnode.dbNode;
QueuePush(x);
#ifndef PG_NOREPLAY
if (IsBootstrapProcessingMode() != true && InitdbSingle != true) {
//push standby collect dirty page
if (getpid() == startupPid && push_standby == true) {
QDataType x;
x.endlsn = record->EndRecPtr;
x.startlsn = PrePushPtr;
x.blockNum = blkno;
x.forkNum = forknum;
x.relNode = rnode.relNode;
x.dbNode = rnode.dbNode;
QueuePush(x);
}
}
#endif
/*
* Make sure that if the block is marked with WILL_INIT, the caller is

View File

@ -521,7 +521,7 @@ BootstrapModeMain(void)
attrtypes[i] = NULL;
Nulls[i] = false;
}
ufs_init_client();
/*
* Process bootstrap input.
*/

View File

@ -365,7 +365,11 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
}
/* Do the real work to truncate relation forks */
smgrtruncatelsn(rel->rd_smgr, forks, nforks, blocks,lsn);
if (IsBootstrapProcessingMode()!=true && InitdbSingle!=true) {
smgrtruncatelsn(rel->rd_smgr, forks, nforks, blocks,lsn);
} else {
smgrtruncate(rel->rd_smgr, forks, nforks, blocks);
}
/*

View File

@ -1077,6 +1077,9 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
/* propeller and slave instance */
//nbytes = he3dbsmgrread(smgr, forkNum, blockNum, bufBlock, InvalidXLogRecPtr);
XLogRecPtr replayLsn = GetXLogReplayRecPtr(NULL);
if (IsBootstrapProcessingMode() || InitdbSingle) {
replayLsn = GetXLogWriteRecPtr();
}
nbytes = he3dbsmgrread(smgr, forkNum, blockNum, &pageXlogBuf,replayLsn);
memcpy((char *) bufBlock, pageXlogBuf, BLCKSZ);
/* propeller instance no page xlog replay */

View File

@ -1336,7 +1336,7 @@ LruInsert(File file)
* overall system file table being full. So, be prepared to release
* another FD if necessary...
*/
vfdP->fd = BasicOpenFilePerm(vfdP->fileName, vfdP->fileFlags,
vfdP->fd = He3DBBasicOpenFilePerm(vfdP->fileName, vfdP->fileFlags,
vfdP->fileMode);
if (vfdP->fd < 0)
{

View File

@ -27,6 +27,7 @@
#include "utils/inval.h"
#include "utils/guc.h"
#include "access/pushpage.h"
#include "miscadmin.h"
/*
* This struct of function pointers defines the API between smgr.c and
* any individual storage manager module. Note that smgr subfunctions are
@ -466,7 +467,7 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
// return;
// }
if (push_standby != true && EnableHotStandby != true) {
if ((push_standby != true && EnableHotStandby != true) || IsBootstrapProcessingMode() || InitdbSingle) {
smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
buffer, skipFsync);
}

View File

@ -104,6 +104,9 @@ int PostAuthDelay = 0;
/* Time between checks that the client is still connected. */
int client_connection_check_interval = 0;
/*for initdb by single mode*/
bool InitdbSingle = false;
/* ----------------
* private typedefs etc
@ -3949,6 +3952,11 @@ PostgresMain(int argc, char *argv[],
*/
if (!IsUnderPostmaster)
InitializeGUCOptions();
if (!IsUnderPostmaster) {
if (argc > 1 && strcmp(argv[1], "--single") == 0) {
InitdbSingle = true;
}
}
/*
* Parse command-line options.

View File

@ -753,6 +753,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
}
else if (!IsUnderPostmaster)
{
ufs_init_client();
InitializeSessionUserIdStandalone();
am_superuser = true;
if (!ThereIsAtLeastOneRole())

View File

@ -2117,7 +2117,7 @@ static struct config_bool ConfigureNamesBool[] =
gettext_noop("Sets base page push if push_standby is configured true."),
},
&push_standby,
false,
true,
NULL, NULL, NULL
},

View File

@ -16,7 +16,7 @@ subdir = src/bin/initdb
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) -I$(top_srcdir)/src/timezone $(CPPFLAGS)
override CPPFLAGS := -DFRONTEND -DPG_NOREPLAY -I$(libpq_srcdir) -I$(top_srcdir)/src/timezone $(CPPFLAGS)
# Note: it's important that we link to encnames.o from libpgcommon, not
# from libpq, else we have risks of version skew if we run with a libpq

View File

@ -804,5 +804,6 @@ extern Datum Float8GetDatum(float8 X);
#define Int64GetDatumFast(X) PointerGetDatum(&(X))
#define Float8GetDatumFast(X) PointerGetDatum(&(X))
#endif
extern bool InitdbSingle;
#endif /* POSTGRES_H */