AI 모델링에 사용한 데이터를 데이터 분석을 위한 Elastic stack(https://sirzzang.github.io/dev/Dev-elk-stack-01/)을 이용해 관리하고 시각화할 수 있다. AI 모델링 서버(https://projectlog-eraser.tistory.com/64)에서 로깅을 통해 input, modeling(모델링 중간 과정에 발생하는 데이터) data를 남기고, ELK stack을 통해 이를 수집 및 가공해 적재하고, 시각화하면 된다.
# 구조
전체 구조도는 다음과 같다.
AI app server의 Model, Datastore에서 input, output data 및 모델링 과정을 나타내는 데이터(예컨대, 클러스터링 점수 등)를 로그로 남긴다. Elastic stack은 로그 데이터를 이용해 AI modeling 데이터를 관리 및 시각화한다.
Elastic stack을 이루는 각각의 스택이 담당하는 역할은 다음과 같다.
- Filebeats: 실시간으로 발생하는 로그 데이터를 읽고, 로그 데이터에서 필요한 데이터를 추출해 Logstash로 전송
- Logstash: Filebeats에서 전송한 데이터를 가공하여 Elasticsearch로 전송
- Elasticsearch: 데이터 적재 및 검색
- Kibana: Elasticsearch에 적재된 데이터를 시각화
이 때, 클라이언트는 Kibana를 통해 시각화된 모델링 결과를 대시보드로 확인할 수 있다.
# 구현
Model1에서 발생하는 로그 데이터의 형태는 다음과 같다.
2021-12-24 16:54:39,804 - root - Model1 - inference - INFO:['ST20000515', '20210909115502', 90.33, 70.22, 30.33, 'Good', 'Moderate', 'Bad']
2021-12-24 16:54:39,804 - root - Model2 - _get_cluster_score - INFO:['ST20000509', '20210928092810', 23.73, 59.13, 0]
Model2에서 발생하는 로그 데이터의 형태는 다음과 같다.
2021-12-24 16:54:39,804 - root - Model2 - _solution - INFO : {'type': 'optimizer', 'order': 1, 'id': 1, 'from': {'lat': '36.88563292330548', 'lon': '128.73457595862516'}, 'to': {'lat': '36.817064467622814', 'lon': '128.96344333457913'}, 'duration': '0:32:00'}
2021-12-24 16:54:39,806 - root - Model2 - _solution - INFO : Optimizer Total Time(minute): 521
2021-12-24 16:54:39,806 - root - Model3 - _solution - INFO : Ortools Total Distance(meter): 267104
Filebeats에서 로그 데이터를 읽어 필요한 데이터를 추출하고, Lostash에서 자료형을 바꾸는 방식으로 가공한다.
## Filebeats.yml
기본적으로 입력(input), 처리(processor), 출력(output)과 관련된 설정 항목을 작성하면 된다. filebeats를 설치하면 자동으로 깔리는 예시 configuration 파일(https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-reference-yml.html)을 참고하거나, Filebeats 공식 문서를 참고해 설정하면 된다.
아래 설정은 예시일 뿐, 개발 요구사항에 맞추어 작성해야 한다. 예컨대, exclude_lines, include_lines와 같은 것은 로그 패턴을 보고 수집할 것과 수집하지 않을 것을 결정해야 한다. 또한, processor dissect 모듈 등을 활용하여 로그 데이터에서 필요한 정보를 추출하는데, 추출되는 데이터가 문자열 형태임에 유의한다.
# --------------------------------- filebeats 입력 ---------------------------------
filebeat.inputs:
- type: filestream # 실시간 로그 filestream
enabled: true
prospector.scanner.check_interval: 30s # scan 간격
paths:
- /path/to/log # glob-based path 설정도 가능
# 수집하지 않을 라인
exclude_lines: ['DEBUG']
# 수집할 로그 라인
include_lines: ['inference', '_solution', '_get_cluster_score']
# --------------------------------- filebeats 설정 파일 -------------------------------
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
# 설정 변경시 reload 여부
reload.enabled: true
# --------------------------------- filebeats output -------------------------------
# Kibana 설정
setup.kibana:
host: "localhost:5601" # Kibana host
# 아래 주석 풀고 작성 시 Filebeats에서 바로 Elasticsearch로 전송
# output.elasticsearch:
# # Array of hosts to connect to.
# hosts: ["localhost:9200"]
# Protocol - either `http` (default) or `https`.
#protocol: "https"
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"
# logstash output
output.logstash:
# The Logstash hosts
hosts: ["localhost:5044"]
# --------------------------------- filebeats processor -------------------------------
processors:
- drop_fields: # 제외할 필드. 로그 외에 자동으로 생성되는 필드들도 있으므로 확인 후 제외
fields: ["log", "host", "architecture", "agent", "input", "ecs"]
- dissect: # 데이터 부분만 추출
field: "message"
tokenizer: '%{log_timestamp->} %{+log_timestamp} - %{} - %{model} - %{method} - %{} : %{data}'
target_prefix: ""
- dissect: # Model1 데이터 추출
when:
and:
- equals:
model: "Model2"
- contains:
message: "{"
field: "data"
tokenizer: "%{}: '%{type}', %{}: %{order}, %{}: %{id}, %{}: %{}: '%{from.lat}', %{}: '%{from.lon}'}, %{}: %{}: '%{to.lat}', %{}: '%{to.lon}'}, %{}: '%{duration}'"
target_prefix: ""
ignore_failure: "true"
- dissect:
when:
and:
- equals:
model: "Model2"
- contains:
message: "Distance"
field: "data"
tokenizer: "%{}: %{distance}"
target_prefix: ""
- dissect:
when:
and:
- equals:
model: "Model3"
- contains:
message: "Time"
field: "data"
tokenizer: "%{}: %{time}"
target_prefix: ""
- dissect:
when:
and:
- equals:
model: "Model1"
- equals:
method: "inference"
field: "data"
tokenizer: "['%{id}', '%{date}', %{score1}, %{score2}, %{score3}, '%{grade1}', '%{grade2}', '%{grade3}']"
target_prefix: ""
- dissect:
when:
and:
- equals:
model: "freshnessModel"
- equals:
method: "_get_cluster_score"
field: "data"
tokenizer: "['%{id}', '%{date}', %{x}, %{y}, %{cluster}]"
target_prefix: ""
- drop_fields:
fields: ["data"]
## logstash.conf
입력(input), 처리(filter), 출력(output)과 관련된 설정 항목을 작성하면 된다. 입력의 경우, beats 스택으로부터 tcp 소켓을 통해 5044 포트의 logstash로 전송된다. 앞서 filebeats.yml의 logstash output에서 설정한 부분이다.
input {
# filebeats input
beats {
port => 5044
}
}
filter {
# Model1 데이터 가공
if [model] == "Model1" {
# score 데이터 자료형 변환
if [method] == "inference" {
mutate {
convert => {
"score1" => "float"
"score2" => "float"
"score3" => "float"
}
}
}
# x, y 좌표 데이터 자료형 변환
if [method] == "_get_cluster_score" {
mutate {
convert => {
"x" => "float"
"y" => "float"
}
}
}
}
# Model2 데이터 가공
if [model] == "Model2" {
mutate {
# 자료형 변환
convert => {
"[from][lon]" => "float"
"[from][lat]" => "float"
"[to][lon]" => "float"
"[to][lat]" => "float"
"order" => "integer"
"id" => "integer"
"time" => "float"
"distance" => "float"
}
}
}
# 필요 없는 필드 제거
mutate {
remove_field => ["tags", "@version"]
}
}
output {
# Model1 elasticsearch 인덱스
if [model] == "Model1" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "model1"
ecs_compatibility => disabled
}
}
# Model2 elasticsearch 인덱스
if [model] == "Model2" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "model2"
ecs_compatibility => disabled
}
}
}
## kibana.yml
Kibana 설정은 앞에 비해서는 간단한 편이다. Kibana 웹 인터페이스를 통해서도 설정 파일의 항목을 쉽게 변경할 수 있다. Kibana 서버 포트와 쿼리를 전송할 Elasticsearch 호스트 설정에 주의한다.
# Kibana 서버 포트
server.port: 5601
# Kibana 서버 호스트
server.host: "kibana.server.host.address"
# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""
# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false
# Elasticsearch host
elasticsearch.hosts: ["http://localhost:9200"]
# kibana pid 저장
pid.file: /path/to/kibana/kibana-7.16.0-linux-x86_64/kibana.pid
# kibana 로그
logging.dest: /path/to/kibana/kibana-7.16.0-linux-x86_64/logs/kibana.log
참고: server.basepath, server.rewritebasepath 설정
여기서는 설정하지 않았지만, K8S 환경에 ELK stack을 적용했을 때(https://projectlog-eraser.tistory.com/63), ingress를 통해 회사 앱 서버 url을 지정했는데, 이 경우 server.basepath를 설정해 주어야 한다. 또한, 설정된 basepath가 제대로 적용될 수 있도록 server.rewritebasepath를 설정해 주어야 한다. 참고
# 결론
기존에 로그 관리 시스템에 적용해 본 것 외에도 데이터 분석에 Elastic stack을 적용할 수 있음을 배웠다. 특히 그것을 AI 모델링 서버와 연결해 진행할 수 있어 더 의미가 있는 경험이었다. 그 동안 Python 기반 matplotlib, seaborn 라이브러리 등을 이용해 시각화하는 것만 경험해 왔는데, ELK stack을 활용하면 일련의 파이프라인을 통해 데이터 수집부터 시각화할 수 있음을 깨달았다. 심지어 아래와 같이 시각화된 결과를 웹 대시보드 형태로도 제공할 수 있기에, 필요한 경우 ELK stack을 활용하는 것도 좋아 보인다.
'Backend > AI App Server' 카테고리의 다른 글
[App Server] Flask로 AI 백엔드 서버 구축해 보기 (0) | 2021.12.23 |
---|---|
[Tibero] Tibero에서 UPSERT 쿼리 구현하기 (0) | 2021.10.18 |
[App Server] Flask, Socket으로 앱 서버 구축해 보기 (0) | 2021.08.13 |
[App Server] ODBC를 이용해 Tibero와 Python 연동하기 (1) | 2021.07.30 |