XRootD
Loading...
Searching...
No Matches
XrdXrootdXeqChkPnt.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d X e q C h k P n t . c c */
4/* */
5/* */
6/* (c) 2020 by the Board of Trustees of the Leland Stanford, Jr., University */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
32#include "Xrd/XrdBuffer.hh"
33#include "Xrd/XrdLink.hh"
43
44/******************************************************************************/
45/* G l o b a l s */
46/******************************************************************************/
47
49
50/******************************************************************************/
51/* d o _ C h k P n t */
52/******************************************************************************/
53
// Handle a chkpoint request whose subcode is carried in
// Request.chkpoint.opcode. The kXR_ckpXeq subcode is delegated to
// do_ChkPntXeq(); the remaining subcodes are dispatched by the switch below.
// The value returned is whatever Response.Send(), fsError() or do_ChkPntXeq()
// returns for the path taken.
//
// NOTE(review): the listing numbers embedded in this text skip 68, 73, 80, 83,
// 86, 88 and 96, so some statements of this function are not visible here
// (presumably the declaration of 'fh', the start of the "not open" response
// and the per-subcode calls that set 'rc') -- confirm against the real file.
//
54int XrdXrootdProtocol::do_ChkPnt()
55{
// Printable subcode names used in trace and error messages; the table is
// indexed directly by Request.chkpoint.opcode below.
//
56 static const char *ckpName[] = {"begin","commit","query","rollback","xeq"};
57
58// Keep statistics
59//
60 SI->Bump(SI->miscCnt);
61
62// The kXR_ckpXeq is far too complicated to process here so we do it elsewhere.
63//
64 if (Request.chkpoint.opcode == kXR_ckpXeq) return do_ChkPntXeq();
65
66// Validate the filehandle
67//
69 struct iov ckpVec;
70 int rc;
71
// The request is rejected when there is no file table or the handle does not
// map to an open file; IO.File is set as a side effect of the lookup.
//
72 if (!FTab || !(IO.File = FTab->Get(fh.handle)))
74 "chkpoint does not refer to an open file");
75
76// Handle each subcode
77//
78 switch(Request.chkpoint.opcode)
79 {case kXR_ckpBegin:
81 break;
82 case kXR_ckpCommit:
84 break;
85 case kXR_ckpQuery:
// When rc is zero the query answers immediately: ckpVec.size and
// ckpVec.offset are converted with htonl() into maxCkpSize and useCkpSize
// and sent as the response body.
//
87 if (!rc)
89 ckpQResp.maxCkpSize = htonl(ckpVec.size);
90 ckpQResp.useCkpSize =
91 htonl(static_cast<uint32_t>(ckpVec.offset));
92 return Response.Send(&ckpQResp, sizeof(ckpQResp));
93 }
94 break;
95 case kXR_ckpRollback:
97 break;
98 default: return Response.Send(kXR_ArgInvalid,
99 "chkpoint subcode is invalid");
100 };
101
102// Do some tracing
103//
104 TRACEP(FS, "fh=" <<fh.handle <<" chkpnt " <<ckpName[Request.chkpoint.opcode]
105 <<" rc=" <<rc);
106
107// Check for error and invalid return codes from checkpoint. Note that writev's
108// aren't flushed, we simply close the connection to get rid of pending data.
// Any rc other than SFS_OK or SFS_ERROR is logged and replaced by an ENODEV
// "logic error" before the error response is generated.
109//
110 if (SFS_OK != rc)
111 {if (rc != SFS_ERROR)
112 {char eBuff[128];
113 snprintf(eBuff, sizeof(eBuff), "chkpoint %s returned invalid rc=%d!",
114 ckpName[Request.chkpoint.opcode], rc);
115 eDest.Emsg("Xeq", eBuff);
116 IO.File->XrdSfsp->error.setErrInfo(ENODEV, "logic error");
117 }
118 return fsError(SFS_ERROR, 0, IO.File->XrdSfsp->error, 0, 0);
119 }
120
121// Respond that all went well
122//
123 return Response.Send();
124}
125
126/******************************************************************************/
127/* d o _ C h k P n t X e q */
128/******************************************************************************/
129
// Execute a request that was packaged inside a chkpoint request. The embedded
// request header is read from argp->buff, validated and copied over Request;
// the target file handle is then extracted, the file's checkpoint() method is
// invoked for the affected range, and on a zero return the embedded pgwrite,
// truncate, write or writev is carried out by the corresponding do_*() method.
// The function installs itself in Resume when getData() returns non-zero, so
// it may be entered more than once for a single request.
//
// NOTE(review): the listing numbers embedded in this text skip 139, 154-155,
// 160, 162, 169, 184, 188, 213, 225 and 262, so several statements (mostly
// the opening 'if' or call line of the fragments below) are not visible here
// -- confirm every such fragment against the real file before changing it.
//
130int XrdXrootdProtocol::do_ChkPntXeq()
131{
132 static const int sidSZ = sizeof(Request.header.streamid);
133 int rc;
134
135// If this is the first pass, check that streamid's match and setup the
136// request to be that of the chkpnt request. Note that kXR_writev requires
137// an additional fetch of data which may cause re-entry as pass2.
// (The statement that opens this block, presumably the first-pass test, is
// listing line 139 and is not present in this view.)
138//
140 {ClientRequestHdr *Subject = (ClientRequestHdr *)(argp->buff);
141 if (memcmp(Request.header.streamid, Subject->streamid, sidSZ))
142 {Response.Send(kXR_ArgInvalid, "Request streamid mismatch");
143 return -1;
144 }
// The chkpoint payload must be exactly one request header in length.
145 if (Request.header.dlen != sizeof(Request.header))
146 {Response.Send(kXR_ArgInvalid, "Request length invalid");
147 return -1;
148 }
149
// Overlay the embedded request onto Request, converting the request id and
// data length with ntohs()/ntohl().
150 memcpy(Request.header.body, Subject->body, sizeof(Request.header.body));
151 Request.header.requestid = ntohs(Subject->requestid);
152 Request.header.dlen = ntohl(Subject->dlen);
153
// (The condition guarding this rejection is on listing lines 154-155, which
// are not present in this view.)
156 {Response.Send(kXR_ArgInvalid,"chkpoint request is invalid");
157 return -1;
158 }
159
// An empty payload is acknowledged immediately. Otherwise a larger buffer is
// obtained from BPool when dlen exceeds the current one, and the payload is
// read with getData(); a non-zero return from getData() registers this method
// in Resume and is passed back to the caller.
// (Listing lines 160, 162 and 169 -- the openers of the fragments below,
// presumably the writev test, the length limit test and the no-memory
// response -- are not present in this view.)
161 {if (!Request.header.dlen) return Response.Send();
163 {Response.Send(kXR_ArgTooLong,"chkpoint write vector is too long");
164 return -1;
165 }
166 if ( Request.header.dlen > argp->bsize)
167 {BPool->Release(argp);
168 if (!(argp = BPool->Obtain(Request.header.dlen)))
170 "Insufficient memory for chkpoint request");
171 return -1;
172 }
173 hcNow = hcPrev; halfBSize = argp->bsize >> 1;
174 }
175 if ((rc = getData("arg", argp->buff, Request.header.dlen)))
176 {Resume = &XrdXrootdProtocol::do_ChkPntXeq; return rc;}
177 }
178 }
179
180// Prepare to process the actual request
181//
182 const char *xeqOp;
183 struct iov ckpVec;
185 kXR_unt16 reqID;
186
187 reqID = Request.header.requestid;
189
190// Obtain the filehandle that we should check
191//
192 switch(reqID)
193 {case kXR_pgwrite:
194 xeqOp = "pgwrite";
195 fh.Set(Request.pgwrite.fhandle);
196 break;
197 case kXR_truncate:
198 xeqOp = "trunc";
199 fh.Set(Request.truncate.fhandle);
200 break;
201 case kXR_write:
202 xeqOp = "write";
203 fh.Set(Request.write.fhandle);
204 break;
205 case kXR_writev:
// do_WriteV() is run first; a non-zero return is handed straight back and
// a null wvInfo afterwards ends processing with 0. Every vector element
// must carry the same handle as wvInfo->curFH, otherwise wvInfo is freed
// and the request is rejected.
206 xeqOp = "writev";
207 if ((rc = do_WriteV())) return rc;
208 if (!wvInfo) return 0;
209 fh.handle = wvInfo->curFH;
210 for (int i = 0; i < wvInfo->vEnd; i++)
211 if (wvInfo->ioVec[i].info != fh.handle)
212 {free(wvInfo); wvInfo = 0;
214 "multi-file chkpoint writev not supported");
215 return -1;
216 }
217 break;
218 default: return Response.Send(kXR_ArgInvalid,
219 "chkpoint request is invalid");
220 }
221
222// Make sure we have the target file
// When the lookup fails, every request other than truncate additionally marks
// the link with a protocol-violation text via Link->setEtext().
// (Listing line 225, which opens the failure block and presumably sets 'rc'
// from the error response, is not present in this view.)
223//
224 if (!FTab || !(IO.File = FTab->Get(fh.handle)))
226 "chkpoint does not refer to an open file");
227 if (reqID != kXR_truncate)
228 return Link->setEtext("chkpnt xeq write protocol violation");
229 return rc;
230 }
231
232// If this is a packaged request, create a checkpoint
233//
234
235// Now perform the action
// Each case fills ckpVec, calls checkpoint() on the file and, when that
// returns zero, returns the result of the matching do_*() method. A non-zero
// rc falls out of the switch into the error handling below.
236//
237 switch(reqID)
238 {case kXR_pgwrite:
239 ckpVec.size = Request.header.dlen;
240 n2hll(Request.pgwrite.offset, ckpVec.offset);
241 ckpVec.info = 0;
242 ckpVec.data = 0;
243 rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpWrite,&ckpVec,1);
244 if (!rc) return do_PgWrite();
245 break;
246 case kXR_truncate:
// NOTE(review): the offset is read through Request.write rather than
// Request.truncate, and ckpVec.size is not assigned in this case --
// confirm both are intentional.
247 n2hll(Request.write.offset, ckpVec.offset);
248 ckpVec.info = 0;
249 ckpVec.data = 0;
250 rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpTrunc,&ckpVec,1);
251 if (!rc) return do_Truncate();
252 break;
253 case kXR_write:
254 ckpVec.size = Request.header.dlen;
255 n2hll(Request.write.offset, ckpVec.offset);
256 ckpVec.info = 0;
257 ckpVec.data = 0;
258 rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpWrite,&ckpVec,1);
259 if (!rc) return do_Write();
260 break;
261 default: // kXR_writev
// (Listing line 262, presumably the start of the checkpoint() call that
// takes the arguments below and sets 'rc', is not present in this view.)
// On success every element's info field is set to the file handle
// before do_WriteVec() is invoked.
263 (iov *)wvInfo->ioVec, wvInfo->vEnd);
264 if (!rc)
265 {for (int i = 0; i < wvInfo->vEnd; i++)
266 wvInfo->ioVec[i].info = fh.handle;
267 return do_WriteVec();
268 }
269 break;
270 }
271
272// Do some tracing
273//
274 TRACEP(FS, "fh=" <<fh.handle <<" chkpnt " <<xeqOp <<" rc=" <<rc);
275
276// Check for error and invalid return codes from checkpoint. Note that writev's
277// aren't flushed, we simply close the connection to get rid of pending data.
// Any rc other than SFS_OK or SFS_ERROR is logged and replaced by an ENODEV
// "logic error". pgwrite and write then finish through do_WriteNone(); the
// other requests send the error via fsError(), and only truncate returns that
// result (the rest return -1).
278//
279 if (SFS_OK != rc)
280 {if (rc != SFS_ERROR)
281 {char eBuff[128];
282 snprintf(eBuff, sizeof(eBuff),
283 "chkpoint xeq %s returned invalid rc=%d!", xeqOp, rc);
284 eDest.Emsg("Xeq", eBuff);
285 IO.File->XrdSfsp->error.setErrInfo(ENODEV, "logic error");
286 }
// NOTE(review): in both branches below IO.EInfo[0] is assigned twice in a
// row, so the SFS_ERROR value is immediately overwritten with 0. The second
// store may have been meant for a different element -- confirm against how
// do_WriteNone() reads IO.EInfo.
287 if (reqID == kXR_pgwrite)
288 {IO.EInfo[0] = SFS_ERROR; IO.EInfo[0] = 0;
289 return do_WriteNone(static_cast<int>(Request.pgwrite.pathid));
290 }
291 if (reqID == kXR_write)
292 {IO.EInfo[0] = SFS_ERROR; IO.EInfo[0] = 0;
293 return do_WriteNone(static_cast<int>(Request.write.pathid));
294 }
295 rc = fsError(SFS_ERROR, 0, IO.File->XrdSfsp->error, 0, 0);
296 return (reqID != kXR_truncate ? -1 : rc);
297 }
298
299// Respond that all went well
300//
301 return Response.Send();
302}
static const int kXR_ckpRollback
Definition XProtocol.hh:215
@ kXR_ArgInvalid
Definition XProtocol.hh:988
@ kXR_FileNotOpen
Definition XProtocol.hh:992
@ kXR_Unsupported
@ kXR_ArgTooLong
Definition XProtocol.hh:990
@ kXR_NoMemory
Definition XProtocol.hh:996
kXR_char body[16]
Definition XProtocol.hh:158
struct ClientTruncateRequest truncate
Definition XProtocol.hh:873
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_char fhandle[4]
Definition XProtocol.hh:805
kXR_char streamid[2]
Definition XProtocol.hh:156
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:860
struct ClientRequestHdr header
Definition XProtocol.hh:844
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_pgwrite
Definition XProtocol.hh:138
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:847
static const int kXR_ckpCommit
Definition XProtocol.hh:213
static const int kXR_ckpQuery
Definition XProtocol.hh:214
struct ClientWriteRequest write
Definition XProtocol.hh:874
static const int kXR_ckpBegin
Definition XProtocol.hh:212
unsigned short kXR_unt16
Definition XPtypes.hh:67
#define SFS_ERROR
#define SFS_OK
#define TRACEP(act, x)
XrdOucIOVec ioVec[1]
XrdSysTrace XrdXrootdTrace
void Release(XrdBuffer *bp)
Definition XrdBuffer.cc:221
XrdBuffer * Obtain(int bsz)
Definition XrdBuffer.cc:140
char * buff
Definition XrdBuffer.hh:45
int setErrInfo(int code, const char *emsg)
void Bump(int &val)
XrdOucErrInfo & error
virtual int checkpoint(cpAct act, struct iov *range=0, int n=0)
@ cpTrunc
Truncate a file within checkpoint.
@ cpDelete
Delete an existing checkpoint.
@ cpRestore
Restore an active checkpoint and delete it.
@ cpWrite
Add data to an existing checkpoint.
@ cpQuery
Return checkpoint limits.
@ cpCreate
Create a checkpoint, one must not be active.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdXrootdFile * Get(int fnum)
XrdSfsFile * XrdSfsp
static XrdXrootdStats * SI
XrdXrootd::IOParms IO
XrdXrootdWVInfo * wvInfo
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
int(XrdXrootdProtocol::* Resume)()
XrdXrootdResponse Response
static XrdBuffManager * BPool
static const int maxWvecln
Definition XProtocol.hh:835