MutilTaskExcute.h 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #pragma once
  2. #include "scan_common.h"
  3. class CBaseTask{
  4. public:
  5. virtual void Run(){}
  6. void SetExcutorNum(int excutor_num){m_excutor_num=excutor_num;}
  7. protected:
  8. int m_excutor_num;
  9. };
  10. template<class TASK> class CTask{
  11. public:
  12. CTask():task_state(TS_NONE),task_result(false),task(0){}
  13. TASK* task;
  14. bool task_result;
  15. enum TASK_STATE{
  16. TS_NONE,
  17. TS_UPLOADING,
  18. TS_COMPLETE,
  19. }task_state;
  20. };
  21. template<class TASK> class CTaskExcutor:public IService
  22. {
  23. public:
  24. CTaskExcutor(void):m_worker_state(WS_WAITING){}
  25. ~CTaskExcutor(void){}
  26. enum WORKER_STATE{
  27. WS_WAITING,
  28. WS_WORKING,
  29. };
  30. public:
  31. WORKER_STATE GetWorkerState()
  32. {
  33. return m_worker_state;
  34. }
  35. bool ExcuteTask( CTask<TASK> * task)
  36. {
  37. if(m_worker_state==WS_WORKING)return FALSE;
  38. m_task = task;
  39. m_worker_state= WS_WORKING;
  40. return TRUE;
  41. }
  42. void SetExcutorNum(int excutorNum){m_excutorNum =excutorNum;}
  43. protected:
  44. virtual ServiceState OnRunning( void ){
  45. if(m_worker_state == WS_WAITING){
  46. ServiceState nextState = IService::OnRunning();
  47. if(nextState != running) return nextState;
  48. Sleep(10);
  49. return running;
  50. }
  51. m_task->task->SetExcutorNum(m_excutorNum);
  52. // Logger pTestLogger = Logger::getInstance(LOG4CPLUS_TEXT("LoggerName"));
  53. // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----开始执行任务%d----->"),m_excutorNum);
  54. try
  55. {
  56. m_task->task->Run();
  57. // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d完成----->"),m_excutorNum);
  58. }
  59. catch (CException* e)
  60. {
  61. TCHAR msg[1024*2];
  62. if(e->GetErrorMessage(msg,1024*2)){
  63. // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错%s"),m_excutorNum,CString(msg));
  64. }else{
  65. // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错%s"),m_excutorNum,LOG4CPLUS_TEXT("未获取到错误信息"));
  66. }
  67. }
  68. catch (...)
  69. {
  70. // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错,"),m_excutorNum,LOG4CPLUS_TEXT("未知的错误"));
  71. }
  72. m_task->task_state = CTask<TASK>::TS_COMPLETE;
  73. m_task= 0;
  74. m_worker_state = WS_WAITING;
  75. return ServiceState::running;
  76. }
  77. CTask<TASK> * m_task;
  78. WORKER_STATE m_worker_state;
  79. int m_excutorNum;
  80. };
  81. template<class T>class CMutilTaskDispatcher: public IService
  82. {
  83. public:
  84. CMutilTaskDispatcher(int worker_count=5,int buffer_len=6):m_UPLOAD_WORKER_COUNT(worker_count),m_UPLOAD_TASK_BUFFER_COUNT(buffer_len){}
  85. ~CMutilTaskDispatcher(void){}
  86. public:
  87. bool SetWorkerCount(int worker_count){
  88. if(GetServiceSate()!=stoped)return false;
  89. m_UPLOAD_WORKER_COUNT=worker_count;
  90. return true;
  91. }
  92. bool SetTaskBufferLength(int buffer_len){
  93. if(GetServiceSate()!=stoped)return false;
  94. m_UPLOAD_TASK_BUFFER_COUNT=buffer_len;
  95. return true;
  96. }
  97. protected:
  98. virtual ServiceState OnStarting( void ){
  99. taskBuffer=new CTask<T>[m_UPLOAD_TASK_BUFFER_COUNT];
  100. worker=new CTaskExcutor<T>[m_UPLOAD_WORKER_COUNT];
  101. m_task = NULL;
  102. m_hasMore =true;
  103. for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
  104. {
  105. worker[index].SetExcutorNum(index+1);
  106. worker[index].Start();
  107. }
  108. for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
  109. {
  110. taskBuffer[index].task_state = CTask<T>::TS_NONE;
  111. }
  112. return IService::OnStarting();
  113. }
  114. virtual ServiceState OnRunning( void ){
  115. ServiceState nextState= IService::OnRunning();
  116. if(nextState != running){
  117. return nextState;
  118. }
  119. handleCompleteTask();
  120. if(m_task == 0){
  121. m_task =GetNextTask();
  122. if(m_task==0){
  123. m_hasMore =false;
  124. }
  125. }
  126. if(m_hasMore){
  127. int index_task_buffer=-1;
  128. for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
  129. {
  130. if(taskBuffer[index].task_state ==CTask<T>::TS_NONE ){
  131. index_task_buffer =index;
  132. break;
  133. }
  134. }
  135. int index_worker=-1;
  136. for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
  137. {
  138. if(worker[index].GetWorkerState()==CTaskExcutor<T>::WS_WAITING){
  139. index_worker =index;
  140. break;
  141. }
  142. }
  143. if(index_task_buffer>=0&&index_worker>=0){
  144. CTask<T>& task = taskBuffer[index_task_buffer];
  145. task.task=m_task;
  146. task.task_state =CTask<T>::TS_UPLOADING;
  147. task.task_result = false;
  148. bool success=worker[index_worker].ExcuteTask(&taskBuffer[index_task_buffer]);
  149. if(success){
  150. m_task =0;
  151. }else{
  152. task.task_state =CTask<T>::TS_NONE;
  153. }
  154. }else{
  155. Sleep(10);
  156. }
  157. }else{
  158. bool has_someone_handling=false;
  159. for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
  160. {
  161. if(taskBuffer[index].task_state !=CTask<T>::TS_NONE ){
  162. has_someone_handling=true;
  163. break;
  164. }
  165. }
  166. if(!has_someone_handling)return stoping;
  167. }
  168. return running;
  169. }
  170. virtual ServiceState OnStoping( void ){
  171. for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
  172. {
  173. worker[index].Stop();
  174. }
  175. for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
  176. {
  177. while(worker[index].GetServiceSate()!=ServiceState::stoped){Sleep(10);}
  178. }
  179. delete taskBuffer;
  180. taskBuffer =0;
  181. //delete worker;
  182. worker = 0;
  183. return IService::OnStoping();
  184. }
  185. void handleCompleteTask(){
  186. for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
  187. {
  188. if(taskBuffer[index].task_state ==CTask<T>::TS_COMPLETE ){
  189. SaveTask(taskBuffer[index].task);
  190. taskBuffer[index].task_state = CTask<T>::TS_NONE;
  191. }
  192. }
  193. }
  194. virtual T * GetNextTask(){return 0;}
  195. virtual void SaveTask(T *){}
  196. protected:
  197. int m_UPLOAD_WORKER_COUNT ;
  198. int m_UPLOAD_TASK_BUFFER_COUNT ;
  199. CTaskExcutor<T>* worker;
  200. CTask<T>* taskBuffer;
  201. private:
  202. T* m_task;
  203. bool m_hasMore;
  204. };