EIC Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Sequencer.cpp
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file Sequencer.cpp
1 // This file is part of the Acts project.
2 //
3 // Copyright (C) 2017-2019 CERN for the benefit of the Acts project
4 //
5 // This Source Code Form is subject to the terms of the Mozilla Public
6 // License, v. 2.0. If a copy of the MPL was not distributed with this
7 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 
10 
14 
15 #include <algorithm>
16 #include <chrono>
17 #include <exception>
18 #include <numeric>
19 
20 #include <TROOT.h>
21 #include <dfe/dfe_io_dsv.hpp>
22 #include <dfe/dfe_namedtuple.hpp>
23 #include <tbb/tbb.h>
24 
26  : m_cfg(cfg),
27  m_logger(Acts::getDefaultLogger("Sequencer", m_cfg.logLevel)) {
28  // automatically determine the number of concurrent threads to use
29  if (m_cfg.numThreads < 0) {
30  m_cfg.numThreads = tbb::task_scheduler_init::default_num_threads();
31  }
32  ROOT::EnableThreadSafety();
33 }
34 
35 void ActsExamples::Sequencer::addService(std::shared_ptr<IService> service) {
36  if (not service) {
37  throw std::invalid_argument("Can not add empty/NULL service");
38  }
39  m_services.push_back(std::move(service));
40  ACTS_INFO("Added service '" << m_services.back()->name() << "'");
41 }
42 
44  std::shared_ptr<IContextDecorator> decorator) {
45  if (not decorator) {
46  throw std::invalid_argument("Can not add empty/NULL context decorator");
47  }
48  m_decorators.push_back(std::move(decorator));
49  ACTS_INFO("Added context decarator '" << m_decorators.back()->name() << "'");
50 }
51 
52 void ActsExamples::Sequencer::addReader(std::shared_ptr<IReader> reader) {
53  if (not reader) {
54  throw std::invalid_argument("Can not add empty/NULL reader");
55  }
56  m_readers.push_back(std::move(reader));
57  ACTS_INFO("Added reader '" << m_readers.back()->name() << "'");
58 }
59 
61  std::shared_ptr<IAlgorithm> algorithm) {
62  if (not algorithm) {
63  throw std::invalid_argument("Can not add empty/NULL algorithm");
64  }
65  m_algorithms.push_back(std::move(algorithm));
66  ACTS_INFO("Added algorithm '" << m_algorithms.back()->name() << "'");
67 }
68 
69 void ActsExamples::Sequencer::addWriter(std::shared_ptr<IWriter> writer) {
70  if (not writer) {
71  throw std::invalid_argument("Can not add empty/NULL writer");
72  }
73  m_writers.push_back(std::move(writer));
74  ACTS_INFO("Added writer '" << m_writers.back()->name() << "'");
75 }
76 
77 std::vector<std::string> ActsExamples::Sequencer::listAlgorithmNames() const {
78  std::vector<std::string> names;
79 
80  // WARNING this must be done in the same order as in the processing
81  for (const auto& service : m_services) {
82  names.push_back("Service:" + service->name());
83  }
84  for (const auto& decorator : m_decorators) {
85  names.push_back("Decorator:" + decorator->name());
86  }
87  for (const auto& reader : m_readers) {
88  names.push_back("Reader:" + reader->name());
89  }
90  for (const auto& algorithm : m_algorithms) {
91  names.push_back("Algorithm:" + algorithm->name());
92  }
93  for (const auto& writer : m_writers) {
94  names.push_back("Writer:" + writer->name());
95  }
96 
97  return names;
98 }
99 
100 namespace {
101 // Saturated addition that does not overflow and exceed SIZE_MAX.
102 //
103 // From http://locklessinc.com/articles/sat_arithmetic/
104 size_t saturatedAdd(size_t a, size_t b) {
105  size_t res = a + b;
106  res |= -(res < a);
107  return res;
108 }
109 } // namespace
110 
111 std::pair<std::size_t, std::size_t>
113  constexpr auto kInvalidEventsRange = std::make_pair(SIZE_MAX, SIZE_MAX);
114 
115  // Note on skipping events:
116  //
117  // Previously, skipping events was only allowed when readers where available,
118  // since only readers had a `.skip()` functionality. The `.skip()` interface
119  // has been removed in favour of telling the readers the event they are
120  // requested to read via the algorithm context.
121  // Skipping can now also be used when no readers are configured, e.g. for
122  // generating only a few specific events in a simulation setup.
123 
124  // determine intersection of event ranges available from readers
125  size_t beg = 0u;
126  size_t end = SIZE_MAX;
127  for (const auto& reader : m_readers) {
128  auto available = reader->availableEvents();
129  beg = std::max(beg, available.first);
130  end = std::min(end, available.second);
131  }
132 
133  // since we use event ranges (and not just num events) they might not overlap
134  if (end < beg) {
135  ACTS_ERROR("Available events ranges from readers do not overlap");
136  return kInvalidEventsRange;
137  }
138  // configured readers without available events makes no sense
139  // TODO could there be a use-case for zero events? run only setup functions?
140  if (beg == end) {
141  ACTS_ERROR("No events available");
142  return kInvalidEventsRange;
143  }
144  // trying to skip too many events must be an error
145  if (end <= saturatedAdd(beg, m_cfg.skip)) {
146  ACTS_ERROR("Less events available than requested to skip");
147  return kInvalidEventsRange;
148  }
149  // events range was not defined by either the readers or user command line.
150  if ((beg == 0u) and (end == SIZE_MAX) and (m_cfg.events == SIZE_MAX)) {
151  ACTS_ERROR("Could not determine number of events");
152  return kInvalidEventsRange;
153  }
154 
155  // take user selection into account
156  auto begSelected = saturatedAdd(beg, m_cfg.skip);
157  auto endRequested = saturatedAdd(begSelected, m_cfg.events);
158  auto endSelected = std::min(end, endRequested);
159  if (end < endRequested) {
160  ACTS_INFO("Restrict requested number of events to available ones");
161  }
162 
163  return {begSelected, endSelected};
164 }
165 
166 // helpers for per-algorithm timing information
167 namespace {
168 using Clock = std::chrono::high_resolution_clock;
169 using Duration = Clock::duration;
170 using Timepoint = Clock::time_point;
171 using Seconds = std::chrono::duration<double>;
172 using NanoSeconds = std::chrono::duration<double, std::nano>;
173 
174 // RAII-based stopwatch to time execution within a block
175 struct StopWatch {
176  Timepoint start;
177  Duration& store;
178 
179  StopWatch(Duration& s) : start(Clock::now()), store(s) {}
180  ~StopWatch() { store += Clock::now() - start; }
181 };
182 
183 // Convert duration to a printable string w/ reasonable unit.
184 template <typename D>
185 inline std::string asString(D duration) {
186  double ns = std::chrono::duration_cast<NanoSeconds>(duration).count();
187  if (1e9 < std::abs(ns)) {
188  return std::to_string(ns / 1e9) + " s";
189  } else if (1e6 < std::abs(ns)) {
190  return std::to_string(ns / 1e6) + " ms";
191  } else if (1e3 < std::abs(ns)) {
192  return std::to_string(ns / 1e3) + " us";
193  } else {
194  return std::to_string(ns) + " ns";
195  }
196 }
197 
198 // Convert duration scaled to one event to a printable string.
199 template <typename D>
200 inline std::string perEvent(D duration, size_t numEvents) {
201  return asString(duration / numEvents) + "/event";
202 }
203 
204 // Store timing data
205 struct TimingInfo {
206  std::string identifier;
207  double time_total_s;
208  double time_perevent_s;
209 
210  DFE_NAMEDTUPLE(TimingInfo, identifier, time_total_s, time_perevent_s);
211 };
212 
213 void storeTiming(const std::vector<std::string>& identifiers,
214  const std::vector<Duration>& durations, std::size_t numEvents,
215  std::string path) {
216  dfe::NamedTupleTsvWriter<TimingInfo> writer(std::move(path), 4);
217  for (size_t i = 0; i < identifiers.size(); ++i) {
218  TimingInfo info;
219  info.identifier = identifiers[i];
220  info.time_total_s =
221  std::chrono::duration_cast<Seconds>(durations[i]).count();
222  info.time_perevent_s = info.time_total_s / numEvents;
223  writer.append(info);
224  }
225 }
226 } // namespace
227 
229  // measure overall wall clock
230  Timepoint clockWallStart = Clock::now();
231  // per-algorithm time measures
232  std::vector<std::string> names = listAlgorithmNames();
233  std::vector<Duration> clocksAlgorithms(names.size(), Duration::zero());
234  tbb::queuing_mutex clocksAlgorithmsMutex;
235 
236  // processing only works w/ a well-known number of events
237  // error message is already handled by the helper function
238  std::pair<size_t, size_t> eventsRange = determineEventsRange();
239  if ((eventsRange.first == SIZE_MAX) and (eventsRange.second == SIZE_MAX)) {
240  return EXIT_FAILURE;
241  }
242 
243  ACTS_INFO("Processing events [" << eventsRange.first << ", "
244  << eventsRange.second << ")");
245  ACTS_INFO("Starting event loop with " << m_cfg.numThreads << " threads");
246  ACTS_INFO(" " << m_services.size() << " services");
247  ACTS_INFO(" " << m_decorators.size() << " context decorators");
248  ACTS_INFO(" " << m_readers.size() << " readers");
249  ACTS_INFO(" " << m_algorithms.size() << " algorithms");
250  ACTS_INFO(" " << m_writers.size() << " writers");
251 
252  // run start-of-run hooks
253  for (auto& service : m_services) {
254  names.push_back("Service:" + service->name() + ":startRun");
255  clocksAlgorithms.push_back(Duration::zero());
256  StopWatch sw(clocksAlgorithms.back());
257  service->startRun();
258  }
259 
260  // execute the parallel event loop
261  tbb::task_scheduler_init init(m_cfg.numThreads);
262  tbb::parallel_for(
263  tbb::blocked_range<size_t>(eventsRange.first, eventsRange.second),
264  [&](const tbb::blocked_range<size_t>& r) {
265  std::vector<Duration> localClocksAlgorithms(names.size(),
266  Duration::zero());
267 
268  for (size_t event = r.begin(); event != r.end(); ++event) {
269  // Use per-event store
271  "EventStore#" + std::to_string(event), m_cfg.logLevel));
272  // If we ever wanted to run algorithms in parallel, this needs to be
273  // changed to Algorithm context copies
274  AlgorithmContext context(0, event, eventStore);
275  size_t ialgo = 0;
276 
277  // Prepare event store w/ service information
278  for (auto& service : m_services) {
279  StopWatch sw(localClocksAlgorithms[ialgo++]);
280  service->prepare(++context);
281  }
283  for (auto& cdr : m_decorators) {
284  StopWatch sw(localClocksAlgorithms[ialgo++]);
285  if (cdr->decorate(++context) != ProcessCode::SUCCESS) {
286  throw std::runtime_error("Failed to decorate event context");
287  }
288  }
289  // Read everything in
290  for (auto& rdr : m_readers) {
291  StopWatch sw(localClocksAlgorithms[ialgo++]);
292  if (rdr->read(++context) != ProcessCode::SUCCESS) {
293  throw std::runtime_error("Failed to read input data");
294  }
295  }
296  // Execute all algorithms
297  for (auto& alg : m_algorithms) {
298  StopWatch sw(localClocksAlgorithms[ialgo++]);
299  if (alg->execute(++context) != ProcessCode::SUCCESS) {
300  throw std::runtime_error("Failed to process event data");
301  }
302  }
303  // Write out results
304  for (auto& wrt : m_writers) {
305  StopWatch sw(localClocksAlgorithms[ialgo++]);
306  if (wrt->write(++context) != ProcessCode::SUCCESS) {
307  throw std::runtime_error("Failed to write output data");
308  }
309  }
310  ACTS_INFO("finished event " << event);
311  }
312 
313  // add timing info to global information
314  {
315  tbb::queuing_mutex::scoped_lock lock(clocksAlgorithmsMutex);
316  for (size_t i = 0; i < clocksAlgorithms.size(); ++i) {
317  clocksAlgorithms[i] += localClocksAlgorithms[i];
318  }
319  }
320  });
321 
322  // run end-of-run hooks
323  for (auto& wrt : m_writers) {
324  names.push_back("Writer:" + wrt->name() + ":endRun");
325  clocksAlgorithms.push_back(Duration::zero());
326  StopWatch sw(clocksAlgorithms.back());
327  if (wrt->endRun() != ProcessCode::SUCCESS) {
328  return EXIT_FAILURE;
329  }
330  }
331 
332  // summarize timing
333  Duration totalWall = Clock::now() - clockWallStart;
334  Duration totalReal = std::accumulate(
335  clocksAlgorithms.begin(), clocksAlgorithms.end(), Duration::zero());
336  size_t numEvents = eventsRange.second - eventsRange.first;
337  ACTS_INFO("Processed " << numEvents << " events in " << asString(totalWall)
338  << " (wall clock)");
339  ACTS_INFO("Average time per event: " << perEvent(totalReal, numEvents));
340  ACTS_DEBUG("Average time per algorithm:");
341  for (size_t i = 0; i < names.size(); ++i) {
342  ACTS_DEBUG(" " << names[i] << ": "
343  << perEvent(clocksAlgorithms[i], numEvents));
344  }
345  storeTiming(names, clocksAlgorithms, numEvents,
346  joinPaths(m_cfg.outputDir, "timing.tsv"));
347 
348  return EXIT_SUCCESS;
349 }