/*! \file mpiroutines.cxx * \brief this file contains routines used with MPI compilation. MPI routines generally pertain to domain decomposition or to specific MPI tasks that determine what needs to be broadcast between various threads. */ #ifdef USEMPI #include #include //-- For MPI #include "stf.h" #ifdef SWIFTINTERFACE #include "swiftinterface.h" using namespace Swift; #endif /// \name Domain decomposition routines and io routines to place particles correctly in local mpi data space //@{ /*! Determine the domain decomposition.\n Here the domains are constructured in data units only ThisTask==0 should call this routine. It is tricky to get appropriate load balancing and correct number of particles per processor.\n I could use recursive binary splitting like kd-tree along most spread axis till have appropriate number of volumes corresponding to number of processors. NOTE: assume that cannot store data so position information is read Nsplit times to determine boundaries of subvolumes could also randomly subsample system and produce tree from that should store for each processor the node structure generated by the domain decomposition what I could do is read file twice, one to get extent and other to calculate entropy then decompose along some primary axis, then choose orthogonal axis, keep iterating till have appropriate number of subvolumes then store the boundaries of the subvolume. this means I don't store data but get at least reasonable domain decomposition NOTE: pkdgrav uses orthoganl recursive bisection along with kd-tree, gadget-2 uses peno-hilbert curve to map particles and oct-trees the question with either method is guaranteeing load balancing. for ORB achieved by splitting (sub)volume along a dimension (say one with largest spread or max entropy) such that either side of the cut has approximately the same number of particles (ie: median splitting). But for both cases, load balancing requires particle information so I must load the system then move particles about to ensure load balancing. Main thing first is get the dimensional extent of the system. then I could get initial splitting just using mid point between boundaries along each dimension. once have that initial splitting just load data then start shifting data around. */ ///determine the initial domains, ie: bisection distance mpi_dxsplit, which is used to determien what processor a particle is assigned to ///here the domains are constructured in data units void MPIInitialDomainDecomposition(Options &opt) { if (opt.impiusemesh) { MPIInitialDomainDecompositionWithMesh(opt); return; } Int_t i,j,k,n,m,temp,count,count2,pc,pc_new, Ntot; int Nsplit,isplit; Int_t nbins1d,nbins3d, ibin[3]; Double_t diffsplit; int b,a; if (ThisTask==0) { //first split need not be simply having the dimension but determine //number of splits to have Nprocs=a*2^b, where a and b are integers //initial integers b=floor(log((float)NProcs)/log(2.0))-1; a=floor(NProcs/pow(2,b)); diffsplit=(double)NProcs/(double)a/(double)pow(2,b); while (diffsplit!=1) { b--; a=floor(NProcs/pow(2,b)); diffsplit=(double)NProcs/(double)a/(double)pow(2,b); } Nsplit=b+1; mpi_ideltax[0]=0;mpi_ideltax[1]=1;mpi_ideltax[2]=2; isplit=0; for (j=0;j<3;j++) mpi_nxsplit[j]=0; for (j=0;j1) for (j=0;j1) for (k=0;k> zcurvevalue(3); struct zcurvestruct{ unsigned int coord[3]; unsigned long long index; bitset<48> zcurvevalue; }; vector zcurve(n3); unsigned long long index; for (auto ix=0;ix(ix); zcurvevalue[1] = bitset<16>(iy); zcurvevalue[2] = bitset<16>(iz); for (auto j=0; j<16;j++) { for (auto i = 0; i<3; i++) { zcurve[index].zcurvevalue[j*3+i] = zcurvevalue[i][j]; } } } } } //then sort index array based on the zcurvearray value sort(zcurve.begin(), zcurve.end(), [](const zcurvestruct &a, const zcurvestruct &b){ return a.zcurvevalue.to_ullong() < b.zcurvevalue.to_ullong(); }); //finally assign cells to tasks opt.cellnodeids.resize(n3); opt.cellnodeorder.resize(n3); opt.cellloc = new cell_loc[n3]; int nsub = max((int)floor(n3/(double)NProcs), 1); int itask = 0, count = 0; vector numcellspertask(NProcs,0); for (auto i=0;i mpinumparts(NProcs, 0); for (auto i=0;i x) minval = x; if (maxval < x) maxval = x; ave += x; std += x*x; } ave /= (double)NProcs; std /= (double)NProcs; std = sqrt(std - ave*ave); return (maxval-minval)/ave; } bool MPIRepartitionDomainDecompositionWithMesh(Options &opt){ Int_t *buff = new Int_t[opt.numcells]; for (auto i=0;i opt.mpimeshimbalancelimit) { LOG_RANK0(info) << "Imbalance too large, adjusting MPI domains ..."; int itask = 0; Int_t numparts = 0 ; vector numcellspertask(NProcs,0); vector mpinumparts(NProcs,0); for (auto i=0;i optimalave && itask < NProcs-1) { mpinumparts[itask] = numparts; itask++; numparts = 0; } } mpinumparts[NProcs-1] = numparts; if (ThisTask == 0) { for (auto x:mpinumparts) if (x == 0) { LOG(error) << "MPI Process has zero particles associated with it, likely due to too many mpi tasks requested or too coarse a mesh used."; LOG(error) << "Current number of tasks: " << NProcs; LOG(error) << "Current mesh resolution " << opt.numcellsperdim << "^3"; Int_t sum = 0; for (auto x:opt.cellnodenumparts) { sum +=x; } LOG(error) << "Total number of particles loaded " << sum; LOG(error) << "Suggested number of particles per mpi processes is > 1e7"; LOG(error) << "Suggested number of mpi processes using 1e7 is " << (int)ceil(sum / 1e7); LOG(error) << "Increase mesh resolution or reduce MPI Processes "; MPI_Abort(MPI_COMM_WORLD,8); } LOG(info) << "Now have MPI imbalance of " << MPILoadBalanceWithMesh(opt); LOG(info) << "MPI tasks:"; for (auto i=0; i0); } return false; } void MPINumInDomain(Options &opt) { //when reading number in domain, use all available threads to read all available files //first set number of read threads to either total number of mpi process or files, which ever is smaller //store old number of read threads if (NProcs==1) { Nlocal=Ntotal; Nmemlocal=Nlocal; return; } int nsnapread=opt.nsnapread; //opt.nsnapread=min(NProcs,opt.num_files); if(opt.inputtype==IOTIPSY) MPINumInDomainTipsy(opt); else if (opt.inputtype==IOGADGET) MPINumInDomainGadget(opt); else if (opt.inputtype==IORAMSES) MPINumInDomainRAMSES(opt); #ifdef USEHDF else if (opt.inputtype==IOHDF) MPINumInDomainHDF(opt); #endif if (ThisTask == 0) { if (Ntotal/1e7 < NProcs) { LOG(warning) << "Suggested number of particles per mpi processes is roughly > 1e7"; LOG(warning) << "Number of MPI tasks greater than this suggested number"; LOG(warning) << "May result in poor performance"; } } //if using mesh, check load imbalance and also repartition if (opt.impiusemesh) { if (MPIRepartitionDomainDecompositionWithMesh(opt)){ if(opt.inputtype==IOTIPSY) MPINumInDomainTipsy(opt); else if (opt.inputtype==IOGADGET) MPINumInDomainGadget(opt); else if (opt.inputtype==IORAMSES) MPINumInDomainRAMSES(opt); #ifdef USEHDF else if (opt.inputtype==IOHDF) MPINumInDomainHDF(opt); #endif } } opt.nsnapread=nsnapread; //adjust the memory allocated to allow some buffer room. Nmemlocal=Nlocal*(1.0+opt.mpipartfac); if (opt.iBaryonSearch) Nmemlocalbaryon=Nlocalbaryon[0]*(1.0+opt.mpipartfac); } void MPIDomainExtent(Options &opt) { if(opt.inputtype==IOTIPSY) MPIDomainExtentTipsy(opt); else if (opt.inputtype==IOGADGET) MPIDomainExtentGadget(opt); else if (opt.inputtype==IORAMSES) MPIDomainExtentRAMSES(opt); #ifdef USEHDF else if (opt.inputtype==IOHDF) MPIDomainExtentHDF(opt); #endif } void MPIDomainDecomposition(Options &opt) { MPIInitialDomainDecomposition(opt); if(opt.inputtype==IOTIPSY) MPIDomainDecompositionTipsy(opt); else if (opt.inputtype==IOGADGET) MPIDomainDecompositionGadget(opt); else if (opt.inputtype==IORAMSES) MPIDomainDecompositionRAMSES(opt); #ifdef USEHDF else if (opt.inputtype==IOHDF) MPIDomainDecompositionHDF(opt); #endif } ///adjust the domain boundaries to code units void MPIAdjustDomain(Options &opt){ if (opt.impiusemesh) { MPIAdjustDomainWithMesh(opt); return; } Double_t aadjust, lscale; if (opt.comove) aadjust=1.0; else aadjust=opt.a; lscale=opt.lengthinputconversion*aadjust; if (opt.inputcontainslittleh) lscale /=opt.h; for (int j=0;j= 0 && index < opt.numcells) return opt.cellnodeids[index]; } else { for (int j=0;j=x)&& (mpi_domain[j].bnd[1][0]<=y) && (mpi_domain[j].bnd[1][1]>=y)&& (mpi_domain[j].bnd[2][0]<=z) && (mpi_domain[j].bnd[2][1]>=z) ) return j; } } LOG(error) << "Particle outside the mpi domains of every process (" << x << "," << y << "," << z << ")"; MPI_Abort(MPI_COMM_WORLD,9); return 0; // dummy } void MPIStripExportParticleOfExtraInfo(Options &opt, Int_t n, Particle *Part) { #if defined(GASON) || defined(STARON) || defined(BHON) || defined(EXTRADMON) Int_t numextrafields = 0; #ifdef GASON numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields > 0) { for (auto i=0;i 0) { for (auto i=0;i 0) { for (auto i=0;i 0) { for (auto i=0;i &indices, vector &propbuff, bool resetbuff) { #ifdef GASON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool resetbuff) { #ifdef STARON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool resetbuff) { #ifdef BHON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool resetbuff) { #ifdef EXTRADMON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool iforexport) { #ifdef GASON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool iforexport) { #ifdef STARON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool iforexport) { #ifdef BHON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i &indices, vector &propbuff, bool iforexport) { #ifdef EXTRADMON Int_t num = 0, numextrafields = 0, index, offset = 0; string field; indices.clear(); propbuff.clear(); numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices_gas, indices_star, indices_bh, indices_extradm; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff_gas, propbuff_star, propbuff_bh, propbuff_extradm; string field; #ifdef GASON numextrafields = opt.gas_internalprop_names.size() + opt.gas_chem_names.size() + opt.gas_chemproduction_names.size(); if (numextrafields > 0) { for (auto i=0;i0) { propbuff_gas.resize(numextrafields*num); for (auto i=0;i 0) { for (auto i=0;i 0) { propbuff_star.resize(numextrafields*num); for (auto i=0;i 0) { for (auto i=0;i 0) { propbuff_bh.resize(numextrafields*num); for (auto i=0;i 0) { for (auto i=0;i0) { propbuff_extradm.resize(numextrafields*num); for (auto i=0;i 0) { num = indices_gas.size(); MPI_Send(&num,sizeof(Int_t),MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); if (num > 0) { MPI_Send(indices_gas.data(),sizeof(Int_t)*num,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); MPI_Send(propbuff_gas.data(),sizeof(float)*num*numextrafields,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); } } #endif #ifdef STARON numextrafields = opt.star_internalprop_names.size() + opt.star_chem_names.size() + opt.star_chemproduction_names.size(); if (numextrafields > 0) { num = indices_star.size(); MPI_Send(&num,sizeof(Int_t),MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); if (num > 0) { MPI_Send(indices_star.data(),sizeof(Int_t)*num,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); MPI_Send(propbuff_star.data(),sizeof(float)*num*numextrafields,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); } } #endif #ifdef BHON numextrafields = opt.bh_internalprop_names.size() + opt.bh_chem_names.size() + opt.bh_chemproduction_names.size(); if (numextrafields > 0) { num = indices_bh.size(); MPI_Send(&num,sizeof(Int_t),MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); if (num > 0) { MPI_Send(indices_bh.data(),sizeof(Int_t)*num,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); MPI_Send(propbuff_bh.data(),sizeof(float)*num*numextrafields,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); } } #endif #ifdef EXTRADMON numextrafields = opt.extra_dm_internalprop_names.size(); if (numextrafields > 0) { num = indices_gas.size(); MPI_Send(&num,sizeof(Int_t),MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); if (num > 0) { MPI_Send(indices_extradm.data(),sizeof(Int_t)*num,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); MPI_Send(propbuff_extradm.data(),sizeof(float)*num*numextrafields,MPI_BYTE,taskID,taskID,MPI_COMM_WORLD); } } #endif } void MPISendHydroInfoFromReadThreads(Options &opt, Int_t nlocalbuff, Particle *Part, int taskID) { #ifdef GASON MPI_Status status; vector indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() +opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i indices; Int_t num = 0, numextrafields = 0, index, offset = 0; vector propbuff; string field; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; for (auto i=0;i*&Preadbuf){ if (ibuf==ThisTask) { Nbuf[ibuf]--; Part[numpart++]=Pbuf[ibufindex]; } else { if(Nbuf[ibuf]==BufSize&&ireadtask[ibuf]<0) { MPI_Send(&Nbuf[ibuf], 1, MPI_Int_t, ibuf, ibuf+NProcs, MPI_COMM_WORLD); MPI_Send(&Pbuf[ibuf*BufSize],sizeof(Particle)*Nbuf[ibuf],MPI_BYTE,ibuf,ibuf,MPI_COMM_WORLD); MPISendHydroInfoFromReadThreads(opt, Nbuf[ibuf], &Pbuf[ibuf*BufSize], ibuf); MPISendStarInfoFromReadThreads(opt, Nbuf[ibuf], &Pbuf[ibuf*BufSize], ibuf); MPISendBHInfoFromReadThreads(opt, Nbuf[ibuf], &Pbuf[ibuf*BufSize], ibuf); MPISendExtraDMInfoFromReadThreads(opt, Nbuf[ibuf], &Pbuf[ibuf*BufSize], ibuf); Nbuf[ibuf]=0; } else if (ireadtask[ibuf]>=0) { if (ibuf!=ThisTask) { if (Nreadbuf[ireadtask[ibuf]]==Preadbuf[ireadtask[ibuf]].size()) Preadbuf[ireadtask[ibuf]].resize(Preadbuf[ireadtask[ibuf]].size()+BufSize); Preadbuf[ireadtask[ibuf]][Nreadbuf[ireadtask[ibuf]]]=Pbuf[ibufindex]; Nreadbuf[ireadtask[ibuf]]++; Nbuf[ibuf]=0; } } } } //@} /// \name routines which check to see if some search region overlaps with local mpi domain //@{ ///search if some region is in the local mpi domain int MPIInDomain(Double_t xsearch[3][2], Double_t bnd[3][2]){ Double_t xsearchp[3][2]; if (NProcs==1) return 1; if (!((bnd[0][1] < xsearch[0][0]) || (bnd[0][0] > xsearch[0][1]) || (bnd[1][1] < xsearch[1][0]) || (bnd[1][0] > xsearch[1][1]) || (bnd[2][1] < xsearch[2][0]) || (bnd[2][0] > xsearch[2][1]))) return 1; else { if (mpi_period==0) return 0; else { for (int j=0;j<3;j++) {xsearchp[j][0]=xsearch[j][0];xsearchp[j][1]=xsearch[j][1];} for (int j=0;j<3;j++) { if (!((bnd[j][1] < xsearch[j][0]+mpi_period) || (bnd[j][0] > xsearch[j][1]+mpi_period))) {xsearchp[j][0]+=mpi_period;xsearchp[j][1]+=mpi_period;} else if (!((bnd[j][1] < xsearch[j][0]-mpi_period) || (bnd[j][0] > xsearch[j][1]-mpi_period))) {xsearchp[j][0]-=mpi_period;xsearchp[j][1]-=mpi_period;} } if (!((bnd[0][1] < xsearchp[0][0]) || (bnd[0][0] > xsearchp[0][1]) || (bnd[1][1] < xsearchp[1][0]) || (bnd[1][0] > xsearchp[1][1]) || (bnd[2][1] < xsearchp[2][0]) || (bnd[2][0] > xsearchp[2][1]))) return 1; else return 0; } } } ///\todo clean up memory allocation in these functions, no need to keep allocating xsearch,xsearchp,numoverlap,etc /// Determine if a particle needs to be exported to another mpi domain based on a physical search radius int MPISearchForOverlap(Particle &Part, Double_t &rdist){ Double_t xsearch[3][2]; for (auto k=0;k<3;k++) {xsearch[k][0]=Part.GetPosition(k)-rdist;xsearch[k][1]=Part.GetPosition(k)+rdist;} return MPISearchForOverlap(xsearch); } int MPISearchForOverlap(Coordinate &x, Double_t &rdist){ Double_t xsearch[3][2]; for (auto k=0;k<3;k++) {xsearch[k][0]=x[k]-rdist;xsearch[k][1]=x[k]+rdist;} return MPISearchForOverlap(xsearch); } int MPISearchForOverlap(Double_t xsearch[3][2]){ Double_t xsearchp[7][3][2];//used to store periodic reflections int numoverlap=0,numreflecs=0,ireflec[3],numreflecchoice=0; int indomain; int j,k; for (j=0;j xsearch[0][1]) || (mpi_domain[j].bnd[1][1] < xsearch[1][0]) || (mpi_domain[j].bnd[1][0] > xsearch[1][1]) || (mpi_domain[j].bnd[2][1] < xsearch[2][0]) || (mpi_domain[j].bnd[2][0] > xsearch[2][1]))) numoverlap++; } } if (mpi_period!=0) { for (k=0;k<3;k++) if (xsearch[k][0]<0||xsearch[k][1]>mpi_period) ireflec[numreflecs++]=k; if (numreflecs==1)numreflecchoice=1; else if (numreflecs==2) numreflecchoice=3; else if (numreflecs==3) numreflecchoice=7; for (j=0;jmpi_period) { xsearchp[0][ireflec[0]][0]=xsearch[ireflec[0]][0]-mpi_period; xsearchp[0][ireflec[0]][1]=xsearch[ireflec[0]][1]-mpi_period; } } else if (numreflecs==2) { k=0;j=0; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k++;j++; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k=0; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k++; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } } else if (numreflecs==3) { j=0;k=0; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k++; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k++; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k=0; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k=1; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k=0; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k=2; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k=1; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k=2; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } j++;k=0; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k++; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } k++; if (xsearch[ireflec[k]][0]<0) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]+mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]+mpi_period; } else if (xsearch[ireflec[k]][1]>mpi_period) { xsearchp[j][ireflec[k]][0]=xsearch[ireflec[k]][0]-mpi_period; xsearchp[j][ireflec[k]][1]=xsearch[ireflec[k]][1]-mpi_period; } } for (j=0;j xsearchp[k][0][1]) || (mpi_domain[j].bnd[1][1] < xsearchp[k][1][0]) || (mpi_domain[j].bnd[1][0] > xsearchp[k][1][1]) || (mpi_domain[j].bnd[2][1] < xsearchp[k][2][0]) || (mpi_domain[j].bnd[2][0] > xsearchp[k][2][1]))) numoverlap++; } } } return numoverlap; } ///\todo clean up memory allocation in these functions, no need to keep allocating xsearch,xsearchp,numoverlap,etc /// Determine if a particle needs to be exported to another mpi domain based on a physical search radius int MPISearchForOverlapUsingMesh(Options &opt, Particle &Part, Double_t &rdist){ Double_t xsearch[3][2]; for (auto k=0;k<3;k++) {xsearch[k][0]=Part.GetPosition(k)-rdist;xsearch[k][1]=Part.GetPosition(k)+rdist;} return MPISearchForOverlapUsingMesh(opt, xsearch); } int MPISearchForOverlapUsingMesh(Options &opt, Coordinate &x, Double_t &rdist){ Double_t xsearch[3][2]; for (auto k=0;k<3;k++) {xsearch[k][0]=x[k]-rdist;xsearch[k][1]=x[k]+rdist;} return MPISearchForOverlapUsingMesh(opt, xsearch); } int MPISearchForOverlapUsingMesh(Options &opt, Double_t xsearch[3][2]){ int numoverlap=0; /// Store whether an MPI domain has already been sent to vectorsent_mpi_domain(NProcs); for(int i=0; i celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; numoverlap++; sent_mpi_domain[cellnodeID]++; } return numoverlap; } //@} /// \name Routines involved in reading input data //@{ ///Distribute the mpi processes that read the input files so as to spread the read threads evenly throughout the MPI_COMM_WORLD void MPIDistributeReadTasks(Options&opt, int *&ireadtask, int*&readtaskID){ //initialize if (opt.nsnapread>NProcs) opt.nsnapread=NProcs; #ifndef USEPARALLELHDF //if not using parallel hdf5, allow only one task per file if (opt.num_files &ireadfile, int *&ireadtask){ //to determine which files the thread should read int nread, niread, nfread; for (int i=0;i= nsnapread, proceed as always. if (opt.num_files>=opt.nsnapread) { nread=opt.num_files/opt.nsnapread; niread=ireadtask[ThisTask]*nread,nfread=(ireadtask[ThisTask]+1)*nread; if (ireadtask[ThisTask]==opt.nsnapread-1) nfread=opt.num_files; for (int i=niread;i 1) { ThisWriteComm = (int)(floor(ThisTask/(float)opt.mpinprocswritesize)); NWriteComms = (int)(ceil(NProcs/(float)opt.mpinprocswritesize)); MPI_Comm_split(MPI_COMM_WORLD, ThisWriteComm, ThisTask, &mpi_comm_write); MPI_Comm_rank(mpi_comm_write, &ThisWriteTask); MPI_Comm_size(mpi_comm_write, &NProcsWrite); } else{ ThisWriteTask = ThisTask; ThisWriteComm = ThisTask; NProcsWrite = NProcs; NWriteComms = NProcs; mpi_comm_write = MPI_COMM_WORLD; } #endif } void MPIFreeWriteComm(){ if (mpi_comm_write != MPI_COMM_WORLD) MPI_Comm_free(&mpi_comm_write); mpi_comm_write = MPI_COMM_WORLD; ThisWriteTask = ThisTask; ThisWriteComm = ThisTask; NProcsWrite = NProcs; NWriteComms = NProcs; } //@} /// \name Routines involved in exporting particles //@{ void MPIReceiveHydroInfoFromReadThreads(Options &opt, Int_t nlocalbuff, Particle *Part, int readtaskID) { #ifdef GASON MPI_Status status; vector indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; HydroProperties x; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, readtaskID, ThisTask, MPI_COMM_WORLD, &status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; StarProperties x; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, readtaskID, ThisTask, MPI_COMM_WORLD, &status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; BHProperties x; numextrafields = opt.bh_internalprop_unique_input_names.size() +opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, readtaskID, ThisTask, MPI_COMM_WORLD, &status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; ExtraDMProperties x; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, readtaskID, ThisTask, MPI_COMM_WORLD, &status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; HydroProperties x; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, sourceTaskID, tag, MPI_COMM_WORLD,&status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; StarProperties x; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, sourceTaskID, tag, MPI_COMM_WORLD,&status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; BHProperties x; numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, sourceTaskID, tag, MPI_COMM_WORLD,&status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indices; Int_t num, numextrafields = 0, index, offset = 0; vector propbuff; string field; ExtraDMProperties x; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; MPI_Recv(&num, 1, MPI_Int_t, sourceTaskID, tag, MPI_COMM_WORLD,&status); if (num == 0) return; //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i0) { MPI_Recv(&Part[Nlocal],sizeof(Particle)*Nlocalthreadbuf[i],MPI_BYTE,readtaskID[i],ThisTask, MPI_COMM_WORLD,&status); MPIReceiveHydroInfoFromReadThreads(opt, Nlocalthreadbuf[i], &Part[Nlocal], readtaskID[i]); MPIReceiveStarInfoFromReadThreads(opt, Nlocalthreadbuf[i], &Part[Nlocal], readtaskID[i]); MPIReceiveBHInfoFromReadThreads(opt, Nlocalthreadbuf[i], &Part[Nlocal], readtaskID[i]); MPIReceiveExtraDMInfoFromReadThreads(opt, Nlocalthreadbuf[i], &Part[Nlocal], readtaskID[i]); Nlocal+=Nlocalthreadbuf[i]; Nlocaltotalbuf+=Nlocalthreadbuf[i]; mpi_irecvflag[i]=0; MPI_Irecv(&Nlocalthreadbuf[i], 1, MPI_Int_t, readtaskID[i], ThisTask+NProcs, MPI_COMM_WORLD, &mpi_request[i]); } else { irecv[i]=0; } } } } for (i=0;i0); //now that data is local, must adjust data iff a separate baryon search is required. if (opt.partsearchtype==PSTDARK && opt.iBaryonSearch) { for (i=0;i indicessend, indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector propsendbuff, proprecvbuff; string field; HydroProperties x; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; //first determine what needs to be sent. for (auto i=0;i0) { propsendbuff.resize(numextrafields*numsend); for (auto i=0;i0) { indicesrecv.resize(numrecv); proprecvbuff.resize(numrecv*numextrafields); } //send the information. If size is zero, resize vector so .data() points to valid address if (numsend == 0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv == 0) {indicesrecv.resize(1);proprecvbuff.resize(1);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*2, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend, MPI_FLOAT, recvTask, tag*3, proprecvbuff.data(),numrecv, MPI_FLOAT, recvTask, tag*3, mpi_comm, &status); if (numrecv == 0) return; //and then update the local information //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indicessend, indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector propsendbuff, proprecvbuff; string field; StarProperties x; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; //first determine what needs to be sent. for (auto i=0;i0) { propsendbuff.resize(numextrafields*numsend); for (auto i=0;i0) { indicesrecv.resize(numrecv); proprecvbuff.resize(numrecv*numextrafields); } //send the information. If size is zero, resize vector so .data() points to valid address if (numsend == 0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv == 0) {indicesrecv.resize(1);proprecvbuff.resize(1);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*2, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend, MPI_FLOAT, recvTask, tag*3, proprecvbuff.data(),numrecv, MPI_FLOAT, recvTask, tag*3, mpi_comm, &status); if (numrecv == 0) return; //and then update the local information //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indicessend, indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector propsendbuff, proprecvbuff; string field; BHProperties x; numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; //first determine what needs to be sent. for (auto i=0;i0) { propsendbuff.resize(numextrafields*numsend); for (auto i=0;i0) { indicesrecv.resize(numrecv); proprecvbuff.resize(numrecv*numextrafields); } //send the information. If size is zero, resize vector so .data() points to valid address if (numsend == 0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv == 0) {indicesrecv.resize(1);proprecvbuff.resize(1);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*2, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend, MPI_FLOAT, recvTask, tag*3, proprecvbuff.data(),numrecv, MPI_FLOAT, recvTask, tag*3, mpi_comm, &status); if (numrecv == 0) return; //and then update the local information //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i indicessend, indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector propsendbuff, proprecvbuff; string field; ExtraDMProperties x; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; //first determine what needs to be sent. for (auto i=0;i0) { propsendbuff.resize(numextrafields*numsend); for (auto i=0;i0) { indicesrecv.resize(numrecv); proprecvbuff.resize(numrecv*numextrafields); } //send the information. If size is zero, resize vector so .data() points to valid address if (numsend == 0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv == 0) {indicesrecv.resize(1);proprecvbuff.resize(1);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*2, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend, MPI_FLOAT, recvTask, tag*3, proprecvbuff.data(),numrecv, MPI_FLOAT, recvTask, tag*3, mpi_comm, &status); if (numrecv == 0) return; //and then update the local information //explicitly NULLing copied information which was done with a BYTE copy //The unique pointers will have meaningless info so NULL them (by relasing ownership) //and then setting the released pointer to null via in built function. for (auto i=0;i &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef GASON MPI_Status status; vector indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector proprecvbuff(0); string field; HydroProperties x; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; numsend = indicessend.size(); MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask, tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); //send the information. If vectors are of zero size, must increase size so .data() points to a valid address if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);} else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask, tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status); if (numrecv == 0) return; for (auto i=0;i &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef STARON MPI_Status status; vector indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector proprecvbuff(0); string field; StarProperties x; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; numsend = indicessend.size(); MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask, tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); //send the information. If vectors are of zero size, must increase size so .data() points to a valid address if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);} else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask, tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status); if (numrecv == 0) return; for (auto i=0;i &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef BHON MPI_Status status; vector indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector proprecvbuff(0); string field; BHProperties x; numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; numsend = indicessend.size(); MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask, tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); //send the information. If vectors are of zero size, must increase size so .data() points to a valid address if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);} else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask, tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status); if (numrecv == 0) return; for (auto i=0;i &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef EXTRADMON MPI_Status status; vector indicesrecv; Int_t numsend, numrecv, numextrafields = 0, index, offset = 0; vector proprecvbuff(0); string field; ExtraDMProperties x; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; numsend = indicessend.size(); MPI_Sendrecv(&numsend, 1, MPI_Int_t, recvTask, tag*2, &numrecv, 1, MPI_Int_t, recvTask, tag*2, mpi_comm, &status); //send the information. If vectors are of zero size, must increase size so .data() points to a valid address if (numsend==0) {indicessend.resize(1);propsendbuff.resize(1);} if (numrecv==0) {indicesrecv.resize(1);proprecvbuff.resize(1);} else {indicesrecv.resize(numrecv);proprecvbuff.resize(numrecv);} MPI_Sendrecv(indicessend.data(),numsend, MPI_Int_t, recvTask, tag*3, indicesrecv.data(),numrecv, MPI_Int_t, recvTask, tag*3, mpi_comm, &status); MPI_Sendrecv(propsendbuff.data(),numsend*numextrafields, MPI_FLOAT, recvTask, tag*4, proprecvbuff.data(),numrecv*numextrafields, MPI_FLOAT, recvTask, tag*4, mpi_comm, &status); if (numrecv == 0) return; for (auto i=0;i, std::vector> exchange_indices_and_props( const std::vector &indices, const std::vector &props, std::size_t props_per_index, int rank, int tag, MPI_Comm mpi_comm) { assert(indices.size() <= std::numeric_limits::max()); assert(props.size() == indices.size() * props_per_index); // Send/recv number of indices to allocate reception buffers unsigned long long num_indices = indices.size(); unsigned long long num_indices_recv; MPI_Status status; MPI_Sendrecv( &num_indices, 1, MPI_UNSIGNED_LONG_LONG, rank, tag * 2, &num_indices_recv, 1, MPI_UNSIGNED_LONG_LONG, rank, tag * 2, mpi_comm, &status); // Send/recv actual indices and properties std::vector indices_recv(num_indices_recv); std::vector props_recv(num_indices_recv * props_per_index); MPI_Sendrecv( indices.data(), indices.size(), MPI_Int_t, rank, tag * 3, indices_recv.data(), indices_recv.size(), MPI_Int_t, rank, tag * 3, mpi_comm, &status); MPI_Sendrecv( props.data(), props.size(), MPI_FLOAT, rank, tag * 4, props_recv.data(), props_recv.size(), MPI_FLOAT, rank, tag * 4, mpi_comm, &status); return std::make_tuple(std::move(indices_recv), std::move(props_recv)); } void MPISendReceiveFOFHydroInfoBetweenThreads(Options &opt, fofid_in *FoFGroupDataLocal, vector &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef GASON Int_t numextrafields = 0, index, offset = 0; string field; HydroProperties x; numextrafields = opt.gas_internalprop_unique_input_names.size() + opt.gas_chem_unique_input_names.size() + opt.gas_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; vector indicesrecv; vector proprecvbuff; std::tie(indicesrecv, proprecvbuff) = exchange_indices_and_props(indicessend, propsendbuff, numextrafields, recvTask, tag, mpi_comm); for (auto i = 0; i < indicesrecv.size(); i++) { index=indicesrecv[i]; FoFGroupDataLocal[index].p.SetHydroProperties(x); offset = 0; for (auto iextra=0;iextra &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef STARON Int_t numextrafields = 0, index, offset = 0; string field; StarProperties x; numextrafields = opt.star_internalprop_unique_input_names.size() + opt.star_chem_unique_input_names.size() + opt.star_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; vector indicesrecv; vector proprecvbuff; std::tie(indicesrecv, proprecvbuff) = exchange_indices_and_props(indicessend, propsendbuff, numextrafields, recvTask, tag, mpi_comm); for (auto i = 0; i < indicesrecv.size(); i++) { index=indicesrecv[i]; FoFGroupDataLocal[index].p.SetStarProperties(x); offset = 0; for (auto iextra=0;iextra &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef BHON Int_t numextrafields = 0, index, offset = 0; string field; BHProperties x; numextrafields = opt.bh_internalprop_unique_input_names.size() + opt.bh_chem_unique_input_names.size() + opt.bh_chemproduction_unique_input_names.size(); if (numextrafields == 0) return; vector indicesrecv; vector proprecvbuff; std::tie(indicesrecv, proprecvbuff) = exchange_indices_and_props(indicessend, propsendbuff, numextrafields, recvTask, tag, mpi_comm); for (auto i = 0; i < indicesrecv.size(); i++) { index=indicesrecv[i]; FoFGroupDataLocal[index].p.SetBHProperties(x); offset = 0; for (auto iextra=0;iextra &indicessend, vector &propsendbuff, int recvTask, int tag, MPI_Comm &mpi_comm) { #ifdef EXTRADMON Int_t numextrafields = 0, index, offset = 0; string field; ExtraDMProperties x; numextrafields = opt.extra_dm_internalprop_unique_input_names.size(); if (numextrafields == 0) return; vector indicesrecv; vector proprecvbuff; std::tie(indicesrecv, proprecvbuff) = exchange_indices_and_props(indicessend, propsendbuff, numextrafields, recvTask, tag, mpi_comm); for (auto i = 0; i < indicesrecv.size(); i++) { index=indicesrecv[i]; FoFGroupDataLocal[index].p.SetExtraDMProperties(x); offset = 0; for (auto iextra=0;iextra=0) { //split the communication into small buffers int icycle=0,ibuf; //maximum send size int maxchunksize=2147483648/opt.nsnapread/sizeof(Particle); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendTask,recvTask; int sendoffset,recvoffset; int isendrecv; int cursendchunksize,currecvchunksize; MPI_Status status; for (ibuf=0;ibuf< opt.nsnapread; ibuf++){ //if there are an even number of read tasks, then communicate such that 0 communcates with N-1, 1<->N-2, etc //and moves on to next communication 0<->N-2, 1<->N-3, etc with the communication in chunks //first base on read thread position sendTask=ireadtask[ThisTask]; ///map so that 0 <->N-1, 1 <->N-2, etc to start moving to recvTask=abs(opt.nsnapread-1-ibuf-sendTask); //if have cycled passed zero, then need to adjust recvTask if (icycle==1) recvTask=opt.nsnapread-recvTask; //now adjust to the actually task ID in the MPI_COMM_WORLD sendTask=ThisTask; recvTask=readtaskID[recvTask]; //if ibuf>0 and now at recvTask=0, then next time, cycle if (ibuf>0 && recvTask==0) icycle=1; //if sendtask!=recvtask, and information needs to be sent, send information if (sendTask!=recvTask && (mpi_nsend[ThisTask * NProcs + recvTask] > 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0)) { nsend=mpi_nsend[ThisTask * NProcs + recvTask]; nrecv=mpi_nsend[recvTask * NProcs + ThisTask]; //calculate how many send/recvs are needed nsendchunks=ceil((double)nsend/(double)maxchunksize); nrecvchunks=ceil((double)nrecv/(double)maxchunksize); numsendrecv=max(nsendchunks,nrecvchunks); //initialize the offset in the particle array sendoffset=0; recvoffset=0; isendrecv=1; do { //determine amount to be sent cursendchunksize=min(maxchunksize,nsend-sendoffset); currecvchunksize=min(maxchunksize,nrecv-recvoffset); //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer MPI_Sendrecv(&Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset],sizeof(Particle)*cursendchunksize, MPI_BYTE, recvTask, TAG_IO_A+isendrecv, &Part[Nlocal],sizeof(Particle)*currecvchunksize, MPI_BYTE, recvTask, TAG_IO_A+isendrecv, MPI_COMM_WORLD, &status); MPISendReceiveHydroInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); MPISendReceiveStarInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); MPISendReceiveBHInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); MPISendReceiveExtraDMInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); Nlocal+=currecvchunksize; sendoffset+=cursendchunksize; recvoffset+=currecvchunksize; isendrecv++; } while (isendrecv<=numsendrecv); } //if separate baryon search, send baryons too if (opt.iBaryonSearch && opt.partsearchtype!=PSTALL) { nsend=mpi_nsend_baryon[ThisTask * NProcs + recvTask]; nrecv=mpi_nsend_baryon[recvTask * NProcs + ThisTask]; //calculate how many send/recvs are needed nsendchunks=ceil((double)nsend/(double)maxchunksize); nrecvchunks=ceil((double)nrecv/(double)maxchunksize); numsendrecv=max(nsendchunks,nrecvchunks); //initialize the offset in the particle array sendoffset=0; recvoffset=0; isendrecv=1; do { //determine amount to be sent cursendchunksize=min(maxchunksize,nsend-sendoffset); currecvchunksize=min(maxchunksize,nrecv-recvoffset); //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer MPI_Sendrecv(&Pbuf[nreadoffset[ireadtask[recvTask]]+mpi_nsend[ThisTask * NProcs + recvTask]+sendoffset],sizeof(Particle)*cursendchunksize, MPI_BYTE, recvTask, TAG_IO_B+isendrecv, &Pbaryons[Nlocalbaryon[0]],sizeof(Particle)*currecvchunksize, MPI_BYTE, recvTask, TAG_IO_B+isendrecv, MPI_COMM_WORLD, &status); MPISendReceiveHydroInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); MPISendReceiveStarInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); MPISendReceiveBHInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); MPISendReceiveExtraDMInfoBetweenThreads(opt, cursendchunksize, &Pbuf[nreadoffset[ireadtask[recvTask]]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); Nlocalbaryon[0]+=currecvchunksize; sendoffset+=cursendchunksize; recvoffset+=currecvchunksize; isendrecv++; } while (isendrecv<=numsendrecv); } } } } void MPISendParticlesBetweenReadThreads(Options &opt, vector *&Preadbuf, Particle *Part, int *&ireadtask, int *&readtaskID, Particle *&Pbaryons, MPI_Comm &mpi_comm_read, Int_t *&mpi_nsend_readthread, Int_t *&mpi_nsend_readthread_baryon) { if (ireadtask[ThisTask]>=0) { //split the communication into small buffers int icycle=0,ibuf; //maximum send size int maxchunksize=2147483648/opt.nsnapread/sizeof(Particle); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendTask,recvTask; int sendoffset,recvoffset; int isendrecv; int cursendchunksize,currecvchunksize; MPI_Status status; for (ibuf=0;ibuf< opt.nsnapread; ibuf++){ //if there are an even number of read tasks, then communicate such that 0 communcates with N-1, 1<->N-2, etc //and moves on to next communication 0<->N-2, 1<->N-3, etc with the communication in chunks //first base on read thread position sendTask=ireadtask[ThisTask]; ///map so that 0 <->N-1, 1 <->N-2, etc to start moving to recvTask=abs(opt.nsnapread-1-ibuf-sendTask); //if have cycled passed zero, then need to adjust recvTask if (icycle==1) recvTask=opt.nsnapread-recvTask; //if ibuf>0 and now at recvTask=0, then next time, cycle if (ibuf>0 && recvTask==0) icycle=1; //if sendtask!=recvtask, and information needs to be sent, send information if (sendTask!=recvTask && (mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask] > 0 || mpi_nsend_readthread[recvTask * opt.nsnapread + sendTask] > 0)) { nsend=mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask]; nrecv=mpi_nsend_readthread[recvTask * opt.nsnapread + sendTask]; //calculate how many send/recvs are needed nsendchunks=ceil((double)nsend/(double)maxchunksize); nrecvchunks=ceil((double)nrecv/(double)maxchunksize); numsendrecv=max(nsendchunks,nrecvchunks); //initialize the offset in the particle array sendoffset=0; recvoffset=0; isendrecv=1; do { //determine amount to be sent cursendchunksize=min(maxchunksize,nsend-sendoffset); currecvchunksize=min(maxchunksize,nrecv-recvoffset); //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer MPI_Sendrecv(&Preadbuf[recvTask][sendoffset],sizeof(Particle)*cursendchunksize, MPI_BYTE, recvTask, TAG_IO_A+isendrecv, &(Part[Nlocal]),sizeof(Particle)*currecvchunksize, MPI_BYTE, recvTask, TAG_IO_A+isendrecv, mpi_comm_read, &status); MPISendReceiveHydroInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); MPISendReceiveStarInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); MPISendReceiveBHInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); MPISendReceiveExtraDMInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][sendoffset], currecvchunksize, &Part[Nlocal], recvTask, TAG_IO_A+isendrecv, mpi_comm_read); Nlocal+=currecvchunksize; sendoffset+=cursendchunksize; recvoffset+=currecvchunksize; isendrecv++; } while (isendrecv<=numsendrecv); } //if separate baryon search, send baryons too if (opt.iBaryonSearch && opt.partsearchtype!=PSTALL) { nsend=mpi_nsend_readthread_baryon[sendTask * opt.nsnapread + recvTask]; nrecv=mpi_nsend_readthread_baryon[recvTask * opt.nsnapread + sendTask]; //calculate how many send/recvs are needed nsendchunks=ceil((double)nsend/(double)maxchunksize); nrecvchunks=ceil((double)nrecv/(double)maxchunksize); numsendrecv=max(nsendchunks,nrecvchunks); //initialize the offset in the particle array sendoffset=0; recvoffset=0; isendrecv=1; do { //determine amount to be sent cursendchunksize=min(maxchunksize,nsend-sendoffset); currecvchunksize=min(maxchunksize,nrecv-recvoffset); //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer MPI_Sendrecv(&Preadbuf[recvTask][mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask]+sendoffset],sizeof(Particle)*cursendchunksize, MPI_BYTE, recvTask, TAG_IO_B+isendrecv, &Pbaryons[Nlocalbaryon[0]],sizeof(Particle)*currecvchunksize, MPI_BYTE, recvTask, TAG_IO_B+isendrecv, mpi_comm_read, &status); MPISendReceiveHydroInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); MPISendReceiveStarInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); MPISendReceiveBHInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); MPISendReceiveExtraDMInfoBetweenThreads(opt, cursendchunksize, &Preadbuf[recvTask][mpi_nsend_readthread[sendTask * opt.nsnapread + recvTask]+sendoffset], currecvchunksize, &Pbaryons[Nlocalbaryon[0]], recvTask, TAG_IO_B+isendrecv, mpi_comm_read); Nlocalbaryon[0]+=currecvchunksize; sendoffset+=cursendchunksize; recvoffset+=currecvchunksize; isendrecv++; } while (isendrecv<=numsendrecv); } } } } void MPIGetExportNum(const Int_t nbodies, Particle *Part, Double_t rdist){ Int_t i, j,nthreads,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; Int_t sendTask,recvTask; MPI_Status status; ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;jsent_mpi_domain(NProcs); for (i=0;i celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; nexport++; nsend_local[cellnodeID]++; sent_mpi_domain[cellnodeID]++; } } NExport=nexport;//*(1.0+MPIExportFac); MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); NImport=0; for (j=0;j0) { //sort the export data such that all particles to be passed to thread j are together in ascending thread number qsort(FoFDataIn, nexport, sizeof(struct fofdata_in), fof_export_cmp); for (i=0;i0||nimport>0) { for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { nsend=mpi_nsend[sendTask * NProcs + recvTask]; nrecv=mpi_nsend[recvTask * NProcs + sendTask]; //calculate how many send/recvs are needed nsendchunks=ceil((double)nsend/(double)maxchunksize); nrecvchunks=ceil((double)nrecv/(double)maxchunksize); numsendrecv=max(nsendchunks,nrecvchunks); //initialize the offset in the particle array sendoffset=0; recvoffset=0; isendrecv=1; do { //determine amount to be sent cursendchunksize=min(maxchunksize,nsend-sendoffset); currecvchunksize=min(maxchunksize,nrecv-recvoffset); //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer //first send FOF data and then particle data MPI_Sendrecv(&FoFDataIn[noffset[recvTask]+sendoffset], cursendchunksize * sizeof(struct fofdata_in), MPI_BYTE, recvTask, TAG_FOF_A, &FoFDataGet[nbuffer[recvTask]+recvoffset], currecvchunksize * sizeof(struct fofdata_in), MPI_BYTE, recvTask, TAG_FOF_A, MPI_COMM_WORLD, &status); MPI_Sendrecv(&PartDataIn[noffset[recvTask]+sendoffset], cursendchunksize * sizeof(Particle), MPI_BYTE, recvTask, TAG_FOF_B, &PartDataGet[nbuffer[recvTask]+recvoffset], currecvchunksize * sizeof(Particle), MPI_BYTE, recvTask, TAG_FOF_B, MPI_COMM_WORLD, &status); MPISendReceiveHydroInfoBetweenThreads(opt, cursendchunksize, &PartDataIn[noffset[recvTask]+sendoffset], currecvchunksize, &PartDataGet[nbuffer[recvTask]+recvoffset], recvTask, TAG_FOF_B_HYDRO, mpi_comm); MPISendReceiveStarInfoBetweenThreads(opt, cursendchunksize, &PartDataIn[noffset[recvTask]+sendoffset], currecvchunksize, &PartDataGet[nbuffer[recvTask]+recvoffset], recvTask, TAG_FOF_B_STAR, mpi_comm); MPISendReceiveBHInfoBetweenThreads(opt, cursendchunksize, &PartDataIn[noffset[recvTask]+sendoffset], currecvchunksize, &PartDataGet[nbuffer[recvTask]+recvoffset], recvTask, TAG_FOF_B_BH, mpi_comm); MPISendReceiveExtraDMInfoBetweenThreads(opt, cursendchunksize, &PartDataIn[noffset[recvTask]+sendoffset], currecvchunksize, &PartDataGet[nbuffer[recvTask]+recvoffset], recvTask, TAG_FOF_B_EXTRA_DM, mpi_comm); sendoffset+=cursendchunksize; recvoffset+=currecvchunksize; isendrecv++; } while (isendrecv<=numsendrecv); } } } } } /*! Similar to \ref MPIBuildParticleExportList but uses mesh of swift to determine when mpi's to search */ void MPIBuildParticleExportListUsingMesh(Options &opt, const Int_t nbodies, Particle *Part, Int_t *&pfof, Int_tree_t *&Len, Double_t rdist){ Int_t i, j,nthreads,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; Int_t sendTask,recvTask; MPI_Status status; MPI_Comm mpi_comm = MPI_COMM_WORLD; vectorsent_mpi_domain(NProcs); ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;j celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; FoFDataIn[nexport].Index = i; FoFDataIn[nexport].Task = cellnodeID; FoFDataIn[nexport].iGroup = pfof[Part[i].GetID()];//set group id FoFDataIn[nexport].iGroupTask = ThisTask;//and the task of the group FoFDataIn[nexport].iLen = Len[i]; nexport++; nsend_local[cellnodeID]++; sent_mpi_domain[cellnodeID]++; } } if (nexport>0) { //sort the export data such that all particles to be passed to thread j are together in ascending thread number qsort(FoFDataIn, nexport, sizeof(struct fofdata_in), fof_export_cmp); for (i=0;i= maxNumPart || nsend_local[recvTask] >= maxNumPart ) bufferFlag++; } } // Ensure bufferflag is the same over all ranks MPI_Allreduce(MPI_IN_PLACE, &bufferFlag, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD); //if buffer is too large, split sends if (bufferFlag) { MPI_Request rqst; int numBuffersToSend [NProcs]; int numBuffersToRecv [NProcs]; int numPartInBuffer = maxNumPart * 0.9; int maxnbufferslocal=0,maxnbuffers; for (j = 0; j < NProcs; j++) { numBuffersToSend[j] = 0; numBuffersToRecv[j] = 0; if (nsend_local[j] > 0) numBuffersToSend[j] = (nsend_local[j]/numPartInBuffer) + 1; } for (int i = 1; i < NProcs; i++) { int src = (ThisTask + NProcs - i) % NProcs; int dst = (ThisTask + i) % NProcs; MPI_Isend (&numBuffersToSend[dst], 1, MPI_INT, dst, 0, MPI_COMM_WORLD, &rqst); MPI_Recv (&numBuffersToRecv[src], 1, MPI_INT, src, 0, MPI_COMM_WORLD, &status); } MPI_Barrier (MPI_COMM_WORLD); //find max to be transfer, allows appropriate tagging of messages for (int i=0;imaxnbufferslocal) maxnbufferslocal=numBuffersToRecv[i]; for (int i=0;imaxnbufferslocal) maxnbufferslocal=numBuffersToSend[i]; MPI_Allreduce (&maxnbufferslocal, &maxnbuffers, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD); for (int i = 1; i < NProcs; i++) { int src = (ThisTask + NProcs - i) % NProcs; int dst = (ThisTask + i) % NProcs; Int_t size = numPartInBuffer; nbuffer[src] = 0; int buffOffset = 0; for (int jj = 0; jj < src; jj++) nbuffer[src] += mpi_nsend[ThisTask + jj*NProcs]; // Send Buffers for (int jj = 0; jj < numBuffersToSend[dst]-1; jj++) { MPI_Isend (&size, 1, MPI_Int_t, dst, (int)(jj+1), MPI_COMM_WORLD, &rqst); MPI_Isend (&FoFDataIn[noffset[dst] + buffOffset], sizeof(struct fofdata_in)*size, MPI_BYTE, dst, (int)(TAG_FOF_A*maxnbuffers+jj+1), MPI_COMM_WORLD, &rqst); MPI_Isend (&PartDataIn[noffset[dst] + buffOffset], sizeof(Particle)*size, MPI_BYTE, dst, (int)(TAG_FOF_B*maxnbuffers*3+jj+1), MPI_COMM_WORLD, &rqst); MPIISendHydroInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_HYDRO*maxnbuffers*3+jj+1), rqst); MPIISendStarInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_STAR*maxnbuffers*3+jj+1), rqst); MPIISendBHInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_BH*maxnbuffers*3+jj+1), rqst); MPIISendExtraDMInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_EXTRA_DM*maxnbuffers*3+jj+1), rqst); buffOffset += size; } size = nsend_local[dst] % numPartInBuffer; if (size > 0 && numBuffersToSend[dst] > 0) { MPI_Isend (&size, 1, MPI_Int_t, dst, (int)(numBuffersToSend[dst]), MPI_COMM_WORLD, &rqst); MPI_Isend (&FoFDataIn[noffset[dst] + buffOffset], sizeof(struct fofdata_in)*size, MPI_BYTE, dst, (int)(TAG_FOF_A*maxnbuffers+numBuffersToSend[dst]), MPI_COMM_WORLD, &rqst); MPI_Isend (&PartDataIn[noffset[dst] + buffOffset], sizeof(Particle)*size, MPI_BYTE, dst, (int)(TAG_FOF_B*maxnbuffers*3+numBuffersToSend[dst]), MPI_COMM_WORLD, &rqst); MPIISendHydroInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_HYDRO*maxnbuffers*3+numBuffersToSend[dst]), rqst); MPIISendStarInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_STAR*maxnbuffers*3+numBuffersToSend[dst]), rqst); MPIISendBHInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_BH*maxnbuffers*3+numBuffersToSend[dst]), rqst); MPIISendExtraDMInfo(opt, size, &PartDataIn[noffset[dst] + buffOffset], dst, (int)(TAG_FOF_B_EXTRA_DM*maxnbuffers*3+numBuffersToSend[dst]), rqst); } // Receive Buffers buffOffset = 0; for (int jj = 0; jj < numBuffersToRecv[src]; jj++) { Int_t numInBuffer = 0; MPI_Recv (&numInBuffer, 1, MPI_Int_t, src, (int)(jj+1), MPI_COMM_WORLD, &status); MPI_Recv (&FoFDataGet[nbuffer[src] + buffOffset], sizeof(struct fofdata_in)*numInBuffer, MPI_BYTE, src, (int)(TAG_FOF_A*maxnbuffers+jj+1), MPI_COMM_WORLD, &status); MPI_Recv (&PartDataGet[nbuffer[src] + buffOffset], sizeof(Particle)*numInBuffer, MPI_BYTE, src, (int)(TAG_FOF_B*maxnbuffers*3+jj+1), MPI_COMM_WORLD, &status); MPIReceiveHydroInfo(opt, numInBuffer, &PartDataGet[nbuffer[src] + buffOffset], src, (int)(TAG_FOF_B_HYDRO*maxnbuffers*3+jj+1)); MPIReceiveStarInfo(opt, numInBuffer, &PartDataGet[nbuffer[src] + buffOffset], src, (int)(TAG_FOF_B_STAR*maxnbuffers*3+jj+1)); MPIReceiveBHInfo(opt, numInBuffer, &PartDataGet[nbuffer[src] + buffOffset], src, (int)(TAG_FOF_B_HYDRO*maxnbuffers*3+jj+1)); MPIReceiveExtraDMInfo(opt, numInBuffer, &PartDataGet[nbuffer[src] + buffOffset], src, (int)(TAG_FOF_B_HYDRO*maxnbuffers*3+jj+1)); buffOffset += numInBuffer; } } } else { if (nexport>0||nimport>0) { for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer //first send FOF data and then particle data MPI_Sendrecv(&FoFDataIn[noffset[recvTask]], nsend_local[recvTask] * sizeof(struct fofdata_in), MPI_BYTE, recvTask, TAG_FOF_A, &FoFDataGet[nbuffer[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs] * sizeof(struct fofdata_in), MPI_BYTE, recvTask, TAG_FOF_A, MPI_COMM_WORLD, &status); MPI_Sendrecv(&PartDataIn[noffset[recvTask]], nsend_local[recvTask] * sizeof(Particle), MPI_BYTE, recvTask, TAG_FOF_B, &PartDataGet[nbuffer[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs] * sizeof(Particle), MPI_BYTE, recvTask, TAG_FOF_B, MPI_COMM_WORLD, &status); MPISendReceiveHydroInfoBetweenThreads(opt, nsend_local[recvTask], &PartDataIn[noffset[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs], &PartDataGet[nbuffer[recvTask]], recvTask, TAG_FOF_B_HYDRO, mpi_comm); MPISendReceiveStarInfoBetweenThreads(opt, nsend_local[recvTask], &PartDataIn[noffset[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs], &PartDataGet[nbuffer[recvTask]], recvTask, TAG_FOF_B_STAR, mpi_comm); MPISendReceiveBHInfoBetweenThreads(opt, nsend_local[recvTask], &PartDataIn[noffset[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs], &PartDataGet[nbuffer[recvTask]], recvTask, TAG_FOF_B_BH, mpi_comm); MPISendReceiveExtraDMInfoBetweenThreads(opt, nsend_local[recvTask], &PartDataIn[noffset[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs], &PartDataGet[nbuffer[recvTask]], recvTask, TAG_FOF_B_EXTRA_DM, mpi_comm); } } } } } } /*! like \ref MPIGetExportNum but number based on NN search, useful for reducing memory costs at the expense of cpu cycles */ void MPIGetNNExportNum(const Int_t nbodies, Particle *Part, Double_t *rdist){ Int_t i, j,nthreads,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; Int_t sendTask,recvTask; MPI_Status status; int indomain; ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;jsent_mpi_domain(NProcs); ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;j celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; nexport++; nsend_local[cellnodeID]++; sent_mpi_domain[cellnodeID]++; } } //and then gather the number of particles to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); NImport=0; for (j=0;j0) qsort(NNDataIn, nexport, sizeof(struct nndata_in), nn_export_cmp); if (nexport>0) { std::sort(NNDataIn, NNDataIn + nexport, [](const nndata_in &a, const nndata_in &b) { return a.ToTask < b.ToTask; }); } //then store the offset in the export particle data for the jth Task in order to send data. for(j = 1, noffset[0] = 0; j < NProcs; j++) noffset[j]=noffset[j-1] + nsend_local[j-1]; //and then gather the number of particles to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); //now send the data. ///\todo In determination of particle export, eventually need to place a check for the communication buffer so that if exported number ///is larger than the size of the buffer, iterate over the number exported //if either sending or receiving then run this process for (j=0;j0||nimport>0) { for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } } } /*! like \ref MPIBuildParticleExportList but each particle has a different distance stored in rdist used to find nearest neighbours */ void MPIBuildParticleNNExportListUsingMesh(Options &opt, const Int_t nbodies, Particle *Part, Double_t *rdist){ Int_t i, j,nthreads,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; Int_t sendTask,recvTask; MPI_Status status; int indomain; vectorsent_mpi_domain(NProcs); ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;j celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; //NNDataIn[nexport].Index=i; NNDataIn[nexport].ToTask=cellnodeID; NNDataIn[nexport].FromTask=ThisTask; NNDataIn[nexport].R2=rdist[i]*rdist[i]; //NNDataIn[nexport].V2=vdist2[i]; for (int k=0;k<3;k++) { NNDataIn[nexport].Pos[k]=Part[i].GetPosition(k); NNDataIn[nexport].Vel[k]=Part[i].GetVelocity(k); } nexport++; nsend_local[cellnodeID]++; sent_mpi_domain[cellnodeID]++; } } //sort the export data such that all particles to be passed to thread j are together in ascending thread number //if (nexport>0) qsort(NNDataIn, nexport, sizeof(struct nndata_in), nn_export_cmp); if (nexport>0) { std::sort(NNDataIn, NNDataIn + nexport, [](const nndata_in &a, const nndata_in &b) { return a.ToTask < b.ToTask; }); } //then store the offset in the export particle data for the jth Task in order to send data. for(j = 1, noffset[0] = 0; j < NProcs; j++) noffset[j]=noffset[j-1] + nsend_local[j-1]; //and then gather the number of particles to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); //now send the data. ///\todo In determination of particle export, eventually need to place a check for the communication buffer so that if exported number ///is larger than the size of the buffer, iterate over the number exported //if either sending or receiving then run this process for (j=0;j0||nimport>0) { for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //blocking point-to-point send and receive. Here must determine the appropriate offset point in the local export buffer //for sending data and also the local appropriate offset in the local the receive buffer for information sent from the local receiving buffer MPI_Sendrecv(&NNDataIn[noffset[recvTask]], nsend_local[recvTask] * sizeof(struct nndata_in), MPI_BYTE, recvTask, TAG_NN_A, &NNDataGet[nbuffer[recvTask]], mpi_nsend[ThisTask+recvTask * NProcs] * sizeof(struct nndata_in), MPI_BYTE, recvTask, TAG_NN_A, MPI_COMM_WORLD, &status); } } } } MPI_Barrier(MPI_COMM_WORLD); } /*! Mirror to \ref MPIGetNNExportNum, use exported particles, run ball search to find number of all local particles that need to be imported back to exported particle's thread so that a proper NN search can be made. */ void MPIGetNNImportNum(const Int_t nbodies, KDTree *tree, Particle *Part, int iallflag){ Int_t i, j,nthreads,nexport=0,ncount; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Int_t oldnsend[NProcs*NProcs]; Double_t xsearch[3][2]; bool *iflagged = new bool[nbodies]; vector taggedindex; nthreads=1; Int_t sendTask,recvTask; MPI_Status status; #ifdef USEOPENMP #pragma omp parallel { if (omp_get_thread_num()==0) nthreads=omp_get_num_threads(); } #endif for(j=0;jSearchBallPosTagged(NNDataGet[i].Pos, NNDataGet[i].R2); if (taggedindex.size()==0) continue; for (auto &index:taggedindex) { if (iflagged[index]) continue; #ifdef STRUCDEN if (iallflag==0 && Part[index].GetType()<0) continue; #endif nexport++; nsend_local[j]++; } for (auto &index:taggedindex) iflagged[index]=true; } } delete[] iflagged; //must store old mpi nsend for accessing NNDataGet properly. for (j=0;j taggedindex; nthreads=1; int sendTask,recvTask; int maxchunksize=2147483648/NProcs/sizeof(Particle); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; MPI_Status status; MPI_Comm mpi_comm = MPI_COMM_WORLD; #ifdef USEOPENMP #pragma omp parallel { if (omp_get_thread_num()==0) nthreads=omp_get_num_threads(); } #endif for(j=0;jSearchBallPosTagged(NNDataGet[i].Pos, NNDataGet[i].R2); if (taggedindex.size()==0) continue; for (auto &index:taggedindex) { if (iflagged[index]) continue; iflagged[index] = true; #ifdef STRUCDEN if (iallflag==0 && Part[index].GetType()<0) continue; #endif PartDataIn[nexport]=Part[index]; nexport++; nsend_local[j]++; } } } delete[] iflagged; //sort the export data such that all particles to be passed to thread j are together in ascending thread number //qsort(NNDataReturn, nexport, sizeof(struct nndata_in), nn_export_cmp); //now if there is extra information, strip off the data from the particles to be sent to store in a separate buffer //and store it explicitly into a buffer. Here are the buffers vector indices_gas_send; vector propbuff_gas_send; vector indices_star_send; vector propbuff_star_send; vector indices_bh_send; vector propbuff_bh_send; vector indices_extra_dm_send; vector propbuff_extra_dm_send; //if no information stored in the extra fields will be used, then just remove it before sending //now if this is not called for SO calculations, assume that particles do not need to be exported //with extra information if (!iSOcalc) { MPIStripExportParticleOfExtraInfo(opt, nexport, PartDataIn); } //then store the offset in the export particle data for the jth Task in order to send data. for(j = 1, noffset[0] = 0; j < NProcs; j++) noffset[j]=noffset[j-1] + nsend_local[j-1]; //and then gather the number of particles to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); //now send the data. for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } ncount=0;for (int k=0;k MPIGetHaloSearchExportNum(const Int_t ngroup, PropData *&pdata, vector &rdist) { Int_t i,j,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; Int_t sendTask,recvTask; MPI_Status status; int indomain; vector halooverlap(ngroup+1); ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;j MPIGetHaloSearchExportNumUsingMesh(Options &opt, const Int_t ngroup, PropData *&pdata, vector &rdist) { Int_t nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; Int_t sendTask,recvTask; MPI_Status status; int indomain; vector halooverlap(ngroup+1); vectorsent_mpi_domain(NProcs); ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (auto j=0;j celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; nexport++; nsend_local[cellnodeID]++; halooverlap[i]=true; sent_mpi_domain[cellnodeID]++; } } //and then gather the number of particles to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); NImport=0; for (auto j=0;j &rdist, vector &halooverlap) { Int_t i, j,nthreads,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; MPI_Status status; int indomain; int sendTask,recvTask; int maxchunksize=2147483648/NProcs/sizeof(nndata_in); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (j=0;j0) qsort(NNDataIn, nexport, sizeof(struct nndata_in), nn_export_cmp); //then store the offset in the export data for the jth Task in order to send data. for(j = 1, noffset[0] = 0; j < NProcs; j++) noffset[j]=noffset[j-1] + nsend_local[j-1]; //and then gather the number of items to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); //now send the data. for (j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } } /*! similar \ref MPIBuildHaloSearchExportList but for swift mesh mpi decomposition */ void MPIBuildHaloSearchExportListUsingMesh(Options &opt, const Int_t ngroup, PropData *&pdata, vector &rdist, vector &halooverlap) { Int_t nthreads,nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Double_t xsearch[3][2]; MPI_Status status; int indomain; int sendTask,recvTask; int maxchunksize=2147483648/NProcs/sizeof(nndata_in); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; vectorsent_mpi_domain(NProcs); ///\todo would like to add openmp to this code. In particular, loop over nbodies but issue is nexport. ///This would either require making a FoFDataIn[nthreads][NExport] structure so that each omp thread ///can only access the appropriate memory and adjust nsend_local.\n ///\em Or outer loop is over threads, inner loop over nbodies and just have a idlist of size Nlocal that tags particles ///which must be exported. Then its a much quicker follow up loop (no if statement) that stores the data for (auto j=0;j celllist=MPIGetCellListInSearchUsingMesh(opt,xsearch); for (auto j:celllist) { const int cellnodeID = opt.cellnodeids[j]; /// Only check if particles have overlap with neighbouring cells that are on another MPI domain and have not already been sent to if (sent_mpi_domain[cellnodeID] == 1) continue; //NNDataIn[nexport].Index=i; NNDataIn[nexport].ToTask=cellnodeID; NNDataIn[nexport].FromTask=ThisTask; NNDataIn[nexport].R2=rdist[i]*rdist[i]; //NNDataIn[nexport].V2=vdist2[i]; for (int k=0;k<3;k++) { NNDataIn[nexport].Pos[k]=pdata[i].gcm[k]; } nexport++; nsend_local[cellnodeID]++; sent_mpi_domain[cellnodeID]++; } } //sort the export data such that all particles to be passed to thread j are together in ascending thread number if (nexport>0) qsort(NNDataIn, nexport, sizeof(struct nndata_in), nn_export_cmp); //then store the offset in the export data for the jth Task in order to send data. noffset[0] = 0; for(auto j = 1; j < NProcs; j++) noffset[j]=noffset[j-1] + nsend_local[j-1]; //and then gather the number of items to be sent from mpi thread m to mpi thread n in the mpi_nsend[NProcs*NProcs] array via [n+m*NProcs] MPI_Allgather(nsend_local, NProcs, MPI_Int_t, mpi_nsend, NProcs, MPI_Int_t, MPI_COMM_WORLD); //now send the data. for (auto j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } } /*! Mirror to \ref MPIGetHaloSearchExportNum, use exported positions, run ball search to find number of all local particles that need to be imported back to exported positions's thread so that a proper search can be made. */ void MPIGetHaloSearchImportNum(const Int_t nbodies, KDTree *tree, Particle *Part) { Int_t i, j,nthreads,nexport=0,ncount; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; Int_t oldnsend[NProcs*NProcs]; Double_t xsearch[3][2]; Int_t *nn=new Int_t[nbodies]; Double_t *nnr2=new Double_t[nbodies]; nthreads=1; Int_t sendTask,recvTask; MPI_Status status; #ifdef USEOPENMP #pragma omp parallel { if (omp_get_thread_num()==0) nthreads=omp_get_num_threads(); } #endif for(j=0;jSearchBallPos(NNDataGet[i].Pos, NNDataGet[i].R2, j, nn, nnr2); } for (i=0;iSearchBallPos(NNDataGet[i].Pos, NNDataGet[i].R2, j, nn, nnr2); } for (i=0;i 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } ncount=0;for (int k=0;k0) { //sort the export data such that all particles to be passed to thread j are together in ascending thread number qsort(FoFDataIn, nexport, sizeof(struct fofdata_in), fof_export_cmp); for (i=0;i0||nimport>0) { for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } } } //@} /// \name FOF related mpi routines //@{ ///Set fof task id of particle short_mpi_t* MPISetTaskID(const Int_t nbodies){ short_mpi_t *foftask=new short_mpi_t[nbodies]; for (Int_t i=0;iPush(j,mpi_nlocal[j]); Int_t rankorder[NProcs]; for (int j=0;jTopQueue();pq->Pop();} Int_t offset=0; for (int j=0;j0) pfof[i]+=offset; mpi_maxgid=0; for (int j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } } /*! This routine searches the local particle list using the positions of the exported particles to see if any local particles met the linking criterion and any other FOF criteria of said exported particle. If that is the case, then the group id of the local particle and all other particles that belong to the same group are adjusted if the group id of the exported particle is smaller. This routine returns the number of links found between the local particles and all other exported particles from all other mpi domains. \todo need to update lengths if strucden flag used to limit particles for which real velocity density calculated */ Int_t MPILinkAcross(const Int_t nbodies, KDTree *&tree, Particle *Part, Int_t *&pfof, Int_tree_t *&Len, Int_tree_t *&Head, Int_tree_t *&Next, Double_t rdist2){ Int_t i,j,k; Int_t links=0; Int_t nbuffer[NProcs]; Int_t *nn=new Int_t[nbodies]; Int_t nt,ss,oldlen; Coordinate x; for (i=0;iSearchBallPosTagged(x, rdist2, nn); for (Int_t ii=0;ii PartDataGet[i].GetPID()) { pfof[Part[k].GetID()]=mpi_maxgid+mpi_gidoffset;///some unique identifier based on this task mpi_gidoffset++;//increase unique identifier Len[k]=1; mpi_foftask[Part[k].GetID()]=FoFDataGet[i].iGroupTask; links++; } //if the local particle does belong to a group, let the task from which the imported particle came from //handle the change } //if imported particle has already been linked else { //check to see if local particle has already been linked if (pfof[Part[Head[k]].GetID()]>0) { //as iGroups and pfof have been rank ordered globally //proceed to link local particle to imported particle if //its group id is larger if(pfof[Part[Head[k]].GetID()] > FoFDataGet[i].iGroup) { ss = Head[k]; oldlen=Len[k]; do{ pfof[Part[ss].GetID()]=FoFDataGet[i].iGroup; mpi_foftask[Part[ss].GetID()]=FoFDataGet[i].iGroupTask; Len[ss]=FoFDataGet[i].iLen+oldlen; }while((ss = Next[ss]) >= 0); FoFDataGet[i].iLen+=oldlen; ss = Head[k]; links++; } //otherwise, let the task from which this imported particle came from //handle the change } //if not in local group, add the particle to the imported particles group else { pfof[Part[k].GetID()]=FoFDataGet[i].iGroup; Len[k]=FoFDataGet[i].iLen; mpi_foftask[Part[k].GetID()]=FoFDataGet[i].iGroupTask; FoFDataGet[i].iLen+=1; links++; } } } } delete[] nn; return links; } ///link particles belonging to the same group across mpi domains using comparison function Int_t MPILinkAcross(const Int_t nbodies, KDTree *&tree, Particle *Part, Int_t *&pfof, Int_tree_t *&Len, Int_tree_t *&Head, Int_tree_t *&Next, Double_t rdist2, FOFcompfunc &cmp, Double_t *params){ Int_t i,j,k; Int_t links=0; Int_t nbuffer[NProcs]; Int_t *nn=new Int_t[nbodies]; Int_t nt; for (i=0;iSearchCriterionTagged(PartDataGet[i], cmp, params, nn); for (Int_t ii=0;ii PartDataGet[i].GetPID()) { pfof[Part[k].GetID()]=mpi_maxgid+mpi_gidoffset;///some unique identifier based on this task mpi_gidoffset++;//increase unique identifier Len[k]=1; mpi_foftask[Part[k].GetID()]=FoFDataGet[i].iGroupTask; links++; } } else { if (pfof[Part[Head[k]].GetID()]>0) { if(pfof[Part[Head[k]].GetID()] > FoFDataGet[i].iGroup) { Int_t ss = Head[k]; do{ pfof[Part[ss].GetID()]=FoFDataGet[i].iGroup; mpi_foftask[Part[ss].GetID()]=FoFDataGet[i].iGroupTask; Len[ss]=FoFDataGet[i].iLen; }while((ss = Next[ss]) >= 0); ss = Head[k]; links++; } } else { pfof[Part[k].GetID()]=FoFDataGet[i].iGroup; Len[k]=FoFDataGet[i].iLen; mpi_foftask[Part[k].GetID()]=FoFDataGet[i].iGroupTask; links++; } } } } delete[] nn; return links; } ///link particles belonging to the same group across mpi domains given a type check function Int_t MPILinkAcross(const Int_t nbodies, KDTree *&tree, Particle *Part, Int_t *&pfof, Int_tree_t *&Len, Int_tree_t *&Head, Int_tree_t *&Next, Double_t rdist2, FOFcheckfunc &check, Double_t *params){ Int_t i,j,k; Int_t links=0; Int_t nbuffer[NProcs]; Int_t *nn=new Int_t[nbodies]; Int_t nt; bool iflag; Coordinate x; for (i=0;iSearchBallPosTagged(x, rdist2, nn); for (Int_t ii=0;ii0) { //only change if both particles are appropriate type and group ids indicate local needs to be exported if (!(check(Part[k],params)==0 && check(PartDataGet[i],params)==0)) continue; if(pfof[Part[Head[k]].GetID()] > FoFDataGet[i].iGroup) { Int_t ss = Head[k]; do{ pfof[Part[ss].GetID()]=FoFDataGet[i].iGroup; mpi_foftask[Part[ss].GetID()]=FoFDataGet[i].iGroupTask; Len[ss]=FoFDataGet[i].iLen; }while((ss = Next[ss]) >= 0); ss = Head[k]; links++; } } //if local particle not in a group and export is appropriate type, link else { if (check(PartDataGet[i],params)!=0) continue; pfof[Part[k].GetID()]=FoFDataGet[i].iGroup; Len[k]=FoFDataGet[i].iLen; mpi_foftask[Part[k].GetID()]=FoFDataGet[i].iGroupTask; links++; } } } delete[] nn; return links; } /*! Group particles belong to a group to a particular mpi thread so that locally easy to determine the maximum group size and reoder the group ids according to descending group size. return the new local number of particles */ Int_t MPIGroupExchange(Options &opt, const Int_t nbodies, Particle *Part, Int_t *&pfof){ Int_t i, j,nthreads,nexport,nimport,nlocal,n; Int_t nsend_local[NProcs],noffset_import[NProcs],noffset_export[NProcs],nbuffer[NProcs]; int recvTask; int maxchunksize=2147483648/NProcs/sizeof(fofid_in); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; MPI_Status status; MPI_Comm mpi_comm = MPI_COMM_WORLD; int task; FoFGroupDataExport=NULL; FoFGroupDataLocal=NULL; for (j=0;j0) FoFGroupDataExport=new fofid_in[nexport]; Int_t *storeval=new Int_t[nbodies]; Noldlocal=nbodies; //store type in temporary array, then use type to store what task particle belongs to and sort values for (i=0;i0) FoFGroupDataLocal=new fofid_in[nimport]; delete[] storeval; //determine offsets in arrays so that data contiguous with regards to processors for broadcasting //offset on transmitter end noffset_export[0]=0; for (j=1;j indices_gas_send; vector propbuff_gas_send; vector indices_star_send; vector propbuff_star_send; vector indices_bh_send; vector propbuff_bh_send; vector indices_extra_dm_send; vector propbuff_extra_dm_send; //now send the data. for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } Nlocal=nlocal; return nlocal; } /*! The baryon equivalent of \ref MPIGroupExchange. Here assume baryons are searched afterwards */ Int_t MPIBaryonGroupExchange(Options &opt, const Int_t nbodies, Particle *Part, Int_t *&pfof){ Int_t i, j,nthreads,nexport,nimport,nlocal,n; Int_t nsend_local[NProcs],noffset_import[NProcs],noffset_export[NProcs],nbuffer[NProcs]; int sendTask,recvTask; int maxchunksize=2147483648/NProcs/sizeof(fofid_in); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; MPI_Status status; int task; FoFGroupDataExport=NULL; FoFGroupDataLocal=NULL; for (j=0;j0) FoFGroupDataExport=new fofid_in[nexport]; else FoFGroupDataExport=new fofid_in[1]; Nmemlocalbaryon=Nlocalbaryon[0]; for (i=0;i0) FoFGroupDataLocal=new fofid_in[nimport]; } //otherwise use FoFGroupDataLocal to store all the necessary data else { FoFGroupDataLocal=new fofid_in[nlocal]; for (i=0;i 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } Nlocalbaryon[0]=nlocal; return nlocal; } ///Determine the local number of groups and their sizes (groups must be local to an mpi thread) Int_t MPICompileGroups(Options &opt, const Int_t nbodies, Particle *Part, Int_t *&pfof, Int_t minsize){ Int_t i,j,start,ngroups; Int_t *numingroup,*groupid,**plist; ngroups=0; for (i=Noldlocal-NExport;i=0) break; } //again resort to move untagged particles to the end. qsort(Part,nbodies,sizeof(Particle),IDCompare); for (i=nbodies-NExport; i< nbodies; i++) Part[i].SetID(0); //now adjust pfof and ids. for (i=0;inbodies) { for (i=Noldlocal;i0) delete[] plist[i]; delete[] plist; delete[] numingroup; //broadcast number of groups so that ids can be properly offset MPI_Allgather(&ngroups, 1, MPI_Int_t, mpi_ngroups, 1, MPI_Int_t, MPI_COMM_WORLD); if(FoFGroupDataLocal!=NULL) delete[] FoFGroupDataLocal; if(FoFGroupDataExport!=NULL) delete[] FoFGroupDataExport; return ngroups; } ///Determine which exported dm particle is closest in phase-space to a local baryon particle and assign that particle to the group of that dark matter particle if is closest particle Int_t MPISearchBaryons(const Int_t nbaryons, Particle *&Pbaryons, Int_t *&pfofbaryons, Int_t *numingroup, Double_t *localdist, Int_t nsearch, Double_t *param, Double_t *period) { Double_t D2, dval, rval; Coordinate x1; Particle p1; Int_t i, j, k, pindex,nexport=0; int tid; FOFcompfunc fofcmp=FOF6d; Int_t *nnID; Double_t *dist2; if (NImport>0) { //now dark matter particles associated with a group existing on another mpi domain are local and can be searched. KDTree *mpitree=new KDTree(PartDataGet,NImport,nsearch/2,mpitree->TPHYS,mpitree->KEPAN,100,0,0,0,period); if (nsearch>NImport) nsearch=NImport; #ifdef USEOPENMP #pragma omp parallel default(shared) \ private(i,j,k,tid,p1,pindex,x1,D2,dval,rval,nnID,dist2) { nnID=new Int_t[nsearch]; dist2=new Double_t[nsearch]; #pragma omp for reduction(+:nexport) #endif for (i=0;iFindNearestPos(x1, nnID, dist2,nsearch); if (dist2[0]D2) {dval=D2;pfofbaryons[i]=FoFDataGet[pindex].iGroup;rval=dist2[j];mpi_foftask[i]=FoFDataGet[pindex].iGroupTask;} } } } } nexport+=(mpi_foftask[i]!=ThisTask); } delete[] nnID; delete[] dist2; #ifdef USEOPENMP } #endif } return nexport; } Int_t MPIBaryonExchange(Options &opt, const Int_t nbaryons, Particle *Pbaryons, Int_t *pfofbaryons){ Int_t i, j,nthreads,nexport,nimport,nlocal,n; Int_t nsend_local[NProcs],noffset_import[NProcs],noffset_export[NProcs],nbuffer[NProcs]; int sendTask,recvTask; int maxchunksize=2147483648/NProcs/sizeof(fofid_in); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; MPI_Status status; int task; //initial containers to send info across threads FoFGroupDataExport=NULL; FoFGroupDataLocal=NULL; MPI_Barrier(MPI_COMM_WORLD); //first determine how big a local array is needed to store tagged baryonic particles for (j=0;j0) FoFGroupDataLocal=new fofid_in[nimport]; } //otherwise use FoFGroupDataLocal to store all the necessary data else { FoFGroupDataLocal=new fofid_in[nlocal]; for (i=0;i 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } Nlocalbaryon[0]=nlocal; return nlocal; } //@} /// \name FOF routines related to modifying group ids //@{ ///This alters the group ids by an offset determined by the number of groups on all previous mpi threads so that the group ///has a unique group id. Prior to this, group ids are determined locally. inline void MPIAdjustGroupIDs(const Int_t nbodies,Int_t *pfof) { Int_t noffset=0; for (int j=0;j0) pfof[i]+=noffset; } ///Collect FOF from all void MPICollectFOF(const Int_t nbodies, Int_t *&pfof){ Int_t sendTask,recvTask; MPI_Status status; //if using mpi, offset the pfof so that group ids are no unique, before just local to thread MPIAdjustGroupIDs(Nlocal,pfof); //now send the data from all MPI threads to thread zero //first must send how much data is local to a processor Int_t nsend_local[NProcs]; for (int i=0;iTask < (((struct fofdata_in *) b)->Task)) return -1; if(((struct fofdata_in *) a)->Task > (((struct fofdata_in *) b)->Task)) return +1; return 0; } ///comprasion function used to sort particles for export so that all particles being exported to the same processor are in a contiguous block and well ordered int nn_export_cmp(const void *a, const void *b) { if(((struct nndata_in *) a)->ToTask < (((struct nndata_in *) b)->ToTask)) return -1; if(((struct nndata_in *) a)->ToTask > (((struct nndata_in *) b)->ToTask)) return +1; return 0; } ///comprasion function used to sort grouped particles so that easy to determine total number of groups locally, size of groups, etc. int fof_id_cmp(const void *a, const void *b) { if(((struct fofid_in *) a)->iGroup > (((struct fofid_in *) b)->iGroup)) return -1; if(((struct fofid_in *) a)->iGroup < (((struct fofid_in *) b)->iGroup)) return +1; if(((struct fofid_in *) a)->p.GetType() < (((struct fofid_in *) b)->p.GetType())) return -1; if(((struct fofid_in *) a)->p.GetType() > (((struct fofid_in *) b)->p.GetType())) return +1; if(((struct fofid_in *) a)->p.GetID() < (((struct fofid_in *) b)->p.GetID())) return -1; if(((struct fofid_in *) a)->p.GetID() > (((struct fofid_in *) b)->p.GetID())) return +1; return 0; } //@} ///\name mesh MPI decomposition related functions //@{ vector MPIGetCellListInSearchUsingMesh(Options &opt, Double_t xsearch[3][2], bool ignorelocalcells) { int ixstart,iystart,izstart,ixend,iyend,izend,index; vector celllist; ixstart=floor(xsearch[0][0]*opt.icellwidth[0]); ixend=floor(xsearch[0][1]*opt.icellwidth[0]); iystart=floor(xsearch[1][0]*opt.icellwidth[1]); iyend=floor(xsearch[1][1]*opt.icellwidth[1]); izstart=floor(xsearch[2][0]*opt.icellwidth[2]); izend=floor(xsearch[2][1]*opt.icellwidth[2]); for (auto ix=ixstart;ix<=ixend;ix++){ for (auto iy=iystart;iy<=iyend;iy++){ for (auto iz=izstart;iz<=izend;iz++){ index=0; if (iz<0) index+=opt.numcellsperdim+iz; else if (iz>=opt.numcellsperdim) index+=iz-opt.numcellsperdim; else index+=iz; if (iy<0) index+=(opt.numcellsperdim+iy)*opt.numcellsperdim; else if (iy>=opt.numcellsperdim) index+=(iy-opt.numcellsperdim)*opt.numcellsperdim; else index+=iy*opt.numcellsperdim; if (ix<0) index+=(opt.numcellsperdim+ix)*opt.numcellsperdim*opt.numcellsperdim; else if (ix>=opt.numcellsperdim) index+=(ix-opt.numcellsperdim)*opt.numcellsperdim*opt.numcellsperdim; else index+=ix*opt.numcellsperdim*opt.numcellsperdim; if (ignorelocalcells && opt.cellnodeids[index]==ThisTask) continue; celllist.push_back(index); } } } return celllist; } //@} //@{ ///Find local particle that originated from foreign swift tasks #ifdef SWIFTINTERFACE void MPISwiftExchange(vector &Part){ Int_t nbodies = Part.size(); Int_t i, j, nexport=0,nimport=0; Int_t nsend_local[NProcs],noffset[NProcs],nbuffer[NProcs]; int sendTask,recvTask; int maxchunksize=2147483648/NProcs/sizeof(Particle); int nsend,nrecv,nsendchunks,nrecvchunks,numsendrecv; int sendoffset,recvoffset; int cursendchunksize,currecvchunksize; MPI_Status status; Particle *PartBufSend, *PartBufRecv; for (j=0;j0) { PartBufSend=new Particle[nexport]; for (i=0;i 0) PartBufRecv = new Particle[nimport]; //now send the data. ///\todo In determination of particle export, eventually need to place a check for the communication buffer so that if exported number ///is larger than the size of the buffer, iterate over the number exported //if either sending or receiving then run this process if (nexport>0||nimport>0) { for(j=0;j 0 || mpi_nsend[recvTask * NProcs + ThisTask] > 0) { //send info in loops to minimize memory footprint cursendchunksize=currecvchunksize=maxchunksize; nsendchunks=ceil(mpi_nsend[recvTask+ThisTask*NProcs]/(Double_t)maxchunksize); nrecvchunks=ceil(mpi_nsend[ThisTask+recvTask*NProcs]/(Double_t)maxchunksize); if (cursendchunksize>mpi_nsend[recvTask+ThisTask*NProcs]) { nsendchunks=1; cursendchunksize=mpi_nsend[recvTask+ThisTask*NProcs]; } if (currecvchunksize>mpi_nsend[ThisTask+recvTask*NProcs]) { nrecvchunks=1; currecvchunksize=mpi_nsend[ThisTask+recvTask*NProcs]; } numsendrecv=max(nsendchunks,nrecvchunks); sendoffset=recvoffset=0; for (auto ichunk=0;ichunkmpi_nsend[recvTask+ThisTask * NProcs]-sendoffset)cursendchunksize=mpi_nsend[recvTask+ThisTask * NProcs]-sendoffset; if (currecvchunksize>mpi_nsend[ThisTask+recvTask * NProcs]-sendoffset)currecvchunksize=mpi_nsend[ThisTask+recvTask * NProcs]-recvoffset; } } } } } MPI_Barrier(MPI_COMM_WORLD); Part.resize(nbodies-nexport+nimport); if (nexport > 0) delete[] PartBufSend; if (nimport > 0) { for (i=0;i