Backend/AI App Server

[ELK] AI 모델링 시각화

eraser 2022. 3. 15. 02:20
반응형

 AI 모델링에 사용한 데이터를 데이터 분석을 위한 Elastic stack(https://sirzzang.github.io/dev/Dev-elk-stack-01/)을 이용해 관리하고 시각화할 수 있다. AI 모델링 서버(https://projectlog-eraser.tistory.com/64)에서 로깅을 통해 input, modeling(모델링 중간 과정에 발생하는 데이터) data를 남기고, ELK stack을 통해 이를 수집 및 가공해 적재하고, 시각화하면 된다.

 


 

# 구조

 

전체 구조도는 다음과 같다.

 

AI modeling visualization을 위해 AI app server에서 발생한 로그 데이터를 활용한다.

 

 AI app server의 Model, Datastore에서 input, output data 및 모델링 과정을 나타내는 데이터(예컨대, 클러스터링 점수 등)를 로그로 남긴다. Elastic stack은 로그 데이터를 이용해 AI modeling 데이터를 관리 및 시각화한다.

 

 Elastic stack을 이루는 각각의 스택이 담당하는 역할은 다음과 같다.

  • Filebeats: 실시간으로 발생하는 로그 데이터를 읽고, 로그 데이터에서 필요한 데이터를 추출해 Logstash로 전송
  • Logstash: Filebeats에서 전송한 데이터를 가공하여 Elasticsearch로 전송
  • Elasticsearch: 데이터 적재 및 검색
  • Kibana: Elasticsearch에 적재된 데이터를 시각화

 

 이 때, 클라이언트는 Kibana를 통해 시각화된 모델링 결과를 대시보드로 확인할 수 있다. 

 

 


 

# 구현

 

 Model1에서 발생하는 로그 데이터의 형태는 다음과 같다.

2021-12-24 16:54:39,804 - root - Model1 - inference - INFO:['ST20000515', '20210909115502', 90.33, 70.22, 30.33, 'Good', 'Moderate', 'Bad']
2021-12-24 16:54:39,804 - root - Model2 - _get_cluster_score - INFO:['ST20000509', '20210928092810', 23.73, 59.13, 0]

 

 Model2에서 발생하는 로그 데이터의 형태는 다음과 같다.

2021-12-24 16:54:39,804 - root - Model2 - _solution - INFO : {'type': 'optimizer', 'order': 1, 'id': 1, 'from': {'lat': '36.88563292330548', 'lon': '128.73457595862516'}, 'to': {'lat': '36.817064467622814', 'lon': '128.96344333457913'}, 'duration': '0:32:00'}
2021-12-24 16:54:39,806 - root - Model2 - _solution - INFO : Optimizer Total Time(minute): 521
2021-12-24 16:54:39,806 - root - Model3 - _solution - INFO : Ortools Total Distance(meter): 267104

 

 Filebeats에서 로그 데이터를 읽어 필요한 데이터를 추출하고, Lostash에서 자료형을 바꾸는 방식으로 가공한다.

 


 

## Filebeats.yml

 

 기본적으로 입력(input), 처리(processor), 출력(output)과 관련된 설정 항목을 작성하면 된다. filebeats를 설치하면 자동으로 깔리는 예시 configuration 파일(https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-reference-yml.html)을 참고하거나, Filebeats 공식 문서를 참고해 설정하면 된다.

 아래 설정은 예시일 뿐, 개발 요구사항에 맞추어 작성해야 한다. 예컨대, exclude_lines, include_lines와 같은 것은 로그 패턴을 보고 수집할 것과 수집하지 않을 것을 결정해야 한다. 또한, processor dissect 모듈 등을 활용하여 로그 데이터에서 필요한 정보를 추출하는데, 추출되는 데이터가 문자열 형태임에 유의한다.

# --------------------------------- filebeats 입력 ---------------------------------
filebeat.inputs:
- type: filestream # 실시간 로그 filestream
  enabled: true
  prospector.scanner.check_interval: 30s # scan 간격
  paths:
    - /path/to/log # glob-based path 설정도 가능
  
  # 수집하지 않을 라인
  exclude_lines: ['DEBUG']
  
  # 수집할 로그 라인
  include_lines: ['inference', '_solution', '_get_cluster_score']

# --------------------------------- filebeats 설정 파일 -------------------------------
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml

  # 설정 변경시 reload 여부
  reload.enabled: true

# --------------------------------- filebeats output -------------------------------
# Kibana 설정
setup.kibana:
  host: "localhost:5601" # Kibana host

# 아래 주석 풀고 작성 시 Filebeats에서 바로 Elasticsearch로 전송
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]
  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# logstash output
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

# --------------------------------- filebeats processor -------------------------------
processors:
  - drop_fields: # 제외할 필드. 로그 외에 자동으로 생성되는 필드들도 있으므로 확인 후 제외
      fields: ["log", "host", "architecture", "agent", "input", "ecs"]
  - dissect: # 데이터 부분만 추출
      field: "message"
      tokenizer: '%{log_timestamp->} %{+log_timestamp} - %{} - %{model} - %{method} - %{} :     %{data}'
      target_prefix: ""
  - dissect: # Model1 데이터 추출
      when:
        and:
          - equals:
              model: "Model2"
          - contains:
              message: "{"
      field: "data"
      tokenizer: "%{}: '%{type}', %{}: %{order}, %{}: %{id}, %{}: %{}: '%{from.lat}', %{}: '%{from.lon}'}, %{}: %{}: '%{to.lat}', %{}: '%{to.lon}'}, %{}: '%{duration}'"
      target_prefix: ""
      ignore_failure: "true"
  - dissect:
      when:
        and:
          - equals:
              model: "Model2"
          - contains:
              message: "Distance"
      field: "data"
      tokenizer: "%{}: %{distance}"
      target_prefix: ""
  - dissect:
      when:
        and:
          - equals:
              model: "Model3"
          - contains:
              message: "Time"
      field: "data"
      tokenizer: "%{}: %{time}"
      target_prefix: ""
  - dissect:
      when:
        and:
          - equals:
              model: "Model1"
          - equals:
              method: "inference"
      field: "data"
      tokenizer: "['%{id}', '%{date}', %{score1}, %{score2}, %{score3}, '%{grade1}', '%{grade2}', '%{grade3}']"
      target_prefix: ""
  - dissect:
      when:
        and:
          - equals:
              model: "freshnessModel"
          - equals:
              method: "_get_cluster_score"
      field: "data"
      tokenizer: "['%{id}', '%{date}', %{x}, %{y}, %{cluster}]"
      target_prefix: ""
  - drop_fields:
      fields: ["data"]

 


## logstash.conf

 

 입력(input), 처리(filter), 출력(output)과 관련된 설정 항목을 작성하면 된다. 입력의 경우, beats 스택으로부터 tcp 소켓을 통해 5044 포트의 logstash로 전송된다. 앞서 filebeats.yml의 logstash output에서 설정한 부분이다.

 

input {
	# filebeats input
    beats {
        port => 5044
    }
}

filter {

    # Model1 데이터 가공
    if [model] == "Model1" {        
        # score 데이터 자료형 변환
        if [method] == "inference" {
            mutate {
                convert => {
                    "score1" => "float"
                    "score2" => "float"
                    "score3" => "float"
                }
            }
        }

        # x, y 좌표 데이터 자료형 변환
        if [method] == "_get_cluster_score" {
            mutate {
                convert => {
                    "x" => "float"
                    "y" => "float"
                }
            }
        }
    }

    # Model2 데이터 가공
    if [model] == "Model2" {
        mutate {
        
        	# 자료형 변환
            convert => {
                "[from][lon]" => "float"
                "[from][lat]" => "float"
                "[to][lon]" => "float"
                "[to][lat]" => "float"
                "order" => "integer"
                "id" => "integer"
                "time" => "float"
                "distance" => "float"
            }
        }
    }

    # 필요 없는 필드 제거
    mutate {
        remove_field => ["tags", "@version"]
    }
}

output {

    # Model1 elasticsearch 인덱스
    if [model] == "Model1" {
    	elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "model1"
            ecs_compatibility => disabled
        }
    }

    # Model2 elasticsearch 인덱스
    if [model] == "Model2" {
        elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "model2"
            ecs_compatibility => disabled
        }
    }
}

 


 

## kibana.yml

 

 Kibana 설정은 앞에 비해서는 간단한 편이다. Kibana 웹 인터페이스를 통해서도 설정 파일의 항목을 쉽게 변경할 수 있다. Kibana 서버 포트와 쿼리를 전송할 Elasticsearch 호스트 설정에 주의한다.

# Kibana 서버 포트
server.port: 5601

# Kibana 서버 호스트
server.host: "kibana.server.host.address"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# Elasticsearch host
elasticsearch.hosts: ["http://localhost:9200"]

# kibana pid 저장
pid.file: /path/to/kibana/kibana-7.16.0-linux-x86_64/kibana.pid

# kibana 로그
logging.dest: /path/to/kibana/kibana-7.16.0-linux-x86_64/logs/kibana.log

 

참고: server.basepath, server.rewritebasepath 설정
 여기서는 설정하지 않았지만, K8S 환경에 ELK stack을 적용했을 때(https://projectlog-eraser.tistory.com/63), ingress를 통해 회사 앱 서버 url을 지정했는데, 이 경우 server.basepath를 설정해 주어야 한다. 또한, 설정된 basepath가 제대로 적용될 수 있도록 server.rewritebasepath를 설정해 주어야 한다. 참고

 


 

# 결론

 

 기존에 로그 관리 시스템에 적용해 본 것 외에도 데이터 분석에 Elastic stack을 적용할 수 있음을 배웠다. 특히 그것을 AI 모델링 서버와 연결해 진행할 수 있어 더 의미가 있는 경험이었다. 그 동안 Python 기반 matplotlib, seaborn 라이브러리 등을 이용해 시각화하는 것만 경험해 왔는데, ELK stack을 활용하면 일련의 파이프라인을 통해 데이터 수집부터 시각화할 수 있음을 깨달았다. 심지어 아래와 같이 시각화된 결과를 웹 대시보드 형태로도 제공할 수 있기에, 필요한 경우 ELK stack을 활용하는 것도 좋아 보인다.

 

시각화에 활용된 데이터는 전부 다 Elasticsearch에 대한 쿼리 결과이다

 

 

 

반응형