본문 바로가기
제품/ELK

인제스트 파이프라인

by 헬로웬디 2024. 10. 1.

개요

인제스트 파이프라인을 사용하면 데이터를 변환할 수 있습니다. 이 파이프라인은 프로세서로 구성되어 있으며, 각 프로세서가 데이터를 변경합니다. 예를 들어, 필드를 제거하거나 텍스트에서 특정 값을 추출하고, 데이터를 보강하는 등의 작업을 수행합니다. 이렇게 변환된 문서는 이후 Elasticsearch에 의해 데이터 스트림이나 인덱스에 추가됩니다.

 

인제스트 파이프라인을 사용하려면 클러스터에 인제스트 역할을 가진 노드가 최소 하나 있어야 합니다. 많은 데이터 처리 작업이 필요한 경우에는 전용 인제스트 노드를 만드는 것이 권장됩니다.

 

인제스트 API를 사용하여 파이프라인을 생성하고 관리하는 방법에 대해 알아보겠습니다.

 

이벤트 구조

제가 구문 분석해야 하는 이벤트는 다음처럼 생겼어요. agent.uuid와 같이 '.' (dot)으로 구분된 필드를 객체 필드로 확장하고, 시간을 읽을 수 있는 포맷으로 변환을 해야합니다.

{
    "agent.uuid":"9dcd5ed99b06495bb000000000000080",
    "endpoint.name":"DESKTOP-WENDY50",
    "endpoint.type":"desktop",
    "event.time":1727068416069,
    "src.process.uid":"3C5109A476335D11",
    "process.unique.key":"3C5109A476335D11",
    "src.process.pid":4,
    "src.process.startTime":1727068416069
}

 

파이프라인 생성

다음 API로 파이프라인을 생성 또는 업데이트할 수 있습니다. processors 키워드에 원하는 프로세스를 정의합니다.여기에서는 dot_expander 프로세서로 데이터를 객체 필드로 확장하고, 시간을 원하는 포맷으로 변경하기 위해 date 프로세서를 사용하였습니다.

PUT /_ingest/pipeline/logs-myblog.event-1.0.0
{
  "description": "Created by Wendy",
  "processors": [
    {
      "append": {
        "field": "creator",
        "value": [
          "Wendy"
        ]
      }
    },
    {
      "dot_expander": {
        "field":  "*"
      }
    },
    {
      "date": {
        "field": "event.time",
        "tag": "date_json_event_time",
        "formats": [
          "ISO8601",
          "epoch_millis"
        ],
         "if": "ctx.event?.time != null && ctx.event.time != ''",
         "on_failure": [
            {
              "append": {
                "field": "error.message",
                "value": "Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.pipeline}}} failed with message: {{{_ingest.on_failure_message}}}"
              }
            }
          ]
      }
    }
  ]
}

파이프라인 시뮬레이션

자! 파이프라인을 잘 생성했는지 테스트를 해봐야죠. 파이프라인을 시뮬레이션 하려면:

POST /_ingest/pipeline/logs-myblog.event-1.0.0/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
          "agent.uuid":"9dcd5ed99b06495bb000000000000080",
          "endpoint.name":"DESKTOP-WENDY50",
          "endpoint.type":"desktop",
          "event.time":1727068416069,
          "src.process.uid":"3C5109A476335D11",
          "src.process.pid":4,
          "src.process.startTime":1727068416069
      }
    }
  ]
}

 

[결과] 이벤트가 다음과 같이 인덱싱되었습니다. 데이터가 객체 형태로 나열되었고, 시간이 기존 epoch 형태에서 읽을 수 있는 형태로 변환되었습니다.

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "id",
        "_source": {
          "creator": [
            "Wendy"
          ],
          "endpoint": {
            "name": "DESKTOP-WENDY50",
            "type": "desktop"
          },
          "agent": {
            "uuid": "9dcd5ed99b06495bb000000000000080"
          },
          "@timestamp": "2024-09-23T05:13:36.069Z",
          "src": {
            "process": {
              "uid": "3C5109A476335D11",
              "startTime": 1727068416069,
              "pid": 4
            }
          },
          "event": {
            "time": 1727068416069
          }
        },
        "_ingest": {
          "timestamp": "2024-10-03T06:45:00.147431824Z"
        }
      }
    }
  ]
}

 

파이프라인 삭제

특정 인제스트 파이프라인을 삭제하려면:.

DELETE /_ingest/pipeline/logs-myblog.event-1.0.0

 

파이프라인 보기

 

'logs-myblog.event-1.0.0' 라는 특정 인제스트 파이프라인에 대한 정보를 반환하려면:

GET /_ingest/pipeline/logs-myblog.event-1.0.0

 

 

모든 인제스트 파이프라인에 대한 정보를 반환하려면:

GET /_ingest/pipeline/