한강 공개 cctv 영상 이후 실시간으로 저장되는 영상에 대해 object detection

이전 자료 링크 <- 한강공원 자전거 사고 방지 알림이 시스템[1] – python, 파이썬,cctv 영상 웹 스크래핑, cctv 영상 웹 크롤링

실시간 영상 재생

import cv2
import os
import time
import pygame
import re

video_dir = './video_ts/'  # 저장된 비디오 파일들의 디렉토리

# pygame를 사용하여 사운드 재생
pygame.init()
pygame.mixer.init()

def get_video_length(video_path):
    # 동영상 파일을 열기
    cap = cv2.VideoCapture(video_path)
    # 동영상의 총 프레임 수와 FPS 얻기
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    # 동영상의 길이 (초) 계산
    length_in_seconds = total_frames / fps
    # 동영상 파일 닫기
    cap.release()
    # 동영상의 길이를 분:초 형식으로 반환
    minutes = int(length_in_seconds // 60)
    seconds = int(length_in_seconds % 60)
    return f"{minutes}:{seconds:02}"


def extract_number(filename):
    # 파일 이름에서 숫자를 추출
    match = re.search(r'(\d+)', filename)
    if match:
        return int(match.group(1))
    return 0

def play_video(video_path, video_file,video_length):
    cap = cv2.VideoCapture(video_path)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # 파일명을 화면에 표시
        cv2.putText(frame, video_file+video_length, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('Video', frame)
        if cv2.waitKey(28) & 0xFF == ord('q'):  # 28ms delay between frames
            break
    cap.release()

def play_audio(video_path):
    pygame.mixer.music.load(video_path)
    pygame.mixer.music.play()

while True:
    for video_file in sorted(os.listdir(video_dir), key=extract_number):
        if video_file.endswith('.ts'):
            video_path = os.path.join(video_dir, video_file)
            video_length = get_video_length(video_path)
            print(f"Playing {video_file} (Length: {video_length})")
            # play_audio(video_path)
            play_video(video_path, video_file,video_length)
    time.sleep(20)

cv2.destroyAllWindows()
pygame.mixer.quit()
pygame.quit()
image

오른쪽이 kbs 재난 포털에서 가져오는 실시간 cctv 영상이고 해당 영상을 저장한 뒤에 저장된 영상을 python 으로 재생하는 영상이다.

왼쪽 위에는 파일 이름과 재생 시간이다. 6초에 해당하는 영상이 분할되어 웹에 들어오기 때문에 우리가 저장한 영상 하나하나도 6초 단위로 끊겨있고 해당 영상을 재생할 때에도 6초마다 다른 영상을 틀어 주어야 한다.

object detection

import cv2
import os
import time
import pygame
import re
import threading
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

video_dir = './video_ts/'  # 저장된 비디오 파일들의 디렉토리


def get_video_length(video_path):
    # 동영상 파일을 열기
    cap = cv2.VideoCapture(video_path)
    # 동영상의 총 프레임 수와 FPS 얻기
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    # 동영상의 길이 (초) 계산
    length_in_seconds = total_frames / fps
    # 동영상 파일 닫기
    cap.release()
    # 동영상의 길이를 분:초 형식으로 반환
    minutes = int(length_in_seconds // 60)
    seconds = int(length_in_seconds % 60)
    return f"{minutes}:{seconds:02}"


def extract_number(filename):
    # 파일 이름에서 숫자를 추출
    match = re.search(r'(\d+)', filename)
    if match:
        return int(match.group(1))
    return 0


def play_video(video_path, video_file, video_length):
    cap = cv2.VideoCapture(video_path)
    ret, frame = cap.read()
    
    processed_frame = frame.copy()
    frame_ready = threading.Event()
    
    
    # 비디오의 해상도를 가져옵니다.
    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    def detect_and_display(frame):
        nonlocal processed_frame
        processed_frame = detect_objects(frame, YOLO_net, output_layers)
        frame_ready.set()

    while ret:
        # 객체 감지를 별도의 스레드에서 시작
        detection_thread = threading.Thread(target=detect_and_display, args=(frame,))
        detection_thread.start()

        # 메인 스레드에서는 현재 처리된 프레임을 표시하고 다음 프레임을 읽습니다.
        while detection_thread.is_alive():
            cv2.imshow(window_name, processed_frame)
            cv2.putText(frame, video_file+video_length, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            if cv2.waitKey(28) & 0xFF == ord('q'):  # 28ms delay between frames
                break
            ret, frame = cap.read()

        detection_thread.join()

    cap.release()
    cv2.destroyAllWindows()

def adjust_brightness(frame, threshold=100, scale=2.0):
    # 평균 밝기 계산
    mean_brightness = np.mean(frame)
    if mean_brightness < threshold:
        # 밝기 조절
        frame = cv2.convertScaleAbs(frame, alpha=scale, beta=0)
    return frame

def adjust_brightness_using_hsv(frame, value=50):
    # HSV 색 공간으로 변환
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    
    # V 채널 (밝기)을 조절
    h, s, v = cv2.split(hsv)
    v = cv2.add(v, value)
    v[v > 255] = 255
    v[v < 0] = 0
    hsv = cv2.merge([h, s, v])
    
    # RGB 색 공간으로 다시 변환
    frame = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    return frame


def detect_objects(frame, net, output_layers):
    # frame = adjust_brightness(frame) # 어두울 때 안되는 것에 대한 처리 -> 잘 안됨
    # frame = adjust_brightness_using_hsv(frame)
    # 노이즈 제거
    # frame = cv2.fastNlMeansDenoisingColored(frame, None, 10, 10, 7, 21)
    
    height, width, channels = frame.shape

    # Detecting objects
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
    net.setInput(blob)
    outs = net.forward(output_layers)

    class_ids = []
    confidences = []
    boxes = []
    

    for out in outs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            
            if class_id == bicycle_index and confidence > bicycle_confidence_threshold:
                confidence_threshold = bicycle_confidence_threshold
            else:
                confidence_threshold = 0.5
                
            if confidence > confidence_threshold:
                # Object detected
                center_x = int(detection[0] * width)
                center_y = int(detection[1] * height)
                w = int(detection[2] * width)
                h = int(detection[3] * height)

                # Rectangle coordinates
                x = int(center_x - w / 2)
                y = int(center_y - h / 2) 

                boxes.append([x, y, w, h])
                confidences.append(float(confidence))
                class_ids.append(class_id)

    indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)

    for i in range(len(boxes)):
        if i in indexes:
            label = str(classes[class_ids[i]])
            confidence = confidences[i]
            # color = (0, 255, 0)
            # Set color to red for bicycles, green for others
            color = (0, 0, 255) if class_ids[i] == bicycle_index  else (0, 255, 0)
            x, y, w, h = boxes[i]
            cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
            cv2.putText(frame, label + " " + str(round(confidence, 2)), (x, y + 30), cv2.FONT_HERSHEY_PLAIN, 3, color, 3)

    return frame



# YOLO 가중치 파일과 CFG 파일 로드
YOLO_net = cv2.dnn.readNet("./yolo_resource/yolov3.weights","./yolo_resource/yolov3.cfg")

# YOLO NETWORK 재구성
classes = []
with open("./yolo_resource/coco.names", "r") as f:
    classes = [line.strip() for line in f.readlines()]
print(classes)
layer_names = YOLO_net.getLayerNames()
output_layers = [layer_names[i - 1] for i in YOLO_net.getUnconnectedOutLayers()]

bicycle_index = classes.index("bicycle")
print("bicycle_index:",bicycle_index)
bicycle_confidence_threshold = 0.0  # You can adjust this value as needed

window_name = "Video Playback"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)  # 일반 창으로 설정

while True:
    for video_file in sorted(os.listdir(video_dir), key=extract_number):
        if video_file.endswith('.ts'):
            video_path = os.path.join(video_dir, video_file)
            video_length = get_video_length(video_path)
            print(f"Playing {video_file} (Length: {video_length})")
            # play_audio(video_path)
            play_video(video_path, video_file,video_length)
    print("time sleep")
    time.sleep(20)

cv2.destroyAllWindows()
pygame.mixer.quit()
pygame.quit()

image 1
image 2

먼저 코드 중간에
YOLO_net = cv2.dnn.readNet(“./yolo_resource/yolov3.weights”,”./yolo_resource/yolov3.cfg”)
부분은 아래 링크 <- 에서 다운 받으면 된다

다양한 YOLO 모델이 있고 이중에 선택하면 된다.

코드를 보면 스레드를 활용하였는데 이는 원래의 cctv 화면 재생과 object detection 을 같이 진행하면 cctv 재생 자체가 엄청 느려지게 된다. 모든 프레임에 대해서 실행하기 때문

이 때문에 object detection 을 진행하는 부분과 cctv 영상을 재생하는 부분을 나누어서 cctv 영상을 계속 진행이 되고 object detection 은 다 될 때마다 그 때의 cctv 영상 프레임을 받아 object detection 진행하는 형태로 하였다.

현재 문제점

1. 뒤에 있는 자전거나 사람이 인식이 안된다.

이 문제를 해결하기 위해 뒤에 있는 도로 즉 화면에 1/3 지점부터 1/2 지점을 크롭하여 확대해서 하려고 하였으나 워낙 기본 cctv 영상 화질이 좋지 않아 인식이 여전히 안되었다.

2. 밤이나 저녁에는 인식이 잘 안된다.

색 공간 조절과 같은 방법을 통해 쉽게 될 것이라 생각했지만 정말 낮이 된 것처럼 밝게 되는 것이 아니다 보니 밝게 했을 때 어색할 뿐 아니라 성능이 유의미하게 좋아지지도 않았다.

image 4
image 3

밝기 조절 코드

def adjust_brightness(frame, threshold=100, scale=2.0):
    # 평균 밝기 계산
    mean_brightness = np.mean(frame)
    if mean_brightness < threshold:
        # 밝기 조절
        frame = cv2.convertScaleAbs(frame, alpha=scale, beta=0)
    return frame

def adjust_brightness_using_hsv(frame, value=50):
    # HSV 색 공간으로 변환
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    
    # V 채널 (밝기)을 조절
    h, s, v = cv2.split(hsv)
    v = cv2.add(v, value)
    v[v > 255] = 255
    v[v < 0] = 0
    hsv = cv2.merge([h, s, v])
    
    # RGB 색 공간으로 다시 변환
    frame = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    return frame

영상 crop 코드

 # # 영상의 1/2 윗 부분만 잘라내기
        # cropped_frame = frame[video_height//3:video_height//2, :]
        
        # # 잘라낸 부분을 원래 영상 크기로 확대
        # resized_frame = cv2.resize(cropped_frame, (1280, 640))
        # frame=resized_frame

향후 개발 계획

다크넷의 오픈 모델 YOLOv3 와 opencv 를 통해 쉽게 개발을 하려고 했지만 상황이 직접 데이터셋을 모아 새롭게 모델을 학습시키는 방법을 적용해봐야 하는 상황이 되었다.

다크넷의 YOLOv3 에 대해 custom dataset 을 적용하는 방법도 있지만 linux 환경을 사용해야 할 뿐 아니라 오래된 시스템 이기 때문에 user 에게 친절하지가 않다.

때문에 익숙한 pytorch 환경에 대해 새로운 YOLOv5 모델에 대해 직접 데이터셋을 만들어서 학습을 진행하고 그 모델을 사용하려 한다.


0 Comments

Leave a Reply