mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-12-03 12:47:34 +08:00
!93 warning problem
Merge pull request !93 from shenzhengntu/dev_performance
This commit is contained in:
commit
d9a5fa8c2a
@ -21,6 +21,8 @@
|
||||
#include "storage/buf_internals.h"
|
||||
#include "utils/guc.h"
|
||||
#include "storage/he3db_logindex.h"
|
||||
#include "utils/hfs.h"
|
||||
|
||||
static void WakeupFlushWork(void);
|
||||
XLogRecPtr *g_redoStartLsn;
|
||||
static HTAB *PageLogindexHash = NULL;
|
||||
@ -869,7 +871,6 @@ void SignalStartFlushWork(void) {
|
||||
usleep(200000);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Main entry point for autovacuum worker process.
|
||||
*
|
||||
@ -901,7 +902,7 @@ StartPageFlushWorker(void)
|
||||
|
||||
CreateAuxProcessResourceOwner();
|
||||
|
||||
MyPMChildSlot = AssignPostmasterChildSlot();
|
||||
//MyPMChildSlot = AssignPostmasterChildSlot();
|
||||
|
||||
IsParallelFlushWorker = true;
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "access/xlogrecord.h"
|
||||
#include <glib.h>
|
||||
#include "utils/guc.h"
|
||||
#include "utils/hfs.h"
|
||||
GThreadPool *gpool = NULL;
|
||||
static __thread GError *gerr = NULL;
|
||||
static bool IsInitPool = false;
|
||||
|
@ -95,7 +95,7 @@
|
||||
#include "catalog/pg_tablespace_d.h"
|
||||
#include "access/ringbuffer.h"
|
||||
#include "access/pthreadpool.h"
|
||||
|
||||
#include "storage/he3db_logindex.h"
|
||||
|
||||
extern uint32 bootstrap_data_checksum_version;
|
||||
|
||||
|
@ -50,6 +50,7 @@
|
||||
#include "utils/ps_status.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relmapper.h"
|
||||
#include "access/pagehashqueue.h"
|
||||
|
||||
uint32 bootstrap_data_checksum_version = 0; /* No checksum */
|
||||
|
||||
|
@ -147,7 +147,8 @@
|
||||
#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */
|
||||
#define BACKEND_TYPE_WALSND 0x0004 /* walsender process */
|
||||
#define BACKEND_TYPE_BGWORKER 0x0008 /* bgworker process */
|
||||
#define BACKEND_TYPE_ALL 0x000F /* OR of all the above */
|
||||
#define BACKEND_TYPE_FLUSHPAGE 0x0010 /* parallel flush pid*/
|
||||
#define BACKEND_TYPE_ALL 0x001F /* OR of all the above */
|
||||
|
||||
/*
|
||||
* List of active backends (or child processes anyway; we don't actually
|
||||
@ -443,6 +444,7 @@ static pid_t StartChildProcess(AuxProcType type);
|
||||
static void StartAutovacuumWorker(void);
|
||||
static void MaybeStartWalReceiver(void);
|
||||
static void InitPostmasterDeathWatchHandle(void);
|
||||
static void StartALLPageFlushWorker(void);
|
||||
|
||||
/*
|
||||
* Archiver is allowed to start up at the current postmaster state?
|
||||
@ -5306,10 +5308,9 @@ sigusr1_handler(SIGNAL_ARGS)
|
||||
/* start Flush Page */
|
||||
if (!PageParallelPush && CheckPostmasterSignal(PMSIGNAL_PARALLEL_FLUSH_WORKER)) {
|
||||
PageParallelPush = true;
|
||||
for(int i = 0;i<PARALLEL_NUM;i++) {
|
||||
StartPageFlushWorker();
|
||||
}
|
||||
StartALLPageFlushWorker();
|
||||
}
|
||||
|
||||
if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER))
|
||||
{
|
||||
/* Startup Process wants us to start the walreceiver process. */
|
||||
@ -5554,6 +5555,67 @@ StartChildProcess(AuxProcType type)
|
||||
return pid;
|
||||
}
|
||||
|
||||
static void StartALLPageFlushWorker(void) {
|
||||
for(int i = 0;i<PARALLEL_NUM;i++) {
|
||||
/*
|
||||
* Compute the cancel key that will be assigned to this session. We
|
||||
* probably don't need cancel keys for autovac workers, but we'd
|
||||
* better have something random in the field to prevent unfriendly
|
||||
* people from sending cancels to them.
|
||||
*/
|
||||
Backend *bn;
|
||||
/*
|
||||
* Compute the cancel key that will be assigned to this session. We
|
||||
* probably don't need cancel keys for autovac workers, but we'd
|
||||
* better have something random in the field to prevent unfriendly
|
||||
* people from sending cancels to them.
|
||||
*/
|
||||
if (!RandomCancelKey(&MyCancelKey))
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("could not generate random cancel key")));
|
||||
return;
|
||||
}
|
||||
|
||||
bn = (Backend *) malloc(sizeof(Backend));
|
||||
if (bn)
|
||||
{
|
||||
bn->cancel_key = MyCancelKey;
|
||||
|
||||
/* parallel workers are not dead_end and need a child slot */
|
||||
bn->dead_end = false;
|
||||
bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
|
||||
bn->bgworker_notify = false;
|
||||
|
||||
bn->pid = StartPageFlushWorker();
|
||||
if (bn->pid > 0)
|
||||
{
|
||||
bn->bkend_type = BACKEND_TYPE_FLUSHPAGE;
|
||||
dlist_push_head(&BackendList, &bn->elem);
|
||||
#ifdef EXEC_BACKEND
|
||||
ShmemBackendArrayAdd(bn);
|
||||
#endif
|
||||
/* all OK */
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* fork failed, fall through to report -- actual error message was
|
||||
* logged by StartAutoVacWorker
|
||||
*/
|
||||
(void) ReleasePostmasterChildSlot(bn->child_slot);
|
||||
free(bn);
|
||||
}
|
||||
else
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_OUT_OF_MEMORY),
|
||||
errmsg("out of memory")));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartAutovacuumWorker
|
||||
* Start an autovac worker process.
|
||||
@ -5721,7 +5783,7 @@ int
|
||||
MaxLivePostmasterChildren(void)
|
||||
{
|
||||
return 2 * (MaxConnections + autovacuum_max_workers + 1 +
|
||||
max_wal_senders + max_worker_processes);
|
||||
max_wal_senders + max_parallel_flush_process + max_worker_processes);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include "utils/guc.h"
|
||||
#include "utils/timeout.h"
|
||||
#include "access/pagehashqueue.h"
|
||||
#include "utils/resowner_private.h"
|
||||
|
||||
#ifndef USE_POSTMASTER_DEATH_SIGNAL
|
||||
/*
|
||||
|
@ -59,6 +59,8 @@
|
||||
#include "access/pushpage.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/hfs.h"
|
||||
#include "storage/he3db_logindex.h"
|
||||
#include "access/ringbuffer.h"
|
||||
|
||||
/* Note: these two macros only work on shared buffers, not local ones! */
|
||||
#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
|
||||
|
@ -49,6 +49,7 @@
|
||||
#include "utils/snapmgr.h"
|
||||
#include "access/pagehashqueue.h"
|
||||
#include "access/ringbuffer.h"
|
||||
#include "storage/filecache.h"
|
||||
/* GUCs */
|
||||
int shared_memory_type = DEFAULT_SHARED_MEMORY_TYPE;
|
||||
|
||||
|
@ -45,6 +45,7 @@
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/hfs.h"
|
||||
#include "storage/he3db_logindex.h"
|
||||
|
||||
/*
|
||||
* The magnetic disk storage manager keeps track of open file
|
||||
@ -769,22 +770,25 @@ he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknu
|
||||
if (bufrd.count > 0)
|
||||
{
|
||||
*buffer = (uint8_t *)malloc(bufrd.count);
|
||||
memcpy(buffer, bufrd.buf,bufrd.count);
|
||||
|
||||
memcpy(*buffer, bufrd.buf,bufrd.count);
|
||||
free_dataRead(bufrd.buf, bufrd.count, bufrd.cap);
|
||||
if (push_standby)
|
||||
{
|
||||
Assert(bufrd.count == BLCKSZ);
|
||||
pageKey.pageLsn = PageGetLSN(bufrd.buf);
|
||||
pageKey.pageLsn = PageGetLSN(*buffer);
|
||||
LsnNode *head = GetLogIndexByPage(&pageTag, pageKey.pageLsn, pageKey.replyLsn);
|
||||
Bufrd result;
|
||||
result = ReadWalsByPage(pageKey.relfileNode.dbNode, pageKey.relfileNode.relNode,
|
||||
pageKey.forkNo, pageKey.blkNo, ThisTimeLineID, head);
|
||||
buffer = (uint8_t *)realloc(buffer, BLCKSZ + result.count);
|
||||
strcat(buffer,result.buf);
|
||||
if (result.count !=0) {
|
||||
*buffer = (uint8_t *)realloc(*buffer, BLCKSZ + result.count);
|
||||
strcat(*buffer,result.buf);
|
||||
free_dataRead(result.buf, result.count, result.cap);
|
||||
}
|
||||
//TODO free result
|
||||
free_dataRead(result.buf, result.count, result.cap);
|
||||
FreeLsnNode(head);
|
||||
return BLCKSZ + result.count;
|
||||
}
|
||||
free_dataRead(bufrd.buf, bufrd.count, bufrd.cap);
|
||||
// *buffer = bufrd.buf;
|
||||
return bufrd.count;
|
||||
}
|
||||
@ -829,14 +833,18 @@ he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknu
|
||||
result = ReadWalsByPage(pageKey.relfileNode.dbNode,pageKey.relfileNode.relNode,
|
||||
pageKey.forkNo,pageKey.blkNo, ThisTimeLineID, head);
|
||||
}
|
||||
|
||||
buf = (uint8_t *)realloc(buf, BLCKSZ + result.count);
|
||||
strcat(buf,result.buf);
|
||||
//TODO free result
|
||||
free_dataRead(result.buf, result.count, result.cap);
|
||||
if (result.count !=0) {
|
||||
buf = (uint8_t *)realloc(buf, BLCKSZ + result.count);
|
||||
strcat(buf,result.buf);
|
||||
//TODO free result
|
||||
free_dataRead(result.buf, result.count, result.cap);
|
||||
}
|
||||
*buffer = buf;
|
||||
FreeLsnNode(head);
|
||||
return BLCKSZ + result.count;
|
||||
}
|
||||
*buffer = buf;
|
||||
FreeLsnNode(head);
|
||||
return BLCKSZ;
|
||||
}
|
||||
|
||||
|
@ -12003,7 +12003,7 @@ static bool
|
||||
check_maxconnections(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (*newval + autovacuum_max_workers + 1 +
|
||||
max_worker_processes + max_wal_senders > MAX_BACKENDS)
|
||||
max_worker_processes + max_parallel_flush_process + max_wal_senders > MAX_BACKENDS)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
@ -12012,7 +12012,7 @@ static bool
|
||||
check_autovacuum_max_workers(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (MaxConnections + *newval + 1 +
|
||||
max_worker_processes + max_wal_senders > MAX_BACKENDS)
|
||||
max_worker_processes + max_parallel_flush_process + max_wal_senders > MAX_BACKENDS)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
@ -12021,7 +12021,7 @@ static bool
|
||||
check_max_wal_senders(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (MaxConnections + autovacuum_max_workers + 1 +
|
||||
max_worker_processes + *newval > MAX_BACKENDS)
|
||||
max_worker_processes + max_parallel_flush_process + *newval > MAX_BACKENDS)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
@ -12052,8 +12052,8 @@ check_autovacuum_work_mem(int *newval, void **extra, GucSource source)
|
||||
static bool
|
||||
check_max_worker_processes(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (MaxConnections + autovacuum_max_workers + 1 +
|
||||
*newval + max_wal_senders > MAX_BACKENDS)
|
||||
if (MaxConnections + autovacuum_max_workers + 1 +
|
||||
*newval + max_parallel_flush_process + max_wal_senders > MAX_BACKENDS)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
@ -6,7 +6,7 @@
|
||||
#include "storage/buf_internals.h"
|
||||
//max Page Num
|
||||
#define G_QUEUE_LEN 2048
|
||||
#define PARALLEL_NUM 4
|
||||
#define PARALLEL_NUM 1
|
||||
typedef struct lsn_list_t {
|
||||
XLogRecPtr lsn;
|
||||
XLogRecPtr endlsn;
|
||||
@ -40,6 +40,8 @@ extern void PageHashQueueShmemInit(void);
|
||||
Size LogindexHashAllShmemSize(void);
|
||||
void InitLogindexHashBrucket(void);
|
||||
void pushSlaveReplayQueue(int pageNum);
|
||||
void CleanLogIndexMain(int argc, char *argv[]);
|
||||
|
||||
extern XLogRecPtr *g_redoStartLsn;
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user