REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
TRestProcessRunner.cxx
1 
25 #include <TGeoManager.h>
26 
27 #include "Math/MinimizerOptions.h"
28 #include "TBranchRef.h"
29 #include "TInterpreter.h"
30 #include "TMinuitMinimizer.h"
31 #include "TMutex.h"
32 #include "TROOT.h"
33 #include "TRestManager.h"
34 #include "TRestThread.h"
35 
36 #ifdef WIN32
37 #include <io.h>
38 #else
39 #include "unistd.h"
40 #endif // !WIN32
41 
42 std::mutex mutex_write;
43 
44 using namespace std;
45 #ifdef TIME_MEASUREMENT
46 #include <chrono>
47 using namespace std::chrono;
48 int deltaTime;
49 int writeTime;
50 int readTime;
51 high_resolution_clock::time_point tS, tE;
52 #endif
53 
54 int ncalculated = 10;
55 Long64_t bytesReadLast = 0;
56 Double_t progressLast = 0;
57 Int_t progressLastPrinted = 0;
58 vector<Long64_t> bytesAdded(ncalculated, 0);
59 vector<Double_t> progAdded(ncalculated, 0);
60 int positionCalculated = 0;
61 int printInterval = 200000; // 0.2s
62 int inputTreeEntries = 0;
63 
64 ClassImp(TRestProcessRunner);
65 
66 TRestProcessRunner::TRestProcessRunner() { Initialize(); }
67 
68 TRestProcessRunner::~TRestProcessRunner() {}
69 
74  fRunInfo = nullptr;
75  fInputEvent = nullptr;
76  fOutputEvent = nullptr;
77  fEventTree = nullptr;
78  fAnalysisTree = nullptr;
79  fOutputDataFile = nullptr;
80  fOutputDataFileName = "";
81 
82  fThreads.clear();
83  fProcessInfo.clear();
84 
85  fThreadNumber = 0;
86  fFirstEntry = 0;
87  fNFilesSplit = 0;
88  fEventsToProcess = 0;
89  fProcessedEvents = 0;
90  fProcessNumber = 0;
91  fProcStatus = kNormal;
92  fFileSplitSize = 10000000000LL; // 10GB maximum
93  fFileCompression = 2; // default compression level
94 
95  fUseTestRun = true;
96  fUsePauseMenu = true;
97  fValidateObservables = false;
98  fSortOutputEvents = true;
99  fInputAnalysisStorage = true;
100  fInputEventStorage = true;
101  fOutputEventStorage = true;
102  fOutputAnalysisStorage = true;
103 }
104 
115  RESTInfo << RESTendl;
116  if (fHostmgr != nullptr) {
117  fRunInfo = fHostmgr->GetRunInfo();
118  if (fRunInfo == nullptr) {
119  RESTError << "File IO has not been specified, " << RESTendl;
120  RESTError << "please make sure the \"TRestFiles\" section is ahead of the "
121  "\"TRestProcessRunner\" section"
122  << RESTendl;
123  exit(0);
124  }
125  } else {
126  RESTError << "manager not initialized!" << RESTendl;
127  exit(0);
128  }
129 
130  // Exiting as soon as possible in order to avoid seg.fault at TRestThread.
131  // And sometimes gets stuck - probably a thread not closed? - and the execution
132  // of restManager does not end even if TRestThread calls exit(1)
133  // I believe it is risky to exit() at TRestThread without closing threads.
134  // It is a guess (J.G.)
135  if (!fRunInfo->GetFileProcess() && fRunInfo->GetEntries() == 0) {
136  RESTError << "TRestProcessRunner::BeginOfInit. The input file is a valid REST file but entries are 0!"
137  << RESTendl;
138  exit(1);
139  }
140 
141  ReadAllParameters();
142 
143  // firstEntry = StringToInteger(GetParameter("firstEntry", "0"));
144  // eventsToProcess = StringToInteger(GetParameter("eventsToProcess", "0"));
145  int lastEntry = StringToInteger(GetParameter("lastEntry", "0"));
146  if (lastEntry - fFirstEntry > 0 && fEventsToProcess == 0) {
147  fEventsToProcess = lastEntry - fFirstEntry;
148  } else if (fEventsToProcess > 0 && lastEntry - fFirstEntry > 0 &&
149  lastEntry - fFirstEntry != fEventsToProcess) {
150  RESTWarning << "Conflict number of events to process!" << RESTendl;
151  } else if (fEventsToProcess > 0 && lastEntry - fFirstEntry == 0) {
152  lastEntry = fFirstEntry + fEventsToProcess;
153  } else if (fEventsToProcess == 0 && fFirstEntry == 0 && lastEntry == 0) {
154  fEventsToProcess = REST_MAXIMUM_EVENTS;
155  lastEntry = REST_MAXIMUM_EVENTS;
156  } else {
157  RESTWarning << "error setting of event number" << RESTendl;
158  fEventsToProcess = fEventsToProcess > 0 ? fEventsToProcess : REST_MAXIMUM_EVENTS;
159  fFirstEntry = fFirstEntry > 0 ? fFirstEntry : 0;
160  lastEntry = lastEntry == fFirstEntry + fEventsToProcess ? lastEntry : REST_MAXIMUM_EVENTS;
161  }
162  fRunInfo->SetCurrentEntry(fFirstEntry);
163 
164  if (fFileSplitSize < 50000000LL || fFileSplitSize > 100000000000LL) {
165  RESTWarning << "automatic file splitting size cannot < 10MB or > 100GB, setting to default (10GB)."
166  << RESTendl;
167  fFileSplitSize = 10000000000LL;
168  }
169 
170  // fUseTestRun = StringToBool(GetParameter("useTestRun", "ON"));
171  // fUsePauseMenu = StringToBool(GetParameter("usePauseMenu", "OFF"));
172  if (!fUsePauseMenu || fVerboseLevel >= TRestStringOutput::REST_Verbose_Level::REST_Debug)
173  fProcStatus = kIgnore;
174  if (!fOutputAnalysisStorage) {
175  RESTError << "output analysis must be turned on to process data!" << RESTendl;
176  exit(1);
177  }
178  // fValidateObservables = StringToBool(GetParameter("validateObservables", "OFF"));
179  // fSortOutputEvents = StringToBool(GetParameter("sortOutputEvents", "ON"));
180  // fThreadNumber = StringToDouble(GetParameter("threadNumber", "1"));
181  // if (ToUpper(GetParameter("inputAnalysis", "ON")) == "ON") fOutputItem[0] = true;
182  // if (ToUpper(GetParameter("inputEvent", "OFF")) == "ON") fOutputItem[1] = true;
183  // if (ToUpper(GetParameter("outputEvent", "ON")) == "ON") fOutputItem[2] = true;
184  // fOutputItem[3] = true;
185 
186  // fOutputItem = Split(GetParameter("treeBranches",
187  // "inputevent:outputevent:inputanalysis"), ":");
188  if (fThreadNumber < 1) {
189  fThreadNumber = 1;
190  }
191  if (fThreadNumber > 15) {
192  fThreadNumber = 15;
193  }
194 
195  for (int i = 0; i < fThreadNumber; i++) {
196  TRestThread* t = new TRestThread();
197  t->SetProcessRunner(this);
198  t->SetVerboseLevel(fVerboseLevel);
199  t->SetThreadId(i);
200  t->SetCompressionLevel(fFileCompression);
201  fThreads.push_back(t);
202  }
203 }
204 
213 Int_t TRestProcessRunner::ReadConfig(const string& keydeclare, TiXmlElement* e) {
214  if (keydeclare == "addProcess") {
215  string active = GetParameter("value", e, "");
216  if (active != "" && ToUpper(active) != "ON") return 0;
217 
218  string processName = GetParameter("name", e, "");
219 
220  string processType = GetParameter("type", e, "");
221 
222  if (processType == "") {
223  RESTWarning << "Bad expression of addProcess" << RESTendl;
224  return 0;
225  } else if (processName == "") {
226  RESTWarning << "Event process " << processType << " has no name, it will be skipped" << RESTendl;
227  return 0;
228  }
229 
230  RESTInfo << "adding process " << processType << " \"" << processName << "\"" << RESTendl;
231  vector<TRestEventProcess*> processes;
232  for (int i = 0; i < fThreadNumber; i++) {
233  TRestEventProcess* p = InstantiateProcess(processType, e);
234  if (p != nullptr) {
235  if (p->isExternal()) {
236  fRunInfo->SetExtProcess(p);
237  return 0;
238  }
240  p->singleThreadOnly())) {
241  fUsePauseMenu = false;
242  fProcStatus = kIgnore;
243  if (fThreadNumber > 1) {
244  RESTInfo << "multi-threading is disabled due to process \"" << p->GetName() << "\""
245  << RESTendl;
246  RESTInfo << "This process is in debug mode or is single thread only" << RESTendl;
247  for (i = fThreadNumber; i > 1; i--) {
248  // delete (*fThreads.end());
249  fThreads.erase(fThreads.end() - 1);
250  fThreadNumber--;
251  }
252  }
253  }
254  processes.push_back(p);
255  } else {
256  return 1;
257  }
258  }
259 
260  for (int i = 0; i < fThreadNumber; i++) {
261  TRestEventProcess* p = processes[i];
262  for (int j = 0; j < fThreadNumber; j++) {
263  p->SetParallelProcess(processes[j]);
264  }
265  fThreads[i]->AddProcess(p);
266  }
267 
268  fProcessNumber++;
269  RESTDebug << "process \"" << processType << "\" has been added!" << RESTendl;
270  return 0;
271  }
272 
273  return 0;
274 }
275 
283  RESTDebug << "Validating process chain..." << RESTendl;
284 
285  if (fRunInfo->GetFileProcess() != nullptr) {
286  fInputEvent = fRunInfo->GetFileProcess()->GetOutputEvent();
287  } else {
288  if (fThreads[0]->GetProcessnum() > 0 &&
289  fThreads[0]->GetProcess(0)->GetInputEvent().address != nullptr) {
290  string name = fThreads[0]->GetProcess(0)->GetInputEvent().type;
292  a->Initialize();
293  fRunInfo->SetInputEvent(a);
294  }
295  fInputEvent = fRunInfo->GetInputEvent();
296  }
297  if (fInputEvent == nullptr) {
298  RESTError << "Cannot determine input event, validating process chain failed!" << RESTendl;
299  exit(1);
300  }
301 
302  if (fProcessNumber > 0) {
303  if (fThreads[0]->ValidateChain(fInputEvent) == -1) exit(1);
304  }
305 
306  ReadProcInfo();
307 }
308 
315  if (fRunInfo->GetFileProcess() != nullptr) {
316  fProcessInfo["FirstProcess"] = fRunInfo->GetFileProcess()->GetName();
317  } else {
318  if (fProcessNumber > 0) fProcessInfo["FirstProcess"] = fThreads[0]->GetProcess(0)->GetName();
319  }
320  int n = fProcessNumber;
321  fProcessInfo["LastProcess"] =
322  (n == 0 ? fProcessInfo["FirstProcess"] : fThreads[0]->GetProcess(n - 1)->GetName());
323  fProcessInfo["ProcNumber"] = ToString(n);
324 }
325 
343  RESTDebug << "Creating output File " << fRunInfo->GetOutputFileName() << RESTendl;
344 
345  TString filename = fRunInfo->FormFormat(fRunInfo->GetOutputFileName());
346  fOutputDataFileName = filename;
347  fOutputDataFile = new TFile(filename, "recreate");
348  // set compression level here will cause problem in pipeline
349  // we must set in each threadCompression
350  // fOutputDataFile->SetCompressionLevel(fFile);
351  if (!fOutputDataFile->IsOpen()) {
352  RESTError << "Failed to create output file: " << fOutputDataFile->GetName() << RESTendl;
353  exit(1);
354  }
355  RESTInfo << RESTendl;
356  RESTInfo << "TRestProcessRunner : preparing threads..." << RESTendl;
357  fRunInfo->ResetEntry();
358  fRunInfo->SetCurrentEntry(fFirstEntry);
359  for (int i = 0; i < fThreadNumber; i++) {
360  fThreads[i]->PrepareToProcess(&fInputAnalysisStorage);
361  }
362 
363  // print metadata
364  if (fRunInfo->GetFileProcess() != nullptr) {
365  RESTEssential << this->ClassName() << ": 1 + " << fProcessNumber << " processes loaded, "
366  << fThreadNumber << " threads prepared!" << RESTendl;
367  } else {
368  RESTEssential << this->ClassName() << ": " << fProcessNumber << " processes loaded, " << fThreadNumber
369  << " threads prepared!" << RESTendl;
370  }
372  if (fRunInfo->GetFileProcess() != nullptr) fRunInfo->GetFileProcess()->PrintMetadata();
373 
374  for (int i = 0; i < fProcessNumber; i++) {
375  fThreads[0]->GetProcess(i)->PrintMetadata();
376  }
377  } else {
378  if (fRunInfo->GetFileProcess() != nullptr) {
379  RESTcout << "(external) " << fRunInfo->GetFileProcess()->ClassName() << " : "
380  << fRunInfo->GetFileProcess()->GetName() << RESTendl;
381  }
382  for (int i = 0; i < fProcessNumber; i++) {
383  RESTcout << "++ " << fThreads[0]->GetProcess(i)->ClassName() << " : "
384  << fThreads[0]->GetProcess(i)->GetName() << RESTendl;
385  }
386  }
387  RESTcout << "=" << RESTendl;
388 
389  // copy thread's event tree to local
390  fOutputDataFile->cd();
391  TTree* tree = fThreads[0]->GetEventTree();
392  if (tree != nullptr) {
393  fEventTree = (TTree*)tree->Clone();
394  fEventTree->SetName("EventTree");
395  fEventTree->SetTitle("REST Event Tree");
396  fEventTree->SetDirectory(fOutputDataFile);
397  TTree::SetMaxTreeSize(100000000000LL > fFileSplitSize * 2
398  ? 100000000000LL
399  : fFileSplitSize * 2); // the default size is 100GB
400  } else {
401  fEventTree = nullptr;
402  }
403 
404  // initialize analysis tree
405  fAnalysisTree = new TRestAnalysisTree("AnalysisTree", "REST Process Analysis Tree");
406  fAnalysisTree->SetDirectory(fOutputDataFile);
407  TRestAnalysisTree::SetMaxTreeSize(100000000000LL > fFileSplitSize * 2 ? 100000000000LL
408  : fFileSplitSize * 2);
409 
410  tree = fThreads[0]->GetAnalysisTree();
411  if (tree != nullptr) {
412  fNBranches = tree->GetNbranches();
413  } else {
414  RESTError << "Threads are not initialized! No AnalysisTree!" << RESTendl;
415  exit(1);
416  }
417 
418  fOutputDataFile->cd();
419  if (fEventTree != nullptr) {
420  fEventTree->Write(nullptr, kOverwrite);
421  }
422  if (fAnalysisTree != nullptr) {
423  fAnalysisTree->Write(nullptr, kOverwrite);
424  }
425 
426  // reset runner
427  this->ResetRunTimes();
428  fProcessedEvents = 0;
429  fRunInfo->ResetEntry();
430  fRunInfo->SetCurrentEntry(fFirstEntry);
431  inputTreeEntries = fRunInfo->GetEntries();
432 
433  // set root mutex
435  ROOT::Math::MinimizerOptions::SetDefaultMinimizer("Minuit");
436  TMinuitMinimizer::UseStaticMinuit(false);
437  if (gGlobalMutex == nullptr) {
438  gGlobalMutex = new TMutex(true);
439  gROOTMutex = gGlobalMutex;
440  gInterpreterMutex = gGlobalMutex;
441  }
442 
443 #ifdef TIME_MEASUREMENT
444  high_resolution_clock::time_point t3 = high_resolution_clock::now();
445 #endif
446 
447  // start the thread!
448  RESTcout << this->ClassName() << ": Starting the Process.." << RESTendl;
449  for (int i = 0; i < fThreadNumber; i++) {
450  fThreads[i]->StartThread();
451  }
452 
453  while (fProcStatus == kPause ||
454  (fRunInfo->GetInputEvent() != nullptr && fEventsToProcess > fProcessedEvents)) {
455  PrintProcessedEvents(100);
456 
457  if (fProcStatus == kNormal && Console::kbhit()) // if keyboard inputs
458  {
459  int a = Console::ReadKey(); // get char
460 
461  if (a == 'p') {
462  fProcStatus = kPause;
463  usleep(100000); // wait 0.1s for the processes to finish;
465  "| |", TRestStringOutput::REST_Display_Orientation::kMiddle);
466  Console::ClearLinesAfterCursor();
467  RESTLog << RESTendl;
468  RESTLog << "-" << RESTendl;
469  RESTLog << "PROCESS PAUSED!" << RESTendl;
470  RESTLog << "-" << RESTendl;
471  RESTLog << " " << RESTendl;
472  }
473  }
474 
475  if (fProcStatus == kPause) {
476  PauseMenu();
477  }
478  if (fProcStatus == kStopping) {
479  break;
480  }
481 
482  usleep(printInterval);
483 
484  // cout << eventsToProcess << " " << fProcessedEvents << " " << lastEntry <<
485  // " " << fCurrentEvent << endl; cout << fProcessedEvents << "\r";
486  // printf("%d processed events now...\r", fProcessedEvents); fflush(stdout);
487  }
488 
489  if (fProcStatus != kIgnore && Console::kbhit())
490  while (getchar() != '\n')
491  ; // clear buffer
492 
493  RESTEssential << "Waiting for processes to finish ..." << RESTendl;
494 
495  while (true) {
496  usleep(100000);
497  bool finish = fThreads[0]->Finished();
498  for (int i = 1; i < fThreadNumber; i++) {
499  finish = finish && fThreads[i]->Finished();
500  }
501  if (finish) break;
502  }
503 
504  // make dummy analysis tree filled with observables
505  fAnalysisTree->GetEntry(fAnalysisTree->GetEntries() - 1);
506  // call EndProcess() for all processes
507  for (int i = 0; i < fThreadNumber; i++) {
508  fThreads[i]->EndProcess();
509  }
510  if (fRunInfo->GetFileProcess()) {
511  fRunInfo->GetFileProcess()->EndProcess();
512  }
513  fProcStatus = kFinished;
514 
515 #ifdef TIME_MEASUREMENT
516  high_resolution_clock::time_point t4 = high_resolution_clock::now();
517  deltaTime = (int)duration_cast<microseconds>(t4 - t3).count();
518 #endif
519 
520  // reset the mutex to null
521  delete gGlobalMutex;
522  gGlobalMutex = nullptr;
523  gROOTMutex = nullptr;
524  gInterpreterMutex = nullptr;
525 
526  RESTcout << this->ClassName() << ": " << fProcessedEvents << " processed events" << RESTendl;
527 
528 #ifdef TIME_MEASUREMENT
529  RESTInfo << "Total processing time : " << ((Double_t)deltaTime) / 1000. << " ms" << RESTendl;
530  RESTInfo << "Average read time from disk (per event) : "
531  << ((Double_t)readTime) / fProcessedEvents / 1000. << " ms" << RESTendl;
532  RESTInfo << "Average process time (per event) : "
533  << ((Double_t)(deltaTime - readTime - writeTime)) / fProcessedEvents / 1000. << " ms"
534  << RESTendl;
535  RESTInfo << "Average write time to disk (per event) : "
536  << ((Double_t)writeTime) / fProcessedEvents / 1000. << " ms" << RESTendl;
537  RESTInfo << "=" << RESTendl;
538 #endif
539 
540  if (fRunInfo->GetOutputFileName() != "/dev/null") {
541  ConfigOutputFile();
542  }
543 }
544 
556  TRestStringOutput::REST_Display_Orientation::kMiddle);
557  Console::ClearLinesAfterCursor();
558 
559  RESTLog << "--------------MENU--------------" << RESTendl;
560  RESTLog << "\"v\" : change the verbose level" << RESTendl;
561  RESTLog << "\"n\" : push foward one event, then pause" << RESTendl;
562  RESTLog << "\"l\" : print the latest processed event" << RESTendl;
563  RESTLog << "\"d\" : detach the current process" << RESTendl;
564  RESTLog << "\"q\" : stop and quit the process" << RESTendl;
565  RESTLog << "press \"p\" to continue process..." << RESTendl;
566  RESTLog << "-" << RESTendl;
567  RESTLog << RESTendl;
568  RESTLog << RESTendl;
569  int menuupper = 15;
570  int infobar = 11;
571  while (true) {
572  Console::ClearCurrentLine();
573  int b = Console::ReadKey(); // no need to press enter
574 
575  if (b == 'v') {
576  Console::CursorUp(infobar);
577  RESTLog.setcolor(COLOR_BOLDGREEN);
578  RESTLog << "Changing verbose level for all the processes..." << RESTendl;
579  RESTLog.setcolor(COLOR_BOLDWHITE);
580  Console::CursorDown(1);
581  Console::ClearLinesAfterCursor();
582  RESTLog << "type \"0\"/\"s\" to set level silent" << RESTendl;
583  RESTLog << "type \"1\"/\"e\" to set level essential" << RESTendl;
584  RESTLog << "type \"2\"/\"i\" to set level info" << RESTendl;
585  RESTLog << "type \"3\"/\"d\" to set level debug" << RESTendl;
586  RESTLog << "type \"4\"/\"x\" to set level extreme" << RESTendl;
587  RESTLog << "type other to return the pause menu" << RESTendl;
588  RESTLog << "-" << RESTendl;
589  RESTLog << RESTendl;
590  RESTLog << RESTendl;
591  while (true) {
592  Console::CursorUp(1);
593  int c = Console::Read();
594  if (c != '\n')
595  while (Console::Read() != '\n')
596  ;
598  if (c == '0' || c == 's') {
600  } else if (c == '1' || c == 'e') {
602  } else if (c == '2' || c == 'i') {
604  } else if (c == '3' || c == 'd') {
606  } else if (c == '4' || c == 'x') {
608  } else {
609  Console::CursorUp(infobar);
610  RESTLog.setcolor(COLOR_BOLDYELLOW);
611  RESTLog << "Verbose level not set!" << RESTendl;
612  RESTLog.setcolor(COLOR_BOLDWHITE);
613  break;
614  }
615 
616  gVerbose = l;
617  this->SetVerboseLevel(l);
618  for (int i = 0; i < fThreadNumber; i++) {
619  fThreads[i]->SetVerboseLevel(l);
620  for (int j = 0; j < fThreads[i]->GetProcessnum(); j++) {
621  fThreads[i]->GetProcess(j)->SetVerboseLevel(l);
622  }
623  }
624  Console::CursorUp(infobar);
625  RESTLog.setcolor(COLOR_BOLDGREEN);
626  RESTLog << "Verbose level has been set to " << ToString(static_cast<int>(l)) << "!"
627  << RESTendl;
628  RESTLog.setcolor(COLOR_BOLDWHITE);
629  break;
630  }
631  Console::ClearLinesAfterCursor();
632  break;
633  } else if (b == 'd') {
634  Console::CursorUp(infobar);
635  RESTLog.setcolor(COLOR_BOLDGREEN);
636  RESTLog << "Detaching restManager to backend" << RESTendl;
637  RESTLog.setcolor(COLOR_BOLDWHITE);
638  Console::CursorDown(1);
639  Console::ClearLinesAfterCursor();
640  RESTLog << "type filename for output redirect" << RESTendl;
641  RESTLog << "leave blank to redirect to /dev/null" << RESTendl;
642  RESTLog << " " << RESTendl;
643  RESTLog << " " << RESTendl;
644  RESTLog << " " << RESTendl;
645  RESTLog << " " << RESTendl;
646  RESTLog << "-" << RESTendl;
647  RESTLog << RESTendl;
648  RESTLog << RESTendl;
649 
650  string file;
651 
652  Console::CursorUp(1);
653  file = Console::ReadLine();
654  if (file == "") file = "/dev/null";
655  if (TRestTools::fileExists(file)) {
656  if (!TRestTools::isPathWritable(file)) {
657  Console::CursorUp(infobar);
658  RESTLog.setcolor(COLOR_BOLDYELLOW);
659  RESTLog << "file not writeable!" << RESTendl;
660  RESTLog.setcolor(COLOR_BOLDWHITE);
661  break;
662  }
663  } else {
665  Console::CursorUp(infobar);
666  RESTLog.setcolor(COLOR_BOLDYELLOW);
667  RESTLog << "path not writeable!" << RESTendl;
668  RESTLog.setcolor(COLOR_BOLDWHITE);
669  break;
670  }
671  }
672 
673 #ifdef WIN32
674  RESTWarning << "fork not available on windows!" << RESTendl;
675 #else
676  pid_t pid;
677  pid = fork();
678  if (pid < 0) {
679  perror("fork error:");
680  exit(1);
681  }
682  // child process
683  if (pid == 0) {
684  RESTcout << "Child process created! pid: " << getpid() << RESTendl;
685  RESTInfo << "Restarting threads" << RESTendl;
686  mutex_write.unlock();
687  for (int i = 0; i < fThreadNumber; i++) {
688  fThreads[i]->StartThread();
689  }
690  RESTInfo << "Re-directing output to " << file << RESTendl;
691  FILE* f = freopen(file.c_str(), "w", stdout);
692  if (f == nullptr) RESTWarning << "Couldnt redirect output for file: " << file << RESTendl;
693  REST_Display_CompatibilityMode = true;
694  }
695  // father process
696  else {
697  exit(0);
698  }
699  fProcStatus = kNormal;
700  RESTInfo << "Continue processing..." << RESTendl;
701 
702 #endif // WIN32
703 
704  break;
705  } else if (b == 'n') {
706  fProcStatus = kStep;
707  break;
708  } else if (b == 'l') {
709  // Console::ClearScreen();
710  fOutputEvent->PrintEvent();
711  break;
712  } else if (b == 'q') {
713  fProcStatus = kStopping;
714  break;
715  } else if (b == 'p') {
716  Console::CursorUp(menuupper);
717  Console::ClearLinesAfterCursor();
719  fProcStatus = kIgnore;
720  } else {
721  fProcStatus = kNormal;
722  }
723  break;
724  } else if (b == '\n') {
725  // CursorUp(1);
726  } else {
727  Console::CursorUp(infobar);
728  RESTLog.setcolor(COLOR_BOLDYELLOW);
729  RESTLog << "Invailed option \"" << (char)b << "\" (key value: " << b << ") !" << RESTendl;
730  RESTLog.setcolor(COLOR_BOLDWHITE);
731  Console::CursorDown(infobar - 1);
732  }
733  }
734 }
735 
760  mutex_write.lock(); // lock on
761  while (fProcStatus == kPause) {
762  usleep(100000);
763  }
764 #ifdef TIME_MEASUREMENT
765  high_resolution_clock::time_point t1 = high_resolution_clock::now();
766 #endif
767  int n;
768  if (fProcessedEvents >= fEventsToProcess || targetevt == nullptr || fProcStatus == kStopping) {
769  n = -1;
770  } else {
771  if (!fInputAnalysisStorage) {
772  n = fRunInfo->GetNextEvent(targetevt, nullptr);
773  } else {
774  n = fRunInfo->GetNextEvent(targetevt, targettree);
775  }
776  }
777 
778 #ifdef TIME_MEASUREMENT
779  high_resolution_clock::time_point t2 = high_resolution_clock::now();
780  readTime += (int)duration_cast<microseconds>(t2 - t1).count();
781 #endif
782  mutex_write.unlock();
783  return n;
784 }
785 
793  if (fSortOutputEvents) {
794  // Make sure the thread has the minimum event id in the all the
795  // threads. Otherwise, just wait.
796  while (true) {
797  bool smallest = true;
798  for (TRestThread* otherT : fThreads) {
799  if (otherT->Finished()) {
800  continue;
801  }
802  if (t->GetThreadId() == otherT->GetThreadId()) {
803  continue;
804  }
805  int id1 = t->GetInputEvent()->GetID();
806  int id2 = otherT->GetInputEvent()->GetID();
807  if (id1 > id2) {
808  smallest = false;
809  } else if (id1 == id2) {
810  cout << "warning! Two events with same event id! output events maybe dis-ordered!"
811  << endl;
812  }
813  }
814 
815  if (smallest) break;
816  usleep(1);
817  }
818  }
819 
820  // Start event saving, entering mutex lock region.
821  mutex_write.lock();
822 #ifdef TIME_MEASUREMENT
823  high_resolution_clock::time_point t5 = high_resolution_clock::now();
824 #endif
825  if (t->GetOutputEvent() != nullptr) {
826  fOutputEvent = t->GetOutputEvent();
827  // copy address of analysis tree of the given thread
828  // to the local tree, then fill the local tree
829  TObjArray* branchesT;
830  TObjArray* branchesL;
831 
832  if (fAnalysisTree != nullptr) {
833  TRestAnalysisTree* remotetree = t->GetAnalysisTree();
834 
835  // t->GetAnalysisTree()->SetEventInfo(t->GetOutputEvent());
836  // branchesT = t->GetAnalysisTree()->GetListOfBranches();
837  // branchesL = fAnalysisTree->GetListOfBranches();
838  // for (int i = 0; i < nBranches; i++) {
839  // TBranch* branchT = (TBranch*)branchesT->UncheckedAt(i);
840  // TBranch* branchL = (TBranch*)branchesL->UncheckedAt(i);
841  // branchL->SetAddress(branchT->GetAddress());
842  //}
843  fAnalysisTree->SetEventInfo(fOutputEvent);
844  for (int n = 0; n < remotetree->GetNumberOfObservables(); n++) {
845  fAnalysisTree->SetObservable(n, remotetree->GetObservable(n));
846  }
847 
848  fAnalysisTree->Fill();
849  }
850 
851  if (fEventTree != nullptr) {
852  // t->GetEventTree()->FillEvent(t->GetOutputEvent());
853  branchesT = t->GetEventTree()->GetListOfBranches();
854  branchesL = fEventTree->GetListOfBranches();
855  for (int i = 0; i < branchesT->GetLast() + 1; i++) {
856  TBranch* branchT = (TBranch*)branchesT->UncheckedAt(i);
857  TBranch* branchL = (TBranch*)branchesL->UncheckedAt(i);
858  branchL->SetAddress(branchT->GetAddress());
859  }
860  fEventTree->Fill();
861  }
862  fProcessedEvents++;
863 
864  // cout << fTempOutputDataFile << " " << fTempOutputDataFile->GetEND() << " " <<
865  // fAnalysisTree->GetDirectory() << endl; cout << fAutoSplitFileSize << endl; switch file if file size
866  // reaches target
867  if (fOutputDataFile->GetEND() > fFileSplitSize) {
868  if (fAnalysisTree->GetDirectory() == (TDirectory*)fOutputDataFile) {
869  fNFilesSplit++;
870  cout << "TRestProcessRunner: file size reaches limit (" << fFileSplitSize
871  << " bytes), switching to new file with index " << fNFilesSplit << endl;
872 
873  // wait 0.1s for the process to finish
874  usleep(100000);
875  for (auto th : fThreads) {
876  for (int j = 0; j < fProcessNumber; j++) {
877  auto proc = th->GetProcess(j);
878  proc->NotifyAnalysisTreeReset();
879  }
880  }
881 
882  fAnalysisTree->AutoSave();
883  fAnalysisTree->Reset();
884 
885  if (fEventTree != nullptr) {
886  fEventTree->AutoSave();
887  fEventTree->Reset();
888  }
889 
890  // write some information to the first(main) data file
891  fRunInfo->SetNFilesSplit(fNFilesSplit);
892  if (fOutputDataFile->GetName() != fOutputDataFileName) {
893  auto Mainfile = std::unique_ptr<TFile>{TFile::Open(fOutputDataFileName, "update")};
894  WriteProcessesMetadata();
895  Mainfile->Write(0, TObject::kOverwrite);
896  Mainfile->Close();
897  } else {
898  WriteProcessesMetadata();
899  }
900 
901  TFile* newfile = new TFile(fOutputDataFileName + "." + ToString(fNFilesSplit), "recreate");
902 
903  TBranch* branch = nullptr;
904  fAnalysisTree->SetDirectory(newfile);
905  TIter nextb1(fAnalysisTree->GetListOfBranches());
906  while ((branch = (TBranch*)nextb1())) {
907  branch->SetFile(newfile);
908  }
909  if (fAnalysisTree->GetBranchRef()) {
910  fAnalysisTree->GetBranchRef()->SetFile(newfile);
911  }
912 
913  if (fEventTree != nullptr) {
914  fEventTree->SetDirectory(newfile);
915  TIter nextb2(fEventTree->GetListOfBranches());
916  while ((branch = (TBranch*)nextb2())) {
917  branch->SetFile(newfile);
918  }
919  if (fEventTree->GetBranchRef()) {
920  fEventTree->GetBranchRef()->SetFile(newfile);
921  }
922  }
923 
924  fOutputDataFile->Write(nullptr, TObject::kOverwrite);
925  fOutputDataFile->Close();
926  delete fOutputDataFile;
927  fOutputDataFile = newfile;
928  } else {
929  RESTError << "internal error!" << RESTendl;
930  }
931  }
932 
933 #ifdef TIME_MEASUREMENT
934  high_resolution_clock::time_point t6 = high_resolution_clock::now();
935  writeTime += (int)duration_cast<microseconds>(t6 - t5).count();
936 #endif
937 
938  if (fProcStatus == kStep) {
939  fProcStatus = kPause;
940  cout << "Processed events:" << fProcessedEvents << endl;
941  }
942  }
943  mutex_write.unlock();
944 }
945 
952  RESTEssential << "Configuring output file, writing metadata and tree objects" << RESTendl;
953 #ifdef TIME_MEASUREMENT
954  fProcessInfo["ProcessTime"] = ToString(deltaTime) + "ms";
955 #endif
956  // write the last tree
957  fOutputDataFile->cd();
958  if (fEventTree != nullptr) {
959  fEventTree->Write(nullptr, kOverwrite);
960  }
961  if (fAnalysisTree != nullptr) {
962  fAnalysisTree->Write(nullptr, kOverwrite);
963  }
964 
965  // close file
966  fOutputDataFile->Write();
967  fOutputDataFile->Close();
968  delete fOutputDataFile;
969 
970  // merge process's data file to the main file
971  // we must call this method before writing process metadata,
972  // otherwise there would be problems of streamer missing
973  // https://github.com/rest-for-physics/framework/issues/348
974  fRunInfo->SetNFilesSplit(fNFilesSplit);
975  MergeOutputFile();
976 
977  // write metadata
978  WriteProcessesMetadata();
979 
980  // Write geometry
981  if (gGeoManager != nullptr) {
982  gGeoManager->Write("Geometry", TObject::kOverwrite);
983  }
984 }
985 
990  fOutputDataFile->cd();
991 
992  this->Write(nullptr, TObject::kWriteDelete);
993 
994  if (fRunInfo->GetFileProcess() != nullptr) {
995  fRunInfo->GetFileProcess()->Write(nullptr, kOverwrite);
996  }
997  for (int i = 0; i < fProcessNumber; i++) {
998  fThreads[0]->GetProcess(i)->Write(nullptr, kOverwrite);
999  }
1000 }
1001 
1007  RESTEssential << "Merging thread files together" << RESTendl;
1008  // add threads file
1009  // processes may have their own TObject output. They are stored in the threads
1010  // file these files are mush smaller that data file, so they are merged to the
1011  // data file.
1012  vector<string> files_to_merge;
1013  for (int i = 0; i < fThreadNumber; i++) {
1014  TFile* f = fThreads[i]->GetOutputFile();
1015  if (f != nullptr) {
1016  f->Write(nullptr, TObject::kOverwrite);
1017  f->Close();
1018  }
1019  files_to_merge.push_back(f->GetName());
1020  }
1021 
1022  if (TRestTools::fileExists((string)fOutputDataFileName)) {
1023  fOutputDataFile = fRunInfo->MergeToOutputFile(files_to_merge, (string)fOutputDataFileName);
1024  } else {
1025  RESTError << "Output file: " << fOutputDataFileName << " is lost?" << RESTendl;
1026  }
1027 }
1028 
1029 // tools
1034 #ifdef TIME_MEASUREMENT
1035  readTime = 0;
1036  writeTime = 0;
1037  deltaTime = 0;
1038 #endif
1039  time_t tt = time(nullptr);
1040  fProcessInfo["ProcessDate"] = Split(ToDateTimeString(tt), " ")[0];
1041 }
1042 
1050  TRestEventProcess* pc = REST_Reflection::Assembly((string)type);
1051  if (pc == nullptr) return nullptr;
1052 
1053  pc->SetConfigFile(fConfigFileName);
1054  pc->SetRunInfo(this->fRunInfo);
1055  pc->SetHostmgr(fHostmgr);
1056  pc->SetObservableValidation(fValidateObservables);
1057  pc->LoadConfigFromElement(ele, fElementGlobal, fVariables);
1058 
1059  return pc;
1060 }
1061 
1062 double TRestProcessRunner::GetReadingSpeed() {
1063  Long64_t bytes = 0;
1064  for (auto& n : bytesAdded) bytes += n;
1065  double speedbyte = bytes / (double)printInterval * (double)1000000 / ncalculated;
1066  return speedbyte;
1067 }
1068 
1074  if (fProcStatus == kNormal || fProcStatus == kIgnore) {
1075  // clearLinesAfterCursor();
1076 
1077  // TRestStringOutput cout;
1078  // cout.setcolor(COLOR_BOLDWHITE);
1079  // cout.setorientation(0);
1080  // cout.setborder("|");
1081  // CursorDown(1);
1082 
1083  double speedbyte = GetReadingSpeed();
1084 
1085  double progsum = 0;
1086  for (auto& n : progAdded) progsum += n;
1087  double progspeed = progsum / ncalculated / printInterval * 1000000;
1088 
1089  double prog = 0;
1090  if (fRunInfo->GetFeminosDaqTotalEvents() > 0) {
1091  prog = fProcessedEvents / (double)fRunInfo->GetFeminosDaqTotalEvents() * 100;
1092  } else if (fEventsToProcess == REST_MAXIMUM_EVENTS && fRunInfo->GetFileProcess() != nullptr)
1093  // Nevents is unknown, reading external data file
1094  {
1095  prog = fRunInfo->GetBytesRead() / (double)fRunInfo->GetTotalBytes() * 100;
1096  } else if (fRunInfo->GetFileProcess() != nullptr)
1097  // Nevents is known, reading external data file
1098  {
1099  prog = fProcessedEvents / (double)fEventsToProcess * 100;
1100  } else if (fEventsToProcess == REST_MAXIMUM_EVENTS)
1101  // Nevents is unknown, reading root file
1102  {
1103  prog = fRunInfo->GetCurrentEntry() / (double)inputTreeEntries * 100;
1104  } else {
1105  prog = fProcessedEvents / (double)fEventsToProcess * 100;
1106  }
1107 
1108  char* buffer = new char[500]();
1109  if (fRunInfo->GetFileProcess() != nullptr && speedbyte > 0) {
1110  sprintf(buffer, "%d Events (%.1fMB/s), ", fProcessedEvents, speedbyte / 1024 / 1024);
1111  } else {
1112  sprintf(buffer, "%d Events, ", fProcessedEvents);
1113  }
1114 
1115  string s1(buffer);
1116 
1117  if (fProcStatus == kNormal) {
1118  sprintf(buffer, "%.1f min ETA, (Pause: \"p\") ", (100 - progressLast) / progspeed / 60);
1119  } else {
1120  sprintf(buffer, "%.1f min ETA, (Pause Disabled) ", (100 - progressLast) / progspeed / 60);
1121  }
1122  string s2(buffer);
1123 
1124  int barlength = 0;
1125  if (REST_Display_CompatibilityMode) {
1126  barlength = 50;
1127  } else {
1128  barlength = Console::GetWidth() - s1.size() - s2.size() - 9;
1129  }
1130  sprintf(buffer, ("%.1f%[" + MakeProgressBar(prog, barlength) + "]").c_str(), prog);
1131  string s3(buffer);
1132 
1133  delete[] buffer;
1134 
1135  if (REST_Display_CompatibilityMode) {
1136  if (((int)prog) != progressLastPrinted) {
1137  cout << s1 << s2 << s3 << endl;
1138  progressLastPrinted = (int)prog;
1139  }
1140  } else if (fThreads[0]->GetVerboseLevel() < TRestStringOutput::REST_Verbose_Level::REST_Debug) {
1141  printf("%s", (s1 + s2 + s3 + "\r").c_str());
1142  fflush(stdout);
1143  }
1144 
1145  bytesAdded[positionCalculated] = fRunInfo->GetBytesRead() - bytesReadLast;
1146  bytesReadLast = fRunInfo->GetBytesRead();
1147  progAdded[positionCalculated] = prog - progressLast;
1148  progressLast = prog;
1149 
1150  positionCalculated++;
1151  if (positionCalculated >= ncalculated) positionCalculated -= ncalculated;
1152  }
1153 }
1154 
1158 string TRestProcessRunner::MakeProgressBar(int progress100, int length) {
1159  string progressbar(length, '-');
1160  int n = (double)progress100 / 100 * length;
1161  if (n < 0) n = 0;
1162  if (n > length + 1) n = length + 1;
1163  for (int i = 0; i < n; i++) {
1164  progressbar[i] = '=';
1165  }
1166  if (n < length + 1) progressbar[n] = '>';
1167  return progressbar;
1168 }
1169 
1170 TRestEvent* TRestProcessRunner::GetInputEvent() { return fRunInfo->GetInputEvent(); }
1171 
1172 TRestAnalysisTree* TRestProcessRunner::GetInputAnalysisTree() { return fRunInfo->GetAnalysisTree(); }
1173 
1176 
1177  string status;
1178  if (fProcStatus == kNormal)
1179  status = "Normal";
1180  else if (fProcStatus == kStopping)
1181  status = "Terminated";
1182  else
1183  status = "Unknown";
1184 
1185  RESTMetadata << "Status : " << status << RESTendl;
1186  RESTMetadata << "Processed events : " << fProcessedEvents << RESTendl;
1187  RESTMetadata << "Analysis tree branches : " << fNBranches << RESTendl;
1188  RESTMetadata << "Thread number : " << fThreadNumber << RESTendl;
1189  RESTMetadata << "Processes in each thread : " << fProcessNumber << RESTendl;
1190  RESTMetadata << "File auto split size: " << fFileSplitSize << RESTendl;
1191  RESTMetadata << "File compression level: " << fFileCompression << RESTendl;
1192  RESTMetadata << "******************************************" << RESTendl;
1193  RESTMetadata << RESTendl;
1194  RESTMetadata << RESTendl;
1195 }
REST core data-saving helper based on TTree.
A base class for any REST event process.
void SetRunInfo(TRestRun *r)
Set TRestRun for this process.
Bool_t singleThreadOnly() const
Return whether this process is single std::thread only.
Bool_t isExternal() const
Return whether this process is external process.
void SetParallelProcess(TRestEventProcess *p)
Add parallel process to this process.
A base class for any REST event.
Definition: TRestEvent.h:38
virtual void Initialize()=0
Definition: TRestEvent.cxx:73
virtual void PrintMetadata()
Implemented it in the derived metadata class to print out specific metadata information.
void SetHostmgr(TRestManager *m)
Set the host manager for this class.
TRestStringOutput::REST_Verbose_Level GetVerboseLevel()
returns the verboselevel in type of REST_Verbose_Level enumerator
Int_t LoadConfigFromElement(TiXmlElement *eSectional, TiXmlElement *eGlobal, std::map< std::string, std::string > envs={})
Main starter method.
void SetConfigFile(std::string configFilename)
set config file path from external
Running the processes efficiently with fantastic display.
void ReadProcInfo()
Create a process info list which used called by TRestRun::FormFormat().
void WriteProcessesMetadata()
Write process metadata to fOutputDataFile.
void PrintProcessedEvents(Int_t rateE)
Print number of events processed, file read speed, ETA and a progress bar.
void ResetRunTimes()
Reset running time count to 0.
void BeginOfInit()
Reads information from rml config file.
Int_t ReadConfig(const std::string &keydeclare, TiXmlElement *e)
method to deal with iterated child elements
void MergeOutputFile()
Calls TRestRun::MergeOutputFile() to merge the main file with process's tmp file.
void RunProcess()
The main executer of event process.
void EndOfInit()
Ending of the startup procedure.
TRestEventProcess * InstantiateProcess(TString type, TiXmlElement *ele)
InstantiateProcess in sequential start up.
void PauseMenu()
A pause menu providing some functions during the process.
void Initialize() override
REST run class.
std::string MakeProgressBar(int progress100, int length=100)
Make a string of progress bar with given length and percentage.
Int_t GetNextevtFunc(TRestEvent *targetevt, TRestAnalysisTree *targettree)
Get next event and copy it to the address of targetevt.
void PrintMetadata() override
Implemented it in the derived metadata class to print out specific metadata information.
void FillThreadEventFunc(TRestThread *t)
Calling back the FillEvent() method in TRestThread.
void ConfigOutputFile()
Forming an output file.
REST_Verbose_Level
Enumerate of verbose level, containing five levels.
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Info
+show most of the information for each steps
@ REST_Debug
+show the defined debug messages
@ REST_Silent
show minimum information of the software, as well as error messages
Threaded worker of a process chain.
Definition: TRestThread.h:24
static std::pair< std::string, std::string > SeparatePathAndName(const std::string &fullname)
Separate path and filename in a full path+filename string, returns a pair of string.
Definition: TRestTools.cxx:813
static bool fileExists(const std::string &filename)
Returns true if the file (or directory) with path filename exists.
Definition: TRestTools.cxx:728
static bool isPathWritable(const std::string &path)
Returns true if the path given by argument is writable.
Definition: TRestTools.cxx:778
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
std::vector< std::string > Split(std::string in, std::string separator, bool allowBlankString=false, bool removeWhiteSpaces=false, int startPos=-1)
Split the input string according to the given separator. Returning a vector of fragments.
std::string ToUpper(std::string in)
Convert string to its upper case. Alternative of TString::ToUpper.
Int_t StringToInteger(std::string in)
Gets an integer from a string.
std::string ToDateTimeString(time_t time)
Format time_t into string.