Basic Image AlgorithmS Library  2.8.0
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
FiFoQueue.hh
1 /*
2 This file is part of the BIAS library (Basic ImageAlgorithmS).
3 
4 Copyright (C) 2003-2009 (see file CONTACT for details)
5  Multimediale Systeme der Informationsverarbeitung
6  Institut fuer Informatik
7  Christian-Albrechts-Universitaet Kiel
8 
9 
10 BIAS is free software; you can redistribute it and/or modify
11 it under the terms of the GNU Lesser General Public License as published by
12 the Free Software Foundation; either version 2.1 of the License, or
13 (at your option) any later version.
14 
15 BIAS is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU Lesser General Public License for more details.
19 
20 You should have received a copy of the GNU Lesser General Public License
21 along with BIAS; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 */
24 #ifndef __FiFoQueue_hh
25 #define __FiFoQueue_hh
26 
27 #include <bias_config.h>
28 #include <Base/Debug/Error.hh>
29 
30 #ifndef BIAS_HAVE_PTHREADS
31 # error pthreads need to be activated for FiFo-queue
32 #endif
33 
34 #include <pthread.h>
35 
36 #define __BIAS_FIFOQUEUE_DEFAULT_SIZE__ 10
37 
38 namespace BIAS {
39 
40  /**
41  * @class FiFoQueue
42  * @brief This class implements a templated standard FiFo-queue for inter
43  * process communication (arbitrary number of readers/writers), e.g.
44  * between GUIs and underlying processes. It can be used as synced or
45  * unsynced.
46  * FiFo: Oldest item will be return at first. Items once fetched are
47  * 'deleted' from the queue.
48  * synced: process is block until a free slot available for writing
49  * -> AppendSynced
50  * or an item is available for read
51  * -> FetchSynced
52  * unsynced: no item is returned from empty buffer when reading and
53  * oldest item gets 'lost' when buffer is full during
54  * writing
55  * !!! ATTENTION: WHEN TEMPLATED OVER POINTERS DATA 'BEHIND' !!!
56  * !!! THE POINTER WON'T DELETED WHEN BUFFER IS FULL DURING !!!
57  * !!! WRITING !!!
58  * @author BIAS
59  */
60  template<class T> class FiFoQueue {
61 
62  public:
63  /** Default constructor. Creates a queue with defined default size
64  __BIAS_FIFOQUEUE_DEFAULT_SIZE__ and initializes mutex-/condition-
65  variables
66  */
67  FiFoQueue() : length_(__BIAS_FIFOQUEUE_DEFAULT_SIZE__), head_(0),
68  tail_(1%__BIAS_FIFOQUEUE_DEFAULT_SIZE__), numItems_(0)
69  {
70  data_ = new T[__BIAS_FIFOQUEUE_DEFAULT_SIZE__];
71 
72  //initialize trigger/lock
73  triggerCondFull_ = new pthread_cond_t();
74  pthread_cond_init(triggerCondFull_, NULL);
75  triggerCondEmpty_ = new pthread_cond_t();
76  pthread_cond_init(triggerCondEmpty_, NULL);
77  syncMutex_ = new pthread_mutex_t();
78  pthread_mutex_init(syncMutex_, NULL);
79  }
80 
81  /** Constructor. Creates a queue with given size and initializes
82  mutex-/condition-variables
83  */
84  FiFoQueue(unsigned length) :
85  length_(length), head_(0), tail_(1%length), numItems_(0)
86  {
87  data_ = new T[length_];
88 
89  //initialize trigger/lock
90  triggerCondFull_ = new pthread_cond_t();
91  pthread_cond_init(triggerCondFull_, NULL);
92  triggerCondEmpty_ = new pthread_cond_t();
93  pthread_cond_init(triggerCondEmpty_, NULL);
94  syncMutex_ = new pthread_mutex_t();
95  pthread_mutex_init(syncMutex_, NULL);
96  }
97 
98  /** Just a cleanup.
99  !!! ATTENTION: WHEN USING POINTERS AS TEMPLATE TYPE ONLY THE !!!
100  !!! BUFFER IS DELETED - NOT THE DATA 'BEHIND' THE POINTERS !!!
101  */
103  {
104  delete data_;
105 
106  pthread_cond_destroy(triggerCondFull_);
107  delete triggerCondFull_;
108  pthread_cond_destroy(triggerCondEmpty_);
109  delete triggerCondEmpty_;
110  pthread_mutex_destroy(syncMutex_);
111  delete syncMutex_;
112  }
113 
114 
115  /** Return the number of items in queue
116  */
117  inline unsigned GetNumItems()
118  { unsigned numItems;
119  pthread_mutex_lock(syncMutex_);
120  numItems = numItems_;
121  pthread_mutex_unlock(syncMutex_);
122  return numItems_; }
123 
124 
125  /** Returns the size of underlying ringbuffer
126  -> maximum number of items in queue
127  */
128  inline unsigned GetBufferSize()
129  { return length_; }
130 
131 
132  /** Append an datum as newest item. Syncron in the sens of blocking,
133  that is, threads inserting an item to a full queue are delayed until
134  a slot is available (item has been fetched)
135  */
136  inline int AppendSynced(T &item)
137  {
138  //get exclusive access
139  pthread_mutex_lock(syncMutex_);
140  while (numItems_ == length_) {
141  //ensure consisntency for full buffer
142  BIASASSERT((head_+1)%length_ == tail_);
143  //in case of full buffer wait for free slot
144  //...while needed because more than 1 waiting process might
145  // be unblocked from cond_signal!
146  pthread_cond_wait(triggerCondFull_,syncMutex_);
147  }
148 
149  //signal new item inserted
150  pthread_cond_signal(triggerCondEmpty_);
151 
152  //make head_ point to next slot
153  head_ = (head_+1)%length_;
154  //insert data at head_
155  data_[head_] = item;
156  //increase number of used slots
157  numItems_++;
158  //release exclusiv access
159  pthread_mutex_unlock(syncMutex_);
160 
161  return 0;
162  }
163 
164  /** Append an datum as newest item. Asyncron in the sens of non-blocking,
165  that is, threads inserting an item to a full queue will overwrite the
166  oldest item. This gets 'lost'.
167  !!! ATTENTION: OVERWRITING A POINTER TYPE DOES NO FREE MEMORY !!!
168  */
169  inline int AppendUnsynced(T &item)
170  {
171  //get exclusive access
172  pthread_mutex_lock(syncMutex_);
173 
174  //make tail_ point to next slot
175  head_ = (head_+1)%length_;
176  //insert data at tail_
177  data_[head_] = item;
178 
179  if (numItems_ == length_) {
180  //if buffer was full before insert, oldest item is deleted
181  // -> increase tail_
182  tail_ = (tail_+1)%length_;
183  //release exclusiv access
184  pthread_mutex_unlock(syncMutex_);
185  //return message that oldest item was 'lost'
186  return 1;
187  }
188 
189  //signal new item inserted
190  pthread_cond_signal(triggerCondEmpty_);
191 
192  //increase number of used slots if buffer was not full before insert
193  numItems_++;
194  //release exclusiv access
195  pthread_mutex_unlock(syncMutex_);
196 
197  return 0;
198  }
199 
200  /** Fetch the oldest item from the queue. Again synced is blocking,
201  thread will wait for an item if buffer is empty.
202  */
203  inline int FetchSynced(T &item)
204  {
205  //get exclusive access
206  pthread_mutex_lock(syncMutex_);
207  while (numItems_ == 0) {
208  //ensure consisntency for empty buffer
209  BIASASSERT((head_+1)%length_ == tail_);
210  //in case of buffer is empty wait for item
211  //...while needed because more than 1 waiting process might
212  // be unblocked from pthread_cond_signal!
213  pthread_cond_wait(triggerCondEmpty_,syncMutex_);
214  }
215 
216  //signal new slot is available
217  pthread_cond_signal(triggerCondFull_);
218 
219  //fetch item at tail_
220  item = data_[tail_];
221  //make tail_ point to next slot
222  tail_ = (tail_+1)%length_;
223  //decrease number of used slots if buffer was not empty before insert
224  numItems_--;
225  //release exclusiv access
226  pthread_mutex_unlock(syncMutex_);
227 
228  return 0;
229  }
230 
231  /** Fetch the oldest item from the queue. Again unsynced is non-blocking,
232  thread won't wait for an item if buffer is empty. In this case the input-/
233  output-param is left unchanged and 1 is returned.
234  */
235  inline int FetchUnsynced(T &item)
236  {
237  //get exclusive access
238  pthread_mutex_lock(syncMutex_);
239 
240  if (numItems_ == 0) {
241  //if buffer was empty before fetch, no item is returned...
242  // release exclusiv access (only possible if not synced)
243  pthread_mutex_unlock(syncMutex_);
244  //signal that no item was available
245  return 1;
246  }
247 
248  //signal new slot is available
249  pthread_cond_signal(triggerCondFull_);
250 
251  //fetch data at head_
252  item = data_[tail_];
253  //make head_ point to next slot
254  tail_ = (tail_+1)%length_;
255  //decrease item counter
256  numItems_--;
257  //release exclusiv access
258  pthread_mutex_unlock(syncMutex_);
259 
260  return 0;
261  }
262 
263  protected:
264 
265  //prohibit copy -> no consistent copy feasible
267 
268  ///array holding cycled buffer
269  T* data_;
270 
271  /// pthread condition variable for waiting for non full buf
272  pthread_cond_t *triggerCondFull_;
273  /// pthread condition variable for waiting for non empty buf
274  pthread_cond_t *triggerCondEmpty_;
275  /// mutex for sync. locking (conditions and access)
276  pthread_mutex_t *syncMutex_;
277 
278  ///length of buffer (number of slots)
279  const unsigned length_;
280 
281  ///first (oldest) element in queue
282  unsigned head_;
283  ///last (newest) element in queue
284  unsigned tail_;
285  ///number of items in buffer
286  unsigned numItems_;
287  };
288 }
289 
290 #endif // __FiFoQueue_hh
FiFoQueue()
Default constructor.
Definition: FiFoQueue.hh:67
pthread_cond_t * triggerCondFull_
pthread condition variable for waiting for non full buf
Definition: FiFoQueue.hh:272
This class implements a templated standard FiFo-queue for inter process communication (arbitrary numb...
Definition: FiFoQueue.hh:60
unsigned head_
index of the slot written last, i.e. the newest element in queue
Definition: FiFoQueue.hh:282
unsigned numItems_
number of items in buffer
Definition: FiFoQueue.hh:286
~FiFoQueue()
Just a cleanup.
Definition: FiFoQueue.hh:102
pthread_mutex_t * syncMutex_
mutex for sync. locking (conditions and access)
Definition: FiFoQueue.hh:276
int FetchSynced(T &item)
Fetch the oldest item from the queue.
Definition: FiFoQueue.hh:203
FiFoQueue(unsigned length)
Constructor.
Definition: FiFoQueue.hh:84
unsigned GetBufferSize()
Returns the size of the underlying ring buffer -> maximum number of items in queue.
Definition: FiFoQueue.hh:128
FiFoQueue(FiFoQueue &data)
Definition: FiFoQueue.hh:266
const unsigned length_
length of buffer (number of slots)
Definition: FiFoQueue.hh:279
pthread_cond_t * triggerCondEmpty_
pthread condition variable for waiting for non empty buf
Definition: FiFoQueue.hh:274
unsigned tail_
index of the slot read next, i.e. the oldest element in queue
Definition: FiFoQueue.hh:284
int AppendSynced(T &item)
Append an datum as newest item.
Definition: FiFoQueue.hh:136
int FetchUnsynced(T &item)
Fetch the oldest item from the queue.
Definition: FiFoQueue.hh:235
unsigned GetNumItems()
Return the number of items in queue.
Definition: FiFoQueue.hh:117
T * data_
array holding cycled buffer
Definition: FiFoQueue.hh:266
int AppendUnsynced(T &item)
Append an datum as newest item.
Definition: FiFoQueue.hh:169