LogStash

[LogStash] Filter 적용하여 로그 파싱하기

behonestar 2016. 7. 21. 10:47


로그 포맷

제가 파싱하려는 로그는 아래의 포맷으로 기록되고 있습니다.

[timestmp] [log_level] [task_idmessage



로그 예제

[2016-07-21 05:27:19,818] [INFO] [....................................] Received task: qservice.devicemgmt.worker.GetServices[f770306e-1582-4b1e-9a6b-b5e09ca5f924]

[2016-07-21 05:27:19,819] [INFO] [f770306e-1582-4b1e-9a6b-b5e09ca5f924] message...

[2016-07-21 05:27:19,761] [INFO] [....................................] Task qservice.devicemgmt.worker.GetCapabilities[a4616504-2677-4e19-8b8e-a57180100d14] succeeded in 0.961237121024s: message...



정규식 패턴 작성 (./patterns/celerylog 파일)

로그에서 task_id를 파싱해내기 위해서 아래와 같이 정규식 패턴을 작성하였습니다.

정규식 패턴의 이름은 TASKID로 지정하였습니다.

이 정규식 패턴은 아래에서 filter를 정의할 때 사용합니다.

TASKID [0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}|\.{36}



logstash.conf 작성

1. input

input {

  file {

    path => "/var/log/celery.log"

    codec => multiline {

      pattern => "^\s|^Traceback"

      what => "previous"

    }

  }

}


celery.log 파일로부터 로그를 읽어오도록 설정하였습니다. 그런데 이 파일에는 다양한 포맷의 로그가 존재합니다. 예를 들어 오류가 발생하면, 아래와 같이 여러 행의 로그가 기록됩니다. 


[2016-07-21 05:27:20,547] [ERROR] [....................................] Task qservice.devicemgmt.worker.GetServices[f770306e-1582-4b1e-9a6b-b5e09ca5f924] raised unexpected: TypeError("'NoneType' object has no attribute '__getitem__'",)

Traceback (most recent call last):

  File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task

    R = retval = fun(*args, **kwargs)

  File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__

    return self.run(*args, **kwargs)

  File "/data/qworker/qservice/devicemgmt/worker.py", line 13, in GetServices

    return self.call("GetServices", param)

  File "/data/qworker/qservice/service.py", line 25, in call

    return func(param)

  File "/data/qworker/qservice/device/driver/plugin/onvif/devicemgmt.py", line 44, in GetServices 

    print (err['test'])


이와 같은 여러 행의 로그를 하나의 로그 단위로 저장하기 위해 multiline 코덱을 적용하였습니다. Traceback으로 시작(^Traceback)하거나 공백으로 시작(^\s)하는 행은 이전(previous) 행에 붙도록 해야합니다.


2. filter

filter {

  grok {

    patterns_dir => "/var/log/patterns"

    match => { "message" => [

     "%{TIMESTAMP_ISO8601:timestamp}%{DATA}%{LOGLEVEL:log_level}%{DATA}Task %{DATA:method}\[%{TASKID:task_id}\] succeeded in (?<proc_time>[0-9]+\.[0-9]+)s: (?<body>.+)",

     "%{TIMESTAMP_ISO8601:timestamp}%{DATA}%{LOGLEVEL:log_level}%{DATA}(T|t)ask(:|) %{DATA:method}\[%{TASKID:task_id}\](?<body>.+|)",

     "%{TIMESTAMP_ISO8601:timestamp}%{DATA}%{LOGLEVEL:log_level}%{DATA}%{TASKID:task_id}\] (?<body>.+)"]

    }

  }

  date {

    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]

    timezone => "UTC"

  }

}


filter 플러그인을 적용하여 읽어들인 로그를 내가 원하는 포맷으로 재가공해야 합니다. 

먼저 grok을 사용해서 timestamp, log_level, method, task_id, proc_time, body 필드의 문자열을 추출하였습니다. 

TIMESTAMP_ISO8601, LOGLEVEL, DATA는 LogStash에 선언되어 있는 정규식 패턴입니다.


파일에서 로그를 읽어들이면 해당 라인을 읽어들인 시점이 ElasticSearch의 @timestamp 필드에 기록됩니다. 

저는 grok을 사용해서 추출한 timestamp를 @timestamp에 적용하기 위해 date 플러그인을 사용하였습니다.



3. output

output {

  elasticsearch {

    hosts => ["127.0.0.1:9200"]

  }

  stdout {}

}


재가공된 데이터를 ElasticSearch에 저장하고 터미널에도 출력하도록 설정하였습니다. 참고로 ElasticSearch가 여러개의 클러스터로 구성되어 있다면 Dedicated Master Node가 아닌 Client나 Data Node에 전달되도록 설정해야 클러스터의 안정성을 유지할 수 있다고 하네요.



ElasticSearch에 저장된 결과 확인





고려사항

Filter 플러그인을 적용하면 성능에 많은 영향을 미칠 수 있습니다. 특히 grok 필터는 정규식 계산을 위해 많은 리소스를 사용합니다. 이를 조금이나마 개선하기 위해서는 멀티코어 기반에서 병렬 처리를 수행하도록 설정해야 합니다. -w 옵션을 사용하여 스레드 개수를 설정할 수 있습니다. 예를 들어 logstash -w 8 명령어로 실행한 경우 필터 처리를 위해 8개의 다른 스레드를 사용합니다.



참고