EIC Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ospBuffer.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file ospBuffer.cc
1 
2 #include <sys/types.h>
3 #include <sys/stat.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6 #include <string.h>
7 
8 #include "ospBuffer.h"
9 #include "ospEvent.h"
10 
11 #include "BufferConstants.h"
12 #include "EventTypes.h"
13 
14 
15 // the constructor first ----------------
16 
17 
18 // the constructor first ----------------
19 ospBuffer::ospBuffer (const char *filename, PHDWORD * where, const int length, int &status
20  , const int irun, const int iseq)
21 {
22  status = 0;
23  our_fd = 1;
24 
25  fd = open(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
26  S_IRWXU | S_IROTH | S_IRGRP );
27 
28  if ( fd < 0)
29  {
30  status =1;
31  good_object = 0;
32  return;
33  }
34  good_object = 1;
35  bptr = ( buffer_ptr) where;
36  data_ptr = &(bptr->data[0]);
40  bptr->ID = -64;
41  bptr->Bufseq = iseq;
42  bptr->Runnr = 0;
43  current_event = 0;
45  sequence = iseq;
46  eventsequence = 0;
47  runnumber = irun;
48  byteswritten = 0;
49 
50  prepare_next ();
51 }
52 
53 ospBuffer::ospBuffer (int fdin, PHDWORD * where, const int length
54  , const int irun, const int iseq)
55 {
56  fd = fdin;
57  our_fd = 0;
58  good_object = 1;
59  bptr = ( buffer_ptr) where;
60  data_ptr = &(bptr->data[0]);
64  bptr->ID = -64;
65  bptr->Bufseq = iseq;
66  bptr->Runnr = 0;
67  current_event = 0;
69  sequence = iseq;
70  eventsequence = 0;
71  runnumber = irun;
72  byteswritten = 0;
73 
74  prepare_next ();
75 }
76 
77 
78 // ---------------------------------------------------------
79 int ospBuffer::nextEvent( const unsigned int evtsize, const int etype, const int evtseq)
80 
81 {
82 
83  if (current_event) delete current_event;
84  current_event = 0;
85 
86  if (evtsize > max_size - EVTHEADERLENGTH) return -1;
87  if (evtsize <=0) return -2;
88 
89  if (evtsize > left-EOBLENGTH)
90  {
91  writeout();
92  prepare_next();
93  }
94 
95  if (etype >0) current_etype = etype;
96 
97  if (evtseq > 0) eventsequence = evtseq;
98  else eventsequence++;
99 
100 
101  current_event = new ospEvent(&(bptr->data[current_index]), evtsize
103 
107  bptr->Bufseq++;
108 
109  dirty = 1;
110  return 0;
111 }
112 
113 
114 
115 
116 // ---------------------------------------------------------
117 int ospBuffer::addRawEvent( unsigned int *data)
118 {
119 
120  if ( ! good_object) return -1;
121  int wstatus;
122 
123  unsigned int nw = data[0];
124 
125  if ( nw > left-EOBLENGTH)
126  {
127  wstatus = writeout();
128  prepare_next();
129  if (wstatus) return wstatus;
130  }
131 
132  memcpy ( (char *) &(bptr->data[current_index]), (char *) data, 4*nw);
133 
134  left -= nw;
135  current_index += nw;
136  bptr->Length += nw*4;
137  bptr->Bufseq++;
138  dirty =1;
139  return 0;
140 }
141 
142 
143 // ---------------------------------------------------------
145 {
146 
147  if ( ! good_object) return -1;
148  int nw;
149 
150  int wstatus;
151 
152  runnumber = Evt->getRunNumber();
153 
154  if ( Evt->getEvtLength() > left-EOBLENGTH)
155  {
156  wstatus = writeout();
157  prepare_next();
158  if (wstatus) return wstatus;
159  }
160 
161  Evt->Copy( (int *) &(bptr->data[current_index]), Evt->getEvtLength(), &nw);
162 
163  left -= nw;
164  current_index += nw;
165  bptr->Length += nw*4;
166  bptr->Bufseq++;
167  dirty =1;
168  return 0;
169 }
170 
171 
172 // ----------------------------------------------------------
174 {
175  int len;
176 
177  if ( ! good_object) return 0;
178 
179  len = current_event->addPacket(p);
180  if (len < 0) return 0;
181 
182  left -= len;
183  current_index += len;
184  bptr->Length += len*4;
185 
186 
187  return len;
188 }
189 
190 
191 
192 
193 // ----------------------------------------------------------
195  const int length,
196  const int id,
197  const int wordsize,
198  const int hitformat)
199 {
200  int len;
201 
202  if ( ! good_object) return 0;
203 
204  len = current_event->addUnstructPacketData(data, length
205  ,id , wordsize , hitformat);
206  if (len < 0) return 0;
207 
208  left -= len;
209  current_index += len;
210  bptr->Length += len*4;
211 
212  // for (int k = 0; k<current_index; k++)
213  // COUT << k << " " << bptr->data[k] << std::endl;
214 
215  // COUT << "------------------" << std::endl;
216 
217 
218  return len;
219 }
220 
221 // ---------------------------------------------------------
223 {
224 
225  // re-initialize the event header length
227  bptr->ID = 0xffffc0c0;
228  bptr->Bufseq = 0;
229  sequence++;
230  bptr->Runnr = runnumber;
231 
232  current_index = 0;
234  has_end = 0;
235  dirty = 1;
236  return 0;
237 }
238 
239 // ----------------------------------------------------------
240 int ospBuffer::setMaxSize(const int size)
241 {
242  if (size < 0) return -1;
243  if (size == 0) max_size = max_length;
244  else
245  {
246  max_size = (size + ( BUFFERBLOCKSIZE - size%BUFFERBLOCKSIZE) ) /4;
247  if (max_size > max_length)
248  {
250  return -2;
251  }
252  }
253  return 0;
254 }
255 
256 // ----------------------------------------------------------
258 {
259  return max_size;
260 }
261 
262 // ----------------------------------------------------------
263 unsigned long long ospBuffer::getBytesWritten() const
264 {
265  return byteswritten;
266 }
267 
268 // ----------------------------------------------------------
269 //
270 //
272 {
273 
274  // void *writeThread(void *);
275 
276  if ( ! good_object || fd <= 0 ) return -1;
277 
278 
279  if (! dirty) return 0;
280 
281  if (! has_end) addEoB();
282 
283  if (fd < 0) return 0;
284 
285 
286  unsigned int ip =0;
287  char *cp = (char *) bptr;
288 
289  while (ip < bptr->Length)
290  {
291  int n = write ( fd, cp, BUFFERBLOCKSIZE);
292  if ( n != BUFFERBLOCKSIZE)
293  {
294  std::cout << " could not write output, bytes written: " << n << std::endl;
295  return 0;
296  }
297  cp += BUFFERBLOCKSIZE;
298  ip+=BUFFERBLOCKSIZE;
299  }
300  dirty = 0;
301  byteswritten += ip;
302  return 0;
303 #ifdef WITHTHREADS
304 
305  }
306  else
307  {
308  //wait for a potential old thread to complete
309  if (ThreadId)
310  {
311  pthread_join(ThreadId, NULL);
312  byteswritten += thread_arg[2]; // the number of bytes written from previosu thread
313  }
314  if (! dirty) return 0;
315 
316  if (! has_end) addEoB();
317 
318  if (fd < 0) return 0;
319 
320  //swap the buffers around
321  buffer_ptr tmp = bptr_being_written;
322  bptr_being_written = bptr;
323  bptr = tmp;
324  dirty = 0;
325  // now fork off the write thread
326 
327  thread_arg[0] = (int) fd;
328  thread_arg[1] = (int) bptr_being_written;
329  thread_arg[2] = 0;
330 
331  // COUT << "starting write thread" << std::endl;
332  int s = pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (void *) thread_arg);
333  //COUT << "create status is " << s << std::endl;
334 
335  return 0;
336 
337  }
338 #endif
339 }
340 
341 
342 
343 // ----------------------------------------------------------
345 {
346  if (has_end) return -1;
347  bptr->data[current_index++] = 2;
348  bptr->data[current_index++] = 0;
349  bptr->Length += 2*4;
350 
351  has_end = 1;
352  return 0;
353 }
354 
355 
356 // ----------------------------------------------------------
358 {
359  writeout();
360  if (our_fd) close (fd);
361 
362 }
363 
364 
365