Xinyu Chen · Vera Lee
Published © Apache-2.0

Point-Cloud-Based 3D Object Detection on PYNQ-DPU

With the PYNQ-DPU overlay on the KV260, we make 3D object detection on LiDAR point clouds more efficient than ever!
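Under the hood, the demos below program the DPU overlay from PYNQ, load a compiled xmodel, and push bird's-eye-view (BEV) maps built from the LiDAR point cloud through the DPU runner. Here is a minimal sketch of that loop, distilled from the demo scripts in the Code section (the xmodel filename is the one attached to this project, and bev_nhwc stands for a hypothetical (H, W, C) float32 BEV map you have already built):

import numpy as np
from pynq_dpu import DpuOverlay

# Program the PL with the DPU and load the compiled network
overlay = DpuOverlay("dpu.bit")
overlay.load_model("./CNN_zcu102.xmodel")
dpu = overlay.runner

# Allocate host buffers that match the DPU input/output tensor shapes
shape_in = tuple(dpu.get_input_tensors()[0].dims)
input_data = [np.empty(shape_in, dtype=np.float32, order="C")]
output_data = [np.empty(tuple(t.dims), dtype=np.float32, order="C")
               for t in dpu.get_output_tensors()]

# Copy one NHWC BEV map into the input buffer and run the DPU
input_data[0][0, ...] = bev_nhwc  # bev_nhwc: hypothetical pre-built BEV map
job_id = dpu.execute_async(input_data, output_data)
dpu.wait(job_id)
# output_data now holds the five detection heads, decoded on the CPU afterwards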

Intermediate · Full instructions provided · 20 hours · 1,756 views

Things used in this project

Hardware components

AMD Kria KV260 Vision AI Starter Kit ×1

Software apps and online services

AMD PYNQ Framework
AMD-Xilinx Vitis AI 1.4

Story


Schematics

precompiled xmodel

To be deployed on the DPU by the demo scripts.

pretrained model

Floating-point model trained for only 10 epochs.
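This checkpoint is what quantize.py (below) restores before quantization. A minimal sketch of loading it on the host, assuming the SFA3D "sfa" sources are on the Python path and configs comes from parse_test_configs() in quantize.py:

import torch
from models.model_utils import create_model

# Build the SFA3D network and restore the 10-epoch floating-point weights
model = create_model(configs)
model.load_state_dict(torch.load('./Model_resnet_18_epoch_10.pth', map_location='cpu'))
model.eval()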

Code

demo_2_sides_dpu.py

Python
Used for the two-sides demo (front and back views).
import sys
import os
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

import cv2
import torch
import numpy as np

src_dir = os.path.dirname(os.path.realpath(__file__))
while not src_dir.endswith("sfa"):
    src_dir = os.path.dirname(src_dir)
if src_dir not in sys.path:
    sys.path.append(src_dir)

from data_process.demo_dataset import Demo_KittiDataset
from models.model_utils import create_model
from utils.evaluation_utils import draw_predictions, convert_det_to_real_values
import config.kitti_config as cnf
from data_process.transformation import lidar_to_camera_box
from utils.visualization_utils import show_rgb_image_with_boxes
from data_process.kitti_data_utils import Calibration
from utils.demo_utils import parse_demo_configs, download_and_unzip, write_credit
from utils.misc import make_folder, time_synchronized
from utils.evaluation_utils import decode, post_processing
from utils.torch_utils import _sigmoid


from pynq_dpu import DpuOverlay


def do_detect(dpu, shapeIn, image, input_data, output_data, configs, bevmap, is_front):
    # The back view is handled by the same network after flipping the BEV map
    if not is_front:
        bevmap = torch.flip(bevmap, [1, 2])
    input_bev_maps = bevmap.unsqueeze(0).to("cpu", non_blocking=True).float()

    # The DPU expects NHWC input, so permute the NCHW BEV map
    input_bev_maps = input_bev_maps.permute(0, 2, 3, 1)

    # Copy the BEV map into the pre-allocated DPU input buffer
    image[0, ...] = input_bev_maps[0, ...]
    
    # Run inference on the DPU and measure the raw inference latency
    t1 = time_synchronized()
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)
    t2 = time_synchronized()

    fps = 1 / (t2 - t1)
    
    # Convert the five NHWC DPU output buffers back to NCHW torch tensors
    outputs0, outputs1, outputs2, outputs3, outputs4 = [
        torch.tensor(out).permute(0, 3, 1, 2) for out in output_data]

    # Apply sigmoid to the heatmap and center-offset heads before decoding
    outputs0 = _sigmoid(outputs0)
    outputs1 = _sigmoid(outputs1)
    
    # Decode the top-K candidates and filter them by the peak threshold on the CPU
    detections = decode(outputs0, outputs1, outputs2, outputs3, outputs4, K=configs.K)
    detections = detections.cpu().numpy().astype(np.float32)
    detections = post_processing(detections, configs.num_classes, configs.down_ratio, configs.peak_thresh)

    return detections[0], bevmap, fps



if __name__ == '__main__':
    
    configs = parse_demo_configs()

    # Try to download the dataset for demonstration
    server_url = 'https://s3.eu-central-1.amazonaws.com/avg-kitti/raw_data'
    download_url = '{}/{}/{}.zip'.format(server_url, configs.foldername[:-5], configs.foldername)
    download_and_unzip(configs.dataset_dir, download_url)

    out_cap = None
    demo_dataset = Demo_KittiDataset(configs)

    # Program the DPU overlay and load the compiled xmodel
    overlay = DpuOverlay("dpu.bit")
    overlay.load_model("./CNN_zcu102.xmodel")
    dpu = overlay.runner
    
    inputTensors = dpu.get_input_tensors()
    outputTensors = dpu.get_output_tensors()

    # Allocate host buffers matching the DPU input tensor and the five output tensors
    shapeIn = tuple(inputTensors[0].dims)
    output_data = [np.empty(tuple(t.dims), dtype=np.float32, order="C")
                   for t in outputTensors]
    input_data = [np.empty(shapeIn, dtype=np.float32, order="C")]
    image = input_data[0]
    
    for sample_idx in range(len(demo_dataset)):
        print('Processing sample {}'.format(sample_idx))
        
        metadatas, front_bevmap, back_bevmap, img_rgb = demo_dataset.load_bevmap_front_vs_back(sample_idx)

        front_detections, front_bevmap, fps = do_detect(dpu, shapeIn, image, input_data, output_data, configs, front_bevmap, is_front=True)
        back_detections, back_bevmap, _ = do_detect(dpu, shapeIn, image, input_data, output_data, configs, back_bevmap, is_front=False)

        # Draw prediction in the image
        front_bevmap = (front_bevmap.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
        front_bevmap = cv2.resize(front_bevmap, (cnf.BEV_WIDTH, cnf.BEV_HEIGHT))
        front_bevmap = draw_predictions(front_bevmap, front_detections, configs.num_classes)
        # Rotate the front_bevmap
        front_bevmap = cv2.rotate(front_bevmap, cv2.ROTATE_90_COUNTERCLOCKWISE)

        # Draw prediction in the image
        back_bevmap = (back_bevmap.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
        back_bevmap = cv2.resize(back_bevmap, (cnf.BEV_WIDTH, cnf.BEV_HEIGHT))
        back_bevmap = draw_predictions(back_bevmap, back_detections, configs.num_classes)
        # Rotate the back_bevmap
        back_bevmap = cv2.rotate(back_bevmap, cv2.ROTATE_90_CLOCKWISE)

        # merge front and back bevmap
        full_bev = np.concatenate((back_bevmap, front_bevmap), axis=1)

        img_path = metadatas['img_path'][0]
        img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
        calib = Calibration(configs.calib_path)
        kitti_dets = convert_det_to_real_values(front_detections)
        if len(kitti_dets) > 0:
            kitti_dets[:, 1:] = lidar_to_camera_box(kitti_dets[:, 1:], calib.V2C, calib.R0, calib.P2)
            img_bgr = show_rgb_image_with_boxes(img_bgr, kitti_dets, calib)
        img_bgr = cv2.resize(img_bgr, (cnf.BEV_WIDTH * 2, 375))

        out_img = np.concatenate((img_bgr, full_bev), axis=0)
        write_credit(out_img, (50, 410), text_author='3D Object Detection on KV260', org_fps=(900, 410), fps=fps*2)

        if out_cap is None:
            out_cap_h, out_cap_w = out_img.shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            out_path = os.path.join(configs.results_dir, '{}_both_2_sides_final.avi'.format(configs.foldername))
            print('Create video writer at {}'.format(out_path))
            out_cap = cv2.VideoWriter(out_path, fourcc, 30, (out_cap_w, out_cap_h))

        out_cap.write(out_img)

    if out_cap:
        out_cap.release()

quantize.py

Python
Used for quantization of the floating-point model with Vitis AI.
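The quantizer is driven in two passes: a calibration pass that collects activation statistics and exports the quantization config, and a test pass that evaluates the quantized model and exports the xmodel. A hedged sketch of a typical invocation using the script's own flags (the build directory and calibration batch size are illustrative):

import sys
import subprocess

# Pass 1: calibration - collect activation statistics and export the quant config
subprocess.run([sys.executable, "quantize.py", "--quant_mode", "calib",
                "--build_dir", "build", "--batchsize", "8"], check=True)

# Pass 2: test - evaluate the quantized model and export the xmodel
subprocess.run([sys.executable, "quantize.py", "--quant_mode", "test",
                "--build_dir", "build"], check=True)

The exported xmodel is then compiled for the target DPU with the Vitis AI compiler; the precompiled xmodel attached in the Schematics section is the deployable artifact the demos load.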
'''
 Copyright 2020 Xilinx Inc.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
     http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
'''



import os
import sys
import argparse
import random
import time
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

import cv2
import numpy as np
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.utils.data.distributed
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from easydict import EasyDict as edict
from pytorch_nndct.apis import torch_quantizer, dump_xmodel

src_dir = os.path.dirname(os.path.realpath(__file__))
while not src_dir.endswith("sfa"):
    src_dir = os.path.dirname(src_dir)
if src_dir not in sys.path:
    sys.path.append(src_dir)

from data_process.kitti_dataloader import create_test_dataloader
from data_process.kitti_dataloader import create_train_dataloader, create_val_dataloader
from models.model_utils import create_model, make_data_parallel, get_num_parameters
from utils.train_utils import create_optimizer, create_lr_scheduler, get_saved_state, save_checkpoint
from utils.torch_utils import reduce_tensor, to_python_float
from utils.misc import AverageMeter, ProgressMeter
from utils.logger import Logger
from config.train_config import parse_train_configs 
from losses.losses import Compute_Loss
from utils.misc import make_folder, time_synchronized
from utils.torch_utils import _sigmoid
from utils.evaluation_utils import decode, post_processing, draw_predictions, convert_det_to_real_values
import config.kitti_config as cnf
from data_process.transformation import lidar_to_camera_box
from utils.visualization_utils import merge_rgb_to_bev, show_rgb_image_with_boxes
from data_process.kitti_data_utils import Calibration



import math

def sigmoid(x):
    sig = 1 / (1 + math.exp(-x))
    return sig

DIVIDER = '-----------------------------------------'




def parse_test_configs():
    parser = argparse.ArgumentParser(description='Testing config for the Implementation')
    parser.add_argument('--saved_fn', type=str, default='resnet_18', metavar='FN',
                        help='The name using for saving logs, models,...')
    parser.add_argument('-a', '--arch', type=str, default='resnet_18', metavar='ARCH',
                        help='The name of the model architecture')
    parser.add_argument('--pretrained_path', type=str,
                        default='./Model_resnet_18_epoch_10.pth', metavar='PATH',
                        help='the path of the pretrained checkpoint')
    parser.add_argument('--K', type=int, default=50,
                        help='the number of top K') 
    parser.add_argument('--no_cuda', action='store_true',
                        help='If true, cuda is not used.')
    parser.add_argument('--gpu_idx', default=0, type=int,
                        help='GPU index to use.')
    parser.add_argument('--num_samples', type=int, default=None,
                        help='Take a subset of the dataset to run and debug')
    parser.add_argument('--num_workers', type=int, default=1,
                        help='Number of threads for loading data')
    parser.add_argument('--batch_size', type=int, default=1,
                        help='mini-batch size (default: 1)')
    parser.add_argument('--peak_thresh', type=float, default=0.2)
    parser.add_argument('--save_test_output', action='store_true',
                        help='If true, the output image of the testing phase will be saved')
    parser.add_argument('--output_format', type=str, default='image', metavar='PATH',
                        help='the type of the test output (support image or video)')
    parser.add_argument('--output_video_fn', type=str, default='out_fpn_resnet_18', metavar='PATH',
                        help='the video filename if the output format is video')
    parser.add_argument('--output-width', type=int, default=608,
                        help='the width of showing output, the height maybe vary')

    configs = edict(vars(parser.parse_args()))
    configs.pin_memory = True
    configs.distributed = False  # For testing on 1 GPU only

    configs.input_size = (608, 608)
    configs.hm_size = (152, 152)
    configs.down_ratio = 4
    configs.max_objects = 50

    configs.imagenet_pretrained = False
    configs.head_conv = 64
    configs.num_classes = 3
    configs.num_center_offset = 2
    configs.num_z = 1
    configs.num_dim = 3
    configs.num_direction = 2  # sin, cos

    configs.heads = {
        'hm_cen': configs.num_classes,
        'cen_offset': configs.num_center_offset,
        'direction': configs.num_direction,
        'z_coor': configs.num_z,
        'dim': configs.num_dim
    }

    configs.num_input_features = 4

    ####################################################################
    ##############Dataset, Checkpoints, and results dir configs#########
    ####################################################################
    configs.root_dir = '../'
    configs.dataset_dir = os.path.join(configs.root_dir, 'dataset', 'kitti')

    configs.results_dir = os.path.join(configs.root_dir, 'results', configs.saved_fn)
    make_folder(configs.results_dir)

    return configs




def test(model, device, configs):
    '''
    Evaluate the model on the test dataloader and visualize the detections.
    '''
    model = model.to(device=configs.device)
    out_cap = None
    model.eval()

    test_dataloader = create_test_dataloader(configs)
    
    with torch.no_grad():
        for batch_idx, batch_data in enumerate(test_dataloader):
            metadatas, bev_maps, img_rgbs = batch_data
            input_bev_maps = bev_maps.to(configs.device, non_blocking=True).float()
            
            t1 = time_synchronized()
            
            outputs = model(input_bev_maps)
            
            # NOTE: unlike the DPU demo, the raw heads are decoded here without
            # applying a sigmoid to the heatmap / center-offset outputs
            # detections size (batch_size, K, 10)

            detections = decode(outputs[0], outputs[1], outputs[2], outputs[3],
                                outputs[4], K=configs.K)
            detections = detections.cpu().numpy().astype(np.float32)
            detections = post_processing(detections, configs.num_classes, configs.down_ratio, configs.peak_thresh)
            t2 = time_synchronized()

            detections = detections[0]  # only first batch
            # Draw prediction in the image
            bev_map = (bev_maps.squeeze().permute(1, 2, 0).numpy() * 255).astype(np.uint8)
            bev_map = cv2.resize(bev_map, (cnf.BEV_WIDTH, cnf.BEV_HEIGHT))
            bev_map = draw_predictions(bev_map, detections.copy(), configs.num_classes)

            # Rotate the bev_map
            bev_map = cv2.rotate(bev_map, cv2.ROTATE_180)

            img_path = metadatas['img_path'][0]
            img_rgb = img_rgbs[0].numpy()
            img_rgb = cv2.resize(img_rgb, (img_rgb.shape[1], img_rgb.shape[0]))
            img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
            calib = Calibration(img_path.replace(".png", ".txt").replace("image_2", "calib"))
            kitti_dets = convert_det_to_real_values(detections)
            if len(kitti_dets) > 0:
                kitti_dets[:, 1:] = lidar_to_camera_box(kitti_dets[:, 1:], calib.V2C, calib.R0, calib.P2)
                img_bgr = show_rgb_image_with_boxes(img_bgr, kitti_dets, calib)

            out_img = merge_rgb_to_bev(img_bgr, bev_map, output_width=configs.output_width)

            print('\tDone testing the {}th sample, time: {:.1f}ms, speed {:.2f}FPS'.format(batch_idx, (t2 - t1) * 1000,
                                                                                           1 / (t2 - t1)))
            if configs.output_format == 'image':
                img_fn = os.path.basename(metadatas['img_path'][0])[:-4]
                cv2.imwrite(os.path.join(configs.results_dir, '{}.jpg'.format(img_fn)), out_img)
                print(configs.results_dir)
                
                print(batch_idx)
                # Only evaluate the first 30 samples
                if batch_idx == 30:
                    return

            elif configs.output_format == 'video':
                if out_cap is None:
                    out_cap_h, out_cap_w = out_img.shape[:2]
                    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
                    out_cap = cv2.VideoWriter(
                        os.path.join(configs.results_dir, '{}.avi'.format(configs.output_video_fn)),
                        fourcc, 30, (out_cap_w, out_cap_h))

                out_cap.write(out_img)
            else:
                raise TypeError
            
            print(DIVIDER)
            
            #cv2.imshow('test-img', out_img)
            #print('\n[INFO] Press n to see the next sample >>> Press Esc to quit...\n')
            #if cv2.waitKey(0) & 0xFF == 27:
            #    break
    if out_cap:
        out_cap.release()
    cv2.destroyAllWindows()



def quantize(build_dir,quant_mode,batchsize,configs):

  dset_dir = build_dir + '/dataset'
  float_model = build_dir + '/float_model'
  quant_model = build_dir + '/quant_model'

  # use GPU if available   
  if (torch.cuda.device_count() > 0):
    print('You have',torch.cuda.device_count(),'CUDA devices available')
    for i in range(torch.cuda.device_count()):
      print(' Device',str(i),': ',torch.cuda.get_device_name(i))
    print('Selecting device 0..')
    device = torch.device('cuda:0')
  else:
    print('No CUDA devices available..selecting CPU')
    device = torch.device('cpu')

  configs.device = device
  # load trained model
  #model = CNN().to(device)
  
  model = create_model(configs)

  # load weight from a checkpoint
  if configs.pretrained_path is not None:
      assert os.path.isfile(configs.pretrained_path), "=> no checkpoint found at '{}'".format(configs.pretrained_path)
      model.load_state_dict(torch.load(configs.pretrained_path, map_location='cpu'))
      print('loaded pretrained model at {}'.format(configs.pretrained_path))

  # force to merge BN with CONV for better quantization accuracy
  optimize = 1

  # override batchsize if in test mode
  if quant_mode == 'test':
    batchsize = 1

  # Random input with the network's BEV input shape, used to trace the model for quantization
  rand_in = torch.randn([batchsize, 3, 608, 608])

  quantizer = torch_quantizer(quant_mode, model, (rand_in), output_dir=quant_model)  
  quantized_model = quantizer.quant_model


  # evaluate 
  test(quantized_model, device, configs)

  # export config
  if quant_mode == 'calib':
    quantizer.export_quant_config()
  if quant_mode == 'test':
    quantizer.export_xmodel(deploy_check=False, output_dir=quant_model)

  return





def run_main():

  # construct the argument parser and parse the arguments
  ap = argparse.ArgumentParser()
  ap.add_argument('-d',  '--build_dir',  type=str, default='build',    help='Path to build folder. Default is build')
  ap.add_argument('-q',  '--quant_mode', type=str, default='test',    choices=['calib','test'], help='Quantization mode (calib or test). Default is test')
  ap.add_argument('-b',  '--batchsize',  type=int, default=100,        help='Testing batchsize - must be an integer. Default is 100')
  args = ap.parse_args()

  print('\n'+DIVIDER)
  print('PyTorch version : ',torch.__version__)
  print(sys.version)
  print(DIVIDER)
  print(' Command line options:')
  print ('--build_dir    : ',args.build_dir)
  print ('--quant_mode   : ',args.quant_mode)
  print ('--batchsize    : ',args.batchsize)
  print(DIVIDER)

  configs = parse_test_configs()
  
  quantize(args.build_dir,args.quant_mode,args.batchsize,configs)

  return

if __name__ == '__main__':
    run_main()

demo_front_dpu.py

Python
Used for the front-view-only demo.
"""
# -*- coding: utf-8 -*-
-----------------------------------------------------------------------------------
# Author: Nguyen Mau Dung
# DoC: 2020.08.17
# email: nguyenmaudung93.kstn@gmail.com
-----------------------------------------------------------------------------------
# Description: Demonstration script for the front view only
"""

import argparse
import sys
import os
import time
import warnings
import zipfile

warnings.filterwarnings("ignore", category=UserWarning)

import cv2
import torch
import numpy as np

src_dir = os.path.dirname(os.path.realpath(__file__))
while not src_dir.endswith("sfa"):
    src_dir = os.path.dirname(src_dir)
if src_dir not in sys.path:
    sys.path.append(src_dir)

from data_process.demo_dataset import Demo_KittiDataset
from models.model_utils import create_model
from utils.evaluation_utils import draw_predictions, convert_det_to_real_values
import config.kitti_config as cnf
from data_process.transformation import lidar_to_camera_box
from utils.visualization_utils import merge_rgb_to_bev, show_rgb_image_with_boxes
from data_process.kitti_data_utils import Calibration
from utils.demo_utils import parse_demo_configs, download_and_unzip, write_credit
from utils.misc import make_folder, time_synchronized
from utils.evaluation_utils import decode, post_processing
from utils.torch_utils import _sigmoid


from pynq_dpu import DpuOverlay


def do_detect(dpu, shapeIn, image, input_data, output_data, configs, bevmap, is_front):
    # The back view is handled by the same network after flipping the BEV map
    if not is_front:
        bevmap = torch.flip(bevmap, [1, 2])
    input_bev_maps = bevmap.unsqueeze(0).to("cpu", non_blocking=True).float()

    # The DPU expects NHWC input, so permute the NCHW BEV map
    input_bev_maps = input_bev_maps.permute(0, 2, 3, 1)

    # Copy the BEV map into the pre-allocated DPU input buffer
    image[0, ...] = input_bev_maps[0, ...]
    
    # Run inference on the DPU and measure the raw inference latency
    t1 = time_synchronized()
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)
    t2 = time_synchronized()

    fps = 1 / (t2 - t1)
    
    # Convert the five NHWC DPU output buffers back to NCHW torch tensors
    outputs0, outputs1, outputs2, outputs3, outputs4 = [
        torch.tensor(out).permute(0, 3, 1, 2) for out in output_data]

    # Apply sigmoid to the heatmap and center-offset heads before decoding
    outputs0 = _sigmoid(outputs0)
    outputs1 = _sigmoid(outputs1)
    
    # Decode the top-K candidates and filter them by the peak threshold on the CPU
    detections = decode(outputs0, outputs1, outputs2, outputs3, outputs4, K=configs.K)
    detections = detections.cpu().numpy().astype(np.float32)
    detections = post_processing(detections, configs.num_classes, configs.down_ratio, configs.peak_thresh)

    return detections[0], bevmap, fps





if __name__ == '__main__':
    configs = parse_demo_configs()

    # Try to download the dataset for demonstration
    server_url = 'https://s3.eu-central-1.amazonaws.com/avg-kitti/raw_data'
    download_url = '{}/{}/{}.zip'.format(server_url, configs.foldername[:-5], configs.foldername)
    download_and_unzip(configs.dataset_dir, download_url)

    out_cap = None
    demo_dataset = Demo_KittiDataset(configs)

    # Program the DPU overlay and load the compiled xmodel
    overlay = DpuOverlay("dpu.bit")
    overlay.load_model("./CNN_zcu102.xmodel")
    dpu = overlay.runner
    
    inputTensors = dpu.get_input_tensors()
    outputTensors = dpu.get_output_tensors()

    # Allocate host buffers matching the DPU input tensor and the five output tensors
    shapeIn = tuple(inputTensors[0].dims)
    output_data = [np.empty(tuple(t.dims), dtype=np.float32, order="C")
                   for t in outputTensors]
    input_data = [np.empty(shapeIn, dtype=np.float32, order="C")]
    image = input_data[0]
    
    for sample_idx in range(len(demo_dataset)):
        metadatas, bev_map, img_rgb = demo_dataset.load_bevmap_front(sample_idx)
        detections, bev_map, fps = do_detect(dpu, shapeIn, image, input_data, output_data, configs, bev_map, is_front=True)

        # Draw prediction in the image
        bev_map = (bev_map.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
        bev_map = cv2.resize(bev_map, (cnf.BEV_WIDTH, cnf.BEV_HEIGHT))
        bev_map = draw_predictions(bev_map, detections, configs.num_classes)

        # Rotate the bev_map
        bev_map = cv2.rotate(bev_map, cv2.ROTATE_180)

        img_path = metadatas['img_path'][0]
        img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
        calib = Calibration(configs.calib_path)
        kitti_dets = convert_det_to_real_values(detections)
        if len(kitti_dets) > 0:
            kitti_dets[:, 1:] = lidar_to_camera_box(kitti_dets[:, 1:], calib.V2C, calib.R0, calib.P2)
            img_bgr = show_rgb_image_with_boxes(img_bgr, kitti_dets, calib)

        out_img = merge_rgb_to_bev(img_bgr, bev_map, output_width=configs.output_width)
        write_credit(out_img, (80, 210), text_author='3D Object Detection on KV260', org_fps=(80, 250), fps=fps)
        if out_cap is None:
            out_cap_h, out_cap_w = out_img.shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            out_path = os.path.join(configs.results_dir, '{}_front.avi'.format(configs.foldername))
            print('Create video writer at {}'.format(out_path))
            out_cap = cv2.VideoWriter(out_path, fourcc, 30, (out_cap_w, out_cap_h))

        out_cap.write(out_img)

    if out_cap:
        out_cap.release()

DPU-Accelerated-3D-Object-Detection-on-Point-Clouds

Source code of the implementation.

Credits

Xinyu Chen
PhD student at National University of Singapore.

Vera Lee