[ELK] AI 모델링 시각화

 AI 모델링에 사용한 데이터를 데이터 분석을 위한 Elastic stack(을 이용해 관리하고 시각화할 수 있다. AI 모델링 서버(에서 로깅을 통해 input, modeling(모델링 중간 과정에 발생하는 데이터) data를 남기고, ELK stack을 통해 이를 수집 및 가공해 적재하고, 시각화하면 된다.



# 구조


전체 구조도는 다음과 같다.


AI modeling visualization을 위해 AI app server에서 발생한 로그 데이터를 활용한다.


 AI app server의 Model, Datastore에서 input, output data 및 모델링 과정을 나타내는 데이터(예컨대, 클러스터링 점수 등)를 로그로 남긴다. Elastic stack은 로그 데이터를 이용해 AI modeling 데이터를 관리 및 시각화한다.


 Elastic stack을 이루는 각각의 스택이 담당하는 역할은 다음과 같다.

  • Filebeats: 실시간으로 발생하는 로그 데이터를 읽고, 로그 데이터에서 필요한 데이터를 추출해 Logstash로 전송
  • Logstash: Filebeats에서 전송한 데이터를 가공하여 Elasticsearch로 전송
  • Elasticsearch: 데이터 적재 및 검색
  • Kibana: Elasticsearch에 적재된 데이터를 시각화


 이 때, 클라이언트는 Kibana를 통해 시각화된 모델링 결과를 대시보드로 확인할 수 있다. 




# 구현


 Model1에서 발생하는 로그 데이터의 형태는 다음과 같다.

2021-12-24 16:54:39,804 - root - Model1 - inference - INFO:['ST20000515', '20210909115502', 90.33, 70.22, 30.33, 'Good', 'Moderate', 'Bad']
2021-12-24 16:54:39,804 - root - Model2 - _get_cluster_score - INFO:['ST20000509', '20210928092810', 23.73, 59.13, 0]


 Model2에서 발생하는 로그 데이터의 형태는 다음과 같다.

2021-12-24 16:54:39,804 - root - Model2 - _solution - INFO : {'type': 'optimizer', 'order': 1, 'id': 1, 'from': {'lat': '36.88563292330548', 'lon': '128.73457595862516'}, 'to': {'lat': '36.817064467622814', 'lon': '128.96344333457913'}, 'duration': '0:32:00'}
2021-12-24 16:54:39,806 - root - Model2 - _solution - INFO : Optimizer Total Time(minute): 521
2021-12-24 16:54:39,806 - root - Model3 - _solution - INFO : Ortools Total Distance(meter): 267104


 Filebeats에서 로그 데이터를 읽어 필요한 데이터를 추출하고, Lostash에서 자료형을 바꾸는 방식으로 가공한다.



## Filebeats.yml


 기본적으로 입력(input), 처리(processor), 출력(output)과 관련된 설정 항목을 작성하면 된다. filebeats를 설치하면 자동으로 깔리는 예시 configuration 파일(을 참고하거나, Filebeats 공식 문서를 참고해 설정하면 된다.

 아래 설정은 예시일 뿐, 개발 요구사항에 맞추어 작성해야 한다. 예컨대, exclude_lines, include_lines와 같은 것은 로그 패턴을 보고 수집할 것과 수집하지 않을 것을 결정해야 한다. 또한, processor dissect 모듈 등을 활용하여 로그 데이터에서 필요한 정보를 추출하는데, 추출되는 데이터가 문자열 형태임에 유의한다.

# --------------------------------- filebeats 입력 ---------------------------------
- type: filestream # 실시간 로그 filestream
  enabled: true
  prospector.scanner.check_interval: 30s # scan 간격
    - /path/to/log # glob-based path 설정도 가능
  # 수집하지 않을 라인
  exclude_lines: ['DEBUG']
  # 수집할 로그 라인
  include_lines: ['inference', '_solution', '_get_cluster_score']

# --------------------------------- filebeats 설정 파일 -------------------------------
  path: ${path.config}/modules.d/*.yml

  # 설정 변경시 reload 여부
  reload.enabled: true

# --------------------------------- filebeats output -------------------------------
# Kibana 설정
  host: "localhost:5601" # Kibana host

# 아래 주석 풀고 작성 시 Filebeats에서 바로 Elasticsearch로 전송
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]
  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# logstash output
  # The Logstash hosts
  hosts: ["localhost:5044"]

# --------------------------------- filebeats processor -------------------------------
  - drop_fields: # 제외할 필드. 로그 외에 자동으로 생성되는 필드들도 있으므로 확인 후 제외
      fields: ["log", "host", "architecture", "agent", "input", "ecs"]
  - dissect: # 데이터 부분만 추출
      field: "message"
      tokenizer: '%{log_timestamp->} %{+log_timestamp} - %{} - %{model} - %{method} - %{} :     %{data}'
      target_prefix: ""
  - dissect: # Model1 데이터 추출
          - equals:
              model: "Model2"
          - contains:
              message: "{"
      field: "data"
      tokenizer: "%{}: '%{type}', %{}: %{order}, %{}: %{id}, %{}: %{}: '%{}', %{}: '%{from.lon}'}, %{}: %{}: '%{}', %{}: '%{to.lon}'}, %{}: '%{duration}'"
      target_prefix: ""
      ignore_failure: "true"
  - dissect:
          - equals:
              model: "Model2"
          - contains:
              message: "Distance"
      field: "data"
      tokenizer: "%{}: %{distance}"
      target_prefix: ""
  - dissect:
          - equals:
              model: "Model3"
          - contains:
              message: "Time"
      field: "data"
      tokenizer: "%{}: %{time}"
      target_prefix: ""
  - dissect:
          - equals:
              model: "Model1"
          - equals:
              method: "inference"
      field: "data"
      tokenizer: "['%{id}', '%{date}', %{score1}, %{score2}, %{score3}, '%{grade1}', '%{grade2}', '%{grade3}']"
      target_prefix: ""
  - dissect:
          - equals:
              model: "freshnessModel"
          - equals:
              method: "_get_cluster_score"
      field: "data"
      tokenizer: "['%{id}', '%{date}', %{x}, %{y}, %{cluster}]"
      target_prefix: ""
  - drop_fields:
      fields: ["data"]


## logstash.conf


 입력(input), 처리(filter), 출력(output)과 관련된 설정 항목을 작성하면 된다. 입력의 경우, beats 스택으로부터 tcp 소켓을 통해 5044 포트의 logstash로 전송된다. 앞서 filebeats.yml의 logstash output에서 설정한 부분이다.


input {
	# filebeats input
    beats {
        port => 5044

filter {

    # Model1 데이터 가공
    if [model] == "Model1" {        
        # score 데이터 자료형 변환
        if [method] == "inference" {
            mutate {
                convert => {
                    "score1" => "float"
                    "score2" => "float"
                    "score3" => "float"

        # x, y 좌표 데이터 자료형 변환
        if [method] == "_get_cluster_score" {
            mutate {
                convert => {
                    "x" => "float"
                    "y" => "float"

    # Model2 데이터 가공
    if [model] == "Model2" {
        mutate {
        	# 자료형 변환
            convert => {
                "[from][lon]" => "float"
                "[from][lat]" => "float"
                "[to][lon]" => "float"
                "[to][lat]" => "float"
                "order" => "integer"
                "id" => "integer"
                "time" => "float"
                "distance" => "float"

    # 필요 없는 필드 제거
    mutate {
        remove_field => ["tags", "@version"]

output {

    # Model1 elasticsearch 인덱스
    if [model] == "Model1" {
    	elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "model1"
            ecs_compatibility => disabled

    # Model2 elasticsearch 인덱스
    if [model] == "Model2" {
        elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "model2"
            ecs_compatibility => disabled



## kibana.yml


 Kibana 설정은 앞에 비해서는 간단한 편이다. Kibana 웹 인터페이스를 통해서도 설정 파일의 항목을 쉽게 변경할 수 있다. Kibana 서버 포트와 쿼리를 전송할 Elasticsearch 호스트 설정에 주의한다.

# Kibana 서버 포트
server.port: 5601

# Kibana 서버 호스트 ""

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# Elasticsearch host
elasticsearch.hosts: ["http://localhost:9200"]

# kibana pid 저장
pid.file: /path/to/kibana/kibana-7.16.0-linux-x86_64/

# kibana 로그
logging.dest: /path/to/kibana/kibana-7.16.0-linux-x86_64/logs/kibana.log


참고: server.basepath, server.rewritebasepath 설정
 여기서는 설정하지 않았지만, K8S 환경에 ELK stack을 적용했을 때(, ingress를 통해 회사 앱 서버 url을 지정했는데, 이 경우 server.basepath를 설정해 주어야 한다. 또한, 설정된 basepath가 제대로 적용될 수 있도록 server.rewritebasepath를 설정해 주어야 한다. 참고



# 결론


 기존에 로그 관리 시스템에 적용해 본 것 외에도 데이터 분석에 Elastic stack을 적용할 수 있음을 배웠다. 특히 그것을 AI 모델링 서버와 연결해 진행할 수 있어 더 의미가 있는 경험이었다. 그 동안 Python 기반 matplotlib, seaborn 라이브러리 등을 이용해 시각화하는 것만 경험해 왔는데, ELK stack을 활용하면 일련의 파이프라인을 통해 데이터 수집부터 시각화할 수 있음을 깨달았다. 심지어 아래와 같이 시각화된 결과를 웹 대시보드 형태로도 제공할 수 있기에, 필요한 경우 ELK stack을 활용하는 것도 좋아 보인다.


시각화에 활용된 데이터는 전부 다 Elasticsearch에 대한 쿼리 결과이다



