# -*- coding: UTF-8 -*-
'''=================================================
@Project -> File   pram -> basicdataset
@IDE     PyCharm
@Author  [email protected]
@Date    29/01/2024 14:27
=================================================='''
import torchvision.transforms.functional as tvf
import torchvision.transforms as tvt
import os.path as osp
import numpy as np
import cv2

from colmap_utils.read_write_model import qvec2rotmat, read_model
from dataset.utils import normalize_size

class BasicDataset:
    def __init__(self,
                 img_list_fn,
                 feature_dir,
                 sfm_path,
                 seg_fn,
                 dataset_path,
                 n_class,
                 dataset,
                 nfeatures=1024,
                 query_p3d_fn=None,
                 train=True,
                 with_aug=False,
                 min_inliers=0,
                 max_inliers=4096,
                 random_inliers=False,
                 jitter_params=None,
                 scale_params=None,
                 image_dim=1,
                 pre_load=False,
                 query_info_path=None,
                 sc_mean_scale_fn=None,
                 ):
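        '''Summary (inferred from the code below): wraps a set of images with
        precomputed local features, a COLMAP SfM model, and per-3D-point
        segmentation labels, producing fixed-size training samples or
        evaluation samples for recognition.

        Selected arguments:
        - img_list_fn: text file with one image path per line.
        - feature_dir: per-image .npy feature files, with '/' in image
          names replaced by '+'.
        - sfm_path: COLMAP model directory (.bin files).
        - seg_fn: .npy file holding 3D point ids and segmentation labels.
        - nfeatures: number of keypoints kept per training sample.
        '''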
        self.n_class = n_class
        self.train = train
        self.min_inliers = min_inliers
        self.max_inliers = max_inliers if max_inliers < nfeatures else nfeatures
        self.random_inliers = random_inliers
        self.dataset_path = dataset_path
        self.with_aug = with_aug
        self.dataset = dataset
        self.jitter_params = jitter_params
        self.scale_params = scale_params
        self.image_dim = image_dim
        self.image_prefix = ''

        train_transforms = []
        if self.with_aug:
            train_transforms.append(tvt.ColorJitter(
                brightness=jitter_params['brightness'],
                contrast=jitter_params['contrast'],
                saturation=jitter_params['saturation'],
                hue=jitter_params['hue']))
            if jitter_params['blur'] > 0:
                train_transforms.append(tvt.GaussianBlur(kernel_size=int(jitter_params['blur'])))
        self.train_transforms = tvt.Compose(train_transforms)
        # only for testing of query images
        if not self.train:
            data = np.load(query_p3d_fn, allow_pickle=True)[()]
            self.img_p3d = data
        else:
            self.img_p3d = {}

        self.img_fns = []
        with open(img_list_fn, 'r') as f:
            lines = f.readlines()
            for l in lines:
                l = l.strip()
                self.img_fns.append(l)
        print('Load {} images from {} for {}...'.format(len(self.img_fns), dataset,
                                                        'training' if train else 'eval'))
        self.feats = {}  # per-image feature cache (pre_load is not used here)

        # The SfM model and the 3D-point segmentation labels are needed for
        # both training and evaluation (get_item_test also reads p3d_seg and
        # p3d_xyzs), so they are loaded unconditionally; only the name-to-id
        # map is train-only.
        self.cameras, self.images, point3Ds = read_model(path=sfm_path, ext='.bin')
        if train:
            self.name_to_id = {image.name: i for i, image in self.images.items()}

        data = np.load(seg_fn, allow_pickle=True)[()]
        p3d_id = data['id']
        seg_id = data['label']
        self.p3d_seg = {p3d_id[i]: seg_id[i] for i in range(p3d_id.shape[0])}

        self.p3d_xyzs = {}
        for pid in self.p3d_seg.keys():
            p3d = point3Ds[pid]
            self.p3d_xyzs[pid] = p3d.xyz

        # sc_mean_scale_fn is expected to hold a single line:
        # "mean_x mean_y mean_z scale_x scale_y scale_z"
        with open(sc_mean_scale_fn, 'r') as f:
            lines = f.readlines()
            for l in lines:
                l = l.strip().split()
                self.mean_xyz = np.array([float(v) for v in l[:3]])
                self.scale_xyz = np.array([float(v) for v in l[3:]])

        if not train:
            self.query_info = self.read_query_info(path=query_info_path)

        self.nfeatures = nfeatures
        self.feature_dir = feature_dir
        print('Pre loaded {} feats, mean xyz {}, scale xyz {}'.format(
            len(self.feats.keys()), self.mean_xyz, self.scale_xyz))
    def normalize_p3ds(self, p3ds):
        mean_p3ds = np.ceil(np.mean(p3ds, axis=0))
        p3ds_ = p3ds - mean_p3ds
        dx = np.max(abs(p3ds_[:, 0]))
        dy = np.max(abs(p3ds_[:, 1]))
        dz = np.max(abs(p3ds_[:, 2]))
        scale_p3ds = np.ceil(np.array([dx, dy, dz], dtype=float).reshape(3, ))
        scale_p3ds[scale_p3ds < 1] = 1  # avoid division by (near-)zero scales
        return mean_p3ds, scale_p3ds
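    # Illustrative example (not from the original source): for
    # p3ds = np.array([[0., 0., 0.], [10., 4., 2.]]) the mean is
    # ceil([5, 2, 1]) = [5, 2, 1] and the per-axis scale is
    # ceil(max |p3ds - mean|) = [5, 2, 1], so (p3ds - mean) / scale
    # lies in [-1, 1] on each axis.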
    def read_query_info(self, path):
        query_info = {}
        with open(path, 'r') as f:
            lines = f.readlines()
            for l in lines:
                l = l.strip().split()
                image_name = l[0]
                cam_model = l[1]
                h, w = int(l[2]), int(l[3])
                params = np.array([float(v) for v in l[4:]])
                query_info[image_name] = {
                    'width': w,
                    'height': h,
                    'model': cam_model,
                    'params': params,
                }
        return query_info
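    # Each line of the query-info file is "name model height width params...",
    # e.g. with illustrative values only:
    #   query/day/img_0001.jpg SIMPLE_RADIAL 480 640 525.0 320.0 240.0 0.01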
    def extract_intrinsic_extrinsic_params(self, image_id):
        cam = self.cameras[self.images[image_id].camera_id]
        params = cam.params
        model = cam.model
        if model in ("SIMPLE_PINHOLE", "SIMPLE_RADIAL", "RADIAL"):
            fx = fy = params[0]
            cx = params[1]
            cy = params[2]
        elif model in ("PINHOLE", "OPENCV", "OPENCV_FISHEYE", "FULL_OPENCV"):
            fx = params[0]
            fy = params[1]
            cx = params[2]
            cy = params[3]
        else:
            raise Exception("Camera model not supported")

        K = np.eye(3, dtype=float)
        K[0, 0] = fx
        K[1, 1] = fy
        K[0, 2] = cx
        K[1, 2] = cy

        qvec = self.images[image_id].qvec
        tvec = self.images[image_id].tvec
        R = qvec2rotmat(qvec=qvec)

        P = np.eye(4, dtype=float)
        P[:3, :3] = R
        P[:3, 3] = tvec.reshape(3, )
        return {'K': K, 'P': P}
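    # Convention note: P follows COLMAP's world-to-camera convention,
    # x_cam = R @ x_world + t, stored as a 4x4 homogeneous matrix.
    #
    # Training-time sampling (summary of get_item_train below): keypoints
    # whose 3D points carry a segmentation label are inliers (seg id > 0),
    # the rest are outliers (seg id 0). A random inlier budget is drawn from
    # [min_inliers, max_inliers); if fewer than `nfeatures` points survive,
    # extra points are synthesized near labeled keypoints.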
    def get_item_train(self, idx):
        img_name = self.img_fns[idx]
        if img_name in self.feats.keys():
            feat_data = self.feats[img_name]
        else:
            feat_data = np.load(osp.join(self.feature_dir, img_name.replace('/', '+') + '.npy'),
                                allow_pickle=True)[()]
        # descs = feat_data['descriptors']  # [N, D]
        scores = feat_data['scores']  # [N, 1]
        kpts = feat_data['keypoints']  # [N, 2]
        image_size = feat_data['image_size']
        nfeat = kpts.shape[0]

        # print(img_name, self.name_to_id[img_name])
        p3d_ids = self.images[self.name_to_id[img_name]].point3D_ids
        p3d_xyzs = np.zeros(shape=(nfeat, 3), dtype=float)
        seg_ids = np.zeros(shape=(nfeat,), dtype=int)  # + self.n_class - 1
        for i in range(nfeat):
            p3d = p3d_ids[i]
            if p3d in self.p3d_seg.keys():
                seg_ids[i] = self.p3d_seg[p3d] + 1  # 0 for invalid
                if seg_ids[i] == -1:
                    seg_ids[i] = 0
            if p3d in self.p3d_xyzs.keys():
                p3d_xyzs[i] = self.p3d_xyzs[p3d]
        seg_ids = np.array(seg_ids).reshape(-1, )

        n_inliers = np.sum(seg_ids > 0)
        n_outliers = np.sum(seg_ids == 0)
        inlier_ids = np.where(seg_ids > 0)[0]
        outlier_ids = np.where(seg_ids == 0)[0]

        if n_inliers <= self.min_inliers:
            # too few labeled points: keep all inliers, fill up with outliers
            sel_inliers = n_inliers
            sel_outliers = self.nfeatures - sel_inliers
            out_ids = np.arange(n_outliers)
            np.random.shuffle(out_ids)
            sel_ids = np.hstack([inlier_ids, outlier_ids[out_ids[:self.nfeatures - n_inliers]]])
        else:
            # draw a random inlier budget, then top up with outliers
            sel_inliers = np.random.randint(self.min_inliers, self.max_inliers)
            if sel_inliers > n_inliers:
                sel_inliers = n_inliers
            if sel_inliers + n_outliers < self.nfeatures:
                sel_inliers = self.nfeatures - n_outliers
            sel_outliers = self.nfeatures - sel_inliers

            in_ids = np.arange(n_inliers)
            np.random.shuffle(in_ids)
            sel_inlier_ids = inlier_ids[in_ids[:sel_inliers]]

            out_ids = np.arange(n_outliers)
            np.random.shuffle(out_ids)
            sel_outlier_ids = outlier_ids[out_ids[:sel_outliers]]
            sel_ids = np.hstack([sel_inlier_ids, sel_outlier_ids])

        # sel_descs = descs[sel_ids]
        sel_scores = scores[sel_ids]
        sel_kpts = kpts[sel_ids]
        sel_seg_ids = seg_ids[sel_ids]
        sel_xyzs = p3d_xyzs[sel_ids]

        shuffle_ids = np.arange(sel_ids.shape[0])
        np.random.shuffle(shuffle_ids)
        # sel_descs = sel_descs[shuffle_ids]
        sel_scores = sel_scores[shuffle_ids]
        sel_kpts = sel_kpts[shuffle_ids]
        sel_seg_ids = sel_seg_ids[shuffle_ids]
        sel_xyzs = sel_xyzs[shuffle_ids]

        if sel_kpts.shape[0] < self.nfeatures:
            # pad to nfeatures with synthetic points near labeled keypoints
            # print(sel_descs.shape, sel_kpts.shape, sel_scores.shape, sel_seg_ids.shape, sel_xyzs.shape)
            valid_sel_ids = np.array([v for v in range(sel_kpts.shape[0]) if sel_seg_ids[v] > 0], dtype=int)
            # ref_sel_id = np.random.choice(valid_sel_ids, size=1)[0]
            if valid_sel_ids.shape[0] == 0:
                valid_sel_ids = np.array([v for v in range(sel_kpts.shape[0])], dtype=int)
            random_n = self.nfeatures - sel_kpts.shape[0]
            random_scores = np.random.random((random_n,))
            random_kpts, random_seg_ids, random_xyzs = self.random_points_from_reference(
                n=random_n,
                ref_kpts=sel_kpts[valid_sel_ids],
                ref_segs=sel_seg_ids[valid_sel_ids],
                ref_xyzs=sel_xyzs[valid_sel_ids],
                radius=5,
            )
            # sel_descs = np.vstack([sel_descs, random_descs])
            sel_scores = np.hstack([sel_scores, random_scores])
            sel_kpts = np.vstack([sel_kpts, random_kpts])
            sel_seg_ids = np.hstack([sel_seg_ids, random_seg_ids])
            sel_xyzs = np.vstack([sel_xyzs, random_xyzs])

        gt_n_seg = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls_dist = np.zeros(shape=(self.n_class,), dtype=float)
        uids = np.unique(sel_seg_ids).tolist()
        for uid in uids:
            if uid == 0:
                continue
            gt_cls[uid] = 1
            gt_n_seg[uid] = np.sum(sel_seg_ids == uid)
            gt_cls_dist[uid] = np.sum(seg_ids == uid) / np.sum(seg_ids > 0)  # [valid_id / total_valid_id]

        param_out = self.extract_intrinsic_extrinsic_params(image_id=self.name_to_id[img_name])

        img = self.read_image(image_name=img_name)
        image_size = img.shape[:2]
        if self.image_dim == 1:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.with_aug:
            nh = img.shape[0]
            nw = img.shape[1]
            if self.scale_params is not None:
                do_scale = np.random.random()
                if do_scale <= 0.25:
                    # rescale the image (and keypoints) by a random factor
                    p = np.random.randint(0, 11)
                    s = self.scale_params[0] + (self.scale_params[1] - self.scale_params[0]) / 10 * p
                    nh = int(img.shape[0] * s)
                    nw = int(img.shape[1] * s)
                    sh = nh / img.shape[0]
                    sw = nw / img.shape[1]
                    sel_kpts[:, 0] = sel_kpts[:, 0] * sw
                    sel_kpts[:, 1] = sel_kpts[:, 1] * sh
                    img = cv2.resize(img, dsize=(nw, nh))

            # photometric jitter: img * contrast + brightness
            brightness = np.random.uniform(-self.jitter_params['brightness'], self.jitter_params['brightness']) * 255
            contrast = 1 + np.random.uniform(-self.jitter_params['contrast'], self.jitter_params['contrast'])
            img = cv2.addWeighted(img, contrast, img, 0, brightness)
            img = np.clip(img, a_min=0, a_max=255)
            if self.image_dim == 1:
                img = img[..., None]
            img = img.astype(float) / 255.
            image_size = np.array([nh, nw], dtype=int)
        else:
            if self.image_dim == 1:
                img = img[..., None].astype(float) / 255.
            else:
                img = img.astype(float) / 255.  # normalize the RGB path as well, matching get_item_test

        output = {
            # 'descriptors': sel_descs,  # may not be used
            'scores': sel_scores,
            'keypoints': sel_kpts,
            'norm_keypoints': normalize_size(x=sel_kpts, size=image_size),
            'image': [img],
            'gt_seg': sel_seg_ids,
            'gt_cls': gt_cls,
            'gt_cls_dist': gt_cls_dist,
            'gt_n_seg': gt_n_seg,
            'file_name': img_name,
            'prefix_name': self.image_prefix,
            # 'mean_xyz': self.mean_xyz,
            # 'scale_xyz': self.scale_xyz,
            # 'gt_sc': sel_xyzs,
            # 'gt_norm_sc': (sel_xyzs - self.mean_xyz) / self.scale_xyz,
            'K': param_out['K'],
            'gt_P': param_out['P']
        }
        return output
    def get_item_test(self, idx):
        # evaluation of recognition only
        img_name = self.img_fns[idx]
        feat_data = np.load(osp.join(self.feature_dir, img_name.replace('/', '+') + '.npy'),
                            allow_pickle=True)[()]
        descs = feat_data['descriptors']  # [N, D]
        scores = feat_data['scores']  # [N, 1]
        kpts = feat_data['keypoints']  # [N, 2]
        image_size = feat_data['image_size']
        nfeat = descs.shape[0]

        if img_name in self.img_p3d.keys():
            p3d_ids = self.img_p3d[img_name]
        else:
            # without precomputed 3D point ids the labels below cannot be built
            raise KeyError('no 3D point ids for query image {}'.format(img_name))

        p3d_xyzs = np.zeros(shape=(nfeat, 3), dtype=float)
        seg_ids = np.zeros(shape=(nfeat,), dtype=int)  # attention! by default invalid!!!
        for i in range(nfeat):
            p3d = p3d_ids[i]
            if p3d in self.p3d_seg.keys():
                seg_ids[i] = self.p3d_seg[p3d] + 1
                if seg_ids[i] == -1:
                    seg_ids[i] = 0  # 0 for invalid
            if p3d in self.p3d_xyzs.keys():
                p3d_xyzs[i] = self.p3d_xyzs[p3d]
        seg_ids = np.array(seg_ids).reshape(-1, )

        if self.nfeatures > 0:
            sorted_ids = np.argsort(scores)[::-1][:self.nfeatures]  # large to small
            descs = descs[sorted_ids]
            scores = scores[sorted_ids]
            kpts = kpts[sorted_ids]
            p3d_xyzs = p3d_xyzs[sorted_ids]
            seg_ids = seg_ids[sorted_ids]

        gt_n_seg = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls_dist = np.zeros(shape=(self.n_class,), dtype=float)
        uids = np.unique(seg_ids).tolist()
        for uid in uids:
            if uid == 0:
                continue
            gt_cls[uid] = 1
            gt_n_seg[uid] = np.sum(seg_ids == uid)
            # [valid_id / total_valid_id], matching the training-time statistic
            gt_cls_dist[uid] = np.sum(seg_ids == uid) / np.sum(seg_ids > 0)
        gt_cls[0] = 0

        img = self.read_image(image_name=img_name)
        if self.image_dim == 1:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img = img[..., None].astype(float) / 255.
        else:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(float) / 255.

        return {
            'descriptors': descs,
            'scores': scores,
            'keypoints': kpts,
            'image_size': image_size,
            'norm_keypoints': normalize_size(x=kpts, size=image_size),
            'gt_seg': seg_ids,
            'gt_cls': gt_cls,
            'gt_cls_dist': gt_cls_dist,
            'gt_n_seg': gt_n_seg,
            'file_name': img_name,
            'prefix_name': self.image_prefix,
            'image': [img],
            'mean_xyz': self.mean_xyz,
            'scale_xyz': self.scale_xyz,
            'gt_sc': p3d_xyzs,
            'gt_norm_sc': (p3d_xyzs - self.mean_xyz) / self.scale_xyz
        }
    def __getitem__(self, idx):
        if self.train:
            return self.get_item_train(idx=idx)
        else:
            return self.get_item_test(idx=idx)

    def __len__(self):
        return len(self.img_fns)

    def read_image(self, image_name):
        return cv2.imread(osp.join(self.dataset_path, image_name))
    def jitter_augmentation(self, img, params):
        brightness, contrast, saturation, hue = params
        # interpolate each factor in 20 uniform steps between its bounds
        p = np.random.randint(0, 20)
        b = brightness[0] + (brightness[1] - brightness[0]) / 20 * p
        img = tvf.adjust_brightness(img=img, brightness_factor=b)

        p = np.random.randint(0, 20)
        c = contrast[0] + (contrast[1] - contrast[0]) / 20 * p
        img = tvf.adjust_contrast(img=img, contrast_factor=c)

        p = np.random.randint(0, 20)
        s = saturation[0] + (saturation[1] - saturation[0]) / 20 * p
        img = tvf.adjust_saturation(img=img, saturation_factor=s)

        p = np.random.randint(0, 20)
        h = hue[0] + (hue[1] - hue[0]) / 20 * p
        img = tvf.adjust_hue(img=img, hue_factor=h)
        return img
    def random_points(self, n, d, h, w):
        # random unit-norm descriptors and random pixel locations
        desc = np.random.random((n, d))
        desc = desc / np.linalg.norm(desc, ord=2, axis=1)[..., None]
        xs = np.random.randint(0, w - 1, size=(n, 1))
        ys = np.random.randint(0, h - 1, size=(n, 1))
        kpts = np.hstack([xs, ys])
        return desc, kpts
    def random_points_from_reference(self, n, ref_kpts, ref_segs, ref_xyzs, radius=5):
        # synthesize n extra keypoints by perturbing reference keypoints within
        # `radius` pixels, inheriting their segment ids and 3D coordinates
        n_ref = ref_kpts.shape[0]
        if n_ref < n:
            ref_ids = np.random.choice([i for i in range(n_ref)], size=n).tolist()
        else:
            ref_ids = [i for i in range(n)]

        new_xs = []
        new_ys = []
        # new_descs = []
        new_segs = []
        new_xyzs = []
        for i in ref_ids:
            nx = np.random.randint(-radius, radius) + ref_kpts[i, 0]
            ny = np.random.randint(-radius, radius) + ref_kpts[i, 1]
            new_xs.append(nx)
            new_ys.append(ny)
            # new_descs.append(ref_descs[i])
            new_segs.append(ref_segs[i])
            new_xyzs.append(ref_xyzs[i])

        new_xs = np.array(new_xs).reshape(n, 1)
        new_ys = np.array(new_ys).reshape(n, 1)
        new_segs = np.array(new_segs).reshape(n, )
        new_kpts = np.hstack([new_xs, new_ys])
        # new_descs = np.array(new_descs).reshape(n, -1)
        new_xyzs = np.array(new_xyzs)
        return new_kpts, new_segs, new_xyzs
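

if __name__ == '__main__':
    # Minimal usage sketch. The paths and values below are placeholders for a
    # hypothetical scene layout, not files shipped with this module.
    jitter = {'brightness': 0.2, 'contrast': 0.2, 'saturation': 0.2, 'hue': 0.1, 'blur': 0}
    train_set = BasicDataset(
        img_list_fn='datasets/example/train_imglist.txt',
        feature_dir='datasets/example/features',
        sfm_path='datasets/example/sfm_model',
        seg_fn='datasets/example/p3d_segs.npy',
        dataset_path='datasets/example/images',
        n_class=64,
        dataset='example',
        nfeatures=1024,
        train=True,
        with_aug=True,
        jitter_params=jitter,
        scale_params=(0.5, 1.0),
        sc_mean_scale_fn='datasets/example/sc_mean_scale.txt',
    )
    sample = train_set[0]
    print(sample['keypoints'].shape, sample['gt_seg'].shape, sample['K'])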