Backend/AI App Server

[App Server] Flask, Socket으로 앱 서버 구축해 보기

eraser 2021. 8. 13. 02:34
반응형

 (지난 포스팅에 이어) 클라이언트의 요청이 들어 왔을 때, 요청을 처리해 모델링을 한 뒤, 해당 결과를 클라이언트에게 전송하는 앱 서버를 개발하였다. Flask와 Socket을 이용해 개발했는데, 구조를 기획하고 실제 코드를 작성하는 과정에서 배운 점이 많았기 때문에 간략하게 전체적인 과정을 기록하고자 한다.

 예전에 데이터 분석, AI를 배울 때부터 꼭 해보고 싶었던 일이기에, 회사 업무를 통해 경험할 수 있다는 것 자체가 큰 성장이었다.

 


 

# 구조 

 

 개발에 앞서 다음 그림과 같은 구조를 기획했다. Flask app server는 클라이언트의 요청을 라우팅하는 router 모듈로 동작하고, backend server는 Flask app server에서 받은 요청에 따라 그에 맞는 모델을 호출해 작업을 수행한 뒤 결과를 돌려준다. 

 

Flask app과 model, DB 부분이 모두 우리 backend지만, flask는 routing 역할만 담당하도록 구상했다.

 

 서빙될 모델은 2개이다. 클라이언트는 model1과 model2에 대해 GET 요청을 보낸다. Flask app server는 model1에 대한 URL <server>/model1로 요청이 들어 오면 backend server에 model1에 대한 요청이 들어 왔음을 알린다. model2에 대한 경우도 마찬가지이다. 이후 model1과 model2에서 사용자가 알고자 하는 결과에 대한 inference가 끝나면, 각 모델은 inference가 끝났다는 알림(과 데이터)을 Flask app에 보내고, Flask app은 클라이언트에게 해당 결과를 전송한다.

 

 각 모델이 동작하는 방식은 내부적으로 다르지만, Flask app으로부터 호출을 받는다는 것이 핵심이다.

더보기

 model1의 경우, DB와 외부 API에서 모델링에 필요한 데이터를 로드해 전처리한 뒤, inference 로직에 전처리된 데이터를 주입한다. inference 결과를 DB의 result table에 INSERT하면, 클라이언트 앱 서버를 구축하는 쪽에서 해당 result table에 접근해 모델 inference 결과를 가져 간다. 따라서 INSERT가 완료되면 model inference가 끝났다는 response를 돌려 준다.

 model2의 경우, 클라이언트가 GET 요청을 보낼 때 request body에 모델링에 필요한 데이터가 함께 전송된다. 따라서 model2는 Flask app server로부터 전송된 데이터를 전처리하고, inference 과정을 거친다. model1과 달리 inference 결과를 Flask app server에 돌려 준다.

 

 

 Nginx는 Flask app server를 배포하기 위해 필요한 것으로, 구조도를 작성할 때는 집어 넣었지만, 지금 단계에서는 구현하지 않았다.

 


# 구현

 

 구현에 사용한 라이브러리는 다음과 같다.

  • flask
  • socket: flask app과 backend server간 통신을 위한 용도
  • pyodbc: Tibero DB에 접근하기 위한 용도
  • requests
  • pandas
  • python-dotenv: 환경변수(Tibero DSN 설정 등)를 import해서 사용하기 위한 용도
  • flake8: python 린팅

 

 

 전체적인 디렉토리 구조는 아래와 같다.

backend
ㄴ model1
   ㄴ loader.py
   ㄴ preprocessor.py
   ㄴ model.py
ㄴ model2
   ㄴ preprocessor.py
   ㄴ model.py
   ㄴ api.py
app.py
server.py
config.py

 

 app.py에서 Flask app을 구현하고, server.py에서 backend server를 구현한다.

 

더보기

모델에 대한 구현은 내가 담당한 부분이 아니기 때문에, 상세히 기록하지는 않는다.

 

 각 모델이 model.py에서 정의되는 Model1 혹은 Model2 클래스에서 predict 메소드를 호출하면 inference 결과가 나온다. inference 과정에서 load, preprocess, api 등 필요한 로직을 사용한다. 그리고 inference 결과로 나올 것이라 예상되는 데이터 형태에 맞추어 dummy data를 반환하도록 했다.

 사실 원래 Model1의 경우 Tibero DB의 result table에 결과를 INSERT해야 하지만, 일단은 response로 dummy data를 반환하는 것으로 가정하고 개발했다.

 


 

## app.py

 

from flask import Flask, request, jsonify
import json
import socket

from config import HOST, PORT

# Flask app
app = Flask(__name__)

# model1 routing
@app.route('/model1'):
def model1():
	# date for model inference
    req = request.data
    req_data = json.loads(req.decode())
    date = req_data['date']
    
    # send message via socket to server
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.connect((HOST, PORT))
    client_msg = f'model1, {date}'.encode()
    client_socket.sendall(client_msg)
    
    # receive message(model result) via socket from server
    server_msg = client_socket.recv(65535)
    client_socket.close()
    response = server_msg.decode('utf-8')
    return jsonify(response)

# model2 routing
@app.route('/model2')
def model2():
    # data for model inference
    req = request.data
    req_data = json.loads(req.decode())
    
    # send message via socket to server with data in request body
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_msg = f'model2, {data}'.encode()
    client_socket.sendall(client_msg)
    
    # receive message(model result) via socket from server
    server_msg = client_socket.recv(65535)
    client_socket.close()
    
    response = server_msg.decode('utf-8')
    return jsonify(response)
    

if __name__ == '__main__':
    app.run(debug=True)

 

 Flask app이 라우팅 후 각 모델을 호출하기 위해 server에 알림을 보내는 부분이 중요했다. 요청이 들어 오면 client 소켓을 열어 메시지와 데이터를 server 소켓에 전송한다. server 소켓이 model inference 결과를 메시지로 전송하고, 이 메시지를 받은 후 client 소켓을 닫는다.

 해당 코드를 구현할 때 주의해야 할 점은 소켓 생성이 각 url별 함수 안에서 이루어져야 한다는 것이다. 함수 밖에서 client 소켓을 생성하게 되면, Flask app server가 열려 있는 동안 계속해서 client 소켓이 열리게 된다. client 소켓은 client의 요청이 들어올 때만 열면 된다.

 

더보기

그렇지 않으면(예컨대, 위의 코드에서 6라인 혹은 8라인 사이에서 `client_socket = socket.socket()`과 같은 코드를 작성하는 경우이다), 아래와 같이 Flask app server를 종료한 후에야 server에 메시지가 전송되는 불상사를 겪게 된다. Flask app server가 동작하는 동안 계속해서 client 소켓을 생성하기 때문에 send를 타지 못하는 것이 아닐까 하는 추측을 해 보았지만, 좀 더 연구해 볼 필요가 있다.

 

분명 client socket이 연결되고, 오른쪽 Flask app server의 client socket에서 메시지를 보냈지만, 왼쪽 server 코드에서 아무 것도 받지 못한다.

 


 

## server.py

 

import socket
import json
from threading import Thread

from model1.loader import DataLoader
from model1.preprocessor import DataPreprocessor
from model1.model import Model1
from model2.model import Model2
from config import HOST, PORT, DSN


class ClientThread(Thread):
    def __init__(self, host, port):
        Thread.__init__(self)
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # prevent WinError 10048 error 
        self.server_socket.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(10)
    
    def run(self):
        while True:
            client_socket, addr = self.server_socket.accept()
            while True:
                client_msg = client_socket.recv(1024)
                if not client_msg:
                    break
                data = client_msg.decode().split()[-1]
                if b'model1' in client_msg:
                    data_loader = DataLoader(dsn=DSN)
                    rows = data_loader.fetch(data=data)
                    data_preprocessor = DataPreprocessor(rows=rows)
                    data = data_preprocessor.preprocess()
                    model = Model1(data)
                else:
                    model = Model2(data)
                
                result = model.predict()
                result_json = json.dumps(result)
                client_socket.sendall(result_json.encode())
            client_socket.close()
        self.server_socket.close()
        
 
 if __name__ == '__main__':
     thread_client = ClientThread(HOST, PORT)
     thread_client.start()
     thread_client.join()

 

 여러 클라이언트의 요청을 실행하기 위해 쓰레드를 활용하였다. threading.Thread 클래스를 상속받아 쓰레드가 실행할 run() 메소드를 재정의하면 된다. 각각의 쓰레드에 client 소켓을 보고 있는 server 소켓이 있다. 쓰레드가 시작되기 전까지 server socket은 listen, bind 상태로 client 소켓을 바라 보고(?) 있다. 쓰레드가 시작되어 server 소켓이 client 소켓의 연결을 accept하면, client 소켓과 server 소켓이 연결되고, 이후 통신이 진행된다.

 그 이후 로직은 간단하다. server 소켓은 client 소켓으로부터의 메시지를 해석하여 메시지에 'Model1'이 있으면 Model1을, 그렇지 않다면 Model2를 호출한다. 사실 이렇게 메시지를 해석해 분기 처리하는 것이 하드 코딩이 아닌가 싶긴 했는데, 당장 떠오르는 방법이 없었다. 이후 각 모델로부터 predict 결과가 나오면 client 소켓에 다시 메시지와 predict 결과를 보내고, 소켓을 닫는다.

 start()는 run 메소드를 호출해 쓰레드를 시작하는 역할을, join()은 새로 생성한 자식 스레드의 종료를 기다리는 것으로, 한 프로세스에서 여러 클라이언트의 요청을 처리할 수 있도록 했다.

 

 이를 구현하는 과정에서 주의해야 할 점은 두 가지가 있었다. 첫째로, 소켓 메시지는 byte string이다. 따라서 server 소켓에서 client 메시지를 해석해 분기 처리를 할 때, b'' 형식의 문자열을 사용해야 한다. 둘째로, 소켓 재사용을 위해서는 socket 플래그로 socket.SO_REUSEADDR를 사용해야 한다.

더보기

 특히 두 번째 경우, 플래그를 추가하지 않으면 아래 그림에서와 같이 에러가 발생한다. 위와 같이 플래그를 추가해주지 않으면, 일일이 해당 프로세스를 죽인 후 서버를 구동해야(kill [pid]) 한다.

 


 

## 테스트

 

 테스트를 위해서 다음과 같이 클라이언트 터미널에서 curl 커맨드를 사용해 Flask app으로 요청을 보내면 된다. 아래와 같이 클라이언트 터미널에 dummy data가 response로 반환된다.

왼쪽부터 차례로 client, Flask app server, backend server

 


 

# 피드백

 

 가장 우선적인 피드백은 구조 설계에 관한 것이었다. 애초에 Flask app server와 backend server가 나뉘어야 할 필요가 있는지, 그에 따라 둘 간에 네트워킹의 필요성이 있는지에 관한 부분이다.

  • 데이터가 많고 모델이 무거운 경우라면, 위와 같은 구조로 Flask는 라우팅 역할을 담당하고, backend server에서 모델을 돌리는 게 필요할 수 있다. 그러나 이 서비스의 경우는 그렇게 무겁지 않기 때문에 Flask app server에서 처리하는 것이 더 나아 보인다. (과하게 어렵게 짰다)
  • 따라서 소켓 네트워킹 역시 필요하지 않다. 게다가 Flask가 애초에 socket listen, bind 처리를 다 해준다. ⇒ Flask의 동작 원리에 대해 제대로 알지 못한 채 구조를 설계했음을 알 수 있었다. 위의 피드백에서처럼 대용량 데이터 및 큰 모델을 돌려야 하는 경우여서 네트워킹이 필요하다면 모를까, 이 경우에는 애초에 Flask에서 처리하는 작업을 중복으로 하는 코드라고도 볼 수 있다.

 

 모듈화에 대한 피드백도 있었다. 지금 코드에서는 model 부분에 load, 전처리 등의 코드가 모두 있는데, 이를 각 기능별로 나누어, 예컨대 데이터베이스에서 데이터를 로드하는 부분, 전처리하는 부분 등으로 나누어야 한다.

  • 모듈화를 위해서 먼저 인터페이스부터 작성해야 한다.
  • 각 모듈 간 이동해야 하는 데이터를 Data Access Object로 정의하는 것이 좋다.

 

 그 외에 코드 구현 레벨에서의 피드백은 다음과 같은 것들이 있었다.

  • 에러 핸들링이 필요하다.
  • 아마 지금과 같이 backend server를 구현하면, 쓰레드가 생성되지 않을 것처럼 보인다.

 

 처음 도전해 보는 서버 개발이었는데, 여러 모로 부족한 점이 많았다. 그럼에도 불구하고 처음부터 끝까지 구조를 설계해 보고, 요청과 응답이 돌아 오는 서버를 미약하게나마 구현해 본 것에 의의를 둔다. 또한, 피드백을 통해 개발에 대한 시야가 넓어질 수 있었다는 것에 매우 감사함을 느낀다.

 

 앞으로 더 연구해 보고 싶은 부분은 다음과 같다.

  • 인터페이스 작성 방법, 파이썬에서의 인터페이스
  • 현재 구현된 backend/server.py 코드에서 실제 쓰레드 생성되는지 확인
  • Flask 쓰레드 구현 방식 확인
  • Flask app에서 client 소켓의 생성 위치에 따른 동작의 차이
반응형