ThreadPool.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. #include "stdafx.h"
  2. #include "ThreadPool.h"
  3. #include <process.h>
  4. ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread)
  5. {
  6. if (minNumOfThread < 2)
  7. this->minNumOfThread = 2;
  8. else
  9. this->minNumOfThread = minNumOfThread;
  10. if (maxNumOfThread < this->minNumOfThread * 2)
  11. this->maxNumOfThread = this->minNumOfThread * 2;
  12. else
  13. this->maxNumOfThread = maxNumOfThread;
  14. stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  15. idleThreadList.clear();
  16. CreateIdleThread(this->minNumOfThread);
  17. busyThreadList.clear();
  18. dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0);
  19. numOfLongFun = 0;
  20. }
  21. ThreadPool::~ThreadPool()
  22. {
  23. opType_cs.Lock();
  24. opType = EXIT;
  25. opType_cs.UnLock();
  26. SetEvent(stopEvent);
  27. CloseHandle(stopEvent);
  28. }
  29. void ThreadPool::init(size_t minNumOfThread, size_t maxNumOfThread)
  30. {
  31. if (minNumOfThread < 2)
  32. this->minNumOfThread = 2;
  33. else
  34. this->minNumOfThread = minNumOfThread;
  35. if (maxNumOfThread < this->minNumOfThread * 2)
  36. this->maxNumOfThread = this->minNumOfThread * 2;
  37. else
  38. this->maxNumOfThread = maxNumOfThread;
  39. stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  40. idleThreadList.clear();
  41. CreateIdleThread(this->minNumOfThread);
  42. busyThreadList.clear();
  43. dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0);
  44. numOfLongFun = 0;
  45. }
  46. BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun)
  47. {
  48. waitTaskLock.Lock();
  49. WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun);
  50. waitTaskList.push_back(waitTask);
  51. waitTaskLock.UnLock();
  52. opType_cs.Lock();
  53. opType = GET_TASK;
  54. opType_cs.UnLock();
  55. SetEvent(stopEvent);
  56. return TRUE;
  57. }
  58. void ThreadPool::CreateIdleThread(size_t size)
  59. {
  60. idleThreadLock.Lock();
  61. for (size_t i = 0; i < size; i++)
  62. {
  63. idleThreadList.push_back(new Thread(this));
  64. }
  65. idleThreadLock.UnLock();
  66. }
  67. void ThreadPool::DeleteIdleThread(size_t size)
  68. {
  69. idleThreadLock.Lock();
  70. size_t t = idleThreadList.size();
  71. if (t >= size)
  72. {
  73. for (size_t i = 0; i < size; i++)
  74. {
  75. auto thread = idleThreadList.back();
  76. delete thread;
  77. idleThreadList.pop_back();
  78. }
  79. }
  80. else
  81. {
  82. for (size_t i = 0; i < t; i++)
  83. {
  84. auto thread = idleThreadList.back();
  85. delete thread;
  86. idleThreadList.pop_back();
  87. }
  88. }
  89. idleThreadLock.UnLock();
  90. }
  91. ThreadPool::Thread *ThreadPool::GetIdleThread()
  92. {
  93. Thread *thread = NULL;
  94. idleThreadLock.Lock();
  95. if (idleThreadList.size() > 0)
  96. {
  97. thread = idleThreadList.front();
  98. idleThreadList.pop_front();
  99. }
  100. idleThreadLock.UnLock();
  101. if (thread == NULL && getCurNumOfThread() < maxNumOfThread)
  102. {
  103. thread = new Thread(this);
  104. }
  105. if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK)
  106. {
  107. thread = new Thread(this);
  108. InterlockedIncrement(&maxNumOfThread);
  109. }
  110. return thread;
  111. }
  112. void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread)
  113. {
  114. idleThreadLock.Lock();
  115. idleThreadList.push_back(busyThread);
  116. idleThreadLock.UnLock();
  117. busyThreadLock.Lock();
  118. for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++)
  119. {
  120. if (*it == busyThread)
  121. {
  122. busyThreadList.erase(it);
  123. break;
  124. }
  125. }
  126. busyThreadLock.UnLock();
  127. if (maxNumOfThread != 0 && idleThreadList.size() > maxNumOfThread * 0.8)
  128. {
  129. DeleteIdleThread(idleThreadList.size() / 2);
  130. }
  131. opType_cs.Lock();
  132. opType = GET_TASK;
  133. opType_cs.UnLock();
  134. ResetEvent(stopEvent);
  135. }
  136. void ThreadPool::MoveThreadToBusyList(Thread * thread)
  137. {
  138. busyThreadLock.Lock();
  139. busyThreadList.push_back(thread);
  140. busyThreadLock.UnLock();
  141. }
  142. void ThreadPool::GetTaskExcute()
  143. {
  144. Thread *thread = NULL;
  145. WaitTask *waitTask = NULL;
  146. waitTask = GetTask();
  147. if (waitTask == NULL)
  148. {
  149. return;
  150. }
  151. if (waitTask->bLong)
  152. {
  153. if (idleThreadList.size() > minNumOfThread)
  154. {
  155. thread = GetIdleThread();
  156. }
  157. else
  158. {
  159. thread = new Thread(this);
  160. InterlockedIncrement(&numOfLongFun);
  161. InterlockedIncrement(&maxNumOfThread);
  162. }
  163. }
  164. else
  165. {
  166. thread = GetIdleThread();
  167. }
  168. if (thread != NULL)
  169. {
  170. thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb);
  171. delete waitTask;
  172. MoveThreadToBusyList(thread);
  173. }
  174. else
  175. {
  176. waitTaskLock.Lock();
  177. waitTaskList.push_front(waitTask);
  178. waitTaskLock.UnLock();
  179. }
  180. }
  181. ThreadPool::WaitTask *ThreadPool::GetTask()
  182. {
  183. WaitTask *waitTask = NULL;
  184. waitTaskLock.Lock();
  185. if (waitTaskList.size() > 0)
  186. {
  187. waitTask = waitTaskList.front();
  188. waitTaskList.pop_front();
  189. }
  190. waitTaskLock.UnLock();
  191. return waitTask;
  192. }
  193. ThreadPool::Thread::Thread(ThreadPool *threadPool) :
  194. busy(FALSE),
  195. thread(INVALID_HANDLE_VALUE),
  196. task(NULL),
  197. taskCb(NULL),
  198. exit(FALSE),
  199. threadPool(threadPool)
  200. {
  201. thread = (HANDLE)_beginthreadex(0, 0, ThreadProc, this, CREATE_SUSPENDED, 0);
  202. }
  203. ThreadPool::Thread::~Thread()
  204. {
  205. exit = TRUE;
  206. task = NULL;
  207. taskCb = NULL;
  208. ResumeThread(thread);
  209. WaitForSingleObject(thread, INFINITE);
  210. CloseHandle(thread);
  211. }
  212. BOOL ThreadPool::Thread::isBusy()
  213. {
  214. return busy;
  215. }
  216. void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback)
  217. {
  218. busy = TRUE;
  219. this->task = task;
  220. this->param = param;
  221. this->taskCb = taskCallback;
  222. ResumeThread(thread);
  223. }
  224. unsigned int ThreadPool::Thread::ThreadProc(PVOID pM)
  225. {
  226. Thread *pThread = (Thread*)pM;
  227. while (true)
  228. {
  229. if (pThread->exit)
  230. break; //线程退出
  231. if (pThread->task == NULL && pThread->taskCb == NULL)
  232. {
  233. pThread->busy = FALSE;
  234. pThread->threadPool->MoveBusyThreadToIdleList(pThread);
  235. SuspendThread(pThread->thread);
  236. continue;
  237. }
  238. int resulst = pThread->task(pThread->param);
  239. #ifdef _DEBUG
  240. LOGFMTI("调用回调-1 callbackisnull:%d", pThread->taskCb? 0:1);
  241. #endif
  242. if (pThread->taskCb /*&& resulst == 0*/){
  243. #ifdef _DEBUG
  244. LOGFMTI("调用回调-2");
  245. #endif
  246. pThread->taskCb(pThread->param/*resulst*/);
  247. }
  248. WaitTask *waitTask = pThread->threadPool->GetTask();
  249. if (waitTask != NULL)
  250. {
  251. pThread->task = waitTask->task;
  252. pThread->taskCb = waitTask->taskCb;
  253. pThread->param = waitTask->param;
  254. delete waitTask;
  255. continue;
  256. }
  257. else
  258. {
  259. pThread->task = NULL;
  260. pThread->param = NULL;
  261. pThread->taskCb = NULL;
  262. pThread->busy = FALSE;
  263. pThread->threadPool->MoveBusyThreadToIdleList(pThread);
  264. SuspendThread(pThread->thread);
  265. }
  266. }
  267. return 0;
  268. }