REST-for-Physics  v2.3
Rare Event Searches ToolKit for Physics
TRestMessenger.cxx
1 /*************************************************************************
2  * This file is part of the REST software framework. *
3  * *
4  * Copyright (C) 2016 GIFNA/TREX (University of Zaragoza) *
5  * For more information see http://gifna.unizar.es/trex *
6  * *
7  * REST is free software: you can redistribute it and/or modify *
8  * it under the terms of the GNU General Public License as published by *
9  * the Free Software Foundation, either version 3 of the License, or *
10  * (at your option) any later version. *
11  * *
12  * REST is distributed in the hope that it will be useful, *
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15  * GNU General Public License for more details. *
16  * *
17  * You should have a copy of the GNU General Public License along with *
18  * REST in $REST_PATH/LICENSE. *
19  * If not, see http://www.gnu.org/licenses/. *
20  * For the list of contributors see $REST_PATH/CREDITS. *
21  *************************************************************************/
22 
70 
71 #include "TRestMessenger.h"
72 
73 #include <sys/ipc.h>
74 #include <sys/shm.h>
75 
76 #ifdef __APPLE__
77 #include <unistd.h>
78 #endif
79 
80 #include "TRestDataBase.h"
81 #include "TRestManager.h"
82 #include "TRestProcessRunner.h"
83 #include "TRestStringOutput.h"
84 
85 using namespace std;
86 ClassImp(TRestMessenger);
87 
88 TRestMessenger::TRestMessenger() { Initialize(); }
89 
90 TRestMessenger::~TRestMessenger() {
91  // clear the shared memories
92  if (fMessagePool != nullptr) {
93  if (fMessagePool->owner == this) {
94  unlock(fMessagePool);
95  }
96  shmdt(fMessagePool);
97  }
98 }
99 
100 TRestMessenger::TRestMessenger(int token, string mode) {
101  Initialize();
102  LoadConfigFromElement(
103  StringToElement("<TRestMessenger token=\"" + ToString(token) + "\" mode=\"" + mode + "\"/>"), nullptr,
104  {});
105 }
106 
108  fRun = nullptr;
109  fMode = MessagePool_TwoWay;
110 }
111 
112 #define SHMFLAG_CREATEUNIQUE (0640 | IPC_CREAT | IPC_EXCL)
113 #define SHMFLAG_CREATEOROPEN (0640 | IPC_CREAT)
114 #define SHMFLAG_OPEN (0640)
115 
116 // Example rml structure:
117 // <TRestMessenger name="Messager" title="Example" verboseLevel="info"
118 // messageSource="outputfile" token="116027" mode="auto"/>
120  fRun = fHostmgr != nullptr ? fHostmgr->GetRunInfo() : nullptr;
121  string modestr = GetParameter("mode", "twoway");
122  if (ToUpper(modestr) == "HOST") {
123  fMode = MessagePool_Host;
124  } else if (ToUpper(modestr) == "CLIENT") {
125  fMode = MessagePool_Client;
126  } else if (ToUpper(modestr) == "TWOWAY" || ToUpper(modestr) == "AUTO") {
127  fMode = MessagePool_TwoWay;
128  }
129 
130  string token = GetParameter("token", "116027");
131  string source = GetParameter("messageSource", "OUTPUTFILE");
132  key_t key = StringToInteger(token);
133  /*
134 int flag = 0;
135 if (fMode == MessagePool_Host) {
136  flag = SHMFLAG_CREATEUNIQUE;
137 } else {
138  flag = SHMFLAG_OPEN;
139 }
140  */
141 
142  bool created = false;
143  int shmid = shmget(key, sizeof(messagepool_t), SHMFLAG_OPEN);
144  if (fMode == MessagePool_Host) {
145  if (shmid == -1) {
146  shmid = shmget(key, 30000, SHMFLAG_CREATEUNIQUE);
147  if (shmid == -1) {
148  RESTWarning << "TRestMessenger: unknown error!" << RESTendl;
149  return;
150  } else {
151  created = true;
152  }
153  } else {
154  RESTWarning << "TRestMessenger: shmget error!" << RESTendl;
155  RESTWarning << "Shared memory not deleted? type \"ipcrm -m " << shmid << "\" in the bash"
156  << RESTendl;
157  return;
158  }
159  } else if (fMode == MessagePool_Client) {
160  if (shmid == -1) {
161  RESTWarning << "TRestMessenger: shmget error!" << RESTendl;
162  RESTWarning << "Shared memory not initialized? Launch Host process first!" << RESTendl;
163  return;
164  }
165  } else if (fMode == MessagePool_TwoWay) {
166  if (shmid == -1) {
167  shmid = shmget(key, 30000, SHMFLAG_CREATEUNIQUE);
168  if (shmid == -1) {
169  RESTWarning << "TRestMessenger: unknown error!" << RESTendl;
170  return;
171  } else {
172  created = true;
173  }
174  }
175  }
176 
177  messagepool_t* message = (messagepool_t*)shmat(shmid, nullptr, 0);
178  if (message == nullptr) {
179  printf("shmat error\n");
180  return;
181  }
182 
183  if ((string)this->GetName() == "defaultName") SetName(message->name);
184 
185  if (created) {
186  message->Reset();
187  strcpy(message->name, this->GetName());
188  cout << "Created shared memory: " << shmid << endl;
189  } else {
190  if (strcmp(message->name, this->GetName()) != 0) {
191  RESTWarning << "TRestMessenger: connected message pool name(" << message->name
192  << ") is different with this(" << this->GetName() << ")!" << RESTendl;
193  }
194  cout << "Connected to shared memory: " << shmid << endl;
195  }
196 
197  fShmId = shmid;
198  fMessagePool = message;
199  fPoolToken = token;
200  fPoolSource = source;
201 }
202 
203 bool TRestMessenger::lock(messagepool_t* pool, int timeoutMs) {
204  int i = 0;
205  while (pool->owner != nullptr && pool->owner != (void*)this) {
206  usleep(1000);
207  i++;
208  if (i >= timeoutMs) {
209  return false;
210  }
211  }
212 
213  pool->owner = (void*)this;
214 
215  return true;
216 }
217 
218 bool TRestMessenger::unlock(messagepool_t* pool, int timeoutMs) {
219  int i = 0;
220  while (pool->owner != nullptr && pool->owner != (void*)this) {
221  usleep(1000);
222  i++;
223  if (i >= timeoutMs) {
224  RESTError << "unlocking pool failed!" << RESTendl;
225  abort();
226  }
227  }
228 
229  pool->owner = nullptr;
230 
231  return true;
232 }
233 
234 void TRestMessenger::AddPool(string message) {
235  if (!IsConnected()) {
236  RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
237  return;
238  }
239 
240  if (message.size() > MsgLength) {
241  message = message.substr(0, MsgLength - 2);
242  }
243  if (message == "") {
244  RESTWarning << "cannot add empty message!" << RESTendl;
245  return;
246  }
247 
248  messagepool_t* pool = fMessagePool;
249  if (!lock(pool)) {
250  RESTWarning << "cannot add message to pool: " << pool->name << ": lock failed!" << RESTendl;
251  return;
252  }
253 
254  int pos = pool->RequirePos();
255  if (pos != -1) {
256  messagepool_t::message_t msg;
257  msg.provider = this;
258  strcpy(msg.content, message.c_str());
259  memcpy(&pool->messages[pos], &msg, sizeof(msg));
260  } else {
261  RESTWarning << "cannot send message: message pool is full!" << RESTendl;
262  }
263 
264  unlock(pool);
265 }
266 
267 void TRestMessenger::SendMessage(string message) {
268  if (!IsConnected()) {
269  RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
270  return;
271  }
272  if (fMode == MessagePool_Client) {
273  RESTWarning << "TRestMessenger: Forbidden to send message from client!" << RESTendl;
274  return;
275  }
276 
277  if (message == "") {
278  if (ToUpper(fPoolSource) == "OUTPUTFILE") {
279  if (fRun != nullptr) {
280  message = fRun->GetOutputFileName();
281  }
282  } else if (ToUpper(fPoolSource) == "TIME") {
283  message = ToDateTimeString(time(0));
284  } else {
285  message = fPoolSource;
286  }
287  }
288 
289  AddPool(message);
290 }
291 
292 vector<string> TRestMessenger::ShowMessagePool() {
293  vector<string> result;
294 
295  if (!IsConnected()) {
296  RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
297  return result;
298  }
299 
300  if (!lock(fMessagePool)) {
301  RESTWarning << "cannot read message to pool: " << fMessagePool->name << ": lock failed!" << RESTendl;
302  return result;
303  }
304 
305  for (int i = 0; i < Nmsg; i++) {
306  if (!fMessagePool->messages[i].IsEmpty()) {
307  string msg = string(fMessagePool->messages[i].content);
308  if (msg != "") {
309  result.push_back(msg);
310  }
311  }
312  }
313 
314  unlock(fMessagePool);
315  return result;
316 }
317 
318 string TRestMessenger::ConsumeMessage() {
319  if (!IsConnected()) {
320  RESTWarning << "TRestMessenger: Not connected!" << RESTendl;
321  return "";
322  }
323  if (fMode == MessagePool_Host) {
324  RESTWarning << "TRestMessenger: Forbidden to consume message from host!" << RESTendl;
325  return "";
326  }
327 
328  if (!lock(fMessagePool)) {
329  RESTWarning << "cannot read message to pool: " << fMessagePool->name << ": lock failed!" << RESTendl;
330  return "";
331  }
332 
333  string msg = "";
334  for (int i = 0; i < Nmsg; i++) {
335  if (!fMessagePool->messages[i].IsEmpty()) {
336  // the process shall not consume the message provided by itself
337  if (fMessagePool->messages[i].provider != this) {
338  // form the message
339  msg = string(fMessagePool->messages[i].content);
340  // clear this message because it will be consumed
341  fMessagePool->messages[i].Reset();
342  break;
343  }
344  }
345  }
346 
347  unlock(fMessagePool);
348  return msg;
349 }
350 
353 
354  if (IsConnected()) {
355  RESTMetadata << "Connected : "
356  << " (token: " << fPoolToken << ", shmid: " << fShmId << ", source: " << fPoolSource
357  << ")" << RESTendl;
358  } else {
359  RESTMetadata << "Not Connected" << RESTendl;
360  }
361  RESTMetadata << "+++++++++++++++++++++++++++++++++++++++++++++" << RESTendl;
362  RESTMetadata << RESTendl;
363  RESTMetadata << RESTendl;
364 }
void PrintMetadata() override
Implemented it in the derived metadata class to print out specific metadata information.
virtual void Initialize() override
Making default settings.
virtual void InitFromConfigFile() override
To make settings from rml file. This method must be implemented in the derived class.
virtual void PrintMetadata()
Implemented it in the derived metadata class to print out specific metadata information.
std::string ToUpper(std::string in)
Convert string to its upper case. Alternative of TString::ToUpper.
Int_t StringToInteger(std::string in)
Gets an integer from a string.
std::string ToDateTimeString(time_t time)
Format time_t into string.
void Reset()
max 100 messages, each 256 char length