XRootD
XrdClThirdPartyCopyJob.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClDefaultEnv.hh"
29 #include "XrdCl/XrdClUtils.hh"
31 #include "XrdCl/XrdClMonitor.hh"
33 #include "XrdCl/XrdClDlgEnv.hh"
34 #include "XrdOuc/XrdOucTPC.hh"
35 #include "XrdSys/XrdSysPthread.hh"
36 #include "XrdSys/XrdSysTimer.hh"
37 
38 #include <iostream>
39 #include <chrono>
40 
41 #include <cctype>
42 #include <sstream>
43 #include <cstdlib>
44 #include <cstdio>
45 #include <sys/time.h>
46 #include <sys/types.h>
47 #include <unistd.h>
48 
49 namespace
50 {
51  //----------------------------------------------------------------------------
53  //----------------------------------------------------------------------------
54  class TPCStatusHandler: public XrdCl::ResponseHandler
55  {
56  public:
57  //------------------------------------------------------------------------
58  // Constructor
59  //------------------------------------------------------------------------
60  TPCStatusHandler():
61  pSem( new XrdSysSemaphore(0) ), pStatus(0)
62  {
63  }
64 
65  //------------------------------------------------------------------------
66  // Destructor
67  //------------------------------------------------------------------------
68  virtual ~TPCStatusHandler()
69  {
70  delete pStatus;
71  delete pSem;
72  }
73 
74  //------------------------------------------------------------------------
75  // Handle Response
76  //------------------------------------------------------------------------
77  virtual void HandleResponse( XrdCl::XRootDStatus *status,
78  XrdCl::AnyObject *response )
79  {
80  delete response;
81  pStatus = status;
82  pSem->Post();
83  }
84 
85  //------------------------------------------------------------------------
86  // Get Mutex
87  //------------------------------------------------------------------------
88  XrdSysSemaphore *GetXrdSysSemaphore()
89  {
90  return pSem;
91  }
92 
93  //------------------------------------------------------------------------
94  // Get status
95  //------------------------------------------------------------------------
96  XrdCl::XRootDStatus *GetStatus()
97  {
98  return pStatus;
99  }
100 
101  private:
102  TPCStatusHandler(const TPCStatusHandler &other);
103  TPCStatusHandler &operator = (const TPCStatusHandler &other);
104 
105  XrdSysSemaphore *pSem;
106  XrdCl::XRootDStatus *pStatus;
107  };
108 
109  class InitTimeoutCalc
110  {
111  public:
112 
113  InitTimeoutCalc( time_t timeLeft ) :
114  hasInitTimeout( timeLeft ), start( time( 0 ) ), timeLeft( timeLeft )
115  {
116 
117  }
118 
119  XrdCl::XRootDStatus operator()()
120  {
121  if( !hasInitTimeout ) return XrdCl::XRootDStatus();
122 
123  time_t now = time( 0 );
124  if( now - start > timeLeft )
126 
127  timeLeft -= ( now - start );
128  return XrdCl::XRootDStatus();
129  }
130 
131  // used to fetch a timeout count in 2 situations: to pass to XrdCl methods
132  // and preserve remaining timeout at end of CanDo(). Zero has special
133  // meaning in both these contexts, so if we had an initial timeout we
134  // return a current timeout of at least 1.
135  operator time_t()
136  {
137  if( !hasInitTimeout ) return timeLeft;
138  return timeLeft ? timeLeft : 1;
139  }
140 
141  private:
142  bool hasInitTimeout;
143  time_t start;
144  time_t timeLeft;
145  };
146 
147  static XrdCl::XRootDStatus& UpdateErrMsg( XrdCl::XRootDStatus &status, const std::string &str )
148  {
149  std::string msg = status.GetErrorMessage();
150  msg += " (" + str + ")";
151  status.SetErrorMessage( msg );
152  return status;
153  }
154 }
155 
156 namespace XrdCl
157 {
158  //----------------------------------------------------------------------------
159  // Constructor
160  //----------------------------------------------------------------------------
162  PropertyList *jobProperties,
163  PropertyList *jobResults ):
164  CopyJob( jobId, jobProperties, jobResults ),
165  dstFile( File::DisableVirtRedirect ),
166  sourceSize( 0 ),
167  initTimeout( 0 ),
168  force( false ),
169  coerce( false ),
170  delegate( false ),
171  nbStrm( 0 ),
172  tpcLite( false )
173  {
174  Log *log = DefaultEnv::GetLog();
175  log->Debug( UtilityMsg, "Creating a third party copy job, from %s to %s",
176  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
177  }
178 
179  //----------------------------------------------------------------------------
180  // Run the copy job
181  //----------------------------------------------------------------------------
183  {
184  Log *log = DefaultEnv::GetLog();
185 
186  XRootDStatus st = CanDo();
187  if( !st.IsOK() ) return st;
188 
189  if( tpcLite )
190  {
191  //------------------------------------------------------------------------
192  // Run TPC-lite algorithm
193  //------------------------------------------------------------------------
194  XRootDStatus st = RunLite( progress );
195  if( !st.IsOK() ) return st;
196  }
197  else
198  {
199  //------------------------------------------------------------------------
200  // Run vanilla TPC algorithm
201  //------------------------------------------------------------------------
202  XRootDStatus st = RunTPC( progress );
203  if( !st.IsOK() ) return st;
204  }
205 
206  //--------------------------------------------------------------------------
207  // Verify the checksums if needed
208  //--------------------------------------------------------------------------
209  if( checkSumMode != "none" )
210  {
211  log->Debug( UtilityMsg, "Attempting checksum calculation." );
212  std::string sourceCheckSum;
213  std::string targetCheckSum;
214 
215  //------------------------------------------------------------------------
216  // Get the check sum at source
217  //------------------------------------------------------------------------
218  timeval oStart, oEnd;
219  XRootDStatus st;
220  if( checkSumMode == "end2end" || checkSumMode == "source" ||
221  !checkSumPreset.empty() )
222  {
223  gettimeofday( &oStart, 0 );
224  if( !checkSumPreset.empty() )
225  {
226  sourceCheckSum = checkSumType + ":";
227  sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
228  checkSumPreset );
229  }
230  else
231  {
232  VirtualRedirector *redirector = 0;
233  std::string vrCheckSum;
234  if( GetSource().IsMetalink() &&
235  ( redirector = RedirectorRegistry::Instance().Get( GetSource() ) ) &&
236  !( vrCheckSum = redirector->GetCheckSum( checkSumType ) ).empty() )
237  sourceCheckSum = vrCheckSum;
238  else
239  st = Utils::GetRemoteCheckSum( sourceCheckSum, checkSumType, tpcSource );
240  }
241  gettimeofday( &oEnd, 0 );
242  if( !st.IsOK() )
243  return UpdateErrMsg( st, "source" );
244 
245  pResults->Set( "sourceCheckSum", sourceCheckSum );
246  }
247 
248  //------------------------------------------------------------------------
249  // Get the check sum at destination
250  //------------------------------------------------------------------------
251  timeval tStart, tEnd;
252 
253  if( checkSumMode == "end2end" || checkSumMode == "target" )
254  {
255  gettimeofday( &tStart, 0 );
256  st = Utils::GetRemoteCheckSum( targetCheckSum, checkSumType, realTarget );
257 
258  gettimeofday( &tEnd, 0 );
259  if( !st.IsOK() )
260  return UpdateErrMsg( st, "destination" );
261  pResults->Set( "targetCheckSum", targetCheckSum );
262  }
263 
264  //------------------------------------------------------------------------
265  // Make sure the checksums are both lower case
266  //------------------------------------------------------------------------
267  auto sanitize_cksum = []( char c )
268  {
269  std::locale loc;
270  if( std::isalpha( c ) ) return std::tolower( c, loc );
271  return c;
272  };
273 
274  std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
275  sourceCheckSum.begin(), sanitize_cksum );
276 
277  std::transform( targetCheckSum.begin(), targetCheckSum.end(),
278  targetCheckSum.begin(), sanitize_cksum );
279 
280  //------------------------------------------------------------------------
281  // Compare and inform monitoring
282  //------------------------------------------------------------------------
283  if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
284  {
285  bool match = false;
286  if( sourceCheckSum == targetCheckSum )
287  match = true;
288 
290  if( mon )
291  {
293  i.transfer.origin = &GetSource();
294  i.transfer.target = &GetTarget();
295  i.cksum = sourceCheckSum;
296  i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
297  i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
298  i.isOK = match;
299  mon->Event( Monitor::EvCheckSum, &i );
300  }
301 
302  if( !match )
303  return XRootDStatus( stError, errCheckSumError, 0 );
304 
305  log->Info(UtilityMsg, "Checksum verification: succeeded." );
306  }
307  }
308 
309  return XRootDStatus();
310  }
311 
312  //----------------------------------------------------------------------------
313  // Check whether doing a third party copy is feasible for given
314  // job descriptor
315  //----------------------------------------------------------------------------
316  XRootDStatus ThirdPartyCopyJob::CanDo()
317  {
318  const URL &source = GetSource();
319  const URL &target = GetTarget();
320 
321  //--------------------------------------------------------------------------
322  // We can only do a TPC if both source and destination are remote files
323  //--------------------------------------------------------------------------
324  if( source.IsLocalFile() || target.IsLocalFile() )
326  "Cannot do a third-party-copy for local file." );
327 
328  //--------------------------------------------------------------------------
329  // Check the initial settings
330  //--------------------------------------------------------------------------
331  Log *log = DefaultEnv::GetLog();
332  log->Debug( UtilityMsg, "Check if third party copy between %s and %s "
333  "is possible", source.GetObfuscatedURL().c_str(),
334  target.GetObfuscatedURL().c_str() );
335 
336  if( target.GetProtocol() != "root" &&
337  target.GetProtocol() != "xroot" &&
338  target.GetProtocol() != "roots" &&
339  target.GetProtocol() != "xroots" )
340  return XRootDStatus( stError, errNotSupported, 0, "Third-party-copy "
341  "is only supported for root/xroot protocol." );
342 
343  pProperties->Get( "initTimeout", initTimeout );
344  InitTimeoutCalc timeLeft( initTimeout );
345 
346  pProperties->Get( "checkSumMode", checkSumMode );
347  pProperties->Get( "checkSumType", checkSumType );
348  pProperties->Get( "checkSumPreset", checkSumPreset );
349  pProperties->Get( "force", force );
350  pProperties->Get( "coerce", coerce );
351  pProperties->Get( "delegate", delegate );
352 
354  env->GetInt( "SubStreamsPerChannel", nbStrm );
355 
356  // account for the control stream
357  if (nbStrm > 0) --nbStrm;
358 
359  bool tpcLiteOnly = false;
360 
361  if( !delegate )
362  log->Info( UtilityMsg, "We are NOT using delegation" );
363 
364  //--------------------------------------------------------------------------
365  // Resolve the 'auto' checksum type.
366  //--------------------------------------------------------------------------
367  if( checkSumType == "auto" )
368  {
369  checkSumType = Utils::InferChecksumType( GetSource(), GetTarget() );
370  if( checkSumType.empty() )
371  log->Info( UtilityMsg, "Could not infer checksum type." );
372  else
373  log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
374  }
375 
376  //--------------------------------------------------------------------------
377  // Check if we can open the source. Note in TPC-lite scenario it is optional
378  // for this step to be successful.
379  //--------------------------------------------------------------------------
380  File sourceFile;
381  XRootDStatus st;
382  URL sourceURL = source;
383  URL::ParamsMap params;
384 
385  // set WriteRecovery property
386  std::string value;
387  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
388  sourceFile.SetProperty( "ReadRecovery", value );
389 
390  // save the original opaque parameter list as specified by the user for later
391  const URL::ParamsMap &srcparams = sourceURL.GetParams();
392 
393  //--------------------------------------------------------------------------
394  // Do the facultative step at source only if the protocol is root/xroot,
395  // otherwise don't bother
396  //--------------------------------------------------------------------------
397  if( sourceURL.GetProtocol() == "root" || sourceURL.GetProtocol() == "xroot" ||
398  sourceURL.GetProtocol() == "roots" || sourceURL.GetProtocol() == "xroots" )
399  {
400  params = sourceURL.GetParams();
401  params["tpc.stage"] = "placement";
402  sourceURL.SetParams( params );
403  log->Debug( UtilityMsg, "Trying to open %s for reading",
404  sourceURL.GetObfuscatedURL().c_str() );
405  st = sourceFile.Open( sourceURL.GetURL(), OpenFlags::Read, Access::None,
406  timeLeft );
407  }
408  else
409  st = XRootDStatus( stError, errNotSupported );
410 
411  if( st.IsOK() )
412  {
413  std::string sourceUrl;
414  sourceFile.GetProperty( "LastURL", sourceUrl );
415  tpcSource = sourceUrl;
416 
417  VirtualRedirector *redirector = 0;
418  long long size = -1;
419  if( source.IsMetalink() &&
420  ( redirector = RedirectorRegistry::Instance().Get( tpcSource ) ) &&
421  ( size = redirector->GetSize() ) >= 0 )
422  sourceSize = size;
423  else
424  {
425  StatInfo *statInfo;
426  st = sourceFile.Stat( false, statInfo );
427  if (st.IsOK()) sourceSize = statInfo->GetSize();
428  delete statInfo;
429  }
430  }
431  else
432  {
433  log->Info( UtilityMsg, "Cannot open source file %s: %s",
434  source.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
435  if( !delegate )
436  {
437  //----------------------------------------------------------------------
438  // If we cannot contact the source and there is no credential to delegate
439  // it cannot possibly work
440  //----------------------------------------------------------------------
441  st.status = stFatal;
442  return st;
443  }
444 
445  tpcSource = sourceURL;
446  tpcLiteOnly = true;
447  }
448 
449  // get the opaque parameters as returned by the redirector
450  URL tpcSourceUrl = tpcSource;
451  URL::ParamsMap tpcsrcparams = tpcSourceUrl.GetParams();
452  // merge the original cgi with the one returned by the redirector,
453  // the original values take precedence
454  URL::ParamsMap::const_iterator itr = srcparams.begin();
455  for( ; itr != srcparams.end(); ++itr )
456  tpcsrcparams[itr->first] = itr->second;
457  tpcSourceUrl.SetParams( tpcsrcparams );
458  // save the merged opaque parameter list for later
459  std::string scgi;
460  const URL::ParamsMap &scgiparams = tpcSourceUrl.GetParams();
461  itr = scgiparams.begin();
462  for( ; itr != scgiparams.end(); ++itr )
463  if( itr->first.compare( 0, 6, "xrdcl." ) != 0 )
464  {
465  if( !scgi.empty() ) scgi += '\t';
466  scgi += itr->first + '=' + itr->second;
467  }
468 
469  if( !timeLeft().IsOK() )
470  {
471  // we still want to send a close, but we time it out quickly
472  st = sourceFile.Close( 1 );
473  return XRootDStatus( stError, errOperationExpired );
474  }
475 
476  st = sourceFile.Close( timeLeft );
477 
478  if( !timeLeft().IsOK() )
479  return XRootDStatus( stError, errOperationExpired );
480 
481  //--------------------------------------------------------------------------
482  // Now we need to check the destination !!!
483  //--------------------------------------------------------------------------
484  if( delegate )
486  else
488 
489  //--------------------------------------------------------------------------
490  // Generate the destination CGI
491  //--------------------------------------------------------------------------
492  log->Debug( UtilityMsg, "Generating the destination TPC URL" );
493 
494  tpcKey = GenerateKey();
495 
496  char *cgiBuff = new char[2048];
497  const char *cgiP = XrdOucTPC::cgiC2Dst( tpcKey.c_str(),
498  tpcSource.GetHostId().c_str(),
499  tpcSource.GetPath().c_str(),
500  0, cgiBuff, 2048, nbStrm,
501  GetSource().GetHostId().c_str(),
502  GetSource().GetProtocol().c_str(),
503  GetTarget().GetProtocol().c_str(),
504  delegate );
505 
506  if( *cgiP == '!' )
507  {
508  log->Error( UtilityMsg, "Unable to setup target url: %s", cgiP+1 );
509  delete [] cgiBuff;
510  return XRootDStatus( stError, errNotSupported );
511  }
512 
513  URL cgiURL; cgiURL.SetParams( cgiBuff );
514  delete [] cgiBuff;
515 
516  realTarget = GetTarget();
517  params = realTarget.GetParams();
518  MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
519 
520  if( !tpcLiteOnly ) // we only append oss.asize if it source file size is actually known
521  {
522  std::ostringstream o; o << sourceSize;
523  params["oss.asize"] = o.str();
524  }
525  params["tpc.stage"] = "copy";
526 
527  // forward source cgi info to the destination in case we are going to do delegation
528  if( !scgi.empty() && delegate )
529  params["tpc.scgi"] = scgi;
530 
531  realTarget.SetParams( params );
532 
533  log->Debug( UtilityMsg, "Target url is: %s", realTarget.GetObfuscatedURL().c_str() );
534 
535  //--------------------------------------------------------------------------
536  // Open the target file
537  //--------------------------------------------------------------------------
538  // set WriteRecovery property
539  DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
540  dstFile.SetProperty( "WriteRecovery", value );
541 
542  OpenFlags::Flags targetFlags = OpenFlags::Update;
543  if( force )
544  targetFlags |= OpenFlags::Delete;
545  else
546  targetFlags |= OpenFlags::New;
547 
548  if( coerce )
549  targetFlags |= OpenFlags::Force;
550 
552  st = dstFile.Open( realTarget.GetURL(), targetFlags, mode, timeLeft );
553  if( !st.IsOK() )
554  {
555  log->Error( UtilityMsg, "Unable to open target %s: %s",
556  realTarget.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
557  if( st.code == errErrorResponse &&
558  st.errNo == kXR_FSError &&
559  st.GetErrorMessage().find( "tpc not supported" ) != std::string::npos )
560  return XRootDStatus( stError, errNotSupported, 0, // the open failed due to lack of TPC support on the server side
561  "Destination does not support third-party-copy." );
562  return UpdateErrMsg( st, "destination" );
563  }
564 
565  std::string lastUrl; dstFile.GetProperty( "LastURL", lastUrl );
566  realTarget = lastUrl;
567 
568  if( !timeLeft().IsOK() )
569  {
570  // we still want to send a close, but we time it out fast
571  st = dstFile.Close( 1 );
572  return XRootDStatus( stError, errOperationExpired );
573  }
574 
575  //--------------------------------------------------------------------------
576  // Verify if the destination supports TPC / TPC-lite
577  //--------------------------------------------------------------------------
578  st = Utils::CheckTPCLite( realTarget.GetURL() );
579  if( !st.IsOK() )
580  {
581  // we still want to send a close, but we time it out fast
582  st = dstFile.Close( 1 );
583  return XRootDStatus( stError, errNotSupported, 0, // doesn't support TPC
584  "Destination does not support third-party-copy.");
585  }
586 
587  //--------------------------------------------------------------------------
588  // if target supports TPC-lite and we have a credential to delegate we can
589  // go ahead and use TPC-lite
590  //--------------------------------------------------------------------------
591  tpcLite = ( st.code != suPartial ) && delegate;
592 
593  if( !tpcLite && tpcLiteOnly ) // doesn't support TPC-lite and it was our only hope
594  {
595  st = dstFile.Close( 1 );
596  return XRootDStatus( stError, errNotSupported, 0, "Destination does not "
597  "support delegation." );
598  }
599 
600  //--------------------------------------------------------------------------
601  // adjust the InitTimeout
602  //--------------------------------------------------------------------------
603  if( !timeLeft().IsOK() )
604  {
605  // we still want to send a close, but we time it out fast
606  st = dstFile.Close( 1 );
607  return XRootDStatus( stError, errOperationExpired );
608  }
609 
610  //--------------------------------------------------------------------------
611  // If we don't use delegation the source has to support TPC
612  //--------------------------------------------------------------------------
613  if( !tpcLite )
614  {
615  st = Utils::CheckTPC( URL( tpcSource ).GetURL(), timeLeft );
616  if( !st.IsOK() )
617  {
618  log->Error( UtilityMsg, "Source (%s) does not support TPC",
619  tpcSource.GetURL().c_str() );
620  return XRootDStatus( stError, errNotSupported, 0, "Source does not "
621  "support third-party-copy" );
622  }
623 
624  if( !timeLeft().IsOK() )
625  {
626  // we still want to send a close, but we time it out quickly
627  st = sourceFile.Close( 1 );
628  return XRootDStatus( stError, errOperationExpired );
629  }
630  }
631 
632  initTimeout = time_t( timeLeft );
633 
634  return XRootDStatus();
635  }
636 
637  //----------------------------------------------------------------------------
638  // Run vanilla copy job
639  //----------------------------------------------------------------------------
640  XRootDStatus ThirdPartyCopyJob::RunTPC( CopyProgressHandler *progress )
641  {
642  Log *log = DefaultEnv::GetLog();
643 
644  //--------------------------------------------------------------------------
645  // Generate the source CGI
646  //--------------------------------------------------------------------------
647  char *cgiBuff = new char[2048];
648  const char *cgiP = XrdOucTPC::cgiC2Src( tpcKey.c_str(),
649  realTarget.GetHostName().c_str(), -1, cgiBuff,
650  2048 );
651  if( *cgiP == '!' )
652  {
653  log->Error( UtilityMsg, "Unable to setup source url: %s", cgiP+1 );
654  delete [] cgiBuff;
655  return XRootDStatus( stError, errInvalidArgs );
656  }
657 
658  URL cgiURL; cgiURL.SetParams( cgiBuff );
659  delete [] cgiBuff;
660  URL::ParamsMap params = tpcSource.GetParams();
661  MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
662  params["tpc.stage"] = "copy";
663  tpcSource.SetParams( params );
664 
665  log->Debug( UtilityMsg, "Source url is: %s", tpcSource.GetObfuscatedURL().c_str() );
666 
667  // Set the close timeout to the default value of the stream timeout
668  int closeTimeout = 0;
669  (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
670 
671  //--------------------------------------------------------------------------
672  // Set up the rendez-vous and open the source
673  //--------------------------------------------------------------------------
674  InitTimeoutCalc timeLeft( initTimeout );
675  XRootDStatus st = dstFile.Sync( timeLeft );
676  if( !st.IsOK() )
677  {
678  log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
679  st.ToStr().c_str() );
680  XRootDStatus status = dstFile.Close( closeTimeout );
681  return UpdateErrMsg( st, "destination" );
682  }
683 
684  //--------------------------------------------------------------------------
685  // Calculate the time we have left to perform source open
686  //--------------------------------------------------------------------------
687  if( !timeLeft().IsOK() )
688  {
689  XRootDStatus status = dstFile.Close( closeTimeout );
690  return XRootDStatus( stError, errOperationExpired );
691  }
692 
693  File sourceFile( File::DisableVirtRedirect );
694  // set ReadRecovery property
695  std::string value;
696  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
697  sourceFile.SetProperty( "ReadRecovery", value );
698 
699  st = sourceFile.Open( tpcSource.GetURL(), OpenFlags::Read, Access::None,
700  timeLeft );
701 
702  if( !st.IsOK() )
703  {
704  log->Error( UtilityMsg, "Unable to open source %s: %s",
705  tpcSource.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
706  XRootDStatus status = dstFile.Close( closeTimeout );
707  return UpdateErrMsg( st, "source" );
708  }
709 
710  //--------------------------------------------------------------------------
711  // Do the copy and follow progress
712  //--------------------------------------------------------------------------
713  TPCStatusHandler statusHandler;
714  XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
715  StatInfo *info = 0;
716 
717  time_t tpcTimeout = 0;
718  pProperties->Get( "tpcTimeout", tpcTimeout );
719 
720  st = dstFile.Sync( &statusHandler, tpcTimeout );
721  if( !st.IsOK() )
722  {
723  log->Error( UtilityMsg, "Unable start the copy: %s",
724  st.ToStr().c_str() );
725  XRootDStatus statusS = sourceFile.Close( closeTimeout );
726  XRootDStatus statusT = dstFile.Close( closeTimeout );
727  return UpdateErrMsg( st, "destination" );
728  }
729 
730  //--------------------------------------------------------------------------
731  // Stat the file every second until sync returns
732  //--------------------------------------------------------------------------
733  bool canceled = false;
734  while( 1 )
735  {
736  XrdSysTimer::Wait( 2500 );
737 
738  if( progress )
739  {
740  st = dstFile.Stat( true, info );
741  if( st.IsOK() )
742  {
743  progress->JobProgress( pJobId, info->GetSize(), sourceSize );
744  delete info;
745  info = 0;
746  }
747  bool shouldCancel = progress->ShouldCancel( pJobId );
748  if( shouldCancel )
749  {
750  log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
751  Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
752  XRootDStatus st = dstFile.Fcntl( arg, response );
753  if( !st.IsOK() )
754  log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
755  st.ToStr().c_str() );
756 
757  delete response;
758  canceled = true;
759  break;
760  }
761  }
762 
763  if( sem->CondWait() )
764  break;
765  }
766 
767  //--------------------------------------------------------------------------
768  // Sync has returned so we can check if it was successful
769  //--------------------------------------------------------------------------
770  if( canceled )
771  sem->Wait();
772 
773  st = *statusHandler.GetStatus();
774 
775  if( !st.IsOK() )
776  {
777  log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
778  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
779  st.ToStr().c_str() );
780 
781  // Ignore close response
782  XRootDStatus statusS = sourceFile.Close( closeTimeout );
783  XRootDStatus statusT = dstFile.Close( closeTimeout );
784  return st;
785  }
786 
787  XRootDStatus statusS = sourceFile.Close( closeTimeout );
788  XRootDStatus statusT = dstFile.Close( closeTimeout );
789 
790  if ( !statusS.IsOK() || !statusT.IsOK() )
791  {
792  st = (statusS.IsOK() ? statusT : statusS);
793  log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
794  "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
795  GetTarget().GetObfuscatedURL().c_str(),
796  (statusS.IsOK() ? "destination" : "source"), st.ToStr().c_str() );
797  return UpdateErrMsg( st, statusS.IsOK() ? "source" : "destination" );
798  }
799 
800  log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
801  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
802 
803  pResults->Set( "size", sourceSize );
804 
805  return XRootDStatus();
806  }
807 
808  XRootDStatus ThirdPartyCopyJob::RunLite( CopyProgressHandler *progress )
809  {
810  Log *log = DefaultEnv::GetLog();
811 
812  // Set the close timeout to the default value of the stream timeout
813  int closeTimeout = 0;
814  (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
815 
816  //--------------------------------------------------------------------------
817  // Set up the rendez-vous
818  //--------------------------------------------------------------------------
819  InitTimeoutCalc timeLeft( initTimeout );
820  XRootDStatus st = dstFile.Sync( timeLeft );
821  if( !st.IsOK() )
822  {
823  log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
824  st.ToStr().c_str() );
825  XRootDStatus status = dstFile.Close( closeTimeout );
826  return UpdateErrMsg( st, "destination" );
827  }
828 
829  //--------------------------------------------------------------------------
830  // Do the copy and follow progress
831  //--------------------------------------------------------------------------
832  TPCStatusHandler statusHandler;
833  XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
834  StatInfo *info = 0;
835 
836  time_t tpcTimeout = 0;
837  pProperties->Get( "tpcTimeout", tpcTimeout );
838 
839  st = dstFile.Sync( &statusHandler, tpcTimeout );
840  if( !st.IsOK() )
841  {
842  log->Error( UtilityMsg, "Unable start the copy: %s",
843  st.ToStr().c_str() );
844  XRootDStatus statusT = dstFile.Close( closeTimeout );
845  return UpdateErrMsg( st, "destination" );
846  }
847 
848  //--------------------------------------------------------------------------
849  // Stat the file every second until sync returns
850  //--------------------------------------------------------------------------
851  bool canceled = false;
852  while( 1 )
853  {
854  XrdSysTimer::Wait( 2500 );
855 
856  if( progress )
857  {
858  st = dstFile.Stat( true, info );
859  if( st.IsOK() )
860  {
861  progress->JobProgress( pJobId, info->GetSize(), sourceSize );
862  delete info;
863  info = 0;
864  }
865  bool shouldCancel = progress->ShouldCancel( pJobId );
866  if( shouldCancel )
867  {
868  log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
869  Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
870  XRootDStatus st = dstFile.Fcntl( arg, response );
871  if( !st.IsOK() )
872  log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
873  st.ToStr().c_str() );
874 
875  delete response;
876  canceled = true;
877  break;
878  }
879  }
880 
881  if( sem->CondWait() )
882  break;
883  }
884 
885  //--------------------------------------------------------------------------
886  // Sync has returned so we can check if it was successful
887  //--------------------------------------------------------------------------
888  if( canceled )
889  sem->Wait();
890 
891  st = *statusHandler.GetStatus();
892 
893  if( !st.IsOK() )
894  {
895  log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
896  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
897  st.ToStr().c_str() );
898 
899  // Ignore close response
900  XRootDStatus statusT = dstFile.Close( closeTimeout );
901  return st;
902  }
903 
904  st = dstFile.Close( closeTimeout );
905 
906  if ( !st.IsOK() )
907  {
908  log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
909  "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
910  GetTarget().GetObfuscatedURL().c_str(),
911  "destination", st.ToStr().c_str() );
912  return UpdateErrMsg( st, "destination" );
913  }
914 
915  log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
916  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
917 
918  pResults->Set( "size", sourceSize );
919 
920  return XRootDStatus();
921  }
922 
923 
924  //----------------------------------------------------------------------------
925  // Generate a rendez-vous key
926  //----------------------------------------------------------------------------
927  std::string ThirdPartyCopyJob::GenerateKey()
928  {
929  static const int _10to9 = 1000000000;
930 
931  char tpcKey[25];
932 
933  auto tp = std::chrono::high_resolution_clock::now();
934  auto d = tp.time_since_epoch();
935  auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>( d );
936  auto s = std::chrono::duration_cast<std::chrono::seconds>( d );
937  uint32_t k1 = ns.count() - s.count() * _10to9;
938  uint32_t k2 = getpid() | (getppid() << 16);
939  uint32_t k3 = s.count();
940  snprintf( tpcKey, 25, "%08x%08x%08x", k1, k2, k3 );
941  return std::string(tpcKey);
942  }
943 }
@ kXR_FSError
Definition: XProtocol.hh:1037
XrdOucString File
PropertyList * pResults
const URL & GetSource() const
Get source.
Definition: XrdClCopyJob.hh:94
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
void Enable()
Enable delegation in the environment.
Definition: XrdClDlgEnv.hh:47
static DlgEnv & Instance()
Definition: XrdClDlgEnv.hh:28
void Disable()
Disable delegation in the environment.
Definition: XrdClDlgEnv.hh:55
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A file.
Definition: XrdClFile.hh:52
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, time_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:125
@ DisableVirtRedirect
Definition: XrdClFile.hh:59
XRootDStatus Sync(ResponseHandler *handler, time_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:473
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, time_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:694
bool GetProperty(const std::string &name, std::string &value) const
Definition: XrdClFile.cc:994
XRootDStatus Close(ResponseHandler *handler, time_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:210
bool SetProperty(const std::string &name, const std::string &value)
Definition: XrdClFile.cc:983
XRootDStatus Stat(bool force, ResponseHandler *handler, time_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:236
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
ThirdPartyCopyJob(uint32_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:498
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
Definition: XrdClUtils.cc:648
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
Definition: XrdClUtils.cc:771
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
Definition: XrdClUtils.cc:269
static XRootDStatus CheckTPCLite(const std::string &server, time_t timeout=0)
Definition: XrdClUtils.cc:426
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
Definition: XrdClUtils.cc:279
static XRootDStatus CheckTPC(const std::string &server, time_t timeout=0)
Check if peer supports tpc.
Definition: XrdClUtils.cc:382
An interface for metadata redirectors.
virtual std::string GetCheckSum(const std::string &type) const =0
void SetErrorMessage(const std::string &message)
Set the error message.
const std::string & GetErrorMessage() const
Get error message.
static const char * cgiC2Dst(const char *cKey, const char *xSrc, const char *xLfn, const char *xCks, char *Buff, int Blen, int strms=0, const char *iHst=0, const char *sprt=0, const char *tprt=0, bool dlgon=false, bool push=false)
Definition: XrdOucTPC.cc:62
static const char * cgiC2Src(const char *cKey, const char *xDst, int xTTL, char *Buff, int Blen)
Definition: XrdOucTPC.cc:136
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
const uint16_t suPartial
Definition: XrdClStatus.hh:41
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errCheckSumError
Definition: XrdClStatus.hh:101
XrdSysError Log
Definition: XrdConfig.cc:113
Mode
Access mode.
@ UR
owner readable
@ GR
group readable
@ UW
owner writable
@ OR
world readable
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124