1 #include "UDPClient.hh"
8 # include <sys/socket.h>
9 # include <sys/types.h>
10 # include <arpa/inet.h>
19 #include "UDPServerClientDefinitions.hh"
20 #include "UDPDataConsumer.hh"
22 #include <Base/Debug/Error.hh>
23 #include <sys/types.h>
24 #include <Base/Common/W32Compat.hh>
25 #include <Base/Common/BIASpragma.hh>
// Default constructor: selects UDP (not TCP) mode and prepares the local
// socket address as IPv4, port 4000, bound to any local interface.
// NOTE(review): the remainder of the constructor (including its closing
// brace) is not visible in this listing.
30 UDPClient::UDPClient(){
31 TCPinsteadOfUDP_ =
false;
// Local endpoint defaults: AF_INET family, port 4000 in network byte order,
// INADDR_ANY as the address.
35 local_socket_address_.sin_family = AF_INET;
36 local_socket_address_.sin_port = htons(4000);
37 local_socket_address_.sin_addr.s_addr = INADDR_ANY;
// Clear the 8 padding bytes of sockaddr_in.
38 memset(&(local_socket_address_.sin_zero),0, 8);
// (Re)initialises the local socket address and resets the client to UDP mode.
//   port        local port number; stored in network byte order via htons().
//   bind_to_ip  dotted IPv4 string handed to inet_addr() to select the local
//               address.
46 void UDPClient::Init(
int port ,
const char* bind_to_ip){
47 TCPinsteadOfUDP_ =
false;
51 local_socket_address_.sin_family = AF_INET;
52 local_socket_address_.sin_port = htons(port);
// NOTE(review): the next two assignments both write sin_addr.s_addr; they are
// presumably the two branches of a conditional on bind_to_ip whose lines are
// not visible in this listing -- confirm against the full source.
54 local_socket_address_.sin_addr.s_addr = inet_addr(bind_to_ip);
56 local_socket_address_.sin_addr.s_addr = INADDR_ANY;
// Clear the 8 padding bytes of sockaddr_in.
57 memset(&(local_socket_address_.sin_zero),0, 8);
// Builds the receive-buffer pool: appends `numbuffers` entries to Buffers_
// and allocates completed_message as an array of `numbuffers` char pointers.
//   numbuffers  number of frame buffers (and completed_message slots) to create.
65 void UDPClient::create_buffer_structure(
unsigned int numbuffers)
75 for(
unsigned int i=0; i < numbuffers; i++) {
// NOTE(review): the construction/initialisation of `buf` happens on lines
// that are not visible in this listing.
77 Buffers_.push_back(buf);
// One pointer slot per buffer; released again in destroy_buffer_structure().
80 completed_message =
new char*[numbuffers];
// Releases the receive-buffer pool: frees the frame_data array of every entry
// in Buffers_ and then the completed_message pointer array.
// NOTE(review): whether Buffers_ itself is cleared afterwards is not visible
// in this listing.
86 void UDPClient::destroy_buffer_structure()
88 for(
unsigned int i=0; i < Buffers_.size(); i++)
89 delete[] Buffers_[i].frame_data;
92 delete[] completed_message;
// Checks whether all frames of the message that was just inserted have
// arrived and, if so, hands the assembled message to the data consumer.
//   last_message_number_inserted  message number of the most recently stored
//                                 frame.
// NOTE(review): the rest of the parameter list is not visible in this listing;
// `frames_total` and `udc` used below are presumably declared there -- confirm.
96 void UDPClient::check_for_complete_message(
int last_message_number_inserted,
99 bool droppedMessage =
false;
// Count the buffered frames that belong to the current message; any occupied
// buffer holding a frame of a different message is reported as a dropped
// message.
101 int message_frames_so_far = 0;
102 for (
unsigned int i = 0; i < Buffers_.size(); i++) {
103 if (Buffers_[i].number == last_message_number_inserted)
104 message_frames_so_far++;
105 else if (!Buffers_[i].is_free){
109 cerr<<
"UDPClient: Dropped message ! found frame of message:"
110 <<Buffers_[i].number<<
" while expecting "
111 <<last_message_number_inserted<<endl;
112 droppedMessage =
true;
// Release every buffer that does not belong to the current message.
// NOTE(review): this loop is presumably guarded by `droppedMessage`; the
// guarding line is not visible here.
118 for (
unsigned int i = 0; i < Buffers_.size(); i++)
119 if (Buffers_[i].number != last_message_number_inserted)
120 Buffers_[i].is_free =
true;
// All frames present: assemble the message.
123 if (message_frames_so_far == frames_total) {
132 int size_of_last_message_frame = -1;
133 for (
unsigned int i = 0; i < Buffers_.size(); i++) {
134 if (Buffers_[i].number == last_message_number_inserted) {
// Slot the frame by its frame_number; the stored pointer is advanced by
// sizeof(message_frame_header) so it points past the header.
135 completed_message[Buffers_[i].frame_number] = (Buffers_[i].frame_data) +
sizeof(
message_frame_header);
136 Buffers_[i].is_free =
true;
// The frame with index frames_total-1 is the last one; remember its size.
137 if (Buffers_[i].frame_number == frames_total-1)
138 size_of_last_message_frame = Buffers_[i].frame_size;
// NOTE(review): the action taken when message_counter reaches 1000000000 is
// not visible here (presumably a counter reset) -- confirm.
143 if (message_counter == 1000000000)
// Deliver the assembled message; the arguments between frames_total and
// size_of_last_message_frame are not visible in this listing.
149 udc->consume_complete_message(frames_total,
152 size_of_last_message_frame);
// Looks for a buffer in Buffers_ whose is_free flag is set. If none is found
// (return_value is still -1 after the scan), the pool is grown by
// `newbuffers` entries and an error message is logged via BIASERR.
// NOTE(review): the body of the is_free branch and the final return statement
// are not visible in this listing; presumably the index of a free buffer is
// returned -- confirm.
183 int UDPClient::get_a_free_buffer()
185 int return_value = -1;
187 for(
unsigned int i=0; i < Buffers_.size(); i++) {
189 if ( Buffers_[i].is_free ) {
// No free buffer was found: grow the pool.
196 if (return_value == -1) {
197 const unsigned int newbuffers = 10;
// Captured before appending, so this is the index the first newly appended
// buffer will have.
199 return_value = (int)Buffers_.size();
200 BIASERR(
"Ran out of buffers, increasing number of buffers by "<<newbuffers
202 <<
"\tAvoid this by specifying an appropriate messagesize to "
203 <<endl<<
"\tUDPClient::start()");
// NOTE(review): construction of `buf` is on lines not visible here.
210 for(
unsigned int i=0; i < newbuffers; i++) {
212 Buffers_.push_back(buf);
// Thread start routine: UDPClient::start() passes this function to
// pthread_create() together with the UDPClient's `this` pointer as argument.
//   udpclient  the argument forwarded by pthread_create().
// NOTE(review): the function body is not visible in this listing; presumably
// it casts `udpclient` back to UDPClient* and runs receive_loop() -- confirm.
222 void* receive_loop_starter(
void* udpclient) {
// Receive loop: while is_active_ is set, fetches a free buffer, reads one
// frame from socket_fd_ into it with recvfrom(), and records the sender's
// address the first time a frame arrives.
// NOTE(review): many lines of this function (remaining recvfrom arguments,
// frame validation, the call that processes a stored frame, loop exit) are
// not visible in this listing.
229 void UDPClient::receive_loop() {
236 struct sockaddr_in server_socket_address_;
// NOTE(review): address_length_ is declared twice with different types;
// presumably these are platform alternatives selected by preprocessor lines
// that are not visible here -- confirm.
240 int address_length_ =
sizeof(server_socket_address_);
242 socklen_t address_length_ =
sizeof(server_socket_address_);
244 int received_bytes = -1;
245 int free_buffer_number = -1;
249 while (is_active_ ) {
// Receive directly into the frame_data of the buffer just obtained.
251 free_buffer_number = get_a_free_buffer();
252 received_bytes = recvfrom(socket_fd_,
253 Buffers_[free_buffer_number].frame_data,
256 (
struct sockaddr *) &server_socket_address_,
// First sender seen: keep a heap copy (strlen+1 bytes) of its dotted address
// string as returned by inet_ntoa().
259 if (ServerAddr_ ==NULL) {
262 char *tmp = inet_ntoa(server_socket_address_.sin_addr);
263 size_t len = strlen(tmp)+1;
264 ServerAddr_ =
new char[len];
265 strncpy(ServerAddr_, tmp, len);
// A negative result from recvfrom() is reported through syscallerror(); what
// follows the report is not visible here.
268 if (received_bytes < 0) {
270 syscallerror(
"Error receiving bytes. Aborting!");
// NOTE(review): the condition that detects a truncated frame is not visible.
298 cerr <<
"Warning: Received a truncated UDP frame, which has been discarded." << endl;
// Mark the buffer as occupied (presumably only once the frame was accepted --
// confirm).
306 Buffers_[free_buffer_number].is_free =
false;
// Starts the client: records the transport mode, creates the buffer pool,
// creates and binds the socket, optionally listens/accepts in TCP mode, and
// launches the receive thread via pthread_create().
//   max_message_size           NOTE(review): its use is not visible in this
//                              listing; presumably it determines `temp`
//                              below -- confirm.
//   number_of_message_buffers  number of message buffers; values below 1 are
//                              raised to 1.
// NOTE(review): the remaining parameters (including the source of `UseTCP`),
// the return statements and all preprocessor conditionals are not visible here.
323 int UDPClient::start(
int max_message_size,
int number_of_message_buffers,
326 TCPinsteadOfUDP_ = UseTCP;
// Enforce a minimum of one message buffer.
330 if (number_of_message_buffers < 1)
331 number_of_message_buffers = 1;
// `temp` is computed on lines not visible in this listing.
338 create_buffer_structure(number_of_message_buffers * temp);
// Winsock initialisation requesting version 1.1 (presumably compiled only on
// Windows -- the guarding preprocessor lines are not visible).
343 WORD wVersionRequested;
346 wVersionRequested = MAKEWORD( 1, 1 );
347 err = WSAStartup( wVersionRequested, &wsaData );
349 cout <<
"Error on WSAStrtup(): "<<err<<endl;
// Socket creation: a stream socket in TCP mode, a datagram socket otherwise.
// The PF_INET/INVALID_SOCKET variant and the AF_INET variant below are
// presumably platform alternatives -- confirm.
351 if (TCPinsteadOfUDP_)
352 socket_fd_ = socket(PF_INET, SOCK_STREAM, 0);
354 socket_fd_ = socket(PF_INET, SOCK_DGRAM, 0);
// NOTE(review): the value returned by WSAGetLastError() is discarded here.
355 if (socket_fd_ == INVALID_SOCKET) WSAGetLastError();
357 if (TCPinsteadOfUDP_)
358 socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
360 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
362 if ( socket_fd_ < 0 ) {
363 BIASERR(
"Could not create a socket. Aborting!");
// Tail of a call taking local_socket_address_ (judging by the error message
// below, the bind() call; its first line is not visible).
369 (
struct sockaddr *) &local_socket_address_,
370 sizeof(
struct sockaddr)) < 0 )
372 BIASERR(
"Could not bind to socket. Aborting!");
382 cout <<
"Joining Multicast group - to be implemented" << endl;
// TCP mode: listen with a backlog of 5 and accept one connection.
398 if (TCPinsteadOfUDP_) {
399 int res = listen(socket_fd_,5);
401 perror(
"Failed to listen");
405 struct sockaddr tmpaddr;
// `len` is declared twice with different types; presumably platform
// alternatives selected by preprocessor lines not visible here.
407 int len=
sizeof(tmpaddr);
409 socklen_t len=
sizeof(tmpaddr);
// NOTE(review): as visible here, the descriptor returned by accept() is not
// stored -- confirm against the full source.
411 accept(socket_fd_,&tmpaddr,&len);
// Launch the receive thread, passing this object as the thread argument.
416 pthread_t receive_thread;
417 if (pthread_create(&receive_thread, NULL, receive_loop_starter, (
void*)
this) !=0 ) {
418 syscallerror(
"Could not create a thread.");
// Stops the client: closes the socket, waits (bounded by 1000 loop
// iterations) for has_ended_ to become true, then frees the buffer pool.
// NOTE(review): the declaration of `i`, the loop body and any flag updates
// (e.g. clearing is_active_) are on lines not visible in this listing.
429 void UDPClient::stop() {
436 closesocket(socket_fd_);
// Bounded wait for has_ended_ to be set.
443 while (!has_ended_ && i<1000) {
456 destroy_buffer_structure();
// Destructor.
// NOTE(review): the destructor body is not visible in this listing.
460 UDPClient::~UDPClient() {
Class for receiving UDP packets.
Interface class for UDP data.
const int UDP_PACKAGE_SIZE