123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- from PIL import ImageFile, Image
- from flask import Flask, request, jsonify
- import face_recognition
- import base64
- from io import BytesIO
- import joblib
- import numpy as np
- import re
- import os
- import time
- import mysql.connector
- import shlex
- import subprocess
- from face_recognition_svm import train_svm
# --- Pillow / Flask bootstrap ------------------------------------------------
# Override Pillow's ImageFile.SAFEBLOCK setting for this process.
ImageFile.SAFEBLOCK = 2048 * 2048
app = Flask(__name__)

# --- Recognition settings ----------------------------------------------------
# `face_model` is forwarded to face_recognition.face_encodings(model=...);
# the "large" model has its own pickled classifier file.
face_model = "large"
model_file_name = "saved_model_2_large.pkl" if face_model == "large" else "saved_model_2.pkl"
# A candidate is accepted only when (class probability - face distance)
# exceeds this value (see predict()).
total_threshold = 0.1

# --- State filled in at start-up (bottom of this file) ------------------------
clf = None          # classifier loaded from `model_file_name`
classes = None      # clf.classes_
ids = []            # `ID` column of the face_recognition table, ascending
known_faces = []    # encodings produced by scan_known_people(known_people)

# Legacy sample records, kept for reference only.
# dummy_data = [
#     {
#         "name": "Bayu",
#         "address": "299 St Louis Road Oak Forest, IL 60452",
#         "nik": "1000076456784631"
#     },
#     {
#         "name": "Dio",
#         "address": "22 Whitemarsh St. Mansfield, MA 02048",
#         "nik": "1000024792887549"
#     },
#     {
#         "name": "Hadi",
#         "address": "643 Honey Creek Dr. Milledgeville, GA 31061",
#         "nik": "1000038502830420"
#     },
#     {
#         "name": "Kevin",
#         "address": "881 Cooper Ave. Hummelstown, PA 17036",
#         "nik": "1000045356476664"
#     },
#     {
#         "name": "Matrix",
#         "address": "580 Glenwood Dr. Garner, NC 27529",
#         "nik": "1000023452134598"
#     },
#     {
#         "name": "Surya",
#         "address": "909 South St Paul Street Hopewell, VA 23860",
#         "nik": "1000075656784734"
#     },
# ]

# Passed as `ssl_context` to app.run(); None means no TLS context is supplied.
ssl = None

# --- Filesystem layout --------------------------------------------------------
known_people = "application_data/verification_images"
TRAINING_FOLDER = os.path.join("data", "train")
PROFILE_FOLDER = os.path.join("application_data", "verification_images")
app.config.update(
    TRAINING_FOLDER=TRAINING_FOLDER,
    PROFILE_FOLDER=PROFILE_FOLDER,
)

# --- Database -----------------------------------------------------------------
# NOTE(review): credentials are hard-coded in source; consider moving them to
# environment variables or a config file kept out of version control.
db_user = "facer"
db_pass = "9Y6Bqg3JwQxXa"
# NOTE(review): load_db()/save_db() open their own connections, so this
# import-time connection does not appear to be used by them.
db = mysql.connector.connect(
    user=db_user,
    password=db_pass,
    unix_socket="/opt/lampp/var/mysql/mysql.sock",
    database='face_recognition',
)
def scan_known_people(known_people_folder):
    """Collect one face encoding per usable image in *known_people_folder*.

    Images are visited in the order returned by ``image_files_in_folder``.
    An image with several faces contributes only its first encoding (a
    warning is printed); an image with no detectable face is skipped (a
    warning is printed).

    Returns:
        list: the collected encodings, in visiting order.
    """
    collected = []
    for path in image_files_in_folder(known_people_folder):
        faces = face_recognition.face_encodings(
            face_recognition.load_image_file(path),
            model=face_model,
        )
        face_count = len(faces)

        if face_count > 1:
            print("WARNING: More than one face found in {}. Only considering the first face.".format(path))

        if face_count == 0:
            print("WARNING: No faces found in {}. Ignoring file.".format(path))
            continue

        collected.append(faces[0])
    return collected
def image_files_in_folder(folder):
    """Return the sorted paths of the image files directly inside *folder*.

    A file counts as an image when its name *ends* with ``.jpg``, ``.jpeg``,
    ``.png`` or ``.webp`` (case-insensitive).  The whole name is matched, so
    leftovers such as ``photo.jpg.bak`` are not mistaken for images.

    Args:
        folder: directory to list (not searched recursively).

    Returns:
        list[str]: ``os.path.join(folder, name)`` for every matching entry,
        sorted ascending.

    Raises:
        OSError: propagated from ``os.listdir`` (e.g. missing folder).
    """
    image_name = re.compile(r'.*\.(jpg|jpeg|png|webp)', flags=re.I)
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        # fullmatch anchors the extension at the end of the name.
        if image_name.fullmatch(name)
    )
def load_db(data_id):
    """Fetch the person record whose ``ID`` equals *data_id*.

    Args:
        data_id: primary-key value to look up.  A ready-made parameter
            sequence (list/tuple) is also accepted and passed through as-is.

    Returns:
        dict: ``{"name", "address", "nik"}``.  When no row matches, the
        placeholder ``{"name": "Unknown", "address": "", "nik": ""}`` is
        returned.

    Raises:
        mysql.connector.Error: on connection or query failure.
    """
    db_result = {"name": "Unknown", "address": "", "nik": ""}
    # Connector/Python requires the parameters as a tuple, list or dict;
    # a bare scalar is rejected, so wrap single values.
    params = data_id if isinstance(data_id, (list, tuple)) else (data_id,)
    db_query = "SELECT `NIK`, `NAME`, `ADDRESS` FROM `face_recognition` WHERE `ID` = %s"

    db_con = mysql.connector.connect(user=db_user, password=db_pass,
                                     unix_socket="/opt/lampp/var/mysql/mysql.sock",
                                     database='face_recognition')
    try:
        # Creating the cursor inside the outer try guarantees the connection
        # is closed even if cursor creation itself fails.
        db_cursor = db_con.cursor()
        try:
            db_cursor.execute(db_query, params)
            for (nik, name, address) in db_cursor:
                db_result['nik'] = nik
                db_result['name'] = name
                db_result['address'] = address
        finally:
            db_cursor.close()
    finally:
        db_con.close()
    return db_result
def save_db(nik, name, address):
    """Insert a new person row and return its auto-generated id.

    Args:
        nik: value stored in the ``NIK`` column.
        name: value stored in the ``NAME`` column.
        address: value stored in the ``ADDRESS`` column.

    Returns:
        The cursor's ``lastrowid`` after the committed insert.

    Raises:
        mysql.connector.Error: on connection or query failure (the cursor and
        connection are closed before the error propagates).
    """
    connection_settings = {
        "user": db_user,
        "password": db_pass,
        "unix_socket": "/opt/lampp/var/mysql/mysql.sock",
        "database": 'face_recognition',
    }
    connection = mysql.connector.connect(**connection_settings)
    new_row_id = None
    cur = connection.cursor()
    try:
        cur.execute(
            'INSERT INTO `face_recognition` (`NIK`, `NAME`, `ADDRESS`) VALUES (%s, %s, %s)',
            (nik, name, address),
        )
        connection.commit()
        new_row_id = cur.lastrowid
    finally:
        cur.close()
        connection.close()
    return new_row_id
def get_face(image, crop=False):
    """Return a 250x250 version of *image* if it contains at least one face.

    Args:
        image: a PIL image.
        crop: when true, cut a square region centred on the first detected
            face (up to twice the face size, limited by the image borders)
            before resizing; when false, resize the whole image.

    Returns:
        The resized PIL image, or ``None`` when no face is detected.
    """
    # The detector works on 8-bit RGB or grayscale arrays; palette, RGBA or
    # CMYK uploads are normalised first (face_recognition.load_image_file
    # does the same conversion when it loads from disk).
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    face_boxes = face_recognition.face_locations(np.asarray(image))
    if not face_boxes:
        return None
    if not crop:
        return image.resize((250, 250))

    # Boxes are (top, right, bottom, left); only the first face is used.
    top, right, bottom, left = face_boxes[0]
    width = right - left
    height = bottom - top
    center_x = int(left + width / 2)
    center_y = int(top + height / 2)

    # Largest square around the face centre that is at most twice the face
    # size and still fits inside the image on every side.
    fit_width = min(int(width * 2), (image.width - center_x) * 2, abs(center_x) * 2)
    fit_height = min(int(height * 2), (image.height - center_y) * 2, abs(center_y) * 2)
    half = min(fit_height, fit_width) / 2

    box = (
        int(center_x - half),  # left
        int(center_y - half),  # top
        int(center_x + half),  # right
        int(center_y + half),  # bottom
    )
    return image.crop(box).resize((250, 250))
@app.route('/', methods=['GET', 'POST'])
def test():
    """Liveness endpoint: always answers with ``{"status": "0"}``."""
    payload = {"status": "0"}
    return jsonify(payload)
def _collect_training_files(files):
    """Return the training uploads found in *files* (the request's file multidict).

    Three client conventions are accepted, checked in this order: a repeated
    ``training`` field, a repeated ``training[]`` field, or individually
    numbered ``training0``, ``training1``, ... fields.
    """
    for field in ("training", "training[]"):
        if field in files:
            return files.getlist(field)
    # Every upload except the profile picture is expected to be a numbered
    # training field; gaps in the numbering are skipped instead of raising.
    numbered = (f'training{x}' for x in range(len(files) - 1))
    return [files[name] for name in numbered if name in files]


def _safe_training_name(filename, index):
    """Reduce a client-supplied filename to a bare name safe to join to a folder."""
    base = os.path.basename((filename or "").replace("\\", "/"))
    if base in ("", ".", ".."):
        # No usable name from the client: fall back to a positional one.
        return f'{index}.png'
    return base


@app.route('/upload', methods=['POST'])
def upload():
    """Register a person: store the DB row, the profile picture and training images.

    Expects multipart form data with the fields ``nik``, ``name`` and
    ``address``, a ``profile`` image and one or more training images (see
    ``_collect_training_files`` for the accepted field names).

    Returns a JSON object with ``status`` and ``message``:
        "0" success; "1" unexpected error or DB insert produced no id;
        "2" no training files sent; "3" no face in the profile image;
        "4" no face in any training image.
    """
    try:
        profile = request.files["profile"]
        nik = request.form["nik"]
        name = request.form["name"]
        address = request.form["address"]

        training = _collect_training_files(request.files)
        if not training:
            return jsonify({"status": "2", "message": "Training files not exist"})

        profile_image = Image.open(profile.stream)
        profile_image.load()
        new_profile_image = get_face(profile_image)
        if not new_profile_image:
            return jsonify({"status": "3", "message": "Profile face not found, please upload a different picture"})

        # Keep only the training images in which a face was detected.
        detected_training = []
        detected_training_fn = []
        for index, file in enumerate(training):
            new_training_image = get_face(Image.open(file.stream))
            if new_training_image:
                detected_training.append(new_training_image)
                # The filename comes from the client: strip any directory
                # part so it cannot escape the per-user training folder.
                detected_training_fn.append(_safe_training_name(file.filename, index))
        if not detected_training:
            return jsonify(
                {"status": "4", "message": "No face found on any training images, please upload a different picture"})

        # Persist the record only after all images have been validated.
        user_id = save_db(nik, name, address)
        if not user_id:
            return jsonify({"status": "1", "message": "Error uploading data, please try again"})

        # JPEG cannot store alpha/palette modes, so normalise to RGB first.
        new_profile_image.convert("RGB").save(
            os.path.join(app.config["PROFILE_FOLDER"], f'{user_id}.jpg'))

        id_dir = os.path.join(app.config['TRAINING_FOLDER'], str(user_id))
        os.makedirs(id_dir, exist_ok=True)
        for (image, filename) in zip(detected_training, detected_training_fn):
            image.save(os.path.join(id_dir, filename))

        return jsonify({"status": "0", "message": "Upload successful"})
    except Exception as exc:
        # Top-level boundary: report every failure to the client as status "1".
        return jsonify({"status": "1", "message": f"Error uploading data, please try again | {exc}"})
@app.route('/train', methods=['GET', 'POST'])
def train():
    """Retrain the SVM model into ``model_file_name`` and report the outcome.

    Responds with status "0" on success, or status "1" plus the exception
    text when training raises.
    """
    try:
        train_svm(model_file_name)
        outcome = {"status": "0", "message": "Train successful"}
        return jsonify(outcome)
    except Exception as exc:
        failure = {"status": "1", "message": f"Error training model: {exc}"}
        return jsonify(failure)
@app.route('/reload', methods=['GET', 'POST'])
def face_reload():
    """Send SIGHUP to the process whose pid is stored in ``app.pid``.

    Returns a JSON object with ``status`` and ``message``:
        "0" the signal command was launched;
        "1" ``app.pid`` holds no usable pid;
        "2" an unexpected error occurred (e.g. the file is missing).
    """
    try:
        with open("app.pid", "r") as f:
            pid = f.readline().strip()

        # Only a plain positive integer may reach `kill`: an empty or
        # garbage value is refused, and a negative value such as "-1"
        # would signal every process the user owns.
        if not pid.isdigit():
            return jsonify({"status": "1", "message": "Reload unsucessful"})

        # Argument list (no shell); detached so it survives this worker.
        subprocess.Popen(["kill", "-s", "HUP", pid], start_new_session=True)
        # A view must return a response; falling through would make Flask
        # fail the request even though the signal was sent.
        return jsonify({"status": "0", "message": "Reload successful"})
    except Exception as exc:
        return jsonify({"status": "2", "message": f"Reload unsucessful: {exc}"})
@app.route('/predict', methods=['POST'])
def predict():
    """Identify the faces in a base64-encoded image.

    The image is read from the ``image`` key of the JSON body, or from an
    ``image`` file upload, or from an ``image`` form field (in that order).

    For every detected face the classifier probabilities are combined with
    the distances to the known faces (``probability - distance``); the best
    candidate is accepted when that score exceeds ``total_threshold``.

    Returns:
        A JSON list with one entry per accepted face (``id``, ``name``,
        ``address``, ``nik``, ``proba``, ``delta``).  When faces were
        detected but none was accepted, the list holds a single "Unknown"
        placeholder; when no face was detected it is empty.  If no image is
        supplied, ``{"error": "Error reading image"}`` is returned.
    """
    db_result = []

    # silent=True: a non-JSON request yields None instead of an error, so
    # the file/form fallbacks below stay reachable.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    if "image" in payload:
        im_b64 = payload["image"]
    elif "image" in request.files:
        # NOTE(review): the upload is assumed to contain base64 text, like
        # the other two sources - confirm against the clients.
        im_b64 = request.files["image"].read()
    elif "image" in request.form:
        im_b64 = request.form["image"]
    else:
        return {"error": "Error reading image"}

    im_file = BytesIO(base64.b64decode(im_b64))
    test_image = face_recognition.load_image_file(im_file)
    face_locations = face_recognition.face_locations(test_image)
    face_count = len(face_locations)

    # Encode all faces once, reusing the boxes found above, instead of
    # re-detecting and re-encoding the whole image for every face.
    encodings = face_recognition.face_encodings(
        test_image, known_face_locations=face_locations, model=face_model)

    for face_index, test_image_enc in enumerate(encodings):
        start_time = time.perf_counter()

        proba_list = clf.predict_proba([test_image_enc])
        dist = face_recognition.face_distance(known_faces, test_image_enc)
        # NOTE(review): relies on classifier classes, `known_faces` and `ids`
        # all sharing the same order and length - confirm.
        total = np.subtract(proba_list, dist)
        best = int(np.argmax(total))
        delta = float(total[0][best])
        is_match = delta > total_threshold

        if is_match:
            face_id = ids[best]
            data = load_db(face_id)
            js = {
                "id": str(face_id),
                "name": data["name"],
                "address": data["address"],
                "nik": data["nik"],
                "proba": float(proba_list[0][best]),
                "delta": delta
            }
        else:
            js = {
                "id": "-1",
                "name": "Unknown",
                "address": "",
                "nik": "",
                "proba": 0.0,
                "delta": 0.0
            }

        # Keep every accepted face; if nothing was accepted by the time the
        # last face is processed, report a single "Unknown" placeholder.
        is_last_face = face_index == face_count - 1
        if is_match or (not db_result and is_last_face):
            db_result.append(js)

        process_time = time.perf_counter() - start_time
        print(time.strftime("%c"))
        print("total threshold: ", total_threshold)
        print(total[0])
        print("Process time:", process_time, "s")

    print(db_result)
    return jsonify(db_result)
# --- Start-up: load the classifier, the reference encodings and the id list ---
try:
    clf = joblib.load(model_file_name)
    classes = clf.classes_
    print('model loaded')

    known_faces = scan_known_people(known_people)
    print('known faces scanned')

    db = mysql.connector.connect(user=db_user, password=db_pass,
                                 unix_socket="/opt/lampp/var/mysql/mysql.sock",
                                 database='face_recognition')
    try:
        # Cursor creation sits inside the outer try so the connection is
        # closed even if it fails.
        cursor = db.cursor()
        try:
            cursor.execute("SELECT `ID` FROM `face_recognition` ORDER BY `ID`")
            # Extend in place: `ids` is the module-level list read by predict().
            ids.extend(row[0] for row in cursor)
        finally:
            cursor.close()
    finally:
        db.close()
    print("ids: ", ids)
except FileNotFoundError:
    print('No model here')
    # SystemExit instead of the site-provided exit() helper, which is not
    # guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(1)

if __name__ == '__main__':
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the Werkzeug
    # debugger on every interface - disable it outside development.
    app.run(host='0.0.0.0', port=8349, debug=True, ssl_context=ssl)
|