한강 공개 cctv 영상을 가지고 YOLOv5 를 통한 객체 인식과 optical flow 를 통한 자전거의 이동 방향 판단을 통해 자전거 알림 시스템 구현 및 arduino 를 활용한 프로토 타입 제작

이전 자료 링크 <- 한강공원 자전거 사고 방지 알림이 시스템[1] – python, 파이썬,cctv 영상 웹 스크래핑, cctv 영상 웹 크롤링

이전 자료 링크 <- 한강공원 자전거 사고 방지 알림 시스템[2] – python, 파이썬,cctv 영상, yolo v3, object detection

이전 자료 링크 <- 한강공원 자전거 사고 방지 알림 시스템[3] – python, 파이썬 객체 인식, cctv 영상, yolo v5, object detection

YOLOv5 마무리

from matplotlib import scale
import torch
import numpy as np
import cv2

# model = torch.hub.load('ultralytics/yolov5', 'custom', path='./best_afternoon_final.pt', force_reload=True)
model = torch.hub.load('ultralytics/yolov5', 'custom', path='./best_afternoon_final_2.pt', force_reload=True)

video_list = [
]

for video_path in video_list:
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(1000 / fps)-4
    print(frame_interval)
    ret, frame = cap.read()

    frame_count = 0

    while ret:
        ret, frame = cap.read()
        
        if frame is None:
            continue
        results = model(frame)
        if len(results.pred[0]) > 45:
            # Draw bounding boxes on the frame
            for det in results.pred[0]:
            # det is a tensor with the following structure: [x1, y1, x2, y2, confidence, class]
                x1, y1, x2, y2, conf, cls_id = map(int, det[:6])
                class_name = results.names[cls_id]
                conf_score = f"{det[4]*100:.2f}"  # formatted to two decimal places
                text_label = f"{class_name} {conf_score}%"
                if float(conf_score) > 10:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
                    cv2.putText(frame, text_label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
                frame_interval_final=frame_interval-int(sum(results.t))
        else:
            frame_interval_final=frame_interval
        
        cv2.imshow('YOLO', frame)
        
        if cv2.waitKey(frame_interval_final) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()
image 29

이전 글에서 YOLOv5 모델을 통해 추론 한것은 모델의 기본 렌더링 코드를 활용한 것이고 자유자제로 다루기 위해 모델의 출력을 가지고 직접 구현

또한, 프레임과 모델 추론하는데 걸리는 시간을 가지고 영상이 1배속으로 저장 될 수 있도록 함.

자전거 방향 판단, optical flow

import torch
import cv2
import numpy as np
import serial
import time

# 아두이노와 시리얼 연결 설정
# arduino = serial.Serial('COM5', 9600)  # COM_PORT는 사용하는 포트에 맞게 변경하세요. 예) 'COM3' 또는 '/dev/ttyACM0' 등
time.sleep(2)  # 아두이노와 연결이 정상적으로 수립되기 위한 대기 시간

def send_signal(signal, duration):
    global pub_signal_pre
    print("send_signal",end=' ')
    print(signal)
    pub_signal_pre=signal
    # arduino.write(signal.encode())  # 신호 전송
    # time.sleep(duration)  # 지정된 시간만큼 대기
    
# Load the YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'custom', path='./best_afternoon_final_2.pt', force_reload=False)
threshold=45

# Define a list of colors for the bounding boxes
colors = np.random.randint(0, 255, size=(len(model.names), 3), dtype=int)

# 추적 경로를 그리기 위한 랜덤 색상
color = np.random.randint(0,255,(200,3))
lines = None  #추적 선을 그릴 이미지 저장 변수
prevImg = None  # 이전 프레임 저장 변수

# calcOpticalFlowPyrLK 중지 요건 설정
termcriteria =  (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)

signal_count=50
pub_signal="0000"
pub_signal_pre="2222"
red_on=0


# List of videos to process
video_list = [
]

# Process each video in the list
for video_path in video_list:
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(1000 / fps) - 6
    print("Frame interval:", frame_interval)
    
    while True:
        start_time = time.time()
        ret, frame = cap.read()

        if not ret or frame is None:
            break
        img_draw = frame.copy()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Run the model on the current frame
        results = model(frame)
        pred = results.pred[0]
        # Display the bounding boxes and labels on the frame
        for det in pred:
            # det structure: [x1, y1, x2, y2, confidence, class]
            x1, y1, x2, y2, conf, cls_id = map(int, det[:6])
            class_name = results.names[cls_id]
            conf_score = f"{det[4]*100:.2f}"#f"{det[4] * 100:.2f}%"  # formatted to two decimal places

            # Only display the bounding box if confidence is above a threshold (e.g., 10%)
            if float(conf_score) > threshold:
                # Get a unique color for each class
                color_box = (255, 0, 255*int(cls_id-15)) #colors[cls_id].tolist()
                
                cv2.rectangle(img_draw, (x1, y1), (x2, y2), color_box, 2)
                cv2.putText(img_draw, f"{class_name} {conf_score}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color_box, 2)
                if class_name=="vehicle":
                    # print("detected vehicle")
                    # print("x1 =:{} y1 =:{}".format(x1, y1))
                    if ( x1>100 or x1 < 515 ) and ( y1 < 225 or y1 > 100):
                        # print("x1 =:{} y1 =:{}".format(x1, y1))
                        # print("1000")
                        
                        if prevImg is None:
                            prevImg = gray
                            # 추적선 그릴 이미지를 프레임 크기에 맞게 생성
                            lines = np.zeros_like(frame)
                            # 추적 시작을 위한 코너 검출  ---①
                            prevPt = cv2.goodFeaturesToTrack(prevImg, 200, 0.01, 10)
                        else:
                            nextImg = gray
                            # 옵티컬 플로우로 다음 프레임의 코너점  찾기 ---②
                            nextPt, status, err = cv2.calcOpticalFlowPyrLK(prevImg, nextImg, \
                                                            prevPt, None, criteria=termcriteria)
                            # 대응점이 있는 코너, 움직인 코너 선별 ---③
                            prevMv = prevPt[status==1]
                            nextMv = nextPt[status==1]
                            
                            bbox_polygon = np.array([[x1, y1], [x2, y1], [x2, y2], [x1, y2]])
                            motion_vectors = []
                            
                            for i,(p, n) in enumerate(zip(prevMv, nextMv)):
                                px,py = p.ravel()
                                nx,ny = n.ravel()
                                # 이전 코너와 새로운 코너에 선그리기 ---④
                                cv2.line(lines, (int(px), int(py)), (int(nx), int(ny)), color[i].tolist(), 2)
                                # 새로운 코너에 점 그리기
                                cv2.circle(img_draw, (int(nx), int(ny)), 2, color[i].tolist(), -1)
                                if cv2.pointPolygonTest(bbox_polygon, (nx, ny), False) >= 0:
                                    # Bounding box 내부의 코너에 대해 처리합니다.
                                    motion_vectors.append((nx - px, ny - py))
                            # 누적된 추적 선을 출력 이미지에 합성 ---⑤
                            img_draw = cv2.add(img_draw, lines)
                            # 다음 프레임을 위한 프레임과 코너점 이월
                            prevImg = nextImg
                            prevPt = nextMv.reshape(-1,1,2)
                            
                            if motion_vectors:
                                mean_dx, mean_dy = np.mean(motion_vectors, axis=0)
                                # 평균 이동 벡터를 출력합니다.
                                # print(f"Average motion in x: {mean_dx}, Average motion in y: {mean_dy}")
                                if x1 < 420 and y1 < 125 and mean_dx<0 and mean_dy<0:
                                    # print("1000")
                                    if pub_signal_pre=="0000":
                                        pub_signal="1000"
                                    # red_on=1
        
        cv2.imshow('YOLO', img_draw)
        
        if signal_count==0:
            if pub_signal_pre!=pub_signal:
                send_signal(pub_signal, 2)
            signal_count=100
            
            # if red_on>0:
            #     red_on=red_on-1
            # else:
            pub_signal="0000"
        else:
            signal_count=signal_count-1
            
            
        # Adjust the frame interval based on processing time
        
        elapsed_time = (time.time() - start_time) * 1000  # 밀리초 단위로 변환

        # frame_interval_final = frame_interval - int(sum(results.t))
        
        frame_interval_final = max(1, frame_interval-1 - int(elapsed_time))
        
        
        if frame_interval_final<1:
            frame_interval_final=1
        # Break the loop if 'q' is pressed
        # print(frame_interval_final)
        if cv2.waitKey(frame_interval_final) & 0xFF == ord('q'):
            break

    # Release the video capture and destroy all windows
    # cap.release()
    # cv2.destroyAllWindows()
cap.release()
cv2.destroyAllWindows()
image 30

optical flow 를 통해 자전거의 방향을 판단하는 것을 함. 물론 YOLO 만으로 박스의 x,y 좌표를 알 수 있기 때문에 알고리즘을 적절히 만든다면 YOLO 만으로 방향 판단을 할 수 있을 것이지만 인공지능 특성상 결국 100% 확률로 검출 하는 것이 아니기 때문에 자전거가 검출된 프레임과 프레임 사이의 아무것도 검출되지 않은 프레임이 존재할 수 있고 이와 같은 예외 사항을 처리하는 것은 까다롭다. 때문에 optical flow 활용

하드웨어 제작, arduino

image 36
image 34
image 35

아두이노를 활용하여 다음과 같이 하드웨어 제작함.

led 를 활용하여 자전거가 감지된 길에는 빨간 불빛이 나오도록 하고 아무것도 감지되지 않은 경우에는 녹색불이 켜져 있도록 함.

#include <Wire.h>
#include <LiquidCrystal_I2C.h>
#define SPEAKER_PIN1 5
#define SPEAKER_PIN2 4

int GREEN_1 = 13;
int RED_1 = 12;

int GREEN_2 = 11;
int RED_2 = 10;

int GREEN_3 = 9;
int RED_3 = 8;

int GREEN_4 = 7;
int RED_4 = 6;

LiquidCrystal_I2C lcd1(0x27, 16, 2);
LiquidCrystal_I2C lcd2(0x23, 16, 2);

String data = "0000";
bool soundPlayed = false;

int tone1_on=0;
int tone2_on=0;
int tone3_on=0;
int tone4_on=0;

void setup() {
  pinMode(RED_1, OUTPUT);
  pinMode(GREEN_1, OUTPUT);
  pinMode(RED_2, OUTPUT);
  pinMode(GREEN_2, OUTPUT);
  pinMode(RED_3, OUTPUT);
  pinMode(GREEN_3, OUTPUT);
  pinMode(RED_4, OUTPUT);
  pinMode(GREEN_4, OUTPUT);
  pinMode(SPEAKER_PIN1, OUTPUT);
  pinMode(SPEAKER_PIN2, OUTPUT);
  
  lcd1.init(); // lcd1 초기화
  lcd1.backlight(); // lcd1 백라이트 활성화

  lcd2.init(); // lcd2 초기화
  lcd2.backlight(); // lcd2 백라이트 활성화

  Serial.begin(9600);
}

void loop() {
  if (Serial.available()) {  // 시리얼로부터 데이터가 도착했는지 확인
    data = Serial.readString();
    delay(100);
    lcd1.clear();
    lcd2.clear();
    soundPlayed = false;

    int road_1 = data[0] - '0'; // 자전거 있으면 1, 없으면 0
    int road_2 = data[1] - '0';
    int road_3 = data[2] - '0';
    int road_4 = data[3] - '0';

    if (road_1){
      tone(SPEAKER_PIN1, 262); // 도
      delay(100);
      noTone(SPEAKER_PIN1);
      tone(SPEAKER_PIN1, 294); // 레
      delay(100);
      noTone(SPEAKER_PIN1);
    }
    if (road_2){
      tone(SPEAKER_PIN1, 294); // 레
      delay(100);
      noTone(SPEAKER_PIN1);
      tone(SPEAKER_PIN1, 330); // 미
      delay(100);
      noTone(SPEAKER_PIN1);
    }
    if (road_3){
      tone(SPEAKER_PIN2, 330); // 미
      delay(100);
      noTone(SPEAKER_PIN2);
      tone(SPEAKER_PIN2, 349); // 파
      delay(100);
      noTone(SPEAKER_PIN2);
    }
    if (road_4){
      tone(SPEAKER_PIN2, 349); // 파
      delay(100);
      noTone(SPEAKER_PIN2);
      tone(SPEAKER_PIN2, 262); // 도
      delay(100);
      noTone(SPEAKER_PIN2);
    }
  }
  int road_1 = data[0] - '0'; // 자전거 있으면 1, 없으면 0
  int road_2 = data[1] - '0';
  int road_3 = data[2] - '0';
  int road_4 = data[3] - '0';
  

  if (road_1) {
    analogWrite(RED_1, 255);
    analogWrite(GREEN_1, 0);
    
    lcd1.setCursor(0, 0);
    lcd1.print(" Hello Seoul ");
    lcd1.setCursor(0, 1);
    lcd1.print("  Bike is coming ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);

  } else {
    analogWrite(RED_1, 0);
    analogWrite(GREEN_1, 255);
    
    lcd1.setCursor(0, 0);
    lcd1.print(" Hello Seoul ");
    lcd1.setCursor(0, 1);
    lcd1.print("  ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);
  }

  if (road_2) {
    analogWrite(RED_2, 255);
    analogWrite(GREEN_2, 0);
    
    lcd2.setCursor(0, 0);
    lcd2.print(" Hello Seoul");
    lcd2.setCursor(0, 1);
    lcd2.print(" Bike is coming ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);

  } else {
    analogWrite(RED_2, 0);
    analogWrite(GREEN_2, 255);
    lcd2.setCursor(0, 0);
    lcd2.print(" Hello Seoul ");
    lcd2.setCursor(0, 1);
    lcd2.print("  ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);
  }

  if (road_3) {
    analogWrite(RED_3, 255);
    analogWrite(GREEN_3, 0);
    
    lcd1.setCursor(0, 0);
    lcd1.print(" Hello Seoul ");
    lcd1.setCursor(0, 1);
    lcd1.print("  ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);

  } else {
    analogWrite(RED_3, 0);
    analogWrite(GREEN_3, 255);
    lcd1.setCursor(0, 0);
    lcd1.print(" Hello Seoul ");
    lcd1.setCursor(0, 1);
    lcd1.print("  ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);
  }

  if (road_4) {
    analogWrite(RED_4, 255);
    analogWrite(GREEN_4, 0);

    lcd1.setCursor(0, 0);
    lcd1.print(" Hello Seoul ");
    lcd1.setCursor(0, 1);
    lcd1.print("  ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);

  } else {
    analogWrite(RED_4, 0);
    analogWrite(GREEN_4, 255);
    lcd1.setCursor(0, 0);
    lcd1.print(" Hello Seoul ");
    lcd1.setCursor(0, 1);
    lcd1.print("  ");
    lcd1.setCursor(0, 2);
    lcd1.print(data);
  }

  
}

아두이노 코드. 2개의 i2c lcd 모니터와 여러개의 led 를 시리얼 통신을 통해 받은 정보에 따라 제어하는 코드


import serial
import time

# 아두이노와 시리얼 연결 설정
arduino = serial.Serial('COM5', 9600)  # COM_PORT는 사용하는 포트에 맞게 변경하세요. 예) 'COM3' 또는 '/dev/ttyACM0' 등
time.sleep(2)  # 아두이노와 연결이 정상적으로 수립되기 위한 대기 시간

def send_signal(signal, duration):
    print(signal)
    arduino.write(signal.encode())  # 신호 전송
    time.sleep(duration)  # 지정된 시간만큼 대기

while True:
    send_signal("0000", 3)
    send_signal("1000", 3)
    send_signal("0100", 3)
    send_signal("0000", 3)
    send_signal("0011", 3)
    send_signal("0001", 3)

python 에서 아두이노에 제어신호 보내는 예제 코드

image 40
image 39

cctv 위치에 대해서 갈 수 있는 길이 4개이고 위와 같이 indexing 했을 때

1000 은 1번 방향으로 자전거가 “간다”, 1001 은 1번 과 4번 방향으로 자전거가 “간다” 에 의미이다. 후자는 자전거가 2대 이상일 것이다.

최종 코드

import torch
import cv2
import numpy as np
import serial
import time

# 아두이노와 시리얼 연결 설정
arduino = serial.Serial('COM5', 9600)  # COM_PORT는 사용하는 포트에 맞게 변경하세요. 예) 'COM3' 또는 '/dev/ttyACM0' 등
time.sleep(2)  # 아두이노와 연결이 정상적으로 수립되기 위한 대기 시간

def send_signal(signal, duration):
    global pub_signal_pre
    print("send_signal",end=' ')
    print(signal)
    pub_signal_pre=signal
    arduino.write(signal.encode())  # 신호 전송
    # time.sleep(duration)  # 지정된 시간만큼 대기
    
# Load the YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'custom', path='./best_afternoon_final_2.pt', force_reload=False)
threshold=45

# Define a list of colors for the bounding boxes
colors = np.random.randint(0, 255, size=(len(model.names), 3), dtype=int)

# 추적 경로를 그리기 위한 랜덤 색상
color = np.random.randint(0,255,(200,3))
lines = None  #추적 선을 그릴 이미지 저장 변수
prevImg = None  # 이전 프레임 저장 변수

# calcOpticalFlowPyrLK 중지 요건 설정
termcriteria =  (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)

signal_count=50
pub_signal="0000"
pub_signal_pre="2222"
red_on=0


# List of videos to process
video_list = [
]

# Process each video in the list
for video_path in video_list:
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(1000 / fps) - 6
    print("Frame interval:", frame_interval)
    
    while True:
        start_time = time.time()
        ret, frame = cap.read()

        if not ret or frame is None:
            break
        img_draw = frame.copy()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Run the model on the current frame
        results = model(frame)
        pred = results.pred[0]
        # Display the bounding boxes and labels on the frame
        for det in pred:
            # det structure: [x1, y1, x2, y2, confidence, class]
            x1, y1, x2, y2, conf, cls_id = map(int, det[:6])
            class_name = results.names[cls_id]
            conf_score = f"{det[4]*100:.2f}"#f"{det[4] * 100:.2f}%"  # formatted to two decimal places

            # Only display the bounding box if confidence is above a threshold (e.g., 10%)
            if float(conf_score) > threshold:
                # Get a unique color for each class
                color_box = (255, 0, 255*int(cls_id-15)) #colors[cls_id].tolist()
                
                cv2.rectangle(img_draw, (x1, y1), (x2, y2), color_box, 2)
                cv2.putText(img_draw, f"{class_name} {conf_score}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color_box, 2)
                if class_name=="vehicle":
                    # print("detected vehicle")
                    # print("x1 =:{} y1 =:{}".format(x1, y1))
                    if ( x1>100 or x1 < 515 ) and ( y1 < 225 or y1 > 100):
                        # print("x1 =:{} y1 =:{}".format(x1, y1))
                        # print("1000")
                        
                        if prevImg is None:
                            prevImg = gray
                            # 추적선 그릴 이미지를 프레임 크기에 맞게 생성
                            lines = np.zeros_like(frame)
                            # 추적 시작을 위한 코너 검출  ---①
                            prevPt = cv2.goodFeaturesToTrack(prevImg, 200, 0.01, 10)
                        else:
                            nextImg = gray
                            # 옵티컬 플로우로 다음 프레임의 코너점  찾기 ---②
                            nextPt, status, err = cv2.calcOpticalFlowPyrLK(prevImg, nextImg, \
                                                            prevPt, None, criteria=termcriteria)
                            # 대응점이 있는 코너, 움직인 코너 선별 ---③
                            prevMv = prevPt[status==1]
                            nextMv = nextPt[status==1]
                            
                            bbox_polygon = np.array([[x1, y1], [x2, y1], [x2, y2], [x1, y2]])
                            motion_vectors = []
                            
                            for i,(p, n) in enumerate(zip(prevMv, nextMv)):
                                px,py = p.ravel()
                                nx,ny = n.ravel()
                                # 이전 코너와 새로운 코너에 선그리기 ---④
                                cv2.line(lines, (int(px), int(py)), (int(nx), int(ny)), color[i].tolist(), 2)
                                # 새로운 코너에 점 그리기
                                cv2.circle(img_draw, (int(nx), int(ny)), 2, color[i].tolist(), -1)
                                if cv2.pointPolygonTest(bbox_polygon, (nx, ny), False) >= 0:
                                    # Bounding box 내부의 코너에 대해 처리합니다.
                                    motion_vectors.append((nx - px, ny - py))
                            # 누적된 추적 선을 출력 이미지에 합성 ---⑤
                            img_draw = cv2.add(img_draw, lines)
                            # 다음 프레임을 위한 프레임과 코너점 이월
                            prevImg = nextImg
                            prevPt = nextMv.reshape(-1,1,2)
                            
                            if motion_vectors:
                                mean_dx, mean_dy = np.mean(motion_vectors, axis=0)
                                # 평균 이동 벡터를 출력합니다.
                                # print(f"Average motion in x: {mean_dx}, Average motion in y: {mean_dy}")
                                if x1 < 420 and y1 < 125 and mean_dx<0 and mean_dy<0:
                                    # print("1000")
                                    if pub_signal_pre=="0000":
                                        pub_signal="1000"
                                    # red_on=1
        
        cv2.imshow('YOLO', img_draw)
        
        if signal_count==0:
            if pub_signal_pre!=pub_signal:
                send_signal(pub_signal, 2)
            signal_count=100
            
            # if red_on>0:
            #     red_on=red_on-1
            # else:
            pub_signal="0000"
        else:
            signal_count=signal_count-1
            
            
        # Adjust the frame interval based on processing time
        
        elapsed_time = (time.time() - start_time) * 1000  # 밀리초 단위로 변환

        # frame_interval_final = frame_interval - int(sum(results.t))
        
        frame_interval_final = max(1, frame_interval-1 - int(elapsed_time))
        
        
        if frame_interval_final<1:
            frame_interval_final=1
        # Break the loop if 'q' is pressed
        # print(frame_interval_final)
        if cv2.waitKey(frame_interval_final) & 0xFF == ord('q'):
            break

    # Release the video capture and destroy all windows
    # cap.release()
    # cv2.destroyAllWindows()
cap.release()
cv2.destroyAllWindows()

YOLOv5 를 통해 객체인식, optical flow 를 통해 자전거의 방향을 판단하는 것과 시리얼 통신을 통해 아두이노에 신호를 보내는 코드가 모두 합쳐져 있는 코드이다.

아두이노에 연결되어 있지 않으면 에러가 나기 때문에 아두이노를 제외한 나머지 코드만 실행시켜 보고 싶으면 8, 16 line 을 주석하면 된다.

image 41
image 42

이런식으로 자전거가 1번 방향으로 갈때 signal 1000 을 아두이노에 보내는 것을 볼 수 있다.

image 43

영상 자료


0 Comments

Leave a Reply