From 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5 Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Tue, 22 Oct 2024 10:36:11 +0000 Subject: [PATCH] 修改4g拨号为QMI,需要在系统里后台执行quectel-CM --- kernel/net/sunrpc/xprtrdma/frwr_ops.c | 823 +++++++++++++++++++++++++++++++--------------------------- 1 files changed, 435 insertions(+), 388 deletions(-) diff --git a/kernel/net/sunrpc/xprtrdma/frwr_ops.c b/kernel/net/sunrpc/xprtrdma/frwr_ops.c index 1bb00dd..bf3627d 100644 --- a/kernel/net/sunrpc/xprtrdma/frwr_ops.c +++ b/kernel/net/sunrpc/xprtrdma/frwr_ops.c @@ -7,70 +7,39 @@ /* Lightweight memory registration using Fast Registration Work * Requests (FRWR). * - * FRWR features ordered asynchronous registration and deregistration - * of arbitrarily sized memory regions. This is the fastest and safest + * FRWR features ordered asynchronous registration and invalidation + * of arbitrarily-sized memory regions. This is the fastest and safest * but most complex memory registration mode. */ /* Normal operation * - * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG - * Work Request (frwr_op_map). When the RDMA operation is finished, this + * A Memory Region is prepared for RDMA Read or Write using a FAST_REG + * Work Request (frwr_map). When the RDMA operation is finished, this * Memory Region is invalidated using a LOCAL_INV Work Request - * (frwr_op_unmap_sync). + * (frwr_unmap_async and frwr_unmap_sync). * - * Typically these Work Requests are not signaled, and neither are RDMA - * SEND Work Requests (with the exception of signaling occasionally to - * prevent provider work queue overflows). This greatly reduces HCA + * Typically FAST_REG Work Requests are not signaled, and neither are + * RDMA Send Work Requests (with the exception of signaling occasionally + * to prevent provider work queue overflows). This greatly reduces HCA * interrupt workload. - * - * As an optimization, frwr_op_unmap marks MRs INVALID before the - * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on - * rb_mrs immediately so that no work (like managing a linked list - * under a spinlock) is needed in the completion upcall. - * - * But this means that frwr_op_map() can occasionally encounter an MR - * that is INVALID but the LOCAL_INV WR has not completed. Work Queue - * ordering prevents a subsequent FAST_REG WR from executing against - * that MR while it is still being invalidated. */ /* Transport recovery * - * ->op_map and the transport connect worker cannot run at the same - * time, but ->op_unmap can fire while the transport connect worker - * is running. Thus MR recovery is handled in ->op_map, to guarantee - * that recovered MRs are owned by a sending RPC, and not one where - * ->op_unmap could fire at the same time transport reconnect is - * being done. + * frwr_map and frwr_unmap_* cannot run at the same time the transport + * connect worker is running. The connect worker holds the transport + * send lock, just as ->send_request does. This prevents frwr_map and + * the connect worker from running concurrently. When a connection is + * closed, the Receive completion queue is drained before the allowing + * the connect worker to get control. This prevents frwr_unmap and the + * connect worker from running concurrently. * - * When the underlying transport disconnects, MRs are left in one of - * four states: - * - * INVALID: The MR was not in use before the QP entered ERROR state. - * - * VALID: The MR was registered before the QP entered ERROR state. - * - * FLUSHED_FR: The MR was being registered when the QP entered ERROR - * state, and the pending WR was flushed. - * - * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR - * state, and the pending WR was flushed. - * - * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered - * with ib_dereg_mr and then are re-initialized. Because MR recovery - * allocates fresh resources, it is deferred to a workqueue, and the - * recovered MRs are placed back on the rb_mrs list when recovery is - * complete. frwr_op_map allocates another MR for the current RPC while - * the broken MR is reset. - * - * To ensure that frwr_op_map doesn't encounter an MR that is marked - * INVALID but that is about to be flushed due to a previous transport - * disconnect, the transport connect worker attempts to drain all - * pending send queue WRs before the transport is reconnected. + * When the underlying transport disconnects, MRs that are in flight + * are flushed and are likely unusable. Thus all MRs are destroyed. + * New MRs are created on demand. */ -#include <linux/sunrpc/rpc_rdma.h> #include <linux/sunrpc/svc_rdma.h> #include "xprt_rdma.h" @@ -80,156 +49,158 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -bool -frwr_is_supported(struct rpcrdma_ia *ia) -{ - struct ib_device_attr *attrs = &ia->ri_device->attrs; - - if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) - goto out_not_supported; - if (attrs->max_fast_reg_page_list_len == 0) - goto out_not_supported; - return true; - -out_not_supported: - pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n", - ia->ri_device->name); - return false; -} - -static int -frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) -{ - unsigned int depth = ia->ri_max_frwr_depth; - struct rpcrdma_frwr *frwr = &mr->frwr; - int rc; - - frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth); - if (IS_ERR(frwr->fr_mr)) - goto out_mr_err; - - mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL); - if (!mr->mr_sg) - goto out_list_err; - - INIT_LIST_HEAD(&mr->mr_list); - sg_init_table(mr->mr_sg, depth); - init_completion(&frwr->fr_linv_done); - return 0; - -out_mr_err: - rc = PTR_ERR(frwr->fr_mr); - dprintk("RPC: %s: ib_alloc_mr status %i\n", - __func__, rc); - return rc; - -out_list_err: - rc = -ENOMEM; - dprintk("RPC: %s: sg allocation failure\n", - __func__); - ib_dereg_mr(frwr->fr_mr); - return rc; -} - -static void -frwr_op_release_mr(struct rpcrdma_mr *mr) +/** + * frwr_release_mr - Destroy one MR + * @mr: MR allocated by frwr_mr_init + * + */ +void frwr_release_mr(struct rpcrdma_mr *mr) { int rc; rc = ib_dereg_mr(mr->frwr.fr_mr); if (rc) - pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n", - mr, rc); + trace_xprtrdma_frwr_dereg(mr, rc); kfree(mr->mr_sg); kfree(mr); } -static int -__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) +static void frwr_mr_recycle(struct rpcrdma_mr *mr) { - struct rpcrdma_frwr *frwr = &mr->frwr; - int rc; - - rc = ib_dereg_mr(frwr->fr_mr); - if (rc) { - pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n", - rc, mr); - return rc; - } - - frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, - ia->ri_max_frwr_depth); - if (IS_ERR(frwr->fr_mr)) { - pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n", - PTR_ERR(frwr->fr_mr), mr); - return PTR_ERR(frwr->fr_mr); - } - - dprintk("RPC: %s: recovered FRWR %p\n", __func__, frwr); - frwr->fr_state = FRWR_IS_INVALID; - return 0; -} - -/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR. - */ -static void -frwr_op_recover_mr(struct rpcrdma_mr *mr) -{ - enum rpcrdma_frwr_state state = mr->frwr.fr_state; struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - int rc; - rc = __frwr_mr_reset(ia, mr); - if (state != FRWR_FLUSHED_LI) { - trace_xprtrdma_dma_unmap(mr); - ib_dma_unmap_sg(ia->ri_device, + trace_xprtrdma_mr_recycle(mr); + + if (mr->mr_dir != DMA_NONE) { + trace_xprtrdma_mr_unmap(mr); + ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device, mr->mr_sg, mr->mr_nents, mr->mr_dir); + mr->mr_dir = DMA_NONE; } - if (rc) - goto out_release; - rpcrdma_mr_put(mr); - r_xprt->rx_stats.mrs_recovered++; - return; - -out_release: - pr_err("rpcrdma: FRWR reset failed %d, %p released\n", rc, mr); - r_xprt->rx_stats.mrs_orphaned++; - - spin_lock(&r_xprt->rx_buf.rb_mrlock); + spin_lock(&r_xprt->rx_buf.rb_lock); list_del(&mr->mr_all); - spin_unlock(&r_xprt->rx_buf.rb_mrlock); + r_xprt->rx_stats.mrs_recycled++; + spin_unlock(&r_xprt->rx_buf.rb_lock); - frwr_op_release_mr(mr); + frwr_release_mr(mr); } -/* On success, sets: - * ep->rep_attr.cap.max_send_wr - * ep->rep_attr.cap.max_recv_wr - * cdata->max_requests - * ia->ri_max_segs +/* frwr_reset - Place MRs back on the free list + * @req: request to reset * - * And these FRWR-related fields: - * ia->ri_max_frwr_depth - * ia->ri_mrtype + * Used after a failed marshal. For FRWR, this means the MRs + * don't have to be fully released and recreated. + * + * NB: This is safe only as long as none of @req's MRs are + * involved with an ongoing asynchronous FAST_REG or LOCAL_INV + * Work Request. */ -static int -frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, - struct rpcrdma_create_data_internal *cdata) +void frwr_reset(struct rpcrdma_req *req) { - struct ib_device_attr *attrs = &ia->ri_device->attrs; + struct rpcrdma_mr *mr; + + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) + rpcrdma_mr_put(mr); +} + +/** + * frwr_mr_init - Initialize one MR + * @r_xprt: controlling transport instance + * @mr: generic MR to prepare for FRWR + * + * Returns zero if successful. Otherwise a negative errno + * is returned. + */ +int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) +{ + struct rpcrdma_ep *ep = r_xprt->rx_ep; + unsigned int depth = ep->re_max_fr_depth; + struct scatterlist *sg; + struct ib_mr *frmr; + int rc; + + frmr = ib_alloc_mr(ep->re_pd, ep->re_mrtype, depth); + if (IS_ERR(frmr)) + goto out_mr_err; + + sg = kmalloc_array(depth, sizeof(*sg), GFP_NOFS); + if (!sg) + goto out_list_err; + + mr->mr_xprt = r_xprt; + mr->frwr.fr_mr = frmr; + mr->mr_dir = DMA_NONE; + INIT_LIST_HEAD(&mr->mr_list); + init_completion(&mr->frwr.fr_linv_done); + + sg_init_table(sg, depth); + mr->mr_sg = sg; + return 0; + +out_mr_err: + rc = PTR_ERR(frmr); + trace_xprtrdma_frwr_alloc(mr, rc); + return rc; + +out_list_err: + ib_dereg_mr(frmr); + return -ENOMEM; +} + +/** + * frwr_query_device - Prepare a transport for use with FRWR + * @ep: endpoint to fill in + * @device: RDMA device to query + * + * On success, sets: + * ep->re_attr + * ep->re_max_requests + * ep->re_max_rdma_segs + * ep->re_max_fr_depth + * ep->re_mrtype + * + * Return values: + * On success, returns zero. + * %-EINVAL - the device does not support FRWR memory registration + * %-ENOMEM - the device is not sufficiently capable for NFS/RDMA + */ +int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) +{ + const struct ib_device_attr *attrs = &device->attrs; int max_qp_wr, depth, delta; + unsigned int max_sge; - ia->ri_mrtype = IB_MR_TYPE_MEM_REG; + if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) || + attrs->max_fast_reg_page_list_len == 0) { + pr_err("rpcrdma: 'frwr' mode is not supported by device %s\n", + device->name); + return -EINVAL; + } + + max_sge = min_t(unsigned int, attrs->max_send_sge, + RPCRDMA_MAX_SEND_SGES); + if (max_sge < RPCRDMA_MIN_SEND_SGES) { + pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge); + return -ENOMEM; + } + ep->re_attr.cap.max_send_sge = max_sge; + ep->re_attr.cap.max_recv_sge = 1; + + ep->re_mrtype = IB_MR_TYPE_MEM_REG; if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG) - ia->ri_mrtype = IB_MR_TYPE_SG_GAPS; + ep->re_mrtype = IB_MR_TYPE_SG_GAPS; - ia->ri_max_frwr_depth = - min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - attrs->max_fast_reg_page_list_len); - dprintk("RPC: %s: device's max FR page list len = %u\n", - __func__, ia->ri_max_frwr_depth); + /* Quirk: Some devices advertise a large max_fast_reg_page_list_len + * capability, but perform optimally when the MRs are not larger + * than a page. + */ + if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS) + ep->re_max_fr_depth = attrs->max_sge_rd; + else + ep->re_max_fr_depth = attrs->max_fast_reg_page_list_len; + if (ep->re_max_fr_depth > RPCRDMA_MAX_DATA_SEGS) + ep->re_max_fr_depth = RPCRDMA_MAX_DATA_SEGS; /* Add room for frwr register and invalidate WRs. * 1. FRWR reg WR for head @@ -245,155 +216,81 @@ /* Calculate N if the device max FRWR depth is smaller than * RPCRDMA_MAX_DATA_SEGS. */ - if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) { - delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth; + if (ep->re_max_fr_depth < RPCRDMA_MAX_DATA_SEGS) { + delta = RPCRDMA_MAX_DATA_SEGS - ep->re_max_fr_depth; do { depth += 2; /* FRWR reg + invalidate */ - delta -= ia->ri_max_frwr_depth; + delta -= ep->re_max_fr_depth; } while (delta > 0); } - max_qp_wr = ia->ri_device->attrs.max_qp_wr; + max_qp_wr = attrs->max_qp_wr; max_qp_wr -= RPCRDMA_BACKWARD_WRS; max_qp_wr -= 1; if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE) return -ENOMEM; - if (cdata->max_requests > max_qp_wr) - cdata->max_requests = max_qp_wr; - ep->rep_attr.cap.max_send_wr = cdata->max_requests * depth; - if (ep->rep_attr.cap.max_send_wr > max_qp_wr) { - cdata->max_requests = max_qp_wr / depth; - if (!cdata->max_requests) - return -EINVAL; - ep->rep_attr.cap.max_send_wr = cdata->max_requests * - depth; + if (ep->re_max_requests > max_qp_wr) + ep->re_max_requests = max_qp_wr; + ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth; + if (ep->re_attr.cap.max_send_wr > max_qp_wr) { + ep->re_max_requests = max_qp_wr / depth; + if (!ep->re_max_requests) + return -ENOMEM; + ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth; } - ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; - ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */ - ep->rep_attr.cap.max_recv_wr = cdata->max_requests; - ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; - ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ + ep->re_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; + ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */ + ep->re_attr.cap.max_recv_wr = ep->re_max_requests; + ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH; + ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ - ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / - ia->ri_max_frwr_depth); + ep->re_max_rdma_segs = + DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ep->re_max_fr_depth); + /* Reply chunks require segments for head and tail buffers */ + ep->re_max_rdma_segs += 2; + if (ep->re_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS) + ep->re_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS; + + /* Ensure the underlying device is capable of conveying the + * largest r/wsize NFS will ask for. This guarantees that + * failing over from one RDMA device to another will not + * break NFS I/O. + */ + if ((ep->re_max_rdma_segs * ep->re_max_fr_depth) < RPCRDMA_MAX_SEGS) + return -ENOMEM; + return 0; } -/* FRWR mode conveys a list of pages per chunk segment. The - * maximum length of that list is the FRWR page list depth. - */ -static size_t -frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - - return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth); -} - -static void -__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr) -{ - if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("rpcrdma: %s: %s (%u/0x%x)\n", - wr, ib_wc_status_msg(wc->status), - wc->status, wc->vendor_err); -} - /** - * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC - * @cq: completion queue (ignored) - * @wc: completed WR + * frwr_map - Register a memory region + * @r_xprt: controlling transport + * @seg: memory region co-ordinates + * @nsegs: number of segments remaining + * @writing: true when RDMA Write will be used + * @xid: XID of RPC using the registered memory + * @mr: MR to fill in * - */ -static void -frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) { - frwr->fr_state = FRWR_FLUSHED_FR; - __frwr_sendcompletion_flush(wc, "fastreg"); - } - trace_xprtrdma_wc_fastreg(wc, frwr); -} - -/** - * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - */ -static void -frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, - fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) { - frwr->fr_state = FRWR_FLUSHED_LI; - __frwr_sendcompletion_flush(wc, "localinv"); - } - trace_xprtrdma_wc_li(wc, frwr); -} - -/** - * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - * Awaken anyone waiting for an MR to finish being fenced. - */ -static void -frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, - fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) { - frwr->fr_state = FRWR_FLUSHED_LI; - __frwr_sendcompletion_flush(wc, "localinv"); - } - complete(&frwr->fr_linv_done); - trace_xprtrdma_wc_li_wake(wc, frwr); -} - -/* Post a REG_MR Work Request to register a memory region + * Prepare a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. + * + * Returns the next segment or a negative errno pointer. + * On success, @mr is filled in. */ -static struct rpcrdma_mr_seg * -frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing, struct rpcrdma_mr **out) +struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_mr_seg *seg, + int nsegs, bool writing, __be32 xid, + struct rpcrdma_mr *mr) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_frwr *frwr; - struct rpcrdma_mr *mr; - struct ib_mr *ibmr; + struct rpcrdma_ep *ep = r_xprt->rx_ep; struct ib_reg_wr *reg_wr; - int i, n; + int i, n, dma_nents; + struct ib_mr *ibmr; u8 key; - mr = NULL; - do { - if (mr) - rpcrdma_mr_defer_recovery(mr); - mr = rpcrdma_mr_get(r_xprt); - if (!mr) - return ERR_PTR(-EAGAIN); - } while (mr->frwr.fr_state != FRWR_IS_INVALID); - frwr = &mr->frwr; - frwr->fr_state = FRWR_IS_VALID; - - if (nsegs > ia->ri_max_frwr_depth) - nsegs = ia->ri_max_frwr_depth; + if (nsegs > ep->re_max_fr_depth) + nsegs = ep->re_max_fr_depth; for (i = 0; i < nsegs;) { if (seg->mr_page) sg_set_page(&mr->mr_sg[i], @@ -406,28 +303,31 @@ ++seg; ++i; - if (holes_ok) + if (ep->re_mrtype == IB_MR_TYPE_SG_GAPS) continue; if ((i < nsegs && offset_in_page(seg->mr_offset)) || offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } mr->mr_dir = rpcrdma_data_dir(writing); + mr->mr_nents = i; - mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir); - if (!mr->mr_nents) + dma_nents = ib_dma_map_sg(ep->re_id->device, mr->mr_sg, mr->mr_nents, + mr->mr_dir); + if (!dma_nents) goto out_dmamap_err; - trace_xprtrdma_dma_map(mr); - ibmr = frwr->fr_mr; - n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE); - if (unlikely(n != mr->mr_nents)) + ibmr = mr->frwr.fr_mr; + n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE); + if (n != dma_nents) goto out_mapmr_err; + ibmr->iova &= 0x00000000ffffffff; + ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32; key = (u8)(ibmr->rkey & 0x000000FF); ib_update_fast_reg_key(ibmr, ++key); - reg_wr = &frwr->fr_regwr; + reg_wr = &mr->frwr.fr_regwr; reg_wr->mr = ibmr; reg_wr->key = ibmr->rkey; reg_wr->access = writing ? @@ -437,37 +337,59 @@ mr->mr_handle = ibmr->rkey; mr->mr_length = ibmr->length; mr->mr_offset = ibmr->iova; + trace_xprtrdma_mr_map(mr); - *out = mr; return seg; out_dmamap_err: - pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n", - mr->mr_sg, i); - frwr->fr_state = FRWR_IS_INVALID; - rpcrdma_mr_put(mr); + mr->mr_dir = DMA_NONE; + trace_xprtrdma_frwr_sgerr(mr, i); return ERR_PTR(-EIO); out_mapmr_err: - pr_err("rpcrdma: failed to map mr %p (%d/%d)\n", - frwr->fr_mr, n, mr->mr_nents); - rpcrdma_mr_defer_recovery(mr); + trace_xprtrdma_frwr_maperr(mr, n); return ERR_PTR(-EIO); } -/* Post Send WR containing the RPC Call message. +/** + * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC + * @cq: completion queue + * @wc: WCE for a completed FastReg WR * - * For FRMR, chain any FastReg WRs to the Send WR. Only a + */ +static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_fastreg(wc, frwr); + /* The MR will get recycled when the associated req is retransmitted */ + + rpcrdma_flush_disconnect(cq->cq_context, wc); +} + +/** + * frwr_send - post Send WRs containing the RPC Call message + * @r_xprt: controlling transport instance + * @req: prepared RPC Call + * + * For FRWR, chain any FastReg WRs to the Send WR. Only a * single ib_post_send call is needed to register memory * and then post the Send WR. + * + * Returns the return code from ib_post_send. + * + * Caller must hold the transport send lock to ensure that the + * pointers to the transport's rdma_cm_id and QP are stable. */ -static int -frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req) +int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *post_wr; struct rpcrdma_mr *mr; - post_wr = &req->rl_sendctx->sc_wr; + post_wr = &req->rl_wr; list_for_each_entry(mr, &req->rl_registered, mr_list) { struct rpcrdma_frwr *frwr; @@ -483,46 +405,96 @@ post_wr = &frwr->fr_regwr.wr; } - /* If ib_post_send fails, the next ->send_request for - * @req will queue these MWs for recovery. - */ - return ib_post_send(ia->ri_id->qp, post_wr, NULL); + return ib_post_send(r_xprt->rx_ep->re_id->qp, post_wr, NULL); } -/* Handle a remotely invalidated mr on the @mrs list +/** + * frwr_reminv - handle a remotely invalidated mr on the @mrs list + * @rep: Received reply + * @mrs: list of MRs to check + * */ -static void -frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) +void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) { struct rpcrdma_mr *mr; list_for_each_entry(mr, mrs, mr_list) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); - trace_xprtrdma_remoteinv(mr); - mr->frwr.fr_state = FRWR_IS_INVALID; - rpcrdma_mr_unmap_and_put(mr); + trace_xprtrdma_mr_reminv(mr); + rpcrdma_mr_put(mr); break; /* only one invalidated MR per RPC */ } } -/* Invalidate all memory regions that were registered for "req". +static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) +{ + if (wc->status != IB_WC_SUCCESS) + frwr_mr_recycle(mr); + else + rpcrdma_mr_put(mr); +} + +/** + * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC + * @cq: completion queue + * @wc: WCE for a completed LocalInv WR * - * Sleeps until it is safe for the host CPU to access the - * previously mapped memory regions. - * - * Caller ensures that @mrs is not empty before the call. This - * function empties the list. */ -static void -frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) +static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li(wc, frwr); + __frwr_release_mr(wc, mr); + + rpcrdma_flush_disconnect(cq->cq_context, wc); +} + +/** + * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC + * @cq: completion queue + * @wc: WCE for a completed LocalInv WR + * + * Awaken anyone waiting for an MR to finish being fenced. + */ +static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li_wake(wc, frwr); + __frwr_release_mr(wc, mr); + complete(&frwr->fr_linv_done); + + rpcrdma_flush_disconnect(cq->cq_context, wc); +} + +/** + * frwr_unmap_sync - invalidate memory regions that were registered for @req + * @r_xprt: controlling transport instance + * @req: rpcrdma_req with a non-empty list of MRs to process + * + * Sleeps until it is safe for the host CPU to access the previously mapped + * memory regions. This guarantees that registered MRs are properly fenced + * from the server before the RPC consumer accesses the data in them. It + * also ensures proper Send flow control: waking the next RPC waits until + * this RPC has relinquished all its Send Queue entries. + */ +void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *first, **prev, *last; const struct ib_send_wr *bad_wr; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; - int count, rc; + int rc; /* ORDER: Invalidate all of the MRs first * @@ -530,86 +502,161 @@ * a single ib_post_send() call. */ frwr = NULL; - count = 0; prev = &first; - list_for_each_entry(mr, mrs, mr_list) { - mr->frwr.fr_state = FRWR_IS_INVALID; + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { + + trace_xprtrdma_mr_localinv(mr); + r_xprt->rx_stats.local_inv_needed++; frwr = &mr->frwr; - trace_xprtrdma_localinv(mr); - frwr->fr_cqe.done = frwr_wc_localinv; last = &frwr->fr_invwr; - memset(last, 0, sizeof(*last)); + last->next = NULL; last->wr_cqe = &frwr->fr_cqe; + last->sg_list = NULL; + last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; + last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; - count++; *prev = last; prev = &last->next; } - if (!frwr) - goto unmap; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ - last->send_flags = IB_SEND_SIGNALED; frwr->fr_cqe.done = frwr_wc_localinv_wake; reinit_completion(&frwr->fr_linv_done); /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us - * unless ri_id->qp is a valid pointer. + * unless re_id->qp is a valid pointer. */ - r_xprt->rx_stats.local_inv_needed++; bad_wr = NULL; - rc = ib_post_send(ia->ri_id->qp, first, &bad_wr); + rc = ib_post_send(r_xprt->rx_ep->re_id->qp, first, &bad_wr); + + /* The final LOCAL_INV WR in the chain is supposed to + * do the wake. If it was never posted, the wake will + * not happen, so don't wait in that case. + */ if (bad_wr != first) wait_for_completion(&frwr->fr_linv_done); - if (rc) - goto reset_mrs; + if (!rc) + return; - /* ORDER: Now DMA unmap all of the MRs, and return - * them to the free MR list. + /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ -unmap: - while (!list_empty(mrs)) { - mr = rpcrdma_mr_pop(mrs); - rpcrdma_mr_unmap_and_put(mr); - } - return; - -reset_mrs: - pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc); - - /* Find and reset the MRs in the LOCAL_INV WRs that did not - * get posted. - */ + trace_xprtrdma_post_linv(req, rc); while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); mr = container_of(frwr, struct rpcrdma_mr, frwr); - - __frwr_mr_reset(ia, mr); - bad_wr = bad_wr->next; + + frwr_mr_recycle(mr); } - goto unmap; } -const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { - .ro_map = frwr_op_map, - .ro_send = frwr_op_send, - .ro_reminv = frwr_op_reminv, - .ro_unmap_sync = frwr_op_unmap_sync, - .ro_recover_mr = frwr_op_recover_mr, - .ro_open = frwr_op_open, - .ro_maxpages = frwr_op_maxpages, - .ro_init_mr = frwr_op_init_mr, - .ro_release_mr = frwr_op_release_mr, - .ro_displayname = "frwr", - .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK, -}; +/** + * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC + * @cq: completion queue + * @wc: WCE for a completed LocalInv WR + * + */ +static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_rep *rep = mr->mr_req->rl_reply; + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li_done(wc, frwr); + __frwr_release_mr(wc, mr); + + /* Ensure @rep is generated before __frwr_release_mr */ + smp_rmb(); + rpcrdma_complete_rqst(rep); + + rpcrdma_flush_disconnect(cq->cq_context, wc); +} + +/** + * frwr_unmap_async - invalidate memory regions that were registered for @req + * @r_xprt: controlling transport instance + * @req: rpcrdma_req with a non-empty list of MRs to process + * + * This guarantees that registered MRs are properly fenced from the + * server before the RPC consumer accesses the data in them. It also + * ensures proper Send flow control: waking the next RPC waits until + * this RPC has relinquished all its Send Queue entries. + */ +void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct ib_send_wr *first, *last, **prev; + const struct ib_send_wr *bad_wr; + struct rpcrdma_frwr *frwr; + struct rpcrdma_mr *mr; + int rc; + + /* Chain the LOCAL_INV Work Requests and post them with + * a single ib_post_send() call. + */ + frwr = NULL; + prev = &first; + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { + + trace_xprtrdma_mr_localinv(mr); + r_xprt->rx_stats.local_inv_needed++; + + frwr = &mr->frwr; + frwr->fr_cqe.done = frwr_wc_localinv; + last = &frwr->fr_invwr; + last->next = NULL; + last->wr_cqe = &frwr->fr_cqe; + last->sg_list = NULL; + last->num_sge = 0; + last->opcode = IB_WR_LOCAL_INV; + last->send_flags = IB_SEND_SIGNALED; + last->ex.invalidate_rkey = mr->mr_handle; + + *prev = last; + prev = &last->next; + } + + /* Strong send queue ordering guarantees that when the + * last WR in the chain completes, all WRs in the chain + * are complete. The last completion will wake up the + * RPC waiter. + */ + frwr->fr_cqe.done = frwr_wc_localinv_done; + + /* Transport disconnect drains the receive CQ before it + * replaces the QP. The RPC reply handler won't call us + * unless re_id->qp is a valid pointer. + */ + bad_wr = NULL; + rc = ib_post_send(r_xprt->rx_ep->re_id->qp, first, &bad_wr); + if (!rc) + return; + + /* Recycle MRs in the LOCAL_INV chain that did not get posted. + */ + trace_xprtrdma_post_linv(req, rc); + while (bad_wr) { + frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); + mr = container_of(frwr, struct rpcrdma_mr, frwr); + bad_wr = bad_wr->next; + + frwr_mr_recycle(mr); + } + + /* The final LOCAL_INV WR in the chain is supposed to + * do the wake. If it was never posted, the wake will + * not happen, so wake here in that case. + */ + rpcrdma_complete_rqst(req->rl_reply); +} -- Gitblit v1.6.2