john wangchu wang
Created March 25, 2022

Real-time traffic identifying, tracking and counting

People and cars can be real-time identified, tracked and counted using python and vvas pipeline on KV260 board.

21
Real-time traffic identifying, tracking and counting

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Kria KV260 Vision AI Starter Kit
AMD Kria KV260 Vision AI Starter Kit
×1
AOCU27N3C 4k monitor
×1

Software apps and online services

Snappy Ubuntu Core
Snappy Ubuntu Core
PYNQ Framework
AMD PYNQ Framework

Story

Read more

Code

Yolov4-tiny tracking vehicle python program

Python
This is a vitis-ai vart python program for vehicle identifying and tracking running on Xilinx FPGA, based on the trained yolov4tiny model.
'''
Copyright 2019 Xilinx Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import os
import cv2
import colorsys
import random
import numpy as np
import time
import math

from pynq_dpu import DpuOverlay

score_thresh = 0.5
ovr_limit = 0.45
# Starting from Vitis AI 1.3, xmodel files will be used as the models
# instead of elf files.

'''resize image with unchanged aspect ratio using padding'''
def letterbox_image(image, size):
    ih, iw, _ = image.shape
    w, h = size
    scale = min(w/iw, h/ih)
    #print(scale)
    
    nw = int(iw*scale)
    nh = int(ih*scale)
    #print(nw)
    #print(nh)

    image = cv2.resize(image, (nw,nh), interpolation=cv2.INTER_LINEAR)
    new_image = np.ones((h,w,3), np.uint8) * 128
    h_start = (h-nh)//2
    w_start = (w-nw)//2
    new_image[h_start:h_start+nh, w_start:w_start+nw, :] = image
    return new_image

'''image preprocessing'''
def pre_process(image, model_image_size):
    image = image[...,::-1]
    image_h, image_w, _ = image.shape
 
    if model_image_size != (None, None):
        assert model_image_size[0]%32 == 0, 'Multiples of 32 required'
        assert model_image_size[1]%32 == 0, 'Multiples of 32 required'
        boxed_image = letterbox_image(image, tuple(reversed(model_image_size)))
    else:
        new_image_size = (image_w - (image_w % 32), image_h - (image_h % 32))
        boxed_image = letterbox_image(image, new_image_size)
    image_data = np.array(boxed_image, dtype='float32')
    image_data /= 255.
    #image_data = image_data*0.25
    image_data = np.expand_dims(image_data, 0) 	
    return image_data

def preprocess_one_image_fn(image_path, fix_scale, width=416, height=416):
    means = MEANS
    scales = SCALES
    image = cv2.imread(image_path)
    image = cv2.resize(image, (width, height))
    #print(f"image.shape={image.shape}")
    B, G, R = cv2.split(image)
    B = (B - means[0]) * scales[0] * fix_scale
    G = (G - means[1]) * scales[1] * fix_scale
    R = (R - means[2]) * scales[2] * fix_scale
    #print(f"fix_scale={fix_scale}")
    image = cv2.merge([B, G, R])
    #print(f"image = {np.max(image)}")
    #print(f"image = {np.min(image)}")
    image = image.astype(np.int8)
    return image

'''Get model classification information'''	
def get_class(classes_path):
    with open(classes_path) as f:
        class_names = f.readlines()
    class_names = [c.strip() for c in class_names]
    return class_names

'''Get model anchors value'''
def get_anchors(anchors_path):
    with open(anchors_path) as f:
        anchors = f.readline()
    anchors = [float(x) for x in anchors.split(',')]
    return np.array(anchors).reshape(-1, 2)
    
def _get_feats(feats, anchors, num_classes, input_shape):
    num_anchors = len(anchors)
    anchors_tensor = np.reshape(np.array(anchors, dtype=np.float32), [1, 1, 1, num_anchors, 2])
    grid_size = np.shape(feats)[1:3]

    #print(f"feats.shape={feats.shape}")
    nu = num_classes + 5
    predictions = np.reshape(feats, [-1, grid_size[0], grid_size[1], num_anchors, nu])
    #print(f"predictions.shape={predictions.shape}")
    #print(f"predictions.size={predictions.size}")
  
    grid_y = np.tile(np.reshape(np.arange(grid_size[0]), [-1, 1, 1, 1]), [1, grid_size[1], 1, 1])
    grid_x = np.tile(np.reshape(np.arange(grid_size[1]), [1, -1, 1, 1]), [grid_size[0], 1, 1, 1])
    grid = np.concatenate([grid_x, grid_y], axis = -1)
    grid = np.array(grid, dtype=np.float32)

    box_xy = (1/(1+np.exp(-predictions[..., :2])) + grid) / np.array(grid_size[::-1], dtype=np.float32)
    box_wh = np.exp(predictions[..., 2:4]) * anchors_tensor / np.array(input_shape[::-1], dtype=np.float32)
    box_confidence = 1/(1+np.exp(-predictions[..., 4:5]))
    box_class_probs = 1/(1+np.exp(-predictions[..., 5:]))
    return box_xy, box_wh, box_confidence, box_class_probs
	
def correct_boxes(box_xy, box_wh, input_shape, image_shape):
    box_yx = box_xy[..., ::-1]
    box_hw = box_wh[..., ::-1]
    input_shape = np.array(input_shape, dtype = np.float32)
    image_shape = np.array(image_shape, dtype = np.float32)
    new_shape = np.around(image_shape * np.min(input_shape / image_shape))
    offset = (input_shape - new_shape) / 2. / input_shape
    scale = input_shape / new_shape
    box_yx = (box_yx - offset) * scale
    box_hw *= scale

    box_mins = box_yx - (box_hw / 2.)
    box_maxes = box_yx + (box_hw / 2.)
    boxes = np.concatenate([
        box_mins[..., 0:1],
        box_mins[..., 1:2],
        box_maxes[..., 0:1],
        box_maxes[..., 1:2]
    ], axis = -1)
    boxes *= np.concatenate([image_shape, image_shape], axis = -1)
    return boxes
	
def boxes_and_scores(feats, anchors, classes_num, input_shape, image_shape):
    box_xy, box_wh, box_confidence, box_class_probs = _get_feats(feats, anchors, classes_num, input_shape)
    boxes = correct_boxes(box_xy, box_wh, input_shape, image_shape)
    boxes = np.reshape(boxes, [-1, 4])
    box_scores = box_confidence * box_class_probs
    box_scores = np.reshape(box_scores, [-1, classes_num])
    return boxes, box_scores	

'''Draw detection frame'''
def draw_bbox(image, bboxes, classes):
    """
    bboxes: [x_min, y_min, x_max, y_max, probability, cls_id] format coordinates.
    """
    num_classes = len(classes)
    image_h, image_w, _ = image.shape
    hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
    colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
    colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors))
    #print(f"colors = {len(colors)}")
    random.seed(0)
    random.shuffle(colors)
    random.seed(None)

    for i, bbox in enumerate(bboxes):
        coor = np.array(bbox[:4], dtype=np.int32)
        fontScale = 0.5
        score = bbox[4]
        class_ind = int(bbox[5])
        if class_ind == 6:    # only show cars
           #print(f"class_ind = {class_ind}")
           bbox_color = colors[class_ind]
           bbox_thick = int(0.6 * (image_h + image_w) / 600)
           c1, c2 = (coor[0], coor[1]), (coor[2], coor[3])
           cv2.rectangle(image, c1, c2, bbox_color, bbox_thick)
           print(f"(c1, c2) = {c1, c2}")
    return image
	
def nms_boxes(boxes, scores):
    """Suppress non-maximal boxes.

    # Arguments
        boxes: ndarray, boxes of objects.
        scores: ndarray, scores of objects.

    # Returns
        keep: ndarray, index of effective boxes.
    """
    global ovr
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]
    
    areas = (x2-x1+1)*(y2-y1+1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)

        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w1 = np.maximum(0.0, xx2 - xx1 + 1)
        h1 = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w1 * h1

        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        inds = np.where(ovr <= ovr_limit)[0]  # threshold
        order = order[inds + 1]

    return keep	
  
'''Model post-processing'''
def eval(yolo_outputs, image_shape, max_boxes = 20):
    global score_thresh 
    #score_thresh = 0.4
    class_names = get_class(classes_path)
    anchors     = get_anchors(anchors_path)
#    anchor_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]]
    anchor_mask = [[3, 4, 5], [0, 1, 2]]
#    anchor_mask = [[0, 1, 2, 3, 4]]
    boxes = []
    box_scores = []
    
    input_shape = np.shape(yolo_outputs[0])[1 : 3]
    #print(input_shape)
    input_shape = np.array(input_shape)*32
    #print(input_shape)
    #print(f"len(yolo_outputs)={len(yolo_outputs)}")
    for i in range(len(yolo_outputs)):
        _boxes, _box_scores = boxes_and_scores(yolo_outputs[i], anchors[anchor_mask[i]], len(class_names), input_shape, image_shape)
        boxes.append(_boxes)
        box_scores.append(_box_scores)
    boxes = np.concatenate(boxes, axis = 0)
    box_scores = np.concatenate(box_scores, axis = 0)
    
    mask = box_scores >= score_thresh
    boxes_ = []
    scores_ = []
    classes_ = []
    for c in range(len(class_names)):
        class_boxes_np = boxes[mask[:, c]]
        class_box_scores_np = box_scores[:, c]
        class_box_scores_np = class_box_scores_np[mask[:, c]]
        nms_index_np = nms_boxes(class_boxes_np, class_box_scores_np) 
        class_boxes_np = class_boxes_np[nms_index_np]
        class_box_scores_np = class_box_scores_np[nms_index_np]
        classes_np = np.ones_like(class_box_scores_np, dtype = np.int32) * c
        boxes_.append(class_boxes_np)
        scores_.append(class_box_scores_np)
        classes_.append(classes_np)
    boxes_ = np.concatenate(boxes_, axis = 0)
    scores_ = np.concatenate(scores_, axis = 0)
    classes_ = np.concatenate(classes_, axis = 0)
    
    return boxes_, scores_, classes_

count = 0  
center_points_prev = []  
track_objects = {}  
temp_objects = {}
track_id = 0  
miss_objects = {}

def draw_track(image, bboxes, classes):
    """
    bboxes: [x_min, y_min, x_max, y_max, probability, cls_id] format coordinates.
    """
#    coor = np.array(bbox[:4], dtype=np.int32)
#    fontScale = 0.5
#    score = bbox[4]
# for i, bbox in enumerate(bboxes):
#        coor = np.array(bbox[:4], dtype=np.int32)
#        fontScale = 0.5
#        score = bbox[4]
#        class_ind = int(bbox[5])
#        if class_ind == 5:    # only show cars
#           print(f"class_ind = {class_ind}")
#           bbox_color = colors[class_ind]
#           bbox_thick = int(0.6 * (image_h + image_w) / 600)
#           c1, c2 = (coor[0], coor[1]), (coor[2], coor[3])
#           cv2.rectangle(image, c1, c2, bbox_color, bbox_thick)
    center_points_current = [] 
    global count
    global center_points_prev
    global track_objects
    global track_id 
    for i, bbox in enumerate(bboxes):
        class_ind = int(bbox[5])
        if class_ind == 6:    # only show cars
           #coor = np.array(bbox[:4], dtype=np.int32)
           #print(f"coor = {coor}")
           (x1, y1, x2, y2) = bbox[:4]
           #print(f"t(x1, y1, x2, y2) = {x1, y1, x2, y2}")
           (x, y, w, h) = (x1, y1, x2-x1, y2-y1)
           cx, cy = int((x+x+w)/2), int((y+y+h)/2) 
           center_points_current.append((cx,cy))
           #cv2.rectangle(img, (x,y), (x+w,y+h), (255,255,0), 2)
           cv2.rectangle(img, (x1,y1), (x2,y2), (255,255,0), 2)
           #print(f"center_points_current = {len(center_points_current)}")
           #print(f"t(x1, y1, x2, y2) = {x1, y1, x2, y2}")
    print(f"count = {count}")
    if count <= 2:
       # define all track id for the second frame 
       print(f"count1 = {count}")
       print(f"center_points_current = {center_points_current}")
       print(f"center_points_prev = {center_points_prev}")
       for pt1 in center_points_current:
           print(f"pt1 = {pt1}")
           for pt2 in center_points_prev:
               print(f"pt2 = {pt2}")
               distance = math.hypot(pt2[0]-pt1[0], pt2[1]-pt1[1])
               print(f"distance ={distance}")
               if distance < 20:
               
                  track_objects[track_id] = pt1
                  print(f"track_id = {track_id}")
                  print(f"track_objects = {track_objects}")
                  track_id += 1
    else:
        #print(f"track_objects = {track_objects}") 
        temp_objects = {}    
        for k in sorted(track_objects):  # sort according key 
            temp_objects[k] = track_objects[k]
        #print(f"temp_objects = {temp_objects}")     
        #track_objects_new = track_objects.copy()
        track_objects_new = temp_objects.copy()
        for object_id, pt2 in track_objects_new.items(): 
            object_exist = False  
            for pt1 in center_points_current:  
                distance = math.hypot(pt2[0]-pt1[0], pt2[1]-pt1[1])
                if distance < 40:  # orignal 20 change to 40
                   print(f"pt1 = {pt1}")
                   track_objects[object_id] = pt1  # inherit track id
                   object_exist = True
                   center_points_current.remove(pt1)
                   continue
            if object_exist == False:
               track_objects.pop(object_id)
               miss_objects[object_id] = pt2 # all missing objects record
               #print(f"miss_objects = {miss_objects}") 
               
       # add new object id and still use original id d < 40
        print(f"track_id = {track_id}")
        for object_id, pt2 in miss_objects.items():      
            object_exist = False  
            for pt1 in center_points_current:  
                distance = math.hypot(pt2[0]-pt1[0], pt2[1]-pt1[1])
                if distance < 40:  # orignal 20 change to 40
                   #print(f"pt1 = {pt1}")
                   track_objects[object_id] = pt1  # inherit track id after missing objects a while
                   object_exist = True
                   center_points_current.remove(pt1)
                   continue
                else:
                   track_objects[track_id] = pt1
                   #print(f"track_id = {track_id}")
                   track_id += 1
        
#        for pt in center_points_current:  # available points after removing
#            track_objects[track_id] = pt
#            print(f"track_id = {track_id}")
#            track_id += 1
          
    for object_id, pt in track_objects.items():
        # print(f"object_id = {object_id}")
        # add circle for tracking objects
        cv2.circle(img, pt, 15, (0,255,255), -1)
        # show id
        cv2.putText(img, str(object_id), (pt[0]-7, pt[1]+7), 0, 0.7, (0,0,255),2)
    
    center_points_prev = center_points_current.copy()
    
    return image

if __name__ == "__main__":

    from pynq_dpu import DpuOverlay

    overlay = DpuOverlay("dpu.bit")
    overlay.load_model('yolov4tiny.xmodel')
    
    dpu = overlay.runner
    
    inputTensors = dpu.get_input_tensors()
    outputTensors = dpu.get_output_tensors()
    shapeIn = tuple(inputTensors[0].dims)
    #print('ShapeIn:',shapeIn)
    shapeOut1 = tuple(outputTensors[0].dims)
    #shapeOut2 = tuple(outputTensors[1].dims)
    #print('ShapeOut1:',shapeOut1)
#    shapeOut3 = tuple(outputTensors[2].dims)
    image_folder = 'image'
#   original_images = [i for i in os.listdir(image_folder) if i.endswith("jpg")]
#   total_images = len(original_images)

    video_path = "./video/adas.mp4"
    #video_path = "./video/VID2022.mp4"	
    #video_path = "./video/car.mp4"
    print("\nYou can press ESC to pause, then press c to continue, or press any other key to quit")
    #cv2.namedWindow("show", cv2.WINDOW_AUTOSIZE)
    cv2.namedWindow("show")
    
    camera = cv2.VideoCapture(video_path)
    
#    global count 
#    global center_points_prev
#    global track_objects  
#    global track_id 
    while True:
#       times+=1
        count += 1  
        res, img = camera.read()
        if not res:
           print('not res , not image')
           break
       
        time1 = time.time()
        image_ho, image_wo, _ = img.shape
        image_size = img.shape[:2]
        image_data = pre_process(img, (416, 416))
    
        outputData =[np.empty(shapeOut1,dtype=np.float32,order="C")]
        inputData = [np.empty(shapeIn, dtype=np.float32, order="C")]
       
        image = inputData[0]
        image[0, ...] = image_data.reshape(shapeIn[1:])
        #print(f"inputData.max = {np.max(inputData)}")
        #print(f"inputData.min = {np.min(inputData)}")
        job_id = dpu.execute_async(inputData, outputData)
        #print(f"inputData.shape={inputData[0].shape}")
        #print(f"outputData.len={outputData[0].shape}")
        #print(f"outputData.max = {np.max(outputData)}")
        #print(f"outputData.min = {np.min(outputData)}")
        dpu.wait(job_id)
        
        """Get model classification information"""
        classes_path = "./image/voc_classes.txt"
        class_names = get_class(classes_path)
        
        """Get model anchor value"""
        anchors_path = "./image/yolov4_anchors.txt"
        anchors = get_anchors(anchors_path)
        
        """Post-processing"""   
        out_boxes, out_scores, out_classes = eval(outputData, image_size)	
        items = []
        draws = []
        for i, c in reversed(list(enumerate(out_classes))):
            predicted_class = class_names[c]
            box = out_boxes[i]
            score = out_scores[i]
    
            top, left, bottom, right = box
            top = max(0, np.floor(top + 0.5).astype('int32'))
            left = max(0, np.floor(left + 0.5).astype('int32'))
            bottom = min(image_ho, np.floor(bottom + 0.5).astype('int32'))
            right = min(image_wo, np.floor(right + 0.5).astype('int32'))
            draw  = [left, top, right, bottom, score, c]
            item  = [predicted_class, score, left, top, right, bottom]
            draws.append(draw)
            items.append(item)
    	
        time2 = time.time()
        fps = 1/(time2-time1)	
        #print("Performance: {} FPS".format(fps))
         
        image_id = 0
        draws = reversed(draws)
        #image_result = draw_bbox(img, draws, class_names)
        image_label = draw_track(img, draws, class_names)
        image_label = cv2.resize(image_label, (700, 360))
        #cv2.resizeWindow("show", 1024, 516)
        cv2.imshow("show",image_label)
        #cv2.imshow("show",image_result)
        if cv2.waitKey(1) == 27: 
          flag = input()    
          print(flag)    
          if flag=="c":
             print("continue")
             continue
          else:
             print("quit")
             break

Yolov4-tiny tracking people vvas pipeline

JSON
This is a vvas pipeline to track and count people.
xilinx-k26-starterkit-2021_1:~$ gst-launch-1.0 filesrc location=./video/cross.h264 ! h264parse ! video/x-h264, alignment=au ! omxh264dec low-latency=0 internal-entropy-buffers=2 ! video/x-raw, format=NV12, framerate=30/1 ! tee name=t ! queue ! ivas_xmultisrc kconfig="/home/petalinux/notebooks/yolov4tiny/preprocess.json" ! queue ! ivas_xfilter kernels-config="/home/petalinux/notebooks/yolov4tiny/aiinference.json" ! ima.sink_master ivas_xmetaaffixer name=ima ima.src_master ! fakesink t. ! queue ! ima.sink_slave_0 ima.src_slave_0 ! queue ! ivas_xfilter kernels-config="/home/petalinux/notebooks/yolov4tiny/drawresult.json" ! queue ! kmssink driver-name=xlnx plane-id=39 sync=false fullscreen-overlay=true

Credits

john wang

john wang

2 projects • 2 followers
chu wang

chu wang

1 project • 1 follower

Comments