XRootD
XrdClAsyncSocketHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClStream.hh"
20 #include "XrdCl/XrdClConstants.hh"
21 #include "XrdCl/XrdClLog.hh"
22 #include "XrdCl/XrdClMessage.hh"
26 #include "XrdCl/XrdClOptimizers.hh"
27 #include "XrdSys/XrdSysE2T.hh"
28 #include <netinet/tcp.h>
29 
30 namespace XrdCl
31 {
32  //----------------------------------------------------------------------------
33  // Constructor
34  //----------------------------------------------------------------------------
36  Poller *poller,
37  TransportHandler *transport,
38  AnyObject *channelData,
39  uint16_t subStreamNum,
40  Stream *strm ):
41  pPoller( poller ),
42  pTransport( transport ),
43  pChannelData( channelData ),
44  pSubStreamNum( subStreamNum ),
45  pStream( strm ),
46  pStreamName( ToStreamName( url, subStreamNum ) ),
47  pSocket( new Socket() ),
48  pHandShakeDone( false ),
49  pConnectionStarted( 0 ),
50  pConnectionTimeout( 0 ),
51  pHSWaitStarted( 0 ),
52  pHSWaitSeconds( 0 ),
53  pUrl( url ),
54  pTlsHandShakeOngoing( false ),
55  pDoTransportDisc( false )
56  {
57  Env *env = DefaultEnv::GetEnv();
58 
59  int timeoutResolution = DefaultTimeoutResolution;
60  env->GetInt( "TimeoutResolution", timeoutResolution );
61  pTimeoutResolution = timeoutResolution;
62 
64  pLastActivity = time(0);
65  }
66 
67  //----------------------------------------------------------------------------
68  // Destructor
69  //----------------------------------------------------------------------------
71  {
72  Close();
73  delete pSocket;
74  }
75 
76  //----------------------------------------------------------------------------
77  // Connect to given address
78  //----------------------------------------------------------------------------
80  {
81  Log *log = DefaultEnv::GetLog();
82  pLastActivity = pConnectionStarted = ::time(0);
83  pConnectionTimeout = timeout;
84 
85  //--------------------------------------------------------------------------
86  // Initialize the socket
87  //--------------------------------------------------------------------------
89  if( !st.IsOK() )
90  {
91  log->Error( AsyncSockMsg, "[%s] Unable to initialize socket: %s",
92  pStreamName.c_str(), st.ToString().c_str() );
93  st.status = stFatal;
94  return st;
95  }
96 
97  //--------------------------------------------------------------------------
98  // Set the keep-alive up
99  //--------------------------------------------------------------------------
100  Env *env = DefaultEnv::GetEnv();
101 
102  int keepAlive = DefaultTCPKeepAlive;
103  env->GetInt( "TCPKeepAlive", keepAlive );
104  if( keepAlive )
105  {
106  int param = 1;
107  XRootDStatus st = pSocket->SetSockOpt( SOL_SOCKET, SO_KEEPALIVE, &param,
108  sizeof(param) );
109  if( !st.IsOK() )
110  log->Error( AsyncSockMsg, "[%s] Unable to turn on keepalive: %s",
111  pStreamName.c_str(), st.ToString().c_str() );
112 
113 #if ( defined(__linux__) || defined(__GNU__) ) && defined( TCP_KEEPIDLE ) && \
114  defined( TCP_KEEPINTVL ) && defined( TCP_KEEPCNT )
115 
116  param = DefaultTCPKeepAliveTime;
117  env->GetInt( "TCPKeepAliveTime", param );
118  st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPIDLE, &param, sizeof(param));
119  if( !st.IsOK() )
120  log->Error( AsyncSockMsg, "[%s] Unable to set keepalive time: %s",
121  pStreamName.c_str(), st.ToString().c_str() );
122 
124  env->GetInt( "TCPKeepAliveInterval", param );
125  st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPINTVL, &param, sizeof(param));
126  if( !st.IsOK() )
127  log->Error( AsyncSockMsg, "[%s] Unable to set keepalive interval: %s",
128  pStreamName.c_str(), st.ToString().c_str() );
129 
131  env->GetInt( "TCPKeepAliveProbes", param );
132  st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPCNT, &param, sizeof(param));
133  if( !st.IsOK() )
134  log->Error( AsyncSockMsg, "[%s] Unable to set keepalive probes: %s",
135  pStreamName.c_str(), st.ToString().c_str() );
136 #endif
137  }
138 
139  pHandShakeDone = false;
140  pTlsHandShakeOngoing = false;
141  pHSWaitStarted = 0;
142  pHSWaitSeconds = 0;
144 
145  //--------------------------------------------------------------------------
146  // Initiate async connection to the address
147  //--------------------------------------------------------------------------
148  char nameBuff[256];
149  pSockAddr.Format( nameBuff, sizeof(nameBuff), XrdNetAddrInfo::fmtAdv6 );
150  log->Debug( AsyncSockMsg, "[%s] Attempting connection to %s",
151  pStreamName.c_str(), nameBuff );
152 
153  st = pSocket->ConnectToAddress( pSockAddr, 0 );
154  if( !st.IsOK() )
155  {
156  log->Error( AsyncSockMsg, "[%s] Unable to initiate the connection: %s",
157  pStreamName.c_str(), st.ToString().c_str() );
158  return st;
159  }
160 
162 
164  pDoTransportDisc = true;
165 
166  //--------------------------------------------------------------------------
167  // We should get the ready to write event once we're really connected
168  // so we need to listen to it
169  //--------------------------------------------------------------------------
170  if( !pPoller->AddSocket( pSocket, this ) )
171  {
173  pSocket->Close();
174  pDoTransportDisc = false;
175  pOpenChannel.reset();
176  return st;
177  }
178 
180  {
183  pSocket->Close();
184  pDoTransportDisc = false;
185  pOpenChannel.reset();
186  return st;
187  }
188 
189  return XRootDStatus();
190  }
191 
192  //----------------------------------------------------------------------------
193  // PreClose performs as many close actions as possible with fewer blocking
194  // conditions.
195  //----------------------------------------------------------------------------
197  {
198  Log *log = DefaultEnv::GetLog();
199  log->Debug( AsyncSockMsg, "[%s] PreClosing the socket", pStreamName.c_str() );
200 
202 
203  if( pDoTransportDisc )
204  {
206  pSubStreamNum );
207  pDoTransportDisc = false;
208  }
209 
210  return XRootDStatus();
211  }
212 
213  //----------------------------------------------------------------------------
214  // Close the connection
215  //----------------------------------------------------------------------------
217  {
218  Log *log = DefaultEnv::GetLog();
219  log->Debug( AsyncSockMsg, "[%s] Closing the socket", pStreamName.c_str() );
220 
222 
223  if( pDoTransportDisc )
224  {
226  pSubStreamNum );
227  pDoTransportDisc = false;
228  }
229 
230  pSocket->Close();
231  //--------------------------------------------------------------------------
232  // Releases a reference count on Channel. May possibly cause it to be
233 // destroyed, which will in turn destroy pStream and thus us.
234  //--------------------------------------------------------------------------
235  pOpenChannel.reset();
236  return XRootDStatus();
237  }
238 
239  std::string AsyncSocketHandler::ToStreamName( const URL &url, uint16_t strmnb )
240  {
241  std::ostringstream o;
242  o << url.GetHostId();
243  o << "." << strmnb;
244  return o.str();
245  }
246 
247  //----------------------------------------------------------------------------
248 // Handle a socket event
249  //----------------------------------------------------------------------------
250  void AsyncSocketHandler::Event( uint8_t type, XrdCl::Socket */*socket*/ )
251  {
252  //--------------------------------------------------------------------------
253  // First check if the socket itself wants to apply some mapping on the
254  // event. E.g. in case of TLS socket it might want to map read events to
255  // write events and vice-versa.
256  //--------------------------------------------------------------------------
257  type = pSocket->MapEvent( type );
258 
259  //--------------------------------------------------------------------------
260  // Handle any read or write events. If any of the handlers indicate an error
261  // we will have been disconnected. A disconnection may cause the current
262  // object to be asynchronously reused or deleted, so we return immediately.
263  //--------------------------------------------------------------------------
264  if( !EventRead( type ) )
265  return;
266 
267  //--------------------------------------------------------------------------
268 // If there's a previously noted ECONNRESET error from write we give the
269  // fault now. This gave us the chance to process a read event.
270  //--------------------------------------------------------------------------
271  if( !pReqConnResetError.IsOK() )
272  {
274  return;
275  }
276 
277  if( !EventWrite( type ) )
278  return;
279  }
280 
281  //----------------------------------------------------------------------------
282  // Handler for read related socket events
283  //----------------------------------------------------------------------------
284  bool AsyncSocketHandler::EventRead( uint8_t type )
285  {
286  //--------------------------------------------------------------------------
287  // Read event
288  //--------------------------------------------------------------------------
289  if( type & ReadyToRead )
290  {
291  pLastActivity = time(0);
293  return OnTLSHandShake();
294 
295  if( likely( pHandShakeDone ) )
296  return OnRead();
297 
298  return OnReadWhileHandshaking();
299  }
300 
301  //--------------------------------------------------------------------------
302  // Read timeout
303  //--------------------------------------------------------------------------
304  else if( type & ReadTimeOut )
305  {
306  if( pHSWaitSeconds )
307  {
308  if( !CheckHSWait() )
309  return false;
310  }
311 
312  if( likely( pHandShakeDone ) )
313  return OnReadTimeout();
314 
315  return OnTimeoutWhileHandshaking();
316  }
317 
318  return true;
319  }
320 
321  //----------------------------------------------------------------------------
322  // Handler for write related socket events
323  //----------------------------------------------------------------------------
324  bool AsyncSocketHandler::EventWrite( uint8_t type )
325  {
326  //--------------------------------------------------------------------------
327  // Write event
328  //--------------------------------------------------------------------------
329  if( type & ReadyToWrite )
330  {
331  pLastActivity = time(0);
333  return OnConnectionReturn();
334 
335  //------------------------------------------------------------------------
336  // Make sure we are not writing anything if we have been told to wait.
337  //------------------------------------------------------------------------
338  if( pHSWaitSeconds != 0 )
339  return true;
340 
342  return OnTLSHandShake();
343 
344  if( likely( pHandShakeDone ) )
345  return OnWrite();
346 
347  return OnWriteWhileHandshaking();
348  }
349 
350  //--------------------------------------------------------------------------
351  // Write timeout
352  //--------------------------------------------------------------------------
353  else if( type & WriteTimeOut )
354  {
355  if( likely( pHandShakeDone ) )
356  return OnWriteTimeout();
357 
358  return OnTimeoutWhileHandshaking();
359  }
360 
361  return true;
362  }
363 
364  //----------------------------------------------------------------------------
365  // Connect returned
366  //----------------------------------------------------------------------------
368  {
369  //--------------------------------------------------------------------------
370  // Check whether we were able to connect
371  //--------------------------------------------------------------------------
372  Log *log = DefaultEnv::GetLog();
373  log->Debug( AsyncSockMsg, "[%s] Async connection call returned",
374  pStreamName.c_str() );
375 
376  int errorCode = 0;
377  socklen_t optSize = sizeof( errorCode );
378  XRootDStatus st = pSocket->GetSockOpt( SOL_SOCKET, SO_ERROR, &errorCode,
379  &optSize );
380 
381  //--------------------------------------------------------------------------
382  // This is an internal error really (either logic or system fault),
383  // so we call it a day and don't retry
384  //--------------------------------------------------------------------------
385  if( !st.IsOK() )
386  {
387  log->Error( AsyncSockMsg, "[%s] Unable to get the status of the "
388  "connect operation: %s", pStreamName.c_str(),
389  XrdSysE2T( errno ) );
392  return false;
393  }
394 
395  //--------------------------------------------------------------------------
396  // We were unable to connect
397  //--------------------------------------------------------------------------
398  if( errorCode )
399  {
400  log->Error( AsyncSockMsg, "[%s] Unable to connect: %s",
401  pStreamName.c_str(), XrdSysE2T( errorCode ) );
404  return false;
405  }
407 
408  //--------------------------------------------------------------------------
409  // Cork the socket
410  //--------------------------------------------------------------------------
411  st = pSocket->Cork();
412  if( !st.IsOK() )
413  {
415  return false;
416  }
417 
418  //--------------------------------------------------------------------------
419  // Initialize the handshake
420  //--------------------------------------------------------------------------
422  pSubStreamNum ) );
423  pHandShakeData->serverAddr = pSocket->GetServerAddress();
424  pHandShakeData->clientName = pSocket->GetSockName();
425  pHandShakeData->streamName = pStreamName;
426 
428  if( !st.IsOK() )
429  {
430  log->Error( AsyncSockMsg, "[%s] Connection negotiation failed",
431  pStreamName.c_str() );
433  return false;
434  }
435 
436  if( st.code != suRetry )
437  ++pHandShakeData->step;
438 
439  //--------------------------------------------------------------------------
440  // Initialize the hand-shake reader and writer
441  //--------------------------------------------------------------------------
442  hswriter.reset( new AsyncHSWriter( *pSocket, pStreamName ) );
444 
445  //--------------------------------------------------------------------------
446  // Transport has given us something to send
447  //--------------------------------------------------------------------------
448  if( pHandShakeData->out )
449  {
450  hswriter->Reset( pHandShakeData->out );
451  pHandShakeData->out = nullptr;
452  }
453 
454  //--------------------------------------------------------------------------
455  // Listen to what the server has to say
456  //--------------------------------------------------------------------------
458  {
461  return false;
462  }
463  return true;
464  }
465 
466  //----------------------------------------------------------------------------
467  // Got a write readiness event
468  //----------------------------------------------------------------------------
470  {
471  if( !reqwriter )
472  {
473  OnFault( XRootDStatus( stError, errInternal, 0, "Request writer is null." ) );
474  return false;
475  }
476  //--------------------------------------------------------------------------
477  // Let's do the writing ...
478  //--------------------------------------------------------------------------
479  XRootDStatus st = reqwriter->Write();
480 
481  //--------------------------------------------------------------------------
482  // In the case of ECONNRESET perhaps the server sent us something.
483  // To give a chance to read it in the next event poll we pass this as a
484  // retry, but return the error after the next event.
485  //--------------------------------------------------------------------------
486  if( st.code == errSocketError && st.errNo == ECONNRESET )
487  {
488  if( (DisableUplink()).IsOK() )
489  {
490  pReqConnResetError = st;
491  st = XRootDStatus( stOK, suRetry );
492  }
493  }
494  if( !st.IsOK() )
495  {
496  //------------------------------------------------------------------------
497  // We failed
498  //------------------------------------------------------------------------
499  OnFault( st );
500  return false;
501  }
502  //--------------------------------------------------------------------------
503  // We are not done yet
504  //--------------------------------------------------------------------------
505  if( st.code == suRetry) return true;
506  //--------------------------------------------------------------------------
507  // Disable the respective substream if empty
508  //--------------------------------------------------------------------------
509  reqwriter->Reset();
511  return true;
512  }
513 
514  //----------------------------------------------------------------------------
515  // Got a write readiness event while handshaking
516  //----------------------------------------------------------------------------
518  {
519  XRootDStatus st;
520  if( !hswriter || !hswriter->HasMsg() )
521  {
522  if( !(st = DisableUplink()).IsOK() )
523  {
525  return false;
526  }
527  return true;
528  }
529  //--------------------------------------------------------------------------
530  // Let's do the writing ...
531  //--------------------------------------------------------------------------
532  st = hswriter->Write();
533  if( !st.IsOK() )
534  {
535  //------------------------------------------------------------------------
536  // We failed
537  //------------------------------------------------------------------------
539  return false;
540  }
541  //--------------------------------------------------------------------------
542  // We are not done yet
543  //--------------------------------------------------------------------------
544  if( st.code == suRetry ) return true;
545  //--------------------------------------------------------------------------
546  // Disable the uplink
547  // Note: at this point we don't deallocate the HS message as we might need
548  // to re-send it in case of a kXR_wait response
549  //--------------------------------------------------------------------------
550  if( !(st = DisableUplink()).IsOK() )
551  {
553  return false;
554  }
555  return true;
556  }
557 
558  //----------------------------------------------------------------------------
559  // Got a read readiness event
560  //----------------------------------------------------------------------------
562  {
563  //--------------------------------------------------------------------------
564  // Make sure the response reader object exists
565  //--------------------------------------------------------------------------
566  if( !rspreader )
567  {
568  OnFault( XRootDStatus( stError, errInternal, 0, "Response reader is null." ) );
569  return false;
570  }
571 
572  //--------------------------------------------------------------------------
573  // Readout the data from the socket
574  //--------------------------------------------------------------------------
575  XRootDStatus st = rspreader->Read();
576 
577  //--------------------------------------------------------------------------
578 // Handle header corruption
579  //--------------------------------------------------------------------------
580  if( !st.IsOK() && st.code == errCorruptedHeader )
581  {
583  return false;
584  }
585 
586  //--------------------------------------------------------------------------
587 // Handle other errors
588  //--------------------------------------------------------------------------
589  if( !st.IsOK() )
590  {
591  OnFault( st );
592  return false;
593  }
594 
595  //--------------------------------------------------------------------------
596  // We are not done yet
597  //--------------------------------------------------------------------------
598  if( st.code == suRetry ) return true;
599 
600  //--------------------------------------------------------------------------
601  // We are done, reset the response reader so we can read out next message
602  //--------------------------------------------------------------------------
603  rspreader->Reset();
604  return true;
605  }
606 
607  //----------------------------------------------------------------------------
608  // Got a read readiness event while handshaking
609  //----------------------------------------------------------------------------
611  {
612  //--------------------------------------------------------------------------
613  // Make sure the response reader object exists
614  //--------------------------------------------------------------------------
615  if( !hsreader )
616  {
617  OnFault( XRootDStatus( stError, errInternal, 0, "Hand-shake reader is null." ) );
618  return false;
619  }
620 
621  //--------------------------------------------------------------------------
622  // Read the message and let the transport handler look at it when
623  // reading has finished
624  //--------------------------------------------------------------------------
625  XRootDStatus st = hsreader->Read();
626  if( !st.IsOK() )
627  {
629  return false;
630  }
631 
632  if( st.code != suDone )
633  return true;
634 
635  return HandleHandShake( hsreader->ReleaseMsg() );
636  }
637 
638  //------------------------------------------------------------------------
639  // Handle the handshake message
640  //------------------------------------------------------------------------
641  bool AsyncSocketHandler::HandleHandShake( std::unique_ptr<Message> msg )
642  {
643  //--------------------------------------------------------------------------
644  // OK, we have a new message, let's deal with it;
645  //--------------------------------------------------------------------------
646  pHandShakeData->in = msg.release();
648 
649  //--------------------------------------------------------------------------
650  // Deal with wait responses
651  //--------------------------------------------------------------------------
652  kXR_int32 waitSeconds = HandleWaitRsp( pHandShakeData->in );
653 
654  delete pHandShakeData->in;
655  pHandShakeData->in = 0;
656 
657  if( !st.IsOK() )
658  {
660  return false;
661  }
662 
663  if( st.code == suRetry )
664  {
665  //------------------------------------------------------------------------
666 // We are handling a wait response and the transport handler told
667 // us to retry the request
668  //------------------------------------------------------------------------
669  if( waitSeconds >=0 )
670  {
671  time_t resendTime = ::time( 0 ) + waitSeconds;
672  if( resendTime > pConnectionStarted + pConnectionTimeout )
673  {
674  Log *log = DefaultEnv::GetLog();
675  log->Error( AsyncSockMsg,
676  "[%s] Won't retry kXR_endsess request because would"
677  "reach connection timeout.",
678  pStreamName.c_str() );
679 
681  return false;
682  }
683  else
684  {
685  //--------------------------------------------------------------------
686  // We need to wait before replaying the request
687  //--------------------------------------------------------------------
688  Log *log = DefaultEnv::GetLog();
689  log->Debug( AsyncSockMsg, "[%s] Received a wait response to endsess request, "
690  "will wait for %d seconds before replaying the endsess request",
691  pStreamName.c_str(), waitSeconds );
692  pHSWaitStarted = time( 0 );
693  pHSWaitSeconds = waitSeconds;
694  }
695  return true;
696  }
697  //------------------------------------------------------------------------
698  // We are re-sending a protocol request
699  //------------------------------------------------------------------------
700  else if( pHandShakeData->out )
701  {
702  return SendHSMsg();
703  }
704  }
705 
706  //--------------------------------------------------------------------------
707  // If now is the time to enable encryption
708  //--------------------------------------------------------------------------
709  if( !pSocket->IsEncrypted() &&
711  {
713  if( !st.IsOK() )
714  return false;
715  if ( st.code == suRetry )
716  return true;
717  }
718 
719  //--------------------------------------------------------------------------
720  // Now prepare the next step of the hand-shake procedure
721  //--------------------------------------------------------------------------
722  return HandShakeNextStep( st.IsOK() && st.code == suDone );
723  }
724 
725  //------------------------------------------------------------------------
726  // Prepare the next step of the hand-shake procedure
727  //------------------------------------------------------------------------
729  {
730  //--------------------------------------------------------------------------
731  // We successfully proceeded to the next step
732  //--------------------------------------------------------------------------
733  ++pHandShakeData->step;
734 
735  //--------------------------------------------------------------------------
736  // The hand shake process is done
737  //--------------------------------------------------------------------------
738  if( done )
739  {
740  pHandShakeData.reset();
741  hswriter.reset();
742  hsreader.reset();
743  //------------------------------------------------------------------------
744  // Initialize the request writer & reader
745  //------------------------------------------------------------------------
748  XRootDStatus st;
749  if( !(st = EnableUplink()).IsOK() )
750  {
752  return false;
753  }
754  pHandShakeDone = true;
756  }
757  //--------------------------------------------------------------------------
758  // The transport handler gave us something to write
759  //--------------------------------------------------------------------------
760  else if( pHandShakeData->out )
761  {
762  return SendHSMsg();
763  }
764  return true;
765  }
766 
767  //----------------------------------------------------------------------------
768  // Handle fault
769  //----------------------------------------------------------------------------
771  {
772  Log *log = DefaultEnv::GetLog();
773  log->Error( AsyncSockMsg, "[%s] Socket error encountered: %s",
774  pStreamName.c_str(), st.ToString().c_str() );
775 
776  pStream->OnError( pSubStreamNum, st );
777  }
778 
779  //----------------------------------------------------------------------------
780  // Handle fault while handshaking
781  //----------------------------------------------------------------------------
783  {
784  Log *log = DefaultEnv::GetLog();
785  log->Error( AsyncSockMsg, "[%s] Socket error while handshaking: %s",
786  pStreamName.c_str(), st.ToString().c_str() );
787 
789  }
790 
791  //----------------------------------------------------------------------------
792  // Handle write timeout
793  //----------------------------------------------------------------------------
795  {
797  }
798 
799  //----------------------------------------------------------------------------
800 // Handle read timeout
801  //----------------------------------------------------------------------------
803  {
805  }
806 
807  //----------------------------------------------------------------------------
808  // Handle timeout while handshaking
809  //----------------------------------------------------------------------------
811  {
812  time_t now = time(0);
814  {
816  return false;
817  }
818  return true;
819  }
820 
821  //----------------------------------------------------------------------------
822  // Handle header corruption in case of kXR_status response
823  //----------------------------------------------------------------------------
825  {
826  //--------------------------------------------------------------------------
827  // We need to force a socket error so this is handled in a similar way as
828  // a stream t/o and all requests are retried
829  //--------------------------------------------------------------------------
831  }
832 
833  //----------------------------------------------------------------------------
834  // Carry out the TLS hand-shake
835  //----------------------------------------------------------------------------
837  {
838  Log *log = DefaultEnv::GetLog();
839  log->Debug( AsyncSockMsg, "[%s] TLS hand-shake exchange.", pStreamName.c_str() );
840 
841  XRootDStatus st;
842  if( !( st = pSocket->TlsHandShake( this, pUrl.GetHostName() ) ).IsOK() )
843  {
844  pTlsHandShakeOngoing = false;
846  return st;
847  }
848 
849  if( st.code == suRetry )
850  {
851  pTlsHandShakeOngoing = true;
852  return st;
853  }
854 
855  pTlsHandShakeOngoing = false;
856  log->Info( AsyncSockMsg, "[%s] TLS hand-shake done.", pStreamName.c_str() );
857 
858  return st;
859  }
860 
861  //----------------------------------------------------------------------------
862  // Handle read/write event if we are in the middle of a TLS hand-shake
863  //----------------------------------------------------------------------------
865  {
867  if( !st.IsOK() )
868  return false;
869  if ( st.code == suRetry )
870  return true;
871 
873  *pChannelData ) );
874  }
875 
876  //----------------------------------------------------------------------------
877  // Prepare a HS writer for sending and enable uplink
878  //----------------------------------------------------------------------------
880  {
881  if( !hswriter )
882  {
884  "HS writer object missing!" ) );
885  return false;
886  }
887  //--------------------------------------------------------------------------
888  // We only set a new HS message if this is not a replay due to kXR_wait
889  //--------------------------------------------------------------------------
890  if( !pHSWaitSeconds )
891  {
892  hswriter->Reset( pHandShakeData->out );
893  pHandShakeData->out = nullptr;
894  }
895  //--------------------------------------------------------------------------
896  // otherwise we replay the kXR_endsess request
897  //--------------------------------------------------------------------------
898  else
899  hswriter->Replay();
900  //--------------------------------------------------------------------------
901  // Enable writing so we can replay the HS message
902  //--------------------------------------------------------------------------
903  XRootDStatus st;
904  if( !(st = EnableUplink()).IsOK() )
905  {
907  return false;
908  }
909  return true;
910  }
911 
913  {
914  // It would be more coherent if this could be done in the
915  // transport layer, unfortunately the API does not allow it.
916  kXR_int32 waitSeconds = -1;
917  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
918  if( rsp->hdr.status == kXR_wait )
919  waitSeconds = rsp->body.wait.seconds;
920  return waitSeconds;
921  }
922 
923  //----------------------------------------------------------------------------
924  // Check if HS wait time elapsed
925  //----------------------------------------------------------------------------
927  {
928  time_t now = time( 0 );
929  if( now - pHSWaitStarted >= pHSWaitSeconds )
930  {
931  Log *log = DefaultEnv::GetLog();
932  log->Debug( AsyncSockMsg, "[%s] The hand-shake wait time elapsed, will "
933  "replay the endsess request.", pStreamName.c_str() );
934  if( !SendHSMsg() )
935  return false;
936  //------------------------------------------------------------------------
937  // Make sure the wait state is reset
938  //------------------------------------------------------------------------
939  pHSWaitSeconds = 0;
940  pHSWaitStarted = 0;
941  }
942  return true;
943  }
944 
945  //------------------------------------------------------------------------
946  // Get the IP stack
947  //------------------------------------------------------------------------
948  std::string AsyncSocketHandler::GetIpStack() const
949  {
950  std::string ipstack( ( pSockAddr.isIPType( XrdNetAddr::IPType::IPv6 ) &&
951  !pSockAddr.isMapped() ) ? "IPv6" : "IPv4" );
952  return ipstack;
953  }
954 
955  //------------------------------------------------------------------------
956  // Get IP address
957  //------------------------------------------------------------------------
959  {
960  char nameBuff[256];
961  pSockAddr.Format( nameBuff, sizeof(nameBuff), XrdNetAddrInfo::fmtAddr, XrdNetAddrInfo::noPort );
962  return nameBuff;
963  }
964 
965  //------------------------------------------------------------------------
967  //------------------------------------------------------------------------
969  {
970  const char *cstr = pSockAddr.Name();
971  if( !cstr )
972  return std::string();
973  return cstr;
974  }
975 }
union ServerResponse::@0 body
@ kXR_wait
Definition: XProtocol.hh:947
ServerResponseHeader hdr
Definition: XProtocol.hh:1330
int kXR_int32
Definition: XPtypes.hh:89
#define likely(x)
#define unlikely(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
Utility class encapsulating reading hand-shake response logic.
Utility class encapsulating writing hand-shake request logic.
Utility class encapsulating reading response message logic.
Utility class encapsulating writing request logic.
std::shared_ptr< Channel > pOpenChannel
static std::string ToStreamName(const URL &url, uint16_t strmnb)
Convert Stream object and sub-stream number to stream name.
bool OnReadTimeout() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSWriter > hswriter
virtual bool OnConnectionReturn() XRD_WARN_UNUSED_RESULT
bool OnWriteTimeout() XRD_WARN_UNUSED_RESULT
bool OnWrite() XRD_WARN_UNUSED_RESULT
bool OnTimeoutWhileHandshaking() XRD_WARN_UNUSED_RESULT
bool CheckHSWait() XRD_WARN_UNUSED_RESULT
bool EventRead(uint8_t type) XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSReader > hsreader
bool HandleHandShake(std::unique_ptr< Message > msg) XRD_WARN_UNUSED_RESULT
bool OnWriteWhileHandshaking() XRD_WARN_UNUSED_RESULT
void OnFaultWhileHandshaking(XRootDStatus st)
virtual void Event(uint8_t type, XrdCl::Socket *)
Handle a socket event.
kXR_int32 HandleWaitRsp(Message *rsp)
bool HandShakeNextStep(bool done) XRD_WARN_UNUSED_RESULT
bool OnReadWhileHandshaking() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncMsgWriter > reqwriter
XRootDStatus EnableUplink()
Enable uplink.
std::string GetIpStack() const
Get the IP stack.
bool EventWrite(uint8_t type) XRD_WARN_UNUSED_RESULT
std::string GetHostName()
Get hostname.
std::unique_ptr< AsyncMsgReader > rspreader
XRootDStatus DisableUplink()
Disable uplink.
std::unique_ptr< HandShakeData > pHandShakeData
bool OnRead() XRD_WARN_UNUSED_RESULT
XRootDStatus Connect(time_t timeout)
Connect to the currently set address.
AsyncSocketHandler(const URL &url, Poller *poller, TransportHandler *transport, AnyObject *channelData, uint16_t subStreamNum, Stream *strm)
Constructors.
bool OnTLSHandShake() XRD_WARN_UNUSED_RESULT
bool SendHSMsg() XRD_WARN_UNUSED_RESULT
std::string GetIpAddr()
Get IP address.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info message.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
Interface for socket pollers.
Definition: XrdClPoller.hh:88
virtual void ShutdownEvents(Socket *socket)=0
virtual bool AddSocket(Socket *socket, SocketHandler *handler)=0
virtual bool RemoveSocket(Socket *socket)=0
virtual bool EnableWriteNotification(Socket *socket, bool notify, time_t timeout=60)=0
virtual bool EnableReadNotification(Socket *socket, bool notify, time_t timeout=60)=0
@ ReadTimeOut
Read timeout.
Definition: XrdClPoller.hh:43
@ ReadyToWrite
Writing won't block.
Definition: XrdClPoller.hh:44
@ WriteTimeOut
Write timeout.
Definition: XrdClPoller.hh:45
@ ReadyToRead
New data has arrived.
Definition: XrdClPoller.hh:42
A network socket.
Definition: XrdClSocket.hh:43
std::string GetSockName() const
Get the name of the socket.
Definition: XrdClSocket.cc:632
XRootDStatus ConnectToAddress(const XrdNetAddr &addr, time_t timeout=10)
Definition: XrdClSocket.cc:212
XRootDStatus Initialize(int family=AF_INET)
Initialize the socket.
Definition: XrdClSocket.cc:63
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void SetChannelID(AnyObject *channelID)
Definition: XrdClSocket.hh:246
void Close()
Disconnect.
Definition: XrdClSocket.cc:262
XRootDStatus SetSockOpt(int level, int optname, const void *optval, socklen_t optlen)
Set socket options.
Definition: XrdClSocket.cc:167
XRootDStatus TlsHandShake(AsyncSocketHandler *socketHandler, const std::string &thehost=std::string())
Definition: XrdClSocket.cc:844
uint8_t MapEvent(uint8_t event)
Definition: XrdClSocket.cc:835
const XrdNetAddr * GetServerAddress() const
Get the server address.
Definition: XrdClSocket.hh:237
XRootDStatus Cork()
Definition: XrdClSocket.cc:782
XRootDStatus GetSockOpt(int level, int optname, void *optval, socklen_t *optlen)
Get socket options.
Definition: XrdClSocket.cc:152
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
void SetStatus(SocketStatus status)
Set socket status - do not use unless you know what you're doing.
Definition: XrdClSocket.hh:133
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
std::shared_ptr< Channel > GetChannel()
Definition: XrdClStream.hh:414
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:834
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:947
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:794
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:284
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
Perform the handshake and the authentication for each physical stream.
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)=0
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)=0
The stream has been disconnected, do the cleanups.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)=0
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)=0
Hand-shake.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static const int noPort
Do not add port number.
bool isMapped() const
bool isIPType(IPType ipType) const
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
const char * Name(const char *eName=0, const char **eText=0)
int Family() const
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const int DefaultTCPKeepAliveProbes
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const int DefaultTimeoutResolution
const uint64_t AsyncSockMsg
const int DefaultTCPKeepAliveInterval
const int DefaultTCPKeepAlive
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const int DefaultTCPKeepAliveTime
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
const uint16_t suDone
Definition: XrdClStatus.hh:38
Data structure that carries the handshake information.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148