26 # define _CRT_SECURE_NO_DEPRECATE // vs8
27 # pragma warning (disable: 4251) // STL MAP DLL warning noise (JW)
30 #include "CScommBase.hh"
36 # include <netinet/in.h>
37 # include <sys/socket.h>
42 #include <sys/types.h>
45 #include <Base/Debug/TimeMeasure.hh>
46 #include <Base/Debug/Error.hh>
// Size threshold of 1<<24 = 16777216 elements (16 MiB of char data).
// Further down in this file a comm partner's msgBuffer.size() is compared
// against this value while received data is being analyzed.
52 const unsigned int maxBinaryG= 1<<24;
55 CScommBase::~CScommBase()
60 terminateReceiver_=
true;
62 for (
unsigned int i=0;i<commPartners_.size();i++) {
63 if (commPartners_[i].connected) {
64 pthread_join(commPartners_[i].threadid,NULL);
69 for (
unsigned int i=0;i<commPartners_.size();i++) {
70 if (commPartners_[i].connected) {
72 closesocket(commPartners_[i].fd);
74 close(commPartners_[i].fd);
86 CScommBase::CScommBase(
bool threaded) : logOut_(std::cout.rdbuf()),
87 verboseOut_(std::cout.rdbuf())
110 if (WSAStartup(MAKEWORD(1, 1), &wsa))
112 BIASERR(
"WSAStartup() failed with error: "
113 << (
unsigned long)GetLastError() <<
"\n");
122 #include <Base/Common/BIASpragmaStart.hh>
125 : logOut_(std::cout.rdbuf()),
126 verboseOut_(std::cout.rdbuf())
128 BIASERR(
"not implemented");
135 BIASERR(
"not implemented");
139 #include <Base/Common/BIASpragmaEnd.hh>
148 map<string, int>::iterator it;
149 BIASDOUT(D_CS_GETDATA,
"Searchin index for:"<<msgName);
150 if ( (it=indexRegMsgs_.find(msgName)) != indexRegMsgs_.end() ) {
160 BIASERR(
"GetData(): msg "<< msgName<<
" not registered");
164 indexRegMsgs_[msgName]=index;
166 BIASDOUT(D_CS_GETDATA,
"found index: "<<index);
193 }
else retValue=index;
220 }
else retValue=index;
248 }
else retValue=index;
271 memcpy(&(binaryData[0]),
280 }
else retValue=index;
287 BIASERR(
"not working for threaded");
299 while (nrMewMsgs==0) {
317 return RegisterMsg(
string(
""), msgName, dataType, amount);
322 string msgName,
EdataType dataType,
int amount)
328 BIASERR(
"msg:"<<msgName<<
" already registered for:"
361 data.reserve(msgName.size()+19*floatData.size());
363 data =
'[' + msgName +
']' +
' ';
364 for (
unsigned int f=0;f<floatData.size();f++) {
365 sprintf(dummy,
"%.12f ",floatData[f]);
374 data.reserve(msgName.size()+20*intData.size());
376 data =
'[' + msgName +
']' +
' ';
377 for (
unsigned int f=0;f<intData.size();f++) {
378 sprintf(dummy,
"%i ",intData[f]);
387 data.reserve(msgName.size()+20*stringData.size());
388 data =
'[' + msgName +
']' +
' ';
389 for (
unsigned int i=0;i<stringData.size();i++) {
390 data+= (stringData[i]+string(
" "));
401 logOut_<<mygetpid()<<
": Sending "<<data.size()
403 logOutPut_(data.c_str(),(
unsigned int)data.size());
406 res = send(
commPartners_[i].fd, data.c_str(),(int)data.size(),0);
411 res = send(
commPartners_[i].fd, data.c_str(),data.size(),MSG_NOSIGNAL);
415 BIASERR(mygetpid()<<
"sendmsg() error sending data");
424 char* binaryData,
unsigned int size)
426 std::vector<char> data;
428 sprintf(dummy,
"[%s] %i ",msgName.c_str(),size);
429 int pos=(int)strlen(dummy);
430 data.resize(pos+size);
431 memcpy(&data[0],dummy,pos);
432 memcpy(&data[pos], binaryData,size);
438 logOut_<<mygetpid()<<
": Sending "<<data.size()
439 <<
" bytes of binary data to "
443 res = send(
commPartners_[i].fd, &(data[0]),(
int)data.size(),0);
448 res = send(
commPartners_[i].fd, &(data[0]),data.size(),MSG_NOSIGNAL);
452 BIASERR(mygetpid()<<
"sendmsg() error sending data");
461 const std::string &msgName,
462 char* binaryData,
unsigned int size)
466 return SendMsg(i, msgName, binaryData, size);
473 const std::string &msgName,
474 char* binaryData,
unsigned int size)
476 std::vector<char> data;
478 sprintf(dummy,
"[%s] %i ",msgName.c_str(),size);
479 int pos=(int)strlen(dummy);
480 data.resize(pos+size);
481 memcpy(&data[0],dummy,pos);
482 memcpy(&data[pos], binaryData,size);
486 logOut_<<mygetpid()<<
": Sending "<<data.size()
487 <<
" bytes of binary data to "
492 &(data[0]),(
int)data.size(),0);
496 &(data[0]),data.size(),0);
499 &(data[0]),data.size(),MSG_NOSIGNAL);
503 BIASERR(mygetpid()<<
"sendmsg() error sending data:"<<strerror(errno));
513 unsigned int &dataSizeToRead)
521 BIASDOUT(D_CS_ANALYZE,
"checking, if msg is registered");
535 ": msg not registered for acceptance for this commParnter");
540 ": msg:"<<msgName<<
" not registered from:"<<cPartner.
name);
554 BIASDOUT(D_CS_GETDATA,
"msg:"<<msgName<<
" has index "<<msgFound);
573 BIASDOUT(D_CS_ANALYZE,
"msgBuffer.size():"<< cPartner.
msgBuffer.size()
574 <<
"pos: "<<pos<<
" end:"<<end);
586 msgName.reserve(100);
588 unsigned int dataSizeToRead = 0;
591 bool waitForMore=
false;
592 bool bNeedMoreBinaryData =
false;
593 while ((pos<end) && (!waitForMore)) {
596 BIASDOUT(D_CS_ANALYZE,
"msgBuffer.size():"<< cPartner.
msgBuffer.size()
597 <<
"pos: "<<pos<<
" end:"<<end);
600 while ((pos<end) && cPartner.
msgBuffer[pos]!=
'[' ) pos++;
602 while ((pos<end) && (cPartner.
msgBuffer[pos]==
'[' ||
605 BIASDOUT(D_CS_ANALYZE,
"reading msg name at pos:"<<pos);
606 while ((pos<end) && cPartner.
msgBuffer[pos]!=
']') {
607 msgName.push_back(cPartner.
msgBuffer[pos]);
611 if (retValue<2) retValue=2;
614 dataTypeToRead, dataSizeToRead))<0) {
615 string msgNameCut=msgName;
616 if (msgNameCut.length()>30)
618 BIASERR(mygetpid()<<
": not a registered msg: "<<msgName
619 <<
" for commPartner:" << cPartner.
name);
621 while ((pos<end) && (cPartner.
msgBuffer[pos]!=
']')) pos++;
625 BIASDOUT(D_CS_ANALYZE,
"msgName: "<< msgName);
628 unsigned int dataRead=0;
630 vector<string> dummyList;
633 while ( (pos<end) && !bNextMsg) {
634 while ((pos<end) && (cPartner.
msgBuffer[pos]==
']' ||
637 while ((pos<end) && cPartner.
msgBuffer[pos]!=
' ' &&
639 dummy.push_back(cPartner.
msgBuffer[pos]);
642 BIASDOUT(D_CS_ANALYZE,
"trying to read data pos:"<< pos);
644 dummyList.push_back(dummy);
648 while ((pos<end) && (cPartner.
msgBuffer[pos]==
']' ||
653 if (dataRead!=dataSizeToRead)
657 if (dataRead==dataSizeToRead) {
660 unsigned int size=strtol(&(dummy[0]),NULL,10);
662 BIASERR(
"error converting size of binary data '"<<dummy<<
"'");
664 BIASDOUT(D_CS_ANALYZE,
"checking for "<<size<<
665 " binary bytes of data");
666 if ((end-pos)<size) {
670 bNeedMoreBinaryData =
true;
671 BIASDOUT(D_CS_ANALYZE,
"waiting for more data. need: " << size <<
" have: " << end-pos);
673 dataRead=dataSizeToRead=size;
675 BIASDOUT(D_CS_ANALYZE,
"got complete message.");
680 if (dataRead>dataSizeToRead) {
681 if (retValue<3) retValue=3;
687 if (dataRead>dataSizeToRead) { retValue=4; }
691 ": parse error while parsing msg: "<<msgName<< endl
692 <<
"'[' from next msg found, but more or less data read than"
693 <<
" expected. Going on though.");
698 if (dataRead==dataSizeToRead) {
700 BIASDOUT(D_CS_ANALYZE,
"converting data types. dataRead:"<<dataRead);
710 for (
unsigned int i=0;i<dataRead;i++) {
711 iTemp=atoi(dummyList[i].c_str());
718 for (
unsigned int i=0;i<dataRead;i++) {
719 fTemp=float(atof(dummyList[i].c_str()));
739 if (cPartner.
msgBuffer.size()>maxBinaryG) {
745 BIASDOUT(D_CS_ANALYZE,
"moving:"<<
756 if (retValue == 0 && bNeedMoreBinaryData) {
767 char firstMsg [1000];
768 unsigned int nr= (
unsigned int)
commPartners_[commNr].msgBuffer.size();
770 memset(firstMsg,0, 1000);
774 if ( (pos=strstr(firstMsg,
"[CLIENTNAME]"))!=NULL) {
777 while ( (*pos != 0) && (*pos!=
'[') ) {
783 if ( ((
int)i!=commNr) &&
785 BIASERR(mygetpid()<<
": a commPartner with name:"<<
commPartners_[i].name
786 <<
" is already connected. Refusing connection");
791 BIASERR(mygetpid()<<
": unrecognized comm partner:"<<firstMsg
792 <<
" Disconnecting.");
796 unsigned int newSize= (
unsigned int)(
commPartners_[commNr].msgBuffer.size()-(pos-firstMsg));
807 logOut_<<endl<<
"LOG "<<mygetpid()<<
": data of "<<size<<
" bytes:"<<endl;
808 for (
unsigned int i=0;i<size;i++)
816 unsigned int bufSize=0;
817 const unsigned int dataSize=200000;
824 bufSize=(
unsigned int)
commPartners_[commNr].msgBuffer.size();
825 BIASDOUT(D_CS_RECEIVER,mygetpid()<<
"Expecting data. "
826 "Buffer size is:"<<bufSize);
836 logOut_<<mygetpid()<<
": Received "<<bytesRecv<<
" bytes from "
838 int outBytes= ((bytesRecv>200) ? 200 : bytesRecv);
845 verboseOut_<<mygetpid()<<
": Conn on slot "<<commNr<<
" is "
855 ": Critical error during analyzing data.");
870 verboseOut_<<mygetpid()<<
": Disconnecting commPartner on slot:"
887 unsigned int bufSize=0;
888 const unsigned int dataSize=200000;
890 bool bConnectionError=
false;
892 for (
unsigned int commNr=0; commNr <
commPartners_.size(); commNr++) {
905 bufSize=(
unsigned int)
commPartners_[commNr].msgBuffer.size();
906 BIASDOUT(D_CS_RECEIVER,
"Expecting data on slot: " << commNr <<
907 " Buffer size is:"<<bufSize);
917 logOut_<<
"Slot " << commNr <<
": Received "<<bytesRecv<<
919 int outBytes= ((bytesRecv>200) ? 200 : bytesRecv);
930 bConnectionError=
true;
934 BIASERR(
"Slot " << commNr <<
935 ": Critical error during analyzing data.");
942 bConnectionError=
true;
945 if (bConnectionError) {
947 verboseOut_<<mygetpid()<<
": Disconnecting commPartner on slot:"
960 bConnectionError =
false;
974 for (
unsigned int i=0 ; firstfree <0 && i<
commPartners_.size(); i++)
979 BIASERR(mygetpid()<<
": New connection refused, all "<<
commPartners_.size()
980 <<
" channels are busy");
984 BIASDOUT(D_CS_LISTENER,
"Accepting connection on slot "<<firstfree);
996 threadcalldata->
commNr = firstfree;
997 threadcalldata->
comm =
this;
1015 logOut_.rdbuf(outputStream.rdbuf());
int commPartnersConnected_
int GetData_(const std::string &msgName)
int WaitForNewMessage(std::string &msgName)
only if not threaded! <br> Blocks until any new msg arrives, or returns instantly if a new message i...
int AnalyzeFirstMsg_(int commNr)
std::string GetCommPartnerName(int commPartnerNr)
returns the name of the commPartner with nr commPartnerNr or empty string in case of a number...
helper class for CScommBase
std::vector< CScommMsg > registeredMsgs_
helper class for CScommBase
friend void * receivethread(void *data)
these are wrappers to comply with the pthread_create() interface
int SendMsg_(const std::string &data)
int RegisterMsg(std::string msgName, EdataType dataType, int amount=1)
Only registered msgs are accepted by commPartners. Register a msg with this function.
pthread_mutex_t registerLock_
int CheckRegisteredMsgs_(std::string &msgName, const CScommData &cPartner, EdataType &dataTypeToRead, unsigned int &dataSizeToRead)
CScommBase & operator=(const CScommBase &arg)
assignment op. required due to ostream member with explicit ctor JW
void SetVerboseOut(std::ostream &outputStream)
set the output of verbose msgs to another stream, e.g.
void SetVerbose(bool on)
gives some information about establishing connections, disconnecting, etc.
std::vector< CScommData > commPartners_
this class CScomm (ClientServer communication) handles data communication via TCP/IP for one server an...
unsigned int lastProcessed
void receiveThread_(int clientNr)
void * receivethread(void *data)
these are wrappers to comply with the pthread_create() interface
void logOutPut_(const char *data, unsigned int size)
helper class for CScommBase
void SetLog(bool on)
gives a lot of information regarding the connections, like the exact data received and send...
void SetLogOut(std::ostream &outputStream)
set the output of log msgs to another stream, e.g.
std::vector< char > msgBuffer
int SendMsg(const std::string &msgName, std::vector< float > &floatData)
sends msg msgName to all connected commPartners.
int AnalyzeData_(int clientNr)
return values: 0 no error all fine
CScommBase(bool threaded=true)
Set threaded to false for deactivating the receive-thread.
int GetData(const std::string &msgName, std::vector< float > &floatData)
returns instantly:<br> 0 if new data has arrived and the data in xxxData 1 if no new data has arri...