Cloned library of VTK-5.0.0 with extra build files for internal package management.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

427 lines
12 KiB

/*=========================================================================
Program: Visualization Toolkit
Module: $RCSfile: vtkMultiProcessController.cxx,v $
Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
All rights reserved.
See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
// This will be the default.
#include "vtkMultiProcessController.h"
#include "vtkDummyController.h"
#include "vtkToolkits.h"
#ifdef VTK_USE_MPI
#include "vtkMPIController.h"
#endif
#include "vtkCollection.h"
#include "vtkObjectFactory.h"
#include "vtkOutputWindow.h"
//----------------------------------------------------------------------------
// Needed when we don't use the vtkStandardNewMacro.
vtkInstantiatorNewMacro(vtkMultiProcessController);
//----------------------------------------------------------------------------
// Helper class to contain the RMI information.
// A subclass of vtkObject so that I can keep them in a collection.
class VTK_PARALLEL_EXPORT vtkMultiProcessControllerRMI : public vtkObject
{
public:
static vtkMultiProcessControllerRMI *New();
vtkTypeRevisionMacro(vtkMultiProcessControllerRMI, vtkObject);
int Tag;
vtkRMIFunctionType Function;
void *LocalArgument;
protected:
vtkMultiProcessControllerRMI() {};
vtkMultiProcessControllerRMI(const vtkMultiProcessControllerRMI&);
void operator=(const vtkMultiProcessControllerRMI&);
};
vtkCxxRevisionMacro(vtkMultiProcessControllerRMI, "$Revision: 1.23 $");
vtkStandardNewMacro(vtkMultiProcessControllerRMI);
vtkCxxRevisionMacro(vtkMultiProcessController, "$Revision: 1.23 $");
//----------------------------------------------------------------------------
// An RMI function that will break the "ProcessRMIs" loop.
void vtkMultiProcessControllerBreakRMI(void *localArg,
void *remoteArg, int remoteArgLength,
int vtkNotUsed(remoteId))
{
vtkMultiProcessController *controller;
remoteArg = remoteArg;
remoteArgLength = remoteArgLength;
controller = (vtkMultiProcessController*)(localArg);
controller->SetBreakFlag(1);
}
//----------------------------------------------------------------------------
vtkMultiProcessController::vtkMultiProcessController()
{
int i;
this->LocalProcessId = 0;
this->NumberOfProcesses = 1;
this->MaximumNumberOfProcesses = MAX_PROCESSES;
this->RMIs = vtkCollection::New();
this->SingleMethod = 0;
this->SingleData = 0;
this->Communicator = 0;
this->RMICommunicator = 0;
for ( i = 0; i < VTK_MAX_THREADS; i++ )
{
this->MultipleMethod[i] = NULL;
this->MultipleData[i] = NULL;
}
this->BreakFlag = 0;
this->ForceDeepCopy = 1;
this->OutputWindow = 0;
// Define an rmi internally to exit from the processing loop.
this->AddRMI(vtkMultiProcessControllerBreakRMI, this, BREAK_RMI_TAG);
}
//----------------------------------------------------------------------------
// This is an old comment that I do not know is true:
// (We need to have a "GetNetReferenceCount" to avoid memory leaks.)
vtkMultiProcessController::~vtkMultiProcessController()
{
if ( this->OutputWindow &&
(this->OutputWindow == vtkOutputWindow::GetInstance()) )
{
vtkOutputWindow::SetInstance(0);
}
if (this->OutputWindow)
{
this->OutputWindow->Delete();
}
this->RMIs->Delete();
this->RMIs = NULL;
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::PrintSelf(ostream& os, vtkIndent indent)
{
this->Superclass::PrintSelf(os,indent);
vtkMultiProcessControllerRMI *rmi;
vtkIndent nextIndent = indent.GetNextIndent();
os << indent << "MaximumNumberOfProcesses: "
<< this->MaximumNumberOfProcesses << endl;
os << indent << "NumberOfProcesses: " << this->NumberOfProcesses << endl;
os << indent << "LocalProcessId: " << this->LocalProcessId << endl;
os << indent << "Break flag: " << (this->BreakFlag ? "(yes)" : "(no)")
<< endl;
os << indent << "Force deep copy: " << (this->ForceDeepCopy ? "(yes)" : "(no)")
<< endl;
os << indent << "Output window: ";
if (this->OutputWindow)
{
os << endl;
this->OutputWindow->PrintSelf(os, nextIndent);
}
else
{
os << "(none)" << endl;
}
os << indent << "Communicator: ";
if (this->Communicator)
{
os << endl;
this->Communicator->PrintSelf(os, nextIndent);
}
else
{
os << "(none)" << endl;
}
os << indent << "RMI communicator: ";
if (this->RMICommunicator)
{
os << endl;
this->RMICommunicator->PrintSelf(os, nextIndent);
}
else
{
os << "(none)" << endl;
}
os << indent << "RMIs: \n";
this->RMIs->InitTraversal();
while ( (rmi = (vtkMultiProcessControllerRMI*)(this->RMIs->GetNextItemAsObject())) )
{
os << nextIndent << rmi->Tag << endl;
}
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::SetNumberOfProcesses(int num)
{
if (num == this->NumberOfProcesses)
{
return;
}
if (num < 1 || num > this->MaximumNumberOfProcesses)
{
vtkErrorMacro( << num
<< " is an invalid number of processes try a number from 1 to "
<< this->NumberOfProcesses );
return;
}
this->NumberOfProcesses = num;
this->Modified();
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::SetSingleMethod( vtkProcessFunctionType f,
void *data )
{
this->SingleMethod = f;
this->SingleData = data;
}
//----------------------------------------------------------------------------
// Set one of the user defined methods that will be run on NumberOfProcesses
// processes when MultipleMethodExecute is called. This method should be
// called with index = 0, 1, .., NumberOfProcesses-1 to set up all the
// required user defined methods
void vtkMultiProcessController::SetMultipleMethod( int index,
vtkProcessFunctionType f, void *data )
{
// You can only set the method for 0 through NumberOfProcesses-1
if ( index >= this->NumberOfProcesses )
{
vtkErrorMacro( << "Can't set method " << index <<
" with a processes count of " << this->NumberOfProcesses );
}
else
{
this->MultipleMethod[index] = f;
this->MultipleData[index] = data;
}
}
//----------------------------------------------------------------------------
int vtkMultiProcessController::RemoveFirstRMI(int tag)
{
vtkMultiProcessControllerRMI *rmi = NULL;
this->RMIs->InitTraversal();
while ( (rmi = (vtkMultiProcessControllerRMI*)(this->RMIs->GetNextItemAsObject())) )
{
if (rmi->Tag == tag)
{
this->RMIs->RemoveItem(rmi);
return 1;
}
}
return 0;
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::AddRMI(vtkRMIFunctionType f,
void *localArg, int tag)
{
vtkMultiProcessControllerRMI *rmi = vtkMultiProcessControllerRMI::New();
rmi->Tag = tag;
rmi->Function = f;
rmi->LocalArgument = localArg;
this->RMIs->AddItem(rmi);
rmi->Delete();
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::TriggerRMI(int remoteProcessId,
void *arg, int argLength,
int rmiTag)
{
int triggerMessage[3];
// Deal with sending RMI to ourself here for now.
if (remoteProcessId == this->GetLocalProcessId())
{
this->ProcessRMI(remoteProcessId, arg, argLength, rmiTag);
return;
}
triggerMessage[0] = rmiTag;
triggerMessage[1] = argLength;
// It is important for the remote process to know what process invoked it.
// Multiple processes might try to invoke the method at the same time.
// The remote method will know where to get additional args.
triggerMessage[2] = this->GetLocalProcessId();
this->RMICommunicator->Send(triggerMessage, 3, remoteProcessId, RMI_TAG);
if (argLength > 0)
{
this->RMICommunicator->Send((char*)arg, argLength, remoteProcessId,
RMI_ARG_TAG);
}
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::TriggerBreakRMIs()
{
int idx, num;
if (this->GetLocalProcessId() != 0)
{
vtkErrorMacro("Break should be triggered from process 0.");
return;
}
num = this->GetNumberOfProcesses();
for (idx = 1; idx < num; ++idx)
{
this->TriggerRMI(idx, NULL, 0, BREAK_RMI_TAG);
}
}
//----------------------------------------------------------------------------
int vtkMultiProcessController::ProcessRMIs()
{
return this->ProcessRMIs(1);
}
//----------------------------------------------------------------------------
int vtkMultiProcessController::ProcessRMIs(int reportErrors)
{
int triggerMessage[3];
unsigned char *arg = NULL;
int error = RMI_NO_ERROR;
while (1)
{
if (!this->RMICommunicator->Receive(triggerMessage, 3, ANY_SOURCE, RMI_TAG))
{
if (reportErrors)
{
vtkErrorMacro("Could not receive RMI trigger message.");
}
error = RMI_TAG_ERROR;
break;
}
if (triggerMessage[1] > 0)
{
arg = new unsigned char[triggerMessage[1]];
if (!this->RMICommunicator->Receive((char*)(arg), triggerMessage[1],
triggerMessage[2], RMI_ARG_TAG))
{
if (reportErrors)
{
vtkErrorMacro("Could not receive RMI argument.");
}
error = RMI_ARG_ERROR;
break;
}
}
this->ProcessRMI(triggerMessage[2], arg, triggerMessage[1],
triggerMessage[0]);
if (arg)
{
delete [] arg;
arg = NULL;
}
// check for break
if (this->BreakFlag)
{
this->BreakFlag = 0;
return error;
}
}
return error;
}
//----------------------------------------------------------------------------
void vtkMultiProcessController::ProcessRMI(int remoteProcessId,
void *arg, int argLength,
int rmiTag)
{
vtkMultiProcessControllerRMI *rmi = NULL;
int found = 0;
// find the rmi
this->RMIs->InitTraversal();
while ( !found &&
(rmi = (vtkMultiProcessControllerRMI*)(this->RMIs->GetNextItemAsObject())) )
{
if (rmi->Tag == rmiTag)
{
found = 1;
}
}
if ( ! found)
{
vtkErrorMacro("Process " << this->GetLocalProcessId() <<
" Could not find RMI with tag " << rmiTag);
}
else
{
if ( rmi->Function )
{
(*rmi->Function)(rmi->LocalArgument, arg, argLength, remoteProcessId);
}
}
}
//============================================================================
// The intent is to give access to a processes controller from a static method.
vtkMultiProcessController *VTK_GLOBAL_MULTI_PROCESS_CONTROLLER = NULL;
//----------------------------------------------------------------------------
vtkMultiProcessController *vtkMultiProcessController::GetGlobalController()
{
if (VTK_GLOBAL_MULTI_PROCESS_CONTROLLER == NULL)
{
return NULL;
}
return VTK_GLOBAL_MULTI_PROCESS_CONTROLLER->GetLocalController();
}
//----------------------------------------------------------------------------
// This can be overridden in the subclass to translate controllers.
vtkMultiProcessController *vtkMultiProcessController::GetLocalController()
{
return VTK_GLOBAL_MULTI_PROCESS_CONTROLLER;
}
//----------------------------------------------------------------------------
// This can be overridden in the subclass to translate controllers.
void vtkMultiProcessController::SetGlobalController(
vtkMultiProcessController *controller)
{
VTK_GLOBAL_MULTI_PROCESS_CONTROLLER = controller;
}