Viewing: kfilnd_cq.c

// SPDX-License-Identifier: GPL-2.0

/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */

/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * kfilnd completion queue.
 */

#include <linux/idr.h>
#include <linux/mutex.h>
#include <linux/byteorder/generic.h>

#include "kfilnd_cq.h"
#include "kfilnd_tn.h"
#include "kfilnd_ep.h"

void kfilnd_cq_process_error(struct kfilnd_ep *ep,
			     struct kfi_cq_err_entry *error)
{
	struct kfilnd_immediate_buffer *buf;
	struct kfilnd_transaction *tn;
	enum tn_events tn_event;
	int status;

	switch (error->flags) {
	case KFI_MSG | KFI_RECV:
		if (error->err != ECANCELED) {
			KFILND_EP_ERROR(ep, "Dropping error receive event %d",
					-error->err);
			return;
		}
		fallthrough;
	case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
		buf = error->op_context;
		kfilnd_ep_imm_buffer_put(buf);
		return;

	case KFI_TAGGED | KFI_RECV:
	case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
		tn = error->op_context;
		if (error->err == ECANCELED) {
			tn_event = TN_EVENT_TAG_RX_CANCEL;
			status = 0;
		} else {
			tn_event = TN_EVENT_TAG_RX_FAIL;
			status = -error->err;
		}
		break;

	case KFI_MSG | KFI_SEND:
		tn = error->op_context;
		tn_event = TN_EVENT_TX_FAIL;
		status = -error->err;
		KFILND_EP_ERROR(ep,
				"msg send error %d prov error %d flags %llx",
				status, -error->prov_errno, error->flags);

		break;

	case KFI_TAGGED | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
		tn = error->op_context;
		tn_event = TN_EVENT_TAG_TX_FAIL;
		status = -error->err;
		KFILND_EP_ERROR(ep,
				"tagged error %d prov error %d flags %llx",
				status, -error->prov_errno, error->flags);
		break;

	default:
		LBUG();
	}

	kfilnd_tn_event_handler(tn, tn_event, status);
}

static void kfilnd_cq_process_event(struct kfi_cq_data_entry *event)
{
	struct kfilnd_immediate_buffer *buf;
	struct kfilnd_msg *rx_msg;
	struct kfilnd_transaction *tn;
	enum tn_events tn_event;
	int64_t status = 0;

	switch (event->flags) {
	case KFI_MSG | KFI_RECV:
	case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
		buf = event->op_context;
		rx_msg = event->buf;

		kfilnd_tn_process_rx_event(buf, rx_msg, event->len);

		/* If the KFI_MULTI_RECV flag is set, the buffer was
		 * unlinked.
		 */
		if (event->flags & KFI_MULTI_RECV)
			kfilnd_ep_imm_buffer_put(buf);
		return;

	case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
		status = -1 * (int64_t)be64_to_cpu(event->data);
		fallthrough;
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
		tn_event = TN_EVENT_TAG_RX_OK;
		tn = event->op_context;
		break;

	case KFI_TAGGED | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
		tn = event->op_context;
		tn_event = TN_EVENT_TAG_TX_OK;
		break;

	case KFI_MSG | KFI_SEND:
		tn = event->op_context;
		tn_event = TN_EVENT_TX_OK;
		break;

	default:
		LBUG();
	}

	kfilnd_tn_event_handler(tn, tn_event, status);
}

static void kfilnd_cq_process_completion(struct work_struct *work)
{
	struct kfilnd_cq_work *cq_work =
		container_of(work, struct kfilnd_cq_work, work);
	struct kfilnd_cq *kfilnd_cq = cq_work->cq;
	struct kfid_cq *cq = kfilnd_cq->cq;
	struct kfi_cq_data_entry event;
	struct kfi_cq_err_entry error;
	ssize_t rc;
	bool done = false;

	/* Drain the KFI completion queue of all events and errors. */
	while (!done) {
		rc = kfi_cq_read(cq, &event, 1);
		if (rc == -KFI_EAVAIL) {
			while (kfi_cq_readerr(cq, &error, 0) == 1)
				kfilnd_cq_process_error(kfilnd_cq->ep, &error);
		} else if (rc == 1) {
			kfilnd_cq_process_event(&event);
		} else if (rc == -EAGAIN) {
			done = true;
		} else {
			KFILND_EP_ERROR(kfilnd_cq->ep, "Unexpected rc = %ld",
					rc);
			done = true;
		}
	}

	if (kfilnd_ep_replays_pending(kfilnd_cq->ep))
		kfilnd_ep_flush_replay_queue(kfilnd_cq->ep);
}

static void kfilnd_cq_completion(struct kfid_cq *cq, void *context)
{
	struct kfilnd_cq *kfilnd_cq = context;
	struct kfilnd_cq_work *cq_work;
	unsigned int i;
	unsigned int start_count;

	/* kcxi provider queues on signaling vector (index 0 cpu),
	 * optionally don't queue kfilnd on that cpu
	 */
	start_count = kfilnd_cq->cq_work_count > 1 &&
			prov_cpu_exclusive ? 1 : 0;
	for (i = start_count; i < kfilnd_cq->cq_work_count; i++) {
		cq_work = &kfilnd_cq->cq_works[i];
		queue_work_on(cq_work->work_cpu, kfilnd_wq, &cq_work->work);
	}
}

#define CQ_ALLOC_SIZE(cpu_count) \
	(sizeof(struct kfilnd_cq) + \
	 (sizeof(struct kfilnd_cq_work) * (cpu_count)))

struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep,
				  struct kfi_cq_attr *attr)
{
	struct kfilnd_cq *cq;
	cpumask_var_t *cpu_mask;
	int rc;
	unsigned int cpu_count = 0;
	unsigned int cpu;
	unsigned int i;
	size_t alloc_size;
	struct kfilnd_cq_work *cq_work;

	cpu_mask = cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt);
	for_each_cpu(cpu, *cpu_mask)
		cpu_count++;

	alloc_size = CQ_ALLOC_SIZE(cpu_count);
	LIBCFS_CPT_ALLOC(cq, lnet_cpt_table(), ep->end_cpt, alloc_size);
	if (!cq) {
		rc = -ENOMEM;
		KFILND_EP_ERROR(ep, "Failed to allocate memory: rc=%d", rc);
		goto err;
	}

	memset(cq, 0, alloc_size);

	rc = kfi_cq_open(ep->end_dev->dom->domain, attr, &cq->cq,
			 kfilnd_cq_completion, cq);
	if (rc) {
		KFILND_EP_ERROR(ep, "Failed to open KFI CQ: rc=%d", rc);
		goto err_free_kfilnd_cq;
	}

	i = 0;
	for_each_cpu(cpu, *cpu_mask) {
		cq_work = &cq->cq_works[i];
		cq_work->cq = cq;
		cq_work->work_cpu = cpu;
		INIT_WORK(&cq_work->work, kfilnd_cq_process_completion);
		i++;
	}

	cq->ep = ep;
	cq->cq_work_count = cpu_count;

	return cq;

err_free_kfilnd_cq:
	LIBCFS_FREE(cq, alloc_size);
err:
	return ERR_PTR(rc);
}

void kfilnd_cq_free(struct kfilnd_cq *cq)
{
	flush_workqueue(kfilnd_wq);
	kfi_close(&cq->cq->fid);
	LIBCFS_FREE(cq, CQ_ALLOC_SIZE(cq->cq_work_count));
}