Viewing: kfilnd_peer.c

// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * kfilnd peer management implementation.
 */

#include "kfilnd_peer.h"
#include "kfilnd_dev.h"

static const struct rhashtable_params peer_cache_params = {
	.head_offset = offsetof(struct kfilnd_peer, kp_node),
	.key_offset = offsetof(struct kfilnd_peer, kp_nid),
	.key_len = sizeof_field(struct kfilnd_peer, kp_nid),
	.automatic_shrinking = true,
};

/**
 * kfilnd_peer_free() - RCU safe way to free a peer.
 * @ptr: Pointer to peer.
 * @arg: Unused.
 */
static void kfilnd_peer_free(void *ptr, void *arg)
{
	struct kfilnd_peer *kp = ptr;

	CDEBUG(D_NET, "%s(%p):0x%llx peer entry freed\n",
	       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr);

	kfi_av_remove(kp->kp_dev->kfd_av, &kp->kp_addr, 1, 0);

	kfree_rcu(kp, kp_rcu_head);
}

/**
 * kfilnd_peer_del() - Delete a peer from the peer cache. kp_remove_peer is used
 * to prevent more than one thread from deleting the peer at once, and it
 * informs threads on the allocation path that this peer is being deleted. When
 * the peer is removed from the peer cache its allocation reference is returned
 * and lnet is notified that this peer is down.
 * @kp: Peer to be deleted
 */
static void kfilnd_peer_del(struct kfilnd_peer *kp)
{

	rcu_read_lock();

	if (atomic_cmpxchg(&kp->kp_remove_peer, 0, 1) == 0) {
		struct lnet_nid peer_nid;

		rhashtable_remove_fast(&kp->kp_dev->peer_cache, &kp->kp_node,
				       peer_cache_params);
		/* Return allocation reference */
		refcount_dec(&kp->kp_cnt);

		rcu_read_unlock();

		lnet_nid4_to_nid(kp->kp_nid, &peer_nid);
		CDEBUG(D_NET, "%s(%p):0x%llx removed from peer cache\n",
		       libcfs_nidstr(&peer_nid), kp, kp->kp_addr);

		lnet_notify(kp->kp_dev->kfd_ni, &peer_nid, false, false,
			    kp->kp_last_alive);
	} else {
		rcu_read_unlock();
	}
}

/**
 * kfilnd_peer_purge_old_peer() - Delete the specified peer from the cache
 * if we haven't heard from it within KP_PURGE_LIMIT seconds.
 * @kp: The peer to be checked or purged
 */
static void kfilnd_peer_purge_old_peer(struct kfilnd_peer *kp)
{
	if (ktime_after(ktime_get_seconds(),
			kp->kp_last_alive + KP_PURGE_LIMIT)) {
		CDEBUG(D_NET,
		       "Haven't heard from %s(%p):0x%llx in %lld seconds\n",
		       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr,
		       ktime_sub(ktime_get_seconds(), kp->kp_last_alive));
		kfilnd_peer_del(kp);
	}
}

/**
 * kfilnd_peer_stale() - Mark a peer as stale. If the peer is already stale then
 * check whether it should be deleted.
 * @kp: Peer to be marked stale
 * Note: only "up-to-date" peers can be marked stale.
 */
static void kfilnd_peer_stale(struct kfilnd_peer *kp)
{
	if (atomic_cmpxchg(&kp->kp_state,
			   KP_STATE_UPTODATE,
			   KP_STATE_STALE) == KP_STATE_UPTODATE) {
		CDEBUG(D_NET, "%s(%p):0x%llx uptodate -> stale\n",
		       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr);
	} else {
		kfilnd_peer_purge_old_peer(kp);
	}
}

/**
 * kfilnd_peer_down() - Mark a peer as down. If the peer is already down then
 * check whether it should be deleted.
 * @kp: Peer to be marked down
 * Note: Only peers that are "up-to-date" or "stale" can be marked down.
 */
static void kfilnd_peer_down(struct kfilnd_peer *kp)
{
	if (atomic_read(&kp->kp_state) == KP_STATE_DOWN) {
		kfilnd_peer_purge_old_peer(kp);
	} else if (atomic_cmpxchg(&kp->kp_state,
				  KP_STATE_UPTODATE,
				  KP_STATE_DOWN) == KP_STATE_UPTODATE) {
		CDEBUG(D_NET, "%s(%p):0x%llx uptodate -> down\n",
		       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr);
	} else if (atomic_cmpxchg(&kp->kp_state,
				  KP_STATE_STALE,
				  KP_STATE_DOWN) == KP_STATE_STALE) {
		CDEBUG(D_NET, "%s(%p):0x%llx stale -> down\n",
		       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr);
	}
}

/**
 * kfilnd_peer_tn_failed() - A transaction with this peer has failed. Mark the
 * peer as either stale or down depending on the provided error value. If
 * @delete is true we also delete the peer from the cache.
 * @kp: The peer to be marked down, stale, or deleted.
 * @error: An errno indicating why the transaction failed.
 * @delete: Whether to delete the peer
 * Note: We currently only consider EHOSTUNREACH which corresponds to
 * C_RC_UNDELIVERABLE, and ENOTCONN which corresponds to C_RC_VNI_NOT_FOUND.
 */
void kfilnd_peer_tn_failed(struct kfilnd_peer *kp, int error, bool delete)
{
	if (error == -EHOSTUNREACH || error == -ENOTCONN)
		kfilnd_peer_down(kp);
	else
		kfilnd_peer_stale(kp);

	if (delete)
		kfilnd_peer_del(kp);
}

/**
 * kfilnd_peer_put() - Return a reference for a peer.
 * @kp: Peer where the reference should be returned.
 */
void kfilnd_peer_put(struct kfilnd_peer *kp)
{
	if (refcount_dec_and_test(&kp->kp_cnt))
		kfilnd_peer_free(kp, NULL);
}

u16 kfilnd_peer_target_rx_base(struct kfilnd_peer *kp)
{
	int cpt = lnet_cpt_of_nid(kp->kp_nid, kp->kp_dev->kfd_ni);
	struct kfilnd_ep *ep = kp->kp_dev->cpt_to_endpoint[cpt];

	return ep->end_context_id;
}

/**
 * kfilnd_peer_get() - Get a reference for a peer.
 * @dev: Device used to lookup peer.
 * @nid: LNet NID of peer.
 *
 * Return: On success, pointer to a valid peer structed. Else, ERR_PTR.
 */
struct kfilnd_peer *kfilnd_peer_get(struct kfilnd_dev *dev, lnet_nid_t nid)
{
	char *node;
	char *service;
	int rc;
	u32 nid_addr = LNET_NIDADDR(nid);
	u32 net_num = LNET_NETNUM(LNET_NIDNET(nid));
	struct kfilnd_peer *kp;
	struct kfilnd_peer *clash_peer;

again:
	/* Check the cache for a match. */
	rcu_read_lock();
	kp = rhashtable_lookup_fast(&dev->peer_cache, &nid,
				      peer_cache_params);
	if (kp && !refcount_inc_not_zero(&kp->kp_cnt))
		kp = NULL;
	rcu_read_unlock();

	if (kp) {
		if (atomic_read(&kp->kp_remove_peer)) {
			kfilnd_peer_put(kp);
			goto again;
		}

		return kp;
	}

	/* Allocate a new peer for the cache. */
	kp = kzalloc(sizeof(*kp), GFP_NOFS);
	if (!kp) {
		rc = -ENOMEM;
		goto err;
	}

	node = kasprintf(GFP_NOFS, "%#x", nid_addr);
	if (!node) {
		rc = -ENOMEM;
		goto err_free_peer;
	}

	service = kasprintf(GFP_NOFS, "%u", net_num);
	if (!service) {
		rc = -ENOMEM;
		goto err_free_node_str;
	}

	/* Use the KFI address vector to translate node and service string into
	 * a KFI address handle.
	 */
	rc = kfi_av_insertsvc(dev->kfd_av, node, service, &kp->kp_addr, 0, dev);

	kfree(service);
	kfree(node);

	if (rc < 0) {
		goto err_free_peer;
	} else if (rc != 1) {
		rc = -ECONNABORTED;
		goto err_free_peer;
	}

	kp->kp_dev = dev;
	kp->kp_nid = nid;
	atomic_set(&kp->kp_rx_base, 0);
	atomic_set(&kp->kp_remove_peer, 0);
	atomic_set(&kp->kp_hello_state, KP_HELLO_NONE);
	atomic_set(&kp->kp_state, KP_STATE_NEW);
	kp->kp_local_session_key = kfilnd_dev_get_session_key(dev);
	kp->kp_hello_ts = ktime_get_seconds();

	/* One reference for the allocation and another for get operation
	 * performed for this peer. The allocation reference is returned when
	 * the entry is marked for removal.
	 */
	refcount_set(&kp->kp_cnt, 2);

	clash_peer = rhashtable_lookup_get_insert_fast(&dev->peer_cache,
						       &kp->kp_node,
						       peer_cache_params);

	if (clash_peer) {
		kfi_av_remove(dev->kfd_av, &kp->kp_addr, 1, 0);
		kfree(kp);

		if (IS_ERR(clash_peer)) {
			rc = PTR_ERR(clash_peer);
			goto err;
		} else {
			goto again;
		}
	}

	kfilnd_peer_alive(kp);

	CDEBUG(D_NET, "%s(%p):0x%llx peer entry allocated\n",
	       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr);

	return kp;

err_free_node_str:
	kfree(node);
err_free_peer:
	kfree(kp);
err:
	return ERR_PTR(rc);
}

/**
 * kfilnd_peer_get_kfi_addr() - Return kfi_addr_t used for eager untagged send
 * kfi operations.
 * @kp: Peer struct.
 *
 * The returned kfi_addr_t is updated to target a specific RX context. The
 * address return by this function should not be used if a specific RX context
 * needs to be targeted (i/e the response RX context for a bulk transfer
 * operation).
 *
 * Return: kfi_addr_t.
 */
kfi_addr_t kfilnd_peer_get_kfi_addr(struct kfilnd_peer *kp)
{
	/* TODO: Support RX count by round-robining the generated kfi_addr_t's
	 * across multiple RX contexts using RX base and RX count.
	 */
	return kfi_rx_addr(KFILND_BASE_ADDR(kp->kp_addr),
			   atomic_read(&kp->kp_rx_base),
				       KFILND_FAB_RX_CTX_BITS);
}

/**
 * kfilnd_peer_alive() - Update when the peer was last alive.
 * @kp: Peer to be updated.
 */
void kfilnd_peer_alive(struct kfilnd_peer *kp)
{
	kp->kp_last_alive = ktime_get_seconds();

	/* Ensure timestamp is committed to memory before used. */
	smp_mb();
}

/**
 * kfilnd_peer_destroy() - Destroy peer cache.
 * @dev: Device peer cache to be destroyed.
 */
void kfilnd_peer_destroy(struct kfilnd_dev *dev)
{
	rhashtable_free_and_destroy(&dev->peer_cache, kfilnd_peer_free, NULL);
}

/**
 * kfilnd_peer_init() - Initialize peer cache.
 * @dev: Device peer cache to be initialized.
 */
void kfilnd_peer_init(struct kfilnd_dev *dev)
{
	rhashtable_init(&dev->peer_cache, &peer_cache_params);
}

void kfilnd_peer_process_hello(struct kfilnd_peer *kp, struct kfilnd_msg *msg)
{
	/* TODO: Support RX count. */
	LASSERT(msg->proto.hello.rx_count > 0);
	atomic_set(&kp->kp_rx_base, msg->proto.hello.rx_base);

	kp->kp_remote_session_key = msg->proto.hello.session_key;

	/* If processing an incoming hello request, then negotiate kfilnd
	 * version to the minimum implemented kfilnd version.
	 */
	if (msg->type == KFILND_MSG_HELLO_REQ) {
		kp->kp_version = min_t(__u16, KFILND_MSG_VERSION,
				       msg->proto.hello.version);
		CDEBUG(D_NET,
		       "Peer %s(%p):0x%llx version: %u; local version %u; negotiated version: %u\n",
		       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr,
		       msg->proto.hello.version, KFILND_MSG_VERSION,
		       kp->kp_version);
		if (atomic_cmpxchg(&kp->kp_state, KP_STATE_NEW,
				   KP_STATE_WAIT_RSP) == KP_STATE_NEW)
			CDEBUG(D_NET, "Peer %s(%p):0x%llx new -> wait response\n",
			       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr);
	} else if (msg->type == KFILND_MSG_HELLO_RSP) {
		struct lnet_nid nid;

		kp->kp_version = msg->proto.hello.version;
		atomic_set(&kp->kp_state, KP_STATE_UPTODATE);
		CDEBUG(D_NET,
		       "Peer %s(%p):0x%llx is up-to-date negotiated version: %u\n",
		       libcfs_nid2str(kp->kp_nid), kp, kp->kp_addr,
		       msg->proto.hello.version);
		kfilnd_peer_clear_hello_state(kp);

		lnet_nid4_to_nid(kp->kp_nid, &nid);
		lnet_notify(kp->kp_dev->kfd_ni, &nid, true, false,
			    kp->kp_last_alive);
	}
}