XRootD
Loading...
Searching...
No Matches
XrdBuffer.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d B u f f e r . c c */
4/* */
5/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <ctime>
31#include <unistd.h>
32#include <cstdio>
33#include <cstdlib>
34#include <sys/types.h>
35
36#include "XrdOuc/XrdOucUtils.hh"
37#include "XrdSys/XrdSysError.hh"
39#include "XrdSys/XrdSysTimer.hh"
40#include "Xrd/XrdBuffer.hh"
41#include "Xrd/XrdBuffXL.hh"
42#include "Xrd/XrdTrace.hh"
43
44/******************************************************************************/
45/* E x t e r n a l L i n k a g e s */
46/******************************************************************************/
47
48void *XrdReshaper(void *pp)
49{
50 XrdBuffManager *bmp = (XrdBuffManager *)pp;
51 bmp->Reshape();
52 return (void *)0;
53}
54
55/******************************************************************************/
56/* G l o b a l s */
57/******************************************************************************/
58
59const char *XrdBuffManager::TraceID = "BuffManager";
60
61namespace
62{
63static const int minBuffSz = 1 << XRD_BUSHIFT;
64}
65
66namespace XrdGlobal
67{
69extern XrdSysError Log;
70}
71
72using namespace XrdGlobal;
73
74/******************************************************************************/
75/* C o n s t r u c t o r */
76/******************************************************************************/
77
79 slots(XRD_BUCKETS),
80 shift(XRD_BUSHIFT),
81 pagsz(getpagesize()),
82 maxsz(1<<(XRD_BUSHIFT+XRD_BUCKETS-1)),
83 Reshaper(0, "buff reshaper")
84{
85
86// Clear everything to zero
87//
88 totbuf = 0;
89 totreq = 0;
90 totalo = 0;
91 totadj = 0;
92#ifdef _SC_PHYS_PAGES
93 maxalo = static_cast<long long>(pagsz)/8
94 * static_cast<long long>(sysconf(_SC_PHYS_PAGES));
95#else
96 maxalo = 0x7ffffff;
97#endif
98 rsinprog = 0;
99 minrsw = minrst;
100 memset(static_cast<void *>(bucket), 0, sizeof(bucket));
101}
102
103/******************************************************************************/
104/* D e s t r u c t o r */
105/******************************************************************************/
106
108{
109 XrdBuffer *bP;
110
111 for (int i = 0; i < XRD_BUCKETS; i++)
112 {while((bP = bucket[i].bnext))
113 {bucket[i].bnext = bP->next;
114 delete bP;
115 }
116 bucket[i].numbuf = 0;
117 }
118}
119
120/******************************************************************************/
121/* I n i t */
122/******************************************************************************/
123
125{
126 pthread_t tid;
127 int rc;
128
129// Start the reshaper thread
130//
131 if ((rc = XrdSysThread::Run(&tid, XrdReshaper, static_cast<void *>(this), 0,
132 "Buffer Manager reshaper")))
133 Log.Emsg("BuffManager", rc, "create reshaper thread");
134}
135
136/******************************************************************************/
137/* O b t a i n */
138/******************************************************************************/
139
141{
142 XrdBuffer *bp;
143 char *memp;
144 int mk, pk, bindex;
145
146// Make sure the request is within our limits
147//
148 if (sz <= 0) return 0;
149 if (sz > maxsz) return xlBuff.Obtain(sz);
150
151// Calculate bucket index
152//
153 mk = sz >> shift;
154 bindex = XrdOucUtils::Log2(mk);
155 mk = minBuffSz << bindex;
156 if (mk < sz) {bindex++; mk = mk << 1;}
157 if (bindex >= slots) return 0; // Should never happen!
158
159// Obtain a lock on the bucket array and try to give away an existing buffer
160//
161 Reshaper.Lock();
162 totreq++;
163 bucket[bindex].numreq++;
164 if ((bp = bucket[bindex].bnext))
165 {bucket[bindex].bnext = bp->next; bucket[bindex].numbuf--;}
166 Reshaper.UnLock();
167
168// Check if we really allocated a buffer
169//
170 if (bp) return bp;
171
172// Allocate a chunk of aligned memory
173//
174 pk = (mk < pagsz ? mk : pagsz);
175 if (posix_memalign((void **)&memp, pk, mk)) return 0;
176
177// Wrap the memory with a buffer object
178//
179 if (!(bp = new XrdBuffer(memp, mk, bindex))) {free(memp); return 0;}
180
181// Update statistics
182//
183 Reshaper.Lock();
184 totbuf++;
185 if ((totalo += mk) > maxalo && !rsinprog)
186 {rsinprog = 1; Reshaper.Signal();}
187 Reshaper.UnLock();
188 return bp;
189}
190
191/******************************************************************************/
192/* R e c a l c */
193/******************************************************************************/
194
196{
197 int mk, bindex;
198
199// Make sure the request is within our limits
200//
201 if (sz <= 0) return 0;
202 if (sz > maxsz) return xlBuff.Recalc(sz);
203
204// Calculate bucket index
205//
206 mk = sz >> shift;
207 bindex = XrdOucUtils::Log2(mk);
208 mk = minBuffSz << bindex;
209 if (mk < sz) {bindex++; mk = mk << 1;}
210 if (bindex >= slots) return 0; // Should never happen!
211
212// All done, return the actual size we would have allocated
213//
214 return mk;
215}
216
217/******************************************************************************/
218/* R e l e a s e */
219/******************************************************************************/
220
222{
223 int bindex = bp->bindex;
224
225// Check if we should release this via the big buffer object
226//
227 if (bindex >= slots) {xlBuff.Release(bp); return;}
228
229// Obtain a lock on the bucket array and reclaim the buffer
230//
231 Reshaper.Lock();
232 bp->next = bucket[bp->bindex].bnext;
233 bucket[bp->bindex].bnext = bp;
234 bucket[bindex].numbuf++;
235 Reshaper.UnLock();
236}
237
238/******************************************************************************/
239/* R e s h a p e */
240/******************************************************************************/
241
243{
244int i, bufprof[XRD_BUCKETS], numfreed;
245time_t delta, lastshape = time(0);
246long long memslot, memhave, memtarget = (long long)(.80*(float)maxalo);
247XrdSysTimer Timer;
248float requests, buffers;
249XrdBuffer *bp;
250
251// This is an endless loop to periodically reshape the buffer pool
252//
253while(1)
254 {Reshaper.Lock();
255 while(Reshaper.Wait(minrsw) && totalo <= maxalo)
256 {TRACE(MEM, "Reshaper has " <<(totalo>>10) <<"K; target " <<(memtarget>>10) <<"K");}
257 if ((delta = (time(0) - lastshape)) < minrsw)
258 {Reshaper.UnLock();
259 Timer.Wait((minrsw-delta)*1000);
260 Reshaper.Lock();
261 }
262
263 // We have the lock so compute the request profile
264 //
265 if (totreq > slots)
266 {requests = (float)totreq;
267 buffers = (float)totbuf;
268 for (i = 0; i < slots; i++)
269 {bufprof[i] = (int)(buffers*(((float)bucket[i].numreq)/requests));
270 bucket[i].numreq = 0;
271 }
272 totreq = 0; memhave = totalo;
273 } else memhave = 0;
274 Reshaper.UnLock();
275
276 // Reshape the buffer pool to agree with the request profile
277 //
278 memslot = maxsz; numfreed = 0;
279 for (i = slots-1; i >= 0 && memhave > memtarget; i--)
280 {Reshaper.Lock();
281 while(bucket[i].numbuf > bufprof[i])
282 if ((bp = bucket[i].bnext))
283 {bucket[i].bnext = bp->next;
284 delete bp;
285 bucket[i].numbuf--; numfreed++;
286 memhave -= memslot; totalo -= memslot;
287 totbuf--;
288 } else {bucket[i].numbuf = 0; break;}
289 Reshaper.UnLock();
290 memslot = memslot>>1;
291 }
292
293 // All done
294 //
295 totadj += numfreed;
296 TRACE(MEM, "Pool reshaped; " <<numfreed <<" freed; have " <<(memhave>>10) <<"K; target " <<(memtarget>>10) <<"K");
297 lastshape = time(0);
298 rsinprog = 0; // No need to lock, we're the only ones now setting it
299
300 xlBuff.Trim(); // Trim big buffers
301 }
302}
303
304/******************************************************************************/
305/* S e t */
306/******************************************************************************/
307
308void XrdBuffManager::Set(int maxmem, int minw)
309{
310
311// Obtain a lock and set the values
312//
313 Reshaper.Lock();
314 if (maxmem > 0) maxalo = (long long)maxmem;
315 if (minw > 0) minrsw = minw;
316 Reshaper.UnLock();
317}
318
319/******************************************************************************/
320/* S t a t s */
321/******************************************************************************/
322
323int XrdBuffManager::Stats(char *buff, int blen, int do_sync)
324{
325 static const char statfmt[] = "<stats id=\"buff\"><reqs>%d</reqs>"
326 "<mem>%lld</mem><buffs>%d</buffs><adj>%d</adj>%s</stats>";
327 char xlStats[1024];
328 int nlen;
329
330// If only size wanted, return it
331//
332 if (!buff) return sizeof(statfmt) + 16*4 + xlBuff.Stats(0,0);
333
334// Return formatted stats
335//
336 if (do_sync) Reshaper.Lock();
337 xlBuff.Stats(xlStats, sizeof(xlStats), do_sync);
338 nlen = snprintf(buff,blen,statfmt,totreq,totalo,totbuf,totadj,xlStats);
339 if (do_sync) Reshaper.UnLock();
340 return nlen;
341}
void * XrdReshaper(void *pp)
Definition XrdBuffer.cc:48
#define XRD_BUSHIFT
Definition XrdBuffer.hh:67
#define XRD_BUCKETS
Definition XrdBuffer.hh:66
#define TRACE(act, x)
Definition XrdTrace.hh:63
int Stats(char *buff, int blen, int do_sync=0)
Definition XrdBuffer.cc:323
void Release(XrdBuffer *bp)
Definition XrdBuffer.cc:221
int Recalc(int bsz)
Definition XrdBuffer.cc:195
XrdBuffer * Obtain(int bsz)
Definition XrdBuffer.cc:140
XrdBuffManager(int minrst=20 *60)
Definition XrdBuffer.cc:78
void Set(int maxmem=-1, int minw=-1)
Definition XrdBuffer.cc:308
static int Log2(unsigned long long n)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
XrdSysError Log
Definition XrdConfig.cc:113
XrdBuffXL xlBuff
Definition XrdBuffer.cc:68