XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler.

#include <XrdClXRootDTransport.hh>


Public Member Functions

 XRootDTransport ()
 Constructor.
 
 ~XRootDTransport ()
 Destructor.
 
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel.
 
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups.
 
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel.
 
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream.
 
virtual XRootDStatus GetBody (Message &message, Socket *socket)
 
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
 
virtual XRootDStatus GetMore (Message &message, Socket *socket)
 
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message.
 
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message.
 
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake.
 
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel.
 
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
 
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected.
 
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action.
 
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent.
 
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual bool NeedControlConnection ()
 
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel.
 
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return the number of substreams per stream that should be created.
 
virtual void WaitBeforeExit ()
 Wait until the program can safely exit.
 
Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()
 

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message.
 
static void LogErrorResponse (const Message &msg)
 Log server error response.
 
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message.
 
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message.
 
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams.
 
static void SetDescription (Message *msg)
 Set the description of a message.
 
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite.
 
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message.
 
static void UnMarshallHeader (Message &msg)
 Unmarshall the header of the incoming message.
 
static XRootDStatus UnMarshallRequest (Message *msg)
 
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response.
 

Friends

struct PluginUnloadHandler
 

Additional Inherited Members

Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages.
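
The StreamAction values are distinct powers of two, which suggests that several actions can be combined in one uint32_t mask such as the value returned by MessageReceived(). The following is a minimal sketch of decoding such a mask. It uses a standalone copy of the enum so that it compiles without the XRootD headers, and DescribeActions is a hypothetical helper that is not part of XrdCl.

```cpp
#include <cstdint>
#include <string>

// Standalone copy of the flag values listed above, for illustration only;
// real code would use XrdCl::TransportHandler::StreamAction.
enum StreamAction : uint32_t
{
  NoAction     = 0x0000,
  DigestMsg    = 0x0001,
  AbortStream  = 0x0002,
  CloseStream  = 0x0004,
  ResumeStream = 0x0008,
  HoldStream   = 0x0010,
  RequestClose = 0x0020
};

// Hypothetical helper (not an XrdCl function): render an action mask as text.
std::string DescribeActions( uint32_t mask )
{
  if( mask == NoAction )
    return "NoAction";

  std::string out;
  // Append the name of every flag that is set, separated by '|'.
  auto add = [&]( uint32_t flag, const char *name )
  {
    if( mask & flag )
    {
      if( !out.empty() ) out += "|";
      out += name;
    }
  };

  add( DigestMsg,    "DigestMsg" );
  add( AbortStream,  "AbortStream" );
  add( CloseStream,  "CloseStream" );
  add( ResumeStream, "ResumeStream" );
  add( HoldStream,   "HoldStream" );
  add( RequestClose, "RequestClose" );
  return out;
}
```

A caller would test individual bits in the same way, for example `if( mask & CloseStream ) { ... }`.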
 

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 293 of file XrdClXRootDTransport.cc.

293  XRootDTransport::XRootDTransport():
294  pSecUnloadHandler( new PluginUnloadHandler() )
295  {
296  }

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 301 of file XrdClXRootDTransport.cc.

302  {
303  delete pSecUnloadHandler; pSecUnloadHandler = 0;
304  }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject & channelData )
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1828 of file XrdClXRootDTransport.cc.

1829  {
1830  XRootDChannelInfo *info = 0;
1831  channelData.Get( info );
1832  if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1833  info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1834  }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().
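
The listing above checks the counter and decrements it in two separate atomic operations. For comparison, here is a sketch of a "decrement only if positive" step done as a single compare-and-swap loop. This is not the library's implementation: DecrementIfPositive is a hypothetical helper, and the uint32_t counter type is an assumption standing in for the finstcnt member.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for the finstcnt member used in the listing.
// Decrements the counter only if it is positive; returns true if it did.
bool DecrementIfPositive( std::atomic<uint32_t> &counter )
{
  uint32_t current = counter.load( std::memory_order_relaxed );
  while( current > 0 )
  {
    // On failure compare_exchange_weak stores the latest value in 'current',
    // so the loop condition is re-evaluated against fresh data.
    if( counter.compare_exchange_weak( current, current - 1,
                                       std::memory_order_relaxed ) )
      return true;
  }
  return false;
}
```

With this shape, two threads racing on a counter of 1 cannot both decrement it, because only one compare-and-swap can succeed.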


◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject & channelData,
uint16_t  subStreamId 
)
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1561 of file XrdClXRootDTransport.cc.

1563  {
1564  XRootDChannelInfo *info = 0;
1565  channelData.Get( info );
1566 
1567  if (!info) {
1568  DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1569  return;
1570  }
1571 
1572  XrdSysMutexHelper scopedLock( info->mutex );
1573 
1574  if( !info->stream.empty() )
1575  {
1576  XRootDStreamInfo &sInfo = info->stream[subStreamId];
1577  sInfo.status = XRootDStreamInfo::Disconnected;
1578  }
1579 
1580  if( subStreamId == 0 )
1581  {
1582  CleanUpProtection( info );
1583  info->sidManager->ReleaseAllTimedOut();
1584  info->sentOpens.clear();
1585  info->sentCloses.clear();
1586  info->openFiles = 0;
1587  info->waitBarrier = 0;
1588  }
1589  }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.


◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject & channelData )
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 462 of file XrdClXRootDTransport.cc.

463  {
464  }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char *  msg,
std::ostringstream &  o 
)
static

Get the description of a message.

Definition at line 3006 of file XrdClXRootDTransport.cc.

3007  {
3008  Log *log = DefaultEnv::GetLog();
3009  if( log->GetLevel() < Log::ErrorMsg )
3010  return;
3011 
3012  ClientRequestHdr *req = (ClientRequestHdr *)msg;
3013  switch( req->requestid )
3014  {
3015  //------------------------------------------------------------------------
3016  // kXR_open
3017  //------------------------------------------------------------------------
3018  case kXR_open:
3019  {
3020  ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
3021  o << "kXR_open (";
3022  char *fn = GetDataAsString( msg );
3023  o << "file: " << fn << ", ";
3024  delete [] fn;
3025  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3026  o << std::setbase(10);
3027  o << "flags: ";
3028  if( sreq->options == 0 )
3029  o << "none ";
3030  else
3031  {
3032  if( sreq->options & kXR_compress )
3033  o << "kXR_compress ";
3034  if( sreq->options & kXR_delete )
3035  o << "kXR_delete ";
3036  if( sreq->options & kXR_force )
3037  o << "kXR_force ";
3038  if( sreq->options & kXR_mkpath )
3039  o << "kXR_mkpath ";
3040  if( sreq->options & kXR_new )
3041  o << "kXR_new ";
3042  if( sreq->options & kXR_nowait )
3043  o << "kXR_nowait ";
3044  if( sreq->options & kXR_open_apnd )
3045  o << "kXR_open_apnd ";
3046  if( sreq->options & kXR_open_read )
3047  o << "kXR_open_read ";
3048  if( sreq->options & kXR_open_updt )
3049  o << "kXR_open_updt ";
3050  if( sreq->options & kXR_open_wrto )
3051  o << "kXR_open_wrto ";
3052  if( sreq->options & kXR_posc )
3053  o << "kXR_posc ";
3054  if( sreq->options & kXR_prefname )
3055  o << "kXR_prefname ";
3056  if( sreq->options & kXR_refresh )
3057  o << "kXR_refresh ";
3058  if( sreq->options & kXR_4dirlist )
3059  o << "kXR_4dirlist ";
3060  if( sreq->options & kXR_replica )
3061  o << "kXR_replica ";
3062  if( sreq->options & kXR_seqio )
3063  o << "kXR_seqio ";
3064  if( sreq->options & kXR_async )
3065  o << "kXR_async ";
3066  if( sreq->options & kXR_retstat )
3067  o << "kXR_retstat ";
3068  }
3069  o << "flagt: ";
3070  if( sreq->optiont == 0 )
3071  o << "none ";
3072  else
3073  {
3074  if( sreq->optiont & kXR_dup )
3075  o << "kXR_dup ";
3076  if( sreq->optiont & kXR_samefs )
3077  o << "kXR_samefs ";
3078  }
3079  o << "fhtemplt: " << FileHandleToStr( sreq->fhtemplt );
3080  o << ")";
3081  break;
3082  }
3083 
3084  //------------------------------------------------------------------------
3085  // kXR_clone
3086  //------------------------------------------------------------------------
3087  case kXR_clone:
3088  {
3089  ClientCloneRequest *sreq = (ClientCloneRequest *)msg;
3090  XrdProto::clone_list *dataChunk = (XrdProto::clone_list*)(msg + 24 );
3091  o << "kXR_clone ( ";
3092  o << "handle: " << FileHandleToStr( sreq->fhandle );
3093  o << std::setbase(10);
3094  o << " list [ ";
3095  for( size_t i = 0; i < req->dlen/sizeof(XrdProto::clone_list); ++i )
3096  {
3097  o << "(src_handle: ";
3098  o << FileHandleToStr( dataChunk[i].srcFH );
3099  o << ", ";
3100  o << std::setbase(10);
3101  o << "src_offset: " << dataChunk[i].srcOffs;
3102  o << ", src_length: " << dataChunk[i].srcLen;
3103  o << ", dst_offset: " << dataChunk[i].dstOffs << "); ";
3104  }
3105 
3106  o << " ] )";
3107  break;
3108  }
3109 
3110  //------------------------------------------------------------------------
3111  // kXR_close
3112  //------------------------------------------------------------------------
3113  case kXR_close:
3114  {
3115  ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
3116  o << "kXR_close (";
3117  o << "handle: " << FileHandleToStr( sreq->fhandle );
3118  o << ")";
3119  break;
3120  }
3121 
3122  //------------------------------------------------------------------------
3123  // kXR_stat
3124  //------------------------------------------------------------------------
3125  case kXR_stat:
3126  {
3127  ClientStatRequest *sreq = (ClientStatRequest *)msg;
3128  o << "kXR_stat (";
3129  if( sreq->dlen )
3130  {
3131  char *fn = GetDataAsString( msg );;
3132  o << "path: " << fn << ", ";
3133  delete [] fn;
3134  }
3135  else
3136  {
3137  o << "handle: " << FileHandleToStr( sreq->fhandle );
3138  o << ", ";
3139  }
3140  o << "flags: ";
3141  if( sreq->options == 0 )
3142  o << "none";
3143  else
3144  {
3145  if( sreq->options & kXR_vfs )
3146  o << "kXR_vfs";
3147  }
3148  o << ")";
3149  break;
3150  }
3151 
3152  //------------------------------------------------------------------------
3153  // kXR_read
3154  //------------------------------------------------------------------------
3155  case kXR_read:
3156  {
3157  ClientReadRequest *sreq = (ClientReadRequest *)msg;
3158  o << "kXR_read (";
3159  o << "handle: " << FileHandleToStr( sreq->fhandle );
3160  o << std::setbase(10);
3161  o << ", ";
3162  o << "offset: " << sreq->offset << ", ";
3163  o << "size: " << sreq->rlen << ")";
3164  break;
3165  }
3166 
3167  //------------------------------------------------------------------------
3168  // kXR_pgread
3169  //------------------------------------------------------------------------
3170  case kXR_pgread:
3171  {
3172  ClientPgReadRequest *sreq = (ClientPgReadRequest *)msg;
3173  o << "kXR_pgread (";
3174  o << "handle: " << FileHandleToStr( sreq->fhandle );
3175  o << std::setbase(10);
3176  o << ", ";
3177  o << "offset: " << sreq->offset << ", ";
3178  o << "size: " << sreq->rlen << ")";
3179  break;
3180  }
3181 
3182  //------------------------------------------------------------------------
3183  // kXR_write
3184  //------------------------------------------------------------------------
3185  case kXR_write:
3186  {
3187  ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3188  o << "kXR_write (";
3189  o << "handle: " << FileHandleToStr( sreq->fhandle );
3190  o << std::setbase(10);
3191  o << ", ";
3192  o << "offset: " << sreq->offset << ", ";
3193  o << "size: " << sreq->dlen << ")";
3194  break;
3195  }
3196 
3197  //------------------------------------------------------------------------
3198  // kXR_pgwrite
3199  //------------------------------------------------------------------------
3200  case kXR_pgwrite:
3201  {
3202  ClientPgWriteRequest *sreq = (ClientPgWriteRequest *)msg;
3203  o << "kXR_pgwrite (";
3204  o << "handle: " << FileHandleToStr( sreq->fhandle );
3205  o << std::setbase(10);
3206  o << ", ";
3207  o << "offset: " << sreq->offset << ", ";
3208  o << "size: " << sreq->dlen << ")";
3209  break;
3210  }
3211 
3212  //------------------------------------------------------------------------
3213  // kXR_fattr
3214  //------------------------------------------------------------------------
3215  case kXR_fattr:
3216  {
3217  ClientFattrRequest *sreq = (ClientFattrRequest *)msg;
3218  int nattr = sreq->numattr;
3219  int options = sreq->options;
3220  o << "kXR_fattr";
3221  switch (sreq->subcode) {
3222  case kXR_fattrGet:
3223  o << "Get";
3224  break;
3225  case kXR_fattrSet:
3226  o << "Set";
3227  break;
3228  case kXR_fattrList:
3229  o << "List";
3230  break;
3231  case kXR_fattrDel:
3232  o << "Delete";
3233  break;
3234  default:
3235  o << " unknown subcode: " << sreq->subcode;
3236  break;
3237  }
3238  o << " (handle: " << FileHandleToStr( sreq->fhandle );
3239  o << std::setbase(10);
3240  if (nattr)
3241  o << ", numattr: " << nattr;
3242  if (options) {
3243  o << ", options: ";
3244  if (options & 0x01)
3245  o << "new";
3246  if (options & 0x10)
3247  o << "list values";
3248  }
3249  o << ", total size: " << req->dlen << ")";
3250  break;
3251  }
3252 
3253  //------------------------------------------------------------------------
3254  // kXR_sync
3255  //------------------------------------------------------------------------
3256  case kXR_sync:
3257  {
3258  ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3259  o << "kXR_sync (";
3260  o << "handle: " << FileHandleToStr( sreq->fhandle );
3261  o << ")";
3262  break;
3263  }
3264 
3265  //------------------------------------------------------------------------
3266  // kXR_truncate
3267  //------------------------------------------------------------------------
3268  case kXR_truncate:
3269  {
3270  ClientTruncateRequest *sreq = (ClientTruncateRequest *)msg;
3271  o << "kXR_truncate (";
3272  if( !sreq->dlen )
3273  o << "handle: " << FileHandleToStr( sreq->fhandle );
3274  else
3275  {
3276  char *fn = GetDataAsString( msg );
3277  o << "file: " << fn;
3278  delete [] fn;
3279  }
3280  o << std::setbase(10);
3281  o << ", ";
3282  o << "offset: " << sreq->offset;
3283  o << ")";
3284  break;
3285  }
3286 
3287  //------------------------------------------------------------------------
3288  // kXR_readv
3289  //------------------------------------------------------------------------
3290  case kXR_readv:
3291  {
3292  unsigned char *fhandle = 0;
3293  o << "kXR_readv (";
3294 
3295  o << "handle: ";
3296  readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3297  fhandle = dataChunk[0].fhandle;
3298  if( fhandle )
3299  o << FileHandleToStr( fhandle );
3300  else
3301  o << "unknown";
3302  o << ", ";
3303  o << std::setbase(10);
3304  o << "chunks: [";
3305  uint64_t size = 0;
3306  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3307  {
3308  size += dataChunk[i].rlen;
3309  o << "(offset: " << dataChunk[i].offset;
3310  o << ", size: " << dataChunk[i].rlen << "); ";
3311  }
3312  o << "], ";
3313  o << "total size: " << size << ")";
3314  break;
3315  }
3316 
3317  //------------------------------------------------------------------------
3318  // kXR_writev
3319  //------------------------------------------------------------------------
3320  case kXR_writev:
3321  {
3322  unsigned char *fhandle = 0;
3323  o << "kXR_writev (";
3324 
3325  XrdProto::write_list *wrtList =
3326  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3327  uint64_t size = 0;
3328  uint32_t numChunks = 0;
3329  for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3330  {
3331  fhandle = wrtList[i].fhandle;
3332  size += wrtList[i].wlen;
3333  ++numChunks;
3334  }
3335  o << "handle: ";
3336  if( fhandle )
3337  o << FileHandleToStr( fhandle );
3338  else
3339  o << "unknown";
3340  o << ", ";
3341  o << std::setbase(10);
3342  o << "chunks: " << numChunks << ", ";
3343  o << "total size: " << size << ")";
3344  break;
3345  }
3346 
3347  //------------------------------------------------------------------------
3348  // kXR_locate
3349  //------------------------------------------------------------------------
3350  case kXR_locate:
3351  {
3352  ClientLocateRequest *sreq = (ClientLocateRequest *)msg;
3353  char *fn = GetDataAsString( msg );;
3354  o << "kXR_locate (";
3355  o << "path: " << fn << ", ";
3356  delete [] fn;
3357  o << "flags: ";
3358  if( sreq->options == 0 )
3359  o << "none";
3360  else
3361  {
3362  if( sreq->options & kXR_refresh )
3363  o << "kXR_refresh ";
3364  if( sreq->options & kXR_prefname )
3365  o << "kXR_prefname ";
3366  if( sreq->options & kXR_nowait )
3367  o << "kXR_nowait ";
3368  if( sreq->options & kXR_force )
3369  o << "kXR_force ";
3370  if( sreq->options & kXR_compress )
3371  o << "kXR_compress ";
3372  }
3373  o << ")";
3374  break;
3375  }
3376 
3377  //------------------------------------------------------------------------
3378  // kXR_mv
3379  //------------------------------------------------------------------------
3380  case kXR_mv:
3381  {
3382  ClientMvRequest *sreq = (ClientMvRequest *)msg;
3383  o << "kXR_mv (";
3384  o << "source: ";
3385  o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3386  o << ", ";
3387  o << "destination: ";
3388  o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3389  o << ")";
3390  break;
3391  }
3392 
3393  //------------------------------------------------------------------------
3394  // kXR_query
3395  //------------------------------------------------------------------------
3396  case kXR_query:
3397  {
3398  ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3399  o << "kXR_query (";
3400  o << "code: ";
3401  switch( sreq->infotype )
3402  {
3403  case kXR_Qconfig: o << "kXR_Qconfig"; break;
3404  case kXR_Qckscan: o << "kXR_Qckscan"; break;
3405  case kXR_Qcksum: o << "kXR_Qcksum"; break;
3406  case kXR_Qopaque: o << "kXR_Qopaque"; break;
3407  case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3408  case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3409  case kXR_QPrep: o << "kXR_QPrep"; break;
3410  case kXR_Qspace: o << "kXR_Qspace"; break;
3411  case kXR_QStats: o << "kXR_QStats"; break;
3412  case kXR_Qvisa: o << "kXR_Qvisa"; break;
3413  case kXR_Qxattr: o << "kXR_Qxattr"; break;
3414  default: o << sreq->infotype; break;
3415  }
3416  o << ", ";
3417 
3418  if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3419  {
3420  o << "handle: " << FileHandleToStr( sreq->fhandle );
3421  o << ", ";
3422  }
3423 
3424  o << "arg length: " << sreq->dlen << ")";
3425  break;
3426  }
3427 
3428  //------------------------------------------------------------------------
3429  // kXR_rm
3430  //------------------------------------------------------------------------
3431  case kXR_rm:
3432  {
3433  o << "kXR_rm (";
3434  char *fn = GetDataAsString( msg );;
3435  o << "path: " << fn << ")";
3436  delete [] fn;
3437  break;
3438  }
3439 
3440  //------------------------------------------------------------------------
3441  // kXR_mkdir
3442  //------------------------------------------------------------------------
3443  case kXR_mkdir:
3444  {
3445  ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3446  o << "kXR_mkdir (";
3447  char *fn = GetDataAsString( msg );
3448  o << "path: " << fn << ", ";
3449  delete [] fn;
3450  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3451  o << std::setbase(10);
3452  o << "flags: ";
3453  if( sreq->options[0] == 0 )
3454  o << "none";
3455  else
3456  {
3457  if( sreq->options[0] & kXR_mkdirpath )
3458  o << "kXR_mkdirpath";
3459  }
3460  o << ")";
3461  break;
3462  }
3463 
3464  //------------------------------------------------------------------------
3465  // kXR_rmdir
3466  //------------------------------------------------------------------------
3467  case kXR_rmdir:
3468  {
3469  o << "kXR_rmdir (";
3470  char *fn = GetDataAsString( msg );
3471  o << "path: " << fn << ")";
3472  delete [] fn;
3473  break;
3474  }
3475 
3476  //------------------------------------------------------------------------
3477  // kXR_chmod
3478  //------------------------------------------------------------------------
3479  case kXR_chmod:
3480  {
3481  ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3482  o << "kXR_chmod (";
3483  char *fn = GetDataAsString( msg );
3484  o << "path: " << fn << ", ";
3485  delete [] fn;
3486  o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3487  break;
3488  }
3489 
3490  //------------------------------------------------------------------------
3491  // kXR_ping
3492  //------------------------------------------------------------------------
3493  case kXR_ping:
3494  {
3495  o << "kXR_ping ()";
3496  break;
3497  }
3498 
3499  //------------------------------------------------------------------------
3500  // kXR_protocol
3501  //------------------------------------------------------------------------
3502  case kXR_protocol:
3503  {
3504  ClientProtocolRequest *sreq = (ClientProtocolRequest *)msg;
3505  o << "kXR_protocol (";
3506  o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3507  break;
3508  }
3509 
3510  //------------------------------------------------------------------------
3511  // kXR_dirlist
3512  //------------------------------------------------------------------------
3513  case kXR_dirlist:
3514  {
3515  o << "kXR_dirlist (";
3516  char *fn = GetDataAsString( msg );;
3517  o << "path: " << fn << ")";
3518  delete [] fn;
3519  break;
3520  }
3521 
3522  //------------------------------------------------------------------------
3523  // kXR_set
3524  //------------------------------------------------------------------------
3525  case kXR_set:
3526  {
3527  o << "kXR_set (";
3528  char *fn = GetDataAsString( msg );;
3529  o << "data: " << fn << ")";
3530  delete [] fn;
3531  break;
3532  }
3533 
3534  //------------------------------------------------------------------------
3535  // kXR_prepare
3536  //------------------------------------------------------------------------
3537  case kXR_prepare:
3538  {
3539  ClientPrepareRequest *sreq = (ClientPrepareRequest *)msg;
3540  o << "kXR_prepare (";
3541  o << "flags: ";
3542 
3543  if( sreq->options == 0 )
3544  o << "none";
3545  else
3546  {
3547  if( sreq->options & kXR_stage )
3548  o << "kXR_stage ";
3549  if( sreq->options & kXR_wmode )
3550  o << "kXR_wmode ";
3551  if( sreq->options & kXR_coloc )
3552  o << "kXR_coloc ";
3553  if( sreq->options & kXR_fresh )
3554  o << "kXR_fresh ";
3555  }
3556 
3557  o << ", priority: " << (int) sreq->prty << ", ";
3558 
3559  char *fn = GetDataAsString( msg );
3560  char *cursor;
3561  for( cursor = fn; *cursor; ++cursor )
3562  if( *cursor == '\n' ) *cursor = ' ';
3563 
3564  o << "paths: " << fn << ")";
3565  delete [] fn;
3566  break;
3567  }
3568 
3569  case kXR_chkpoint:
3570  {
3571  ClientChkPointRequest *sreq = (ClientChkPointRequest *)msg;
3572  o << "kXR_chkpoint (";
3573  o << "opcode: ";
3574  if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3575  else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3576  else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3577  else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3578  else if( sreq->opcode == kXR_ckpXeq )
3579  {
3580  o << "kXR_ckpXeq) ";
3581  // In this case our request body will be one of kXR_pgwrite,
3582  // kXR_truncate, kXR_write, or kXR_writev request.
3583  GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3584  }
3585 
3586  break;
3587  }
3588 
3589  //------------------------------------------------------------------------
3590  // Default
3591  //------------------------------------------------------------------------
3592  default:
3593  {
3594  o << "kXR_unknown (length: " << req->dlen << ")";
3595  break;
3596  }
3597  };
3598  }

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdProto::clone_list::dstOffs, XrdCl::Log::ErrorMsg, ClientCloneRequest::fhandle, ClientCloseRequest::fhandle, ClientFattrRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, readahead_list::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, XrdProto::write_list::fhandle, ClientOpenRequest::fhtemplt, XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_4dirlist, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_clone, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_dup, kXR_fattr, kXR_fattrDel, kXR_fattrGet, kXR_fattrList, kXR_fattrSet, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_open_wrto, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_samefs, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientFattrRequest::numattr, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, ClientChkPointRequest::opcode, 
ClientFattrRequest::options, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, ClientPrepareRequest::options, ClientStatRequest::options, ClientOpenRequest::optiont, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, XrdProto::clone_list::srcLen, XrdProto::clone_list::srcOffs, ClientFattrRequest::subcode, and XrdProto::write_list::wlen.

Referenced by SetDescription().


◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL & url,
AnyObject & channelData
)
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1926 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = 0;
  channelData.Get( info );

  if(!info || !info->bindSelector)
    return url;

  return URL( info->bindSelector->Get() );
}

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().


◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message & message,
Socket * socket
)
virtual

Read the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message  the message buffer containing the header
socket   the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 349 of file XrdClXRootDTransport.cc.

{
  //--------------------------------------------------------------------------
  // Retrieve the body
  //--------------------------------------------------------------------------
  size_t leftToBeRead = 0;
  uint32_t bodySize = 0;
  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
  bodySize = rsphdr->dlen;

  if( message.GetSize() < bodySize + 8 )
    message.ReAllocate( bodySize + 8 );

  leftToBeRead = bodySize-(message.GetCursor()-8);
  while( leftToBeRead )
  {
    int bytesRead = 0;
    XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );

    if( !status.IsOK() || status.code == suRetry )
      return status;

    leftToBeRead -= bytesRead;
    message.AdvanceCursor( bytesRead );
  }

  return XRootDStatus( stOK, suDone );
}

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.
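
The resumable read used in the listing above (keep a cursor in the message buffer, hand back a retry status when the non-blocking socket has nothing more to give, and continue from the cursor on the next call) can be sketched in isolation. This is a minimal illustration only: `FakeSocket`, `Msg`, `ReadResult` and `ReadUpTo` are hypothetical stand-ins and are not XrdCl types or functions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-ins for illustration; none of these are XrdCl types.
enum class ReadResult { Done, Retry };

// A fake non-blocking source: it hands out whatever bytes have "arrived"
// and reports that nothing is available once they are consumed.
struct FakeSocket
{
  std::vector<char> pending;   // bytes that have arrived so far
  std::size_t       pos = 0;   // how many of them were already consumed

  // Returns false when no data is currently available.
  bool Read( char *dst, std::size_t want, std::size_t &got )
  {
    got = std::min( want, pending.size() - pos );
    if( got == 0 ) return false;
    std::memcpy( dst, pending.data() + pos, got );
    pos += got;
    return true;
  }
};

struct Msg
{
  std::vector<char> buf;
  std::size_t       cursor = 0;  // number of bytes already stored in buf
};

// Read until `total` bytes are stored in msg, resuming from msg.cursor.
ReadResult ReadUpTo( Msg &msg, FakeSocket &sock, std::size_t total )
{
  assert( msg.cursor <= total );
  if( msg.buf.size() < total ) msg.buf.resize( total );

  while( msg.cursor < total )
  {
    std::size_t got = 0;
    if( !sock.Read( msg.buf.data() + msg.cursor, total - msg.cursor, got ) )
      return ReadResult::Retry;  // caller invokes us again when data arrives
    msg.cursor += got;
  }
  return ReadResult::Done;
}
```

Because the progress lives in the message (its cursor) rather than in the function, a second call after a retry needs no extra bookkeeping from the caller.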


◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message & message,
Socket * socket
)
virtual

Read a message header from the socket. The socket is non-blocking, so if there is not enough data the function should return suRetry, in which case it will be called again when more data arrives, with the data previously read stored in the message buffer.

Parameters
message  the message buffer
socket   the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 309 of file XrdClXRootDTransport.cc.

{
  //--------------------------------------------------------------------------
  // A new message - allocate the space needed for the header
  //--------------------------------------------------------------------------
  if( message.GetCursor() == 0 && message.GetSize() < 8 )
    message.Allocate( 8 );

  //--------------------------------------------------------------------------
  // Read the message header
  //--------------------------------------------------------------------------
  if( message.GetCursor() < 8 )
  {
    size_t leftToBeRead = 8 - message.GetCursor();
    while( leftToBeRead )
    {
      int bytesRead = 0;
      XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
                                          leftToBeRead, bytesRead );
      if( !status.IsOK() || status.code == suRetry )
        return status;

      leftToBeRead -= bytesRead;
      message.AdvanceCursor( bytesRead );
    }
    UnMarshallHeader( message );

    uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
    Log *log = DefaultEnv::GetLog();
    log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
               "body", (void*)&message, bodySize );

    return XRootDStatus( stOK, suDone );
  }
  return XRootDStatus( stError, errInternal );
}

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.
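
The listing above reads exactly 8 bytes, unmarshals them, and then takes the body length from offset 4. As a rough illustration of what decoding such a header involves, the sketch below parses an 8-byte buffer laid out as a 2-byte stream id, a 2-byte status and a 4-byte body length. `ParsedHeader` and `ParseHeader` are hypothetical names, and the big-endian (network order) encoding of the two integer fields is an assumption of this sketch rather than something stated on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical mirror of an 8-byte response header (not an XrdCl type):
// bytes 0-1 stream id, bytes 2-3 status, bytes 4-7 body length.
struct ParsedHeader
{
  std::uint8_t  streamid[2];
  std::uint16_t status;
  std::uint32_t dlen;
};

// Decode the header independently of host endianness.
// Assumption: status and dlen arrive most significant byte first.
ParsedHeader ParseHeader( const unsigned char *buf, std::size_t len )
{
  assert( len >= 8 );
  ParsedHeader h;
  h.streamid[0] = buf[0];
  h.streamid[1] = buf[1];
  h.status = static_cast<std::uint16_t>( ( buf[2] << 8 ) | buf[3] );
  h.dlen   = ( static_cast<std::uint32_t>( buf[4] ) << 24 ) |
             ( static_cast<std::uint32_t>( buf[5] ) << 16 ) |
             ( static_cast<std::uint32_t>( buf[6] ) <<  8 ) |
               static_cast<std::uint32_t>( buf[7] );
  return h;
}
```

Decoding byte by byte avoids the pointer cast used in the listing and therefore does not depend on the buffer's alignment.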


◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message & message,
Socket * socket
)
virtual

Read more of the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message  the message buffer containing the header
socket   the socket
Returns
stOK & suDone if the whole message has been processed
stOK & suRetry if more data is needed
stError on failure

Implements XrdCl::TransportHandler.

Definition at line 381 of file XrdClXRootDTransport.cc.

{
  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
  if( rsphdr->status != kXR_status )
    return XRootDStatus( stError, errInvalidOp );

  //--------------------------------------------------------------------------
  // In case of non kXR_status responses we read all the response, including
  // data. For kXR_status responses we first read only the remainder of the
  // header. The header must then be unmarshalled, and then a second call to
  // GetMore (repeated for suRetry as needed) will read the data.
  //--------------------------------------------------------------------------

  uint32_t bodySize = rsphdr->dlen;
  if( bodySize+8 < sizeof( ServerResponseStatus ) )
    return XRootDStatus( stError, errInvalidMessage, 0,
                         "kXR_status: invalid message size." );

  ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
  bodySize += rspst->bdy.dlen;

  if( message.GetSize() < bodySize + 8 )
    message.ReAllocate( bodySize + 8 );

  size_t leftToBeRead = bodySize-(message.GetCursor()-8);
  while( leftToBeRead )
  {
    int bytesRead = 0;
    XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );

    if( !status.IsOK() || status.code == suRetry )
      return status;

    leftToBeRead -= bytesRead;
    message.AdvanceCursor( bytesRead );
  }

  // Unmarshal the message body
  Log *log = DefaultEnv::GetLog();
  XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
  if( !st.IsOK() && st.code == errDataError )
  {
    log->Error( XRootDTransportMsg, "[msg: %p] %s", (void*)&message,
                st.GetErrorMessage().c_str() );
    return st;
  }

  if( !st.IsOK() )
  {
    log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
                (void*)&message );
    return st;
  }

  return XRootDStatus( stOK, suDone );
}

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.
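
The size arithmetic in the listing above (8 header bytes, plus the length declared in the header, plus the data length declared in the status body, minus what the cursor has already consumed) can be written as a small standalone helper. `BytesLeftForStatus` is a hypothetical name used only for this sketch. The size of the status structure is passed in as `statusSize` because its value is not stated here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper, not an XrdCl function. `statusSize` stands in for
// sizeof( ServerResponseStatus ). Returns false when the length declared in
// the header is too small to hold the status part of the response.
bool BytesLeftForStatus( std::uint32_t hdrDlen, std::uint32_t bdyDlen,
                         std::size_t cursor, std::size_t statusSize,
                         std::size_t &left )
{
  assert( cursor >= 8 );  // the 8-byte header has already been read

  const std::size_t withHeader = static_cast<std::size_t>( hdrDlen ) + 8;
  if( withHeader < statusSize ) return false;

  // header + status part + trailing data
  const std::size_t total = withHeader + bdyDlen;
  assert( cursor <= total );
  left = total - cursor;
  return true;
}
```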


◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
AnyObject & channelData
)
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1788 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = 0;
  channelData.Get( info );
  return GetSignature( toSign, sign, info );
}

References XrdCl::AnyObject::Get().


◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
XRootDChannelInfo * info
)
virtual

Get signature for given message.

Definition at line 1798 of file XrdClXRootDTransport.cc.

{
  XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
  if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );

  ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
  if( !info ) return Status( stError, errInternal );
  if( info->protection )
  {
    SecurityRequest *newreq = 0;
    // check if we have to secure the request in the first place
    if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
    // secure (sign/encrypt) the request
    int rc = info->protection->Secure( newreq, *thereq, 0 );
    // there was an error
    if( rc < 0 )
      return Status( stError, errInternal, -rc );

    sign = new Message();
    sign->Grab( reinterpret_cast<char*>( newreq ), rc );
  }

  return Status();
}

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), XrdCl::PluginUnloadHandler::lock, NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), XrdCl::stError, and XrdCl::PluginUnloadHandler::unloaded.


◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData * handShakeData,
AnyObject & channelData
)
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 469 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = 0;
  channelData.Get( info );

  if (!info)
    return XRootDStatus(stFatal, errInternal);

  XrdSysMutexHelper scopedLock( info->mutex );

  if( info->stream.size() <= handShakeData->subStreamId )
  {
    Log *log = DefaultEnv::GetLog();
    log->Error( XRootDTransportMsg,
                "[%s] Internal error: not enough substreams",
                handShakeData->streamName.c_str() );
    return XRootDStatus( stFatal, errInternal );
  }

  if( handShakeData->subStreamId == 0 )
  {
    info->streamName = handShakeData->streamName;
    return HandShakeMain( handShakeData, channelData );
  }
  return HandShakeParallel( handShakeData, channelData );
}

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.


◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData * handShakeData,
AnyObject & channelData
)
virtual

Implements XrdCl::TransportHandler.

Definition at line 748 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = 0;
  channelData.Get( info );

  if (!info) {
    DefaultEnv::GetLog()->Error(XRootDTransportMsg,
                                "[%s] Internal error: no channel info",
                                handShakeData->streamName.c_str());
    return false;
  }

  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
  return ( sInfo.status == XRootDStreamInfo::Connected );
}

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.


◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL & url,
AnyObject & channelData
)
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 441 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = new XRootDChannelInfo( url );
  XrdSysMutexHelper scopedLock( info->mutex );
  channelData.Set( info );

  Env *env = DefaultEnv::GetEnv();
  int streams = DefaultSubStreamsPerChannel;
  env->GetInt( "SubStreamsPerChannel", streams );
  if( streams < 1 ) streams = 1;
  info->stream.resize( streams );
  info->strmSelector.reset( new StreamSelector( streams ) );
  info->encrypted = url.IsSecure();
  info->istpc = url.IsTPC();
  info->logintoken = url.GetLoginToken();
}

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.


◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t inactiveTime,
AnyObject & channelData
)
virtual

Check whether the stream is broken, i.e. the TCP connection got broken and this went undetected by the TCP stack.

Implements XrdCl::TransportHandler.

Definition at line 821 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = 0;
  channelData.Get( info );
  Env *env = DefaultEnv::GetEnv();
  Log *log = DefaultEnv::GetLog();

  if (!info) {
    log->Error(XRootDTransportMsg,
               "Internal error: no channel info, behaving as if stream is broken");
    return true;
  }

  int streamTimeout = DefaultStreamTimeout;
  env->GetInt( "StreamTimeout", streamTimeout );

  XrdSysMutexHelper scopedLock( info->mutex );

  const time_t now = time(0);
  const bool anySID =
    info->sidManager->IsAnySIDOldAs( now - streamTimeout );

  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
             "stream timeout: %d, any SID: %d, wait barrier: %s",
             info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
             anySID, Utils::TimeToString(info->waitBarrier).c_str() );

  if( inactiveTime < streamTimeout )
    return Status();

  if( now < info->waitBarrier )
    return Status();

  if( !anySID )
    return Status();

  return Status( stError, errSocketTimeout );
}

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
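
Stripped of locking and logging, the listing above reports a broken stream only when three conditions hold together: the stream has been idle for at least the stream timeout, the wait barrier has passed, and at least one outstanding stream id is older than the timeout. The sketch below restates that decision as a pure function. `LooksBroken` and its parameter names are hypothetical and not part of XrdCl.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical restatement of the three checks in the listing above.
// `anyOldSID` stands for "some outstanding request is older than the
// stream timeout", which the real code obtains from its SID manager.
bool LooksBroken( std::time_t inactiveTime, int streamTimeout,
                  std::time_t now, std::time_t waitBarrier, bool anyOldSID )
{
  assert( streamTimeout >= 0 );
  if( inactiveTime < streamTimeout ) return false;  // not idle long enough
  if( now < waitBarrier )            return false;  // server asked us to wait
  if( !anyOldSID )                   return false;  // nothing is overdue
  return true;
}
```

Keeping the decision free of side effects makes each early return easy to exercise on its own.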


◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t time,
AnyObject & channelData
)
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 768 of file XrdClXRootDTransport.cc.

{
  XRootDChannelInfo *info = 0;
  channelData.Get( info );

  Env *env = DefaultEnv::GetEnv();
  Log *log = DefaultEnv::GetLog();

  if (!info) {
    log->Error(XRootDTransportMsg,
               "Internal error: no channel info, behaving as if TTL has elapsed");
    return true;
  }

  //--------------------------------------------------------------------------
  // Check the TTL settings for the current server
  //--------------------------------------------------------------------------
  int ttl;
  if( info->serverFlags & kXR_isServer )
  {
    ttl = DefaultDataServerTTL;
    env->GetInt( "DataServerTTL", ttl );
  }
  else
  {
    ttl = DefaultLoadBalancerTTL;
    env->GetInt( "LoadBalancerTTL", ttl );
  }

  //--------------------------------------------------------------------------
  // See whether we can give a go-ahead for the disconnection
  //--------------------------------------------------------------------------
  XrdSysMutexHelper scopedLock( info->mutex );
  uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
             "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
             info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
             info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );

  if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
    return false;

  if( !allocatedSIDs && inactiveTime > ttl )
    return true;

  return false;
}

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.
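
The disconnection decision in the listing above reduces to two tests once the TTL has been chosen: the stream is kept while files are open and file objects are still bound to the channel, and otherwise it may be dropped only if no stream ids are allocated and the idle time exceeds the TTL. The following is a side-effect-free sketch of that logic. `TtlElapsed` and its parameters are hypothetical names, and the guard keeps the same `&&` combination that appears in the listing.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical restatement of the go-ahead test in the listing above.
bool TtlElapsed( std::time_t inactiveTime, int ttl, unsigned allocatedSIDs,
                 unsigned openFiles, unsigned boundFileObjects )
{
  assert( ttl >= 0 );

  // Files are open and file objects are still bound: keep the stream.
  if( openFiles != 0 && boundFileObjects != 0 )
    return false;

  // No request in flight and idle for longer than the TTL: may disconnect.
  return allocatedSIDs == 0 && inactiveTime > ttl;
}
```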


◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message & msg)
static

Log server error response.

Definition at line 1524 of file XrdClXRootDTransport.cc.

{
  Log *log = DefaultEnv::GetLog();
  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
  log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
              rsp->body.error.errnum, errmsg );
  delete [] errmsg;
}

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.
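
The listing above treats the error body as a 4-byte error number followed by `dlen - 4` bytes of message text, and it relies on `dlen` being at least 4. A bounds-checked way to pull the same two pieces out of a raw body is sketched below. `ErrorInfo` and `ParseErrorBody` are hypothetical names. Decoding the error number from big-endian bytes is an assumption of this sketch, since the listing logs a value that has already been converted elsewhere.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical result type for this sketch; not an XrdCl type.
struct ErrorInfo
{
  std::int32_t errnum;
  std::string  text;
};

// `body` points at the response body, `dlen` is the body length taken from
// the header. Layout assumed: 4 bytes error number, then the message text.
ErrorInfo ParseErrorBody( const unsigned char *body, std::size_t dlen )
{
  assert( dlen >= 4 );  // the listing implicitly requires this as well

  ErrorInfo e;
  const std::uint32_t raw = ( static_cast<std::uint32_t>( body[0] ) << 24 ) |
                            ( static_cast<std::uint32_t>( body[1] ) << 16 ) |
                            ( static_cast<std::uint32_t>( body[2] ) <<  8 ) |
                              static_cast<std::uint32_t>( body[3] );
  e.errnum = static_cast<std::int32_t>( raw );

  // Copy the text and drop any trailing NUL terminators sent on the wire.
  e.text.assign( reinterpret_cast<const char*>( body + 4 ), dlen - 4 );
  while( !e.text.empty() && e.text.back() == '\0' )
    e.text.pop_back();
  return e;
}
```

Using `std::string` removes the manual `new[]`/`delete[]` pair and the off-by-one arithmetic needed to place the terminator.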


◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char *  msg)
static

Marshal the outgoing message.

Definition at line 1105 of file XrdClXRootDTransport.cc.

{
  ClientRequest *req = (ClientRequest*)msg;
  switch( req->header.requestid )
  {
    //------------------------------------------------------------------------
    // kXR_protocol
    //------------------------------------------------------------------------
    case kXR_protocol:
      req->protocol.clientpv = htonl( req->protocol.clientpv );
      break;

    //------------------------------------------------------------------------
    // kXR_login
    //------------------------------------------------------------------------
    case kXR_login:
      req->login.pid = htonl( req->login.pid );
      break;

    //------------------------------------------------------------------------
    // kXR_locate
    //------------------------------------------------------------------------
    case kXR_locate:
      req->locate.options = htons( req->locate.options );
      break;

    //------------------------------------------------------------------------
    // kXR_query
    //------------------------------------------------------------------------
    case kXR_query:
      req->query.infotype = htons( req->query.infotype );
      break;

    //------------------------------------------------------------------------
    // kXR_truncate
    //------------------------------------------------------------------------
    case kXR_truncate:
      req->truncate.offset = htonll( req->truncate.offset );
      break;

    //------------------------------------------------------------------------
    // kXR_mkdir
    //------------------------------------------------------------------------
    case kXR_mkdir:
      req->mkdir.mode = htons( req->mkdir.mode );
      break;

    //------------------------------------------------------------------------
    // kXR_chmod
    //------------------------------------------------------------------------
    case kXR_chmod:
      req->chmod.mode = htons( req->chmod.mode );
      break;

    //------------------------------------------------------------------------
    // kXR_open
    //------------------------------------------------------------------------
    case kXR_open:
      req->open.mode = htons( req->open.mode );
      req->open.options = htons( req->open.options );
      req->open.optiont = htons( req->open.optiont );
      break;

    //------------------------------------------------------------------------
    // kXR_read
    //------------------------------------------------------------------------
    case kXR_read:
      req->read.offset = htonll( req->read.offset );
      req->read.rlen = htonl( req->read.rlen );
      break;

    //------------------------------------------------------------------------
    // kXR_write
    //------------------------------------------------------------------------
    case kXR_write:
      req->write.offset = htonll( req->write.offset );
      break;

    //------------------------------------------------------------------------
    // kXR_mv
    //------------------------------------------------------------------------
    case kXR_mv:
      req->mv.arg1len = htons( req->mv.arg1len );
      break;

    //------------------------------------------------------------------------
    // kXR_readv
    //------------------------------------------------------------------------
    case kXR_readv:
    {
      uint16_t numChunks = (req->readv.dlen)/16;
      readahead_list *dataChunk = (readahead_list*)( msg + 24 );
      for( size_t i = 0; i < numChunks; ++i )
      {
        dataChunk[i].rlen = htonl( dataChunk[i].rlen );
        dataChunk[i].offset = htonll( dataChunk[i].offset );
      }
      break;
    }

    case kXR_clone:
    {
      uint32_t numChunks = (req->clone.dlen)/sizeof(XrdProto::clone_list);
      XrdProto::clone_list *dataChunk =
        (XrdProto::clone_list*)( msg + sizeof( ClientRequestHdr ) );
      for( size_t i = 0; i < numChunks; ++i )
      {
        dataChunk[i].srcOffs = htonll( dataChunk[i].srcOffs );
        dataChunk[i].srcLen = htonll( dataChunk[i].srcLen );
        dataChunk[i].dstOffs = htonll( dataChunk[i].dstOffs );
      }
      break;
    }

    //------------------------------------------------------------------------
    // kXR_writev
    //------------------------------------------------------------------------
    case kXR_writev:
    {
      uint16_t numChunks = (req->writev.dlen)/16;
      XrdProto::write_list *wrtList =
        reinterpret_cast<XrdProto::write_list*>( msg + 24 );
      for( size_t i = 0; i < numChunks; ++i )
      {
        wrtList[i].wlen = htonl( wrtList[i].wlen );
        wrtList[i].offset = htonll( wrtList[i].offset );
      }

      break;
    }

    case kXR_pgread:
    {
      req->pgread.offset = htonll( req->pgread.offset );
      req->pgread.rlen = htonl( req->pgread.rlen );
      break;
    }

    case kXR_pgwrite:
    {
      req->pgwrite.offset = htonll( req->pgwrite.offset );
      break;
    }

    //------------------------------------------------------------------------
    // kXR_prepare
    //------------------------------------------------------------------------
    case kXR_prepare:
    {
      req->prepare.optionX = htons( req->prepare.optionX );
      req->prepare.port = htons( req->prepare.port );
      break;
    }

    case kXR_chkpoint:
    {
      if( req->chkpoint.opcode == kXR_ckpXeq )
        MarshallRequest( msg + 24 );
      break;
    }
  };

  req->header.requestid = htons( req->header.requestid );
  req->header.dlen = htonl( req->header.dlen );
  return XRootDStatus();
}

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientRequest::clone, ClientRequestHdr::dlen, ClientCloneRequest::dlen, ClientReadVRequest::dlen, ClientWriteVRequest::dlen, XrdProto::clone_list::dstOffs, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_clone, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientOpenRequest::optiont, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, XrdProto::clone_list::srcLen, XrdProto::clone_list::srcOffs, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.
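
Every case in the listing above does the same thing: it rewrites selected integer fields into network byte order, using `htons`, `htonl` or `htonll` depending on the field width. A host-independent way to produce that byte order is sketched below for a 64-bit offset followed by a 32-bit length, the pair of fields converted in the `kXR_read` case. `PutBE32`, `PutBE64` and `MarshalOffsetAndLength` are hypothetical helpers, and the 12-byte block they fill is for illustration only; it is not the layout of any real request structure.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helpers, not part of XrdCl: store an integer most significant
// byte first (network byte order), whatever the host endianness is.
void PutBE32( unsigned char *dst, std::uint32_t v )
{
  assert( dst != nullptr );
  for( int i = 0; i < 4; ++i )
    dst[i] = static_cast<unsigned char>( ( v >> ( 24 - 8 * i ) ) & 0xFF );
}

void PutBE64( unsigned char *dst, std::uint64_t v )
{
  assert( dst != nullptr );
  for( int i = 0; i < 8; ++i )
    dst[i] = static_cast<unsigned char>( ( v >> ( 56 - 8 * i ) ) & 0xFF );
}

// Illustrative 12-byte block: an 8-byte offset followed by a 4-byte length.
void MarshalOffsetAndLength( unsigned char out[12],
                             std::uint64_t offset, std::uint32_t length )
{
  PutBE64( out, offset );
  PutBE32( out + 8, length );
}
```

Writing into a separate byte buffer, rather than converting fields in place as the listing does, leaves the original values untouched, which can make it harder to marshal the same request twice by accident.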


◆ MarshallRequest() [2/2]

static XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message * msg)
inline static

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

{
  MarshallRequest( msg->GetBuffer() );
  msg->SetIsMarshalled( true );
  return XRootDStatus();
}

References XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().


◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message & msg,
uint16_t subStream,
AnyObject & channelData
)
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1646 of file XrdClXRootDTransport.cc.

1649  {
1650  XRootDChannelInfo *info = 0;
1651  channelData.Get( info );
1652  XrdSysMutexHelper scopedLock( info->mutex );
1653  Log *log = DefaultEnv::GetLog();
1654 
1655  //--------------------------------------------------------------------------
1656  // Update the substream queues
1657  //--------------------------------------------------------------------------
1658  info->strmSelector->MsgReceived( subStream );
1659 
1660  //--------------------------------------------------------------------------
1661  // Check whether this message is a response to a request that has
1662  // timed out, and if so, drop it
1663  //--------------------------------------------------------------------------
1664  ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1665  if( rsp->hdr.status == kXR_attn )
1666  {
1667  return NoAction;
1668  }
1669 
1670  if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1671  {
1672  log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1673  "response that we're no longer interested in (timed out)",
1674  (void*)&msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1675  //------------------------------------------------------------------------
1676  // If it is kXR_waitresp there will be another one,
1677  // so we don't release the sid yet
1678  //------------------------------------------------------------------------
1679  if( rsp->hdr.status != kXR_waitresp )
1680  info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1681  //------------------------------------------------------------------------
1682  // If it is a successful response to an open request
1683  // that timed out, we need to send a close
1684  //------------------------------------------------------------------------
1685  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1686  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1687  if( sidIt != info->sentOpens.end() )
1688  {
1689  info->sentOpens.erase( sidIt );
1690  if( rsp->hdr.status == kXR_ok ) return RequestClose;
1691  }
1692  return DigestMsg;
1693  }
1694 
1695  //--------------------------------------------------------------------------
1696  // If we have a wait or waitresp
1697  //--------------------------------------------------------------------------
1698  uint32_t seconds = 0;
1699  if( rsp->hdr.status == kXR_wait )
1700  seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1701  // to re-send the request
1702  else if( rsp->hdr.status == kXR_waitresp )
1703  {
1704  seconds = ntohl( rsp->body.waitresp.seconds );
1705 
1706  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1707  "setting up wait barrier.",
1708  info->streamName.c_str(),
1709  seconds );
1710  }
1711 
1712  time_t barrier = time(0) + seconds;
1713  if( info->waitBarrier < barrier )
1714  info->waitBarrier = barrier;
1715 
1716  //--------------------------------------------------------------------------
1717  // If we got a response to an open request, we may need to bump the counter
1718  // of open files
1719  //--------------------------------------------------------------------------
1720  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1721  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1722  if( sidIt != info->sentOpens.end() )
1723  {
1724  if( rsp->hdr.status == kXR_waitresp )
1725  return NoAction;
1726  info->sentOpens.erase( sidIt );
1727  if( rsp->hdr.status == kXR_ok )
1728  {
1729  ++info->openFiles;
1730  info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another File object instance has been bound to this connection
1731  }
1732  return NoAction;
1733  }
1734 
1735  //--------------------------------------------------------------------------
1736  // If we got a response to a close, we may need to decrement the counter of
1737  // open files
1738  //--------------------------------------------------------------------------
1739  sidIt = info->sentCloses.find( sid );
1740  if( sidIt != info->sentCloses.end() )
1741  {
1742  if( rsp->hdr.status == kXR_waitresp )
1743  return NoAction;
1744  info->sentCloses.erase( sidIt );
1745  --info->openFiles;
1746  return NoAction;
1747  }
1748  return NoAction;
1749  }
kXR_char streamid[2]
Definition: XProtocol.hh:956
@ kXR_waitresp
Definition: XProtocol.hh:948
@ kXR_ok
Definition: XProtocol.hh:941
@ kXR_attn
Definition: XProtocol.hh:943
@ kXR_wait
Definition: XProtocol.hh:947
@ RequestClose
Send a close request.
const uint64_t XRootDMsg

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.
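The bookkeeping in the listing above can be modelled in isolation. The following is a minimal sketch, not the XrdCl implementation: `ChannelModel`, `RspStatus`, `Action` and `OnResponse` are hypothetical names introduced here, the SID manager is reduced to a plain set of timed-out stream IDs, and the wait-barrier handling is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical stand-ins for the transport's action codes and response statuses.
enum class Action { NoAction, DigestMsg, RequestClose };
enum class RspStatus { Ok, Error, WaitResp };

// Simplified model of the per-channel state (not XRootDChannelInfo).
struct ChannelModel
{
  std::set<uint16_t> timedOut;   // stream IDs whose requests have timed out
  std::set<uint16_t> sentOpens;  // stream IDs of opens awaiting a response
  std::set<uint16_t> sentCloses; // stream IDs of closes awaiting a response
  int openFiles = 0;
};

Action OnResponse( ChannelModel &ch, uint16_t sid, RspStatus st )
{
  // A response to a timed-out request is dropped; a late successful open
  // must still be closed on the server side.
  if( ch.timedOut.count( sid ) )
  {
    // After a waitresp another response will follow, so keep the ID reserved.
    if( st != RspStatus::WaitResp ) ch.timedOut.erase( sid );
    bool wasOpen = ch.sentOpens.erase( sid ) > 0;
    if( wasOpen && st == RspStatus::Ok ) return Action::RequestClose;
    return Action::DigestMsg;
  }

  // Response to an open: count the file once the open has succeeded.
  std::set<uint16_t>::iterator it = ch.sentOpens.find( sid );
  if( it != ch.sentOpens.end() )
  {
    if( st == RspStatus::WaitResp ) return Action::NoAction;
    ch.sentOpens.erase( it );
    if( st == RspStatus::Ok ) ++ch.openFiles;
    return Action::NoAction;
  }

  // Response to a close: the file no longer counts as open.
  it = ch.sentCloses.find( sid );
  if( it != ch.sentCloses.end() )
  {
    if( st == RspStatus::WaitResp ) return Action::NoAction;
    ch.sentCloses.erase( it );
    assert( ch.openFiles > 0 ); // sanity check of this sketch only
    --ch.openFiles;
  }
  return Action::NoAction;
}
```

In this model a late `Ok` for a timed-out open yields `RequestClose`, which corresponds to the "we need to send a close" branch in the listing.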

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message msg,
uint16_t  subStream,
uint32_t  bytesSent,
AnyObject channelData 
)
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1754 of file XrdClXRootDTransport.cc.

1758  {
1759  // Called when a message has been sent. For messages that return on a
1760  // different pathid (and hence may use a different poller) it is possible
1761  // that the server has already replied and the reply will trigger
1762  // MessageReceived() before this method has been called. However for open
1763  // and close this is never the case and this method is used for tracking
1764  // only those.
1765  XRootDChannelInfo *info = 0;
1766  channelData.Get( info );
1767  XrdSysMutexHelper scopedLock( info->mutex );
1768  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1769  uint16_t reqid = ntohs( req->header.requestid );
1770 
1771 
1772  //--------------------------------------------------------------------------
1773  // We need to track opens to know if we can close streams due to idleness
1774  //--------------------------------------------------------------------------
1775  uint16_t sid;
1776  memcpy( &sid, req->header.streamid, 2 );
1777 
1778  if( reqid == kXR_open )
1779  info->sentOpens.insert( sid );
1780  else if( reqid == kXR_close )
1781  info->sentCloses.insert( sid );
1782  }
kXR_char streamid[2]
Definition: XProtocol.hh:158

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.
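The tracking step can be illustrated with a toy header. This is a sketch under stated assumptions: `ToyRequestHdr`, `SentTracker`, `kToyOpen` and `kToyClose` are hypothetical (the request codes are made-up values, not the `XProtocol.hh` ones), and `ntohs`/`htons` come from the POSIX header `<arpa/inet.h>`.

```cpp
#include <arpa/inet.h> // ntohs, htons (POSIX)
#include <cassert>
#include <cstdint>
#include <cstring>
#include <set>

// Toy request header: a 2-byte opaque stream ID followed by a request code
// stored in network byte order. Not the real ClientRequestHdr layout.
struct ToyRequestHdr
{
  unsigned char streamid[2];
  uint16_t      requestid;
};

// Made-up request codes, for illustration only.
const uint16_t kToyOpen  = 1;
const uint16_t kToyClose = 2;

struct SentTracker
{
  std::set<uint16_t> sentOpens;
  std::set<uint16_t> sentCloses;

  void OnSent( const ToyRequestHdr &hdr )
  {
    // The request code travels in network order; convert before comparing.
    uint16_t reqid = ntohs( hdr.requestid );

    // The stream ID is only used as a lookup key, so the two bytes are
    // copied as they are, without any byte-order conversion.
    uint16_t sid;
    std::memcpy( &sid, hdr.streamid, 2 );

    if( reqid == kToyOpen )       sentOpens.insert( sid );
    else if( reqid == kToyClose ) sentCloses.insert( sid );
  }
};
```

Because the response handler builds its key with the same `memcpy`, the key matches on any host byte order.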

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message msg,
AnyObject channelData,
PathID hint = 0 
)
virtual

Return the ID of the up stream the message should be sent through and of the down stream on which the answer is expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 864 of file XrdClXRootDTransport.cc.

865  {
866  return PathID( 0, 0 );
867  }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message msg,
AnyObject channelData,
PathID hint = 0 
)
virtual

Return the ID of the up substream the message should be sent through and of the down substream on which the answer is expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 872 of file XrdClXRootDTransport.cc.

875  {
876  XRootDChannelInfo *info = 0;
877  channelData.Get( info );
878 
879  if (!info) {
880  DefaultEnv::GetLog()->Error(XRootDTransportMsg,
881  "Internal error: no channel info, cannot multiplex");
882  return PathID(0,0);
883  }
884 
885  XrdSysMutexHelper scopedLock( info->mutex );
886 
887  //--------------------------------------------------------------------------
888  // If we're not connected to a data server or we don't know that yet
889  // we stream through 0
890  //--------------------------------------------------------------------------
891  if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
892  return PathID( 0, 0 );
893 
894  //--------------------------------------------------------------------------
895  // Select the streams
896  //--------------------------------------------------------------------------
897  Log *log = DefaultEnv::GetLog();
898  uint16_t upStream = 0;
899  uint16_t downStream = 0;
900 
901  if( hint )
902  {
903  upStream = hint->up;
904  downStream = hint->down;
905  }
906  else
907  {
908  upStream = 0;
909  std::vector<bool> connected;
910  connected.reserve( info->stream.size() - 1 );
911  size_t nbConnected = 0;
912  for( size_t i = 1; i < info->stream.size(); ++i )
913  if( info->stream[i].status == XRootDStreamInfo::Connected )
914  {
915  connected.push_back( true );
916  ++nbConnected;
917  }
918  else
919  connected.push_back( false );
920 
921  if( nbConnected == 0 )
922  downStream = 0;
923  else
924  downStream = info->strmSelector->Select( connected );
925  }
926 
927  if( upStream >= info->stream.size() )
928  {
929  log->Debug( XRootDTransportMsg,
930  "[%s] Up link stream %d does not exist, using 0",
931  info->streamName.c_str(), upStream );
932  upStream = 0;
933  }
934 
935  if( downStream >= info->stream.size() )
936  {
937  log->Debug( XRootDTransportMsg,
938  "[%s] Down link stream %d does not exist, using 0",
939  info->streamName.c_str(), downStream );
940  downStream = 0;
941  }
942 
943  //--------------------------------------------------------------------------
944  // Modify the message
945  //--------------------------------------------------------------------------
946  UnMarshallRequest( msg );
947  ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
948  switch( hdr->requestid )
949  {
950  //------------------------------------------------------------------------
951  // Read - we update the path id to tell the server where we want to
952  // get the response, but we still send the request through stream 0
953  // We need to allocate space for read_args if we don't have it
954  // included yet
955  //------------------------------------------------------------------------
956  case kXR_read:
957  {
958  if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
959  {
960  msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
961  void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
962  memset( newBuf, 0, 8 );
963  ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
964  req->dlen += 8;
965  }
966  read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
967  args->pathid = info->stream[downStream].pathId;
968  break;
969  }
970 
971 
972  //------------------------------------------------------------------------
973  // PgRead - we update the path id to tell the server where we want to
974  // get the response, but we still send the request through stream 0
975  // We need to allocate space for ClientPgReadReqArgs if we don't have it
976  // included yet
977  //------------------------------------------------------------------------
978  case kXR_pgread:
979  {
980  if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
981  {
982  msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
983  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
984  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
985  ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
986  req->dlen += sizeof( ClientPgReadReqArgs );
987  }
988  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
989  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
990  args->pathid = info->stream[downStream].pathId;
991  break;
992  }
993 
994  //------------------------------------------------------------------------
995  // ReadV - the situation is identical to read but we don't need any
996  // additional structures to specify the return path
997  //------------------------------------------------------------------------
998  case kXR_readv:
999  {
1000  ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
1001  req->pathid = info->stream[downStream].pathId;
1002  break;
1003  }
1004 
1005  //------------------------------------------------------------------------
1006  // Write - multiplexing writes doesn't work properly in the server
1007  //------------------------------------------------------------------------
1008  case kXR_write:
1009  {
1010 // ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
1011 // req->pathid = info->stream[downStream].pathId;
1012  break;
1013  }
1014 
1015  //------------------------------------------------------------------------
1016  // WriteV - multiplexing writes doesn't work properly in the server
1017  //------------------------------------------------------------------------
1018  case kXR_writev:
1019  {
1020 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1021 // req->pathid = info->stream[downStream].pathId;
1022  break;
1023  }
1024 
1025  //------------------------------------------------------------------------
1026  // PgWrite - multiplexing writes doesn't work properly in the server
1027  //------------------------------------------------------------------------
1028  case kXR_pgwrite:
1029  {
1030 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1031 // req->pathid = info->stream[downStream].pathId;
1032  break;
1033  }
1034  };
1035  MarshallRequest( msg );
1036  return PathID( upStream, downStream );
1037  }
kXR_char pathid
Definition: XProtocol.hh:689
kXR_int32 dlen
Definition: XProtocol.hh:684
static XRootDStatus UnMarshallRequest(Message *msg)

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, read_args::pathid, ClientReadVRequest::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.
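The path selection at the top of the listing (use the hint if given, otherwise pick a connected data substream, then fall back to stream 0 for any index that does not exist) can be sketched on its own. Everything named here is hypothetical: `PathModel` stands in for `PathID`, and `SelectLeastLoaded` is an assumed least-pending policy, since the behaviour of the real stream selector is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct PathModel { uint16_t up; uint16_t down; };

// Assumed policy: choose the connected data substream with the fewest pending
// messages. connected[i] describes substream i + 1, as in the listing;
// pending[s] is the queue length of substream s. Returns 0 if none qualifies.
uint16_t SelectLeastLoaded( const std::vector<bool>     &connected,
                            const std::vector<unsigned> &pending )
{
  uint16_t best  = 0;
  bool     found = false;
  for( size_t i = 0; i < connected.size(); ++i )
  {
    size_t strm = i + 1;
    if( !connected[i] || strm >= pending.size() ) continue;
    if( !found || pending[strm] < pending[best] )
    {
      best  = static_cast<uint16_t>( strm );
      found = true;
    }
  }
  return best;
}

PathModel ChoosePath( const std::vector<bool>     &connected,
                      const std::vector<unsigned> &pending,
                      const PathModel             *hint )
{
  size_t    nbStreams = connected.size() + 1; // control stream + data streams
  PathModel path      = { 0, 0 };

  if( hint ) path = *hint;                    // the caller's wish wins
  else path.down = SelectLeastLoaded( connected, pending );

  // Indices that point past the existing streams fall back to stream 0.
  if( path.up   >= nbStreams ) path.up   = 0;
  if( path.down >= nbStreams ) path.down = 0;
  return path;
}
```

Requests always leave through stream 0 unless a hint says otherwise; only the return path is spread over the data substreams.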

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject channelData)
static

Number of currently connected data streams.

Definition at line 1538 of file XrdClXRootDTransport.cc.

1539  {
1540  XRootDChannelInfo *info = 0;
1541  channelData.Get( info );
1542 
1543  if (!info) {
1544  DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1545  return 0;
1546  }
1547 
1548  XrdSysMutexHelper scopedLock( info->mutex );
1549 
1550  uint16_t nbConnected = 0;
1551  for( size_t i = 1; i < info->stream.size(); ++i )
1552  if( info->stream[i].status == XRootDStreamInfo::Connected )
1553  ++nbConnected;
1554 
1555  return nbConnected;
1556  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDTransportMsg.

Referenced by XrdCl::Channel::NbConnectedStrm().
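The loop in the listing starts at index 1 because substream 0 is the control stream and is not counted as a data stream. A minimal stand-alone sketch of the same count, with a hypothetical `StreamState` enum in place of `XRootDStreamInfo::status`:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the per-substream status.
enum class StreamState { Disconnected, Connecting, Connected };

// Count connected data substreams; index 0 (the control stream) is skipped.
uint16_t CountConnectedData( const std::vector<StreamState> &streams )
{
  if( streams.size() < 2 ) return 0; // no data substreams at all
  return static_cast<uint16_t>(
    std::count( streams.begin() + 1, streams.end(), StreamState::Connected ) );
}
```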

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return whether a control connection must be valid before other connections are established.

Definition at line 167 of file XrdClXRootDTransport.hh.

168  {
169  return true;
170  }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual
Returns
true if encryption should be turned on, false otherwise

Implements XrdCl::TransportHandler.

Definition at line 1848 of file XrdClXRootDTransport.cc.

1850  {
1851  XRootDChannelInfo *info = 0;
1852  channelData.Get( info );
1853 
1854  Env *env = DefaultEnv::GetEnv();
1855  int notlsok = DefaultNoTlsOK;
1856  env->GetInt( "NoTlsOK", notlsok );
1857 
1858 
1859  if( notlsok )
1860  return info->encrypted;
1861 
1862  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1863 
1864  // Did the server instruct us to switch to TLS right away?
1865  if( sInfo.serverFlags & kXR_gotoTLS )
1866  {
1867  if( handShakeData->subStreamId == 0 ) info->encrypted = true;
1868  return true ;
1869  }
1870 
1871  //--------------------------------------------------------------------------
1872  // The control stream (sub-stream 0) might need to switch to TLS before
1873  // login or after login
1874  //--------------------------------------------------------------------------
1875  if( handShakeData->subStreamId == 0 )
1876  {
1877  //------------------------------------------------------------------------
1878  // We are about to login and the server asked to start encrypting
1879  // before login
1880  //------------------------------------------------------------------------
1881  if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1882  ( info->serverFlags & kXR_tlsLogin ) )
1883  {
1884  info->encrypted = true;
1885  return true;
1886  }
1887 
1888  //--------------------------------------------------------------------
1889  // The hand-shake is done and the server requested to encrypt the session
1890  //--------------------------------------------------------------------
1891  if( (sInfo.status == XRootDStreamInfo::Connected ||
1892  //--------------------------------------------------------------------
1893  // we really need to turn on TLS before we send kXR_endsess and we
1894  // are about to do so (1st enable encryption, then send kXR_endsess)
1895  //--------------------------------------------------------------------
1896  sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1897  ( info->serverFlags & kXR_tlsSess ) )
1898  {
1899  info->encrypted = true;
1900  return true;
1901  }
1902  }
1903  //--------------------------------------------------------------------------
1904  // A data stream (sub-stream > 0) if need be will be switched to TLS before
1905  // bind.
1906  //--------------------------------------------------------------------------
1907  else
1908  {
1909  //------------------------------------------------------------------------
1910  // We are about to bind a data stream and the server asked to start
1911  // encrypting before bind
1912  //------------------------------------------------------------------------
1913  if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1914  ( info->serverFlags & kXR_tlsData ) )
1915  {
1916  return true;
1917  }
1918  }
1919 
1920  return false;
1921  }
#define kXR_tlsLogin
Definition: XProtocol.hh:1226
#define kXR_gotoTLS
Definition: XProtocol.hh:1222
#define kXR_tlsSess
Definition: XProtocol.hh:1227
#define kXR_tlsData
Definition: XProtocol.hh:1224
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
const int DefaultNoTlsOK

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDStreamInfo::serverFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.
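The decision in the listing reduces to a small pure function once the channel state is flattened into a struct. The sketch below is a simplification under stated assumptions: the flag constants use made-up bit values (not those of `XProtocol.hh`), a single `serverFlags` word replaces the separate per-stream and per-channel flags, and the side effect of marking the channel as encrypted is omitted. All names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Made-up bit values, for illustration only.
const uint32_t kToyGotoTls  = 1u << 0;
const uint32_t kToyTlsLogin = 1u << 1;
const uint32_t kToyTlsSess  = 1u << 2;
const uint32_t kToyTlsData  = 1u << 3;

enum class Phase { LoginSent, Connected, EndSessionSent, BindSent, Other };

struct TlsInput
{
  bool     noTlsOk;          // user setting: carry on without TLS
  bool     alreadyEncrypted; // current state of the channel
  uint32_t serverFlags;      // what the server announced
  uint16_t subStream;        // 0 = control stream, > 0 = data stream
  Phase    phase;            // where the handshake currently stands
};

bool NeedTls( const TlsInput &in )
{
  // With NoTlsOK set, keep whatever state the channel is already in.
  if( in.noTlsOk ) return in.alreadyEncrypted;

  // The server wants TLS right away, on any substream.
  if( in.serverFlags & kToyGotoTls ) return true;

  if( in.subStream == 0 )
  {
    // Control stream: switch before login or once the session is up.
    if( in.phase == Phase::LoginSent && ( in.serverFlags & kToyTlsLogin ) )
      return true;
    if( ( in.phase == Phase::Connected || in.phase == Phase::EndSessionSent ) &&
        ( in.serverFlags & kToyTlsSess ) )
      return true;
    return false;
  }

  // Data stream: switch before bind if the server asks for encrypted data.
  return in.phase == Phase::BindSent && ( in.serverFlags & kToyTlsData ) != 0;
}
```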

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t  query,
AnyObject result,
AnyObject channelData 
)
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1594 of file XrdClXRootDTransport.cc.

1597  {
1598  XRootDChannelInfo *info = 0;
1599  channelData.Get( info );
1600 
1601  if (!info)
1602  return XRootDStatus(stFatal, errInternal);
1603 
1604  XrdSysMutexHelper scopedLock( info->mutex );
1605 
1606  switch( query )
1607  {
1608  //------------------------------------------------------------------------
1609  // Protocol name
1610  //------------------------------------------------------------------------
1611  case TransportQuery::Name:
1612  result.Set( (const char*)"XRootD", false );
1613  return Status();
1614 
1615  //------------------------------------------------------------------------
1616  // Authentication
1617  //------------------------------------------------------------------------
1618  case TransportQuery::Auth:
1619  result.Set( new std::string( info->authProtocolName ), false );
1620  return Status();
1621 
1622  //------------------------------------------------------------------------
1623  // Server flags
1624  //------------------------------------------------------------------------
1625  case XRootDQuery::ServerFlags:
1626  result.Set( new int( info->serverFlags ), false );
1627  return Status();
1628 
1629  //------------------------------------------------------------------------
1630  // Protocol version
1631  //------------------------------------------------------------------------
1632  case XRootDQuery::ProtocolVersion:
1633  result.Set( new int( info->protocolVersion ), false );
1634  return Status();
1635 
1636  case XRootDQuery::IsEncrypted:
1637  result.Set( new bool( info->encrypted ), false );
1638  return Status();
1639  };
1640  return Status( stError, errQueryNotSupported );
1641  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errInternal, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), XrdCl::stError, and XrdCl::stFatal.

◆ SetDescription()

static void XrdCl::XRootDTransport::SetDescription ( Message msg)
inlinestatic

Generate and set the description of a message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246  {
247  std::ostringstream o;
248  GenerateDescription( msg->GetBuffer(), o );
249  msg->SetDescription( o.str() );
250  }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Clone(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileStateHandler::PreRead(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileSystem::Stat(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject channelData)
virtual

Return the number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 1044 of file XrdClXRootDTransport.cc.

1045  {
1046  XRootDChannelInfo *info = 0;
1047  channelData.Get( info );
1048 
1049  if (!info) {
1050  DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1051  return 1;
1052  }
1053 
1054  XrdSysMutexHelper scopedLock( info->mutex );
1055 
1056  //--------------------------------------------------------------------------
1057  // If the connection has been opened in order to orchestrate a TPC or
1058  // the remote server is a Manager or Metamanager we will need only one
1059  // (control) stream.
1060  //--------------------------------------------------------------------------
1061  if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1062 
1063  //--------------------------------------------------------------------------
1064  // Number of streams requested by user
1065  //--------------------------------------------------------------------------
1066  uint16_t ret = info->stream.size();
1067 
1068  Env *env = DefaultEnv::GetEnv();
1069  int nodata = DefaultTlsNoData;
1070  env->GetInt( "TlsNoData", nodata );
1071 
1072  // Does the server require the stream 0 to be encrypted?
1073  bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1074  ( info->serverFlags & kXR_tlsLogin ) ||
1075  ( info->serverFlags & kXR_tlsSess );
1076  // Does the server NOT require the data streams to be encrypted?
1077  bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1078  // Does the user require the stream 0 to be encrypted?
1079  bool usrTlsStrm0 = info->encrypted;
1080  // Does the user NOT require the data streams to be encrypted?
1081  bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1082 
1083  if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1084  ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1085  {
1086  //------------------------------------------------------------------------
1087  // The server or user asked us to encrypt stream 0, but to send the data
1088  // (read/write) using a plain TCP connection
1089  //------------------------------------------------------------------------
1090  if( ret == 1 ) ++ret;
1091  }
1092 
1093  if( ret > info->stream.size() )
1094  {
1095  info->stream.resize( ret );
1096  info->strmSelector->AdjustQueues( ret );
1097  }
1098 
1099  return ret;
1100  }
const int DefaultTlsNoData

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::strmSelector, and XrdCl::XRootDTransportMsg.
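The rule in the listing is easier to follow as a pure function: one stream for TPC or non-data servers; otherwise the configured count, bumped from 1 to 2 when stream 0 is encrypted but data may travel in the clear, so that a separate plain data stream exists. The sketch below assumes a hypothetical `StreamPolicy` struct that flattens the channel state and the `TlsNoData` setting into booleans.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical flattened view of the inputs used by the listing.
struct StreamPolicy
{
  bool     isTpc;        // connection orchestrates a third-party copy
  bool     isDataServer; // false for managers and meta-managers
  bool     srvTlsStrm0;  // server requires TLS on the control stream
  bool     srvTlsData;   // server requires TLS on data streams
  bool     usrEncrypted; // user asked for an encrypted channel
  bool     usrTlsNoData; // user allows unencrypted data (TlsNoData)
  uint16_t configured;   // number of streams currently configured
};

uint16_t SubStreamCount( const StreamPolicy &p )
{
  // TPC orchestration and redirectors only need the control stream.
  if( p.isTpc || !p.isDataServer ) return 1;

  uint16_t ret = p.configured;

  bool srvNoTlsData = !p.srvTlsData;
  bool usrTlsStrm0  = p.usrEncrypted;
  bool usrNoTlsData = !p.usrEncrypted || p.usrTlsNoData;

  // Encrypted control stream plus plain data: make sure a second
  // (data) stream exists to carry the unencrypted traffic.
  if( ( usrTlsStrm0   && usrNoTlsData && srvNoTlsData ) ||
      ( p.srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
  {
    if( ret == 1 ) ++ret;
  }
  return ret;
}
```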

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1451 of file XrdClXRootDTransport.cc.

1452  {
1453  ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1454  uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1455 
1456  switch( reqType )
1457  {
1458  case kXR_pgwrite:
1459  {
1460  //--------------------------------------------------------------------------
1461  // If there's no additional data there's nothing to unmarshal
1462  //--------------------------------------------------------------------------
1463  if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1464  //--------------------------------------------------------------------------
1465  // If there's not enough data to form correction-segment report an error
1466  //--------------------------------------------------------------------------
1467  if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1468  return XRootDStatus( stError, errInvalidMessage, 0,
1469  "kXR_status: invalid message size." );
1470 
1471  //--------------------------------------------------------------------------
1472  // Calculate the crc32c for the additional data
1473  //--------------------------------------------------------------------------
1474  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1475  cse->cseCRC = ntohl( cse->cseCRC );
1476  size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1477  void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1478  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1479 
1480  //--------------------------------------------------------------------------
1481  // Do the integrity checks
1482  //--------------------------------------------------------------------------
1483  if( crcval != cse->cseCRC )
1484  {
1485  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1486  "corrupted (crc32c integrity check failed)." );
1487  }
1488 
1489  cse->dlFirst = ntohs( cse->dlFirst );
1490  cse->dlLast = ntohs( cse->dlLast );
1491 
1492  size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1493  sizeof( kXR_int64 );
1494  kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1495  sizeof( ServerResponseBody_pgWrCSE ) );
1496 
1497  for( size_t i = 0; i < pgcnt; ++i )
1498  pgoffs[i] = ntohll( pgoffs[i] );
1499 
1500  return XRootDStatus();
1501  break;
1502  }
1503 
1504  default:
1505  break;
1506  }
1507 
1508  return XRootDStatus( stError, errNotSupported );
1509  }
ServerResponseStatus status
Definition: XProtocol.hh:1352
@ kXR_1stRequest
Definition: XProtocol.hh:112
long long kXR_int64
Definition: XPtypes.hh:98
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().
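The integrity check in the listing follows a common pattern: the first four bytes of the segment hold a CRC32C in network byte order, computed over the bytes that follow. The sketch below shows that pattern with a plain bitwise CRC32C. It assumes the standard CRC-32C (Castagnoli) parameters, which is what `XrdOucCRC::Calc32C` is expected to compute but is not confirmed by this page; `Crc32c`, `ReadBE32` and `SegmentIntact` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Bitwise CRC-32C (reflected polynomial 0x82F63B78). Slow but short.
uint32_t Crc32c( const unsigned char *data, size_t len )
{
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < len; ++i )
  {
    crc ^= data[i];
    for( int bit = 0; bit < 8; ++bit )
      crc = ( crc >> 1 ) ^ ( ( crc & 1u ) ? 0x82F63B78u : 0u );
  }
  return ~crc;
}

// Read a 32-bit big-endian (network order) value, independent of host order.
uint32_t ReadBE32( const unsigned char *p )
{
  return ( uint32_t( p[0] ) << 24 ) | ( uint32_t( p[1] ) << 16 ) |
         ( uint32_t( p[2] ) << 8 )  |   uint32_t( p[3] );
}

// Segment layout assumed here: [4-byte big-endian CRC][payload...].
// Returns true if the stored CRC matches the payload.
bool SegmentIntact( const std::vector<unsigned char> &seg )
{
  if( seg.size() < 4 ) return false; // too short to even hold the CRC
  return ReadBE32( seg.data() ) == Crc32c( seg.data() + 4, seg.size() - 4 );
}
```

As in the listing, the checksum is verified before any other field of the segment is converted to host byte order.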

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message msg,
uint16_t  reqType 
)
static

Unmarshall the body of the incoming message.

Definition at line 1297 of file XrdClXRootDTransport.cc.

1298  {
1299  ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1300 
1301  //--------------------------------------------------------------------------
1302  // kXR_ok
1303  //--------------------------------------------------------------------------
1304  if( m->hdr.status == kXR_ok )
1305  {
1306  switch( reqType )
1307  {
1308  //----------------------------------------------------------------------
1309  // kXR_protocol
1310  //----------------------------------------------------------------------
1311  case kXR_protocol:
1312  if( m->hdr.dlen < 8 )
1313  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1314  m->body.protocol.pval = ntohl( m->body.protocol.pval );
1315  m->body.protocol.flags = ntohl( m->body.protocol.flags );
1316  break;
1317  }
1318  }
1319  //--------------------------------------------------------------------------
1320  // kXR_error
1321  //--------------------------------------------------------------------------
1322  else if( m->hdr.status == kXR_error )
1323  {
1324  if( m->hdr.dlen < 4 )
1325  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1326  m->body.error.errnum = ntohl( m->body.error.errnum );
1327  }
1328 
1329  //--------------------------------------------------------------------------
1330  // kXR_wait
1331  //--------------------------------------------------------------------------
1332  else if( m->hdr.status == kXR_wait )
1333  {
1334  if( m->hdr.dlen < 4 )
1335  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1336  m->body.wait.seconds = htonl( m->body.wait.seconds );
1337  }
1338 
1339  //--------------------------------------------------------------------------
1340  // kXR_redirect
1341  //--------------------------------------------------------------------------
1342  else if( m->hdr.status == kXR_redirect )
1343  {
1344  if( m->hdr.dlen < 4 )
1345  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1346  m->body.redirect.port = htonl( m->body.redirect.port );
1347  }
1348 
1349  //--------------------------------------------------------------------------
1350  // kXR_waitresp
1351  //--------------------------------------------------------------------------
1352  else if( m->hdr.status == kXR_waitresp )
1353  {
1354  if( m->hdr.dlen < 4 )
1355  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1356  m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1357  }
1358 
1359  //--------------------------------------------------------------------------
1360  // kXR_attn
1361  //--------------------------------------------------------------------------
1362  else if( m->hdr.status == kXR_attn )
1363  {
1364  if( m->hdr.dlen < 4 )
1365  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1366  m->body.attn.actnum = htonl( m->body.attn.actnum );
1367  }
1368 
1369  return XRootDStatus();
1370  }

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().
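
The listing above follows one pattern for every response type: check that the body is long enough, then convert the leading 32-bit field to host byte order. A minimal sketch of that pattern for an error-style body is shown below. `DemoReadErrnum` is a hypothetical helper, not part of XrdCl, and it assumes the body starts with a 32-bit big-endian error number.

```cpp
#include <arpa/inet.h>   // ntohl (POSIX)
#include <cstdint>
#include <cstring>

// Hypothetical helper (not an XrdCl function): decode the 32-bit error
// number assumed to lead an error-style response body.
// `dlen` is the body length taken from the already unmarshalled header.
// Returns false when the body is too short to hold the field.
inline bool DemoReadErrnum( const unsigned char *body, uint32_t dlen,
                            int32_t &errnum )
{
  if( dlen < 4 ) return false;               // same guard as in the listing
  uint32_t raw;
  std::memcpy( &raw, body, sizeof( raw ) );  // copy out, no alignment assumptions
  errnum = static_cast<int32_t>( ntohl( raw ) );
  return true;
}
```

Checking the length first matters because the conversion reads four bytes regardless of how many the server actually sent.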


◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message &msg)
static

Unmarshall the header of the incoming message.

Definition at line 1514 of file XrdClXRootDTransport.cc.

1515  {
1516  ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1517  header->status = ntohs( header->status );
1518  header->dlen = ntohl( header->dlen );
1519  }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().
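
As an illustration of the conversion above, the following sketch unmarshalls an 8-byte header held in a raw buffer. `DemoResponseHeader` and `DemoUnMarshallHeader` are hypothetical stand-ins; the layout (2-byte stream ID, 16-bit status, 32-bit length) is an assumption made for the example rather than a copy of `ServerResponseHeader`.

```cpp
#include <arpa/inet.h>   // ntohs, ntohl (POSIX)
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for a response header: 2-byte stream ID,
// 16-bit status and 32-bit payload length (assumed layout, 8 bytes).
struct DemoResponseHeader
{
  uint8_t  streamid[2];
  uint16_t status;
  uint32_t dlen;
};
static_assert( sizeof( DemoResponseHeader ) == 8, "unexpected padding" );

// Convert the multi-byte fields of the header stored in `raw` to host
// byte order, in place. The stream ID is a byte array and stays as is.
inline void DemoUnMarshallHeader( unsigned char *raw )
{
  DemoResponseHeader hdr;
  std::memcpy( &hdr, raw, sizeof( hdr ) );   // copy out, no alignment assumptions
  hdr.status = ntohs( hdr.status );
  hdr.dlen   = ntohl( hdr.dlen );
  std::memcpy( raw, &hdr, sizeof( hdr ) );
}
```

After the call, `status` and `dlen` can be read directly on any host, which is what the body unmarshalling relies on when it inspects them.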


◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message *msg)
static

Unmarshall the request. Requests sometimes need to be rewritten, so they must be unmarshalled first.

Definition at line 1276 of file XrdClXRootDTransport.cc.

1277  {
1278  if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1279  // We rely on the marshaling process to be symmetric!
1280  // First we unmarshall the request ID and the length because
1281  // MarshallRequest() relies on these, and then we need to unmarshall these
1282  // two again, because they get marshalled in MarshallRequest().
1283  // All this is pretty damn ugly and should be rewritten.
1284  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1285  req->header.requestid = htons( req->header.requestid );
1286  req->header.dlen = htonl( req->header.dlen );
1287  XRootDStatus st = MarshallRequest( msg );
1288  req->header.requestid = htons( req->header.requestid );
1289  req->header.dlen = htonl( req->header.dlen );
1290  msg->SetIsMarshalled( false );
1291  return st;
1292  }

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().
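
The listing relies on marshalling being symmetric: the host-to-network conversion is its own inverse, so applying it a second time restores the original value. A minimal sketch of that property follows. `DemoRequestHeader` is a hypothetical, trimmed-down structure holding only the two fields the listing touches; it is not the real `ClientRequestHdr`.

```cpp
#include <arpa/inet.h>   // htons, htonl (POSIX)
#include <cstdint>

// Hypothetical, trimmed-down request header with only the two fields
// that the listing converts explicitly.
struct DemoRequestHeader
{
  uint16_t requestid;
  uint32_t dlen;
};

// Flip both fields between host and network byte order. On a
// little-endian host this reverses the bytes, on a big-endian host it
// does nothing; in both cases a second call undoes the first.
inline void DemoFlipHeader( DemoRequestHeader &hdr )
{
  hdr.requestid = htons( hdr.requestid );
  hdr.dlen      = htonl( hdr.dlen );
}
```

This is why the same marshalling routine can serve both directions, at the cost of the extra conversions before and after the call that the source comment complains about.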


◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message &msg,
uint16_t  reqType 
)
static

Unmarshall the body of the status response.

Definition at line 1375 of file XrdClXRootDTransport.cc.

1376  {
1377  //--------------------------------------------------------------------------
1378  // Calculate the crc32c before unmarshalling the body!
1379  //--------------------------------------------------------------------------
1380  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1381  char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1382  size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1383  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1384 
1385  size_t stlen = sizeof( ServerResponseStatus );
1386  switch( reqType )
1387  {
1388  case kXR_pgread:
1389  {
1390  stlen += sizeof( ServerResponseBody_pgRead );
1391  break;
1392  }
1393 
1394  case kXR_pgwrite:
1395  {
1396  stlen += sizeof( ServerResponseBody_pgWrite );
1397  break;
1398  }
1399  }
1400 
1401  if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1402  "kXR_status: invalid message size." );
1403 
1404  rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1405  rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1406 
1407  switch( reqType )
1408  {
1409  case kXR_pgread:
1410  {
1411  ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1412  pgrdbdy->offset = ntohll( pgrdbdy->offset );
1413  break;
1414  }
1415 
1416  case kXR_pgwrite:
1417  {
1418  ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1419  pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1420  break;
1421  }
1422  }
1423 
1424  //--------------------------------------------------------------------------
1425  // Do the integrity checks
1426  //--------------------------------------------------------------------------
1427  if( crcval != rspst->bdy.crc32c )
1428  {
1429  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1430  "corrupted (crc32c integrity check failed)." );
1431  }
1432 
1433  if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1434  rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1435  {
1436  return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1437  "(stream ID mismatch)." );
1438  }
1439 
1440 
1441 
1442  if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1443  {
1444  return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1445  "(request ID mismatch)." );
1446  }
1447 
1448  return XRootDStatus();
1449  }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseHeader::streamid, and ServerResponseBody_Status::streamID.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().
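
The ordering in the listing matters: the checksum is computed over the bytes exactly as they arrived, and only the stored checksum itself is converted to host order before the comparison. The sketch below shows that ordering on a simplified frame, assumed here to be a 4-byte big-endian CRC followed by the payload. `DemoCrc32c` is a plain bitwise CRC-32C (Castagnoli) written for the example; whether it matches `XrdOucCRC::Calc32C` bit for bit is an assumption, and the frame layout is not the real `ServerResponseStatus`.

```cpp
#include <arpa/inet.h>   // ntohl (POSIX)
#include <cstddef>
#include <cstdint>
#include <cstring>

// Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78).
// Slow but short; written for illustration only.
inline uint32_t DemoCrc32c( const void *data, size_t len )
{
  const unsigned char *p = static_cast<const unsigned char*>( data );
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < len; ++i )
  {
    crc ^= p[i];
    for( int bit = 0; bit < 8; ++bit )
      crc = ( crc >> 1 ) ^ ( 0x82F63B78u & ( 0u - ( crc & 1u ) ) );
  }
  return ~crc;
}

// Hypothetical frame: 4-byte big-endian CRC followed by `payloadLen`
// payload bytes. The CRC is computed over the payload as received,
// before any of its fields would be byte-swapped.
inline bool DemoCheckFrame( const unsigned char *frame, size_t payloadLen )
{
  uint32_t computed = DemoCrc32c( frame + 4, payloadLen );
  uint32_t stored;
  std::memcpy( &stored, frame, sizeof( stored ) );
  stored = ntohl( stored );                  // only now convert the stored value
  return computed == stored;
}
```

Swapping payload fields first would change the bytes fed to the checksum, so an intact message would fail the integrity check on little-endian hosts.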


◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1839 of file XrdClXRootDTransport.cc.

1840  {
1841  XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1842  pSecUnloadHandler->unloaded = true;
1843  }

References XrdCl::PluginUnloadHandler::lock, and XrdCl::PluginUnloadHandler::unloaded.

Friends And Related Function Documentation

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.


The documentation for this class was generated from the following files: