This commit is contained in:
ldcsaa 2021-08-14 02:53:33 +08:00
parent fb3598355a
commit fe540a2630
4 changed files with 41 additions and 38 deletions

View File

@ -109,23 +109,23 @@ BOOL CHPThreadPool::Shutdown(DWORD dwMaxWait)
if(m_enRejectedPolicy == TRP_WAIT_FOR && bLimited)
{
CMutexLock2 lock(m_mtx);
CCriSecLock2 lock(m_csTask);
m_cvQueue.notify_all();
}
VERIFY(DoAdjustThreadCount(0));
if(bInfinite)
m_sem.Wait(CShutdownPredicate(this));
m_evShutdown.Wait(CShutdownPredicate(this));
else
m_sem.WaitFor(dwMaxWait, CShutdownPredicate(this));
m_evShutdown.WaitFor(dwMaxWait, CShutdownPredicate(this));
ASSERT(m_lsTasks.size() == 0);
ASSERT(m_stThreads.size() == 0);
if(!m_lsTasks.empty())
{
CMutexLock2 lock(m_mtx);
CCriSecLock2 lock(m_csTask);
if(!m_lsTasks.empty())
{
@ -134,6 +134,9 @@ BOOL CHPThreadPool::Shutdown(DWORD dwMaxWait)
TTask* pTask = m_lsTasks.front();
m_lsTasks.pop();
if(pTask->freeArg)
::DestroySocketTaskObj((LPTSocketTask)pTask->arg);
TTask::Destruct(pTask);
} while(!m_lsTasks.empty());
@ -145,7 +148,7 @@ BOOL CHPThreadPool::Shutdown(DWORD dwMaxWait)
if(!m_stThreads.empty())
{
CCriSecLock lock(m_cs);
CCriSecLock lock(m_csThread);
if(!m_stThreads.empty())
{
@ -191,7 +194,7 @@ BOOL CHPThreadPool::DoSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg,
}
else if(m_enRejectedPolicy == TRP_CALLER_RUN)
{
DoRunTaskProc(fnTaskProc, pvArg);
DoRunTaskProc(fnTaskProc, pvArg, bFreeArg);
}
else
{
@ -204,16 +207,19 @@ BOOL CHPThreadPool::DoSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg,
return TRUE;
}
void CHPThreadPool::DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg)
void CHPThreadPool::DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg)
{
::InterlockedIncrement(&m_dwTaskCount);
fnTaskProc(pvArg);
::InterlockedDecrement(&m_dwTaskCount);
if(bFreeArg)
::DestroySocketTaskObj((LPTSocketTask)pvArg);
}
CHPThreadPool::EnSubmitResult CHPThreadPool::DirectSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg)
{
CMutexLock2 lock(m_mtx);
CCriSecLock2 lock(m_csTask);
return DoDirectSubmit(fnTaskProc, pvArg, bFreeArg);
}
@ -247,7 +253,7 @@ BOOL CHPThreadPool::CycleWaitSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, DWORD d
while(CheckStarted())
{
CMutexLock2 lock(m_mtx);
CCriSecLock2 lock(m_csTask);
EnSubmitResult sr = DoDirectSubmit(fnTaskProc, pvArg, bFreeArg);
@ -302,7 +308,7 @@ BOOL CHPThreadPool::DoAdjustThreadCount(DWORD dwNewThreadCount)
DWORD dwThreadCount = 0;
{
CCriSecLock lock(m_cs);
CCriSecLock lock(m_csThread);
if(dwNewThreadCount > m_dwThreadCount)
{
@ -319,7 +325,7 @@ BOOL CHPThreadPool::DoAdjustThreadCount(DWORD dwNewThreadCount)
if(bRemove)
{
CMutexLock2 lock(m_mtx);
CCriSecLock2 lock(m_csTask);
for(DWORD i = 0; i < dwThreadCount; i++)
m_cvTask.notify_one();
@ -388,7 +394,7 @@ int CHPThreadPool::WorkerProc()
TTask* pTask = nullptr;
{
CMutexLock2 lock(m_mtx);
CCriSecLock2 lock(m_csTask);
do
{
@ -410,10 +416,7 @@ int CHPThreadPool::WorkerProc()
if(pTask != nullptr)
{
DoRunTaskProc(pTask->fn, pTask->arg);
if(pTask->freeArg)
::DestroySocketTaskObj((LPTSocketTask)pTask->arg);
DoRunTaskProc(pTask->fn, pTask->arg, pTask->freeArg);
TTask::Destruct(pTask);
}
@ -434,7 +437,7 @@ BOOL CHPThreadPool::CheckWorkerThreadExit()
if(m_dwThreadCount < m_stThreads.size())
{
CCriSecLock lock(m_cs);
CCriSecLock lock(m_csThread);
if(m_dwThreadCount < m_stThreads.size())
{
@ -450,7 +453,7 @@ BOOL CHPThreadPool::CheckWorkerThreadExit()
pthread_detach(SELF_THREAD_ID);
if(bShutdown)
m_sem.NotifyOne();
m_evShutdown.NotifyOne();
}
return bExit;

View File

@ -73,7 +73,7 @@ private:
public:
BOOL operator()()
{
CCriSecLock lock(m_pThreadPool->m_cs);
CCriSecLock lock(m_pThreadPool->m_csThread);
return m_pThreadPool->m_stThreads.empty();
}
@ -130,7 +130,7 @@ private:
EnSubmitResult DoDirectSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg);
BOOL CycleWaitSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, DWORD dwMaxWait, BOOL bFreeArg);
BOOL DoSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg, DWORD dwMaxWait);
void DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg);
void DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg);
public:
CHPThreadPool()
@ -147,8 +147,6 @@ private:
void Reset(BOOL bSetWaitEvent = TRUE);
private:
CSEM m_evWait;
DWORD m_dwStackSize;
DWORD m_dwMaxQueueSize;
EnRejectedPolicy m_enRejectedPolicy;
@ -157,14 +155,15 @@ private:
volatile DWORD m_dwThreadCount;
volatile EnServiceState m_enState;
unordered_set<THR_ID> m_stThreads;
queue<TTask*> m_lsTasks;
CSEM m_sem;
CCriSec m_cs;
CMTX m_mtx;
CSEM m_evWait;
CSEM m_evShutdown;
CCriSec m_csThread;
CCriSec m_csTask;
condition_variable m_cvTask;
condition_variable m_cvQueue;
unordered_set<THR_ID> m_stThreads;
queue<TTask*> m_lsTasks;
DECLARE_NO_COPY_CLASS(CHPThreadPool)
};

View File

@ -87,10 +87,7 @@ void CHPThreadPool::CWorker::Execute(TTask* pTask, PVOID pvWorkerParam, OVERLAPP
m_pthPool->m_cvQueue.WakeUp();
#endif
DoRunTaskProc(pTask->fn, pTask->arg, m_pthPool->m_dwTaskCount);
if(pTask->freeArg)
::DestroySocketTaskObj((LPTSocketTask)pTask->arg);
DoRunTaskProc(pTask->fn, pTask->arg, pTask->freeArg, m_pthPool->m_dwTaskCount);
TTask::Destruct(pTask);
}
@ -183,7 +180,7 @@ BOOL CHPThreadPool::DoSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg,
}
else if(m_enRejectedPolicy == TRP_CALLER_RUN)
{
DoRunTaskProc(fnTaskProc, pvArg, m_dwTaskCount);
DoRunTaskProc(fnTaskProc, pvArg, bFreeArg, m_dwTaskCount);
}
else
{
@ -196,11 +193,14 @@ BOOL CHPThreadPool::DoSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg,
return TRUE;
}
void CHPThreadPool::DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg, volatile DWORD& dwTaskCount)
void CHPThreadPool::DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg, volatile DWORD& dwTaskCount)
{
::InterlockedIncrement(&dwTaskCount);
fnTaskProc(pvArg);
::InterlockedDecrement(&dwTaskCount);
if(bFreeArg)
::DestroySocketTaskObj((LPTSocketTask)pvArg);
}
CHPThreadPool::EnSubmitResult CHPThreadPool::DirectSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg)
@ -256,17 +256,18 @@ BOOL CHPThreadPool::CycleWaitSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, DWORD d
#if _WIN32_WINNT >= _WIN32_WINNT_WS08
while(CheckStarted())
while(TRUE)
{
CCriSecLock locallock(m_csQueue);
EnSubmitResult sr = DirectSubmit(fnTaskProc, pvArg, bFreeArg);
if(sr == SUBMIT_OK)
return TRUE;
else if(sr == SUBMIT_ERROR)
return FALSE;
else
{
CCriSecLock locallock(m_csQueue);
if(bInfinite)
m_cvQueue.Wait(m_csQueue.GetObject());
else

View File

@ -127,7 +127,7 @@ private:
BOOL CycleWaitSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, DWORD dwMaxWait, BOOL bFreeArg);
BOOL DoSubmit(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg, DWORD dwMaxWait);
static void DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg, volatile DWORD& dwTaskCount);
static void DoRunTaskProc(Fn_TaskProc fnTaskProc, PVOID pvArg, BOOL bFreeArg, volatile DWORD& dwTaskCount);
public:
CHPThreadPool()