from PIL import ImageFile, Image from flask import Flask, request, jsonify import face_recognition import base64 from io import BytesIO import joblib import numpy as np import re import os import time import mysql.connector import shlex import subprocess from face_recognition_svm import train_svm ImageFile.SAFEBLOCK = 2048 * 2048 app = Flask(__name__) model_file_name = "saved_model_2.pkl" clf = None classes = None ids = [] ssl = None known_people = "application_data/verification_images" known_faces = [] total_threshold = 0.1 face_model = "large" if face_model == "large": model_file_name = "saved_model_2_large.pkl" TRAINING_FOLDER = os.path.join("data", "train") PROFILE_FOLDER = os.path.join("application_data", "verification_images") app.config["TRAINING_FOLDER"] = TRAINING_FOLDER app.config["PROFILE_FOLDER"] = PROFILE_FOLDER db_user = os.environ.get("DB_USER", "facer") db_pass = os.environ.get("DB_PASS", "9Y6Bqg3JwQxXa") # Pick one db_host, db_port = os.environ.get("DB_HOST", "db"), os.environ.get("DB_PORT", "5050") db_unix_socket = "" # Pick one db_name = os.environ.get("DB_NAME", "face_recognition") def scan_known_people(known_people_folder): known_face_encodings = [] for file in image_files_in_folder(known_people_folder): img = face_recognition.load_image_file(file) encodings = face_recognition.face_encodings(img, model=face_model) if len(encodings) > 1: print("WARNING: More than one face found in {}. Only considering the first face.".format(file)) if len(encodings) == 0: print("WARNING: No faces found in {}. Ignoring file.".format(file)) else: known_face_encodings.append(encodings[0]) return known_face_encodings def image_files_in_folder(folder): img_list = [os.path.join(folder, f) for f in os.listdir(folder) if re.match(r'.*\.(jpg|jpeg|png|webp)', f, flags=re.I)] img_list.sort() return img_list def load_db(data_id): db_con = None if db_unix_socket: db_con = mysql.connector.connect(user=db_user, password=db_pass, unix_socket=db_unix_socket, database=db_name) elif db_host and db_port: db_con = mysql.connector.connect(user=db_user, password=db_pass, host=db_host, port=db_port, database=db_name) else: return None db_cursor = db_con.cursor() db_result = {"name": "Unknown", "address": "", "nik": ""} try: db_query = "SELECT `NIK`, `NAME`, `ADDRESS` FROM `face_recognition` WHERE `ID` = %s" db_cursor.execute(db_query, (data_id,)) for (nik, name, address) in db_cursor: db_result['nik'] = nik db_result['name'] = name db_result['address'] = address finally: db_cursor.close() db_con.close() return db_result def save_db(nik, name, address): db_con = None if db_unix_socket: db_con = mysql.connector.connect(user=db_user, password=db_pass, unix_socket=db_unix_socket, database=db_name) elif db_host and db_port: db_con = mysql.connector.connect(user=db_user, password=db_pass, host=db_host, port=db_port, database=db_name) else: return None data_id = None db_cursor = db_con.cursor() try: db_query = 'INSERT INTO `face_recognition` (`NIK`, `NAME`, `ADDRESS`) VALUES (%s, %s, %s)' db_cursor.execute(db_query, (nik, name, address)) db_con.commit() data_id = db_cursor.lastrowid finally: db_cursor.close() db_con.close() return data_id def get_face(image, crop=False): max_height = image.height max_width = image.width profile_nparray = np.asarray(image) profile_face_loc = face_recognition.face_locations(profile_nparray) profile_len = len(profile_face_loc) if profile_len >= 1: if crop: top = profile_face_loc[0][0] right = profile_face_loc[0][1] bottom = profile_face_loc[0][2] left = profile_face_loc[0][3] width = right - left height = bottom - top center_x = int(left + width / 2) center_y = int(top + height / 2) calc_new_width = min(int(width * 2), (max_width - center_x) * 2, abs((0 - center_x) * 2)) calc_new_height = min(int(height * 2), (max_height - center_y) * 2, abs((0 - center_y) * 2)) new_size = min(calc_new_height, calc_new_width) new_top = int(center_y - new_size / 2) new_right = int(center_x + new_size / 2) new_bottom = int(center_y + new_size / 2) new_left = int(center_x - new_size / 2) cropped = image.crop((new_left, new_top, new_right, new_bottom)) resized = cropped.resize((250, 250)) else: resized = image.resize((250, 250)) return resized return None @app.route('/', methods=['GET', 'POST']) def test(): return jsonify({"status": "0"}) @app.route('/upload', methods=['POST']) def upload(): try: profile = request.files["profile"] nik = request.form["nik"] name = request.form["name"] address = request.form["address"] training = [] print(len(request.files)) if 'training' in request.files: training = request.files.getlist("training") elif 'training[]' in request.files: training = request.files.getlist("training[]") else: files_length = len(request.files) training_names = [f'training{x}' for x in range(files_length - 1)] for x in training_names: training.append(request.files[x]) print(training) print(len(training)) if not training: return jsonify({"status": "2", "message": "Training files not exist"}) profile_image = Image.open(profile.stream) profile_image.load() new_profile_image = get_face(profile_image) if not new_profile_image: return jsonify({"status": "3", "message": "Profile face not found, please upload a different picture"}) detected_training = [] detected_training_fn = [] for file in training: training_image = Image.open(file.stream) new_training_image = get_face(training_image) if new_training_image: detected_training.append(new_training_image) detected_training_fn.append(file.filename) if not detected_training: return jsonify( {"status": "4", "message": "No face found on any training images, please upload a different picture"}) print(detected_training) print(detected_training_fn) # Save user_id = save_db(nik, name, address) print(user_id) if not user_id: return jsonify({"status": "1", "message": "Error uploading data, please try again"}) new_profile_image.save(os.path.join(app.config["PROFILE_FOLDER"], f'{user_id}.jpg')) id_dir = os.path.join(app.config['TRAINING_FOLDER'], str(user_id)) if not os.path.exists(id_dir): os.mkdir(id_dir) for (image, filename) in zip(detected_training, detected_training_fn): image.save(os.path.join(app.config['TRAINING_FOLDER'], str(user_id), filename)) return jsonify({"status": "0", "message": "Upload successful"}) except Exception as exc: return jsonify({"status": "1", "message": f"Error uploading data, please try again | {exc}"}) @app.route('/train', methods=['GET', 'POST']) def train(): try: train_svm(model_file_name) return jsonify({"status": "0", "message": "Train successful"}) except Exception as exc: return jsonify({"status": "1", "message": f"Error training model: {exc}"}) @app.route('/reload', methods=['GET', 'POST']) def face_reload(): pid = None try: with open("app.pid", "r") as f: pid = f.readline() if pid: cmd = f"kill -s HUP {pid}" cmd_array = shlex.split(cmd) print(cmd_array) subprocess.Popen(cmd_array, start_new_session=True) else: return jsonify({"status": "1", "message": f"Reload unsucessful"}) except Exception as exc: return jsonify({"status": "2", "message": f"Reload unsucessful: {exc}"}) @app.route('/predict', methods=['POST']) def predict(): db_result = [] if "image" in request.json: im_b64 = request.json["image"] elif "image" in request.files: im_b64 = request.files["image"] elif "image" in request.form: im_b64 = request.form["image"] else: return {"error": "Error reading image"} im_bytes = base64.b64decode(im_b64) im_file = BytesIO(im_bytes) test_image = face_recognition.load_image_file(im_file) face_locations = face_recognition.face_locations(test_image) no = len(face_locations) if no > 0: for i in range(no): start_time = time.perf_counter() test_image_enc = face_recognition.face_encodings(test_image, model=face_model)[i] proba_list = clf.predict_proba([test_image_enc]) dist = face_recognition.face_distance(known_faces, test_image_enc) total = np.subtract(proba_list, dist) i = np.argmax(total) proba = list(*proba_list)[i] face_id = ids[i] data = load_db(face_id) name = data["name"] address = data["address"] nik = data["nik"] print("total threshold: ", total_threshold) print("total: ", total[0][i]) print({ "id": str(face_id), "name": name, "address": address, "nik": nik, "proba": proba, "delta": total[0][i] }) if total[0][i] > total_threshold: js = { "id": str(face_id), "name": name, "address": address, "nik": nik, "proba": proba, "delta": total[0][i] } else: js = { "id": "-1", "name": "Unknown", "address": "", "nik": "", "proba": 0.0, "delta": 0.0 } if total[0][i] > total_threshold or (not db_result and i == no - 1): db_result.append(js) end_time = time.perf_counter() process_time = end_time - start_time print(time.strftime("%c")) print(total[0]) print("Process time:", process_time, "s") print(db_result) return jsonify(db_result) try: clf = joblib.load(model_file_name) classes = clf.classes_ print('model loaded') known_faces = scan_known_people(known_people) print('known faces scanned') db_con = None if db_unix_socket: db_con = mysql.connector.connect(user=db_user, password=db_pass, unix_socket=db_unix_socket, database=db_name) elif db_host and db_port: db_con = mysql.connector.connect(user=db_user, password=db_pass, host=db_host, port=db_port, database=db_name) else: exit(1) cursor = db_con.cursor() result = {"name": "Unknown", "address": "", "nik": ""} try: query = "SELECT `ID` FROM `face_recognition` ORDER BY `ID`" cursor.execute(query) for data_id in cursor: ids.append(data_id[0]) finally: cursor.close() db_con.close() print("ids: ", ids) except FileNotFoundError as e: print('No model here') exit(1) if __name__ == '__main__': app.run(host='0.0.0.0', port=8349, debug=True, ssl_context=ssl)