XRootD
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
29 #include "XrdCl/XrdClOperations.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClPostMaster.hh"
33 #include "XrdCl/XrdClJobManager.hh"
34 
35 #include <atomic>
36 #include <condition_variable>
37 #include <mutex>
38 
39 namespace XrdCl
40 {
41 
42  //----------------------------------------------------------------------------
43  // Interface for different execution policies:
44  // - all : all operations need to succeed in order for the parallel
45  // operation to be successful
46  // - any : just one of the operations needs to succeed in order for
47  // the parallel operation to be successful
48  // - some : n (user defined) operations need to succeed in order for
49  // the parallel operation to be successful
50  // - at least : at least n (user defined) operations need to succeed in
51  // order for the parallel operation to be successful (the
52  // user handler will be called only when all operations are
53  // resolved)
54  //
55  // @param status : status returned by one of the aggregated operations
56  //
57  // @return : true if the status should be passed to the user handler,
58  // false otherwise.
59  //----------------------------------------------------------------------------
61  {
62  virtual ~PolicyExecutor()
63  {
64  }
65 
66  virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67 
68  virtual XRootDStatus Result() = 0;
69  };
70 
71  //----------------------------------------------------------------------------
77  //----------------------------------------------------------------------------
78  template<bool HasHndl>
79  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80  {
81  template<bool> friend class ParallelOperation;
82 
83  public:
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  template<bool from>
90  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91  pipelines( std::move( obj.pipelines ) ),
92  policy( std::move( obj.policy ) )
93  {
94  }
95 
96  //------------------------------------------------------------------------
102  //------------------------------------------------------------------------
103  template<class Container>
104  ParallelOperation( Container &&container )
105  {
106  static_assert( !HasHndl, "Constructor is available only operation without handler");
107 
108  pipelines.reserve( container.size() );
109  auto begin = std::make_move_iterator( container.begin() );
110  auto end = std::make_move_iterator( container.end() );
111  std::copy( begin, end, std::back_inserter( pipelines ) );
112  container.clear(); // there's junk inside so we clear it
113  }
114 
116  {
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
122  std::string ToString()
123  {
124  std::ostringstream oss;
125  oss << "Parallel(";
126  for( size_t i = 0; i < pipelines.size(); i++ )
127  {
128  oss << pipelines[i]->ToString();
129  if( i + 1 != pipelines.size() )
130  {
131  oss << " && ";
132  }
133  }
134  oss << ")";
135  return oss.str();
136  }
137 
138  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
145  {
146  policy.reset( new AllPolicy() );
147  return std::move( *this );
148  }
149 
150  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
157  {
158  policy.reset( new AnyPolicy( pipelines.size() ) );
159  return std::move( *this );
160  }
161 
162  //------------------------------------------------------------------------
163  // Set policy to `Some`
167  //------------------------------------------------------------------------
168  ParallelOperation<HasHndl> Some( size_t threshold )
169  {
170  policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171  return std::move( *this );
172  }
173 
174  //------------------------------------------------------------------------
180  //------------------------------------------------------------------------
182  {
183  policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184  return std::move( *this );
185  }
186 
187  private:
188 
189  //------------------------------------------------------------------------
194  //------------------------------------------------------------------------
195  struct AllPolicy : public PolicyExecutor
196  {
197  bool Examine( const XrdCl::XRootDStatus &status )
198  {
199  std::unique_lock<std::mutex> lck(resMtx);
200 
201  // keep the status in case this is the final result
202  res = status;
203  if( status.IsOK() ) return false;
204  // we require all request to succeed
205  return true;
206  }
207 
208  XRootDStatus Result()
209  {
210  std::unique_lock<std::mutex> lck(resMtx);
211  return res;
212  }
213 
214  std::mutex resMtx;
215  XRootDStatus res;
216  };
217 
218  //------------------------------------------------------------------------
223  //------------------------------------------------------------------------
224  struct AnyPolicy : public PolicyExecutor
225  {
226  AnyPolicy( size_t size) : cnt( size )
227  {
228  }
229 
230  bool Examine( const XrdCl::XRootDStatus &status )
231  {
232  std::unique_lock<std::mutex> lck(resMtx);
233 
234  // keep the status in case this is the final result
235  res = status;
236  // decrement the counter
237  --cnt;
238  // we require just one operation to be successful
239  if( status.IsOK() ) return true;
240  // lets see if this is the last one?
241  if( cnt == 0 ) return true;
242  // we still have a chance there will be one that is successful
243  return false;
244  }
245 
246  XRootDStatus Result()
247  {
248  std::unique_lock<std::mutex> lck(resMtx);
249  return res;
250  }
251 
252  private:
253  std::mutex resMtx;
254  size_t cnt;
255  XRootDStatus res;
256  };
257 
258  //------------------------------------------------------------------------
263  //------------------------------------------------------------------------
264  struct SomePolicy : PolicyExecutor
265  {
266  SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
267  threshold( threshold ), size( size )
268  {
269  }
270 
271  bool Examine( const XrdCl::XRootDStatus &status )
272  {
273  std::unique_lock<std::mutex> resMtx;
274 
275  // keep the status in case this is the final result
276  res = status;
277  if( status.IsOK() )
278  {
279  ++succeeded;
280  if( succeeded == threshold ) return true; // we reached the threshold
281  // we are not yet there
282  return false;
283  }
284  ++failed;
285  // did we drop below the threshold
286  if( failed > size - threshold ) return true;
287  // we still have a chance there will be enough of successful operations
288  return false;
289  }
290 
291  XRootDStatus Result()
292  {
293  std::unique_lock<std::mutex> lck(resMtx);
294  return res;
295  }
296 
297  private:
298  std::mutex resMtx;
299  size_t failed;
300  size_t succeeded;
301  const size_t threshold;
302  const size_t size;
303  XRootDStatus res;
304  };
305 
306  //------------------------------------------------------------------------
312  //------------------------------------------------------------------------
313  struct AtLeastPolicy : PolicyExecutor
314  {
315  AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
316  failed_cnt( 0 ),
317  failed_threshold( size - threshold )
318  {
319  }
320 
321  //----------------------------------------------------------------------
325  //----------------------------------------------------------------------
326  bool Examine( const XrdCl::XRootDStatus &status )
327  {
328  std::unique_lock<std::mutex> lck(resMtx);
329 
330  if (!status.IsOK()) {
331  ++failed_cnt;
332  if (failed_cnt > failed_threshold) {
333  res = status;
334  return true;
335  }
336  }
337 
338  --pending_cnt;
339  return pending_cnt == 0;
340  }
341 
342  XRootDStatus Result()
343  {
344  std::unique_lock<std::mutex> lck(resMtx);
345  return res;
346  }
347 
348  private:
349  std::mutex resMtx;
350  size_t pending_cnt;
351  size_t failed_cnt;
352  const size_t failed_threshold;
353  XRootDStatus res;
354  };
355 
356  //------------------------------------------------------------------------
358  //------------------------------------------------------------------------
// A one-shot barrier: it starts raised, blocks every caller of wait() and
// lets all of them through once lift() has been called.
struct barrier_t
{
  // The barrier is created in the raised (blocking) state.
  barrier_t() : on( true ) { }

  // Block the calling thread until the barrier has been lifted; returns
  // immediately if that already happened.
  void wait()
  {
    std::unique_lock<std::mutex> lck( mtx );
    // Wait with a predicate: a condition variable may wake up spuriously,
    // so the flag has to be re-checked after every wake-up instead of being
    // tested just once before waiting.
    cv.wait( lck, [this]{ return !on; } );
  }

  // Lift the barrier and release all waiting threads. The notification is
  // issued while the mutex is still held, as in the original code.
  void lift()
  {
    std::unique_lock<std::mutex> lck( mtx );
    on = false;
    cv.notify_all();
  }

  private:
    std::condition_variable cv;  // signalled when the barrier is lifted
    std::mutex              mtx; // protects on
    bool                    on;  // true while the barrier is raised
};
381 
382  //------------------------------------------------------------------------
387  //------------------------------------------------------------------------
388  struct Ctx
389  {
390  //----------------------------------------------------------------------
394  //----------------------------------------------------------------------
395  Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
396  policy( policy )
397  {
398  }
399 
400  //----------------------------------------------------------------------
402  //----------------------------------------------------------------------
403  ~Ctx()
404  {
405  Handle( XRootDStatus() );
406  }
407 
408  //----------------------------------------------------------------------
413  //----------------------------------------------------------------------
414  inline void Examine( const XRootDStatus &st )
415  {
416  if( policy->Examine( st ) )
417  Handle( policy->Result() );
418  }
419 
420  //----------------------------------------------------------------------
425  //---------------------------------------------------------------------
426  inline void Handle( const XRootDStatus &st )
427  {
428  PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
429  if( hdlr )
430  {
431  barrier.wait();
432  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
433  }
434  }
435 
436  //----------------------------------------------------------------------
438  //----------------------------------------------------------------------
439  std::atomic<PipelineHandler*> handler;
440 
441  //----------------------------------------------------------------------
443  //----------------------------------------------------------------------
444  std::unique_ptr<PolicyExecutor> policy;
445 
446  //----------------------------------------------------------------------
449  //----------------------------------------------------------------------
450  barrier_t barrier;
451  };
452 
453  //------------------------------------------------------------------------
455  //------------------------------------------------------------------------
456  struct PipelineEnd : public Job
457  {
458  //----------------------------------------------------------------------
459  // Constructor
460  //----------------------------------------------------------------------
461  PipelineEnd( std::shared_ptr<Ctx> &ctx,
462  const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
463  {
464  }
465 
466  //----------------------------------------------------------------------
467  // Run Ctx::Examine in the thread-pool
468  //----------------------------------------------------------------------
469  void Run( void* )
470  {
471  ctx->Examine( st );
472  delete this;
473  }
474 
475  private:
476  std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
477  XrdCl::XRootDStatus st; //< final status of the ParallelOperation
478  };
479 
480  //------------------------------------------------------------------------
482  //------------------------------------------------------------------------
483  inline static
484  void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
485  {
487  PipelineEnd *end = new PipelineEnd( ctx, st );
488  mgr->QueueJob( end, nullptr );
489  }
490 
491  //------------------------------------------------------------------------
497  //------------------------------------------------------------------------
498  XRootDStatus RunImpl( PipelineHandler *handler, time_t pipelineTimeout )
499  {
500  // make sure we have a valid policy for the parallel operation
501  if( !policy ) policy.reset( new AllPolicy() );
502 
503  std::shared_ptr<Ctx> ctx =
504  std::make_shared<Ctx>( handler, policy.release() );
505 
506  time_t timeout = pipelineTimeout < this->timeout ?
507  pipelineTimeout : this->timeout;
508 
509  for( size_t i = 0; i < pipelines.size(); ++i )
510  {
511  if( !pipelines[i] ) continue;
512  pipelines[i].Run( timeout,
513  [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
514  }
515 
516  ctx->barrier.lift();
517  return XRootDStatus();
518  }
519 
520  std::vector<Pipeline> pipelines;
521  std::unique_ptr<PolicyExecutor> policy;
522  };
523 
524  //----------------------------------------------------------------------------
526  //----------------------------------------------------------------------------
527  template<class Container>
528  inline ParallelOperation<false> Parallel( Container &&container )
529  {
530  return ParallelOperation<false>( container );
531  }
532 
533  //----------------------------------------------------------------------------
535  //----------------------------------------------------------------------------
536  inline void PipesToVec( std::vector<Pipeline>& )
537  {
538  // base case
539  }
540 
541  //----------------------------------------------------------------------------
542  // Declare PipesToVec (we need to do declare those functions ahead of
543  // definitions, as they may call each other.
544  //----------------------------------------------------------------------------
545  template<typename ... Others>
546  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
547  Others&... others );
548 
549  template<typename ... Others>
550  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
551  Others&... others );
552 
553  template<typename ... Others>
554  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
555  Others&... others );
556 
557  //----------------------------------------------------------------------------
558  // Define PipesToVec
559  //----------------------------------------------------------------------------
560  template<typename ... Others>
561  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
562  Others&... others )
563  {
564  v.emplace_back( operation );
565  PipesToVec( v, others... );
566  }
567 
568  template<typename ... Others>
569  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
570  Others&... others )
571  {
572  v.emplace_back( operation );
573  PipesToVec( v, others... );
574  }
575 
576  template<typename ... Others>
577  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
578  Others&... others )
579  {
580  v.emplace_back( std::move( pipeline ) );
581  PipesToVec( v, others... );
582  }
583 
584  //----------------------------------------------------------------------------
589  //----------------------------------------------------------------------------
590  template<typename ... Operations>
591  inline ParallelOperation<false> Parallel( Operations&& ... operations )
592  {
593  constexpr size_t size = sizeof...( operations );
594  std::vector<Pipeline> v;
595  v.reserve( size );
596  PipesToVec( v, operations... );
597  return Parallel( v );
598  }
599 }
600 
601 #endif // __XRD_CL_PARALLELOPERATION_HH__
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124