Viewing: kfilnd.c

// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * kfilnd main interface.
 */

#include <linux/delay.h>
#include "kfilnd.h"
#include "kfilnd_tn.h"
#include "kfilnd_dev.h"

/* kfilnd workqueue: allocated in kfilnd_init(), destroyed in kfilnd_exit(). */
struct workqueue_struct *kfilnd_wq;
/* "kfilnd" debugfs root: created in kfilnd_init(), removed recursively in
 * kfilnd_exit().
 */
struct dentry *kfilnd_debug_dir;

/*
 * LND shutdown callback: release the kfilnd device that kfilnd_startup()
 * attached to @ni (stored in ni->ni_data).
 */
static void kfilnd_shutdown(struct lnet_ni *ni)
{
	kfilnd_dev_free(ni->ni_data);
}

/*
 * Choose the CPT used to send to @nid.
 *
 * The caller's current CPT is preferred when @dev has an endpoint bound to
 * it. Otherwise the target NID is hashed onto one of the NI's endpoints and
 * that endpoint's CPT is used.
 */
static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
{
	int local_cpt = lnet_cpt_current();

	if (!dev->cpt_to_endpoint[local_cpt])
		return dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;

	return local_cpt;
}

/*
 * kfilnd_send_hello_request() - start a hello handshake with a peer.
 * @dev: kfilnd device to send on.
 * @cpt: CPT on which to allocate the hello transaction.
 * @kp:  peer to send the hello to.
 *
 * Only the caller that moves kp->kp_hello_state from KP_HELLO_NONE to
 * KP_HELLO_INIT sends a hello. Any other concurrent caller returns 0
 * immediately. On success the state becomes KP_HELLO_SENDING, the send time
 * (ktime_get_seconds()) is stored in kp->kp_hello_ts, and the transaction is
 * handed to the state machine with TN_EVENT_TX_HELLO.
 *
 * Return: 0 if a hello was started or one was already pending. Otherwise the
 * negative errno from kfilnd_tn_alloc_for_hello(); in that case the hello
 * state is reset to KP_HELLO_NONE so a later caller can try again.
 */
int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt,
			      struct kfilnd_peer *kp)
{
	struct kfilnd_transaction *tn;
	int rc;

	/* Only one thread may progress state from NONE -> INIT */
	if (atomic_cmpxchg(&kp->kp_hello_state, KP_HELLO_NONE, KP_HELLO_INIT) !=
	    KP_HELLO_NONE) {
		CDEBUG(D_NET, "Hello already pending to peer %s(%px)\n",
		       libcfs_nid2str(kp->kp_nid), kp);
		return 0;
	}

	tn = kfilnd_tn_alloc_for_hello(dev, cpt, kp);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
		/* Undo the NONE -> INIT claim so a later sender can retry. */
		atomic_set(&kp->kp_hello_state, KP_HELLO_NONE);
		return rc;
	}

	/* +1 for tn->tn_kp. This ref is dropped when this transaction is
	 * finalized
	 */
	refcount_inc(&kp->kp_cnt);

	tn->msg_type = KFILND_MSG_HELLO_REQ;

	/* Record when this hello was initiated. */
	kp->kp_hello_ts = ktime_get_seconds();

	atomic_set(&kp->kp_hello_state, KP_HELLO_SENDING);

	/* Hand the hello transaction to the transaction state machine. */
	kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);

	return 0;
}

static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
{
	int type = msg->msg_type;
	struct lnet_processid *target = &msg->msg_target;
	struct kfilnd_transaction *tn;
	int nob;
	struct kfilnd_dev *dev = ni->ni_data;
	enum kfilnd_msg_type lnd_msg_type;
	int cpt;
	enum tn_events event = TN_EVENT_INVALID;
	int rc;
	bool tn_key = false;
	lnet_nid_t tgt_nid4;
	bool gpu = lnet_md_is_gpu(msg->msg_md);
	struct iov_iter from;

	iov_iter_bvec(&from, WRITE,
		      msg->msg_kiov, msg->msg_niov,
		      msg->msg_len + msg->msg_offset);
	iov_iter_advance(&from, msg->msg_offset);

	switch (type) {
	default:
		return -EIO;

	case LNET_MSG_ACK:
		if (msg->msg_len != 0)
			return -EINVAL;
		lnd_msg_type = KFILND_MSG_IMMEDIATE;
		break;

	case LNET_MSG_GET:
		if (msg->msg_routing || msg->msg_target_is_router) {
			lnd_msg_type = KFILND_MSG_IMMEDIATE;
			break;
		}

		nob = offsetof(struct kfilnd_msg,
			       proto.immed.payload[msg->msg_md->md_length]);
		if (nob <= KFILND_IMMEDIATE_MSG_SIZE && !gpu) {
			lnd_msg_type = KFILND_MSG_IMMEDIATE;
			break;
		}

		lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
		tn_key = true;
		break;

	case LNET_MSG_REPLY:
	case LNET_MSG_PUT:
		nob = offsetof(struct kfilnd_msg,
			       proto.immed.payload[msg->msg_len]);
		if (nob <= KFILND_IMMEDIATE_MSG_SIZE && !gpu) {
			lnd_msg_type = KFILND_MSG_IMMEDIATE;
			break;
		}

		lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
		tn_key = true;
		break;
	}

	tgt_nid4 = lnet_nid_to_nid4(&target->nid);

	cpt = kfilnd_send_cpt(dev, tgt_nid4);
	tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
		return rc;
	}

	if (kfilnd_peer_needs_hello(tn->tn_kp, true)) {
		rc = kfilnd_send_hello_request(dev, cpt, tn->tn_kp);
		if (rc && kfilnd_peer_is_new_peer(tn->tn_kp)) {
			/* Only fail the send if this is a new peer. Otherwise
			 * attempt the send using our stale peer information
			 */
			kfilnd_tn_free(tn);
			return rc;
		}
	}

	tn->tn_gpu = gpu;

	switch (lnd_msg_type) {
	case KFILND_MSG_IMMEDIATE:
		CDEBUG(D_NET,
		       "tn %p msg_kiov %p msg_niov %u msg_offset %u msg_len %u\n",
		       tn, msg->msg_kiov, msg->msg_niov,
		       msg->msg_offset, msg->msg_len);

		rc = copy_from_iter(&tn->tn_tx_msg.msg->proto.immed.payload,
				    msg->msg_len, &from);
		if (rc != msg->msg_len)
			return -EFAULT;

		tn->tn_nob = msg->msg_len;
		event = TN_EVENT_INIT_IMMEDIATE;
		break;

	case KFILND_MSG_BULK_PUT_REQ:
		tn->sink_buffer = false;
		rc = kfilnd_tn_set_buf(ni, tn, msg->msg_kiov, msg->msg_niov,
				       msg->msg_offset, msg->msg_len);
		if (rc) {
			CERROR("Failed to setup PUT source buffer rc %d\n", rc);
			kfilnd_tn_free(tn);
			return rc;
		}

		event = TN_EVENT_INIT_BULK;
		break;

	case KFILND_MSG_BULK_GET_REQ:
		/* We need to create a reply message to inform LNet our
		 * optimized GET is done.
		 */
		tn->tn_getreply = lnet_create_reply_msg(ni, msg);
		if (!tn->tn_getreply) {
			CERROR("Can't create reply for GET -> %s\n",
			       libcfs_nidstr(&target->nid));
			kfilnd_tn_free(tn);
			return -EIO;
		}

		tn->sink_buffer = true;
		rc = kfilnd_tn_set_buf(ni, tn, msg->msg_md->md_kiov,
				       msg->msg_md->md_niov,
				       msg->msg_md->md_offset,
				       msg->msg_md->md_length);
		if (rc) {
			CERROR("Failed to setup GET sink buffer rc %d\n", rc);
			kfilnd_tn_free(tn);
			return rc;
		}
		event = TN_EVENT_INIT_BULK;
		break;

	default:
		kfilnd_tn_free(tn);
		return -EIO;
	}

	tn->msg_type = lnd_msg_type;
	tn->tn_lntmsg = msg;	/* finalise msg on completion */
	tn->lnet_msg_len = tn->tn_nob;

	KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
			msg_type_to_str(lnd_msg_type), tn->tn_nob,
#ifdef HAVE_KFI_SGL
			tn->tn_sgt.nents
#else
			tn->tn_num_iovec
#endif
			);

	/* Start the state machine processing this transaction */
	kfilnd_tn_event_handler(tn, event, 0);

	return 0;
}

static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
		       int delayed, struct iov_iter *to, unsigned int rlen)
{
	struct kfilnd_transaction *tn = private;
	struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
	int wanted = iov_iter_count(to);
	int nob;
	int rc = 0;
	int status = 0;
	enum tn_events event;

	if (wanted > rlen)
		return -EINVAL;

	/* Transaction must be in receive state */
	if (tn->tn_state != TN_STATE_IMM_RECV)
		return -EINVAL;

	tn->tn_lntmsg = msg;
	tn->lnet_msg_len = rlen;

	switch (rxmsg->type) {
	case KFILND_MSG_IMMEDIATE:
		nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
		if (nob > tn->tn_rx_msg.length) {
			char *nid = "unknown";

			if (msg)
				nid = libcfs_nidstr(&msg->msg_hdr.src_nid);
			CERROR("Immediate message from %s too big: %d(%lu)\n",
				nid, nob, tn->tn_rx_msg.length);
			return -EPROTO;
		}
		tn->tn_nob = nob;

		rc = copy_to_iter(&rxmsg->proto.immed.payload, wanted, to);
		if (rc != wanted) {
			rc = -EFAULT;
			break;
		}

		rc = 0;
		kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
		return 0;

	case KFILND_MSG_BULK_PUT_REQ:
		if (wanted == 0) {
			event = TN_EVENT_SKIP_TAG_RMA;
		} else {
			struct lnet_libmd *msg_md = NULL;

			if (msg)
				msg_md = msg->msg_md;

			tn->tn_gpu = lnet_md_is_gpu(msg_md);

			/* Post the buffer given us as a sink  */
			tn->sink_buffer = true;
			rc = kfilnd_tn_set_buf(ni, tn,
					       (struct bio_vec *)to->bvec,
					       to->nr_segs, to->iov_offset,
					       wanted);
			if (rc) {
				CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
				kfilnd_tn_free(tn);
				return rc;
			}
			event = TN_EVENT_INIT_TAG_RMA;
		}
		break;

	case KFILND_MSG_BULK_GET_REQ:
		if (!msg) {
			event = TN_EVENT_SKIP_TAG_RMA;
			status = -ENODATA;
		} else {
			struct lnet_libmd *msg_md = NULL;

			if (msg)
				msg_md = msg->msg_md;

			tn->tn_gpu = lnet_md_is_gpu(msg_md);

			/* Post the buffer given to us as a source  */
			tn->sink_buffer = false;
			rc = kfilnd_tn_set_buf(ni, tn, msg->msg_kiov,
					       msg->msg_niov, msg->msg_offset,
					       msg->msg_len);
			if (rc) {
				CERROR("Failed to setup GET source buffer rc %d\n", rc);
				kfilnd_tn_free(tn);
				return rc;
			}
			event = TN_EVENT_INIT_TAG_RMA;
		}
		break;

	default:
		/* TODO: TN leaks here. */
		CERROR("Invalid message type = %d\n", rxmsg->type);
		return -EINVAL;
	}

	/* Store relevant fields to generate a bulk response. */
	if (rxmsg->version == KFILND_MSG_VERSION_1) {
		tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
		tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
		tn->tn_response_session_key = tn->tn_kp->kp_remote_session_key;
	} else {
		tn->tn_response_mr_key = rxmsg->proto.bulk_req_v2.kbrm2_key;
		tn->tn_response_rx = rxmsg->proto.bulk_req_v2.kbrm2_response_rx;
		tn->tn_response_session_key =
				rxmsg->proto.bulk_req_v2.kbrm2_session_key;
	}

#if 0
	tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
						 KFILND_MSG_BULK_RSP,
						 sizeof(struct kfilnd_bulk_rsp),
						 ni);
#endif

	KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
			msg_type_to_str(rxmsg->type), tn->tn_nob,
#ifdef HAVE_KFI_SGL
			tn->tn_sgt.nents
#else
			tn->tn_num_iovec
#endif
			);

	kfilnd_tn_event_handler(tn, event, status);

	return rc;
}

/*
 * LND tunables-defaults callback. Runs kfilnd_tunables_setup() against the
 * current module settings and, if that does not fail, overwrites the
 * kfilnd-specific part of @tunables with kfi_default_tunables.
 *
 * Return: the value returned by kfilnd_tunables_setup().
 */
static int
kfilnd_tun_defaults(struct lnet_lnd_tunables *tunables,
		    struct lnet_ioctl_config_lnd_cmn_tunables *cmn)
{
	int ret = kfilnd_tunables_setup(tunables, true, cmn);

	if (ret >= 0)
		memcpy(&tunables->lnd_tun_u.lnd_kfi, &kfi_default_tunables,
		       sizeof(kfi_default_tunables));

	return ret;
}

/*
 * Netlink key descriptions for the kfilnd LND tunables, indexed by
 * LNET_NET_KFILND_TUNABLES_ATTR_*. Each entry gives the attribute's key name
 * and netlink data type; kfilnd_nl_get() emits attributes with these ids.
 */
static const struct ln_key_list kfilnd_tunables_keys = {
	.lkl_maxattr = LNET_NET_KFILND_TUNABLES_ATTR_MAX,
	.lkl_list = {
		[LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR] = {
			.lkp_value = "prov_major_version",
			.lkp_data_type = NLA_S32,
		},
		[LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR] = {
			.lkp_value = "prov_minor_version",
			.lkp_data_type = NLA_S32,
		},
		[LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY] = {
			.lkp_value = "auth_key",
			.lkp_data_type = NLA_S32,
		},
		[LNET_NET_KFILND_TUNABLES_ATTR_TRAFFIC_CLASS] = {
			.lkp_value = "traffic_class",
			.lkp_data_type = NLA_STRING,
		},
		[LNET_NET_KFILND_TUNABLES_ATTR_TRAFFIC_CLASS_NUM] = {
			.lkp_value = "traffic_class_num",
			.lkp_data_type = NLA_S32,
		},
		[LNET_NET_KFILND_TUNABLES_ATTR_TIMEOUT] = {
			.lkp_value = "timeout",
			.lkp_data_type = NLA_S32,
		},
	},
};

static int
kfilnd_nl_get(int cmd, struct sk_buff *msg, int type, void *data,
	      bool export_backup)
{
	struct lnet_lnd_tunables *tunables;
	struct lnet_ni *ni = data;

	if (!ni || !msg)
		return -EINVAL;

	if (cmd != LNET_CMD_NETS || type != LNET_NET_LOCAL_NI_ATTR_LND_TUNABLES)
		return -EOPNOTSUPP;

	tunables = &ni->ni_lnd_tunables;
	nla_put_s32(msg, LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR,
		    tunables->lnd_tun_u.lnd_kfi.lnd_prov_major_version);
	nla_put_s32(msg, LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR,
		    tunables->lnd_tun_u.lnd_kfi.lnd_prov_minor_version);
	nla_put_s32(msg, LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY,
		    tunables->lnd_tun_u.lnd_kfi.lnd_auth_key);
	nla_put_string(msg, LNET_NET_KFILND_TUNABLES_ATTR_TRAFFIC_CLASS,
		       tunables->lnd_tun_u.lnd_kfi.lnd_traffic_class_str);
	if (!export_backup) {
		nla_put_s32(msg,
			    LNET_NET_KFILND_TUNABLES_ATTR_TRAFFIC_CLASS_NUM,
			    tunables->lnd_tun_u.lnd_kfi.lnd_traffic_class);
		nla_put_s32(msg, LNET_NET_KFILND_TUNABLES_ATTR_TIMEOUT,
			    kfilnd_timeout());
	}

	return 0;
}

/*
 * LND netlink "set" callback: apply one tunable attribute @attr of type
 * @type to the tunables in @data.
 *
 * Integer tunables are read with nla_get_s64() and stored into the kfilnd
 * tunables. The traffic-class name is copied with nla_strscpy().
 * TRAFFIC_CLASS_NUM and TIMEOUT are not settable here and fall into the
 * -EINVAL default.
 *
 * Return: 0 on success, or if @attr is NULL.
 * -EOPNOTSUPP for any command other than LNET_CMD_NETS.
 * -EINVAL for an unsupported attribute type.
 * -E2BIG from nla_strscpy() if the traffic-class string does not fit.
 */
static int
kfilnd_nl_set(int cmd, struct nlattr *attr, int type, void *data)
{
	struct lnet_lnd_tunables *tunables = data;
	struct lnet_ioctl_config_kfilnd_tunables *lnd_kfi;
	int rc = 0;

	if (cmd != LNET_CMD_NETS)
		return -EOPNOTSUPP;

	if (!attr)
		return 0;

	lnd_kfi = &tunables->lnd_tun_u.lnd_kfi;

	switch (type) {
	case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR:
		lnd_kfi->lnd_prov_major_version = nla_get_s64(attr);
		break;
	case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR:
		lnd_kfi->lnd_prov_minor_version = nla_get_s64(attr);
		break;
	case LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY:
		lnd_kfi->lnd_auth_key = nla_get_s64(attr);
		break;
	case LNET_NET_KFILND_TUNABLES_ATTR_TRAFFIC_CLASS:
		rc = nla_strscpy(lnd_kfi->lnd_traffic_class_str, attr,
				 sizeof(lnd_kfi->lnd_traffic_class_str));
		/* nla_strscpy() returns the copied length on success. This
		 * callback's contract is 0 or a negative errno.
		 */
		if (rc > 0)
			rc = 0;
		break;
	default:
		rc = -EINVAL;
		break;
	}

	return rc;
}

/*
 * LND device-priority callback: ask LNet for the priority of @dev_idx
 * relative to the underlying struct device of the NI's kfilnd device.
 * When no kfilnd device is attached, NULL is passed as the device.
 */
static unsigned int
kfilnd_get_dev_prio(struct lnet_ni *ni, unsigned int dev_idx)
{
	struct kfilnd_dev *kfdev = ni->ni_data;

	return lnet_get_dev_prio(kfdev ? kfdev->device : NULL, dev_idx);
}

/* Defined below; referenced by the ops table and checked against it. */
static int kfilnd_startup(struct lnet_ni *ni);

/*
 * kfilnd LND operations table, registered with LNet by kfilnd_init() and
 * unregistered by kfilnd_exit().
 */
static const struct lnet_lnd the_kfilnd = {
	.lnd_type		= KFILND,

	/* NI lifecycle */
	.lnd_startup		= kfilnd_startup,
	.lnd_shutdown		= kfilnd_shutdown,

	/* data path */
	.lnd_send		= kfilnd_send,
	.lnd_recv		= kfilnd_recv,

	/* tunables and netlink configuration */
	.lnd_tun_defaults	= kfilnd_tun_defaults,
	.lnd_keys		= &kfilnd_tunables_keys,
	.lnd_nl_get		= kfilnd_nl_get,
	.lnd_nl_set		= kfilnd_nl_set,

	/* queries */
	.lnd_get_timeout	= kfilnd_timeout,
	.lnd_get_dev_prio	= kfilnd_get_dev_prio,
};

/*
 * LND startup callback: bring up a kfilnd NI.
 *
 * Steps:
 * 1. Validate that @ni belongs to this LND.
 * 2. Apply its tunables.
 * 3. Allocate a kfilnd device for the NI's single configured interface.
 * 4. Record the device's CPT (CFS_CPT_ANY when no struct device is known).
 * 5. Post the immediate receive buffers.
 *
 * Return: 0 on success, or a negative errno. If posting buffers fails, the
 * device is freed before returning.
 */
static int kfilnd_startup(struct lnet_ni *ni)
{
	struct kfilnd_dev *kfdev;
	const char *ifname;
	int cpt = CFS_CPT_ANY;
	int rc;

	if (!ni)
		return -EINVAL;

	if (ni->ni_net->net_lnd != &the_kfilnd) {
		CERROR("Wrong lnd type\n");
		return -EINVAL;
	}

	rc = kfilnd_tunables_setup(&ni->ni_lnd_tunables,
				   ni->ni_lnd_tunables_set,
				   &ni->ni_net->net_tunables);
	if (rc) {
		CERROR("Can't configure tunable values, rc = %d\n", rc);
		return rc;
	}

	/* Only a single interface is supported. */
	if (!ni->ni_interface) {
		CERROR("No LNet network interface address defined\n");
		return -ENODEV;
	}
	ifname = ni->ni_interface;

	kfdev = kfilnd_dev_alloc(ni, ifname);
	if (IS_ERR(kfdev)) {
		rc = PTR_ERR(kfdev);
		CERROR("Failed to allocate KFILND device for %s: rc=%d\n",
		       ifname, rc);
		return rc;
	}

	/* Place the NI on the CPT of the device's NUMA node when known. */
	if (kfdev->device)
		cpt = cfs_cpt_of_node(lnet_cpt_table(),
				      dev_to_node(kfdev->device));
	ni->ni_dev_cpt = cpt;

	/* Post a series of immediate receive buffers */
	rc = kfilnd_dev_post_imm_buffers(kfdev);
	if (rc) {
		CERROR("Can't post buffers, rc = %d\n", rc);
		kfilnd_dev_free(kfdev);
	}

	return rc;
}

/*
 * Module exit, in order:
 * 1. destroy the kfilnd workqueue;
 * 2. tear down the transaction system;
 * 3. unregister the LND from LNet;
 * 4. remove the kfilnd debugfs tree.
 *
 * NOTE(review): this is not the exact reverse of kfilnd_init(), which
 * registers the LND last. Confirm that no kfilnd NI can still be using the
 * workqueue or transactions by the time this runs.
 */
static void __exit kfilnd_exit(void)
{
	destroy_workqueue(kfilnd_wq);

	kfilnd_tn_cleanup();

	lnet_unregister_lnd(&the_kfilnd);

	debugfs_remove_recursive(kfilnd_debug_dir);
}

/*
 * Module init, in order:
 * 1. create the "kfilnd" debugfs directory;
 * 2. initialise the tunables and libcfs;
 * 3. initialise the transaction system;
 * 4. allocate the kfilnd workqueue (flags driven by wq_cpu_intensive,
 *    wq_high_priority and wq_max_active);
 * 5. register the LND with LNet.
 *
 * On any failure, everything set up so far is undone, including the debugfs
 * directory. kfilnd_exit() is not run after a failed init, so previously the
 * directory leaked, and a libcfs_setup() failure also bypassed the unwind
 * labels.
 *
 * Return: 0 on success or a negative errno.
 */
static int __init kfilnd_init(void)
{
	int rc;
	unsigned int flags;

	kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);

	rc = kfilnd_tunables_init();
	if (rc)
		goto err_debugfs;

	rc = libcfs_setup();
	if (rc)
		goto err_debugfs;

	/* Do any initialization of the transaction system */
	rc = kfilnd_tn_init();
	if (rc) {
		CERROR("Cannot initialize transaction system\n");
		goto err_debugfs;
	}

	flags = WQ_MEM_RECLAIM | WQ_SYSFS;
	if (wq_cpu_intensive)
		flags = flags | WQ_CPU_INTENSIVE;
	if (wq_high_priority)
		flags = flags | WQ_HIGHPRI;

	kfilnd_wq = alloc_workqueue("kfilnd_wq", flags, wq_max_active);
	if (!kfilnd_wq) {
		rc = -ENOMEM;
		CERROR("Failed to allocated kfilnd work queue\n");
		goto err_tn_cleanup;
	}

	lnet_register_lnd(&the_kfilnd);

	return 0;

err_tn_cleanup:
	kfilnd_tn_cleanup();
err_debugfs:
	/* Tolerates NULL/ERR_PTR if debugfs_create_dir() failed. */
	debugfs_remove_recursive(kfilnd_debug_dir);
	kfilnd_debug_dir = NULL;
	return rc;
}

/* Module metadata. */
MODULE_AUTHOR("Cray Inc.");
MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
MODULE_VERSION(KFILND_VERSION);
MODULE_LICENSE("GPL");

/* kfilnd_init() is registered via late_initcall_sync() rather than
 * module_init(); kfilnd_exit() is the module exit handler.
 */
late_initcall_sync(kfilnd_init);
module_exit(kfilnd_exit);