XRootD
Loading...
Searching...
No Matches
XrdPfc::ResourceMonitor Class Reference

#include <XrdPfcResourceMonitor.hh>

+ Collaboration diagram for XrdPfc::ResourceMonitor:

Public Member Functions

 ResourceMonitor (XrdOss &oss)
 
 ~ResourceMonitor ()
 
void CrossCheckIfScanIsInProgress (const std::string &lfn, XrdSysCondVar &cond)
 
void fill_pshot_vec_children (const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
 
void fill_sshot_vec_children (const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
 
void heart_beat ()
 
void init_before_main ()
 
void main_thread_function ()
 
bool perform_initial_scan ()
 
void perform_purge_check (bool purge_cold_files, int tl)
 
void perform_purge_task (DataFsPurgeshot &ps)
 
void perform_purge_task_cleanup ()
 
int process_queues ()
 
void register_file_close (int token_id, time_t close_timestamp, const Stats &full_stats)
 
int register_file_open (const std::string &filename, time_t open_timestamp, bool existing_file)
 
void register_file_purge (const std::string &filename, long long size_in_st_blocks)
 
void register_file_purge (DirState *target, long long size_in_st_blocks)
 
void register_file_update_stats (int token_id, const Stats &stats)
 
void register_multi_file_purge (const std::string &target, long long size_in_st_blocks, int n_files)
 
void register_multi_file_purge (DirState *target, long long size_in_st_blocks, int n_files)
 
void scan_dir_and_recurse (FsTraversal &fst)
 
AccessToken & token (int i)
 
void update_vs_and_file_usage_info ()
 

Public Attributes

bool m_purge_task_active {false}
 
bool m_purge_task_complete {false}
 
XrdSysCondVar m_purge_task_cond {0}
 
time_t m_purge_task_end {0}
 
time_t m_purge_task_start {0}
 

Detailed Description

Definition at line 37 of file XrdPfcResourceMonitor.hh.

Constructor & Destructor Documentation

◆ ResourceMonitor()

ResourceMonitor::ResourceMonitor ( XrdOss & oss)

Definition at line 32 of file XrdPfcResourceMonitor.cc.

32 : // Binds the monitor to the given OSS; the constructor only initializes members.
33 m_fs_state(* new DataFsState), // heap-allocated and owned here; released by delete &m_fs_state in ~ResourceMonitor(). NOTE(review): if ResourceMonitor is copyable, copies would delete it twice -- confirm copy is disabled in the header.
34 m_oss(oss) // NOTE(review): member type not shown; if it is a reference, the caller's XrdOss must outlive this object -- confirm.
35{}

◆ ~ResourceMonitor()

ResourceMonitor::~ResourceMonitor ( )

Definition at line 37 of file XrdPfcResourceMonitor.cc.

38{ // Releases the DataFsState that the constructor allocated with new.
39 delete &m_fs_state; // m_fs_state is bound to *new DataFsState in the constructor, so deleting its address is the matching release
40}

Member Function Documentation

◆ CrossCheckIfScanIsInProgress()

void ResourceMonitor::CrossCheckIfScanIsInProgress ( const std::string & lfn,
XrdSysCondVar & cond )

Definition at line 46 of file XrdPfcResourceMonitor.cc.

47{ // If the initial directory scan is still running, queue (lfn, cond, flag) and block until the scanner marks the request checked; otherwise return at once.
48 m_dir_scan_mutex.Lock();
49 if (m_dir_scan_in_progress) {
50 bool dir_checked = false; // lives on this stack frame; the queued record must refer to it -- presumably LfnCondRecord::f_checked is a bool&, confirm
51 m_dir_scan_open_requests.push_back({lfn, cond, dir_checked});
52 cond.Lock(); // take cond BEFORE releasing the scan mutex: the scanner locks f_cond before setting the flag and signalling, so it cannot signal before we wait
53 m_dir_scan_mutex.UnLock();
54 while ( ! dir_checked) // predicate loop guards against spurious wake-ups
55 cond.Wait();
56 cond.UnLock();
57 } else {
58 m_dir_scan_mutex.UnLock(); // no scan in progress: nothing to wait for
59 }
60}

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ fill_pshot_vec_children()

void ResourceMonitor::fill_pshot_vec_children ( const DirState & parent_ds,
int parent_idx,
std::vector< DirPurgeElement > & vec,
int max_depth )

Definition at line 517 of file XrdPfcResourceMonitor.cc.

521{ // Appends all children of parent_ds to vec as one contiguous run, records that run on vec[parent_idx], then recurses into each child.
522 int pos = vec.size(); // index at which the first child entry will be appended
523 int n_children = parent_ds.m_subdirs.size();
524
525 DirPurgeElement &parent_dpe = vec[parent_idx]; // used only before the emplace_backs below, which may reallocate vec and invalidate this reference
526 parent_dpe.m_daughters_begin = pos;
527 parent_dpe.m_daughters_end = pos + n_children; // half-open range [begin, end) of the child entries
528
529 if (n_children == 0) return;
530
531 for (auto const & [name, child] : parent_ds.m_subdirs)
532 {
533 vec.emplace_back( DirPurgeElement(child, child.m_here_usage, child.m_recursive_subdir_usage, parent_idx) );
534 }
535
536 if (parent_ds.m_depth < max_depth) // children are always appended; only their own subtrees are skipped once parent depth reaches max_depth
537 {
538 for (auto const & [name, child] : parent_ds.m_subdirs) // same iteration order as above, so pos is each child's index in vec
539 {
540 fill_pshot_vec_children(child, pos, vec, max_depth);
541 ++pos;
542 }
543 }
544}
static void child()
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)

References child(), fill_pshot_vec_children(), XrdPfc::DirPurgeElement::m_daughters_begin, XrdPfc::DirPurgeElement::m_daughters_end, XrdPfc::DirState::m_depth, and XrdPfc::DirState::m_subdirs.

Referenced by fill_pshot_vec_children(), and perform_purge_check().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ fill_sshot_vec_children()

void ResourceMonitor::fill_sshot_vec_children ( const DirState & parent_ds,
int parent_idx,
std::vector< DirStateElement > & vec,
int max_depth )

Definition at line 488 of file XrdPfcResourceMonitor.cc.

492{ // Appends all children of parent_ds to vec as one contiguous run, records that run on vec[parent_idx], then recurses into each child.
493 int pos = vec.size(); // index at which the first child entry will be appended
494 int n_children = parent_ds.m_subdirs.size();
495
496 DirStateElement &parent_dse = vec[parent_idx]; // used only before the emplace_backs below, which may reallocate vec and invalidate this reference
497 parent_dse.m_daughters_begin = pos;
498 parent_dse.m_daughters_end = pos + n_children; // half-open range [begin, end) of the child entries
499
500 if (n_children == 0) return;
501
502 for (auto const & [name, child] : parent_ds.m_subdirs)
503 {
504 vec.emplace_back( DirStateElement(child, parent_idx) );
505 }
506
507 if (parent_ds.m_depth < max_depth) // children are always appended; only their own subtrees are skipped once parent depth reaches max_depth
508 {
509 for (auto const & [name, child] : parent_ds.m_subdirs) // same iteration order as above, so pos is each child's index in vec
510 {
511 fill_sshot_vec_children(child, pos, vec, max_depth);
512 ++pos;
513 }
514 }
515}
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)

References child(), fill_sshot_vec_children(), XrdPfc::DirStateElement::m_daughters_begin, XrdPfc::DirStateElement::m_daughters_end, XrdPfc::DirState::m_depth, and XrdPfc::DirState::m_subdirs.

Referenced by fill_sshot_vec_children(), and heart_beat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ heart_beat()

void ResourceMonitor::heart_beat ( )

Definition at line 351 of file XrdPfcResourceMonitor.cc.

352{
353 static const char *tpfx = "heart_beat() ";
354
355 const Configuration &conf = Cache::Conf();
356
357 const int s_queue_proc_interval = 10;
358 const int s_sshot_report_interval = conf.m_dirStatsInterval; // 1, 5, 10, 15, 30 or 60 minutes
359 const int s_purge_check_interval = 60;
360 const int s_purge_report_interval = conf.m_purgeInterval;
361 const int s_purge_cold_files_interval = conf.m_purgeInterval * conf.m_purgeAgeBasedPeriod;
362
363 // initial scan performed as part of config
364
365 time_t now = time(0);
366 time_t next_queue_proc_time = now + s_queue_proc_interval;
367 time_t next_sshot_report_time = (now / 60) * 60 + 60; // at next full minute
368 time_t next_purge_check_time = now + s_purge_check_interval;
369 time_t next_purge_report_time = now + s_purge_report_interval;
370 time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
371
372 while (true)
373 {
374 time_t start = time(0);
375 time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
376 next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
377
378 if (next_event > start)
379 {
380 unsigned int t_sleep = next_event - start;
381 TRACE(Debug, tpfx << "sleeping for " << t_sleep << " seconds until the next beat.");
382 sleep(t_sleep);
383 }
384
385 // Check if purge has been running and has completed yet.
386 // For now this is only used to prevent removal of empty leaf directories
387 // during stat propagation so we do not need to wait for the condition in
388 // the above sleep.
390 MutexHolder _lck(m_purge_task_cond);
393 }
394 }
395
396 time_t queue_swap_time = time(0);
397
398 // Always process the queues.
399 int n_processed = process_queues();
400 next_queue_proc_time = queue_swap_time + s_queue_proc_interval;
401 TRACE(Debug, tpfx << "process_queues -- n_records=" << n_processed);
402
403 // Always update basic info on m_fs_state (space, usage, file_usage).
405
406 now = time(0);
407
408 // Make planning for fs_state_update, sshot dump and purge task.
409 // Second two require the first, so figure out what is going to happen.
410 bool do_sshot_report = next_sshot_report_time <= now;
411 bool do_purge_check = next_purge_check_time <= now;
412 bool do_purge_report = next_purge_report_time <= now;
413 bool do_purge_cold_files = next_purge_cold_files_time <= now;
414
415 // Update stats in usages if any secondary activity will happen.
416 if (do_sshot_report || do_purge_check || do_purge_report || do_purge_cold_files)
417 {
418 unlink_func unlink_foo = [&](const std::string &dp)->int {
419 int ret = m_oss.Unlink(dp.c_str());
420 if (ret != 0) {
421 TRACE(Info, tpfx << "Empty dir unlink error: " << ret << " at " << dp);
422 } else {
423 TRACE(Debug, tpfx << "Empty dir unlink success: " << dp);
424 }
425 return ret;
426 };
427
428 // Potentially prune the empty leaf dirs even less frequently, once per hour, maybe?
429 bool purge_leaf_dirs = do_sshot_report && ! m_purge_task_active;
430 m_fs_state.update_stats_and_usages(queue_swap_time, purge_leaf_dirs, unlink_foo);
431
432 // This reporting into log/stdout is to be removed.
433 // Meaning of conf.is_dir_stat_reporting_on() etc is to be clarified / improved.
434 if (do_sshot_report && conf.is_dir_stat_reporting_on())
435 {
436 const int store_depth = conf.m_dirStatsStoreDepth;
437 #ifdef RM_DEBUG
438 const DirState &root_ds = *m_fs_state.get_root();
439 dprintf("Snapshot n_dirs=%d, total n_dirs=%d\n", root_ds.count_dirs_to_level(store_depth),
441 #endif
442 m_fs_state.dump_recursively(store_depth);
443 }
444
445 m_fs_state.reset_stats(queue_swap_time);
446 }
447
448 if (do_sshot_report)
449 {
450 // Sshot reports are equidistant, at "full" reporting interval.
451 next_sshot_report_time = ((now + 1) / s_sshot_report_interval) * s_sshot_report_interval + s_sshot_report_interval;
452
453 // This should dump out binary snapshot into /pfc-stats/, if so configured.
454
455 // json dump to std::out for debug purpose
456 DataFsSnapshot ss(m_fs_state, m_fs_state.m_sshot_stats_reset_time);
457 const DirState &root_ds = *m_fs_state.get_root();
458 const int store_depth = conf.m_dirStatsStoreDepth;
459 const int n_sshot_dirs = root_ds.count_dirs_to_level(store_depth);
460 ss.m_dir_states.reserve(n_sshot_dirs);
461 ss.m_dir_states.emplace_back( DirStateElement(root_ds, -1) );
462 fill_sshot_vec_children(root_ds, 0, ss.m_dir_states, store_depth);
463
464 // This should really be export to a file (preferably binary, but then bin->json command is needed, too).
465 // ss.dump();
466
467 const char* dumpfile = "/pfc-stats/DirStat.json";
468 ss.write_json_file(dumpfile, m_oss, false);
469 m_fs_state.reset_sshot_stats(queue_swap_time);
470 }
471
472 if (do_purge_check || do_purge_report || do_purge_cold_files)
473 {
474 perform_purge_check(do_purge_cold_files, do_purge_report ? TRACE_Info : TRACE_Debug);
475
476 next_purge_check_time = now + s_purge_check_interval;
477 if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
478 if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
479 }
480
481 } // end while forever
482}
#define TRACE_Debug
#define TRACE_Info
#define dprintf(...)
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
static const Configuration & Conf()
Definition XrdPfc.cc:134
void perform_purge_check(bool purge_cold_files, int tl)
std::function< int(const std::string &)> unlink_func
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:106
int m_purgeAgeBasedPeriod
perform cold-file / uvkeep purge once every this many purge cycles
Definition XrdPfc.hh:99
int m_dirStatsInterval
time between resource-monitor statistics dumps, in seconds
Definition XrdPfc.hh:104
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:97
bool is_dir_stat_reporting_on() const
Definition XrdPfc.hh:70
DirUsage m_recursive_subdir_usage
int count_dirs_to_level(int max_depth) const

References XrdPfc::Cache::Conf(), XrdPfc::DirState::count_dirs_to_level(), Debug, dprintf, fill_sshot_vec_children(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdPfc::DataFsSnapshot::m_dir_states, XrdPfc::Configuration::m_dirStatsInterval, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::DirState::m_here_usage, XrdPfc::DirUsage::m_NDirectories, m_purge_task_active, m_purge_task_complete, m_purge_task_cond, XrdPfc::Configuration::m_purgeAgeBasedPeriod, XrdPfc::Configuration::m_purgeInterval, XrdPfc::DirState::m_recursive_subdir_usage, perform_purge_check(), process_queues(), TRACE, TRACE_Debug, TRACE_Info, update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

Referenced by main_thread_function().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ init_before_main()

void ResourceMonitor::init_before_main ( )

Definition at line 826 of file XrdPfcResourceMonitor.cc.

827{ // Marks the initial directory scan as in progress, before the monitor thread starts.
828 // setup for in-scan -- this is called from initial setup; CrossCheckIfScanIsInProgress() callers block until perform_initial_scan() clears the flag.
829 MutexHolder _lck(m_dir_scan_mutex); // scoped holder: m_dir_scan_mutex is held for the rest of this function
830 m_dir_scan_in_progress = true;
831}

◆ main_thread_function()

void ResourceMonitor::main_thread_function ( )

Definition at line 833 of file XrdPfcResourceMonitor.cc.

834{ // Monitor thread entry: initial scan (process exits on failure), one queue-processing pass, then heart_beat(), which loops forever.
835 static const char *tpfx = "main_thread_function ";
836 {
837 time_t is_start = time(0);
838 m_fs_state.init_stat_reset_times(is_start);
839 TRACE(Info, tpfx << "Starting initial directory scan.");
840
841 if ( ! perform_initial_scan()) {
842 TRACE(Error, tpfx << "Initial directory scan has failed. This is a terminal error, aborting.");
843 _exit(1); // treated as terminal, as the message states; _exit skips atexit handlers
844 }
845 // Reset of m_dir_scan_in_progress is done in perform_initial_scan()
846
847 time_t is_duration = time(0) - is_start;
848 TRACE(Info, tpfx << "Initial directory scan complete, duration=" << is_duration <<"s");
849
850 // Run the first process_queues() pass -- presumably drains records queued while the scan was running.
851 int n_proc_is = process_queues();
852 TRACE(Info, tpfx << "First process_queues finished, n_records=" << n_proc_is);
853
854 // Shrink the read queues if the scan took longer than 30 s or the first pass handled more than 3000 records.
855 if (is_duration > 30 || n_proc_is > 3000)
856 {
857 m_file_open_q.shrink_read_queue();
858 m_file_update_stats_q.shrink_read_queue();
859 m_file_close_q.shrink_read_queue();
860 m_file_purge_q1.shrink_read_queue();
861 m_file_purge_q2.shrink_read_queue();
862 m_file_purge_q3.shrink_read_queue();
863 }
864 }
865 heart_beat(); // loops forever (while (true) in heart_beat())
866}
if(ec< 0) ec

References Error, heart_beat(), perform_initial_scan(), process_queues(), and TRACE.

Referenced by ResourceMonitorThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ perform_initial_scan()

bool ResourceMonitor::perform_initial_scan ( )

Definition at line 171 of file XrdPfcResourceMonitor.cc.

172{
173 // Called after PFC configuration is complete, but before full startup of the daemon.
174 // Base line usages are accumulated as part of the file-system, traversal.
175
177
178 DirState *root_ds = m_fs_state.get_root();
179 FsTraversal fst(m_oss);
180 fst.m_protected_top_dirs.insert("pfc-stats"); // XXXX This should come from config. Also: N2N?
181
182 if ( ! fst.begin_traversal(root_ds, "/"))
183 return false;
184
185 // The following are initialized in ResourceMonitor.hh to avoid a race at startup:
186 // m_dir_scan_in_progress = true;
187 // m_dir_scan_check_counter = 0;
188
190
191 fst.end_traversal();
192
193 // We have all directories scanned, available in DirState tree, let all remaining files go
194 // and then we shall do the upward propagation of usages.
195 {
196 XrdSysMutexHelper _lock(m_dir_scan_mutex);
197 m_dir_scan_in_progress = false;
198 m_dir_scan_check_counter = 0;
199 }
200 // m_dir_scan_open_requests should now be final, ie, no new entries will be added.
201 while ( ! m_dir_scan_open_requests.empty())
202 {
203 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
204 lcr.f_cond.Lock();
205 lcr.f_checked = true;
206 lcr.f_cond.Signal();
207 lcr.f_cond.UnLock();
208
209 m_dir_scan_open_requests.pop_front();
210 }
211
212 // Do upward propagation of usages.
214 m_current_usage_in_st_blocks = root_ds->m_here_usage.m_StBlocks +
217
218 return true;
219}
void scan_dir_and_recurse(FsTraversal &fst)
void upward_propagate_initial_scan_usages()

References XrdPfc::FsTraversal::begin_traversal(), XrdPfc::FsTraversal::end_traversal(), XrdSysCondVar::Lock(), XrdPfc::DirState::m_here_usage, XrdPfc::FsTraversal::m_protected_top_dirs, XrdPfc::DirState::m_recursive_subdir_usage, XrdPfc::DirUsage::m_StBlocks, scan_dir_and_recurse(), XrdSysCondVar::Signal(), XrdSysCondVar::UnLock(), update_vs_and_file_usage_info(), and XrdPfc::DirState::upward_propagate_initial_scan_usages().

Referenced by main_thread_function().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ perform_purge_check()

void ResourceMonitor::perform_purge_check ( bool purge_cold_files,
int tl )

Definition at line 682 of file XrdPfcResourceMonitor.cc.

683{
684 static const char *trc_pfx = "perform_purge_check() ";
685 const Configuration &conf = Cache::Conf();
686
687 std::unique_ptr<DataFsPurgeshot> psp( new DataFsPurgeshot(m_fs_state) );
688 DataFsPurgeshot &ps = *psp;
689
690 ps.m_file_usage = 512ll * m_current_usage_in_st_blocks;
691 // These are potentially wrong as cache might be writing over preallocated byte ranges.
693 // Can have another estimate based on eiter writes or st-blocks from purge-stats, once we have them.
694
695 TRACE_INT(tl, trc_pfx << "Purge check:");
696
697 ps.m_bytes_to_remove = 0;
698 if (conf.are_file_usage_limits_set())
699 {
700 ps.m_bytes_to_remove = get_file_usage_bytes_to_remove(ps, ps.m_estimated_writes_from_writeq, tl);
701 }
702 else
703 {
704 if (ps.m_disk_used > conf.m_diskUsageHWM)
705 {
706 TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark.");
708 }
709 }
710
712
713 // Purge precheck -- check if age-based purge is required
714 // We ignore uvkeep time, it requires reading of cinfo files and it is enforced in File::Open() anyway.
715
716 if (purge_cold_files && conf.is_age_based_purge_in_effect()) // || conf.is_uvkeep_purge_in_effect())
717 {
718 ps.m_age_based_purge = true;
719 }
720
721 TRACE_INT(tl, "\tbytes_to_remove = " << ps.m_bytes_to_remove << " B");
722 TRACE_INT(tl, "\tspace_based_purge = " << ps.m_space_based_purge);
723 TRACE_INT(tl, "\tage_based_purge = " << ps.m_age_based_purge);
724
725 bool periodic = Cache::GetInstance().GetPurgePin() ?
727
728 if ( ! ps.m_space_based_purge && ! ps.m_age_based_purge && !periodic ) {
729 TRACE(Info, trc_pfx << "purge not required.");
731 return;
732 }
734 TRACE(Warning, trc_pfx << "purge required but previous purge task is still active!");
735 return;
736 }
737
738 TRACE(Info, trc_pfx << "scheduling purge task.");
739
740 // At this point we have all the information: report, decide on action.
741 // There is still some missing infrastructure, especially as regards to purge-plugin:
742 // - at what point do we start bugging the pu-pin to start coughing up purge lists?
743 // - have a new parameter or just do it "one cycle before full"?
744 // - what if it doesn't -- when do we do the old-stlye scan & purge?
745 // - how do we do age-based purge and uvkeep purge?
746 // - they are really quite different -- and could run separately, registering
747 // files into a purge-candidate list. This has to be rechecked before the actual
748 // deletion -- eg, by comparing stat time of cinfo + doing the is-active / is-purge-protected.
749
750 const DirState &root_ds = *m_fs_state.get_root();
751 const int n_calc_dirs = 1 + root_ds.m_here_usage.m_NDirectories + root_ds.m_recursive_subdir_usage.m_NDirectories;
752#ifdef RM_DEBUG
753 const int n_pshot_dirs = root_ds.count_dirs_to_level(9999);
754 dprintf("purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
755#endif
756 ps.m_dir_vec.reserve(n_calc_dirs);
757 ps.m_dir_vec.emplace_back( DirPurgeElement(root_ds, root_ds.m_here_usage, root_ds.m_recursive_subdir_usage, -1) );
758 fill_pshot_vec_children(root_ds, 0, ps.m_dir_vec, 9999);
759
760 m_purge_task_active = true;
761
762 struct PurgeDriverJob : public XrdJob
763 {
764 DataFsPurgeshot *m_purge_shot_ptr;
765
766 PurgeDriverJob(DataFsPurgeshot *psp) :
767 XrdJob("XrdPfc::ResourceMonitor::PurgeDriver"),
768 m_purge_shot_ptr(psp)
769 {}
770
771 void DoIt() override
772 {
773 Cache::ResMon().perform_purge_task(*m_purge_shot_ptr);
775
776 delete m_purge_shot_ptr;
777 delete this;
778 }
779 };
780
781 Cache::schedP->Schedule( new PurgeDriverJob(psp.release()) );
782}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_INT(act, x)
XrdJob(const char *desc="")
Definition XrdJob.hh:51
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:135
void ClearPurgeProtectedSet()
Definition XrdPfc.cc:684
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
static XrdScheduler * schedP
Definition XrdPfc.hh:290
long long WritesSinceLastCall()
Definition XrdPfc.cc:320
PurgePin * GetPurgePin() const
Definition XrdPfc.hh:272
virtual bool CallPeriodically()
void perform_purge_task(DataFsPurgeshot &ps)
void Schedule(XrdJob *jp)
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:93
bool are_file_usage_limits_set() const
Definition XrdPfc.hh:67
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:92
bool is_age_based_purge_in_effect() const
Definition XrdPfc.hh:68
std::vector< DirPurgeElement > m_dir_vec

References XrdPfc::Configuration::are_file_usage_limits_set(), XrdPfc::PurgePin::CallPeriodically(), XrdPfc::Cache::ClearPurgeProtectedSet(), XrdPfc::Cache::Conf(), XrdPfc::DirState::count_dirs_to_level(), DoIt(), dprintf, fill_pshot_vec_children(), XrdPfc::Cache::GetInstance(), XrdPfc::Cache::GetPurgePin(), XrdPfc::Configuration::is_age_based_purge_in_effect(), XrdPfc::DataFsPurgeshot::m_age_based_purge, XrdPfc::DataFsPurgeshot::m_bytes_to_remove, XrdPfc::DataFsPurgeshot::m_dir_vec, XrdPfc::DataFsStateBase::m_disk_used, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::DataFsPurgeshot::m_estimated_writes_from_writeq, XrdPfc::DataFsStateBase::m_file_usage, XrdPfc::DirState::m_here_usage, XrdPfc::DirUsage::m_NDirectories, m_purge_task_active, XrdPfc::DirState::m_recursive_subdir_usage, XrdPfc::DataFsPurgeshot::m_space_based_purge, perform_purge_task(), perform_purge_task_cleanup(), XrdPfc::Cache::ResMon(), XrdPfc::Cache::schedP, TRACE, TRACE_INT, and XrdPfc::Cache::WritesSinceLastCall().

Referenced by heart_beat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ perform_purge_task()

void ResourceMonitor::perform_purge_task ( DataFsPurgeshot & ps)

Definition at line 789 of file XrdPfcResourceMonitor.cc.

790{ // Executes one purge pass for the snapshot ps; currently delegates to OldStylePurgeDriver().
791 // BEWARE: Runs as a scheduler job (PurgeDriverJob via Cache::schedP), not in the heart_beat() thread - it is only to communicate back to the
792 // heart_beat() / data structs via the purge queues and condition variable.
793
794 // const char *tpfx = "perform_purge_task ";
795
796 {
797 MutexHolder _lck(m_purge_task_cond); // the m_purge_task_* timestamps are accessed under m_purge_task_cond
798 m_purge_task_start = time(0);
799 }
800
801 // For now, fall back to the old purge ... to be improved with:
802 // - new scan, following the DataFsPurgeshot;
803 // - usage of cinfo stat mtime for time of last access (touch already done at output);
804 // - use DirState* to report back purged files.
805 // Already changed to report back purged files --- but using the string / path variant.
806 OldStylePurgeDriver(ps); // In XrdPfcPurge.cc
807}
void OldStylePurgeDriver(DataFsPurgeshot &ps)

References m_purge_task_cond, m_purge_task_start, and XrdPfc::OldStylePurgeDriver().

Referenced by perform_purge_check().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ perform_purge_task_cleanup()

void ResourceMonitor::perform_purge_task_cleanup ( )

Definition at line 809 of file XrdPfcResourceMonitor.cc.

810{
811 // Separated out so the purge_task can exit without post-checks.
812
813 {
814 MutexHolder _lck(m_purge_task_cond);
815 m_purge_task_end = time(0);
817 m_purge_task_cond.Signal();
818 }
820}

References XrdPfc::Cache::ClearPurgeProtectedSet(), XrdPfc::Cache::GetInstance(), m_purge_task_complete, m_purge_task_cond, and m_purge_task_end.

Referenced by perform_purge_check().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ process_queues()

int ResourceMonitor::process_queues ( )

Definition at line 225 of file XrdPfcResourceMonitor.cc.

226{
227 static const char *trc_pfx = "process_queues() ";
228
229 // Assure that we pick up only entries that are present now.
230 // We really want all open records to be processed before file-stats updates
231 // and all those before the close records.
232 // Purges are sort of tangential as they really just modify bytes / number
233 // of files in a directory and do not deal with any persistent file id tokens.
234
235 int n_records = 0;
236 {
237 XrdSysMutexHelper _lock(&m_queue_mutex);
238 n_records += m_file_open_q.swap_queues();
239 n_records += m_file_update_stats_q.swap_queues();
240 n_records += m_file_close_q.swap_queues();
241 n_records += m_file_purge_q1.swap_queues();
242 n_records += m_file_purge_q2.swap_queues();
243 n_records += m_file_purge_q3.swap_queues();
244 ++m_queue_swap_u1;
245 }
246
247 for (auto &i : m_file_open_q.read_queue())
248 {
249 // i.id: LFN, i.record: OpenRecord
250 AccessToken &at = token(i.id);
251 dprintf("process file open for token %d, time %ld -- %s\n",
252 i.id, i.record.m_open_time, at.m_filename.c_str());
253
254 // Resolve fname into DirState.
255 // We could clear the filename after this ... or keep it, should we need it later on.
256 // For now it is just used for debug printouts.
257 DirState *last_existing_ds = nullptr;
258 DirState *ds = m_fs_state.find_dirstate_for_lfn(at.m_filename, &last_existing_ds);
259 at.m_dir_state = ds;
261
262 // If this is a new file figure out how many new parent dirs got created along the way.
263 if ( ! i.record.m_existing_file) {
265 DirState *pp = ds;
266 while (pp != last_existing_ds) {
267 pp = pp->get_parent();
269 }
270 }
271
272 ds->m_here_usage.m_LastOpenTime = i.record.m_open_time;
273 }
274
275 for (auto &i : m_file_update_stats_q.read_queue())
276 {
277 // i.id: token, i.record: Stats
278 AccessToken &at = token(i.id);
279 // Stats
280 DirState *ds = at.m_dir_state;
281 dprintf("process file update for token %d, %p -- %s\n",
282 i.id, ds, at.m_filename.c_str());
283
284 ds->m_here_stats.AddUp(i.record);
285 m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
286 }
287
288 for (auto &i : m_file_close_q.read_queue())
289 {
290 // i.id: token, i.record: CloseRecord
291 AccessToken &at = token(i.id);
292 dprintf("process file close for token %d, time %ld -- %s\n",
293 i.id, i.record.m_close_time, at.m_filename.c_str());
294
295 DirState *ds = at.m_dir_state;
297 ds->m_here_usage.m_LastCloseTime = i.record.m_close_time;
298
299 at.clear();
300 }
301 { // Release the AccessToken slots under lock.
302 XrdSysMutexHelper _lock(&m_queue_mutex);
303 for (auto &i : m_file_close_q.read_queue())
304 m_access_tokens_free_slots.push_back(i.id);
305 }
306
307 for (auto &i : m_file_purge_q1.read_queue())
308 {
309 // i.id: DirState*, i.record: PurgeRecord
310 DirState *ds = i.id;
311 ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
312 ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
313 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
314 }
315 for (auto &i : m_file_purge_q2.read_queue())
316 {
317 // i.id: directory-path, i.record: PurgeRecord
318 DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, false, false);
319 if ( ! ds) {
320 TRACE(Error, trc_pfx << "DirState not found for directory path '" << i.id << "'.");
321 // find_path can return the last dir found ... but this clearly isn't a valid purge record.
322 continue;
323 }
324 ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
325 ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
326 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
327 }
328 for (auto &i : m_file_purge_q3.read_queue())
329 {
330 // i.id: LFN, i.record: size of file in st_blocks
331 DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, true, false);
332 if ( ! ds) {
333 TRACE(Error, trc_pfx << "DirState not found for LFN path '" << i.id << "'.");
334 continue;
335 }
336 ds->m_here_stats.m_StBlocksRemoved += i.record;
338 m_current_usage_in_st_blocks -= i.record;
339 }
340
341 // Read queues / vectors are cleared at swap time.
342 // We might consider reducing their capacity by half if, say, their usage is below 25%.
343
344 return n_records;
345}
void AddUp(const DirStats &s)
long long m_StBlocksRemoved
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
DirState * get_parent()

References XrdPfc::DirStats::AddUp(), dprintf, Error, XrdPfc::DirState::find_path(), XrdPfc::DirState::get_parent(), XrdPfc::DirState::m_here_stats, XrdPfc::DirState::m_here_usage, XrdPfc::DirUsage::m_LastCloseTime, XrdPfc::DirUsage::m_LastOpenTime, XrdPfc::DirStats::m_NDirectoriesCreated, XrdPfc::DirStats::m_NFilesClosed, XrdPfc::DirStats::m_NFilesCreated, XrdPfc::DirStats::m_NFilesOpened, XrdPfc::DirStats::m_NFilesRemoved, XrdPfc::DirStats::m_StBlocksRemoved, token(), and TRACE.

Referenced by heart_beat(), and main_thread_function().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ register_file_close()

void XrdPfc::ResourceMonitor::register_file_close ( int token_id,
time_t close_timestamp,
const Stats & full_stats )
inline

Definition at line 185 of file XrdPfcResourceMonitor.hh.

185 { // Queues a close record (timestamp + final Stats) for token_id; process_queues() later clears the token and returns its slot to the free list.
186 XrdSysMutexHelper _lock(&m_queue_mutex); // all register_* producers and the queue swap share m_queue_mutex
187 m_file_close_q.push(token_id, {close_timestamp, full_stats});
188 }

◆ register_file_open()

int XrdPfc::ResourceMonitor::register_file_open ( const std::string & filename,
time_t open_timestamp,
bool existing_file )
inline

Definition at line 149 of file XrdPfcResourceMonitor.hh.

149 { // Allocates an access token for filename (reusing a freed slot if available), queues an open record, and returns the token id.
150 // Simply return a token, we will resolve it in the actual processing of the queue.
151 XrdSysMutexHelper _lock(&m_queue_mutex);
152 int token_id;
153 if ( ! m_access_tokens_free_slots.empty()) {
154 token_id = m_access_tokens_free_slots.back(); // slots are returned to this list by process_queues() after the file's close record is handled
155 m_access_tokens_free_slots.pop_back();
156 m_access_tokens[token_id].m_filename = filename;
157 m_access_tokens[token_id].m_last_queue_swap_u1 = m_queue_swap_u1 - 1; // differs from the current cycle, so the first stats update pushes a fresh record instead of merging into a stale position
158 } else {
159 token_id = (int) m_access_tokens.size();
160 m_access_tokens.push_back({filename, m_queue_swap_u1 - 1}); // same "not yet seen this cycle" marker as above
161 }
162
163 m_file_open_q.push(token_id, {open_timestamp, existing_file});
164 return token_id;
165 }

◆ register_file_purge() [1/2]

void XrdPfc::ResourceMonitor::register_file_purge ( const std::string & filename,
long long size_in_st_blocks )
inline

Definition at line 204 of file XrdPfcResourceMonitor.hh.

204 { // Queues removal of one file given by LFN; process_queues() resolves the LFN to its DirState and logs an error if it is not found.
205 XrdSysMutexHelper _lock(&m_queue_mutex);
206 m_file_purge_q3.push(filename, size_in_st_blocks);
207 }

◆ register_file_purge() [2/2]

void XrdPfc::ResourceMonitor::register_file_purge ( DirState * target,
long long size_in_st_blocks )
inline

Definition at line 192 of file XrdPfcResourceMonitor.hh.

192 { // Queues removal of a single file (n_files = 1) directly against an already-resolved DirState.
193 XrdSysMutexHelper _lock(&m_queue_mutex);
194 m_file_purge_q1.push(target, {size_in_st_blocks, 1});
195 }

◆ register_file_update_stats()

void XrdPfc::ResourceMonitor::register_file_update_stats ( int token_id,
const Stats & stats )
inline

Definition at line 167 of file XrdPfcResourceMonitor.hh.

167 { // Queues a stats delta for token_id, merging it into this cycle's existing record for the same token when there is one.
168 XrdSysMutexHelper _lock(&m_queue_mutex);
169 AccessToken &at = token(token_id);
170 // Check if this is the first update within this queue swap cycle.
171 if (at.m_last_queue_swap_u1 != m_queue_swap_u1) {
172 m_file_update_stats_q.push(token_id, stats);
173 at.m_last_queue_swap_u1 = m_queue_swap_u1;
174 at.m_last_write_queue_pos = m_file_update_stats_q.write_queue_size() - 1; // index of the record just pushed
175 } else {
176 Stats &existing_stats = m_file_update_stats_q.write_record(at.m_last_write_queue_pos); // process_queues() bumps m_queue_swap_u1 together with the queue swap under this mutex, so the position still refers to the current write queue
177 existing_stats.AddUp(stats);
178 }
179 // Optionally, one could return "scaler" to modify stat-reporting
180 // frequency in the file ... if it comes too often or too rarely.
181 // See also the logic for determining reporting interval (in N_bytes_read)
182 // in File::Open().
183 }
XrdPosixStats Stats

References XrdPfc::Stats::AddUp(), and token().

+ Here is the call graph for this function:

◆ register_multi_file_purge() [1/2]

void XrdPfc::ResourceMonitor::register_multi_file_purge ( const std::string & target,
long long size_in_st_blocks,
int n_files )
inline

Definition at line 200 of file XrdPfcResourceMonitor.hh.

200 {
201 XrdSysMutexHelper _lock(&m_queue_mutex);
202 m_file_purge_q2.push(target, {size_in_st_blocks, n_files});
203 }

◆ register_multi_file_purge() [2/2]

void XrdPfc::ResourceMonitor::register_multi_file_purge ( DirState * target,
long long size_in_st_blocks,
int n_files )
inline

Definition at line 196 of file XrdPfcResourceMonitor.hh.

196 {
197 XrdSysMutexHelper _lock(&m_queue_mutex);
198 m_file_purge_q1.push(target, {size_in_st_blocks, n_files});
199 }

◆ scan_dir_and_recurse()

void ResourceMonitor::scan_dir_and_recurse ( FsTraversal & fst)

Definition at line 118 of file XrdPfcResourceMonitor.cc.

119{
120 dprintf("In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
121 fst.m_current_path.c_str(),
122 (int)fst.m_current_dirs.size(), (int)fst.m_current_files.size());
123
124 // Breadth first, accumulate into "here", unless it was already scanned via an
125 // OOB open file request.
126 if ( ! fst.m_dir_state->m_scanned)
127 {
128 DirUsage &here = fst.m_dir_state->m_here_usage;
129 for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
130 {
131 dprintf("would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
132 it->first.c_str(), it->second.has_data, it->second.has_cinfo);
133
134 // XXX Make some of these optional?
135 // Remove files that do not have both cinfo and data?
136 // Remove empty directories before even descending?
137 // Leave this for some consistency pass?
138 // Note that FsTraversal supports ignored paths ... some details (config, N2N) to be clarified.
139
140 if (it->second.has_data && it->second.has_cinfo) {
141 here.m_StBlocks += it->second.stat_data.st_blocks;
142 here.m_NFiles += 1;
143 }
144 }
145 fst.m_dir_state->m_scanned = true;
146 }
147
148 // Swap-out directories as inter_dir_scan can use the FsTraversal.
149 std::vector<std::string> dirs;
150 dirs.swap(fst.m_current_dirs);
151
152 if (++m_dir_scan_check_counter >= 100)
153 {
154 process_inter_dir_scan_open_requests(fst);
155 m_dir_scan_check_counter = 0;
156 }
157
158 // Descend into sub-dirs, do not accumulate into recursive_subdir_usage yet. This is done
159 // in a separate pass to allow for proper accounting of files being opened during the initial scan.
160 for (auto &dname : dirs)
161 {
162 if (fst.cd_down(dname))
163 {
164 scan_dir_and_recurse(fst);
165 fst.cd_up();
166 }
167 // XXX else try to remove it?
168 }
169}
std::vector< std::string > m_current_dirs
bool cd_down(const std::string &dir_name)
std::map< std::string, FilePairStat > m_current_files

References XrdPfc::FsTraversal::cd_down(), XrdPfc::FsTraversal::cd_up(), dprintf, XrdPfc::FsTraversal::m_current_dirs, XrdPfc::FsTraversal::m_current_files, XrdPfc::FsTraversal::m_current_path, XrdPfc::FsTraversal::m_dir_state, XrdPfc::DirState::m_here_usage, XrdPfc::DirUsage::m_NFiles, XrdPfc::DirState::m_scanned, XrdPfc::DirUsage::m_StBlocks, and scan_dir_and_recurse().

Referenced by perform_initial_scan(), and scan_dir_and_recurse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ token()

AccessToken & XrdPfc::ResourceMonitor::token ( int i)
inline

Definition at line 217 of file XrdPfcResourceMonitor.hh.

217{ return m_access_tokens[i]; }

Referenced by process_queues(), and register_file_update_stats().

+ Here is the caller graph for this function:

◆ update_vs_and_file_usage_info()

void ResourceMonitor::update_vs_and_file_usage_info ( )

Definition at line 550 of file XrdPfcResourceMonitor.cc.

551{
552 static const char *trc_pfx = "update_vs_and_file_usage_info() ";
553
554 const auto &conf = Cache::Conf();
555 XrdOssVSInfo vsi;
556
557 // StatVS error (after it succeeded in config) implies a memory corruption (according to Mr. H).
558 if (m_oss.StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
559 TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_data_space << "'. This is a fatal error.");
560 _exit(1);
561 }
562 m_fs_state.m_disk_total = vsi.Total;
563 m_fs_state.m_disk_used = vsi.Total - vsi.Free;
564 m_fs_state.m_file_usage = 512ll * m_current_usage_in_st_blocks;
565 if (m_oss.StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
566 TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_meta_space << "'. This is a fatal error.");
567 _exit(1);
568 }
569 m_fs_state.m_meta_total = vsi.Total;
570 m_fs_state.m_meta_used = vsi.Total - vsi.Free;
571}
long long Total
Definition XrdOssVS.hh:90
long long Free
Definition XrdOssVS.hh:91

References XrdPfc::Cache::Conf(), Error, XrdOssVSInfo::Free, XrdOssVSInfo::Total, and TRACE.

Referenced by heart_beat(), and perform_initial_scan().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ m_purge_task_active

bool XrdPfc::ResourceMonitor::m_purge_task_active {false}

Definition at line 248 of file XrdPfcResourceMonitor.hh.

248{false}; // from the perspective of heart-beat, set only in heartbeat

Referenced by heart_beat(), and perform_purge_check().

◆ m_purge_task_complete

bool XrdPfc::ResourceMonitor::m_purge_task_complete {false}

Definition at line 249 of file XrdPfcResourceMonitor.hh.

249{false}; // from the perspective of the task, reset in heartbeat, set in task

Referenced by heart_beat(), and perform_purge_task_cleanup().

◆ m_purge_task_cond

XrdSysCondVar XrdPfc::ResourceMonitor::m_purge_task_cond {0}

Definition at line 244 of file XrdPfcResourceMonitor.hh.

244{0};

Referenced by heart_beat(), perform_purge_task(), and perform_purge_task_cleanup().

◆ m_purge_task_end

time_t XrdPfc::ResourceMonitor::m_purge_task_end {0}

Definition at line 247 of file XrdPfcResourceMonitor.hh.

247{0};

Referenced by perform_purge_task_cleanup().

◆ m_purge_task_start

time_t XrdPfc::ResourceMonitor::m_purge_task_start {0}

Definition at line 246 of file XrdPfcResourceMonitor.hh.

246{0};

Referenced by perform_purge_task().


The documentation for this class was generated from the following files: