Viewing: lustre_update.h
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2013, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*
* Author: Di Wang <di.wang@intel.com>
*/
#ifndef _LUSTRE_UPDATE_H
#define _LUSTRE_UPDATE_H
#include <dt_object.h>
#include <lustre_net.h>
#include <obj_update.h>
#define OUT_UPDATE_REPLY_SIZE 4096
#define OUT_BULK_BUFFER_SIZE 4096
struct dt_key;
struct dt_rec;
struct object_update_param;
struct llog_update_record;
static inline size_t update_params_size(const struct update_params *params,
unsigned int param_count)
{
struct object_update_param *param;
size_t total_size = sizeof(*params);
unsigned int i;
param = (struct object_update_param *)¶ms->up_params[0];
for (i = 0; i < param_count; i++) {
size_t size = object_update_param_size(param);
param = (struct object_update_param *)((char *)param + size);
total_size += size;
}
return total_size;
}
static inline struct object_update_param *
update_params_get_param(const struct update_params *params,
unsigned int index, unsigned int param_count)
{
struct object_update_param *param;
unsigned int i;
if (index > param_count)
return NULL;
param = (struct object_update_param *)¶ms->up_params[0];
for (i = 0; i < index; i++)
param = (struct object_update_param *)((char *)param +
object_update_param_size(param));
return param;
}
static inline void*
update_params_get_param_buf(const struct update_params *params, __u16 index,
unsigned int param_count, __u16 *size)
{
struct object_update_param *param;
param = update_params_get_param(params, (unsigned int)index,
param_count);
if (param == NULL)
return NULL;
if (size != NULL)
*size = param->oup_len;
return param->oup_buf;
}
static inline size_t
update_op_size(unsigned int param_count)
{
return offsetof(struct update_op, uop_params_off[param_count]);
}
static inline struct update_op *
update_op_next_op(const struct update_op *uop)
{
return (struct update_op *)((char *)uop +
update_op_size(uop->uop_param_count));
}
static inline size_t update_ops_size(const struct update_ops *ops,
unsigned int update_count)
{
struct update_op *op;
size_t total_size = sizeof(*ops);
unsigned int i;
op = (struct update_op *)&ops->uops_op[0];
for (i = 0; i < update_count; i++, op = update_op_next_op(op))
total_size += update_op_size(op->uop_param_count);
return total_size;
}
static inline struct update_params *
update_records_get_params(const struct update_records *record)
{
return (struct update_params *)((char *)record +
offsetof(struct update_records, ur_ops) +
update_ops_size(&record->ur_ops, record->ur_update_count));
}
static inline struct update_param *
update_param_next_param(const struct update_param *param)
{
return (struct update_param *)((char *)param +
object_update_param_size(
(struct object_update_param *)param));
}
static inline size_t
__update_records_size(size_t raw_size)
{
return round_up(offsetof(struct update_records, ur_ops) + raw_size, 8);
}
static inline size_t
update_records_size(const struct update_records *record)
{
size_t op_size = 0;
size_t param_size = 0;
if (record->ur_update_count > 0)
op_size = update_ops_size(&record->ur_ops,
record->ur_update_count);
if (record->ur_param_count > 0) {
struct update_params *params;
params = update_records_get_params(record);
param_size = update_params_size(params, record->ur_param_count);
}
return __update_records_size(op_size + param_size);
}
static inline size_t
__llog_update_record_size(size_t records_size)
{
return round_up(sizeof(struct llog_rec_hdr) + records_size +
sizeof(struct llog_rec_tail), 8);
}
static inline size_t
llog_update_record_size(const struct llog_update_record *lur)
{
return __llog_update_record_size(
update_records_size(&lur->lur_update_rec));
}
static inline struct update_op *
update_ops_get_op(const struct update_ops *ops, unsigned int index,
unsigned int update_count)
{
struct update_op *op;
unsigned int i;
if (index > update_count)
return NULL;
op = (struct update_op *)&ops->uops_op[0];
for (i = 0; i < index; i++)
op = update_op_next_op(op);
return op;
}
static inline void
*object_update_param_get(const struct object_update *update, size_t index,
size_t *size, const char *name, const char *type)
{
const struct object_update_param *param;
size_t i;
int rc = 0;
if (index >= update->ou_params_count)
return ERR_PTR(-EINVAL);
param = &update->ou_params[0];
for (i = 0; i < index; i++)
param = (struct object_update_param *)((char *)param +
object_update_param_size(param));
if (param->oup_len == 0)
return ERR_PTR(-ENODATA);
if (size) {
if (unlikely((*size != 0 && *size != param->oup_len) ||
(*size == 0 && param->oup_len == 0))) {
rc = -EPROTO;
CERROR("%s: wrong size for %s in RPC %zu != %u: rc = %d\n",
name, type, *size, param->oup_len, rc);
return ERR_PTR(rc);
}
*size = param->oup_len;
}
return (void *)¶m->oup_buf[0];
}
static inline unsigned long
object_update_request_size(const struct object_update_request *our)
{
unsigned long size;
size_t i = 0;
size = offsetof(struct object_update_request, ourq_updates[0]);
for (i = 0; i < our->ourq_count; i++) {
struct object_update *update;
update = (struct object_update *)((char *)our + size);
size += object_update_size(update);
}
return size;
}
static inline void
object_update_result_insert(struct object_update_reply *reply,
void *data, size_t data_len, size_t index,
int rc)
{
struct object_update_result *update_result;
update_result = object_update_result_get(reply, index, NULL);
LASSERT(update_result);
update_result->our_rc = ptlrpc_status_hton(rc);
if (rc >= 0) {
if (data_len > 0 && data)
memcpy(update_result->our_data, data, data_len);
update_result->our_datalen = data_len;
}
reply->ourp_lens[index] = round_up(data_len +
sizeof(struct object_update_result),
8);
}
static inline int
object_update_result_data_get(const struct object_update_reply *reply,
struct lu_buf *lbuf, size_t index)
{
struct object_update_result *update_result;
size_t size = 0;
int result;
LASSERT(lbuf != NULL);
update_result = object_update_result_get(reply, index, &size);
if (update_result == NULL ||
size < round_up(sizeof(struct object_update_reply), 8) ||
update_result->our_datalen > size)
RETURN(-EFAULT);
result = ptlrpc_status_ntoh(update_result->our_rc);
if (result < 0)
return result;
lbuf->lb_buf = update_result->our_data;
lbuf->lb_len = update_result->our_datalen;
return result;
}
/**
* Attached in the thandle to record the updates for distribute
* distribution.
*/
struct thandle_update_records {
/* All of updates for the cross-MDT operation, vmalloc'd. */
struct llog_update_record *tur_update_records;
size_t tur_update_records_buf_size;
/* All of parameters for the cross-MDT operation, vmalloc'd */
struct update_params *tur_update_params;
unsigned int tur_update_param_count;
size_t tur_update_params_buf_size;
};
#define TOP_THANDLE_MAGIC 0x20140917
struct top_multiple_thandle {
struct dt_device *tmt_master_sub_dt;
struct kref tmt_refcount;
/* Other sub transactions will be listed here. */
struct list_head tmt_sub_thandle_list;
spinlock_t tmt_sub_lock;
struct list_head tmt_commit_list;
/* All of update records will packed here */
struct thandle_update_records *tmt_update_records;
wait_queue_head_t tmt_stop_waitq;
__u64 tmt_batchid;
int tmt_result;
__u32 tmt_magic;
size_t tmt_record_size;
__u32 tmt_committed:1;
};
/* {top,sub}_thandle are used to manage distributed transactions which
* include updates on several nodes. A top_handle represents the
* whole operation, and sub_thandle represents updates on each node.
*/
struct top_thandle {
struct thandle tt_super;
/* The master sub transaction. */
struct thandle *tt_master_sub_thandle;
struct top_multiple_thandle *tt_multiple_thandle;
};
struct sub_thandle_cookie {
struct llog_cookie stc_cookie;
struct list_head stc_list;
};
/* Sub thandle used to track multiple sub thandles under one parent thandle */
struct sub_thandle {
struct thandle *st_sub_th;
struct dt_device *st_dt;
struct list_head st_cookie_list;
struct dt_txn_commit_cb st_commit_dcb;
struct dt_txn_commit_cb st_stop_dcb;
int st_result;
/* linked to top_thandle */
struct list_head st_sub_list;
/* If this sub thandle is committed */
bool st_committed:1,
st_stopped:1,
st_started:1;
};
struct tx_arg;
typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
struct tx_arg *ta);
/* Structure for holding one update execution */
struct tx_arg {
tx_exec_func_t exec_fn;
tx_exec_func_t undo_fn;
struct dt_object *object;
const char *file;
struct object_update_reply *reply;
int line;
int index;
union {
struct {
struct dt_insert_rec rec;
const struct dt_key *key;
} insert;
struct {
} ref;
struct {
struct lu_attr attr;
} attr_set;
struct {
struct lu_buf buf;
const char *name;
int flags;
__u32 csum;
} xattr_set;
struct {
struct lu_attr attr;
struct dt_allocation_hint hint;
struct dt_object_format dof;
struct lu_fid fid;
} create;
struct {
struct lu_buf buf;
loff_t pos;
} write;
struct {
struct ost_body *body;
} destroy;
} u;
};
/* Structure for holding all update executations of one transaction */
struct thandle_exec_args {
struct thandle *ta_handle;
int ta_argno; /* used args */
int ta_alloc_args; /* allocated args count */
struct tx_arg **ta_args;
};
/* target/out_lib.c */
int out_update_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, enum update_type op,
const struct lu_fid *fid, unsigned int params_count,
__u16 *param_sizes, const void **param_bufs,
__u32 reply_size);
int out_create_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, const struct lu_fid *fid,
const struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof);
int out_destroy_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, const struct lu_fid *fid);
int out_index_delete_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_key *key);
int out_index_insert_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_set_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
const char *name, __u32 flag);
int out_xattr_del_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const char *name);
int out_attr_set_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_attr *attr);
int out_ref_add_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_ref_del_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_write_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
__u64 pos);
int out_attr_get_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_index_lookup_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_get_pack(const struct lu_env *env,
struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const char *name,
const int bufsize);
int out_xattr_list_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, const struct lu_fid *fid,
const int bufsize);
int out_read_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_length, const struct lu_fid *fid,
size_t size, loff_t pos);
const char *update_op_str(__u16 opcode);
/* target/update_trans.c */
struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
struct thandle *th,
struct dt_device *sub_dt);
static inline struct thandle *
thandle_get_sub(const struct lu_env *env, struct thandle *th,
const struct dt_object *sub_obj)
{
return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
}
struct thandle *
top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
struct thandle *th);
int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
struct thandle *th);
void top_multiple_thandle_destroy(struct kref *kref);
static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
{
kref_get(&tmt->tmt_refcount);
}
static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
{
kref_put(&tmt->tmt_refcount, top_multiple_thandle_destroy);
}
struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
struct dt_device *dt_dev);
int sub_thandle_trans_create(const struct lu_env *env,
struct top_thandle *top_th,
struct sub_thandle *st);
/* update_records.c */
size_t update_records_create_size(const struct lu_env *env,
const struct lu_fid *fid,
const struct lu_attr *attr,
const struct dt_allocation_hint *hint,
struct dt_object_format *dof);
size_t update_records_attr_set_size(const struct lu_env *env,
const struct lu_fid *fid,
const struct lu_attr *attr);
size_t update_records_ref_add_size(const struct lu_env *env,
const struct lu_fid *fid);
size_t update_records_ref_del_size(const struct lu_env *env,
const struct lu_fid *fid);
size_t update_records_destroy_size(const struct lu_env *env,
const struct lu_fid *fid);
size_t update_records_index_insert_size(const struct lu_env *env,
const struct lu_fid *fid,
const struct dt_rec *rec,
const struct dt_key *key);
size_t update_records_index_delete_size(const struct lu_env *env,
const struct lu_fid *fid,
const struct dt_key *key);
size_t update_records_xattr_set_size(const struct lu_env *env,
const struct lu_fid *fid,
const struct lu_buf *buf,
const char *name,
__u32 flag);
size_t update_records_xattr_del_size(const struct lu_env *env,
const struct lu_fid *fid,
const char *name);
size_t update_records_write_size(const struct lu_env *env,
const struct lu_fid *fid,
const struct lu_buf *buf,
__u64 pos);
size_t update_records_punch_size(const struct lu_env *env,
const struct lu_fid *fid,
__u64 start, __u64 end);
int update_records_create_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const struct lu_attr *attr,
const struct dt_allocation_hint *hint,
struct dt_object_format *dof);
int update_records_attr_set_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const struct lu_attr *attr);
int update_records_ref_add_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid);
int update_records_ref_del_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid);
int update_records_destroy_pack(const struct lu_env *env,
struct update_ops *ops, unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid);
int update_records_index_insert_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const struct dt_rec *rec,
const struct dt_key *key);
int update_records_index_delete_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const struct dt_key *key);
int update_records_xattr_set_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const struct lu_buf *buf, const char *name,
__u32 flag);
int update_records_xattr_del_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const char *name);
int update_records_write_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
const struct lu_buf *buf,
__u64 pos);
int update_records_punch_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid,
__u64 start, __u64 end);
int update_records_noop_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
struct update_params *params,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid);
int tur_update_records_extend(struct thandle_update_records *tur,
size_t new_size);
int tur_update_params_extend(struct thandle_update_records *tur,
size_t new_size);
int tur_update_extend(struct thandle_update_records *tur,
size_t new_op_size, size_t new_param_size);
#define update_record_pack(name, th, ...) \
({ \
struct top_thandle *top_th; \
struct top_multiple_thandle *tmt; \
struct thandle_update_records *tur; \
struct llog_update_record *lur; \
size_t avail_param_size; \
size_t avail_op_size; \
int ret; \
\
while (1) { \
top_th = container_of(th, struct top_thandle, tt_super);\
tmt = top_th->tt_multiple_thandle; \
tur = tmt->tmt_update_records; \
lur = tur->tur_update_records; \
avail_param_size = tur->tur_update_params_buf_size - \
update_params_size(tur->tur_update_params, \
tur->tur_update_param_count); \
avail_op_size = tur->tur_update_records_buf_size - \
llog_update_record_size(lur); \
ret = update_records_##name##_pack(env, \
&lur->lur_update_rec.ur_ops, \
&lur->lur_update_rec.ur_update_count, \
&avail_op_size, \
tur->tur_update_params, \
&tur->tur_update_param_count, \
&avail_param_size, __VA_ARGS__); \
if (ret == -E2BIG) { \
ret = tur_update_extend(tur, avail_op_size, \
avail_param_size); \
if (ret != 0) \
break; \
continue; \
} else { \
break; \
} \
} \
ret; \
})
#define update_record_size(env, name, th, ...) \
({ \
struct top_thandle *top_th; \
struct top_multiple_thandle *tmt; \
\
top_th = container_of(th, struct top_thandle, tt_super); \
\
LASSERT(top_th->tt_multiple_thandle != NULL); \
tmt = top_th->tt_multiple_thandle; \
tmt->tmt_record_size += \
update_records_##name##_size(env, __VA_ARGS__); \
})
#endif