Changeset 7138

Show
Ignore:
Timestamp:
07/19/07 14:09:25
Author:
robert
Message:

Added support for using multiple load threads as a thread pool used by the MasterOperation?

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • OpenSceneGraph/trunk/examples/osgterrain/osgterrain.cpp

    r7136 r7138  
    5252 
    5353 
     54typedef std::vector< osg::ref_ptr<osg::GraphicsThread> > GraphicsThreads; 
     55 
     56class CountedBlock : public osg::Referenced 
     57{ 
     58    public: 
     59     
     60        CountedBlock(unsigned int blockCount); 
     61     
     62        void completed(); 
     63 
     64        void block(); 
     65         
     66        void reset(); 
     67 
     68        void release(); 
     69 
     70        void setBlockCount(unsigned int blockCount); 
     71 
     72    protected: 
     73 
     74        ~CountedBlock(); 
     75 
     76        OpenThreads::Mutex _mut; 
     77        OpenThreads::Condition _cond; 
     78        unsigned int _numberOfBlocks; 
     79        unsigned int _blockCount; 
     80}; 
     81 
     82CountedBlock::CountedBlock(unsigned int numberOfBlocks): 
     83    _numberOfBlocks(numberOfBlocks), 
     84    _blockCount(0) 
     85{ 
     86} 
     87 
     88void CountedBlock::completed() 
     89{ 
     90    OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut); 
     91    if (_blockCount>0) 
     92    { 
     93        --_blockCount; 
     94 
     95        if (_blockCount==0) 
     96        { 
     97            // osg::notify(osg::NOTICE)<<"Released"<<std::endl; 
     98            _cond.broadcast(); 
     99        } 
     100    } 
     101} 
     102 
     103void CountedBlock::block() 
     104{ 
     105    OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut); 
     106    if (_blockCount) 
     107        _cond.wait(&_mut); 
     108} 
     109 
     110void CountedBlock::release() 
     111{ 
     112    OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut); 
     113    if (_blockCount) 
     114    { 
     115        _blockCount = 0; 
     116        _cond.broadcast(); 
     117    } 
     118} 
     119 
     120void CountedBlock::reset() 
     121{ 
     122    OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut); 
     123    if (_numberOfBlocks!=_blockCount) 
     124    { 
     125        if (_numberOfBlocks==0) _cond.broadcast(); 
     126        _blockCount = _numberOfBlocks; 
     127    } 
     128} 
     129 
     130void CountedBlock::setBlockCount(unsigned int blockCount) 
     131{ 
     132    _numberOfBlocks = blockCount; 
     133} 
     134 
     135CountedBlock::~CountedBlock() 
     136{ 
     137    _blockCount = 0; 
     138    release(); 
     139} 
     140 
     141class LoadAndCompileOperation : public osg::Operation 
     142{ 
     143public: 
     144 
     145    LoadAndCompileOperation(const std::string& filename, GraphicsThreads& graphicsThreads, CountedBlock* block): 
     146        Operation("Load and compile Operation", false), 
     147        _filename(filename), 
     148        _graphicsThreads(graphicsThreads), 
     149        _block(block) {} 
     150 
     151    virtual void operator () (osg::Object* object) 
     152    { 
     153        // osg::notify(osg::NOTICE)<<"LoadAndCompileOperation "<<_filename<<std::endl; 
     154 
     155        _loadedModel = osgDB::readNodeFile(_filename); 
     156        if (_loadedModel.valid() && !_graphicsThreads.empty()) 
     157        { 
     158            osg::ref_ptr<osgUtil::GLObjectsOperation> compileOperation = new osgUtil::GLObjectsOperation(_loadedModel.get()); 
     159 
     160            for(GraphicsThreads::iterator gitr = _graphicsThreads.begin(); 
     161                gitr != _graphicsThreads.end(); 
     162                ++gitr) 
     163            { 
     164                (*gitr)->add( compileOperation.get() ); 
     165                // requiresBarrier = true; 
     166            } 
     167        } 
     168         
     169        if (_block.valid()) _block->completed(); 
     170 
     171        // osg::notify(osg::NOTICE)<<"done LoadAndCompileOperation "<<_filename<<std::endl; 
     172    } 
     173     
     174    std::string                 _filename; 
     175    GraphicsThreads             _graphicsThreads; 
     176    osg::ref_ptr<osg::Node>     _loadedModel; 
     177    osg::ref_ptr<CountedBlock>  _block; 
     178 
     179}; 
     180 
    54181class MasterOperation : public osg::Operation 
    55182{ 
     
    66193    { 
    67194    } 
     195     
     196    /** Set the OperationQueue that the MasterOperation can use to place tasks like file loading on for other processes to handle.*/  
     197    void setOperationQueue(osg::OperationQueue* oq) { _operationQueue = oq; } 
     198     
     199    osg::OperationQueue* getOperationQueue() { return _operationQueue.get(); } 
    68200     
    69201    bool readMasterFile(Files& files) const 
     
    171303 
    172304            bool requiresBarrier = false; 
    173              
    174             for(Files::iterator nitr = newFiles.begin(); 
    175                 nitr != newFiles.end(); 
    176                 ++nitr) 
    177             { 
    178                 osg::ref_ptr<osg::Node> loadedModel = osgDB::readNodeFile(*nitr); 
    179  
    180                 if (loadedModel.get()) 
    181                 { 
    182                     nodesToAdd[*nitr] = loadedModel; 
    183  
    184                     osg::ref_ptr<osgUtil::GLObjectsOperation> compileOperation = new osgUtil::GLObjectsOperation(loadedModel.get()); 
    185                  
    186                     for(GraphicsThreads::iterator gitr = threads.begin(); 
    187                         gitr != threads.end(); 
    188                         ++gitr) 
     305 
     306 
     307            if (_operationQueue.valid()) 
     308            { 
     309                // osg::notify(osg::NOTICE)<<"Using OperationQueue"<<std::endl; 
     310 
     311                _endOfLoadBlock = new CountedBlock(newFiles.size()); 
     312      
     313                typedef std::list< osg::ref_ptr<LoadAndCompileOperation> > LoadAndCompileList; 
     314                LoadAndCompileList loadAndCompileList; 
     315      
     316                for(Files::iterator nitr = newFiles.begin(); 
     317                    nitr != newFiles.end(); 
     318                    ++nitr) 
     319                { 
     320                    osg::ref_ptr<LoadAndCompileOperation> loadAndCompile = new LoadAndCompileOperation( *nitr, threads, _endOfLoadBlock.get() ); 
     321                    loadAndCompileList.push_back(loadAndCompile); 
     322                    _operationQueue->add( loadAndCompile.get() ); 
     323                } 
     324                 
     325                osg::ref_ptr<osg::Operation> operation; 
     326                while ((operation=_operationQueue->getNextOperation()).valid()) 
     327                { 
     328                    // osg::notify(osg::NOTICE)<<"Local running of operation"<<std::endl; 
     329                    (*operation)(0); 
     330                } 
     331                                 
     332                // osg::notify(osg::NOTICE)<<"Waiting for completion of LoadAndCompile operations"<<std::endl; 
     333                _endOfLoadBlock->block(); 
     334                // osg::notify(osg::NOTICE)<<"done ... Waiting for completion of LoadAndCompile operations"<<std::endl; 
     335                 
     336                for(LoadAndCompileList::iterator litr = loadAndCompileList.begin(); 
     337                    litr != loadAndCompileList.end(); 
     338                    ++litr) 
     339                { 
     340                    if ((*litr)->_loadedModel.valid()) 
    189341                    { 
    190                         (*gitr)->add( compileOperation.get() )
     342                        nodesToAdd[(*litr)->_filename] = (*litr)->_loadedModel
    191343                        requiresBarrier = true; 
    192344                    } 
    193345                } 
    194             } 
    195              
     346 
     347            } 
     348             
     349            else 
     350            { 
     351                for(Files::iterator nitr = newFiles.begin(); 
     352                    nitr != newFiles.end(); 
     353                    ++nitr) 
     354                { 
     355                    osg::ref_ptr<osg::Node> loadedModel = osgDB::readNodeFile(*nitr); 
     356 
     357                    if (loadedModel.get()) 
     358                    { 
     359                        nodesToAdd[*nitr] = loadedModel; 
     360 
     361                        osg::ref_ptr<osgUtil::GLObjectsOperation> compileOperation = new osgUtil::GLObjectsOperation(loadedModel.get()); 
     362 
     363                        for(GraphicsThreads::iterator gitr = threads.begin(); 
     364                            gitr != threads.end(); 
     365                            ++gitr) 
     366                        { 
     367                            (*gitr)->add( compileOperation.get() ); 
     368                            requiresBarrier = true; 
     369                        } 
     370                    } 
     371                } 
     372            } 
     373                         
    196374            if (requiresBarrier) 
    197375            { 
    198                 _barrier = new osg::BarrierOperation(threads.size()+1); 
    199                 _barrier->setKeep(false); 
     376                _endOfCompilebarrier = new osg::BarrierOperation(threads.size()+1); 
     377                _endOfCompilebarrier->setKeep(false); 
    200378                 
    201379                for(GraphicsThreads::iterator gitr = threads.begin(); 
     
    203381                    ++gitr) 
    204382                { 
    205                     (*gitr)->add(_barrier.get()); 
    206                 } 
    207                  
     383                    (*gitr)->add(_endOfCompilebarrier.get()); 
     384                } 
     385                 
     386                // osg::notify(osg::NOTICE)<<"Waiting for Compile to complete"<<std::endl; 
     387 
    208388                // wait for the graphics threads to complete. 
    209                 _barrier->block(); 
     389                _endOfCompilebarrier->block(); 
     390                 
     391                // osg::notify(osg::NOTICE)<<"done ... Waiting for Compile to complete"<<std::endl; 
    210392            } 
    211393        } 
     
    295477    virtual void release() 
    296478    { 
     479        if (_operationQueue.valid()) _operationQueue->removeAllOperations(); 
     480 
    297481        _updatesMergedBlock.release(); 
    298         if (_barrier.valid()) _barrier.release(); 
     482        if (_endOfCompilebarrier.valid()) _endOfCompilebarrier.release(); 
     483        if (_endOfLoadBlock.valid()) _endOfLoadBlock.release(); 
    299484    } 
    300485 
     
    307492    FilenameNodeMap                     _nodesToAdd; 
    308493    OpenThreads::Block                  _updatesMergedBlock; 
    309     osg::ref_ptr<osg::BarrierOperation> _barrier; 
     494 
     495    osg::ref_ptr<osg::BarrierOperation> _endOfCompilebarrier; 
     496    osg::ref_ptr<CountedBlock>          _endOfLoadBlock; 
     497     
     498    osg::ref_ptr<osg::OperationQueue>   _operationQueue; 
    310499}; 
    311500 
     
    522711    double h = 1.0; 
    523712 
    524     bool createBackgroundContextForCompiling = false; 
    525     while (arguments.read("--bc")) { createBackgroundContextForCompiling = true; } 
    526  
    527     bool createBackgroundThreadsForCompiling = false; 
    528     while (arguments.read("--bt")) { createBackgroundContextForCompiling = true; createBackgroundThreadsForCompiling = true; } 
    529  
     713    unsigned int numLoadThreads = 1; 
     714    while (arguments.read("--load-threads",numLoadThreads)) { } 
    530715 
    531716    osg::ref_ptr<MasterOperation> masterOperation; 
     
    773958 
    774959    // start operation thread if a master file has been used. 
    775     osg::ref_ptr<osg::OperationThread> operationThread; 
     960    osg::ref_ptr<osg::OperationThread> masterOperationThread; 
     961     
     962    typedef std::list< osg::ref_ptr<osg::OperationThread> > OperationThreadList; 
     963    OperationThreadList generalThreadList; 
     964     
    776965    if (masterOperation.valid())  
    777966    { 
    778         operationThread = new osg::OperationThread; 
    779         operationThread->startThread(); 
    780         operationThread->add(masterOperation.get()); 
     967        masterOperationThread = new osg::OperationThread; 
     968        masterOperationThread->startThread(); 
     969         
     970        masterOperationThread->add(masterOperation.get()); 
     971 
     972//        if (numLoadThreads>0) 
     973        { 
     974            osg::ref_ptr<osg::OperationQueue> operationQueue = new osg::OperationQueue; 
     975            masterOperation->setOperationQueue(operationQueue.get()); 
     976 
     977            for(unsigned int i=0; i<numLoadThreads; ++i) 
     978            {         
     979                osg::ref_ptr<osg::OperationThread> thread = new osg::OperationThread; 
     980                thread->setOperationQueue(operationQueue.get()); 
     981                thread->startThread(); 
     982                generalThreadList.push_back(thread); 
     983            } 
     984        } 
    781985    } 
    782986