diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/lmv/lmv_obd.c')
-rw-r--r-- | drivers/staging/lustre/lustre/lmv/lmv_obd.c | 1484 |
1 files changed, 1050 insertions, 434 deletions
diff --git a/drivers/staging/lustre/lustre/lmv/lmv_obd.c b/drivers/staging/lustre/lustre/lmv/lmv_obd.c index 0e1588a43187..7dbb2b946acf 100644 --- a/drivers/staging/lustre/lustre/lmv/lmv_obd.c +++ b/drivers/staging/lustre/lustre/lmv/lmv_obd.c @@ -43,12 +43,13 @@ #include "../include/lustre/lustre_idl.h" #include "../include/obd_support.h" -#include "../include/lustre_lib.h" #include "../include/lustre_net.h" #include "../include/obd_class.h" +#include "../include/lustre_lmv.h" #include "../include/lprocfs_status.h" -#include "../include/lustre_lite.h" +#include "../include/cl_object.h" #include "../include/lustre_fid.h" +#include "../include/lustre/lustre_ioctl.h" #include "../include/lustre_kernelcomm.h" #include "lmv_internal.h" @@ -70,12 +71,12 @@ static void lmv_activate_target(struct lmv_obd *lmv, * -ENOTCONN: The UUID is found, but the target connection is bad (!) * -EBADF : The UUID is found, but the OBD of the wrong type (!) */ -static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid, +static int lmv_set_mdc_active(struct lmv_obd *lmv, const struct obd_uuid *uuid, int activate) { struct lmv_tgt_desc *uninitialized_var(tgt); struct obd_device *obd; - int i; + u32 i; int rc = 0; CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n", @@ -244,36 +245,12 @@ static int lmv_connect(const struct lu_env *env, return rc; } -static void lmv_set_timeouts(struct obd_device *obd) -{ - struct lmv_obd *lmv; - int i; - - lmv = &obd->u.lmv; - if (lmv->server_timeout == 0) - return; - - if (lmv->connected == 0) - return; - - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; - - tgt = lmv->tgts[i]; - if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) - continue; - - obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS), - KEY_INTERMDS, 0, NULL, NULL); - } -} - -static int lmv_init_ea_size(struct obd_export *exp, int easize, - int def_easize, int cookiesize, int def_cookiesize) +static int lmv_init_ea_size(struct obd_export *exp, u32 easize, u32 def_easize, + u32 cookiesize, u32 def_cookiesize) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - int i; + u32 i; int rc = 0; int change = 0; @@ -420,6 +397,7 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, { struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; + int orig_tgt_count = 0; int rc = 0; CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index); @@ -489,14 +467,17 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, tgt->ltd_uuid = *uuidp; tgt->ltd_active = 0; lmv->tgts[index] = tgt; - if (index >= lmv->desc.ld_tgt_count) + if (index >= lmv->desc.ld_tgt_count) { + orig_tgt_count = lmv->desc.ld_tgt_count; lmv->desc.ld_tgt_count = index + 1; + } if (lmv->connected) { rc = lmv_connect_mdc(obd, tgt); if (rc) { spin_lock(&lmv->lmv_lock); - lmv->desc.ld_tgt_count--; + if (lmv->desc.ld_tgt_count == index + 1) + lmv->desc.ld_tgt_count = orig_tgt_count; memset(tgt, 0, sizeof(*tgt)); spin_unlock(&lmv->lmv_lock); } else { @@ -514,7 +495,7 @@ int lmv_check_connect(struct obd_device *obd) { struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; - int i; + u32 i; int rc; int easize; @@ -554,10 +535,9 @@ int lmv_check_connect(struct obd_device *obd) goto out_disc; } - lmv_set_timeouts(obd); class_export_put(lmv->exp); lmv->connected = 1; - easize = lmv_get_easize(lmv); + easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC); lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0); mutex_unlock(&lmv->lmv_init_mutex); return 0; @@ -629,7 +609,7 @@ static int lmv_disconnect(struct obd_export *exp) struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; int rc; - int i; + u32 i; if (!lmv->tgts) goto out_local; @@ -758,7 +738,7 @@ static int lmv_hsm_req_count(struct lmv_obd *lmv, const struct hsm_user_request *hur, const struct lmv_tgt_desc *tgt_mds) { - int i, nr = 0; + u32 i, nr = 0; struct lmv_tgt_desc *curr_tgt; /* count how many requests must be sent to the given target */ @@ -885,10 +865,8 @@ static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len, rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, &kcd, sizeof(kcd)); - if (rc) { - if (filp) - fput(filp); - } + if (rc) + fput(filp); return rc; } @@ -899,10 +877,10 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, struct obd_device *obddev = class_exp2obd(exp); struct lmv_obd *lmv = &obddev->u.lmv; struct lmv_tgt_desc *tgt = NULL; - int i = 0; + u32 i = 0; int rc = 0; int set = 0; - int count = lmv->desc.ld_tgt_count; + u32 count = lmv->desc.ld_tgt_count; if (count == 0) return -ENOTTY; @@ -1011,6 +989,21 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp, rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg); break; } + case LL_IOC_FID2MDTIDX: { + struct lu_fid *fid = karg; + int mdt_index; + + rc = lmv_fld_lookup(lmv, fid, &mdt_index); + if (rc) + return rc; + + /* + * Note: this is from llite(see ll_dir_ioctl()), @uarg does not + * point to user space memory for FID2MDTIDX. + */ + *(__u32 *)uarg = mdt_index; + break; + } case OBD_IOC_FID2PATH: { rc = lmv_fid2path(exp, len, karg, uarg); break; @@ -1169,32 +1162,37 @@ static int lmv_placement_policy(struct obd_device *obd, return 0; } + if (op_data->op_default_stripe_offset != -1) { + *mds = op_data->op_default_stripe_offset; + return 0; + } + /** * If stripe_offset is provided during setdirstripe * (setdirstripe -i xx), xx MDS will be chosen. */ - if (op_data->op_cli_flags & CLI_SET_MEA) { + if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data) { struct lmv_user_md *lum; - lum = (struct lmv_user_md *)op_data->op_data; - if (lum->lum_type == LMV_STRIPE_TYPE && - lum->lum_stripe_offset != -1) { - if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) { - CERROR("%s: Stripe_offset %d > MDT count %d: rc = %d\n", - obd->obd_name, - lum->lum_stripe_offset, - lmv->desc.ld_tgt_count, -ERANGE); - return -ERANGE; - } - *mds = lum->lum_stripe_offset; - return 0; + lum = op_data->op_data; + if (le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) { + *mds = le32_to_cpu(lum->lum_stripe_offset); + } else { + /* + * -1 means default, which will be in the same MDT with + * the stripe + */ + *mds = op_data->op_mds; + lum->lum_stripe_offset = cpu_to_le32(op_data->op_mds); } + } else { + /* + * Allocate new fid on target according to operation type and + * parent home mds. + */ + *mds = op_data->op_mds; } - /* Allocate new fid on target according to operation type and parent - * home mds. - */ - *mds = op_data->op_mds; return 0; } @@ -1203,7 +1201,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds) struct lmv_tgt_desc *tgt; int rc; - tgt = lmv_get_target(lmv, mds); + tgt = lmv_get_target(lmv, mds, NULL); if (IS_ERR(tgt)) return PTR_ERR(tgt); @@ -1221,7 +1219,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds) /* * Asking underlaying tgt layer to allocate new fid. */ - rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL); + rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL); if (rc > 0) { LASSERT(fid_is_sane(fid)); rc = 0; @@ -1232,8 +1230,8 @@ out: return rc; } -int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; @@ -1278,10 +1276,10 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg) return -EINVAL; } - lmv->tgts = kcalloc(32, sizeof(*lmv->tgts), GFP_NOFS); + lmv->tgts_size = 32U; + lmv->tgts = kcalloc(lmv->tgts_size, sizeof(*lmv->tgts), GFP_NOFS); if (!lmv->tgts) return -ENOMEM; - lmv->tgts_size = 32; obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid); lmv->desc.ld_tgt_count = 0; @@ -1354,7 +1352,7 @@ static int lmv_process_config(struct obd_device *obd, u32 len, void *buf) obd_str2uuid(&obd_uuid, lustre_cfg_buf(lcfg, 1)); - if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) { + if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1) { rc = -EINVAL; goto out; } @@ -1380,7 +1378,7 @@ static int lmv_statfs(const struct lu_env *env, struct obd_export *exp, struct lmv_obd *lmv = &obd->u.lmv; struct obd_statfs *temp; int rc = 0; - int i; + u32 i; rc = lmv_check_connect(obd); if (rc) @@ -1522,7 +1520,7 @@ static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - int i; + u32 i; int rc; rc = lmv_check_connect(obd); @@ -1545,36 +1543,6 @@ static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid) return 0; } -static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid, - ldlm_iterator_t it, void *data) -{ - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - int i; - int rc; - - rc = lmv_check_connect(obd); - if (rc) - return rc; - - CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid)); - - /* - * With DNE every object can have two locks in different namespaces: - * lookup lock in space of MDT storing direntry and update/open lock in - * space of MDT storing inode. - */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp) - continue; - rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data); - if (rc) - return rc; - } - - return rc; -} - static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, struct md_open_data *mod, struct ptlrpc_request **request) { @@ -1596,25 +1564,116 @@ static int lmv_close(struct obd_export *exp, struct md_op_data *op_data, return rc; } -struct lmv_tgt_desc -*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data, - struct lu_fid *fid) +/** + * Choosing the MDT by name or FID in @op_data. + * For non-striped directory, it will locate MDT by fid. + * For striped-directory, it will locate MDT by name. And also + * it will reset op_fid1 with the FID of the chosen stripe. + **/ +static struct lmv_tgt_desc * +lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm, + const char *name, int namelen, struct lu_fid *fid, + u32 *mds) +{ + const struct lmv_oinfo *oinfo; + struct lmv_tgt_desc *tgt; + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) { + if (cfs_fail_val >= lsm->lsm_md_stripe_count) + return ERR_PTR(-EBADF); + oinfo = &lsm->lsm_md_oinfo[cfs_fail_val]; + } else { + oinfo = lsm_name_to_stripe_info(lsm, name, namelen); + if (IS_ERR(oinfo)) + return ERR_CAST(oinfo); + } + + if (fid) + *fid = oinfo->lmo_fid; + if (mds) + *mds = oinfo->lmo_mds; + + tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL); + + CDEBUG(D_INFO, "locate on mds %u " DFID "\n", oinfo->lmo_mds, + PFID(&oinfo->lmo_fid)); + return tgt; +} + +/** + * Locate mds by fid or name + * + * For striped directory (lsm != NULL), it will locate the stripe + * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type + * is unknown, it will return -EBADFD, and lmv_intent_lookup might need + * walk through all of stripes to locate the entry. + * + * For normal direcotry, it will locate MDS by FID directly. + * \param[in] lmv LMV device + * \param[in] op_data client MD stack parameters, name, namelen + * mds_num etc. + * \param[in] fid object FID used to locate MDS. + * + * retval pointer to the lmv_tgt_desc if succeed. + * ERR_PTR(errno) if failed. + */ +struct lmv_tgt_desc* +lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data, + struct lu_fid *fid) { + struct lmv_stripe_md *lsm = op_data->op_mea1; struct lmv_tgt_desc *tgt; - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) + /* + * During creating VOLATILE file, it should honor the mdt + * index if the file under striped dir is being restored, see + * ct_restore(). + */ + if (op_data->op_bias & MDS_CREATE_VOLATILE && + (int)op_data->op_mds != -1 && lsm) { + int i; + + tgt = lmv_get_target(lmv, op_data->op_mds, NULL); + if (IS_ERR(tgt)) + return tgt; + + /* refill the right parent fid */ + for (i = 0; i < lsm->lsm_md_stripe_count; i++) { + struct lmv_oinfo *oinfo; + + oinfo = &lsm->lsm_md_oinfo[i]; + if (oinfo->lmo_mds == op_data->op_mds) { + *fid = oinfo->lmo_fid; + break; + } + } + + /* Hmm, can not find the stripe by mdt_index(op_mds) */ + if (i == lsm->lsm_md_stripe_count) + tgt = ERR_PTR(-EINVAL); + return tgt; + } - op_data->op_mds = tgt->ltd_idx; + if (!lsm || !op_data->op_namelen) { + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + return tgt; - return tgt; + op_data->op_mds = tgt->ltd_idx; + + return tgt; + } + + return lmv_locate_target_for_name(lmv, lsm, op_data->op_name, + op_data->op_namelen, fid, + &op_data->op_mds); } static int lmv_create(struct obd_export *exp, struct md_op_data *op_data, - const void *data, int datalen, int mode, __u32 uid, - __u32 gid, cfs_cap_t cap_effective, __u64 rdev, - struct ptlrpc_request **request) + const void *data, size_t datalen, umode_t mode, + uid_t uid, gid_t gid, cfs_cap_t cap_effective, + __u64 rdev, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1632,13 +1691,30 @@ static int lmv_create(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(tgt)) return PTR_ERR(tgt); - rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data); + CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n", + (int)op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid1), op_data->op_mds); + + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc) return rc; - CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n", - op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), - op_data->op_mds); + if (exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE) { + /* + * Send the create request to the MDT where the object + * will be located + */ + tgt = lmv_find_target(lmv, &op_data->op_fid2); + if (IS_ERR(tgt)) + return PTR_ERR(tgt); + + op_data->op_mds = tgt->ltd_idx; + } else { + CDEBUG(D_CONFIG, "Server doesn't support striped dirs\n"); + } + + CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n", + PFID(&op_data->op_fid1), op_data->op_mds); op_data->op_flags |= MF_MDC_CANCEL_FID1; rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid, @@ -1674,70 +1750,10 @@ static int lmv_done_writing(struct obd_export *exp, } static int -lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - __u64 extra_lock_flags) -{ - struct ptlrpc_request *req = it->it_request; - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - struct lustre_handle plock; - struct lmv_tgt_desc *tgt; - struct md_op_data *rdata; - struct lu_fid fid1; - struct mdt_body *body; - int rc = 0; - int pmode; - - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - - if (!(body->valid & OBD_MD_MDS)) - return 0; - - CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n", - LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1)); - - /* - * We got LOOKUP lock, but we really need attrs. - */ - pmode = it->it_lock_mode; - LASSERT(pmode != 0); - memcpy(&plock, lockh, sizeof(plock)); - it->it_lock_mode = 0; - it->it_request = NULL; - fid1 = body->fid1; - - ptlrpc_req_finished(req); - - tgt = lmv_find_target(lmv, &fid1); - if (IS_ERR(tgt)) { - rc = PTR_ERR(tgt); - goto out; - } - - rdata = kzalloc(sizeof(*rdata), GFP_NOFS); - if (!rdata) { - rc = -ENOMEM; - goto out; - } - - rdata->op_fid1 = fid1; - rdata->op_bias = MDS_CROSS_REF; - - rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh, - lmm, lmmsize, NULL, extra_lock_flags); - kfree(rdata); -out: - ldlm_lock_decref(&plock, pmode); - return rc; -} - -static int lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + const ldlm_policy_data_t *policy, struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - struct ptlrpc_request **req, __u64 extra_lock_flags) + struct lustre_handle *lockh, __u64 extra_lock_flags) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1755,22 +1771,18 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, if (IS_ERR(tgt)) return PTR_ERR(tgt); - CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n", + CDEBUG(D_INODE, "ENQUEUE '%s' on " DFID " -> mds #%u\n", LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx); - rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh, - lmm, lmmsize, req, extra_lock_flags); + rc = md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh, + extra_lock_flags); - if (rc == 0 && it && it->it_op == IT_OPEN) { - rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, - lmm, lmmsize, extra_lock_flags); - } return rc; } static int lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data, - struct ptlrpc_request **request) + struct ptlrpc_request **preq) { struct ptlrpc_request *req = NULL; struct obd_device *obd = exp->exp_obd; @@ -1787,26 +1799,25 @@ lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data, if (IS_ERR(tgt)) return PTR_ERR(tgt); - CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n", - op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), - tgt->ltd_idx); + CDEBUG(D_INODE, "GETATTR_NAME for %*s on " DFID " -> mds #%u\n", + (int)op_data->op_namelen, op_data->op_name, + PFID(&op_data->op_fid1), tgt->ltd_idx); - rc = md_getattr_name(tgt->ltd_exp, op_data, request); + rc = md_getattr_name(tgt->ltd_exp, op_data, preq); if (rc != 0) return rc; - body = req_capsule_server_get(&(*request)->rq_pill, - &RMF_MDT_BODY); - - if (body->valid & OBD_MD_MDS) { - struct lu_fid rid = body->fid1; + body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY); + if (body->mbo_valid & OBD_MD_MDS) { + struct lu_fid rid = body->mbo_fid1; CDEBUG(D_INODE, "Request attrs for "DFID"\n", PFID(&rid)); tgt = lmv_find_target(lmv, &rid); if (IS_ERR(tgt)) { - ptlrpc_req_finished(*request); + ptlrpc_req_finished(*preq); + *preq = NULL; return PTR_ERR(tgt); } @@ -1815,8 +1826,8 @@ lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data, op_data->op_namelen = 0; op_data->op_name = NULL; rc = md_getattr_name(tgt->ltd_exp, op_data, &req); - ptlrpc_req_finished(*request); - *request = req; + ptlrpc_req_finished(*preq); + *preq = req; } return rc; @@ -1829,23 +1840,24 @@ lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data, fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \ NULL) -static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data, - int op_tgt, enum ldlm_mode mode, int bits, - int flag) +static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt, + struct md_op_data *op_data, int op_tgt, + enum ldlm_mode mode, int bits, int flag) { struct lu_fid *fid = md_op_data_fid(op_data, flag); struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_tgt_desc *tgt; ldlm_policy_data_t policy = { {0} }; int rc = 0; if (!fid_is_sane(fid)) return 0; - tgt = lmv_find_target(lmv, fid); - if (IS_ERR(tgt)) - return PTR_ERR(tgt); + if (!tgt) { + tgt = lmv_find_target(lmv, fid); + if (IS_ERR(tgt)) + return PTR_ERR(tgt); + } if (tgt->ltd_idx != op_tgt) { CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid)); @@ -1882,12 +1894,24 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, LASSERT(op_data->op_namelen != 0); CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n", - PFID(&op_data->op_fid2), op_data->op_namelen, + PFID(&op_data->op_fid2), (int)op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1)); op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); op_data->op_cap = cfs_curproc_cap_pack(); + if (op_data->op_mea2) { + struct lmv_stripe_md *lsm = op_data->op_mea2; + const struct lmv_oinfo *oinfo; + + oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name, + op_data->op_namelen); + if (IS_ERR(oinfo)) + return PTR_ERR(oinfo); + + op_data->op_fid2 = oinfo->lmo_fid; + } + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); if (IS_ERR(tgt)) return PTR_ERR(tgt); @@ -1896,7 +1920,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, * Cancel UPDATE lock on child (fid1). */ op_data->op_flags |= MF_MDC_CANCEL_FID2; - rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, + rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX, MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1); if (rc != 0) return rc; @@ -1907,20 +1931,22 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, } static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, - const char *old, int oldlen, const char *new, int newlen, + const char *old, size_t oldlen, + const char *new, size_t newlen, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *src_tgt; - struct lmv_tgt_desc *tgt_tgt; int rc; LASSERT(oldlen != 0); - CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n", - oldlen, old, PFID(&op_data->op_fid1), - newlen, new, PFID(&op_data->op_fid2)); + CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n", + (int)oldlen, old, PFID(&op_data->op_fid1), + op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0, + (int)newlen, new, PFID(&op_data->op_fid2), + op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0); rc = lmv_check_connect(obd); if (rc) @@ -1929,13 +1955,60 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid()); op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid()); op_data->op_cap = cfs_curproc_cap_pack(); - src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + + if (op_data->op_cli_flags & CLI_MIGRATE) { + LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n", + PFID(&op_data->op_fid3)); + + if (op_data->op_mea1) { + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct lmv_tgt_desc *tmp; + + /* Fix the parent fid for striped dir */ + tmp = lmv_locate_target_for_name(lmv, lsm, old, + oldlen, + &op_data->op_fid1, + NULL); + if (IS_ERR(tmp)) + return PTR_ERR(tmp); + } + + rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); + if (rc) + return rc; + src_tgt = lmv_find_target(lmv, &op_data->op_fid3); + } else { + if (op_data->op_mea1) { + struct lmv_stripe_md *lsm = op_data->op_mea1; + + src_tgt = lmv_locate_target_for_name(lmv, lsm, old, + oldlen, + &op_data->op_fid1, + &op_data->op_mds); + if (IS_ERR(src_tgt)) + return PTR_ERR(src_tgt); + } else { + src_tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(src_tgt)) + return PTR_ERR(src_tgt); + + op_data->op_mds = src_tgt->ltd_idx; + } + + if (op_data->op_mea2) { + struct lmv_stripe_md *lsm = op_data->op_mea2; + const struct lmv_oinfo *oinfo; + + oinfo = lsm_name_to_stripe_info(lsm, new, newlen); + if (IS_ERR(oinfo)) + return PTR_ERR(oinfo); + + op_data->op_fid2 = oinfo->lmo_fid; + } + } if (IS_ERR(src_tgt)) return PTR_ERR(src_tgt); - tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); - if (IS_ERR(tgt_tgt)) - return PTR_ERR(tgt_tgt); /* * LOOKUP lock on src child (fid3) should also be cancelled for * src_tgt in mdc_rename. @@ -1946,35 +2019,53 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its * own target. */ - rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx, LCK_EX, MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2); - + if (rc) + return rc; /* - * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt. + * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt. */ - if (rc == 0) { - rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + if (fid_is_sane(&op_data->op_fid3)) { + struct lmv_tgt_desc *tgt; + + tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(tgt)) + return PTR_ERR(tgt); + + /* Cancel LOOKUP lock on its parent */ + rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx, LCK_EX, MDS_INODELOCK_LOOKUP, - MF_MDC_CANCEL_FID4); + MF_MDC_CANCEL_FID3); + if (rc) + return rc; + + rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx, + LCK_EX, MDS_INODELOCK_FULL, + MF_MDC_CANCEL_FID3); + if (rc) + return rc; } /* * Cancel all the locks on tgt child (fid4). */ - if (rc == 0) - rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx, + if (fid_is_sane(&op_data->op_fid4)) + rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx, LCK_EX, MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID4); - if (rc == 0) - rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, - new, newlen, request); + CDEBUG(D_INODE, DFID":m%d to "DFID"\n", PFID(&op_data->op_fid1), + op_data->op_mds, PFID(&op_data->op_fid2)); + + rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, + new, newlen, request); return rc; } static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, - void *ea, int ealen, void *ea2, int ea2len, + void *ea, size_t ealen, void *ea2, size_t ea2len, struct ptlrpc_request **request, struct md_open_data **mod) { @@ -2021,169 +2112,419 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, return rc; } -/* - * Adjust a set of pages, each page containing an array of lu_dirpages, - * so that each page can be used as a single logical lu_dirpage. +/** + * Get current minimum entry from striped directory * - * A lu_dirpage is laid out as follows, where s = ldp_hash_start, - * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a - * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end - * value is used as a cookie to request the next lu_dirpage in a - * directory listing that spans multiple pages (two in this example): - * ________ - * | | - * .|--------v------- -----. - * |s|e|f|p|ent|ent| ... |ent| - * '--|-------------- -----' Each CFS_PAGE contains a single - * '------. lu_dirpage. - * .---------v------- -----. - * |s|e|f|p|ent| 0 | ... | 0 | - * '----------------- -----' + * This function will search the dir entry, whose hash value is the + * closest(>=) to @hash_offset, from all of sub-stripes, and it is + * only being called for striped directory. * - * However, on hosts where the native VM page size (PAGE_SIZE) is - * larger than LU_PAGE_SIZE, a single host page may contain multiple - * lu_dirpages. After reading the lu_dirpages from the MDS, the - * ldp_hash_end of the first lu_dirpage refers to the one immediately - * after it in the same CFS_PAGE (arrows simplified for brevity, but - * in general e0==s1, e1==s2, etc.): + * \param[in] exp export of LMV + * \param[in] op_data parameters transferred beween client MD stack + * stripe_information will be included in this + * parameter + * \param[in] cb_op ldlm callback being used in enqueue in + * mdc_read_page + * \param[in] hash_offset the hash value, which is used to locate + * minum(closet) dir entry + * \param[in|out] stripe_offset the caller use this to indicate the stripe + * index of last entry, so to avoid hash conflict + * between stripes. It will also be used to + * return the stripe index of current dir entry. + * \param[in|out] entp the minum entry and it also is being used + * to input the last dir entry to resolve the + * hash conflict * - * .-------------------- -----. - * |s0|e0|f0|p|ent|ent| ... |ent| - * |---v---------------- -----| - * |s1|e1|f1|p|ent|ent| ... |ent| - * |---v---------------- -----| Here, each CFS_PAGE contains - * ... multiple lu_dirpages. - * |---v---------------- -----| - * |s'|e'|f'|p|ent|ent| ... |ent| - * '---|---------------- -----' - * v - * .----------------------------. - * | next CFS_PAGE | + * \param[out] ppage the page which holds the minum entry * - * This structure is transformed into a single logical lu_dirpage as follows: + * \retval = 0 get the entry successfully + * negative errno (< 0) does not get the entry + */ +static int lmv_get_min_striped_entry(struct obd_export *exp, + struct md_op_data *op_data, + struct md_callback *cb_op, + __u64 hash_offset, int *stripe_offset, + struct lu_dirent **entp, + struct page **ppage) +{ + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lu_dirent *min_ent = NULL; + struct page *min_page = NULL; + struct lmv_tgt_desc *tgt; + int stripe_count; + int min_idx = 0; + int rc = 0; + int i; + + stripe_count = lsm->lsm_md_stripe_count; + for (i = 0; i < stripe_count; i++) { + __u64 stripe_hash = hash_offset; + struct lu_dirent *ent = NULL; + struct page *page = NULL; + struct lu_dirpage *dp; + + tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL); + if (IS_ERR(tgt)) { + rc = PTR_ERR(tgt); + goto out; + } + + /* + * op_data will be shared by each stripe, so we need + * reset these value for each stripe + */ + op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid; + op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid; + op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root; +next: + rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash, + &page); + if (rc) + goto out; + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent; + ent = lu_dirent_next(ent)) { + /* Skip dummy entry */ + if (!le16_to_cpu(ent->lde_namelen)) + continue; + + if (le64_to_cpu(ent->lde_hash) < hash_offset) + continue; + + if (le64_to_cpu(ent->lde_hash) == hash_offset && + (*entp == ent || i < *stripe_offset)) + continue; + + /* skip . and .. for other stripes */ + if (i && (!strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) || + !strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)))) + continue; + break; + } + + if (!ent) { + stripe_hash = le64_to_cpu(dp->ldp_hash_end); + + kunmap(page); + put_page(page); + page = NULL; + + /* + * reach the end of current stripe, go to next stripe + */ + if (stripe_hash == MDS_DIR_END_OFF) + continue; + else + goto next; + } + + if (min_ent) { + if (le64_to_cpu(min_ent->lde_hash) > + le64_to_cpu(ent->lde_hash)) { + min_ent = ent; + kunmap(min_page); + put_page(min_page); + min_idx = i; + min_page = page; + } else { + kunmap(page); + put_page(page); + page = NULL; + } + } else { + min_ent = ent; + min_page = page; + min_idx = i; + } + } + +out: + if (*ppage) { + kunmap(*ppage); + put_page(*ppage); + } + *stripe_offset = min_idx; + *entp = min_ent; + *ppage = min_page; + return rc; +} + +/** + * Build dir entry page from a striped directory * - * - Replace e0 with e' so the request for the next lu_dirpage gets the page - * labeled 'next CFS_PAGE'. + * This function gets one entry by @offset from a striped directory. It will + * read entries from all of stripes, and choose one closest to the required + * offset(&offset). A few notes + * 1. skip . and .. for non-zero stripes, because there can only have one . + * and .. in a directory. + * 2. op_data will be shared by all of stripes, instead of allocating new + * one, so need to restore before reusing. + * 3. release the entry page if that is not being chosen. * - * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether - * a hash collision with the next page exists. + * \param[in] exp obd export refer to LMV + * \param[in] op_data hold those MD parameters of read_entry + * \param[in] cb_op ldlm callback being used in enqueue in mdc_read_entry + * \param[out] ldp the entry being read + * \param[out] ppage the page holding the entry. Note: because the entry + * will be accessed in upper layer, so we need hold the + * page until the usages of entry is finished, see + * ll_dir_entry_next. * - * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span - * to the first entry of the next lu_dirpage. + * retval =0 if get entry successfully + * <0 cannot get entry */ -#if PAGE_SIZE > LU_PAGE_SIZE -static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs) -{ - int i; +static int lmv_read_striped_page(struct obd_export *exp, + struct md_op_data *op_data, + struct md_callback *cb_op, + __u64 offset, struct page **ppage) +{ + struct inode *master_inode = op_data->op_data; + struct lu_fid master_fid = op_data->op_fid1; + struct obd_device *obd = exp->exp_obd; + __u64 hash_offset = offset; + struct page *min_ent_page = NULL; + struct page *ent_page = NULL; + struct lu_dirent *min_ent = NULL; + struct lu_dirent *last_ent; + struct lu_dirent *ent; + struct lu_dirpage *dp; + size_t left_bytes; + int ent_idx = 0; + void *area; + int rc; - for (i = 0; i < ncfspgs; i++) { - struct lu_dirpage *dp = kmap(pages[i]); - struct lu_dirpage *first = dp; - struct lu_dirent *end_dirent = NULL; - struct lu_dirent *ent; - __u64 hash_end = dp->ldp_hash_end; - __u32 flags = dp->ldp_flags; - - while (--nlupgs > 0) { - ent = lu_dirent_start(dp); - for (end_dirent = ent; ent; - end_dirent = ent, ent = lu_dirent_next(ent)) - ; - - /* Advance dp to next lu_dirpage. */ - dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); - - /* Check if we've reached the end of the CFS_PAGE. */ - if (!((unsigned long)dp & ~PAGE_MASK)) - break; + rc = lmv_check_connect(obd); + if (rc) + return rc; - /* Save the hash and flags of this lu_dirpage. */ - hash_end = dp->ldp_hash_end; - flags = dp->ldp_flags; + /* + * Allocate a page and read entries from all of stripes and fill + * the page by hash order + */ + ent_page = alloc_page(GFP_KERNEL); + if (!ent_page) + return -ENOMEM; - /* Check if lu_dirpage contains no entries. */ - if (!end_dirent) - break; + /* Initialize the entry page */ + dp = kmap(ent_page); + memset(dp, 0, sizeof(*dp)); + dp->ldp_hash_start = cpu_to_le64(offset); + dp->ldp_flags |= LDF_COLLIDE; + + area = dp + 1; + left_bytes = PAGE_SIZE - sizeof(*dp); + ent = area; + last_ent = ent; + do { + __u16 ent_size; + + /* Find the minum entry from all sub-stripes */ + rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset, + &ent_idx, &min_ent, + &min_ent_page); + if (rc) + goto out; - /* Enlarge the end entry lde_reclen from 0 to - * first entry of next lu_dirpage. - */ - LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0); - end_dirent->lde_reclen = - cpu_to_le16((char *)(dp->ldp_entries) - - (char *)end_dirent); + /* + * If it can not get minum entry, it means it already reaches + * the end of this directory + */ + if (!min_ent) { + last_ent->lde_reclen = 0; + hash_offset = MDS_DIR_END_OFF; + goto out; + } + + ent_size = le16_to_cpu(min_ent->lde_reclen); + + /* + * the last entry lde_reclen is 0, but it might not + * the end of this entry of this temporay entry + */ + if (!ent_size) + ent_size = lu_dirent_calc_size( + le16_to_cpu(min_ent->lde_namelen), + le32_to_cpu(min_ent->lde_attrs)); + if (ent_size > left_bytes) { + last_ent->lde_reclen = cpu_to_le16(0); + hash_offset = le64_to_cpu(min_ent->lde_hash); + goto out; } - first->ldp_hash_end = hash_end; - first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); - first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); + memcpy(ent, min_ent, ent_size); + + /* + * Replace . with master FID and Replace .. with the parent FID + * of master object + */ + if (!strncmp(ent->lde_name, ".", + le16_to_cpu(ent->lde_namelen)) && + le16_to_cpu(ent->lde_namelen) == 1) + fid_cpu_to_le(&ent->lde_fid, &master_fid); + else if (!strncmp(ent->lde_name, "..", + le16_to_cpu(ent->lde_namelen)) && + le16_to_cpu(ent->lde_namelen) == 2) + fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3); + + left_bytes -= ent_size; + ent->lde_reclen = cpu_to_le16(ent_size); + last_ent = ent; + ent = (void *)ent + ent_size; + hash_offset = le64_to_cpu(min_ent->lde_hash); + if (hash_offset == MDS_DIR_END_OFF) { + last_ent->lde_reclen = 0; + break; + } + } while (1); +out: + if (min_ent_page) { + kunmap(min_ent_page); + put_page(min_ent_page); + } - kunmap(pages[i]); + if (unlikely(rc)) { + __free_page(ent_page); + ent_page = NULL; + } else { + if (ent == area) + dp->ldp_flags |= LDF_EMPTY; + dp->ldp_flags = cpu_to_le32(dp->ldp_flags); + dp->ldp_hash_end = cpu_to_le64(hash_offset); } - LASSERTF(nlupgs == 0, "left = %d", nlupgs); + + /* + * We do not want to allocate md_op_data during each + * dir entry reading, so op_data will be shared by every stripe, + * then we need to restore it back to original value before + * return to the upper layer + */ + op_data->op_fid1 = master_fid; + op_data->op_fid2 = master_fid; + op_data->op_data = master_inode; + + *ppage = ent_page; + + return rc; } -#else -#define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0) -#endif /* PAGE_SIZE > LU_PAGE_SIZE */ -static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data, - struct page **pages, struct ptlrpc_request **request) +static int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, __u64 offset, + struct page **ppage) { - struct obd_device *obd = exp->exp_obd; - struct lmv_obd *lmv = &obd->u.lmv; - __u64 offset = op_data->op_offset; - int rc; - int ncfspgs; /* pages read in PAGE_SIZE */ - int nlupgs; /* pages read in LU_PAGE_SIZE */ - struct lmv_tgt_desc *tgt; + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt; + int rc; rc = lmv_check_connect(obd); if (rc) return rc; - CDEBUG(D_INODE, "READPAGE at %#llx from "DFID"\n", - offset, PFID(&op_data->op_fid1)); + if (unlikely(lsm)) { + rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage); + return rc; + } tgt = lmv_find_target(lmv, &op_data->op_fid1); if (IS_ERR(tgt)) return PTR_ERR(tgt); - rc = md_readpage(tgt->ltd_exp, op_data, pages, request); - if (rc != 0) - return rc; - - ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_SIZE - 1) - >> PAGE_SHIFT; - nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; - LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); - LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages); - - CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs, - op_data->op_npages); - - lmv_adjust_dirpages(pages, ncfspgs, nlupgs); + rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage); return rc; } +/** + * Unlink a file/directory + * + * Unlink a file or directory under the parent dir. The unlink request + * usually will be sent to the MDT where the child is located, but if + * the client does not have the child FID then request will be sent to the + * MDT where the parent is located. + * + * If the parent is a striped directory then it also needs to locate which + * stripe the name of the child is located, and replace the parent FID + * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown, + * it will walk through all of sub-stripes until the child is being + * unlinked finally. + * + * \param[in] exp export refer to LMV + * \param[in] op_data different parameters transferred beween client + * MD stacks, name, namelen, FIDs etc. + * op_fid1 is the parent FID, op_fid2 is the child + * FID. + * \param[out] request point to the request of unlink. + * + * retval 0 if succeed + * negative errno if failed. + */ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct obd_device *obd = exp->exp_obd; + struct lmv_stripe_md *lsm = op_data->op_mea1; + struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *parent_tgt = NULL; struct lmv_tgt_desc *tgt = NULL; struct mdt_body *body; + int stripe_index = 0; int rc; rc = lmv_check_connect(obd); if (rc) return rc; -retry: +retry_unlink: + /* For striped dir, we need to locate the parent as well */ + if (lsm) { + struct lmv_tgt_desc *tmp; + + LASSERT(op_data->op_name && op_data->op_namelen); + + tmp = lmv_locate_target_for_name(lmv, lsm, + op_data->op_name, + op_data->op_namelen, + &op_data->op_fid1, + &op_data->op_mds); + + /* + * return -EBADFD means unknown hash type, might + * need try all sub-stripe here + */ + if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD) + return PTR_ERR(tmp); + + /* + * Note: both migrating dir and unknown hash dir need to + * try all of sub-stripes, so we need start search the + * name from stripe 0, but migrating dir is already handled + * inside lmv_locate_target_for_name(), so we only check + * unknown hash type directory here + */ + if (!lmv_is_known_hash_type(lsm->lsm_md_hash_type)) { + struct lmv_oinfo *oinfo; + + oinfo = &lsm->lsm_md_oinfo[stripe_index]; + + op_data->op_fid1 = oinfo->lmo_fid; + op_data->op_mds = oinfo->lmo_mds; + } + } + +try_next_stripe: /* Send unlink requests to the MDT where the child is located */ if (likely(!fid_is_zero(&op_data->op_fid2))) - tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); + tgt = lmv_find_target(lmv, &op_data->op_fid2); + else if (lsm) + tgt = lmv_get_target(lmv, op_data->op_mds, NULL); else tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); + if (IS_ERR(tgt)) return PTR_ERR(tgt); @@ -2203,29 +2544,57 @@ retry: /* * Cancel FULL locks on child (fid3). */ - rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX, - MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); + parent_tgt = lmv_find_target(lmv, &op_data->op_fid1); + if (IS_ERR(parent_tgt)) + return PTR_ERR(parent_tgt); + if (parent_tgt != tgt) { + rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx, + LCK_EX, MDS_INODELOCK_LOOKUP, + MF_MDC_CANCEL_FID3); + } + + rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX, + MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3); if (rc != 0) return rc; - CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n", + CDEBUG(D_INODE, "unlink with fid=" DFID "/" DFID " -> mds #%u\n", PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx); rc = md_unlink(tgt->ltd_exp, op_data, request); - if (rc != 0 && rc != -EREMOTE) + if (rc != 0 && rc != -EREMOTE && rc != -ENOENT) return rc; + /* Try next stripe if it is needed. */ + if (rc == -ENOENT && lsm && lmv_need_try_all_stripes(lsm)) { + struct lmv_oinfo *oinfo; + + stripe_index++; + if (stripe_index >= lsm->lsm_md_stripe_count) + return rc; + + oinfo = &lsm->lsm_md_oinfo[stripe_index]; + + op_data->op_fid1 = oinfo->lmo_fid; + op_data->op_mds = oinfo->lmo_mds; + + ptlrpc_req_finished(*request); + *request = NULL; + + goto try_next_stripe; + } + body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY); if (!body) return -EPROTO; /* Not cross-ref case, just get out of here. */ - if (likely(!(body->valid & OBD_MD_MDS))) - return 0; + if (likely(!(body->mbo_valid & OBD_MD_MDS))) + return rc; CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n", - exp->exp_obd->obd_name, PFID(&body->fid1)); + exp->exp_obd->obd_name, PFID(&body->mbo_fid1)); /* This is a remote object, try remote MDT, Note: it may * try more than 1 time here, Considering following case @@ -2247,11 +2616,11 @@ retry: * In theory, it might try unlimited time here, but it should * be very rare case. */ - op_data->op_fid2 = body->fid1; + op_data->op_fid2 = body->mbo_fid1; ptlrpc_req_finished(*request); *request = NULL; - goto retry; + goto retry_unlink; } static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -2274,6 +2643,22 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) return 0; } +/** + * Get by key a value associated with a LMV device. + * + * Dispatch request to lower-layer devices as needed. + * + * \param[in] env execution environment for this thread + * \param[in] exp export for the LMV device + * \param[in] keylen length of key identifier + * \param[in] key identifier of key to get value for + * \param[in] vallen size of \a val + * \param[out] val pointer to storage location for value + * \param[in] lsm optional striping metadata of object + * + * \retval 0 on success + * \retval negative negated errno on failure + */ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm) @@ -2337,6 +2722,22 @@ static int lmv_get_info(const struct lu_env *env, struct obd_export *exp, return -EINVAL; } +/** + * Asynchronously set by key a value associated with a LMV device. + * + * Dispatch request to lower-layer devices as needed. + * + * \param[in] env execution environment for this thread + * \param[in] exp export for the LMV device + * \param[in] keylen length of key identifier + * \param[in] key identifier of key to store value for + * \param[in] vallen size of value to store + * \param[in] val pointer to data to be stored + * \param[in] set optional list of related ptlrpc requests + * + * \retval 0 on success + * \retval negative negated errno on failure + */ static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, u32 keylen, void *key, u32 vallen, void *val, struct ptlrpc_request_set *set) @@ -2354,7 +2755,8 @@ static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, } lmv = &obd->u.lmv; - if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) { + if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) || + KEY_IS(KEY_DEFAULT_EASIZE)) { int i, err = 0; for (i = 0; i < lmv->desc.ld_tgt_count; i++) { @@ -2375,105 +2777,247 @@ static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp, return -EINVAL; } -static int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, - struct lov_stripe_md *lsm) +static int lmv_pack_md_v1(const struct lmv_stripe_md *lsm, + struct lmv_mds_md_v1 *lmm1) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_obd *lmv = &obd->u.lmv; - struct lmv_stripe_md *meap; - struct lmv_stripe_md *lsmp; - int mea_size; - int i; + int cplen; + int i; + + lmm1->lmv_magic = cpu_to_le32(lsm->lsm_md_magic); + lmm1->lmv_stripe_count = cpu_to_le32(lsm->lsm_md_stripe_count); + lmm1->lmv_master_mdt_index = cpu_to_le32(lsm->lsm_md_master_mdt_index); + lmm1->lmv_hash_type = cpu_to_le32(lsm->lsm_md_hash_type); + cplen = strlcpy(lmm1->lmv_pool_name, lsm->lsm_md_pool_name, + sizeof(lmm1->lmv_pool_name)); + if (cplen >= sizeof(lmm1->lmv_pool_name)) + return -E2BIG; + + for (i = 0; i < lsm->lsm_md_stripe_count; i++) + fid_cpu_to_le(&lmm1->lmv_stripe_fids[i], + &lsm->lsm_md_oinfo[i].lmo_fid); + return 0; +} - mea_size = lmv_get_easize(lmv); - if (!lmmp) - return mea_size; +static int +lmv_pack_md(union lmv_mds_md **lmmp, const struct lmv_stripe_md *lsm, + int stripe_count) +{ + int lmm_size = 0, rc = 0; + bool allocated = false; + LASSERT(lmmp); + + /* Free lmm */ if (*lmmp && !lsm) { + int stripe_cnt; + + stripe_cnt = lmv_mds_md_stripe_count_get(*lmmp); + lmm_size = lmv_mds_md_size(stripe_cnt, + le32_to_cpu((*lmmp)->lmv_magic)); + if (!lmm_size) + return -EINVAL; kvfree(*lmmp); *lmmp = NULL; return 0; } + /* Alloc lmm */ + if (!*lmmp && !lsm) { + lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC); + LASSERT(lmm_size > 0); + *lmmp = libcfs_kvzalloc(lmm_size, GFP_NOFS); + if (!*lmmp) + return -ENOMEM; + lmv_mds_md_stripe_count_set(*lmmp, stripe_count); + (*lmmp)->lmv_magic = cpu_to_le32(LMV_MAGIC); + return lmm_size; + } + + /* pack lmm */ + LASSERT(lsm); + lmm_size = lmv_mds_md_size(lsm->lsm_md_stripe_count, + lsm->lsm_md_magic); if (!*lmmp) { - *lmmp = libcfs_kvzalloc(mea_size, GFP_NOFS); + *lmmp = libcfs_kvzalloc(lmm_size, GFP_NOFS); if (!*lmmp) return -ENOMEM; + allocated = true; } - if (!lsm) - return mea_size; + switch (lsm->lsm_md_magic) { + case LMV_MAGIC_V1: + rc = lmv_pack_md_v1(lsm, &(*lmmp)->lmv_md_v1); + break; + default: + rc = -EINVAL; + break; + } - lsmp = (struct lmv_stripe_md *)lsm; - meap = (struct lmv_stripe_md *)*lmmp; + if (rc && allocated) { + kvfree(*lmmp); + *lmmp = NULL; + } - if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR && - lsmp->mea_magic != MEA_MAGIC_ALL_CHARS) - return -EINVAL; + return lmm_size; +} - meap->mea_magic = cpu_to_le32(lsmp->mea_magic); - meap->mea_count = cpu_to_le32(lsmp->mea_count); - meap->mea_master = cpu_to_le32(lsmp->mea_master); +static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm, + const struct lmv_mds_md_v1 *lmm1) +{ + struct lmv_obd *lmv = &exp->exp_obd->u.lmv; + int stripe_count; + int rc = 0; + int cplen; + int i; - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - meap->mea_ids[i] = lsmp->mea_ids[i]; - fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]); + lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic); + lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); + lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index); + if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE)) + lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN; + else + lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type); + lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version); + cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name, + sizeof(lsm->lsm_md_pool_name)); + + if (cplen >= sizeof(lsm->lsm_md_pool_name)) + return -E2BIG; + + CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d layout_version %d\n", + lsm->lsm_md_stripe_count, lsm->lsm_md_master_mdt_index, + lsm->lsm_md_hash_type, lsm->lsm_md_layout_version); + + stripe_count = le32_to_cpu(lmm1->lmv_stripe_count); + for (i = 0; i < stripe_count; i++) { + fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid, + &lmm1->lmv_stripe_fids[i]); + rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid, + &lsm->lsm_md_oinfo[i].lmo_mds); + if (rc) + return rc; + CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i, + PFID(&lsm->lsm_md_oinfo[i].lmo_fid)); } - return mea_size; + return rc; } -static int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_size) +int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp, + const union lmv_mds_md *lmm, int stripe_count) { - struct obd_device *obd = class_exp2obd(exp); - struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp; - struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm; - struct lmv_obd *lmv = &obd->u.lmv; - int mea_size; - int i; - __u32 magic; + struct lmv_stripe_md *lsm; + bool allocated = false; + int lsm_size, rc; - mea_size = lmv_get_easize(lmv); - if (!lsmp) - return mea_size; + LASSERT(lsmp); - if (*lsmp && !lmm) { - kvfree(*tmea); + lsm = *lsmp; + /* Free memmd */ + if (lsm && !lmm) { + int i; + + for (i = 1; i < lsm->lsm_md_stripe_count; i++) { + /* + * For migrating inode, the master stripe and master + * object will be the same, so do not need iput, see + * ll_update_lsm_md + */ + if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION && + !i) && lsm->lsm_md_oinfo[i].lmo_root) + iput(lsm->lsm_md_oinfo[i].lmo_root); + } + + kvfree(lsm); *lsmp = NULL; return 0; } - LASSERT(mea_size == lmm_size); + /* Alloc memmd */ + if (!lsm && !lmm) { + lsm_size = lmv_stripe_md_size(stripe_count); + lsm = libcfs_kvzalloc(lsm_size, GFP_NOFS); + if (!lsm) + return -ENOMEM; + lsm->lsm_md_stripe_count = stripe_count; + *lsmp = lsm; + return 0; + } - *tmea = libcfs_kvzalloc(mea_size, GFP_NOFS); - if (!*tmea) - return -ENOMEM; + if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) + return -EPERM; - if (!lmm) - return mea_size; + /* Unpack memmd */ + if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 && + le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) { + CERROR("%s: invalid lmv magic %x: rc = %d\n", + exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic), + -EIO); + return -EIO; + } - if (mea->mea_magic == MEA_MAGIC_LAST_CHAR || - mea->mea_magic == MEA_MAGIC_ALL_CHARS || - mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) { - magic = le32_to_cpu(mea->mea_magic); - } else { - /* - * Old mea is not handled here. + if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1) + lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm)); + else + /** + * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md, + * stripecount should be 0 then. */ - CERROR("Old not supportable EA is found\n"); - LBUG(); + lsm_size = lmv_stripe_md_size(0); + + if (!lsm) { + lsm = libcfs_kvzalloc(lsm_size, GFP_NOFS); + if (!lsm) + return -ENOMEM; + allocated = true; + *lsmp = lsm; + } + + switch (le32_to_cpu(lmm->lmv_magic)) { + case LMV_MAGIC_V1: + rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1); + break; + default: + CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name, + le32_to_cpu(lmm->lmv_magic)); + rc = -EINVAL; + break; } - (*tmea)->mea_magic = magic; - (*tmea)->mea_count = le32_to_cpu(mea->mea_count); - (*tmea)->mea_master = le32_to_cpu(mea->mea_master); + if (rc && allocated) { + kvfree(lsm); + *lsmp = NULL; + lsm_size = rc; + } + return lsm_size; +} +EXPORT_SYMBOL(lmv_unpack_md); + +static int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, + struct lov_mds_md *lmm, int disk_len) +{ + return lmv_unpack_md(exp, (struct lmv_stripe_md **)lsmp, + (union lmv_mds_md *)lmm, disk_len); +} + +static int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, + struct lov_stripe_md *lsm) +{ + const struct lmv_stripe_md *lmv = (struct lmv_stripe_md *)lsm; + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv_obd = &obd->u.lmv; + int stripe_count; - for (i = 0; i < (*tmea)->mea_count; i++) { - (*tmea)->mea_ids[i] = mea->mea_ids[i]; - fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]); + if (!lmmp) { + if (lsm) + stripe_count = lmv->lsm_md_stripe_count; + else + stripe_count = lmv_obd->desc.ld_tgt_count; + + return lmv_mds_md_size(stripe_count, LMV_MAGIC_V1); } - return mea_size; + + return lmv_pack_md((union lmv_mds_md **)lmmp, lmv, 0); } static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, @@ -2484,7 +3028,7 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, struct lmv_obd *lmv = &obd->u.lmv; int rc = 0; int err; - int i; + u32 i; LASSERT(fid); @@ -2502,8 +3046,9 @@ static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid, return rc; } -static int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data, - __u64 *bits) +static int lmv_set_lock_data(struct obd_export *exp, + const struct lustre_handle *lockh, + void *data, __u64 *bits) { struct lmv_obd *lmv = &exp->exp_obd->u.lmv; struct lmv_tgt_desc *tgt = lmv->tgts[0]; @@ -2526,24 +3071,32 @@ static enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; enum ldlm_mode rc; - int i; + int tgt; + u32 i; CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid)); /* - * With CMD every object can have two locks in different namespaces: - * lookup lock in space of mds storing direntry and update/open lock in - * space of mds storing inode. Thus we check all targets, not only that - * one fid was created in. + * With DNE every object can have two locks in different namespaces: + * lookup lock in space of MDT storing direntry and update/open lock in + * space of MDT storing inode. Try the MDT that the FID maps to first, + * since this can be easily found, and only try others if that fails. */ - for (i = 0; i < lmv->desc.ld_tgt_count; i++) { - struct lmv_tgt_desc *tgt = lmv->tgts[i]; + for (i = 0, tgt = lmv_find_target_index(lmv, fid); + i < lmv->desc.ld_tgt_count; + i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) { + if (tgt < 0) { + CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n", + obd->obd_name, PFID(fid), tgt); + tgt = 0; + } - if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) + if (!lmv->tgts[tgt] || !lmv->tgts[tgt]->ltd_exp || + !lmv->tgts[tgt]->ltd_active) continue; - rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode, - lockh); + rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid, + type, policy, mode, lockh); if (rc) return rc; } @@ -2571,8 +3124,10 @@ static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md) struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt = lmv->tgts[0]; - if (md->mea) - obd_free_memmd(exp, (void *)&md->mea); + if (md->lmv) { + lmv_free_memmd(md->lmv); + md->lmv = NULL; + } if (!tgt || !tgt->ltd_exp) return -EINVAL; return md_free_lustre_md(tgt->ltd_exp, md); @@ -2621,7 +3176,7 @@ static int lmv_intent_getattr_async(struct obd_export *exp, if (rc) return rc; - tgt = lmv_find_target(lmv, &op_data->op_fid1); + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); if (IS_ERR(tgt)) return PTR_ERR(tgt); @@ -2649,6 +3204,23 @@ static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, return rc; } +static int +lmv_get_fid_from_lsm(struct obd_export *exp, + const struct lmv_stripe_md *lsm, + const char *name, int namelen, struct lu_fid *fid) +{ + const struct lmv_oinfo *oinfo; + + LASSERT(lsm); + oinfo = lsm_name_to_stripe_info(lsm, name, namelen); + if (IS_ERR(oinfo)) + return PTR_ERR(oinfo); + + *fid = oinfo->lmo_fid; + + return 0; +} + /** * For lmv, only need to send request to master MDT, and the master MDT will * process with other slave MDTs. The only exception is Q_GETOQUOTA for which @@ -2660,8 +3232,9 @@ static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt = lmv->tgts[0]; - int rc = 0, i; + int rc = 0; __u64 curspace = 0, curinodes = 0; + u32 i; if (!tgt || !tgt->ltd_exp || !tgt->ltd_active || !lmv->desc.ld_tgt_count) { @@ -2704,7 +3277,8 @@ static int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt; - int i, rc = 0; + int rc = 0; + u32 i; for (i = 0; i < lmv->desc.ld_tgt_count; i++) { int err; @@ -2723,6 +3297,47 @@ static int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp, return rc; } +static int lmv_merge_attr(struct obd_export *exp, + const struct lmv_stripe_md *lsm, + struct cl_attr *attr, + ldlm_blocking_callback cb_blocking) +{ + int rc, i; + + rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0); + if (rc < 0) + return rc; + + for (i = 0; i < lsm->lsm_md_stripe_count; i++) { + struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root; + + CDEBUG(D_INFO, ""DFID" size %llu, blocks %llu nlink %u, atime %lu ctime %lu, mtime %lu.\n", + PFID(&lsm->lsm_md_oinfo[i].lmo_fid), + i_size_read(inode), (unsigned long long)inode->i_blocks, + inode->i_nlink, LTIME_S(inode->i_atime), + LTIME_S(inode->i_ctime), LTIME_S(inode->i_mtime)); + + /* for slave stripe, it needs to subtract nlink for . and .. */ + if (i) + attr->cat_nlink += inode->i_nlink - 2; + else + attr->cat_nlink = inode->i_nlink; + + attr->cat_size += i_size_read(inode); + attr->cat_blocks += inode->i_blocks; + + if (attr->cat_atime < LTIME_S(inode->i_atime)) + attr->cat_atime = LTIME_S(inode->i_atime); + + if (attr->cat_ctime < LTIME_S(inode->i_ctime)) + attr->cat_ctime = LTIME_S(inode->i_ctime); + + if (attr->cat_mtime < LTIME_S(inode->i_mtime)) + attr->cat_mtime = LTIME_S(inode->i_mtime); + } + return 0; +} + static struct obd_ops lmv_obd_ops = { .owner = THIS_MODULE, .setup = lmv_setup, @@ -2746,7 +3361,6 @@ static struct obd_ops lmv_obd_ops = { static struct md_ops lmv_md_ops = { .getstatus = lmv_getstatus, .null_inode = lmv_null_inode, - .find_cbdata = lmv_find_cbdata, .close = lmv_close, .create = lmv_create, .done_writing = lmv_done_writing, @@ -2760,7 +3374,7 @@ static struct md_ops lmv_md_ops = { .setattr = lmv_setattr, .setxattr = lmv_setxattr, .sync = lmv_sync, - .readpage = lmv_readpage, + .read_page = lmv_read_page, .unlink = lmv_unlink, .init_ea_size = lmv_init_ea_size, .cancel_unused = lmv_cancel_unused, @@ -2768,10 +3382,12 @@ static struct md_ops lmv_md_ops = { .lock_match = lmv_lock_match, .get_lustre_md = lmv_get_lustre_md, .free_lustre_md = lmv_free_lustre_md, + .merge_attr = lmv_merge_attr, .set_open_replay_data = lmv_set_open_replay_data, .clear_open_replay_data = lmv_clear_open_replay_data, .intent_getattr_async = lmv_intent_getattr_async, - .revalidate_lock = lmv_revalidate_lock + .revalidate_lock = lmv_revalidate_lock, + .get_fid_from_lsm = lmv_get_fid_from_lsm, }; static int __init lmv_init(void) |