|
- #pragma once
- #include "scan_common.h"
- class CBaseTask{
- public:
- virtual void Run(){}
- void SetExcutorNum(int excutor_num){m_excutor_num=excutor_num;}
- protected:
- int m_excutor_num;
- };
- template<class TASK> class CTask{
- public:
- CTask():task_state(TS_NONE),task_result(false),task(0){}
- TASK* task;
- bool task_result;
- enum TASK_STATE{
- TS_NONE,
- TS_UPLOADING,
- TS_COMPLETE,
- }task_state;
- };
- template<class TASK> class CTaskExcutor:public IService
- {
- public:
- CTaskExcutor(void):m_worker_state(WS_WAITING){}
- ~CTaskExcutor(void){}
- enum WORKER_STATE{
- WS_WAITING,
- WS_WORKING,
- };
- public:
- WORKER_STATE GetWorkerState()
- {
- return m_worker_state;
- }
- bool ExcuteTask( CTask<TASK> * task)
- {
- if(m_worker_state==WS_WORKING)return FALSE;
- m_task = task;
- m_worker_state= WS_WORKING;
- return TRUE;
- }
- void SetExcutorNum(int excutorNum){m_excutorNum =excutorNum;}
- protected:
- virtual ServiceState OnRunning( void ){
- if(m_worker_state == WS_WAITING){
- ServiceState nextState = IService::OnRunning();
- if(nextState != running) return nextState;
- Sleep(10);
- return running;
- }
- m_task->task->SetExcutorNum(m_excutorNum);
- // Logger pTestLogger = Logger::getInstance(LOG4CPLUS_TEXT("LoggerName"));
- // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----开始执行任务%d----->"),m_excutorNum);
- try
- {
- m_task->task->Run();
- // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d完成----->"),m_excutorNum);
- }
- catch (CException* e)
- {
- TCHAR msg[1024*2];
- if(e->GetErrorMessage(msg,1024*2)){
- // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错%s"),m_excutorNum,CString(msg));
- }else{
- // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错%s"),m_excutorNum,LOG4CPLUS_TEXT("未获取到错误信息"));
- }
- }
- catch (...)
- {
- // LOG4CPLUS_INFO_FMT(pTestLogger, LOG4CPLUS_TEXT("----执行任务%d出错,"),m_excutorNum,LOG4CPLUS_TEXT("未知的错误"));
- }
- m_task->task_state = CTask<TASK>::TS_COMPLETE;
- m_task= 0;
- m_worker_state = WS_WAITING;
- return ServiceState::running;
- }
- CTask<TASK> * m_task;
- WORKER_STATE m_worker_state;
- int m_excutorNum;
- };
- template<class T>class CMutilTaskDispatcher: public IService
- {
- public:
- CMutilTaskDispatcher(int worker_count=5,int buffer_len=6):m_UPLOAD_WORKER_COUNT(worker_count),m_UPLOAD_TASK_BUFFER_COUNT(buffer_len){}
- ~CMutilTaskDispatcher(void){}
- public:
- bool SetWorkerCount(int worker_count){
- if(GetServiceSate()!=stoped)return false;
- m_UPLOAD_WORKER_COUNT=worker_count;
- return true;
- }
- bool SetTaskBufferLength(int buffer_len){
- if(GetServiceSate()!=stoped)return false;
- m_UPLOAD_TASK_BUFFER_COUNT=buffer_len;
- return true;
- }
- protected:
- virtual ServiceState OnStarting( void ){
- taskBuffer=new CTask<T>[m_UPLOAD_TASK_BUFFER_COUNT];
- worker=new CTaskExcutor<T>[m_UPLOAD_WORKER_COUNT];
- m_task = NULL;
- m_hasMore =true;
- for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
- {
- worker[index].SetExcutorNum(index+1);
- worker[index].Start();
- }
- for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
- {
- taskBuffer[index].task_state = CTask<T>::TS_NONE;
- }
- return IService::OnStarting();
- }
- virtual ServiceState OnRunning( void ){
- ServiceState nextState= IService::OnRunning();
- if(nextState != running){
- return nextState;
- }
- handleCompleteTask();
- if(m_task == 0){
- m_task =GetNextTask();
- if(m_task==0){
- m_hasMore =false;
- }
- }
- if(m_hasMore){
- int index_task_buffer=-1;
- for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
- {
- if(taskBuffer[index].task_state ==CTask<T>::TS_NONE ){
- index_task_buffer =index;
- break;
- }
- }
- int index_worker=-1;
- for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
- {
- if(worker[index].GetWorkerState()==CTaskExcutor<T>::WS_WAITING){
- index_worker =index;
- break;
- }
- }
- if(index_task_buffer>=0&&index_worker>=0){
- CTask<T>& task = taskBuffer[index_task_buffer];
- task.task=m_task;
- task.task_state =CTask<T>::TS_UPLOADING;
- task.task_result = false;
- bool success=worker[index_worker].ExcuteTask(&taskBuffer[index_task_buffer]);
- if(success){
- m_task =0;
- }else{
- task.task_state =CTask<T>::TS_NONE;
- }
- }else{
- Sleep(10);
- }
- }else{
- bool has_someone_handling=false;
- for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
- {
- if(taskBuffer[index].task_state !=CTask<T>::TS_NONE ){
- has_someone_handling=true;
- break;
- }
- }
- if(!has_someone_handling)return stoping;
- }
- return running;
- }
- virtual ServiceState OnStoping( void ){
- for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
- {
- worker[index].Stop();
- }
- for (int index =0;index<m_UPLOAD_WORKER_COUNT;index++)
- {
- while(worker[index].GetServiceSate()!=ServiceState::stoped){Sleep(10);}
- }
- delete taskBuffer;
- taskBuffer =0;
- //delete worker;
- worker = 0;
- return IService::OnStoping();
- }
- void handleCompleteTask(){
- for (int index =0;index<m_UPLOAD_TASK_BUFFER_COUNT;index++)
- {
- if(taskBuffer[index].task_state ==CTask<T>::TS_COMPLETE ){
- SaveTask(taskBuffer[index].task);
- taskBuffer[index].task_state = CTask<T>::TS_NONE;
- }
- }
- }
- virtual T * GetNextTask(){return 0;}
- virtual void SaveTask(T *){}
- protected:
- int m_UPLOAD_WORKER_COUNT ;
- int m_UPLOAD_TASK_BUFFER_COUNT ;
- CTaskExcutor<T>* worker;
- CTask<T>* taskBuffer;
- private:
- T* m_task;
- bool m_hasMore;
- };
|