Viewing: kfilnd_ep.c
// SPDX-License-Identifier: GPL-2.0
/*
* Copyright 2022 Hewlett Packard Enterprise Development LP
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*
* kfilnd endpoint implementation.
*/
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_tn.h"
#include "kfilnd_cq.h"
/**
 * kfilnd_ep_post_recv() - Post a single immediate receive buffer.
 * @ep: KFI LND endpoint the receive buffer is posted on.
 * @buf: Immediate receive buffer to post.
 *
 * A reference on @buf is taken before calling kfi_recv() and dropped again
 * if the post fails. Buffers flagged with immed_no_repost are skipped and
 * reported as successfully posted.
 *
 * Return: On success, zero. Else, negative errno.
 */
static int kfilnd_ep_post_recv(struct kfilnd_ep *ep,
			       struct kfilnd_immediate_buffer *buf)
{
	int ret;

	if (!ep || !buf)
		return -EINVAL;

	/* Buffer must not be reposted (set by cancel); report success. */
	if (buf->immed_no_repost)
		return 0;

	/* Fault injection hooks. */
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV))
		return -EIO;
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV_EAGAIN))
		return -EAGAIN;

	/* The posted buffer holds a reference until it is consumed. */
	atomic_inc(&buf->immed_ref);
	ret = kfi_recv(ep->end_rx, buf->immed_buf, buf->immed_buf_size, NULL,
		       KFI_ADDR_UNSPEC, buf);
	if (ret != 0)
		atomic_dec(&buf->immed_ref);

	return ret;
}
#define KFILND_EP_REPLAY_TIMER_MSEC (100U)
/**
* kfilnd_ep_imm_buffer_put() - Decrement the immediate buffer count reference
* counter.
* @buf: Immediate buffer to have reference count decremented.
*
* If the immediate buffer's reference count reaches zero, the buffer will
* automatically be reposted.
*/
void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf)
{
unsigned long expires;
int rc;
if (!buf)
return;
if (atomic_sub_return(1, &buf->immed_ref) != 0)
return;
rc = kfilnd_ep_post_recv(buf->immed_end, buf);
switch (rc) {
case 0:
break;
/* Return the buffer reference and queue the immediate buffer put to be
* replayed.
*/
case -EAGAIN:
expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
jiffies;
atomic_inc(&buf->immed_ref);
spin_lock(&buf->immed_end->replay_lock);
list_add_tail(&buf->replay_entry,
&buf->immed_end->imm_buffer_replay);
atomic_inc(&buf->immed_end->replay_count);
spin_unlock(&buf->immed_end->replay_lock);
if (!timer_pending(&buf->immed_end->replay_timer))
mod_timer(&buf->immed_end->replay_timer, expires);
break;
/* Unexpected error resulting in immediate buffer not being able to be
* posted. Since immediate buffers are used to sink incoming messages,
* failure to post immediate buffers means failure to communicate.
*
* TODO: Prevent LNet NI from doing sends/recvs?
*/
default:
KFILND_EP_ERROR(buf->immed_end,
"Failed to post immediate receive buffer: rc=%d",
rc);
}
}
/**
 * kfilnd_ep_post_imm_buffers() - Post every immediate receive buffer.
 * @ep: KFI LND endpoint whose immediate receive buffers are posted.
 *
 * Intended to be called only during KFI LND device initialization. Posting
 * stops at the first failure; buffers posted before that are left posted.
 *
 * Return: On success, zero. Else, the negative errno of the failing post.
 */
int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep)
{
	int idx;
	int rc;

	if (!ep)
		return -EINVAL;

	for (idx = 0; idx < immediate_rx_buf_count; idx++) {
		rc = kfilnd_ep_post_recv(ep, &ep->end_immed_bufs[idx]);
		if (rc)
			return rc;
	}

	return 0;
}
/**
* kfilnd_ep_cancel_imm_buffers() - Cancel all immediate receive buffers.
* @ep: KFI LND endpoint to have receive buffers canceled.
*/
void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep)
{
int i;
if (!ep)
return;
for (i = 0; i < immediate_rx_buf_count; i++) {
ep->end_immed_bufs[i].immed_no_repost = true;
/* Since this is called during LNet NI teardown, no need to
* pipeline retries. Just spin until -EAGAIN is not returned.
*/
while (kfi_cancel(&ep->end_rx->fid, &ep->end_immed_bufs[i]) ==
-EAGAIN)
schedule();
}
}
/*
 * Workqueue handler for an injected completion error: hand the saved error
 * entry to kfilnd_cq_process_error(), then free the work item.
 */
static void kfilnd_ep_err_fail_loc_work(struct work_struct *work)
{
	struct kfilnd_ep_err_fail_loc_work *fake_err;

	fake_err = container_of(work, struct kfilnd_ep_err_fail_loc_work,
				work);
	kfilnd_cq_process_error(fake_err->ep, &fake_err->err);
	kfree(fake_err);
}
/**
 * kfilnd_ep_gen_fake_err() - Inject a fake completion queue error.
 * @ep: KFI LND endpoint the error is reported against.
 * @err: Error entry to report. It is copied, so it need not outlive the call.
 *
 * The copied error is processed asynchronously on kfilnd_wq by
 * kfilnd_ep_err_fail_loc_work(). The work item is allocated with GFP_KERNEL.
 *
 * Return: 0 if the error was queued, -ENOMEM if allocation failed.
 */
int kfilnd_ep_gen_fake_err(struct kfilnd_ep *ep,
			   const struct kfi_cq_err_entry *err)
{
	struct kfilnd_ep_err_fail_loc_work *work_item =
		kmalloc(sizeof(*work_item), GFP_KERNEL);

	if (!work_item)
		return -ENOMEM;

	work_item->ep = ep;
	work_item->err = *err;
	INIT_WORK(&work_item->work, kfilnd_ep_err_fail_loc_work);
	queue_work(kfilnd_wq, &work_item->work);

	return 0;
}
/*
 * Build the initiator-side tag (used for tagged sends and as the RMA key):
 * the response session key shifted up by KFILND_EP_KEY_BITS, OR'd with the
 * response memory region key.
 *
 * NOTE(review): the shift is evaluated in the promoted type of
 * tn_response_session_key, not uint64_t. Confirm that type is wide enough;
 * changing the width would alter the tag seen by peers.
 */
static uint64_t gen_init_tag_bits(struct kfilnd_transaction *tn)
{
	uint64_t tag_bits = (tn->tn_response_session_key << KFILND_EP_KEY_BITS) |
			    tn->tn_response_mr_key;

	return tag_bits;
}
/**
 * kfilnd_ep_post_tagged_send() - Post a tagged send operation.
 * @ep: KFI LND endpoint used to post the tagged send operation.
 * @tn: Transaction whose tagged data is sent.
 *
 * No payload buffer is sent: the operation carries only tn->tagged_data. It is
 * addressed to tn->tn_target_addr with the tag built by gen_init_tag_bits()
 * from the response session key and response memory region key. @tn is the
 * operation context.
 *
 * Return: On success, zero. Else, negative errno value.
 */
int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep,
			       struct kfilnd_transaction *tn)
{
	struct kfi_cq_err_entry fake_error = {
		.op_context = tn,
		.flags = KFI_TAGGED | KFI_SEND,
		.err = EIO,
	};
	int rc;

	if (!ep || !tn)
		return -EINVAL;

	/* Refuse to post while the device is not fully initialized. */
	if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
		return -EINVAL;

	/* Fault injection. A fake completion error is delivered
	 * asynchronously; if it cannot be queued, fall through to a real post.
	 */
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EVENT)) {
		if (kfilnd_ep_gen_fake_err(ep, &fake_error) == 0)
			return 0;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND)) {
		return -EIO;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EAGAIN)) {
		return -EAGAIN;
	}

	KFILND_TN_DEBUG(tn, "tagged_data: %llu tn_status: %d\n",
			tn->tagged_data, tn->tn_status);

	rc = kfi_tsenddata(ep->end_tx, NULL, 0, NULL, tn->tagged_data,
			   tn->tn_target_addr, gen_init_tag_bits(tn), tn);

	/* -EAGAIN is only logged at debug level, like success. */
	if (rc == 0 || rc == -EAGAIN) {
		KFILND_EP_DEBUG(ep,
				"TN %p: %s tagged send of with tag 0x%x to peer %s(0x%llx): rc=%d",
				tn, rc ? "Failed to post" : "Posted",
				tn->tn_response_mr_key,
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr, rc);
	} else {
		KFILND_EP_ERROR(ep,
				"TN %p: Failed to post tagged send with tag 0x%x to peer %s(0x%llx): rc=%d",
				tn, tn->tn_response_mr_key,
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr, rc);
	}

	return rc;
}
/**
 * kfilnd_ep_cancel_tagged_recv() - Cancel a posted tagged receive.
 * @ep: KFI LND endpoint the tagged receive was posted on.
 * @tn: Transaction whose tagged receive is cancelled.
 *
 * The tagged receive is posted with @tn as its context, so @tn is also the
 * handle passed to kfi_cancel().
 *
 * Return: 0 on success. -ENOENT if the tagged receive buffer is not found,
 * which may happen when a tagged send already landed or the tagged receive
 * buffer was never posted. -EINVAL for bad arguments or when the device is
 * not initialized. Other negative errno values on error.
 */
int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep,
				 struct kfilnd_transaction *tn)
{
	bool dev_ready;

	if (!ep || !tn)
		return -EINVAL;

	/* Do not touch the RX context while the device is shutting down. */
	dev_ready = ep->end_dev->kfd_state == KFILND_STATE_INITIALIZED;
	if (!dev_ready)
		return -EINVAL;

	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN))
		return -EAGAIN;

	/* No async event count adjustment here: the increment taken for the
	 * posted tagged receive also covers its cancellation.
	 */
	return kfi_cancel(&ep->end_rx->fid, tn);
}
/*
 * Build the target-side tag (used for tagged receives): the peer's local
 * session key shifted up by KFILND_EP_KEY_BITS, OR'd with the transaction's
 * memory region key.
 *
 * NOTE(review): the shift is evaluated in the promoted type of
 * kp_local_session_key, not uint64_t. Confirm that type is wide enough;
 * changing the width would alter the tag matched against peers.
 */
static uint64_t gen_target_tag_bits(struct kfilnd_transaction *tn)
{
	uint64_t tag_bits = (tn->tn_kp->kp_local_session_key << KFILND_EP_KEY_BITS) |
			    tn->tn_mr_key;

	return tag_bits;
}
/**
 * kfilnd_ep_post_tagged_recv() - Post a tagged receive operation.
 * @ep: KFI LND endpoint used to post the tagged receive operation.
 * @tn: Transaction structure containing the receive buffer to be posted.
 *
 * The tag is built by gen_target_tag_bits() from the peer's local session key
 * and the transaction's memory region key. The message address is the peer's
 * KFI address and @tn is used as the operation context.
 *
 * Return: On success, zero. Else, negative errno value.
 */
int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep,
			       struct kfilnd_transaction *tn)
{
	/* Fields that depend on @tn are filled in only after @tn has been
	 * validated; initializing them here would dereference a NULL @tn
	 * before the check below.
	 */
	struct kfi_msg_tagged msg = {};
	struct kfi_cq_err_entry fake_error = {
		.op_context = tn,
		.flags = KFI_TAGGED | KFI_RECV,
		.err = EIO,
	};
	int rc;

	if (!ep || !tn)
		return -EINVAL;

	/* Make sure the device is not being shut down */
	if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
		return -EINVAL;

	/* Progress transaction to failure if recv should fail. */
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EVENT)) {
		rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
		if (!rc)
			return 0;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV)) {
		return -EIO;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EAGAIN)) {
		return -EAGAIN;
	}

	msg.tag = gen_target_tag_bits(tn);
	msg.context = tn;
	msg.addr = tn->tn_kp->kp_addr;
#ifdef HAVE_KFI_SGL
	msg.type = KFI_SGL;
	msg.msg_sgl = tn->tn_sgt.sgl;
	msg.iov_count = tn->tn_sgt.nents;
#else
	msg.iov_count = tn->tn_num_iovec;
	msg.type = KFI_BVEC;
	msg.msg_biov = tn->tn_kiov;
#endif

	rc = kfi_trecvmsg(ep->end_rx, &msg, KFI_COMPLETION);
	switch (rc) {
	case 0:
	case -EAGAIN:
		KFILND_EP_DEBUG(ep,
				"TN %p: %s tagged recv of %u bytes (%lu frags) with tag 0x%llx: rc=%d",
				tn, rc ? "Failed to post" : "Posted",
				tn->tn_nob, msg.iov_count, msg.tag, rc);
		break;
	default:
		KFILND_EP_ERROR(ep,
				"TN %p: Failed to post tagged recv of %u bytes (%lu frags) with tag 0x%llx: rc=%d",
				tn, tn->tn_nob, msg.iov_count, msg.tag, rc);
	}

	return rc;
}
/**
 * kfilnd_ep_post_send() - Post a send operation.
 * @ep: KFI LND endpoint used to post the send operation.
 * @tn: Transaction structure containing the message to be sent.
 *
 * Sends tn->tn_tx_msg.length bytes from tn->tn_tx_msg.msg to
 * tn->tn_target_addr, using @tn as the operation context.
 *
 * Return: On success, zero. Else, negative errno value.
 */
int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
{
	struct kfi_cq_err_entry fake_error = {
		.op_context = tn,
		.flags = KFI_MSG | KFI_SEND,
		.err = EIO,
	};
	void *msg_buf;
	size_t msg_len;
	int rc;

	if (!ep || !tn)
		return -EINVAL;

	msg_buf = tn->tn_tx_msg.msg;
	msg_len = tn->tn_tx_msg.length;

	/* Refuse to post while the device is not fully initialized. */
	if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
		return -EINVAL;

	/* Fault injection, optionally keyed on the LNet message type. A fake
	 * completion error is delivered asynchronously; if it cannot be
	 * queued, fall through to a real post.
	 */
	if (CFS_FAIL_CHECK_VALUE(CFS_KFI_FAIL_MSG_TYPE,
				 tn->tn_tx_msg.msg->type) ||
	    CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) {
		if (kfilnd_ep_gen_fake_err(ep, &fake_error) == 0)
			return 0;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) {
		return -EIO;
	} else if (CFS_FAIL_CHECK_VALUE(CFS_KFI_FAIL_MSG_TYPE_EAGAIN,
					tn->tn_tx_msg.msg->type) ||
		   CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) {
		return -EAGAIN;
	}

	rc = kfi_send(ep->end_tx, msg_buf, msg_len, NULL, tn->tn_target_addr,
		      tn);

	/* -EAGAIN is only logged at debug level, like success. */
	if (rc == 0 || rc == -EAGAIN) {
		KFILND_EP_DEBUG(ep,
				"TN %p: %s send of %lu bytes to peer %s(0x%llx): rc=%d",
				tn, rc ? "Failed to post" : "Posted",
				msg_len, libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr, rc);
	} else {
		KFILND_EP_ERROR(ep,
				"TN %p: Failed to post send of %lu bytes to peer %s(0x%llx): rc=%d",
				tn, msg_len, libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr, rc);
	}

	return rc;
}
/**
 * kfilnd_ep_post_write() - Post a write operation.
 * @ep: KFI LND endpoint used to post the write operation.
 * @tn: Transaction structure containing the buffer to be read from.
 *
 * The write targets tn->tn_target_addr. The remote key for the target memory
 * region is built by gen_init_tag_bits() from the transaction's response
 * session key and response memory region key. @tn is the operation context.
 *
 * Return: On success, zero. Else, negative errno value.
 */
int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
{
	int rc;
	struct kfi_cq_err_entry fake_error = {
		.op_context = tn,
		.flags = KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND,
		.err = EIO,
	};
	/* Fields that depend on @tn are filled in only after @tn has been
	 * validated; initializing them here would dereference a NULL @tn
	 * before the check below.
	 */
	struct kfi_rma_iov rma_iov = {};
	struct kfi_msg_rma rma = {
		.rma_iov = &rma_iov,
		.rma_iov_count = 1,
	};

	if (!ep || !tn)
		return -EINVAL;

	/* Make sure the device is not being shut down */
	if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
		return -EINVAL;

	/* Progress transaction to failure if write should fail. */
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EVENT)) {
		rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
		if (!rc)
			return 0;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE)) {
		return -EIO;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EAGAIN)) {
		return -EAGAIN;
	}

	rma_iov.len = tn->tn_nob;
	rma_iov.key = gen_init_tag_bits(tn);
	rma.addr = tn->tn_target_addr;
	rma.context = tn;
#ifdef HAVE_KFI_SGL
	rma.type = KFI_SGL;
	rma.msg_sgl = tn->tn_sgt.sgl;
	rma.iov_count = tn->tn_sgt.nents;
#else
	rma.iov_count = tn->tn_num_iovec;
	rma.type = KFI_BVEC;
	rma.msg_biov = tn->tn_kiov;
#endif

	rc = kfi_writemsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
	switch (rc) {
	case 0:
	case -EAGAIN:
		KFILND_EP_DEBUG(ep,
				"TN ID %p: %s write of %u bytes in %lu frags kp %s(%p) rma_iov.key %llu: rc=%d",
				tn, rc ? "Failed to post" : "Posted",
				tn->tn_nob, rma.iov_count,
				libcfs_nid2str(tn->tn_kp->kp_nid), tn->tn_kp,
				rma_iov.key, rc);
		break;
	default:
		KFILND_EP_ERROR(ep,
				"TN %p: Failed to post write of %u bytes in %lu frags kp %s(%p) rma_iov.key %llu: rc=%d",
				tn, tn->tn_nob, rma.iov_count,
				libcfs_nid2str(tn->tn_kp->kp_nid), tn->tn_kp,
				rma_iov.key, rc);
	}

	return rc;
}
/**
 * kfilnd_ep_post_read() - Post a read operation.
 * @ep: KFI LND endpoint used to post the read operation.
 * @tn: Transaction structure containing the buffer to be read into.
 *
 * The read targets tn->tn_target_addr. The remote key for the target memory
 * region is built by gen_init_tag_bits() from the transaction's response
 * session key and response memory region key. @tn is the operation context.
 *
 * Return: On success, zero. Else, negative errno value.
 */
int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
{
	int rc;
	struct kfi_cq_err_entry fake_error = {
		.op_context = tn,
		.flags = KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND,
		.err = EIO,
	};
	/* Fields that depend on @tn are filled in only after @tn has been
	 * validated; initializing them here would dereference a NULL @tn
	 * before the check below.
	 */
	struct kfi_rma_iov rma_iov = {};
	struct kfi_msg_rma rma = {
		.rma_iov = &rma_iov,
		.rma_iov_count = 1,
	};

	if (!ep || !tn)
		return -EINVAL;

	/* Make sure the device is not being shut down */
	if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
		return -EINVAL;

	/* Progress transaction to failure if read should fail. */
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EVENT)) {
		rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
		if (!rc)
			return 0;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ)) {
		return -EIO;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EAGAIN)) {
		return -EAGAIN;
	}

	rma_iov.len = tn->tn_nob;
	rma_iov.key = gen_init_tag_bits(tn);
	rma.addr = tn->tn_target_addr;
	rma.context = tn;
#ifdef HAVE_KFI_SGL
	rma.type = KFI_SGL;
	rma.msg_sgl = tn->tn_sgt.sgl;
	rma.iov_count = tn->tn_sgt.nents;
#else
	rma.iov_count = tn->tn_num_iovec;
	rma.type = KFI_BVEC;
	rma.msg_biov = tn->tn_kiov;
#endif

	rc = kfi_readmsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
	switch (rc) {
	case 0:
	case -EAGAIN:
		KFILND_EP_DEBUG(ep,
				"TN %p: %s read of %u bytes in %lu frags kp %s(%p) rma_iov.key %llu: rc=%d",
				tn, rc ? "Failed to post" : "Posted",
				tn->tn_nob, rma.iov_count,
				libcfs_nid2str(tn->tn_kp->kp_nid), tn->tn_kp,
				rma_iov.key, rc);
		break;
	default:
		KFILND_EP_ERROR(ep,
				"TN %p: Failed to post read of %u bytes in %lu frags kp %s(%p) rma_iov.key %llu: rc=%d",
				tn, tn->tn_nob, rma.iov_count,
				libcfs_nid2str(tn->tn_kp->kp_nid), tn->tn_kp,
				rma_iov.key, rc);
	}

	return rc;
}
/**
 * kfilnd_ep_queue_tn_replay() - Queue a transaction for deferred replay.
 * @ep: KFI LND endpoint owning the replay queue.
 * @tn: Transaction to replay.
 *
 * The transaction is appended to the endpoint's transaction replay list, the
 * replay count is bumped, and the replay timer is armed to expire
 * KFILND_EP_REPLAY_TIMER_MSEC milliseconds from now unless already pending.
 */
void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep,
			       struct kfilnd_transaction *tn)
{
	unsigned long expires;

	expires = jiffies + msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC);

	spin_lock(&ep->replay_lock);
	list_add_tail(&tn->replay_entry, &ep->tn_replay);
	atomic_inc(&ep->replay_count);
	spin_unlock(&ep->replay_lock);

	if (timer_pending(&ep->replay_timer))
		return;
	mod_timer(&ep->replay_timer, expires);
}
/**
 * kfilnd_ep_flush_replay_queue() - Replay all queued transactions and
 * immediate buffer puts.
 * @ep: KFI LND endpoint whose replay queues are flushed.
 *
 * Both endpoint replay lists are detached onto local lists under
 * replay_lock, because replaying may queue new entries on the endpoint lists.
 * Each transaction is re-driven through kfilnd_tn_event_handler() with its
 * saved replay event and status. Each immediate buffer has its deferred
 * kfilnd_ep_imm_buffer_put() retried.
 */
void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep)
{
	LIST_HEAD(tn_replay);
	LIST_HEAD(imm_buf_replay);
	struct kfilnd_transaction *tn;
	struct kfilnd_transaction *tn_next;
	struct kfilnd_immediate_buffer *buf;
	struct kfilnd_immediate_buffer *buf_next;

	/* Move the whole of each endpoint list onto a local list, leaving the
	 * endpoint lists empty and reinitialized.
	 */
	spin_lock(&ep->replay_lock);
	list_splice_init(&ep->tn_replay, &tn_replay);
	list_splice_init(&ep->imm_buffer_replay, &imm_buf_replay);
	spin_unlock(&ep->replay_lock);

	/* Replay all queued transactions. */
	list_for_each_entry_safe(tn, tn_next, &tn_replay, replay_entry) {
		list_del(&tn->replay_entry);
		atomic_dec(&ep->replay_count);
		kfilnd_tn_event_handler(tn, tn->replay_event,
					tn->replay_status);
	}

	/* Retry the deferred immediate buffer puts. */
	list_for_each_entry_safe(buf, buf_next, &imm_buf_replay,
				 replay_entry) {
		list_del(&buf->replay_entry);
		atomic_dec(&ep->replay_count);
		kfilnd_ep_imm_buffer_put(buf);
	}
}
/* Workqueue handler that flushes the owning endpoint's replay queues. */
static void kfilnd_ep_replay_work(struct work_struct *work)
{
	kfilnd_ep_flush_replay_queue(container_of(work, struct kfilnd_ep,
						  replay_work));
}
/*
 * Replay timer callback: queue the endpoint's replay work on kfilnd_wq, on
 * the first CPU of the endpoint's CPT.
 */
static void kfilnd_ep_replay_timer(cfs_timer_cb_arg_t data)
{
	struct kfilnd_ep *ep = cfs_from_timer(ep, data, replay_timer);
	unsigned int target_cpu;

	target_cpu = cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(),
						    ep->end_cpt));
	queue_work_on(target_cpu, kfilnd_wq, &ep->replay_work);
}
/* Bytes allocated for one endpoint: struct kfilnd_ep followed by its array of
 * immediate_rx_buf_count immediate buffers. The value is evaluated at each
 * use, so allocation and free must observe the same immediate_rx_buf_count.
 */
#define KFILND_EP_ALLOC_SIZE						\
	(sizeof(struct kfilnd_ep) +					\
	 immediate_rx_buf_count * sizeof(struct kfilnd_immediate_buffer))
/**
 * kfilnd_ep_free() - Free a KFI LND endpoint.
 * @ep: KFI LND endpoint to be freed.
 *
 * The teardown runs in this order:
 * 1. Wait for queued replays to drain and cancel the immediate receive
 *    buffers.
 * 2. Wait for their references and for all transactions to go away.
 * 3. Stop the replay timer/work embedded in @ep.
 * 4. Release the buffers, contexts, CQs and key IDA.
 *
 * Safe to call on NULL or error pointer.
 */
void kfilnd_ep_free(struct kfilnd_ep *ep)
{
	int i;
	int k = 2;

	if (IS_ERR_OR_NULL(ep))
		return;

	/* k is bumped once per second of waiting. (k & -k) == k holds only
	 * when k is a power of two, so warnings are emitted with exponential
	 * back-off and the rest go to D_NET.
	 */
	while (atomic_read(&ep->replay_count)) {
		k++;
		CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
		       "Waiting for replay count %d not zero\n",
		       atomic_read(&ep->replay_count));
		schedule_timeout_uninterruptible(HZ);
	}

	/* Cancel any outstanding immediate receive buffers. */
	kfilnd_ep_cancel_imm_buffers(ep);

	/* Wait for RX buffers to no longer be used and then free them. */
	for (i = 0; i < immediate_rx_buf_count; i++) {
		k = 2;
		while (atomic_read(&ep->end_immed_bufs[i].immed_ref)) {
			k++;
			CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
			       "Waiting for RX buffer %d to release\n", i);
			schedule_timeout_uninterruptible(HZ);
		}
	}

	/* Wait for all transactions to complete. */
	k = 2;
	spin_lock(&ep->tn_list_lock);
	while (!list_empty(&ep->tn_list)) {
		spin_unlock(&ep->tn_list_lock);
		k++;
		CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
		       "Waiting for transactions to complete\n");
		schedule_timeout_uninterruptible(HZ);
		spin_lock(&ep->tn_list_lock);
	}
	spin_unlock(&ep->tn_list_lock);

	/* replay_count reaching zero does not mean the replay machinery is
	 * idle. A flush can drain the queue while the timer is still pending
	 * (re-armed after the work was queued), and the replay work may still
	 * be queued or running, since it drops replay_count before handling
	 * each entry. Both are embedded in @ep, so stop them before @ep is
	 * freed. With all transactions gone and immediate buffers marked
	 * no-repost, no new replays are expected to be queued at this point.
	 */
	del_timer_sync(&ep->replay_timer);
	cancel_work_sync(&ep->replay_work);

	/* Free all immediate buffers. */
	for (i = 0; i < immediate_rx_buf_count; i++)
		__free_pages(ep->end_immed_bufs[i].immed_buf_page,
			     order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));

	kfi_close(&ep->end_tx->fid);
	kfi_close(&ep->end_rx->fid);
	kfilnd_cq_free(ep->end_tx_cq);
	kfilnd_cq_free(ep->end_rx_cq);
	ida_destroy(&ep->keys);
	LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
}
/**
 * kfilnd_ep_alloc() - Allocate a new KFI LND endpoint.
 * @dev: KFI LND device used to allocate endpoints.
 * @context_id: Context ID associated with the endpoint.
 * @cpt: CPT KFI LND endpoint should be associated with.
 * @nrx: Multiplied by @rx_size to size each immediate receive buffer.
 * @rx_size: Multiplied by @nrx to size each immediate receive buffer.
 *
 * A KFI LND endpoint consists of unique transmit/receive command queues
 * (contexts) and completion queues. The underlying completion queue interrupt
 * vector is associated with a core within the CPT.
 *
 * Each of the immediate_rx_buf_count immediate receive buffers is sized to
 * @nrx * @rx_size, raised to at least PAGE_SIZE and rounded up to a power of
 * two. The buffers are allocated here but not posted.
 *
 * Return: On success, valid pointer. Else, negative errno pointer.
 */
struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev,
				  unsigned int context_id, unsigned int cpt,
				  size_t nrx, size_t rx_size)
{
	int rc;
	struct kfi_cq_attr cq_attr = {};
	struct kfi_rx_attr rx_attr = {};
	struct kfi_tx_attr tx_attr = {};
	size_t min_multi_recv = KFILND_IMMEDIATE_MSG_SIZE;
	struct kfilnd_ep *ep;
	int i;
	size_t rx_buf_size;

	if (!dev || !nrx || !rx_size) {
		rc = -EINVAL;
		goto err;
	}

	LIBCFS_CPT_ALLOC(ep, lnet_cpt_table(), cpt, KFILND_EP_ALLOC_SIZE);
	if (!ep) {
		rc = -ENOMEM;
		goto err;
	}

	ep->end_dev = dev;
	ep->end_cpt = cpt;
	ep->end_context_id = context_id;
	INIT_LIST_HEAD(&ep->tn_list);
	spin_lock_init(&ep->tn_list_lock);
	INIT_LIST_HEAD(&ep->tn_replay);
	INIT_LIST_HEAD(&ep->imm_buffer_replay);
	spin_lock_init(&ep->replay_lock);
	cfs_timer_setup(&ep->replay_timer, kfilnd_ep_replay_timer,
			(unsigned long)ep, 0);
	INIT_WORK(&ep->replay_work, kfilnd_ep_replay_work);
	atomic_set(&ep->replay_count, 0);
	ida_init(&ep->keys);

	/* Create a CQ for this CPT */
	cq_attr.flags = KFI_AFFINITY;
	cq_attr.format = KFI_CQ_FORMAT_DATA;
	cq_attr.wait_cond = KFI_CQ_COND_NONE;
	cq_attr.wait_obj = KFI_WAIT_NONE;

	/* Vector is set to first core in the CPT */
	cq_attr.signaling_vector =
		cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), cpt));

	cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
		       rx_cq_scale_factor;
	ep->end_rx_cq = kfilnd_cq_alloc(ep, &cq_attr);
	if (IS_ERR(ep->end_rx_cq)) {
		rc = PTR_ERR(ep->end_rx_cq);
		CERROR("Failed to allocated KFILND RX CQ: rc=%d\n", rc);
		goto err_free_ep;
	}

	cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
		       tx_cq_scale_factor;
	ep->end_tx_cq = kfilnd_cq_alloc(ep, &cq_attr);
	if (IS_ERR(ep->end_tx_cq)) {
		rc = PTR_ERR(ep->end_tx_cq);
		CERROR("Failed to allocated KFILND TX CQ: rc=%d\n", rc);
		goto err_free_rx_cq;
	}

	/* Initialize the RX/TX contexts for the given CPT */
	rx_attr.op_flags = KFI_COMPLETION | KFI_MULTI_RECV;
	rx_attr.msg_order = KFI_ORDER_NONE;
	rx_attr.comp_order = KFI_ORDER_NONE;
	rx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits +
		       immediate_rx_buf_count;
	rx_attr.iov_limit = LNET_MAX_IOV;
	rc = kfi_rx_context(dev->kfd_sep, context_id, &rx_attr, &ep->end_rx,
			    ep);
	if (rc) {
		CERROR("Could not create RX context on CPT %d, rc = %d\n", cpt,
		       rc);
		goto err_free_tx_cq;
	}

	/* Set the lower limit for multi-receive buffers */
	rc = kfi_setopt(&ep->end_rx->fid, KFI_OPT_ENDPOINT,
			KFI_OPT_MIN_MULTI_RECV, &min_multi_recv,
			sizeof(min_multi_recv));
	if (rc) {
		CERROR("Could not set min_multi_recv on CPT %d, rc = %d\n", cpt,
		       rc);
		goto err_free_rx_context;
	}

	tx_attr.op_flags = KFI_COMPLETION | KFI_TRANSMIT_COMPLETE;
	tx_attr.msg_order = KFI_ORDER_NONE;
	tx_attr.comp_order = KFI_ORDER_NONE;
	tx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
		       tx_scale_factor;
	tx_attr.iov_limit = LNET_MAX_IOV;
	tx_attr.rma_iov_limit = LNET_MAX_IOV;
	tx_attr.tclass =
		dev->kfd_ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_traffic_class;
	rc = kfi_tx_context(dev->kfd_sep, context_id, &tx_attr, &ep->end_tx,
			    ep);
	if (rc) {
		CERROR("Could not create TX context on CPT %d, rc = %d\n", cpt,
		       rc);
		goto err_free_rx_context;
	}

	/* Bind these two contexts to the CPT's CQ */
	rc = kfi_ep_bind(ep->end_rx, &ep->end_rx_cq->cq->fid, 0);
	if (rc) {
		CERROR("Could not bind RX context on CPT %d, rc = %d\n", cpt,
		       rc);
		goto err_free_tx_context;
	}

	rc = kfi_ep_bind(ep->end_tx, &ep->end_tx_cq->cq->fid, 0);
	if (rc) {
		CERROR("Could not bind TX context on CPT %d, rc = %d\n", cpt,
		       rc);
		goto err_free_tx_context;
	}

	/* Enable both endpoints */
	rc = kfi_enable(ep->end_rx);
	if (rc) {
		CERROR("Could not enable RX context on CPT %d, rc = %d\n", cpt,
		       rc);
		goto err_free_tx_context;
	}

	rc = kfi_enable(ep->end_tx);
	if (rc) {
		CERROR("Could not enable TX context on CPT %d, rc=%d\n", cpt,
		       rc);
		goto err_free_tx_context;
	}

	/* The nrx value is the max number of immediate messages any one peer
	 * can send us. Given that compute nodes are RPC-based, we should not
	 * see any more incoming messages than we are able to send. As such,
	 * nrx is a good size for each multi-receive buffer. However, if we are
	 * a server or LNet router, we need a multiplier of this value. For
	 * now, we will just have nrx drive the buffer size per CPT. Then,
	 * LNet routers and servers can just define more CPTs to get a better
	 * spread of buffers to receive messages from multiple peers. A better
	 * way should be devised in the future.
	 */
	rx_buf_size = roundup_pow_of_two(max(nrx * rx_size, PAGE_SIZE));

	for (i = 0; i < immediate_rx_buf_count; i++) {

		/* Using physically contiguous allocations can allow for
		 * underlying kfabric providers to use untranslated addressing
		 * instead of having to setup NIC memory mappings. This
		 * typically leads to improved performance.
		 */
		ep->end_immed_bufs[i].immed_buf_page =
			alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
					 GFP_KERNEL | __GFP_NOWARN,
					 order_base_2(rx_buf_size / PAGE_SIZE));
		if (!ep->end_immed_bufs[i].immed_buf_page) {
			rc = -ENOMEM;
			goto err_free_rx_buffers;
		}

		atomic_set(&ep->end_immed_bufs[i].immed_ref, 0);
		ep->end_immed_bufs[i].immed_buf =
			page_address(ep->end_immed_bufs[i].immed_buf_page);
		ep->end_immed_bufs[i].immed_buf_size = rx_buf_size;
		ep->end_immed_bufs[i].immed_end = ep;
	}

	return ep;

err_free_rx_buffers:
	/* Entries never reached are still NULL from the allocation above. */
	for (i = 0; i < immediate_rx_buf_count; i++) {
		if (ep->end_immed_bufs[i].immed_buf_page)
			__free_pages(ep->end_immed_bufs[i].immed_buf_page,
				     order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
	}

err_free_tx_context:
	kfi_close(&ep->end_tx->fid);
err_free_rx_context:
	kfi_close(&ep->end_rx->fid);
err_free_tx_cq:
	kfilnd_cq_free(ep->end_tx_cq);
err_free_rx_cq:
	kfilnd_cq_free(ep->end_rx_cq);
err_free_ep:
	LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
err:
	return ERR_PTR(rc);
}
/**
 * kfilnd_ep_get_key() - Allocate a key from the endpoint's key IDA.
 * @ep: KFI LND endpoint to allocate the key from.
 *
 * Keys are taken from [1, KFILND_EP_KEY_MAX) using GFP_NOFS allocations.
 *
 * Return: The allocated key on success. Else, negative errno.
 */
int kfilnd_ep_get_key(struct kfilnd_ep *ep)
{
	struct ida *key_ida = &ep->keys;

	return ida_simple_get(key_ida, 1, KFILND_EP_KEY_MAX, GFP_NOFS);
}
/**
 * kfilnd_ep_put_key() - Return a key obtained from kfilnd_ep_get_key().
 * @ep: KFI LND endpoint the key was allocated from.
 * @key: Key to release back to the endpoint's key IDA.
 */
void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key)
{
	struct ida *key_ida = &ep->keys;

	ida_simple_remove(key_ida, key);
}