Basic Image AlgorithmS Library  2.8.0
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
CScommBase.cpp
1 /*
2 This file is part of the BIAS library (Basic ImageAlgorithmS).
3 
4 Copyright (C) 2003-2009 (see file CONTACT for details)
5 Multimediale Systeme der Informationsverarbeitung
6 Institut fuer Informatik
7 Christian-Albrechts-Universitaet Kiel
8 
9 
10 BIAS is free software; you can redistribute it and/or modify
11 it under the terms of the GNU Lesser General Public License as published by
12 the Free Software Foundation; either version 2.1 of the License, or
13 (at your option) any later version.
14 
15 BIAS is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU Lesser General Public License for more details.
19 
20 You should have received a copy of the GNU Lesser General Public License
21 along with BIAS; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 */
24 
25 #ifdef WIN32
26 # define _CRT_SECURE_NO_DEPRECATE // vs8
27 # pragma warning (disable: 4251) // STL MAP DLL warning noise (JW)
28 #endif
29 
30 #include "CScommBase.hh"
31 
32 #ifdef WIN32
33 # include <winsock.h>
34 //# include <winsock2.h>
35 #else //WIN32
36 # include <netinet/in.h>
37 # include <sys/socket.h>
38 # include <netdb.h>
39 # include <unistd.h>
40 #endif //WIN32
41 
42 #include <sys/types.h>
43 #include <cstdio>
44 #include <cstring>
45 #include <Base/Debug/TimeMeasure.hh>
46 #include <Base/Debug/Error.hh>
47 #include <cstring>
48 
49 using namespace std;
50 using namespace BIAS;
51 
52 const unsigned int maxBinaryG= 1<<24; // 16MB
53 
54 
55 CScommBase::~CScommBase()
56 {
57  //if threaded recv is active ...
58  if (threaded_) {
59  //... tell threads to stop ...
60  terminateReceiver_=true;
61  //... and wait unitl all receive-threads are finished
62  for (unsigned int i=0;i<commPartners_.size();i++) {
63  if (commPartners_[i].connected) {
64  pthread_join(commPartners_[i].threadid,NULL);
65  }
66  }
67  } else {
68  //else close all connections
69  for (unsigned int i=0;i<commPartners_.size();i++) {
70  if (commPartners_[i].connected) {
71 #ifdef WIN32
72  closesocket(commPartners_[i].fd);
73 #else //WIN32
74  close(commPartners_[i].fd);
75 #endif //WIN32
76  }
77  }
78  }
79 
80 #ifdef WIN32
81  //do cleanup for winsock-DLL
82  WSACleanup();
83 #endif //WIN32
84 }
85 
86 CScommBase::CScommBase(bool threaded) : logOut_(std::cout.rdbuf()),
87 verboseOut_(std::cout.rdbuf())
88 {
89  //SetDebugLevel(D_CS_ANALYZE | D_CS_RECEIVER | D_CS_CONNECT |
90  // D_CS_LISTENER | D_CS_SEND);
91  timeOut_=0; // no timeOut
92  onlyNewMsgs_=true;
93  connectPort_=D_CS_DEFAULT_PORT;
94  portChange_=false;
95  threaded_ = threaded;
96  terminateReceiver_=false;
97  commPartners_.resize(D_CS_MAXCLIENTS);
99  log_=false;
100  verbose_=false;
101  registeredMsgs_.reserve(100);
102  timeOutFlag_=false;
103  keepAll_=false;
104  pthread_mutex_init(&registerLock_,NULL);
105 
106 #ifdef WIN32
107  /* activate sockets within WIN32*/
108  {
109  WSADATA wsa;
110  if (WSAStartup(MAKEWORD(1, 1), &wsa))
111  {
112  BIASERR("WSAStartup() failed with error: "
113  << (unsigned long)GetLastError() << "\n");
114  exit(-1);
115  }
116  }
117 #endif //WIN32
118 }
119 
120 
121 /// -----
122 #include <Base/Common/BIASpragmaStart.hh>
123 
125 : logOut_(std::cout.rdbuf()),
126 verboseOut_(std::cout.rdbuf())
127 {
128  BIASERR("not implemented");
129  BIASABORT;
130 }
131 
132 CScommBase &
134 {
135  BIASERR("not implemented");
136  BIASABORT;
137  return *this;
138 }
139 #include <Base/Common/BIASpragmaEnd.hh>
140 // -----
141 
142 
143 int CScommBase::GetData_(const string &msgName)
144 {
145  // check for registered msg
146  // first in the indexMap, then search extensively
147  int index=-1;
148  map<string, int>::iterator it;
149  BIASDOUT(D_CS_GETDATA,"Searchin index for:"<<msgName);
150  if ( (it=indexRegMsgs_.find(msgName)) != indexRegMsgs_.end() ) {
151  index= it->second;
152  } else { // search for index
153  unsigned int i=0;
154  while ( (index==-1) && i<registeredMsgs_.size() ) {
155  if ((registeredMsgs_[i].msgName.compare(msgName)==0))
156  index=i;
157  i++;
158  }
159  if (index<0) {
160  BIASERR("GetData(): msg "<< msgName<<" not registered");
161  return -1;
162  }
163  // new entry in the map
164  indexRegMsgs_[msgName]=index;
165  }
166  BIASDOUT(D_CS_GETDATA,"found index: "<<index);
167  return index;
168 }
169 
170 int CScommBase::GetData(const string &msgName, vector<float> &floatData)
171 {
172  int retValue=0;
173  int index=GetData_(msgName);
174  if (index>=0) {
175  // check for new data and copy it
176  if (threaded_)
177  pthread_mutex_lock(&registeredMsgs_[index].dataLock);
178  if (registeredMsgs_[index].newData) {
179  floatData=registeredMsgs_[index].floatData;
180  registeredMsgs_[index].newData=false;
181  } else {
182  if (threaded_) retValue=1;
183  else {
184  syncReceive_();
185  if (registeredMsgs_[index].newData) {
186  floatData=registeredMsgs_[index].floatData;
187  registeredMsgs_[index].newData=false;
188  } else retValue=1;
189  }
190  }
191  if (threaded_)
192  pthread_mutex_unlock(&registeredMsgs_[index].dataLock);
193  } else retValue=index;
194  return retValue;
195 }
196 
197 int CScommBase::GetData(const string &msgName, vector<int> &intData)
198 {
199  int retValue=0;
200  int index=GetData_(msgName);
201  if (index>=0) {
202  // check for new data and copy it
203  if (threaded_)
204  pthread_mutex_lock(&registeredMsgs_[index].dataLock);
205  if (registeredMsgs_[index].newData) {
206  intData=registeredMsgs_[index].intData;
207  registeredMsgs_[index].newData=false;
208  } else {
209  if (threaded_) retValue=1;
210  else {
211  syncReceive_();
212  if (registeredMsgs_[index].newData) {
213  intData=registeredMsgs_[index].intData;
214  registeredMsgs_[index].newData=false;
215  } else retValue=1;
216  }
217  }
218  if (threaded_)
219  pthread_mutex_unlock(&registeredMsgs_[index].dataLock);
220  } else retValue=index;
221  return retValue;
222 }
223 
224 
225 int CScommBase::GetData(const string &msgName, vector<string> &stringData)
226 {
227  int retValue=0;
228  int index=GetData_(msgName);
229  if (index>=0) {
230  // check for new data and copy it
231  if (threaded_)
232  pthread_mutex_lock(&registeredMsgs_[index].dataLock);
233  if (registeredMsgs_[index].newData) {
234  stringData=registeredMsgs_[index].stringData;
235  registeredMsgs_[index].newData=false;
236  } else {
237  if (threaded_) retValue=1;
238  else {
239  syncReceive_();
240  if (registeredMsgs_[index].newData) {
241  stringData=registeredMsgs_[index].stringData;
242  registeredMsgs_[index].newData=false;
243  } else retValue=1;
244  }
245  }
246  if (threaded_)
247  pthread_mutex_unlock(&registeredMsgs_[index].dataLock);
248  } else retValue=index;
249  return retValue;
250 }
251 
252 
253 int CScommBase::GetData(const string &msgName, vector<char> &binaryData)
254 {
255  int retValue=0;
256  int index=GetData_(msgName);
257  if (index>=0) {
258  // check for new data and copy it
259  if (threaded_)
260  pthread_mutex_lock(&registeredMsgs_[index].dataLock);
261  if (registeredMsgs_[index].newData) {
262  binaryData.resize(registeredMsgs_[index].binaryDataSize);
263  memcpy(&(binaryData[0]),registeredMsgs_[index].binaryData,registeredMsgs_[index].binaryDataSize);
264  registeredMsgs_[index].newData=false;
265  } else {
266  if (threaded_) retValue=1;
267  else {
268  syncReceive_();
269  if (registeredMsgs_[index].newData) {
270  binaryData.resize(registeredMsgs_[index].binaryDataSize);
271  memcpy(&(binaryData[0]),
272  registeredMsgs_[index].binaryData,
273  registeredMsgs_[index].binaryDataSize);
274  registeredMsgs_[index].newData=false;
275  } else retValue=1;
276  }
277  }
278  if (threaded_)
279  pthread_mutex_unlock(&registeredMsgs_[index].dataLock);
280  } else retValue=index;
281  return retValue;
282 }
283 
284 int CScommBase::WaitForNewMessage(std::string &msgName)
285 {
286  if (threaded_) {
287  BIASERR("not working for threaded");
288  return -2;
289  }
290  // look for new msgs in the slots
291  int nrMewMsgs=0;
292  for (unsigned int s=0; s<registeredMsgs_.size(); s++) {
293  if (registeredMsgs_[s].newData) {
294  msgName=registeredMsgs_[s].msgName;
295  nrMewMsgs++;
296  }
297  }
298 
299  while (nrMewMsgs==0) { // wait for new msgs
300  syncReceive_(); // blocks until new data arrived
301 
302  // look which one arrived
303  for (unsigned int s=0; s<registeredMsgs_.size(); s++) {
304  if (registeredMsgs_[s].newData) {
305  msgName=registeredMsgs_[s].msgName;
306  nrMewMsgs++;
307  }
308  }
309  }
310 
311  return nrMewMsgs-1;
312 }
313 
314 int
315 CScommBase::RegisterMsg(string msgName, EdataType dataType, int amount)
316 {
317  return RegisterMsg(string(""), msgName, dataType, amount);
318 }
319 
320 int
321 CScommBase::RegisterMsg(string commPartnerName,
322  string msgName, EdataType dataType, int amount)
323 {
324 #ifdef BIAS_DEBUG
325  // look if this msg was registered already
326  for (unsigned int i=0;i<registeredMsgs_.size();i++) {
327  if ( (registeredMsgs_[i].msgName.compare(msgName)==0) ) {
328  BIASERR("msg:"<<msgName<<" already registered for:"
329  <<registeredMsgs_[i].commName);
330  return -1;
331  }
332  }
333 #endif
334 
335  CScommMsg newMsg;
336  newMsg.msgName=msgName;
337  newMsg.commName=commPartnerName;
338  newMsg.dataType=dataType;
339  if (dataType==CS_BINARY)
340  amount=1;
341  newMsg.dataSize=amount;
342  newMsg.server=isServer_;
343 
344  if (threaded_) pthread_mutex_lock(&registerLock_);
345 
346  registeredMsgs_.push_back(newMsg);
347 
348  if (threaded_)
349  {
350  pthread_mutex_init(&registeredMsgs_.back().dataLock,NULL);
351  pthread_mutex_unlock(&registerLock_);
352  }
353 
354  return 0;
355 }
356 
357 
358 int CScommBase::SendMsg(const string &msgName, vector<float> &floatData)
359 {
360  string data;
361  data.reserve(msgName.size()+19*floatData.size());
362  char dummy[24];
363  data = '[' + msgName + ']' + ' ';
364  for (unsigned int f=0;f<floatData.size();f++) {
365  sprintf(dummy,"%.12f ",floatData[f]);
366  data+= dummy;
367  }
368  return SendMsg_(data);
369 }
370 
371 int CScommBase::SendMsg(const string &msgName, vector<int> &intData)
372 {
373  string data;
374  data.reserve(msgName.size()+20*intData.size());
375  char dummy[24];
376  data = '[' + msgName + ']' + ' ';
377  for (unsigned int f=0;f<intData.size();f++) {
378  sprintf(dummy,"%i ",intData[f]);
379  data+= dummy;
380  }
381  return SendMsg_(data);
382 }
383 
384 int CScommBase::SendMsg(const string &msgName, vector<string> &stringData)
385 {
386  string data;
387  data.reserve(msgName.size()+20*stringData.size());
388  data = '[' + msgName + ']' + ' ';
389  for (unsigned int i=0;i<stringData.size();i++) {
390  data+= (stringData[i]+string(" "));
391  }
392  return SendMsg_(data);
393 }
394 
395 int CScommBase::SendMsg_(const string &data)
396 {
397  int res;
398  for (unsigned int i=0;i<commPartners_.size();i++) {
399  if (commPartners_[i].connected) {
400  if (log_) {
401  logOut_<<mygetpid()<<": Sending "<<data.size()
402  <<" bytes to "<<commPartners_[i].name<<"."<<endl;
403  logOutPut_(data.c_str(),(unsigned int)data.size());
404  }
405 #ifdef WIN32
406  res = send(commPartners_[i].fd, data.c_str(),(int)data.size(),0);
407 #else //WIN32
408 # ifdef __APPLE__
409  res = send(commPartners_[i].fd, data.c_str(),data.size(),0);
410 # else // _APPLE_
411  res = send(commPartners_[i].fd, data.c_str(),data.size(),MSG_NOSIGNAL);
412 # endif // _APPLE_
413 #endif //WIN32
414  if (res <= 0) {
415  BIASERR(mygetpid()<<"sendmsg() error sending data");
416  return -1;
417  }
418  }
419  }
420  return 0;
421 }
422 
423 int CScommBase::SendMsg(const std::string &msgName,
424  char* binaryData, unsigned int size)
425 {
426  std::vector<char> data;
427  char dummy[255];
428  sprintf(dummy,"[%s] %i ",msgName.c_str(),size);
429  int pos=(int)strlen(dummy);
430  data.resize(pos+size);
431  memcpy(&data[0],dummy,pos);
432  memcpy(&data[pos], binaryData,size);
433 
434  int res;
435  for (unsigned int i=0;i<commPartners_.size();i++) {
436  if (commPartners_[i].connected) {
437  if (log_) {
438  logOut_<<mygetpid()<<": Sending "<<data.size()
439  <<" bytes of binary data to "
440  <<commPartners_[i].name<<"."<<endl;
441  }
442 #ifdef WIN32
443  res = send(commPartners_[i].fd, &(data[0]),(int)data.size(),0);
444 #else //WIN32
445 # ifdef __APPLE__
446  res = send(commPartners_[i].fd, &(data[0]),data.size(),0);
447 # else // __APPLE__
448  res = send(commPartners_[i].fd, &(data[0]),data.size(),MSG_NOSIGNAL);
449 # endif //__APPLE__
450 #endif //WIN32
451  if (res <= 0) {
452  BIASERR(mygetpid()<<"sendmsg() error sending data");
453  return -1;
454  }
455  }
456  }
457  return 0;
458 }
459 
460 int CScommBase::SendMsg(const std::string commPartnerName,
461  const std::string &msgName,
462  char* binaryData, unsigned int size)
463 {
464  for (unsigned int i=0;i<commPartners_.size();i++) {
465  if (GetCommPartnerName(i)==commPartnerName) {
466  return SendMsg(i, msgName, binaryData, size);
467  }
468  }
469  return -2;
470 }
471 
472 int CScommBase::SendMsg(int commPartnerNr,
473  const std::string &msgName,
474  char* binaryData, unsigned int size)
475 {
476  std::vector<char> data;
477  char dummy[255];
478  sprintf(dummy,"[%s] %i ",msgName.c_str(),size);
479  int pos=(int)strlen(dummy);
480  data.resize(pos+size);
481  memcpy(&data[0],dummy,pos);
482  memcpy(&data[pos], binaryData,size);
483 
484  if (commPartners_[commPartnerNr].connected) {
485  if (log_) {
486  logOut_<<mygetpid()<<": Sending "<<data.size()
487  <<" bytes of binary data to "
488  <<commPartners_[commPartnerNr].name<<"."<<endl;
489  }
490 #ifdef WIN32
491  int res = send(commPartners_[commPartnerNr].fd,
492  &(data[0]),(int)data.size(),0);
493 #else //WIN32
494 # ifdef __APPLE__
495  int res = send(commPartners_[commPartnerNr].fd,
496  &(data[0]),data.size(),0);
497 # else // __APPLE__
498  int res = send(commPartners_[commPartnerNr].fd,
499  &(data[0]),data.size(),MSG_NOSIGNAL);
500 # endif // __APPLE__
501 #endif //WIN32
502  if (res < 0) {
503  BIASERR(mygetpid()<<"sendmsg() error sending data:"<<strerror(errno));
504  return -1;
505  }
506  } else return -2;
507  return 0;
508 }
509 
510 int
511 CScommBase::CheckRegisteredMsgs_(string &msgName, const CScommData &cPartner,
512  EdataType &dataTypeToRead,
513  unsigned int &dataSizeToRead)
514 {
515  int retValue=-1;
516  // check if this msg is registered for this comm partner
517  int msgFound=-1;
518 
519  if (threaded_) pthread_mutex_lock(&registerLock_);
520 
521  BIASDOUT(D_CS_ANALYZE,"checking, if msg is registered");
522  for (unsigned int i=0;i<registeredMsgs_.size();i++) {
523  if (registeredMsgs_[i].msgName.compare(msgName)==0) {
524  if ( (registeredMsgs_[i].commName.size()==0) ||
525  (registeredMsgs_[i].commName.compare(cPartner.name)==0) ) {
526  if ( (isServer_ && registeredMsgs_[i].server) ||
527  (!isServer_ && !registeredMsgs_[i].server) ) {
528  msgFound=i;
529  // if the msg is registered without commName bind it to this
530  // commPartner
531  if (registeredMsgs_[i].commName.size()==0)
532  registeredMsgs_[i].commName=cPartner.name;
533  } else {
534  BIASERR(mygetpid()<<
535  ": msg not registered for acceptance for this commParnter");
536  retValue=-1;
537  }
538  } else {
539  BIASERR(mygetpid()<<
540  ": msg:"<<msgName<<" not registered from:"<<cPartner.name);
541  retValue=-2;
542  }
543  }
544  }
545  // set the dataType to read and the dataSize
546  if (msgFound>=0) {
547  dataTypeToRead= registeredMsgs_[msgFound].dataType;
548  dataSizeToRead= registeredMsgs_[msgFound].dataSize;;
549  retValue=msgFound;
550  }
551 
552  if (threaded_) pthread_mutex_unlock(&registerLock_);
553 
554  BIASDOUT(D_CS_GETDATA,"msg:"<<msgName<<" has index "<<msgFound);
555  return retValue;
556 }
557 
558 
559 // Communication is organized like follows: For each commPartner
560 // a msg buffer is filled with incoming bytes, where data is stored
561 // as long there are incomplete msgs.<br>
562 // Complete msgs are copied to the registered Msgs data field.
563 // Complete binary msgs are not copied but just a pointer is stored
565 {
566 
567  //// read the data, parse for registered msgs, copy data to msgBuffer_
568  CScommData &cPartner= commPartners_[commNr]; // for faster access
569 
570  // parse for registered msgs
571  unsigned int pos= cPartner.lastProcessed, end = (unsigned int)cPartner.msgBuffer.size();
572  //unsigned int lastProcessed=0;
573  BIASDOUT(D_CS_ANALYZE,"msgBuffer.size():"<< cPartner.msgBuffer.size()
574  <<"pos: "<<pos<<" end:"<<end);
575 
576  if (cPartner.msgBuffer.size()<=3) {
577  // there is nothing to analyze yet, collect more data first
578  return 1;
579  }
580 
581  // if (binaryMsgActive_)
582  // AnalyzeBinaryMsg_();
583 
584  int retValue=0;
585  string msgName;
586  msgName.reserve(100);
587  bool bMsgOk=true;
588  unsigned int dataSizeToRead = 0;
589  EdataType dataTypeToRead = CS_NONE;
590  int msgIndex=-1;
591  bool waitForMore=false;
592  bool bNeedMoreBinaryData = false;
593  while ((pos<end) && (!waitForMore)) {
594  msgName= "";
595  msgIndex= -1;
596  BIASDOUT(D_CS_ANALYZE,"msgBuffer.size():"<< cPartner.msgBuffer.size()
597  <<"pos: "<<pos<<" end:"<<end);
598 
599  // go to first char of msgName
600  while ((pos<end) && cPartner.msgBuffer[pos]!='[' ) pos++;
601  // we are at end or at '['
602  while ((pos<end) && (cPartner.msgBuffer[pos]=='[' ||
603  cPartner.msgBuffer[pos]==' ') ) pos++;
604  //read msgName
605  BIASDOUT(D_CS_ANALYZE,"reading msg name at pos:"<<pos);
606  while ((pos<end) && cPartner.msgBuffer[pos]!=']') {
607  msgName.push_back(cPartner.msgBuffer[pos]);
608  pos++;
609  }
610  if (pos>=end) {
611  if (retValue<2) retValue=2;
612  } else { // check for registered msg
613  if ( (msgIndex=CheckRegisteredMsgs_(msgName, cPartner,
614  dataTypeToRead, dataSizeToRead))<0) {
615  string msgNameCut=msgName;
616  if (msgNameCut.length()>30)
617  msgName.resize(30);
618  BIASERR(mygetpid()<<": not a registered msg: "<<msgName
619  <<" for commPartner:" << cPartner.name);
620  // go to next '[' and clear buffer
621  while ((pos<end) && (cPartner.msgBuffer[pos]!=']')) pos++;
622  cPartner.lastProcessed=pos;
623  }
624  }
625  BIASDOUT(D_CS_ANALYZE,"msgName: "<< msgName);
626  if (msgIndex>=0) {
627  // read data until next '[' or end
628  unsigned int dataRead=0;
629  string dummy;
630  vector<string> dummyList;
631  dummy.reserve(100);
632  bool bNextMsg=false;
633  while ( (pos<end) && !bNextMsg) {
634  while ((pos<end) && (cPartner.msgBuffer[pos]==']' ||
635  cPartner.msgBuffer[pos]==' ') ) pos++;
636  dummy="";
637  while ((pos<end) && cPartner.msgBuffer[pos]!=' ' &&
638  cPartner.msgBuffer[pos]!='[' ) {
639  dummy.push_back(cPartner.msgBuffer[pos]);
640  pos++;
641  }
642  BIASDOUT(D_CS_ANALYZE,"trying to read data pos:"<< pos);
643  if (pos<end) { // one data type read
644  dummyList.push_back(dummy);
645  dataRead++;
646  }
647  if (dataTypeToRead!=CS_BINARY) {
648  while ((pos<end) && (cPartner.msgBuffer[pos]==']' ||
649  cPartner.msgBuffer[pos]==' ') ) pos++;
650  if (cPartner.msgBuffer[pos]=='[') {
651  bNextMsg=true;
652  cPartner.lastProcessed=pos;
653  if (dataRead!=dataSizeToRead)
654  bMsgOk = false;
655  }
656  } else { // BINARY
657  if (dataRead==dataSizeToRead) {
658  pos++; //there is a ' ' after the size
659  errno=0;
660  unsigned int size=strtol(&(dummy[0]),NULL,10);
661  if (errno==EINVAL)
662  BIASERR("error converting size of binary data '"<<dummy<<"'");
663 
664  BIASDOUT(D_CS_ANALYZE,"checking for "<<size<<
665  " binary bytes of data");
666  if ((end-pos)<size) { //there is not enough data
667  dataRead=0; // such that data is not copied to msgBuffer
668  waitForMore=true; // end the while (pos<end) loop
669  bNextMsg=true;
670  bNeedMoreBinaryData = true;
671  BIASDOUT(D_CS_ANALYZE,"waiting for more data. need: " << size << " have: " << end-pos);
672  } else {
673  dataRead=dataSizeToRead=size;
674  bNextMsg=true;
675  BIASDOUT(D_CS_ANALYZE,"got complete message.");
676  }
677  }
678  }
679  }
680  if (dataRead>dataSizeToRead) {
681  if (retValue<3) retValue=3;
682  cPartner.lastProcessed=pos;
683  }
684 
685  if (!bMsgOk) {
686  if (retValue<3) {
687  if (dataRead>dataSizeToRead) { retValue=4; }
688  else { retValue=3; }
689  // '[' from next msg found. but not all expected data read
690  BIASERR(mygetpid()<<
691  ": parse error while parsing msg: "<<msgName<< endl
692  <<"'[' from next msg found, but more or less data read than"
693  <<" expected. Going on though.");
694  }
695  }
696 
697  //// data read, now process dummyList and copy to registered MsgBuffer
698  if (dataRead==dataSizeToRead) {
699  cPartner.lastProcessed=pos;
700  BIASDOUT(D_CS_ANALYZE,"converting data types. dataRead:"<<dataRead);
701  // convert to real data types
702  if (threaded_)
703  pthread_mutex_lock(&registeredMsgs_[msgIndex].dataLock);
704  if (registeredMsgs_[msgIndex].dataType == CS_STRING) {
705  registeredMsgs_[msgIndex].stringData=dummyList;
706  } else
707  if (registeredMsgs_[msgIndex].dataType == CS_INT) {
708  registeredMsgs_[msgIndex].intData.clear();
709  int iTemp;
710  for (unsigned int i=0;i<dataRead;i++) {
711  iTemp=atoi(dummyList[i].c_str());
712  registeredMsgs_[msgIndex].intData.push_back(iTemp);
713  }
714  } else
715  if (registeredMsgs_[msgIndex].dataType == CS_FLOAT) {
716  registeredMsgs_[msgIndex].floatData.clear();
717  float fTemp;
718  for (unsigned int i=0;i<dataRead;i++) {
719  fTemp=float(atof(dummyList[i].c_str()));
720  registeredMsgs_[msgIndex].floatData.push_back(fTemp);
721  }
722  } else
723  if (registeredMsgs_[msgIndex].dataType ==CS_BINARY) {
724  registeredMsgs_[msgIndex].binaryDataSize=dataSizeToRead;
725  registeredMsgs_[msgIndex].binaryData=
726  &(cPartner.msgBuffer[pos]);
727  pos+=dataSizeToRead;
728  cPartner.lastProcessed=pos;
729  }
730  registeredMsgs_[msgIndex].newData = true;
731  if (threaded_)
732  pthread_mutex_unlock(&registeredMsgs_[msgIndex].dataLock);
733  if (keepAll_) waitForMore=true;
734  }
735  } // if msgIndex
736  } // while (pos<end)
737 
738  // clear msgBuffer, if larger than 16MB
739  if (cPartner.msgBuffer.size()>maxBinaryG) {
740  // make all binary msgs old
741  for (int i=0;i<(int)registeredMsgs_.size();i++)
742  if ((i!=msgIndex) && (registeredMsgs_[i].dataType==CS_BINARY))
743  registeredMsgs_[i].newData=false;
744 
745  BIASDOUT(D_CS_ANALYZE,"moving:"<<
746  end-cPartner.lastProcessed<<"bytes. lastprocessed:"<<
747  cPartner.lastProcessed);
748  memmove(&(cPartner.msgBuffer[0]),
749  &(cPartner.msgBuffer[cPartner.lastProcessed]),
750  end-cPartner.lastProcessed);
751  cPartner.msgBuffer.resize(end-cPartner.lastProcessed);
752  cPartner.lastProcessed=0;
753 
754  }
755  // indicate we need more data
756  if (retValue == 0 && bNeedMoreBinaryData) {
757  retValue = 5;
758  }
759  return retValue;
760 }
761 
762 
764 {
765  // parse data and check for registered msgs
766  // for testing:
767  char firstMsg [1000];
768  unsigned int nr= (unsigned int)commPartners_[commNr].msgBuffer.size();
769  if (nr>999) nr=999;
770  memset(firstMsg,0, 1000);
771  memcpy(firstMsg, &(commPartners_[commNr].msgBuffer[0]), nr);
772 
773  char *pos;
774  if ( (pos=strstr(firstMsg,"[CLIENTNAME]"))!=NULL) {
775  commPartners_[commNr].name=string("");
776  pos+=13; // first byte after "] "
777  while ( (*pos != 0) && (*pos!='[') ) {
778  commPartners_[commNr].name += *pos;
779  pos++;
780  }
781  // Check if a client with this name is already connected
782  for (unsigned int i=0;i<commPartners_.size();i++) {
783  if ( ((int)i!=commNr) &&
784  (commPartners_[i].name.compare( commPartners_[commNr].name)==0) ) {
785  BIASERR(mygetpid()<<": a commPartner with name:"<<commPartners_[i].name
786  <<" is already connected. Refusing connection");
787  return -2;
788  }
789  }
790  } else {
791  BIASERR(mygetpid()<<": unrecognized comm partner:"<<firstMsg
792  <<" Disconnecting.");
793  return -1;
794  }
795  pos++; // overread the ending 0
796  unsigned int newSize= (unsigned int)(commPartners_[commNr].msgBuffer.size()-(pos-firstMsg));
797  memmove(&(commPartners_[commNr].msgBuffer[0]),
798  &(commPartners_[commNr].msgBuffer[pos-firstMsg]),
799  newSize);
800  commPartners_[commNr].msgBuffer.resize(newSize);
801  return 0;
802 }
803 
804 
805 void CScommBase::logOutPut_(const char *data, unsigned int size){
806  if (!log_) return;
807  logOut_<<endl<<"LOG "<<mygetpid()<<": data of "<<size<<" bytes:"<<endl;
808  for (unsigned int i=0;i<size;i++)
809  logOut_<<data[i];
810  logOut_<<endl;
811 }
812 
814 {
815  int res=-1;
816  unsigned int bufSize=0;
817  const unsigned int dataSize=200000;
818  int bytesRecv=0;
819  bool bReceive=true;
820 
821  while (bReceive && commPartners_[commNr].connected &&
822  ! terminateReceiver_) {
823 
824  bufSize=(unsigned int)commPartners_[commNr].msgBuffer.size();
825  BIASDOUT(D_CS_RECEIVER,mygetpid()<<"Expecting data. "
826  "Buffer size is:"<<bufSize);
827  // append new data to msgBuffer
828  commPartners_[commNr].msgBuffer.resize(bufSize + dataSize);
829  // receive blocks until a msg arrives
830  bytesRecv = recv(commPartners_[commNr].fd,
831  &(commPartners_[commNr].msgBuffer[bufSize]),
832  dataSize,0);
833  if (bytesRecv>0) {
834  commPartners_[commNr].msgBuffer.resize(bufSize + bytesRecv);
835  if (log_) {
836  logOut_<<mygetpid()<<": Received "<<bytesRecv<<" bytes from "
837  <<commPartners_[commNr].name<<endl;
838  int outBytes= ((bytesRecv>200) ? 200 : bytesRecv);
839  logOutPut_(&(commPartners_[commNr].msgBuffer[bufSize]),
840  outBytes);
841  }
842  if (isServer_ && (commPartners_[commNr].firstcontact)) {
843  if (AnalyzeFirstMsg_(commNr)==0) {
844  if (verbose_)
845  verboseOut_<<mygetpid()<<": Conn on slot "<<commNr<<" is "
846  <<commPartners_[commNr].name<<endl;
847  commPartners_[commNr].firstcontact = false;
848  } else {
849  bReceive=false;
850  }
851  } else
852  if (bReceive) { // not firstcontact, usual case
853  if ((res=AnalyzeData_(commNr))<0) {
854  BIASERR(mygetpid()<<
855  ": Critical error during analyzing data.");
856  }
857  }
858  } else { // zero bytes received
859  if (verbose_)
860  verboseOut_<<mygetpid()<<": zero bytes from "
861  <<commPartners_[commNr].name<<" Disconnect assumed."<<endl;
862  bReceive=false;
863  }
864 
865  } //while (sensor[commNr].connected &&! terminateReceiver_)
866 
867  // this line is reached when the connection is closed or terminateReceiver_
868  // is true
869  if (verbose_)
870  verboseOut_<<mygetpid()<<": Disconnecting commPartner on slot:"
871  <<commNr<<endl;
873  commPartners_[commNr].connected = false;
874  commPartners_[commNr].firstcontact = true;
875  commPartners_[commNr].msgBuffer.clear();
876 #ifdef WIN32
877  closesocket(commPartners_[commNr].fd);
878 #else //WIN32
879  close(commPartners_[commNr].fd);
880 #endif //WIN32
881 
882  // now return to receivethread() and exit this thread
883 }
884 
886 {
887  unsigned int bufSize=0;
888  const unsigned int dataSize=200000;
889  int bytesRecv=0;
890  bool bConnectionError=false;
891 
892  for (unsigned int commNr=0; commNr < commPartners_.size(); commNr++) {
893  if (commPartners_[commNr].connected) {
894  bool getMore = true;
895  if (commPartners_[commNr].lastProcessed != commPartners_[commNr].msgBuffer.size()) {
896  //if there is data left in data-buffer, get it...
897  if (AnalyzeData_(commNr) == 5) {
898  getMore = true;
899  } else {
900  getMore = false;
901  }
902  } //else {
903  if (getMore) {
904  //... else read data from TCP-buffer!
905  bufSize=(unsigned int)commPartners_[commNr].msgBuffer.size();
906  BIASDOUT(D_CS_RECEIVER,"Expecting data on slot: " << commNr <<
907  " Buffer size is:"<<bufSize);
908  // append new data to msgBuffer
909  commPartners_[commNr].msgBuffer.resize(bufSize + dataSize);
910  // receive blocks until a msg arrives
911  bytesRecv = recv(commPartners_[commNr].fd,
912  &(commPartners_[commNr].msgBuffer[bufSize]),
913  dataSize,0);
914  if (bytesRecv>0) {
915  commPartners_[commNr].msgBuffer.resize(bufSize + bytesRecv);
916  if (log_) {
917  logOut_<< "Slot " << commNr <<": Received "<<bytesRecv<<
918  " bytes from "<<commPartners_[commNr].name<<endl;
919  int outBytes= ((bytesRecv>200) ? 200 : bytesRecv);
920  logOutPut_(&(commPartners_[commNr].msgBuffer[bufSize]),
921  outBytes);
922  }
923  if (isServer_ && (commPartners_[commNr].firstcontact)) {
924  if (AnalyzeFirstMsg_(commNr)==0) {
925  if (verbose_)
926  verboseOut_<<"Conn on slot "<<commNr<<" is "
927  <<commPartners_[commNr].name<<endl;
928  commPartners_[commNr].firstcontact = false;
929  } else {
930  bConnectionError=true;
931  }
932  } else {
933  if (AnalyzeData_(commNr)<0) {
934  BIASERR("Slot " << commNr <<
935  ": Critical error during analyzing data.");
936  }
937  }
938  } else { // zero bytes received
939  if (verbose_)
940  verboseOut_<<mygetpid()<<": zero bytes from "
941  <<commPartners_[commNr].name<<" Disconnect assumed."<<endl;
942  bConnectionError=true;
943  }
944  //if somethings wrong disconnect comm-partner
945  if (bConnectionError) {
946  if (verbose_)
947  verboseOut_<<mygetpid()<<": Disconnecting commPartner on slot:"
948  <<commNr<<endl;
950  commPartners_[commNr].connected = false;
951  commPartners_[commNr].firstcontact = true;
952  commPartners_[commNr].msgBuffer.clear();
953 
954 #ifdef WIN32
955  closesocket(commPartners_[commNr].fd);
956 #else //WIN32
957  close(commPartners_[commNr].fd);
958 #endif //WIN32
959  //reset for next comm-partner
960  bConnectionError = false;
961  }
962  }
963  } //if (sensor[commNr].connected)
964  }
965 
966 }
967 
969 {
970  /// create a new receivethread
971  int firstfree = -1;
972 
973  // search for the first not connected thread
974  for (unsigned int i=0 ; firstfree <0 && i<commPartners_.size(); i++)
975  if ( ! commPartners_[i].connected) firstfree= i;
976 
977  // sorry, all channels are busy, return to listener
978  if (firstfree <0 ) {
979  BIASERR(mygetpid()<<": New connection refused, all "<<commPartners_.size()
980  <<" channels are busy");
981  return -1;
982  }
983 
984  BIASDOUT(D_CS_LISTENER,"Accepting connection on slot "<<firstfree);
985 
986  commPartners_[firstfree].connected = true;
987  commPartners_[firstfree].fd = fd;
988  commPartners_[firstfree].msgBuffer.reserve(maxBinaryG);
989  commPartners_[firstfree].lastProcessed=0;
991 
992  if (threaded_)
993  {
994  CSCommWithClientNr *threadcalldata;
995  threadcalldata = new CSCommWithClientNr;
996  threadcalldata->commNr = firstfree;
997  threadcalldata->comm = this;
998 
999  // spawn of the receiver
1000  pthread_create(&commPartners_[firstfree].threadid, NULL,
1001  receivethread, threadcalldata);
1002  }
1003 
1004  return firstfree;
1005 }
1006 
1007 
1008 void CScommBase::SetLog(bool on)
1009 {
1010  log_=on;
1011  if (on) verbose_=true;
1012 }
1013 void CScommBase::SetLogOut(ostream &outputStream)
1014 {
1015  logOut_.rdbuf(outputStream.rdbuf());
1016 }
1017 
1019 {
1020  verbose_=on;
1021 }
1022 void CScommBase::SetVerboseOut(ostream &outputStream)
1023 {
1024  verboseOut_.rdbuf(outputStream.rdbuf());
1025 }
1026 
1027 std::string CScommBase::GetCommPartnerName(int commPartnerNr)
1028 {
1029  if (commPartnerNr>=(int)commPartners_.size())
1030  return string("");
1031 
1032  return commPartners_[commPartnerNr].name;
1033 }
1034 
1035 void *BIAS::receivethread(void *data)
1036 {
1037  CSCommWithClientNr *calldata;
1038 
1039  calldata = (CSCommWithClientNr*)data;
1040 
1041  // enter CScommBase::receiveThread_(), it will loop until disconnect
1042  calldata->comm->receiveThread_(calldata->commNr);
1043 
1044  delete calldata;
1045 
1046  pthread_exit(NULL);
1047 
1048  return NULL;
1049 }
int commPartnersConnected_
Definition: CScommBase.hh:277
unsigned int dataSize
int GetData_(const std::string &msgName)
Definition: CScommBase.cpp:143
int WaitForNewMessage(std::string &msgName)
Only if not threaded! Blocks until any new msg arrives, or returns instantly if a new message i...
Definition: CScommBase.cpp:284
int AnalyzeFirstMsg_(int commNr)
Definition: CScommBase.cpp:763
unsigned int timeOut_
Definition: CScommBase.hh:294
std::string GetCommPartnerName(int commPartnerNr)
returns the name of the commPartner with nr commPartnerNr or empty string in case of a number...
helper class for CScommBase
std::vector< CScommMsg > registeredMsgs_
Definition: CScommBase.hh:300
std::string msgName
helper class for CScommBase
friend void * receivethread(void *data)
these are wrapper to comply to the pthread_create() interface
EdataType dataType
int SendMsg_(const std::string &data)
Definition: CScommBase.cpp:395
int RegisterMsg(std::string msgName, EdataType dataType, int amount=1)
only registered msgs are accepted by commPartners Register a msg with this functions.
Definition: CScommBase.cpp:315
pthread_mutex_t registerLock_
Definition: CScommBase.hh:297
int CheckRegisteredMsgs_(std::string &msgName, const CScommData &cPartner, EdataType &dataTypeToRead, unsigned int &dataSizeToRead)
Definition: CScommBase.cpp:511
CScommBase & operator=(const CScommBase &arg)
assignment op. required due to ostream member with explicit ctor JW
Definition: CScommBase.cpp:133
void SetVerboseOut(std::ostream &outputStream)
set the output of verbose msgs to another stream, e.g.
void SetVerbose(bool on)
gives some information about establishing a connection, disconnecting, etc.
std::vector< CScommData > commPartners_
Definition: CScommBase.hh:275
this class CScomm (ClientServer communication) handles data communication via TCP/IP for one server an...
Definition: CScommBase.hh:73
unsigned int lastProcessed
void receiveThread_(int clientNr)
Definition: CScommBase.cpp:813
void * receivethread(void *data)
these are wrapper to comply to the pthread_create() interface
void logOutPut_(const char *data, unsigned int size)
Definition: CScommBase.cpp:805
std::string name
helper class for CScommBase
std::ostream logOut_
Definition: CScommBase.hh:289
std::ostream verboseOut_
Definition: CScommBase.hh:291
std::string commName
void SetLog(bool on)
gives a lot of information regarding the connections, like the exact data received and send...
void SetLogOut(std::ostream &outputStream)
set the output of log msgs to another stream, e.g.
int AcceptConn_(int fd)
Definition: CScommBase.cpp:968
std::vector< char > msgBuffer
int SendMsg(const std::string &msgName, std::vector< float > &floatData)
sends msg msgName to all connected commPartners.
Definition: CScommBase.cpp:358
int AnalyzeData_(int clientNr)
return values: 0 no error all fine
Definition: CScommBase.cpp:564
CScommBase(bool threaded=true)
Set threaded to false for deactivating the receive-thread.
Definition: CScommBase.cpp:86
int GetData(const std::string &msgName, std::vector< float > &floatData)
returns instantly: 0 if new data has arrived and the data in xxxData, 1 if no new data has arri...
Definition: CScommBase.cpp:170