개인 프로젝트: 4차시 Raspberry Pi 로 사람 판별하기 - Marvic1130/EmbeddedSystem_Class GitHub Wiki

Raspberry Pi 로 사람 판별하기

얼굴 학습하기

개요

이 프로젝트에서는 마스크를 쓴 상태에서 사람을 판별해야 하므로 얼굴의 하관을 제외한 눈 부분만 이미지를 크롭하여 학습한다.

학습 머신러닝 알고리즘은 CNN을 사용한다.

CNN(Convolutional Neural Networks)란?

CNN(Convolutional Neural Networks)이란 우리말로 번역하면 합성곱 신경망으로, 이름 그대로 Convolution(합성곱) 전처리를 이용하여 학습을 하는 머신러닝 기법이다.

CNN은 딥러닝에서 이미지나 영상 데이터를 처리할 때 쓰인다.


신경망은 기본적으로 Layer를 연결하는 방식으로 이루어지는데, CNN에서는 Convolution layer와 pooling layer가 사용된다.

img


Convolution layer는 이미지의 특징을 판단하는 layer이다. 컴퓨터는 이미지에서 물체를 판별할 때, 다른 위치에 있는 같은 물체를 다른 물체라고 판단하는데, 이러한 문제를 해결해 주는 것이 Convolution layer이다.

R1280x0-2

R1280x0


pooling layer는 특징을 강화하는 layer이다.

pooling layer는 특정값을 제외하고 나머지 값들은 버린다고 생각하면 된다.

Pooling layer의 종류에는 max pooling, average pooling, overlapping pooling이 있다.

1*8DRW7Uw6lHfAdPdXrHiY9w

CreateModel.py 코드 설명

학습할 데이터를 가져오는 코드


# Build the class list from the sub-directories of `labeled`, skipping
# macOS Finder metadata files, then load every .jpg as a normalized
# grayscale image with a parallel one-hot label.
groups_folder_path = 'labeled'
filelist = sorted(os.listdir(groups_folder_path))
categories = [name for name in filelist if name != '.DS_Store']

num_classes = len(categories)

ls_x = []  # grayscale images scaled into [0, 1)
ls_y = []  # one-hot label vectors, parallel to ls_x

for index, category in enumerate(categories):
    # One-hot encode the class index.
    label = [0] * num_classes
    label[index] = 1
    image_dir = groups_folder_path + '/' + category + '/'

    for _top, _dirs, files in os.walk(image_dir):
        for filename in files:
            if filename.endswith('.jpg'):
                img = cv2.imread(image_dir + filename)
                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                # cv2.resize takes (width, height): 192x128 pixels.
                img = cv2.resize(img, (192, 128))
                # NOTE(review): dividing by 256 (not 255) never reaches 1.0
                # exactly; kept as-is so inference matches the trained model.
                ls_x.append(img / 256)
                ls_y.append(label)


학습 데이터와 테스트 데이터를 나누는 코드


# Hold out 20% of the data for testing with a fixed seed for reproducibility.
x_train, x_test, y_train, y_test = train_test_split(ls_x, ls_y, test_size=.2, random_state=0)

# BUG FIX: Conv2D with input_shape=(128, 192, 1) expects NHWC input, but the
# grayscale images stack to (n, 128, 192); add the trailing channel axis.
X_train = np.array(x_train)[..., np.newaxis]
Y_train = np.array(y_train)
X_test = np.array(x_test)[..., np.newaxis]
Y_test = np.array(y_test)

print(X_train.shape)
print(X_train.shape[1:])

Sequential 선언 및 컴파일


# CNN: four Conv/MaxPool/Dropout stages followed by a dense classifier head.
# Consistently use Conv2D (Convolution2D is only a legacy alias of it).
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(128, 192, 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(128, activation='relu'))
# One softmax output per class.
model.add(Dense(len(categories), activation='softmax'))

model.summary()

model.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])

스크린샷 2022-12-20 오후 9 02 49

Early Stopping 을 위한 콜백을 선언 후 model.fit


# Stop once the training loss has not improved for 3 epochs, and keep a
# checkpoint of the best model seen so far.
early_stopping_callback = EarlyStopping(monitor='loss', patience=3)
checkpointer = ModelCheckpoint(filepath=modelpath, monitor='loss', verbose=0, save_best_only=True)

hist = model.fit(X_train, Y_train, batch_size=64, epochs=200,
                 callbacks=[early_stopping_callback, checkpointer])

스크린샷 2022-12-20 오후 9 02 58

테스트 데이터셋 예측


# Convert softmax probabilities to one-hot predictions.  np.argmax returns
# the first maximum on ties, matching the original strict-'>' scan, so the
# hand-rolled double loop is replaced by a vectorized lookup.
y_predicted = model.predict(X_test)
max_indices = np.argmax(y_predicted, axis=1)
y_pred = np.eye(y_predicted.shape[1], dtype=int)[max_indices].tolist()

학습 결과 도출


# Aggregate test metrics: log-loss on the raw probabilities, macro F1 and
# accuracy on the one-hot predictions.
Y_pred = np.array(y_pred)
loss = log_loss(Y_test, y_predicted)
f1 = f1_score(Y_test, Y_pred, average='macro')

accuracy = accuracy_score(Y_test, Y_pred)
print('Test accuracy:', accuracy)
print('Test loss:', loss)
print("F1-score: {:.2%}".format(f1))

스크린샷 2022-12-20 오후 9 03 58

전체 코드


import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dropout, Dense
from keras.layers import Flatten, Convolution2D, MaxPooling2D, Conv2D
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.metrics import f1_score, accuracy_score, log_loss
from sklearn.model_selection import train_test_split
import os
import cv2
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import absl.logging
matplotlib.use('TKAgg')
absl.logging.set_verbosity(absl.logging.ERROR)

# Build the class list from the sub-directories of `labeled`, skipping
# macOS Finder metadata files, then load every .jpg as a normalized
# grayscale image with a parallel one-hot label.
groups_folder_path = 'labeled'
filelist = sorted(os.listdir(groups_folder_path))
categories = [name for name in filelist if name != '.DS_Store']

num_classes = len(categories)

ls_x = []  # grayscale images scaled into [0, 1)
ls_y = []  # one-hot label vectors, parallel to ls_x

for index, category in enumerate(categories):
    # One-hot encode the class index.
    label = [0] * num_classes
    label[index] = 1
    image_dir = groups_folder_path + '/' + category + '/'

    for _top, _dirs, files in os.walk(image_dir):
        for filename in files:
            if filename.endswith('.jpg'):
                img = cv2.imread(image_dir + filename)
                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                # cv2.resize takes (width, height): 192x128 pixels.
                img = cv2.resize(img, (192, 128))
                # NOTE(review): dividing by 256 (not 255) never reaches 1.0
                # exactly; kept as-is so inference matches the trained model.
                ls_x.append(img / 256)
                ls_y.append(label)

# Hold out 20% of the data for testing with a fixed seed for reproducibility.
x_train, x_test, y_train, y_test = train_test_split(ls_x, ls_y, test_size=.2, random_state=0)

# BUG FIX: Conv2D with input_shape=(128, 192, 1) expects NHWC input, but the
# grayscale images stack to (n, 128, 192); add the trailing channel axis.
X_train = np.array(x_train)[..., np.newaxis]
Y_train = np.array(y_train)
X_test = np.array(x_test)[..., np.newaxis]
Y_test = np.array(y_test)

print(X_train.shape)
print(X_train.shape[1:])

# CNN: four Conv/MaxPool/Dropout stages followed by a dense classifier head.
# Consistently use Conv2D (Convolution2D is only a legacy alias of it).
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(128, 192, 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(128, activation='relu'))
# One softmax output per class.
model.add(Dense(len(categories), activation='softmax'))

model.summary()

model.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])

# Checkpoints are written under ./models; training stops once the training
# loss fails to improve for three consecutive epochs.
modelpath = './models'

early_stopping_callback = EarlyStopping(monitor='loss', patience=3)
checkpointer = ModelCheckpoint(filepath=modelpath, monitor='loss', verbose=0, save_best_only=True)

hist = model.fit(X_train, Y_train, batch_size=64, epochs=200,
                 callbacks=[early_stopping_callback, checkpointer])

# Plot the training loss and accuracy curves recorded by model.fit.
plt.figure(figsize=(12, 8))
for metric in ('loss', 'accuracy'):
    plt.plot(hist.history[metric])
plt.legend(['loss', 'acc'])
plt.grid()
plt.show()

# Convert softmax probabilities to one-hot predictions.  np.argmax returns
# the first maximum on ties, matching the original strict-'>' scan, so the
# hand-rolled double loop is replaced by a vectorized lookup.
y_predicted = model.predict(X_test)
max_indices = np.argmax(y_predicted, axis=1)
y_pred = np.eye(y_predicted.shape[1], dtype=int)[max_indices].tolist()

# Report test metrics, then save the model under a short hex-encoded name
# derived from the training-set size and the F1 score.
Y_pred = np.array(y_pred)
loss = log_loss(Y_test, y_predicted)
f1 = f1_score(Y_test, Y_pred, average='macro')

print('Test accuracy:', accuracy_score(Y_test, Y_pred))
print('Test loss:', loss)
print("F1-score: {:.2%}".format(f1))
# format(n, 'x') replaces the redundant str(hex(n))[2:] (hex() already
# returns a str; [2:] only stripped the '0x' prefix).
path = format(X_train.shape[0], 'x') + "F" + format(int(f1 * 10000), 'x')

model_path = './models' + '/' + path

# Save both the TensorFlow SavedModel directory and an HDF5 copy.
tf.saved_model.save(model, model_path)
model.save(model_path + "/" + path + ".h5")
print('save model path:', path)

Raspberry Pi 캠에서 얼굴 추출 후 예측

기존 Project/FaceDetector_Server/Server.py 파일을 위에서 학습한 모델을 이용하여 분석할 수 있게 코드 수정

코드 설명

학습한 모델을 불러오기 위해 keras.models.load_model import

from keras.models import load_model

네트워크와 가중치를 가지고있는 파일과 모델 선언


 # Load the OpenCV DNN face detector (Caffe prototxt + weights) and the
 # Keras classifier saved under models/68aF26c2 by the training script.
 facenet = cv2.dnn.readNet('models/model.prototxt', 'models/model.caffemodel')
    model_path = 'models/68aF26c2/68aF26c2.h5'
    model = load_model(model_path)

블롭 객체 생성


# Pack the frame into a 405x405 blob with the detector's mean-BGR
# subtraction, then run a forward pass to obtain face detections.
blob = cv2.dnn.blobFromImage(frame, scalefactor=1., size=(405, 405), mean=(104., 177., 123.))
facenet.setInput(blob)
dets = facenet.forward()

얼굴 예측


# Preprocess the cropped eye region the same way as the training data:
# grayscale, 192x128, with a leading batch axis.
face_input = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
face_input = cv2.resize(face_input, (192, 128))
face_input = np.expand_dims(face_input, axis=0)

modelpredict = model.predict(face_input)
# BUG FIX: the original `if pred_class_ls[i]:` tested float truthiness, so
# any non-zero softmax probability matched and index 0 was almost always
# chosen (and `lable` stayed undefined if all entries were 0.0).  Pick the
# highest-probability class instead.
lable = str(np.argmax(modelpredict[0]))

전체 코드


import socket
import numpy as np
import os
import random
import cv2
from keras.models import load_model


# socket에서 수신한 버퍼를 반환하는 함수
# Read exactly `count` bytes from `sock`.
def recvall(sock, count):
    """Return exactly `count` bytes received from `sock`, or None if the
    peer closes the connection before enough data arrives."""
    chunks = []
    remaining = count
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            # Connection closed early: signal failure to the caller.
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    # join is O(total) vs. repeated bytes concatenation.
    return b''.join(chunks)


def rename_file(dist_lable: str):
    """Rename every .jpg in croppedData/ to <dist_lable><n>.jpg (n = 0, 1, ...).

    Non-.jpg entries are left untouched.
    """
    count = 0
    # Iterate the directory listing directly instead of index-looping with
    # __len__()/__getitem__.
    for name in os.listdir("croppedData"):
        if name.endswith(".jpg"):
            src = "croppedData/" + name
            dst = "croppedData/" + dist_lable + str(count) + ".jpg"
            # NOTE(review): os.rename silently replaces an existing dst on
            # POSIX; confirm no file named like dst can already exist.
            os.rename(src, dst)
            print(src + " rename to " + dst)
            count += 1


if __name__ == '__main__':
    # Load the OpenCV DNN face detector (Caffe) and the Keras classifier
    # trained by CreateModel.py.
    facenet = cv2.dnn.readNet('models/model.prototxt', 'models/model.caffemodel')
    model_path = 'models/68aF26c2/68aF26c2.h5'
    model = load_model(model_path)

    HOST = '0.0.0.0'
    PORT = 8808

    # Use TCP.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    print('Socket created')

    # Bind the server address and port.
    s.bind((HOST, PORT))
    print('Socket bind complete')
    # Wait for clients (backlog of up to 10 pending connections).
    s.listen(10)
    print('Socket now listening')

    # conn is the connected socket object, addr the client's address.
    conn, addr = s.accept()

    while True:
        # Size of the stringData sent by the client
        # (== (str(len(stringData))).encode().ljust(16)).
        length = recvall(conn, 16)
        stringData = recvall(conn, int(length))
        # np.fromstring is deprecated for binary input; frombuffer is the
        # drop-in replacement.
        data = np.frombuffer(stringData, dtype='uint8')

        # Decode the received JPEG bytes into a BGR frame.
        frame = cv2.imdecode(data, cv2.IMREAD_COLOR)
        h, w = frame.shape[:2]

        blob = cv2.dnn.blobFromImage(frame, scalefactor=1., size=(405, 405), mean=(104., 177., 123.))
        facenet.setInput(blob)
        dets = facenet.forward()

        for i in range(dets.shape[2]):
            confidence = dets[0, 0, i, 2]
            if confidence < 0.5:
                continue

            # Detection coordinates are normalized; scale to pixels.
            x1 = int(dets[0, 0, i, 3] * w)
            y1 = int(dets[0, 0, i, 4] * h)
            x2 = int(dets[0, 0, i, 5] * w)
            y2 = int(dets[0, 0, i, 6] * h)

            # Skip boxes that touch or fall outside the frame.  (The dead
            # `face = frame[...] / 256` and 200x200 resize that the original
            # computed here were never used and have been removed.)
            if x2 >= w or y2 >= h:
                continue
            if x1 <= 0 or y1 <= 0:
                continue

            color = (255, 255, 255)

            cropped_data_path = "croppedData/temp" + str(random.randrange(0, 999999)) + ".jpg"
            # Keep only the upper half of the detected face (the eye region),
            # matching the training crops.
            height_dist = (y2 - y1) // 2
            crop = frame[y1: y2 - height_dist, x1: x2]

            face_input = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
            face_input = cv2.resize(face_input, (192, 128))
            # BUG FIX: training images were scaled by /256, but the server
            # fed raw 0-255 pixels to the classifier; normalize the same way.
            face_input = face_input / 256
            face_input = np.expand_dims(face_input, axis=0)

            modelpredict = model.predict(face_input)
            # BUG FIX: the original `if pred_class_ls[i]:` tested float
            # truthiness (any non-zero probability matched, so index 0 was
            # almost always picked) and its loop variable shadowed the
            # detection loop's `i`.  Use the highest-probability class.
            lable = str(np.argmax(modelpredict[0]))

            try:
                cv2.imwrite(cropped_data_path, crop)
            except Exception as e:
                # Best-effort save of the crop; report but keep serving.
                print(e)

            cv2.rectangle(frame, pt1=(x1, y1), pt2=(x2, y2), thickness=2, color=color, lineType=cv2.LINE_AA)
            cv2.putText(frame, text=lable, org=(x1, y1 - 10), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.8,
                        color=color, thickness=2, lineType=cv2.LINE_AA)

        cv2.imshow('masktest', frame)

        key = cv2.waitKey(1)
        if key == 27:  # ESC quits the serving loop.
            break

    rename_file('crop')


결과

스크린샷 2022-12-20 오후 9 13 57


이전: 개인 프로젝트: 3차시 Raspberry Pi 로 얼굴인식하기