Viewing: lustre_osc.h
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
* Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*
* OSC layer structures and methods common for both OSC and MDC.
*
* This file contains OSC interfaces used by OSC and MDC. Most of them
* were just moved from lustre/osc/osc_cl_internal.h for Data-on-MDT
* purposes.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
* Author: Mikhail Pershin <mike.pershin@intel.com>
*/
#ifndef LUSTRE_OSC_H
#define LUSTRE_OSC_H
#include <obd.h>
#include <cfs_hash.h>
#include <cl_object.h>
#include <lustre_crypto.h>
/** \defgroup osc osc
* @{
*/
enum oap_async_flags {
/* ap_make_ready will not be called before page is added to an rpc */
ASYNC_READY = 0x1,
ASYNC_URGENT = 0x2, /* page must be put into RPC before return */
/* ap_refresh_count will not be called to give the caller a chance to
* update or cancel the size of the io
*/
ASYNC_COUNT_STABLE = 0x4,
ASYNC_HP = 0x8,
OAP_ASYNC_MAX,
OAP_ASYNC_BITS = 4
};
/* add explicit padding to keep fields aligned despite "packed",
* which is needed to pack with following field in osc_page
*/
#define OAP_PAD_BITS (16 - OBD_BRW_WRITE - OAP_ASYNC_BITS)
struct osc_async_page {
unsigned short oap_page_off /* :PAGE_SHIFT */;
unsigned int oap_cmd:OBD_BRW_WRITE;
enum oap_async_flags oap_async_flags:OAP_ASYNC_BITS;
unsigned int oap_padding1:OAP_PAD_BITS; /* unused */
unsigned int oap_padding2; /* unused */
struct list_head oap_pending_item;
struct list_head oap_rpc_item;
loff_t oap_obj_off;
struct osc_object *oap_obj;
struct brw_page oap_brw_page;
} __attribute__((packed));
#define oap_count oap_brw_page.bp_count
#define oap_brw_flags oap_brw_page.bp_flag
static inline struct osc_async_page *brw_page2oap(struct brw_page *pga)
{
BUILD_BUG_ON(OAP_ASYNC_MAX - 1 >= (1 << OAP_ASYNC_BITS));
return container_of(pga, struct osc_async_page, oap_brw_page);
}
struct osc_device {
struct cl_device osc_cl;
struct obd_export *osc_exp;
/* Write stats is actually protected by client_obd's lock. */
struct osc_stats {
ktime_t os_init;
uint64_t os_lockless_writes; /* by bytes */
uint64_t os_lockless_reads; /* by bytes */
} osc_stats;
/* configuration item(s) */
time64_t osc_contention_time;
};
struct osc_extent;
/**
* State maintained by osc layer for each IO context.
*/
struct osc_io {
/** super class */
struct cl_io_slice oi_cl;
/** true if this io is lockless. */
unsigned int oi_lockless:1,
/** true if this io is counted as active IO */
oi_is_active:1,
/** true if this io has CAP_SYS_RESOURCE */
oi_cap_sys_resource:1,
/** true if this io issued by readahead */
oi_is_readahead:1;
/** how many LRU pages are reserved for this IO */
unsigned long oi_lru_reserved;
/** active extents, we know how many bytes is going to be written,
* so having an active extent will prevent it from being fragmented
*/
struct osc_extent *oi_active;
/** partially truncated extent, we need to hold this extent to prevent
* page writeback from happening.
*/
struct osc_extent *oi_trunc;
/** write osc_lock for this IO, used by osc_extent_find(). */
struct osc_lock *oi_write_osclock;
struct osc_lock *oi_read_osclock;
struct obdo oi_oa;
struct osc_async_cbargs {
bool opc_rpc_sent;
int opc_rc;
struct completion opc_sync;
} oi_cbarg;
};
/**
* State maintained by osc layer for the duration of a system call.
*/
struct osc_session {
struct osc_io os_io;
};
#define OTI_PVEC_SIZE 256
struct osc_thread_info {
struct ldlm_res_id oti_resname;
union ldlm_policy_data oti_policy;
struct cl_attr oti_attr;
struct cl_io oti_io;
struct folio_batch oti_fbatch;
void *oti_pvec[OTI_PVEC_SIZE];
/**
* Fields used by cl_lock_discard_pages().
*/
pgoff_t oti_next_index;
pgoff_t oti_fn_index; /* first non-overlapped index */
pgoff_t oti_ng_index; /* negative lock caching */
struct cl_sync_io oti_anchor;
struct cl_req_attr oti_req_attr;
struct lu_buf oti_ladvise_buf;
};
static inline __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
__u64 result = 0;
CDEBUG(D_DLMTRACE, "flags: %x\n", enqflags);
LASSERT((enqflags & ~CEF_MASK) == 0);
if (enqflags & CEF_NONBLOCK)
result |= LDLM_FL_BLOCK_NOWAIT;
if (enqflags & CEF_GLIMPSE)
result |= LDLM_FL_HAS_INTENT|LDLM_FL_CBPENDING;
if (enqflags & CEF_DISCARD_DATA)
result |= LDLM_FL_AST_DISCARD_DATA;
if (enqflags & CEF_PEEK)
result |= LDLM_FL_TEST_LOCK;
if (enqflags & CEF_LOCK_MATCH)
result |= LDLM_FL_MATCH_LOCK;
if (enqflags & CEF_LOCK_NO_EXPAND)
result |= LDLM_FL_NO_EXPANSION;
if (enqflags & CEF_SPECULATIVE)
result |= LDLM_FL_SPECULATIVE;
return result;
}
typedef int (*osc_enqueue_upcall_f)(void *cookie, struct lustre_handle *lockh,
int rc);
struct osc_enqueue_args {
struct obd_export *oa_exp;
enum ldlm_type oa_type;
enum ldlm_mode oa_mode;
__u64 *oa_flags;
osc_enqueue_upcall_f oa_upcall;
void *oa_cookie;
struct ost_lvb *oa_lvb;
struct lustre_handle oa_lockh;
bool oa_speculative;
};
/**
* Bit flags for osc_dlm_lock_at_pageoff().
*/
enum osc_dap_flags {
/**
* Just check if the desired lock exists, it won't hold reference
* count on lock.
*/
OSC_DAP_FL_TEST_LOCK = BIT(0),
/**
* Return the lock even if it is being canceled.
*/
OSC_DAP_FL_CANCELING = BIT(1),
/**
* check ast data is present, requested to cancel cb
*/
OSC_DAP_FL_AST = BIT(2),
/**
* look at right region for the desired lock
*/
OSC_DAP_FL_RIGHT = BIT(3),
};
/*
* The set of operations which are different for MDC and OSC objects
*/
struct osc_object_operations {
void (*oto_build_res_name)(struct osc_object *osc,
struct ldlm_res_id *resname);
struct ldlm_lock* (*oto_dlmlock_at_pgoff)(const struct lu_env *env,
struct osc_object *obj,
pgoff_t index,
enum osc_dap_flags dap_flags);
};
struct osc_object {
struct cl_object oo_cl;
struct lov_oinfo *oo_oinfo;
#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
/**
* IO context used for invariant checks in osc_lock_has_pages().
*/
struct cl_io oo_debug_io;
/** Serialization object for osc_object::oo_debug_io. */
struct mutex oo_debug_mutex;
#endif
/**
* used by the osc to keep track of what objects to build into rpcs.
* Protected by client_obd->cli_loi_list_lock.
*/
struct list_head oo_ready_item;
struct list_head oo_hp_ready_item;
struct list_head oo_write_item;
struct list_head oo_read_item;
/**
* extent is a red black tree to manage (async) dirty pages.
*/
struct rb_root oo_root;
/**
* Manage write(dirty) extents.
*/
struct list_head oo_hp_exts; /* list of hp extents */
struct list_head oo_hp_read_exts;/* list for hp read extents */
struct list_head oo_urgent_exts; /* list of writeback extents */
struct list_head oo_full_exts;
struct list_head oo_dio_exts;
struct list_head oo_reading_exts;
atomic_t oo_nr_reads;
atomic_t oo_nr_writes;
/** Protect extent tree. used to protect oo_{read|write}_pages soon. */
spinlock_t oo_lock;
/**
* Radix tree for caching pages
*/
spinlock_t oo_tree_lock;
struct radix_tree_root oo_tree;
unsigned long oo_npages;
/* Protect osc_lock this osc_object has */
struct list_head oo_ol_list;
spinlock_t oo_ol_spin;
/** number of active IOs of this object */
atomic_t oo_nr_ios;
wait_queue_head_t oo_io_waitq;
const struct osc_object_operations *oo_obj_ops;
bool oo_initialized;
};
static inline void osc_build_res_name(struct osc_object *osc,
struct ldlm_res_id *resname)
{
return osc->oo_obj_ops->oto_build_res_name(osc, resname);
}
static inline struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
struct osc_object *obj,
pgoff_t index,
enum osc_dap_flags flags)
{
return obj->oo_obj_ops->oto_dlmlock_at_pgoff(env, obj, index, flags);
}
static inline void osc_object_lock(struct osc_object *obj)
{
spin_lock(&obj->oo_lock);
}
static inline void osc_object_unlock(struct osc_object *obj)
{
spin_unlock(&obj->oo_lock);
}
#define assert_osc_object_is_locked(obj) \
assert_spin_locked(&obj->oo_lock)
/*
* Lock "micro-states" for osc layer.
*/
enum osc_lock_state {
OLS_NEW,
OLS_ENQUEUED,
OLS_UPCALL_RECEIVED,
OLS_GRANTED,
OLS_CANCELLED
};
/**
* osc-private state of cl_lock.
*
* Interaction with DLM.
*
* Once receive upcall is invoked, osc_lock remembers a handle of DLM lock in
* osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_dlmlock.
*
* This pointer is protected through a reference, acquired by
* osc_lock_upcall0(). Also, an additional reference is acquired by
* ldlm_lock_addref() call protecting the lock from cancellation, until
* osc_lock_unuse() releases it.
*
* Below is a description of how lock references are acquired and released
* inside of DLM.
*
* - When new lock is created and enqueued to the server (ldlm_cli_enqueue())
* - ldlm_lock_create()
* - ldlm_lock_new(): initializes a lock with 2 references. One for
* the caller (released when reply from the server is received, or on
* error), and another for the hash table.
* - ldlm_lock_addref_internal(): protects the lock from cancellation.
*
* - When reply is received from the server (osc_enqueue_interpret())
* - ldlm_cli_enqueue_fini()
* - ldlm_lock_put(): releases caller reference acquired by
* ldlm_lock_new().
* - if (rc != 0)
* ldlm_lock_decref(): error case: matches ldlm_cli_enqueue().
* - ldlm_lock_decref(): for async locks, matches ldlm_cli_enqueue().
*
* - When lock is being cancelled (ldlm_lock_cancel())
* - ldlm_lock_destroy()
* - ldlm_lock_put(): releases hash-table reference acquired by
* ldlm_lock_new().
*
* osc_lock is detached from ldlm_lock by osc_lock_detach() that is called
* either when lock is cancelled (osc_lock_blocking()), or when locks is
* deleted without cancellation (e.g., from cl_locks_prune()). In the latter
* case ldlm lock remains in memory, and can be re-attached to osc_lock in the
* future.
*/
struct osc_lock {
struct cl_lock_slice ols_cl;
/** Internal lock to protect states, etc. */
spinlock_t ols_lock;
/** Owner sleeps on this channel for state change */
struct cl_sync_io *ols_owner;
/** waiting list for this lock to be cancelled */
struct list_head ols_waiting_list;
/** wait entry of ols_waiting_list */
struct list_head ols_wait_entry;
/** list entry for osc_object::oo_ol_list */
struct list_head ols_nextlock_oscobj;
/** underlying DLM lock */
struct ldlm_lock *ols_dlmlock;
/** DLM flags with which osc_lock::ols_lock was enqueued */
__u64 ols_flags;
/** osc_lock::ols_lock handle */
struct lustre_handle ols_handle;
struct ldlm_enqueue_info ols_einfo;
enum osc_lock_state ols_state;
/** lock value block */
struct ost_lvb ols_lvb;
/** Lockless operations to be used by lockless lock */
const struct cl_lock_operations *ols_lockless_ops;
/**
* true, if ldlm_lock_addref() was called against
* osc_lock::ols_lock. This is used for sanity checking.
*
* \see osc_lock::ols_has_ref
*/
unsigned ols_hold :1,
/**
* this is much like osc_lock::ols_hold, except that this bit is
* cleared _after_ reference in released in osc_lock_unuse(). This
* fine distinction is needed because:
*
* - if ldlm lock still has a reference, osc_ast_data_get() needs
* to return associated cl_lock (so that a flag is needed that is
* cleared after ldlm_lock_decref() returned), and
*
* - ldlm_lock_decref() can invoke blocking ast (for a
* LDLM_FL_CBPENDING lock), and osc_lock functions like
* osc_lock_cancel() called from there need to know whether to
* release lock reference (so that a flag is needed that is
* cleared before ldlm_lock_decref() is called).
*/
ols_has_ref:1,
/**
* inherit the lockless attribute from top level cl_io.
* If true, osc_lock_enqueue is able to tolerate the -EUSERS error.
*/
ols_locklessable:1,
/**
* if set, the osc_lock is a glimpse lock. For glimpse locks, we treat
* the EVAVAIL error as torerable, this will make upper logic happy
* to wait all glimpse locks to each OSTs to be completed.
* Glimpse lock converts to normal lock if the server lock is granted.
* Glimpse lock should be destroyed immediately after use.
*/
ols_glimpse:1,
/**
* For async glimpse lock.
*/
ols_agl:1,
/**
* for speculative locks - asynchronous glimpse locks and ladvise
* lockahead manual lock requests
*
* Used to tell osc layer to not wait for the ldlm reply from the
* server, so the osc lock will be short lived - It only exists to
* create the ldlm request and is not updated on request completion.
*/
ols_speculative:1;
};
static inline int osc_lock_is_lockless(const struct osc_lock *ols)
{
return (ols->ols_cl.cls_ops == ols->ols_lockless_ops);
}
/**
* Page state private for osc layer.
*/
struct osc_page {
struct cl_page_slice ops_cl;
/**
* Page queues used by osc to detect when RPC can be formed.
*/
struct osc_async_page ops_oap;
/**
* An offset within page from which next transfer starts. This is used
* by cl_page_clip() to submit partial page transfers.
*/
unsigned int ops_from:PAGE_SHIFT,
/**
* An offset within page at which next transfer ends(inclusive).
*
* \see osc_page::ops_from.
*/
ops_to:PAGE_SHIFT,
/**
* Boolean, true iff page is under transfer. Used for sanity checking.
*/
ops_transfer_pinned:1,
/**
* in LRU?
*/
ops_in_lru:1,
/**
* Set if the page must be transferred with OBD_BRW_SRVLOCK.
*/
ops_srvlock:1,
/**
* If the page is in osc_object::oo_tree.
*/
ops_intree:1,
/**
* If the page is marked with PG_mlocked.
*/
ops_vm_locked:1;
/**
* lru page list. See osc_lru_{del|use}() in osc_page.c for usage.
*/
struct list_head ops_lru;
};
struct osc_brw_async_args {
struct obdo *aa_oa;
int aa_requested_nob;
int aa_nio_count;
u32 aa_page_count;
s32 aa_resends;
ktime_t aa_start_time;
struct brw_page **aa_ppga;
struct client_obd *aa_cli;
struct list_head aa_oaps;
struct list_head aa_exts;
struct ptlrpc_request *aa_request;
};
extern struct kmem_cache *osc_lock_kmem;
extern struct kmem_cache *osc_object_kmem;
extern struct kmem_cache *osc_thread_kmem;
extern struct kmem_cache *osc_session_kmem;
extern struct kmem_cache *osc_extent_kmem;
extern struct kmem_cache *osc_obdo_kmem;
extern struct lu_context_key osc_key;
extern struct lu_context_key osc_session_key;
#define OSC_FLAGS (ASYNC_URGENT|ASYNC_READY)
/* osc_page.c */
int osc_dio_pages_init(const struct lu_env *env, struct cl_object *obj,
struct cl_dio_pages *cdp, pgoff_t index);
int osc_page_init(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, pgoff_t ind);
void osc_index2policy(union ldlm_policy_data *policy,
const struct cl_object *obj, pgoff_t start, pgoff_t end);
void osc_lru_add_batch(struct client_obd *cli, struct list_head *list);
void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
enum cl_req_type crt, int brw_flags);
long osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
long target, bool force, long *scanned);
/* osc_cache.c */
int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
u32 async_flags);
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
struct cl_page *page, loff_t offset);
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
struct osc_object *osc, struct osc_page *ops,
cl_commit_cbt cb);
int osc_page_cache_add(const struct lu_env *env, struct osc_object *osc,
struct osc_page *opg, struct cl_io *io,
cl_commit_cbt cb);
int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
struct osc_page *ops);
int osc_queue_dio_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct cl_dio_pages *cdp,
struct list_head *list,
int from_page, int to_page, int brw_flags);
int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
struct osc_object *obj, struct list_head *list,
int brw_flags);
int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
__u64 size, struct osc_extent **extp);
void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
pgoff_t start, pgoff_t end, int hp, int discard,
enum cl_io_priority prio);
int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
pgoff_t start, pgoff_t end);
int __osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
struct osc_object *osc, int async);
static inline void osc_wake_cache_waiters(struct client_obd *cli)
{
wake_up(&cli->cl_cache_waiters);
}
static inline int osc_io_unplug_async(const struct lu_env *env,
struct client_obd *cli,
struct osc_object *osc)
{
return __osc_io_unplug(env, cli, osc, 1);
}
static inline void osc_io_unplug(const struct lu_env *env,
struct client_obd *cli,
struct osc_object *osc)
{
(void)__osc_io_unplug(env, cli, osc, 0);
}
typedef bool (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
void**, int, void *);
bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
struct osc_object *osc, pgoff_t start, pgoff_t end,
osc_page_gang_cbt cb, void *cbdata);
bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
void **pvec, int count, void *cbdata);
/* osc_object.c */
int osc_object_init(const struct lu_env *env, struct lu_object *obj,
const struct lu_object_conf *conf);
void osc_object_free(const struct lu_env *env, struct lu_object *obj);
int osc_lvb_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct ost_lvb *lvb);
int osc_object_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *obj);
int osc_attr_get(const struct lu_env *env, struct cl_object *obj,
struct cl_attr *attr);
int osc_attr_update(const struct lu_env *env, struct cl_object *obj,
const struct cl_attr *attr, enum cl_attr_valid valid);
int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
struct ost_lvb *lvb);
int osc_object_invalidate(const struct lu_env *env, struct osc_object *osc);
int osc_object_find_cbdata(const struct lu_env *env, struct cl_object *obj,
ldlm_iterator_t iter, void *data);
int osc_object_prune(const struct lu_env *env, struct cl_object *obj);
/* osc_request.c */
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd);
int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg);
int osc_precleanup_common(struct obd_device *obd);
int osc_cleanup_common(struct obd_device *obd);
int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
u32 keylen, void *key, u32 vallen, void *val,
struct ptlrpc_request_set *set);
int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
struct hlist_node *hnode, void *arg);
int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
struct obd_device *obd, struct obd_uuid *cluuid,
struct obd_connect_data *data, void *localdata);
int osc_disconnect(struct obd_export *exp);
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
obd_enqueue_update_f upcall, void *cookie);
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
obd_enqueue_update_f upcall, void *cookie, int mode);
void osc_update_next_shrink(struct client_obd *cli);
void osc_schedule_grant_work(void);
/* osc_io.c */
int osc_io_submit(const struct lu_env *env, struct cl_io *io,
const struct cl_io_slice *ios, enum cl_req_type crt,
struct cl_2queue *queue);
int osc_dio_submit(const struct lu_env *env, struct cl_io *io,
const struct cl_io_slice *ios, enum cl_req_type crt,
struct cl_dio_pages *cdp);
int osc_io_commit_async(const struct lu_env *env,
const struct cl_io_slice *ios,
struct cl_page_list *qin, int from, int to,
cl_commit_cbt cb, enum cl_io_priority prio);
void osc_io_extent_release(const struct lu_env *env,
const struct cl_io_slice *ios,
enum cl_io_priority prio);
int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios);
void osc_io_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios);
void osc_io_rw_iter_fini(const struct lu_env *env,
const struct cl_io_slice *ios);
int osc_io_fault_start(const struct lu_env *env, const struct cl_io_slice *ios);
void osc_io_setattr_end(const struct lu_env *env,
const struct cl_io_slice *slice);
int osc_io_read_start(const struct lu_env *env,
const struct cl_io_slice *slice);
int osc_io_write_start(const struct lu_env *env,
const struct cl_io_slice *slice);
void osc_io_end(const struct lu_env *env, const struct cl_io_slice *slice);
int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
struct cl_fsync_io *fio);
void osc_io_fsync_end(const struct lu_env *env,
const struct cl_io_slice *slice);
void osc_read_ahead_release(const struct lu_env *env, struct cl_read_ahead *ra);
int osc_io_lseek_start(const struct lu_env *env,
const struct cl_io_slice *slice);
void osc_io_lseek_end(const struct lu_env *env,
const struct cl_io_slice *slice);
int osc_io_lru_reserve(const struct lu_env *env, const struct cl_io_slice *ios,
loff_t pos, size_t count);
int osc_punch_start(const struct lu_env *env, struct cl_io *io,
struct cl_object *obj);
/* osc_lock.c */
void osc_lock_to_lockless(const struct lu_env *env, struct osc_lock *ols,
int force);
void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
struct osc_lock *oscl);
int osc_lock_enqueue_wait(const struct lu_env *env, struct osc_object *obj,
struct osc_lock *oscl);
void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
struct cl_object *obj, struct osc_lock *oscl);
void osc_lock_set_reader(const struct lu_env *env, const struct cl_io *io,
struct cl_object *obj, struct osc_lock *oscl);
int osc_lock_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct cl_lock_slice *slice);
void osc_lock_cancel(const struct lu_env *env,
const struct cl_lock_slice *slice);
void osc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice);
int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data);
unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock);
/* Accessors and type conversions. */
static inline struct osc_thread_info *osc_env_info(const struct lu_env *env)
{
struct osc_thread_info *info;
info = lu_context_key_get(&env->le_ctx, &osc_key);
LASSERT(info != NULL);
return info;
}
static inline struct osc_session *osc_env_session(const struct lu_env *env)
{
struct osc_session *ses;
ses = lu_context_key_get(env->le_ses, &osc_session_key);
LASSERT(ses != NULL);
return ses;
}
static inline struct osc_io *osc_env_io(const struct lu_env *env)
{
return &osc_env_session(env)->os_io;
}
static inline struct osc_device *lu2osc_dev(const struct lu_device *d)
{
return container_of_safe(d, struct osc_device, osc_cl.cd_lu_dev);
}
static inline struct obd_export *osc_export(const struct osc_object *obj)
{
return lu2osc_dev(obj->oo_cl.co_lu.lo_dev)->osc_exp;
}
static inline struct client_obd *osc_cli(const struct osc_object *obj)
{
return &osc_export(obj)->exp_obd->u.cli;
}
static inline char *cli_name(struct client_obd *cli)
{
return cli->cl_import->imp_obd->obd_name;
}
static inline struct osc_object *cl2osc(const struct cl_object *obj)
{
return container_of_safe(obj, struct osc_object, oo_cl);
}
static inline struct cl_object *osc2cl(const struct osc_object *obj)
{
return (struct cl_object *)&obj->oo_cl;
}
static inline struct osc_device *obd2osc_dev(const struct obd_device *obd)
{
return container_of_safe(obd->obd_lu_dev, struct osc_device,
osc_cl.cd_lu_dev);
}
static inline struct lu_device *osc2lu_dev(struct osc_device *osc)
{
return &osc->osc_cl.cd_lu_dev;
}
static inline struct lu_object *osc2lu(struct osc_object *osc)
{
return &osc->oo_cl.co_lu;
}
static inline struct osc_object *lu2osc(const struct lu_object *obj)
{
return container_of_safe(obj, struct osc_object, oo_cl.co_lu);
}
static inline struct osc_io *cl2osc_io(const struct lu_env *env,
const struct cl_io_slice *slice)
{
struct osc_io *oio = container_of(slice, struct osc_io, oi_cl);
LINVRNT(oio == osc_env_io(env));
return oio;
}
static inline enum ldlm_mode osc_cl_lock2ldlm(enum cl_lock_mode mode)
{
LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
if (mode == CLM_READ)
return LCK_PR;
if (mode == CLM_WRITE)
return LCK_PW;
return LCK_GROUP;
}
static inline enum cl_lock_mode osc_ldlm2cl_lock(enum ldlm_mode mode)
{
LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
if (mode == LCK_PR)
return CLM_READ;
if (mode == LCK_PW)
return CLM_WRITE;
return CLM_GROUP;
}
static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
{
return container_of_safe(slice, struct osc_page, ops_cl);
}
static inline struct osc_page *oap2osc(struct osc_async_page *oap)
{
return container_of_safe(oap, struct osc_page, ops_oap);
}
static inline pgoff_t osc_index(struct osc_page *opg)
{
return opg->ops_oap.oap_obj_off >> PAGE_SHIFT;
}
static inline struct osc_object *osc_page_object(struct osc_page *ops)
{
return ops->ops_oap.oap_obj;
}
static inline struct cl_page *oap2cl_page(struct osc_async_page *oap)
{
return oap2osc(oap)->ops_cl.cpl_page;
}
static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
{
return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
}
static inline struct osc_page *
osc_cl_page_osc(struct cl_page *page, struct osc_object *osc)
{
const struct cl_page_slice *slice;
LASSERT(osc != NULL);
slice = cl_object_page_slice(&osc->oo_cl, page);
return cl2osc_page(slice);
}
static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice)
{
return container_of_safe(slice, struct osc_lock, ols_cl);
}
static inline int osc_io_srvlock(struct osc_io *oio)
{
return (oio->oi_lockless && !oio->oi_cl.cis_io->ci_no_srvlock);
}
enum osc_extent_state {
OES_INV = 0, /** extent is just initialized or destroyed */
OES_ACTIVE = 1, /** process is using this extent */
OES_CACHE = 2, /** extent is ready for IO */
OES_LOCKING = 3, /** locking page to prepare IO */
OES_LOCK_DONE = 4, /** locking finished, ready to send */
OES_RPC = 5, /** in RPC */
OES_TRUNC = 6, /** being truncated */
OES_STATE_MAX
};
/**
* osc_extent data to manage dirty pages.
* osc_extent has the following attributes:
* 1. all pages in the same must be in one RPC in write back;
* 2. # of pages must be less than max_pages_per_rpc - implied by 1;
* 3. must be covered by only 1 osc_lock;
* 4. exclusive. It's impossible to have overlapped osc_extent.
*
* The lifetime of an extent is from when the 1st page is dirtied to when
* all pages inside it are written out.
*
* LOCKING ORDER
* =============
* page lock -> client_obd_list_lock -> object lock(osc_object::oo_lock)
*/
struct osc_extent {
/** red-black tree node */
struct rb_node oe_node;
/** osc_object of this extent */
struct osc_object *oe_obj;
/** refcount, removed from red-black tree if reaches zero. */
struct kref oe_refc;
/** busy if non-zero */
atomic_t oe_users;
/** link list of osc_object's oo_{hp|urgent|locking}_exts. */
struct list_head oe_link;
/** state of this extent */
enum osc_extent_state oe_state;
/** flags for this extent. */
/** 0 is write, 1 is read */
unsigned int oe_rw:1,
/** sync extent, queued by osc_queue_sync_pages() */
oe_sync:1,
/** set if this extent has partial, sync pages.
* Extents with partial page(s) can't merge with others in RPC
*/
oe_no_merge:1,
oe_srvlock:1,
oe_memalloc:1,
/** an ACTIVE extent is going to be truncated, so when this extent
* is released, it will turn into TRUNC state instead of CACHE.
*/
oe_trunc_pending:1,
/** this extent should be written asap and someone may wait for the
* write to finish. This bit is usually set along with urgent if
* the extent was CACHE state.
* fsync_wait extent can't be merged because new extent region may
* exceed fsync range.
*/
oe_fsync_wait:1,
/** covering lock is being canceled */
oe_hp:1,
/** this extent should be written back asap. set if one of pages is
* called by page WB daemon, or sync write or reading requests.
*/
oe_urgent:1,
/** Non-delay RPC should be used for this extent. */
oe_ndelay:1,
/** direct IO pages */
oe_dio:1,
/** this extent consists of pages that are not directly accessible
* from the CPU
*/
oe_is_rdma_only:1;
/** how many grants allocated for this extent.
* Grant allocated for this extent. There is no grant allocated
* for reading extents and sync write extents.
*/
unsigned int oe_grants;
/** # of dirty pages in this extent */
unsigned int oe_nr_pages;
/** list of pending oap pages. Pages in this list are NOT sorted. */
struct list_head oe_pages;
struct cl_sub_dio *oe_csd;
/** start and end index of this extent, include start and end
* themselves. Page offset here is the page index of osc_pages.
* oe_start is used as keyword for red-black tree.
*/
pgoff_t oe_start;
pgoff_t oe_end;
/** maximum ending index of this extent, this is limited by
* max_pages_per_rpc, lock extent and chunk size.
*/
pgoff_t oe_max_end;
/** waitqueue - for those who want to be notified if this extent's
* state has changed.
*/
wait_queue_head_t oe_waitq;
/** lock covering this extent */
struct ldlm_lock *oe_dlmlock;
/** terminator of this extent. Must be true if this extent is in IO. */
struct task_struct *oe_owner;
/** return value of writeback. If somebody is waiting for this extent,
* this value can be known by outside world.
*/
int oe_rc;
/** max pages per rpc when this extent was created */
unsigned int oe_mppr;
/** FLR: layout version when this osc_extent is publised */
__u32 oe_layout_version;
};
/** @} osc */
#endif /* LUSTRE_OSC_H */