로그 포맷
제가 파싱하려는 로그는 아래의 포맷으로 기록되고 있습니다.
[timestmp] [log_level] [task_id] message
로그 예제
[2016-07-21 05:27:19,818] [INFO] [....................................] Received task: qservice.devicemgmt.worker.GetServices[f770306e-1582-4b1e-9a6b-b5e09ca5f924]
[2016-07-21 05:27:19,819] [INFO] [f770306e-1582-4b1e-9a6b-b5e09ca5f924] message...
[2016-07-21 05:27:19,761] [INFO] [....................................] Task qservice.devicemgmt.worker.GetCapabilities[a4616504-2677-4e19-8b8e-a57180100d14] succeeded in 0.961237121024s: message...
정규식 패턴 작성 (./patterns/celerylog 파일)
로그에서 task_id를 파싱해내기 위해서 아래와 같이 정규식 패턴을 작성하였습니다.
정규식 패턴의 이름은 TASKID로 지정하였습니다.
이 정규식 패턴은 아래에서 filter를 정의할 때 사용합니다.
TASKID [0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}|\.{36}
logstash.conf 작성
1. input
input {
file {
path => "/var/log/celery.log"
codec => multiline {
pattern => "^\s|^Traceback"
what => "previous"
}
}
}
celery.log 파일로부터 로그를 읽어오도록 설정하였습니다. 그런데 이 파일에는 다양한 포맷의 로그가 존재합니다. 예를 들어 오류가 발생하면, 아래와 같이 여러 행의 로그가 기록됩니다.
[2016-07-21 05:27:20,547] [ERROR] [....................................] Task qservice.devicemgmt.worker.GetServices[f770306e-1582-4b1e-9a6b-b5e09ca5f924] raised unexpected: TypeError("'NoneType' object has no attribute '__getitem__'",)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/data/qworker/qservice/devicemgmt/worker.py", line 13, in GetServices
return self.call("GetServices", param)
File "/data/qworker/qservice/service.py", line 25, in call
return func(param)
File "/data/qworker/qservice/device/driver/plugin/onvif/devicemgmt.py", line 44, in GetServices
print (err['test'])
이와 같은 여러 행의 로그를 하나의 로그 단위로 저장하기 위해 multiline 코덱을 적용하였습니다. Traceback으로 시작(^Traceback)하거나 공백으로 시작(^\s)하는 행은 이전(previous) 행에 붙도록 해야합니다.
2. filter
filter {
grok {
patterns_dir => "/var/log/patterns"
match => { "message" => [
"%{TIMESTAMP_ISO8601:timestamp}%{DATA}%{LOGLEVEL:log_level}%{DATA}Task %{DATA:method}\[%{TASKID:task_id}\] succeeded in (?<proc_time>[0-9]+\.[0-9]+)s: (?<body>.+)",
"%{TIMESTAMP_ISO8601:timestamp}%{DATA}%{LOGLEVEL:log_level}%{DATA}(T|t)ask(:|) %{DATA:method}\[%{TASKID:task_id}\](?<body>.+|)",
"%{TIMESTAMP_ISO8601:timestamp}%{DATA}%{LOGLEVEL:log_level}%{DATA}%{TASKID:task_id}\] (?<body>.+)"]
}
}
date {
match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
timezone => "UTC"
}
}
filter 플러그인을 적용하여 읽어들인 로그를 내가 원하는 포맷으로 재가공해야 합니다.
먼저 grok을 사용해서 timestamp, log_level, method, task_id, proc_time, body 필드의 문자열을 추출하였습니다.
TIMESTAMP_ISO8601, LOGLEVEL, DATA는 LogStash에 선언되어 있는 정규식 패턴입니다.
파일에서 로그를 읽어들이면 해당 라인을 읽어들인 시점이 ElasticSearch의 @timestamp 필드에 기록됩니다.
저는 grok을 사용해서 추출한 timestamp를 @timestamp에 적용하기 위해 date 플러그인을 사용하였습니다.
3. output
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
stdout {}
}
재가공된 데이터를 ElasticSearch에 저장하고 터미널에도 출력하도록 설정하였습니다. 참고로 ElasticSearch가 여러개의 클러스터로 구성되어 있다면 Dedicated Master Node가 아닌 Client나 Data Node에 전달되도록 설정해야 클러스터의 안정성을 유지할 수 있다고 하네요.
ElasticSearch에 저장된 결과 확인
고려사항
Filter 플러그인을 적용하면 성능에 많은 영향을 미칠 수 있습니다. 특히 grok 필터는 정규식 계산을 위해 많은 리소스를 사용합니다. 이를 조금이나마 개선하기 위해서는 멀티코어 기반에서 병렬 처리를 수행하도록 설정해야 합니다. -w 옵션을 사용하여 스레드 개수를 설정할 수 있습니다. 예를 들어 logstash -w 8 명령어로 실행한 경우 필터 처리를 위해 8개의 다른 스레드를 사용합니다.
참고
- LogStash Grok 패턴
- grok 정규식 테스트 사이트
- https://www.elastic.co/guide/en/logstash/current/deploying-and-scaling.html
'LogStash' 카테고리의 다른 글
Filebeat 데이터를 로드밸런싱하는 방법들 (0) | 2016.07.25 |
---|---|
[LogStash] sockAppender로 log4j 로그 입력 (0) | 2015.08.24 |
LogStash, ElasticSearch, Kibana 설치하기 (0) | 2015.08.21 |