Viewing: efalnd.c

// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright (c) 2023-2026, Amazon and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Author: Yehuda Yitschak <yehuday@amazon.com>
 * Author: Yonatan Nachum <ynachum@amazon.com>
 */

#include <linux/delay.h>
#include <linux/device.h>
#include <linux/dmapool.h>
#include <linux/ethtool.h>
#include <linux/inet.h>
#include <linux/inetdevice.h>
#include <linux/pci.h>
#include <linux/random.h>
#include <linux/smp.h>

#include <rdma/ib_verbs.h>

#include "kcompat.h"
#include "efalnd.h"

/* Forward declaration of this LND's lnet_lnd descriptor. */
static const struct lnet_lnd the_efalnd;
/* Module-wide state; kefalnd_thread_start() below bumps its nthreads counter. */
struct kefa_data kefalnd;

/*
 * Fallback module version string, "<major>.<minor>.<subminor>", used only
 * when DRV_MODULE_VERSION was not already defined.
 */
#ifndef DRV_MODULE_VERSION
#define DRV_MODULE_VERSION \
	__stringify(EFALND_MAJOR_VER) "."   \
	__stringify(EFALND_MINOR_VER) "."   \
	__stringify(EFALND_SUBMINOR_VER)
#endif

/* Queue sizing constants; CQ_DEPTH is the sum of the send and receive depths. */
#define MAX_CQE_BATCH	16
#define SQ_DEPTH	4096
#define RQ_DEPTH	4096
#define CQ_DEPTH	(SQ_DEPTH + RQ_DEPTH)

/*
 * Start a kernel thread with kthread_run() and, when that succeeds,
 * increment kefalnd.nthreads.
 *
 * Evaluates to 0 on success or to the negative errno encoded in the
 * ERR_PTR returned by kthread_run().
 */
#define kefalnd_thread_start(fn, data, namefmt, arg...)			\
	({								\
		struct task_struct *__task = kthread_run(fn, data, namefmt, ##arg); \
		if (!IS_ERR(__task))					\
			atomic_inc(&kefalnd.nthreads);			\
		PTR_ERR_OR_ZERO(__task);				\
	})

/*
 * Translate an EFALND_MSG_* message type into a printable name for log
 * messages.
 *
 * Returns a pointer to a static string and never NULL: a type outside
 * [0, EFALND_MSG_MAX), or one that has no entry in the table, yields
 * "UNKNOWN".
 */
static char *
kefalnd_msgtype2str(int type)
{
	static char *msg_name[EFALND_MSG_MAX] = {
		[EFALND_MSG_RESERVED] = "RESERVED",
		[EFALND_MSG_CONN_PROBE] = "CONN_PROBE",
		[EFALND_MSG_CONN_PROBE_RESP] = "CONN_PROBE_RESP",
		[EFALND_MSG_CONN_REQ] = "CONN_REQ",
		[EFALND_MSG_CONN_REQ_ACK] = "CONN_REQ_ACK",
		[EFALND_MSG_IMMEDIATE] = "IMMEDIATE",
		[EFALND_MSG_NACK] = "NACK",
		[EFALND_MSG_PUTR_REQ] = "PUTR_REQ",
		[EFALND_MSG_PUTR_DONE] = "PUTR_DONE",
		[EFALND_MSG_GETR_REQ] = "GETR_REQ",
		[EFALND_MSG_GETR_ACK] = "GETR_ACK",
		[EFALND_MSG_GETR_DONE] = "GETR_DONE",
	};

	/*
	 * type is a signed int: a negative value would index before the
	 * table, and a slot without a designated initializer is NULL.
	 * Neither may reach a "%s" format argument.
	 */
	if (type < 0 || type >= EFALND_MSG_MAX || !msg_name[type])
		return "UNKNOWN";

	return msg_name[type];
}

/*
 * Return the size in bytes of the fixed part of a message of the given
 * type, or -1 for a type this function does not know.
 *
 * Only the connection-probe messages take proto_ver into account (v1
 * header when proto_ver is EFALND_PROTO_VER_1, v2 header otherwise);
 * every other type is sized against the v2 layout. For types ending in
 * a trailing array (immediate payload, data_qps) the result is the
 * offset of that array's first element.
 */
int
kefalnd_msgtype2size(int type, u8 proto_ver)
{
	int v2_hdr = offsetof(struct kefa_msg, msg_v2.u);
	int v1_hdr = offsetof(struct kefa_msg, msg_v1.u);
	int probe_hdr = (proto_ver == EFALND_PROTO_VER_1) ? v1_hdr : v2_hdr;
	int size;

	switch (type) {
	case EFALND_MSG_IMMEDIATE:
		size = offsetof(struct kefa_msg, msg_v2.u.immediate.payload[0]);
		break;

	case EFALND_MSG_PUTR_REQ:
		size = v2_hdr + sizeof(struct kefa_putr_req_msg_v2);
		break;

	case EFALND_MSG_GETR_REQ:
		size = v2_hdr + sizeof(struct kefa_getr_req_msg_v2);
		break;

	case EFALND_MSG_NACK:
	case EFALND_MSG_PUTR_DONE:
	case EFALND_MSG_GETR_DONE:
		size = v2_hdr + sizeof(struct kefa_completion_msg);
		break;

	case EFALND_MSG_GETR_ACK:
		size = v2_hdr + sizeof(struct kefa_getr_ack_msg);
		break;

	case EFALND_MSG_CONN_PROBE:
		size = probe_hdr + sizeof(struct kefa_conn_probe_msg);
		break;

	case EFALND_MSG_CONN_PROBE_RESP:
		size = probe_hdr + sizeof(struct kefa_conn_probe_resp_msg);
		break;

	case EFALND_MSG_CONN_REQ:
		size = offsetof(struct kefa_msg,
				msg_v2.u.conn_request.data_qps[0]);
		break;

	case EFALND_MSG_CONN_REQ_ACK:
		size = offsetof(struct kefa_msg,
				msg_v2.u.conn_request_ack.data_qps[0]);
		break;

	default:
		size = -1;
		break;
	}

	return size;
}

/*
 * Map a KEFA_COMP_STATUS_* completion status to a negative errno
 * (0 for KEFA_COMP_STATUS_OK). DMA failure, general error and any
 * status not listed here all map to -EREMOTEIO.
 */
int
kefalnd_efa_status_to_errno(s16 efa_status)
{
	int err;

	switch (efa_status) {
	case KEFA_COMP_STATUS_OK:
		err = 0;
		break;

	case KEFA_COMP_STATUS_UNSUPPORTED_OP:
		err = -EOPNOTSUPP;
		break;

	case KEFA_COMP_STATUS_NO_MEMORY:
		err = -ENOMEM;
		break;

	case KEFA_COMP_STATUS_COMM_FAILURE:
		err = -ECOMM;
		break;

	case KEFA_COMP_STATUS_NO_LNET_MSG:
		err = -ENODATA;
		break;

	case KEFA_COMP_STATUS_BAD_ADDRESS:
		err = -EFAULT;
		break;

	case KEFA_COMP_STATUS_UNSUPPORTED_PROTO:
		err = -EPROTONOSUPPORT;
		break;

	case KEFA_COMP_STATUS_DMA_FAILURE:
	case KEFA_COMP_STATUS_GENERAL_ERROR:
	default:
		err = -EREMOTEIO;
		break;
	}

	return err;
}

/*
 * Map a local result (0 or a negative errno) to the KEFA_COMP_STATUS_*
 * value carried in completion messages. -EINVAL and any errno without
 * a dedicated status become KEFA_COMP_STATUS_GENERAL_ERROR.
 */
s16
kefalnd_errno_to_efa_status(int status)
{
	s16 efa_status;

	switch (status) {
	case 0:
		efa_status = KEFA_COMP_STATUS_OK;
		break;

	case -EOPNOTSUPP:
		efa_status = KEFA_COMP_STATUS_UNSUPPORTED_OP;
		break;

	case -ENOMEM:
		efa_status = KEFA_COMP_STATUS_NO_MEMORY;
		break;

	case -ECOMM:
		efa_status = KEFA_COMP_STATUS_COMM_FAILURE;
		break;

	case -ENODATA:
		efa_status = KEFA_COMP_STATUS_NO_LNET_MSG;
		break;

	case -EFAULT:
		efa_status = KEFA_COMP_STATUS_BAD_ADDRESS;
		break;

	case -EPROTONOSUPPORT:
		efa_status = KEFA_COMP_STATUS_UNSUPPORTED_PROTO;
		break;

	case -EIO:
		efa_status = KEFA_COMP_STATUS_DMA_FAILURE;
		break;

	case -EINVAL:
	default:
		efa_status = KEFA_COMP_STATUS_GENERAL_ERROR;
		break;
	}

	return efa_status;
}

/*
 * Return lnet_get_dev_prio() for this NI's DMA device and dev_idx.
 * When the NI has no private data attached, a NULL device is passed.
 */
static unsigned int
kefalnd_get_dev_prio(struct lnet_ni *ni, unsigned int dev_idx)
{
	struct kefa_ni *efa_ni = ni->ni_data;
	struct device *dma_dev;

	if (efa_ni)
		dma_dev = efa_ni->efa_dev->ib_dev->dma_device;
	else
		dma_dev = NULL;

	return lnet_get_dev_prio(dma_dev, dev_idx);
}

/*
 * DMA-map a scatterlist for the EFA device.
 *
 * lnet_rdma_map_sg_attrs() is tried first; a non-zero result from it is
 * returned unchanged. A zero result falls back to ib_dma_map_sg(), and
 * a zero result from that is reported as -EIO.
 */
static inline int kefalnd_dma_map_sg(struct kefa_dev *efa_dev,
				     struct scatterlist *sg, int nents,
				     enum dma_data_direction direction)
{
	int mapped;

	mapped = lnet_rdma_map_sg_attrs(efa_dev->ib_dev->dma_device,
					sg, nents, direction);
	if (mapped == 0) {
		mapped = ib_dma_map_sg(efa_dev->ib_dev, sg, nents, direction);
		if (mapped == 0)
			mapped = -EIO;
	}

	return mapped;
}

/*
 * Undo kefalnd_dma_map_sg(): lnet_rdma_unmap_sg() is tried first, and
 * only when it returns zero is the list handed to ib_dma_unmap_sg().
 */
static inline void kefalnd_dma_unmap_sg(struct kefa_dev *efa_dev,
					struct scatterlist *sg, int nents,
					enum dma_data_direction direction)
{
	int unmapped = lnet_rdma_unmap_sg(efa_dev->ib_dev->dma_device,
					  sg, nents, direction);

	if (unmapped == 0)
		ib_dma_unmap_sg(efa_dev->ib_dev, sg, nents, direction);
}

/*
 * Extract the source NID of a message into *srcnid. Protocol v1 carries
 * a 4-byte NID that is widened with lnet_nid4_to_nid(); any other
 * version is read from the v2 layout as a struct lnet_nid.
 */
void
kefalnd_get_srcnid_from_msg(struct kefa_msg *msg, struct lnet_nid *srcnid)
{
	if (msg->hdr.proto_ver == EFALND_PROTO_VER_1) {
		lnet_nid4_to_nid(msg->msg_v1.srcnid, srcnid);
		return;
	}

	*srcnid = msg->msg_v2.srcnid;
}

/*
 * Extract the destination NID of a message into *dstnid. Protocol v1
 * carries a 4-byte NID that is widened with lnet_nid4_to_nid(); any
 * other version is read from the v2 layout as a struct lnet_nid.
 */
void
kefalnd_get_dstnid_from_msg(struct kefa_msg *msg, struct lnet_nid *dstnid)
{
	if (msg->hdr.proto_ver == EFALND_PROTO_VER_1) {
		lnet_nid4_to_nid(msg->msg_v1.dstnid, dstnid);
		return;
	}

	*dstnid = msg->msg_v2.dstnid;
}

/* Return the GETR_ACK body of a message (always the v2 layout). */
static inline struct kefa_getr_ack_msg *
kefalnd_get_getr_ack_from_msg(struct kefa_msg *msg)
{
	struct kefa_getr_ack_msg *getr_ack = &msg->msg_v2.u.getr_ack;

	return getr_ack;
}

/* Return the completion body of a message (always the v2 layout). */
static inline struct kefa_completion_msg *
kefalnd_get_completion_from_msg(struct kefa_msg *msg)
{
	struct kefa_completion_msg *completion = &msg->msg_v2.u.completion;

	return completion;
}

/*
 * Initialise an object pool: record its owner, size and CPT, reset the
 * lock, both lists and the pending-work flag, and allocate the backing
 * array of pool_size objects of obj_size bytes on the given CPT.
 *
 * The free list is left empty here. Returns 0 on success or -ENOMEM
 * when the array cannot be allocated.
 */
static int
kefalnd_obj_pool_init(struct kefa_ni *efa_ni, struct kefa_obj_pool *pool,
		      u32 pool_size, int cpt, size_t obj_size)
{
	/* bookkeeping */
	pool->efa_ni = efa_ni;
	pool->pool_size = pool_size;
	pool->cpt = cpt;

	/* synchronisation and list state */
	atomic_set(&pool->pending_work, false);
	spin_lock_init(&pool->lock);
	INIT_LIST_HEAD(&pool->free_obj);
	INIT_LIST_HEAD(&pool->free_pend_obj);

	/* backing storage */
	LIBCFS_CPT_ALLOC(pool->obj_arr, lnet_cpt_table(),
			 cpt, pool_size * obj_size);

	return pool->obj_arr ? 0 : -ENOMEM;
}

/* Return an object to the tail of the pool's free list, under the pool lock. */
static void
kefalnd_obj_pool_free(struct kefa_obj_pool *pool, struct list_head *node)
{
	unsigned long irq_flags;

	spin_lock_irqsave(&pool->lock, irq_flags);
	list_add_tail(node, &pool->free_obj);
	spin_unlock_irqrestore(&pool->lock, irq_flags);
}

/*
 * Queue an object at the tail of the pool's pending-free list and raise
 * the pool's pending_work flag, both under the pool lock.
 */
static void
kefalnd_obj_pool_put_on_pend_list(struct kefa_obj_pool *pool,
				  struct list_head *node)
{
	unsigned long irq_flags;

	spin_lock_irqsave(&pool->lock, irq_flags);

	list_add_tail(node, &pool->free_pend_obj);
	atomic_set(&pool->pending_work, true);

	spin_unlock_irqrestore(&pool->lock, irq_flags);
}

/*
 * Detach and return the first node of the pool's free list, or NULL
 * when the list is empty. The returned node is re-initialised
 * (list_del_init), so list_empty() on it is true.
 */
static struct list_head *
kefalnd_obj_pool_alloc(struct kefa_obj_pool *pool)
{
	struct list_head *first = NULL;
	unsigned long irq_flags;

	spin_lock_irqsave(&pool->lock, irq_flags);

	if (!list_empty(&pool->free_obj)) {
		first = pool->free_obj.next;
		list_del_init(first);
	}

	spin_unlock_irqrestore(&pool->lock, irq_flags);

	return first;
}

/* Take one FMR from the pool's free list; NULL when the pool is empty. */
static inline struct kefa_fmr *
kefalnd_fmr_pool_alloc(struct kefa_obj_pool *fmr_pool)
{
	struct list_head *entry = kefalnd_obj_pool_alloc(fmr_pool);

	if (!entry)
		return NULL;

	return list_entry(entry, struct kefa_fmr, list_node);
}

/*
 * Take an FMR from the device-wide FMR pool of this NI's device.
 * Returns NULL when the pool is empty; an FMR that is handed out is
 * asserted to be in the KEFA_FMR_INACTIVE state.
 */
static struct kefa_fmr *
kefalnd_get_idle_fmr(struct kefa_ni *efa_ni)
{
	struct kefa_fmr *fmr;

	fmr = kefalnd_fmr_pool_alloc(&efa_ni->efa_dev->fmr_pool);
	if (fmr) {
		LASSERT(fmr->state == KEFA_FMR_INACTIVE);
	}

	return fmr;
}

/* Take one TX descriptor from the pool's free list; NULL when the pool is empty. */
static inline struct kefa_tx *
kefalnd_tx_pool_alloc(struct kefa_obj_pool *tx_pool)
{
	struct list_head *entry = kefalnd_obj_pool_alloc(tx_pool);

	if (!entry)
		return NULL;

	return list_entry(entry, struct kefa_tx, list_node);
}

/*
 * Take a TX descriptor from the NI's TX pool and reset its status
 * fields. Returns NULL when the pool is empty. A descriptor coming off
 * the free list is asserted to have no connection and no LNet messages
 * attached.
 */
struct kefa_tx *
kefalnd_get_idle_tx(struct kefa_ni *efa_ni)
{
	struct kefa_tx *tx = kefalnd_tx_pool_alloc(&efa_ni->tx_pool);

	if (tx) {
		LASSERT(!tx->conn);
		LASSERT(!tx->lntmsg[0]);
		LASSERT(!tx->lntmsg[1]);

		tx->hstatus = LNET_MSG_STATUS_OK;
		tx->status = 0;
	}

	return tx;
}

static inline u64
kefalnd_tx_to_idx(struct kefa_tx *tx)
{
	return tx - (struct kefa_tx *)tx->tx_pool->obj_arr;
}

/*
 * Inverse of kefalnd_tx_to_idx(): return the TX descriptor at tx_idx in
 * the NI's TX pool, or NULL (after logging an error) when the index is
 * not below the pool size.
 */
static inline struct kefa_tx *
kefalnd_get_tx_by_idx(struct kefa_ni *efa_ni, u64 tx_idx)
{
	struct kefa_obj_pool *pool = &efa_ni->tx_pool;
	struct kefa_tx *tx_arr = (struct kefa_tx *)pool->obj_arr;

	if (tx_idx < pool->pool_size)
		return &tx_arr[tx_idx];

	EFA_DEV_ERR(efa_ni->efa_dev,
		    "received out of range TX[%llu] max[%u]\n",
		    tx_idx, pool->pool_size);

	return NULL;
}

/*
 * Overwrite the TX's single SGE with the given key, address and length;
 * every other ib_sge field is zeroed by the compound literal.
 */
static inline void
kefalnd_init_tx_protocol_sge(struct kefa_tx *tx, u32 lkey, u64 addr,
			     unsigned int len)
{
	tx->sge = (struct ib_sge) {
		.addr	= addr,
		.length = len,
		.lkey	= lkey,
	};
}

/*
 * Fill the common header and the version-specific envelope of a
 * protocol message.
 *
 * The header gets the magic, protocol version, type and total byte
 * count. The envelope gets the connection's local/remote NIDs (narrowed
 * to 4-byte NIDs for protocol v1), zero credits and an invalid
 * destination connection id. The total size is asserted not to exceed
 * EFALND_MSG_SIZE.
 *
 * Returns the total message size: envelope header plus body_nob.
 */
static int
kefalnd_init_msg(struct kefa_msg *msg, struct kefa_conn *conn, u8 proto_ver,
		 int type, int body_nob)
{
	int is_v1 = (proto_ver == EFALND_PROTO_VER_1);
	int nob;

	msg->hdr.magic = EFALND_MSG_MAGIC;
	msg->hdr.proto_ver = proto_ver;
	msg->hdr.type = type;

	if (is_v1)
		nob = offsetof(struct kefa_msg, msg_v1.u) + body_nob;
	else
		nob = offsetof(struct kefa_msg, msg_v2.u) + body_nob;

	LASSERT(nob <= EFALND_MSG_SIZE);
	msg->hdr.nob = nob;

	if (is_v1) {
		struct kefa_msg_v1 *v1 = &msg->msg_v1;

		v1->srcnid = lnet_nid_to_nid4(&conn->local_nid);
		v1->dstnid = lnet_nid_to_nid4(&conn->remote_nid);
		v1->credits = 0;
		v1->dst_conn_id = EFALND_INV_CONN;
	} else {
		struct kefa_msg_v2 *v2 = &msg->msg_v2;

		v2->srcnid = conn->local_nid;
		v2->dstnid = conn->remote_nid;
		v2->credits = 0;
		v2->dst_conn_id = EFALND_INV_CONN;
	}

	return nob;
}

/*
 * Prepare a TX to carry a protocol message of the given type:
 * initialise the message header in tx->msg, reset the work request to a
 * signaled single-SGE IB_WR_SEND whose wr_id is the TX pointer, and
 * point the SGE at the message buffer with the resulting length.
 */
void
kefalnd_init_tx_protocol_msg(struct kefa_tx *tx, struct kefa_conn *conn,
			     int type, int body_nob, u8 proto_ver)
{
	int msg_nob;

	msg_nob = kefalnd_init_msg(tx->msg, conn, proto_ver, type, body_nob);
	tx->type = type;

	/* the compound literal zeroes every field not named below */
	tx->wrq = (struct ib_srd_rdma_wr) {
		.wr.wr = {
			.next		= NULL,
			.wr_id		= (u64)tx,
			.sg_list	= &tx->sge,
			.num_sge	= 1,
			.opcode		= IB_WR_SEND,
			.send_flags	= IB_SEND_SIGNALED,
		},
	};

	kefalnd_init_tx_protocol_sge(tx, tx->lkey, tx->msgaddr, msg_nob);
}

static struct kefa_remote_qp *
kefalnd_conn_get_remote_qp(struct kefa_conn *conn)
{
	int remote_qpn = atomic_inc_return_relaxed(&conn->last_qp_idx);

	return conn->data_qps + ((u32)remote_qpn % conn->nqps);
}

/*
 * Address the TX's work request to the peer: use the connection's
 * address handle plus the number and qkey of the next remote QP in
 * rotation.
 */
static void
kefalnd_set_tx_remote_data(struct kefa_conn *conn, struct kefa_tx *tx)
{
	struct kefa_remote_qp *remote_qp = kefalnd_conn_get_remote_qp(conn);
	struct ib_srd_wr *wr = &tx->wrq.wr;

	wr->ah = conn->ah;
	wr->remote_qpn = remote_qp->qp_num;
	wr->remote_qkey = remote_qp->qkey;
}

/*
 * Pick the next local QP of the device in rotation.
 *
 * The rotation counter is a signed atomic that eventually wraps to
 * negative values. It is reinterpreted as unsigned before the modulo,
 * as kefalnd_conn_get_remote_qp() does, so the remainder can never be
 * negative and index before the qps array.
 */
static inline struct kefa_qp *
kefalnd_device_get_qp(struct kefa_dev *efa_dev)
{
	unsigned int seq;

	seq = (unsigned int)atomic_inc_return_relaxed(&efa_dev->local_qpn);

	return efa_dev->qps + (seq % efa_dev->nqps);
}

/*
 * Post every TX queued on conn->pend_tx, in order, to one local QP
 * chosen for this call. Must be called with conn->lock held.
 *
 * For each TX: stamp send_time, take a reference, move it to
 * conn->active_tx and post it. If the post fails, that TX is rolled
 * back to the head of pend_tx and the loop stops, leaving the remaining
 * TXs pending.
 */
void
kefalnd_conn_post_tx_locked(struct kefa_conn *conn)
__must_hold(&conn->lock)
{
	struct ib_send_wr *bad = NULL;
	struct kefa_tx *tx, *temp_tx;
	struct ib_send_wr *wr;
	struct kefa_qp *qp;
	time64_t now;
	int rc;

	/* one QP and one timestamp are used for the whole batch */
	qp = kefalnd_device_get_qp(conn->efa_ni->efa_dev);
	now = ktime_get_seconds();

	list_for_each_entry_safe(tx, temp_tx, &conn->pend_tx, list_node) {
		if (tx->fmr && tx->fmr->state == KEFA_FMR_ACTIVATING) {
			/* the FMR registration WR is posted instead of the
			 * TX's own WR
			 */
			wr = &tx->fmr->reg_wr.wr;
		} else {
			wr = &tx->wrq.wr.wr;
			kefalnd_set_tx_remote_data(conn, tx);
			kefalnd_msg_set_epoch(tx->msg, conn->remote_epoch);
		}

		/* account for the TX as in flight before handing it to the QP */
		atomic64_set(&tx->send_time, now);
		atomic_inc(&tx->ref_cnt);
		list_move_tail(&tx->list_node, &conn->active_tx);
		rc = ib_post_send(qp->ib_qp, wr,
				  (const struct ib_send_wr **)&bad);
		if (rc) {
			if (rc != -ENOMEM) {
				/* We don't expect anything other than -ENOMEM here. */
				EFA_DEV_WARN(qp->efa_dev,
					     "QP[%u] failed to post send. err[%d]\n",
					     qp->ib_qp->qp_num, rc);
			}

			/* undo the in-flight accounting and requeue at the
			 * head so ordering is preserved
			 */
			atomic64_set(&tx->send_time, 0);
			atomic_dec(&tx->ref_cnt);
			list_move(&tx->list_node, &conn->pend_tx);

			/* TODO - TX might stay stuck on connection.
			 * Need to trigger kefalnd_conn_post_tx_locked() from
			 * other context.
			 */
			break;
		}
	}
}

/*
 * Bind a TX to a connection and queue it at the tail of the
 * connection's pending list (removing it first from whatever list it is
 * currently on). If the connection is active, the pending list is
 * posted right away.
 */
static void
kefalnd_launch_tx(struct kefa_conn *conn, struct kefa_tx *tx)
{
	unsigned long irq_flags;

	spin_lock_irqsave(&conn->lock, irq_flags);

	if (!list_empty(&tx->list_node))
		list_del_init(&tx->list_node);

	list_add_tail(&tx->list_node, &conn->pend_tx);
	tx->conn = conn;

	/* TODO - consider all pending connections. i.e. connection arbitration */
	if (conn->state == KEFA_CONN_ACTIVE)
		kefalnd_conn_post_tx_locked(conn);

	spin_unlock_irqrestore(&conn->lock, irq_flags);
}

/*
 * Log a failed attempt to post an FMR invalidation. -ENOMEM is the one
 * failure that is expected, so it is not logged.
 */
static inline void
kefalnd_post_finv_failure(struct kefa_dev *efa_dev, struct kefa_qp *qp,
			  struct kefa_fmr *fmr, int rc)
{
	if (rc == -ENOMEM)
		return;

	/* We don't expect anything other than -ENOMEM here. */
	EFA_DEV_WARN(efa_dev,
		     "QP[%u] failed to post FINV[0x%x]. err[%d]\n",
		     qp->ib_qp->qp_num, fmr->mr->lkey, rc);
}

/*
 * Retry the FMR invalidations parked on the FMR pool's pending list.
 *
 * Runs under the pool lock. Each pending FMR is removed from the list
 * and its invalidate WR is posted to the next local QP in rotation. On
 * the first failure that FMR is put back at the head of the pending
 * list and the walk stops. The pool's pending_work flag is cleared once
 * the pending list is empty.
 */
static void
kefalnd_launch_pending_finvs(struct kefa_dev *efa_dev)
{
	struct kefa_obj_pool *fmr_pool = &efa_dev->fmr_pool;
	struct kefa_fmr *fmr, *temp_fmr;
	struct ib_send_wr *wr;
	unsigned long flags;
	struct kefa_qp *qp;
	int rc;

	spin_lock_irqsave(&fmr_pool->lock, flags);
	list_for_each_entry_safe(fmr, temp_fmr, &fmr_pool->free_pend_obj, list_node) {
		wr = &fmr->inv_wr;
		qp = kefalnd_device_get_qp(efa_dev);
		/* unlink before posting; re-linked below if the post fails */
		list_del_init(&fmr->list_node);
		rc = ib_post_send(qp->ib_qp, wr, NULL);
		if (rc) {
			kefalnd_post_finv_failure(efa_dev, qp, fmr, rc);
			list_add(&fmr->list_node, &fmr_pool->free_pend_obj);
			break;
		}
	}

	if (list_empty(&fmr_pool->free_pend_obj))
		atomic_set(&fmr_pool->pending_work, false);

	spin_unlock_irqrestore(&fmr_pool->lock, flags);
}

/*
 * Post the invalidate WR of an FMR on the next local QP in rotation.
 * If the post fails, the FMR is parked on the pool's pending list. In
 * either case, if the pool reports pending work afterwards, the parked
 * invalidations are retried.
 */
static void
kefalnd_launch_finv(struct kefa_dev *efa_dev, struct kefa_fmr *fmr)
{
	struct ib_send_wr *inv_wr = &fmr->inv_wr;
	struct kefa_qp *qp = kefalnd_device_get_qp(efa_dev);
	int rc = ib_post_send(qp->ib_qp, inv_wr, NULL);

	if (rc != 0) {
		kefalnd_post_finv_failure(efa_dev, qp, fmr, rc);
		kefalnd_obj_pool_put_on_pend_list(&efa_dev->fmr_pool,
						  &fmr->list_node);
	}

	if (atomic_read(&efa_dev->fmr_pool.pending_work) != 0)
		kefalnd_launch_pending_finvs(efa_dev);
}

/*
 * Release the mapping resources held by a TX.
 *
 * An attached FMR in the ACTIVE state is marked DEACTIVATING and an
 * invalidation is launched for it; in any other state it is marked
 * INACTIVE and returned to the FMR pool straight away. After that the
 * fragment list is DMA-unmapped if nfrags is non-zero.
 */
static void
kefalnd_unmap_tx(struct kefa_tx *tx)
{
	struct kefa_dev *efa_dev = tx->tx_pool->efa_ni->efa_dev;
	struct kefa_fmr *fmr = tx->fmr;

	if (fmr != NULL) {
		if (fmr->state != KEFA_FMR_ACTIVE) {
			fmr->state = KEFA_FMR_INACTIVE;
			kefalnd_obj_pool_free(&efa_dev->fmr_pool,
					      &fmr->list_node);
		} else {
			fmr->state = KEFA_FMR_DEACTIVATING;
			kefalnd_launch_finv(efa_dev, fmr);
		}

		tx->fmr = NULL;
	}

	if (tx->nfrags == 0)
		return;

	kefalnd_dma_unmap_sg(efa_dev, tx->frags, tx->nfrags, tx->dmadir);
	tx->nfrags = 0;
}

/*
 * DMA-map the tx->nfrags fragments already placed in tx->frags and bind
 * them to an FMR taken from the device pool.
 *
 * @remote_access_fmr: true when the peer will read from this buffer
 *	(DMA_TO_DEVICE mapping, REMOTE_READ access, rkey published in
 *	the RDMA descriptor); false when the buffer is a local sink
 *	(DMA_FROM_DEVICE mapping, lkey published).
 *
 * On success the FMR is left in the KEFA_FMR_ACTIVATING state with its
 * registration and invalidation WRs prepared, tx->rdma_desc describes
 * the mapped region, and 0 is returned.
 *
 * On failure a negative errno is returned and the TX is left in a state
 * that kefalnd_unmap_tx() can clean up:
 *  - DMA mapping failed: nothing is mapped, so tx->nfrags is reset to 0
 *    and kefalnd_unmap_tx() will not DMA-unmap it;
 *  - no idle FMR (-ENOMEM), or MR mapping failed: the fragments stay
 *    DMA-mapped and any FMR stays attached in the INACTIVE state.
 */
static int
kefalnd_map_tx(struct kefa_ni *efa_ni, struct kefa_tx *tx,
	       bool remote_access_fmr)
{
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	struct kefa_rdma_desc *rd = &tx->rdma_desc;
	int i, sg_nsegs, fmr_nsegs;
	struct ib_send_wr *inv_wr;
	struct ib_reg_wr *reg_wr;
	u32 nob;

	tx->dmadir = remote_access_fmr ? DMA_TO_DEVICE : DMA_FROM_DEVICE;
	sg_nsegs = kefalnd_dma_map_sg(efa_dev, tx->frags, tx->nfrags, tx->dmadir);
	if (unlikely(sg_nsegs < 0)) {
		EFA_DEV_ERR(efa_dev, "Failed to DMA map TX, err %d\n", sg_nsegs);
		/*
		 * Nothing is mapped. Callers release the TX through
		 * kefalnd_tx_done() -> kefalnd_unmap_tx(), which unmaps
		 * whenever nfrags is non-zero, so clear it here.
		 */
		tx->nfrags = 0;
		return sg_nsegs;
	}

	/* total mapped length, counted over the mapped entries only */
	for (i = 0, nob = 0; i < sg_nsegs; i++)
		nob += sg_dma_len(&tx->frags[i]);

	tx->fmr = kefalnd_get_idle_fmr(efa_ni);
	if (!tx->fmr)
		return -ENOMEM;

	/*
	 * Hand the MR the number of entries the DMA mapping produced; that
	 * is the count its result is compared against just below. The
	 * mapped count can be smaller than tx->nfrags when the DMA layer
	 * merges entries. tx->nfrags keeps the original count, which is
	 * what the unmap path needs.
	 */
	fmr_nsegs = ib_map_mr_sg(tx->fmr->mr, tx->frags, sg_nsegs, 0,
				 PAGE_SIZE);
	if (unlikely(fmr_nsegs != sg_nsegs)) {
		EFA_DEV_ERR(efa_dev, "Failed to map MR, %d/%d elements\n",
			    fmr_nsegs, sg_nsegs);
		return fmr_nsegs < 0 ? fmr_nsegs : -EIO;
	}

	efa_inc_fast_reg_key_gen(tx->fmr->mr);
	tx->fmr->state = KEFA_FMR_ACTIVATING;

	/* registration WR: posted ahead of the TX's own WR */
	reg_wr = &tx->fmr->reg_wr;
	memset(reg_wr, 0, sizeof(*reg_wr));
	reg_wr->wr.opcode = IB_WR_REG_MR;
	reg_wr->wr.wr_id  = (u64)tx;
	reg_wr->wr.num_sge = 0;
	reg_wr->wr.send_flags = 0;
	reg_wr->mr = tx->fmr->mr;
	reg_wr->key = tx->fmr->mr->lkey;
	reg_wr->access = IB_ACCESS_LOCAL_WRITE;
	if (remote_access_fmr)
		reg_wr->access |= IB_ACCESS_REMOTE_READ;

	/* invalidation WR: used by kefalnd_unmap_tx() once the FMR is active */
	inv_wr = &tx->fmr->inv_wr;
	memset(inv_wr, 0, sizeof(*inv_wr));
	inv_wr->opcode = IB_WR_LOCAL_INV;
	inv_wr->wr_id  = (u64)tx->fmr;
	inv_wr->ex.invalidate_rkey = tx->fmr->mr->lkey;

	rd->nob = nob;
	rd->addr = sg_dma_address(&tx->frags[0]);
	rd->key = remote_access_fmr ? tx->fmr->mr->rkey : tx->fmr->mr->lkey;

	return 0;
}

/*
 * Describe nob bytes of a bio_vec array, starting offset bytes into it,
 * as scatterlist entries (one entry per bio_vec touched).
 *
 * Returns the number of scatterlist entries filled, -EFAULT when the
 * scatterlist runs out before nob bytes are covered, or -EINVAL when a
 * fragment after the first one would leave a gap (see the check in the
 * loop). nob and nkiov must be positive.
 */
static int
kefalnd_bio_vec_to_sgl(struct kefa_ni *efa_ni, struct scatterlist *sg,
		       struct bio_vec *kiov, int nkiov, int offset, int nob)
{
	int fragnob, max_nkiov, sg_count = 0;

	LASSERT(nob > 0);
	LASSERT(nkiov > 0);

	/* Translate from bio_vec to sg to use for mapping */
	/* first skip the whole bio_vecs that lie entirely before offset */
	while (offset >= kiov->bv_len) {
		offset -= kiov->bv_len;
		nkiov--;
		kiov++;
		LASSERT(nkiov > 0);
	}

	/* remembered so the first fragment can be told apart below */
	max_nkiov = nkiov;
	do {
		LASSERT(nkiov > 0);

		if (!sg) {
			EFA_DEV_ERR(efa_ni->efa_dev,
				    "lacking enough sg entries to map TX\n");
			return -EFAULT;
		}
		sg_count++;

		/* bytes taken from this bio_vec: what is left of it, capped by nob */
		fragnob = min((int)(kiov->bv_len - offset), nob);

		/*
		 * We're allowed to start at a non-aligned page offset in
		 * the first fragment and end at a non-aligned page offset
		 * in the last fragment.
		 */
		if ((fragnob < (int)(kiov->bv_len - offset)) &&
		    nkiov < max_nkiov && nob > fragnob) {
			CDEBUG(D_NET,
			       "fragnob %d < available page %d: with remaining %d kiovs with %d nob left\n",
			       fragnob, (int)(kiov->bv_len - offset), nkiov, nob);

			EFA_DEV_ERR(efa_ni->efa_dev, "no gaps support\n");
			return -EINVAL;
		}

		sg_set_page(sg, kiov->bv_page, fragnob,
			    kiov->bv_offset + offset);
		sg = sg_next(sg);

		/* only the first fragment starts at a non-zero offset */
		offset = 0;
		kiov++;
		nkiov--;
		nob -= fragnob;
	} while (nob > 0);

	return sg_count;
}

/*
 * Build the TX's scatterlist from a bio_vec range and map it to the
 * device via kefalnd_map_tx().
 *
 * Returns 0 on success. On any failure the negative errno is returned
 * and the TX health status is set to LNET_MSG_STATUS_LOCAL_ERROR.
 */
static int
kefalnd_map_msg_iov(struct kefa_ni *efa_ni, struct kefa_tx *tx, int nkiov,
		    struct bio_vec *kiov, int offset, int nob,
		    bool remote_access_fmr)
{
	int rc = kefalnd_bio_vec_to_sgl(efa_ni, tx->frags, kiov, nkiov,
					offset, nob);

	if (rc >= 0) {
		/* rc is the number of scatterlist entries that were filled */
		tx->nfrags = rc;

		/* Map the SGs to our device */
		rc = kefalnd_map_tx(efa_ni, tx, remote_access_fmr);
	}

	if (rc != 0)
		tx->hstatus = LNET_MSG_STATUS_LOCAL_ERROR;

	return rc;
}

/*
 * Prepare a TX as a completion-style message of the given type,
 * carrying the peer's cookie and the local status translated to a
 * KEFA_COMP_STATUS_* value.
 */
static void
kefalnd_init_comp_message(struct kefa_conn *conn, struct kefa_tx *tx, int type,
			  int status, u64 cookie)
{
	struct kefa_completion_msg *comp_msg;

	kefalnd_init_tx_protocol_msg(tx, conn, type,
				     sizeof(struct kefa_completion_msg),
				     conn->proto_ver);

	comp_msg = kefalnd_get_completion_from_msg(tx->msg);
	comp_msg->status = kefalnd_errno_to_efa_status(status);
	comp_msg->cookie = cookie;
}

/*
 * Arm the TX so that kefalnd_tx_done() sends a final control message
 * carrying this cookie instead of releasing the TX.
 */
static inline void
kefalnd_set_sync_data(struct kefa_tx *tx, u64 cookie)
{
	tx->cookie = cookie;
	tx->send_sync = true;
}

static void
kefalnd_send_sync_msg(struct kefa_tx *tx)
{
	LASSERT(tx->type == EFALND_MSG_PUTR_DONE ||
		tx->type == EFALND_MSG_GETR_DONE);

	CDEBUG(D_NET, "Sending last ctrl for TX type[%s] from[%s] to[%s]\n",
	       kefalnd_msgtype2str(tx->type),
	       libcfs_nidstr(&tx->conn->local_nid),
	       libcfs_nidstr(&tx->conn->remote_nid));

	tx->send_sync = false;
	kefalnd_init_comp_message(tx->conn, tx, tx->type, tx->status,
				  tx->cookie);
	kefalnd_launch_tx(tx->conn, tx);
}

/*
 * Complete a TX whose reference count has dropped to zero.
 *
 * If the TX is armed with send_sync, it is recycled to send the final
 * control message and nothing else happens here. Otherwise the TX is
 * unlinked from its connection's list, its mappings are released, any
 * attached LNet messages are finalized with tx->status, and the TX is
 * reset and returned to its pool.
 */
void
kefalnd_tx_done(struct kefa_tx *tx)
{
	unsigned long flags;
	int i;

	/* Send last control message once all RDMAs completed.
	 * re-use the connection and message types set during RDMA submit
	 */
	if (tx->send_sync) {
		kefalnd_send_sync_msg(tx);
		return;
	}

	/* tx->conn is NULL for a TX that was never launched */
	if (tx->conn) {
		spin_lock_irqsave(&tx->conn->lock, flags);
		if (!list_empty(&tx->list_node))
			list_del_init(&tx->list_node);

		spin_unlock_irqrestore(&tx->conn->lock, flags);
	}

	CDEBUG(D_NET, "Completed TX type[%s] from[%s] to[%s]\n",
	       kefalnd_msgtype2str(tx->type),
	       tx->conn ? libcfs_nidstr(&tx->conn->local_nid) : "NA",
	       tx->conn ? libcfs_nidstr(&tx->conn->remote_nid) : "NA");

	LASSERT(atomic_read(&tx->ref_cnt) == 0);
	LASSERT(list_empty(&tx->list_node));
	atomic64_set(&tx->send_time, 0);
	kefalnd_unmap_tx(tx);

	for (i = 0; i < 2; i++) {
		if (tx->lntmsg[i] == NULL)
			continue;

		/* propagate health status to LNet for requests */
		if (i == 0 && tx->lntmsg[i])
			tx->lntmsg[i]->msg_health_status = tx->hstatus;

		CDEBUG(D_NET, "Finalizing TX type[%s] from[%s] to[%s]\n",
		       kefalnd_msgtype2str(tx->type),
		       tx->conn ? libcfs_nidstr(&tx->conn->local_nid) : "NA",
		       tx->conn ? libcfs_nidstr(&tx->conn->remote_nid) : "NA");

		lnet_finalize(tx->lntmsg[i], tx->status);
		tx->lntmsg[i] = NULL;
	}

	/* restore the idle state that kefalnd_get_idle_tx() asserts on */
	tx->hstatus = LNET_MSG_STATUS_OK;
	tx->conn = NULL;
	tx->dmadir = DMA_BIDIRECTIONAL;
	tx->type = 0;

	kefalnd_obj_pool_free(tx->tx_pool, &tx->list_node);
}

/*
 * Abort a TX that is waiting for a peer response: record the given
 * health status and result, cancel any armed final control message, and
 * drop the reference held for the response. The waiting_resp flag is
 * cleared atomically so that reference is dropped only once; the TX is
 * completed if it was the last reference.
 */
void
kefalnd_abort_tx(struct kefa_tx *tx, enum lnet_msg_hstatus hstatus, int status)
{
	EFA_DEV_WARN(tx->conn->efa_ni->efa_dev,
		     "aborting TX type[%s] to peer NI[%s]\n",
		     kefalnd_msgtype2str(tx->type),
		     libcfs_nidstr(&tx->conn->remote_nid));

	tx->send_sync = false;
	tx->hstatus = hstatus;
	tx->status = status;

	/* Make sure response message refcount decreased only once */
	if (atomic_xchg_relaxed(&tx->waiting_resp, false) &&
	    atomic_dec_and_test(&tx->ref_cnt))
		kefalnd_tx_done(tx);
}

/*
 * Unconditionally cancel a TX: record the given health status and
 * result, cancel any armed final control message, force waiting_resp
 * and the reference count to zero, and complete the TX immediately.
 */
void
kefalnd_force_cancel_tx(struct kefa_tx *tx, enum lnet_msg_hstatus hstatus,
			int status)
{
	EFA_DEV_WARN(tx->conn->efa_ni->efa_dev,
		     "canceling TX type[%s] to peer NI[%s]\n",
		     kefalnd_msgtype2str(tx->type),
		     libcfs_nidstr(&tx->conn->remote_nid));

	/* outcome reported when the TX is finalized */
	tx->send_sync = false;
	tx->hstatus = hstatus;
	tx->status = status;

	/* drop every outstanding reference, whatever its origin */
	atomic_set(&tx->waiting_resp, false);
	atomic_set(&tx->ref_cnt, 0);

	kefalnd_tx_done(tx);
}

/*
 * Send a stand-alone completion message of the given type, status and
 * cookie to the peer. Best effort: if no TX descriptor is available,
 * the failure is logged and nothing is sent.
 */
static void
kefalnd_send_completion(struct kefa_ni *efa_ni, struct kefa_conn *conn,
			int type, int status, u64 cookie)
{
	struct kefa_tx *tx = kefalnd_get_idle_tx(efa_ni);

	if (tx != NULL) {
		kefalnd_init_comp_message(conn, tx, type, status, cookie);
		kefalnd_launch_tx(conn, tx);
		return;
	}

	EFA_DEV_ERR(efa_ni->efa_dev,
		    "can't allocate %s completion TX to peer NI[%s]\n",
		    kefalnd_msgtype2str(type),
		    libcfs_nidstr(&conn->remote_nid));
}

/*
 * Prepare a TX as a GETR_REQ: protocol header, the LNet header, and the
 * index of this TX as the sink cookie.
 */
static inline void
kefalnd_fill_getr_msg(struct kefa_conn *conn, struct kefa_tx *tx,
		      struct lnet_hdr *hdr)
{
	struct kefa_msg *getr_msg;

	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_GETR_REQ,
				     sizeof(struct kefa_getr_req_msg_v2),
				     conn->proto_ver);

	getr_msg = tx->msg;
	lnet_hdr_to_nid16(hdr, &getr_msg->msg_v2.u.getr_req.hdr);
	getr_msg->msg_v2.u.getr_req.sink_cookie = kefalnd_tx_to_idx(tx);
}

/*
 * Prepare a TX as a PUTR_REQ: protocol header, the LNet header, the
 * index of this TX as the cookie, and a copy of the TX's RDMA
 * descriptor.
 */
static inline void
kefalnd_fill_putr_msg(struct kefa_conn *conn, struct kefa_tx *tx,
		      struct lnet_hdr *hdr)
{
	struct kefa_msg *putr_msg;

	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_PUTR_REQ,
				     sizeof(struct kefa_putr_req_msg_v2),
				     conn->proto_ver);

	putr_msg = tx->msg;
	lnet_hdr_to_nid16(hdr, &putr_msg->msg_v2.u.putr_req.hdr);
	putr_msg->msg_v2.u.putr_req.cookie = kefalnd_tx_to_idx(tx);
	putr_msg->msg_v2.u.putr_req.rdma_desc = tx->rdma_desc;
}

/*
 * Prepare a TX as an IMMEDIATE message: protocol header, the LNet
 * header, and the LNet payload copied inline from the message's kiov
 * (skipping msg_offset bytes).
 *
 * Returns 0 on success or -EFAULT when fewer than msg_len bytes could
 * be copied.
 */
static inline int
kefalnd_fill_imm_msg(struct kefa_conn *conn, struct lnet_msg *lntmsg,
		     struct kefa_tx *tx, struct lnet_hdr *hdr)
{
	struct kefa_msg *msg = tx->msg;
	struct iov_iter from;
	int body_nob, rc;

	/* body size: immediate header plus msg_len payload bytes */
	body_nob = offsetof(struct kefa_immediate_msg_v2,
			    payload[lntmsg->msg_len]);
	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_IMMEDIATE, body_nob,
				     conn->proto_ver);
	lnet_hdr_to_nid16(hdr, &msg->msg_v2.u.immediate.hdr);

	/* iterate over the kiov, positioned at the start of the payload */
	iov_iter_bvec(&from, WRITE,
		      lntmsg->msg_kiov, lntmsg->msg_niov,
		      lntmsg->msg_len + lntmsg->msg_offset);
	iov_iter_advance(&from, lntmsg->msg_offset);

	rc = copy_from_iter(&msg->msg_v2.u.immediate.payload,
			    lntmsg->msg_len, &from);
	if (rc != lntmsg->msg_len)
		return -EFAULT;

	return 0;
}

/*
 * Send an LNet message to lntmsg->msg_target over this NI.
 *
 * A TX descriptor is allocated and a connection to the target is looked
 * up or initiated. The message then takes one of three paths:
 *  - ACK, and GET/PUT/REPLY whose immediate-message size fits
 *    EFALND_NO_RDMA_THRESH and whose MD is not GPU memory: the payload
 *    is copied inline into an IMMEDIATE message;
 *  - larger or GPU GET: the MD is mapped as an RDMA sink and a GETR_REQ
 *    is sent;
 *  - larger or GPU PUT/REPLY: the payload is mapped as an RDMA source
 *    and a PUTR_REQ is sent.
 * On the RDMA paths the TX takes an extra reference and sets
 * waiting_resp before it is launched.
 *
 * Returns 0 once the TX has been queued, or a negative errno; on every
 * error return the TX has been released without lntmsg attached to it.
 */
static int
kefalnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg)
{
	struct lnet_processid *target = &lntmsg->msg_target;
	struct lnet_libmd *msg_md = lntmsg->msg_md;
	struct lnet_hdr *hdr = &lntmsg->msg_hdr;
	struct kefa_ni *efa_ni = ni->ni_data;
	int type = lntmsg->msg_type;
	struct kefa_conn *conn;
	struct kefa_tx *tx;
	int nob, rc;
	bool gpu;

	tx = kefalnd_get_idle_tx(efa_ni);
	if (tx == NULL) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "can't allocate %s TX to peer NI[%s]\n",
			    lnet_msgtyp2str(type), libcfs_nidstr(&target->nid));

		return -ENOMEM;
	}

	conn = kefalnd_lookup_or_init_conn(efa_ni, &target->nid,
					   KEFA_CONN_TYPE_INITIATOR);
	if (IS_ERR(conn)) {
		EFA_DEV_DEBUG(efa_ni->efa_dev,
			      "can't establish connection to peer NI[%s]\n",
			      libcfs_nidstr(&target->nid));
		tx->hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
		kefalnd_tx_done(tx);
		return -ENOTCONN;
	}

	CDEBUG(D_NET, "Request to send LNet %s from %s to %s size[%u]\n",
	       lnet_msgtyp2str(type),
	       libcfs_nidstr(&ni->ni_nid),
	       libcfs_nidstr(&target->nid),
	       type == LNET_MSG_GET ? msg_md->md_length : lntmsg->msg_len);

	gpu = lnet_md_is_gpu(msg_md);

	/* a "break" out of this switch selects the immediate (SEND) path below */
	switch (type) {
	default:
		LBUG();
		return (-EIO);

	case LNET_MSG_ACK:
		LASSERT(lntmsg->msg_len == 0);
		break;

	case LNET_MSG_GET:
		/* use RDMA or SEND based on size */
		nob = offsetof(struct kefa_msg, msg_v2.u.immediate.payload[msg_md->md_length]);
		if (nob <= EFALND_NO_RDMA_THRESH && !gpu)
			break;

		/* RDMA based flow */
		rc = kefalnd_map_msg_iov(efa_ni, tx, msg_md->md_niov,
					 msg_md->md_kiov, 0, msg_md->md_length,
					 false);
		if (rc != 0) {
			EFA_DEV_ERR(efa_ni->efa_dev,
				    "can't setup GET destination for peer NI[%s]. err[%d]\n",
				    libcfs_nidstr(&target->nid), rc);

			kefalnd_tx_done(tx);
			return -EIO;
		}

		/* setup the message */
		kefalnd_fill_getr_msg(conn, tx, hdr);
		tx->lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
		if (tx->lntmsg[1] == NULL) {
			EFA_DEV_ERR(efa_ni->efa_dev,
				    "can't create reply for GET for peer NI[%s]\n",
				    libcfs_nidstr(&target->nid));

			tx->hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
			kefalnd_tx_done(tx);
			return -EIO;
		}

		/* finalise lntmsg[0,1] on completion */
		tx->lntmsg[0] = lntmsg;
		atomic_inc(&tx->ref_cnt); /* wait for GETR_{ACK,NACK} */
		atomic_set(&tx->waiting_resp, true);
		kefalnd_launch_tx(conn, tx);
		return 0;

	case LNET_MSG_REPLY:
	case LNET_MSG_PUT:
		/* use RDMA or SEND based on size */
		nob = offsetof(struct kefa_msg, msg_v2.u.immediate.payload[lntmsg->msg_len]);
		if (nob <= EFALND_NO_RDMA_THRESH && !gpu)
			break;

		/* RDMA based flow */
		rc = kefalnd_map_msg_iov(efa_ni, tx, lntmsg->msg_niov,
					 lntmsg->msg_kiov, lntmsg->msg_offset,
					 lntmsg->msg_len, true);
		if (rc != 0) {
			EFA_DEV_ERR(efa_ni->efa_dev,
				    "can't setup PUT src for peer NI[%s]. err[%d]\n",
				    libcfs_nidstr(&target->nid), rc);

			kefalnd_tx_done(tx);
			return -EIO;
		}

		/* setup the message */
		kefalnd_fill_putr_msg(conn, tx, hdr);
		/* finalise lntmsg[0,1] on completion */
		tx->lntmsg[0] = lntmsg;
		atomic_inc(&tx->ref_cnt); /* wait for PUT_DONE */
		atomic_set(&tx->waiting_resp, true);
		kefalnd_launch_tx(conn, tx);
		return 0;
	}

	/* SEND based (non-RDMA flow) */
	rc = kefalnd_fill_imm_msg(conn, lntmsg, tx, hdr);
	if (rc < 0) {
		kefalnd_tx_done(tx);
		return rc;
	}

	/* finalise lntmsg on completion */
	tx->lntmsg[0] = lntmsg;

	kefalnd_launch_tx(conn, tx);

	return 0;
}

/*
 * Prepare a TX to RDMA-read from the peer buffer described by src_rdma
 * into the local buffer described by tx->rdma_desc. The transfer length
 * is the smaller of the two descriptors' sizes.
 *
 * The read is posted unsignaled (send_flags is 0). The TX is armed to
 * send a final control message of the given type (PUTR_DONE or
 * GETR_DONE) with dstcookie once kefalnd_tx_done() runs for it. Must
 * not be called in interrupt context.
 */
static void
kefalnd_init_tx_rdma_read(struct kefa_conn *conn, struct kefa_tx *tx, int type,
			  struct kefa_rdma_desc *src_rdma, u64 dstcookie)
{
	struct kefa_rdma_desc *sink_rdma = &tx->rdma_desc;
	struct ib_srd_rdma_wr *rdma_wr = &tx->wrq;
	struct ib_send_wr *send_wr = &rdma_wr->wr.wr;
	struct ib_sge *sge = &tx->sge;
	int xfer_nob;

	LASSERT(!in_interrupt());
	LASSERT(type == EFALND_MSG_PUTR_DONE || type == EFALND_MSG_GETR_DONE);

	tx->type = type;

	/* local side: the sink buffer */
	xfer_nob = min(src_rdma->nob, sink_rdma->nob);
	sge->addr = sink_rdma->addr;
	sge->lkey = sink_rdma->key;
	sge->length = xfer_nob;

	/* generic work request fields */
	send_wr->next = NULL;
	send_wr->wr_id = (u64)tx;
	send_wr->sg_list = sge;
	send_wr->num_sge = 1; /* EFA supports a single SGE for RDMA */
	send_wr->opcode = IB_WR_RDMA_READ;
	send_wr->send_flags = 0;

	/* remote side: the peer's source buffer */
	rdma_wr->remote_addr = src_rdma->addr;
	rdma_wr->rkey = src_rdma->key;

	kefalnd_set_sync_data(tx, dstcookie);
}

/*
 * Handle a PUTR_REQ from a peer: pull the peer's data with an RDMA read
 * into the buffers of the matched LNet message, then answer with
 * PUTR_DONE.
 *
 * @lntmsg:     matched LNet message, or NULL when there is none, in which
 *              case a PUTR_DONE carrying -ENODATA is sent and 0 returned.
 * @src_rd:     peer's RDMA descriptor of the source buffer.
 * @src_cookie: peer's cookie, echoed back in PUTR_DONE.
 * @nob:        number of bytes to receive; when 0 no local buffer is
 *              mapped and lntmsg is finalized immediately.
 *
 * Returns 0 when the request was queued or answered, -ENOMEM when no TX
 * descriptor is available, or the mapping error (after reporting it to
 * the peer in a PUTR_DONE).
 *
 * NOTE(review): when nob is 0, tx->rdma_desc is not set up here before
 * kefalnd_init_tx_rdma_read() reads it - confirm it cannot hold values
 * left over from a previous use of the descriptor.
 */
static int
kefalnd_handle_putr_req(struct kefa_ni *efa_ni, struct kefa_conn *conn,
			struct lnet_msg *lntmsg, struct kefa_rdma_desc *src_rd,
			u64 src_cookie, int nob)
{
	struct kefa_tx *tx;
	int rc = 0;

	if (lntmsg == NULL) {
		kefalnd_send_completion(efa_ni, conn, EFALND_MSG_PUTR_DONE,
					-ENODATA, src_cookie);
		return 0;
	}

	tx = kefalnd_get_idle_tx(efa_ni);
	if (tx == NULL) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "can't allocate %s TX to peer NI[%s]\n",
			    kefalnd_msgtype2str(EFALND_MSG_PUTR_DONE),
			    libcfs_nidstr(&conn->remote_nid));

		return -ENOMEM;
	}

	/* map the local buffers as the sink of the RDMA read */
	if (likely(nob != 0))
		rc = kefalnd_map_msg_iov(efa_ni, tx, lntmsg->msg_niov,
					 lntmsg->msg_kiov, lntmsg->msg_offset,
					 nob, false);
	if (unlikely(rc != 0)) {
		/* this path maps the PUT sink, not a GET source */
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "can't setup PUT sink for peer NI[%s]. err[%d]\n",
			    libcfs_nidstr(&conn->remote_nid), rc);

		kefalnd_tx_done(tx);
		kefalnd_send_completion(efa_ni, conn, EFALND_MSG_PUTR_DONE, rc,
					src_cookie);
		return rc;
	}

	kefalnd_init_tx_rdma_read(conn, tx, EFALND_MSG_PUTR_DONE, src_rd,
				  src_cookie);

	if (nob == 0) {
		/* No RDMA: local completion may happen now! */
		lnet_finalize(lntmsg, 0);
	} else {
		/* RDMA: lnet_finalize(lntmsg) when it completes */
		tx->lntmsg[0] = lntmsg;
	}

	kefalnd_launch_tx(conn, tx);
	return 0;
}

/* Unpack a v2 PUTR_REQ body and pass it to kefalnd_handle_putr_req(). */
static inline int
kefalnd_handle_putr_req_v2(struct kefa_ni *efa_ni,
			   struct kefa_conn *conn,
			   struct lnet_msg *lntmsg,
			   struct kefa_putr_req_msg_v2 *putr_req,
			   int nob)
{
	struct kefa_rdma_desc *src_rd = &putr_req->rdma_desc;

	return kefalnd_handle_putr_req(efa_ni, conn, lntmsg, src_rd,
				       putr_req->cookie, nob);
}

/*
 * Handle a GETR_REQ from a peer: expose the reply data of the matched
 * LNet message for remote read and answer with a GETR_ACK carrying the
 * RDMA descriptor.
 *
 * With no matched message (lntmsg == NULL) a NACK carrying -ENODATA is
 * sent and 0 is returned. Otherwise returns 0 once the GETR_ACK has
 * been queued, -ENOMEM when no TX descriptor is available, or the
 * mapping error (after reporting it to the peer in a NACK).
 */
static int
kefalnd_handle_getr_req(struct kefa_ni *efa_ni, struct kefa_conn *conn,
			struct lnet_msg *lntmsg, u64 sink_cookie)
{
	struct kefa_getr_ack_msg *getr_ack;
	struct kefa_tx *tx;
	unsigned int nob;
	int rc = 0;

	if (lntmsg == NULL) {
		kefalnd_send_completion(efa_ni, conn, EFALND_MSG_NACK, -ENODATA,
					sink_cookie);
		return 0;
	}

	nob = lntmsg->msg_len;

	tx = kefalnd_get_idle_tx(efa_ni);
	if (tx == NULL) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "can't allocate %s TX to peer NI[%s]\n",
			    kefalnd_msgtype2str(EFALND_MSG_GETR_ACK),
			    libcfs_nidstr(&conn->remote_nid));

		return -ENOMEM;
	}

	/* map the reply data with remote-read access for the peer */
	if (nob != 0)
		rc = kefalnd_map_msg_iov(efa_ni, tx, lntmsg->msg_niov,
					 lntmsg->msg_kiov, lntmsg->msg_offset,
					 nob, true);
	if (rc != 0) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "can't setup GET src for peer NI[%s]. err[%d]\n",
			    libcfs_nidstr(&conn->remote_nid), rc);

		kefalnd_tx_done(tx);
		kefalnd_send_completion(efa_ni, conn, EFALND_MSG_NACK, rc,
					sink_cookie);
		return rc;
	}

	if (nob == 0) {
		/* No RDMA: local completion may happen now! */
		lnet_finalize(lntmsg, 0);
	} else {
		/* RDMA: lnet_finalize(lntmsg) when it completes */
		tx->lntmsg[0] = lntmsg;
	}

	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_GETR_ACK,
				     sizeof(struct kefa_getr_ack_msg),
				     conn->proto_ver);

	/* echo the peer's cookie and hand out ours (this TX's pool index) */
	getr_ack = kefalnd_get_getr_ack_from_msg(tx->msg);
	getr_ack->sink_cookie = sink_cookie;
	getr_ack->src_cookie = kefalnd_tx_to_idx(tx);
	/* NOTE(review): when nob is 0 the descriptor copied here was not
	 * set up by this function - confirm that is intended.
	 */
	getr_ack->rdma_desc = tx->rdma_desc;

	atomic_inc(&tx->ref_cnt); /* Wait for GETR_DONE */
	atomic_set(&tx->waiting_resp, true);

	kefalnd_launch_tx(conn, tx);
	return 0;
}

/* v2 wire-format entry point for GETR_REQ: extract the sink cookie from the
 * request and defer to kefalnd_handle_getr_req().
 */
static inline int
kefalnd_handle_getr_req_v2(struct kefa_ni *efa_ni,
			   struct kefa_conn *conn,
			   struct lnet_msg *lntmsg,
			   struct kefa_getr_req_msg_v2 *getr_req)
{
	int rc;

	rc = kefalnd_handle_getr_req(efa_ni, conn, lntmsg,
				     getr_req->sink_cookie);
	return rc;
}

/* Post up to @budget free RX buffers of @qp to its receive queue.
 *
 * Buffers are taken from qp->free_rx, marked as posted (rx_nob = -1), moved
 * to qp->posted_rx and chained into a single ib_post_recv() call, all under
 * qp->rq_lock. The budget is capped by qp->rq_space.
 *
 * If the post fails, every work request from the one reported as bad onwards
 * is returned to qp->free_rx so the buffers are not lost, and qp->rq_space
 * is only charged for the requests that were actually accepted.
 *
 * Returns 0 on success (including "nothing to post") or -EIO when
 * ib_post_recv() fails.
 */
static int
kefalnd_refill_rx(struct kefa_qp *qp, u32 budget)
{
	struct kefa_rx *rx, *tmp;
	struct ib_recv_wr *prev_wrq = NULL;
	struct ib_recv_wr *first_wrq = NULL;
	struct ib_recv_wr *bad_wrq = NULL;
	struct ib_recv_wr *wrq, *next_wrq;
	unsigned long flags;
	int rc = 0, wr_cnt = 0;

	spin_lock_irqsave(&qp->rq_lock, flags);

	budget = min(budget, qp->rq_space);

	/* prepare a list of recv WRs to submit */
	list_for_each_entry_safe(rx, tmp, &qp->free_rx, list_node) {
		if (budget == 0)
			break;

		LASSERT(rx->rx_nob >= 0);
		rx->rx_nob = -1; /* mark posted */

		list_move_tail(&rx->list_node, &qp->posted_rx);

		if (prev_wrq)
			prev_wrq->next = &rx->wrq;

		if (first_wrq == NULL)
			first_wrq = &rx->wrq;

		rx->wrq.next = NULL;
		prev_wrq = &rx->wrq;
		wr_cnt++;
		budget--;
	}

	if (unlikely(wr_cnt == 0)) {
		spin_unlock_irqrestore(&qp->rq_lock, flags);
		return 0;
	}

	rc = ib_post_recv(qp->ib_qp, first_wrq,
			  (const struct ib_recv_wr **)&bad_wrq);
	if (unlikely(rc != 0)) {
		/* Verbs semantics: WRs preceding the bad one were accepted.
		 * If the provider did not report a bad WR, assume none of
		 * the chain was posted.
		 */
		wrq = bad_wrq ? bad_wrq : first_wrq;
		for (; wrq; wrq = next_wrq) {
			next_wrq = wrq->next;

			rx = container_of(wrq, struct kefa_rx, wrq);
			rx->rx_nob = 0; /* no longer posted */
			list_move_tail(&rx->list_node, &qp->free_rx);
			wr_cnt--;
		}

		/* only the accepted WRs consume receive queue space */
		qp->rq_space -= wr_cnt;

		spin_unlock_irqrestore(&qp->rq_lock, flags);
		EFA_DEV_ERR(qp->efa_dev,
			    "QP[%u] failed to post RX. err[%d], bad_wrq[%p]\n",
			    qp->ib_qp->qp_num, rc, bad_wrq);

		return -EIO;
	}

	qp->rq_space -= wr_cnt;

	spin_unlock_irqrestore(&qp->rq_lock, flags);
	return rc;
}

/* Give an RX buffer back to its QP: under qp->rq_lock, credit one unit of
 * receive queue space and append the buffer to the QP's free list.
 * Always returns 0.
 */
static int
kefalnd_free_rx(struct kefa_rx *rx)
{
	struct kefa_qp *owner = rx->qp;
	unsigned long irq_flags;

	spin_lock_irqsave(&owner->rq_lock, irq_flags);
	list_move_tail(&rx->list_node, &owner->free_rx);
	owner->rq_space++;
	spin_unlock_irqrestore(&owner->rq_lock, irq_flags);

	return 0;
}

/* LND receive callback: deliver the payload of a previously parsed RX.
 *
 * @private is the struct kefa_rx that was handed to lnet_parse(). The
 * responder connection is looked up from the source NID stored in the
 * message, then the message is handled according to its type:
 *  - IMMEDIATE: the inline payload is copied into @to and @lntmsg finalized
 *  - PUTR_REQ / GETR_REQ: forwarded to the matching v2 handler
 *
 * The RX buffer is released with kefalnd_free_rx() on every exit path,
 * including the failed connection lookup.
 *
 * Returns 0 on success or a negative errno (-ENOTCONN, -EPROTO, -EFAULT or
 * a handler error).
 */
static int
kefalnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg,
	     int delayed, struct iov_iter *to, unsigned int rlen)
{
	struct kefa_ni *efa_ni = ni->ni_data;
	int wanted = iov_iter_count(to);
	struct kefa_rx *rx = private;
	struct kefa_conn *conn;
	struct kefa_msg *msg;
	struct lnet_nid *nid;
	int imm_nob, rc = 0;

	msg = rx->msg;
	nid = &msg->msg_v2.srcnid;
	conn = kefalnd_lookup_conn(efa_ni, nid, KEFA_CONN_TYPE_RESPONDER);
	if (IS_ERR_OR_NULL(conn)) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "failed to get connection for RX from peer NI[%s]. err[%ld]\n",
			    libcfs_nidstr(nid), PTR_ERR(conn));

		/* Nobody else recycles the RX buffer once it reached here */
		kefalnd_free_rx(rx);
		return -ENOTCONN;
	}

	switch (msg->hdr.type) {
	default:
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "received unexpected efa msg[%s] from peer NI[%s]\n",
			    kefalnd_msgtype2str(msg->hdr.type),
			    libcfs_nidstr(nid));

		break;

	case EFALND_MSG_IMMEDIATE:
		/* the advertised payload must fit in what was received */
		imm_nob = offsetof(struct kefa_msg, msg_v2.u.immediate.payload[rlen]);
		if (imm_nob > rx->rx_nob) {
			EFA_DEV_ERR(efa_ni->efa_dev,
				    "immediate message from peer NI[%s] too big: %d(%d)\n",
				    libcfs_nidstr(nid), imm_nob, rx->rx_nob);

			rc = -EPROTO;
			break;
		}

		rc = copy_to_iter(&msg->msg_v2.u.immediate.payload, wanted,
				  to);
		if (rc != wanted) {
			rc = -EFAULT;
			break;
		}

		lnet_finalize(lntmsg, 0);
		break;

	case EFALND_MSG_PUTR_REQ:
		rc = kefalnd_handle_putr_req_v2(efa_ni, conn, lntmsg,
						&msg->msg_v2.u.putr_req,
						wanted);
		break;

	case EFALND_MSG_GETR_REQ:
		rc = kefalnd_handle_getr_req_v2(efa_ni, conn, lntmsg,
						&msg->msg_v2.u.getr_req);
		break;
	}

	kefalnd_free_rx(rx);
	return rc;
}

static void
kefalnd_finv_complete(struct kefa_ni *efa_ni, struct ib_wc *wc)
{
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	struct kefa_fmr *fmr = (void *)wc->wr_id;

	if (!fmr) {
		/* Reaching here means FW or LND did something bad */
		CERROR("cpu[%u] received bad FINV completion with status[%u]\n",
		       smp_processor_id(), wc->status);
		return;
	}

	if (wc->status != IB_WC_SUCCESS) {
		EFA_DEV_WARN(efa_dev,
			     "QP[%u] received FINV[0x%x] completion with err[%u] vendor[%u]\n",
			     wc->qp->qp_num, fmr->mr->lkey, wc->status,
			     wc->vendor_err);

		kefalnd_obj_pool_put_on_pend_list(&efa_dev->fmr_pool,
						  &fmr->list_node);
		return;
	}

	fmr->state = KEFA_FMR_INACTIVE;
	kefalnd_obj_pool_free(&efa_dev->fmr_pool, &fmr->list_node);
}

/* Completion handler for send-side work requests (SEND, REG_MR, RDMA_READ).
 *
 * wc->wr_id carries the struct kefa_tx. Completions for a NULL TX, a TX
 * without a connection, or a TX whose reference count is already <= 0 are
 * logged and ignored.
 *
 * A successful REG_MR completion activates the TX's FMR and launches the TX;
 * a failed one aborts the TX as locally dropped. Any other failed completion
 * aborts the TX as remotely dropped and, for initiator connections,
 * deactivates the connection. In every handled case one TX reference is
 * dropped at the end, and the TX is finished when that was the last one.
 */
static void
kefalnd_tx_complete(struct kefa_ni *efa_ni, struct ib_wc *wc)
{
	struct kefa_conn *conn;
	struct kefa_tx *tx;

	tx = (void *)wc->wr_id;
	if (!tx) {
		/* Reaching here means FW or LND did something bad */
		CERROR("cpu[%u] received bad TX completion with status[%u]",
		       smp_processor_id(), wc->status);
		return;
	}

	conn = tx->conn;
	if (!conn) {
		CERROR("TX[%p] received bad conn[%p], type[%s]", tx, conn,
		       kefalnd_msgtype2str(tx->type));

		return;
	}

	/* a TX with no references left must not be touched any further */
	if (atomic_read(&tx->ref_cnt) <= 0) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "received completion on free TX\n");
		return;
	}

	if (wc->opcode == IB_WC_REG_MR) {
		if (wc->status == IB_WC_SUCCESS) {
			/* memory registration done: the TX can go out now */
			tx->fmr->state = KEFA_FMR_ACTIVE;
			kefalnd_launch_tx(tx->conn, tx);
		} else {
			EFA_DEV_ERR(efa_ni->efa_dev,
				    "QP[%u] received FRWR[0x%x] completion with err[%u] vendor[%u]\n",
				     wc->qp->qp_num, tx->fmr->mr->lkey,
				     wc->status, wc->vendor_err);

			kefalnd_abort_tx(tx, LNET_MSG_STATUS_LOCAL_DROPPED, -ECOMM);
		}
	} else if (wc->status != IB_WC_SUCCESS) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "QP[%u] received TX[%s] completion with err. opcode[%u] status[%u] vendor[%u] peer_ni[%s]\n",
			    wc->qp->qp_num, kefalnd_msgtype2str(tx->type),
			    wc->opcode, wc->status, wc->vendor_err,
			    libcfs_nidstr(&conn->remote_nid));

		kefalnd_abort_tx(tx, LNET_MSG_STATUS_REMOTE_DROPPED, -ECOMM);
		if (conn->type == KEFA_CONN_TYPE_INITIATOR)
			kefalnd_deactivate_conn(conn);
	}

	/* drop the reference that covered this work request */
	if (atomic_dec_and_test(&tx->ref_cnt))
		kefalnd_tx_done(tx);
}

static void
kefalnd_handle_completion(struct kefa_ni *efa_ni,
			  struct kefa_completion_msg *completion)
{
	int status = kefalnd_efa_status_to_errno(completion->status);
	u64 tx_idx = completion->cookie;
	struct kefa_tx *tx;

	tx = kefalnd_get_tx_by_idx(efa_ni, tx_idx);
	if (!tx)
		return;

	/* Response handling might race with TX abort, first 'wins' */
	if (!atomic_xchg_relaxed(&tx->waiting_resp, false))
		return;

	if (tx->status == 0) { /* success so far */
		if (status < 0) { /* failed? */
			tx->status = status;
			tx->hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
		}
	}

	if (atomic_dec_and_test(&tx->ref_cnt))
		kefalnd_tx_done(tx);
}

/* Handle a GETR_ACK: the source side has mapped its buffers, so start the
 * RDMA read on the TX identified by the ack's sink cookie.
 *
 * Returns 0 on success, or -EINVAL when no TX matches the cookie or the TX
 * is no longer waiting for a response.
 */
static int
kefalnd_handle_getr_ack(struct kefa_ni *efa_ni,
			struct kefa_getr_ack_msg *getr_ack)
{
	struct kefa_conn *conn;
	struct kefa_tx *tx;

	tx = kefalnd_get_tx_by_idx(efa_ni, getr_ack->sink_cookie);
	if (!tx)
		return -EINVAL;

	/* Response handling might race with TX abort, first 'wins' */
	if (!atomic_xchg_relaxed(&tx->waiting_resp, false))
		return -EINVAL;

	conn = tx->conn;

	/* the reply length is whatever the source advertised in its descriptor */
	lnet_set_reply_msg_len(efa_ni->lnet_ni, tx->lntmsg[1],
			       getr_ack->rdma_desc.nob);

	/* source has mapped his buffers - let's read */
	kefalnd_init_tx_rdma_read(conn, tx, EFALND_MSG_GETR_DONE,
				  &getr_ack->rdma_desc, getr_ack->src_cookie);

	kefalnd_launch_tx(conn, tx);

	/* remove GETR_{ACK,NACK} reference only after we added RDMA ref_cnt.
	 * We check if TX is done here and not only decreasing the refcnt in
	 * case RDMA is finished before reaching this point.
	 */
	if (atomic_dec_and_test(&tx->ref_cnt))
		kefalnd_tx_done(tx);

	return 0;
}

/* Pass an incoming request message (IMMEDIATE, PUTR_REQ, GETR_REQ) to LNet.
 *
 * The LNet header embedded in the message is converted with
 * lnet_hdr_from_nid16() and handed to lnet_parse() together with @rx as the
 * private cookie; PUTR_REQ and GETR_REQ are flagged as RDMA requests.
 *
 * When lnet_parse() succeeds the RX buffer is released later by
 * kefalnd_recv(). When it fails the buffer is released here.
 * NOTE(review): this relies on lnet_parse() not invoking the LND receive
 * callback when it returns a negative value - same assumption o2iblnd makes.
 */
static void
kefalnd_handle_lnet_request_msg(struct kefa_ni *efa_ni, struct kefa_rx *rx)
{
	struct kefa_msg *msg = rx->msg;
	int rc, rdma_req = 0;
	struct lnet_hdr hdr;

	switch (msg->hdr.type) {
	case EFALND_MSG_IMMEDIATE:
		lnet_hdr_from_nid16(&hdr, &msg->msg_v2.u.immediate.hdr);
		break;

	case EFALND_MSG_PUTR_REQ:
		rdma_req = 1;
		lnet_hdr_from_nid16(&hdr, &msg->msg_v2.u.putr_req.hdr);
		break;

	case EFALND_MSG_GETR_REQ:
		rdma_req = 1;
		lnet_hdr_from_nid16(&hdr, &msg->msg_v2.u.getr_req.hdr);
		break;

	default:
		LASSERTF(0, "message type[%u] doesn't have lnet header\n",
			 msg->hdr.type);
		/* never parse an uninitialised header */
		kefalnd_free_rx(rx);
		return;
	}

	rc = lnet_parse(efa_ni->lnet_ni, &hdr, &hdr.src_nid, rx, rdma_req);
	if (rc < 0) {
		EFA_DEV_ERR(efa_ni->efa_dev, "error parsing lnet msg\n");
		/* kefalnd_recv() will not run for this RX: recycle it now */
		kefalnd_free_rx(rx);
	}
}

/* Dispatch a validated RX message by type.
 *
 * LNet request messages are handed to kefalnd_handle_lnet_request_msg(),
 * which takes over the RX buffer. All other message types (and unknown
 * ones) are consumed here and the buffer is freed immediately. In every
 * case one receive buffer is re-posted on the RX's QP afterwards.
 */
static void
kefalnd_handle_rx(struct kefa_ni *efa_ni, struct kefa_rx *rx)
{
	struct kefa_msg *msg = rx->msg;
	bool release = true;

	switch (msg->hdr.type) {
	case EFALND_MSG_GETR_REQ:
	case EFALND_MSG_PUTR_REQ:
	case EFALND_MSG_IMMEDIATE:
		/* the request path owns the buffer from here on */
		release = false;
		kefalnd_handle_lnet_request_msg(efa_ni, rx);
		break;

	case EFALND_MSG_GETR_ACK:
		kefalnd_handle_getr_ack(efa_ni, kefalnd_get_getr_ack_from_msg(msg));
		break;

	case EFALND_MSG_NACK:
	case EFALND_MSG_PUTR_DONE:
	case EFALND_MSG_GETR_DONE:
		kefalnd_handle_completion(efa_ni, kefalnd_get_completion_from_msg(msg));
		break;

	case EFALND_MSG_CONN_PROBE:
	case EFALND_MSG_CONN_PROBE_RESP:
	case EFALND_MSG_CONN_REQ:
	case EFALND_MSG_CONN_REQ_ACK:
		kefalnd_handle_conn_establishment(efa_ni, msg);
		break;

	default:
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "bad EFALND message type %x from loopback\n",
			    msg->hdr.type);
		break;
	}

	if (release)
		kefalnd_free_rx(rx);

	kefalnd_refill_rx(rx->qp, 1);
}

/* Validate the v2 message header of a received message.
 *
 * Checks, in order: the message is at least as long as the v2 base header;
 * the destination epoch matches this NI's epoch (skipped for CONN_PROBE);
 * the source NID is not the wildcard NID.
 *
 * Returns 0 when all checks pass, -EPROTO otherwise.
 */
static int
kefalnd_unpack_header_v2(struct kefa_ni *efa_ni, struct kefa_msg_v2 *msg_v2,
			 u8 type, int rx_nob)
{
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	const int base_hdr_size = offsetof(struct kefa_msg, msg_v2.u);
	const bool is_probe = (type == EFALND_MSG_CONN_PROBE);

	if (base_hdr_size > rx_nob) {
		EFA_DEV_ERR(efa_dev, "short message: %d\n", rx_nob);
		return -EPROTO;
	}

	if (!is_probe && efa_ni->ni_epoch != msg_v2->dst_epoch) {
		EFA_DEV_ERR(efa_dev, "RX[%u] epoch mismatch: recv[%llu], expected[%llu]\n",
			    type, msg_v2->dst_epoch, efa_ni->ni_epoch);
		return -EPROTO;
	}

	if (!LNET_NID_IS_ANY(&msg_v2->srcnid))
		return 0;

	EFA_DEV_ERR(efa_dev, "bad src nid: %s\n",
		    libcfs_nidstr(&msg_v2->srcnid));
	return -EPROTO;
}

/* Validate the v1 message header of a received message.
 *
 * Only CONN_PROBE and CONN_PROBE_RESP are accepted with the v1 protocol.
 * After that the same checks as for v2 apply, in order: minimum length of
 * the v1 base header, destination epoch (skipped for CONN_PROBE) and a
 * non-wildcard source NID.
 *
 * Returns 0 when all checks pass, -EPROTO otherwise.
 */
static int
kefalnd_unpack_header_v1(struct kefa_ni *efa_ni, struct kefa_msg_v1 *msg_v1,
			 u8 type, int rx_nob)
{
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	const int base_hdr_size = offsetof(struct kefa_msg, msg_v1.u);
	const bool is_probe = (type == EFALND_MSG_CONN_PROBE);

	if (!is_probe && type != EFALND_MSG_CONN_PROBE_RESP) {
		EFA_DEV_ERR(efa_dev, "unsupported V1 protocol[%s]\n",
			    kefalnd_msgtype2str(type));
		return -EPROTO;
	}

	if (base_hdr_size > rx_nob) {
		EFA_DEV_ERR(efa_dev, "short message: %d\n", rx_nob);
		return -EPROTO;
	}

	if (!is_probe && efa_ni->ni_epoch != msg_v1->dst_epoch) {
		EFA_DEV_ERR(efa_dev, "RX[%u] epoch mismatch: recv[%llu], expected[%llu]\n",
			    type, msg_v1->dst_epoch, efa_ni->ni_epoch);
		return -EPROTO;
	}

	if (msg_v1->srcnid != LNET_NID_ANY)
		return 0;

	EFA_DEV_ERR(efa_dev, "bad src nid: %s\n",
		    libcfs_nid2str(msg_v1->srcnid));
	return -EPROTO;
}

static int
kefalnd_unpack_msg(struct kefa_ni *efa_ni, struct kefa_rx *rx, struct ib_wc *wc)
{
	const int min_hdr_size = sizeof(struct kefa_hdr);
	struct kefa_msg *msg = rx->msg;
	struct kefa_hdr *hdr;
	struct lnet_nid nid;
	int rc = 0;

	hdr = &msg->hdr;
	if (rx->rx_nob < min_hdr_size) {
		EFA_DEV_ERR(efa_ni->efa_dev, "short message: %d\n", rx->rx_nob);
		return -EPROTO;
	}

	if (hdr->magic != EFALND_MSG_MAGIC) {
		EFA_DEV_ERR(efa_ni->efa_dev, "bad magic: %08x\n", hdr->magic);
		return -EPROTO;
	}

	if (hdr->proto_ver > EFALND_MAX_PROTO_VER ||
	    (hdr->type != EFALND_MSG_CONN_PROBE && hdr->proto_ver < EFALND_MIN_PROTO_VER)) {
		EFA_DEV_ERR(efa_ni->efa_dev, "bad protocol version: %x\n",
			    hdr->proto_ver);
		return -EPROTO;
	}

	if (hdr->nob > rx->rx_nob) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "short message: got %d, wanted %d\n",
			    rx->rx_nob, hdr->nob);

		return -EPROTO;
	}

	if (hdr->nob < kefalnd_msgtype2size(hdr->type, hdr->proto_ver)) {
		EFA_DEV_ERR(efa_ni->efa_dev, "short %s: %d(%d)\n",
			    kefalnd_msgtype2str(hdr->type), hdr->nob,
			    kefalnd_msgtype2size(hdr->type, hdr->proto_ver));

		return -EPROTO;
	}

	if (hdr->proto_ver != EFALND_PROTO_VER_1)
		rc = kefalnd_unpack_header_v2(efa_ni, &msg->msg_v2, hdr->type,
					      rx->rx_nob);
	else
		rc = kefalnd_unpack_header_v1(efa_ni, &msg->msg_v1, hdr->type,
					      rx->rx_nob);

	if (rc)
		goto bad_pkt;

	return 0;

bad_pkt:
	kefalnd_get_srcnid_from_msg(msg, &nid);
	EFA_DEV_ERR(efa_ni->efa_dev,
		    "QP[%u] failed RX unpacking from peer NI[%s].\n",
		    wc->qp->qp_num, libcfs_nidstr(&nid));

	return -EPROTO;
}

/* Completion handler for a receive work request.
 *
 * wc->wr_id carries the struct kefa_rx and wc->byte_len the received size.
 * A good completion records the size in rx->rx_nob, validates the message
 * and dispatches it through kefalnd_handle_rx().
 *
 * On any failure with a valid RX (zero-length receive, error status, bad
 * message) the buffer is returned to the QP's free list. Its "posted"
 * marker (negative rx_nob) is cleared first, because kefalnd_refill_rx()
 * asserts rx_nob >= 0 on every buffer it takes from the free list.
 */
static void
kefalnd_rx_complete(struct kefa_ni *efa_ni, struct ib_wc *wc)
{
	struct kefa_rx *rx;
	int nob, rc;

	rx = (void *)wc->wr_id;
	nob = wc->byte_len;
	if (!rx || nob == 0) {
		/* Reaching here means FW or LND did something bad */
		CERROR("cpu[%u] received bad RX handle with status[%u] nob[%u]",
		       smp_processor_id(), wc->status, nob);
		if (!rx)
			return;

		/* the buffer is no longer posted: recycle it */
		goto failed;
	}

	if (wc->status != IB_WC_SUCCESS) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "QP[%u] received RX completion with err[%u]\n",
			    wc->qp->qp_num, wc->status);

		goto failed;
	}

	LASSERT(rx->rx_nob < 0);	/* was posted */
	rx->rx_nob = nob;

	rc = kefalnd_unpack_msg(efa_ni, rx, wc);
	if (rc)
		goto failed;

	kefalnd_handle_rx(efa_ni, rx);
	return;

failed:
	rx->rx_nob = 0;	/* not posted any more */
	kefalnd_free_rx(rx);
}

/* Route a work completion to its handler based on wc->opcode:
 * RECV -> RX handler, LOCAL_INV -> FINV handler, SEND / REG_MR / RDMA_READ
 * -> TX handler. Any other opcode hits LBUG().
 */
static void
kefalnd_complete(struct kefa_ni *efa_ni, struct ib_wc *wc)
{
	if (wc->opcode == IB_WC_RECV) {
		kefalnd_rx_complete(efa_ni, wc);
		return;
	}

	if (wc->opcode == IB_WC_LOCAL_INV) {
		kefalnd_finv_complete(efa_ni, wc);
		return;
	}

	if (wc->opcode != IB_WC_SEND &&
	    wc->opcode != IB_WC_REG_MR &&
	    wc->opcode != IB_WC_RDMA_READ)
		LBUG();

	kefalnd_tx_complete(efa_ni, wc);
}

/* Scheduler thread body: polls completion queues pending on one scheduler.
 *
 * @arg encodes the thread id; its CPT part selects the scheduler. The
 * thread allocates a batch buffer of MAX_CQE_BATCH work completions, tries
 * to bind itself to the scheduler's CPT and then loops until
 * kefalnd.shutdown is set:
 *  - take the first CQ off sched->pend_cqs, or sleep on sched->waitq
 *  - poll up to MAX_CQE_BATCH completions from it
 *  - if the CQ is empty, re-arm its notification (polling again when the
 *    re-arm reports missed events) and go pick another CQ
 *  - otherwise put the CQ back on the pending list so another thread can
 *    take the next batch, then process this batch
 *
 * The thread queues itself on the wait queue while still holding
 * sched->lock and re-checks the shutdown flag before sleeping, so a wake-up
 * issued after the "list empty" check cannot be lost.
 *
 * Returns 0 on normal exit, -ENOMEM if the batch buffer cannot be allocated.
 */
static int
kefalnd_scheduler(void *arg)
{
	struct kefa_cq *cq = NULL;
	struct kefa_sched *sched;
	wait_queue_entry_t wait;
	long id = (long)arg;
	unsigned long flags;
	int rc, cqe_cnt, i;
	struct ib_wc *wc;

	sched = kefalnd.scheds[KEFA_THREAD_CPT(id)];

	LIBCFS_CPT_ALLOC(wc, lnet_cpt_table(), sched->cpt,
			 MAX_CQE_BATCH * sizeof(*wc));
	if (!wc) {
		CERROR("Failed to allocate memory for scheduler WCs pool\n");
		/* kefalnd_thread_start() already accounted for this thread;
		 * undo that the same way the normal exit path does
		 */
		kefalnd_thread_stop();
		return -ENOMEM;
	}

	rc = cfs_cpt_bind(lnet_cpt_table(), sched->cpt);
	if (rc != 0)
		CWARN("Failed to bind shceduler thread to CPU partition %d\n",
		      sched->cpt);

	init_wait(&wait);

	while (!kefalnd.shutdown) {
		spin_lock_irqsave(&sched->lock, flags);
		cq = list_first_entry_or_null(&sched->pend_cqs, struct kefa_cq, sched_node);
		if (!cq) {
			/* Become a waiter before dropping the lock: a CQ
			 * scheduled from now on is followed by a wake_up()
			 * that is guaranteed to see us.
			 */
			set_current_state(TASK_INTERRUPTIBLE);
			add_wait_queue_exclusive(&sched->waitq, &wait);
			spin_unlock_irqrestore(&sched->lock, flags);

			/* don't sleep through a shutdown request */
			if (!kefalnd.shutdown)
				schedule();

			remove_wait_queue(&sched->waitq, &wait);
			set_current_state(TASK_RUNNING);
			continue;
		}
		list_del_init(&cq->sched_node);
		spin_unlock_irqrestore(&sched->lock, flags);

again:
		cqe_cnt = ib_poll_cq(cq->ib_cq, MAX_CQE_BATCH, wc);
		if (cqe_cnt < 0) {
			/* TODO - handle error is fatal */
			EFA_DEV_ERR(cq->efa_dev, "poll CQ failed. err[%d]\n",
				    cqe_cnt);
			continue;
		}

		if (cqe_cnt == 0) {
			/* TODO - consider releasing CQ on every CQ poll */
			rc = ib_req_notify_cq(cq->ib_cq,
					      IB_CQ_NEXT_COMP |
					      IB_CQ_REPORT_MISSED_EVENTS);
			if (rc < 0) {
				/* TODO - This is fatal, handle error flow */
				EFA_DEV_ERR(cq->efa_dev,
					    "request notify CQ failed. err[%d]\n",
					    rc);
			}

			/* We missed some CQEs. try again */
			if (rc > 0)
				goto again;

			/* Try acquire a new CQ */
			continue;
		}

		/* return the CQ so other threads can take the next batch */
		spin_lock_irqsave(&sched->lock, flags);
		if (list_empty(&cq->sched_node))
			list_add_tail(&cq->sched_node, &sched->pend_cqs);
		spin_unlock_irqrestore(&sched->lock, flags);

		for (i = 0; i < cqe_cnt; i++)
			kefalnd_complete(cq->efa_dev->efa_ni, wc + i);

		/* respect periodic scheduling  */
		if (need_resched())
			cond_resched();
	}

	LIBCFS_FREE(wc, MAX_CQE_BATCH * sizeof(*wc));

	kefalnd_thread_stop();
	return 0;
}

/* Tear down every connection of @efa_ni: each entry is unhashed from
 * efa_ni->conns and destroyed with LNET_MSG_STATUS_LOCAL_ABORTED / -ENODEV.
 */
static void
kefalnd_destroy_all_conns(struct kefa_ni *efa_ni)
{
	struct hlist_node *next;
	struct kefa_conn *victim;
	int bucket;

	hash_for_each_safe(efa_ni->conns, bucket, next, victim, ni_node) {
		hlist_del_init(&victim->ni_node);
		kefalnd_destroy_conn(victim, LNET_MSG_STATUS_LOCAL_ABORTED,
				     -ENODEV);
	}
}

/* Start scheduler threads for @sched.
 *
 * For a scheduler with no threads yet, the count is sched->nthreads_max when
 * the kefa_nscheds tunable is positive, otherwise it is derived from the CPT
 * weight (half of it, bounded by EFALND_MIN_SCHED_THRS, the weight itself
 * and EFALND_MAX_SCHED_THRS). A scheduler that already has threads gets at
 * most one more, and only while below nthreads_max.
 *
 * sched->nthreads is advanced by the number of threads actually started.
 * Returns 0, or the error of the first thread that failed to start.
 */
static int
kefalnd_start_scheduler(struct kefa_sched *sched)
{
	int started = 0;
	int wanted;
	int rc = 0;

	if (sched->nthreads != 0) {
		LASSERT(sched->nthreads <= sched->nthreads_max);
		/* increase one thread if there is new interface */
		wanted = (sched->nthreads < sched->nthreads_max);
	} else if (*kefalnd_tunables.kefa_nscheds > 0) {
		/* decide thread count for new interface */
		wanted = sched->nthreads_max;
	} else {
		/* re-calculate thread count in case cpt changed */
		wanted = cfs_cpt_weight(lnet_cpt_table(), sched->cpt);
		wanted = min(max(EFALND_MIN_SCHED_THRS, wanted >> 1), wanted);
		wanted = min(EFALND_MAX_SCHED_THRS, wanted);
	}

	while (started < wanted) {
		long id = KEFA_THREAD_ID(sched->cpt, sched->nthreads + started);

		rc = kefalnd_thread_start(kefalnd_scheduler, (void *)id,
					  "kefalnd_s_%02ld_%02ld",
					  KEFA_THREAD_CPT(id),
					  KEFA_THREAD_TID(id));
		if (rc) {
			CWARN("Can't spawn thread %d for scheduler[%d]: rc[%d]\n",
			      sched->nthreads + started, sched->cpt, rc);
			break;
		}

		started++;
	}

	sched->nthreads += started;
	return rc;
}

/* Spawn the connection-management daemon thread for @cm_daemon's CPT and
 * mark the daemon active on success. Returns 0 or the thread start error.
 */
static int
kefalnd_start_cm_daemon(struct kefa_dev *efa_dev,
			struct kefa_cm_deamon *cm_daemon)
{
	long id = KEFA_THREAD_ID(cm_daemon->cpt, 0);
	int rc;

	rc = kefalnd_thread_start(kefalnd_cm_daemon, (void *)id,
				  "kefalnd_cd_%02ld_%02ld",
				  KEFA_THREAD_CPT(id), KEFA_THREAD_TID(id));
	if (rc == 0) {
		cm_daemon->active = true;
		return 0;
	}

	EFA_DEV_ERR(efa_dev, "can't spawn thread for connection daemon[%d]. err[%d]\n",
		    cm_daemon->cpt, rc);
	return rc;
}

/* Start the worker threads a device needs on its CPT: the scheduler threads
 * first, then the connection daemon unless one is already active there.
 * Returns 0 on success or the error of the step that failed.
 */
static int
kefalnd_dev_start_threads(struct kefa_dev *efa_dev)
{
	struct kefa_cm_deamon *cm_daemon = kefalnd.cm_daemons[efa_dev->cpt];
	int rc;

	rc = kefalnd_start_scheduler(kefalnd.scheds[efa_dev->cpt]);
	if (rc != 0) {
		EFA_DEV_ERR(efa_dev, "failed to start scheduler threads\n");
		return rc;
	}

	if (!cm_daemon->active) {
		rc = kefalnd_start_cm_daemon(efa_dev, cm_daemon);
		if (rc != 0) {
			EFA_DEV_ERR(efa_dev,
				    "failed to start connection daemon thread\n");
			return rc;
		}
	}

	return 0;
}

/* Compute the number of TX descriptors for @efa_ni's pool.
 *
 * The base is twice the NI's max TX credits, capped at
 * EFALND_MAX_NI_TX_POOL (a warning is logged if the cap drops it below the
 * credit count). On top of that, RQ_DEPTH descriptors are added per
 * configured QP (lnd_nqps tunable).
 */
static u32
kefalnd_get_tx_pool_size(struct kefa_ni *efa_ni)
{
	struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
	struct lnet_ioctl_config_efalnd_tunables *efa_tunables;
	u32 nqps, base_size;

	net_tunables = &efa_ni->lnet_ni->ni_net->net_tunables;
	efa_tunables = &efa_ni->lnet_ni->ni_lnd_tunables.lnd_tun_u.lnd_efa;

	base_size = net_tunables->lct_max_tx_credits * 2;
	base_size = min_t(u32, base_size, EFALND_MAX_NI_TX_POOL);

	if (base_size < net_tunables->lct_max_tx_credits)
		EFA_DEV_WARN(efa_ni->efa_dev,
			     "LNET NI credits[%d] exceeds EFA LND TX pool size[%d]\n",
			     net_tunables->lct_max_tx_credits, base_size);

	nqps = efa_tunables->lnd_nqps;

	return base_size + nqps * RQ_DEPTH;
}

/* Release everything owned by @efa_ni's TX pool.
 *
 * For each TX slot the fragment table is freed and the message buffer is
 * DMA-unmapped and freed, then the slot array itself is released. Slots that
 * were never (fully) initialised - as happens when kefalnd_create_tx_pool()
 * fails half-way - have NULL frags/msg pointers and are skipped, so
 * LIBCFS_FREE() is never invoked on a NULL pointer. Safe to call when the
 * pool array was never allocated.
 */
static void
kefalnd_destroy_tx_pool(struct kefa_ni *efa_ni)
{
	struct kefa_obj_pool *tx_pool = &efa_ni->tx_pool;
	u32 pool_size = tx_pool->pool_size;
	u32 i;

	if (!tx_pool->obj_arr)
		return;

	for (i = 0; i < pool_size; i++) {
		struct kefa_tx *tx = &((struct kefa_tx *)tx_pool->obj_arr)[i];

		if (tx->frags) {
			LIBCFS_FREE(tx->frags,
				    (EFALND_MAX_TX_FRAGS) * sizeof(*tx->frags));
			tx->frags = NULL;
		}

		if (tx->msg) {
			ib_dma_unmap_single(efa_ni->efa_dev->ib_dev,
					    tx->msgaddr,
					    EFALND_MSG_SIZE_ALIGNED,
					    DMA_TO_DEVICE);
			kfree(tx->msg);
			tx->msg = NULL;
		}
	}
	LIBCFS_FREE(tx_pool->obj_arr, pool_size * sizeof(struct kefa_tx));
	tx_pool->obj_arr = NULL;
}

static int
kefalnd_create_tx_pool(struct kefa_ni *efa_ni, int cpt)
{
	struct kefa_obj_pool *tx_pool = &efa_ni->tx_pool;
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	u32 pool_size;
	int i, rc;

	memset(tx_pool, 0, sizeof(*tx_pool));

	pool_size = kefalnd_get_tx_pool_size(efa_ni);
	rc = kefalnd_obj_pool_init(efa_ni, tx_pool, pool_size, cpt,
				   sizeof(struct kefa_tx));
	if (rc != 0) {
		EFA_DEV_ERR(efa_dev, "cannot allocate TX pool\n");
		goto failed;
	}

	for (i = 0; i < pool_size; i++) {
		struct kefa_tx *tx = &((struct kefa_tx *)tx_pool->obj_arr)[i];

		tx->tx_pool = tx_pool;

		LIBCFS_CPT_ALLOC(tx->frags, lnet_cpt_table(), tx_pool->cpt,
				 EFALND_MAX_TX_FRAGS * sizeof(*tx->frags));
		if (!tx->frags) {
			EFA_DEV_ERR(efa_dev,
				    "can't allocate TX SG fragments\n");
			goto failed;
		}

		sg_init_table(tx->frags, EFALND_MAX_TX_FRAGS);

		tx->msg = cfs_cpt_malloc(lnet_cpt_table(), tx_pool->cpt,
					 EFALND_MSG_SIZE_ALIGNED, GFP_KERNEL);
		if (!tx->msg) {
			EFA_DEV_ERR(efa_dev,
				    "failed to allocate TX SGL buffer\n");
			goto failed;
		}

		tx->msgaddr = ib_dma_map_single(efa_dev->ib_dev, tx->msg,
						EFALND_MSG_SIZE_ALIGNED,
						DMA_TO_DEVICE);
		if (ib_dma_mapping_error(efa_dev->ib_dev, tx->msgaddr)) {
			EFA_DEV_ERR(efa_dev, "failed to map TX SGL buffer\n");
			kfree(tx->msg);
			tx->msg = NULL;
			goto failed;
		}

		tx->lkey = efa_dev->pd->local_dma_lkey;
		list_add_tail(&tx->list_node, &tx_pool->free_obj);
	}

	return 0;

failed:
	kefalnd_destroy_tx_pool(efa_ni);
	return -ENOMEM;
}

/* Initialise every RX descriptor of @qp.
 *
 * Each descriptor gets a message buffer that is DMA-mapped for receive, a
 * single-SGE receive work request whose wr_id points back at the
 * descriptor, and a place on the QP's free list.
 *
 * Returns 0 on success or -ENOMEM on the first allocation/mapping failure;
 * descriptors set up before the failure are left for the caller to release.
 */
static int
kefalnd_init_rx_msgs(struct kefa_qp *qp)
{
	struct kefa_dev *efa_dev = qp->efa_dev;
	int idx;

	for (idx = 0; idx < EFALND_RX_MSGS(qp); idx++) {
		struct kefa_rx *rx = &qp->rx_msgs[idx];

		rx->qp = qp;
		rx->rx_nob = 0;

		rx->msg = cfs_cpt_malloc(lnet_cpt_table(), efa_dev->cpt,
					 EFALND_MSG_SIZE_ALIGNED, GFP_KERNEL);
		if (rx->msg == NULL) {
			EFA_DEV_ERR(qp->efa_dev, "failed to allocate RX SGE\n");
			return -ENOMEM;
		}

		rx->sge.addr = ib_dma_map_single(efa_dev->ib_dev, rx->msg,
						 EFALND_MSG_SIZE_ALIGNED,
						 DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(efa_dev->ib_dev, rx->sge.addr)) {
			EFA_DEV_ERR(qp->efa_dev, "failed to map RX SGE\n");
			/* never mapped: drop the buffer right away */
			kfree(rx->msg);
			rx->msg = NULL;
			return -ENOMEM;
		}

		rx->sge.length = EFALND_MSG_SIZE;
		rx->sge.lkey = efa_dev->pd->local_dma_lkey;

		/* one SGE per receive; wr_id leads back to this descriptor */
		rx->wrq.wr_id = (u64)rx;
		rx->wrq.num_sge = 1;
		rx->wrq.sg_list = &rx->sge;

		INIT_LIST_HEAD(&rx->list_node);
		list_add_tail(&rx->list_node, &qp->free_rx);
	}

	return 0;
}

static void
kefalnd_destroy_qp(struct kefa_qp *qp)
{
	struct kefa_rx *rx;
	int rc, i;

	if (!IS_ERR_OR_NULL(qp->ib_qp)) {
		rc = ib_destroy_qp(qp->ib_qp);
		if (rc) {
			EFA_DEV_ERR(qp->efa_dev,
				    "failed to destroy QP[%u]. rc[%d]\n",
				    qp->ib_qp->qp_num, rc);
		}
	}

	if (qp->rx_msgs) {
		for (i = 0; i < EFALND_RX_MSGS(qp); i++) {
			rx = qp->rx_msgs + i;
			if (rx->msg) {
				ib_dma_unmap_single(qp->efa_dev->ib_dev,
						    rx->sge.addr,
						    EFALND_MSG_SIZE_ALIGNED,
						    DMA_FROM_DEVICE);
				kfree(rx->msg);
			}
		}

		LIBCFS_FREE(qp->rx_msgs, EFALND_RX_MSGS(qp) * sizeof(struct kefa_rx));
	}
}

/* Create and bring up one SRD queue pair on @efa_dev.
 *
 * The QP uses @cq for both send and receive completions, is moved through
 * INIT -> RTR -> RTS (EFA has no connection manager), gets its RX
 * descriptors allocated and initialised, and finally has @rq_depth receive
 * buffers posted.
 *
 * The QP bookkeeping (owning device, CQ, depths, lock and lists) is filled
 * in before anything can fail, so the failure path may safely hand @qp to
 * kefalnd_destroy_qp(). After a failure qp->ib_qp and qp->rx_msgs are
 * cleared so that a later destroy of the same QP does not release them
 * again.
 *
 * Returns 0 on success or a negative errno.
 */
static int
kefalnd_create_qp(struct kefa_dev *efa_dev, struct kefa_qp *qp,
		  struct kefa_cq *cq, u16 sq_depth, u16 rq_depth, u32 qkey)
{
	struct ib_qp_init_attr init_attr;
	struct ib_qp_attr qp_attr = {};
	struct ib_qp *ib_qp;
	int rc = 0;

	qp->efa_dev = efa_dev;
	qp->rq_depth = rq_depth;
	qp->rq_space = rq_depth;
	qp->cq = cq;
	spin_lock_init(&qp->rq_lock);
	INIT_LIST_HEAD(&qp->free_rx);
	INIT_LIST_HEAD(&qp->posted_rx);

	memset(&init_attr, 0, sizeof(init_attr));
	init_attr.cap.max_send_wr = sq_depth;
	init_attr.cap.max_recv_wr = rq_depth;

	init_attr.cap.max_recv_sge = 1;
	init_attr.cap.max_send_sge = 1;
	init_attr.qp_type = EFA_QPT_SRD;
	init_attr.send_cq = cq->ib_cq;
	init_attr.recv_cq = cq->ib_cq;
	init_attr.sq_sig_type = IB_SIGNAL_ALL_WR;

	ib_qp = ib_create_qp(efa_dev->pd, &init_attr);
	if (IS_ERR(ib_qp)) {
		EFA_DEV_ERR(efa_dev, "failed to create QP. err[%ld]\n",
			    PTR_ERR(ib_qp));
		return PTR_ERR(ib_qp);
	}
	qp->ib_qp = ib_qp;

	/* EFA doesn't support CM so change QP to ready immediately */
	qp->qkey = qkey;
	qp_attr.qp_state = IB_QPS_INIT;
	qp_attr.cur_qp_state = IB_QPS_RESET;
	qp_attr.port_num = 1;
	qp_attr.qkey = qp->qkey;
	rc = ib_modify_qp(ib_qp, &qp_attr,
			  IB_QP_PKEY_INDEX | IB_QP_PORT | IB_QP_QKEY | IB_QP_STATE);
	if (rc) {
		EFA_DEV_ERR(efa_dev, "failed to set QP[%u] qkey\n",
			    ib_qp->qp_num);
		goto failed;
	}

	/* NOTE(review): after the memset cur_qp_state is always 0 here and
	 * below; it is not selected by the attribute mask.
	 */
	memset(&qp_attr, 0, sizeof(qp_attr));
	qp_attr.cur_qp_state = qp_attr.qp_state;
	qp_attr.qp_state = IB_QPS_RTR;
	rc = ib_modify_qp(ib_qp, &qp_attr, IB_QP_STATE);
	if (rc) {
		EFA_DEV_ERR(efa_dev, "failed to set QP[%u] state to RTR\n",
			    ib_qp->qp_num);
		goto failed;
	}

	memset(&qp_attr, 0, sizeof(qp_attr));
	qp_attr.rnr_retry = *kefalnd_tunables.kefa_rnr_retry_count;
	qp_attr.cur_qp_state = qp_attr.qp_state;
	qp_attr.qp_state = IB_QPS_RTS;
	qp_attr.sq_psn = 1;
	rc = ib_modify_qp(ib_qp, &qp_attr,
			  IB_QP_STATE | IB_QP_SQ_PSN | IB_QP_RNR_RETRY);
	if (rc) {
		EFA_DEV_ERR(efa_dev, "failed to set QP[%u] state to RTS\n",
			    ib_qp->qp_num);
		goto failed;
	}

	/* allocate RX buffers */
	LIBCFS_CPT_ALLOC(qp->rx_msgs, lnet_cpt_table(), efa_dev->cpt,
			 EFALND_RX_MSGS(qp) * sizeof(*qp->rx_msgs));
	if (!qp->rx_msgs) {
		EFA_DEV_ERR(efa_dev, "cannot allocate RX buffers\n");
		rc = -ENOMEM;
		goto failed;
	}

	rc = kefalnd_init_rx_msgs(qp);
	if (rc) {
		EFA_DEV_ERR(efa_dev, "failed to init RX messages for QP[%u]\n",
			    ib_qp->qp_num);
		goto failed;
	}

	/* initial post receives */
	rc = kefalnd_refill_rx(qp, rq_depth);
	if (rc) {
		EFA_DEV_ERR(efa_dev, "can't post rx msg: %d\n", rc);
		goto failed;
	}

	return 0;
failed:
	kefalnd_destroy_qp(qp);
	/* callers destroy all QPs again on failure: leave nothing dangling */
	qp->ib_qp = NULL;
	qp->rx_msgs = NULL;
	return rc;
}

static void
kefalnd_destroy_qps(struct kefa_dev *efa_dev)
{
	int i;

	if (efa_dev->qps) {
		for (i = 0; i < efa_dev->nqps; i++)
			kefalnd_destroy_qp(&efa_dev->qps[i]);

		LIBCFS_FREE(efa_dev->qps, efa_dev->nqps * sizeof(*efa_dev->qps));
	}

	if (efa_dev->cm_qp) {
		kefalnd_destroy_qp(efa_dev->cm_qp);
		LIBCFS_FREE(efa_dev->cm_qp, sizeof(*efa_dev->cm_qp));
	}
}

/*  Create Data QPs to distribute traffic per device and manager QP for
 * connection establishment
 */
static int
kefalnd_create_qps(struct kefa_dev *efa_dev, int num_qps, int sq_depth,
		   int rq_depth)
{
	int i, rc = 0;
	u32 qkey;

	efa_dev->nqps = num_qps;

	LIBCFS_CPT_ALLOC(efa_dev->cm_qp, lnet_cpt_table(), efa_dev->cpt,
			 sizeof(*efa_dev->cm_qp));
	if (!efa_dev->cm_qp) {
		EFA_DEV_ERR(efa_dev,
			    "failed to allocate memory for manager QP\n");
		return -ENOMEM;
	}

	if (the_lnet.ln_nis_use_large_nids)
		qkey = (u16)get_random_u32();
	else
		qkey = EFALND_CM_STATIC_QKEY;

	rc = kefalnd_create_qp(efa_dev, efa_dev->cm_qp, efa_dev->cm_cq,
			       sq_depth, rq_depth, qkey);
	if (rc) {
		kefalnd_destroy_qps(efa_dev);
		return rc;
	}

	LIBCFS_CPT_ALLOC(efa_dev->qps, lnet_cpt_table(), efa_dev->cpt,
			 num_qps * sizeof(*efa_dev->qps));
	if (!efa_dev->qps) {
		EFA_DEV_ERR(efa_dev, "failed to allocate memory for data QP\n");
		kefalnd_destroy_qps(efa_dev);
		return -ENOMEM;
	}
	memset(efa_dev->qps, 0, num_qps * sizeof(*efa_dev->qps));

	for (i = 0; i < num_qps; i++) {
		/* bit 31 is reserved for privileged qkeys */
		qkey = get_random_u32() & ~BIT(31);
		rc = kefalnd_create_qp(efa_dev, &efa_dev->qps[i],
				       &efa_dev->cqs[i % efa_dev->ncqs],
				       sq_depth, rq_depth, qkey);
		if (rc)
			break;
	}

	if (rc)
		kefalnd_destroy_qps(efa_dev);

	return rc;
}

/* Remove @cq from the pending list of its CPT's scheduler, if it is queued
 * there. Done under the scheduler lock.
 */
static void
kefalnd_deschedule_cq(struct kefa_cq *cq)
{
	struct kefa_sched *owner = kefalnd.scheds[cq->cpt];
	unsigned long irq_flags;

	/* TODO - handle the case where a scheduler is processing this CQ */
	spin_lock_irqsave(&owner->lock, irq_flags);
	if (!list_empty(&cq->sched_node))
		list_del_init(&cq->sched_node);
	spin_unlock_irqrestore(&owner->lock, irq_flags);
}

/* Queue @cq on the pending list of its CPT's scheduler unless it is already
 * queued. Done under the scheduler lock. Returns that scheduler.
 */
static struct kefa_sched *
kefalnd_schedule_cq(struct kefa_cq *cq)
{
	struct kefa_sched *owner = kefalnd.scheds[cq->cpt];
	unsigned long irq_flags;

	spin_lock_irqsave(&owner->lock, irq_flags);
	if (list_empty(&cq->sched_node))
		list_add_tail(&cq->sched_node, &owner->pend_cqs);
	spin_unlock_irqrestore(&owner->lock, irq_flags);

	return owner;
}

/* CQ completion callback registered with ib_create_cq(): queue the CQ
 * (passed as @cq_ctxt) on its scheduler and wake a scheduler thread.
 */
static void
kefalnd_cq_comp_handler(struct ib_cq *ib_cq, void *cq_ctxt)
{
	struct kefa_sched *owner;

	owner = kefalnd_schedule_cq((struct kefa_cq *)cq_ctxt);
	wake_up(&owner->waitq);
}

/* Pick the completion vector for CQs of @efa_dev: the device CPT modulo the
 * number of completion vectors, or 0 when the device has at most one.
 */
static int
kefalnd_get_completion_vector(struct kefa_dev *efa_dev)
{
	int nvec = efa_dev->ib_dev->num_comp_vectors;

	return (nvec > 1) ? efa_dev->cpt % nvec : 0;
}

/* Create one verbs completion queue of @cq_depth entries for @efa_dev and
 * initialise @cq around it (CPT, owning device, scheduler list node).
 * Completions are delivered through kefalnd_cq_comp_handler() with @cq as
 * context.
 *
 * Returns 0 on success or the negative errno reported by ib_create_cq().
 */
static int
kefalnd_create_cq(struct kefa_dev *efa_dev, struct kefa_cq *cq, int cq_depth)
{
	struct ib_cq_init_attr cq_attr = {0};
	struct ib_cq *ib_cq;

	cq_attr.cqe = cq_depth;
	cq_attr.comp_vector = kefalnd_get_completion_vector(efa_dev);
	ib_cq = ib_create_cq(efa_dev->ib_dev, kefalnd_cq_comp_handler, NULL,
			     (void *)cq, &cq_attr);
	if (IS_ERR(ib_cq)) {
		EFA_DEV_ERR(efa_dev, "can't create CQ. err[%ld]\n",
			    PTR_ERR(ib_cq));
		/* propagate the real error code, not the IS_ERR() boolean */
		return PTR_ERR(ib_cq);
	}

	cq->cpt = efa_dev->cpt;
	cq->efa_dev = efa_dev;
	cq->ib_cq = ib_cq;
	INIT_LIST_HEAD(&cq->sched_node);

	return 0;
}

static void
kefalnd_destroy_cqs(struct kefa_dev *efa_dev)
{
	int i;

	if (efa_dev->cqs) {
		for (i = 0; i < efa_dev->ncqs; i++) {
			struct kefa_cq *cq = &efa_dev->cqs[i];

			if (!IS_ERR_OR_NULL(cq->ib_cq)) {
				kefalnd_deschedule_cq(cq);
				ib_destroy_cq(cq->ib_cq);
			}
		}

		LIBCFS_FREE(efa_dev->cqs, efa_dev->ncqs * sizeof(*efa_dev->cqs));
	}

	if (efa_dev->cm_cq) {
		if (!IS_ERR_OR_NULL(efa_dev->cm_cq->ib_cq)) {
			kefalnd_deschedule_cq(efa_dev->cm_cq);
			ib_destroy_cq(efa_dev->cm_cq->ib_cq);
		}

		LIBCFS_FREE(efa_dev->cm_cq, sizeof(*efa_dev->cm_cq));
	}
}

static int
kefalnd_create_cqs(struct kefa_dev *efa_dev, int num_cqs, int cq_depth)
{
	int i, rc = 0;

	efa_dev->ncqs = num_cqs;

	LIBCFS_CPT_ALLOC(efa_dev->cm_cq, lnet_cpt_table(), efa_dev->cpt,
			 sizeof(*efa_dev->cm_cq));
	if (!efa_dev->cm_cq) {
		EFA_DEV_ERR(efa_dev,
			    "failed to allocate memory for manager CQ\n");
		return -ENOMEM;
	}

	rc = kefalnd_create_cq(efa_dev, efa_dev->cm_cq, cq_depth);
	if (rc)
		return rc;

	kefalnd_schedule_cq(efa_dev->cm_cq);

	LIBCFS_CPT_ALLOC(efa_dev->cqs, lnet_cpt_table(), efa_dev->cpt,
			 num_cqs * sizeof(*efa_dev->cqs));
	if (!efa_dev->cqs) {
		EFA_DEV_ERR(efa_dev,
			    "failed to allocate memory for data CQs\n");
		kefalnd_destroy_cqs(efa_dev);
		return -ENOMEM;
	}

	for (i = 0; i < num_cqs; i++) {
		struct kefa_cq *cq = &efa_dev->cqs[i];

		rc = kefalnd_create_cq(efa_dev, cq, cq_depth);
		if (rc)
			break;

		kefalnd_schedule_cq(cq);
	}

	if (rc)
		kefalnd_destroy_cqs(efa_dev);

	return rc;
}

/* Query the device GID (port 0, index 0) through the IB device's query_gid
 * op and store it in efa_dev->gid. Returns 0 or the op's error code.
 */
static int
kefalnd_dev_query(struct kefa_dev *efa_dev)
{
	int rc;

	rc = efa_dev->ib_dev->ops.query_gid(efa_dev->ib_dev, 0, 0,
					    &efa_dev->gid);
	if (rc) {
		/* rc is a signed errno: print it as such */
		EFA_DEV_ERR(efa_dev,
			    "failed to query EFA device GID. err[%d]\n", rc);
		return rc;
	}

	return 0;
}

/* Deregister every valid MR in @fmr_pool and free the pool's object array.
 * A NULL pool or a pool without an array is ignored.
 */
static void
kefalnd_destroy_fmr_pool(struct kefa_obj_pool *fmr_pool)
{
	struct kefa_fmr *fmrs;
	int idx;

	if (fmr_pool == NULL)
		return;

	if (fmr_pool->obj_arr == NULL)
		return;

	fmrs = (struct kefa_fmr *)fmr_pool->obj_arr;
	for (idx = 0; idx < fmr_pool->pool_size; idx++) {
		/* slots that never got an MR hold NULL or an ERR_PTR */
		if (IS_ERR_OR_NULL(fmrs[idx].mr))
			continue;

		ib_dereg_mr(fmrs[idx].mr);
	}

	LIBCFS_FREE(fmr_pool->obj_arr, fmr_pool->pool_size * sizeof(struct kefa_fmr));
	fmr_pool->obj_arr = NULL;
}

/* Create the FMR pool of @efa_dev with @pool_size entries.
 *
 * Each entry gets an IB_MR_TYPE_MEM_REG memory region able to cover
 * LNET_MAX_IOV pages and is placed on the pool's free list. On any failure
 * the pool is destroyed again.
 *
 * Returns 0 on success, the kefalnd_obj_pool_init() error, or -ENOSPC when
 * an MR cannot be allocated.
 */
static int
kefalnd_create_fmr_pool(struct kefa_ni *efa_ni, struct kefa_dev *efa_dev,
			u32 pool_size)
{
	struct kefa_obj_pool *fmr_pool = &efa_dev->fmr_pool;
	struct kefa_fmr *fmr;
	int idx, rc;

	memset(fmr_pool, 0, sizeof(*fmr_pool));

	rc = kefalnd_obj_pool_init(efa_ni, fmr_pool, pool_size, efa_dev->cpt,
				   sizeof(struct kefa_fmr));
	if (rc != 0) {
		EFA_DEV_ERR(efa_dev, "can't allocate FMR array\n");
		goto failed;
	}

	for (idx = 0; idx < pool_size; idx++) {
		fmr = &((struct kefa_fmr *)fmr_pool->obj_arr)[idx];

		fmr->mr = ib_alloc_mr(efa_dev->pd, IB_MR_TYPE_MEM_REG,
				      LNET_MAX_IOV);
		if (!IS_ERR(fmr->mr)) {
			list_add_tail(&fmr->list_node, &fmr_pool->free_obj);
			continue;
		}

		EFA_DEV_ERR(efa_dev, "failed to alloc mr. err[%ld]\n",
			    PTR_ERR(fmr->mr));
		rc = -ENOSPC;
		goto failed;
	}

	return 0;

failed:
	kefalnd_destroy_fmr_pool(fmr_pool);
	return rc;
}

/*
 * Release everything owned by @efa_dev and free the structure itself.
 *
 * Resources are dropped in the reverse order of kefalnd_dev_init(), and
 * every step is skipped when the matching resource was never set up, so
 * this is safe to call on a partially initialized device.
 *
 * @put_module tells whether a reference on the IB driver module was taken
 * and must be dropped here.
 */
static void
kefalnd_dev_destroy(struct kefa_dev *efa_dev, bool put_module)
{
	kefalnd_destroy_fmr_pool(&efa_dev->fmr_pool);

	if (efa_dev->qps)
		kefalnd_destroy_qps(efa_dev);
	if (efa_dev->cqs)
		kefalnd_destroy_cqs(efa_dev);

	if (efa_dev->pd && !IS_ERR(efa_dev->pd))
		ib_dealloc_pd(efa_dev->pd);

	if (efa_dev->ib_dev) {
		/* drop the driver module reference before the device one */
		if (put_module)
			module_put(efa_dev->ib_dev->ops.owner);

		ib_device_put(efa_dev->ib_dev);
	}

	LIBCFS_FREE(efa_dev, sizeof(*efa_dev));
}

/*
 * Allocate and initialize the EFA device backing @efa_ni.
 *
 * Looks up the IB device named @ifname (EFA driver only), takes a
 * reference on the driver module, queries the device GID, selects the CPT
 * the device is bound to, and then creates the PD, the CQs, the QPs and
 * the FMR pool, in that order.
 *
 * Return: the new device on success, or an ERR_PTR() on failure. On
 * failure everything acquired so far is released through
 * kefalnd_dev_destroy().
 */
static struct kefa_dev *
kefalnd_dev_init(struct kefa_ni *efa_ni, char *ifname, __be32 ip_addr)
{
	struct lnet_ioctl_config_efalnd_tunables *tunables;
	struct lnet_ni *lnet_ni = efa_ni->lnet_ni;
	struct kefa_dev *efa_dev = NULL;
	/* tells the unwind path whether module_put() is owed */
	bool took_module_ref = false;
	int cq_depth, qp_depth;
	int dev_numa_node, rc;

	CDEBUG(D_NET, "Initializing EFA device\n");

	LIBCFS_ALLOC(efa_dev, sizeof(*efa_dev));
	if (!efa_dev)
		return ERR_PTR(-ENOMEM);

	memset(efa_dev, 0, sizeof(*efa_dev));
	strscpy(efa_dev->ifname, ifname, sizeof(efa_dev->ifname));
	efa_dev->efa_ni = efa_ni;
	efa_dev->ifip = ip_addr;
	/* holds a device reference that kefalnd_dev_destroy() drops */
	efa_dev->ib_dev = ib_device_get_by_name(efa_dev->ifname, RDMA_DRIVER_EFA);
	if (!efa_dev->ib_dev) {
		CERROR("Failed to find EFA IB device %s\n", efa_dev->ifname);
		rc = -ENODEV;
		goto failed;
	}

	if (!efa_dev->ib_dev->kverbs_provider) {
		EFA_DEV_ERR(efa_dev, "EFA driver does not support Kverbs\n");
		rc = -EINVAL;
		goto failed;
	}

	/* pin the driver module for as long as this device exists */
	if (!try_module_get(efa_dev->ib_dev->ops.owner)) {
		EFA_DEV_ERR(efa_dev, "Failed to take reference on EFA driver\n");
		rc = -ENODEV;
		goto failed;
	}
	took_module_ref = true;

	rc = kefalnd_dev_query(efa_dev);
	if (rc)
		goto failed;

	/* If a user does not specify any cpt, or specifies to use all cpts,
	 * lnet_ni->ni_cpts will be NULL. See lnet_ni_alloc() and
	 * lnet_ni_alloc_w_cpt_array() in lnet/lnet/config.c
	 */
	if (lnet_ni->ni_cpts)
		efa_dev->cpt = lnet_ni->ni_cpts[0];
	else {
		/* no explicit CPT: use the one covering the device NUMA node */
		dev_numa_node = ibdev_to_node(efa_dev->ib_dev);
		efa_dev->cpt = cfs_cpt_of_node(lnet_cpt_table(), dev_numa_node);
	}

	/* fall back to CPT 0 when no specific CPT could be determined */
	if (efa_dev->cpt == CFS_CPT_ANY)
		efa_dev->cpt = 0;

	efa_dev->pd = ib_alloc_pd(efa_dev->ib_dev, 0);
	if (IS_ERR(efa_dev->pd)) {
		rc = PTR_ERR(efa_dev->pd);
		EFA_DEV_ERR(efa_dev, "can't allocate PD. err[%d]\n", rc);
		goto failed;
	}

	/*
	 * Clamp the queue depths to the device limits. The QP depth is at
	 * most half the CQ depth; it is passed twice to
	 * kefalnd_create_qps() below, presumably as the send and receive
	 * queue depths - TODO confirm.
	 */
	cq_depth = min(efa_dev->ib_dev->attrs.max_cqe, CQ_DEPTH);
	qp_depth = min(efa_dev->ib_dev->attrs.max_qp_wr, (cq_depth / 2));
	tunables = &lnet_ni->ni_lnd_tunables.lnd_tun_u.lnd_efa;
	rc = kefalnd_create_cqs(efa_dev, tunables->lnd_nqps, cq_depth);
	if (rc)
		goto failed;

	rc = kefalnd_create_qps(efa_dev, tunables->lnd_nqps, qp_depth, qp_depth);
	if (rc)
		goto failed;

	rc = kefalnd_create_fmr_pool(efa_ni, efa_dev,
				     kefalnd_get_tx_pool_size(efa_ni));
	if (rc)
		goto failed;

	return efa_dev;

failed:
	/* releases whatever was set up so far and frees efa_dev */
	kefalnd_dev_destroy(efa_dev, took_module_ref);
	return ERR_PTR(rc);
}

/*
 * Find the device named @ifname among the NIs on the global NI list.
 *
 * Return: the matching device, or NULL when no listed NI uses @ifname.
 */
static struct kefa_dev *
kefalnd_dev_search(char *ifname)
{
	struct kefa_ni *pos;

	list_for_each_entry(pos, &kefalnd.efa_ni_list, lnd_node) {
		if (strncmp(pos->efa_dev->ifname, ifname,
			    sizeof(pos->efa_dev->ifname)))
			continue;

		return pos->efa_dev;
	}

	return NULL;
}

/*
 * Pick the IP interface whose IPv4 address is used by the NI.
 *
 * When the kefa_ipif_name tunable is unset or empty the first enumerated
 * interface is used; otherwise the interface with that exact name is
 * looked up. The chosen interface must carry an IPv4-sized address.
 *
 * Return: 0 with *@ip_addr filled in, or -ENODEV if enumeration fails, no
 * interface matches, or the chosen interface is not IPv4.
 */
static int
kefalnd_select_ipif(struct lnet_ni *ni, __be32 *ip_addr)
{
	char *wanted = *kefalnd_tunables.kefa_ipif_name;
	struct lnet_inetdev *ifaces = NULL;
	int count, idx, rc;

	count = lnet_inet_enumerate(&ifaces, ni->ni_net_ns, false);
	if (count < 0) {
		rc = -ENODEV;
		goto out;
	}

	if (!wanted || wanted[0] == '\0') {
		idx = 0;
		CDEBUG(D_NET, "Using first interface %s because ipif_name was not provided\n",
		       ifaces[idx].li_name);
	} else {
		for (idx = 0; idx < count; idx++) {
			if (!strncmp(wanted, ifaces[idx].li_name,
				     sizeof(ifaces[idx].li_name)))
				break;
		}

		if (idx == count) {
			CERROR("No interface %s found\n", wanted);
			rc = -ENODEV;
			goto out;
		}

		CDEBUG(D_NET, "Found matching interface %s\n",
		       ifaces[idx].li_name);
	}

	/* only an address of IPv4 size fits into *ip_addr */
	if (ifaces[idx].li_size != sizeof(*ip_addr)) {
		CERROR("Interface %s is IPv6 interface\n",
		       ifaces[idx].li_name);
		rc = -ENODEV;
		goto out;
	}

	*ip_addr = ifaces[idx].li_ipaddr;
	rc = 0;

out:
	kfree(ifaces);
	return rc;
}

/**
 * kefalnd_create_efa_nid() - Create EFA NID.
 * @efa_ni: The EFA NI whose LNet NID (lnet_ni->ni_nid) is filled in. Its
 *          device and the device CM QP must already be set up.
 *
 * Small NID:
 * The EFA 4 byte NID address is made of three parts: the host
 * identifier, PCI bus number, and PCI devfn number.
 * For example, if a node uses 172.43.23.2@tcp for TCP pings, the
 * EFA interface would be assigned 23.2.0.2@efa for EFA device with
 * bus number 0 and devfn number 2.
 *
 * Large NID:
 * The EFA 16 byte NID address is made of three parts: the device GID,
 * CM QP number and CM QP QKEY.
 *
 */
static void kefalnd_create_efa_nid(struct kefa_ni *efa_ni)
{
	struct lnet_nid *ni_nid = &efa_ni->lnet_ni->ni_nid;
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	struct kefa_qp *cm_qp = efa_dev->cm_qp;
	struct pci_dev *pci_dev;
	u32 addr;

	if (the_lnet.ln_nis_use_large_nids) {
		kefalnd_large_nid_create(ni_nid, &efa_dev->gid,
					 cm_qp->ib_qp->qp_num,
					 (u16)cm_qp->qkey);
		return;
	}

	pci_dev = to_pci_dev(efa_dev->ib_dev->dev.parent);

	/*
	 * The interface IP and the NID address are both kept in network
	 * byte order. Convert with the byte-order helpers rather than an
	 * unconditional swab so the layout is also right on big-endian
	 * hosts: the low 16 bits of the host-order IP form the two leading
	 * NID bytes, followed by the PCI bus number and devfn.
	 */
	addr = (be32_to_cpu(efa_dev->ifip) & 0xffff) << 16;
	addr |= (pci_dev->bus->number & 0xff) << 8;
	addr |= pci_dev->devfn & 0xff;

	ni_nid->nid_addr[0] = cpu_to_be32(addr);
	/* a zero nid_size denotes the 4 byte NID form */
	ni_nid->nid_size = 0;
}

/*
 * Tear down the global EFA LND state set up by kefalnd_base_startup().
 *
 * Raises the shutdown flag, destroys the peer NI hashtable, wakes every
 * scheduler and CM daemon wait queue, waits until the LND thread count
 * drops to zero, frees the per-CPT arrays, resets init_state to
 * EFALND_INIT_NONE and drops the module reference taken at base startup.
 */
static void
kefalnd_base_shutdown(void)
{
	struct kefa_cm_deamon *cm_daemon;
	struct kefa_sched *sched;
	int i;

	CDEBUG(D_MALLOC, "Before LND cleanup: kmem[%lld]\n",
	       libcfs_kmem_read());

	kefalnd.shutdown = true;

	/*
	 * Let outstanding RCU callbacks finish before the peer table goes
	 * away (presumably peer NI entries are released via RCU - TODO
	 * confirm).
	 */
	rcu_barrier();
	rhashtable_destroy(&kefalnd.peer_ni);

	/* kick sleeping threads; presumably they check the shutdown flag */
	cfs_percpt_for_each(sched, i, kefalnd.scheds)
		wake_up_all(&sched->waitq);

	cfs_percpt_for_each(cm_daemon, i, kefalnd.cm_daemons)
		wake_up_all(&cm_daemon->waitq);

	/* block until every thread counted in nthreads is gone */
	wait_var_event_warning(&kefalnd.nthreads,
			       !atomic_read(&kefalnd.nthreads),
			       "Waiting for %d threads to terminate\n",
			       atomic_read(&kefalnd.nthreads));

	cfs_percpt_free(kefalnd.scheds);
	cfs_percpt_free(kefalnd.cm_daemons);

	CDEBUG(D_MALLOC, "After LND cleanup: kmem[%lld]\n",
	       libcfs_kmem_read());

	kefalnd.init_state = EFALND_INIT_NONE;
	/* pairs with try_module_get() in kefalnd_base_startup() */
	module_put(THIS_MODULE);
}

/*
 * Global initialization of EFA LND data.
 *
 * Must be called with init_state == EFALND_INIT_NONE. Takes a reference
 * on this module, resets the global kefalnd state, creates the peer NI
 * hashtable and allocates and initializes the per-CPT scheduler and
 * connection manager daemon descriptors. No threads are started here.
 *
 * Return: 0 on success with init_state set to EFALND_INIT_ALL, or a
 * negative errno with everything acquired here released again.
 */
static int
kefalnd_base_startup(void)
{
	struct kefa_cm_deamon *cm_daemon;
	struct kefa_sched *sched;
	int nthrs;
	int rc = 0, i;

	LASSERT(kefalnd.init_state == EFALND_INIT_NONE);

	CDEBUG(D_MALLOC, "Before LND startup: kmem[%lld]\n",
	       libcfs_kmem_read());

	/* take a reference count until we clear all module resources */
	if (!try_module_get(THIS_MODULE)) {
		rc = -ENETDOWN;
		goto err;
	}

	memset(&kefalnd, 0, sizeof(kefalnd));
	INIT_LIST_HEAD(&kefalnd.efa_ni_list);
	rc = rhashtable_init(&kefalnd.peer_ni, &peer_ni_params);
	if (rc)
		goto err_module;

	/* allocate a scheduler per NUMA node (cpt) */
	kefalnd.scheds = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*sched));
	if (!kefalnd.scheds) {
		rc = -ENOMEM;
		goto err_hash;
	}

	cfs_percpt_for_each(sched, i, kefalnd.scheds) {
		spin_lock_init(&sched->lock);

		/* start from the number of CPUs in this CPT */
		nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
		if (*kefalnd_tunables.kefa_nscheds > 0) {
			/* the tunable caps the count, it never raises it */
			nthrs = min(nthrs, *kefalnd_tunables.kefa_nscheds);
		} else {
			/*
			 * Max to half of CPUs, another half is reserved for
			 * upper layer modules.
			 */
			nthrs = min(max(EFALND_MIN_SCHED_THRS, nthrs >> 1), nthrs);
		}

		sched->nthreads_max = nthrs;
		sched->cpt = i;
		INIT_LIST_HEAD(&sched->pend_cqs);
		init_waitqueue_head(&sched->waitq);
	}

	/* allocate a connection manager daemon per NUMA node (cpt) */
	kefalnd.cm_daemons = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*cm_daemon));
	if (!kefalnd.cm_daemons) {
		rc = -ENOMEM;
		goto err_sched;
	}

	cfs_percpt_for_each(cm_daemon, i, kefalnd.cm_daemons) {
		cm_daemon->cpt = i;
		cm_daemon->iter = 0;
		mutex_init(&cm_daemon->ni_list_lock);
		INIT_LIST_HEAD(&cm_daemon->efa_ni_list);
		init_waitqueue_head(&cm_daemon->waitq);
	}

	CDEBUG(D_MALLOC, "After LND startup: kmem[%lld]\n",
	       libcfs_kmem_read());

	kefalnd.init_state = EFALND_INIT_ALL;
	kefalnd.shutdown = false;
	return 0;

	/* unwind in the reverse order of acquisition */
err_sched:
	cfs_percpt_free(kefalnd.scheds);
err_hash:
	rhashtable_destroy(&kefalnd.peer_ni);
err_module:
	module_put(THIS_MODULE);
err:
	return rc;
}

/*
 * LND shutdown callback: tear down the EFA NI attached to @ni.
 *
 * Also serves as the error-unwind path of kefalnd_startup(), which is why
 * the CM daemon registration, the self peer NI, the device and the global
 * list membership are each checked before being released.
 *
 * When the last NI is gone the global LND state is shut down as well.
 */
static void
kefalnd_shutdown(struct lnet_ni *ni)
{
	struct kefa_ni *efa_ni = ni->ni_data;
	struct kefa_dev *efa_dev = efa_ni->efa_dev;

	LASSERT(kefalnd.init_state == EFALND_INIT_ALL);

	CDEBUG(D_MALLOC, "Before NI[%s] cleanup: kmem[%lld]\n",
	       libcfs_nidstr(&ni->ni_nid), libcfs_kmem_read());

	/* cm_node is non-empty only once the NI was handed to a CM daemon */
	if (!list_empty(&efa_ni->cm_node))
		kefalnd_del_ni_from_cm_daemon(efa_ni);

	if (efa_ni->self_peer_ni)
		kefalnd_put_peer_ni(efa_ni->self_peer_ni);

	/* remove network resources - connections pools, etc */
	kefalnd_destroy_tx_pool(efa_ni);
	kefalnd_destroy_all_conns(efa_ni);

	/* Remove the underlying device if exists */
	if (efa_dev)
		kefalnd_dev_destroy(efa_dev, true);

	/* lnd_node is non-empty only once the NI is on the global NI list */
	if (!list_empty(&efa_ni->lnd_node))
		list_del_init(&efa_ni->lnd_node);

	/* remove the NI itself */
	ni->ni_data = NULL;
	LIBCFS_FREE(efa_ni, sizeof(*efa_ni));

	CDEBUG(D_MALLOC, "After NI[%s] cleanup: kmem[%lld]\n",
	       libcfs_nidstr(&ni->ni_nid), libcfs_kmem_read());

	/* if there are no more NIs - destroy the global efalnd */
	if (list_empty(&kefalnd.efa_ni_list))
		kefalnd_base_shutdown();
}

static int
kefalnd_startup(struct lnet_ni *ni)
{
	struct kefa_dev *efa_dev;
	struct kefa_ni *efa_ni;
	__be32 ip_addr;
	char *ifname;
	int rc;

	if (kefalnd.init_state == EFALND_INIT_NONE) {
		rc = kefalnd_base_startup();
		if (rc != 0)
			return rc;
	}

	CDEBUG(D_MALLOC, "Before NI startup: kmem[%lld]\n", libcfs_kmem_read());

	LIBCFS_ALLOC(efa_ni, sizeof(*efa_ni));
	if (!efa_ni) {
		CERROR("Failed to allocate memory for EFA network\n");
		return -ENOMEM;
	}

	memset(efa_ni, 0, sizeof(*efa_ni));
	ni->ni_data = efa_ni;
	efa_ni->lnet_ni = ni;
	efa_ni->ni_epoch = ktime_get_real_ns();
	hash_init(efa_ni->conns);
	rwlock_init(&efa_ni->conn_lock);
	INIT_LIST_HEAD(&efa_ni->lnd_node);
	INIT_LIST_HEAD(&efa_ni->cm_node);

	if (ni->ni_interface) {
		ifname = ni->ni_interface;
	} else {
		CERROR("Missing interface name\n");
		rc = -EINVAL;
		goto failed;
	}

	kefalnd_tunables_setup(ni);

	rc = kefalnd_select_ipif(ni, &ip_addr);
	if (rc < 0) {
		goto failed;
	}

	efa_dev = kefalnd_dev_search(ifname);
	if (efa_dev) {
		CERROR("Device[%s] already exists\n", ifname);
		rc = -EINVAL;
		goto failed;
	}

	/* initialize the device */
	efa_dev = kefalnd_dev_init(efa_ni, ifname, ip_addr);
	if (IS_ERR(efa_dev)) {
		rc = PTR_ERR(efa_dev);
		CERROR("Failed to initialize device[%s]. err[%d]\n", ifname, rc);
		goto failed;
	}

	efa_ni->efa_dev = efa_dev;
	ni->ni_dev_cpt = efa_dev->cpt;

	kefalnd_create_efa_nid(efa_ni);
	if (nid_is_nid4(&ni->ni_nid)) {
		efa_ni->self_peer_ni =
			kefalnd_lookup_or_create_peer_ni(lnet_nid_to_nid4(&ni->ni_nid),
							 &efa_dev->gid,
							 efa_dev->cm_qp->ib_qp->qp_num,
							 efa_dev->cm_qp->qkey);
		if (!efa_ni->self_peer_ni) {
			rc = -ENODEV;
			goto failed;
		}
	}

	rc = kefalnd_create_tx_pool(efa_ni, efa_dev->cpt);
	if (rc != 0)
		goto failed;

	rc = kefalnd_dev_start_threads(efa_dev);
	if (rc != 0)
		goto failed;

	kefalnd_add_ni_to_cm_daemon(efa_ni);
	list_add_tail(&efa_ni->lnd_node, &kefalnd.efa_ni_list);

	CDEBUG(D_MALLOC, "After NI[%s] startup: kmem[%lld]\n",
	       ni->ni_interface, libcfs_kmem_read());

	LCONSOLE_INFO("Started NI[%s] EFA device[%s] FW[0x%llx] Lustre[%s] LND[%s] CPT[%d]\n",
		      libcfs_nidstr(&ni->ni_nid), efa_dev->ifname,
		      efa_dev->ib_dev->attrs.fw_ver,
		      LUSTRE_VERSION_STRING,
		      DRV_MODULE_VERSION, efa_dev->cpt);

	return 0;

failed:
	kefalnd_shutdown(ni);
	return rc;
}

/*
 * LND operations table handed to LNet by lnet_register_lnd() in
 * kefalnd_init() and removed again by lnet_unregister_lnd() in
 * kefalnd_exit().
 */
static const struct lnet_lnd the_efalnd = {
	.lnd_type		= EFALND,
	.lnd_startup		= kefalnd_startup,
	.lnd_shutdown		= kefalnd_shutdown,
	.lnd_send		= kefalnd_send,
	.lnd_recv		= kefalnd_recv,
	.lnd_get_dev_prio	= kefalnd_get_dev_prio,
	.lnd_get_nid_metadata	= kefalnd_get_nid_metadata,

};

/*
 * Module exit: remove the debugfs entries and unregister the LND from
 * LNet, mirroring what kefalnd_init() set up.
 */
static void __exit kefalnd_exit(void)
{
	CDEBUG(D_NET, "Exiting EFA LND\n");

	kefalnd_debugfs_exit();

	lnet_unregister_lnd(&the_efalnd);
}

/*
 * Module init: clear the global LND state, set up the tunables, create the
 * debugfs entries and register the EFA LND with LNet.
 *
 * Return: 0 on success, or the kefalnd_tunables_init() error, in which
 * case nothing is registered.
 */
static int __init kefalnd_init(void)
{
	int err;

	memset(&kefalnd, 0, sizeof(kefalnd));

	CDEBUG(D_NET, "Entering EFA LND\n");

	err = kefalnd_tunables_init();
	if (!err) {
		kefalnd_debugfs_init();
		lnet_register_lnd(&the_efalnd);
		return 0;
	}

	CERROR("failed to init tunables\n");
	return err;
}

/* Soft dependency: ask the module loader to load the efa driver first. */
MODULE_SOFTDEP("pre: efa");

MODULE_AUTHOR("Amazon.com, Inc. or its affiliates");
MODULE_DESCRIPTION("EFA LNet Network Driver");
MODULE_VERSION(DRV_MODULE_VERSION);
MODULE_LICENSE("GPL");

/*
 * Registered at the late_initcall_sync level rather than with
 * module_init(); NOTE(review): presumably so that, when built in, the LND
 * initializes after the EFA driver - confirm.
 */
late_initcall_sync(kefalnd_init);
module_exit(kefalnd_exit);