25 #include <TGeoManager.h>
27 #include "Math/MinimizerOptions.h"
28 #include "TBranchRef.h"
29 #include "TInterpreter.h"
30 #include "TMinuitMinimizer.h"
33 #include "TRestManager.h"
34 #include "TRestThread.h"
42 std::mutex mutex_write;
45 #ifdef TIME_MEASUREMENT
47 using namespace std::chrono;
51 high_resolution_clock::time_point tS, tE;
55 Long64_t bytesReadLast = 0;
56 Double_t progressLast = 0;
57 Int_t progressLastPrinted = 0;
58 vector<Long64_t> bytesAdded(ncalculated, 0);
59 vector<Double_t> progAdded(ncalculated, 0);
60 int positionCalculated = 0;
61 int printInterval = 200000;
62 int inputTreeEntries = 0;
66 TRestProcessRunner::TRestProcessRunner() { Initialize(); }
68 TRestProcessRunner::~TRestProcessRunner() {}
75 fInputEvent =
nullptr;
76 fOutputEvent =
nullptr;
78 fAnalysisTree =
nullptr;
79 fOutputDataFile =
nullptr;
80 fOutputDataFileName =
"";
91 fProcStatus = kNormal;
92 fFileSplitSize = 10000000000LL;
97 fValidateObservables =
false;
98 fSortOutputEvents =
true;
99 fInputAnalysisStorage =
true;
100 fInputEventStorage =
true;
101 fOutputEventStorage =
true;
102 fOutputAnalysisStorage =
true;
115 RESTInfo << RESTendl;
116 if (fHostmgr !=
nullptr) {
117 fRunInfo = fHostmgr->GetRunInfo();
118 if (fRunInfo ==
nullptr) {
119 RESTError <<
"File IO has not been specified, " << RESTendl;
120 RESTError <<
"please make sure the \"TRestFiles\" section is ahead of the "
121 "\"TRestProcessRunner\" section"
126 RESTError <<
"manager not initialized!" << RESTendl;
135 if (!fRunInfo->GetFileProcess() && fRunInfo->GetEntries() == 0) {
136 RESTError <<
"TRestProcessRunner::BeginOfInit. The input file is a valid REST file but entries are 0!"
146 if (lastEntry - fFirstEntry > 0 && fEventsToProcess == 0) {
147 fEventsToProcess = lastEntry - fFirstEntry;
148 }
else if (fEventsToProcess > 0 && lastEntry - fFirstEntry > 0 &&
149 lastEntry - fFirstEntry != fEventsToProcess) {
150 RESTWarning <<
"Conflict number of events to process!" << RESTendl;
151 }
else if (fEventsToProcess > 0 && lastEntry - fFirstEntry == 0) {
152 lastEntry = fFirstEntry + fEventsToProcess;
153 }
else if (fEventsToProcess == 0 && fFirstEntry == 0 && lastEntry == 0) {
154 fEventsToProcess = REST_MAXIMUM_EVENTS;
155 lastEntry = REST_MAXIMUM_EVENTS;
157 RESTWarning <<
"error setting of event number" << RESTendl;
158 fEventsToProcess = fEventsToProcess > 0 ? fEventsToProcess : REST_MAXIMUM_EVENTS;
159 fFirstEntry = fFirstEntry > 0 ? fFirstEntry : 0;
160 lastEntry = lastEntry == fFirstEntry + fEventsToProcess ? lastEntry : REST_MAXIMUM_EVENTS;
162 fRunInfo->SetCurrentEntry(fFirstEntry);
164 if (fFileSplitSize < 50000000LL || fFileSplitSize > 100000000000LL) {
165 RESTWarning <<
"automatic file splitting size cannot < 10MB or > 100GB, setting to default (10GB)."
167 fFileSplitSize = 10000000000LL;
173 fProcStatus = kIgnore;
174 if (!fOutputAnalysisStorage) {
175 RESTError <<
"output analysis must be turned on to process data!" << RESTendl;
188 if (fThreadNumber < 1) {
191 if (fThreadNumber > 15) {
195 for (
int i = 0; i < fThreadNumber; i++) {
197 t->SetProcessRunner(
this);
198 t->SetVerboseLevel(fVerboseLevel);
200 t->SetCompressionLevel(fFileCompression);
201 fThreads.push_back(t);
214 if (keydeclare ==
"addProcess") {
215 string active = GetParameter(
"value", e,
"");
216 if (active !=
"" &&
ToUpper(active) !=
"ON")
return 0;
218 string processName = GetParameter(
"name", e,
"");
220 string processType = GetParameter(
"type", e,
"");
222 if (processType ==
"") {
223 RESTWarning <<
"Bad expression of addProcess" << RESTendl;
225 }
else if (processName ==
"") {
226 RESTWarning <<
"Event process " << processType <<
" has no name, it will be skipped" << RESTendl;
230 RESTInfo <<
"adding process " << processType <<
" \"" << processName <<
"\"" << RESTendl;
231 vector<TRestEventProcess*> processes;
232 for (
int i = 0; i < fThreadNumber; i++) {
236 fRunInfo->SetExtProcess(p);
241 fUsePauseMenu =
false;
242 fProcStatus = kIgnore;
243 if (fThreadNumber > 1) {
244 RESTInfo <<
"multi-threading is disabled due to process \"" << p->GetName() <<
"\""
246 RESTInfo <<
"This process is in debug mode or is single thread only" << RESTendl;
247 for (i = fThreadNumber; i > 1; i--) {
249 fThreads.erase(fThreads.end() - 1);
254 processes.push_back(p);
260 for (
int i = 0; i < fThreadNumber; i++) {
262 for (
int j = 0; j < fThreadNumber; j++) {
265 fThreads[i]->AddProcess(p);
269 RESTDebug <<
"process \"" << processType <<
"\" has been added!" << RESTendl;
283 RESTDebug <<
"Validating process chain..." << RESTendl;
285 if (fRunInfo->GetFileProcess() !=
nullptr) {
286 fInputEvent = fRunInfo->GetFileProcess()->GetOutputEvent();
288 if (fThreads[0]->GetProcessnum() > 0 &&
289 fThreads[0]->GetProcess(0)->GetInputEvent().address !=
nullptr) {
290 string name = fThreads[0]->GetProcess(0)->GetInputEvent().type;
293 fRunInfo->SetInputEvent(a);
295 fInputEvent = fRunInfo->GetInputEvent();
297 if (fInputEvent ==
nullptr) {
298 RESTError <<
"Cannot determine input event, validating process chain failed!" << RESTendl;
302 if (fProcessNumber > 0) {
303 if (fThreads[0]->ValidateChain(fInputEvent) == -1) exit(1);
315 if (fRunInfo->GetFileProcess() !=
nullptr) {
316 fProcessInfo[
"FirstProcess"] = fRunInfo->GetFileProcess()->GetName();
318 if (fProcessNumber > 0) fProcessInfo[
"FirstProcess"] = fThreads[0]->GetProcess(0)->GetName();
320 int n = fProcessNumber;
321 fProcessInfo[
"LastProcess"] =
322 (n == 0 ? fProcessInfo[
"FirstProcess"] : fThreads[0]->GetProcess(n - 1)->GetName());
323 fProcessInfo[
"ProcNumber"] = ToString(n);
343 RESTDebug <<
"Creating output File " << fRunInfo->GetOutputFileName() << RESTendl;
345 TString filename = fRunInfo->FormFormat(fRunInfo->GetOutputFileName());
346 fOutputDataFileName = filename;
347 fOutputDataFile =
new TFile(filename,
"recreate");
351 if (!fOutputDataFile->IsOpen()) {
352 RESTError <<
"Failed to create output file: " << fOutputDataFile->GetName() << RESTendl;
355 RESTInfo << RESTendl;
356 RESTInfo <<
"TRestProcessRunner : preparing threads..." << RESTendl;
357 fRunInfo->ResetEntry();
358 fRunInfo->SetCurrentEntry(fFirstEntry);
359 for (
int i = 0; i < fThreadNumber; i++) {
360 fThreads[i]->PrepareToProcess(&fInputAnalysisStorage);
364 if (fRunInfo->GetFileProcess() !=
nullptr) {
365 RESTEssential << this->ClassName() <<
": 1 + " << fProcessNumber <<
" processes loaded, "
366 << fThreadNumber <<
" threads prepared!" << RESTendl;
368 RESTEssential << this->ClassName() <<
": " << fProcessNumber <<
" processes loaded, " << fThreadNumber
369 <<
" threads prepared!" << RESTendl;
372 if (fRunInfo->GetFileProcess() !=
nullptr) fRunInfo->GetFileProcess()->PrintMetadata();
374 for (
int i = 0; i < fProcessNumber; i++) {
375 fThreads[0]->GetProcess(i)->PrintMetadata();
378 if (fRunInfo->GetFileProcess() !=
nullptr) {
379 RESTcout <<
"(external) " << fRunInfo->GetFileProcess()->ClassName() <<
" : "
380 << fRunInfo->GetFileProcess()->GetName() << RESTendl;
382 for (
int i = 0; i < fProcessNumber; i++) {
383 RESTcout <<
"++ " << fThreads[0]->GetProcess(i)->ClassName() <<
" : "
384 << fThreads[0]->GetProcess(i)->GetName() << RESTendl;
387 RESTcout <<
"=" << RESTendl;
390 fOutputDataFile->cd();
391 TTree* tree = fThreads[0]->GetEventTree();
392 if (tree !=
nullptr) {
393 fEventTree = (TTree*)tree->Clone();
394 fEventTree->SetName(
"EventTree");
395 fEventTree->SetTitle(
"REST Event Tree");
396 fEventTree->SetDirectory(fOutputDataFile);
397 TTree::SetMaxTreeSize(100000000000LL > fFileSplitSize * 2
399 : fFileSplitSize * 2);
401 fEventTree =
nullptr;
405 fAnalysisTree =
new TRestAnalysisTree(
"AnalysisTree",
"REST Process Analysis Tree");
406 fAnalysisTree->SetDirectory(fOutputDataFile);
407 TRestAnalysisTree::SetMaxTreeSize(100000000000LL > fFileSplitSize * 2 ? 100000000000LL
408 : fFileSplitSize * 2);
410 tree = fThreads[0]->GetAnalysisTree();
411 if (tree !=
nullptr) {
412 fNBranches = tree->GetNbranches();
414 RESTError <<
"Threads are not initialized! No AnalysisTree!" << RESTendl;
418 fOutputDataFile->cd();
419 if (fEventTree !=
nullptr) {
420 fEventTree->Write(
nullptr, kOverwrite);
422 if (fAnalysisTree !=
nullptr) {
423 fAnalysisTree->Write(
nullptr, kOverwrite);
427 this->ResetRunTimes();
428 fProcessedEvents = 0;
429 fRunInfo->ResetEntry();
430 fRunInfo->SetCurrentEntry(fFirstEntry);
431 inputTreeEntries = fRunInfo->GetEntries();
435 ROOT::Math::MinimizerOptions::SetDefaultMinimizer(
"Minuit");
436 TMinuitMinimizer::UseStaticMinuit(
false);
437 if (gGlobalMutex ==
nullptr) {
438 gGlobalMutex =
new TMutex(
true);
439 gROOTMutex = gGlobalMutex;
440 gInterpreterMutex = gGlobalMutex;
443 #ifdef TIME_MEASUREMENT
444 high_resolution_clock::time_point t3 = high_resolution_clock::now();
448 RESTcout << this->ClassName() <<
": Starting the Process.." << RESTendl;
449 for (
int i = 0; i < fThreadNumber; i++) {
450 fThreads[i]->StartThread();
453 while (fProcStatus == kPause ||
454 (fRunInfo->GetInputEvent() !=
nullptr && fEventsToProcess > fProcessedEvents)) {
455 PrintProcessedEvents(100);
457 if (fProcStatus == kNormal && Console::kbhit())
459 int a = Console::ReadKey();
462 fProcStatus = kPause;
465 "| |", TRestStringOutput::REST_Display_Orientation::kMiddle);
466 Console::ClearLinesAfterCursor();
468 RESTLog <<
"-" << RESTendl;
469 RESTLog <<
"PROCESS PAUSED!" << RESTendl;
470 RESTLog <<
"-" << RESTendl;
471 RESTLog <<
" " << RESTendl;
475 if (fProcStatus == kPause) {
478 if (fProcStatus == kStopping) {
482 usleep(printInterval);
489 if (fProcStatus != kIgnore && Console::kbhit())
490 while (getchar() !=
'\n')
493 RESTEssential <<
"Waiting for processes to finish ..." << RESTendl;
497 bool finish = fThreads[0]->Finished();
498 for (
int i = 1; i < fThreadNumber; i++) {
499 finish = finish && fThreads[i]->Finished();
505 fAnalysisTree->GetEntry(fAnalysisTree->GetEntries() - 1);
507 for (
int i = 0; i < fThreadNumber; i++) {
508 fThreads[i]->EndProcess();
510 if (fRunInfo->GetFileProcess()) {
511 fRunInfo->GetFileProcess()->EndProcess();
513 fProcStatus = kFinished;
515 #ifdef TIME_MEASUREMENT
516 high_resolution_clock::time_point t4 = high_resolution_clock::now();
517 deltaTime = (int)duration_cast<microseconds>(t4 - t3).count();
522 gGlobalMutex =
nullptr;
523 gROOTMutex =
nullptr;
524 gInterpreterMutex =
nullptr;
526 RESTcout << this->ClassName() <<
": " << fProcessedEvents <<
" processed events" << RESTendl;
528 #ifdef TIME_MEASUREMENT
529 RESTInfo <<
"Total processing time : " << ((Double_t)deltaTime) / 1000. <<
" ms" << RESTendl;
530 RESTInfo <<
"Average read time from disk (per event) : "
531 << ((Double_t)readTime) / fProcessedEvents / 1000. <<
" ms" << RESTendl;
532 RESTInfo <<
"Average process time (per event) : "
533 << ((Double_t)(deltaTime - readTime - writeTime)) / fProcessedEvents / 1000. <<
" ms"
535 RESTInfo <<
"Average write time to disk (per event) : "
536 << ((Double_t)writeTime) / fProcessedEvents / 1000. <<
" ms" << RESTendl;
537 RESTInfo <<
"=" << RESTendl;
540 if (fRunInfo->GetOutputFileName() !=
"/dev/null") {
556 TRestStringOutput::REST_Display_Orientation::kMiddle);
557 Console::ClearLinesAfterCursor();
559 RESTLog <<
"--------------MENU--------------" << RESTendl;
560 RESTLog <<
"\"v\" : change the verbose level" << RESTendl;
561 RESTLog <<
"\"n\" : push foward one event, then pause" << RESTendl;
562 RESTLog <<
"\"l\" : print the latest processed event" << RESTendl;
563 RESTLog <<
"\"d\" : detach the current process" << RESTendl;
564 RESTLog <<
"\"q\" : stop and quit the process" << RESTendl;
565 RESTLog <<
"press \"p\" to continue process..." << RESTendl;
566 RESTLog <<
"-" << RESTendl;
572 Console::ClearCurrentLine();
573 int b = Console::ReadKey();
576 Console::CursorUp(infobar);
577 RESTLog.setcolor(COLOR_BOLDGREEN);
578 RESTLog <<
"Changing verbose level for all the processes..." << RESTendl;
579 RESTLog.setcolor(COLOR_BOLDWHITE);
580 Console::CursorDown(1);
581 Console::ClearLinesAfterCursor();
582 RESTLog <<
"type \"0\"/\"s\" to set level silent" << RESTendl;
583 RESTLog <<
"type \"1\"/\"e\" to set level essential" << RESTendl;
584 RESTLog <<
"type \"2\"/\"i\" to set level info" << RESTendl;
585 RESTLog <<
"type \"3\"/\"d\" to set level debug" << RESTendl;
586 RESTLog <<
"type \"4\"/\"x\" to set level extreme" << RESTendl;
587 RESTLog <<
"type other to return the pause menu" << RESTendl;
588 RESTLog <<
"-" << RESTendl;
592 Console::CursorUp(1);
593 int c = Console::Read();
595 while (Console::Read() !=
'\n')
598 if (c ==
'0' || c ==
's') {
600 }
else if (c ==
'1' || c ==
'e') {
602 }
else if (c ==
'2' || c ==
'i') {
604 }
else if (c ==
'3' || c ==
'd') {
606 }
else if (c ==
'4' || c ==
'x') {
609 Console::CursorUp(infobar);
610 RESTLog.setcolor(COLOR_BOLDYELLOW);
611 RESTLog <<
"Verbose level not set!" << RESTendl;
612 RESTLog.setcolor(COLOR_BOLDWHITE);
617 this->SetVerboseLevel(l);
618 for (
int i = 0; i < fThreadNumber; i++) {
619 fThreads[i]->SetVerboseLevel(l);
620 for (
int j = 0; j < fThreads[i]->GetProcessnum(); j++) {
621 fThreads[i]->GetProcess(j)->SetVerboseLevel(l);
624 Console::CursorUp(infobar);
625 RESTLog.setcolor(COLOR_BOLDGREEN);
626 RESTLog <<
"Verbose level has been set to " << ToString(
static_cast<int>(l)) <<
"!"
628 RESTLog.setcolor(COLOR_BOLDWHITE);
631 Console::ClearLinesAfterCursor();
633 }
else if (b ==
'd') {
634 Console::CursorUp(infobar);
635 RESTLog.setcolor(COLOR_BOLDGREEN);
636 RESTLog <<
"Detaching restManager to backend" << RESTendl;
637 RESTLog.setcolor(COLOR_BOLDWHITE);
638 Console::CursorDown(1);
639 Console::ClearLinesAfterCursor();
640 RESTLog <<
"type filename for output redirect" << RESTendl;
641 RESTLog <<
"leave blank to redirect to /dev/null" << RESTendl;
642 RESTLog <<
" " << RESTendl;
643 RESTLog <<
" " << RESTendl;
644 RESTLog <<
" " << RESTendl;
645 RESTLog <<
" " << RESTendl;
646 RESTLog <<
"-" << RESTendl;
652 Console::CursorUp(1);
653 file = Console::ReadLine();
654 if (file ==
"") file =
"/dev/null";
657 Console::CursorUp(infobar);
658 RESTLog.setcolor(COLOR_BOLDYELLOW);
659 RESTLog <<
"file not writeable!" << RESTendl;
660 RESTLog.setcolor(COLOR_BOLDWHITE);
665 Console::CursorUp(infobar);
666 RESTLog.setcolor(COLOR_BOLDYELLOW);
667 RESTLog <<
"path not writeable!" << RESTendl;
668 RESTLog.setcolor(COLOR_BOLDWHITE);
674 RESTWarning <<
"fork not available on windows!" << RESTendl;
679 perror(
"fork error:");
684 RESTcout <<
"Child process created! pid: " << getpid() << RESTendl;
685 RESTInfo <<
"Restarting threads" << RESTendl;
686 mutex_write.unlock();
687 for (
int i = 0; i < fThreadNumber; i++) {
688 fThreads[i]->StartThread();
690 RESTInfo <<
"Re-directing output to " << file << RESTendl;
691 FILE* f = freopen(file.c_str(),
"w", stdout);
692 if (f ==
nullptr) RESTWarning <<
"Couldnt redirect output for file: " << file << RESTendl;
693 REST_Display_CompatibilityMode =
true;
699 fProcStatus = kNormal;
700 RESTInfo <<
"Continue processing..." << RESTendl;
705 }
else if (b ==
'n') {
708 }
else if (b ==
'l') {
710 fOutputEvent->PrintEvent();
712 }
else if (b ==
'q') {
713 fProcStatus = kStopping;
715 }
else if (b ==
'p') {
716 Console::CursorUp(menuupper);
717 Console::ClearLinesAfterCursor();
719 fProcStatus = kIgnore;
721 fProcStatus = kNormal;
724 }
else if (b ==
'\n') {
727 Console::CursorUp(infobar);
728 RESTLog.setcolor(COLOR_BOLDYELLOW);
729 RESTLog <<
"Invailed option \"" << (char)b <<
"\" (key value: " << b <<
") !" << RESTendl;
730 RESTLog.setcolor(COLOR_BOLDWHITE);
731 Console::CursorDown(infobar - 1);
761 while (fProcStatus == kPause) {
764 #ifdef TIME_MEASUREMENT
765 high_resolution_clock::time_point t1 = high_resolution_clock::now();
768 if (fProcessedEvents >= fEventsToProcess || targetevt ==
nullptr || fProcStatus == kStopping) {
771 if (!fInputAnalysisStorage) {
772 n = fRunInfo->GetNextEvent(targetevt,
nullptr);
774 n = fRunInfo->GetNextEvent(targetevt, targettree);
778 #ifdef TIME_MEASUREMENT
779 high_resolution_clock::time_point t2 = high_resolution_clock::now();
780 readTime += (int)duration_cast<microseconds>(t2 - t1).count();
782 mutex_write.unlock();
793 if (fSortOutputEvents) {
797 bool smallest =
true;
799 if (otherT->Finished()) {
802 if (t->GetThreadId() == otherT->GetThreadId()) {
805 int id1 = t->GetInputEvent()->GetID();
806 int id2 = otherT->GetInputEvent()->GetID();
809 }
else if (id1 == id2) {
810 cout <<
"warning! Two events with same event id! output events maybe dis-ordered!"
822 #ifdef TIME_MEASUREMENT
823 high_resolution_clock::time_point t5 = high_resolution_clock::now();
825 if (t->GetOutputEvent() !=
nullptr) {
826 fOutputEvent = t->GetOutputEvent();
829 TObjArray* branchesT;
830 TObjArray* branchesL;
832 if (fAnalysisTree !=
nullptr) {
843 fAnalysisTree->SetEventInfo(fOutputEvent);
844 for (
int n = 0; n < remotetree->GetNumberOfObservables(); n++) {
845 fAnalysisTree->SetObservable(n, remotetree->GetObservable(n));
848 fAnalysisTree->Fill();
851 if (fEventTree !=
nullptr) {
853 branchesT = t->GetEventTree()->GetListOfBranches();
854 branchesL = fEventTree->GetListOfBranches();
855 for (
int i = 0; i < branchesT->GetLast() + 1; i++) {
856 TBranch* branchT = (TBranch*)branchesT->UncheckedAt(i);
857 TBranch* branchL = (TBranch*)branchesL->UncheckedAt(i);
858 branchL->SetAddress(branchT->GetAddress());
867 if (fOutputDataFile->GetEND() > fFileSplitSize) {
868 if (fAnalysisTree->GetDirectory() == (TDirectory*)fOutputDataFile) {
870 cout <<
"TRestProcessRunner: file size reaches limit (" << fFileSplitSize
871 <<
" bytes), switching to new file with index " << fNFilesSplit << endl;
875 for (
auto th : fThreads) {
876 for (
int j = 0; j < fProcessNumber; j++) {
877 auto proc = th->GetProcess(j);
878 proc->NotifyAnalysisTreeReset();
882 fAnalysisTree->AutoSave();
883 fAnalysisTree->Reset();
885 if (fEventTree !=
nullptr) {
886 fEventTree->AutoSave();
891 fRunInfo->SetNFilesSplit(fNFilesSplit);
892 if (fOutputDataFile->GetName() != fOutputDataFileName) {
893 auto Mainfile = std::unique_ptr<TFile>{TFile::Open(fOutputDataFileName,
"update")};
894 WriteProcessesMetadata();
895 Mainfile->Write(0, TObject::kOverwrite);
898 WriteProcessesMetadata();
901 TFile* newfile =
new TFile(fOutputDataFileName +
"." + ToString(fNFilesSplit),
"recreate");
903 TBranch* branch =
nullptr;
904 fAnalysisTree->SetDirectory(newfile);
905 TIter nextb1(fAnalysisTree->GetListOfBranches());
906 while ((branch = (TBranch*)nextb1())) {
907 branch->SetFile(newfile);
909 if (fAnalysisTree->GetBranchRef()) {
910 fAnalysisTree->GetBranchRef()->SetFile(newfile);
913 if (fEventTree !=
nullptr) {
914 fEventTree->SetDirectory(newfile);
915 TIter nextb2(fEventTree->GetListOfBranches());
916 while ((branch = (TBranch*)nextb2())) {
917 branch->SetFile(newfile);
919 if (fEventTree->GetBranchRef()) {
920 fEventTree->GetBranchRef()->SetFile(newfile);
924 fOutputDataFile->Write(
nullptr, TObject::kOverwrite);
925 fOutputDataFile->Close();
926 delete fOutputDataFile;
927 fOutputDataFile = newfile;
929 RESTError <<
"internal error!" << RESTendl;
933 #ifdef TIME_MEASUREMENT
934 high_resolution_clock::time_point t6 = high_resolution_clock::now();
935 writeTime += (int)duration_cast<microseconds>(t6 - t5).count();
938 if (fProcStatus == kStep) {
939 fProcStatus = kPause;
940 cout <<
"Processed events:" << fProcessedEvents << endl;
943 mutex_write.unlock();
952 RESTEssential <<
"Configuring output file, writing metadata and tree objects" << RESTendl;
953 #ifdef TIME_MEASUREMENT
954 fProcessInfo[
"ProcessTime"] = ToString(deltaTime) +
"ms";
957 fOutputDataFile->cd();
958 if (fEventTree !=
nullptr) {
959 fEventTree->Write(
nullptr, kOverwrite);
961 if (fAnalysisTree !=
nullptr) {
962 fAnalysisTree->Write(
nullptr, kOverwrite);
966 fOutputDataFile->Write();
967 fOutputDataFile->Close();
968 delete fOutputDataFile;
974 fRunInfo->SetNFilesSplit(fNFilesSplit);
978 WriteProcessesMetadata();
981 if (gGeoManager !=
nullptr) {
982 gGeoManager->Write(
"Geometry", TObject::kOverwrite);
990 fOutputDataFile->cd();
992 this->Write(
nullptr, TObject::kWriteDelete);
994 if (fRunInfo->GetFileProcess() !=
nullptr) {
995 fRunInfo->GetFileProcess()->Write(
nullptr, kOverwrite);
997 for (
int i = 0; i < fProcessNumber; i++) {
998 fThreads[0]->GetProcess(i)->Write(
nullptr, kOverwrite);
1007 RESTEssential <<
"Merging thread files together" << RESTendl;
1012 vector<string> files_to_merge;
1013 for (
int i = 0; i < fThreadNumber; i++) {
1014 TFile* f = fThreads[i]->GetOutputFile();
1016 f->Write(
nullptr, TObject::kOverwrite);
1019 files_to_merge.push_back(f->GetName());
1023 fOutputDataFile = fRunInfo->MergeToOutputFile(files_to_merge, (
string)fOutputDataFileName);
1025 RESTError <<
"Output file: " << fOutputDataFileName <<
" is lost?" << RESTendl;
1034 #ifdef TIME_MEASUREMENT
1039 time_t tt = time(
nullptr);
1051 if (pc ==
nullptr)
return nullptr;
1056 pc->SetObservableValidation(fValidateObservables);
1062 double TRestProcessRunner::GetReadingSpeed() {
1064 for (
auto& n : bytesAdded) bytes += n;
1065 double speedbyte = bytes / (double)printInterval * (
double)1000000 / ncalculated;
1074 if (fProcStatus == kNormal || fProcStatus == kIgnore) {
1083 double speedbyte = GetReadingSpeed();
1086 for (
auto& n : progAdded) progsum += n;
1087 double progspeed = progsum / ncalculated / printInterval * 1000000;
1090 if (fRunInfo->GetFeminosDaqTotalEvents() > 0) {
1091 prog = fProcessedEvents / (double)fRunInfo->GetFeminosDaqTotalEvents() * 100;
1092 }
else if (fEventsToProcess == REST_MAXIMUM_EVENTS && fRunInfo->GetFileProcess() !=
nullptr)
1095 prog = fRunInfo->GetBytesRead() / (double)fRunInfo->GetTotalBytes() * 100;
1096 }
else if (fRunInfo->GetFileProcess() !=
nullptr)
1099 prog = fProcessedEvents / (double)fEventsToProcess * 100;
1100 }
else if (fEventsToProcess == REST_MAXIMUM_EVENTS)
1103 prog = fRunInfo->GetCurrentEntry() / (double)inputTreeEntries * 100;
1105 prog = fProcessedEvents / (double)fEventsToProcess * 100;
1108 char* buffer =
new char[500]();
1109 if (fRunInfo->GetFileProcess() !=
nullptr && speedbyte > 0) {
1110 sprintf(buffer,
"%d Events (%.1fMB/s), ", fProcessedEvents, speedbyte / 1024 / 1024);
1112 sprintf(buffer,
"%d Events, ", fProcessedEvents);
1117 if (fProcStatus == kNormal) {
1118 sprintf(buffer,
"%.1f min ETA, (Pause: \"p\") ", (100 - progressLast) / progspeed / 60);
1120 sprintf(buffer,
"%.1f min ETA, (Pause Disabled) ", (100 - progressLast) / progspeed / 60);
1125 if (REST_Display_CompatibilityMode) {
1128 barlength = Console::GetWidth() - s1.size() - s2.size() - 9;
1130 sprintf(buffer, (
"%.1f%[" + MakeProgressBar(prog, barlength) +
"]").c_str(), prog);
1135 if (REST_Display_CompatibilityMode) {
1136 if (((
int)prog) != progressLastPrinted) {
1137 cout << s1 << s2 << s3 << endl;
1138 progressLastPrinted = (int)prog;
1141 printf(
"%s", (s1 + s2 + s3 +
"\r").c_str());
1145 bytesAdded[positionCalculated] = fRunInfo->GetBytesRead() - bytesReadLast;
1146 bytesReadLast = fRunInfo->GetBytesRead();
1147 progAdded[positionCalculated] = prog - progressLast;
1148 progressLast = prog;
1150 positionCalculated++;
1151 if (positionCalculated >= ncalculated) positionCalculated -= ncalculated;
1159 string progressbar(length,
'-');
1160 int n = (double)progress100 / 100 * length;
1162 if (n > length + 1) n = length + 1;
1163 for (
int i = 0; i < n; i++) {
1164 progressbar[i] =
'=';
1166 if (n < length + 1) progressbar[n] =
'>';
1170 TRestEvent* TRestProcessRunner::GetInputEvent() {
return fRunInfo->GetInputEvent(); }
1172 TRestAnalysisTree* TRestProcessRunner::GetInputAnalysisTree() {
return fRunInfo->GetAnalysisTree(); }
1178 if (fProcStatus == kNormal)
1180 else if (fProcStatus == kStopping)
1181 status =
"Terminated";
1185 RESTMetadata <<
"Status : " << status << RESTendl;
1186 RESTMetadata <<
"Processed events : " << fProcessedEvents << RESTendl;
1187 RESTMetadata <<
"Analysis tree branches : " << fNBranches << RESTendl;
1188 RESTMetadata <<
"Thread number : " << fThreadNumber << RESTendl;
1189 RESTMetadata <<
"Processes in each thread : " << fProcessNumber << RESTendl;
1190 RESTMetadata <<
"File auto split size: " << fFileSplitSize << RESTendl;
1191 RESTMetadata <<
"File compression level: " << fFileCompression << RESTendl;
1192 RESTMetadata <<
"******************************************" << RESTendl;
1193 RESTMetadata << RESTendl;
1194 RESTMetadata << RESTendl;
REST core data-saving helper based on TTree.
A base class for any REST event process.
void SetRunInfo(TRestRun *r)
Set TRestRun for this process.
Bool_t singleThreadOnly() const
Return whether this process is single std::thread only.
Bool_t isExternal() const
Return whether this process is external process.
void SetParallelProcess(TRestEventProcess *p)
Add parallel process to this process.
A base class for any REST event.
virtual void Initialize()=0
Running the processes efficiently with fantastic display.
void ReadProcInfo()
Create a process info list which used called by TRestRun::FormFormat().
void WriteProcessesMetadata()
Write process metadata to fOutputDataFile.
void PrintProcessedEvents(Int_t rateE)
Print number of events processed, file read speed, ETA and a progress bar.
void ResetRunTimes()
Reset running time count to 0.
void BeginOfInit()
Reads information from rml config file.
Int_t ReadConfig(const std::string &keydeclare, TiXmlElement *e)
method to deal with iterated child elements
void MergeOutputFile()
Calls TRestRun::MergeOutputFile() to merge the main file with process's tmp file.
void RunProcess()
The main executer of event process.
void EndOfInit()
Ending of the startup procedure.
TRestEventProcess * InstantiateProcess(TString type, TiXmlElement *ele)
InstantiateProcess in sequential start up.
void PauseMenu()
A pause menu providing some functions during the process.
void Initialize() override
REST run class.
std::string MakeProgressBar(int progress100, int length=100)
Make a string of progress bar with given length and percentage.
Int_t GetNextevtFunc(TRestEvent *targetevt, TRestAnalysisTree *targettree)
Get next event and copy it to the address of targetevt.
void PrintMetadata() override
Implemented it in the derived metadata class to print out specific metadata information.
void FillThreadEventFunc(TRestThread *t)
Calling back the FillEvent() method in TRestThread.
void ConfigOutputFile()
Forming an output file.
REST_Verbose_Level
Enumerate of verbose level, containing five levels.
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Extreme
show everything
@ REST_Info
+show most of the information for each steps
@ REST_Debug
+show the defined debug messages
@ REST_Silent
show minimum information of the software, as well as error messages
Threaded worker of a process chain.
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
std::vector< std::string > Split(std::string in, std::string separator, bool allowBlankString=false, bool removeWhiteSpaces=false, int startPos=-1)
Split the input string according to the given separator. Returning a vector of fragments.
std::string ToUpper(std::string in)
Convert string to its upper case. Alternative of TString::ToUpper.
Int_t StringToInteger(std::string in)
Gets an integer from a string.
std::string ToDateTimeString(time_t time)
Format time_t into string.