Viewing: efalnd_connection.c

// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright (c) 2024-2026, Amazon and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Author: Yonatan Nachum <ynachum@amazon.com>
 */

#include <linux/jiffies.h>

#include <rdma/ib_verbs.h>

#include "efalnd.h"

#define MAX_IDLE_CONN		32
#define RESP_CONN_EXTRA_TIME	300

/* Return the connection probe payload of @msg, picking the v1 layout when the
 * header protocol version is EFALND_PROTO_VER_1 and the v2 layout otherwise.
 */
static inline struct kefa_conn_probe_msg *
kefalnd_get_probe_from_msg(struct kefa_msg *msg)
{
	if (msg->hdr.proto_ver == EFALND_PROTO_VER_1)
		return &msg->msg_v1.u.conn_probe;

	return &msg->msg_v2.u.conn_probe;
}

/* Return the connection probe response payload of @msg, picking the v1 layout
 * when the header protocol version is EFALND_PROTO_VER_1 and the v2 layout
 * otherwise.
 */
static inline struct kefa_conn_probe_resp_msg *
kefalnd_get_probe_resp_from_msg(struct kefa_msg *msg)
{
	if (msg->hdr.proto_ver == EFALND_PROTO_VER_1)
		return &msg->msg_v1.u.conn_probe_resp;

	return &msg->msg_v2.u.conn_probe_resp;
}

/* Return the connection request payload of @msg. Only the v2 message layout
 * is consulted here.
 */
static inline struct kefa_conn_req_msg *
kefalnd_get_conn_req_from_msg(struct kefa_msg *msg)
{
	struct kefa_conn_req_msg *req = &msg->msg_v2.u.conn_request;

	return req;
}

/* Return the connection request ack payload of @msg. Only the v2 message
 * layout is consulted here.
 */
static inline struct kefa_conn_req_ack *
kefalnd_get_conn_req_ack_from_msg(struct kefa_msg *msg)
{
	struct kefa_conn_req_ack *ack = &msg->msg_v2.u.conn_request_ack;

	return ack;
}

/* Trace and apply a state transition on @conn.
 * The caller must hold conn->lock.
 */
static inline void
kefalnd_set_conn_state_locked(struct kefa_conn *conn,
			      enum kefa_conn_state new_state)
__must_hold(&conn->lock)
{
	enum kefa_conn_state old_state = conn->state;

	CDEBUG(D_NET, "Connection[%s] type[%d] change state from[%d] to[%d]\n",
	       libcfs_nidstr(&conn->remote_nid), conn->type, old_state,
	       new_state);

	conn->state = new_state;
}

/* Locking wrapper around kefalnd_set_conn_state_locked(): takes conn->lock
 * (IRQ-safe) for the duration of the state change.
 */
static void
kefalnd_set_conn_state(struct kefa_conn *conn, enum kefa_conn_state new_state)
{
	unsigned long irq_flags;

	spin_lock_irqsave(&conn->lock, irq_flags);
	kefalnd_set_conn_state_locked(conn, new_state);
	spin_unlock_irqrestore(&conn->lock, irq_flags);
}

/* Post a connection-management TX for @conn on the device CM QP.
 *
 * The TX is bound to the connection, stamped with the send time and the
 * connection's remote epoch, and linked on conn->active_tx before it is handed
 * to ib_post_send(). If posting fails the TX is unlinked again and its
 * connection, refcount and send time are reset.
 *
 * Returns 0 on success or the error returned by ib_post_send().
 */
static inline int
kefalnd_post_conn_message(struct kefa_conn *conn, struct kefa_tx *tx)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_qp *cm_qp = efa_dev->cm_qp;
	struct kefa_peer_ni *peer_ni = conn->peer_ni;
	struct ib_srd_wr *srd_wr = &tx->wrq.wr;
	struct ib_send_wr *send_wr = &srd_wr->wr;
	unsigned long flags;
	int rc;

	tx->conn = conn;
	srd_wr->ah = conn->ah;

	atomic_set(&tx->ref_cnt, 1);
	atomic64_set(&tx->send_time, ktime_get_seconds());
	kefalnd_msg_set_epoch(tx->msg, conn->remote_epoch);

	/* Track the TX on the connection before it can be posted */
	spin_lock_irqsave(&conn->lock, flags);
	list_add_tail(&tx->list_node, &conn->active_tx);
	spin_unlock_irqrestore(&conn->lock, flags);

	/* For large NIDs the remote CM QP is derived from the NID itself,
	 * for NID4 it is read from the peer NI under its lock.
	 */
	if (!nid_is_nid4(&conn->remote_nid)) {
		srd_wr->remote_qpn = kefalnd_large_nid_get_cm_qp_num(&conn->remote_nid);
		srd_wr->remote_qkey = kefalnd_large_nid_get_cm_qp_qkey(&conn->remote_nid);
	} else {
		read_lock_irqsave(&peer_ni->peer_ni_lock, flags);
		srd_wr->remote_qpn = peer_ni->cm_qp.qp_num;
		srd_wr->remote_qkey = peer_ni->cm_qp.qkey;
		read_unlock_irqrestore(&peer_ni->peer_ni_lock, flags);
	}

	rc = ib_post_send(cm_qp->ib_qp, send_wr, NULL);
	if (!rc)
		return 0;

	EFA_DEV_ERR(efa_dev,
		    "QP[%u] failed to post conn msg to peer NI[%s]. err[%d]\n",
		    cm_qp->ib_qp->qp_num,
		    libcfs_nidstr(&conn->remote_nid), rc);

	/* Undo the bookkeeping done above */
	spin_lock_irqsave(&conn->lock, flags);
	list_del_init(&tx->list_node);
	spin_unlock_irqrestore(&conn->lock, flags);

	tx->conn = NULL;
	atomic_set(&tx->ref_cnt, 0);
	atomic64_set(&tx->send_time, 0);

	return rc;
}

/* Fill @data_qps with the QP number and qkey of every data QP of @efa_dev.
 * The caller must provide room for efa_dev->nqps entries.
 *
 * Returns the number of entries written (efa_dev->nqps).
 */
static u32
kefalnd_select_conn_qps(struct kefa_dev *efa_dev,
			struct kefa_qp_proto *data_qps)
{
	int idx = 0;

	while (idx < efa_dev->nqps) {
		struct kefa_qp_proto *dst = &data_qps[idx];

		dst->qp_num = efa_dev->qps[idx].ib_qp->qp_num;
		dst->qkey = efa_dev->qps[idx].qkey;
		idx++;
	}

	return efa_dev->nqps;
}

/* Build and post a connection request ack to the remote side of @conn.
 *
 * @status is a local errno (0 on success). It is translated with
 * kefalnd_errno_to_efa_status() and carried in the ack. On success the ack
 * also advertises the local data QPs; on failure no QPs are advertised and
 * only the fixed-size part of the ack is sent.
 *
 * Returns 0 on success, -ENOMEM when no idle TX is available, or the error
 * returned by kefalnd_post_conn_message().
 */
static int
kefalnd_send_conn_req_ack(struct kefa_conn *conn, int status)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_conn_req_ack *ack_msg;
	struct kefa_tx *tx;
	int nob, rc;

	CDEBUG(D_NET, "Send connection ack from[%s] to[%s]\n",
	       libcfs_nidstr(&conn->local_nid),
	       libcfs_nidstr(&conn->remote_nid));

	tx = kefalnd_get_idle_tx(conn->efa_ni);
	if (!tx) {
		/* Name the right message type so the log is not confused
		 * with a failed connection request.
		 */
		EFA_DEV_ERR(efa_dev,
			    "can't allocate conn req ack txd for peer NI[%s]\n",
			    libcfs_nidstr(&conn->remote_nid));

		return -ENOMEM;
	}

	/* The QP array is only part of the message on success */
	nob = status ? sizeof(struct kefa_conn_req_ack) :
			offsetof(struct kefa_conn_req_ack,
				data_qps[efa_dev->nqps]);

	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_CONN_REQ_ACK, nob,
				     conn->proto_ver);
	ack_msg = kefalnd_get_conn_req_ack_from_msg(tx->msg);
	ack_msg->lnd_ver = kefalnd_get_lnd_version();
	ack_msg->src_epoch = conn->efa_ni->ni_epoch;
	ack_msg->caps = 0;
	ack_msg->reserved = 0;

	ack_msg->src_conn_id = EFALND_INV_CONN;
	ack_msg->status = kefalnd_errno_to_efa_status(status);
	ack_msg->nqps = status ? 0 :
		kefalnd_select_conn_qps(efa_dev, &ack_msg->data_qps[0]);

	rc = kefalnd_post_conn_message(conn, tx);
	if (rc)
		kefalnd_tx_done(tx);

	return rc;
}

static int
kefalnd_send_conn_req(struct kefa_conn *conn)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_conn_req_msg *req;
	struct kefa_tx *tx;
	int nob, rc;

	LASSERT(conn->state == KEFA_CONN_ESTABLISH);

	CDEBUG(D_NET, "Send connection request from[%s] to[%s]\n",
	       libcfs_nidstr(&conn->local_nid),
	       libcfs_nidstr(&conn->remote_nid));

	tx = kefalnd_get_idle_tx(conn->efa_ni);
	if (!tx) {
		EFA_DEV_ERR(efa_dev,
			    "can't allocate conn req txd for peer NI[%s]\n",
			    libcfs_nidstr(&conn->remote_nid));

		return -ENOMEM;
	}

	nob = offsetof(struct kefa_conn_req_msg, data_qps[efa_dev->nqps]);
	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_CONN_REQ, nob,
				     conn->proto_ver);

	req = kefalnd_get_conn_req_from_msg(tx->msg);
	req->lnd_ver = kefalnd_get_lnd_version();
	memcpy(req->src_gid, efa_dev->gid.raw, sizeof(req->src_gid));
	req->src_epoch = conn->efa_ni->ni_epoch;
	req->cm_qp.qp_num = efa_dev->cm_qp->ib_qp->qp_num;
	req->cm_qp.qkey = efa_dev->cm_qp->qkey;
	req->caps = 0;
	req->reserved = 0;

	req->requests = conn->requests;
	req->src_conn_id = EFALND_INV_CONN;
	req->nqps = kefalnd_select_conn_qps(efa_dev, &req->data_qps[0]);

	rc = kefalnd_post_conn_message(conn, tx);
	if (rc)
		kefalnd_tx_done(tx);

	return rc;
}

/* Build and post a connection probe response to the remote side of @conn.
 *
 * @status is a local errno (0 on success) translated with
 * kefalnd_errno_to_efa_status(). The response also advertises the range of
 * protocol versions supported locally.
 *
 * Returns 0 on success, -ENOMEM when no idle TX is available, or the error
 * returned by kefalnd_post_conn_message().
 */
static int
kefalnd_send_conn_probe_resp(struct kefa_conn *conn, int status)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_conn_probe_resp_msg *probe_resp;
	struct kefa_tx *tx;
	int rc;

	CDEBUG(D_NET, "Send connection probe response from[%s] to[%s]\n",
	       libcfs_nidstr(&conn->local_nid),
	       libcfs_nidstr(&conn->remote_nid));

	tx = kefalnd_get_idle_tx(conn->efa_ni);
	if (!tx) {
		/* Name the right message type so the log is not confused
		 * with a failed connection request.
		 */
		EFA_DEV_ERR(efa_dev,
			    "can't allocate conn probe resp txd for peer NI[%s]\n",
			    libcfs_nidstr(&conn->remote_nid));

		return -ENOMEM;
	}

	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_CONN_PROBE_RESP,
				     sizeof(struct kefa_conn_probe_resp_msg),
				     conn->proto_ver);

	probe_resp = kefalnd_get_probe_resp_from_msg(tx->msg);
	probe_resp->lnd_ver = kefalnd_get_lnd_version();
	probe_resp->src_epoch = conn->efa_ni->ni_epoch;
	probe_resp->caps = 0;
	probe_resp->min_proto_ver = EFALND_MIN_PROTO_VER;
	probe_resp->max_proto_ver = EFALND_MAX_PROTO_VER;
	probe_resp->status = kefalnd_errno_to_efa_status(status);

	rc = kefalnd_post_conn_message(conn, tx);
	if (rc)
		kefalnd_tx_done(tx);

	return rc;
}

/* Build and post a connection probe to the remote side of @conn.
 *
 * The connection must be in KEFA_CONN_PROBE_EFA. The probe carries the local
 * GID, epoch and CM QP.
 *
 * Returns 0 on success, -ENOMEM when no idle TX is available, or the error
 * returned by kefalnd_post_conn_message().
 */
static int
kefalnd_send_conn_probe(struct kefa_conn *conn)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_conn_probe_msg *probe;
	struct kefa_tx *tx;
	int rc;

	LASSERT(conn->state == KEFA_CONN_PROBE_EFA);

	CDEBUG(D_NET, "Send connection probe from[%s] to[%s]\n",
	       libcfs_nidstr(&conn->local_nid),
	       libcfs_nidstr(&conn->remote_nid));

	tx = kefalnd_get_idle_tx(conn->efa_ni);
	if (!tx) {
		/* Name the right message type so the log is not confused
		 * with a failed connection request.
		 */
		EFA_DEV_ERR(efa_dev,
			    "can't allocate conn probe txd for peer NI[%s]\n",
			    libcfs_nidstr(&conn->remote_nid));

		return -ENOMEM;
	}

	kefalnd_init_tx_protocol_msg(tx, conn, EFALND_MSG_CONN_PROBE,
				     sizeof(struct kefa_conn_probe_msg),
				     conn->proto_ver);

	probe = kefalnd_get_probe_from_msg(tx->msg);
	probe->lnd_ver = kefalnd_get_lnd_version();
	probe->src_epoch = conn->efa_ni->ni_epoch;
	memcpy(probe->src_gid, efa_dev->gid.raw, sizeof(probe->src_gid));
	probe->cm_qp.qp_num = efa_dev->cm_qp->ib_qp->qp_num;
	probe->cm_qp.qkey = efa_dev->cm_qp->qkey;
	probe->caps = 0;

	rc = kefalnd_post_conn_message(conn, tx);
	if (rc)
		kefalnd_tx_done(tx);

	return rc;
}

/* Release all resources held by @conn and free it.
 *
 * Every TX still waiting on conn->pend_tx is force-cancelled with @hstatus
 * and @status. TXs still on conn->active_tx are only reported with a warning.
 * The address handle, the peer NI reference and the data QP array are
 * released when present.
 *
 * The caller is expected to have unhashed the connection already; this
 * function does not touch the NI connection table.
 */
void
kefalnd_destroy_conn(struct kefa_conn *conn, enum lnet_msg_hstatus hstatus,
		     int status)
{
	struct kefa_tx *tx, *temp_tx;
	struct list_head cancel_tx;
	unsigned long flags;

	INIT_LIST_HEAD(&cancel_tx);

	spin_lock_irqsave(&conn->lock, flags);
	if (!list_empty(&conn->active_tx))
		EFA_DEV_WARN(conn->efa_ni->efa_dev,
			     "destroying conn to peer NI[%s] with active TXs\n",
			     libcfs_nidstr(&conn->remote_nid));

	/* Detach the pending TXs so they can be cancelled without the lock */
	list_splice_init(&conn->pend_tx, &cancel_tx);
	spin_unlock_irqrestore(&conn->lock, flags);

	list_for_each_entry_safe(tx, temp_tx, &cancel_tx, list_node) {
		kefalnd_force_cancel_tx(tx, hstatus, status);
	}

	/* conn->ah may be NULL (never created) or an ERR_PTR (creation
	 * failed in kefalnd_init_conn_ah()).
	 */
	if (!IS_ERR_OR_NULL(conn->ah))
		rdma_destroy_ah(conn->ah, RDMA_DESTROY_AH_SLEEPABLE);

	if (conn->peer_ni)
		kefalnd_put_peer_ni(conn->peer_ni);

	/* Connections torn down before the data QPs were exchanged never
	 * allocated the array. LIBCFS_FREE() reports a NULL pointer as an
	 * error, so skip it in that case.
	 */
	if (conn->data_qps)
		LIBCFS_FREE(conn->data_qps,
			    conn->nqps * sizeof(*conn->data_qps));

	LIBCFS_FREE(conn, sizeof(*conn));
}

static u64
kefalnd_nid_to_key(struct lnet_nid *nid)
{
	u64 key;

	key = ((u64)(nid->nid_addr[0] ^ nid->nid_addr[2])) << 32;
	key |= nid->nid_addr[1] ^ nid->nid_addr[3];

	return key;
}

/* Allocate and initialize a connection object from @efa_ni to @peer_nid.
 *
 * The new connection starts in KEFA_CONN_INACTIVE with the highest supported
 * protocol version. If the peer NID equals the local NI NID the connection
 * type is forced to KEFA_CONN_TYPE_LB regardless of @conn_type.
 *
 * The connection is not inserted into the NI connection table here.
 *
 * Returns the new connection or ERR_PTR(-ENOMEM).
 */
static struct kefa_conn *
kefalnd_create_conn(struct kefa_ni *efa_ni, struct lnet_nid *peer_nid,
		    enum kefa_conn_type conn_type)
{
	struct kefa_dev *efa_dev = efa_ni->efa_dev;
	struct kefa_conn *new_conn;

	LIBCFS_CPT_ALLOC(new_conn, lnet_cpt_table(), efa_dev->cpt,
			 sizeof(*new_conn));
	if (!new_conn) {
		EFA_DEV_ERR(efa_dev, "failed to allocate EFA connection\n");
		return ERR_PTR(-ENOMEM);
	}

	/* Identity */
	new_conn->efa_ni = efa_ni;
	new_conn->local_nid = efa_ni->lnet_ni->ni_nid;
	new_conn->remote_nid = *peer_nid;
	new_conn->hash_key = kefalnd_nid_to_key(&new_conn->remote_nid);

	/* Initial protocol state */
	new_conn->proto_ver = EFALND_MAX_PROTO_VER;
	new_conn->remote_epoch = 0;
	new_conn->last_use_time = ktime_get_seconds();
	new_conn->state = KEFA_CONN_INACTIVE;

	/* Talking to ourselves is always a loopback connection */
	new_conn->type = nid_same(&new_conn->local_nid, &new_conn->remote_nid) ?
				KEFA_CONN_TYPE_LB : conn_type;

	INIT_LIST_HEAD(&new_conn->pend_tx);
	INIT_LIST_HEAD(&new_conn->active_tx);
	INIT_LIST_HEAD(&new_conn->abort_tx);
	INIT_HLIST_NODE(&new_conn->ni_node);
	spin_lock_init(&new_conn->lock);

	return new_conn;
}

/* Populate the data QP table of a loopback connection with the local
 * device's own data QPs.
 *
 * Returns 0 on success or -ENOMEM if the table cannot be allocated.
 */
static int
kefalnd_init_self_conn_data_qps(struct kefa_conn *conn)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	int idx;

	conn->nqps = efa_dev->nqps;

	LIBCFS_CPT_ALLOC(conn->data_qps, lnet_cpt_table(), efa_dev->cpt,
			 efa_dev->nqps * sizeof(*conn->data_qps));
	if (!conn->data_qps)
		return -ENOMEM;

	idx = 0;
	while (idx < efa_dev->nqps) {
		conn->data_qps[idx].qp_num = efa_dev->qps[idx].ib_qp->qp_num;
		conn->data_qps[idx].qkey = efa_dev->qps[idx].qkey;
		idx++;
	}

	/* QP selection starts from the first entry */
	atomic_set(&conn->last_qp_idx, 0);

	return 0;
}

/* Populate the data QP table of @conn from the @nqps entries in @data_qps
 * received from the remote side.
 *
 * The table is allocated with GFP_ATOMIC.
 *
 * Returns 0 on success, -EOPNOTSUPP when @nqps exceeds EFALND_MAX_PEER_QPS,
 * or -ENOMEM if the table cannot be allocated.
 */
static int
kefalnd_init_conn_data_qps(struct kefa_conn *conn,
			   struct kefa_qp_proto *data_qps, u32 nqps)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_qp_proto *src;
	int i;

	if (nqps > EFALND_MAX_PEER_QPS)
		return -EOPNOTSUPP;

	LIBCFS_CPT_ALLOC_GFP(conn->data_qps, lnet_cpt_table(), efa_dev->cpt,
			     nqps * sizeof(*conn->data_qps), GFP_ATOMIC);
	if (!conn->data_qps)
		return -ENOMEM;

	conn->nqps = nqps;

	for (i = 0; i < nqps; i++) {
		src = &data_qps[i];
		conn->data_qps[i].qp_num = src->qp_num;
		conn->data_qps[i].qkey = src->qkey;
	}

	/* QP selection starts from the first entry */
	atomic_set(&conn->last_qp_idx, 0);

	return 0;
}

/* Create the address handle used to reach the remote side of @conn at @gid
 * and store it in conn->ah.
 *
 * On failure conn->ah is left holding the ERR_PTR returned by
 * rdma_create_ah(), which kefalnd_destroy_conn() tolerates.
 *
 * Returns 0 on success or the negative error from rdma_create_ah().
 */
static int
kefalnd_init_conn_ah(struct kefa_conn *conn, union ib_gid *gid)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct rdma_ah_attr ah_attr = {};
	struct ib_ah *ah;

	ah_attr.type = RDMA_AH_ATTR_TYPE_UNDEFINED;
	rdma_ah_set_make_grd(&ah_attr, 0);
	rdma_ah_set_dlid(&ah_attr, 0);
	rdma_ah_set_sl(&ah_attr, 0);
	rdma_ah_set_path_bits(&ah_attr, 0);
	rdma_ah_set_static_rate(&ah_attr, 0);
	rdma_ah_set_port_num(&ah_attr, 1);
	rdma_ah_set_grh(&ah_attr, gid, 0, 0, 0, 0);

	ah = rdma_create_ah(efa_dev->pd, &ah_attr, RDMA_CREATE_AH_SLEEPABLE);
	conn->ah = ah;
	if (!IS_ERR(ah))
		return 0;

	EFA_DEV_ERR(efa_dev,
		    "failed to create AH to peer NI[%s]. err[%ld]\n",
		    libcfs_nidstr(&conn->remote_nid),
		    PTR_ERR(ah));

	return PTR_ERR(ah);
}

static int
kefalnd_establish_conn(struct kefa_conn *conn)
{
	struct kefa_dev *efa_dev = conn->efa_ni->efa_dev;
	struct kefa_peer_ni *peer_ni;
	union ib_gid gid;
	int rc = 0;

	if (nid_is_nid4(&conn->remote_nid)) {
		kefalnd_set_conn_state(conn, KEFA_CONN_PROBE_TCP);
		peer_ni = kefalnd_find_remote_peer_ni(efa_dev,
						      &conn->remote_nid);
		if (IS_ERR_OR_NULL(peer_ni)) {
			EFA_DEV_DEBUG(efa_dev,
				      "failed to locate GID for NID %s. err[%ld]\n",
				      libcfs_nidstr(&conn->remote_nid),
				      PTR_ERR(peer_ni));
			return PTR_ERR(peer_ni);
		}

		gid = peer_ni->gid;
		conn->peer_ni = peer_ni;
	} else {
		kefalnd_large_nid_get_gid(&conn->remote_nid, &gid);
	}

	kefalnd_set_conn_state(conn, KEFA_CONN_PROBE_EFA);
	rc = kefalnd_init_conn_ah(conn, &gid);
	if (rc)
		return rc;

	if (conn->type == KEFA_CONN_TYPE_LB) {
		conn->remote_epoch = conn->efa_ni->ni_epoch;
		/* TODO: initialize self connection caps and requests */
		rc = kefalnd_init_self_conn_data_qps(conn);
		if (rc)
			return rc;

		kefalnd_set_conn_state(conn, KEFA_CONN_ACTIVE);
	} else {
		rc = kefalnd_send_conn_probe(conn);
	}

	return rc;
}

/* Find the connection of @efa_ni to @nid with the requested @conn_type.
 *
 * A loopback connection matches any requested type. A connection in
 * KEFA_CONN_DEACTIVATING never matches. On a match the connection's
 * last_use_time is refreshed.
 *
 * The caller must hold efa_ni->conn_lock (read or write).
 *
 * Returns the connection or NULL if none matches.
 */
static struct kefa_conn *
kefalnd_lookup_conn_locked(struct kefa_ni *efa_ni, struct lnet_nid *nid,
			   enum kefa_conn_type conn_type)
__must_hold(&efa_ni->conn_lock)
{
	struct kefa_conn *conn;
	u64 key;

	key = kefalnd_nid_to_key(nid);
	hash_for_each_possible(efa_ni->conns, conn, ni_node, key) {
		/* Match a connection if its NID and the NID of the local NI it
		 * communicates over are the same and the type of the connection
		 * is the same to the requested one.
		 * No need to take connection lock here since any change to
		 * those fields is performed while holding the NI connections
		 * write lock.
		 */
		if (!nid_same(&conn->remote_nid, nid))
			continue;

		if (conn->type != conn_type && conn->type != KEFA_CONN_TYPE_LB)
			continue;

		/* A deactivated connection is re-hashed under key 0 but keeps
		 * its remote NID, so it can still be reached when the NID's
		 * key falls into the same bucket as key 0. Skip it explicitly
		 * so it is never handed out again. The state is moved to
		 * deactivating under the NI connections write lock, so it is
		 * stable here.
		 */
		if (conn->state == KEFA_CONN_DEACTIVATING)
			continue;

		conn->last_use_time = ktime_get_seconds();
		return conn;
	}
	return NULL;
}

/* Unhash @conn from its NI connection table.
 * The caller must hold the NI connections write lock.
 */
static void
kefalnd_remove_conn_locked(struct kefa_conn *conn)
__must_hold(&conn->efa_ni->conn_lock)
{
	struct hlist_node *node = &conn->ni_node;

	hash_del(node);
}

/* Unhash @conn from the connection table of @efa_ni, taking the NI
 * connections write lock around the removal.
 */
static void
kefalnd_remove_conn(struct kefa_ni *efa_ni, struct kefa_conn *conn)
{
	unsigned long irq_flags;

	write_lock_irqsave(&efa_ni->conn_lock, irq_flags);
	kefalnd_remove_conn_locked(conn);
	write_unlock_irqrestore(&efa_ni->conn_lock, irq_flags);
}

/* Hash @conn into the connection table of @efa_ni under conn->hash_key.
 * The caller must hold the NI connections write lock.
 */
static void
kefalnd_add_conn_locked(struct kefa_ni *efa_ni, struct kefa_conn *conn)
__must_hold(&efa_ni->conn_lock)
{
	struct hlist_node *node = &conn->ni_node;

	hash_add(efa_ni->conns, node, conn->hash_key);
}

/* Return the connection of @efa_ni to @nid with type @conn_type, creating and
 * hashing a new one if none exists.
 *
 * A newly created responder connection is returned as is. Any other newly
 * created connection is also established via kefalnd_establish_conn(); if
 * that fails the connection is unhashed and destroyed.
 *
 * Returns the connection or an ERR_PTR on failure.
 */
struct kefa_conn *
kefalnd_lookup_or_init_conn(struct kefa_ni *efa_ni, struct lnet_nid *nid,
			    enum kefa_conn_type conn_type)
{
	struct kefa_conn *found, *fresh;
	unsigned long flags;
	int rc;

	/* Fast path: a live connection is expected, so a read lock is
	 * enough for the first attempt.
	 */
	read_lock_irqsave(&efa_ni->conn_lock, flags);
	found = kefalnd_lookup_conn_locked(efa_ni, nid, conn_type);
	read_unlock_irqrestore(&efa_ni->conn_lock, flags);
	if (found)
		return found;

	/* Allocate outside the critical section since we can't sleep while
	 * holding the lock.
	 */
	fresh = kefalnd_create_conn(efa_ni, nid, conn_type);
	if (IS_ERR(fresh))
		return fresh;

	/* Look again under the write lock; someone may have raced us and
	 * inserted a connection in the meantime.
	 */
	write_lock_irqsave(&efa_ni->conn_lock, flags);
	found = kefalnd_lookup_conn_locked(efa_ni, nid, conn_type);
	if (!found) {
		/* Publish the new connection atomically with the lookup */
		kefalnd_add_conn_locked(efa_ni, fresh);
	}
	write_unlock_irqrestore(&efa_ni->conn_lock, flags);

	if (found) {
		kefalnd_destroy_conn(fresh, LNET_MSG_STATUS_OK, 0);
		return found;
	}

	/* Only the initiator drives establishment with the remote side; a
	 * responder connection is advanced by the incoming messages.
	 */
	if (conn_type == KEFA_CONN_TYPE_RESPONDER)
		return fresh;

	/* Establishment runs without the table lock: other users of the
	 * connection just queue their TX on the pending list until the
	 * connection is live.
	 */
	rc = kefalnd_establish_conn(fresh);
	if (!rc)
		return fresh;

	kefalnd_remove_conn(efa_ni, fresh);
	kefalnd_destroy_conn(fresh, LNET_MSG_STATUS_REMOTE_TIMEOUT, rc);

	return ERR_PTR(rc);
}

/* Look up the connection of @efa_ni to @nid with type @conn_type under the
 * NI connections read lock.
 *
 * Returns the connection or NULL if none matches.
 */
struct kefa_conn *
kefalnd_lookup_conn(struct kefa_ni *efa_ni, struct lnet_nid *nid,
		    enum kefa_conn_type conn_type)
{
	struct kefa_conn *found;
	unsigned long irq_flags;

	read_lock_irqsave(&efa_ni->conn_lock, irq_flags);
	found = kefalnd_lookup_conn_locked(efa_ni, nid, conn_type);
	read_unlock_irqrestore(&efa_ni->conn_lock, irq_flags);

	return found;
}

static int
kefalnd_handle_conn_req_ack(struct kefa_ni *efa_ni,
			    struct lnet_nid *srcnid,
			    struct lnet_nid *dstnid,
			    struct kefa_conn_req_ack *ack_msg)
{
	enum lnet_msg_hstatus hstatus;
	struct kefa_conn *conn;
	unsigned long flags;
	int rc;

	CDEBUG(D_NET, "Received connection ack from[%s] to[%s]\n",
	       libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

	conn = kefalnd_lookup_conn(efa_ni, srcnid, KEFA_CONN_TYPE_INITIATOR);
	if (!conn) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection from[%s] to[%s]\n",
			    libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

		return -ENOTCONN;
	}

	spin_lock_irqsave(&conn->lock, flags);
	if (conn->state != KEFA_CONN_ESTABLISH) {
		spin_unlock_irqrestore(&conn->lock, flags);
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection state[%u] from[%s] to[%s]\n",
			    conn->state, libcfs_nidstr(srcnid),
			    libcfs_nidstr(dstnid));

		return -EINVAL;
	}

	spin_unlock_irqrestore(&conn->lock, flags);

	if (ack_msg->status) {
		rc = kefalnd_efa_status_to_errno(ack_msg->status);
		hstatus = LNET_MSG_STATUS_REMOTE_DROPPED;
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "conn request failed on remote err[%d] from[%s] to[%s]\n",
			    rc, libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

		goto cleanup_conn;
	}

	rc = kefalnd_init_conn_data_qps(conn,
					(struct kefa_qp_proto *)ack_msg->data_qps,
					ack_msg->nqps);
	if (rc) {
		hstatus = LNET_MSG_STATUS_LOCAL_ABORTED;
		goto cleanup_conn;
	}

	spin_lock_irqsave(&conn->lock, flags);
	kefalnd_set_conn_state_locked(conn, KEFA_CONN_ACTIVE);

	/* Post all pending TXs on the connection. */
	kefalnd_conn_post_tx_locked(conn);
	spin_unlock_irqrestore(&conn->lock, flags);

	return 0;

cleanup_conn:
	kefalnd_remove_conn(efa_ni, conn);
	kefalnd_destroy_conn(conn, hstatus, rc);
	return rc;
}

/* Handle a connection request received from @srcnid.
 *
 * The matching responder connection must already exist. When it is in
 * KEFA_CONN_PROBE_EFA_PASSIVE the request's data QPs, requests and protocol
 * version are recorded and the connection becomes KEFA_CONN_ACTIVE. When it
 * is already KEFA_CONN_ACTIVE nothing is changed. In both cases a successful
 * ack is sent back. If the data QPs cannot be stored, an ack carrying the
 * error is sent instead.
 *
 * Returns 0 on success or a negative errno.
 */
static int
kefalnd_handle_conn_req(struct kefa_ni *efa_ni,
			struct lnet_nid *srcnid,
			struct lnet_nid *dstnid,
			struct kefa_conn_req_msg *request_msg,
			u8 proto_ver)
{
	struct kefa_conn *conn;
	unsigned long flags;
	int rc;

	CDEBUG(D_NET, "Received connection request from[%s] to[%s]\n",
	       libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

	conn = kefalnd_lookup_conn(efa_ni, srcnid, KEFA_CONN_TYPE_RESPONDER);
	if (!conn) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection from[%s] to[%s]\n",
			    libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

		return -ENOTCONN;
	}

	/* We currently assume a connection probe comes before the request,
	 * so the connection must exist at this point and is either active
	 * or in the middle of establishment.
	 */
	spin_lock_irqsave(&conn->lock, flags);
	if (conn->state == KEFA_CONN_PROBE_EFA_PASSIVE) {
		/* Regular path */
		/* TODO: validate requests make sense */
		conn->requests = request_msg->requests;
		conn->proto_ver = proto_ver;

		rc = kefalnd_init_conn_data_qps(conn,
						(struct kefa_qp_proto *)request_msg->data_qps,
						request_msg->nqps);
		if (rc) {
			spin_unlock_irqrestore(&conn->lock, flags);
			kefalnd_send_conn_req_ack(conn, rc);
			return rc;
		}

		kefalnd_set_conn_state_locked(conn, KEFA_CONN_ACTIVE);
	} else if (conn->state != KEFA_CONN_ACTIVE) {
		/* Neither establishing nor already active on probe */
		spin_unlock_irqrestore(&conn->lock, flags);
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection state[%u] from[%s] to[%s]\n",
			    conn->state, libcfs_nidstr(srcnid),
			    libcfs_nidstr(dstnid));

		return -EINVAL;
	}
	spin_unlock_irqrestore(&conn->lock, flags);

	return kefalnd_send_conn_req_ack(conn, 0);
}

/* Handle a connection probe response received from @srcnid.
 *
 * The matching initiator connection must exist and be in
 * KEFA_CONN_PROBE_EFA. If the remote side rejected the protocol version and
 * the supported version ranges overlap, the connection is downgraded and
 * probed again. On a successful response the remote capabilities, epoch and
 * negotiated protocol version are recorded, the connection moves to
 * KEFA_CONN_ESTABLISH and a connection request is sent. On any other failure
 * the connection is unhashed and destroyed.
 *
 * @msg_nob is not used by this function.
 *
 * Returns 0 on success or a negative errno.
 */
static int
kefalnd_handle_conn_probe_resp(struct kefa_ni *efa_ni, struct lnet_nid *srcnid,
			       struct lnet_nid *dstnid,
			       struct kefa_conn_probe_resp_msg *probe_resp_msg,
			       u16 msg_nob)
{
	enum lnet_msg_hstatus hstatus;
	u8 min_proto, max_proto;
	u8 highest_common, lowest_common;
	struct kefa_conn *conn;
	unsigned long flags;
	bool probing;
	int rc;

	CDEBUG(D_NET, "Received connection probe response from[%s] to[%s]\n",
	       libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

	conn = kefalnd_lookup_conn(efa_ni, srcnid, KEFA_CONN_TYPE_INITIATOR);
	if (!conn) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection from[%s] to[%s]\n",
			    libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

		return -ENOTCONN;
	}

	/* A probe response is only meaningful while we wait for one */
	spin_lock_irqsave(&conn->lock, flags);
	probing = conn->state == KEFA_CONN_PROBE_EFA;
	spin_unlock_irqrestore(&conn->lock, flags);

	if (!probing) {
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection state[%u] from[%s] to[%s]\n",
			    conn->state, libcfs_nidstr(srcnid),
			    libcfs_nidstr(dstnid));

		return -EINVAL;
	}

	min_proto = probe_resp_msg->min_proto_ver;
	max_proto = probe_resp_msg->max_proto_ver;

	/* Bounds of the protocol version range supported by both sides */
	highest_common = min_t(u8, EFALND_MAX_PROTO_VER, max_proto);
	lowest_common = max_t(u8, EFALND_MIN_PROTO_VER, min_proto);

	rc = kefalnd_efa_status_to_errno(probe_resp_msg->status);
	if (rc == -EPROTONOSUPPORT) {
		/* No overlap between the supported version ranges */
		if (highest_common < lowest_common) {
			hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
			goto cleanup_conn;
		}

		/* Downgrade protocol version to remote's supported version
		 * and probe again.
		 */
		conn->proto_ver = highest_common;
		rc = kefalnd_send_conn_probe(conn);
		if (!rc)
			return 0;
	}

	if (rc) {
		hstatus = LNET_MSG_STATUS_REMOTE_DROPPED;
		goto cleanup_conn;
	}

	/* TODO: Based on received capabilities decide whether to communicate
	 * with the remote peer or deny the connection.
	 * Also take into account establishment version.
	 */
	conn->remote_caps = probe_resp_msg->caps;
	conn->requests = 0;
	conn->remote_epoch = probe_resp_msg->src_epoch;
	conn->proto_ver = highest_common;

	kefalnd_set_conn_state(conn, KEFA_CONN_ESTABLISH);
	rc = kefalnd_send_conn_req(conn);
	if (!rc)
		return 0;

	hstatus = LNET_MSG_STATUS_LOCAL_DROPPED;

cleanup_conn:
	kefalnd_remove_conn(efa_ni, conn);
	kefalnd_destroy_conn(conn, hstatus, rc);
	return rc;
}

/* Mark @conn as deactivating and re-hash it under key 0.
 * The caller must hold the NI connections write lock.
 */
static void
kefalnd_deactivate_conn_locked(struct kefa_conn *conn)
__must_hold(&conn->efa_ni->conn_lock)
{
	struct kefa_ni *owner_ni = conn->efa_ni;

	/* Take the connection out of the bucket of its NID key */
	kefalnd_remove_conn_locked(conn);

	/* Flag the connection as deactivating so the connection daemon will
	 * reap it, and zero its hash key so regular lookups by NID are not
	 * meant to find it anymore.
	 */
	kefalnd_set_conn_state(conn, KEFA_CONN_DEACTIVATING);
	conn->hash_key = 0;

	/* Keep it in the table (under the zeroed key) for the daemon */
	kefalnd_add_conn_locked(owner_ni, conn);
}

/* Deactivate @conn, taking the write lock of its NI connection table around
 * kefalnd_deactivate_conn_locked().
 */
void
kefalnd_deactivate_conn(struct kefa_conn *conn)
{
	struct kefa_ni *owner_ni = conn->efa_ni;
	unsigned long irq_flags;

	write_lock_irqsave(&owner_ni->conn_lock, irq_flags);
	kefalnd_deactivate_conn_locked(conn);
	write_unlock_irqrestore(&owner_ni->conn_lock, irq_flags);
}

/* Replace the responder connection @old_conn to @nid with a fresh one.
 *
 * A new responder connection is created in KEFA_CONN_PROBE_EFA_PASSIVE. For
 * NID4 peers the peer NI is looked up (or created) and updated with @gid,
 * @cm_qpn and @cm_qkey. The old connection is then deactivated and the new
 * one hashed under a single hold of the NI connections write lock.
 *
 * Returns the new connection or an ERR_PTR on failure.
 */
static struct kefa_conn *
kefalnd_refresh_connection(struct kefa_ni *efa_ni, struct kefa_conn *old_conn,
			   struct lnet_nid *nid, union ib_gid *gid, u16 cm_qpn,
			   u32 cm_qkey)
{
	struct kefa_conn *fresh;
	unsigned long flags;

	fresh = kefalnd_create_conn(efa_ni, nid, KEFA_CONN_TYPE_RESPONDER);
	if (IS_ERR(fresh))
		return fresh;

	kefalnd_set_conn_state(fresh, KEFA_CONN_PROBE_EFA_PASSIVE);

	/* The old connection holds a refcount on the peer NI, so we expect
	 * to find it and only update its fields afterwards.
	 */
	if (nid_is_nid4(nid)) {
		struct kefa_peer_ni *peer_ni;

		peer_ni = kefalnd_lookup_or_create_peer_ni(lnet_nid_to_nid4(nid),
							   gid, cm_qpn,
							   cm_qkey);
		if (IS_ERR(peer_ni)) {
			kefalnd_destroy_conn(fresh,
					     LNET_MSG_STATUS_LOCAL_ERROR,
					     PTR_ERR(peer_ni));
			return ERR_CAST(peer_ni);
		}

		kefalnd_update_peer_ni(peer_ni, gid, cm_qpn, cm_qkey);
		fresh->peer_ni = peer_ni;
	}

	/* Swap the connections atomically with respect to lookups: retire
	 * the old responder connection and publish the new one.
	 */
	write_lock_irqsave(&efa_ni->conn_lock, flags);
	kefalnd_deactivate_conn_locked(old_conn);
	kefalnd_add_conn_locked(efa_ni, fresh);
	write_unlock_irqrestore(&efa_ni->conn_lock, flags);

	return fresh;
}

/* Handle a connection probe received from @srcnid.
 *
 * The responder connection to the sender is looked up or created and then
 * advanced according to its state:
 *  - KEFA_CONN_INACTIVE: first probe; the connection moves to
 *    KEFA_CONN_PROBE_EFA_PASSIVE and, for NID4 peers, the peer NI is attached.
 *  - KEFA_CONN_PROBE_EFA_PASSIVE: a repeated probe; only the protocol version
 *    check and the response are redone.
 *  - KEFA_CONN_ACTIVE: when the remote epoch is unchanged a successful
 *    response is sent and the connection is kept; otherwise the connection is
 *    replaced through kefalnd_refresh_connection().
 *  - any other state is rejected with -EINVAL.
 *
 * For a first probe or a refreshed connection, an active initiator connection
 * to the same peer with a different remote epoch is deactivated, the remote
 * epoch and capabilities are recorded and an address handle is created.
 * Finally a probe response is sent, carrying -EPROTONOSUPPORT when @proto_ver
 * is above EFALND_MAX_PROTO_VER.
 *
 * On failure after the state switch the connection is unhashed and destroyed.
 *
 * Returns 0 on success or a negative errno.
 */
static int
kefalnd_handle_conn_probe(struct kefa_ni *efa_ni,
			  struct lnet_nid *srcnid,
			  struct lnet_nid *dstnid,
			  struct kefa_conn_probe_msg *probe_msg,
			  u8 proto_ver)
{
	struct kefa_conn *conn, *init_conn;
	struct kefa_peer_ni *peer_ni;
	bool init_conn_active;
	unsigned long flags;
	union ib_gid gid;
	int rc, resp_rc;

	CDEBUG(D_NET, "Received connection probe from[%s] to[%s]\n",
	       libcfs_nidstr(srcnid), libcfs_nidstr(dstnid));

	conn = kefalnd_lookup_or_init_conn(efa_ni, srcnid,
					   KEFA_CONN_TYPE_RESPONDER);
	if (IS_ERR(conn))
		return PTR_ERR(conn);

	memcpy(gid.raw, probe_msg->src_gid, sizeof(probe_msg->src_gid));

	spin_lock_irqsave(&conn->lock, flags);
	switch (conn->state) {
	case KEFA_CONN_INACTIVE:
		kefalnd_set_conn_state_locked(conn, KEFA_CONN_PROBE_EFA_PASSIVE);
		spin_unlock_irqrestore(&conn->lock, flags);
		if (nid_is_nid4(srcnid)) {
			peer_ni = kefalnd_lookup_or_create_peer_ni(lnet_nid_to_nid4(srcnid), &gid,
								   probe_msg->cm_qp.qp_num,
								   probe_msg->cm_qp.qkey);
			if (IS_ERR_OR_NULL(peer_ni)) {
				/* PTR_ERR(NULL) is 0; without a real error
				 * the connection would be destroyed below
				 * while the handler reports success.
				 */
				rc = peer_ni ? PTR_ERR(peer_ni) : -ENOMEM;
				goto cleanup_conn;
			}

			conn->peer_ni = peer_ni;
		}

		break;

	case KEFA_CONN_PROBE_EFA_PASSIVE:
		/* KEFA_CONN_PROBE_EFA_PASSIVE - This side received probe
		 * already and returned with an error that caused the initiator
		 * to retry.
		 */
		spin_unlock_irqrestore(&conn->lock, flags);
		goto proto_check;

	case KEFA_CONN_ACTIVE:
		/* If there is an active connection to the peer and the epochs
		 * are the same we can keep using the same connection.
		 */
		if (conn->remote_epoch == probe_msg->src_epoch) {
			spin_unlock_irqrestore(&conn->lock, flags);
			kefalnd_send_conn_probe_resp(conn, 0);
			return 0;
		}

		/* The peer restarted: replace the stale connection */
		spin_unlock_irqrestore(&conn->lock, flags);
		conn = kefalnd_refresh_connection(efa_ni, conn, srcnid, &gid,
						  probe_msg->cm_qp.qp_num,
						  probe_msg->cm_qp.qkey);
		if (IS_ERR(conn))
			return PTR_ERR(conn);

		break;

	default:
		/* KEFA_CONN_PROBE_TCP/KEFA_CONN_PROBE_EFA - We don't expect to
		 * find an RX connection in those states since a connection can
		 * be in those states only if it is the initiator(TX connection).
		 * KEFA_CONN_DEACTIVATING - Not expected since a deactivated
		 * connection is re-hashed under key 0.
		 */
		spin_unlock_irqrestore(&conn->lock, flags);
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "Unexpected connection state[%u] from[%s] to[%s]\n",
			    conn->state, libcfs_nidstr(srcnid),
			    libcfs_nidstr(dstnid));

		return -EINVAL;
	}

	/* We check if TX connection is valid as well and if not remove it */
	init_conn = kefalnd_lookup_conn(efa_ni, srcnid,
					KEFA_CONN_TYPE_INITIATOR);
	if (init_conn) {
		spin_lock_irqsave(&init_conn->lock, flags);
		init_conn_active = init_conn->state == KEFA_CONN_ACTIVE;
		spin_unlock_irqrestore(&init_conn->lock, flags);

		if (init_conn_active &&
		    init_conn->remote_epoch != probe_msg->src_epoch)
			kefalnd_deactivate_conn(init_conn);
	}

	conn->remote_epoch = probe_msg->src_epoch;
	conn->remote_caps = probe_msg->caps;

	rc = kefalnd_init_conn_ah(conn, &gid);
	if (rc)
		goto cleanup_conn;

proto_check:
	/* Adopt the initiator's version only when we can speak it */
	resp_rc = proto_ver > EFALND_MAX_PROTO_VER ? -EPROTONOSUPPORT : 0;
	if (!resp_rc)
		conn->proto_ver = proto_ver;

	rc = kefalnd_send_conn_probe_resp(conn, resp_rc);
	if (rc)
		goto cleanup_conn;

	return 0;

cleanup_conn:
	kefalnd_remove_conn(efa_ni, conn);
	kefalnd_destroy_conn(conn, LNET_MSG_STATUS_LOCAL_ERROR, rc);
	return rc;
}

/* Return the number of payload bytes in @msg: the header's nob minus the
 * offset of the payload union in the layout selected by the protocol version.
 */
static inline u16
kefalnd_get_payload_size_from_msg(struct kefa_msg *msg)
{
	if (msg->hdr.proto_ver == EFALND_PROTO_VER_1)
		return msg->hdr.nob - offsetof(struct kefa_msg, msg_v1.u);

	return msg->hdr.nob - offsetof(struct kefa_msg, msg_v2.u);
}

/* Dispatch a received connection establishment message to the handler for
 * its type. Unknown types are logged. Handler return values are not
 * propagated.
 */
void
kefalnd_handle_conn_establishment(struct kefa_ni *efa_ni, struct kefa_msg *msg)
{
	struct lnet_nid srcnid, dstnid;
	u8 proto_ver;

	kefalnd_get_srcnid_from_msg(msg, &srcnid);
	kefalnd_get_dstnid_from_msg(msg, &dstnid);
	proto_ver = msg->hdr.proto_ver;

	switch (msg->hdr.type) {
	case EFALND_MSG_CONN_PROBE:
		kefalnd_handle_conn_probe(efa_ni, &srcnid, &dstnid,
					  kefalnd_get_probe_from_msg(msg),
					  proto_ver);
		break;

	case EFALND_MSG_CONN_PROBE_RESP:
		kefalnd_handle_conn_probe_resp(efa_ni, &srcnid, &dstnid,
					       kefalnd_get_probe_resp_from_msg(msg),
					       kefalnd_get_payload_size_from_msg(msg));
		break;

	case EFALND_MSG_CONN_REQ:
		kefalnd_handle_conn_req(efa_ni, &srcnid, &dstnid,
					kefalnd_get_conn_req_from_msg(msg),
					proto_ver);
		break;

	case EFALND_MSG_CONN_REQ_ACK:
		kefalnd_handle_conn_req_ack(efa_ni, &srcnid, &dstnid,
					    kefalnd_get_conn_req_ack_from_msg(msg));
		break;

	default:
		EFA_DEV_ERR(efa_ni->efa_dev,
			    "bad EFALND establishment message type[%x]\n",
			    msg->hdr.type);

		break;
	}
}

/* Expire the TXs of @conn.
 *
 * Active TXs whose send time is older than the LND timeout are aborted with
 * a network timeout status and moved to conn->abort_tx. All pending TXs are
 * moved to the caller's @cancel_tx list. Everything is done under
 * conn->lock; the caller completes/cancels the TXs afterwards.
 */
static void
kefalnd_cleanup_conn_txs(struct kefa_conn *conn, struct list_head *cancel_tx)
{
	int timeout = lnet_get_lnd_timeout();
	time64_t now = ktime_get_seconds();
	struct kefa_tx *tx, *temp_tx;
	unsigned long flags;

	spin_lock_irqsave(&conn->lock, flags);

	/* The walk starts at the head of the list and stops at the first TX
	 * that has not timed out yet (or has no send time), so it relies on
	 * the oldest TXs being at the head.
	 * NOTE(review): connection messages are appended with
	 * list_add_tail(); confirm data TXs are queued the same way.
	 */
	list_for_each_entry_safe(tx, temp_tx, &conn->active_tx, list_node) {
		time64_t tx_send_time = atomic64_read(&tx->send_time);

		if (tx_send_time == 0 || now < tx_send_time + timeout)
			break;

		/* We must call abort TX after increasing the refcount to
		 * prevent calling TX done which can lead to deadlock.
		 * Also we abort the Tx only if its refcount is not already 0
		 * which means its completion is in progress.
		 */
		if (atomic_inc_not_zero(&tx->ref_cnt)) {
			kefalnd_abort_tx(tx, LNET_MSG_STATUS_NETWORK_TIMEOUT,
					 -ETIMEDOUT);
			list_move_tail(&tx->list_node, &conn->abort_tx);
		}
	}

	/* Hand every pending TX to the caller for cancellation */
	list_splice_init(&conn->pend_tx, cancel_tx);

	spin_unlock_irqrestore(&conn->lock, flags);
}

static void
kefalnd_cleanup_ni_conns(struct kefa_ni *efa_ni)
{
	struct kefa_conn *removed_conn[MAX_IDLE_CONN] = { 0 };
	struct kefa_conn *idle_conn[MAX_IDLE_CONN] = { 0 };
	int init_timeout, resp_timeout, timeout, bkt, i = 0;
	struct kefa_tx *tx, *temp_tx;
	struct list_head cancel_tx;
	struct kefa_conn *conn;
	unsigned long flags;
	int num_removed = 0;
	int num_idle = 0;
	time64_t now;

	now = ktime_get_seconds();
	INIT_LIST_HEAD(&cancel_tx);
	init_timeout = efa_ni->lnet_ni->ni_net->net_tunables.lct_peer_timeout;
	resp_timeout = init_timeout + RESP_CONN_EXTRA_TIME;

	/* This assumes only a single thread can validate the NI connections at
	 * a time.
	 */
	read_lock_irqsave(&efa_ni->conn_lock, flags);
	hash_for_each(efa_ni->conns, bkt, conn, ni_node) {
		timeout = conn->type == KEFA_CONN_TYPE_INITIATOR ?
						init_timeout : resp_timeout;
		if (now > conn->last_use_time + timeout ||
		    conn->state == KEFA_CONN_DEACTIVATING) {
			idle_conn[num_idle] = conn;
			if (++num_idle == MAX_IDLE_CONN)
				break;
		}
	}

	read_unlock_irqrestore(&efa_ni->conn_lock, flags);

	/* No idle connections */
	if (num_idle == 0)
		return;

	write_lock_irqsave(&efa_ni->conn_lock, flags);
	for (i = 0; i < num_idle; i++) {
		conn = idle_conn[i];
		/* Validate last used under write lock to make sure no TX is
		 * racing with the daemon.
		 */
		timeout = conn->type == KEFA_CONN_TYPE_INITIATOR ?
				init_timeout : resp_timeout;

		if (now > conn->last_use_time + timeout)
			kefalnd_deactivate_conn_locked(conn);

		if (conn->state == KEFA_CONN_DEACTIVATING)
			kefalnd_cleanup_conn_txs(conn, &cancel_tx);

		if (now > conn->last_use_time + timeout &&
		    list_empty(&conn->active_tx) &&
		    list_empty(&conn->abort_tx)) {
			kefalnd_remove_conn_locked(conn);
			removed_conn[num_removed] = conn;
			num_removed++;
		}
	}

	write_unlock_irqrestore(&efa_ni->conn_lock, flags);

	for (i = 0; i < num_idle; i++) {
		conn = idle_conn[i];
		list_for_each_entry_safe(tx, temp_tx, &conn->abort_tx, list_node) {
			/* We only complete the TX when the only refcount is by
			 * the CM.
			 */
			if (atomic_read(&tx->ref_cnt) == 1) {
				atomic_dec(&tx->ref_cnt);
				kefalnd_tx_done(tx);
			}
		}

		list_for_each_entry_safe(tx, temp_tx, &cancel_tx, list_node)
			kefalnd_force_cancel_tx(tx,
						LNET_MSG_STATUS_LOCAL_ABORTED,
						-ETIMEDOUT);
	}

	for (i = 0; i < num_removed; i++)
		kefalnd_destroy_conn(removed_conn[i],
				     LNET_MSG_STATUS_LOCAL_ABORTED,
				     -ESHUTDOWN);
}

int
kefalnd_cm_daemon(void *arg)
{
	struct kefa_cm_deamon *cm_daemon;
	wait_queue_entry_t wait;
	struct kefa_ni *efa_ni;
	long id = (long)arg;
	int rc;

	cm_daemon = kefalnd.cm_daemons[KEFA_THREAD_CPT(id)];

	rc = cfs_cpt_bind(lnet_cpt_table(), cm_daemon->cpt);
	if (rc != 0)
		CWARN("failed to bind connection daemon thread to CPU partition %d\n",
		      cm_daemon->cpt);

	init_wait(&wait);

	while (!kefalnd.shutdown) {
		mutex_lock(&cm_daemon->ni_list_lock);
		list_for_each_entry(efa_ni, &cm_daemon->efa_ni_list, cm_node) {
			if (cm_daemon->iter % 5 == 0)
				kefalnd_cleanup_ni_conns(efa_ni);
		}
		mutex_unlock(&cm_daemon->ni_list_lock);

		cm_daemon->iter++;
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&cm_daemon->waitq, &wait);

		schedule_timeout(cfs_time_seconds(1));

		remove_wait_queue(&cm_daemon->waitq, &wait);
	}

	kefalnd_thread_stop();
	return 0;
}

/* Register @efa_ni with the connection daemon of its device's CPT so its
 * connections get cleaned up periodically.
 */
void
kefalnd_add_ni_to_cm_daemon(struct kefa_ni *efa_ni)
{
	struct kefa_cm_deamon *daemon = kefalnd.cm_daemons[efa_ni->efa_dev->cpt];

	mutex_lock(&daemon->ni_list_lock);
	list_add_tail(&efa_ni->cm_node, &daemon->efa_ni_list);
	mutex_unlock(&daemon->ni_list_lock);
}

/* Unregister @efa_ni from the connection daemon of its device's CPT. */
void
kefalnd_del_ni_from_cm_daemon(struct kefa_ni *efa_ni)
{
	struct kefa_cm_deamon *daemon = kefalnd.cm_daemons[efa_ni->efa_dev->cpt];

	mutex_lock(&daemon->ni_list_lock);
	list_del_init(&efa_ni->cm_node);
	mutex_unlock(&daemon->ni_list_lock);
}