使用Java操作Elasticsearch(Elasticsearch的java api使用)

mac2025-01-06  4

1、Elasticsearch是基于Lucene开发的一个分布式全文检索框架,向Elasticsearch中存储和从Elasticsearch中查询,格式是json。

索引index,相当于数据库中的database。

类型type相当于数据库中的table。

主键id相当于数据库中记录的主键,是唯一的。

向Elasticsearch中存储数据,其实就是向es中的index下面的type中存储json类型的数据。

2、Elasticsearch是RestFul风格的api,通过http的请求形式(注意,参数是url拼接还是请求的json形式哦),发送请求,对Elasticsearch进行操作。查询,请求方式应该是get。删除,请求方式应该是delete。添加,请求方式应该是put/post。修改,请求方式应该是put/post。RESTFul接口url的格式:http://ip:port/<index>/<type>/<[id]>。其中index、type是必须提供的。id是可以选择的,不提供es会自动生成,index、type将信息进行分层,利于管理。

3、如何使用java连接Elasticsearch。由于使用的是maven项目,pom.xml的依赖如下所示:

1 <project xmlns="http://maven.apache.org/POM/4.0.0" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 4 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 <groupId>com.bie</groupId> 7 <artifactId>elasticsearch-hello</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 10 <properties> 11 <maven.compiler.source>1.8</maven.compiler.source> 12 <maven.compiler.target>1.8</maven.compiler.target> 13 <encoding>UTF-8</encoding> 14 </properties> 15 16 <dependencies> 17 <!-- elasticsearch的客户端 --> 18 <dependency> 19 <groupId>org.elasticsearch.client</groupId> 20 <artifactId>transport</artifactId> 21 <version>5.4.3</version> 22 </dependency> 23 <!-- elasticsearch依赖2.x的log4j --> 24 <dependency> 25 <groupId>org.apache.logging.log4j</groupId> 26 <artifactId>log4j-api</artifactId> 27 <version>2.8.2</version> 28 </dependency> 29 <dependency> 30 <groupId>org.apache.logging.log4j</groupId> 31 <artifactId>log4j-core</artifactId> 32 <version>2.8.2</version> 33 </dependency> 34 <!-- junit单元测试 --> 35 <dependency> 36 <groupId>junit</groupId> 37 <artifactId>junit</artifactId> 38 <version>4.12</version> 39 </dependency> 40 </dependencies> 41 42 43 </project>

使用查询的方式,先简单测试一下是否连通es集群,和对比查询的数据是否一致。

1 package com.bie.elasticsearch; 2 3 import java.net.InetAddress; 4 5 import org.elasticsearch.action.get.GetResponse; 6 import org.elasticsearch.client.transport.TransportClient; 7 import org.elasticsearch.common.settings.Settings; 8 import org.elasticsearch.common.transport.InetSocketTransportAddress; 9 import org.elasticsearch.transport.client.PreBuiltTransportClient; 10 11 /** 12 * 13 * @author biehl 14 * 15 */ 16 public class HelloElasticsearch { 17 18 public static void main(String[] args) { 19 try { 20 // 设置集群名称biehl01,Settings设置es的集群名称,使用的设计模式,链式设计模式、build设计模式。 21 Settings settings = Settings.builder().put("cluster.name", "biehl01").build(); 22 // 读取es集群中的数据,创建client。 23 @SuppressWarnings("resource") 24 TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses( 25 // 用java访问ES用的端口是9300。es的9200是restful的请求端口号 26 // 由于我使用的是伪集群,所以就配置了一台机器,如果是集群方式,将竞选主节点的加进来即可。 27 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 28 // 9300), 29 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 30 // 9300), 31 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300)); 32 // 搜索数据(.actionGet()方法是同步的,没有返回就等待) 33 // 方式是先去索引里面查询出索引数据,再去文档里面查询出数据。 34 GetResponse response = client.prepareGet("news", "fulltext", "2").execute().actionGet(); 35 // 输出结果 36 System.out.println(response); 37 // 关闭client 38 client.close(); 39 } catch (Exception e) { 40 e.printStackTrace(); 41 } 42 43 } 44 45 }

查询的结果如下所示:

4、如何使用java api创建索引Index、类型Type、以及指定字段,是否创建索引,是否存储,是否即分词,又建立索引(analyzed)、是否建索引不分词(not_analyzed)等等。

1 package com.bie.elasticsearch; 2 3 import java.io.IOException; 4 import java.net.InetAddress; 5 import java.util.HashMap; 6 7 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; 8 import org.elasticsearch.client.AdminClient; 9 import org.elasticsearch.client.IndicesAdminClient; 10 import org.elasticsearch.client.transport.TransportClient; 11 import org.elasticsearch.common.settings.Settings; 12 import org.elasticsearch.common.transport.InetSocketTransportAddress; 13 import org.elasticsearch.common.xcontent.XContentBuilder; 14 import org.elasticsearch.common.xcontent.XContentFactory; 15 import org.elasticsearch.transport.client.PreBuiltTransportClient; 16 import org.junit.Before; 17 import org.junit.Test; 18 19 /** 20 * 21 * @author biehl 22 * 23 */ 24 public class AdminAPI { 25 26 private TransportClient client = null; 27 28 // 在所有的测试方法之前执行 29 @SuppressWarnings("resource") 30 @Before 31 public void init() throws Exception { 32 // 设置集群名称biehl01 33 Settings settings = Settings.builder().put("cluster.name", "biehl01") 34 // 自动感知的功能(可以通过当前指定的节点获取所有es节点的信息) 35 .put("client.transport.sniff", true).build(); 36 // 创建client 37 client = new PreBuiltTransportClient(settings).addTransportAddresses( 38 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 39 // 9300), 40 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 41 // 9300), 42 // 建议指定2个及其以上的节点。 43 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300)); 44 } 45 46 /** 47 * 48 * AdminClient创建索引,并配置一些参数,用来指定一些映射关系等等 49 * 50 * 这里创建一个索引Index,并且指定分区、副本的数量 51 * 52 */ 53 @Test 54 public void createIndexWithSettings() { 55 // 获取Admin的API 56 AdminClient admin = client.admin(); 57 // 使用Admin API对索引进行操作 58 IndicesAdminClient indices = admin.indices(); 59 // 准备创建索引 60 indices.prepareCreate("food") 61 // 配置索引参数 62 .setSettings( 63 // 参数配置器 64 Settings.builder()// 指定索引分区的数量。shards分区 65 .put("index.number_of_shards", 5) 66 // 指定索引副本的数量(注意:不包括本身,如果设置数据存储副本为1,实际上数据存储了2份) 67 // replicas副本 68 .put("index.number_of_replicas", 1)) 69 // 真正执行 70 .get(); 71 } 72 73 /** 74 * 你可以通过dynamic设置来控制这一行为,它能够接受以下的选项: true:默认值。 75 * 76 * 动态添加字段 false:忽略新字段 77 * 78 * strict:如果碰到陌生字段,抛出异常 79 * 80 * 给索引添加mapping信息(给表添加schema信息) 81 * 82 * @throws IOException 83 */ 84 @Test 85 public void elasticsearchSettingsMappings() throws IOException { 86 // 1:settings 87 HashMap<String, Object> settings_map = new HashMap<String, Object>(2); 88 // shards分区的数量4 89 settings_map.put("number_of_shards", 4); 90 // 副本的数量1 91 settings_map.put("number_of_replicas", 1); 92 93 // 2:mappings(映射、schema) 94 // field("dynamic", "true")含义是动态字段 95 XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("dynamic", "true") 96 // 设置type中的属性 97 .startObject("properties") 98 // id属性 99 .startObject("id") 100 // 类型是integer 101 .field("type", "integer") 102 // 不分词,但是建索引 103 .field("index", "not_analyzed") 104 // 在文档中存储 105 .field("store", "yes").endObject() 106 // name属性 107 .startObject("name") 108 // string类型 109 .field("type", "string") 110 // 在文档中存储 111 .field("store", "yes") 112 // 建立索引 113 .field("index", "analyzed") 114 // 使用ik_smart进行分词 115 .field("analyzer", "ik_smart").endObject().endObject().endObject(); 116 117 CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("computer"); 118 // 管理索引(user_info)然后关联type(user) 119 prepareCreate.setSettings(settings_map).addMapping("xiaomi", builder).get(); 120 } 121 122 /** 123 * index这个属性,no代表不建索引 124 * 125 * not_analyzed,建索引不分词 126 * 127 * analyzed 即分词,又建立索引 128 * 129 * expected [no],[not_analyzed] or [analyzed]。即可以选择三者任意一个值 130 * 131 * @throws IOException 132 */ 133 134 @Test 135 public void elasticsearchSettingsPlayerMappings() throws IOException { 136 // 1:settings 137 HashMap<String, Object> settings_map = new HashMap<String, Object>(2); 138 // 分区的数量4 139 settings_map.put("number_of_shards", 4); 140 // 副本的数量1 141 settings_map.put("number_of_replicas", 1); 142 143 // 2:mappings 144 XContentBuilder builder = XContentFactory.jsonBuilder().startObject()// 145 .field("dynamic", "true").startObject("properties") 146 // 在文档中存储、 147 .startObject("id").field("type", "integer").field("store", "yes").endObject() 148 // 不分词,但是建索引、 149 .startObject("name").field("type", "string").field("index", "not_analyzed").endObject() 150 // 151 .startObject("age").field("type", "integer").endObject() 152 // 153 .startObject("salary").field("type", "integer").endObject() 154 // 不分词,但是建索引、 155 .startObject("team").field("type", "string").field("index", "not_analyzed").endObject() 156 // 不分词,但是建索引、 157 .startObject("position").field("type", "string").field("index", "not_analyzed").endObject() 158 // 即分词,又建立索引、 159 .startObject("description").field("type", "string").field("store", "no").field("index", "analyzed") 160 .field("analyzer", "ik_smart").endObject() 161 // 即分词,又建立索引、在文档中存储、 162 .startObject("addr").field("type", "string").field("store", "yes").field("index", "analyzed") 163 .field("analyzer", "ik_smart").endObject() 164 165 .endObject() 166 167 .endObject(); 168 169 CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player"); 170 prepareCreate.setSettings(settings_map).addMapping("basketball", builder).get(); 171 } 172 }

5、使用java api操作Elasticsearch的增删改查以及复杂查询(聚合查询,可以进行分组统计数量,分组统计最大值,分组统计平均值,等等统计)。

1 package com.bie.elasticsearch; 2 3 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 4 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; 5 6 import java.io.IOException; 7 import java.net.InetAddress; 8 import java.util.Date; 9 import java.util.Iterator; 10 import java.util.Map; 11 import java.util.Set; 12 13 import org.elasticsearch.action.ActionListener; 14 import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; 15 import org.elasticsearch.action.delete.DeleteResponse; 16 import org.elasticsearch.action.get.GetResponse; 17 import org.elasticsearch.action.get.MultiGetItemResponse; 18 import org.elasticsearch.action.get.MultiGetResponse; 19 import org.elasticsearch.action.index.IndexResponse; 20 import org.elasticsearch.action.search.SearchRequestBuilder; 21 import org.elasticsearch.action.search.SearchResponse; 22 import org.elasticsearch.action.update.UpdateRequest; 23 import org.elasticsearch.action.update.UpdateResponse; 24 import org.elasticsearch.client.transport.TransportClient; 25 import org.elasticsearch.common.settings.Settings; 26 import org.elasticsearch.common.transport.InetSocketTransportAddress; 27 import org.elasticsearch.index.query.QueryBuilder; 28 import org.elasticsearch.index.query.QueryBuilders; 29 import org.elasticsearch.index.reindex.DeleteByQueryAction; 30 import org.elasticsearch.search.aggregations.Aggregation; 31 import org.elasticsearch.search.aggregations.AggregationBuilders; 32 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; 33 import org.elasticsearch.search.aggregations.bucket.terms.Terms; 34 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; 35 import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; 36 import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg; 37 import org.elasticsearch.search.aggregations.metrics.max.InternalMax; 38 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; 39 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum; 40 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; 41 import org.elasticsearch.transport.client.PreBuiltTransportClient; 42 import org.junit.Before; 43 import org.junit.Test; 44 45 /** 46 * 47 * @author biehl 48 * 49 */ 50 public class ElasticsearchCRUD { 51 52 private TransportClient client = null; 53 54 @SuppressWarnings("resource") 55 @Before 56 public void init() throws Exception { 57 // 设置集群名称biehl01 58 Settings settings = Settings.builder().put("cluster.name", "biehl01") 59 // 自动感知的功能(可以通过当前指定的节点获取所有es节点的信息) 60 .put("client.transport.sniff", true).build(); 61 // 创建client 62 client = new PreBuiltTransportClient(settings).addTransportAddresses( 63 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 64 // 9300), 65 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 66 // 9300), 67 // 建议指定2个及其以上的节点。 68 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300)); 69 } 70 71 /** 72 * 创建一个Index索引、Type类型、以及id。 73 * 74 * 然后插入类型里面的数据。 75 * 76 * @throws IOException 77 */ 78 @Test 79 public void elasticsearchCreate() throws IOException { 80 IndexResponse response = client.prepareIndex("people", "student", "3") 81 .setSource(jsonBuilder().startObject().field("username", "王五五").field("sex", "") 82 .field("birthday", new Date()).field("age", 21).field("message", "trying out Elasticsearch") 83 .endObject()) 84 .get(); 85 System.out.println(response.toString()); 86 } 87 88 /** 89 * 查找一条索引Index里面的类型Type里面的id的所有信息 90 * 91 * @throws IOException 92 */ 93 @Test 94 public void elasticsearchGet() throws IOException { 95 GetResponse response = client.prepareGet("people", "student", "1").get(); 96 System.out.println(response.getSourceAsString()); 97 } 98 99 /** 100 * 查找多条 101 * 102 * 索引Index里面的类型Type里面的多个id的所有信息 103 * 104 * @throws IOException 105 */ 106 @Test 107 public void elasticsearchMultiGet() throws IOException { 108 // 查询出多个索引Index多个类型Type的多个id的所有信息 109 MultiGetResponse multiGetItemResponses = client.prepareMultiGet().add("people", "student", "1") 110 .add("people", "student", "2", "3").add("people", "teacher", "1").add("news", "fulltext", "1").get(); 111 // 将查询出的结果遍历输出 112 for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 113 // 将每一个查询出的结果遍历输出 114 GetResponse response = itemResponse.getResponse(); 115 // 判断如果存在就进行遍历输出 116 if (response.isExists()) { 117 String json = response.getSourceAsString(); 118 System.out.println(json); 119 } 120 } 121 } 122 123 /** 124 * 修改指定的索引Index里面的类型Type的id的信息 125 * 126 * @throws Exception 127 */ 128 @Test 129 public void elasticsearchUpdate() throws Exception { 130 // 创建一个更新的请求对象 131 UpdateRequest updateRequest = new UpdateRequest(); 132 // 指定索引Index 133 updateRequest.index("people"); 134 // 指定类型Type 135 updateRequest.type("student"); 136 // 指定id的值 137 updateRequest.id("3"); 138 // 设置修改的字段信息 139 updateRequest.doc(jsonBuilder().startObject().field("username", "王五五").endObject()); 140 // 开始进行修改,并且返回响应信息 141 UpdateResponse updateResponse = client.update(updateRequest).get(); 142 // 打印输出响应的信息 143 System.out.println(updateResponse.toString()); 144 } 145 146 /** 147 * 删除指定的索引Index里面的类型Type的id的信息 148 */ 149 @Test 150 public void elasticsearchDelete() { 151 // 指定删除的id信息,并且给出响应结果 152 // prepareDelete(String index, String type, String id); 153 DeleteResponse response = client.prepareDelete("people", "student", "4").get(); 154 // 打印输出的响应信息 155 System.out.println(response); 156 } 157 158 /** 159 * 根据查询条件进行删除数据 160 * 161 * 162 */ 163 @Test 164 public void elasticsearchDeleteByQuery() { 165 BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) 166 // 指定查询条件,matchQuery是name的值text里面包括了这个内容就进行删除。默认使用标准分词器。 167 .filter(QueryBuilders.matchQuery("username", "王五五")) 168 // 指定索引名称 169 .source("people").get(); 170 // 获取到删除的个数 171 long deleted = response.getDeleted(); 172 // 打印输出删除的个数 173 System.out.println(deleted); 174 } 175 176 /** 177 * 异步删除 178 * 179 * 监听,如果真正删除以后进行回调,打印输出删除确认的消息。 180 */ 181 @Test 182 public void elasticsearchDeleteByQueryAsync() { 183 DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(QueryBuilders.matchQuery("sex", "")) 184 .source("people").execute(new ActionListener<BulkByScrollResponse>() { 185 186 // 删除以后的方法回调 187 @Override 188 public void onResponse(BulkByScrollResponse response) { 189 // 返回删除的个数 190 long deleted = response.getDeleted(); 191 System.out.println("数据删除完毕!"); 192 // 打印删除的个数 193 System.out.println("数据删除的个数: " + deleted); 194 } 195 196 @Override 197 public void onFailure(Exception e) { 198 // 失败打印异常信息 199 e.printStackTrace(); 200 } 201 }); 202 203 // 先打印输出,正常执行完毕。再执行异步监听删除数据。 204 try { 205 System.out.println("异步删除操作!"); 206 // 休眠10秒钟,避免主线程里面结束,子线程无法进行结果输出 207 Thread.sleep(10000); 208 } catch (Exception e) { 209 e.printStackTrace(); 210 } 211 } 212 213 /** 214 * 215 * 按照范围进行查找。 216 * 217 */ 218 @Test 219 public void elasticsearchRange() { 220 // includeLower(true).includeUpper(false)含义是包含前面,不包含后面的 221 // [21, 24) 222 QueryBuilder qb = rangeQuery("age").from(21).to(24).includeLower(true).includeUpper(false); 223 // 将查询条件传递进去,并将查询结果进行返回。 224 SearchResponse response = client.prepareSearch("people").setQuery(qb).get(); 225 System.out.println(response); 226 } 227 228 /** 229 * 230 * 向指定索引index里面的类型Type的id的信息 231 * 232 * @throws IOException 233 */ 234 @Test 235 public void elasticsearchAddPlayer() throws IOException { 236 // 237 IndexResponse response = client.prepareIndex("player", "basketball", "4") 238 239 .setSource(jsonBuilder().startObject() 240 241 .field("name", "安其拉") 242 243 .field("age", 28) 244 245 .field("salary", 99000) 246 247 .field("team", "啦啦队 team") 248 249 .field("position", "打中锋") 250 251 .field("description", "跪族蓝孩") 252 253 .endObject()) 254 .get(); 255 256 System.out.println(response); 257 } 258 259 /** 260 * 261 * 262 * select team, count(*) as team_count from player group by team; 263 * 264 * team_counts是别名称。 265 */ 266 @Test 267 public void elasticsearchAgg1() { 268 // 指定索引和type 269 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 270 // 按team分组然后聚合,但是并没有指定聚合函数。 271 // team_count是别名称 272 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team"); 273 // 添加聚合器 274 builder.addAggregation(teamAgg); 275 // 触发 276 SearchResponse response = builder.execute().actionGet(); 277 // System.out.println(response); 278 // 将返回的结果放入到一个map中 279 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 280 // 遍历打印输出 281 Set<String> keys = aggMap.keySet(); 282 for (String key : keys) { 283 System.out.println("key: " + key); 284 } 285 286 System.out.println(""); 287 288 // //取出聚合属性 289 StringTerms terms = (StringTerms) aggMap.get("team_count"); 290 291 // //依次迭代出分组聚合数据 292 for (Terms.Bucket bucket : terms.getBuckets()) { 293 // 分组的名字 294 String team = (String) bucket.getKey(); 295 // count,分组后一个组有多少数据 296 long count = bucket.getDocCount(); 297 System.out.println(team + ": " + count); 298 } 299 300 System.out.println(""); 301 302 // 使用Iterator进行遍历迭代 303 Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator(); 304 while (teamBucketIt.hasNext()) { 305 Terms.Bucket bucket = teamBucketIt.next(); 306 // 获取到分组后每组的组名称 307 String team = (String) bucket.getKey(); 308 // 获取到分组后的每组数量 309 long count = bucket.getDocCount(); 310 // 打印输出 311 System.out.println(team + ": " + count); 312 } 313 } 314 315 /** 316 * 317 * select 318 * 319 * team, position, count(*) as pos_count 320 * 321 * from 322 * 323 * player 324 * 325 * group by 326 * 327 * team,position; 328 * 329 * 330 */ 331 @Test 332 public void elasticsearchAgg2() { 333 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 334 // 指定别名和分组的字段 335 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); 336 TermsAggregationBuilder posAgg = AggregationBuilders.terms("pos_count").field("position"); 337 // 添加两个聚合构建器。先按照team分组,再按照position分组。 338 builder.addAggregation(teamAgg.subAggregation(posAgg)); 339 // 执行查询 340 SearchResponse response = builder.execute().actionGet(); 341 // 将查询结果放入map中 342 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 343 // 根据属性名到map中查找 344 StringTerms teams = (StringTerms) aggMap.get("team_name"); 345 // 循环查找结果 346 for (Terms.Bucket teamBucket : teams.getBuckets()) { 347 // 先按球队进行分组 348 String team = (String) teamBucket.getKey(); 349 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 350 StringTerms positions = (StringTerms) subAggMap.get("pos_count"); 351 // 因为一个球队有很多位置,那么还要依次拿出位置信息 352 for (Terms.Bucket posBucket : positions.getBuckets()) { 353 // 拿到位置的名字 354 String pos = (String) posBucket.getKey(); 355 // 拿出该位置的数量 356 long docCount = posBucket.getDocCount(); 357 // 打印球队,位置,人数 358 System.out.println(team + ": " + pos + ": " + docCount); 359 } 360 } 361 362 } 363 364 /** 365 * select team, max(age) as max_age from player group by team; 366 */ 367 @Test 368 public void elasticsearchAgg3() { 369 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 370 // 指定安球队进行分组 371 TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team"); 372 // 指定分组求最大值 373 MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age"); 374 // 分组后求最大值 375 builder.addAggregation(teamAgg.subAggregation(maxAgg)); 376 // 查询 377 SearchResponse response = builder.execute().actionGet(); 378 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 379 // 根据team属性,获取map中的内容 380 StringTerms teams = (StringTerms) aggMap.get("team_name"); 381 for (Terms.Bucket teamBucket : teams.getBuckets()) { 382 // 分组的属性名 383 String team = (String) teamBucket.getKey(); 384 // 在将聚合后取最大值的内容取出来放到map中 385 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 386 // 取分组后的最大值 387 InternalMax ages = (InternalMax) subAggMap.get("max_age"); 388 // 获取到年龄的值 389 double max = ages.getValue(); 390 // 打印输出值 391 System.out.println(team + ": " + max); 392 } 393 } 394 395 /** 396 * select team, avg(age) as avg_age, sum(salary) as total_salary from player 397 * group by team; 398 */ 399 @Test 400 public void elasticsearchAgg4() { 401 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 402 // 指定分组字段 403 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team"); 404 // 指定聚合函数是求平均数据 405 AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age"); 406 // 指定另外一个聚合函数是求和 407 SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); 408 // 分组的聚合器关联了两个聚合函数 409 builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg)); 410 // 查询 411 SearchResponse response = builder.execute().actionGet(); 412 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 413 // 按分组的名字取出数据 414 StringTerms teams = (StringTerms) aggMap.get("team_name"); 415 for (Terms.Bucket teamBucket : teams.getBuckets()) { 416 // 获取球队名字 417 String team = (String) teamBucket.getKey(); 418 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 419 // 根据别名取出平均年龄 420 InternalAvg avgAge = (InternalAvg) subAggMap.get("avg_age"); 421 // 根据别名取出薪水总和 422 InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary"); 423 double avgAgeValue = avgAge.getValue(); 424 double totalSalaryValue = totalSalary.getValue(); 425 System.out.println(team + ": " + avgAgeValue + ": " + totalSalaryValue); 426 } 427 } 428 429 /** 430 * select team, sum(salary) as total_salary from player group by team order by 431 * total_salary desc; 432 */ 433 @Test 434 public void elasticsearchAgg5() { 435 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball"); 436 // 按team进行分组,然后指定排序规则 437 TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team") 438 .order(Terms.Order.aggregation("total_salary ", true)); 439 // 指定一个聚合函数是求和 440 SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary"); 441 // 添加两个聚合构建器。先按照team分组,再按照salary求和。 442 builder.addAggregation(termsAgg.subAggregation(sumAgg)); 443 // 查询 444 SearchResponse response = builder.execute().actionGet(); 445 // 将查询结果放入map中 446 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap(); 447 // 从查询结果中获取到team_name的信息 448 StringTerms teams = (StringTerms) aggMap.get("team_name"); 449 // 开始遍历获取到的信息 450 for (Terms.Bucket teamBucket : teams.getBuckets()) { 451 // 获取到key的值 452 String team = (String) teamBucket.getKey(); 453 // 获取到求和的值 454 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap(); 455 // 获取到求和的值的信息 456 InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary"); 457 // 获取到求和的值 458 double totalSalaryValue = totalSalary.getValue(); 459 // 打印输出信息 460 System.out.println(team + " " + totalSalaryValue); 461 } 462 } 463 464 }

执行效果,自己可以分别进行测试。由于测试都写了说明,这里就不一一进行测试打印效果了。请自行练习使用即可。

作者:别先生

博客园:https://www.cnblogs.com/biehongli/

如果您想及时得到个人撰写文章以及著作的消息推送,可以扫描上方二维码,关注个人公众号哦。

 

最新回复(0)