+ }
+
+ payload_advance64(&payload, &offset);
+ payload_advance32(&payload, &hole_size);
+
+ if (*offset + *hole_size > qiov->size) {
+ return -EINVAL;
+ }
Whereas here, the server sent us the right number of bytes, but with
invalid semantics (a weaker class of error than above). Still, once we
know the server is violating the protocol, we're probably wiser to hang
up than to persist in keeping the connection open.
We aren't checking for overlap with any previously-received chunk, or
with any previously-received error-with-offset. I'm okay with that, as
it really is easier to implement on the client side (just because the
spec says the server is buggy for doing that does not mean we have to
spend client resources validating whether the server is buggy).
+
+ qemu_iovec_memset(qiov, *offset, 0, *hole_size);
+
+ return 0;
+}
+
+static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
+ uint8_t *payload, int *request_ret)
+{
+ uint32_t *error;
+ uint16_t *message_size;
+
+ assert(chunk->type & (1 << 15));
+
+ if (chunk->length < sizeof(error) + sizeof(message_size)) {
+ return -EINVAL;
+ }
Again, should we plumb in the use of errp, rather than just
disconnecting from the server?
+
+ payload_advance32(&payload, &error);
+ payload_advance16(&payload, &message_size);
+
+ error_report("%.*s", *message_size, payload);
I think this one is wrong - we definitely want to log the error message
that we got (or at least trace it), but since the chunk is in reply to a
pending request, we should be able to feed the error message to the
errp of that request, rather than reporting it here and losing it.
Also, if *message_size is 0, error_report("") isn't very useful (and
right now, the simple server implementation doesn't send a message).
+
+ /* TODO add special case for ERROR_OFFSET */
+
+ *request_ret = nbd_errno_to_system_errno(*error);
We should validate that the server didn't send us 0 for success.
+
+ return 0;
So if we get here, we know we have an error, but we did the minimal
handling of it (regardless of what chunk type it has), and can resume
communication with the server. This matches the NBD spec.
+}
+
+static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
+ QEMUIOVector *qiov)
+{
+ QEMUIOVector sub_qiov;
+ uint64_t offset;
+ size_t data_size;
+ int ret;
+ NBDStructuredReplyChunk *chunk = &s->reply.structured;
+
+ assert(nbd_reply_is_structured(&s->reply));
+
+ if (chunk->length < sizeof(offset)) {
+ return -EINVAL;
+ }
+
+ if (nbd_read(s->ioc, &offset, sizeof(offset), NULL) < 0) {
+ return -EIO;
errp plumbing is missing (instead of ignoring the nbd_read() error and
relying just on our own -EIO, we should also try to preserve the
original error message).
+ }
+ be64_to_cpus(&offset);
+
+ data_size = chunk->length - sizeof(offset);
+ if (offset + data_size > qiov->size) {
+ return -EINVAL;
+ }
+
+ qemu_iovec_init(&sub_qiov, qiov->niov);
+ qemu_iovec_concat(&sub_qiov, qiov, offset, data_size);
+ ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, NULL);
errp plumbing again.
+ qemu_iovec_destroy(&sub_qiov);
+
+ return ret < 0 ? -EIO : 0;
+}
+
+#define NBD_MAX_MALLOC_PAYLOAD 1000
Feels rather arbitrary, and potentially somewhat small. What is the
justification for this number, as opposed to reusing NBD_MAX_BUFFER_SIZE
(32M) or some other value?
+static int nbd_co_receive_structured_payload(NBDClientSession *s,
+ void **payload)
Documentation should mention that the result requires qemu_vfree()
instead of the more usual g_free()
+{
+ int ret;
+ uint32_t len;
+
+ assert(nbd_reply_is_structured(&s->reply));
+
+ len = s->reply.structured.length;
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (payload == NULL) {
+ return -EINVAL;
+ }
Again, missing a useful error message (the server sent us payload that
we aren't expecting) for reporting back through errp.
+
+ if (len > NBD_MAX_MALLOC_PAYLOAD) {
+ return -EINVAL;
+ }
+
+ *payload = qemu_memalign(8, len);
+ ret = nbd_read(s->ioc, *payload, len, NULL);
errp plumbing
+ if (ret < 0) {
+ qemu_vfree(*payload);
+ *payload = NULL;
+ return ret;
+ }
+
+ return 0;
+}
+
+/* nbd_co_do_receive_one_chunk
+ * for simple reply:
+ * set request_ret to received reply error
+ * if qiov is not NULL: read payload to @qiov
+ * for structured reply chunk:
+ * if error chunk: read payload, set @request_ret, do not set @payload
+ * else if offset_data chunk: read payload data to @qiov, do not set @payload
+ * else: read payload to @payload
Mention that payload must be freed with qemu_vfree()
+ */
+static int nbd_co_do_receive_one_chunk(NBDClientSession *s, uint64_t handle,
+ bool only_structured, int *request_ret,
+ QEMUIOVector *qiov, void **payload)
{
int ret;
int i = HANDLE_TO_INDEX(s, handle);
+ void *local_payload = NULL;
+
+ if (payload) {
+ *payload = NULL;
+ }
+ *request_ret = 0;
/* Wait until we're woken up by nbd_read_reply_entry. */
s->requests[i].receiving = true;
qemu_coroutine_yield();
s->requests[i].receiving = false;
if (!s->ioc || s->quit) {
- ret = -EIO;
- } else {
- assert(s->reply.handle == handle);
- ret = -nbd_errno_to_system_errno(s->reply.simple.error);
- if (qiov && ret == 0) {
- if (qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
- NULL) < 0) {
Okay, I'll admit that we are already lacking errp plumbing, so adding it
in this patch is not fair (if we add it, it can be a separate patch).
- ret = -EIO;
- s->quit = true;
- }
+ return -EIO;
+ }
+
+ assert(s->reply.handle == handle);
+
+ if (nbd_reply_is_simple(&s->reply)) {
+ if (only_structured) {
+ return -EINVAL;
}
[1] Earlier, you checked that the server isn't sending structured
replies where we expect simple; and here you're checking that the server
isn't sending simple replies where we expect structured. Good, we've
covered both mismatches, and I can see why you have it in separate
locations.
- /* Tell the read handler to read another header. */
- s->reply.handle = 0;
+ *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
+ if (*request_ret < 0 || !qiov) {
+ return 0;
+ }
+
+ return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
+ NULL) < 0 ? -EIO : 0;
+ }
+
+ /* handle structured reply chunk */
+ assert(s->info.structured_reply);
+
+ if (s->reply.structured.type == NBD_SREP_TYPE_NONE) {
+ return 0;
We should also check that the server properly set NBD_REPLY_FLAG_DONE
(if we got this type and the flag wasn't set, the server is sending
garbage, and we should request disconnect soon). [2]
+ }
+
+ if (s->reply.structured.type == NBD_SREP_TYPE_OFFSET_DATA) {
+ if (!qiov) {
+ return -EINVAL;
+ }
+
+ return nbd_co_receive_offset_data_payload(s, qiov);
+ }
+
+ if (nbd_srep_type_is_error(s->reply.structured.type)) {
+ payload = &local_payload;
+ }
+
+ ret = nbd_co_receive_structured_payload(s, payload);
+ if (ret < 0) {
+ return ret;
}
- s->requests[i].coroutine = NULL;
+ if (nbd_srep_type_is_error(s->reply.structured.type)) {
+ ret = nbd_parse_error_payload(&s->reply.structured, local_payload,
+ request_ret);
+ qemu_vfree(local_payload);
+ return ret;
+ }
+
+ return 0;
+}
+
+/* nbd_co_receive_one_chunk
+ * Read reply, wake up read_reply_co and set s->quit if needed.
+ * Return value is a fatal error code or normal nbd reply error code
+ */
+static int nbd_co_receive_one_chunk(NBDClientSession *s, uint64_t handle,
Is this function called in coroutine context? Presumably yes, because
of the _co_ infix; but then it should also have the coroutine_fn marker
in the declaration.
+ bool only_structured,
+ QEMUIOVector *qiov, NBDReply *reply,
+ void **payload)
+{
+ int request_ret;
+ int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
+ &request_ret, qiov, payload);
+
+ if (ret < 0) {
+ s->quit = true;
+ } else {
+ /* For assert at loop start in nbd_read_reply_entry */
+ if (reply) {
+ *reply = s->reply;
+ }
+ s->reply.handle = 0;
+ ret = request_ret;
+ }
- /* Kick the read_reply_co to get the next reply. */
if (s->read_reply_co) {
aio_co_wake(s->read_reply_co);
}
+ return ret;
+}
+
+typedef struct NBDReplyChunkIter {
+ int ret;
+ bool done, only_structured;
+} NBDReplyChunkIter;
+
+#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
+ qiov, reply, payload) \
+ for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
+ nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
+
+static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
+ NBDReplyChunkIter *iter,
+ uint64_t handle,
+ QEMUIOVector *qiov, NBDReply *reply,
+ void **payload)
+{
+ int ret;
+ NBDReply local_reply;
+ NBDStructuredReplyChunk *chunk;
+ if (s->quit) {
+ if (iter->ret == 0) {
+ iter->ret = -EIO;
+ }
+ goto break_loop;
+ }
+
+ if (iter->done) {
+ /* Previous iteration was last. */
+ goto break_loop;
+ }
+
+ if (reply == NULL) {
+ reply = &local_reply;
+ }
+
+ ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
+ qiov, reply, payload);
+ if (ret < 0 && iter->ret == 0) {
+ /* If it is a fatal error s->qiov is set by nbd_co_receive_one_chunk */
Did you mean s->quit here? There is no s->qiov, and it is s->quit that
nbd_co_receive_one_chunk() sets on a fatal error.
+ iter->ret = ret;
+ }
+
+ /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
+ if (nbd_reply_is_simple(&s->reply) || s->quit) {
+ goto break_loop;
+ }
+
+ chunk = &reply->structured;
+ iter->only_structured = true;
Interesting observation - the NBD spec lets us return a structured error
even to a non-read command. Only NBD_CMD_READ requires a structured
reply when we haven't yet received any reply; but you are correct that
once a given handle sees one structured chunk without a done bit, then
all future replies for that handle must also be structured.
+
+ if (chunk->type == NBD_SREP_TYPE_NONE) {
+ if (!(chunk->flags & NBD_SREP_FLAG_DONE)) {
+ /* protocol error */
+ s->quit = true;
+ if (iter->ret == 0) {
+ iter->ret = -EIO;
+ }
[2] Ah, I see you did it here instead!
+ s->quit = true;
+ }
+
+ qemu_vfree(payload);
+ payload = NULL;
+ }
+
+ return iter.ret;
}
static int nbd_co_request(BlockDriverState *bs,
NBDRequest *request,
- QEMUIOVector *qiov)
+ QEMUIOVector *write_qiov)
The rename is somewhat cosmetic, but I think it reads well.
{
NBDClientSession *client = nbd_get_client_session(bs);
int ret;
- if (qiov) {
- assert(request->type == NBD_CMD_WRITE || request->type ==
NBD_CMD_READ);
- assert(request->len == iov_size(qiov->iov, qiov->niov));
+ assert(request->type != NBD_CMD_READ);
+ if (write_qiov) {
+ assert(request->type == NBD_CMD_WRITE);
+ assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
} else {
- assert(request->type != NBD_CMD_WRITE && request->type !=
NBD_CMD_READ);
+ assert(request->type != NBD_CMD_WRITE);
}
- ret = nbd_co_send_request(bs, request,
- request->type == NBD_CMD_WRITE ? qiov : NULL);
+ ret = nbd_co_send_request(bs, request, write_qiov);
if (ret < 0) {
return ret;
}
- return nbd_co_receive_reply(client, request->handle,
- request->type == NBD_CMD_READ ? qiov : NULL);
+ return nbd_co_receive_return_code(client, request->handle);
}
So basically read was special enough that it no longer shares the common
code at this level. Still, I like how you've divided things among the
various functions.
int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, QEMUIOVector *qiov, int flags)
{
+ int ret;
+ NBDClientSession *client = nbd_get_client_session(bs);
NBDRequest request = {
.type = NBD_CMD_READ,
.from = offset,
@@ -259,7 +591,12 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t
offset,
assert(bytes <= NBD_MAX_BUFFER_SIZE);
assert(!flags);
- return nbd_co_request(bs, &request, qiov);
+ ret = nbd_co_send_request(bs, &request, NULL);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return nbd_co_receive_cmdread_reply(client, request.handle, qiov);
}
And of course, your next goal is adding a block_status function that
will also special-case chunked replies - but definitely a later patch ;)
int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
diff --git a/nbd/client.c b/nbd/client.c
index a38e1a7d8e..2f256ee771 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -687,6 +687,13 @@ int nbd_receive_negotiate(QIOChannel *ioc, const char
*name,
if (fixedNewStyle) {
int result;
+ result = nbd_request_simple_option(ioc, NBD_OPT_STRUCTURED_REPLY,
+ errp);
Bug - we must NOT request this option unless we know we can handle the
server saying yes. When nbd-client is handling the connection, we're
fine; but when qemu-nbd is handling the connection by using ioctls to
hand off to the kernel, we MUST query the kernel to see if it supports
structured replies (well, for now, we can get by with saying that we
KNOW the kernel code has not been written yet, and therefore our query
is a constant false, but someday we'll have a future patch in that area).