XRootD
XrdFfsWcache.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* XrdFfsWcache.cc simple write cache that captures consecutive small writes */
3 /* */
4 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
5 /* All Rights Reserved */
6 /* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009) */
7 /* Contract DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 /*
31  When direct_io is not used, the kernel breaks large writes into 4 KByte
32  writes. This significantly reduces the writing performance. This
33  simple cache mechanism is meant to improve the performance of small writes.
34 
35  Note that fuse 2.8.0 pre2 or above and kernel 2.6.27 or above provide
36  a big_writes option to allow > 4 KByte writes. It will make this
37  simple write caching obsolete.
38 */
39 
40 #if defined(__linux__)
41 /* For pread()/pwrite() */
42 #ifndef _XOPEN_SOURCE
43 #define _XOPEN_SOURCE 500
44 #endif
45 #endif
46 
47 #include <cstring>
48 #include <cstdlib>
49 #include <sys/types.h>
50 #include <sys/resource.h>
51 #include <unistd.h>
52 #include <cerrno>
53 
54 #include <pthread.h>
55 
56 #include "XrdFfs/XrdFfsWcache.hh"
57 #ifndef NOXRD
58  #include "XrdFfs/XrdFfsPosix.hh"
59 #endif
60 
61 #ifndef O_DIRECT
62 #define O_DIRECT 0
63 #endif
64 
65 #ifdef __cplusplus
66  extern "C" {
67 #endif
68 
/* Size in bytes of the buffer XrdFfsWcache_create() allocates on its
 * non-read-cache path. XrdFfsWcache_pwrite() does not cache writes that are
 * longer than half of this value. */
70 ssize_t XrdFfsWcacheBufsize = 131072;
71 
/* Per-descriptor cache state (fields of struct XrdFfsWcacheFilebuf).
 * NOTE(review): listing line 72, which opens the struct, is absent from this
 * extract; the type name is taken from the cross-reference index. */
73  off_t offset;   /* file offset that buf[0] corresponds to */
74  size_t len;     /* number of valid bytes currently held in buf */
75  char *buf;      /* malloc()ed in XrdFfsWcache_create(), freed in XrdFfsWcache_destroy() */
76  size_t bufsize; /* block size read by XrdFfsWcache_pread(); presumably set in XrdFfsWcache_create() on lines missing from this extract -- confirm */
77  pthread_mutex_t *mlock; /* heap-allocated mutex taken around pread/pwrite buffer access */
78 };
79 
81 
82 /* #include "xrdposix.h" */
83 
/* Initialize the per-descriptor cache table.
 *
 * basefd: saved in XrdFfsPosix_baseFD; the other functions in this file
 *         subtract it from the descriptor they are given to get a table index
 * maxfd:  saved in XrdFfsWcacheNFILES; number of table entries cleared below
 *
 * Also selects XrdFfsRcacheBufsize: 128 KiB when XRDCL_EC is unset, otherwise
 * the product of the 1st and 3rd comma-separated fields of XRDCL_EC; the
 * value of XROOTDFS_WCACHESZ, when set, replaces either choice.
 */
85 void XrdFfsWcache_init(int basefd, int maxfd)
86 {
87  int fd;
88 /* We are now using virtual file descriptors (from Xrootd Posix interface) in XrdFfsXrootdfs.cc so we need to set
89  * base (lowest) file descriptor, and max number of file descriptors.
90  *
91  struct rlimit rlp;
92 
93  getrlimit(RLIMIT_NOFILE, &rlp);
94  XrdFfsWcacheNFILES = rlp.rlim_cur;
95  XrdFfsWcacheNFILES = (XrdFfsWcacheNFILES == (int)RLIM_INFINITY? 4096 : XrdFfsWcacheNFILES);
96  */
97 
98  XrdFfsPosix_baseFD = basefd;
99  XrdFfsWcacheNFILES = maxfd;
100 
101 /* printf("%d %d\n", XrdFfsWcacheNFILES, sizeof(struct XrdFfsWcacheFilebuf)); */
/* NOTE(review): listing line 102 is absent from this extract. The index lists
 * XrdFfsWcacheFbufs as a pointer, so that line presumably allocates the table
 * the loop below writes to -- confirm against the real file. */
103  for (fd = 0; fd < XrdFfsWcacheNFILES; fd++)
104  {
105  XrdFfsWcacheFbufs[fd].offset = 0;
106  XrdFfsWcacheFbufs[fd].len = 0;
107  XrdFfsWcacheFbufs[fd].buf = NULL;
108  XrdFfsWcacheFbufs[fd].mlock = NULL;
109  }
110  if (!getenv("XRDCL_EC"))
111  {
112  XrdFfsRcacheBufsize = 1024 * 128;
113  }
114  else
115  {
/* Read-cache size = (1st field) * (3rd field) of XRDCL_EC; the 2nd field is
 * skipped.
 * NOTE(review): strtok_r() writes NUL bytes into the string returned by
 * getenv(), i.e. it edits the environment value in place.
 * NOTE(review): strtok_r() returns NULL when XRDCL_EC has fewer than three
 * non-empty fields, and that NULL is passed straight to atoi(). */
116  char *savptr;
117  int nbdat = atoi(strtok_r(getenv("XRDCL_EC"), ",", &savptr));
118  strtok_r(NULL, ",", &savptr);
119  int chsz = atoi(strtok_r(NULL, ",", &savptr));
120  XrdFfsRcacheBufsize = nbdat * chsz;
121  }
/* NOTE(review): the variable is named ...WCACHESZ but the value is stored in
 * the read-cache size, not in XrdFfsWcacheBufsize -- confirm this is intended. */
122  if (getenv("XROOTDFS_WCACHESZ"))
123  XrdFfsRcacheBufsize = atoi(getenv("XROOTDFS_WCACHESZ"));
124 }
125 
126 int XrdFfsWcache_create(int fd, int flags)
127 /* Create a cache buffer and its mutex for a given file descriptor
128  *
129  * fd: file descriptor (XrdFfsPosix_baseFD is subtracted to index the table)
     * flags: open flags; O_RDONLY combined with O_DIRECT selects a buffer of
     *        XrdFfsRcacheBufsize bytes, anything else XrdFfsWcacheBufsize bytes
130  *
131  * returns: 1 - ok
132  * 0 - error, error code in errno (ENOMEM, or the pthread_mutex_init() result)
     *
     * NOTE(review): listing lines 135, 144 and 149 are absent from this
     * extract; 144 and 149 presumably record the chosen size in .bufsize,
     * which XrdFfsWcache_pread() reads -- confirm against the real file.
     * NOTE(review): fd is not range-checked here before indexing the table.
133  */
134 {
136  fd -= XrdFfsPosix_baseFD;
137 
138  XrdFfsWcacheFbufs[fd].offset = 0;
139  XrdFfsWcacheFbufs[fd].len = 0;
140  if ( ((flags & O_ACCMODE) == O_RDONLY) &&
141  (flags & O_DIRECT) ) // Limit the usage scenario of the read cache
142  {
143  XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsRcacheBufsize);
145  }
146  else
147  {
148  XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
150  }
151  if (XrdFfsWcacheFbufs[fd].buf == NULL)
152  {
153  errno = ENOMEM;
154  return 0;
155  }
156  XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
157  if (XrdFfsWcacheFbufs[fd].mlock == NULL)
158  {
159  errno = ENOMEM;
160  return 0;
161  }
/* pthread_mutex_init() returns an error number rather than setting errno, so
 * its result is stored into errno to match the documented contract.
 * NOTE(review): on failure .mlock stays non-NULL but uninitialized, and the
 * destroy code below would pass it to pthread_mutex_destroy(). */
162  errno = pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
163  if (errno)
164  return 0;
165  return 1;
166 }
167 
/* Body of XrdFfsWcache_destroy(int fd): release the buffer and mutex of one
 * table entry and reset the entry to its empty state.
 * NOTE(review): listing line 168 (the signature) is absent from this extract;
 * the name and prototype are taken from the cross-reference index.
 * The flush call is commented out, so nothing in this body writes out bytes
 * still held in the buffer before it is freed. */
169 {
170 /* XrdFfsWcache_flush(fd); */
171  fd -= XrdFfsPosix_baseFD;
172 
173  XrdFfsWcacheFbufs[fd].offset = 0;
174  XrdFfsWcacheFbufs[fd].len = 0;
175  if (XrdFfsWcacheFbufs[fd].buf != NULL)
176  free(XrdFfsWcacheFbufs[fd].buf);
177  XrdFfsWcacheFbufs[fd].buf = NULL;
178  if (XrdFfsWcacheFbufs[fd].mlock != NULL)
179  {
180  pthread_mutex_destroy(XrdFfsWcacheFbufs[fd].mlock);
181  free(XrdFfsWcacheFbufs[fd].mlock);
182  }
/* Pointers are cleared so a repeated destroy on the same entry is harmless. */
183  XrdFfsWcacheFbufs[fd].mlock = NULL;
184 }
185 
/* Write out the bytes buffered for a descriptor.
 *
 * fd: file descriptor (XrdFfsPosix_baseFD is subtracted to index the table)
 *
 * returns: 0 when the entry holds no bytes or has no buffer, otherwise rc.
 *
 * This function does not take the entry's mutex itself.
 */
186 ssize_t XrdFfsWcache_flush(int fd)
187 {
188  ssize_t rc;
189  fd -= XrdFfsPosix_baseFD;
190 
191  if (XrdFfsWcacheFbufs[fd].len == 0 || XrdFfsWcacheFbufs[fd].buf == NULL )
192  return 0;
193 
/* NOTE(review): listing lines 194-195 are absent from this extract, so the
 * statement that assigns rc is not visible; presumably it is the
 * XrdFfsPosix_pwrite() call named in the index -- confirm.
 * The buffer is marked empty for any positive rc. NOTE(review): if rc can be
 * smaller than .len, the unwritten tail would be dropped -- confirm. */
196  if (rc > 0)
197  {
198  XrdFfsWcacheFbufs[fd].offset = 0;
199  XrdFfsWcacheFbufs[fd].len = 0;
200  }
201  return rc;
202 }
203 
204 /*
205 struct fd_n_offset {
206  int fd;
207  off_t offset;
208  fd_n_offset(int myfd, off_t myoffset) : fd(myfd), offset(myoffset) {}
209 };
210 
211 void *XrdFfsWcache_updateReadCache(void *x)
212 {
213  struct fd_n_offset *a = (struct fd_n_offset*) x;
214  size_t bufsize = XrdFfsWcacheFbufs[a->fd].bufsize;
215 
216  pthread_mutex_lock(XrdFfsWcacheFbufs[a->fd].mlock);
217  XrdFfsWcacheFbufs[a->fd].offset = (a->offset / bufsize) * bufsize;
218  XrdFfsWcacheFbufs[a->fd].len = XrdFfsPosix_pread(a->fd + XrdFfsPosix_baseFD,
219  XrdFfsWcacheFbufs[a->fd].buf,
220  bufsize,
221  XrdFfsWcacheFbufs[a->fd].offset);
222  pthread_mutex_unlock(XrdFfsWcacheFbufs[a->fd].mlock);
223  return NULL;
224 }
225 */
226 
227 // this is a read cache
/* Serve a read through the per-descriptor block buffer.
 *
 * fd:     descriptor; XrdFfsPosix_baseFD is subtracted to index the table
 * buf:    destination for the data
 * len:    number of bytes requested
 * offset: file offset to read from
 *
 * returns: rc, or -1 with errno = EBADF when fd is below XrdFfsPosix_baseFD.
 *          On the cached path rc is the byte count handed to memcpy().
 *
 * NOTE(review): listing lines 245, 247-249, 251, 257, 265 and 271 are absent
 * from this extract, so the refill condition, the refill call, the cache-hit
 * bound and the uncached fallback are not visible here; the statements below
 * are incomplete as shown.
 * NOTE(review): unlike XrdFfsWcache_pwrite(), no visible check compares fd
 * with XrdFfsWcacheNFILES before the table is indexed -- confirm.
 */
228 ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
229 {
230  ssize_t rc;
231  fd -= XrdFfsPosix_baseFD;
232  if (fd < 0)
233  {
234  errno = EBADF;
235  return -1;
236  }
237 
238  char *bufptr;
// bufsize is read before the mutex is taken.
239  size_t bufsize = XrdFfsWcacheFbufs[fd].bufsize;
240 
241  pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
242 
243  // identify which block to cache
244  if (XrdFfsWcacheFbufs[fd].len == 0 ||
246  {
250  bufsize,
252  } // when XrdFfsWcacheFbufs[fd].len < bufsize, the block is partially cached.
253 
254 
255  // fetch data from the cache, up to the block's upper boundary.
256  if (XrdFfsWcacheFbufs[fd].offset <= offset &&
258  { // read from cache,
259 //----------------------------------------------------------
260 // FUSE doesn't like this block of the code, unless direct_io is enabled, or
261 // O_DIRECT flags is used. Otherwise, FUSE will stop reading prematurely
262 // when two processes read the same file at the same time.
263  bufptr = &XrdFfsWcacheFbufs[fd].buf[offset - XrdFfsWcacheFbufs[fd].offset];
264  rc = (len < XrdFfsWcacheFbufs[fd].len - (offset - XrdFfsWcacheFbufs[fd].offset))?
266  memcpy(buf, bufptr, rc);
267 //----------------------------------------------------------
268  }
269  else
270  { // offset falls into the uncached part of the partially cached block
272  }
273  pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
274 /*
275  // prefetch the next block
276  if ( (offset + rc) ==
277  (XrdFfsWcacheFbufs[fd].offset + bufsize) )
278  {
279  pthread_t thread;
280  pthread_attr_t attr;
281  //size_t stacksize = 4*1024*1024;
282 
283  pthread_attr_init(&attr);
284  //pthread_attr_setstacksize(&attr, stacksize);
285  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
286 
287  struct fd_n_offset nextblock(fd, (offset + bufsize));
288  if (! pthread_create(&thread, &attr, XrdFfsWcache_updateReadCache, &nextblock))
289  pthread_detach(thread);
290  pthread_attr_destroy(&attr);
291  }
292 */
293  return rc;
294 }
295 
/* Buffer a small write so that consecutive small writes can be sent together.
 *
 * fd:     descriptor; XrdFfsPosix_baseFD is subtracted to index the table
 * buf:    data to write
 * len:    number of bytes in buf
 * offset: file offset the data belongs at
 *
 * returns: len once the data has been copied into the buffer;
 *          -1 with errno = EBADF when fd is below XrdFfsPosix_baseFD;
 *          -1 with errno = ENOSPC when rc is negative after the flush check;
 *          rc on the uncached path.
 *
 * NOTE(review): listing lines 310, 322-323 and 336 are absent from this
 * extract, so the uncached write, the second half of the flush condition with
 * its action, and the statement guarded by the final if are not visible; the
 * statements below are incomplete as shown.
 */
296 ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
297 {
298  ssize_t rc;
299  char *bufptr;
300  fd -= XrdFfsPosix_baseFD;
301  if (fd < 0)
302  {
303  errno = EBADF;
304  return -1;
305  }
306 
307 /* do not use caching under these cases */
/* i.e. the write is larger than half the buffer, or fd is past the table. */
308  if (len > (size_t)(XrdFfsWcacheBufsize/2) || fd >= XrdFfsWcacheNFILES)
309  {
311  return rc;
312  }
313 
314  pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
315  rc = XrdFfsWcacheFbufs[fd].len;
316 /*
317  in the following two cases, a XrdFfsWcache_flush is required:
318  1. current offset isn't pointing to the tail of data in buffer
319  2. adding new data will exceed the current buffer
320 */
321  if (offset != (off_t)(XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheFbufs[fd].len) ||
324 
325  errno = 0;
326  if (rc < 0)
327  {
328  errno = ENOSPC;
329  pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
330  return -1;
331  }
332 
/* Append the new bytes after the ones already buffered. */
333  bufptr = &XrdFfsWcacheFbufs[fd].buf[XrdFfsWcacheFbufs[fd].len];
334  memcpy(bufptr, buf, len);
/* NOTE(review): listing line 336 is missing after this if; presumably it
 * records offset as the start of a freshly emptied buffer -- confirm. */
335  if (XrdFfsWcacheFbufs[fd].len == 0)
337  XrdFfsWcacheFbufs[fd].len += len;
338 
339  pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
340  return (ssize_t)len;
341 }
342 
343 #ifdef __cplusplus
344  }
345 #endif
ssize_t XrdFfsPosix_pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
Definition: XrdFfsPosix.cc:152
ssize_t XrdFfsPosix_pread(int fildes, void *buf, size_t nbyte, off_t offset)
Definition: XrdFfsPosix.cc:142
void XrdFfsWcache_init(int basefd, int maxfd)
Definition: XrdFfsWcache.cc:85
void XrdFfsWcache_destroy(int fd)
int XrdFfsWcacheNFILES
Definition: XrdFfsWcache.cc:84
ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsWcacheBufsize
Definition: XrdFfsWcache.cc:70
#define O_DIRECT
Definition: XrdFfsWcache.cc:62
pthread_mutex_t * mlock
Definition: XrdFfsWcache.cc:77
ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsRcacheBufsize
Definition: XrdFfsWcache.cc:69
int XrdFfsPosix_baseFD
Definition: XrdFfsWcache.cc:84
ssize_t XrdFfsWcache_flush(int fd)
struct XrdFfsWcacheFilebuf * XrdFfsWcacheFbufs
Definition: XrdFfsWcache.cc:80
int XrdFfsWcache_create(int fd, int flags)