Bulijiojio Blog

日常摸鱼划水的虾皮--如果公式不能正常显示请使用Chrome浏览器并安装Tex All the Things插件

0%

mmWave_Tutorial3-快速上手Target tracking and management

前言

在前两个 tutorial 中我们通过编写脚本程序的方法得到了毫米波雷达输出的点云数据, 并进行了相关的数据可视化工作, 在这一篇博客当中我们将通过之前得到的毫米波雷达输出的点云, 进行人员的检测跟踪以及航迹的管理, 到这里读者可能会问了既然在ti官方的demo中已经给出了 gtrack相应的多目标跟踪算法的实现, 我们为什么又要重新写一遍呢? 这不是重复造轮子吗? 但其实对于处在学习阶段的我们来说, 这样重复造轮子的过程是必须的, 能够让我们快速的熟悉已有的一些成熟的算法, 并且在算法实现的过程中得到更深入的理解以便于我们后面进行更加深入的研究.

在本篇博客当中我们将基于已有的一个小的测试数据进行相关的目标跟踪及航迹管理的算法实现, 这篇博客将给出 matlab和Python 的两个实例. 目标跟踪及航迹管理的原理在这里我们就不进行介绍了, 可以参考目标跟踪有关博客的内容, 下图是在本篇博客中实现的目标跟踪及航迹管理算法的框图.

测试数据

测试数据的文件我将会尽快的放在个人博客上

测试数据参数

参数名 单位
数据每帧的扫描间隔T 1 s
距离单元格数 400
方位单元格数 200
距离分辨率 10 m
角度分辨率 0.5° °
过程噪声强度 0.01
信噪比SNR 18 dB
虚警概率 1e-5

第五帧测试数据如下图

这张图中我们可以看到在每一个距离单元与角度单元中都有数值, 那么我们需要进行CFAR处理进而得到检测点

CA-CFAR结果

我们将CFAR检测每一帧的结果叠加起来可以得到下图

通过CFAR检测后的点云数据就是我们进行目标跟踪及航迹管理的input

相关代码(Matlab)

CA-CFAR代码

一维CA-CFAR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function [Cfar] = CA_CFAR(Signal,PF,Refer,Pro)
%CA_CFAR
%Signal input signal
%PF False Probability
%Refer Reference Unit
%Pro Protected Unit

N_all = length(Signal);
L_slipper = 2*(Pro+Refer)+1; %处理窗的大小
Alpha = L_slipper*(PF^(-1/L_slipper)-1);%门限系数阿尔法

Cfar=zeros(1,N_all);
Signal = [zeros(1,Refer+Pro) Signal zeros(1,Refer+Pro)];
for i=1:N_all
Cfar(i)=Alpha*mean([Signal(i:i+Refer-1),Signal(i+Refer+2*Pro+1:i+2*Refer+2*Pro)]);
end`
end

二维CA-CFAR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
function [CFAR] = CA_CFAR2D(Signal,PF,Refer,Pro)

SIZE = size(Signal);
L_slipper = 2*(Pro+Refer)+1; %处理窗的大小

temp=zeros(SIZE+[2*(Refer+Pro) 2*(Refer+Pro)]);
temp(Refer+Pro+1:Refer+Pro+SIZE(1),Refer+Pro+1:Refer+Pro+SIZE(2)) = Signal;

Window = ones(L_slipper,L_slipper);
Window(Refer+1:Refer+2*Pro+1,Refer+1:Refer+2*Pro+1) = zeros(2*Pro+1);
Av_Coff = L_slipper*L_slipper-(Pro*2+1)^2;

Alpha = Av_Coff*(PF^(-1/L_slipper^2)-1);%门限系数阿尔法

CFAR = zeros(SIZE);
for i=1:SIZE(1)
for j = 1:SIZE(2)
CFAR(i,j) = Alpha*sum(sum((temp(i:i+L_slipper-1,j:j+L_slipper-1).*Window).^2))/Av_Coff;
end
end
end

CA-CFAR使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
clear all
close all
clc

load('Data_For_Test.mat');

result = cell(1,size(Data_For_Test.Raw_Data,2));

Range_Cell_Num = str2num(Data_For_Test.Range_Cell_Num);
Azimuth_Cell_Num = str2num(Data_For_Test.Azimuth_Cell_Num);
P_False_Alarm = str2num(Data_For_Test.P_False_Alarm);
Range_Resolution = 10;
Azimuth_Resolution = 0.5;

for i = 1:size(Data_For_Test.Raw_Data,2)

frame = Data_For_Test.Raw_Data{i};

PF = P_False_Alarm;
Refer = 8;
Pro = 1;

%% 1D cfar
% CFAR_thre = zeros(Range_Cell_Num,Azimuth_Cell_Num);
% for j = 1:Range_Cell_Num
% CFAR_thre(j,:) = CA_CFAR(frame(j,:),PF,Refer,Pro);
% end

%% 2D cfar
[CFAR_thre] = sqrt(CA_CFAR2D(frame,PF,Refer,Pro));

cfar_detect = (CFAR_thre<frame);
seq = [];
index = 1;
for k = 1:Range_Cell_Num
for j = 1:Azimuth_Cell_Num
if cfar_detect(k,j)
seq(index,:) = [k j];
index = index + 1;
end
end
end

seq = [ (seq(:,1)*Range_Resolution).*cos((seq(:,2)-100)*Azimuth_Resolution*pi/180) seq(:,1)*Range_Resolution.*sin((seq(:,2)-100)*Azimuth_Resolution*pi/180) ];

result{i} = seq;

end

figure
hold on
for i = 1:30
scatter(result{i}(:,1),result{i}(:,2));
end

figure
hold on
mesh(CFAR_thre)
mesh(frame)

卡尔曼滤波

1
2
3
4
5
6
7
8
9
10
11
function [x_est,P_k_k] = KalmanFilter(z_k,x_est_k_minus1,F,H,P_k,Q,R)
x_pre = F*x_est_k_minus1;
P = F*P_k*F' + Q;

z_pre = H*x_pre;
S = H*P*H' + R;
K = P*H'/(S);

x_est = x_pre + K*(z_k - z_pre);
P_k_k = (eye(size(K*H,1))-K*H)*P;
end

实验航迹和确认航迹类

实验航迹类(MN逻辑起始法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
classdef MNstarter_KFCV
properties
seq = [];
M = 0;
N = 0;

% Kalman Coefficient
T = 1;
F;
H = [1 0 0 0;
0 1 0 0];
Gamma;
Q;
R;

x_pre = zeros(4,1);
P_k = eye(4);
z_pre = zeros(2,1);
P = eye(4);
S = eye(2);
K = zeros(4,2);

x_est = zeros(4,1);
end
methods
function obj = MNstarter_KFCV(Z,T)
obj.seq = Z;
obj.T = T;
obj.F = [1 0 T 0;
0 1 0 T;
0 0 1 0;
0 0 0 1];
obj.Gamma=[T*T/2 0;0 T*T/2;T 0;0 T];
obj.Q = obj.Gamma*obj.Gamma'*1;
obj.R = 1*obj.H*obj.H';
end
function obj = predict(obj)
obj.x_pre = obj.F*obj.seq(end,:)';
obj.P = obj.F*obj.P_k*obj.F' + obj.Q;

obj.z_pre = obj.H*obj.x_pre;
obj.S = obj.H*obj.P*obj.H' + obj.R;
obj.K = obj.P*obj.H'/(obj.S);
end
function obj = addseq(obj,Z)
obj.x_est = obj.x_pre + obj.K*(Z' - obj.z_pre);
obj.P_k = (eye(size(obj.K*obj.H,1))-obj.K*obj.H)*obj.P;
obj.seq = [obj.seq;obj.x_est'];
end
end
end

确认航迹类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
classdef ConfirmTrajectory_KFCV
properties
seq = [];
scoreboard = 0;

% Kalman Coefficient
T = 1;
F;
H = [1 0 0 0;
0 1 0 0];
Gamma;
Q;
R;

x_pre = zeros(4,1);
P_k = eye(4);
z_pre = zeros(2,1);
P = eye(4);
S = eye(2);
K = zeros(4,2);

x_est = zeros(4,1);
end
methods
function obj = ConfirmTrajectory_KFCV(Z,T)
obj.seq = Z;
obj.T = T;
obj.F = [1 0 T 0;
0 1 0 T;
0 0 1 0;
0 0 0 1];
obj.Gamma=[T*T/2 0;0 T*T/2;T 0;0 T];
obj.Q = obj.Gamma*obj.Gamma'*1;
obj.R = 1*obj.H*obj.H';
end
function obj = predict(obj)
obj.x_pre = obj.F*obj.seq(end,:)';
obj.P = obj.F*obj.P_k*obj.F' + obj.Q;

obj.z_pre = obj.H*obj.x_pre;
obj.S = obj.H*obj.P*obj.H' + obj.R;
obj.K = obj.P*obj.H'/(obj.S);
end
function obj = addseq(obj,Z)
obj.x_est = obj.x_pre + obj.K*(Z' - obj.z_pre);
obj.P_k = (eye(size(obj.K*obj.H,1))-obj.K*obj.H)*obj.P;

% Staging data of 20 points
if size(obj.seq,1)<20
obj.seq = [obj.seq;obj.x_est'];
else
obj.seq = [obj.seq;obj.x_est'];
obj.seq(1,:) = [];
end

end
end
end

目标跟踪与航迹管理步进函数

matlab代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
function [Confirmed_trajectory] = Track_step_PNN(Measure)

persistent test_trajectory;
persistent confirmed_trajectory;
persistent Prev_Measure;
persistent initial_Flag;

if isempty(test_trajectory) && isempty(initial_Flag)
test_trajectory = {};
confirmed_trajectory = {};
Prev_Measure = [];
initial_Flag = 1;
end

% tracking parameter
DIS_THRE = chi2inv(0.99,2);
SCOREBOARD_THRE = 40;
N_PARAMETER = 6;
M_PARAMETER = 4;

V_MAX = 150;
V_MIN = 10;
T = 1;

Measure_temp = Measure;

% MN starter NN association
if ~isempty(Measure)
% associate with ConfirmedTrajectory
for kk = 1:size(confirmed_trajectory,2)
if ~isempty(Measure)
confirmed_trajectory(kk) = confirmed_trajectory(kk).predict();
temp = confirmed_trajectory(kk).z_pre'-Measure(:,1:2);
dis = diag(temp*inv(confirmed_trajectory(kk).S)*temp');%计算相邻两帧点迹距离

[val, pos] = min(dis);%选出距离最小的点
if val<DIS_THRE
confirmed_trajectory(kk) = confirmed_trajectory(kk).addseq(Measure(pos,:));
confirmed_trajectory(kk).scoreboard = 0;
Measure(pos,:) = [];
else
confirmed_trajectory(kk) = confirmed_trajectory(kk).addseq(confirmed_trajectory(kk).z_pre');
confirmed_trajectory(kk).scoreboard = confirmed_trajectory(kk).scoreboard + 1;
end
end
end
% terrible score delete ConfirmedTrajectory
delete_pos = [];
for kk = 1:size(confirmed_trajectory,2)
if confirmed_trajectory(kk).scoreboard > SCOREBOARD_THRE
delete_pos = [delete_pos kk];
end
end
confirmed_trajectory(delete_pos) = [];

% associate with MNstarter
for kk = 1:size(test_trajectory,2)
if ~isempty(Measure)
test_trajectory(kk) = test_trajectory(kk).predict();
temp = test_trajectory(kk).z_pre'-Measure(:,1:2);
dis = diag(temp*inv(test_trajectory(kk).S)*temp');%计算相邻两帧点迹距离

[val, pos] = min(dis);
if val<DIS_THRE
test_trajectory(kk) = test_trajectory(kk).addseq(Measure(pos,:));
test_trajectory(kk).N = test_trajectory(kk).N + 1;
test_trajectory(kk).M = test_trajectory(kk).M + 1;
Measure(pos,:) = [];
else
test_trajectory(kk) = test_trajectory(kk).addseq(test_trajectory(kk).z_pre');
test_trajectory(kk).N = test_trajectory(kk).N + 1;
end
end
end
% check whether satisfy MN logic
delete_pos = [];
for kk = 1:size(test_trajectory,2)
if test_trajectory(kk).N >= N_PARAMETER
if test_trajectory(kk).M >= M_PARAMETER
% transfer to ConfirmedTrajectory
confirmed_trajectory = [confirmed_trajectory TestToConfirmed(test_trajectory(kk),T)];
% free MNstarter
delete_pos = [delete_pos kk];
else
% delete MNstarter
delete_pos = [delete_pos kk];
end
end
end
test_trajectory(delete_pos) = [];

% directly start as a new MNstarter
test_trajectory = [test_trajectory directbegin(Measure,Prev_Measure,V_MIN,V_MIN,T)];
else
for kk = 1:size(confirmed_trajectory,2)
% predict
confirmed_trajectory(kk) = confirmed_trajectory(kk).predict();
% use prev point
confirmed_trajectory(kk) = confirmed_trajectory(kk).addseq(confirmed_trajectory(kk).z_pre');
% ConfirmedTrajectory scoreboard +1
confirmed_trajectory(kk).scoreboard = confirmed_trajectory(kk).scoreboard + 1;
end
% terrible score delete ConfirmedTrajectory
delete_pos = [];
for kk = 1:size(confirmed_trajectory,2)
if confirmed_trajectory(kk).scoreboard > SCOREBOARD_THRE
delete_pos = [delete_pos kk];
end
end
confirmed_trajectory(delete_pos) = [];

for kk = 1:size(test_trajectory,2)
% predict
test_trajectory(kk) = test_trajectory(kk).predict();
% use prev point
test_trajectory(kk) = test_trajectory(kk).addseq(test_trajectory(kk).z_pre');
% MNstarter N +1
test_trajectory(kk).N = test_trajectory(kk).N + 1;
end
% check whether satisfy MN logic
delete_pos = [];
for kk = 1:size(test_trajectory,2)
if test_trajectory(kk).N >= N_PARAMETER
if test_trajectory(kk).M >= M_PARAMETER
% transfer to ConfirmedTrajectory
confirmed_trajectory = [confirmed_trajectory TestToConfirmed(test_trajectory(kk),T)];
% free MNstarter
delete_pos = [delete_pos kk];
else
% delete MNstarter
delete_pos = [delete_pos kk];
end
end
end
test_trajectory(delete_pos) = [];
end

Prev_Measure = Measure_temp;

Confirmed_trajectory = confirmed_trajectory;
end

function [testroot] = directbegin(Measure,Prev_Measure,vmin,vmax,T)
testroot = MNstarter_KFCV.empty;

index = 1;
if ~isempty(Measure)
for i = 1:size(Prev_Measure,1)
% calculate distance between two frame
dis = [];
for j = 1:size(Measure,1)
dis = [dis norm(Prev_Measure(i,1:2)-Measure(j,1:2))];
end
[val pos] = min(dis);
if vmin*T<val<vmax*T
v = (Measure(pos,:)-Prev_Measure(i,:))/T;
testroot(index) = MNstarter_KFCV([[Prev_Measure(i,:) 0 0];[Measure(pos,:) v]],T);
index = index +1;
end
end
end

end

function [confirmedroot] = TestToConfirmed(testroot,T)
confirmedroot = ConfirmTrajectory_KFCV(testroot.seq,T);
confirmedroot.x_pre = testroot.x_pre;
confirmedroot.P_k = testroot.P_k;
confirmedroot.z_pre = testroot.z_pre;
confirmedroot.P = testroot.P;
confirmedroot.S = testroot.S;
confirmedroot.K = testroot.K;
confirmedroot.x_est = testroot.x_est;
end

python代码

二维跟踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#
# File: 实现逻辑法航迹起始,NN法数据关联,卡尔曼滤波以及传统航迹管理(2维)
# 主要实现了MOT迭代函数
# Author: UESTC Yu Xuyao
# Email: yxy19991102@163.com
#

import scipy.io as scio
import numpy
import scipy.stats as st
import time
from IWR.track import *

# 2自由度卡方分布
threshold = st.chi2.ppf(0.80, 2)
T = 0.1

F = numpy.mat([[1, 0, T, 0], [0, 1, 0, T], [0, 0, 1, 0], [0, 0, 0, 1]])
H = numpy.mat([[1, 0, 0, 0], [0, 1, 0, 0]])
Gamma = numpy.mat([[T * T / 2, 0], [0, T * T / 2], [T, 0], [0, T]])
Q = Gamma * Gamma.T * 0.5
R = 0.1 * H * H.T

vmin = 0.2
vmax = 4

max_velocity = 1.5


def beginTrack(points):
global threshold, T, F, H, Gamma, Q, R, vmin, vmax

outside = []

index = 0
testroot = []

Z_k = (points[0])
Z_k_plus1 = (points[1])

for j in range(0, Z_k.shape[1]):
for k in range(0, Z_k_plus1.shape[1]):
d = max(0, numpy.linalg.norm(Z_k_plus1[:, k] - Z_k[:, j]) - vmax * T) + max(0, -numpy.linalg.norm(
Z_k_plus1[:, k] - Z_k[:, j]) + vmin * T)
D = d / numpy.linalg.det(R + R) * d
if D <= threshold:
temp = TestTrack(Z_k[:, j])
temp.addseq(Z_k_plus1[:, k])

Px0 = 5 * numpy.eye(4)
P = F * Px0 * F.T + Q
S = H * P * H.T + R * 0.2
K = P * H.T * numpy.linalg.inv(S)
Pkk = (numpy.eye(4) - K * H) * P

x_init = numpy.concatenate((Z_k_plus1[:, k], (Z_k_plus1[:, k] - Z_k[:, j]) / T), axis=0)
x_forest = F * x_init

temp.est = x_init
temp.pkk = Pkk
temp.x_predict = x_forest
temp.M = 2
temp.N = 2

testroot.append(temp)

return testroot


def TestToConfirmed(testroot):
confirmedroot = ConfirmedTrack(testroot.start)
confirmedroot.seq = testroot.seq
confirmedroot.est = testroot.est
confirmedroot.pkk = testroot.pkk
confirmedroot.seq = testroot.seq
confirmedroot.x_predict = testroot.x_predict
return confirmedroot


def MOT(Z_k, Z_k_prev, confirmedroot, testroot):
global threshold, T, F, H, Gamma, Q, R, vmin, vmax

Z_k_present = Z_k

if Z_k.shape[1] == 0:
if len(confirmedroot) != 0:
pos = []
for kk in range(0, len(confirmedroot)):
confirmedroot[kk].board += 1
if confirmedroot[kk].board >= 4:
pos.append(kk)
confirmedroot = numpy.delete(confirmedroot, pos)
confirmedroot = confirmedroot.tolist()

if Z_k.shape[1] != 0:
if len(confirmedroot) != 0:
pos = []
for kk in range(0, len(confirmedroot)):
if Z_k.shape[1] == 0:
break

P = F * confirmedroot[kk].pkk * F.T + Q
S = H * P * H.T + R
K = P * H.T * numpy.linalg.inv(S)
outside = H * confirmedroot[kk].x_predict
V = []
for i in range(0, Z_k.shape[1]):
V.append((Z_k[:, i] - outside).T * numpy.linalg.inv(S) * (Z_k[:, i] - outside))

index = numpy.argmin(V)
val = numpy.amin(V)
if val <= threshold:
confirmedroot[kk].addseq(Z_k[:, index])
est = confirmedroot[kk].x_predict + K * (Z_k[:, index] - outside)
confirmedroot[kk].addest(est)
confirmedroot[kk].pkk = (numpy.eye(4) - K * H) * P
confirmedroot[kk].x_predict = F * est
confirmedroot[kk].board = 0

Z_k = numpy.delete(Z_k, index, axis=1)
else:
confirmedroot[kk].addseq(outside)
confirmedroot[kk].addest(confirmedroot[kk].x_predict)
confirmedroot[kk].pkk = P
confirmedroot[kk].x_predict = F * confirmedroot[kk].est[:, -1]
confirmedroot[kk].board += 1

if confirmedroot[kk].board >= 4 or numpy.linalg.norm(confirmedroot[kk].est[2:4, -1]) > max_velocity:
pos.append(kk)

confirmedroot = numpy.delete(confirmedroot, pos)
confirmedroot = confirmedroot.tolist()

if Z_k.shape[1] != 0:
if len(testroot) != 0:
pos = []
for kk in range(0, len(testroot)):
if Z_k.shape[1] == 0:
break

P = F * testroot[kk].pkk * F.T + Q
S = H * P * H.T + R
K = P * H.T * numpy.linalg.inv(S)
outside = H * testroot[kk].x_predict
V = []
for i in range(0, Z_k.shape[1]):
V.append((Z_k[:, i] - outside).T * numpy.linalg.inv(S) * (Z_k[:, i] - outside))

index = numpy.argmin(V)
val = numpy.amin(V)
if val <= threshold:
testroot[kk].addseq(Z_k[:, index])
est = testroot[kk].x_predict + K * (Z_k[:, index] - outside)
testroot[kk].addest(est)
testroot[kk].pkk = (numpy.eye(4) - K * H) * P
testroot[kk].x_predict = F * est
testroot[kk].M += 1
testroot[kk].N += 1
Z_k = numpy.delete(Z_k, index, axis=1)
else:
testroot[kk].addseq(outside)
testroot[kk].addest(testroot[kk].x_predict)
testroot[kk].pkk = P
testroot[kk].x_predict = F * testroot[kk].est[:, -1]
testroot[kk].N += 1

if testroot[kk].N == 6:
if testroot[kk].M >= 4 and numpy.linalg.norm(testroot[kk].est[2:4, -1]) < max_velocity:
confirmedroot.append(TestToConfirmed(testroot[kk]))
else:
pos.append(kk)

testroot = numpy.delete(testroot, pos)
testroot = testroot.tolist()

if Z_k.shape[1] != 0:
testroot.extend(beginTrack([Z_k_prev, Z_k_present]))

Z_k_prev = Z_k_present
return confirmedroot, testroot, Z_k_present

三维跟踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#
# File: 实现逻辑法航迹起始,NN法数据关联,卡尔曼滤波以及传统航迹管理(3维)
# 主要实现了MOT迭代函数
# Author: UESTC YuXuyao
# Email: yxy19991102@163.com
#

import scipy.io as scio
import numpy
import scipy.stats as st
import time
from IWR.track import *

threshold = st.chi2.ppf(0.80, 3)
T = 0.1

F = numpy.mat([[1, 0, 0, T, 0, 0], [0, 1, 0, 0, T, 0], [0, 0, 1, 0, 0, T], [0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1]])
H = numpy.mat([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0]])
Gamma = numpy.mat([[T * T / 2, 0, 0], [0, T * T / 2, 0], [0, 0, T * T / 2], [T, 0, 0], [0, T, 0], [0, 0, T]])
Q = Gamma * Gamma.T * 0.5
R = 0.1 * H * H.T

vmin = 0.2
vmax = 4

max_velocity = 2

def beginTrack(points):
global threshold, T, F, H, Gamma, Q, R, vmin, vmax

outside = []

index = 0
testroot = []

Z_k = (points[0])
Z_k_plus1 = (points[1])

for j in range(0, Z_k.shape[1]):
for k in range(0, Z_k_plus1.shape[1]):
d = max(0, numpy.linalg.norm(Z_k_plus1[:, k] - Z_k[:, j]) - vmax * T) + max(0, -numpy.linalg.norm(
Z_k_plus1[:, k] - Z_k[:, j]) + vmin * T)
D = d / numpy.linalg.det(R + R) * d
if D <= threshold:
temp = TestTrack(Z_k[:, j])
temp.addseq(Z_k_plus1[:, k])

Px0 = 5 * numpy.eye(6)
P = F * Px0 * F.T + Q
S = H * P * H.T + R * 0.2
K = P * H.T * numpy.linalg.inv(S)
Pkk = (numpy.eye(6) - K * H) * P

x_init = numpy.concatenate((Z_k_plus1[:, k], (Z_k_plus1[:, k] - Z_k[:, j]) / T), axis=0)
x_forest = F * x_init

temp.est = x_init
temp.pkk = Pkk
temp.x_predict = x_forest
temp.M = 2
temp.N = 2

testroot.append(temp)

return testroot


def TestToConfirmed(testroot):
confirmedroot = ConfirmedTrack(testroot.start)
confirmedroot.seq = testroot.seq
confirmedroot.est = testroot.est
confirmedroot.pkk = testroot.pkk
confirmedroot.seq = testroot.seq
confirmedroot.x_predict = testroot.x_predict
return confirmedroot


def MOT(Z_k, Z_k_prev, confirmedroot, testroot):
global threshold, T, F, H, Gamma, Q, R, vmin, vmax

Z_k_present = Z_k

if Z_k.shape[1] == 0:
if len(confirmedroot) != 0:
pos = []
for kk in range(0, len(confirmedroot)):
confirmedroot[kk].board += 1
if confirmedroot[kk].board >= 4:
pos.append(kk)
confirmedroot = numpy.delete(confirmedroot, pos)
confirmedroot = confirmedroot.tolist()

if Z_k.shape[1] != 0:
if len(confirmedroot) != 0:
pos = []
for kk in range(0, len(confirmedroot)):
if Z_k.shape[1] == 0:
break

P = F * confirmedroot[kk].pkk * F.T + Q
S = H * P * H.T + R
K = P * H.T * numpy.linalg.inv(S)
outside = H * confirmedroot[kk].x_predict
V = []
for i in range(0, Z_k.shape[1]):
V.append((Z_k[:, i] - outside).T * numpy.linalg.inv(S) * (Z_k[:, i] - outside))

index = numpy.argmin(V)
val = numpy.amin(V)
if val <= threshold:
confirmedroot[kk].addseq(Z_k[:, index])
est = confirmedroot[kk].x_predict + K * (Z_k[:, index] - outside)
confirmedroot[kk].addest(est)
confirmedroot[kk].pkk = (numpy.eye(6) - K * H) * P
confirmedroot[kk].x_predict = F * est
confirmedroot[kk].board = 0

Z_k = numpy.delete(Z_k, index, axis=1)
else:
confirmedroot[kk].addseq(outside)
confirmedroot[kk].addest(confirmedroot[kk].x_predict)
confirmedroot[kk].pkk = P
confirmedroot[kk].x_predict = F * confirmedroot[kk].est[:, -1]
confirmedroot[kk].board += 1

if confirmedroot[kk].board >= 4 or numpy.linalg.norm(confirmedroot[kk].est[3:6, -1]) > max_velocity:
pos.append(kk)

confirmedroot = numpy.delete(confirmedroot, pos)
confirmedroot = confirmedroot.tolist()

if Z_k.shape[1] != 0:
if len(testroot) != 0:
pos = []
for kk in range(0, len(testroot)):
if Z_k.shape[1] == 0:
break

P = F * testroot[kk].pkk * F.T + Q
S = H * P * H.T + R
K = P * H.T * numpy.linalg.inv(S)
outside = H * testroot[kk].x_predict
V = []
for i in range(0, Z_k.shape[1]):
V.append((Z_k[:, i] - outside).T * numpy.linalg.inv(S) * (Z_k[:, i] - outside))

index = numpy.argmin(V)
val = numpy.amin(V)
if val <= threshold:
testroot[kk].addseq(Z_k[:, index])
est = testroot[kk].x_predict + K * (Z_k[:, index] - outside)
testroot[kk].addest(est)
testroot[kk].pkk = (numpy.eye(6) - K * H) * P
testroot[kk].x_predict = F * est
testroot[kk].M += 1
testroot[kk].N += 1
Z_k = numpy.delete(Z_k, index, axis=1)
else:
testroot[kk].addseq(outside)
testroot[kk].addest(testroot[kk].x_predict)
testroot[kk].pkk = P
testroot[kk].x_predict = F * testroot[kk].est[:, -1]
testroot[kk].N += 1

if testroot[kk].N == 6:
if testroot[kk].M >= 4 and numpy.linalg.norm(testroot[kk].est[3:6, -1]) < max_velocity:
confirmedroot.append(TestToConfirmed(testroot[kk]))
else:
pos.append(kk)

testroot = numpy.delete(testroot, pos)
testroot = testroot.tolist()

if Z_k.shape[1] != 0:
testroot.extend(beginTrack([Z_k_prev, Z_k_present]))

Z_k_prev = Z_k_present
return confirmedroot, testroot, Z_k_present

目标跟踪与航迹管理实例脚本

matlab示例

其中 measure.mat 是由CFAR检测得到的每一帧检测点的集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
clear all
close all
clc

load('measure.mat');
figure
hold on

for i = 1:size(measure,2)

hold off
plot(1,1)
axis([0 4000 -4000 4000]);
hold on
[Confirmed_trajectory] = Track_step_PNN(measure{i});

for j = 1:size(Confirmed_trajectory,2)
plot(Confirmed_trajectory(j).seq(:,1),Confirmed_trajectory(j).seq(:,2),'-o','linewidth',1);
axis([0 4000 -4000 4000]);
end

pause(0.1);
end

python示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#
# File: 读取IWR6843处理得到的点云数据(3维)进行目标跟踪,同时调用openGL对第一个航迹进行显示(多个显示未完成)
# Author: UESTC Yu Xuyao
# Email: yxy19991102@163.com
#

import serial
import time
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui
import pyqtgraph.opengl as gl
from IWR.MOT3D import *
import numpy
from sklearn.cluster import dbscan
import pandas

# Change the configuration file name
configFileName = 'profile_iwr6843_ods_3d.cfg'

CLIport = {}
Dataport = {}
byteBuffer = np.zeros(2 ** 15, dtype='uint8')
byteBufferLength = 0

testroot = []
confirmedroot = []
Z_k_prev = numpy.mat([])


# ------------------------------------------------------------------
# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
global CLIport
global Dataport
# Open the serial ports for the configuration and the data ports

# Linux
# CLIport = serial.Serial('/dev/ttyACM0', 115200)
# Dataport = serial.Serial('/dev/ttyACM1', 921600)

# Windows
CLIport = serial.Serial('COM7', 115200)
Dataport = serial.Serial('COM8', 921600)

# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
print(i)
CLIport.write((i + '\n').encode())
# wait for reply
time.sleep(0.1)
reply = CLIport.read(CLIport.in_waiting).decode()
print(reply)

return CLIport, Dataport

# ------------------------------------------------------------------

# Funtion to read and parse the incoming data
def readAndParseData(Dataport):
global byteBuffer, byteBufferLength

# Constants
OBJ_STRUCT_SIZE_BYTES = 12
BYTE_VEC_ACC_MAX_SIZE = 2 ** 15
MMWDEMO_UART_MSG_DETECTED_POINTS = 1
MMWDEMO_UART_MSG_RANGE_PROFILE = 2
maxBufferSize = 2 ** 15
tlvHeaderLengthInBytes = 8
pointLengthInBytes = 16
magicWord = [2, 1, 4, 3, 6, 5, 8, 7]

# Initialize variables
magicOK = 0 # Checks if magic number has been read
dataOK = 0 # Checks if the data has been read correctly
frameNumber = 0
detObj = {}

readBuffer = Dataport.read(Dataport.in_waiting)
byteVec = np.frombuffer(readBuffer, dtype='uint8')
byteCount = len(byteVec)

# Check that the buffer is not full, and then add the data to the buffer
if (byteBufferLength + byteCount) < maxBufferSize:
byteBuffer[byteBufferLength:(byteBufferLength + byteCount)] = byteVec[0:byteCount]
byteBufferLength = byteBufferLength + byteCount

# Check that the buffer has some data
if byteBufferLength > 16:

# Check for all possible locations of the magic word
possibleLocs = np.where(byteBuffer == magicWord[0])[0]

# Confirm that is the beginning of the magic word and store the index in startIdx
startIdx = []
for loc in possibleLocs:
check = byteBuffer[loc:loc + 8]
if np.all(check == magicWord):
startIdx.append(loc)

# Check that startIdx is not empty
if startIdx:

# Remove the data before the first start index
if 0 < startIdx[0] < byteBufferLength:
byteBuffer[:byteBufferLength - startIdx[0]] = byteBuffer[startIdx[0]:byteBufferLength]
byteBuffer[byteBufferLength - startIdx[0]:] = np.zeros(len(byteBuffer[byteBufferLength - startIdx[0]:]),
dtype='uint8')
byteBufferLength = byteBufferLength - startIdx[0]

# Check that there have no errors with the byte buffer length
if byteBufferLength < 0:
byteBufferLength = 0

# Read the total packet length
totalPacketLen = int.from_bytes(byteBuffer[12:12 + 4], byteorder='little')

# Check that all the packet has been read
if (byteBufferLength >= totalPacketLen) and (byteBufferLength != 0):
magicOK = 1

# If magicOK is equal to 1 then process the message
if magicOK:

# Initialize the pointer index
idX = 0

# Read the header
magicNumber = byteBuffer[idX:idX + 8]
idX += 8
version = format(int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little'), 'x')
idX += 4
totalPacketLen = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4
platform = format(int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little'), 'x')
idX += 4
frameNumber = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4
timeCpuCycles = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4
numDetectedObj = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4
numTLVs = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4
subFrameNumber = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4

# Read the TLV messages
for tlvIdx in range(numTLVs):

# Check the header of the TLV message
tlv_type = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4
tlv_length = int.from_bytes(byteBuffer[idX:idX + 4], byteorder='little')
idX += 4

# Read the data depending on the TLV message
if tlv_type == MMWDEMO_UART_MSG_DETECTED_POINTS:

# Initialize the arrays
x = np.zeros(numDetectedObj, dtype=np.float32)
y = np.zeros(numDetectedObj, dtype=np.float32)
z = np.zeros(numDetectedObj, dtype=np.float32)
velocity = np.zeros(numDetectedObj, dtype=np.float32)

for objectNum in range(numDetectedObj):
# Read the data for each object
x[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
y[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
z[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
velocity[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4

# Store the data in the detObj dictionary
detObj = {"numObj": numDetectedObj, "x": x, "y": y, "z": z, "velocity": velocity}
dataOK = 1

# Remove already processed data
if 0 < idX < byteBufferLength:
shiftSize = totalPacketLen

byteBuffer[:byteBufferLength - shiftSize] = byteBuffer[shiftSize:byteBufferLength]
byteBuffer[byteBufferLength - shiftSize:] = np.zeros(len(byteBuffer[byteBufferLength - shiftSize:]),
dtype='uint8')
byteBufferLength = byteBufferLength - shiftSize

# Check that there are no errors with the buffer length
if byteBufferLength < 0:
byteBufferLength = 0

return dataOK, frameNumber, detObj


# ------------------------------------------------------------------

# Funtion to update the data and display in the plot
def update():
dataOk = 0
global detObj, confirmedroot, testroot, Z_k_prev, LinePlot
x = []
y = []
z = []
# Read and parse the received data
dataOk, frameNumber, detObj = readAndParseData(Dataport)

if dataOk:
x = detObj["x"]
y = detObj["y"]
z = detObj["z"]

if len(x) != 0:
core_samples, cluster_ids = dbscan(np.vstack((x, y, z)).T, eps=0.3, min_samples=3)
df = pandas.DataFrame(np.c_[np.vstack((x, y, z)).T, cluster_ids],
columns=['x', 'y', 'z', 'cluster_id'])
df = df[df.cluster_id != -1]
count = df['cluster_id'].value_counts()

x = []
y = []
z = []
if count.shape[0] >= 1:
for i in range(0, count.shape[0]):
df1 = df[df.cluster_id == i]
mean = np.mean([df1['x'], df1['y'], df1['z']], 1)
x.append(mean[0])
y.append(mean[1])
z.append(mean[2])

Z_k = numpy.mat([x, y, z])

confirmedroot, testroot, Z_k_prev = MOT(Z_k, Z_k_prev, confirmedroot, testroot)

if len(confirmedroot) > 0:
tmppos = confirmedroot[0].est[0:3, :].T.A
for i in range(1,len(confirmedroot)):
tmppos = numpy.vstack((tmppos,confirmedroot[i].est[0:3, :].T.A))
LinePlot.setData(pos=tmppos)
QtGui.QApplication.processEvents()
else:
LinePlot.setData(pos=np.array([100, 100, 100]))
QtGui.QApplication.processEvents()

return dataOk


# ------------------------- MAIN -----------------------------------------

# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)

# START QtAPPfor the plot
app = QtGui.QApplication([])

app = pg.mkQApp()

view = gl.GLViewWidget()
view.setBackgroundColor('k')
view.show()

axis = gl.GLAxisItem()
axis.scale(10, 10, 10)

view.addItem(axis)

LinePlot = gl.GLScatterPlotItem()
view.addItem(LinePlot)

# Main loop
detObj = {}
frameData = {}
currentIndex = 0
while True:
try:
# Update the data and check if the data is okay
dataOk = update()

if dataOk:
# Store the current frame into frameData
frameData[currentIndex] = detObj
currentIndex += 0

time.sleep(0.01) # Sampling frequency of 20 Hz

# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
CLIport.write(('sensorStop\n').encode())
print('sensorStop\n')
CLIport.close()
Dataport.close()
win.close()
break