66 RESTInfo <<
"thread " << fThreadId <<
": validating process chain" << RESTendl;
69 vector<TRestEventProcess*> processes;
70 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
71 RESTValue inEvent = fProcessChain[i]->GetInputEvent();
72 RESTValue outEvent = fProcessChain[i]->GetOutputEvent();
74 if (outEvent.
type ==
"TRestEvent" && inEvent.
type ==
"TRestEvent") {
75 RESTInfo <<
"general process: " << fProcessChain[i]->GetName() << RESTendl;
77 }
else if (outEvent.
cl && outEvent.
cl->InheritsFrom(
"TRestEvent") && inEvent.
cl &&
78 inEvent.
cl->InheritsFrom(
"TRestEvent")) {
79 processes.push_back(fProcessChain[i]);
81 RESTError <<
"Process: " << fProcessChain[i]->ClassName()
82 <<
" not properly written, the input/output event is illegal!" << RESTendl;
83 RESTError <<
"Hint: they must be inherited from TRestEvent" << RESTendl;
88 if (processes.size() > 0) {
90 if (input !=
nullptr) {
91 if ((
string)input->ClassName() != processes[0]->GetInputEvent().type) {
92 RESTError <<
"(ValidateChain): Input event type does not match!" << RESTendl;
93 cout <<
"Input type of the first non-external process in chain: "
94 << processes[0]->GetInputEvent().type << endl;
95 cout <<
"The event type from file: " << input->ClassName() << endl;
96 cout <<
"No events will be processed. Please correct event process "
103 for (
unsigned int i = 0; i < processes.size() - 1; i++) {
104 string outEventType = processes[i]->GetOutputEvent().type;
105 string nextinEventType = processes[i + 1]->GetInputEvent().type;
106 if (outEventType != nextinEventType && outEventType !=
"TRestEvent" &&
107 nextinEventType !=
"TRestEvent") {
108 RESTError <<
"(ValidateChain): Event process input/output does not match" << RESTendl;
109 RESTError <<
"The event output for process " << processes[i]->GetName() <<
" is "
110 << outEventType << RESTendl;
111 RESTError <<
"The event input for process " << processes[i + 1]->GetName() <<
" is "
112 << nextinEventType << RESTendl;
113 RESTError <<
"No events will be processed. Please correctly connect the process chain!"
152 RESTDebug <<
"Processing ..." << RESTendl;
153 for (
int i = 0; i < 5; i++) {
155 RESTDebug <<
"Test run " << i <<
" : Input Event ---- " << fInputEvent->ClassName() <<
"("
156 << fInputEvent <<
")" << RESTendl;
157 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
158 RESTDebug <<
"t" << fThreadId <<
"p" << j <<
": " << fProcessChain[j]->ClassName() << RESTendl;
160 fProcessChain[j]->SetObservableValidation(
true);
162 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
163 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
164 if (fProcessChain[j]->ApplyCut()) ProcessedEvent =
nullptr;
167 if (ProcessedEvent ==
nullptr) {
168 ProcessedEvent = fProcessChain[j]->GetOutputEvent();
171 if (ProcessedEvent ==
nullptr) {
172 RESTDebug <<
" ---- NULL" << RESTendl;
176 TRestEvent* outputevent = fProcessChain[j]->GetOutputEvent();
177 if (outputevent != ProcessedEvent) {
179 <<
"Test run, in " << fProcessChain[j]->ClassName()
180 <<
" : output event is different with process returned event! Please check to assign "
181 "the TRestEvent datamember as inputEvent in ProcessEvent() method"
185 fProcessChain[j]->EndOfEventProcess();
187 fProcessChain[j]->SetObservableValidation(
false);
189 RESTDebug <<
" .... " << ProcessedEvent->ClassName() <<
"(" << ProcessedEvent <<
")" << RESTendl;
192 fOutputEvent = ProcessedEvent;
193 if (fOutputEvent !=
nullptr) {
194 RESTDebug <<
"Output Event ---- " << fOutputEvent->ClassName() <<
"(" << fOutputEvent <<
")"
198 RESTDebug <<
"Null output, trying again" << RESTendl;
202 if (fOutputEvent ==
nullptr) {
224 RESTDebug <<
"Entering TRestThread::PrepareToProcess" << RESTendl;
226 string threadFileName;
227 if (fHostRunner->GetOutputDataFile()->GetName() == (
string)
"/dev/null") {
228 threadFileName =
"/dev/null";
230 threadFileName = REST_TMP_PATH +
"rest_thread_tmp" + ToString(
this) +
".root";
233 bool outputConfigToDel =
false;
234 if (outputConfig ==
nullptr) {
235 outputConfigToDel =
true;
236 outputConfig =
new bool[4];
237 for (
int i = 0; i < 4; i++) {
238 outputConfig[i] =
true;
241 if (outputConfig[3] ==
false) {
242 cout <<
"Error! output analysis must be on!" << endl;
246 if (fProcessChain.size() > 0) {
247 RESTDebug <<
"TRestThread: Creating file : " << threadFileName << RESTendl;
248 fOutputFile =
new TFile(threadFileName.c_str(),
"recreate");
249 fOutputFile->SetCompressionLevel(fCompressionLevel);
250 fAnalysisTree =
new TRestAnalysisTree(
"AnalysisTree_" + ToString(fThreadId),
"dummyTree");
253 RESTDebug <<
"TRestThread: Finding first input event of process chain..." << RESTendl;
254 if (fHostRunner->GetInputEvent() ==
nullptr) {
256 <<
"Input event is not initialized from TRestRun! Please check your input file and file "
261 fInputEvent = (
TRestEvent*)fHostRunner->GetInputEvent()->Clone();
262 string chainInputType = fProcessChain[0]->GetInputEvent().type;
264 if (chainInputType !=
"TRestEvent" && chainInputType != (
string)fInputEvent->ClassName()) {
265 cout <<
"REST ERROR: Input event type does not match!" << endl;
266 cout <<
"Process input type : " << chainInputType
267 <<
", File input type : " << fInputEvent->ClassName() << endl;
271 RESTDebug <<
"TRestThread: Reading input event and input observable..." << RESTendl;
272 if (fHostRunner->
GetNextevtFunc(fInputEvent, fAnalysisTree) != 0) {
273 RESTError <<
"In thread " << fThreadId <<
")::Failed to read input event, process cannot start!"
278 RESTDebug <<
"TRestThread: Init process..." << RESTendl;
279 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
280 fProcessChain[i]->SetAnalysisTree(fAnalysisTree);
281 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
282 fProcessChain[i]->SetFriendProcess(fProcessChain[j]);
284 RESTDebug <<
"InitProcess() process for " << fProcessChain[i]->ClassName() << RESTendl;
285 fProcessChain[i]->InitProcess();
289 if (fHostRunner->UseTestRun()) {
290 RESTDebug <<
"Test Run..." << RESTendl;
292 RESTError <<
"In thread " << fThreadId <<
")::test run failed!" << RESTendl;
293 RESTError <<
"One of the processes has NULL pointer fOutputEvent!" << RESTendl;
295 RESTError <<
"To see more detail, turn on debug mode for "
296 "TRestProcessRunner!"
300 RESTDebug <<
"Test Run complete!" << RESTendl;
302 RESTDebug <<
"Initializing output event" << RESTendl;
303 string chainOutputType = fProcessChain[fProcessChain.size() - 1]->GetOutputEvent().type;
305 if (fOutputEvent ==
nullptr) {
312 fEventTree =
new TTree((TString)
"EventTree_" + ToString(fThreadId),
"dummyTree");
313 vector<pair<TString, TRestEvent*>> branchesToAdd;
317 if (outputConfig[1] ==
true) {
323 TString BranchName = (TString)evt->GetName() +
"Branch";
324 if (branchesToAdd.size() == 0)
325 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
327 for (
unsigned int j = 0; j < branchesToAdd.size(); j++) {
328 if (branchesToAdd[j].first == BranchName)
329 branchesToAdd[j].second = evt;
330 else if (j == branchesToAdd.size() - 1)
331 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
335 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
336 TRestEvent* evt = fProcessChain[i]->GetOutputEvent();
337 if (evt !=
nullptr) {
338 TString BranchName = (TString)evt->GetName() +
"Branch";
339 if (branchesToAdd.size() == 0)
340 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
342 for (
unsigned int j = 0; j < branchesToAdd.size(); j++) {
343 if (branchesToAdd[j].first == BranchName)
344 branchesToAdd[j].second = evt;
345 else if (j == branchesToAdd.size() - 1)
346 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
352 if (outputConfig[2] ==
true) {
356 TString BranchName = (TString)evt->GetName() +
"Branch";
357 if (branchesToAdd.size() == 0)
358 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
360 for (
unsigned int j = 0; j < branchesToAdd.size(); j++) {
361 if (branchesToAdd[j].first == BranchName)
362 branchesToAdd[j].second = evt;
363 else if (j == branchesToAdd.size() - 1)
364 branchesToAdd.push_back(pair<TString, TRestEvent*>(BranchName, evt));
369 auto iter = branchesToAdd.begin();
370 while (iter != branchesToAdd.end()) {
371 fEventTree->Branch(iter->first, iter->second->ClassName(), iter->second);
375 if (fEventTree->GetListOfBranches()->GetLast() < 0)
378 fEventTree =
nullptr;
386 fOutputFile->Clear();
387 for (
unsigned int i = 0; i < fProcessChain.size(); i++) {
388 fProcessChain[i]->InitProcess();
391 RESTDebug <<
"Thread " << fThreadId <<
" Ready!" << RESTendl;
393 string tmp = fHostRunner->GetInputEvent()->ClassName();
395 fOutputEvent = fInputEvent;
396 fOutputFile =
new TFile(threadFileName.c_str(),
"recreate");
397 fOutputFile->SetCompressionLevel(fCompressionLevel);
400 RESTDebug <<
"Creating Analysis Tree..." << RESTendl;
401 fAnalysisTree =
new TRestAnalysisTree(
"AnalysisTree_" + ToString(fThreadId),
"dummyTree");
403 fEventTree =
new TTree((TString)
"EventTree_" + ToString(fThreadId),
"dummyTree");
406 if (outputConfig[2]) {
407 TString BranchName = (TString)fInputEvent->GetName() +
"Branch";
408 if (fEventTree->GetBranch(BranchName) ==
nullptr)
410 fEventTree->Branch(BranchName, fInputEvent->ClassName(), fInputEvent);
415 if (outputConfigToDel)
delete outputConfig;
485 fProcessNullReturned =
false;
488#ifdef TIME_MEASUREMENT
489 vector<int> processtime(fProcessChain.size());
492 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
493 cout <<
"------- Starting process " + (string)fProcessChain[j]->GetName() +
" -------" << endl;
495#ifdef TIME_MEASUREMENT
496 high_resolution_clock::time_point t1 = high_resolution_clock::now();
499 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
500 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
501 if (fProcessChain[j]->ApplyCut()) ProcessedEvent =
nullptr;
502 fProcessChain[j]->EndOfEventProcess();
504#ifdef TIME_MEASUREMENT
505 high_resolution_clock::time_point t2 = high_resolution_clock::now();
506 processtime[j] = (int)duration_cast<microseconds>(t2 - t1).count();
509 if (ProcessedEvent ==
nullptr) {
510 cout <<
"------- End of process " + (string)fProcessChain[j]->GetName() +
511 " (NULL returned) -------"
513 fProcessNullReturned =
true;
516 cout <<
"------- End of process " + (string)fProcessChain[j]->GetName() +
" -------" << endl;
520 j < fProcessChain.size() - 1) {
525#ifdef TIME_MEASUREMENT
526 cout <<
"Process timing summary : " << endl;
527 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
528 cout << fProcessChain[j]->ClassName() <<
"(" << fProcessChain[j]->GetName()
529 <<
") : " << processtime[j] / (double)1000 <<
" ms" << endl;
533 if (fHostRunner->UseTestRun()) {
534 fOutputEvent = ProcessedEvent;
536 cout <<
"Transferring output event..." << endl;
537 if (ProcessedEvent !=
nullptr) {
538 ProcessedEvent->
CloneTo(fOutputEvent);
543 "======= End of Event " +
544 ((fOutputEvent ==
nullptr) ? ToString(fInputEvent->GetID()) : ToString(fOutputEvent->GetID())) +
547 for (
unsigned int j = 0; j < fProcessChain.size(); j++) {
548 fProcessChain[j]->BeginOfEventProcess(ProcessedEvent);
549 ProcessedEvent = fProcessChain[j]->ProcessEvent(ProcessedEvent);
550 if (fProcessChain[j]->ApplyCut()) ProcessedEvent =
nullptr;
551 fProcessChain[j]->EndOfEventProcess();
552 if (ProcessedEvent ==
nullptr) {
553 fProcessNullReturned =
true;
558 if (fHostRunner->UseTestRun()) {
559 fOutputEvent = ProcessedEvent;
561 if (ProcessedEvent !=
nullptr) {
562 ProcessedEvent->
CloneTo(fOutputEvent);