REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
Loading...
Searching...
No Matches
TRestThread.cxx
1
25
26#include "TRestThread.h"
27
28using namespace std;
29
30#ifdef TIME_MEASUREMENT
31#include <chrono>
32using namespace chrono;
33#endif
34
39 fOutputEvent = nullptr;
40 fInputEvent = nullptr;
41
42 fAnalysisTree = nullptr;
43 fEventTree = nullptr;
44
45 fOutputFile = nullptr;
46
47 fProcessChain.clear();
48
49 isFinished = false;
50
51 fCompressionLevel = 1;
53}
54
66 RESTInfo << "thread " << fThreadId << ": validating process chain" << RESTendl;
67
68 // add the non-general processes to list for validation
69 vector<TRestEventProcess*> processes;
70 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
71 RESTValue inEvent = fProcessChain[i]->GetInputEvent();
72 RESTValue outEvent = fProcessChain[i]->GetOutputEvent();
73
74 if (outEvent.type == "TRestEvent" && inEvent.type == "TRestEvent") {
75 RESTInfo << "general process: " << fProcessChain[i]->GetName() << RESTendl;
76 continue;
77 } else if (outEvent.cl && outEvent.cl->InheritsFrom("TRestEvent") && inEvent.cl &&
78 inEvent.cl->InheritsFrom("TRestEvent")) {
79 processes.push_back(fProcessChain[i]);
80 } else {
81 RESTError << "Process: " << fProcessChain[i]->ClassName()
82 << " not properly written, the input/output event is illegal!" << RESTendl;
83 RESTError << "Hint: they must be inherited from TRestEvent" << RESTendl;
84 abort();
85 }
86 }
87
88 if (processes.size() > 0) {
89 // verify that the input event of first process is OK
90 if (input != nullptr) {
91 if ((string)input->ClassName() != processes[0]->GetInputEvent().type) {
92 RESTError << "(ValidateChain): Input event type does not match!" << RESTendl;
93 cout << "Input type of the first non-external process in chain: "
94 << processes[0]->GetInputEvent().type << endl;
95 cout << "The event type from file: " << input->ClassName() << endl;
96 cout << "No events will be processed. Please correct event process "
97 "input/output."
98 << endl;
99 }
100 }
101
102 // verify that the output event type is good to be the input event of the next process
103 for (unsigned int i = 0; i < processes.size() - 1; i++) {
104 string outEventType = processes[i]->GetOutputEvent().type;
105 string nextinEventType = processes[i + 1]->GetInputEvent().type;
106 if (outEventType != nextinEventType && outEventType != "TRestEvent" &&
107 nextinEventType != "TRestEvent") {
108 RESTError << "(ValidateChain): Event process input/output does not match" << RESTendl;
109 RESTError << "The event output for process " << processes[i]->GetName() << " is "
110 << outEventType << RESTendl;
111 RESTError << "The event input for process " << processes[i + 1]->GetName() << " is "
112 << nextinEventType << RESTendl;
113 RESTError << "No events will be processed. Please correctly connect the process chain!"
114 << RESTendl;
115 GetChar();
116 return -1;
117 }
118 }
119 }
120 return 0;
121}
122
123void TRestThread::SetThreadId(Int_t id) {
124 fThreadId = id;
125 if (fThreadId != 0 && fVerboseLevel > TRestStringOutput::REST_Verbose_Level::REST_Essential)
127}
128
152 RESTDebug << "Processing ..." << RESTendl;
153 for (int i = 0; i < 5; i++) {
154 TRestEvent* ProcessedEvent = fInputEvent;
155 RESTDebug << "Test run " << i << " : Input Event ---- " << fInputEvent->ClassName() << "("
156 << fInputEvent << ")" << RESTendl;
157 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
158 RESTDebug << "t" << fThreadId << "p" << j << ": " << fProcessChain[j]->ClassName() << RESTendl;
159
160 fProcessChain[j]->SetObservableValidation(true);
161
162 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
163 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
164 if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
165 // if the output of ProcessEvent() is NULL we assume the event is cut.
166 // we try to use GetOutputEvent()
167 if (ProcessedEvent == nullptr) {
168 ProcessedEvent = fProcessChain[j]->GetOutputEvent();
169 }
170 // if still null we perform another try
171 if (ProcessedEvent == nullptr) {
172 RESTDebug << " ---- NULL" << RESTendl;
173 break;
174 }
175 // check if the output event is same as the processed event
176 TRestEvent* outputevent = fProcessChain[j]->GetOutputEvent();
177 if (outputevent != ProcessedEvent) {
178 RESTWarning
179 << "Test run, in " << fProcessChain[j]->ClassName()
180 << " : output event is different with process returned event! Please check to assign "
181 "the TRestEvent datamember as inputEvent in ProcessEvent() method"
182 << RESTendl;
183 }
184
185 fProcessChain[j]->EndOfEventProcess();
186
187 fProcessChain[j]->SetObservableValidation(false);
188
189 RESTDebug << " .... " << ProcessedEvent->ClassName() << "(" << ProcessedEvent << ")" << RESTendl;
190 }
191
192 fOutputEvent = ProcessedEvent;
193 if (fOutputEvent != nullptr) {
194 RESTDebug << "Output Event ---- " << fOutputEvent->ClassName() << "(" << fOutputEvent << ")"
195 << RESTendl;
196 break;
197 } else {
198 RESTDebug << "Null output, trying again" << RESTendl;
199 }
200 fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree);
201 }
202 if (fOutputEvent == nullptr) {
203 // fOutputEvent = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent();
204 return false;
205 }
206 return true;
207}
208
223void TRestThread::PrepareToProcess(bool* outputConfig) {
224 RESTDebug << "Entering TRestThread::PrepareToProcess" << RESTendl;
225
226 string threadFileName;
227 if (fHostRunner->GetOutputDataFile()->GetName() == (string) "/dev/null") {
228 threadFileName = "/dev/null";
229 } else {
230 threadFileName = REST_TMP_PATH + "rest_thread_tmp" + ToString(this) + ".root";
231 }
232
233 bool outputConfigToDel = false;
234 if (outputConfig == nullptr) {
235 outputConfigToDel = true;
236 outputConfig = new bool[4];
237 for (int i = 0; i < 4; i++) {
238 outputConfig[i] = true;
239 }
240 }
241 if (outputConfig[3] == false) {
242 cout << "Error! output analysis must be on!" << endl;
243 exit(1);
244 }
245
246 if (fProcessChain.size() > 0) {
247 RESTDebug << "TRestThread: Creating file : " << threadFileName << RESTendl;
248 fOutputFile = new TFile(threadFileName.c_str(), "recreate");
249 fOutputFile->SetCompressionLevel(fCompressionLevel);
250 fAnalysisTree = new TRestAnalysisTree("AnalysisTree_" + ToString(fThreadId), "dummyTree");
251 fAnalysisTree->DisableQuickObservableValueSetting();
252
253 RESTDebug << "TRestThread: Finding first input event of process chain..." << RESTendl;
254 if (fHostRunner->GetInputEvent() == nullptr) {
255 RESTError
256 << "Input event is not initialized from TRestRun! Please check your input file and file "
257 "reading process!"
258 << RESTendl;
259 exit(1);
260 }
261 fInputEvent = (TRestEvent*)fHostRunner->GetInputEvent()->Clone();
262 string chainInputType = fProcessChain[0]->GetInputEvent().type;
263 // we check if the process chain input type matches fInputEvent
264 if (chainInputType != "TRestEvent" && chainInputType != (string)fInputEvent->ClassName()) {
265 cout << "REST ERROR: Input event type does not match!" << endl;
266 cout << "Process input type : " << chainInputType
267 << ", File input type : " << fInputEvent->ClassName() << endl;
268 exit(1);
269 }
270
271 RESTDebug << "TRestThread: Reading input event and input observable..." << RESTendl;
272 if (fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree) != 0) {
273 RESTError << "In thread " << fThreadId << ")::Failed to read input event, process cannot start!"
274 << RESTendl;
275 exit(1);
276 }
277
278 RESTDebug << "TRestThread: Init process..." << RESTendl;
279 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
280 fProcessChain[i]->SetAnalysisTree(fAnalysisTree);
281 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
282 fProcessChain[i]->SetFriendProcess(fProcessChain[j]);
283 }
284 RESTDebug << "InitProcess() process for " << fProcessChain[i]->ClassName() << RESTendl;
285 fProcessChain[i]->InitProcess();
286 }
287
288 // test run
289 if (fHostRunner->UseTestRun()) {
290 RESTDebug << "Test Run..." << RESTendl;
291 if (!TestRun()) {
292 RESTError << "In thread " << fThreadId << ")::test run failed!" << RESTendl;
293 RESTError << "One of the processes has NULL pointer fOutputEvent!" << RESTendl;
295 RESTError << "To see more detail, turn on debug mode for "
296 "TRestProcessRunner!"
297 << RESTendl;
298 exit(1);
299 }
300 RESTDebug << "Test Run complete!" << RESTendl;
301 } else {
302 RESTDebug << "Initializing output event" << RESTendl;
303 string chainOutputType = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent().type;
304 fOutputEvent = REST_Reflection::Assembly(chainOutputType);
305 if (fOutputEvent == nullptr) {
306 exit(1);
307 }
308 }
309
311 // create dummy tree to store events
312 fEventTree = new TTree((TString) "EventTree_" + ToString(fThreadId), "dummyTree");
313 vector<pair<TString, TRestEvent*>> branchesToAdd;
314 // avoid duplicated branch
315 // if event type is same, we only create branch for the last of this type
316 // event
317 if (outputConfig[1] == true) {
318 // outputConfig[1]: the input events of each processes in the process chain(can be many)
319
320 // the input event of process chain
321 TRestEvent* evt = fInputEvent;
322 {
323 TString BranchName = (TString)evt->GetName() + "Branch";
324 if (branchesToAdd.size() == 0)
325 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
326 else
327 for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
328 if (branchesToAdd[j].first == BranchName)
329 branchesToAdd[j].second = evt;
330 else if (j == branchesToAdd.size() - 1)
331 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
332 }
333 }
334 // the input event of other processes
335 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
336 TRestEvent* evt = fProcessChain[i]->GetOutputEvent();
337 if (evt != nullptr) {
338 TString BranchName = (TString)evt->GetName() + "Branch";
339 if (branchesToAdd.size() == 0)
340 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
341 else
342 for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
343 if (branchesToAdd[j].first == BranchName)
344 branchesToAdd[j].second = evt;
345 else if (j == branchesToAdd.size() - 1)
346 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
347 }
348 }
349 }
350 }
351
352 if (outputConfig[2] == true) {
353 // outputConfig[2]: output event: the output event of the last process in the process chain
354 TRestEvent* evt = fOutputEvent;
355 {
356 TString BranchName = (TString)evt->GetName() + "Branch";
357 if (branchesToAdd.size() == 0)
358 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
359 else
360 for (unsigned int j = 0; j < branchesToAdd.size(); j++) {
361 if (branchesToAdd[j].first == BranchName)
362 branchesToAdd[j].second = evt;
363 else if (j == branchesToAdd.size() - 1)
364 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
365 }
366 }
367 }
368
369 auto iter = branchesToAdd.begin();
370 while (iter != branchesToAdd.end()) {
371 fEventTree->Branch(iter->first, iter->second->ClassName(), iter->second);
372 iter++;
373 }
374
375 if (fEventTree->GetListOfBranches()->GetLast() < 0) // if no event branches are created
376 {
377 delete fEventTree;
378 fEventTree = nullptr;
379 }
380
381 // fAnalysisTree->CreateBranches();
382
383 // create output temp file for process-defined output object
384
385 fOutputFile->cd();
386 fOutputFile->Clear();
387 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
388 fProcessChain[i]->InitProcess();
389 }
390
391 RESTDebug << "Thread " << fThreadId << " Ready!" << RESTendl;
392 } else {
393 string tmp = fHostRunner->GetInputEvent()->ClassName();
394 fInputEvent = REST_Reflection::Assembly(tmp);
395 fOutputEvent = fInputEvent;
396 fOutputFile = new TFile(threadFileName.c_str(), "recreate");
397 fOutputFile->SetCompressionLevel(fCompressionLevel);
398 fOutputFile->cd();
399
400 RESTDebug << "Creating Analysis Tree..." << RESTendl;
401 fAnalysisTree = new TRestAnalysisTree("AnalysisTree_" + ToString(fThreadId), "dummyTree");
402 fAnalysisTree->DisableQuickObservableValueSetting();
403 fEventTree = new TTree((TString) "EventTree_" + ToString(fThreadId), "dummyTree");
404 // fEventTree->CreateEventBranches();
405
406 if (outputConfig[2]) {
407 TString BranchName = (TString)fInputEvent->GetName() + "Branch";
408 if (fEventTree->GetBranch(BranchName) == nullptr) // avoid duplicated branch
409 {
410 fEventTree->Branch(BranchName, fInputEvent->ClassName(), fInputEvent);
411 }
412 }
413 // currently, external process analysis is not supported!
414 }
415 if (outputConfigToDel) delete outputConfig;
416}
417
434 isFinished = false;
435
436 while (fHostRunner->GetNextevtFunc(fInputEvent, fAnalysisTree) == 0) {
437 ProcessEvent();
438 /*if (fOutputEvent != nullptr) */ fHostRunner->FillThreadEventFunc(this);
439 }
440
441 // fHostRunner->WriteThreadFileFunc(this);
442 isFinished = true;
443}
444
450 t = thread(&TRestThread::StartProcess, this);
451 t.detach();
452 // t.join();
453}
454
463 RESTDebug << "Entering TRestThread::AddProcess" << RESTendl;
464
465 fProcessChain.push_back(process);
466 // if (Console::CompatibilityMode && process->GetVerboseLevel() >= REST_Debug) {
467 // warning << "REST WARNING! Cannot use \"debug\" output level for process " << process->GetName()
468 // << endl;
469 // process->SetVerboseLevel(REST_Info);
470 //}
471 if (process->GetVerboseLevel() > fVerboseLevel) {
472 SetVerboseLevel(process->GetVerboseLevel());
473 }
474}
475
484 TRestEvent* ProcessedEvent = fInputEvent;
485 fProcessNullReturned = false;
486
488#ifdef TIME_MEASUREMENT
489 vector<int> processtime(fProcessChain.size());
490#endif
491
492 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
493 cout << "------- Starting process " + (string)fProcessChain[j]->GetName() + " -------" << endl;
494
495#ifdef TIME_MEASUREMENT
496 high_resolution_clock::time_point t1 = high_resolution_clock::now();
497#endif
498
499 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
500 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
501 if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
502 fProcessChain[j]->EndOfEventProcess();
503
504#ifdef TIME_MEASUREMENT
505 high_resolution_clock::time_point t2 = high_resolution_clock::now();
506 processtime[j] = (int)duration_cast<microseconds>(t2 - t1).count();
507#endif
508
509 if (ProcessedEvent == nullptr) {
510 cout << "------- End of process " + (string)fProcessChain[j]->GetName() +
511 " (NULL returned) -------"
512 << endl;
513 fProcessNullReturned = true;
514 break;
515 } else {
516 cout << "------- End of process " + (string)fProcessChain[j]->GetName() + " -------" << endl;
517 }
518
519 if (fProcessChain[j]->GetVerboseLevel() >= TRestStringOutput::REST_Verbose_Level::REST_Extreme &&
520 j < fProcessChain.size() - 1) {
521 GetChar();
522 }
523 }
524
525#ifdef TIME_MEASUREMENT
526 cout << "Process timing summary : " << endl;
527 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
528 cout << fProcessChain[j]->ClassName() << "(" << fProcessChain[j]->GetName()
529 << ") : " << processtime[j] / (double)1000 << " ms" << endl;
530 }
531#endif
532
533 if (fHostRunner->UseTestRun()) {
534 fOutputEvent = ProcessedEvent;
535 } else {
536 cout << "Transferring output event..." << endl;
537 if (ProcessedEvent != nullptr) {
538 ProcessedEvent->CloneTo(fOutputEvent);
539 }
540 }
541
542 GetChar(
543 "======= End of Event " +
544 ((fOutputEvent == nullptr) ? ToString(fInputEvent->GetID()) : ToString(fOutputEvent->GetID())) +
545 " =======");
546 } else {
547 for (unsigned int j = 0; j < fProcessChain.size(); j++) {
548 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
549 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
550 if (fProcessChain[j]->ApplyCut()) ProcessedEvent = nullptr;
551 fProcessChain[j]->EndOfEventProcess();
552 if (ProcessedEvent == nullptr) {
553 fProcessNullReturned = true;
554 break;
555 }
556 }
557
558 if (fHostRunner->UseTestRun()) {
559 fOutputEvent = ProcessedEvent;
560 } else {
561 if (ProcessedEvent != nullptr) {
562 ProcessedEvent->CloneTo(fOutputEvent);
563 }
564 }
565 }
566}
567
574 RESTInfo << "TRestThread : Writing temp file. Thread id : " << fThreadId << RESTendl;
575
576 if (fOutputFile == nullptr) return;
577
578 fOutputFile->cd();
579 Int_t nErrors = 0;
580 Int_t nWarnings = 0;
581 for (unsigned int i = 0; i < fProcessChain.size(); i++) {
582 // The processes must call object->Write in this method
583 fProcessChain[i]->EndProcess();
584 if (fProcessChain[i]->GetError()) nErrors++;
585 if (fProcessChain[i]->GetWarning()) nWarnings++;
586 }
587
588 if (nWarnings)
589 RESTWarning << nWarnings
590 << " process warnings were found! Use run0->PrintWarnings(); to get additional info."
591 << RESTendl;
592 if (nErrors)
593 RESTError << nErrors << " process errors were found! Use run0->PrintErrors(); to get additional info."
594 << RESTendl;
595
596 delete fAnalysisTree;
597}
std::string type
Type of the wrapped object.
TClass * cl
Pointer to the corresponding TClass helper, if the wrapped object is in class type.
REST core data-saving helper based on TTree.
void DisableQuickObservableValueSetting()
It will disable quick observable value setting.
A base class for any REST event process.
A base class for any REST event.
Definition TRestEvent.h:38
virtual void CloneTo(TRestEvent *target)
Clone the content of this TRestEvent object to another.
TRestStringOutput::REST_Verbose_Level GetVerboseLevel()
returns the verboselevel in type of REST_Verbose_Level enumerator
Int_t GetNextevtFunc(TRestEvent *targetevt, TRestAnalysisTree *targettree)
Get next event and copy it to the address of targetevt.
void FillThreadEventFunc(TRestThread *t)
Calling back the FillEvent() method in TRestThread.
@ REST_Essential
+show some essential information, as well as warnings
@ REST_Debug
+show the defined debug messages
void AddProcess(TRestEventProcess *process)
Add a process.
bool TestRun()
Make a test run of our process chain.
void Initialize()
Set variables by default during initialization.
void StartProcess()
The main function of this class. Thread will run this function until the end.
void EndProcess()
Write and close the output file.
void ProcessEvent()
Process a single event.
void PrepareToProcess(bool *outputConfig=nullptr)
Prepare some thing before we can start process.
Int_t ValidateChain(TRestEvent *input)
Check if the input/output of each process in the process chain matches.
void StartThread()
Create a thread with the method StartProcess().
TRestReflector Assembly(const std::string &typeName)
Assembly an object of type: typeName, returning the allocated memory address and size.
Int_t GetChar(std::string hint="Press a KEY to continue ...")
Helps to pause the program, printing a message before pausing.