Viewing: kfilnd_tn.c

// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * kfilnd transaction and state machine processing.
 */

#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>
#include <linux/mempool.h>

static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;

/* Mempool for guaranteed allocation under memory pressure */
static mempool_t *tn_cache_mp;
static mempool_t *imm_buf_cache_mp;

static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
	if (cksum)
		return csum_fold(csum_partial(ptr, nob, 0));
	return NO_CHECKSUM;
}

static int kfilnd_tn_msgtype2size(struct kfilnd_msg *msg)
{
	const int hdr_size = offsetof(struct kfilnd_msg, proto);

	switch (msg->type) {
	case KFILND_MSG_IMMEDIATE:
		return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		if (msg->version == KFILND_MSG_VERSION_1)
			return hdr_size + sizeof(struct kfilnd_bulk_req_msg);
		else if (msg->version == KFILND_MSG_VERSION_2)
			return hdr_size + sizeof(struct kfilnd_bulk_req_msg_v2);
		fallthrough;
	default:
		return -1;
	}
}

static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = KFILND_MSG_VERSION;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Mesage version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_REQ;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = tn->tn_kp->kp_version;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Mesage version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_RSP;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = tn->tn_kp->kp_version;
	msg->type = tn->msg_type;
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	if (msg->version == KFILND_MSG_VERSION_1) {
		msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
			offsetof(struct kfilnd_msg, proto);

		/* Pack the protocol header and payload. */
		lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr,
				 &msg->proto.bulk_req.hdr);
		msg->proto.bulk_req.key = tn->tn_mr_key;
		msg->proto.bulk_req.response_rx = tn->tn_response_rx;
	} else {
		msg->nob = sizeof(struct kfilnd_bulk_req_msg_v2) +
			offsetof(struct kfilnd_msg, proto);

		/* Pack the protocol header and payload. */
		lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr,
				 &msg->proto.bulk_req_v2.kbrm2_hdr);
		msg->proto.bulk_req_v2.kbrm2_key = tn->tn_mr_key;
		msg->proto.bulk_req_v2.kbrm2_response_rx = tn->tn_response_rx;
		msg->proto.bulk_req_v2.kbrm2_session_key =
						tn->tn_kp->kp_local_session_key;
	}

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = tn->tn_kp->kp_version;
	msg->type = tn->msg_type;
	msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
				unsigned int nob)
{
	const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

	if (nob < hdr_size) {
		KFILND_EP_ERROR(ep, "Short message: %u", nob);
		return -EPROTO;
	}

	/* TODO: Support byte swapping on mixed endian systems. */
	if (msg->magic != KFILND_MSG_MAGIC) {
		KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
		return -EPROTO;
	}

	/* TODO: Allow for older versions. */
	if (msg->version > KFILND_MSG_VERSION) {
		KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
		return -EPROTO;
	}

	if (msg->nob > nob) {
		KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
				msg->nob);
		return -EPROTO;
	}

	/* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
	if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
		KFILND_EP_ERROR(ep, "Bad checksum");
		return -EPROTO;
	}

	if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
		KFILND_EP_ERROR(ep, "Bad destination nid: %s",
				libcfs_nid2str(msg->dstnid));
		return -EPROTO;
	}

	if (msg->srcnid == LNET_NID_ANY) {
		KFILND_EP_ERROR(ep, "Bad source nid: %s",
				libcfs_nid2str(msg->srcnid));
		return -EPROTO;
	}

	if (msg->nob < kfilnd_tn_msgtype2size(msg)) {
		KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
				msg_type_to_str(msg->type),
				msg->nob, kfilnd_tn_msgtype2size(msg));
		return -EPROTO;
	}

	switch ((enum kfilnd_msg_type)msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		if (msg->version == 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	case KFILND_MSG_HELLO_REQ:
	case KFILND_MSG_HELLO_RSP:
		if (msg->version != 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	default:
		CERROR("Unknown message type %x\n", msg->type);
		return -EPROTO;
	}
	return 0;
}

static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;
	s64 time;
	s64 cur;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

	time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts));
	atomic64_add(time, &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);

	do {
		cur = atomic64_read(&stat->max_duration);
		if (time <= cur)
			break;
	} while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

	do {
		cur = atomic64_read(&stat->min_duration);
		if (time >= cur)
			break;
	} while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}

static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
				   enum tn_states new_state)
{
	KFILND_TN_DEBUG(tn, "%s -> %s state change",
			tn_state_to_str(tn->tn_state),
			tn_state_to_str(new_state));

	kfilnd_tn_record_state_change(tn);

	tn->tn_state = new_state;
	tn->tn_state_ts = ktime_get();
}

static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
				    enum lnet_msg_hstatus hstatus)
{
	/* Only the first non-ok status will take. */
	if (tn->tn_status == 0) {
		KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
				status);
		tn->tn_status = status;
	}

	if (tn->hstatus == LNET_MSG_STATUS_OK) {
		KFILND_TN_DEBUG(tn, "%d -> %d health status change",
				tn->hstatus, hstatus);
		tn->hstatus = hstatus;
	}
}

static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
	return tn->tn_status != 0;
}

/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
				struct kfilnd_msg *rx_msg, int msg_size)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep = bufdesc->immed_end;
	bool alloc_msg = true;
	int rc;
	enum tn_events event = TN_EVENT_RX_HELLO;
	enum kfilnd_msg_type msg_type;

	/* Increment buf ref count for this work */
	atomic_inc(&bufdesc->immed_ref);

	/* Unpack the message */
	rc = kfilnd_tn_unpack_msg(ep, rx_msg, msg_size);
	if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
		kfilnd_ep_imm_buffer_put(bufdesc);
		KFILND_EP_ERROR(ep, "Failed to unpack message %d", rc);
		return;
	}

	msg_type = (enum kfilnd_msg_type)rx_msg->type;

	switch (msg_type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		event = TN_EVENT_RX_OK;
		fallthrough;
	case KFILND_MSG_HELLO_RSP:
		alloc_msg = false;
		fallthrough;
	case KFILND_MSG_HELLO_REQ:
		/* Context points to a received buffer and status is the length.
		 * Allocate a Tn structure, set its values, then launch the
		 * receive.
		 */
		tn = kfilnd_tn_alloc(ep->end_dev, ep->end_cpt,
				     rx_msg->srcnid, alloc_msg, false,
				     false);
		if (IS_ERR(tn)) {
			kfilnd_ep_imm_buffer_put(bufdesc);
			KFILND_EP_ERROR(ep,
				"Failed to allocate transaction struct: rc=%ld",
					PTR_ERR(tn));
			return;
		}

		tn->tn_rx_msg.msg = rx_msg;
		tn->tn_rx_msg.length = msg_size;
		tn->tn_posted_buf = bufdesc;
		tn->msg_type = msg_type;

		KFILND_EP_DEBUG(ep, "%s TN %p tmk %u",
				msg_type_to_str(tn->msg_type), tn,
				tn->tn_mr_key);
		break;

	default:
		KFILND_EP_ERROR(ep, "Unhandled kfilnd message type: %d",
				msg_type);
		LBUG();
	};

	kfilnd_tn_event_handler(tn, event, 0);
}

static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;
	s64 time;
	s64 cur;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

	time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts));
	atomic64_add(time, &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);

	do {
		cur = atomic64_read(&stat->max_duration);
		if (time <= cur)
			break;
	} while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

	do {
		cur = atomic64_read(&stat->min_duration);
		if (time >= cur)
			break;
	} while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}

/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * if the transaction lock has not been released, it will now be released,
 * transaction resources cleaned up, and LNet finalized will be called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
	if (!*tn_released) {
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
	}

	/* Release the reference on the multi-receive buffer. */
	if (tn->tn_posted_buf)
		kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

#ifdef HAVE_KFI_SGL
	if (tn->tn_sgt_mapped) {
		struct device *device = tn->tn_ep->end_dev->device;
		enum dma_data_direction dmadir = tn->tn_dmadir;
		int rc = 0;

		if (tn->tn_gpu)
			rc = lnet_rdma_unmap_sg(device, tn->tn_sgt.sgl,
						tn->tn_sgt.nents, dmadir);
		else
			dma_unmap_sgtable(device, &tn->tn_sgt, dmadir, 0);

		CDEBUG(D_NET,
		       "tn %p tn_sgt %p sgl %p dir %u orig_nents %u nents %u gpu %s rc %d\n",
		       tn, &tn->tn_sgt, tn->tn_sgt.sgl, tn->tn_dmadir,
		       tn->tn_sgt.orig_nents, tn->tn_sgt.nents,
		       tn->tn_gpu ? "y" : "n", rc);
	}
#endif

	/* Finalize LNet operation. */
	if (tn->tn_lntmsg) {
		tn->tn_lntmsg->msg_health_status = tn->hstatus;
		lnet_finalize(tn->tn_lntmsg, tn->tn_status);
	}

	if (tn->tn_getreply) {
		tn->tn_getreply->msg_health_status = tn->hstatus;
		lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
				       tn->tn_getreply,
				       tn->tn_status ? 0 : tn->tn_nob);
		lnet_finalize(tn->tn_getreply, tn->tn_status);
	}

	if (KFILND_TN_PEER_VALID(tn))
		kfilnd_peer_put(tn->tn_kp);

	kfilnd_tn_record_state_change(tn);
	kfilnd_tn_record_duration(tn);

	kfilnd_tn_free(tn);
}

/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have tagged received cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources may
 * be leaked.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
	int rc;

	/* Issue a cancel. A return code of zero means the operation issued an
	 * async cancel. A return code of -ENOENT means the tagged receive was
	 * not found. The assumption here is that a tagged send landed thus
	 * removing the tagged receive buffer from hardware. For both cases,
	 * async events should occur.
	 */
	rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
	if (rc != 0 && rc != -ENOENT) {
		KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
		return rc;
	}

	return 0;
}

static void kfilnd_tn_timeout_work(struct work_struct *work)
{
	struct kfilnd_transaction *tn =
		container_of(work, struct kfilnd_transaction, timeout_work);

	KFILND_TN_ERROR(tn, "Bulk operation timeout");
	kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
	struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

	queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
	return timer_delete(&tn->timeout_timer);
}

static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
	ktime_t remaining_time = max_t(ktime_t, 0,
				       tn->deadline - ktime_get_seconds());
	unsigned long expires = remaining_time * HZ + jiffies;

	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
		expires = jiffies;

	cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
			(unsigned long)tn, 0);
	mod_timer(&tn->timeout_timer, expires);
}

/*  The following are the state machine routines for the transactions. */
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
				       enum tn_events event, int status,
				       bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			return 0;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_SEND_FAILED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
			return -EINVAL;
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
		return -EINVAL;
	}
}

static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
					      enum tn_events event, int status,
					      bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		kfilnd_tn_pack_bulk_req(tn);

		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
			return 0;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_TAGGED_RECV_POSTED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay post send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		/* Need to transition to the TN_STATE_SEND_FAILED to cleanup
		 * posted tagged receive buffer.
		 */
		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

			/* Propogate TN_EVENT_INIT_BULK event to
			 * TN_STATE_SEND_FAILED handler.
			 */
			return kfilnd_tn_state_send_failed(tn, event, rc,
							   tn_released);
		}

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
		return -EINVAL;
	}
}

static bool kfilnd_tn_can_replay(struct kfilnd_transaction *tn,
				 enum tn_events event)
{
	if (event == TN_EVENT_INIT_IMMEDIATE)
		return true;

	if (event == TN_EVENT_INIT_BULK)
		return true;

	if (event == TN_EVENT_RX_OK && tn->tn_early_rx)
		return true;

	return false;
}

static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	struct kfilnd_msg *msg;
	int rc = 0;
	bool finalize = false;
	struct lnet_hdr hdr;
	struct lnet_nid srcnid;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	/* For new peers, send a hello request message and queue the true LNet
	 * message for replay.
	 */
	if (kfilnd_peer_needs_throttle(tn->tn_kp) &&
	    kfilnd_tn_can_replay(tn, event)) {
		if (kfilnd_peer_deleted(tn->tn_kp)) {
			/* We'll assign a NETWORK_TIMEOUT message health status
			 * below because we don't know why this peer was marked
			 * for removal
			 */
			rc = -ESTALE;
			KFILND_TN_DEBUG(tn, "Drop message to deleted peer");
		} else if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			/* We're throttling transactions to this peer until
			 * a handshake can be completed, but there is no HELLO
			 * currently in flight. This implies the HELLO has
			 * failed, and we should cancel this TN. Otherwise we
			 * are stuck waiting for the TN deadline.
			 *
			 * We assign NETWORK_TIMEOUT health status below because
			 * we do not know why the HELLO failed.
			 */
			rc = -ECANCELED;
			KFILND_TN_DEBUG(tn, "Cancel throttled TN");
		} else if (ktime_before(ktime_get_seconds(),
					tn->tn_replay_deadline)) {
			/* If the transaction replay deadline has not been met,
			 * then return -EAGAIN. This will cause this transaction
			 * event to be replayed. During this time, an async
			 * hello message from the peer should occur at which
			 * point we can resume sending new messages to this peer
			 */
			KFILND_TN_DEBUG(tn, "hello response pending");
			return -EAGAIN;
		} else {
			rc = -ETIMEDOUT;
		}

		kfilnd_tn_status_update(tn, rc,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	if ((event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK) &&
	    ktime_after(ktime_get_seconds(), tn->tn_replay_deadline)) {
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	if (CFS_FAIL_CHECK_VALUE(CFS_KFI_REPLAY_IDLE_EVENT, event))
		return -EAGAIN;

	switch (event) {
	case TN_EVENT_INIT_IMMEDIATE:
	case TN_EVENT_TX_HELLO:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		if (event == TN_EVENT_INIT_IMMEDIATE)
			kfilnd_tn_pack_immed_msg(tn);
		else
			kfilnd_tn_pack_hello_req(tn);

		/* Send immediate message. */
		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
			return 0;

		/* Need to TN_EVENT_INIT_IMMEDIATE event while in TN_STATE_IDLE
		 * state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			if (event == TN_EVENT_TX_HELLO)
				kfilnd_peer_clear_hello_state(tn->tn_kp);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_INIT_BULK:
		/* Post tagged receive buffer used to land bulk response. */
		rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

		switch (rc) {
		/* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

			/* Propogate TN_EVENT_INIT_BULK event to
			 * TN_STATE_TAGGED_RECV_POSTED handler.
			 */
			return kfilnd_tn_state_tagged_recv_posted(tn, event,
								  rc,
								  tn_released);

		/* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
		 * state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
					rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_RX_OK:
		if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
						       tn->tn_ep->end_cpt,
						       tn->tn_kp);
			if (rc)
				KFILND_TN_ERROR(tn,
						"Failed to send hello request: rc=%d",
						rc);
			rc = 0;
		}

		msg = tn->tn_rx_msg.msg;

		if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
			if (msg->type == KFILND_MSG_IMMEDIATE ||
			    msg->version == KFILND_MSG_VERSION_2) {
				tn->tn_early_rx = true;
				KFILND_TN_DEBUG(tn, "Replay early rx\n");
				return -EAGAIN;
			}
			KFILND_TN_ERROR(tn,
					"Dropping message from %s due to stale peer",
					libcfs_nid2str(tn->tn_kp->kp_nid));
			kfilnd_tn_status_update(tn, -EPROTO,
						LNET_MSG_STATUS_LOCAL_DROPPED);
			rc = 0;
			goto out;
		}

		LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);

		kfilnd_peer_alive(tn->tn_kp);

		/* Pass message up to LNet
		 * The TN will be reused in this call chain so we need to
		 * release the lock on the TN before proceeding.
		 */
		KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
				tn_state_to_str(tn->tn_state));

		/* TODO: Do not manually update this state change. */
		tn->tn_state = TN_STATE_IMM_RECV;
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
		lnet_nid4_to_nid(msg->srcnid, &srcnid);
		if (msg->type == KFILND_MSG_IMMEDIATE) {
			lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 0);
		} else {
			if (msg->version == KFILND_MSG_VERSION_1)
				lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
			else
				lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req_v2.kbrm2_hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 1);
		}

		/* If successful, transaction has been accepted by LNet and we
		 * cannot process the transaction anymore within this context.
		 */
		if (!rc)
			return 0;

		KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
		kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
		break;

	case TN_EVENT_RX_HELLO:
		msg = tn->tn_rx_msg.msg;

		kfilnd_peer_alive(tn->tn_kp);

		switch (msg->type) {
		case KFILND_MSG_HELLO_REQ:
			if (CFS_FAIL_CHECK(CFS_KFI_REPLAY_RX_HELLO_REQ)) {
				CDEBUG(D_NET, "Replay RX HELLO_REQ\n");
				return -EAGAIN;
			}

			if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
				rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
							       tn->tn_ep->end_cpt,
							       tn->tn_kp);
				if (rc)
					KFILND_TN_ERROR(tn,
							"Failed to send hello request: rc=%d",
							rc);
				rc = 0;
			}

			kfilnd_peer_process_hello(tn->tn_kp, msg);
			tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
			KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);

			kfilnd_tn_pack_hello_rsp(tn);

			/* Send immediate message. */
			rc = kfilnd_ep_post_send(tn->tn_ep, tn);
			switch (rc) {
			case 0:
				kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
				return 0;

			case -EAGAIN:
				KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
			break;

		case KFILND_MSG_HELLO_RSP:
			rc = 0;
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			finalize = true;
			break;

		default:
			KFILND_TN_ERROR(tn, "Invalid message type: %s",
					msg_type_to_str(msg->type));
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

out:
	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}

static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT || status == -EIO)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* RKEY is not involved in immediate sends, so no need to
		 * delete peer
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		if (tn->msg_type == KFILND_MSG_HELLO_REQ)
			kfilnd_peer_clear_hello_state(tn->tn_kp);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	int rc = 0;
	bool finalize = false;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_TAG_RMA:
	case TN_EVENT_SKIP_TAG_RMA:
		/* Release the buffer we received the request on. All relevant
		 * information to perform the RMA operation is stored in the
		 * transaction structure. This should be done before the RMA
		 * operation to prevent two contexts from potentially processing
		 * the same transaction.
		 *
		 * TODO: Prevent this from returning -EAGAIN.
		 */
		if (tn->tn_posted_buf) {
			kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
			tn->tn_posted_buf = NULL;
		}

		/* Update the KFI address to use the response RX context. */
		tn->tn_target_addr =
			kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
				    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
		KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		/* Initiate the RMA operation to push/pull the LNet payload or
		 * send a tagged message to finalize the bulk operation if the
		 * RMA operation should be skipped.
		 */
		if (event == TN_EVENT_INIT_TAG_RMA) {
			if (tn->sink_buffer)
				rc = kfilnd_ep_post_read(tn->tn_ep, tn);
			else
				rc = kfilnd_ep_post_write(tn->tn_ep, tn);

			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_RMA_COMP);
				return 0;

			/* Need to replay TN_EVENT_INIT_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged %s to %s(%#llx)",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged %s to %s(%#llx): rc=%d",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		} else {
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_OK);

			/* Since the LNet initiator has posted a unique tagged
			 * buffer specific for this LNet transaction and the
			 * LNet target has decide not to push/pull to/for the
			 * LNet initiator tagged buffer, a noop operation is
			 * done to this tagged buffer (i/e payload transfer size
			 * is zero). But, immediate data, which contains the
			 * LNet target status for the transaction, is sent to
			 * the LNet initiator. Immediate data only appears in
			 * the completion event at the LNet initiator and not in
			 * the tagged buffer.
			 */
			tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

			rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_COMP);
				return 0;

			/* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		}
		break;

	case TN_EVENT_RX_OK:
		finalize = true;
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}

static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
				     enum tn_events event, int status,
				     bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_OK:
		if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
		    CFS_FAIL_CHECK_RESET(CFS_KFI_FAIL_WAIT_SEND_COMP1,
					 CFS_KFI_FAIL_WAIT_SEND_COMP2 |
					 CFS_FAIL_ONCE))
			break;
		if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ ||
			     tn->msg_type == KFILND_MSG_BULK_GET_REQ) &&
		    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP3)) {
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
			kfilnd_tn_status_update(tn, -EIO, hstatus);
			/* Don't delete peer on debug/test path */
			kfilnd_peer_tn_failed(tn->tn_kp, -EIO, false);
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;
		}
		kfilnd_peer_alive(tn->tn_kp);
		kfilnd_tn_timeout_enable(tn);
		kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
		break;

	case TN_EVENT_TAG_RX_OK:
		if (status)
			kfilnd_tn_status_update(tn, status, LNET_MSG_STATUS_OK);

		kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
		if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
		    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP2)) {
			struct kfi_cq_err_entry fake_error = {
				.op_context = tn,
				.flags = KFI_MSG | KFI_SEND,
				.err = EIO,
			};

			kfilnd_ep_gen_fake_err(tn->tn_ep, &fake_error);
		}
		break;

	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* The bulk request message failed, however, there is an edge
		 * case where the last request packet of a message is received
		 * at the target successfully, but the corresponding response
		 * packet is repeatedly dropped. This results in the target
		 * generating a success completion event but the initiator
		 * generating an error completion event. Due to this, we have to
		 * delete the peer here to protect the RKEY.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, true);

		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			return 0;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_SEND_FAILED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		/* The target may hold a reference to the RKEY, so we need to
		 * delete the peer to protect it
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, true);
		kfilnd_tn_state_change(tn, TN_STATE_FAIL);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
					  enum tn_events event, int status,
					  bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;
	case TN_EVENT_TX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		/* The bulk request message was never queued so we do not need
		 * to delete the peer
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;
	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* This event occurrs at the target of a bulk LNetPut/Get.
		 * Since the target did not generate the RKEY, we needn't
		 * delete the peer.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
					 enum tn_events event, int status,
					 bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_OK:
		/* Status can be set for both TN_EVENT_TAG_RX_FAIL and
		 * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
		 * LNet target returned -ENODATA.
		 */
		if (status) {
			if (event == TN_EVENT_TAG_RX_FAIL)
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_LOCAL_ERROR);
			else
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_OK);
		}

		if (!kfilnd_tn_timeout_cancel(tn)) {
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
			return 0;
		}
		break;

	case TN_EVENT_TIMEOUT:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn,
					       TN_STATE_WAIT_TIMEOUT_TAG_COMP);
			return 0;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_WAIT_TAG_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* This event occurrs at the target of a bulk LNetPut/Get.
		 * Since the target did not generate the RKEY, we needn't
		 * delete the peer.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		/* Prior TN states will have deleted the peer if necessary */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_RX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		if (tn->tn_status != status) {
			KFILND_TN_DEBUG(tn, "%d -> %d status change",
					tn->tn_status, status);
			tn->tn_status = status;
		}
		if (tn->hstatus != LNET_MSG_STATUS_OK) {
			KFILND_TN_DEBUG(tn, "%d -> %d health status change",
					tn->hstatus, LNET_MSG_STATUS_OK);
			tn->hstatus = LNET_MSG_STATUS_OK;
		}
		break;

	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_CANCEL:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
						 enum tn_events event,
						 int status, bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_CANCEL:
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		/* We've cancelled locally, but the target may still have a ref
		 * on the RKEY. Delete the peer to protect it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, -ETIMEDOUT, true);
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		/* The initiator of a bulk LNetPut/Get eagerly sends the bulk
		 * request message to the target without ensuring the tagged
		 * receive buffer is posted. Thus, the target could be issuing
		 * kfi_write/read operations using the tagged receive buffer
		 * RKEY, and we need to delete this peer to protect the it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, true);
		break;

	case TN_EVENT_TAG_RX_OK:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	if (event == TN_EVENT_TIMEOUT) {
		kfilnd_tn_finalize(tn, tn_released);
	} else {
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int
(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
						       enum tn_events event,
						       int status,
						       bool *tn_released) = {
	[TN_STATE_IDLE] = kfilnd_tn_state_idle,
	[TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
	[TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
	[TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
	[TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
	[TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
	[TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
	[TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
	[TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
		kfilnd_tn_state_wait_timeout_tag_comp,
	[TN_STATE_FAIL] = kfilnd_tn_state_fail,
	[TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
	[TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
};

/**
 * kfilnd_tn_event_handler() - Update transaction state machine with an event.
 * @tn: Transaction to be updated.
 * @event: Transaction event.
 * @status: Errno status associated with the event.
 *
 * When the transaction event handler is first called on a new transaction, the
 * transaction is now own by the transaction system. This means that will be
 * freed by the system as the transaction is progressed through the state
 * machine.
 */
void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
			     enum tn_events event, int status)
{
	bool tn_released = false;
	int rc;

	if (!tn)
		return;

	mutex_lock(&tn->tn_lock);
	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
							  &tn_released);
	if (rc == -EAGAIN) {
		tn->replay_event = event;
		tn->replay_status = status;
		kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
	}

	if (!tn_released)
		mutex_unlock(&tn->tn_lock);
}

#ifdef HAVE_KFI_SGL
static void kfilnd_tn_sgt_free(struct kfilnd_transaction *tn)
{
	/* Restore orig_nents before free */
	tn->tn_sgt.orig_nents = tn->tn_sgt_alloc_nents;
	sg_free_table(&tn->tn_sgt);
}
#endif

/**
 * kfilnd_tn_free() - Free a transaction.
 */
void kfilnd_tn_free(struct kfilnd_transaction *tn)
{
	spin_lock(&tn->tn_ep->tn_list_lock);
	list_del(&tn->tn_entry);
	spin_unlock(&tn->tn_ep->tn_list_lock);

	KFILND_TN_DEBUG(tn, "Transaction freed");

	if (tn->tn_mr_key)
		kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);

#ifdef HAVE_KFI_SGL
	if (tn->tn_sgt_mapped)
		kfilnd_tn_sgt_free(tn);
#endif

	/* Free send message buffer if needed. */
	if (tn->tn_tx_msg.msg)
		mempool_free(tn->tn_tx_msg.msg, imm_buf_cache_mp);

	mempool_free(tn, tn_cache_mp);
}

/*
 * Allocation logic common to kfilnd_tn_alloc() and kfilnd_tn_alloc_for_hello().
 * @ep: The KFI LND endpoint to associate with the transaction.
 * @kp: The kfilnd peer to associate with the transaction.
 * See kfilnd_tn_alloc() for a description of the other fields
 * Note: Caller must have a reference on @kp
 */
static struct kfilnd_transaction *kfilnd_tn_alloc_common(struct kfilnd_ep *ep,
							 struct kfilnd_peer *kp,
							 bool alloc_msg,
							 bool is_initiator,
							 u16 key)
{
	struct kfilnd_transaction *tn;
	int rc;
	ktime_t tn_alloc_ts;

	tn_alloc_ts = ktime_get();

	tn = mempool_alloc(tn_cache_mp, GFP_NOFS);
	if (!tn) {
		rc = -ENOMEM;
		goto err;
	}
	/* mempool_alloc doesn't zero, so manually zero the structure */
	memset(tn, 0, sizeof(*tn));

	if (alloc_msg) {
		tn->tn_tx_msg.msg = mempool_alloc(imm_buf_cache_mp, GFP_NOFS);
		if (!tn->tn_tx_msg.msg) {
			rc = -ENOMEM;
			goto err_free_tn;
		}
	}

	tn->tn_mr_key = key;

	tn->tn_kp = kp;

	mutex_init(&tn->tn_lock);
	tn->tn_ep = ep;
	tn->tn_response_rx = ep->end_context_id;
	tn->tn_state = TN_STATE_IDLE;
	tn->hstatus = LNET_MSG_STATUS_OK;
	tn->deadline = ktime_get_seconds() + kfilnd_timeout();
	tn->tn_replay_deadline = ktime_sub(tn->deadline,
					   (kfilnd_timeout() / 2));
	tn->is_initiator = is_initiator;
	INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);

	/* Add the transaction to an endpoint.  This is like
	 * incrementing a ref counter.
	 */
	spin_lock(&ep->tn_list_lock);
	list_add_tail(&tn->tn_entry, &ep->tn_list);
	spin_unlock(&ep->tn_list_lock);

	tn->tn_alloc_ts = tn_alloc_ts;
	tn->tn_state_ts = ktime_get();

	KFILND_EP_DEBUG(ep, "TN %p allocated with tmk %u", tn, tn->tn_mr_key);

	return tn;

err_free_tn:
	mempool_free(tn, tn_cache_mp);
err:
	return ERR_PTR(rc);
}

static struct kfilnd_ep *kfilnd_dev_to_ep(struct kfilnd_dev *dev, int cpt)
{
	struct kfilnd_ep *ep;

	if (!dev)
		return ERR_PTR(-EINVAL);

	ep = dev->cpt_to_endpoint[cpt];
	if (!ep) {
		CWARN("%s used invalid cpt=%d\n",
		      libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
		ep = dev->kfd_endpoints[0];
	}

	return ep;
}

/**
 * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
 * @dev: KFI LND device used to look the KFI LND endpoint to associate with the
 * transaction.
 * @cpt: CPT of the transaction.
 * @target_nid: Target NID of the transaction.
 * @alloc_msg: Allocate an immediate message for the transaction.
 * @is_initiator: Is initiator of LNet transaction.
 * @need_key: Is transaction memory region key needed.
 *
 * During transaction allocation, each transaction is associated with a KFI LND
 * endpoint use to post data transfer operations. The CPT argument is used to
 * lookup the KFI LND endpoint within the KFI LND device.
 *
 * Return: On success, valid pointer. Else, negative errno pointer.
 */
struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
					   lnet_nid_t target_nid,
					   bool alloc_msg, bool is_initiator,
					   bool need_key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	struct kfilnd_peer *kp;
	int rc;
	u16 key = 0;

	ep = kfilnd_dev_to_ep(dev, cpt);
	if (IS_ERR(ep)) {
		rc = PTR_ERR(ep);
		goto err;
	}

	/* Consider the following:
	 * Thread 1: Posts tagged receive with RKEY based on
	 *           peerA::kp_local_session_key X and tn_mr_key Y
	 * Thread 2: Fetches peerA with kp_local_session_key X
	 * Thread 1: Cancels tagged receive, marks peerA for removal, and
	 *	     releases tn_mr_key Y
	 * Thread 2: allocates tn_mr_key Y
	 * At this point, thread 2 has the same RKEY used by thread 1.
	 * Thus, we always allocate the tn_mr_key before looking up the peer,
	 * and we always mark peers for removal before releasing tn_mr_key.
	 */
	if (need_key) {
		rc = kfilnd_ep_get_key(ep);
		if (rc < 0)
			goto err;
		key = rc;
	}

	kp = kfilnd_peer_get(dev, target_nid);
	if (IS_ERR(kp)) {
		rc = PTR_ERR(kp);
		goto err_put_key;
	}

	tn = kfilnd_tn_alloc_common(ep, kp, alloc_msg, is_initiator, key);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		kfilnd_peer_put(kp);
		goto err_put_key;
	}

	return tn;

err_put_key:
	if (need_key)
		kfilnd_ep_put_key(ep, key);
err:
	return ERR_PTR(rc);
}

/* Like kfilnd_tn_alloc(), but caller already looked up the kfilnd_peer.
 * Used only to allocate a TN for a hello request.
 * See kfilnd_tn_alloc()/kfilnd_tn_alloc_comm()
 * Note: Caller must have a reference on @kp
 */
struct kfilnd_transaction *kfilnd_tn_alloc_for_hello(struct kfilnd_dev *dev, int cpt,
						     struct kfilnd_peer *kp)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	int rc;

	ep = kfilnd_dev_to_ep(dev, cpt);
	if (IS_ERR(ep)) {
		rc = PTR_ERR(ep);
		goto err;
	}

	tn = kfilnd_tn_alloc_common(ep, kp, true, true, 0);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		goto err;
	}

	return tn;

err:
	return ERR_PTR(rc);
}

/**
 * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
 *
 * This function should only be called when there are no outstanding
 * transactions.
 */
void kfilnd_tn_cleanup(void)
{
	mempool_destroy(imm_buf_cache_mp);
	mempool_destroy(tn_cache_mp);
	kmem_cache_destroy(imm_buf_cache);
	kmem_cache_destroy(tn_cache);
}

/**
 * kfilnd_tn_get_mempool_stats() - Get mempool statistics.
 * @tn_min: Pointer to store transaction mempool min_nr
 * @tn_curr: Pointer to store transaction mempool curr_nr
 * @msg_min: Pointer to store message buffer mempool min_nr
 * @msg_curr: Pointer to store message buffer mempool curr_nr
 *
 * Return: 0 if mempools are initialized, -EINVAL otherwise.
 */
int kfilnd_tn_get_mempool_stats(int *tn_min, int *tn_curr,
				int *msg_min, int *msg_curr)
{
	if (!tn_cache_mp || !imm_buf_cache_mp)
		return -EINVAL;

	*tn_min = tn_cache_mp->min_nr;
	*tn_curr = tn_cache_mp->curr_nr;
	*msg_min = imm_buf_cache_mp->min_nr;
	*msg_curr = imm_buf_cache_mp->curr_nr;

	return 0;
}

/**
 * kfilnd_tn_init() - Initialize KFI LND transaction system.
 *
 * Return: On success, zero. Else, negative errno.
 */
int kfilnd_tn_init(void)
{
	int num_cpts = cfs_cpt_number(lnet_cpt_table());
	int min_tn, min_msg;
	int reserve_min, msg_min, credits;

#define KFILND_RESERVE_SAFETY_FACTOR 2

	/* Calculate reserves: peer_credits * num_cpts * safety_factor */
	reserve_min = kfilnd_get_tn_reserve_min();
	msg_min = kfilnd_get_msg_reserve_min();
	credits = kfilnd_get_peer_credits();

	if (reserve_min < 0)
		min_tn = credits * num_cpts * KFILND_RESERVE_SAFETY_FACTOR;
	else
		min_tn = reserve_min;

	if (msg_min < 0)
		min_msg = credits * num_cpts * KFILND_RESERVE_SAFETY_FACTOR;
	else
		min_msg = msg_min;

	CDEBUG(D_NET, "kfilnd: mempool reserves: %d transactions, %d buffers (peer_credits=%d, num_cpts=%d, safety=%d)\n",
	       min_tn, min_msg, credits, num_cpts,
	       KFILND_RESERVE_SAFETY_FACTOR);

	tn_cache = kmem_cache_create("kfilnd_tn",
				     sizeof(struct kfilnd_transaction), 0,
				     SLAB_HWCACHE_ALIGN, NULL);
	if (!tn_cache)
		goto err;

	imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
					  KFILND_IMMEDIATE_MSG_SIZE, 0,
					  SLAB_HWCACHE_ALIGN, NULL);
	if (!imm_buf_cache)
		goto err_tn_cache_destroy;

	tn_cache_mp = mempool_create_slab_pool(min_tn, tn_cache);
	if (!tn_cache_mp)
		goto err_imm_buf_cache_destroy;

	CDEBUG(D_NET, "Created tn_cache_mp with %d reserves\n", min_tn);

	imm_buf_cache_mp = mempool_create_slab_pool(min_msg, imm_buf_cache);
	if (!imm_buf_cache_mp)
		goto err_tn_cache_mp_destroy;

	CDEBUG(D_NET, "Created imm_buf_cache_mp with %d reserves\n", min_msg);

	/* Initialize debugfs mempool stats */
	debugfs_create_file("mempool_stats", 0444, kfilnd_debug_dir, NULL,
			    &kfilnd_mempool_stats_file_ops);

	return 0;

err_tn_cache_mp_destroy:
	mempool_destroy(tn_cache_mp);
err_imm_buf_cache_destroy:
	kmem_cache_destroy(imm_buf_cache);
err_tn_cache_destroy:
	kmem_cache_destroy(tn_cache);
err:
	return -ENOMEM;
}

#ifdef HAVE_KFI_SGL
/**
 * kfilnd_tn_set_sgl_buf - Set up scatter-gather list for transaction
 * @ni: LNet network interface
 * @tn: Transaction structure to configure
 * @kiov: Array of bio_vec structures describing the buffer
 * @num_iov: Number of elements in kiov array
 * @offset: Byte offset into the kiov array where data starts
 * @nob: Number of bytes to map
 *
 * This function creates and maps a scatter-gather table for DMA operations.
 * It handles both GPU and non-GPU buffers differently.
 *
 * Return: 0 on success, negative errno on failure
 */
static int kfilnd_tn_set_sgl_buf(struct lnet_ni *ni,
				 struct kfilnd_transaction *tn,
				 struct bio_vec *kiov, int num_iov, int offset,
				 int nob)
{
	struct kfilnd_dev *dev = ni->ni_data;
	struct scatterlist *sg;
	int fragnob;
	int max_nkiov;
	int sg_count = 0;
	int rc = 0;

	tn->tn_nob = nob;

	while (offset >= kiov->bv_len) {
		offset -= kiov->bv_len;
		num_iov--;
		kiov++;
		LASSERT(num_iov > 0);
	}

	max_nkiov = num_iov;
	rc = sg_alloc_table(&tn->tn_sgt, max_nkiov, GFP_NOFS);
	if (rc) {
		CERROR("%s: sg_alloc_table failed rc = %d\n",
		       ni->ni_interface, rc);
		return rc;
	}

	sg = tn->tn_sgt.sgl;
	do {
		LASSERT(num_iov > 0);

		if (!sg) {
			CERROR("%s: lacking enough sg entries to map tx: rc = %d\n",
			       ni->ni_interface, -EFAULT);
			sg_free_table(&tn->tn_sgt);
			return -EFAULT;
		}

		sg_count++;

		fragnob = min_t(int, (kiov->bv_len - offset), nob);

		sg_set_page(sg, kiov->bv_page, fragnob,
			    kiov->bv_offset + offset);
		sg = sg_next(sg);

		offset = 0;
		kiov++;
		num_iov--;
		nob -= fragnob;
	} while (nob > 0);

	tn->tn_dmadir = tn->sink_buffer ? DMA_FROM_DEVICE : DMA_TO_DEVICE;

	/* Save orig_nents so it can be restored prior to sg_free_table() */
	tn->tn_sgt_alloc_nents = tn->tn_sgt.orig_nents;

	/* dma_[un]map_sgtable() expects all orig_nents segments to be
	 * populated, but only sg_count of them are actually populated.
	 */
	tn->tn_sgt.orig_nents = sg_count;

	if (tn->tn_gpu) {
		rc = lnet_rdma_map_sg_attrs(dev->device, tn->tn_sgt.sgl,
					    sg_count, tn->tn_dmadir);
		if (rc > 0)
			tn->tn_sgt.nents = rc;
	} else {
		rc = dma_map_sgtable(dev->device, &tn->tn_sgt, tn->tn_dmadir,
				     0);
	}

	if (rc < 0) {
		CERROR("%s: %s failed rc = %d\n", ni->ni_interface,
		       tn->tn_gpu ? "lnet_rdma_map_sg_attrs" :
		       "dma_map_sgtable", rc);
		kfilnd_tn_sgt_free(tn);
		return rc;
	}

	/* Set tn_sgt_mapped so kfilnd_tn_free() will free tn_sgt */
	tn->tn_sgt_mapped = true;

	CDEBUG(D_NET,
	       "tn %p tn_sgt %p sgl %p dir %u nob %d alloc_nents %u orig_nents %u nents %u gpu %s\n",
	       tn, &tn->tn_sgt, tn->tn_sgt.sgl, tn->tn_dmadir, tn->tn_nob,
	       tn->tn_sgt_alloc_nents, tn->tn_sgt.orig_nents, tn->tn_sgt.nents,
	       tn->tn_gpu ? "y" : "n");

	return 0;
}

#else

static int kfilnd_tn_set_kiov_buf(struct lnet_ni *ni,
				  struct kfilnd_transaction *tn,
				  struct bio_vec *kiov, size_t num_iov,
				  size_t offset, size_t len)
{
	size_t i;
	size_t cur_len = 0;
	size_t cur_offset = offset;
	size_t cur_iov = 0;
	size_t tmp_len;
	size_t tmp_offset;

	for (i = 0; (i < num_iov) && (cur_len < len); i++) {
		/* Skip KIOVs until a KIOV with a length less than the current
		 * offset is found.
		 */
		if (kiov[i].bv_len <= cur_offset) {
			cur_offset -= kiov[i].bv_len;
			continue;
		}

		tmp_len = kiov[i].bv_len - cur_offset;
		tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;

		if (tmp_len + cur_len > len)
			tmp_len = len - cur_len;

		/* tn_kiov is an array of size LNET_MAX_IOV */
		if (cur_iov >= LNET_MAX_IOV)
			return -EINVAL;

		tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
		tn->tn_kiov[cur_iov].bv_len = tmp_len;
		tn->tn_kiov[cur_iov].bv_offset = tmp_offset;

		cur_iov++;
		cur_len += tmp_len;
		cur_offset = 0;
	}

	tn->tn_num_iovec = cur_iov;
	tn->tn_nob = cur_len;

	return 0;
}
#endif /* HAVE_KFI_SGL */

/**
 * kfilnd_tn_set_buf() - Set the buffer used for a transaction.
 * @tn: Transaction to have buffer set.
 * @kiov: LNet KIOV buffer.
 * @num_iov: Number of IOVs.
 * @offset: Offset into IOVs where the buffer starts.
 * @len: Length of the buffer.
 *
 * This function takes the user provided IOV, offset, and len, and sets the
 * transaction buffer. The user provided IOV is an LNet KIOV. When the
 * transaction buffer is configured, the user provided offset is applied
 * when the transaction buffer is configured (i.e. the transaction buffer
 * offset is zero).
 */
int kfilnd_tn_set_buf(struct lnet_ni *ni, struct kfilnd_transaction *tn,
		      struct bio_vec *kiov, int num_iov, int offset, int nob)
{
#ifdef HAVE_KFI_SGL
	return kfilnd_tn_set_sgl_buf(ni, tn, kiov, num_iov, offset, nob);
#else
	return kfilnd_tn_set_kiov_buf(ni, tn, kiov, num_iov, offset, nob);
#endif
}