Viewing: lib-msg.c

// SPDX-License-Identifier: GPL-2.0

/* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2017, Intel Corporation.
 */

/* This file is part of Lustre, http://www.lustre.org/
 *
 * Message decoding, parsing and finalizing routines
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/libcfs/libcfs_fail.h>
#include <linux/lnet/lib-lnet.h>

void
lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev)
{
	ENTRY;

	memset(ev, 0, sizeof(*ev));

	ev->status   = 0;
	ev->unlinked = 1;
	ev->type     = LNET_EVENT_UNLINK;
	lnet_md_deconstruct(md, ev);
	lnet_md2handle(&ev->md_handle, md);
	EXIT;
}

/*
 * Don't need any lock, must be called after lnet_commit_md
 */
void
lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type)
{
	struct lnet_hdr	*hdr = &msg->msg_hdr;
	struct lnet_event *ev = &msg->msg_ev;

	LASSERT(!msg->msg_routing);

	ev->type = ev_type;
	ev->msg_type = msg->msg_type;

	if (ev_type == LNET_EVENT_SEND) {
		/* event for active message */
		ev->target.nid	  = hdr->dest_nid;
		ev->target.pid	  = hdr->dest_pid;
		ev->initiator.nid = LNET_ANY_NID;
		ev->initiator.pid = the_lnet.ln_pid;
		ev->source.nid	  = LNET_ANY_NID;
		ev->source.pid    = the_lnet.ln_pid;
		ev->sender	  = LNET_ANY_NID;
	} else {
		/* event for passive message */
		ev->target.pid	  = hdr->dest_pid;
		ev->target.nid	  = hdr->dest_nid;
		ev->initiator.pid = hdr->src_pid;
		/* Multi-Rail: resolve src_nid to "primary" peer NID */
		ev->initiator.nid = msg->msg_initiator;
		/* Multi-Rail: track source NID. */
		ev->source.pid	  = hdr->src_pid;
		ev->source.nid	  = hdr->src_nid;
		ev->rlength	  = hdr->payload_length;
		ev->sender	  = msg->msg_from;
		ev->mlength	  = msg->msg_wanted;
		ev->offset	  = msg->msg_offset;
	}

	switch (ev_type) {
	case LNET_EVENT_PUT: /* passive PUT */
		ev->pt_index   = hdr->msg.put.ptl_index;
		ev->match_bits = hdr->msg.put.match_bits;
		ev->hdr_data   = hdr->msg.put.hdr_data;
		return;

	case LNET_EVENT_GET: /* passive GET */
		ev->pt_index   = hdr->msg.get.ptl_index;
		ev->match_bits = hdr->msg.get.match_bits;
		ev->hdr_data   = 0;
		return;

	case LNET_EVENT_ACK: /* ACK */
		ev->match_bits = hdr->msg.ack.match_bits;
		ev->mlength    = hdr->msg.ack.mlength;
		return;

	case LNET_EVENT_REPLY: /* REPLY */
		return;

	case LNET_EVENT_SEND: /* active message */
		if (msg->msg_type == LNET_MSG_PUT) {
			ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
			ev->offset     = le32_to_cpu(hdr->msg.put.offset);
			ev->mlength    =
			ev->rlength    = le32_to_cpu(hdr->payload_length);
			ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);

		} else {
			LASSERT(msg->msg_type == LNET_MSG_GET);
			ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
			ev->mlength    =
			ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
			ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
			ev->hdr_data   = 0;
		}
		return;

	default:
		LBUG();
	}
}

/* get_msg_deadline
 *   Gets the message deadline in nanoseconds.
 *   If the LND for this message implements its own lnd_get_timeout()
 *   function via its exposed API, we use this to calculate the LNet
 *   transaction timeout (LTT) value, based on the message's NI LND timeout
 *   (LNDT) and global retry count (LRC):
 *   LTT = LNDT * (LRC + 1) + 1
 *   If the LND did not implement the lnd_get_timeout() function or the LNDT
 *   was set to zero, fall back to default global LTT implementation.
 */
static ktime_t get_msg_deadline(struct lnet_ni *msg_ni)
{
	unsigned int msg_timeout = lnet_transaction_timeout;

	if (msg_ni && msg_ni->ni_net->net_lnd->lnd_get_timeout) {
		int lnd_timeout = msg_ni->ni_net->net_lnd->lnd_get_timeout();

		if (lnd_timeout > 0)
			msg_timeout = lnd_timeout * (lnet_retry_count + 1) + 1;
	}
	return ktime_add_ns(ktime_get(), msg_timeout * NSEC_PER_SEC);
}

void
lnet_msg_commit(struct lnet_msg *msg, int cpt)
{
	struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
	struct lnet_counters_common *common;

	/* A routed message can be committed for both receiving and sending */
	LASSERT(!msg->msg_tx_committed);

	if (msg->msg_sending) {
		LASSERT(!msg->msg_receiving);

		/* Set the message deadline using msg send NI */
		msg->msg_deadline = get_msg_deadline(msg->msg_txni);
		msg->msg_tx_cpt = cpt;
		msg->msg_tx_committed = 1;
		if (msg->msg_rx_committed) { /* routed message REPLY */
			LASSERT(msg->msg_onactivelist);
			return;
		}
	} else {
		LASSERT(!msg->msg_sending);

		/* Set the message deadline using msg recv NI */
		msg->msg_deadline = get_msg_deadline(msg->msg_rxni);
		msg->msg_rx_cpt = cpt;
		msg->msg_rx_committed = 1;
	}

	LASSERT(!msg->msg_onactivelist);

	msg->msg_onactivelist = 1;
	list_add_tail(&msg->msg_activelist, &container->msc_active);

	common = &the_lnet.ln_counters[cpt]->lct_common;
	common->lcc_msgs_alloc++;
	if (common->lcc_msgs_alloc > common->lcc_msgs_max)
		common->lcc_msgs_max = common->lcc_msgs_alloc;
}

static void
lnet_msg_decommit_tx(struct lnet_msg *msg, int status)
{
	struct lnet_counters_common *common;
	struct lnet_event *ev = &msg->msg_ev;

	LASSERT(msg->msg_tx_committed);
	if (status != 0)
		goto out;

	common = &(the_lnet.ln_counters[msg->msg_tx_cpt]->lct_common);
	switch (ev->type) {
	default: /* routed message */
		LASSERT(msg->msg_routing);
		LASSERT(msg->msg_rx_committed);
		LASSERT(ev->type == 0);

		common->lcc_route_length += msg->msg_len;
		common->lcc_route_count++;
		goto incr_stats;

	case LNET_EVENT_PUT:
		/* should have been decommitted */
		LASSERT(!msg->msg_rx_committed);
		/* overwritten while sending ACK */
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		msg->msg_type = LNET_MSG_PUT; /* fix type */
		break;

	case LNET_EVENT_SEND:
		LASSERT(!msg->msg_rx_committed);
		if (msg->msg_type == LNET_MSG_PUT)
			common->lcc_send_length += msg->msg_len;
		break;

	case LNET_EVENT_GET:
		LASSERT(msg->msg_rx_committed);
		/* overwritten while sending reply, we should never be
		 * here for optimized GET */
		LASSERT(msg->msg_type == LNET_MSG_REPLY);
		msg->msg_type = LNET_MSG_GET; /* fix type */
		break;
	}

	common->lcc_send_count++;

incr_stats:
	if (msg->msg_txpeer)
		lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
				msg->msg_type,
				LNET_STATS_TYPE_SEND);
	if (msg->msg_txni)
		lnet_incr_stats(&msg->msg_txni->ni_stats,
				msg->msg_type,
				LNET_STATS_TYPE_SEND);
 out:
	lnet_return_tx_credits_locked(msg);
	msg->msg_tx_committed = 0;
}

static void
lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
{
	struct lnet_counters_common *common;
	struct lnet_event *ev = &msg->msg_ev;

	LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
	LASSERT(msg->msg_rx_committed);

	if (status != 0)
		goto out;

	common = &(the_lnet.ln_counters[msg->msg_rx_cpt]->lct_common);
	switch (ev->type) {
	default:
		LASSERT(ev->type == 0);
		LASSERT(msg->msg_routing);
		goto incr_stats;

	case LNET_EVENT_ACK:
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		break;

	case LNET_EVENT_GET:
		/* type is "REPLY" if it's an optimized GET on passive side,
		 * because optimized GET will never be committed for sending,
		 * so message type wouldn't be changed back to "GET" by
		 * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
			msg->msg_type == LNET_MSG_GET);
		common->lcc_send_length += msg->msg_wanted;
		break;

	case LNET_EVENT_PUT:
		LASSERT(msg->msg_type == LNET_MSG_PUT);
		break;

	case LNET_EVENT_REPLY:
		/* type is "GET" if it's an optimized GET on active side,
		 * see details in lnet_create_reply_msg() */
		LASSERT(msg->msg_type == LNET_MSG_GET ||
			msg->msg_type == LNET_MSG_REPLY);
		break;
	}

	common->lcc_recv_count++;

incr_stats:
	if (msg->msg_rxpeer)
		lnet_incr_stats(&msg->msg_rxpeer->lpni_stats,
				msg->msg_type,
				LNET_STATS_TYPE_RECV);
	if (msg->msg_rxni)
		lnet_incr_stats(&msg->msg_rxni->ni_stats,
				msg->msg_type,
				LNET_STATS_TYPE_RECV);
	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
		common->lcc_recv_length += msg->msg_wanted;

 out:
	lnet_return_rx_credits_locked(msg);
	msg->msg_rx_committed = 0;
}

void
lnet_msg_decommit(struct lnet_msg *msg, int cpt, int status)
{
	int	cpt2 = cpt;

	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
	LASSERT(msg->msg_onactivelist);

	if (msg->msg_tx_committed) { /* always decommit for sending first */
		LASSERT(cpt == msg->msg_tx_cpt);
		lnet_msg_decommit_tx(msg, status);
	}

	if (msg->msg_rx_committed) {
		/* forwarding msg committed for both receiving and sending */
		if (cpt != msg->msg_rx_cpt) {
			lnet_net_unlock(cpt);
			cpt2 = msg->msg_rx_cpt;
			lnet_net_lock(cpt2);
		}
		lnet_msg_decommit_rx(msg, status);
	}

	list_del(&msg->msg_activelist);
	msg->msg_onactivelist = 0;

	the_lnet.ln_counters[cpt2]->lct_common.lcc_msgs_alloc--;

	if (cpt2 != cpt) {
		lnet_net_unlock(cpt2);
		lnet_net_lock(cpt);
	}
}

void
lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
		   unsigned int offset, unsigned int mlen)
{
	/* NB: @offset and @len are only useful for receiving */
	/* Here, we attach the MD on lnet_msg and mark it busy and
	 * decrementing its threshold. Come what may, the lnet_msg "owns"
	 * the MD until a call to lnet_msg_detach_md or lnet_finalize()
	 * signals completion. */
	LASSERT(!msg->msg_routing);

	msg->msg_md = md;
	if (msg->msg_receiving) { /* committed for receiving */
		msg->msg_offset = offset;
		msg->msg_wanted = mlen;
	}

	md->md_refcount++;
	if (md->md_threshold != LNET_MD_THRESH_INF) {
		LASSERT(md->md_threshold > 0);
		md->md_threshold--;
	}

	/* build umd in event */
	lnet_md2handle(&msg->msg_ev.md_handle, md);
	lnet_md_deconstruct(md, &msg->msg_ev);
}

static int
lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
{
	struct lnet_handle_wire ack_wmd;
	int		   rc;
	int		   status = msg->msg_ev.status;

	LASSERT(msg->msg_onactivelist);

	if (status == 0 && msg->msg_ack) {
		/* Only send an ACK if the PUT completed successfully */

		lnet_msg_decommit(msg, cpt, 0);

		msg->msg_ack = 0;
		lnet_net_unlock(cpt);

		LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
		LASSERT(!msg->msg_routing);

		ack_wmd = msg->msg_hdr.msg.put.ack_wmd;

		lnet_prep_send(msg, LNET_MSG_ACK, &msg->msg_ev.source, 0, 0);

		msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
		msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
		msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);

		rc = lnet_send(&msg->msg_ev.target.nid, msg,
			       &msg->msg_from);

		lnet_net_lock(cpt);
		/*
		 * NB: message is committed for sending, we should return
		 * on success because LND will finalize this message later.
		 *
		 * Also, there is possibility that message is committed for
		 * sending and also failed before delivering to LND,
		 * i.e: ENOMEM, in that case we can't fall through either
		 * because CPT for sending can be different with CPT for
		 * receiving, so we should return back to lnet_finalize()
		 * to make sure we are locking the correct partition.
		 */
		return rc;

	} else if (status == 0 &&	/* OK so far */
		   (msg->msg_routing && !msg->msg_sending)) {
		/* not forwarded */
		LASSERT(!msg->msg_receiving);	/* called back recv already */
		lnet_net_unlock(cpt);

		rc = lnet_send(NULL, msg, NULL);

		lnet_net_lock(cpt);
		/*
		 * NB: message is committed for sending, we should return
		 * on success because LND will finalize this message later.
		 *
		 * Also, there is possibility that message is committed for
		 * sending and also failed before delivering to LND,
		 * i.e: ENOMEM, in that case we can't fall through either:
		 * - The rule is message must decommit for sending first if
		 *   the it's committed for both sending and receiving
		 * - CPT for sending can be different with CPT for receiving,
		 *   so we should return back to lnet_finalize() to make
		 *   sure we are locking the correct partition.
		 */
		return rc;
	}

	lnet_msg_decommit(msg, cpt, status);
	lnet_msg_free(msg);
	return 0;
}

/* must hold net_lock/0 */
void
lnet_ni_add_to_recoveryq_locked(struct lnet_ni *ni,
				struct list_head *recovery_queue, time64_t now)
{
	if (!list_empty(&ni->ni_recovery))
		return;

	if (atomic_read(&ni->ni_healthv) == LNET_MAX_HEALTH_VALUE)
		return;

	/* This NI is going on the recovery queue, so take a ref on it */
	lnet_ni_addref_locked(ni, 0);

	lnet_ni_set_next_ping(ni, now);

	CDEBUG(D_NET, "%s added to recovery queue. ping count: %u next ping: %lld health :%d\n",
	       libcfs_nidstr(&ni->ni_nid),
	       ni->ni_ping_count,
	       ni->ni_next_ping,
	       atomic_read(&ni->ni_healthv));

	list_add_tail(&ni->ni_recovery, recovery_queue);
}

static void
lnet_handle_local_failure(struct lnet_ni *ni)
{
	/*
	 * the lnet_net_lock(0) is used to protect the addref on the ni
	 * and the recovery queue.
	 */
	lnet_net_lock(0);
	/* the mt could've shutdown and cleaned up the queues */
	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
		lnet_net_unlock(0);
		return;
	}

	lnet_dec_ni_healthv_locked(ni);

	lnet_ni_add_to_recoveryq_locked(ni, &the_lnet.ln_mt_localNIRecovq,
					ktime_get_seconds());
	lnet_net_unlock(0);
}

/* must hold net_lock/0 */
void
lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni)
{
	lnet_dec_lpni_healthv_locked(lpni);

	/*
	 * add the peer NI to the recovery queue if it's not already there
	 * and it's health value is actually below the maximum. It's
	 * possible that the sensitivity might be set to 0, and the health
	 * value will not be reduced. In this case, there is no reason to
	 * invoke recovery
	 */
	lnet_peer_ni_add_to_recoveryq_locked(lpni,
					     &the_lnet.ln_mt_peerNIRecovq,
					     ktime_get_seconds());
}

static void
lnet_handle_remote_failure(struct lnet_peer_ni *lpni)
{
	/* lpni could be NULL if we're in the LOLND case */
	if (!lpni)
		return;

	lnet_net_lock(0);
	/* the mt could've shutdown and cleaned up the queues */
	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
		lnet_net_unlock(0);
		return;
	}
	lnet_handle_remote_failure_locked(lpni);
	lnet_net_unlock(0);
}

static void
lnet_incr_hstats(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
		 enum lnet_msg_hstatus hstatus, int retry_count, int cpt)
{
	struct lnet_counters_health *health;

	health = &the_lnet.ln_counters[cpt]->lct_health;

	switch (hstatus) {
	case LNET_MSG_STATUS_LOCAL_INTERRUPT:
		atomic_inc(&ni->ni_hstats.hlt_local_interrupt);
		health->lch_local_interrupt_count++;
		break;
	case LNET_MSG_STATUS_LOCAL_DROPPED:
		atomic_inc(&ni->ni_hstats.hlt_local_dropped);
		health->lch_local_dropped_count++;
		break;
	case LNET_MSG_STATUS_LOCAL_ABORTED:
		atomic_inc(&ni->ni_hstats.hlt_local_aborted);
		health->lch_local_aborted_count++;
		break;
	case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
		atomic_inc(&ni->ni_hstats.hlt_local_no_route);
		health->lch_local_no_route_count++;
		break;
	case LNET_MSG_STATUS_LOCAL_TIMEOUT:
		atomic_inc(&ni->ni_hstats.hlt_local_timeout);
		health->lch_local_timeout_count++;
		break;
	case LNET_MSG_STATUS_LOCAL_ERROR:
		atomic_inc(&ni->ni_hstats.hlt_local_error);
		health->lch_local_error_count++;
		break;
	case LNET_MSG_STATUS_REMOTE_DROPPED:
		if (lpni)
			atomic_inc(&lpni->lpni_hstats.hlt_remote_dropped);
		health->lch_remote_dropped_count++;
		break;
	case LNET_MSG_STATUS_REMOTE_ERROR:
		if (lpni)
			atomic_inc(&lpni->lpni_hstats.hlt_remote_error);
		health->lch_remote_error_count++;
		break;
	case LNET_MSG_STATUS_REMOTE_TIMEOUT:
		if (lpni)
			atomic_inc(&lpni->lpni_hstats.hlt_remote_timeout);
		health->lch_remote_timeout_count++;
		break;
	case LNET_MSG_STATUS_NETWORK_TIMEOUT:
		if (lpni)
			atomic_inc(&lpni->lpni_hstats.hlt_network_timeout);
		health->lch_network_timeout_count++;
		break;
	case LNET_MSG_STATUS_OK:
		if (retry_count)
			health->lch_successful_resends++;
		break;
	default:
		LBUG();
	}
}

static void
lnet_resend_msg_locked(struct lnet_msg *msg)
{
	msg->msg_retry_count++;

	/*
	 * remove message from the active list and reset it to prepare
	 * for a resend. Two exceptions to this
	 *
	 * 1. the router case. When a message is being routed it is
	 * committed for rx when received and committed for tx when
	 * forwarded. We don't want to remove it from the active list, since
	 * code which handles receiving expects it to remain on the active
	 * list.
	 *
	 * 2. The REPLY case. Reply messages use the same message
	 * structure for the GET that was received.
	 */
	if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
		list_del_init(&msg->msg_activelist);
		msg->msg_onactivelist = 0;
	}
	/*
	 * The msg_target.nid which was originally set
	 * when calling LNetGet() or LNetPut() might've
	 * been overwritten if we're routing this message.
	 * Call lnet_msg_decommit_tx() to return the credit
	 * this message consumed. The message will
	 * consume another credit when it gets resent.
	 */
	msg->msg_target.nid = msg->msg_hdr.dest_nid;
	lnet_msg_decommit_tx(msg, -EAGAIN);
	msg->msg_sending = 0;
	msg->msg_receiving = 0;
	msg->msg_target_is_router = 0;

	CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n",
	       libcfs_nidstr(&msg->msg_hdr.src_nid),
	       libcfs_nidstr(&msg->msg_hdr.dest_nid),
	       lnet_msgtyp2str(msg->msg_type),
	       lnet_health_error2str(msg->msg_health_status), msg);

	list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);

	complete(&the_lnet.ln_mt_wait_complete);
}

static int
lnet_check_finalize_recursion_locked(struct lnet_msg *msg,
				     struct list_head *containerq,
				     int nworkers, void **workers)
{
	int my_slot = -1;
	int i;

	list_add_tail(&msg->msg_list, containerq);

	for (i = 0; i < nworkers; i++) {
		if (workers[i] == current)
			break;

		if (my_slot < 0 && workers[i] == NULL)
			my_slot = i;
	}

	if (i < nworkers || my_slot < 0)
		return -1;

	workers[my_slot] = current;

	return my_slot;
}

static int
lnet_attempt_msg_resend(struct lnet_msg *msg)
{
	struct lnet_msg_container *container;
	int my_slot;
	int cpt;

	/* we can only resend tx_committed messages */
	LASSERT(msg->msg_tx_committed);

	/* don't resend recovery messages */
	if (msg->msg_recovery) {
		CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
			libcfs_nidstr(&msg->msg_from),
			libcfs_nidstr(&msg->msg_target.nid),
			msg->msg_retry_count);
		return -ENOTRECOVERABLE;
	}

	/*
	 * if we explicitly indicated we don't want to resend then just
	 * return
	 */
	if (msg->msg_no_resend) {
		CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n",
			libcfs_nidstr(&msg->msg_from),
			libcfs_nidstr(&msg->msg_target.nid),
			msg->msg_retry_count);
		return -ENOTRECOVERABLE;
	}

	cpt = msg->msg_tx_cpt;
	lnet_net_lock(cpt);

	/* check if the message has exceeded the number of retries */
	if (msg->msg_retry_count >= lnet_retry_count) {
		CDEBUG(D_NET, "%s->%s exceeded retry count %d\n",
			libcfs_nidstr(&msg->msg_from),
			libcfs_nidstr(&msg->msg_target.nid),
			msg->msg_retry_count);
		if (lnet_retry_count)
			the_lnet.ln_counters[cpt]->lct_health.lch_failed_resends++;
		lnet_net_unlock(cpt);
		return -ENOTRECOVERABLE;
	}

	/* check again under lock */
	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
		lnet_net_unlock(cpt);
		return -ESHUTDOWN;
	}

	container = the_lnet.ln_msg_containers[cpt];
	my_slot =
		lnet_check_finalize_recursion_locked(msg,
					&container->msc_resending,
					container->msc_nfinalizers,
					container->msc_resenders);

	/* enough threads are resending */
	if (my_slot == -1) {
		lnet_net_unlock(cpt);
		return 0;
	}

	while ((msg = list_first_entry_or_null(&container->msc_resending,
					       struct lnet_msg,
					       msg_list)) != NULL) {
		list_del(&msg->msg_list);

		/*
		 * resending the message will require us to call
		 * lnet_msg_decommit_tx() which will return the credit
		 * which this message holds. This could trigger another
		 * queued message to be sent. If that message fails and
		 * requires a resend we will recurse.
		 * But since at this point the slot is taken, the message
		 * will be queued in the container and dealt with
		 * later. This breaks the recursion.
		 */
		lnet_resend_msg_locked(msg);
	}

	/*
	 * msc_resenders is an array of process pointers. Each entry holds
	 * a pointer to the current process operating on the message. An
	 * array entry is created per CPT. If the array slot is already
	 * set, then it means that there is a thread on the CPT currently
	 * resending a message.
	 * Once the thread finishes clear the slot to enable the thread to
	 * take on more resend work.
	 */
	container->msc_resenders[my_slot] = NULL;
	lnet_net_unlock(cpt);

	return 0;
}

/*
 * Do a health check on the message:
 * return -1 if we're not going to handle the error or
 *   if we've reached the maximum number of retries.
 *   success case will return -1 as well
 * return 0 if it the message is requeued for send
 */
static int
lnet_health_check(struct lnet_msg *msg)
{
	enum lnet_msg_hstatus hstatus = msg->msg_health_status;
	struct lnet_peer_ni *lpni;
	struct lnet_ni *ni;
	bool lo = false;
	bool attempt_local_resend;
	bool attempt_remote_resend;
	bool handle_remote_health = true;
	ktime_t now;
	int cpt;

	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);

	/*
	 * if we're sending to the LOLND then the msg_txpeer will not be
	 * set. So no need to sanity check it.
	 */
	if (msg->msg_tx_committed &&
	    !nid_is_lo0(&msg->msg_txni->ni_nid))
		LASSERT(msg->msg_txpeer);
	else if (msg->msg_tx_committed &&
		 nid_is_lo0(&msg->msg_txni->ni_nid))
		lo = true;

	/*
	 * always prefer txni/txpeer if they message is committed for both
	 * directions.
	 */
	if (msg->msg_tx_committed) {
		ni = msg->msg_txni;
		lpni = msg->msg_txpeer;
		attempt_local_resend = attempt_remote_resend = true;
	} else {
		ni = msg->msg_rxni;
		lpni = msg->msg_rxpeer;
		attempt_local_resend = attempt_remote_resend = false;
	}

	if (!lo)
		LASSERT(ni && lpni);
	else
		LASSERT(ni);

	now = ktime_get();
	if (ktime_after(now, msg->msg_deadline)) {
		s64 time = ktime_to_ns(ktime_sub(now, msg->msg_deadline));

		atomic64_add(time, &the_lnet.ln_late_msg_nsecs);
		atomic_inc(&the_lnet.ln_late_msg_count);

		if (hstatus != LNET_MSG_STATUS_OK)
			return -1;
	}

	CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
	       libcfs_nidstr(&ni->ni_nid),
	       (lo) ? "self" : libcfs_nidstr(&lpni->lpni_nid),
	       lnet_msgtyp2str(msg->msg_type),
	       lnet_health_error2str(hstatus));

	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
	lnet_net_lock(cpt);

	/* if we're shutting down no point in handling health. */
	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
		lnet_net_unlock(cpt);
		return -1;
	}

	lnet_incr_hstats(ni, lpni, hstatus, msg->msg_retry_count, cpt);

	/* Whether to update health values or perform resends is only applicable
	 * for messages with a health status != OK.
	 */
	if (hstatus != LNET_MSG_STATUS_OK) {
		struct lnet_ping_info *pi;

		/* For local failures, health/recovery/resends are not needed if
		 * I only have a single (non-lolnd) interface.
		 */
		pi = &the_lnet.ln_ping_target->pb_info;
		if (lnet_ping_at_least_two_entries(pi))
			attempt_local_resend = false;

		/* For remote failures, health/recovery/resends are not needed
		 * if the peer only has a single interface. Special case for
		 * routers where we rely on health feature to manage route and
		 * peer aliveness. NB: unlike pb_nnis above, lp_nnis does _not_
		 * include the lolnd, so a single-rail node would have
		 * lp_nnis == 1.
		 */
		if (lpni && lpni->lpni_peer_net &&
		    lpni->lpni_peer_net->lpn_peer &&
		    lpni->lpni_peer_net->lpn_peer->lp_nnis <= 1) {
			attempt_remote_resend = false;
			if (!(lnet_isrouter(lpni) || lnet_routing_enabled()))
				handle_remote_health = false;
		}
		/* Do not put my interfaces into peer NI recovery. They should
		 * be handled with local NI recovery.
		 */
		if (handle_remote_health && lpni &&
		    lnet_nid_to_ni_locked(&lpni->lpni_nid, cpt))
			handle_remote_health = false;
	}

	lnet_net_unlock(cpt);

	switch (hstatus) {
	case LNET_MSG_STATUS_OK:
		/*
		 * increment the local ni health whether we successfully
		 * received or sent a message on it.
		 *
		 * Ping counts are reset to 0 as appropriate to allow for
		 * faster recovery.
		 */
		lnet_inc_ni_healthv(ni);

		/*
		 * It's possible msg_txpeer is NULL in the LOLND
		 * case. Only increment the peer's health if we're
		 * receiving a message from it. It's the only sure way to
		 * know that a remote interface is up.
		 * If this interface is part of a router, then take that
		 * as indication that the router is fully healthy.
		 */
		if (lpni && msg->msg_rx_committed) {
			lnet_net_lock(0);
			lpni->lpni_ping_count = 0;
			ni->ni_ping_count = 0;
			/*
			 * If we're receiving a message from the router or
			 * I'm a router, then set that lpni's health to
			 * maximum so we can commence communication
			 */
			if ((lnet_isrouter(lpni) || lnet_routing_enabled()) &&
			     likely(!CFS_FAIL_CHECK(CFS_FAIL_RTR_HEALTH_INC))) {
				lnet_set_lpni_healthv_locked(lpni,
					LNET_MAX_HEALTH_VALUE);
			} else {
				lnet_inc_lpni_healthv_locked(lpni);
				/* This peer NI may have previously aged out
				 * of recovery. Now that we've received a
				 * message from it, we can continue recovery
				 * if its health value is still below the
				 * maximum.
				 */
				lnet_peer_ni_add_to_recoveryq_locked(lpni,
						&the_lnet.ln_mt_peerNIRecovq,
						ktime_get_seconds());
			}
			lnet_net_unlock(0);
		}

		/* we can finalize this message */
		return -1;
	case LNET_MSG_STATUS_LOCAL_INTERRUPT:
	case LNET_MSG_STATUS_LOCAL_DROPPED:
	case LNET_MSG_STATUS_LOCAL_ABORTED:
	case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
	case LNET_MSG_STATUS_LOCAL_TIMEOUT:
		lnet_handle_local_failure(ni);
		if (attempt_local_resend)
			return lnet_attempt_msg_resend(msg);
		break;
	case LNET_MSG_STATUS_LOCAL_ERROR:
		lnet_handle_local_failure(ni);
		return -1;
	case LNET_MSG_STATUS_REMOTE_DROPPED:
		if (handle_remote_health)
			lnet_handle_remote_failure(lpni);
		if (attempt_remote_resend)
			return lnet_attempt_msg_resend(msg);
		break;
	case LNET_MSG_STATUS_REMOTE_ERROR:
	case LNET_MSG_STATUS_REMOTE_TIMEOUT:
		if (handle_remote_health)
			lnet_handle_remote_failure(lpni);
		return -1;
	case LNET_MSG_STATUS_NETWORK_TIMEOUT:
		if (handle_remote_health)
			lnet_handle_remote_failure(lpni);
		lnet_handle_local_failure(ni);
		return -1;
	default:
		LBUG();
	}

	/* no resend is needed */
	return -1;
}

static void
lnet_msg_detach_md(struct lnet_msg *msg, int status)
{
	struct lnet_libmd *md = msg->msg_md;
	lnet_handler_t handler = NULL;
	int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
	int unlink;

	lnet_res_lock(cpt);
	while (md->md_flags & LNET_MD_FLAG_HANDLING)
		/* An event handler is running - wait for it to
		 * complete to avoid races.
		 */
		lnet_md_wait_handling(md, cpt);

	/* Now it's safe to drop my caller's ref */
	md->md_refcount--;
	LASSERT(md->md_refcount >= 0);

	unlink = lnet_md_unlinkable(md);
	if (md->md_handler) {
		if ((md->md_flags & LNET_MD_FLAG_ABORTED) && !status) {
			msg->msg_ev.status   = -ETIMEDOUT;
			CDEBUG(D_NET, "md 0x%p already unlinked\n", md);
		} else {
			msg->msg_ev.status   = status;
		}
		msg->msg_ev.unlinked = unlink;
		handler = md->md_handler;
		if (!unlink)
			md->md_flags |= LNET_MD_FLAG_HANDLING;
	}

	if (unlink || (md->md_refcount == 0 &&
		       md->md_threshold == LNET_MD_THRESH_INF))
		lnet_detach_rsp_tracker(md, cpt);

	msg->msg_md = NULL;
	if (unlink)
		lnet_md_unlink(md);

	lnet_res_unlock(cpt);

	if (handler) {
		handler(&msg->msg_ev);
		if (!unlink) {
			lnet_res_lock(cpt);
			md->md_flags &= ~LNET_MD_FLAG_HANDLING;
			wake_up_var(md);
			lnet_res_unlock(cpt);
		}
	}
}

static bool
lnet_is_health_check(struct lnet_msg *msg)
{
	bool hc = true;
	int status = msg->msg_ev.status;

	if ((!msg->msg_tx_committed && !msg->msg_rx_committed) ||
	    !msg->msg_onactivelist) {
		CDEBUG(D_NET, "msg %p not committed for send or receive\n",
		       msg);
		return false;
	}

	if ((msg->msg_tx_committed && !msg->msg_txpeer) ||
	    (msg->msg_rx_committed && !msg->msg_rxpeer)) {
		/* The optimized GET case does not set msg_rxpeer, but status
		 * could be zero. Only print the error message if we have a
		 * non-zero status.
		 */
		if (status)
			CDEBUG(D_NET, "msg %p status %d cannot retry\n", msg,
			       status);
		return false;
	}

	/* Check for status inconsistencies */
	if ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
	     (status && msg->msg_health_status == LNET_MSG_STATUS_OK)) {
		CDEBUG(D_NET, "Msg %p is in inconsistent state, don't perform health "
		       "checking (%d, %d)\n", msg, status,
		       msg->msg_health_status);
		hc = false;
	}

	CDEBUG(D_NET, "health check = %d, status = %d, hstatus = %d\n",
	       hc, status, msg->msg_health_status);

	return hc;
}

char *
lnet_health_error2str(enum lnet_msg_hstatus hstatus)
{
	switch (hstatus) {
	case LNET_MSG_STATUS_LOCAL_INTERRUPT:
		return "LOCAL_INTERRUPT";
	case LNET_MSG_STATUS_LOCAL_DROPPED:
		return "LOCAL_DROPPED";
	case LNET_MSG_STATUS_LOCAL_ABORTED:
		return "LOCAL_ABORTED";
	case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
		return "LOCAL_NO_ROUTE";
	case LNET_MSG_STATUS_LOCAL_TIMEOUT:
		return "LOCAL_TIMEOUT";
	case LNET_MSG_STATUS_LOCAL_ERROR:
		return "LOCAL_ERROR";
	case LNET_MSG_STATUS_REMOTE_DROPPED:
		return "REMOTE_DROPPED";
	case LNET_MSG_STATUS_REMOTE_ERROR:
		return "REMOTE_ERROR";
	case LNET_MSG_STATUS_REMOTE_TIMEOUT:
		return "REMOTE_TIMEOUT";
	case LNET_MSG_STATUS_NETWORK_TIMEOUT:
		return "NETWORK_TIMEOUT";
	case LNET_MSG_STATUS_OK:
		return "OK";
	default:
		return "<UNKNOWN>";
	}
}

bool
lnet_send_error_simulation(struct lnet_msg *msg,
			   enum lnet_msg_hstatus *hstatus)
{
	if (!msg)
		return false;

	if (list_empty(&the_lnet.ln_drop_rules))
	    return false;

	/* match only health rules */
	if (!lnet_drop_rule_match(&msg->msg_hdr, NULL, hstatus))
		return false;

	CDEBUG(D_NET, "src %s(%s)->dst %s: %s simulate health error: %s\n",
		libcfs_nidstr(&msg->msg_hdr.src_nid),
		libcfs_nidstr(&msg->msg_txni->ni_nid),
		libcfs_nidstr(&msg->msg_hdr.dest_nid),
		lnet_msgtyp2str(msg->msg_type),
		lnet_health_error2str(*hstatus));

	return true;
}
EXPORT_SYMBOL(lnet_send_error_simulation);

void
lnet_finalize(struct lnet_msg *msg, int status)
{
	struct lnet_msg_container *container;
	int my_slot;
	int cpt;
	int rc;

	LASSERT(!in_interrupt());

	if (msg == NULL)
		return;

	msg->msg_ev.status = status;

	if (lnet_is_health_check(msg)) {
		/*
		 * Check the health status of the message. If it has one
		 * of the errors that we're supposed to handle, and it has
		 * not timed out, then
		 *	1. Decrement the appropriate health_value
		 *	2. queue the message on the resend queue

		 * if the message send is success, timed out or failed in the
		 * health check for any reason then we'll just finalize the
		 * message. Otherwise just return since the message has been
		 * put on the resend queue.
		 */
		if (!lnet_health_check(msg))
			return;
	}

	/*
	 * We're not going to resend this message so detach its MD and invoke
	 * the appropriate callbacks
	 */
	if (msg->msg_md != NULL)
		lnet_msg_detach_md(msg, status);

again:
	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
		/* not committed to network yet */
		LASSERT(!msg->msg_onactivelist);
		lnet_msg_free(msg);
		return;
	}

	/*
	 * NB: routed message can be committed for both receiving and sending,
	 * we should finalize in LIFO order and keep counters correct.
	 * (finalize sending first then finalize receiving)
	 */
	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
	lnet_net_lock(cpt);

	container = the_lnet.ln_msg_containers[cpt];

	/* Recursion breaker.  Don't complete the message here if I am (or
	 * enough other threads are) already completing messages */
	my_slot = lnet_check_finalize_recursion_locked(msg,
						&container->msc_finalizing,
						container->msc_nfinalizers,
						container->msc_finalizers);

	/* enough threads are resending */
	if (my_slot == -1) {
		lnet_net_unlock(cpt);
		return;
	}

	rc = 0;
	while ((msg = list_first_entry_or_null(&container->msc_finalizing,
					       struct lnet_msg,
					       msg_list)) != NULL) {
		list_del_init(&msg->msg_list);

		/* NB drops and regains the lnet lock if it actually does
		 * anything, so my finalizing friends can chomp along too */
		rc = lnet_complete_msg_locked(msg, cpt);
		if (rc != 0)
			break;
	}

	if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
		lnet_net_unlock(cpt);
		lnet_delay_rule_check();
		lnet_net_lock(cpt);
	}

	container->msc_finalizers[my_slot] = NULL;
	lnet_net_unlock(cpt);

	if (rc != 0)
		goto again;
}
EXPORT_SYMBOL(lnet_finalize);

void
lnet_msg_container_cleanup(struct lnet_msg_container *container)
{
	struct lnet_msg *msg;
	int count = 0;

	if (container->msc_init == 0)
		return;

	while ((msg = list_first_entry_or_null(&container->msc_active,
					       struct lnet_msg,
					       msg_activelist)) != NULL) {
		LASSERT(msg->msg_onactivelist);
		msg->msg_onactivelist = 0;
		list_del_init(&msg->msg_activelist);
		lnet_msg_free(msg);
		count++;
	}

	if (count > 0)
		CERROR("%d active msg on exit\n", count);

	if (container->msc_finalizers != NULL) {
		CFS_FREE_PTR_ARRAY(container->msc_finalizers,
				   container->msc_nfinalizers);
		container->msc_finalizers = NULL;
	}

	if (container->msc_resenders != NULL) {
		CFS_FREE_PTR_ARRAY(container->msc_resenders,
				   container->msc_nfinalizers);
		container->msc_resenders = NULL;
	}
	container->msc_init = 0;
}

int
lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
{
	int rc = 0;

	container->msc_init = 1;

	INIT_LIST_HEAD(&container->msc_active);
	INIT_LIST_HEAD(&container->msc_finalizing);
	INIT_LIST_HEAD(&container->msc_resending);

	/* number of CPUs */
	container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
	if (container->msc_nfinalizers == 0)
		container->msc_nfinalizers = 1;

	LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
			 container->msc_nfinalizers *
			 sizeof(*container->msc_finalizers));

	if (container->msc_finalizers == NULL) {
		CERROR("Failed to allocate message finalizers\n");
		lnet_msg_container_cleanup(container);
		return -ENOMEM;
	}

	LIBCFS_CPT_ALLOC(container->msc_resenders, lnet_cpt_table(), cpt,
			 container->msc_nfinalizers *
			 sizeof(*container->msc_resenders));

	if (container->msc_resenders == NULL) {
		CERROR("Failed to allocate message resenders\n");
		lnet_msg_container_cleanup(container);
		return -ENOMEM;
	}

	return rc;
}

void
lnet_msg_containers_destroy(void)
{
	struct lnet_msg_container *container;
	int	i;

	if (the_lnet.ln_msg_containers == NULL)
		return;

	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
		lnet_msg_container_cleanup(container);

	cfs_percpt_free(the_lnet.ln_msg_containers);
	the_lnet.ln_msg_containers = NULL;
}

int
lnet_msg_containers_create(void)
{
	struct lnet_msg_container *container;
	int	rc;
	int	i;

	the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
						      sizeof(*container));

	if (the_lnet.ln_msg_containers == NULL) {
		CERROR("Failed to allocate cpu-partition data for network\n");
		return -ENOMEM;
	}

	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
		rc = lnet_msg_container_setup(container, i);
		if (rc != 0) {
			lnet_msg_containers_destroy();
			return rc;
		}
	}

	return 0;
}