Elasticsearch:Ingest pipeline 介绍

这篇具有很好参考价值的文章主要介绍了Elasticsearch:Ingest pipeline 介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Ingest pipeline 可让你在索引之前对数据执行常见转换。 例如,你可以使用 pipeline 删除字段、从文本中提取值并丰富你的数据。

Pipeline 由一系列称为处理器(processors)的可配置任务组成。 每个处理器按顺序运行,对传入文档进行特定更改。 处理器运行后,Elasticsearch 会将转换后的文档添加到您的数据流或索引中。

Elasticsearch:Ingest pipeline 介绍

你可以使用 Kibana 的 Ingest Pipelines 功能或 ingest APIs 创建和管理摄取管道。 Elasticsearch 以集群状态存储管道。 

前提条件

  • 具有 ingest 角色的节点处理管道处理。 要使用 pipeline,你的集群必须至少有一个具有 ingest 角色的节点。 对于大量摄取负载,我们建议创建专用摄取节点。有关节点角色的描述,你可以阅读我之前的文章 “Elasticsearch:Node roles 介绍 - 7.9 之后版本”。
  • 如果启用了 Elasticsearch 安全功能,你必须具有 manage_pipeline 集群权限才能管理摄取管道。 要使用 Kibana 的 Ingest Pipelines 功能,你还需要 cluster:monitor/nodes/info 集群权限。
  • 包括 enrich 处理器的管道需要额外的设置。 请参阅丰富你的数据。你也可以阅读文章 “Elasticsearch:enrich processor (7.5发行版新功能)”。

在如下的展示中,我将使用 Elastic Stack 8.3.3 来进行展示,尽管不同的版本的界面可能稍有不同。

创建及管理 pipeline

在 Kibana 中,打开主菜单并单击 Stack Management > Ingest Pipelines。 从列表视图中,你可以:

  • 查看管道列表并深入了解详细信息
  • 编辑或克隆现有管道
  • 删除管道

Elasticsearch:Ingest pipeline 介绍

Elasticsearch:Ingest pipeline 介绍 

要创建管道,请单击上面的 Create pipeline > New pipeline。 有关示例教程,请参阅示例:ingest pipeline 使用示例 - 解析常用日志格式。 

你还可以使用摄取 API 来创建和管理 pipeline。 以下创建 pipeline API 请求创建一个 pipeline,其中包含两个 set 处理器,后跟一个 lowercase 处理器。 处理器按指定的顺序依次运行。

PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}

在上面,我们定义了一个 pipeline。它在执行的时候是从上往下依次执行的。它为一个文档添加一个叫做 my-long-field 的字段,添加一个叫做 my-boolean-field 的字段,以及把 my-keyword-field 字段的字母都变为小写字母。我们可以使用如下的例子来进行检验:

PUT my_index/_doc/1?pipeline=my-pipeline
{
  "my-keyword-field": "Hi, this is Xiaoguo Liu"
}

在上面,我们创建一个叫做 my_index 的索引,并写入一个文件。我们可以使用如下的命令来检查写入的结果:

GET my_index/_search?filter_path=hits.hits._source

上面的命令返回的结果是:

{
  "hits": {
    "hits": [
      {
        "_source": {
          "my-long-field": 10,
          "my-keyword-field": "hi, this is xiaoguo liu",
          "my-boolean-field": true
        }
      }
    ]
  }
}

显然,它和我们之前的所述的是一样的结果。我们可以在链接找到更多的 pipeline processors。我们甚至可以使用如下的 API 来获得所有的 pipeline processors:

GET _nodes/ingest?filter_path=nodes.*.ingest.processors

上面的命令显示为:

{
  "nodes": {
    "EGibleagSe6UJMBgbuPbIA": {
      "ingest": {
        "processors": [
          {
            "type": "append"
          },
          {
            "type": "bytes"
          },
          {
            "type": "circle"
          },
          {
            "type": "community_id"
          },
          {
            "type": "convert"
          },
          {
            "type": "csv"
          },
          {
            "type": "date"
          },
          {
            "type": "date_index_name"
          },
          {
            "type": "dissect"
          },
          ...
        ]
      }
    }
  }
}

管理 pipeline 版本

创建或更新 pipeline 时,可以指定可选的版本整数。 你可以将此版本号与 if_version 参数一起使用,以有条件地更新 pipeline。 当指定 if_version 参数时,成功的更新会增加 pipeline 的版本。

PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [ ... ]
}

要使用 API 取消设置版本号,请在不指定版本参数的情况下替换或更新 pipeline。

测试 pipeline

在生产中使用 pipeline 之前,我们建议你i使用示例文档对其进行测试。 在 Kibana 中创建或编辑 pipeline 时,单击添加文档。 在 Documents 选项卡中,提供示例文档并单击 Run the pipeline。

Elasticsearch:Ingest pipeline 介绍

这个在我之前的文章 “Elasticsearch:ingest pipeline 使用示例 - 解析常用日志格式”  有详细的描述。

你还可以使用模拟 pipeline API 测试 pipeline。 你可以在请求路径中指定配置的 pipeline。 例如,以下请求测试。

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "description": "Extract fields from 'message'",
          "field": "message",
          "patterns": [
            """%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"""
          ]
        }
      },
      {
        "date": {
          "description": "Format '@timestamp' as 'dd/MMM/yyyy:HH:mm:ss Z'",
          "field": "@timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "geoip": {
          "description": "Add 'source.geo' GeoIP data for 'source.ip'",
          "field": "source.ip",
          "target_field": "source.geo"
        }
      },
      {
        "user_agent": {
          "description": "Extract fields from 'user_agent'",
          "field": "user_agent"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
      }
    }
  ]
}

或者,你可以先创建一个 pipeline:

PUT _ingest/pipeline/common_log_format
{
  "description": "A pipeline to structure common logs ",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \\[%{HTTPDATE:@timestamp}\\] \"%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"
        ]
      }
    },
    {
      "date": {
        "field": "@timestamp",
        "formats": [
          "dd/MMM/yyyy:HH:mm:ss Z"
        ],
        "output_format": "yyyy-MMM-dd'T'HH:mm:ss Z"
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "source.geo"
      }
    },
    {
      "user_agent": {
        "field": "user_agent"
      }
    }
  ]
}

然后,我们再用如下的方法来进行测试:

POST _ingest/pipeline/common_log_format/_simulate
{
  "description": "A pipeline to structure common logs ",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          """%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"""
        ]
      }
    },
    {
      "date": {
        "field": "@timestamp",
        "formats": [
          "dd/MMM/yyyy:HH:mm:ss Z"
        ],
        "output_format": "yyyy-MMM-dd'T'HH:mm:ss Z"
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "source.geo"
      }
    },
    {
      "user_agent": {
        "field": "user_agent"
      }
    }
  ],
  "docs": [
    {
      "_source": {
        "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
      }
    }
  ]
}

上面的命令将返回如下的结果:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "@timestamp": "2099-May-05T16:21:15 +0000",
          "http": {
            "request": {
              "method": "GET",
              "referrer": "\"-\""
            },
            "version": "1.1",
            "response": {
              "body": {
                "bytes": 3638
              },
              "status_code": 200
            }
          },
          "source": {
            "geo": {
              "continent_name": "Europe",
              "region_iso_code": "DE-BE",
              "city_name": "Berlin",
              "country_iso_code": "DE",
              "country_name": "Germany",
              "region_name": "Land Berlin",
              "location": {
                "lon": 13.3878,
                "lat": 52.5312
              }
            },
            "ip": "212.87.37.154"
          },
          "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"",
          "user": {
            "name": "-",
            "id": "-"
          },
          "url": {
            "original": "/favicon.ico"
          },
          "user_agent": {
            "name": "Chrome",
            "original": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"",
            "os": {
              "name": "Mac OS X",
              "version": "10.11.6",
              "full": "Mac OS X 10.11.6"
            },
            "device": {
              "name": "Mac"
            },
            "version": "52.0.2743.116"
          }
        },
        "_ingest": {
          "timestamp": "2022-08-10T07:39:53.508450216Z"
        }
      }
    }
  ]
}

将 pipeline 添加到索引请求

使用 pipeline 查询参数将管道应用于单个或批量索引请求中的文档。我们首先来创建一个 index template。这个 index template 含有 data stream。

PUT _index_template/my-data-stream-template
{
  "index_patterns": [ "my-data-stream*" ],
  "data_stream": { },
  "priority": 500
}

我们使用如下的方法来把一些数据写入到一个叫做 my-data-stream 的 data stream 中去:

POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

我们可以通过如下的命令来查看写入的文档:

GET my-data-stream/_search?filter_path=**.hits

上面的命令返回的结果为:

{
  "hits": {
    "hits": [
      {
        "_index": ".ds-my-data-stream-2022.08.10-000001",
        "_id": "Pai4hoIBT1laGVGsvcR4",
        "_score": 1,
        "_source": {
          "my-long-field": 10,
          "@timestamp": "2099-03-07T11:04:06.000Z",
          "my-keyword-field": "foo",
          "my-boolean-field": true
        }
      },
      {
        "_index": ".ds-my-data-stream-2022.08.10-000001",
        "_id": "Pqi4hoIBT1laGVGsvcR4",
        "_score": 1,
        "_source": {
          "my-long-field": 10,
          "@timestamp": "2099-03-07T11:04:07.000Z",
          "my-keyword-field": "bar",
          "my-boolean-field": true
        }
      },
      {
        "_index": ".ds-my-data-stream-2022.08.10-000001",
        "_id": "PKi4hoIBT1laGVGsssSh",
        "_score": 1,
        "_source": {
          "my-long-field": 10,
          "@timestamp": "2099-03-07T11:04:05.000Z",
          "my-keyword-field": "foo",
          "my-boolean-field": true
        }
      }
    ]
  }
}

你还可以将 pipeline 参数与通过 update_by_query 或 reindex  API 一起使用。

POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

设置默认 pipeline

使用 index.default_pipeline 索引设置来设置默认 pipeline。 如果未指定 pipeline 参数,Elasticsearch 会将此 pipeline 应用于索引请求。比如,当我创建一个如下的索引:

PUT my_index1
{
  "settings": {
    "index.default_pipeline": "my-pipeline"
  }
}

PUT my_index1/_doc/1
{
  "my-keyword-field": "FOO"
}

虽然在上面,在我们创建文档时,我们没有指定任何的 pipeline,但是它在默认的情况下,就自动将 pipeline 应用于该索引。我们使用如下的命令来查看:

GET my_index1/_search?filter_path=**.hits
{
  "hits": {
    "hits": [
      {
        "_index": "my_index1",
        "_id": "1",
        "_score": 1,
        "_source": {
          "my-long-field": 10,
          "my-keyword-field": "foo",
          "my-boolean-field": true
        }
      }
    ]
  }
}

设置最终 pipeline

使用 index.final_pipeline 索引设置来设置最终 pipeline。 Elasticsearch 在请求或默认 pipeline 之后应用此 pipeline,即使两者都未指定。

PUT _ingest/pipeline/my-final-pipeline
{
  "description": "Increase my-long-field by 10", 
  "processors": [
    {
      "script": {
        "source": """
          ctx['my-long-field'] += 10 
        """
      }
    }
  ]
}

在上面,我们创建一个 my-final-pipeline 的 pipeline。它把字段 my-long-field 的值增加 10。能够完成这个操作的前提是 my-long-field 字段已经存在,否则我们的这个 pipeline 就好出问题。现在我们来创建一个叫做 my_index2 的索引:

PUT my_index2
{
  "settings": {
    "index.default_pipeline": "my-pipeline",
    "index.final_pipeline": "my-final-pipeline"
  }
}

在上面,我们定义了两个 pipeline:默认的及最终的。按照执行顺序来说,默认的先执行,然后才是 final。我们写入如下的文档:

PUT my_index2/_doc/1
{
  "my-keyword-field": "FOO"
}

我们通过如下的命令来进行查看:

GET my_index2/_search?filter_path=**.hits

上面的命令显示的结果为:

{
  "hits": {
    "hits": [
      {
        "_index": "my_index2",
        "_id": "1",
        "_score": 1,
        "_source": {
          "my-long-field": 20,
          "my-keyword-field": "foo",
          "my-boolean-field": true
        }
      }
    ]
  }
}

显然,my-long-field 字段的值现在为 20,而不是之前的 10。

在 Beats 中进行配置

要将摄取 pipeline 添加到 Elastic Beats,请在 <BEAT_NAME>.yml 的 output.elasticsearch 下指定 pipeline 参数。 例如,对于 Filebeat,你可以在 filebeat.yml 中指定管道。

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline

用于 Fleet 和 Elastic Agent 的 pipeline

Fleet 自动为其集成添加摄取 pipeline。 Fleet 使用包含为索引设置默认 pipeline 以及索引模板应用这些 pipeline。 Elasticsearch 根据 datastream 的命名方案将这些模板与你的 Fleet 数据流进行匹配。你可以阅读我之前的文章 “Observability:如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中”。

警告:不要更改 Fleet 的摄取 pipeline 或使用自定义 pipeline 进行 Fleet 集成。 这样做可能会破坏你的 Fleet 数据流。

Fleet 不为 Custom logs 集成提供摄取 pipeline。 你可以通过以下两种方式之一安全地为此集成指定 pipeline:index template 或 custom configuraiton。

选项一:index template

1)创建并测试你的摄取 pipeline。 将你的 pipeline 命名为 logs-<dataset-name>-default。 这使得跟踪集成 pipeline 变得更加容易。

例如,以下请求为 my-app 数据集创建管道。 管道的名称是 logs-my_app-default。

PUT _ingest/pipeline/logs-my_app-default
{
  "description": "Pipeline for `my_app` dataset",
  "processors": [ ... ]
}

2)创建一个索引模板,在 index.default_pipeline 或 index.final_pipeline 索引设置中包含你的 pipeline。 确保模板已启用数据流。 模板的索引模式应该匹配 logs-<dataset-name>-*。

你可以使用 Kibana 的索引管理功能或创建索引模板 API 创建此模板。

例如,以下请求会创建一个匹配 logs-my_app-* 的模板。 该模板使用包含 index.default_pipeline 索引设置的组件模板。

# Creates a component template for index settings
PUT _component_template/logs-my_app-settings
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-my_app-default",
      "index.lifecycle.name": "logs"
    }
  }
}

# Creates a mapping for index
PUT _component_template/logs-my_app-mappings
{
  "template": {
    "mappings": {
      "_source": {
        "enabled": false
      },
      "properties": {
        "host_name": {
          "type": "keyword"
        },
        "created_at": {
          "type": "date",
          "format": "EEE MMM dd HH:mm:ss Z yyyy"
        }
      }
    }
  }
}

# Creates an index template matching `logs-my_app-*`
PUT _index_template/logs-my_app-template
{
  "index_patterns": ["logs-my_app-*"],
  "data_stream": { },
  "priority": 500,
  "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
}

3)在 Fleet 中添加或编辑 Custom logs 集成时,单击 Configure integration > Custom log file > Advanced options.。

4)在数据集名称中,指定数据集的名称。 Fleet 会将用于集成的新数据添加到生成的 logs-<dataset-name>-default 数据流中。

例如,如果你的数据集名称是 my_app,Fleet 会将新数据添加到 logs-my_app-default 数据流。

Elasticsearch:Ingest pipeline 介绍

5)使用 rollover API 翻转您的数据流。 这可确保 Elasticsearch 将索引模板及其 pipeline 设置应用于任何新数据以进行集成。

选择二:custom configuration

关于这个配置,我已经在我之前的文章  “Observability:如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中” 做了详尽的描述。这里就不再重复了。

Elasticsearch:Ingest pipeline 介绍

Elastic Agent standalone

如果你独立运行 Elastic Agent,则可以使用包含 index.default_pipeline 或 index.final_pipeline 索引设置的索引模板应用管道。 或者,你可以在 elastic-agent.yml 配置中指定 pipeline 策略设置。 请参阅安装独立的 Elastic Agent。

访问处理器中的源字段

处理器对传入文档的源字段具有读写访问权限。 要访问处理器中的字段键,请使用其字段名称。 以下 set 处理器访问 my-long-field。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}

你还可以添加 _source 前缀。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}

使用点表示法访问对象字段。

重要:如果你的文档包含展平对象,请先使用 dot_expander 处理器展开它们。 其他摄取处理器无法访问展平对象。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}

比如,我们有如下的文档:

PUT flattened_obj/_doc/1?pipeline=my-pipeline
{
  "my-object-field.my-property": 100
}

最终写入的文档为:

GET flattened_obj/_search?filter_path=**.hits
{
  "hits": {
    "hits": [
      {
        "_index": "flattened_obj",
        "_id": "1",
        "_score": 1,
        "_source": {
          "my-object-field": {
            "my-property": 10
          }
        }
      }
    ]
  }
}

我们可以看出来 my-property 是 my-object-field 里的一个子字段。

有几个处理器参数支持 Mustache 模板片段。 要访问模板片段中的字段值,请将字段名称括在三个大括号中:{{{field-name}}}。 你可以使用模板片段来动态设置字段名称。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "{{{service}}}",
        "value": "{{{code}}}"
      }
    }
  ]
}

在处理器中访问摄取元数据

摄取处理器可以使用 _ingest 键添加和访问摄取元数据。

与源和元数据字段不同,Elasticsearch 默认不索引摄取元数据字段。 Elasticsearch 还允许以 _ingest 键开头的源字段。 如果你的数据包含此类源字段,请使用 _source._ingest 访问它们。

默认情况下,pipelie 仅创建 _ingest.timestamp 摄取元数据字段。 该字段包含 Elasticsearch 收到文档索引请求的时间戳。 要索引 _ingest.timestamp 或其他摄取元数据字段,请使用 set 处理器。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

处理 pipeline 故障

Pipeline 的处理器按顺序运行。 默认情况下,当这些处理器之一发生故障或遇到错误时,pipeline 处理将停止。

要忽略处理器故障并运行管道的剩余处理器,请将 ignore_failure 设置为 true。详细阅读请参阅之前的文章 “Elasticsearch:如何处理 ingest pipeline 中的异常”。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}

使用 on_failure 参数指定在处理器发生故障后立即运行的处理器列表。 如果指定了 on_failure,即使 on_failure 配置为空,Elasticsearch 也会随后运行 pipeline 的剩余处理器。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}

嵌套用于嵌套错误处理的 on_failure 处理器列表。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

你还可以为 pipeline 指定 on_failure。 如果没有 on_failure 值的处理器出现故障,Elasticsearch 会使用此 pipeline 级参数作为备用参数。 Elasticsearch 不会尝试运行 pipeline 的剩余处理器。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

有关 pipeline 故障的其他信息可能在文档元数据字段 on_failure_message、on_failure_processor_type、on_failure_processor_tag 和 on_failure_pipeline 中可用。 这些字段只能从 on_failure 块中访问。

以下示例使用元数据字段在文档中包含有关管道故障的信息。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

有条件地运行处理器

每个处理器都支持可选的 if 条件,编写为 Painless 脚本。 如果提供,则处理器仅在 if 条件为真时运行。

重要:if 条件脚本在 Painless 的摄取处理器上下文中运行。 在 if 条件下,ctx 值是只读的。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}

如果启用了 script.painless.regex.enabled 集群设置,你可以在 if 条件脚本中使用正则表达式。 有关支持的语法,请参阅 Painless 正则表达式。

提示:如果可能,请避免使用正则表达式。 昂贵的正则表达式会降低索引速度。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}

你必须在一行中将 if 条件指定为有效 JSON。 但是,您可以使用 Kibana 控制台的三引号语法来编写和调试更大的脚本。

提示:如果可能,请避免使用复杂或昂贵的 if 条件脚本。 昂贵的条件脚本会降低索引速度。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}

你还可以将存储的脚本指定为 if 条件。

PUT _scripts/my-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "my-prod-tag-script" }
      }
    }
  ]
}

传入文档通常包含对象字段。 如果处理器脚本尝试访问其父对象不存在的字段,Elasticsearch 将返回 NullPointerException。 要避免这些异常,请使用 null 安全运算符,例如 ?.,并将脚本编写为 null 安全的。

例如, ctx.network?.name.equalsIgnoreCase('Guest') 不是 null 安全的。 ctx.network?.name 可以返回 null。 将脚本重写为 'Guest'.equalsIgnoreCase(ctx.network?.name),这是 null 安全的,因为 Guest 始终为非 null。

如果你无法将脚本重写为空安全,请包含显式 null 检查。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}

有条件地应用管道

将 if 条件与管道处理器相结合,以根据你的标准将其他管道应用于文档。 你可以将此管道用作用于配置多个数据流或索引的索引模板中的默认管道。

PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}

获取管道使用统计信息

使用 node stats API 获取全局和每个 pipeline 的摄取统计信息。 使用这些统计信息来确定哪些 pipeline 运行最频繁或花费最多时间处理。文章来源地址https://www.toymoban.com/news/detail-445932.html

GET _nodes/stats/ingest?filter_path=nodes.*.ingest
{
  "nodes": {
    "EGibleagSe6UJMBgbuPbIA": {
      "ingest": {
        "total": {
          "count": 17,
          "time_in_millis": 7,
          "current": 0,
          "failed": 2
        },
        "pipelines": {
          "sample-pipeline": {
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0,
            "processors": []
          },
          "common_log_format": {
            "count": 2,
            "time_in_millis": 32,
            "current": 0,
            "failed": 0,
            "processors": [
              {
                "grok": {
                  "type": "grok",
                  "stats": {
                    "count": 2,
                    "time_in_millis": 6,
                    "current": 0,
                    "failed": 0
                  }
                }
              },
              {
                "date": {
                  "type": "date",
                  "stats": {
                    "count": 2,
                    "time_in_millis": 3,
                    "current": 0,
                    "failed": 0
                  }
                }
              },
        ...

到了这里,关于Elasticsearch:Ingest pipeline 介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ElasticSearch 实现 全文检索 支持(PDF、TXT、Word、HTML等文件)通过 ingest-attachment 插件实现 文档的检索

    Attachment 插件是 Elasticsearch 中的一种插件,允许将各种二进制文件(如PDF、Word文档等)以及它们的内容索引到 Elasticsearch 中。插件使用 Apache Tika 库来解析和提取二进制文件的内容。通过使用 Attachment 插件,可以轻松地在 Elasticsearch 中建立全文搜索功能,而无需事先转换二进制

    2024年02月05日
    浏览(55)
  • Elasticsearch Pipeline 详解

    说明 本文是建立在有一些 Elasticsearch 基础和了解相关 Pipeline 概念的人 简介 Ingest Node Ingest Node(预处理节点)是 ES 用于功能上命名的一种节点类型,可以通过在 elasticsearch.xml 进行如下配置来标识出集群中的某个节点是否是 Ingest Node. 上述将 node.ingest 设置成 false,则表明当前节点不

    2024年02月03日
    浏览(35)
  • Elasticsearch:使用 pipelines 路由文档到想要的 Elasticsearch 索引中去

    当应用程序需要向 Elasticsearch 添加文档时,它们首先要知道目标索引是什么。在很多的应用案例中,特别是针对时序数据,我们想把每个月的数据写入到一个特定的索引中。一方面便于管理索引,另外一方面在将来搜索的时候可以按照每个月的索引来进行搜索,这样速度更快

    2023年04月09日
    浏览(54)
  • 扩散模型Diffusers Pipeline API使用介绍

    大部分扩散模型包含多个独立训练的子模型和组件模块组合而成,例如StableDiffusion 有: 3个独立训练的子模型:Autoencoder、 Conditional Unet、CLIP text encoder 调度器组件scheduler, CLIPImageProcessor, safety checker. 为了让开发者以最简单的方式使用最新最先进的扩散模型, diffusers 开发了

    2024年02月08日
    浏览(71)
  • 【Camera2教程一】Camera2的框架Pipeline和framework中核心类和接口的详细介绍

    一,框架pipeline 在Android中,Camera2 API提供了一个全新的框架来访问和控制设备上的相机硬件。这个框架的设计更加灵活和强大,允许开发者进行更精细的控制,同时支持更复杂的相机功能。Camera2 API的pipeline可以大致划分为以下几个关键部分: 相机访问: 首先,应用需要请求

    2024年04月15日
    浏览(38)
  • 【软件测试】- 将 Selenium 和 JMeter 测试脚本集成到 Jenkins 中实现自动化测试和持续集成(CI)及Jenkinsfile 实现 Jenkins Pipeline 原理介绍

    将 Selenium 和 JMeter 测试脚本集成到 Jenkins 中是实现自动化测试和持续集成(CI)的关键步骤。以下是详细的集成过程: 1、准备工作 安装 Jenkins : 确保您已经在服务器上安装了 Jenkins。 可以从 Jenkins 官网 下载并安装。 安装必要的插件 : 在 Jenkins 中安装所需的插件,如 Git 插

    2024年02月04日
    浏览(73)
  • ElasticSearch内容分享(一):ElasticSearch介绍

    目录 ES分布式搜索引擎 初识elasticsearch 1. elasticsearch背景介绍 2. 倒排索引 2.1 正向索引 2.2 倒排索引 2.3 正向和倒排对比 3. ES数据库基本概念 3.1.文档和字段 3.2.索引和映射 3.3.mysql与elasticsearch 4. 安装es、kibana、分词器 4.1 部署单点es 4.1.1.创建网络 4.1.2.加载镜像 4.1.3.运行 4.2.部署

    2024年02月03日
    浏览(36)
  • 【Elasticsearch】Elasticsearch 从入门到精通(一):基本介绍

    《 Elasticsearch 从入门到精通 》共包含以下 2 2 2 篇文章: Elasticsearch 从入门到精通(一):基本介绍 Elasticsearch 从入门到精通(二):基础使用 😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖

    2024年04月23日
    浏览(34)
  • ElasticSearch架构介绍及原理解析_elasticsearch+sqlserver架构

    - 索引是一个包含多个文档的数据库,文档是可搜索的数据单元。    - 索引可以被看作是关系数据库中的数据库。    - 每个索引由多个分片(Shards)组成,分片是数据的物理副本。         4. **分片(Shards)**:    - 分片是索引的物理分割,用于提高数据的可扩展性和

    2024年04月17日
    浏览(42)
  • Elasticsearch基础篇(四):Elasticsearch的基础介绍与索引设置

    Elasticsearch是一个基于 lucene 、 分布式 、 通过Restful方式进行交互的 近实时搜索 平台框架。 ELK技术栈是 Elasticsearch 、 Logstash 、 Kibana 三大开源框架首字母大写简称。 而Elasticsearch 是一个 开源的高扩展的分布式全文搜索引擎 , 是整个 ELK技术栈的核心。 Elasticsearch是一个基于

    2024年02月04日
    浏览(80)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包