Viewing: ext4-pdirop.patch
commit 8e858671be59ed53fad2d340cf841b026943cc8a
Author: Liang Zhen <liang.zhen@intel.com>
AuthorDate: Fri Jul 8 13:43:08 2011 -0400
LU-50 ldiskfs: pdirops patch for ldiskfs
Single directory performance is a critical for HPC workloads. In a
typical use case an application creates a separate output file for
each node and task in a job. As nodes and tasks increase, hundreds
of thousands of files may be created in a single directory within
a short window of time.
Today, both filename lookup and file system modifying operations
(such as create and unlink) are protected with a single lock for
an entire ldiskfs directory. PDO project will remove this
bottleneck by introducing a parallel locking mechanism for entire
ldiskfs directories. This work will enable multiple application
threads to simultaneously lookup, create and unlink in parallel.
This patch contains:
- pdirops support for ldiskfs
- N-level htree directory
- integrate with osd-ldiskfs
Signed-off-by: Liang Zhen <liang@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Change-Id: I269c0e3112e68f3acd79e860dab052a68c7d7aaa
Reviewed-on: http://review.whamcloud.com/375
---
fs/ext4/Makefile | 1 +
fs/ext4/ext4.h | 78 ++++++++
fs/ext4/namei.c | 450 +++++++++++++++++++++++++++++++++++++++++++----
fs/ext4/super.c | 1 +
4 files changed, 492 insertions(+), 38 deletions(-)
diff --git a/fs/ext4/Makefile b/fs/ext4/Makefile
index b17ddc22..45a68cb5 100644
--- a/fs/ext4/Makefile
+++ b/fs/ext4/Makefile
@@ -7,6 +7,7 @@ obj-$(CONFIG_EXT4_FS) += ext4.o
ext4-y := balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
+ htree_lock.o \
indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
super.o symlink.o sysfs.o xattr.o xattr_trusted.o xattr_user.o
diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
index af38fcf2..80b8120d 100644
--- a/fs/ext4/ext4.h
+++ b/fs/ext4/ext4.h
@@ -29,6 +29,7 @@
#include <linux/timer.h>
#include <linux/version.h>
#include <linux/wait.h>
+#include <linux/htree_lock.h>
#include <linux/sched/signal.h>
#include <linux/blockgroup_lock.h>
#include <linux/percpu_counter.h>
@@ -961,6 +962,9 @@ struct ext4_inode_info {
__u32 i_dtime;
ext4_fsblk_t i_file_acl;
+ /* following fields for parallel directory operations -bzzz */
+ struct semaphore i_append_sem;
+
/*
* i_block_group is the number of the block group which contains
* this file's inode. Constant across the lifetime of the inode,
@@ -2186,6 +2190,72 @@ struct dx_hash_info
*/
#define HASH_NB_ALWAYS 1
+/* assume name-hash is protected by upper layer */
+#define EXT4_HTREE_LOCK_HASH 0
+
+enum ext4_pdo_lk_types {
+#if EXT4_HTREE_LOCK_HASH
+ EXT4_LK_HASH,
+#endif
+ EXT4_LK_DX, /* index block */
+ EXT4_LK_DE, /* directory entry block */
+ EXT4_LK_SPIN, /* spinlock */
+ EXT4_LK_MAX,
+};
+
+/* read-only bit */
+#define EXT4_LB_RO(b) (1 << (b))
+/* read + write, high bits for writer */
+#define EXT4_LB_RW(b) ((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
+
+enum ext4_pdo_lock_bits {
+ /* DX lock bits */
+ EXT4_LB_DX_RO = EXT4_LB_RO(EXT4_LK_DX),
+ EXT4_LB_DX = EXT4_LB_RW(EXT4_LK_DX),
+ /* DE lock bits */
+ EXT4_LB_DE_RO = EXT4_LB_RO(EXT4_LK_DE),
+ EXT4_LB_DE = EXT4_LB_RW(EXT4_LK_DE),
+ /* DX spinlock bits */
+ EXT4_LB_SPIN_RO = EXT4_LB_RO(EXT4_LK_SPIN),
+ EXT4_LB_SPIN = EXT4_LB_RW(EXT4_LK_SPIN),
+ /* accurate searching */
+ EXT4_LB_EXACT = EXT4_LB_RO(EXT4_LK_MAX << 1),
+};
+
+enum ext4_pdo_lock_opc {
+ /* external */
+ EXT4_HLOCK_READDIR = (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
+ EXT4_HLOCK_LOOKUP = (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
+ EXT4_LB_EXACT),
+ EXT4_HLOCK_DEL = (EXT4_LB_DE | EXT4_LB_SPIN_RO |
+ EXT4_LB_EXACT),
+ EXT4_HLOCK_ADD = (EXT4_LB_DE | EXT4_LB_SPIN_RO),
+
+ /* internal */
+ EXT4_HLOCK_LOOKUP_SAFE = (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
+ EXT4_LB_EXACT),
+ EXT4_HLOCK_DEL_SAFE = (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
+ EXT4_HLOCK_SPLIT = (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
+};
+
+extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
+#define ext4_htree_lock_head_free(lhead) htree_lock_head_free(lhead)
+
+extern struct htree_lock *ext4_htree_lock_alloc(void);
+#define ext4_htree_lock_free(lck) htree_lock_free(lck)
+
+extern void ext4_htree_lock(struct htree_lock *lck,
+ struct htree_lock_head *lhead,
+ struct inode *dir, unsigned flags);
+#define ext4_htree_unlock(lck) htree_unlock(lck)
+
+extern struct buffer_head *ext4_find_entry_locked(struct inode *dir,
+ const struct qstr *d_name,
+ struct ext4_dir_entry_2 **res_dir,
+ int *inlined, struct htree_lock *lck);
+extern int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
+ struct inode *inode, struct htree_lock *lck);
+
struct ext4_filename {
const struct qstr *usr_fname;
struct fscrypt_str disk_name;
@@ -2553,8 +2623,16 @@ void ext4_insert_dentry(struct inode *inode,
struct ext4_filename *fname, void *data);
static inline void ext4_update_dx_flag(struct inode *inode)
{
+ /* Disable it for ldiskfs, because going from a DX directory to
+ * a non-DX directory while it is in use will completely break
+ * the htree-locking.
+ * If we really want to support this operation in the future,
+ * we need to exclusively lock the directory at here which will
+ * increase complexity of code */
+#if 0
if (!ext4_has_feature_dir_index(inode->i_sb))
ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
+#endif
}
static const unsigned char ext4_filetype_table[] = {
DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
diff --git a/fs/ext4/namei.c b/fs/ext4/namei.c
index cc4193f1..7e9aabaf 100644
--- a/fs/ext4/namei.c
+++ b/fs/ext4/namei.c
@@ -55,6 +55,7 @@ struct buffer_head *ext4_append(handle_t *handle,
ext4_lblk_t *block)
{
struct buffer_head *bh;
+ struct ext4_inode_info *ei = EXT4_I(inode);
int err;
if (unlikely(EXT4_SB(inode->i_sb)->s_max_dir_size_kb &&
@@ -62,15 +63,22 @@ struct buffer_head *ext4_append(handle_t *handle,
EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
return ERR_PTR(-ENOSPC);
+ /* with parallel dir operations all appends
+ * have to be serialized -bzzz */
+ down(&ei->i_append_sem);
+
*block = inode->i_size >> inode->i_sb->s_blocksize_bits;
bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
- if (IS_ERR(bh))
+ if (IS_ERR(bh)) {
+ up(&ei->i_append_sem);
return bh;
+ }
inode->i_size += inode->i_sb->s_blocksize;
EXT4_I(inode)->i_disksize = inode->i_size;
BUFFER_TRACE(bh, "get_write_access");
err = ext4_journal_get_write_access(handle, bh);
+ up(&ei->i_append_sem);
if (err) {
brelse(bh);
ext4_std_error(inode->i_sb, err);
@@ -264,7 +272,8 @@ static unsigned dx_node_limit(struct inode *dir);
static struct dx_frame *dx_probe(struct ext4_filename *fname,
struct inode *dir,
struct dx_hash_info *hinfo,
- struct dx_frame *frame);
+ struct dx_frame *frame,
+ struct htree_lock *lck);
static void dx_release(struct dx_frame *frames);
static int dx_make_map(struct inode *dir, struct ext4_dir_entry_2 *de,
unsigned blocksize, struct dx_hash_info *hinfo,
@@ -278,12 +287,13 @@ static void dx_insert_block(struct dx_frame *frame,
static int ext4_htree_next_block(struct inode *dir, __u32 hash,
struct dx_frame *frame,
struct dx_frame *frames,
- __u32 *start_hash);
+ __u32 *start_hash, struct htree_lock *lck);
static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
struct ext4_filename *fname,
- struct ext4_dir_entry_2 **res_dir);
+ struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
- struct inode *dir, struct inode *inode);
+ struct inode *dir, struct inode *inode,
+ struct htree_lock *lck);
/* checksumming functions */
void ext4_initialize_dirent_tail(struct buffer_head *bh,
@@ -748,6 +758,227 @@ struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
}
#endif /* DX_DEBUG */
+/* private data for htree_lock */
+struct ext4_dir_lock_data {
+ unsigned ld_flags; /* bits-map for lock types */
+ unsigned ld_count; /* # entries of the last DX block */
+ struct dx_entry ld_at_entry; /* copy of leaf dx_entry */
+ struct dx_entry *ld_at; /* position of leaf dx_entry */
+};
+
+#define ext4_htree_lock_data(l) ((struct ext4_dir_lock_data *)(l)->lk_private)
+#define ext4_find_entry(dir, name, dirent, inline) \
+ ext4_find_entry_locked(dir, name, dirent, inline, NULL)
+#define ext4_add_entry(handle, dentry, inode) \
+ ext4_add_entry_locked(handle, dentry, inode, NULL)
+
+/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
+#define EXT4_HTREE_NODE_CHANGED (0xcafeULL << 32)
+
+static void ext4_htree_event_cb(void *target, void *event)
+{
+ u64 *block = (u64 *)target;
+
+ if (*block == dx_get_block((struct dx_entry *)event))
+ *block = EXT4_HTREE_NODE_CHANGED;
+}
+
+struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
+{
+ struct htree_lock_head *lhead;
+
+ lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
+ if (lhead != NULL) {
+ htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
+ ext4_htree_event_cb);
+ }
+ return lhead;
+}
+EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
+
+struct htree_lock *ext4_htree_lock_alloc(void)
+{
+ return htree_lock_alloc(EXT4_LK_MAX,
+ sizeof(struct ext4_dir_lock_data));
+}
+EXPORT_SYMBOL(ext4_htree_lock_alloc);
+
+static htree_lock_mode_t ext4_htree_mode(unsigned flags)
+{
+ switch (flags) {
+ default: /* 0 or unknown flags require EX lock */
+ return HTREE_LOCK_EX;
+ case EXT4_HLOCK_READDIR:
+ return HTREE_LOCK_PR;
+ case EXT4_HLOCK_LOOKUP:
+ return HTREE_LOCK_CR;
+ case EXT4_HLOCK_DEL:
+ case EXT4_HLOCK_ADD:
+ return HTREE_LOCK_CW;
+ }
+}
+
+/* return PR for read-only operations, otherwise return EX */
+static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
+{
+ int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
+
+ /* 0 requires EX lock */
+ return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
+}
+
+static int ext4_htree_safe_locked(struct htree_lock *lck)
+{
+ int writer;
+
+ if (lck == NULL || lck->lk_mode == HTREE_LOCK_EX)
+ return 1;
+
+ writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
+ EXT4_LB_DE;
+ if (writer) /* all readers & writers are excluded? */
+ return lck->lk_mode == HTREE_LOCK_EX;
+
+ /* all writers are excluded? */
+ return lck->lk_mode == HTREE_LOCK_PR ||
+ lck->lk_mode == HTREE_LOCK_PW ||
+ lck->lk_mode == HTREE_LOCK_EX;
+}
+
+/* relock htree_lock with EX mode if it's change operation, otherwise
+ * relock it with PR mode. It's noop if PDO is disabled. */
+static void ext4_htree_safe_relock(struct htree_lock *lck)
+{
+ if (!ext4_htree_safe_locked(lck)) {
+ unsigned flags = ext4_htree_lock_data(lck)->ld_flags;
+
+ htree_change_lock(lck, ext4_htree_safe_mode(flags));
+ }
+}
+
+void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
+ struct inode *dir, unsigned flags)
+{
+ htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
+ ext4_htree_safe_mode(flags);
+
+ ext4_htree_lock_data(lck)->ld_flags = flags;
+ htree_lock(lck, lhead, mode);
+ if (!is_dx(dir))
+ ext4_htree_safe_relock(lck); /* make sure it's safe locked */
+}
+EXPORT_SYMBOL(ext4_htree_lock);
+
+static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
+ unsigned lmask, int wait, void *ev)
+{
+ u32 key = (at == NULL) ? 0 : dx_get_block(at);
+ u32 mode;
+
+ /* NOOP if htree is well protected or caller doesn't require the lock */
+ if (ext4_htree_safe_locked(lck) ||
+ !(ext4_htree_lock_data(lck)->ld_flags & lmask))
+ return 1;
+
+ mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
+ HTREE_LOCK_PW : HTREE_LOCK_PR;
+ while (1) {
+ if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
+ return 1;
+ if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
+ return 0;
+ cpu_relax(); /* spin until granted */
+ }
+}
+
+static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
+{
+ return ext4_htree_safe_locked(lck) ||
+ htree_node_is_granted(lck, ffz(~lmask));
+}
+
+static void ext4_htree_node_unlock(struct htree_lock *lck,
+ unsigned lmask, void *buf)
+{
+ /* NB: it's safe to call mutiple times or even it's not locked */
+ if (!ext4_htree_safe_locked(lck) &&
+ htree_node_is_granted(lck, ffz(~lmask)))
+ htree_node_unlock(lck, ffz(~lmask), buf);
+}
+
+#define ext4_htree_dx_lock(lck, key) \
+ ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
+#define ext4_htree_dx_lock_try(lck, key) \
+ ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
+#define ext4_htree_dx_unlock(lck) \
+ ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
+#define ext4_htree_dx_locked(lck) \
+ ext4_htree_node_locked(lck, EXT4_LB_DX)
+
+static void ext4_htree_dx_need_lock(struct htree_lock *lck)
+{
+ struct ext4_dir_lock_data *ld;
+
+ if (ext4_htree_safe_locked(lck))
+ return;
+
+ ld = ext4_htree_lock_data(lck);
+ switch (ld->ld_flags) {
+ default:
+ return;
+ case EXT4_HLOCK_LOOKUP:
+ ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
+ return;
+ case EXT4_HLOCK_DEL:
+ ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
+ return;
+ case EXT4_HLOCK_ADD:
+ ld->ld_flags = EXT4_HLOCK_SPLIT;
+ return;
+ }
+}
+
+#define ext4_htree_de_lock(lck, key) \
+ ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
+#define ext4_htree_de_unlock(lck) \
+ ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
+
+#define ext4_htree_spin_lock(lck, key, event) \
+ ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
+#define ext4_htree_spin_unlock(lck) \
+ ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
+#define ext4_htree_spin_unlock_listen(lck, p) \
+ ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
+
+static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
+{
+ if (!ext4_htree_safe_locked(lck) &&
+ htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
+ htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
+}
+
+enum {
+ DX_HASH_COL_IGNORE, /* ignore collision while probing frames */
+ DX_HASH_COL_YES, /* there is collision and it does matter */
+ DX_HASH_COL_NO, /* there is no collision */
+};
+
+static int dx_probe_hash_collision(struct htree_lock *lck,
+ struct dx_entry *entries,
+ struct dx_entry *at, u32 hash)
+{
+ if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
+ return DX_HASH_COL_IGNORE; /* don't care about collision */
+
+ } else if (at == entries + dx_get_count(entries) - 1) {
+ return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
+
+ } else { /* hash collision? */
+ return ((dx_get_hash(at + 1) & ~1) == hash) ?
+ DX_HASH_COL_YES : DX_HASH_COL_NO;
+ }
+}
+
/*
* Probe for a directory leaf block to search.
*
@@ -759,10 +990,11 @@ struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
*/
static struct dx_frame *
dx_probe(struct ext4_filename *fname, struct inode *dir,
- struct dx_hash_info *hinfo, struct dx_frame *frame_in)
+ struct dx_hash_info *hinfo, struct dx_frame *frame_in,
+ struct htree_lock *lck)
{
unsigned count, indirect;
- struct dx_entry *at, *entries, *p, *q, *m;
+ struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
struct dx_root_info *info;
struct dx_frame *frame = frame_in;
struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
@@ -824,8 +1056,15 @@ dx_probe(struct ext4_filename *fname, struct inode *dir,
dxtrace(printk("Look up %x", hash));
while (1) {
+ if (indirect == 0) { /* the last index level */
+ /* NB: ext4_htree_dx_lock() could be noop if
+ * DX-lock flag is not set for current operation */
+ ext4_htree_dx_lock(lck, dx);
+ ext4_htree_spin_lock(lck, dx, NULL);
+ }
count = dx_get_count(entries);
- if (!count || count > dx_get_limit(entries)) {
+ if (count == 0 || count > dx_get_limit(entries)) {
+ ext4_htree_spin_unlock(lck); /* release spin */
ext4_warning_inode(dir,
"dx entry: count %u beyond limit %u",
count, dx_get_limit(entries));
@@ -864,8 +1103,70 @@ dx_probe(struct ext4_filename *fname, struct inode *dir,
dx_get_block(at)));
frame->entries = entries;
frame->at = at;
- if (!indirect--)
+
+ if (indirect == 0) { /* the last index level */
+ struct ext4_dir_lock_data *ld;
+ u64 myblock;
+
+ /* By default we only lock DE-block, however, we will
+ * also lock the last level DX-block if:
+ * a) there is hash collision
+ * we will set DX-lock flag (a few lines below)
+ * and redo to lock DX-block
+ * see detail in dx_probe_hash_collision()
+ * b) it's a retry from splitting
+ * we need to lock the last level DX-block so nobody
+ * else can split any leaf blocks under the same
+ * DX-block, see detail in ext4_dx_add_entry()
+ */
+ if (ext4_htree_dx_locked(lck)) {
+ /* DX-block is locked, just lock DE-block
+ * and return */
+ ext4_htree_spin_unlock(lck);
+ if (!ext4_htree_safe_locked(lck))
+ ext4_htree_de_lock(lck, frame->at);
+ return frame;
+ }
+ /* it's pdirop and no DX lock */
+ if (dx_probe_hash_collision(lck, entries, at, hash) ==
+ DX_HASH_COL_YES) {
+ /* found hash collision, set DX-lock flag
+ * and retry to abtain DX-lock */
+ ext4_htree_spin_unlock(lck);
+ ext4_htree_dx_need_lock(lck);
+ continue;
+ }
+ ld = ext4_htree_lock_data(lck);
+ /* because I don't lock DX, so @at can't be trusted
+ * after I release spinlock so I have to save it */
+ ld->ld_at = at;
+ ld->ld_at_entry = *at;
+ ld->ld_count = dx_get_count(entries);
+
+ frame->at = &ld->ld_at_entry;
+ myblock = dx_get_block(at);
+
+ /* NB: ordering locking */
+ ext4_htree_spin_unlock_listen(lck, &myblock);
+ /* other thread can split this DE-block because:
+ * a) I don't have lock for the DE-block yet
+ * b) I released spinlock on DX-block
+ * if it happened I can detect it by listening
+ * splitting event on this DE-block */
+ ext4_htree_de_lock(lck, frame->at);
+ ext4_htree_spin_stop_listen(lck);
+
+ if (myblock == EXT4_HTREE_NODE_CHANGED) {
+ /* someone split this DE-block before
+ * I locked it, I need to retry and lock
+ * valid DE-block */
+ ext4_htree_de_unlock(lck);
+ continue;
+ }
return frame;
+ }
+ dx = at;
+ indirect--;
frame++;
frame->bh = ext4_read_dirblock(dir, dx_get_block(at), INDEX);
if (IS_ERR(frame->bh)) {
@@ -934,7 +1235,7 @@ static void dx_release(struct dx_frame *frames)
static int ext4_htree_next_block(struct inode *dir, __u32 hash,
struct dx_frame *frame,
struct dx_frame *frames,
- __u32 *start_hash)
+ __u32 *start_hash, struct htree_lock *lck)
{
struct dx_frame *p;
struct buffer_head *bh;
@@ -949,12 +1250,22 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
* this loop, num_frames indicates the number of interior
* nodes need to be read.
*/
+ ext4_htree_de_unlock(lck);
while (1) {
+ if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
+ /* num_frames > 0 :
+ * DX block
+ * ext4_htree_dx_locked:
+ * frame->at is reliable pointer returned by dx_probe,
+ * otherwise dx_probe already knew no collision */
if (++(p->at) < p->entries + dx_get_count(p->entries))
break;
+ }
if (p == frames)
return 0;
num_frames++;
+ if (num_frames == 1)
+ ext4_htree_dx_unlock(lck);
p--;
}
@@ -977,6 +1288,13 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
* block so no check is necessary
*/
while (num_frames--) {
+ if (num_frames == 0) {
+ /* it's not always necessary, we just don't want to
+ * detect hash collision again */
+ ext4_htree_dx_need_lock(lck);
+ ext4_htree_dx_lock(lck, p->at);
+ }
+
bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
if (IS_ERR(bh))
return PTR_ERR(bh);
@@ -985,6 +1303,7 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
p->bh = bh;
p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
}
+ ext4_htree_de_lock(lck, p->at);
return 1;
}
@@ -1132,10 +1451,10 @@ int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
}
hinfo.hash = start_hash;
hinfo.minor_hash = 0;
- frame = dx_probe(NULL, dir, &hinfo, frames);
+ /* assume it's PR locked */
+ frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
if (IS_ERR(frame))
return PTR_ERR(frame);
-
/* Add '.' and '..' from the htree header */
if (!start_hash && !start_minor_hash) {
de = (struct ext4_dir_entry_2 *) frames[0].bh->b_data;
@@ -1175,7 +1494,7 @@ int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
count += ret;
hashval = ~0;
ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
- frame, frames, &hashval);
+ frame, frames, &hashval, NULL);
*next_hash = hashval;
if (ret < 0) {
err = ret;
@@ -1451,7 +1770,7 @@ static int is_dx_internal_node(struct inode *dir, ext4_lblk_t block,
static struct buffer_head *__ext4_find_entry(struct inode *dir,
struct ext4_filename *fname,
struct ext4_dir_entry_2 **res_dir,
- int *inlined)
+ int *inlined, struct htree_lock *lck)
{
struct super_block *sb;
struct buffer_head *bh_use[NAMEI_RA_SIZE];
@@ -1493,7 +1812,7 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
goto restart;
}
if (is_dx(dir)) {
- ret = ext4_dx_find_entry(dir, fname, res_dir);
+ ret = ext4_dx_find_entry(dir, fname, res_dir, lck);
/*
* On success, or if the error was file not found,
* return. Otherwise, fall back to doing a search the
@@ -1503,6 +1822,7 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
goto cleanup_and_exit;
dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
"falling back\n"));
+ ext4_htree_safe_relock(lck);
ret = NULL;
}
nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
@@ -1590,10 +1910,10 @@ cleanup_and_exit:
return ret;
}
-static struct buffer_head *ext4_find_entry(struct inode *dir,
+struct buffer_head *ext4_find_entry_locked(struct inode *dir,
const struct qstr *d_name,
struct ext4_dir_entry_2 **res_dir,
- int *inlined)
+ int *inlined, struct htree_lock *lck)
{
int err;
struct ext4_filename fname;
@@ -1605,12 +1925,14 @@ static struct buffer_head *ext4_find_entry(struct inode *dir,
if (err)
return ERR_PTR(err);
- bh = __ext4_find_entry(dir, &fname, res_dir, inlined);
+ bh = __ext4_find_entry(dir, &fname, res_dir, inlined, lck);
ext4_fname_free_filename(&fname);
return bh;
}
+EXPORT_SYMBOL(ext4_find_entry_locked);
+
static struct buffer_head *ext4_lookup_entry(struct inode *dir,
struct dentry *dentry,
struct ext4_dir_entry_2 **res_dir)
@@ -1625,7 +1947,7 @@ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
if (err)
return ERR_PTR(err);
- bh = __ext4_find_entry(dir, &fname, res_dir, NULL);
+ bh = __ext4_find_entry(dir, &fname, res_dir, NULL, NULL);
ext4_fname_free_filename(&fname);
return bh;
@@ -1633,7 +1955,8 @@ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
struct ext4_filename *fname,
- struct ext4_dir_entry_2 **res_dir)
+ struct ext4_dir_entry_2 **res_dir,
+ struct htree_lock *lck)
{
struct super_block * sb = dir->i_sb;
struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
@@ -1644,7 +1967,7 @@ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
#ifdef CONFIG_FS_ENCRYPTION
*res_dir = NULL;
#endif
- frame = dx_probe(fname, dir, NULL, frames);
+ frame = dx_probe(fname, dir, NULL, frames, lck);
if (IS_ERR(frame))
return (struct buffer_head *) frame;
do {
@@ -1666,7 +1989,7 @@ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
/* Check to see if we should continue to search */
retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
- frames, NULL);
+ frames, NULL, lck);
if (retval < 0) {
ext4_warning_inode(dir,
"error %d reading directory index block",
@@ -1846,8 +2169,9 @@ static struct ext4_dir_entry_2* dx_pack_dirents(char *base, unsigned blocksize)
* Returns pointer to de in block into which the new entry will be inserted.
*/
static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
- struct buffer_head **bh,struct dx_frame *frame,
- struct dx_hash_info *hinfo)
+ struct buffer_head **bh, struct dx_frame *frames,
+ struct dx_frame *frame, struct dx_hash_info *hinfo,
+ struct htree_lock *lck)
{
unsigned blocksize = dir->i_sb->s_blocksize;
unsigned count, continued;
@@ -1908,8 +2232,14 @@ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
hash2, split, count-split));
/* Fancy dance to stay within two buffers */
- de2 = dx_move_dirents(data1, data2, map + split, count - split,
- blocksize);
+ if (hinfo->hash < hash2) {
+ de2 = dx_move_dirents(data1, data2, map + split,
+ count - split, blocksize);
+ } else {
+ /* make sure we will add entry to the same block which
+ * we have already locked */
+ de2 = dx_move_dirents(data1, data2, map, split, blocksize);
+ }
de = dx_pack_dirents(data1, blocksize);
de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
(char *) de,
@@ -1927,12 +2257,21 @@ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
blocksize, 1));
- /* Which block gets the new entry? */
- if (hinfo->hash >= hash2) {
- swap(*bh, bh2);
- de = de2;
- }
+ ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
+ frame->at); /* notify block is being split */
+ if (hinfo->hash < hash2) {
dx_insert_block(frame, hash2 + continued, newblock);
+
+ } else {
+ /* switch block number */
+ dx_insert_block(frame, hash2 + continued,
+ dx_get_block(frame->at));
+ dx_set_block(frame->at, newblock);
+ (frame->at)++;
+ }
+ ext4_htree_spin_unlock(lck);
+ ext4_htree_dx_unlock(lck);
+
err = ext4_handle_dirty_dirblock(handle, dir, bh2);
if (err)
goto journal_error;
@@ -2181,7 +2520,7 @@ static int make_indexed_dir(handle_t *handle, struct ext4_filename *fname,
if (retval)
goto out_frames;
- de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
+ de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
if (IS_ERR(de)) {
retval = PTR_ERR(de);
goto out_frames;
@@ -2394,8 +2733,8 @@ out:
* may not sleep between calling this and putting something into
* the entry, as someone else might have used it while you slept.
*/
-static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
- struct inode *inode)
+int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
+ struct inode *inode, struct htree_lock *lck)
{
struct inode *dir = d_inode(dentry->d_parent);
struct buffer_head *bh = NULL;
@@ -2444,9 +2783,10 @@ static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
return ext4_update_dotdot(handle, dentry, inode);
if (is_dx(dir)) {
- retval = ext4_dx_add_entry(handle, &fname, dir, inode);
+ retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
if (!retval || (retval != ERR_BAD_DX_DIR))
goto out;
+ ext4_htree_safe_relock(lck);
ext4_clear_inode_flag(dir, EXT4_INODE_INDEX);
dx_fallback++;
ext4_mark_inode_dirty(handle, dir);
@@ -2500,12 +2840,14 @@ out:
ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
return retval;
}
+EXPORT_SYMBOL(ext4_add_entry_locked);
/*
* Returns 0 for success, or a negative error value
*/
static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
- struct inode *dir, struct inode *inode)
+ struct inode *dir, struct inode *inode,
+ struct htree_lock *lck)
{
struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
struct dx_entry *entries, *at;
@@ -2517,7 +2859,7 @@ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
again:
restart = 0;
- frame = dx_probe(fname, dir, NULL, frames);
+ frame = dx_probe(fname, dir, NULL, frames, lck);
if (IS_ERR(frame))
return PTR_ERR(frame);
entries = frame->entries;
@@ -2552,6 +2894,12 @@ again:
struct dx_node *node2;
struct buffer_head *bh2;
+ if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
+ ext4_htree_safe_relock(lck);
+ restart = 1;
+ goto cleanup;
+ }
+
while (frame > frames) {
if (dx_get_count((frame - 1)->entries) <
dx_get_limit((frame - 1)->entries)) {
@@ -2654,8 +3002,32 @@ again:
restart = 1;
goto journal_error;
}
+ } else if (!ext4_htree_dx_locked(lck)) {
+ struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
+
+ /* not well protected, require DX lock */
+ ext4_htree_dx_need_lock(lck);
+ at = frame > frames ? (frame - 1)->at : NULL;
+
+ /* NB: no risk of deadlock because it's just a try.
+ *
+ * NB: we check ld_count for twice, the first time before
+ * having DX lock, the second time after holding DX lock.
+ *
+ * NB: We never free blocks for directory so far, which
+ * means value returned by dx_get_count() should equal to
+ * ld->ld_count if nobody split any DE-block under @at,
+ * and ld->ld_at still points to valid dx_entry. */
+ if ((ld->ld_count != dx_get_count(entries)) ||
+ !ext4_htree_dx_lock_try(lck, at) ||
+ (ld->ld_count != dx_get_count(entries))) {
+ restart = 1;
+ goto cleanup;
+ }
+ /* OK, I've got DX lock and nothing changed */
+ frame->at = ld->ld_at;
}
- de = do_split(handle, dir, &bh, frame, &fname->hinfo);
+ de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
if (IS_ERR(de)) {
err = PTR_ERR(de);
goto cleanup;
@@ -2666,6 +3038,8 @@ again:
journal_error:
ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
cleanup:
+ ext4_htree_dx_unlock(lck);
+ ext4_htree_de_unlock(lck);
brelse(bh);
dx_release(frames);
/* @restart is true means htree-path has been changed, we need to
diff --git a/fs/ext4/super.c b/fs/ext4/super.c
index 0fcc33b1..3cc03066 100644
--- a/fs/ext4/super.c
+++ b/fs/ext4/super.c
@@ -1076,6 +1076,7 @@ static struct inode *ext4_alloc_inode(struct super_block *sb)
inode_set_iversion(&ei->vfs_inode, 1);
spin_lock_init(&ei->i_raw_lock);
+ sema_init(&ei->i_append_sem, 1);
INIT_LIST_HEAD(&ei->i_prealloc_list);
spin_lock_init(&ei->i_prealloc_lock);
ext4_es_init_tree(&ei->i_es_tree);
--
2.34.1