Elasticsearch深度分页

叶斌

首先来看一下Elasticsearch搜索的内部执行原理

一个基本的分页查询语句:

POST /rider/rider_info/_search { "query": { "match_all": {}}, "from": 5000, "size": 20 }

上面的查询表示从搜索结果中取第5000条开始的20条数据。 那么,这个查询语句在 Elasticsearch 集群内部是怎么执行的呢? 在 Elasticsearch 中,搜索一般包括两个阶段,query 和 fetch 阶段,可以简单的理解,query 阶段确定要取哪些doc,fetch 阶段取出具体的 doc。

Query 阶段

如上图所示,描述了一次搜索请求的 query 阶段。 Client 发送一次搜索请求,node1 接收到请求,然后,node1 创建一个大小为 from + size 的优先级队列用来存结果,我们管 node1 叫 coordinating node。
coordinating node将请求广播到涉及到的 shards,每个 shard 在内部执行搜索请求,然后,将结果存到内部的大小同样为 from + size 的优先级队列里,可以把优先级队列理解为一个包含 top N 结果的列表。
每个 shard 把暂存在自身优先级队列里的数据返回给 coordinating node,coordinating node 拿到各个 shards 返回的结果后对结果进行一次合并,产生一个全局的优先级队列,存到自身的优先级队列里。 个人理解:在上面的例子中,coordinating node 排序查询得到5020条数据,存到优先级队列,以便 fetch 阶段使用。另外,各个分片同样返回给 coordinating node 的数据5020条数据,当然,只需要返回唯一标记 doc 的 _id 以及用于排序的 _score 即可,这样也可以保证返回的数据量足够小。 coordinating node 合并与计算好自己的优先级队列后,query 阶段结束,进入 fetch 阶段。

Fetch 阶段

query 阶段知道了要取哪些数据,但是并没有取具体的数据,这就是 fetch 阶段要做的。

上图展示了 fetch 过程: coordinating node 发送 GET 请求到相关shards。
shard 根据 doc 的 id 取到数据详情,然后返回给 coordinating node。
coordinating node 返回数据给 Client。
coordinating node 的优先级队列里有 from + size 个 _doc _id,但是,在 fetch 阶段,并不需要取回所有数据,在上面的例子中,前100条数据是不需要取的,只需要取优先级队列里的第101到110条数据即可。
需要取的数据可能在不同分片,也可能在同一分片,coordinating node 使用 multi-get 来避免多次去同一分片取数据,从而提高性能。 深度分页的问题 Elasticsearch 的这种方式提供了分页的功能,同时,也有相应的限制。举个例子,一个索引,有10亿数据,分10个 shards,然后,一个搜索请求,from=1,000,000,size=100,这时候,会带来严重的性能问题:
在 query 阶段,每个shards需要返回 1,000,100 条数据给 coordinating node,而 coordinating node 需要接收 10 1,000,100 条数据,即使每条数据只有 _doc _id 和 _score,这数据量也很大了,而且,这才一个查询请求,那如果再乘以100呢? Elasticsearch似乎并不希望用户深度分页的查询,在设置参数上有index.max
result_window,默认只有10000,也就是说在不修改默认参数的基础上,深度分页只能查询到10000,如果用普通查询进行深度分页肯定会碰到这个问题。
修改参数语法:

POST /rider/_settings" -d '{"index": {"max_result_window": 10000000}} 采用 Elasticsearch 提供的 scroll 方式来实现深度分页遍历。 可以把 scroll 理解为关系型数据库里的 cursor,因此,scroll 并不适合用来做实时搜索,而更适用于后台批处理任务,比如群发。 可以把 scroll 分为初始化和遍历两步,初始化时将所有符合搜索条件的搜索结果缓存起来,可以想象成快照,在遍历时,从这个快照里取数据,也就是说,在初始化后对索引插入、删除、更新数据都不会影响遍历结果。

使用介绍 下面介绍下scroll的使用,可以通过 Elasticsearch 的 HTTP 接口做试验下,包括初始化和遍历两个部分。 初始化-第一次查询:scroll查询city_id = 1的骑手

POST /rider/rider_info/_search?scroll=1m&size=200 { "query": { "bool": { "filter": [ { "bool": { "must": [ { "match_phrase": { "city_id": { "query": 1, "slop": 0, "boost": 1 } } } ], "disable_coord": false, "adjust_pure_negative": true, "boost": 1 } } ], "disable_coord": false, "adjust_pure_negative": true, "boost": 1 } } } 初始化时需要像普通 search 一样,指明 index 和 type (当然,search 是可以不指明 index 和 type 的),然后,加上参数 scroll,表示暂存搜索结果的时间,size,scroll每页的数量,其它就像一个普通的search请求一样。 初始化返回一个 scrollid,scrollid 用来下次取数据用。 遍历 POST search/scroll?scroll=1m&scrollid=?
每次的翻页查询都会返回下一页的结果集,直到所有的查询结果都已经返回了,这个时候返回的response里面的hits就会是空的,所以我们可以用hits来判断是不是查询结束了。 在网上查询资料的时候,有点地方说明:需要注意的是,遍历的scrollid是会改变的,每次新的查询必需用最近返回的scrollid。 不过我在使用的时候,一次scrollid查询每次遍历返回的值都是固定的,不知是不是Elasticsearch 版本不同的问题(公司使用Elasticsearch 5.3.0).保险起见,开发过程中,每次下一页scroll遍历最好使用上一次的scroll_id。

Elasticsearch scroll_id探秘

细心的会发现,这个ID其实是通过base64编码的,以刚才查询的scroll_id为例: DnF1ZXJ5VGhlbkZldGNoBgAAAAAANljpFkZSMEhCdUVQVFEyYkdEeGFoT1JRdWcAAAAAACzJkxZ1cEdfZVhhVVFhU1FBMEVrRUtEOWN3AAAAAAAsyZQWdXBHX2VYYVVRYVNRQTBFa0VLRDljdwAAAAAALMmVFnVwR19lWGFVUWFTUUEwRWtFS0Q5Y3cAAAAAADZY6hZGUjBIQnVFUFRRMmJHRHhhaE9SUXVnAAAAAAA2WOsWRlIwSEJ1RVBUUTJiR0R4YWhPUlF1Zw==

使用解码工具可以看到: queryThenFetch,�'upG_eXaUQaSQA0EkEKD9cw6\�FR0HBuEPTQ2bGDxahORQug0:�LFJEqZW7Q3OWLYTpz1UtrQ0:�LFJEqZW7Q3OWLYTpz1UtrQ0:�LFJEqZW7Q3OWLYTpz1UtrQ0:�LFJEqZW7Q3OWLYTpz1UtrQ 记录的应该是查询模式、查询参数及shard上的查询位置。

Elasticsearch Java client scroll查询代码:
public ScrollPagination<RiderEsDTO> pageScrollSearch(RiderEsParamDTO paramDTO) { ScrollPagination<RiderEsDTO> pagination = new ScrollPagination<RiderEsDTO>(paramDTO.getCurrentPage(), paramDTO.getPageSize()); // 非第一次且未传入scrollId,无法查询 if (pagination.getCurrentPage() > 1 && StringUtils.isBlank(paramDTO.getScrollId())) { throw new BusinessException("RiderScrollQueryManager pageScrollSearch scrollId invalid."); } long start = System.currentTimeMillis(); // 第一页查询 新建scroll if (paramDTO.getCurrentPage() == 1) { SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch(RiderEsConstants.RIDER_INDEX_NAME) .setTypes(RiderEsConstants.RIDER_INFO_TYPE_NAME); BoolQueryBuilder boolQuery = RiderElasticHelper.buildBasicOrderQuery(paramDTO); searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setSize(pagination.getPageSize()); // scroll有效时间3分钟 searchRequestBuilder.setScroll(TimeValue.timeValueMinutes(3)); SearchResponse scrollResponse = searchRequestBuilder.execute().actionGet(); SearchHits searchHits = scrollResponse.getHits(); pagination.setTotalCount(searchHits.getTotalHits()); if (searchHits.getTotalHits() <= 0) { return pagination; } List<RiderEsDTO> list = new ArrayList<RiderEsDTO>(); for (SearchHit hit : searchHits.getHits()) { Map<String, Object> source = hit.getSource(); list.add(RiderElasticHelper.convertSourceMapToRiderEsDTO(source)); } pagination.setList(list); pagination.setScrollId(scrollResponse.getScrollId()); } else { // scroll翻页查询,直接传入scrollId即可 SearchResponse scrollResponse = transportClient.prepareSearchScroll(paramDTO.getScrollId()) .setScroll(TimeValue.timeValueMinutes(3)).execute().actionGet(); SearchHits searchHits = scrollResponse.getHits(); pagination.setTotalCount(searchHits.getTotalHits()); if (searchHits.getTotalHits() <= 0) { return pagination; } List<RiderEsDTO> list = new ArrayList<RiderEsDTO>(); for (SearchHit hit : searchHits.getHits()) { Map<String, Object> source = hit.getSource(); // source map convert to result dto list.add(RiderElasticHelper.convertSourceMapToRiderEsDTO(source)); } pagination.setList(list); pagination.setScrollId(scrollResponse.getScrollId()); } logger.info( "RiderScrollQueryManager pageScrollSearch scrollId:{}, totalCount:{}, currentPage:{}, listSize:{}, cost:{}", pagination.getScrollId(), pagination.getTotalCount(), pagination.getCurrentPage(), pagination.getList().size(), System.currentTimeMillis() - start); return pagination; }

测试from&size VS scroll的性能

size:200
size:500
size:1000

from&size VS scroll的性能比较结果,很遗憾,随每页size的增涨scroll反而是更慢,没有符合功能介绍上的预期。只能解释是环境问题:数据量太小,机器性能太好。(环境:生产的骑手中心数据约95W)

scroll使用过程中碰到的其他注意点
以rpc服务提供scroll查询必须接受数据可缺失。因为各种原因客户端可能的超时,而服务端scroll翻页,遍历过程中导致查询结果部分丢失。
scroll查询不支持排序。

以上所有测试,版本说明:Elasticsearch 5.3.0;java client org.elasticsearch 5.3.0