Basic Image AlgorithmS Library  2.8.0
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
UDPClient.cc
1 #include "UDPClient.hh"
2 
3 #ifdef WIN32
4 # include <winsock.h>
5 # include <io.h>
6 #else
7 # include <unistd.h>
8 # include <sys/socket.h>
9 # include <sys/types.h>
10 # include <arpa/inet.h>
11 # include <netdb.h>
12 #endif
13 
14 #include <cstdlib>
15 #include <string>
16 #include <cstring>
17 #include <iostream>
18 #include <cstdio>
19 #include "UDPServerClientDefinitions.hh"
20 #include "UDPDataConsumer.hh"
21 #include <pthread.h>
22 #include <Base/Debug/Error.hh>
23 #include <sys/types.h>
24 #include <Base/Common/W32Compat.hh>
25 #include <Base/Common/BIASpragma.hh>
26 
27 using namespace std;
28 using namespace BIAS;
29 
30 UDPClient::UDPClient(){
31  TCPinsteadOfUDP_ = false;
32  message_counter = 0;
33  isMulticast_ = false;
34  // Set up the local address
35  local_socket_address_.sin_family = AF_INET; // host byte order
36  local_socket_address_.sin_port = htons(4000);// short, network byte order
37  local_socket_address_.sin_addr.s_addr = INADDR_ANY;
38  memset(&(local_socket_address_.sin_zero),0, 8);// zero the rest of the struct
39  // The Client is not running yet
40  is_active_ = false;
41  has_ended_ = true;
42  ServerAddr_ = NULL;
43 }
44 
45 
46 void UDPClient::Init(int port ,const char* bind_to_ip){
47  TCPinsteadOfUDP_ = false;
48  message_counter = 0;
49  isMulticast_ = false;
50  // Set up the local address
51  local_socket_address_.sin_family = AF_INET; // host byte order
52  local_socket_address_.sin_port = htons(port);// short, network byte order
53  if(bind_to_ip !=NULL)
54  local_socket_address_.sin_addr.s_addr = inet_addr(bind_to_ip);
55  else
56  local_socket_address_.sin_addr.s_addr = INADDR_ANY;
57  memset(&(local_socket_address_.sin_zero),0, 8);// zero the rest of the struct
58  // The Client is not running yet
59  is_active_ = false;
60  has_ended_ = true;
61  ServerAddr_ = NULL;
62 }
63 
64 
65 void UDPClient::create_buffer_structure(unsigned int numbuffers)
66 {
67 
69  buf.is_free = true;
70  buf.number = -1;
71  buf.frame_number = -1;
72  buf.frame_size = -1;
73  buf.frames_total = -1;
74 
75  for(unsigned int i=0; i < numbuffers; i++) {
76  buf.frame_data = new char[UDP_PACKAGE_SIZE];
77  Buffers_.push_back(buf);
78  }
79  // get one array of pointer to assemble completed message
80  completed_message = new char*[numbuffers];
81 
82 } // create_buffer_structure
83 
84 
85 
86 void UDPClient::destroy_buffer_structure()
87 {
88  for(unsigned int i=0; i < Buffers_.size(); i++)
89  delete[] Buffers_[i].frame_data;
90  Buffers_.clear();
91 
92  delete[] completed_message;
93 } // destroy_buffer_structure
94 
95 
96 void UDPClient::check_for_complete_message(int last_message_number_inserted,
97  int frames_total) \
98 {
99  bool droppedMessage = false;
100 
101  int message_frames_so_far = 0;
102  for (unsigned int i = 0; i < Buffers_.size(); i++) {
103  if (Buffers_[i].number == last_message_number_inserted)
104  message_frames_so_far++;
105  else if (!Buffers_[i].is_free){
106  // if a non-free buffers has a message number other then the
107  // expected one, it must be a frame of an older message which
108  // has not been identified as completed earlier in this method.
109  cerr<<"UDPClient: Dropped message ! found frame of message:"
110  <<Buffers_[i].number<<" while expecting "
111  <<last_message_number_inserted<<endl;
112  droppedMessage = true;
113  }
114  }
115 
116  // remove frames of deprecated messages
117  if (droppedMessage)
118  for (unsigned int i = 0; i < Buffers_.size(); i++)
119  if (Buffers_[i].number != last_message_number_inserted)
120  Buffers_[i].is_free = true;
121 
122  // Is it enough for a complete message?
123  if (message_frames_so_far == frames_total) {
124  // cout <<" Message is complete"<<endl;
125  // We have a complete message!
126 
127  // Look for the buffers containing the data belonging to the actual
128  // message number and put the addresses of the frame data into
129  // the array completed_message, which will be consumed by the
130  // UDP data consumer.
131 
132  int size_of_last_message_frame = -1;
133  for (unsigned int i = 0; i < Buffers_.size(); i++) {
134  if (Buffers_[i].number == last_message_number_inserted) {
135  completed_message[Buffers_[i].frame_number] = (Buffers_[i].frame_data) + sizeof(message_frame_header);
136  Buffers_[i].is_free = true;
137  if (Buffers_[i].frame_number == frames_total-1)
138  size_of_last_message_frame = Buffers_[i].frame_size;
139  }
140  }
141 
142  message_counter++;
143  if (message_counter == 1000000000)
144  message_counter = 0;
145  // cout << "*** Message " << message_counter << " received in " << frames_total << " frames. \r";
146  cout.flush();
147 
148  // Let the UDP data consumer proceed
149  udc->consume_complete_message(frames_total,
150  completed_message,
152  size_of_last_message_frame);
153  }
154  // else cout <<" Message is NOT complete: "<<message_frames_so_far<<"/"<<frames_total<<endl;
155 } // check_for_complete_message
156 
157 
158 /* Buffer management
159 
160  Buffers_ is a vector of mesage_frame_buffer each with a pointer to
161  data. A message is split up into frames. Each frame is prepended
162  with a header (message_frame_header). which contains a message-ID,
163  the number of frames of this message, and the framenumber.
164 
165  The receiver call get_a_free_buffer(), places
166  the next received frame in it and marks that buffer as !free.
167  After each reception, the method check_for_complete_message() is
168  called, which, of course, scans the buffer for a completed message.
169  If on message is complete, udp_data_comsumer() is called to
170  assemble the message. All buffers from this mesage are freed by
171  check_for _complete_message() regardless the implementation of
172  udp_data_consumer.
173 
174  Thus, get_a_free_buffer() _never_ has to free a buffer. it only
175  hast to search one that is marked as free. If no free buffer is
176  available, this indicates that the amount of receive buffers is to
177  small. So more buffers should be allocated and the user should be
178  informed about that.
179 
180 */
181 
182 
183 int UDPClient::get_a_free_buffer()
184 {
185  int return_value = -1;
186  // Look for next free buffer
187  for(unsigned int i=0; i < Buffers_.size(); i++) {
188  // Is this buffer free?
189  if ( Buffers_[i].is_free ) {
190  return_value = i;
191  break;
192  }
193  }
194 
195  // If we haven't found a free buffer
196  if (return_value == -1) {
197  const unsigned int newbuffers = 10;
198  // get the index of the first new element
199  return_value = (int)Buffers_.size();
200  BIASERR("Ran out of buffers, increasing number of buffers by "<<newbuffers
201  <<"."<<endl
202  <<"\tAvoid this by specifying an appropriate messagesize to "
203  <<endl<<"\tUDPClient::start()");
205  buf.is_free = true;
206  buf.number = -1;
207  buf.frame_number = -1;
208  buf.frame_size = -1;
209  buf.frames_total = -1;
210  for(unsigned int i=0; i < newbuffers; i++) {
211  buf.frame_data = new char[UDP_PACKAGE_SIZE];
212  Buffers_.push_back(buf);
213  }
214 
215  }
216 
217  return return_value;
218 }
219 
220 
221 
222 void* receive_loop_starter(void* udpclient) {
223  UDPClient* c = (UDPClient*) udpclient;
224  c->receive_loop();
225  return NULL;
226 } // receive_loop_starter
227 
228 
229 void UDPClient::receive_loop() {
230 
231  // cout << "UDPClient starts to listen for incoming packages." << endl;
232 
233  // The server socket address, to store the
234  // address of the server from which a
235  // package just was received
236  struct sockaddr_in server_socket_address_;
237 
238  // The length of the socket address;
239 #if WIN32
240  int address_length_ = sizeof(server_socket_address_);
241 #else
242  socklen_t address_length_ = sizeof(server_socket_address_);
243 #endif
244  int received_bytes = -1;
245  int free_buffer_number = -1;
246 
247  has_ended_ = false;
248 
249  while (is_active_ ) {
250 
251  free_buffer_number = get_a_free_buffer();
252  received_bytes = recvfrom(socket_fd_,
253  Buffers_[free_buffer_number].frame_data,
255  0,
256  (struct sockaddr *) &server_socket_address_,
257  &address_length_);
258 
259  if (ServerAddr_ ==NULL) {
260  // cout <<"addr len: "<<address_length_<<endl;
261  // cout <<"port: "<<ntohs(server_socket_address_.sin_port)<<endl;
262  char *tmp = inet_ntoa(server_socket_address_.sin_addr);
263  size_t len = strlen(tmp)+1;
264  ServerAddr_ = new char[len];
265  strncpy(ServerAddr_, tmp, len);
266  // cout <<"addr: "<<ServerAddr_<<" len: "<<len<<endl;
267  }
268  if (received_bytes < 0) {
269  if (is_active_) {
270  syscallerror("Error receiving bytes. Aborting!");
271  is_active_ = false;
272  }
273  } else {
274  // obviously we received some data
275 
276  // extract the information from the message frame header
277  message_frame_header* mfh = (message_frame_header*) Buffers_[free_buffer_number].frame_data;
278 
279  Buffers_[free_buffer_number].number = mfh->message_number;
280  Buffers_[free_buffer_number].frame_number = mfh->message_frame_number;
281  Buffers_[free_buffer_number].frame_size = received_bytes - sizeof(message_frame_header);
282  Buffers_[free_buffer_number].frames_total = mfh->message_frames_total;
283 
284 
285  // cerr << "=======================" << endl;
286  // cerr << "package received! Size:" << received_bytes << endl;
287  // cerr << "message number: " << mfh->message_number << endl;
288  // cerr << "message frame number: " << mfh->message_frame_number << endl;
289  // cerr << "message frames total: " << mfh->message_frames_total << endl;
290  // cerr << "=======================" << endl;
291 
292 
293  // Did we receive enough bytes?
294  if (mfh->message_frame_number < (mfh->message_frames_total-1) && received_bytes < UDP_PACKAGE_SIZE) {
295 
296  // It is not not the last message frame of a message, so it should
297  // contain UDP_PACKAGE_SIZE bytes. Doing nothing.
298  cerr << "Warning: Received a truncated UDP frame, which has been discarded." << endl;
299  } else {
300 
301  //
302  // WARNING! CHECK FOR DUPLICATED UPD PACKAGES!!!
303  //
304 
305  // Now we can analyse, wether we have enough frames for a complete message
306  Buffers_[free_buffer_number].is_free = false;
307  check_for_complete_message(mfh->message_number, mfh->message_frames_total);
308 
309  // cerr << "package received! Size:" << received_bytes << endl;
310  // cerr << "message number: " << mfh->message_number << endl;
311  // cerr << "message frame number: " << mfh->message_frame_number << endl;
312  // cerr << "message frames total: " << mfh->message_frames_total << endl;
313  }
314  }
315 
316  } // while (is_active)
317 
318  has_ended_ = true;
319 
320 } // receive_loop
321 
322 /////////////////////////////////////////////////////////////////////
323 int UDPClient::start(int max_message_size, int number_of_message_buffers,
324  UDPDataConsumer* uc, bool UseTCP)
325 {
326  TCPinsteadOfUDP_ = UseTCP;
327  udc = uc;
328 
329  // One message must be bufferd at least
330  if (number_of_message_buffers < 1)
331  number_of_message_buffers = 1;
332 
333  // So how many UDP packages must be buffered?
334  int temp = (int) (max_message_size / UDP_PACKAGE_SIZE + 1);
335  if (temp <1)
336  temp = 1;
337 
338  create_buffer_structure(number_of_message_buffers * temp);
339 
340 
341  // Create a UDP socket
342 #ifdef WIN32
343  WORD wVersionRequested;
344  WSADATA wsaData;
345  int err;
346  wVersionRequested = MAKEWORD( 1, 1 );
347  err = WSAStartup( wVersionRequested, &wsaData );
348  if ( err != 0 ) {
349  cout <<"Error on WSAStrtup(): "<<err<<endl;
350  }
351  if (TCPinsteadOfUDP_)
352  socket_fd_ = socket(PF_INET, SOCK_STREAM, 0);
353  else
354  socket_fd_ = socket(PF_INET, SOCK_DGRAM, 0);
355  if (socket_fd_ == INVALID_SOCKET) WSAGetLastError();
356 #else
357  if (TCPinsteadOfUDP_)
358  socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
359  else
360  socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
361 #endif
362  if ( socket_fd_ < 0 ) {
363  BIASERR("Could not create a socket. Aborting!");
364  return -1;
365  }
366 
367  // Bind the socket to socket address
368  if (bind(socket_fd_,
369  (struct sockaddr *) &local_socket_address_,
370  sizeof(struct sockaddr)) < 0 )
371  {
372  BIASERR("Could not bind to socket. Aborting!");
373  return -1;
374  }
375 
376 
377  //
378  // Is the data transmitted via multicast?
379  //
380  if (isMulticast_) {
381 
382  cout << "Joining Multicast group - to be implemented" << endl;
383 
384  // struct ip_mreq mreq;
385  // mreq.imr_multiaddr.s_addr=inet_addr(ipaddress_);
386  // mreq.imr_interface.s_addr=htonl(INADDR_ANY);
387  // if (setsockopt(socket_fd_, IPPROTO_IP,IP_ADD_MEMBERSHIP,&mreq,sizeof(mreq)) < 0) {
388  // syscallerror("Could not join multicast group. Aborting!");
389  // return -1;
390  // }
391  }
392 
393 
394  //
395  // Now, the socket is ready and the buffer structure is built
396  // up, so we can create a thread
397  //
398  if (TCPinsteadOfUDP_) {
399  int res = listen(socket_fd_,5);
400  if (res<0) {
401  perror("Failed to listen");
402  return -1;
403  }
404  else {
405  struct sockaddr tmpaddr;
406 #ifdef WIN32
407  int len=sizeof(tmpaddr);
408 #else
409  socklen_t len=sizeof(tmpaddr);
410 #endif
411  accept(socket_fd_,&tmpaddr,&len);
412  }
413  }
414 
415  is_active_ = true;
416  pthread_t receive_thread;
417  if (pthread_create(&receive_thread, NULL, receive_loop_starter, (void*) this) !=0 ) {
418  syscallerror("Could not create a thread.");
419  return -1;
420  is_active_ = false;
421  } else {
422  return 0;
423  }
424 
425 } // start
426 
427 
428 
429 void UDPClient::stop() {
430 
431  // Leave threaded receive loop
432  is_active_ = false;
433 
434  // Close socket to unblock recvfrom
435 #ifdef WIN32
436  closesocket(socket_fd_);
437 #else
438  close(socket_fd_);
439 #endif
440 
441  // Wait until thread has ended
442  int i=0;
443  while (!has_ended_ && i<1000) {
444 #ifdef WIN32
445  Sleep(1);
446 #else
447  usleep(1000);
448 #endif
449  i++;
450  }
451 
452 #ifdef WIN32
453  WSACleanup();
454 #endif
455 
456  destroy_buffer_structure();
457 } // stop
458 
459 
460 UDPClient::~UDPClient() {
461  is_active_ = false;
462 }
class for receiving UDP packages
Definition: UDPClient.hh:37
Interface class for UDP data.
Header for sending images over UDP connections.
void receive_loop()
Definition: UDPClient.cc:229
const int UDP_PACKAGE_SIZE