#pragma once #include "scan_common.h" class CBaseTask{ public: virtual void Run(){} void SetExcutorNum(int excutor_num){m_excutor_num=excutor_num;} protected: int m_excutor_num; }; template class CTask{ public: CTask():task_state(TS_NONE),task_result(false),task(0){} TASK* task; bool task_result; enum TASK_STATE{ TS_NONE, TS_UPLOADING, TS_COMPLETE, }task_state; }; template class CTaskExcutor:public IService { public: CTaskExcutor(void):m_worker_state(WS_WAITING){} ~CTaskExcutor(void){} enum WORKER_STATE{ WS_WAITING, WS_WORKING, }; public: WORKER_STATE GetWorkerState() { return m_worker_state; } bool ExcuteTask( CTask * task) { if(m_worker_state==WS_WORKING)return FALSE; m_task = task; m_worker_state= WS_WORKING; return TRUE; } void SetExcutorNum(int excutorNum){m_excutorNum =excutorNum;} protected: virtual ServiceState OnRunning( void ){ if(m_worker_state == WS_WAITING){ ServiceState nextState = IService::OnRunning(); if(nextState != running) return nextState; Sleep(10); return running; } m_task->task->SetExcutorNum(m_excutorNum); // Logger pTestLogger = Logger::getInstance(LOG4CPLUS_TEXT("LoggerName")); // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----开始执行任务%d----->"),m_excutorNum); try { m_task->task->Run(); // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d完成----->"),m_excutorNum); } catch (CException* e) { TCHAR msg[1024*2]; if(e->GetErrorMessage(msg,1024*2)){ // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错%s"),m_excutorNum,CString(msg)); }else{ // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错%s"),m_excutorNum,LOG4CPLUS_TEXT("未获取到错误信息")); } } catch (...) { // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错,"),m_excutorNum,LOG4CPLUS_TEXT("未知的错误")); } m_task->task_state = CTask::TS_COMPLETE; m_task= 0; m_worker_state = WS_WAITING; return ServiceState::running; } CTask * m_task; WORKER_STATE m_worker_state; int m_excutorNum; }; templateclass CMutilTaskDispatcher: public IService { public: CMutilTaskDispatcher(int worker_count=5,int buffer_len=6):m_UPLOAD_WORKER_COUNT(worker_count),m_UPLOAD_TASK_BUFFER_COUNT(buffer_len){} ~CMutilTaskDispatcher(void){} public: bool SetWorkerCount(int worker_count){ if(GetServiceSate()!=stoped)return false; m_UPLOAD_WORKER_COUNT=worker_count; return true; } bool SetTaskBufferLength(int buffer_len){ if(GetServiceSate()!=stoped)return false; m_UPLOAD_TASK_BUFFER_COUNT=buffer_len; return true; } protected: virtual ServiceState OnStarting( void ){ taskBuffer=new CTask[m_UPLOAD_TASK_BUFFER_COUNT]; worker=new CTaskExcutor[m_UPLOAD_WORKER_COUNT]; m_task = NULL; m_hasMore =true; for (int index =0;index::TS_NONE; } return IService::OnStarting(); } virtual ServiceState OnRunning( void ){ ServiceState nextState= IService::OnRunning(); if(nextState != running){ return nextState; } handleCompleteTask(); if(m_task == 0){ m_task =GetNextTask(); if(m_task==0){ m_hasMore =false; } } if(m_hasMore){ int index_task_buffer=-1; for (int index =0;index::TS_NONE ){ index_task_buffer =index; break; } } int index_worker=-1; for (int index =0;index::WS_WAITING){ index_worker =index; break; } } if(index_task_buffer>=0&&index_worker>=0){ CTask& task = taskBuffer[index_task_buffer]; task.task=m_task; task.task_state =CTask::TS_UPLOADING; task.task_result = false; bool success=worker[index_worker].ExcuteTask(&taskBuffer[index_task_buffer]); if(success){ m_task =0; }else{ task.task_state =CTask::TS_NONE; } }else{ Sleep(10); } }else{ bool has_someone_handling=false; for (int index =0;index::TS_NONE ){ has_someone_handling=true; break; } } if(!has_someone_handling)return stoping; } return running; } virtual ServiceState OnStoping( void ){ for (int index =0;index::TS_COMPLETE ){ SaveTask(taskBuffer[index].task); taskBuffer[index].task_state = CTask::TS_NONE; } } } virtual T * GetNextTask(){return 0;} virtual void SaveTask(T *){} protected: int m_UPLOAD_WORKER_COUNT ; int m_UPLOAD_TASK_BUFFER_COUNT ; CTaskExcutor* worker; CTask* taskBuffer; private: T* m_task; bool m_hasMore; };