Ingest pipeline 允許文檔在被索引之前對數據進行預處理,將數據加工處理成我們需要的格式。例如,可以使用 ingest pipeline添加或者刪除字段,轉換類型,解析內容等等。Pipeline 由一組處理器 Processor 構成,每個處理器依次運行,對傳入的文檔進行特定的更改。Ingest pipeline 和 Logstash 中的 filter 作用相似,并且更加輕量和易于調試。
(資料圖片)
要使用 ingest pipeline,集群中必須至少有一個具有 ingest 角色的節點。對于大量攝取負載,建議設置專用的 ingest 節點,要創建專用的 ingest 節點,請設置:
node.roles: [ ingest ]
1 Ingest Pipeline 的基本用法
1.1 創建和使用 Ingest Pipeline
接下來介紹一下 ingest pipeline 的創建與使用,如下所示,使用 ingest API 創建一個名為 my-pipeline
的 ingest pipeline,在 processors 參數中指定了兩個處理器,set 處理器為文檔添加一個新的字段 location,設置值為 China;lowercase 處理器將 name 字段的所有字母轉換為小寫。
PUT _ingest/pipeline/my-pipeline{ "description": "My first Ingest Pipeline", "processors": [ { "set": { "description": "Add a new field", "field": "location", "value": "China" } }, { "lowercase": { "description": "Lowercase name", "field": "name" } } ]}
然后往索引 my-index 中寫入一條數據,通過 pipeline 參數指定使用剛剛創建的 my-pipeline
。
PUT my-index/_doc/1?pipeline=my-pipeline{ "name": "Tom", "age": 18}
查看 id 為 1 的文檔,可以看到 name 字段由 Tom 轉換為 tom,并且新增了 location 字段,說明 my-pipeline
管道成功處理了攝入的數據。
GET my-index/_doc/1# 返回結果{ "_index" : "my-index", "_type" : "_doc", "_id" : "1", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "name" : "tom", "location" : "China", "age" : 18 }}
1.2 使用 Simulate API 測試 Pipeline
為了讓開發者更好地了解和使用 pipeline 中的處理器,Elasticsearch 提供了 simulate API 接口,方便我們對 pipeline 進行測試。如下所示,我們對 1.1 創建和使用 Ingest Pipeline 章節中創建的 my-pipeline
進行測試,在 docs 列表中我們可以填寫多個原始文檔。
POST _ingest/pipeline/my-pipeline/_simulate{ "docs": [ { "_source": { "name": "Tom", "age": 18 } } ]}
返回結果如下,可以看到模擬的結果和實際創建的文檔一致,只不過 simulate API 并不會真正地創建這個文檔。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "tom", "location" : "China", "age" : 18 }, "_ingest" : { "timestamp" : "2022-03-03T14:04:15.941884826Z" } } } ]}
除了在請求路徑中指定 pipeline,我們還可以在請求體中定義 pipeline 進行模擬,這樣就不用預先創建好 pipeline,而是等到測試成功后再去創建 pipeline。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "set": { "description": "Add a new field", "field": "location", "value": "China" } }, { "lowercase": { "description": "Lowercase name", "field": "name" } } ] }, "docs": [ { "_source": { "name": "Tom", "age": 18 } } ]}
1.3 異常處理
當我們使用 pipeline 處理一個文檔的時候,有時并不是所有的文檔都很規范,這個時候可能就會出現文檔不能被正確解析或者發生異常的情況,此時 Elasticsearch 會返回給客戶端一個錯誤的信息,表明文檔不能被正確地處理。pipeline 中的處理器(processor)按照順序依次執行,默認情況下,當處理器發生錯誤或者異常時,將會停止后續的處理。
在 ingest pipeline 中,異常處理可以分為 3 種情況:
在處理器中設置ignore_failure: true
,當該處理器發生異常時,允許忽略異常,繼續執行后續的處理器。通過 on_failure
參數定義發生異常時執行的處理器列表,該參數可以在 processor 級別中定義,也可以在 pipeline 級別中定義。使用 fail 處理器主動拋出異常。下面將會分別對上述 3 種情況進行演示,首先模擬 2 個異常:
convert 處理器將 id 字段轉換為 long 類型,由于傳入文檔的 id 字段值設置為 S123456,無法轉換成 long 類型的數字,會產生 number_format_exception 的異常。date 處理器解析 timestamp 字段的日期格式,formats 參數要求輸入的格式是 yyyy-MM-dd HH:mm:ss,例如 2022-03-03 15:22:11,解析出日期對應的年月日信息,以 yyyy/MM/dd 的格式輸出到 date 字段中,例如 2022/03/03。由于傳入文檔的 timestamp 字段的格式是 20220303 15:22:11,并不滿足 formats 參數要求的日期格式,因此會產生 date_time_parse_exception 的異常。執行以下 pipeline 測試語句,在請求路徑中加上 verbose 可以看到每個處理器的執行情況。
POST _ingest/pipeline/_simulate?verbose{ "pipeline": { "processors": [ { "convert": { "field": "id", "type": "long" } }, { "date": { "field": "timestamp", // 解析的字段 "formats": [ "yyyy-MM-dd HH:mm:ss" // 解析的格式 ], "output_format": "yyyy/MM/dd", // 輸出的格式 "target_field": "date" // 輸出的字段 } } ] }, "docs": [ { "_source": { "id": "S123456", "timestamp": "20220303 15:22:11", "message": "User login successfully" } } ]}
返回結果如下,盡管我們人為制造了 2 個異常,但是只看到了 convert 處理器的異常報錯,這是因為當處理器發生錯誤或者異常時,將會停止后續的處理,直接向客戶端返回錯誤信息。
{ "docs" : [ { "processor_results" : [ { "processor_type" : "convert", "status" : "error", "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long" } ], "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long", "caused_by" : { "type" : "number_format_exception", "reason" : "For input string: \"S123456\"" } } } ] } ]}
1.3.1 ignore_failure 忽略異常
在處理器中設置 ignore_failure 參數為 true,當該處理器發生異常時,允許忽略異常,繼續執行后續的處理器。
POST _ingest/pipeline/_simulate?verbose{ "pipeline": { "processors": [ { "convert": { "field": "id", "type": "long", "ignore_failure": true // 忽略異常 } }, { "date": { "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ] }, "docs": [ { "_source": { "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully" } } ]}
這次在返回結果中可以看到有 2 個異常信息,其中 convert 處理器的 status 的值為 error_ignored,表示該異常被忽略了,在 doc 中可以看到該處理器處理完畢后的結果,可以看到 id 字段的內容保留不變。接著 pipeline 繼續往后執行,當執行到 date 處理器時,再次發生異常,由于 date 處理器中未對異常進行處理,此時向客戶端返回異常信息。
{ "docs" : [ { "processor_results" : [ { "processor_type" : "convert", "status" : "error_ignored", // 第 1 個異常,忽略異常 "ignored_error" : { "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long" } ], "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long", "caused_by" : { "type" : "number_format_exception", "reason" : "For input string: \"S123456\"" } } }, "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "id" : "S123456", // 跳過 convert 處理器對 id 字段的處理 "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "2022-03-04T02:48:13.562353005Z" } } }, { "processor_type" : "date", "status" : "error", // 第 2 個異常 "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [2022/03/03 15:22:11]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [2022/03/03 15:22:11]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [2022/03/03 15:22:11] with format [yyyy-MM-dd HH:mm:ss]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text "2022/03/03 15:22:11" could not be parsed at index 4" } } } } ] } ]}
1.3.2 on_failure 處理異常
使用 on_failure 參數可以定義發生異常時執行的處理器列表,該參數允許在 processor 和 pipeline 級別中定義。在 pipeline 級別定義時,on_failure 捕獲整個 pipeline 發生的任何異常,當產生異常時直接執行 on_failure 中定義的處理器列表,不會再執行后續的處理器。
在 processor 級別定義時,on_failure 參數可以針對單個處理器進行異常處理,會繼續執行后續的處理器。
on_failure 參數可以同時在 pipeline 和 processor 中定義,這兩者并不沖突,比較推薦的做法是,針對某些處理器設置 processor 級別的 on_failure 處理規則,另外設置 pipeline 級別的 on_failure 處理規則作為一條兜底的規則,當 processor 級別的 on_failure 處理規則也發生異常時或者沒有設置異常處理的處理器發生異常時,就可以應用這條兜底的規則,這樣做的好處就是可以盡可能地保證我們的 ingest pipeline 的健壯性。
如下所示,在 pipeline 級別設置了 on_failure 的處理規則,注意這里的 on_failure 參數和 processors 參數是處于同一層級的。當發生異常時,on_failure 會執行里面的 set 處理器,將索引名改為 failure-index,該索引專門用于記錄 pipeline 處理異常的文檔。之后我們就可以在 failure-index 索引中去查看哪些文檔在預處理時發生了異常,方便后續實施相應的補救措施。
PUT _ingest/pipeline/failure-test-pipeline{ "processors": [ { "convert": { "field": "id", "type": "long" } }, { "date": { "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ], "on_failure": [ // 發生異常時執行的處理器列表 { "set": { "field": "_index", // 通過 _index 元數據字段,可以改變寫入的索引 "value": "failure-index" } } ]}
然后往 my-index 索引中插入一條有錯誤的文檔,將文檔 _id
設置為 1。從返回結果來看,并沒有異常報錯,文檔成功寫入了。但是仔細觀察可以發現,文檔并沒有寫入 my-index 索引,而是寫入了我們記錄異常的索引 failure-index。
PUT my-index/_doc/1?pipeline=failure-test-pipeline{ "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully"}# 返回結果{ "_index" : "failure-index", // 寫入了記錄異常的索引 "_type" : "_doc", "_id" : "1", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1}
查詢 my-index 文檔,確實沒有找到 _id 為 1 的這條文檔。
GET my-index/_doc/1# 返回結果{ "error" : { "root_cause" : [ { "type" : "index_not_found_exception", "reason" : "no such index [my-index]", "resource.type" : "index_expression", "resource.id" : "my-index", "index_uuid" : "_na_", "index" : "my-index" } ], "type" : "index_not_found_exception", "reason" : "no such index [my-index]", "resource.type" : "index_expression", "resource.id" : "my-index", "index_uuid" : "_na_", "index" : "my-index" }, "status" : 404}
查詢 failure-index 索引可以找到這條處理異常的文檔。
GET failure-index/_doc/1# 返回結果{ "_index" : "failure-index", "_type" : "_doc", "_id" : "1", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "id" : "S123456", "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }}
對于我們來說,目前 failure-index 索引記錄的信息十分有限,根據以上內容我們無法知道是哪個處理器在執行時產生了異常。在 on_failure 中提供了以下 4 個元數據字段方便我們進行故障定位:
on_failure_pipeline
:產生異常的 pipeline 類型的處理器中引用的 pipeline。ingest pipeline 中有一個 pipeline 類型的處理器,該處理器也可以指定使用其他的 pipeline,這里注意區分 pipeline 類型的處理器和 pipeline 管道。on_failure_message
:報錯的內容。on_failure_processor_type
:產生異常的處理器的標簽,標簽可以在處理器中通過 tag 參數指定。當 pipeline 中使用了多個相同類型的處理器時,根據指定的標簽可以方便我們進行區分。on_failure_processor_tag
:產生異常的處理器的類型。如下所示,我們在 on_failure 參數新增了一個 set 處理器,將錯誤信息寫入 failure-index 的 failure 字段中。
PUT _ingest/pipeline/failure-test-pipeline{ "processors": [ { "convert": { "tag": "my-index-convert", // 設置處理器的標簽,方便定位問題 "field": "id", "type": "long" } }, { "date": { "tag": "my-index-date", // 設置處理器的標簽,方便定位問題 "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ], "on_failure": [ { "set": { "field": "_index", "value": "failure-index" } }, { "set": { "field": "failure", "value": { "on_failure_pipeline": "{{ _ingest.on_failure_pipeline }}", "on_failure_message": "{{_ingest.on_failure_message}}", "on_failure_processor_type": "{{_ingest.on_failure_processor_type}}", "on_failure_processor_tag": "{{ _ingest.on_failure_processor_tag }}" } } } ]}
然后往 my-index 索引中插入一條有錯誤的文檔,將文檔 _id
設置為 2。
PUT my-index/_doc/2?pipeline=failure-test-pipeline{ "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully"}
查看 failure-index 索引記錄的錯誤信息,可以得知打了 my-index-convert 標簽的 convert 類型的處理器在處理 S123456 字符串時引發了異常。細心的同學可能會注意到, 在返回結果中 on_failure_pipeline 的內容為空,這是由于異常并不是由 pipeline 類型的處理器產生的,所以這里的結果是空值。如果只是想獲取客戶端直接調用的 ingest pipeline,那么可以通過 _ingest.pipeline
來獲取。
{ "_index" : "failure-index", "_type" : "_doc", "_id" : "2", "_version" : 1, "_seq_no" : 1, "_primary_term" : 1, "found" : true, "_source" : { "failure" : { "on_failure_pipeline" : "", // 產生異常的 pipeline 類型的處理器中引用的 pipeline "on_failure_message" : "For input string: \\\"S123456\\\"", // 報錯的內容 "on_failure_processor_tag" : "my-index-convert", // 產生異常的處理器的標簽 "on_failure_processor_type" : "convert" // 產生異常的處理器的類型 }, "id" : "S123456", "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }}
上面的示例介紹了 on_failure 參數在 pipeline 級別的處理,現在介紹下 on_failure 參數如何在 processor 級別進行處理。如下所示,在 convert 和 date 處理器中分別通過 on_failure 參數設置了發生異常時執行的處理器列表:當convert 進行類型轉換發生異常時,將當前時間的毫秒數設置 id 字段的值;當 date 處理器解析時間發生異常時,使用 ingest 攝取時間的日期戳作為 date 字段的值。
PUT _ingest/pipeline/failure-test-pipeline{ "processors": [ { "convert": { "field": "id", "type": "long", "on_failure": [ // 發生異常時將當前時間的毫秒數設置 id 字段的值 { "script": { "source": """ long timeNow = Calendar.getInstance().getTimeInMillis(); ctx.id = timeNow; """ } } ] } }, { "date": { "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date", "on_failure": [ // 發生異常時使用 ingest 攝取時間的日期戳作為 date 字段的值 { "set": { "field": "date", "value": "{{_ingest.timestamp}}" } }, { "date": { "field": "date", "formats": [ "yyyy-MM-dd"T"HH:mm:ss.SSSZ" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ] } } ]}
然后往 my-index 索引中插入一條有錯誤的文檔,將文檔 _id
設置為 3。文檔正常寫入 my-index 中,沒有返回報錯信息。
PUT my-index/_doc/3?pipeline=failure-test-pipeline{ "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully"}# 返回結果{ "_index" : "my-index", "_type" : "_doc", "_id" : "3", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1}
獲取 my-index 索引中 _id
為 3 的文檔,可以看到 id 字段的值并不是傳入的原始文檔中的 S123456,而是當前時間對應的毫秒值;date 字段的值被設置為了 ingest 攝取時間的日期。
GET my-index/_doc/3# 返回結果{ "_index" : "my-index", "_type" : "_doc", "_id" : "3", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "date" : "2022/03/03", "id" : 1646349731000, "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }}
1.3.3 fail 主動拋出異常
和 ignore_failure, on_failure 兩種處理異常的方式不同,使用 fail 處理器可以基于某些條件主動拋出異常,當你想要主動讓 pipeline 失敗并且返回特定的報錯信息給請求者時,可以使用這種方式。如下所示,當 tags 字段中不包含 production 時,fail 處理器會主動拋出異常,在 message 參數中可以自定義相應的報錯信息。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "fail": { "if": "ctx.tags.contains("production") != true", "message": "The production tag is not present, found tags: {{{tags}}}" } } ] }, "docs": [ { "_source": { "tags": ["development"] } } ]}# 返回結果{ "docs" : [ { "error" : { "root_cause" : [ { "type" : "fail_processor_exception", // 自定義的報錯信息 "reason" : "The production tag is not present, found tags: {0=development}" } ], "type" : "fail_processor_exception", "reason" : "The production tag is not present, found tags: {0=development}" } } ]}
1.4 執行條件判斷
每種類型的處理器中都支持 if 參數判斷執行處理器的條件,在 if 參數中使用 painless腳本進行邏輯判斷,當 if 的判斷結果為 true 時,相應的處理器才會執行。如下所示,創建了 if-test-pipeline,我們只想日志級別是 error 的消息,當 level 字段的值是 notice 時,丟棄該文檔。
PUT _ingest/pipeline/if-test-pipeline{ "processors": [ { "drop": { "description": "Drop documents with level of notice", "if": "ctx.level == "notice"" } } ]}
然后往 log-index 索引中寫入兩條文檔,指定使用 if-test-pipeline,其中一條文檔的 level 值等于 notice,另一條的 level 值等于 error。
POST log-index/_doc?pipeline=if-test-pipeline{ "level": "notice", "message": "this is a notice log"}POST log-index/_doc?pipeline=if-test-pipeline{ "level": "error", "message": "this is a error log"}
查詢 log-index 索引,只返回了 1 條文檔,level 等于 notice 的文檔被丟棄了。
GET log-index/_search# 返回結果{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "log-index", "_type" : "_doc", "_id" : "fV9ET38BKRZVqZj9X8yC", "_score" : 1.0, "_source" : { "level" : "error", "message" : "this is a error log" } } ] }}
接下來介紹一種高級的用法,將一個 pipeline 作為多個不同的索引或者數據流默認的 pipeline,在這個 pipeline 中創建多個 pipeline 類型的處理器,每個處理器根據傳入的文檔選擇后臺真正要執行的 pipeline。這樣做的好處就是,如果要更改后臺使用的 pipeline,只需要修改默認的 pipeline 中引用的 pipeline 即可,客戶端的代碼或者索引中的設置無需修改,可以做到業務無感知的切換。如下所示,先創建兩個 pipeline,其中 httpd_pipeline 用于處理 http 相關的日志,syslog_pipeline 用于處理 syslog 相關的日志。
PUT _ingest/pipeline/httpd_pipeline{ "processors": [ { "set": { "field": "message", "value": "this is a apache_httpd log" } } ]}PUT _ingest/pipeline/syslog_pipeline{ "processors": [ { "set": { "field": "message", "value": "this is a syslog log" } } ]}
接著創建一個 default_pipeline,使用 if 參數進行判斷,當 service 字段的值等于 apache_httpd 時,執行 httpd_pipeline,當 service 字段的值等于 syslog 時,執行 syslog_pipeline。
PUT _ingest/pipeline/default_pipeline{ "processors": [ { "pipeline": { "description": "If "service" is "apache_httpd", use "httpd_pipeline"", "if": "ctx.service == "apache_httpd"", "name": "httpd_pipeline" } }, { "pipeline": { "description": "If "service" is "syslog", use "syslog_pipeline"", "if": "ctx.service == "syslog"", "name": "syslog_pipeline" } } ]}
使用 simulate API 進行驗證,可以看到由于傳入的文檔的 service 字段的值是 syslog,因此這條文檔被交給 syslog_pipeline 進行處理。
POST _ingest/pipeline/default_pipeline/_simulate{ "docs": [ { "_source": { "service": "syslog" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "this is a syslog log", // syslog_pipeline 添加的內容 "service" : "syslog" }, "_ingest" : { "timestamp" : "2022-03-04T07:18:53.531846541Z" } } } ]}
2 Processor 處理器
下表列出了 Elasticsearch 所有 processor 處理器的類型,并且根據各個處理器的用途作了相應的分類。下面的小節中僅會演示說明一些常用的處理器,未介紹到的部分讀者可以自行查閱官方文檔。
類別 | 處理器 | 作用 |
---|---|---|
數組處理 | append | 添加元素 |
數組處理 | sort | 對數組中的元素進行排序 |
數組處理 | join | 將數組中的每個元素拼接成單個字符串 |
數組處理 | foreach | 遍歷處理數組中的元素 |
結構化數據處理 | json | 將 json 字符串轉換為結構化的 json 對象 |
結構化數據處理 | kv | 以鍵值對的方式提取字段 |
結構化數據處理 | csv | 從單個文本字段中提取 CSV 行中的字段 |
匹配處理 | gsub | 替換字符串中指定的內容,支持正則表達式匹配 |
匹配處理 | grok | 使用正則表達式提取字段,grok 處理器內置預定義的表達式 |
匹配處理 | dissect | 和 grok 處理器類似,語法比 grok 簡單,不使用正則表達式。可以使用修飾符控制解析方式 |
字符串處理 | lowercase | 將字符串轉換為小寫 |
字符串處理 | uppercase | 將字符串轉換為大寫 |
字符串處理 | split | 指定分隔符將字符串拆分為數組 |
字符串處理 | html_strip | 刪除字符串中的 HTLM 標簽 |
字符串處理 | trim | 去掉字符串中的前后空格 |
字段處理 | rename | 重命名字段 |
字段處理 | remove | 刪除字段 |
字段處理 | set | 為字段賦值 |
字段處理 | script | 處理復雜的邏輯,可以執行內聯或者存儲腳本 |
字段處理 | dot_expander | 將帶有點的字段擴展為對象字段 |
文檔處理 | drop | 刪除文檔 |
文檔處理 | fingerprint | 計算文檔內容的哈希值 |
網絡處理 | network_direction | 根據給定的源 IP 地址、目標 IP 地址和內部網絡列表下計算網絡請求的方向 |
網絡處理 | community_id | 計算網絡流數據中的 community id, 可以使用 community id 來關聯與單個流相關的網絡事件 |
網絡處理 | registered_domain | 從完全限定域名 (FQDN) 中提取注冊域(也稱為有效頂級域或 eTLD)、子域和頂級域。 |
HTTP 處理 | urldecode | URL 解碼 |
HTTP 處理 | user_agent | 從 user_agent 中提取詳細信息, 例如操作系統, 瀏覽器版本等等 |
HTTP 處理 | uri_parts | 從 URI 中提取詳細信息, 例如域名, 端口, 路徑等等 |
外部結合 | pipeline | 執行另一個 ingest pipeline |
外部結合 | enrich | 添加來自另一個索引的數據,類似關系型數據庫中的 join 關聯查詢 |
外部結合 | geoip | 根據來自 Maxmind 數據庫的數據添加有關 IP 地址地理位置的信息 |
外部結合 | set_security_user | 獲取索引文檔用戶的詳細信息,例如 username, roles, email, full_name, metadata |
外部結合 | inference | 使用預訓練的數據分析模型來處理數據,用于機器學習領域 |
時間處理 | date_index_name | 根據文檔中的時間戳字段將文檔寫入基于時間的索引 |
時間處理 | date | 從字段中解析日期作為文檔的時間戳 |
類型處理 | convert | 字段類型轉換,例如 "1234" -> 1234 |
類型處理 | byte | 將人類可讀的字節值轉換為字節的數值,例如 1kb -> 1024 |
異常處理 | fail | 主動拋出異常 |
圖形處理 | circle | 將圓形轉換為近似多邊形 |
2.1 Lowercase & Uppercase
lowercase 處理器可以將字符串轉換為其等效的小寫字母。如果該字段是一個字符串數組,則該數組的所有成員都將被轉換。uppercase 處理器和 lowercase 相反,將字符串轉換為大寫字母。如下所示,使用 lowercase 處理器將 name 字段轉換為小寫字母。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "lowercase": { "field": "name" } } ] }, "docs": [ { "_source": { "name": "Tom" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "tom" }, "_ingest" : { "timestamp" : "2022-02-27T10:43:11.718792423Z" } } } ]}
2.2 Split
split 處理器可以根據指定的分隔符,將字符串拆分為數組。如下所示,以 _
符號作為分隔符,將 num 字段拆分為數組。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "split": { "field": "num", "separator": "_" } } ] }, "docs": [ { "_source": { "num": "111_222_333_444" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "num" : [ "111", "222", "333", "444" ] }, "_ingest" : { "timestamp" : "2022-02-27T11:10:25.249883405Z" } } } ]}
2.3 Trim
trim 處理器可以去掉字符串頭尾的空格。如下所示,使用 trim 處理器去掉 message 字段頭尾的空格。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "trim": { "field": "message" } } ] }, "docs": [ { "_source": { "message": " Elasticsearch is the distributed search and analytics engine " } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "Elasticsearch is the distributed search and analytics engine" }, "_ingest" : { "timestamp" : "2022-02-27T11:12:26.952402786Z" } } } ]}
2.4 Join
join 處理器可以將數組中的每個元素拼接成單個字符串。如下所示,使用 _
符號作為分隔符,將 animal 字段中的元素拼接成單個字符串。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "join": { "field": "animal", "separator": "-" } } ] }, "docs": [ { "_source": { "animal": ["dog", "cat", "monkey"] } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "animal" : "dog-cat-monkey" }, "_ingest" : { "timestamp" : "2022-02-27T10:33:39.63520118Z" } } } ]}
2.5 Foreach
使用 foreach 處理器可以遍歷數組,對其中的每個元素進行處理,使用 processor 參數指定一個處理器來處理數組中元素。在 foreach 處理器內引用的處理通過 _ingest._value
鍵來獲取數組中每個元素的值。如下所示,將 values 字段中的每個元素轉換為大寫字母。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "foreach": { "field": "values", "processor": { "uppercase": { "field": "_ingest._value" } } } } ] }, "docs": [ { "_source": { "values" : ["foo", "bar", "baz"] } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "values" : [ "FOO", "BAR", "BAZ" ] }, "_ingest" : { "_value" : null, "timestamp" : "2022-02-27T10:06:44.235660464Z" } } } ]}
2.6 KV
kv 處理器可以以鍵值對的方式提取字段。如下所示,以空格作為不同鍵值對的分隔符,以 =
拆分每組鍵值對的鍵和值。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "kv": { "field": "message", "field_split": " ", // 拆分鍵值對 "value_split": "=" // 拆分鍵值對的鍵和值 } } ] }, "docs": [ { "_source": { "message": "ip=1.2.3.4 error=REFUSED" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "ip=1.2.3.4 error=REFUSED", "error" : "REFUSED", "ip" : "1.2.3.4" }, "_ingest" : { "timestamp" : "2022-02-27T10:40:31.072140367Z" } } } ]}
2.7 CSV
csv 處理器會將字段中的內容看作 csv 文本的一行,根據 separator 參數指定的分隔符,將拆分后的值賦值給 target_fields 列表中定義的字段。如下所示,將 person 字段按照 |
符號進行拆分,依次賦值給 name, age, country 字段。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "csv": { "field": "person", "target_fields": [ // 指定每列的字段值 "name", "age", "country" ], "separator": "|" // 字段間的分隔符 } } ] }, "docs": [ { "_source": { "person": "zhangsan|18|china" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "country" : "china", "person" : "zhangsan|18|china", "name" : "zhangsan", "age" : "18" }, "_ingest" : { "timestamp" : "2022-02-24T09:39:48.708832221Z" } } } ]}
2.8 Grok
grok 處理器可以使用正則表達式來提取字段,并且內置了許多常用的表達式,可以直接通過表達式別名進行使用。可以使用以下命令獲取所有 grok 內置的表達式。
GET _ingest/processor/grok?s
返回結果如下,例如我們想匹配 IP 地址就可以直接使用 %{IP}
進行匹配,想匹配 MAC 地址可以使用 %{MAC}
進行匹配。表達式別名還可以引用其他的表達式別名,比如表達式別名 IP 就引用了IPV4 和 IPV6 兩個別名。
接下來我們嘗試使用 grok 處理器解析一條日志。%{
表示將表達式匹配的值賦值到指定的 field 字段中,表達式可以是我們自定義的表達式,也可以是表達式別名;%{
表示只匹配不賦值。
55.3.244.1 GET /index.html 15824 0.043"
在上面的日志中:
55.3.244.2 是客戶端的 IP 地址,使用%{IP:client}
匹配 IP 地址,賦值到 client 字段中;GET 是 HTTP 的請求方法,使用 %{WORD:method}
匹配數字和字母,賦值到 method 字段中;/index.html 是請求的 URI 路徑,使用 %{URIPATHPARAM:request}
匹配 URI 路徑和參數,賦值到 request 字段中;15824 是請求的字節大小,使用 %{NUMBER:bytes:int}
匹配數字,賦值到 bytes 字段中,并且將字段設置為 int 類型;0.043 是請求的處理時間,使用 %{NUMBER:duration:double}
匹配數字,賦值到 duration字段中,并且將字段設置為 double 類型。上面用到的 IP, WORD, URIPATHPARAM, NUMBER 表達式都是 grok 內置的表達式別名,可以直接拿來使用。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "grok": { "field": "message", "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"] } } ] }, "docs":[ { "_source": { "message": "55.3.244.1 GET /index.html 15824 0.043" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "duration" : 0.043, "request" : "/index.html", "method" : "GET", "bytes" : 15824, "client" : "55.3.244.1", "message" : "55.3.244.1 GET /index.html 15824 0.043" }, "_ingest" : { "timestamp" : "2022-03-01T03:33:57.627169176Z" } } } ]}
除了使用 grok 內置的表達式以外,grok 處理器也允許我們自定義表達式。可以在 pattern_definitions
參數中進行設置,其中鍵是我們自定義表達式的別名,值是具體的正則表達式。如下所示,我們定義了兩個表達式別名:FAVORITE_DOG 使用正則表達式 \w+
, 匹配數字和字母,注意這里需要額外使用一個 \
來進行轉義;RGB 可以匹配 RED,GREEN,BLUE 3 種顏色。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "grok": { "field": "message", "patterns": [ "my %{FAVORITE_DOG:dog} is colored %{RGB:color}" ], "pattern_definitions": { // 自定義表達式 "FAVORITE_DOG": "\\w+", // 匹配數字和字母 "RGB": "RED|GREEN|BLUE" // 匹配 3 個顏色 } } } ] }, "docs": [ { "_source": { "message": "my beagle is colored BLUE" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "my beagle is colored BLUE", "color" : "BLUE", "dog" : "beagle" }, "_ingest" : { "timestamp" : "2022-03-01T03:34:33.933398405Z" } } } ]}
有時候一種匹配規則可能難以匹配所有的內容,我們可以在正則表達式中通過或的邏輯進行判斷,但是這樣會使得寫出來的表達式難以閱讀。這里還有一種更好的方法,在 grok 處理器中,patterns 參數允許填寫多個表達式,這樣我們的匹配規則看上去就一目了然,處理器會使用最先匹配到的表達式。如下所示,我們設置了 FAVORITE_DOG 和 FAVORITE_CAT 兩個表達式都用于解析 pet 字段,如果想要知道是哪個表達式匹配了內容,可以設置參數 "trace_match": true
,這樣在返回結果的 _grok_match_index 字段中可以看到匹配了哪個表達式,其中 1 表示匹配了第二個表達式。
POST _ingest/pipeline/_simulate{ "pipeline": { "description": "parse multiple patterns", "processors": [ { "grok": { "field": "message", "patterns": [ // patterns 是數組, 可以填寫多個表達式 "%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}" ], "pattern_definitions": { "FAVORITE_DOG": "beagle", "FAVORITE_CAT": "burmese" }, "trace_match": true // 顯示匹配了哪一個表達式, 第一個從 0 開始 } } ] }, "docs": [ { "_source": { "message": "I love burmese cats!" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "I love burmese cats!", "pet" : "burmese" }, "_ingest" : { "_grok_match_index" : "1", // 匹配了第 2 個表達式 "timestamp" : "2022-03-01T03:35:05.490483581Z" } } } ]}
在 Kibana 的界面上還提供了 Grok Debugger 方便我們調試 grok 表達式。點擊 Management -> Dev Tools -> Grok Gebugger進入調試界面。
從上圖可以看到,調試界面分為以下 4 個部分:
Sample Data: 填寫測試的文本。Grok Pattern:填寫 grok 表達式,相當于 grok 處理器中 patterns 定義的內容。Custom Patterns:自定義表達式,相當于 grok 處理器中 pattern_definitions 定義的內容。在 Custom Patterns 中每行表示一個自定義表達式,最左邊的字符串表示我們自定義的表達式別名,右邊內容是表達式的內容,不需要進行符號轉義。Structured Data:處理完的結果。我們將示例中的內容按照上面的說明填寫到相應的位置,點擊 Simulate,就可以看到解析完成后的結構化數據了。
2.9 Dissect
dissect 和 grok 處理器類似,都是用于從單個文本字段中提取結構化字段。與 grok 相比,dissect 最大的優勢就是簡單和快速,dissect 在解析時不使用正則表達式,這使得 dissect 的語法更加簡單,并且執行速度比 grok 更快。當然 grok 也有自己的獨到之處,grok 可以同時使用多個 patterns 來對內容來進行匹配,這是 dissect 所不具備的能力。接下來首先介紹一下 dissect 處理器簡單的使用方法,如下所示,我們要對一行日志內容進行解析,%{
表示將匹配到的字符串作為 field 字段的值。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}" } } ] }, "docs": [ { "_source": { "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/english/venues/cities/images/montpellier/18.gif", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : """1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] "GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0" 200 3171""", "@timestamp" : "30/Apr/1998:22:00:52 +0000", "size" : "3171", "clientip" : "1.2.3.4", "httpversion" : "1.0", "status" : "200" }, "_ingest" : { "timestamp" : "2022-03-01T06:37:23.791866312Z" } } } ]}
在 dissect 中可以使用修飾符改變默認的匹配規則,例如可以指定 dissect 忽略某些字段、拼接多個字符等等。dissect 的修飾符說明如下表所示。
修飾符 | 用途 | 位置 | 示例 |
---|---|---|---|
-> | 跳過 -> 右邊重復的字符 | 最右邊 | %{keyname1->} |
將多個結果附加到一起作為輸出 | 左邊 | %{+keyname} %{+keyname} | |
和 /n | 指定附加結果的順序 | 號在左邊,/n 放在右邊,n 是順序的數字 | %{+keyname/2} %{+keyname/1} |
? | 跳忽略匹配項 | 左邊 | %{?keyname} |
和 & | 輸出鍵設置為 * 的值,輸出值設置為 & 的值 | 左邊 | %{*key} %{&value} |
dissect 默認的匹配算法非常嚴格,要求 pattern 中的所有字符都與源字符串完全匹配。例如 %{a} %{b}
只能匹配“字符串1 字符串2”(中間 1 個空格),將無法匹配“字符串1 字符串2”(中間 5 個空格)。要處理這種情況就可以使用 ->
修飾符跳過箭頭右邊重復的字符,例如 %{a->} %{b}
就可以跳過字符串1 和字符串 2 中間的多個空格,只對空格匹配一次。要跳過的字符我們可以自由設置,如下所示,使用 ->
修飾符跳過重復的 ~
字符。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{ts->}~%{level}" // 跳過重復的 ~ 字符 } } ] }, "docs": [ { "_source": { "message": "1998-08-10T17:15:42,466~~~~~~~WARN" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "1998-08-10T17:15:42,466~~~~~~~WARN", "level" : "WARN", "ts" : "1998-08-10T17:15:42,466" }, "_ingest" : { "timestamp" : "2022-03-01T06:38:20.328535452Z" } } } ]}
假如我們想將多個匹配的字符拼接為一個字段,可以使用 +
修飾符,append_separator 參數可以指定分隔符, 默認以空格作為分隔符。如下所示,我們將匹配的多個字符串拼接為 name 字段,使用 ,
作為分隔符。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{+name} %{+name} %{+name} %{+name}", "append_separator": "," } } ] }, "docs": [ { "_source": { "message": "john jacob jingleheimer schmidt" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "john,jacob,jingleheimer,schmidt", "message" : "john jacob jingleheimer schmidt" }, "_ingest" : { "timestamp" : "2022-03-02T13:41:40.058126802Z" } } } ]}
如果我們想改變字符串拼接的順序,可以同時使用 +
和 /n
修飾符指定順序,其中 n 是順序的數字。如下所示,可以看到返回結果中的 name 字段按照我們指定的順序拼接。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{+name/2} %{+name/4} %{+name/3} %{+name/1}", "append_separator": "," } } ] }, "docs": [ { "_source": { "message": "john jacob jingleheimer schmidt" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "schmidt,john,jingleheimer,jacob", "message" : "john jacob jingleheimer schmidt" }, "_ingest" : { "timestamp" : "2022-03-02T13:47:44.332086601Z" } } } ]}
前面提到過,dissect 要求 pattern 中的所有字符都與源字符串完全匹配,否則解析將不會成功。如果我們僅僅想讓某些字符串在匹配時充當“占位”的角色,并不想讓它出現在最終的文檔中,那么就可以使用 ?
修飾符來忽略最終結果中的匹配項。除了使用 ?
修飾符以外,還可以用一個空鍵 %{}
實現相同的效果,但是為了便于閱讀,建議還是使用 %{?
的方式。如下所示,ident 和 auth 字段都不會出現在最終的結果中,僅用于字符串匹配。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{clientip} %{?ident} %{?auth} [%{@timestamp}]" } } ] }, "docs": [ { "_source": { "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "@timestamp" : "30/Apr/1998:22:00:52 +0000", "message" : "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]", "clientip" : "1.2.3.4" }, "_ingest" : { "timestamp" : "2022-03-02T13:50:56.099402273Z" } } } ]}
*
和 &
修飾符可以用于解析包含鍵值對的內容,其中輸出鍵設置為 *
的值,輸出值設置為 &
的值。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "[%{ts}] [%{level}] %{*p1}:%{&p1} %{*p2}:%{&p2}" } } ] }, "docs": [ { "_source": { "message": "[2018-08-10T17:15:42,466] [ERR] ip:1.2.3.4 error:REFUSED" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "level" : "ERR", "ip" : "1.2.3.4", "message" : "[2018-08-10T17:15:42,466] [ERR] ip:1.2.3.4 error:REFUSED", "error" : "REFUSED", "ts" : "2018-08-10T17:15:42,466" }, "_ingest" : { "timestamp" : "2022-03-02T14:00:54.96982616Z" } } } ]}
2.10 Rename
rename 處理器用于重命名現有字段。如果該字段不存在或者重命名的字段已存在,則會引發異常。如下所示,將 provider 字段重命名為 cloud.provider。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "rename": { "field": "provider", "target_field": "cloud.provider" } } ] }, "docs": [ { "_source": { "provider": "Aliyun" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "cloud" : { "provider" : "Aliyun" } }, "_ingest" : { "timestamp" : "2022-02-27T10:57:47.821558199Z" } } } ]}
2.11 Remove
remove 處理器用于刪除現有字段。如果刪除的字段不存在,則會引發異常。如下所示,使用 remove 處理器刪除文檔中的 name 和 location 字段。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "remove": { "field": ["age", "location"] } } ] }, "docs": [ { "_source": { "name": "tom", "age": 18, "location": "United States" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "tom" }, "_ingest" : { "timestamp" : "2022-02-27T10:56:05.119755281Z" } } } ]}
2.12 Set
set 處理器用于為字段賦值,并且在賦值的時候還可以使用 {{{ }}}
符號從其他字段復制值,然后和指定字符串進行拼接。如下所示,將 version 字段的值設置為 2,host.os.name 字段的值為 copy from 字符串拼接 os 字段的結果。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "set": { "field": "host.os.name", "value": "copy from {{{os}}}" // 從 os 字段復制值進行拼接 } }, { "set": { "field": "version", "value": "2" // 設置靜態值 } } ] }, "docs": [ { "_source": { "os": "Ubuntu" } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "host" : { "os" : { "name" : "copy from Ubuntu" } }, "os" : "Ubuntu", "version" : "2" }, "_ingest" : { "timestamp" : "2022-02-28T13:39:31.035666829Z" } } } ]}
2.13 Script
對于復雜的處理邏輯,如果使用 Elasticseach 其他自帶的處理器無法實現,那么可以嘗試在 script 處理器中編寫腳本進行處理。在 script 處理器中通過 lang 參數可以指定腳本語言,通常我們使用 painless 作為腳本語言,這也是 Elasticsearch 中默認的腳本語言。在 script 處理器中,腳本在 ingest 上下文中運行,我們可以通過 ctx["field"]
或者ctx.field
語法來訪問文檔中的字段。如下所示,傳入的文檔中有一個數字類型的參數 num,我們在腳本中通過 if else 條件語句進行判斷,當 num 等于 7 時,將 result 的值設置為 happy;當 num 等于 4 時,將 result 的結果設置為 sad;當 num 是其他值時,將 result 的結果設置為 normal。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "script": { "lang": "painless", "source": """ if(ctx.num == 7){ ctx.result = "happy" }else if(ctx.num == 4){ ctx.result = "sad" }else { ctx.result = "normal" } """ } } ] }, "docs": [ { "_source": { "num": 7 } } ]}# 返回結果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "result" : "happy", "num" : 7 }, "_ingest" : { "timestamp" : "2022-03-02T14:20:27.776240111Z" } } } ]}
2.14 Drop
drop 處理器可以根據條件刪除指定的文檔。如下所示,刪除 name 字段值為 tom 的文檔。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "drop": { "if": "ctx.name == "tom"" } } ] }, "docs": [ { "_source": { "name": "tom", "age": 18 } } ]}# 返回結果{ "docs" : [ null ]}
3 Ingest Pipeline 應用場景
Ingest Pipeline 主要有以下 4 類應用場景:
寫入時指定 pipeline,單條寫入或者使用 _bulk API 批量寫入時都可以使用。更新時指定 pipeline。定義索引或者模板時指定 pipeline,有兩個相關的參數:-index.default_pipeline
參數可以定義default pipeline(默認執行的 pipeline),當請求中沒有指定 pipeline 時執行;- index.final_pipeline
參數可以定義final pipeline(最終執行的 pipeline),在所有 pipeline 執行完后再執行。reindex 時指定 pipeline,在重建索引或者數據遷移時使用。3.1 寫入時指定 Pipeline
首先創建一個名為 lowercase-pipeline
的 pipeline,它的作用是將 name 字段轉換為小寫字母。
PUT _ingest/pipeline/lowercase-pipeline{ "processors": [ { "lowercase": { "field": "name" } } ]}
單條寫入或者通過 _bulk
API 批量寫入時都可以通過 pipeline
參數指定使用的 pipeline。
# 寫入單條數據時指定 pipelienPOST index-1/_doc?pipeline=lowercase-pipeline{ "name": "Tom", "age": 20}# _bulk 寫入多條文檔時指定 pipelinePUT index-1/_bulk?pipeline=lowercase-pipeline{"index":{ }}{"name":"Peter","age":17}{"index":{}}{"name":"Mary","age":19}
查看寫入的文檔,可以看到所有文檔的 name 字段都轉換為了小寫字母。
GET index-1/_search# 返回結果{ "_index" : "index-1", "_type" : "_doc", "_id" : "g196X38BKRZVqZj9rsyn", "_score" : 1.0, "_source" : { "name" : "tom", "age" : 20 }},{ "_index" : "index-1", "_type" : "_doc", "_id" : "hF96X38BKRZVqZj9scwO", "_score" : 1.0, "_source" : { "name" : "peter", "age" : 17 }},{ "_index" : "index-1", "_type" : "_doc", "_id" : "hV96X38BKRZVqZj9scwO", "_score" : 1.0, "_source" : { "name" : "mary", "age" : 19 }}
3.2 更新時指定 Pipeline
使用 _update_by_query API
可以批量更新索引中的文檔,通常會結合pipeline 來對文檔進行更新。以下示例中我們對索引中的所有文檔進行更新,也可以在 _update_by_query API
中使用 DSL 語句過濾出需要更新的文檔。
# 往源索引中插入數據PUT index-2/_doc/1{ "name": "Smith", "age": 18}PUT index-2/_doc/1{ "name": "Mike", "age": 16}# 使用 update_by_query 進行更新,可以寫 DSL 語句過濾出需要更新的文檔POST index-2/_update_by_query?pipeline=lowercase-pipeline
3.3 定義索引或者模板時指定 Pipeline
在定義索引或者模板時可以使用 index.default_pipeline 參數指定 default pipeline(默認執行的 pipeline),index.final_pipeline 參數指定 final pipeline(最終執行的 pipeline)。default pipeline 與 final pipeline 實際上都是普通的 ingest pipeline,只是和一般的 pipeline 執行時機不同;default pipeline 執行的時機是當前寫入請求沒有指定 pipeline 時,final pipeline 執行的時機是在所有 pipeline 執行完畢后。
如上圖所示,如果當前的寫入或者更新請求中指定了 pipeline,則會先執行自定義的 pipeline,當所有的 pipeline 執行完畢后再執行 final pipeline(如果索引顯式設置了index.final_pipeline);如果當前的寫入或者更新請求中沒有指定 pipeline,并且索引顯式設置了 index.default_pipeline 參數時,則會先執行 default pipeline,最后再執行 final pipeline。
為了完成下面的演示,在前面 lowercase-pipeline 的基礎上,現在再創建兩個 pipeline,其中 uppercase-pipeline 的作用是 name 字段轉換為小寫字母,set-pipeline 的作用是為文檔添加一個 message 字段。
PUT _ingest/pipeline/uppercase-pipeline{ "processors": [ { "uppercase": { "field": "name" } } ]}PUT _ingest/pipeline/set-pipeline{ "processors": [ { "set": { "field": "message", "value": "set by final pipeline" } } ]}
接下來創建一個索引 index-3,在 settings 中指定索引的 default_pipeline 為 lowercase-pipeline,final_pipeline 為 set-pipeline。
PUT index-3{ "settings": { "index": { "default_pipeline": "lowercase-pipeline", // 默認執行的 pipeline "final_pipeline": "set-pipeline" // 最終執行的 pipeline } }}
然后往索引中插入兩條文檔,其中 _id
為 1 的文檔在寫入時不指定 pipeline,_id
為 2 的文檔在寫入時指定使用 uppercase-pipeline。
PUT index-3/_doc/1{ "name": "Lisa", "age": 18}# 在寫入時指定 pipeline 覆蓋 default_pipelinePUT index-3/_doc/2?pipeline=uppercase-pipeline{ "name": "Jerry", "age": 21}
查詢最終保存的文檔,可以看到 final pipeline 始終會執行,2 個文檔都添加了 message 字段;由于寫入 _id
為 2 的文檔時指定使用了 uppercase-pipeline,所以該文檔沒有執行 default pipeline,而是執行了 uppercase-pipeline 將字母轉換為大寫。
GET index-3/_search# 返回結果{ "_index" : "index-3", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "name" : "lisa", "message" : "set by final pipeline", "age" : 18 }},{ "_index" : "index-3", "_type" : "_doc", "_id" : "2", "_score" : 1.0, "_source" : { "name" : "JERRY", "message" : "set by final pipeline", "age" : 21 }}
3.4 Reindex 時指定 Pipeline
Elasticsearch 提供了 reindex API 用于將文檔從源索引復制到目標索引,在 reindex 時可以指定 pipeline 對復制的文檔進行加工處理。如下所示,先創建源索引 source-index,并插入 1 條文檔。
PUT source-index/_doc/1{ "name": "Jack", "age": 18}
然后在 reindex 時指定使用 lowercase-pipeline,目標索引名設置為 dest-index。
POST _reindex{ "source": { "index": "source-index" }, "dest": { "index": "dest-index", "pipeline": "lowercase-pipeline" }}
查看目標索引,name 字段已經成功轉換為了小寫字母。
GET dest-index/_search# 返回結果{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dest-index", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "name" : "jack", "age" : "18" } } ] }}
4 總結
Ingest pipeline 是 Elasticsearch 的一個非常實用的功能,它能夠幫助用戶在數據進入 Elasticsearch 索引之前對其進行預處理,從而提高搜索和分析的效率和準確性。
本文向讀者介紹了如何有效地創建,管理和測試 ElasticSearch Ingest Pipeline。在第一小節中首先說明了 ingest pipeline 的基本用法,包括創建和使用 ingest pipeline,使用 simulate API 對 pipeline 進行測試,以及如何處理 pipeline 中的異常;在第二小節中,將 ingest pipeline 中的 processor 處理器根據用途作了分類說明,并通過示例展示了常見的幾個 processor 的用法;在最后一個小節中歸納了 ingest pipeline 的 4 個應用場景。