import mmcv
import numpy as np
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import LoadAnnotations
@PIPELINES.register_module()
class LoadMultiViewImageFromFiles(object):
"""Load multi channel images from a list of separate channel files.
Expects results['img_filename'] to be a list of filenames.
Args:
to_float32 (bool): Whether to convert the img to float32.
Defaults to False.
color_type (str): Color type of the file. Defaults to 'unchanged'.
"""
def __init__(self, to_float32=False, color_type='unchanged'):
self.to_float32 = to_float32
self.color_type = color_type
def __call__(self, results):
"""Call function to load multi-view image from files.
Args:
results (dict): Result dict containing multi-view image filenames.
Returns:
dict: The result dict containing the multi-view image data. \
Added keys and values are described below.
- filename (str): Multi-view image filenames.
- img (np.ndarray): Multi-view image arrays.
- img_shape (tuple[int]): Shape of multi-view image arrays.
- ori_shape (tuple[int]): Shape of original image arrays.
- pad_shape (tuple[int]): Shape of padded image arrays.
- scale_factor (float): Scale factor.
- img_norm_cfg (dict): Normalization configuration of images.
"""
filename = results['img_filename']
img = np.stack(
[mmcv.imread(name, self.color_type) for name in filename], axis=-1)
if self.to_float32:
img = img.astype(np.float32)
results['filename'] = filename
results['img'] = img
results['img_shape'] = img.shape
results['ori_shape'] = img.shape
# Set initial values for default meta_keys
results['pad_shape'] = img.shape
results['scale_factor'] = 1.0
num_channels = 1 if len(img.shape) < 3 else img.shape[2]
results['img_norm_cfg'] = dict(
mean=np.zeros(num_channels, dtype=np.float32),
std=np.ones(num_channels, dtype=np.float32),
to_rgb=False)
return results
def __repr__(self):
"""str: Return a string that describes the module."""
return "{} (to_float32={}, color_type='{}')".format(
self.__class__.__name__, self.to_float32, self.color_type)
[docs]@PIPELINES.register_module()
class LoadPointsFromMultiSweeps(object):
"""Load points from multiple sweeps.
This is usually used for nuScenes dataset to utilize previous sweeps.
Args:
sweeps_num (int): number of sweeps. Defaults to 10.
load_dim (int): dimension number of the loaded points. Defaults to 5.
file_client_args (dict): Config dict of file clients, refer to
https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
for more details. Defaults to dict(backend='disk').
"""
def __init__(self,
sweeps_num=10,
load_dim=5,
file_client_args=dict(backend='disk')):
self.load_dim = load_dim
self.sweeps_num = sweeps_num
self.file_client_args = file_client_args.copy()
self.file_client = None
def _load_points(self, pts_filename):
"""Private function to load point clouds data.
Args:
pts_filename (str): Filename of point clouds data.
Returns:
np.ndarray: An array containing point clouds data.
"""
if self.file_client is None:
self.file_client = mmcv.FileClient(**self.file_client_args)
try:
pts_bytes = self.file_client.get(pts_filename)
points = np.frombuffer(pts_bytes, dtype=np.float32)
except ConnectionError:
mmcv.check_file_exist(pts_filename)
if pts_filename.endswith('.npy'):
points = np.load(pts_filename)
else:
points = np.fromfile(pts_filename, dtype=np.float32)
return points
def __call__(self, results):
"""Call function to load multi-sweep point clouds from files.
Args:
results (dict): Result dict containing multi-sweep point cloud \
filenames.
Returns:
dict: The result dict containing the multi-sweep points data. \
Added key and value are described below.
- points (np.ndarray): Multi-sweep point cloud arrays.
"""
points = results['points']
points[:, 3] /= 255
points[:, 4] = 0
sweep_points_list = [points]
ts = results['timestamp']
for idx, sweep in enumerate(results['sweeps']):
if idx >= self.sweeps_num:
break
points_sweep = self._load_points(sweep['data_path'])
points_sweep = np.copy(points_sweep).reshape(-1, self.load_dim)
sweep_ts = sweep['timestamp'] / 1e6
points_sweep[:, 3] /= 255
points_sweep[:, :3] = points_sweep[:, :3] @ sweep[
'sensor2lidar_rotation'].T
points_sweep[:, :3] += sweep['sensor2lidar_translation']
points_sweep[:, 4] = ts - sweep_ts
sweep_points_list.append(points_sweep)
points = np.concatenate(sweep_points_list, axis=0)[:, [0, 1, 2, 4]]
results['points'] = points
return results
def __repr__(self):
"""str: Return a string that describes the module."""
return f'{self.__class__.__name__}(sweeps_num={self.sweeps_num})'
@PIPELINES.register_module()
class PointSegClassMapping(object):
"""Map original semantic class to valid category ids.
Map valid classes as 0~len(valid_cat_ids)-1 and
others as len(valid_cat_ids).
Args:
valid_cat_ids (tuple[int]): A tuple of valid category.
"""
def __init__(self, valid_cat_ids):
self.valid_cat_ids = valid_cat_ids
def __call__(self, results):
"""Call function to map original semantic class to valid category ids.
Args:
results (dict): Result dict containing point semantic masks.
Returns:
dict: The result dict containing the mapped category ids. \
Updated key and value are described below.
- pts_semantic_mask (np.ndarray): Mapped semantic masks.
"""
assert 'pts_semantic_mask' in results
pts_semantic_mask = results['pts_semantic_mask']
neg_cls = len(self.valid_cat_ids)
for i in range(pts_semantic_mask.shape[0]):
if pts_semantic_mask[i] in self.valid_cat_ids:
converted_id = self.valid_cat_ids.index(pts_semantic_mask[i])
pts_semantic_mask[i] = converted_id
else:
pts_semantic_mask[i] = neg_cls
results['pts_semantic_mask'] = pts_semantic_mask
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__
repr_str += '(valid_cat_ids={})'.format(self.valid_cat_ids)
return repr_str
[docs]@PIPELINES.register_module()
class NormalizePointsColor(object):
"""Normalize color of points.
Args:
color_mean (list[float]): Mean color of the point cloud.
"""
def __init__(self, color_mean):
self.color_mean = color_mean
def __call__(self, results):
"""Call function to normalize color of points.
Args:
results (dict): Result dict containing point clouds data.
Returns:
dict: The result dict containing the normalized points. \
Updated key and value are described below.
- points (np.ndarray): Points after color normalization.
"""
points = results['points']
assert points.shape[1] >= 6,\
f'Expect points have channel >=6, got {points.shape[1]}'
points[:, 3:6] = points[:, 3:6] - np.array(self.color_mean) / 256.0
results['points'] = points
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__
repr_str += '(color_mean={})'.format(self.color_mean)
return repr_str
[docs]@PIPELINES.register_module()
class LoadPointsFromFile(object):
"""Load Points From File.
Load sunrgbd and scannet points from file.
Args:
load_dim (int): The dimension of the loaded points.
Defaults to 6.
use_dim (list[int]): Which dimensions of the points to be used.
Defaults to [0, 1, 2]. For KITTI dataset, set use_dim=4
or use_dim=[0, 1, 2, 3] to use the intensity dimension.
shift_height (bool): Whether to use shifted height. Defaults to False.
file_client_args (dict): Config dict of file clients, refer to
https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
for more details. Defaults to dict(backend='disk').
"""
def __init__(self,
load_dim=6,
use_dim=[0, 1, 2],
shift_height=False,
file_client_args=dict(backend='disk')):
self.shift_height = shift_height
if isinstance(use_dim, int):
use_dim = list(range(use_dim))
assert max(use_dim) < load_dim, \
f'Expect all used dimensions < {load_dim}, got {use_dim}'
self.load_dim = load_dim
self.use_dim = use_dim
self.file_client_args = file_client_args.copy()
self.file_client = None
def _load_points(self, pts_filename):
"""Private function to load point clouds data.
Args:
pts_filename (str): Filename of point clouds data.
Returns:
np.ndarray: An array containing point clouds data.
"""
if self.file_client is None:
self.file_client = mmcv.FileClient(**self.file_client_args)
try:
pts_bytes = self.file_client.get(pts_filename)
points = np.frombuffer(pts_bytes, dtype=np.float32)
except ConnectionError:
mmcv.check_file_exist(pts_filename)
if pts_filename.endswith('.npy'):
points = np.load(pts_filename)
else:
points = np.fromfile(pts_filename, dtype=np.float32)
return points
def __call__(self, results):
"""Call function to load points data from file.
Args:
results (dict): Result dict containing point clouds data.
Returns:
dict: The result dict containing the point clouds data. \
Added key and value are described below.
- points (np.ndarray): Point clouds data.
"""
pts_filename = results['pts_filename']
points = self._load_points(pts_filename)
points = points.reshape(-1, self.load_dim)
points = points[:, self.use_dim]
if self.shift_height:
floor_height = np.percentile(points[:, 2], 0.99)
height = points[:, 2] - floor_height
points = np.concatenate([points, np.expand_dims(height, 1)], 1)
results['points'] = points
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__ + '('
repr_str += 'shift_height={}, '.format(self.shift_height)
repr_str += 'file_client_args={}), '.format(self.file_client_args)
repr_str += 'load_dim={}, '.format(self.load_dim)
repr_str += 'use_dim={})'.format(self.use_dim)
return repr_str
@PIPELINES.register_module()
class LoadAnnotations3D(LoadAnnotations):
"""Load Annotations3D.
Load instance mask and semantic mask of points and
encapsulate the items into related fields.
Args:
with_bbox_3d (bool, optional): Whether to load 3D boxes.
Defaults to True.
with_label_3d (bool, optional): Whether to load 3D labels.
Defaults to True.
with_mask_3d (bool, optional): Whether to load 3D instance masks.
for points. Defaults to False.
with_seg_3d (bool, optional): Whether to load 3D semantic masks.
for points. Defaults to False.
with_bbox (bool, optional): Whether to load 2D boxes.
Defaults to False.
with_label (bool, optional): Whether to load 2D labels.
Defaults to False.
with_mask (bool, optional): Whether to load 2D instance masks.
Defaults to False.
with_seg (bool, optional): Whether to load 2D semantic masks.
Defaults to False.
poly2mask (bool, optional): Whether to convert polygon annotations
to bitmasks. Defaults to True.
file_client_args (dict): Config dict of file clients, refer to
https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
for more details.
"""
def __init__(self,
with_bbox_3d=True,
with_label_3d=True,
with_mask_3d=False,
with_seg_3d=False,
with_bbox=False,
with_label=False,
with_mask=False,
with_seg=False,
poly2mask=True,
file_client_args=dict(backend='disk')):
super().__init__(
with_bbox,
with_label,
with_mask,
with_seg,
poly2mask,
file_client_args=file_client_args)
self.with_bbox_3d = with_bbox_3d
self.with_label_3d = with_label_3d
self.with_mask_3d = with_mask_3d
self.with_seg_3d = with_seg_3d
def _load_bboxes_3d(self, results):
"""Private function to load 3D bounding box annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded 3D bounding box annotations.
"""
results['gt_bboxes_3d'] = results['ann_info']['gt_bboxes_3d']
results['bbox3d_fields'].append('gt_bboxes_3d')
return results
def _load_labels_3d(self, results):
"""Private function to load label annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded label annotations.
"""
results['gt_labels_3d'] = results['ann_info']['gt_labels_3d']
return results
def _load_masks_3d(self, results):
"""Private function to load 3D mask annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded 3D mask annotations.
"""
pts_instance_mask_path = results['ann_info']['pts_instance_mask_path']
if self.file_client is None:
self.file_client = mmcv.FileClient(**self.file_client_args)
try:
mask_bytes = self.file_client.get(pts_instance_mask_path)
pts_instance_mask = np.frombuffer(mask_bytes, dtype=np.int)
except ConnectionError:
mmcv.check_file_exist(pts_instance_mask_path)
pts_instance_mask = np.fromfile(
pts_instance_mask_path, dtype=np.long)
results['pts_instance_mask'] = pts_instance_mask
results['pts_mask_fields'].append('pts_instance_mask')
return results
def _load_semantic_seg_3d(self, results):
"""Private function to load 3D semantic segmentation annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing the semantic segmentation annotations.
"""
pts_semantic_mask_path = results['ann_info']['pts_semantic_mask_path']
if self.file_client is None:
self.file_client = mmcv.FileClient(**self.file_client_args)
try:
mask_bytes = self.file_client.get(pts_semantic_mask_path)
# add .copy() to fix read-only bug
pts_semantic_mask = np.frombuffer(mask_bytes, dtype=np.int).copy()
except ConnectionError:
mmcv.check_file_exist(pts_semantic_mask_path)
pts_semantic_mask = np.fromfile(
pts_semantic_mask_path, dtype=np.long)
results['pts_semantic_mask'] = pts_semantic_mask
results['pts_seg_fields'].append('pts_semantic_mask')
return results
def __call__(self, results):
"""Call function to load multiple types annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded 3D bounding box, label, mask and
semantic segmentation annotations.
"""
results = super().__call__(results)
if self.with_bbox_3d:
results = self._load_bboxes_3d(results)
if results is None:
return None
if self.with_label_3d:
results = self._load_labels_3d(results)
if self.with_mask_3d:
results = self._load_masks_3d(results)
if self.with_seg_3d:
results = self._load_semantic_seg_3d(results)
return results
def __repr__(self):
"""str: Return a string that describes the module."""
indent_str = ' '
repr_str = self.__class__.__name__ + '(\n'
repr_str += f'{indent_str}with_bbox_3d={self.with_bbox_3d}, '
repr_str += f'{indent_str}with_label_3d={self.with_label_3d}, '
repr_str += f'{indent_str}with_mask_3d={self.with_mask_3d}, '
repr_str += f'{indent_str}with_seg_3d={self.with_seg_3d}, '
repr_str += f'{indent_str}with_bbox={self.with_bbox}, '
repr_str += f'{indent_str}with_label={self.with_label}, '
repr_str += f'{indent_str}with_mask={self.with_mask}, '
repr_str += f'{indent_str}with_seg={self.with_seg}, '
repr_str += f'{indent_str}poly2mask={self.poly2mask})'
return repr_str