REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
TRestThread.cxx
1 
25 
26 #include "TRestThread.h"
27 
28 using namespace std;
29 
30 #ifdef TIME_MEASUREMENT
31 #include <chrono>
32 using namespace chrono;
33 #endif
34 
39  fOutputEvent = nullptr;
40  fInputEvent = nullptr;
41 
42  fAnalysisTree = nullptr;
43  fEventTree = nullptr;
44 
45  fOutputFile = nullptr;
46 
47  fProcessChain.clear();
48 
49  isFinished = false;
50 
51  fCompressionLevel = 1;
53 }
54 
66  RESTInfo << "thread " << fThreadId << ": validating process chain" << RESTendl;
67 
68  // add the non-general processes to list for validation
69  vector<TRestEventProcess*> processes;
70  for (unsigned int i = 0; i < fProcessChain.size(); i++) {
71  RESTValue inEvent = fProcessChain[i]->GetInputEvent();
72  RESTValue outEvent = fProcessChain[i]->GetOutputEvent();
73 
74  if (outEvent.type == "TRestEvent" && inEvent.type == "TRestEvent") {
75  RESTInfo << "general process: " << fProcessChain[i]->GetName() << RESTendl;
76  continue;
77  } else if (outEvent.cl && outEvent.cl->InheritsFrom("TRestEvent") && inEvent.cl &&
78  inEvent.cl->InheritsFrom("TRestEvent")) {
79  processes.push_back(fProcessChain[i]);
80  } else {
81  RESTError << "Process: " << fProcessChain[i]->ClassName()
82  << " not properly written, the input/output event is illegal!" << RESTendl;
83  RESTError << "Hint: they must be inherited from TRestEvent" << RESTendl;
84  abort();
85  }
86  }
87 
88  if (processes.size() > 0) {
89  // verify that the input event of first process is OK
90  if (input != nullptr) {
91  if ((string)input->ClassName() != processes[0]->GetInputEvent().type) {
92  RESTError << "(ValidateChain): Input event type does not match!" << RESTendl;
93  cout << "Input type of the first non-external process in chain: "
94  << processes[0]->GetInputEvent().type << endl;
95  cout << "The event type from file: " << input->ClassName() << endl;
96  cout << "No events will be processed. Please correct event process "
97  "input/output."
98  << endl;
99  }
100  }
101 
102  // verify that the output event type is good to be the input event of the next process
103  for (unsigned int i = 0; i < processes.size() - 1; i++) {
104  string outEventType = processes[i]->GetOutputEvent().type;
105  string nextinEventType = processes[i + 1]->GetInputEvent().type;
106  if (outEventType != nextinEventType && outEventType != "TRestEvent" &&
107  nextinEventType != "TRestEvent") {
108  RESTError << "(ValidateChain): Event process input/output does not match" << RESTendl;
109  RESTError << "The event output for process " << processes[i]->GetName() << " is "
110  << outEventType << RESTendl;
111  RESTError << "The event input for process " << processes[i + 1]->GetName() << " is "
112  << nextinEventType << RESTendl;
113  RESTError << "No events will be processed. Please correctly connect the process chain!"
114  << RESTendl;
115  GetChar();
116  return -1;
117  }
118  }
119  }
120  return 0;
121 }
122 
123 void TRestThread::SetThreadId(Int_t id) {
124  fThreadId = id;
125  if (fThreadId != 0 && fVerboseLevel > TRestStringOutput::REST_Verbose_Level::REST_Essential)
127 }
128 
152  RESTDebug << "Processing ..." << RESTendl;
153  for (int i = 0; i < 5; i++) {
154  TRestEvent* ProcessedEvent = fInputEvent;
155  RESTDebug << "Test run " << i << " : Input Event ---- " << fInputEvent->ClassName() << "("
156  << fInputEvent << ")" << RESTendl;
157  for (unsigned int j = 0; j < fProcessChain.size(); j++) {
158  RESTDebug << "t" << fThreadId << "p" << j << ": " << fProcessChain[j]->ClassName() << RESTendl;
159 
160  fProcessChain[j]->SetObservableValidation(true);
161 
162  fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
163  ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
164  if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
165  // if the output of ProcessEvent() is NULL we assume the event is cut.
166  // we try to use GetOutputEvent()
167  if (ProcessedEvent == nullptr) {
168  ProcessedEvent = fProcessChain[j]->GetOutputEvent();
169  }
170  // if still null we perform another try
171  if (ProcessedEvent == nullptr) {
172  RESTDebug << " ---- NULL" << RESTendl;
173  break;
174  }
175  // check if the output event is same as the processed event
176  TRestEvent* outputevent = fProcessChain[j]->GetOutputEvent();
177  if (outputevent != ProcessedEvent) {
178  RESTWarning
179  << "Test run, in " << fProcessChain[j]->ClassName()
180  << " : output event is different with process returned event! Please check to assign "
181  "the TRestEvent datamember as inputEvent in ProcessEvent() method"
182  << RESTendl;
183  }
184 
185  fProcessChain[j]->EndOfEventProcess();
186 
187  fProcessChain[j]->SetObservableValidation(false);
188 
189  RESTDebug << " .... " << ProcessedEvent->ClassName() << "(" << ProcessedEvent << ")" << RESTendl;
190  }
191 
192  fOutputEvent = ProcessedEvent;
193  if (fOutputEvent != nullptr) {
194  RESTDebug << "Output Event ---- " << fOutputEvent->ClassName() << "(" << fOutputEvent << ")"
195  << RESTendl;
196  break;
197  } else {
198  RESTDebug << "Null output, trying again" << RESTendl;
199  }
200  fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree);
201  }
202  if (fOutputEvent == nullptr) {
203  // fOutputEvent = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent();
204  return false;
205  }
206  return true;
207 }
208 
223 void TRestThread::PrepareToProcess(bool* outputConfig) {
224  RESTDebug << "Entering TRestThread::PrepareToProcess" << RESTendl;
225 
226  string threadFileName;
227  if (fHostRunner->GetOutputDataFile()->GetName() == (string) "/dev/null") {
228  threadFileName = "/dev/null";
229  } else {
230  threadFileName = REST_TMP_PATH + "rest_thread_tmp" + ToString(this) + ".root";
231  }
232 
233  bool outputConfigToDel = false;
234  if (outputConfig == nullptr) {
235  outputConfigToDel = true;
236  outputConfig = new bool[4];
237  for (int i = 0; i < 4; i++) {
238  outputConfig[i] = true;
239  }
240  }
241  if (outputConfig[3] == false) {
242  cout << "Error! output analysis must be on!" << endl;
243  exit(1);
244  }
245 
246  if (fProcessChain.size() > 0) {
247  RESTDebug << "TRestThread: Creating file : " << threadFileName << RESTendl;
248  fOutputFile = new TFile(threadFileName.c_str(), "recreate");
249  fOutputFile->SetCompressionLevel(fCompressionLevel);
250  fAnalysisTree = new TRestAnalysisTree("AnalysisTree_" + ToString(fThreadId), "dummyTree");
251  fAnalysisTree->DisableQuickObservableValueSetting();
252 
253  RESTDebug << "TRestThread: Finding first input event of process chain..." << RESTendl;
254  if (fHostRunner->GetInputEvent() == nullptr) {
255  RESTError
256  << "Input event is not initialized from TRestRun! Please check your input file and file "
257  "reading process!"
258  << RESTendl;
259  exit(1);
260  }
261  fInputEvent = (TRestEvent*)fHostRunner->GetInputEvent()->Clone();
262  string chainInputType = fProcessChain[0]->GetInputEvent().type;
263  // we check if the process chain input type matches fInputEvent
264  if (chainInputType != "TRestEvent" && chainInputType != (string)fInputEvent->ClassName()) {
265  cout << "REST ERROR: Input event type does not match!" << endl;
266  cout << "Process input type : " << chainInputType
267  << ", File input type : " << fInputEvent->ClassName() << endl;
268  exit(1);
269  }
270 
271  RESTDebug << "TRestThread: Reading input event and input observable..." << RESTendl;
272  if (fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree) != 0) {
273  RESTError << "In thread " << fThreadId << ")::Failed to read input event, process cannot start!"
274  << RESTendl;
275  exit(1);
276  }
277 
278  RESTDebug << "TRestThread: Init process..." << RESTendl;
279  for (unsigned int i = 0; i < fProcessChain.size(); i++) {
280  fProcessChain[i]->SetAnalysisTree(fAnalysisTree);
281  for (unsigned int j = 0; j < fProcessChain.size(); j++) {
282  fProcessChain[i]->SetFriendProcess(fProcessChain[j]);
283  }
284  RESTDebug << "InitProcess() process for " << fProcessChain[i]->ClassName() << RESTendl;
285  fProcessChain[i]->InitProcess();
286  }
287 
288  // test run
289  if (fHostRunner->UseTestRun()) {
290  RESTDebug << "Test Run..." << RESTendl;
291  if (!TestRun()) {
292  RESTError << "In thread " << fThreadId << ")::test run failed!" << RESTendl;
293  RESTError << "One of the processes has NULL pointer fOutputEvent!" << RESTendl;
295  RESTError << "To see more detail, turn on debug mode for "
296  "TRestProcessRunner!"
297  << RESTendl;
298  exit(1);
299  }
300  RESTDebug << "Test Run complete!" << RESTendl;
301  } else {
302  RESTDebug << "Initializing output event" << RESTendl;
303  string chainOutputType = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent().type;
304  fOutputEvent = REST_Reflection::Assembly(chainOutputType);
305  if (fOutputEvent == nullptr) {
306  exit(1);
307  }
308  }
309 
311  // create dummy tree to store events
312  fEventTree = new TTree((TString) "EventTree_" + ToString(fThreadId), "dummyTree");
313  vector<pair<TString, TRestEvent*>> branchesToAdd;
314  // avoid duplicated branch
315  // if event type is same, we only create branch for the last of this type
316  // event
317  if (outputConfig[1] == true) {
318  // outputConfig[1]: the input events of each processes in the process chain(can be many)
319 
320  // the input event of process chain
321  TRestEvent* evt = fInputEvent;
322  {
323  TString BranchName = (TString)evt->GetName() + "Branch";
324  if (branchesToAdd.size() == 0)
325  branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
326  else
327  for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
328  if (branchesToAdd[j].first == BranchName)
329  branchesToAdd[j].second = evt;
330  else if (j == branchesToAdd.size() - 1)
331  branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
332  }
333  }
334  // the input event of other processes
335  for (unsigned int i = 0; i < fProcessChain.size(); i++) {
336  TRestEvent* evt = fProcessChain[i]->GetOutputEvent();
337  if (evt != nullptr) {
338  TString BranchName = (TString)evt->GetName() + "Branch";
339  if (branchesToAdd.size() == 0)
340  branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
341  else
342  for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
343  if (branchesToAdd[j].first == BranchName)
344  branchesToAdd[j].second = evt;
345  else if (j == branchesToAdd.size() - 1)
346  branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
347  }
348  }
349  }
350  }
351 
352  if (outputConfig[2] == true) {
353  // outputConfig[2]: output event: the output event of the last process in the process chain
354  TRestEvent* evt = fOutputEvent;
355  {
356  TString BranchName = (TString)evt->GetName() + "Branch";
357  if (branchesToAdd.size() == 0)
358  branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
359  else
360  for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
361  if (branchesToAdd[j].first == BranchName)
362  branchesToAdd[j].second = evt;
363  else if (j == branchesToAdd.size() - 1)
364  branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
365  }
366  }
367  }
368 
369  auto iter = branchesToAdd.begin();
370  while (iter != branchesToAdd.end()) {
371  fEventTree->Branch(iter->first, iter->second->ClassName(), iter->second);
372  iter++;
373  }
374 
375  if (fEventTree->GetListOfBranches()->GetLast() < 0) // if no event branches are created
376  {
377  delete fEventTree;
378  fEventTree = nullptr;
379  }
380 
381  // fAnalysisTree->CreateBranches();
382 
383  // create output temp file for process-defined output object
384 
385  fOutputFile->cd();
386  fOutputFile->Clear();
387  for (unsigned int i = 0; i < fProcessChain.size(); i++) {
388  fProcessChain[i]->InitProcess();
389  }
390 
391  RESTDebug << "Thread " << fThreadId << " Ready!" << RESTendl;
392  } else {
393  string tmp = fHostRunner->GetInputEvent()->ClassName();
394  fInputEvent = REST_Reflection::Assembly(tmp);
395  fOutputEvent = fInputEvent;
396  fOutputFile = new TFile(threadFileName.c_str(), "recreate");
397  fOutputFile->SetCompressionLevel(fCompressionLevel);
398  fOutputFile->cd();
399 
400  RESTDebug << "Creating Analysis Tree..." << RESTendl;
401  fAnalysisTree = new TRestAnalysisTree("AnalysisTree_" + ToString(fThreadId), "dummyTree");
402  fAnalysisTree->DisableQuickObservableValueSetting();
403  fEventTree = new TTree((TString) "EventTree_" + ToString(fThreadId), "dummyTree");
404  // fEventTree->CreateEventBranches();
405 
406  if (outputConfig[2]) {
407  TString BranchName = (TString)fInputEvent->GetName() + "Branch";
408  if (fEventTree->GetBranch(BranchName) == nullptr) // avoid duplicated branch
409  {
410  fEventTree->Branch(BranchName, fInputEvent->ClassName(), fInputEvent);
411  }
412  }
413  // currently, external process analysis is not supported!
414  }
415  if (outputConfigToDel) delete outputConfig;
416 }
417 
434  isFinished = false;
435 
436  while (fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree) == 0) {
437  ProcessEvent();
438  /*if (fOutputEvent != nullptr) */ fHostRunner->FillThreadEventFunc(this);
439  }
440 
441  // fHostRunner->WriteThreadFileFunc(this);
442  isFinished = true;
443 }
444 
450  t = thread(&TRestThread::StartProcess, this);
451  t.detach();
452  // t.join();
453 }
454 
463  RESTDebug << "Entering TRestThread::AddProcess" << RESTendl;
464 
465  fProcessChain.push_back(process);
466  // if (Console::CompatibilityMode && process->GetVerboseLevel() >= REST_Debug) {
467  // warning << "REST WARNING! Cannot use \"debug\" output level for process " << process->GetName()
468  // << endl;
469  // process->SetVerboseLevel(REST_Info);
470  //}
471  if (process->GetVerboseLevel() > fVerboseLevel) {
472  SetVerboseLevel(process->GetVerboseLevel());
473  }
474 }
475 
484  TRestEvent* ProcessedEvent = fInputEvent;
485  fProcessNullReturned = false;
486 
488 #ifdef TIME_MEASUREMENT
489  vector<int> processtime(fProcessChain.size());
490 #endif
491 
492  for (unsigned int j = 0; j < fProcessChain.size(); j++) {
493  cout << "------- Starting process " + (string)fProcessChain[j]->GetName() + " -------" << endl;
494 
495 #ifdef TIME_MEASUREMENT
496  high_resolution_clock::time_point t1 = high_resolution_clock::now();
497 #endif
498 
499  fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
500  ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
501  if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
502  fProcessChain[j]->EndOfEventProcess();
503 
504 #ifdef TIME_MEASUREMENT
505  high_resolution_clock::time_point t2 = high_resolution_clock::now();
506  processtime[j] = (int)duration_cast<microseconds>(t2 - t1).count();
507 #endif
508 
509  if (ProcessedEvent == nullptr) {
510  cout << "------- End of process " + (string)fProcessChain[j]->GetName() +
511  " (NULL returned) -------"
512  << endl;
513  fProcessNullReturned = true;
514  break;
515  } else {
516  cout << "------- End of process " + (string)fProcessChain[j]->GetName() + " -------" << endl;
517  }
518 
519  if (fProcessChain[j]->GetVerboseLevel() >= TRestStringOutput::REST_Verbose_Level::REST_Extreme &&
520  j < fProcessChain.size() - 1) {
521  GetChar();
522  }
523  }
524 
525 #ifdef TIME_MEASUREMENT
526  cout << "Process timing summary : " << endl;
527  for (unsigned int j = 0; j < fProcessChain.size(); j++) {
528  cout << fProcessChain[j]->ClassName() << "(" << fProcessChain[j]->GetName()
529  << ") : " << processtime[j] / (double)1000 << " ms" << endl;
530  }
531 #endif
532 
533  if (fHostRunner->UseTestRun()) {
534  fOutputEvent = ProcessedEvent;
535  } else {
536  cout << "Transferring output event..." << endl;
537  if (ProcessedEvent != nullptr) {
538  ProcessedEvent->CloneTo(fOutputEvent);
539  }
540  }
541 
542  GetChar(
543  "======= End of Event " +
544  ((fOutputEvent == nullptr) ? ToString(fInputEvent->GetID()) : ToString(fOutputEvent->GetID())) +
545  " =======");
546  } else {
547  for (unsigned int j = 0; j < fProcessChain.size(); j++) {
548  fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
549  ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
550  if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
551  fProcessChain[j]->EndOfEventProcess();
552  if (ProcessedEvent == nullptr) {
553  fProcessNullReturned = true;
554  break;
555  }
556  }
557 
558  if (fHostRunner->UseTestRun()) {
559  fOutputEvent = ProcessedEvent;
560  } else {
561  if (ProcessedEvent != nullptr) {
562  ProcessedEvent->CloneTo(fOutputEvent);
563  }
564  }
565  }
566 }
567 
574  RESTInfo << "TRestThread : Writing temp file. Thread id : " << fThreadId << RESTendl;
575 
576  if (fOutputFile == nullptr) return;
577 
578  fOutputFile->cd();
579  Int_t nErrors = 0;
580  Int_t nWarnings = 0;
581  for (unsigned int i = 0; i < fProcessChain.size(); i++) {
582  // The processes must call object->Write in this method
583  fProcessChain[i]->EndProcess();
584  if (fProcessChain[i]->GetError()) nErrors++;
585  if (fProcessChain[i]->GetWarning()) nWarnings++;
586  }
587 
588  if (nWarnings)
589  RESTWarning << nWarnings
590  << " process warnings were found! Use run0->PrintWarnings(); to get additional info."
591  << RESTendl;
592  if (nErrors)
593  RESTError << nErrors << " process errors were found! Use run0->PrintErrors(); to get additional info."
594  << RESTendl;
595 
596  delete fAnalysisTree;
597 }
std::string type
Type of the wrapped object.
TClass * cl
Pointer to the corresponding TClass helper, if the wrapped object is in class type.
REST core data-saving helper based on TTree.
A base class for any REST event process.
A base class for any REST event.
Definition: TRestEvent.h:38
virtual void CloneTo(TRestEvent *target)
Clone the content of this TRestEvent object to another.
Definition: TRestEvent.cxx:98
TRestStringOutput::REST_Verbose_Level GetVerboseLevel()
returns the verboselevel in type of REST_Verbose_Level enumerator
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Debug
+show the defined debug messages
void AddProcess(TRestEventProcess *process)
Add a process.
bool TestRun()
Make a test run of our process chain.
void Initialize()
Set variables by default during initialization.
Definition: TRestThread.cxx:38
void StartProcess()
The main function of this class. Thread will run this function until the end.
void EndProcess()
Write and close the output file.
void ProcessEvent()
Process a single event.
void PrepareToProcess(bool *outputConfig=nullptr)
Prepare some thing before we can start process.
Int_t ValidateChain(TRestEvent *input)
Check if the input/output of each process in the process chain matches.
Definition: TRestThread.cxx:65
void StartThread()
Create a thread with the method StartProcess().
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
Int_t GetChar(std::string hint="Press a KEY to continue ...")
Helps to pause the program, printing a message before pausing.