45 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
51 const char *File::m_traceID =
"File";
55 File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
59 m_cfi(
Cache::GetInstance().GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
62 m_file_size(iFileSize),
63 m_current_io(m_io_set.end()),
67 m_detach_time_logged(false),
73 m_prefetch_state(kOff),
74 m_prefetch_read_cnt(0),
75 m_prefetch_hit_cnt(0),
86 m_info_file =
nullptr;
94 m_data_file =
nullptr;
97 if (m_resmon_token >= 0)
103 TRACEF(
Debug,
"~File() ended, prefetch score = " << m_prefetch_score);
110 File *file =
new File(path, offset, fileSize);
138 m_in_shutdown =
true;
140 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
142 m_prefetch_state = kStopped;
143 cache()->DeRegisterPrefetchFile(
this);
150 void File::check_delta_stats()
154 if (m_delta_stats.
BytesRead() >= m_resmon_report_threshold)
155 report_and_merge_delta_stats();
158 void File::report_and_merge_delta_stats()
162 m_data_file->
Fstat(&s);
164 m_st_blocks = s.st_blocks;
166 m_stats.
AddUp(m_delta_stats);
167 m_delta_stats.
Reset();
174 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
182 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
186 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
198 insert_remote_location(loc);
214 IoSet_i mi = m_io_set.find(io);
216 if (mi != m_io_set.end())
221 ", active_reads " << n_active_reads <<
222 ", active_prefetches " << io->m_active_prefetches <<
223 ", allow_prefetching " << io->m_allow_prefetching <<
224 ", ios_in_detach " << m_ios_in_detach);
226 "\tio_map.size() " << m_io_set.size() <<
227 ", block_map.size() " << m_block_map.size() <<
", file");
229 insert_remote_location(loc);
231 io->m_allow_prefetching =
false;
232 io->m_in_detach =
true;
235 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
237 if ( ! select_current_io_or_disable_prefetching(
false) )
239 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
246 bool io_active_result;
248 if (n_active_reads > 0)
250 io_active_result =
true;
252 else if (m_io_set.size() - m_ios_in_detach == 1)
254 io_active_result = ! m_block_map.empty();
258 io_active_result = io->m_active_prefetches > 0;
261 if ( ! io_active_result)
266 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
268 return io_active_result;
272 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
283 m_detach_time_logged =
false;
292 if ( ! m_in_shutdown)
294 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
296 report_and_merge_delta_stats();
298 m_detach_time_logged =
true;
300 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
304 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
316 time_t now = time(0);
321 IoSet_i mi = m_io_set.find(io);
323 if (mi == m_io_set.end())
326 io->m_attach_time = now;
329 insert_remote_location(loc);
331 if (m_prefetch_state == kStopped)
333 m_prefetch_state = kOn;
334 cache()->RegisterPrefetchFile(
this);
339 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
353 time_t now = time(0);
357 IoSet_i mi = m_io_set.find(io);
359 if (mi != m_io_set.end())
361 if (mi == m_current_io)
366 m_delta_stats.
IoDetach(now - io->m_attach_time);
370 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
372 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
373 m_prefetch_state = kStopped;
374 cache()->DeRegisterPrefetchFile(
this);
379 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
391 static const char *tpfx =
"Open() ";
393 TRACEF(Dump, tpfx <<
"entered");
404 struct stat data_stat, info_stat;
408 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
409 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
412 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
413 myEnv.
Put(
"oss.asize", size_str);
425 m_data_file = myOss.
newFile(myUser);
426 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
430 delete m_data_file; m_data_file = 0;
434 myEnv.
Put(
"oss.asize",
"64k");
440 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
444 m_info_file = myOss.
newFile(myUser);
445 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
449 delete m_info_file; m_info_file = 0;
450 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
454 bool initialize_info_file =
true;
456 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
458 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
459 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
465 initialize_info_file =
false;
467 TRACEF(
Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
479 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
480 initialize_info_file =
true;
490 if (initialize_info_file)
495 m_cfi.
Write(m_info_file, ifn.c_str());
496 m_info_file->
Fsync();
497 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
501 if (futimens(m_info_file->
getFD(), NULL)) {
510 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
512 m_data_file->
Fstat(&data_stat);
513 m_st_blocks = data_stat.st_blocks;
516 m_resmon_report_threshold = std::min(std::max(200ll * 1024, m_file_size / 50), 500ll * 1024 * 1024);
530 bool File::overlap(
int blk,
539 const long long beg = blk * blk_size;
540 const long long end = beg + blk_size;
541 const long long req_end = req_off + req_size;
543 if (req_off < end && req_end > beg)
545 const long long ovlp_beg = std::max(beg, req_off);
546 const long long ovlp_end = std::min(end, req_end);
548 off = ovlp_beg - req_off;
549 blk_off = ovlp_beg - beg;
550 size = (int) (ovlp_end - ovlp_beg);
552 assert(size <= blk_size);
563 Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
571 const long long off = i * m_block_size;
572 const int last_block = m_num_blocks - 1;
573 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
575 int blk_size, req_size;
576 if (i == last_block) {
577 blk_size = req_size = m_file_size - off;
578 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
580 blk_size = req_size = m_block_size;
584 char *buf = cache()->RequestRAM(req_size);
588 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
598 m_prefetch_state = kHold;
599 cache()->DeRegisterPrefetchFile(
this);
604 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
611 void File::ProcessBlockRequest(
Block *b)
619 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
621 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
637 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
639 ProcessBlockRequest(*bi);
645 void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
647 int n_chunks = ioVec.size();
650 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
651 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
661 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
666 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
668 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
670 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
674 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
678 if (rs != expected_size)
680 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
699 if (m_in_shutdown || io->m_in_detach)
702 return m_in_shutdown ? -ENOENT : -EBADF;
710 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
719 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
721 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
728 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
732 if (m_in_shutdown || io->m_in_detach)
735 return m_in_shutdown ? -ENOENT : -EBADF;
752 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
757 int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
769 int prefetch_cnt = 0;
774 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
776 std::vector<XrdOucIOVec> iovec_disk;
777 std::vector<XrdOucIOVec> iovec_direct;
778 int iovec_disk_total = 0;
779 int iovec_direct_total = 0;
781 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
788 const int idx_first = iUserOff / m_block_size;
789 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
791 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
793 enum LastBlock_e { LB_other, LB_disk, LB_direct };
795 LastBlock_e lbe = LB_other;
797 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
800 BlockMap_i bi = m_block_map.find(block_idx);
807 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
810 if (bi != m_block_map.end())
812 inc_ref_count(bi->second);
813 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
815 if (bi->second->is_finished())
819 assert(bi->second->is_ok());
821 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
823 if (bi->second->m_prefetch)
834 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
843 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
846 iovec_disk.back().size += size;
848 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
849 iovec_disk_total += size;
863 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
866 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
868 blks_to_request.push_back(b);
877 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
879 iovec_direct_total += size;
886 iovec_direct.back().size += size;
888 long long in_offset = block_idx * m_block_size + blk_off;
889 char *out_pos = iUserBuff + off;
896 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
905 inc_prefetch_hit_cnt(prefetch_cnt);
910 if ( ! blks_to_request.empty())
912 ProcessBlockRequests(blks_to_request);
913 blks_to_request.clear();
917 if ( ! iovec_direct.empty())
919 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
921 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
926 long long bytes_read = 0;
930 if ( ! blks_ready.empty())
932 for (
auto &bvi : blks_ready)
934 for (
auto &cr : bvi.second)
936 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
937 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
938 bytes_read += cr.m_size;
944 if ( ! iovec_disk.empty())
946 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
947 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
964 for (
auto &bvi : blks_ready)
965 dec_ref_count(bvi.first, (
int) bvi.second.size());
999 return error_cond ? error_cond : bytes_read;
1011 long long offset = b->
m_offset - m_offset;
1031 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1041 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1044 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1046 bool schedule_sync =
false;
1067 m_writes_during_sync.push_back(blk_idx);
1072 ++m_non_flushed_cnt;
1076 schedule_sync =
true;
1078 m_non_flushed_cnt = 0;
1085 cache()->ScheduleFileSync(
this);
1095 int ret = m_data_file->
Fsync();
1096 bool errorp =
false;
1102 report_and_merge_delta_stats();
1103 loc_stats = m_stats;
1106 m_cfi.
Write(m_info_file, m_filename.c_str());
1107 int cret = m_info_file->
Fsync();
1110 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1116 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1122 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1129 m_writes_during_sync.clear();
1135 int written_while_in_sync;
1136 bool resync =
false;
1139 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1143 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1144 m_writes_during_sync.clear();
1148 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1153 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1164 void File::free_block(
Block* b)
1167 int i = b->
m_offset / m_block_size;
1168 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1169 size_t ret = m_block_map.erase(i);
1173 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1183 m_prefetch_state = kOn;
1184 cache()->RegisterPrefetchFile(
this);
1190 bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1194 int io_size = (int) m_io_set.size();
1199 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1202 m_current_io = m_io_set.begin();
1205 else if (io_size > 1)
1207 IoSet_i mi = m_current_io;
1208 if (skip_current && mi != m_io_set.end()) ++mi;
1210 for (
int i = 0; i < io_size; ++i)
1212 if (mi == m_io_set.end()) mi = m_io_set.begin();
1214 if ((*mi)->m_allow_prefetching)
1226 m_current_io = m_io_set.end();
1227 m_prefetch_state = kStopped;
1228 cache()->DeRegisterPrefetchFile(
this);
1236 void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1242 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1244 m_state_cond.
Lock();
1260 FinalizeReadRequest(rreq);
1287 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1290 m_state_cond.
Lock();
1295 rreq->m_stats.m_BytesMissed += creq.
m_size;
1297 rreq->m_stats.m_BytesHit += creq.
m_size;
1299 --rreq->m_n_chunk_reqs;
1302 inc_prefetch_hit_cnt(1);
1306 bool rreq_complete = rreq->is_complete();
1311 FinalizeReadRequest(rreq);
1321 check_delta_stats();
1328 void File::ProcessBlockResponse(
Block *b,
int res)
1330 static const char* tpfx =
"ProcessBlockResponse ";
1332 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1334 if (res >= 0 && res != b->
get_size())
1338 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1342 m_state_cond.
Lock();
1348 IoSet_i mi = m_io_set.find(io);
1349 if (mi != m_io_set.end())
1351 --io->m_active_prefetches;
1354 if (res < 0 && io->m_allow_prefetching)
1356 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1357 io->m_allow_prefetching =
false;
1360 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1362 if ( ! select_current_io_or_disable_prefetching(
false) )
1364 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1370 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1386 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1387 if ( ! m_in_shutdown)
1393 cache()->AddWriteTask(b,
true);
1402 for (
auto &creq : creqs_to_notify)
1404 ProcessBlockSuccess(b, creq);
1413 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1424 std::list<ReadRequest*> rreqs_to_complete;
1433 ProcessBlockError(b, rreq);
1436 rreqs_to_complete.push_back(rreq);
1441 creqs_to_keep.push_back(creq);
1445 bool reissue =
false;
1446 if ( ! creqs_to_keep.empty())
1448 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1450 TRACEF(
Info,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1451 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1460 for (
auto rreq : rreqs_to_complete)
1461 FinalizeReadRequest(rreq);
1464 ProcessBlockRequest(b);
1472 return m_filename.c_str();
1477 int File::offsetIdx(
int iIdx)
const
1479 return iIdx - m_offset/m_block_size;
1493 TRACEF(DumpXL,
"Prefetch() entering.");
1497 if (m_prefetch_state != kOn)
1502 if ( ! select_current_io_or_disable_prefetching(
true) )
1504 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1509 for (
int f = 0; f < m_num_blocks; ++f)
1513 int f_act = f + m_offset / m_block_size;
1515 BlockMap_i bi = m_block_map.find(f_act);
1516 if (bi == m_block_map.end())
1518 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1521 TRACEF(Dump,
"Prefetch take block " << f_act);
1525 inc_prefetch_read_cnt(1);
1530 TRACEF(
Warning,
"Prefetch allocation failed for block " << f_act);
1539 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1540 m_prefetch_state = kComplete;
1541 cache()->DeRegisterPrefetchFile(
this);
1545 (*m_current_io)->m_active_prefetches += (int) blks.size();
1549 if ( ! blks.empty())
1551 ProcessBlockRequests(blks);
1560 return m_prefetch_score;
1573 void File::insert_remote_location(
const std::string &loc)
1577 size_t p = loc.find_first_of(
'@');
1578 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1585 if ( ! m_remote_locations.empty())
1589 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1593 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1596 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1598 s +=
'"'; s += *i; s +=
'"';
1599 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
int stat(const char *path, struct stat *buf)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Fstat(struct stat *buf)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
void * get_req_id() const
long long get_offset() const
vChunkRequest_t m_chunk_reqs
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
const std::string & GetLocalPath() const
bool ioActive(IO *io)
Initiate close. Return true if IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
XrdOucCacheIO * GetInput()
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long BytesRead() const
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
number of unsynced blocks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)