Changeset 873

Show
Ignore:
Timestamp:
01/29/08 17:36:56
Author:
robert
Message:

Added MachinePool?::reportTimingStatus() to provide an task progress estimate.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/include/vpb/MachinePool

    r868 r873  
    130130        void taskFailed(Task* task, int result); 
    131131         
     132        OpenThreads::Mutex& getRunningTasksMutex() const { return _runningTasksMutex; } 
     133         
    132134        RunningTasks& getRunningTasks() { return _runningTasks; } 
    133135         
     
    239241        void updateMachinePool(); 
    240242 
     243        /** Generate a report of the task timing status, i.e. how long do we expect before completion.*/ 
     244        void reportTimingStatus(); 
     245 
    241246        /** Generate a report of the task timing stats.*/ 
    242247        void reportTimingStats(); 
  • trunk/src/vpb/MachinePool.cpp

    r871 r873  
    5757            _task->write(); 
    5858             
     59            // machine->log(osg::NOTICE,"machine=%s running task=%s",machine->getHostName().c_str(),_task->getFileName().c_str()); 
     60 
    5961            machine->startedTask(_task.get()); 
    6062 
     
    8890            } 
    8991             
    90             machine->log(osg::NOTICE,"%s  : completed in %f  seconds : %s result=%d",machine->getHostName().c_str(),duration,application.c_str(),result); 
     92            // machine->log(osg::NOTICE,"machine=%s completed task=%s in %f seconds, result=%d",machine->getHostName().c_str(),_task->getFileName().c_str(),duration,result); 
    9193        } 
    9294 
     
    185187    } 
    186188 
    187     log(osg::NOTICE,"%s : running %s",getHostName().c_str(),executionString.c_str()); 
     189    log(osg::INFO,"%s : running %s",getHostName().c_str(),executionString.c_str()); 
    188190 
    189191    return system(executionString.c_str()); 
     
    301303    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_runningTasksMutex); 
    302304    _runningTasks[task] = osg::Timer::instance()->time_s(); 
     305 
     306    log(osg::NOTICE,"machine=%s running task=%s",getHostName().c_str(),task->getFileName().c_str()); 
    303307} 
    304308 
    305309void Machine::endedTask(Task* task) 
    306310{ 
    307     OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_runningTasksMutex); 
    308      
    309     RunningTasks::iterator itr = _runningTasks.find(task); 
    310     if (itr != _runningTasks.end()) 
    311     { 
    312         double runningTime = osg::Timer::instance()->time_s() - itr->second; 
    313  
    314         _runningTasks.erase(itr); 
    315  
    316         std::string taskType; 
    317         task->getProperty("type",taskType); 
    318  
    319         _taskStatsMap[taskType].logTime(runningTime); 
    320     } 
     311    { 
     312        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_runningTasksMutex); 
     313 
     314        double duration = 0.0; 
     315 
     316        RunningTasks::iterator itr = _runningTasks.find(task); 
     317        if (itr != _runningTasks.end()) 
     318        { 
     319            duration = osg::Timer::instance()->time_s() - itr->second; 
     320 
     321            _runningTasks.erase(itr); 
     322 
     323            std::string taskType; 
     324            task->getProperty("type",taskType); 
     325 
     326            _taskStatsMap[taskType].logTime(duration); 
     327        } 
     328 
     329        log(osg::NOTICE,"machine=%s completed task=%s in %f seconds",getHostName().c_str(),task->getFileName().c_str(),duration); 
     330    } 
     331     
     332    if (_machinePool) _machinePool->reportTimingStatus(); 
     333     
    321334} 
    322335 
     
    790803} 
    791804 
     805void MachinePool::reportTimingStatus() 
     806{ 
     807    unsigned int numTasksPending = _operationQueue->getNumOperationsInQueue(); 
     808 
     809    unsigned int numTasksCompleted = 0; 
     810    double totalComputeTime = 0.0; 
     811    unsigned int numCores = 0; 
     812     
     813    for(Machines::iterator itr = _machines.begin(); 
     814        itr != _machines.end(); 
     815        ++itr) 
     816    { 
     817        Machine* machine = itr->get(); 
     818        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(machine->getRunningTasksMutex()); 
     819 
     820        TaskStatsMap& taskStatsMap = machine->getTaskStatsMap(); 
     821        for(TaskStatsMap::iterator titr = taskStatsMap.begin(); 
     822            titr != taskStatsMap.end(); 
     823            ++titr) 
     824        { 
     825            TaskStats& stats = titr->second; 
     826             
     827            numTasksCompleted += stats.numTasks(); 
     828            totalComputeTime += stats.totalTime(); 
     829        } 
     830        numCores += getNumThreads(); 
     831    } 
     832         
     833    double averageTaskTime = (numTasksCompleted!=0) ? (totalComputeTime/ double(numTasksCompleted)) : 0; 
     834 
     835    double currentTime = osg::Timer::instance()->time_s(); 
     836    double estimatedTimeOfLastCompletion = currentTime; 
     837    unsigned int numTasksRunning = 0; 
     838    for(Machines::iterator itr = _machines.begin(); 
     839        itr != _machines.end(); 
     840        ++itr) 
     841    { 
     842        Machine* machine = itr->get(); 
     843        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(machine->getRunningTasksMutex()); 
     844         
     845        Machine::RunningTasks& runningTasks = machine->getRunningTasks(); 
     846        numTasksRunning = runningTasks.size(); 
     847        for(Machine::RunningTasks::iterator ritr = runningTasks.begin(); 
     848            ritr != runningTasks.end(); 
     849            ++ritr) 
     850        { 
     851            double startTime = ritr->second; 
     852            double elapsedTime = currentTime - startTime; 
     853            double estimatedEndTime = (elapsedTime < averageTaskTime) ? startTime + averageTaskTime : currentTime+1.0; 
     854            if (estimatedTimeOfLastCompletion < estimatedEndTime) estimatedTimeOfLastCompletion = estimatedEndTime; 
     855        } 
     856    } 
     857     
     858    double numTaskPendingAcrossAllCores = (numTasksPending>0) ? ceil(double(numTasksPending) / double(numCores)) : 0; 
     859    estimatedTimeOfLastCompletion += numTaskPendingAcrossAllCores*averageTaskTime; 
     860    double estimateTimeToCompletion = estimatedTimeOfLastCompletion-currentTime; 
     861     
     862    log(osg::NOTICE,"Number of tasks completed %i, running %i, pending %i. Estimated time to completion %.1f seconds, %2.1f%% done.",numTasksCompleted, numTasksRunning, numTasksPending, estimateTimeToCompletion, 100.0*currentTime/estimatedTimeOfLastCompletion); 
     863 
     864} 
     865 
    792866void MachinePool::reportTimingStats() 
    793867{