32 ROOT::EnableThreadSafety();
37 throw std::invalid_argument(
"Can not add empty/NULL service");
39 m_services.push_back(std::move(service));
40 ACTS_INFO(
"Added service '" << m_services.back()->name() <<
"'");
44 std::shared_ptr<IContextDecorator> decorator) {
46 throw std::invalid_argument(
"Can not add empty/NULL context decorator");
48 m_decorators.push_back(std::move(decorator));
49 ACTS_INFO(
"Added context decarator '" << m_decorators.back()->name() <<
"'");
54 throw std::invalid_argument(
"Can not add empty/NULL reader");
56 m_readers.push_back(std::move(reader));
57 ACTS_INFO(
"Added reader '" << m_readers.back()->name() <<
"'");
63 throw std::invalid_argument(
"Can not add empty/NULL algorithm");
65 m_algorithms.push_back(std::move(algorithm));
66 ACTS_INFO(
"Added algorithm '" << m_algorithms.back()->name() <<
"'");
71 throw std::invalid_argument(
"Can not add empty/NULL writer");
73 m_writers.push_back(std::move(writer));
74 ACTS_INFO(
"Added writer '" << m_writers.back()->name() <<
"'");
78 std::vector<std::string>
names;
81 for (
const auto& service : m_services) {
82 names.push_back(
"Service:" + service->name());
84 for (
const auto& decorator : m_decorators) {
85 names.push_back(
"Decorator:" + decorator->name());
87 for (
const auto&
reader : m_readers) {
88 names.push_back(
"Reader:" +
reader->name());
90 for (
const auto&
algorithm : m_algorithms) {
91 names.push_back(
"Algorithm:" +
algorithm->name());
93 for (
const auto& writer : m_writers) {
94 names.push_back(
"Writer:" + writer->name());
104 size_t saturatedAdd(
size_t a,
size_t b) {
111 std::pair<std::size_t, std::size_t>
113 constexpr
auto kInvalidEventsRange = std::make_pair(SIZE_MAX, SIZE_MAX);
126 size_t end = SIZE_MAX;
127 for (
const auto&
reader : m_readers) {
128 auto available =
reader->availableEvents();
129 beg =
std::max(beg, available.first);
130 end =
std::min(end, available.second);
135 ACTS_ERROR(
"Available events ranges from readers do not overlap");
136 return kInvalidEventsRange;
142 return kInvalidEventsRange;
145 if (end <= saturatedAdd(beg, m_cfg.skip)) {
146 ACTS_ERROR(
"Less events available than requested to skip");
147 return kInvalidEventsRange;
150 if ((beg == 0
u) and (end == SIZE_MAX) and (m_cfg.events == SIZE_MAX)) {
151 ACTS_ERROR(
"Could not determine number of events");
152 return kInvalidEventsRange;
156 auto begSelected = saturatedAdd(beg, m_cfg.skip);
157 auto endRequested = saturatedAdd(begSelected, m_cfg.events);
158 auto endSelected =
std::min(end, endRequested);
159 if (end < endRequested) {
160 ACTS_INFO(
"Restrict requested number of events to available ones");
163 return {begSelected, endSelected};
168 using Clock = std::chrono::high_resolution_clock;
169 using Duration = Clock::duration;
170 using Timepoint = Clock::time_point;
171 using Seconds = std::chrono::duration<double>;
172 using NanoSeconds = std::chrono::duration<double, std::nano>;
179 StopWatch(Duration&
s) :
start(Clock::now()), store(s) {}
180 ~StopWatch() { store += Clock::now() -
start; }
184 template <
typename D>
185 inline std::string asString(D duration) {
186 double ns = std::chrono::duration_cast<NanoSeconds>(duration).count();
199 template <
typename D>
200 inline std::string perEvent(D duration,
size_t numEvents) {
201 return asString(duration / numEvents) +
"/event";
206 std::string identifier;
208 double time_perevent_s;
210 DFE_NAMEDTUPLE(TimingInfo, identifier, time_total_s, time_perevent_s);
213 void storeTiming(
const std::vector<std::string>& identifiers,
214 const std::vector<Duration>& durations, std::size_t numEvents,
217 for (
size_t i = 0; i < identifiers.size(); ++i) {
219 info.identifier = identifiers[i];
221 std::chrono::duration_cast<Seconds>(durations[i]).count();
222 info.time_perevent_s = info.time_total_s / numEvents;
230 Timepoint clockWallStart = Clock::now();
232 std::vector<std::string>
names = listAlgorithmNames();
233 std::vector<Duration> clocksAlgorithms(names.size(),
Duration::zero());
234 tbb::queuing_mutex clocksAlgorithmsMutex;
238 std::pair<size_t, size_t> eventsRange = determineEventsRange();
239 if ((eventsRange.first == SIZE_MAX) and (eventsRange.second == SIZE_MAX)) {
243 ACTS_INFO(
"Processing events [" << eventsRange.first <<
", "
244 << eventsRange.second <<
")");
245 ACTS_INFO(
"Starting event loop with " << m_cfg.numThreads <<
" threads");
246 ACTS_INFO(
" " << m_services.size() <<
" services");
247 ACTS_INFO(
" " << m_decorators.size() <<
" context decorators");
248 ACTS_INFO(
" " << m_readers.size() <<
" readers");
249 ACTS_INFO(
" " << m_algorithms.size() <<
" algorithms");
250 ACTS_INFO(
" " << m_writers.size() <<
" writers");
253 for (
auto& service : m_services) {
254 names.push_back(
"Service:" + service->name() +
":startRun");
256 StopWatch sw(clocksAlgorithms.back());
261 tbb::task_scheduler_init
init(m_cfg.numThreads);
263 tbb::blocked_range<size_t>(eventsRange.first, eventsRange.second),
264 [&](
const tbb::blocked_range<size_t>& r) {
265 std::vector<Duration> localClocksAlgorithms(names.size(),
268 for (
size_t event = r.begin();
event != r.end(); ++event) {
278 for (
auto& service : m_services) {
279 StopWatch sw(localClocksAlgorithms[ialgo++]);
280 service->prepare(++context);
283 for (
auto& cdr : m_decorators) {
284 StopWatch sw(localClocksAlgorithms[ialgo++]);
286 throw std::runtime_error(
"Failed to decorate event context");
290 for (
auto& rdr : m_readers) {
291 StopWatch sw(localClocksAlgorithms[ialgo++]);
293 throw std::runtime_error(
"Failed to read input data");
297 for (
auto& alg : m_algorithms) {
298 StopWatch sw(localClocksAlgorithms[ialgo++]);
300 throw std::runtime_error(
"Failed to process event data");
304 for (
auto& wrt : m_writers) {
305 StopWatch sw(localClocksAlgorithms[ialgo++]);
307 throw std::runtime_error(
"Failed to write output data");
315 tbb::queuing_mutex::scoped_lock lock(clocksAlgorithmsMutex);
316 for (
size_t i = 0; i < clocksAlgorithms.size(); ++i) {
317 clocksAlgorithms[i] += localClocksAlgorithms[i];
323 for (
auto& wrt : m_writers) {
324 names.push_back(
"Writer:" + wrt->name() +
":endRun");
326 StopWatch sw(clocksAlgorithms.back());
333 Duration totalWall = Clock::now() - clockWallStart;
334 Duration totalReal = std::accumulate(
335 clocksAlgorithms.begin(), clocksAlgorithms.end(),
Duration::zero());
336 size_t numEvents = eventsRange.second - eventsRange.first;
337 ACTS_INFO(
"Processed " << numEvents <<
" events in " << asString(totalWall)
339 ACTS_INFO(
"Average time per event: " << perEvent(totalReal, numEvents));
341 for (
size_t i = 0; i <
names.size(); ++i) {
343 << perEvent(clocksAlgorithms[i], numEvents));
345 storeTiming(
names, clocksAlgorithms, numEvents,
346 joinPaths(m_cfg.outputDir,
"timing.tsv"));