Update By Query API

_update_by_query的最简单的用法只是对索引中的每个文档执行更新,而不会更改源。这对于收集新的属性或其他在线映射更改非常有用。这是API:

POST twitter/_update_by_query?conflicts=proceed

他将返回类如下的信息:

{
  "took" : 147,
  "timed_out": false,
  "updated": 120,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query在启动时获取索引的快照,并使用内部版本控制对其进行索引。这意味着如果文档在拍摄快照和处理索引请求之间发生变化,则会发生版本冲突。当版本匹配文档被更新并且版本号增加。

注意

由于内部版本控制不支持值0作为有效的版本号,版本等于零的文档无法使用_update_by_query进行更新,并且会使请求失败。

所有更新和查询失败导致_update_by_query中止并在响应失败中返回。已执行的更新仍然坚持。换句话说,进程没有回滚,只会中止。当第一个故障导致中止时,失败批量请求返回的所有故障都会返回到故障元素中;因此,有可能会有不少失败的实体。

如果你想简单地计算版本冲突,不会导致_update_by_query中止,你可以在url设置conflicts=proceed或在请求体设置"conflicts": "proceed"。第一个例子是这样做,因为它只是尝试接管在线映射更改,并且版本冲突只是意味着冲突的文档在_update_by_query的开始与尝试更新文档的时间之间更新。这很好,因为该更新将获取在线映射更新。

返回到API格式,您可以将_update_by_query限制为单一类型。下面将只从Twitter的索引更新tweet类型的文件:

POST twitter/tweet/_update_by_query?conflicts=proceed

您还可以使用Query DSL限制_update_by_query。下面将更新用户kimchytwitter索引中的所有文档:

POST twitter/_update_by_query?conflicts=proceed
{
  "query": { //①
    "term": {
      "user": "kimchy"
    }
  }
}

① 该查询必须以与Search API相同的方式作为query的值传递。您也可以以与搜索api相同的方式使用q参数。

到目前为止,我们只是更新文档而不改变它们的来源。这对于收集新的属性来说真的很有用,但只有一半的乐趣。 _update_by_query支持脚本对象来更新文档。这将增加所有kimchytweets上的likes字段:

POST twitter/_update_by_query
{
  "script": {
    "inline": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

就像更新API一样,您可以设置ctx.op来更改执行的操作:

noop

如果您的脚本决定不必进行任何更改,请设置 ctx.op ="noop" 。这将导致_update_by_query 从其更新中忽略该文档。这个没有操作将被报告在响应体noop 计数器上。

delete

如果您的脚本决定必须删除该文档,请设置ctx.op="delete"。删除将在响应体deleted 计数器中报告。

ctx.op设置为其他任何内容都是错误。在ctx中设置任何其他字段是一个错误。

请注意,我们停止指定conflict=proceed。在这种情况下,我们希望版本冲突中止该过程,以便我们可以处理该故障。

该API不允许您移动其触摸的文档,只需修改其来源。这是故意的!我们没有规定将文档从原始位置删除。

也可以一次性对多个索引和多个类型进行整体处理,就像搜索API一样:

POST twitter,blog/tweet,post/_update_by_query

如果您提供routing则将路由复制到滚动查询,将进程限制为与该路由值匹配的分片:

POST twitter/_update_by_query?routing=1

默认情况下_update_by_query使用滚动批次数量为1000.您可以使用URL参数scroll_size更改批量大小:

POST twitter/_update_by_query?scroll_size=100

_update_by_query也可以使用Ingest Node功能来指定管道,如下所示:

PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST twitter/_update_by_query?pipeline=set-foo

URL参数

除了标准参数像pretty之外,“Update By Query API”还支持refreshwait_for_completionwait_for_active_shardstimeout以及requests_per_second

发送refresh将在更新请求完成时更新索引中的所有分片。这不同于 Index API 的refresh参数,只会导致接收到新数据的分片被索引。

如果请求包含wait_for_completion=false,那么Elasticsearch将执行一些预检检查、启动请求、然后返回一个任务,可以与Tasks API一起使用来取消或获取任务的状态。Elasticsearch还将以.tasks/task/${taskId}作为文档创建此任务的记录。这是你可以根据是否合适来保留或删除它。当你完成它时,删除它可以让Elasticsearch回收它使用的空间。

wait_for_active_shards控制在继续请求之前必须有多少个分片必须处于活动状态,详见这里timeout控制每个写入请求等待不可用分片变成可用的时间。两者都能正确地在Bulk API中工作。

requests_per_second可以设置为任何正数(1.4,6,1000等),来作为“delete-by-query”每秒请求数的节流阀数字,或者将其设置为-1以禁用限制。节流是在批量批次之间等待,以便它可以操纵滚动超时。等待时间是批次完成的时间与request_per_second * requests_in_the_batch的时间之间的差异。由于分批处理没有被分解成多个批量请求,所以会导致Elasticsearch创建许多请求,然后等待一段时间再开始下一组。这是“突发”而不是“平滑”。默认值为-1。

响应体

JSON响应类似如下:

{
  "took" : 639,
  "updated": 0,
  "batches": 1,
  "version_conflicts": 2,
  "retries": {
    "bulk": 0,
    "search": 0
  }
  "throttled_millis": 0,
  "failures" : [ ]
}

took

从整个操作的开始到结束的毫秒数。

updated

成功更新的文档数。

batches

通过查询更新的滚动响应数量。

version_conflicts

根据查询更新时,版本冲突的数量。

retries

根据查询更新的重试次数。bluk 是重试的批量操作的数量,search 是重试的搜索操作的数量。

throttled_millis

请求休眠的毫秒数,与`requests_per_second`一致。

failures

失败的索引数组。如果这是非空的,那么请求因为这些失败而中止。请参阅 conflicts 来如何防止版本冲突中止操作。

配合Task API使用

您可以使用Task API获取任何正在运行的根据查询修改请求的状态:

GET _tasks?detailed=true&actions=*/update/byquery

响应会类似如下:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    //①
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            }
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

① 此对象包含实际状态。它就像是响应json,重要的添加total字段。 total是重建索引希望执行的操作总数。您可以通过添加的updatedcreateddeleted的字段来估计进度。当它们的总和等于total字段时,请求将完成。

使用任务id可以直接查找任务:

GET /_tasks/taskId:1

这个API的优点是它与wait_for_completion=false集成,以透明地返回已完成任务的状态。如果任务完成并且wait_for_completion=false被设置,那么它将返回resultserror字段。此功能的成本是wait_for_completion=false.tasks/task/${taskId}创建的文档,由你自己删除该文件。

配合取消任务API使用

所有根据查询修改都能使用Task Cancel API取消:

POST _tasks/task_id:1/_cancel

可以使用上面的任务API找到task_id

取消应尽快发生,但可能需要几秒钟。上面的任务状态API将继续列出任务,直到它被唤醒取消自身。

重置节流阀

request_per_second的值可以在通过查询删除时使用_rethrottle API更改:

POST _update_by_query/task_id:1/_rethrottle?requests_per_second=-1

可以使用上面的任务API找到task_id。

就像在_update_by_query API中设置它一样,request_per_second可以是-1来禁用限制,或者任何十进制数字,如1.7或12,以节制到该级别。加速查询的会立即生效,但是在完成当前批处理之后,减慢查询的才会生效。这样可以防止滚动超时。

手动切片

根据查询修改支持滚动切片,您可以相对轻松地手动并行化处理:

POST twitter/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "inline": "ctx._source['extra'] = 'test'"
  }
}
POST twitter/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "inline": "ctx._source['extra'] = 'test'"
  }
}

您可以通过以下方式验证:

GET _refresh
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

其结果一个合理的total像这样:

{
  "hits": {
    "total": 120
  }
}

自动切片

你还可以让根据查询修改使用切片的_uid来自动并行的滚动切片

POST twitter/_update_by_query?refresh&slices=5
{
  "script": {
    "inline": "ctx._source['extra'] = 'test'"
  }
}

您可以通过以下方式验证:

POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

其结果一个合理的total像这样:

{
  "hits": {
    "total": 120
  }
}

slices添加到_update_by_query中可以自动执行上述部分中使用的手动过程,创建子请求,这意味着它有一些怪癖:

  • 您可以在Task API中看到这些请求。这些子请求是具有slices请求任务的“子”任务。

  • 获取slices请求任务的状态只包含已完成切片的状态。

  • 这些子请求可以单独寻址,例如取消和重置节流阀。

  • slices的重置节流阀请求将按相应的重新计算未完成的子请求。

  • slices的取消请求将取消每个子请求。

  • 由于slices的性质,每个子请求将不会获得完全均匀的文档部分。所有文件都将被处理,但有些片可能比其他片大。预期更大的切片可以有更均匀的分布。

  • 带有slices请求的request_per_secondsize的参数相应的分配给每个子请求。结合上述关于分布的不均匀性,您应该得出结论,使用切片大小可能不会导致正确的大小文档为_update_by_query

  • 每个子请求都会获得源索引的略有不同的快照,尽管这些都是大致相同的时间。

挑选切片数量

在这一点上,我们围绕要使用的slices数量提供了一些建议(比如手动并行化时,切片API中的max参数):

  • 不要使用大的数字,500就能造成相当大的CPU抖动。

  • 从查询性能的角度来看,在源索引中使用分片数量的一些倍数更为有效。

  • 在源索引中使用完全相同的分片是从查询性能的角度来看效率最高的。

  • 索引性能应在可用资源之间以slices数量线性扩展。

  • 索引或查询性能是否支配该流程取决于许多因素,如正在重建索引的文档和进行reindexing的集群。

收集新的属性

假设您创建了一个没有动态映射的索引,用数据填充它,然后添加一个映射值来从数据中获取更多的字段:

PUT test
{
  "mappings": {
    "test": {
      "dynamic": false,   //①
      "properties": {
        "text": {"type": "text"}
      }
    }
  }
}

POST test/test?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/test?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping/test   //②
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

① 这意味着新的字段将不会被索引,只存储在_source中。

② 将更新映射以添加新的flag字段。要接收新的字段,你必须重新索引所有的文档。

搜索数据将找不到任何内容:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 0
  }
}

但是您可以发出_update_by_query请求来接收新映射:

POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 1
  }
}

将字段添加到多字段时,您可以执行完全相同的操作。

Last updated