30 #include "XrdOuc/XrdOucJson.hh"
39 #include <unordered_map>
// Integer constant fixed at 4. Presumably the upper bound on attempts to write
// a single block to disk -- no use of it is visible in this excerpt, so confirm
// at the use site before relying on that reading.
48 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
// Out-of-class definition of the static string File::m_traceID, set to "File".
// NOTE(review): looks like the tag consumed by the TRACEF() calls in this file
// -- confirm against the macro definition.
54 const char *File::m_traceID =
"File";
58 File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
62 m_cfi(
Cache::TheOne().GetTrace(),
Cache::TheOne().is_prefetch_enabled()),
65 m_file_size(iFileSize),
66 m_current_io(m_io_set.end()),
70 m_detach_time_logged(false),
76 m_prefetch_state(kOff),
78 m_prefetch_read_cnt(0),
79 m_prefetch_hit_cnt(0),
106 m_info_file->
Close();
108 m_info_file =
nullptr;
114 m_data_file->
Close();
116 m_data_file =
nullptr;
119 if (m_resmon_token >= 0)
127 if (sr == 0 && s.st_blocks != m_st_blocks) {
130 m_st_blocks = s.st_blocks;
138 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
145 File *file =
new File(path, offset, fileSize);
146 if ( ! file->Open(inputIO))
171 m_in_shutdown =
true;
173 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
175 m_prefetch_state = kStopped;
176 cache()->DeRegisterPrefetchFile(
this);
179 report_and_merge_delta_stats();
186 void File::check_delta_stats()
191 report_and_merge_delta_stats();
194 void File::report_and_merge_delta_stats()
198 m_data_file->
Fstat(&s);
201 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
203 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
205 m_st_blocks = st_blocks_to_report;
207 m_stats.
AddUp(m_delta_stats);
208 m_delta_stats.
Reset();
215 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
223 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
227 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
239 insert_remote_location(loc);
255 IoSet_i mi = m_io_set.find(io);
257 if (mi != m_io_set.end())
262 ", active_reads " << n_active_reads <<
263 ", active_prefetches " << io->m_active_prefetches <<
264 ", allow_prefetching " << io->m_allow_prefetching <<
265 ", ios_in_detach " << m_ios_in_detach);
267 "\tio_map.size() " << m_io_set.size() <<
268 ", block_map.size() " << m_block_map.size() <<
", file");
270 insert_remote_location(loc);
272 io->m_allow_prefetching =
false;
273 io->m_in_detach =
true;
276 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
278 if ( ! select_current_io_or_disable_prefetching(
false) )
280 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
287 bool io_active_result;
289 if (n_active_reads > 0)
291 io_active_result =
true;
293 else if (m_io_set.size() - m_ios_in_detach == 1)
295 io_active_result = ! m_block_map.empty();
299 io_active_result = io->m_active_prefetches > 0;
302 if ( ! io_active_result)
307 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
309 return io_active_result;
313 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
324 m_detach_time_logged =
false;
333 if ( ! m_in_shutdown)
335 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
337 report_and_merge_delta_stats();
339 m_detach_time_logged =
true;
341 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
345 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
357 time_t now = time(0);
362 IoSet_i mi = m_io_set.find(io);
364 if (mi == m_io_set.end())
367 io->m_attach_time = now;
370 insert_remote_location(loc);
372 if (m_prefetch_state == kStopped)
374 m_prefetch_state = kOn;
375 cache()->RegisterPrefetchFile(
this);
380 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
394 time_t now = time(0);
398 IoSet_i mi = m_io_set.find(io);
400 if (mi != m_io_set.end())
402 if (mi == m_current_io)
407 m_delta_stats.
IoDetach(now - io->m_attach_time);
411 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
413 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
414 m_prefetch_state = kStopped;
415 cache()->DeRegisterPrefetchFile(
this);
420 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
432 static const char *tpfx =
"Open() ";
434 TRACEF(Dump, tpfx <<
"entered");
445 struct stat data_stat, info_stat;
449 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
450 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
453 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
454 myEnv.
Put(
"oss.asize", size_str);
466 m_data_file = myOss.
newFile(myUser);
467 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
471 delete m_data_file; m_data_file = 0;
475 myEnv.
Put(
"oss.asize",
"64k");
481 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
485 m_info_file = myOss.
newFile(myUser);
486 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
490 delete m_info_file; m_info_file = 0;
491 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
495 bool initialize_info_file =
true;
497 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
499 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
500 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
507 initialize_info_file =
false;
509 TRACEF(
Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
523 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
524 initialize_info_file =
true;
540 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
543 if (initialize_info_file)
548 m_cfi.
Write(m_info_file, ifn.c_str());
549 m_info_file->
Fsync();
550 cache()->WriteFileSizeXAttr(m_info_file->
getFD(), m_file_size);
552 if (cache()->RefConfiguration().m_httpcc)
554 std::string responseFctl;
555 int resFctl = inputIO->
Fcntl(XrdOucCacheOp::Code::QFinfo,
"head", responseFctl);
558 std::string cc_str = responseFctl;
560 if (cc_json.contains(
"max-age"))
562 time_t ma = cc_json[
"max-age"];
564 cc_json[
"expire"] = ma;
565 cc_str = cc_json.dump();
567 TRACE(
Error,
"GetFile() XrdCl::File::Fcntl value " << cc_str);
568 cache()->WriteCacheControlXAttr(m_info_file->
getFD(),
nullptr, cc_str);
572 TRACE(
Error,
"GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::FInfo failed " << inputIO->
Path());
576 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
578 " block size = " << pfc_blocksize);
582 if (futimens(m_info_file->
getFD(), NULL)) {
586 TRACEF(
Info, tpfx <<
"URL CGI pfc.blocksize ignored for an already existing file");
594 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
595 m_prefetch_max_blocks_in_flight = pfc_prefetch;
597 TRACEF(
Debug, tpfx <<
"pfc.prefetch set to " << pfc_prefetch <<
" via CGI parameter");
599 m_data_file->
Fstat(&data_stat);
600 m_st_blocks = data_stat.st_blocks;
603 constexpr
long long MB = 1024 * 1024;
604 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
613 void File::parse_pfc_url_args(
XrdOucCacheIO* inputIO,
long long &pfc_blocksize,
int &pfc_prefetch)
const
618 auto const & urlp = url.GetParams();
620 auto extract = [&](
const std::string &key, std::string &value) ->
bool {
621 auto it = urlp.find(key);
622 if (it != urlp.end()) {
634 const char *tpfx =
"File::Open::urlcgi pfc.blocksize ";
639 pfc_blocksize = bsize;
641 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
646 const char *tpfx =
"File::Open::urlcgi pfc.prefetch ";
648 if (
Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
653 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
671 if ((res = m_data_file->
Fstat(&sbuff)))
return res;
673 sbuff.st_size = m_file_size;
675 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
686 bool File::overlap(
int blk,
695 const long long beg = blk * blk_size;
696 const long long end = beg + blk_size;
697 const long long req_end = req_off + req_size;
699 if (req_off < end && req_end > beg)
701 const long long ovlp_beg = std::max(beg, req_off);
702 const long long ovlp_end = std::min(end, req_end);
704 off = ovlp_beg - req_off;
705 blk_off = ovlp_beg - beg;
706 size = (int) (ovlp_end - ovlp_beg);
708 assert(size <= blk_size);
719 Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
727 const long long off = i * m_block_size;
728 const int last_block = m_num_blocks - 1;
729 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
731 int blk_size, req_size;
732 if (i == last_block) {
733 blk_size = req_size = m_file_size - off;
734 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
736 blk_size = req_size = m_block_size;
740 char *buf = cache()->RequestRAM(req_size);
744 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
752 if (m_prefetch_state == kOn && (
int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
754 m_prefetch_state = kHold;
755 cache()->DeRegisterPrefetchFile(
this);
760 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
767 void File::ProcessBlockRequest(
Block *b)
775 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
776 b->
get_offset()/m_block_size, (
void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (
void*)b->get_buff(), (
void*)brh);
777 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
793 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
795 ProcessBlockRequest(*bi);
801 void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
803 int n_chunks = ioVec.size();
806 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
807 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
817 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
822 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
824 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
826 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
830 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
834 if (rs != expected_size)
836 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
855 if (m_in_shutdown || io->m_in_detach)
858 return m_in_shutdown ? -ENOENT : -EBADF;
866 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
875 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
877 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
884 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
888 if (m_in_shutdown || io->m_in_detach)
891 return m_in_shutdown ? -ENOENT : -EBADF;
908 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
913 int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
925 int prefetch_cnt = 0;
930 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
932 std::vector<XrdOucIOVec> iovec_disk;
933 std::vector<XrdOucIOVec> iovec_direct;
934 int iovec_disk_total = 0;
935 int iovec_direct_total = 0;
937 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
944 const int idx_first = iUserOff / m_block_size;
945 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
947 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
949 enum LastBlock_e { LB_other, LB_disk, LB_direct };
951 LastBlock_e lbe = LB_other;
953 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
956 BlockMap_i bi = m_block_map.find(block_idx);
963 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
966 if (bi != m_block_map.end())
968 inc_ref_count(bi->second);
969 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
971 if (bi->second->is_finished())
975 assert(bi->second->is_ok());
977 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
979 if (bi->second->m_prefetch)
990 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
999 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
1002 iovec_disk.back().size += size;
1004 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
1005 iovec_disk_total += size;
1019 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
1022 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
1024 blks_to_request.push_back(b);
1033 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
1035 iovec_direct_total += size;
1042 iovec_direct.back().size += size;
1044 long long in_offset = block_idx * m_block_size + blk_off;
1045 char *out_pos = iUserBuff + off;
1052 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1061 inc_prefetch_hit_cnt(prefetch_cnt);
1066 if ( ! blks_to_request.empty())
1068 ProcessBlockRequests(blks_to_request);
1069 blks_to_request.clear();
1073 if ( ! iovec_direct.empty())
1075 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1077 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
1082 long long bytes_read = 0;
1086 if ( ! blks_ready.empty())
1088 for (
auto &bvi : blks_ready)
1090 for (
auto &cr : bvi.second)
1092 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
1093 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1094 bytes_read += cr.m_size;
1100 if ( ! iovec_disk.empty())
1102 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1103 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1118 m_state_cond.
Lock();
1120 for (
auto &bvi : blks_ready)
1121 dec_ref_count(bvi.first, (
int) bvi.second.size());
1135 check_delta_stats();
1145 return -EWOULDBLOCK;
1151 check_delta_stats();
1156 return error_cond ? error_cond : bytes_read;
1168 long long offset = b->
m_offset - m_offset;
1183 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1185 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1195 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1198 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1200 bool schedule_sync =
false;
1219 m_writes_during_sync.push_back(blk_idx);
1224 ++m_non_flushed_cnt;
1228 schedule_sync =
true;
1230 m_non_flushed_cnt = 0;
1236 if (!schedule_sync) {
1243 cache()->ScheduleFileSync(
this);
1255 int ret = m_data_file->
Fsync();
1256 bool errorp =
false;
1262 report_and_merge_delta_stats();
1263 loc_stats = m_stats;
1266 m_cfi.
Write(m_info_file, m_filename.c_str());
1267 int cret = m_info_file->
Fsync();
1270 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1276 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1282 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1289 m_writes_during_sync.clear();
1295 int written_while_in_sync;
1296 bool resync =
false;
1299 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1303 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1304 m_writes_during_sync.clear();
1308 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1313 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1324 void File::free_block(
Block* b)
1327 int i = b->
m_offset / m_block_size;
1328 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1329 size_t ret = m_block_map.erase(i);
1333 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1341 if (m_prefetch_state == kHold && (
int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1343 m_prefetch_state = kOn;
1344 cache()->RegisterPrefetchFile(
this);
1350 bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1354 int io_size = (int) m_io_set.size();
1359 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1362 m_current_io = m_io_set.begin();
1365 else if (io_size > 1)
1367 IoSet_i mi = m_current_io;
1368 if (skip_current && mi != m_io_set.end()) ++mi;
1370 for (
int i = 0; i < io_size; ++i)
1372 if (mi == m_io_set.end()) mi = m_io_set.begin();
1374 if ((*mi)->m_allow_prefetching)
1386 m_current_io = m_io_set.end();
1387 m_prefetch_state = kStopped;
1388 cache()->DeRegisterPrefetchFile(
this);
1396 void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1402 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1404 m_state_cond.
Lock();
1420 FinalizeReadRequest(rreq);
1447 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1450 m_state_cond.
Lock();
1455 rreq->m_stats.m_BytesMissed += creq.
m_size;
1457 rreq->m_stats.m_BytesHit += creq.
m_size;
1459 --rreq->m_n_chunk_reqs;
1462 inc_prefetch_hit_cnt(1);
1466 bool rreq_complete = rreq->is_complete();
1471 FinalizeReadRequest(rreq);
1481 check_delta_stats();
1488 void File::ProcessBlockResponse(
Block *b,
int res)
1490 static const char* tpfx =
"ProcessBlockResponse ";
1492 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1494 if (res >= 0 && res != b->
get_size())
1498 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1502 m_state_cond.
Lock();
1508 IoSet_i mi = m_io_set.find(io);
1509 if (mi != m_io_set.end())
1511 --io->m_active_prefetches;
1514 if (res < 0 && io->m_allow_prefetching)
1516 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1517 io->m_allow_prefetching =
false;
1520 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1522 if ( ! select_current_io_or_disable_prefetching(
false) )
1524 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1530 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1547 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1548 if ( ! m_in_shutdown)
1554 cache()->AddWriteTask(b,
true);
1563 for (
auto &creq : creqs_to_notify)
1565 ProcessBlockSuccess(b, creq);
1574 <<
", io=" << b->
get_io() <<
", error=" << res);
1579 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1580 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1591 std::list<ReadRequest*> rreqs_to_complete;
1600 ProcessBlockError(b, rreq);
1603 rreqs_to_complete.push_back(rreq);
1608 creqs_to_keep.push_back(creq);
1612 bool reissue =
false;
1613 if ( ! creqs_to_keep.empty())
1615 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1617 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1618 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1627 for (
auto rreq : rreqs_to_complete)
1628 FinalizeReadRequest(rreq);
1631 ProcessBlockRequest(b);
1639 return m_filename.c_str();
1644 int File::offsetIdx(
int iIdx)
const
1646 return iIdx - m_offset/m_block_size;
1660 TRACEF(DumpXL,
"Prefetch() entering.");
1664 if (m_prefetch_state != kOn)
1669 if ( ! select_current_io_or_disable_prefetching(
true) )
1671 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1676 for (
int f = 0; f < m_num_blocks; ++f)
1680 int f_act = f + m_offset / m_block_size;
1682 BlockMap_i bi = m_block_map.find(f_act);
1683 if (bi == m_block_map.end())
1685 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1688 TRACEF(Dump,
"Prefetch take block " << f_act);
1692 inc_prefetch_read_cnt(1);
1697 TRACEF(
Warning,
"Prefetch allocation failed for block " << f_act);
1706 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1707 m_prefetch_state = kComplete;
1708 cache()->DeRegisterPrefetchFile(
this);
1712 (*m_current_io)->m_active_prefetches += (int) blks.size();
1716 if ( ! blks.empty())
1718 ProcessBlockRequests(blks);
1727 return m_prefetch_score;
1740 void File::insert_remote_location(
const std::string &loc)
1744 size_t p = loc.find_first_of(
'@');
1745 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1752 if ( ! m_remote_locations.empty())
1756 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1760 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1763 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1765 s +=
'"'; s += *i; s +=
'"';
1766 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Fstat(struct stat *buf)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int Fcntl(XrdOucCacheOp::Code opc, const std::string &args, std::string &resp)
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
void * get_req_id() const
long long get_offset() const
vChunkRequest_t m_chunk_reqs
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
bool blocksize_str2value(const char *from, const char *str, long long &val, long long min, long long max) const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
XrdSysTrace * GetTrace() const
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
static const Cache & TheOne()
XrdSysError * GetLog() const
void Done(int result) override
const char * lPath() const
Log path.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
friend class BlockResponseHandler
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
bool ioActive(IO *io)
Initiate close. Returns true if IO is still active. Used in XrdPosixXrootd::Close()
void Sync()
Sync file cache info and output data with disk.
float GetPrefetchScore() const
int Fstat(struct stat &sbuff)
friend class DirectResponseHandler
void WriteBlockToDisk(Block *b)
long long initiate_emergency_shutdown()
XrdSysError * GetLog() const
std::string GetRemoteLocations() const
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysTrace * GetTrace() const
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
void AddWriteStats(long long bytes_written, int n_cks_errs)
long long BytesReadAndWritten() const
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
long long m_BytesWritten
number of bytes written to disk
void IoDetach(int duration)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, time_t timeout=0)
Factory for creating ReadImpl objects.
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
number of unsynced blocks on disk before flush is called
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
cache block size, default 128 kB
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
std::string m_meta_space
oss space for metadata files (cinfo)
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
std::string m_username
username passed to oss plugin
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
void update_error_cond(int ec)