ES 除了全文搜索以外还有一个主要功能,就是数据的聚合分析。我会在微信小程序里用到聚合功能。今天先介绍一下。

目前 DSL 库支持如下三种常用聚合模式

Metrics Aggregations

顾名思义,主要是用于计算特定的度量字段,Metric 很像 SQL 中的 avg、max、min 等方法。我们找一下最多的 Live 有多少人感兴趣:

In : from elasticsearch_dsl import A
In : s = Live.search()
In : s.aggs.metric('max_liked_num', A('max', field='liked_num'))
Out: <elasticsearch_dsl.search.Search at 0x10a47b550>

In : r = s.execute()
In : r.aggregations.max_liked_num
Out: {'value': 6918.0}

嚯。看看是哪一个吧:

In [27]: s.query('match', liked_num=6918).execute()[0].subject
Out[27]: '致所有近视想摘掉眼镜的你们'

有点出人意料哦~

其实度量的也不一定是文档的某个特定字段值,可以是文档通过脚本生成的值。比如我们看看全部 Live 平均收入,收入 = 票价* 参与人数,是 2 个字段。要这样用:

In : from elasticsearch_dsl import A

In : s = Live.search()
In : s.aggs.metric('avg_income', A('scripted_metric', init_script="params._agg['incomes'] = []", map_script="params._agg.incomes.add(doc.amou
...: nt.value * doc.seats_taken.value)", combine_script='double total=0; int num_of_income=0; for (i in params._agg.incomes) { total += i; nu
...: m_of_income += 1 } return [total, num_of_income]', reduce_script='double total=0; int num_of_income=0; for (item in params._aggs) { tota
...: l += item[0]; num_of_income += item[1]} return total / num_of_income'))
Out: <elasticsearch_dsl.search.Search at 0x10a512898>

In : rs = s.execute()

In : rs.aggregations.avg_income
Out: {'value': 32934.61029513591}

有点长,我们把 scripted_metric 的参数分开说:

  1. init_script。 初始化时运行,一般是设置初始的全局变量
  2. map_script。会对每个文档做循环,把每个计算好的收入用 add 方法加到每个分片的 params._agg.incomes 里面。
  3. combine_script。我们知道 ES 是分布式的,数据有多个分片,当 map_script 完成后,它用来对每个分片的那部分结果做求和和计数的预处理
  4. reduce_script。如果你了解 MapReduce,我想对 2 和 4 步就能更好的理解了,这一步能通过 params._aggs 把每个分片的预处理结果拿来再做处理,最后通过总收入和 live 数求得平均值。

很庆幸没有给平均值拖后腿。BTW,有兴趣的同学可以继续挖掘为啥平均收入这么高。而且注意额,我考虑的只是普通票价,没有算那些「聊表心意」、「鼎力支持」的票,这会让平均值更高一些。

上面的例子也的好长啊。我不太满意,那么是不是可以简化一下呢?也就是 combine_script 不预计算,统一在 reduce_script 计算:

In : s = Live.search()
In : s.aggs.metric('avg_income', A('scripted_metric', init_script="params._agg['incomes'] = []", map_script="params._agg.incomes.add(doc.amou
...: nt.value * doc.seats_taken.value)", combine_script='return params._agg.incomes', reduce_script='double total=0; int num_of_income=0; for
...:  (shard in params._aggs) { for (income in shard) {total += income; num_of_income += 1}} return total / num_of_income'))
Out: <elasticsearch_dsl.search.Search at 0x10a53a470>

In : r = s.execute()
In : r.aggregations.avg_income
Out: {'value': 32934.61029513591}

只是在 combine_script 用了个嵌套循环。

Bucket Aggregations

Bucket 在英语里面有「桶」的意思,Bucket Aggregations 会把符合某种条件的文档丢进一个 Bucket,而且还可以实现子聚合(sub-aggregations)。

Elasticsearch 是基于 Lucene 构建的。如果你了解过 Lucene,相信知道 docValue,它节省内存、做排序,分组等聚合操作时能够大大提升性能。我们之前的 model 里面大多使用了文本字段(Text),这是用作进行全文搜索的,而希望做聚合计算,需要使用 Keyword 类型的字段。所以我添加了一个 topics 字段:

from elasticsearch_dsl import Keyword, DocType

class Live(DocType):
    ...
    topic_names = Text(analyzer='ik_max_word')
    topics = Keyword()  # 新增

其实 DSL 还支持一种用子字段的写法:

topic_names = Text(analyzer='ik_max_word', fields={'raw': Keyword()})

由于担心未来 Live 的 Topic 会有多个,所以 topic_names 是一个用 join 把 topic 列表串起来的字符串,而需求上 topics 是一个或者多个 topic 的列表,还是额外新加一个字段吧。

这样重新跑爬虫,补充下 topics 字段之后,按 toics 符合数量排序,看看 live 中那些类型的 Live 更多:

In : s = Live.search()
In : s.aggs.bucket('categories', A('terms', field='topics'))
Out: <elasticsearch_dsl.search.Search at 0x10a532d30>

In : r = s.execute()

In : r.aggregations.categories.buckets
Out:
[{'key': '生活方式', 'doc_count': 145},
 {'key': '金融', 'doc_count': 94},
 {'key': '音乐', 'doc_count': 87},
 {'key': '艺术', 'doc_count': 73},
 {'key': '教育', 'doc_count': 59},
 {'key': '科技', 'doc_count': 59},
 {'key': '心理学', 'doc_count': 56},
 {'key': '职业', 'doc_count': 56},
 {'key': '互联网', 'doc_count': 48},
 {'key': '医学', 'doc_count': 42}]

看到了吧,彰显逼格的「生活方式」话题的 Live 最多,象征更多钱的金融话题次之...

现在是不是有种熟悉的感觉:聚合短语 terms 这不是 SQL 里面的 group by 嘛?

PS: 如果要使结果返回所有聚合结果的话,需要加上 size 参数:

s.aggs.bucket('categories', A('terms', field='topics', size=20))

PS: 从 ES5.0 开始,size 不再能指定 0 而返回全部结果了,需要明确指定一个大于 0 的整数。

Bucket 聚合支持多种类型,我们再演示下范围聚合。 现在把票价分成三个范围:

  1. 小于 20 的
  2. 20-100 之间的
  3. 大于 100 的

这样写:

In : s = Live.search()
In : s.aggs.bucket('amount_eq_100', A('range', field='amount', ranges=[{'from': 100}, {'from': 20, 'to': 100}, {'to': 20}]))
Out: Range(field='amount', ranges=[{'from': 100}, {'from': 20, 'to': 100}, {'to': 20}])

In : r = s.execute()
In : buckets = r.aggregations.amount_eq_100.buckets
In : for bucket in buckets:
...:     print('{}: {}'.format(bucket['key'], bucket['doc_count']))
...:
*-20.0: 1159
20.0-100.0: 683
100.0-*: 20

最后演示下 date_histogram 型的聚合,histogram 顾名思义是直方图的意思,我们看看从 Live 诞生到现在,每个月(即将)举行 Live 的数量分别是多少:

In : s = Live.search()
In : s.aggs.buckets('start_at', A('date_histogram', field='starts_at', interval='month'))
Out: <elasticsearch_dsl.search.Search at 0x10a6ea3c8>

In : r = s.execute()
In : r.aggregations.start_at.buckets
Out:
[{'key_as_string': '2016-05-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-06-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-07-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-08-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-09-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-10-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-11-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-12-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-01-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-02-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-03-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-04-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-05-01T00:00:00.000Z', 'key': datetim...}]

In : q = r.aggregations.start_at.buckets[0]
In : q.doc_count
Out: 20
In : q.key
Out: datetime.datetime(2016, 5, 1, 0, 0)

interval 支持多种类型:如 year, quarter, month, week, day, hour, minute, second 等。

Pipeline Aggregations

管道聚合是在 Elasticsearch 2.x 新增的一种聚合类型,可以在现有的聚合数据之上,再对其做一次运算。这类似 SQL 的 Subquery。

Pipeline 分为 2 类:

  1. parent。聚合的输入是非 Pipeline 聚合的输出,并对其进行进一步处理。一般不生成新的桶,而是对父聚合桶信息的 replace。
  2. sibling。聚合的输入是其他 Pipeline 聚合的输出。并能在同级上计算新的聚合。

管道聚合通过 buckets_path 参数指定他们要进行聚合计算的权值对象,格式如下:

AGG_SEPARATOR       =  '>' ;  指定父子聚合关系
METRIC_SEPARATOR    =  '.' ;  指定聚合的特定权值
AGG_NAME            =  <the name of the aggregation> ;  直接指定聚合的名称
METRIC              =  <the name of the metric (in case of multi-value metrics aggregation)> ;  直接指定权值
PATH                =  <AGG_NAME> [ <AGG_SEPARATOR>, <AGG_NAME> ]* [ <METRIC_SEPARATOR>, <METRIC> ] ; 综合上面的方式指定完整路径

看 2 个例子就好懂了。首先演示 sibling 类型的,基于上节 date_histogram 聚合例子了,我们算一下每个月的 Live 总收入

In : agg = A('date_histogram', field='starts_at', interval='month')
In : agg.bucket('incomes', A('sum', script={'inline': "doc['seats_taken'].value* doc['amount'].value"}))
Out: Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})
In : s.aggs.bucket('incomes_per_month', agg)
Out: DateHistogram(aggs={'incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})}, field='starts_at', interval='month')

为了构造更好理解的聚合语句,先生成一个 agg 变量,可以看到 Buckets 和 Metrics 可以用函数式的方式用多个,也要注意当需求复杂的时候都是可以通过 script 来实现的。接着加入 2 个管道,再分别获得最大月收入和全部月收入:

In : s.aggs.pipeline('max_monthly_incomes', agg_type='max_bucket', buckets_path='incomes_per_month>incomes')
Out: <elasticsearch_dsl.search.Search at 0x10a75b518>
In : s.aggs.pipeline('sum_monthly_incomes', agg_type='sum_bucket', buckets_path='incomes_per_month>incomes')  # 注意agg_type不一样
Out: <elasticsearch_dsl.search.Search at 0x10a75b518>

In : r = s.execute()
In : r.aggregations.incomes_per_month.buckets
Out:
[{'key_as_string': '2016-05-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-06-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-07-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-08-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-09-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-10-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-11-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2016-12-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-01-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-02-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-03-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-04-01T00:00:00.000Z', 'key': datetim...},
 {'key_as_string': '2017-05-01T00:00:00.000Z', 'key': datetim...}]

In : a = r.aggregations.incomes_per_month.buckets[0]
In : a.doc_count
Out: 20
In : a.incomes
Out: {'value': 210088.48763275146}

In : r.aggregations.max_monthly_incomes
Out: {'value': 18330342.949926138, 'keys': ['2016-10-01T00:00:00....}  # 十月份收入最多

In : r.aggregations.sum_monthly_incomes
Out: {'value': 61324244.369543076}  # 满眼的钱,现金🐂啊

数完了钱,思考下。这个例子就是 sibling 聚合,因为 sum_monthly_incomes、max_monthly_incomes 和 incomes_per_month 在一个区间内的(都是 aggs 的键)。

我之前我们算过么每个月 Live 的总收入,全部 Live 的平均收入。我们现在算一下每个月 Live 的平均收入:

In : agg = A('date_histogram', field='starts_at', interval='month')
In : agg.bucket('total_incomes', A('sum', script={'inline': "doc['seats_taken'].value* doc['amount'].value"}))
Out: Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})

In : agg.pipeline('avg_income', agg_type='bucket_script', buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')  # _count是一个特殊的路径,表示当前bucket里面的文档数量
Out: DateHistogram(aggs={'total_incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}), 'avg_income': BucketScript(buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')}, field='starts_at', interval='month')

In : s = Live.search()
In : s.aggs.bucket('avg_income_per_month', agg)
Out: DateHistogram(aggs={'total_incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}), 'avg_income': BucketScript(buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')}, field='starts_at', interval='month')

In : r = s.execute()
In : buckets = r.aggregations.avg_income_per_month.buckets
In : b = buckets[0]

In : b.total_incomes
Out: {'value': 210088.48763275146}
In : b.doc_count
Out: 20
In : b.avg_income
Out: {'value': 10504.424381637573}

这就是 parent 类型的管道聚合了,它对每个桶自己去做运算。

今天先到这里了,下一篇将基于这几天对 ES 的学习实现一个对知乎 Live 进行全文搜索的微信小程序了