123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- #include "stdafx.h"
- #include "ThreadPool.h"
- #include <process.h>
- ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread)
- {
- if (minNumOfThread < 2)
- this->minNumOfThread = 2;
- else
- this->minNumOfThread = minNumOfThread;
- if (maxNumOfThread < this->minNumOfThread * 2)
- this->maxNumOfThread = this->minNumOfThread * 2;
- else
- this->maxNumOfThread = maxNumOfThread;
- stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- idleThreadList.clear();
- CreateIdleThread(this->minNumOfThread);
- busyThreadList.clear();
- dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0);
- numOfLongFun = 0;
- }
- ThreadPool::~ThreadPool()
- {
- opType_cs.Lock();
- opType = EXIT;
- opType_cs.UnLock();
- SetEvent(stopEvent);
- CloseHandle(stopEvent);
- }
- void ThreadPool::init(size_t minNumOfThread, size_t maxNumOfThread)
- {
- if (minNumOfThread < 2)
- this->minNumOfThread = 2;
- else
- this->minNumOfThread = minNumOfThread;
- if (maxNumOfThread < this->minNumOfThread * 2)
- this->maxNumOfThread = this->minNumOfThread * 2;
- else
- this->maxNumOfThread = maxNumOfThread;
- stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- idleThreadList.clear();
- CreateIdleThread(this->minNumOfThread);
- busyThreadList.clear();
- dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0);
- numOfLongFun = 0;
- }
- BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun)
- {
- waitTaskLock.Lock();
- WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun);
- waitTaskList.push_back(waitTask);
- waitTaskLock.UnLock();
- opType_cs.Lock();
- opType = GET_TASK;
- opType_cs.UnLock();
- SetEvent(stopEvent);
- return TRUE;
- }
- void ThreadPool::CreateIdleThread(size_t size)
- {
- idleThreadLock.Lock();
- for (size_t i = 0; i < size; i++)
- {
- idleThreadList.push_back(new Thread(this));
- }
- idleThreadLock.UnLock();
- }
- void ThreadPool::DeleteIdleThread(size_t size)
- {
- idleThreadLock.Lock();
- size_t t = idleThreadList.size();
- if (t >= size)
- {
- for (size_t i = 0; i < size; i++)
- {
- auto thread = idleThreadList.back();
- delete thread;
- idleThreadList.pop_back();
- }
- }
- else
- {
- for (size_t i = 0; i < t; i++)
- {
- auto thread = idleThreadList.back();
- delete thread;
- idleThreadList.pop_back();
- }
- }
- idleThreadLock.UnLock();
- }
- ThreadPool::Thread *ThreadPool::GetIdleThread()
- {
- Thread *thread = NULL;
- idleThreadLock.Lock();
- if (idleThreadList.size() > 0)
- {
- thread = idleThreadList.front();
- idleThreadList.pop_front();
- }
- idleThreadLock.UnLock();
- if (thread == NULL && getCurNumOfThread() < maxNumOfThread)
- {
- thread = new Thread(this);
- }
- if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK)
- {
- thread = new Thread(this);
- InterlockedIncrement(&maxNumOfThread);
- }
- return thread;
- }
- void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread)
- {
- idleThreadLock.Lock();
- idleThreadList.push_back(busyThread);
- idleThreadLock.UnLock();
- busyThreadLock.Lock();
- for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++)
- {
- if (*it == busyThread)
- {
- busyThreadList.erase(it);
- break;
- }
- }
- busyThreadLock.UnLock();
- if (maxNumOfThread != 0 && idleThreadList.size() > maxNumOfThread * 0.8)
- {
- DeleteIdleThread(idleThreadList.size() / 2);
- }
- opType_cs.Lock();
- opType = GET_TASK;
- opType_cs.UnLock();
- ResetEvent(stopEvent);
- }
- void ThreadPool::MoveThreadToBusyList(Thread * thread)
- {
- busyThreadLock.Lock();
- busyThreadList.push_back(thread);
- busyThreadLock.UnLock();
- }
- void ThreadPool::GetTaskExcute()
- {
- Thread *thread = NULL;
- WaitTask *waitTask = NULL;
- waitTask = GetTask();
- if (waitTask == NULL)
- {
- return;
- }
- if (waitTask->bLong)
- {
- if (idleThreadList.size() > minNumOfThread)
- {
- thread = GetIdleThread();
- }
- else
- {
- thread = new Thread(this);
- InterlockedIncrement(&numOfLongFun);
- InterlockedIncrement(&maxNumOfThread);
- }
- }
- else
- {
- thread = GetIdleThread();
- }
- if (thread != NULL)
- {
- thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb);
- delete waitTask;
- MoveThreadToBusyList(thread);
- }
- else
- {
- waitTaskLock.Lock();
- waitTaskList.push_front(waitTask);
- waitTaskLock.UnLock();
- }
- }
- ThreadPool::WaitTask *ThreadPool::GetTask()
- {
- WaitTask *waitTask = NULL;
- waitTaskLock.Lock();
- if (waitTaskList.size() > 0)
- {
- waitTask = waitTaskList.front();
- waitTaskList.pop_front();
- }
- waitTaskLock.UnLock();
- return waitTask;
- }
- ThreadPool::Thread::Thread(ThreadPool *threadPool) :
- busy(FALSE),
- thread(INVALID_HANDLE_VALUE),
- task(NULL),
- taskCb(NULL),
- exit(FALSE),
- threadPool(threadPool)
- {
- thread = (HANDLE)_beginthreadex(0, 0, ThreadProc, this, CREATE_SUSPENDED, 0);
- }
- ThreadPool::Thread::~Thread()
- {
- exit = TRUE;
- task = NULL;
- taskCb = NULL;
- ResumeThread(thread);
- WaitForSingleObject(thread, INFINITE);
- CloseHandle(thread);
- }
- BOOL ThreadPool::Thread::isBusy()
- {
- return busy;
- }
- void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback)
- {
- busy = TRUE;
- this->task = task;
- this->param = param;
- this->taskCb = taskCallback;
- ResumeThread(thread);
- }
- unsigned int ThreadPool::Thread::ThreadProc(PVOID pM)
- {
- Thread *pThread = (Thread*)pM;
- while (true)
- {
- if (pThread->exit)
- break; //线程退出
- if (pThread->task == NULL && pThread->taskCb == NULL)
- {
- pThread->busy = FALSE;
- pThread->threadPool->MoveBusyThreadToIdleList(pThread);
- SuspendThread(pThread->thread);
- continue;
- }
- int resulst = pThread->task(pThread->param);
- #ifdef _DEBUG
- LOGFMTI("调用回调-1 callbackisnull:%d", pThread->taskCb? 0:1);
- #endif
- if (pThread->taskCb /*&& resulst == 0*/){
- #ifdef _DEBUG
- LOGFMTI("调用回调-2");
- #endif
- pThread->taskCb(pThread->param/*resulst*/);
- }
- WaitTask *waitTask = pThread->threadPool->GetTask();
- if (waitTask != NULL)
- {
- pThread->task = waitTask->task;
- pThread->taskCb = waitTask->taskCb;
- pThread->param = waitTask->param;
- delete waitTask;
- continue;
- }
- else
- {
- pThread->task = NULL;
- pThread->param = NULL;
- pThread->taskCb = NULL;
- pThread->busy = FALSE;
- pThread->threadPool->MoveBusyThreadToIdleList(pThread);
- SuspendThread(pThread->thread);
- }
- }
- return 0;
- }
|