26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
36 #include <condition_variable>
78 template<
bool HasHndl>
91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i < pipelines.size(); i++ )
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
146 policy.reset(
new AllPolicy() );
147 return std::move( *
this );
158 policy.reset(
new AnyPolicy( pipelines.size() ) );
159 return std::move( *
this );
170 policy.reset(
new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *
this );
183 policy.reset(
new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *
this );
199 std::unique_lock<std::mutex> lck(resMtx);
203 if( status.
IsOK() )
return false;
208 XRootDStatus Result()
210 std::unique_lock<std::mutex> lck(resMtx);
224 struct AnyPolicy :
public PolicyExecutor
226 AnyPolicy(
size_t size) : cnt( size )
232 std::unique_lock<std::mutex> lck(resMtx);
239 if( status.
IsOK() )
return true;
241 if( cnt == 0 )
return true;
246 XRootDStatus Result()
248 std::unique_lock<std::mutex> lck(resMtx);
264 struct SomePolicy : PolicyExecutor
266 SomePolicy(
size_t size,
size_t threshold ) : failed( 0 ), succeeded( 0 ),
267 threshold( threshold ), size( size )
273 std::unique_lock<std::mutex> resMtx;
280 if( succeeded == threshold )
return true;
286 if( failed > size - threshold )
return true;
291 XRootDStatus Result()
293 std::unique_lock<std::mutex> lck(resMtx);
301 const size_t threshold;
313 struct AtLeastPolicy : PolicyExecutor
315 AtLeastPolicy(
size_t size,
size_t threshold ) : pending_cnt( size ),
317 failed_threshold( size - threshold )
328 std::unique_lock<std::mutex> lck(resMtx);
330 if (!status.
IsOK()) {
332 if (failed_cnt > failed_threshold) {
339 return pending_cnt == 0;
342 XRootDStatus Result()
344 std::unique_lock<std::mutex> lck(resMtx);
352 const size_t failed_threshold;
361 barrier_t() : on( true ) { }
365 std::unique_lock<std::mutex> lck( mtx );
366 if( on ) cv.wait( lck );
371 std::unique_lock<std::mutex> lck( mtx );
377 std::condition_variable cv;
405 Handle( XRootDStatus() );
414 inline void Examine(
const XRootDStatus &st )
416 if( policy->Examine( st ) )
417 Handle( policy->Result() );
426 inline void Handle(
const XRootDStatus &st )
432 hdlr->HandleResponse(
new XRootDStatus( st ),
nullptr );
439 std::atomic<PipelineHandler*>
handler;
444 std::unique_ptr<PolicyExecutor> policy;
456 struct PipelineEnd :
public Job
461 PipelineEnd( std::shared_ptr<Ctx> &ctx,
476 std::shared_ptr<Ctx> ctx;
487 PipelineEnd *end =
new PipelineEnd( ctx, st );
501 if( !policy ) policy.reset(
new AllPolicy() );
503 std::shared_ptr<Ctx> ctx =
504 std::make_shared<Ctx>(
handler, policy.release() );
506 time_t
timeout = pipelineTimeout < this->timeout ?
507 pipelineTimeout : this->
timeout;
509 for(
size_t i = 0; i < pipelines.size(); ++i )
511 if( !pipelines[i] )
continue;
513 [ctx](
const XRootDStatus &st )
mutable { Schedule( ctx, st ); } );
517 return XRootDStatus();
520 std::vector<Pipeline> pipelines;
521 std::unique_ptr<PolicyExecutor> policy;
527 template<
class Container>
545 template<
typename ... Others>
546 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
549 template<
typename ... Others>
550 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
553 template<
typename ... Others>
554 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
560 template<
typename ... Others>
564 v.emplace_back( operation );
568 template<
typename ... Others>
572 v.emplace_back( operation );
576 template<
typename ... Others>
580 v.emplace_back( std::move( pipeline ) );
590 template<
typename ... Operations>
593 constexpr
size_t size =
sizeof...( operations );
594 std::vector<Pipeline> v;
time_t timeout
Operation timeout.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual ~PolicyExecutor()
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.