ElasticSearch学习之(九)--代码实现ES同步数据

mac2024-04-13  43

该博文主要是描述如何使用代码同步数据至ES,话不多说直接上干货!

1.首先创建自己的索引,指定mapping(当然直接往ES写入数据会自动生成mapping,但是数据类型不可控),所以需要自己创建mapping指定相应的数据类型。

PUT judge_customer { "mappings" : { "properties" : { "company_id" : { "type" : "keyword" }, "customer_id" : { "type" : "keyword" }, "customer_level" : { "type" : "keyword" }, "customer_mobile" : { "type" : "keyword" }, "employee_id" : { "type" : "keyword" }, "first_arrive_time" : { "type" : "long" }, "internal_id" : { "type" : "keyword" }, "last_arrive_time" : { "type" : "long" }, "recommend_id" : { "type" : "keyword" }, "release_num" : { "type" : "integer" } } }, "settings" : { "index" : { "number_of_replicas" : "1", "number_of_shards" : "1" } } }

我是用kibana执行的

使用ES的header插件查看索引已经创建成功

2.上面是ES的准备工作下面直接上代码

@Autowired private RestHighLevelClient esClient; /** * 更新索引接口(批量) * @param list * @return */ @Override public int batchIndex(List<BasicCustomerDoc> list) { int addCount = 0; int delCount = 0; log.info("create or update lghszz customer index begin"); BulkRequest bulkRequest = new BulkRequest(); // 操作标识 boolean flag = false; for (BasicCustomerDoc v : list) { if (StringUtils.isBlank(v.getCustomerId()) && StringUtils.isBlank(v.getInternalId())) { log.error("lghszz customer id is null!"); continue; } // 封装ES数据 通过对v实体的字段进行非空判断封装到对应索引字段中(map的key) Map<String, Object> data = getSourceMap(v); // 创建索引 StringBuilder idBuilder = new StringBuilder().append(v.getCustomerId()).append("_").append(v.getInternalId()); IndexRequest indexRequest = new IndexRequest(IndexName.JUDGE_CUSTOMER.value()).id(idBuilder.toString()); indexRequest.source(data, XContentType.JSON); bulkRequest.add(indexRequest); flag = true; addCount++; // // 更新索引 // UpdateRequest updateRequest = new UpdateRequest(IndexName.JUDGE_CUSTOMER.value(), idBuilder.toString()); // updateRequest.doc(data, XContentType.JSON); // bulkRequest.add(updateRequest); // // 删除无效数据 // DeleteRequest deleteRequest = new DeleteRequest(IndexName.lghszz_CUSTOMER.value(), idBuilder.toString()); // bulkRequest.add(deleteRequest); // flag = true; // delCount++; } // 存在更新或删除操作 if (flag) { BulkResponse response = null; try { // 使用ES 的client进行操作获取结果 response = esClient.bulk(bulkRequest, RequestOptions.DEFAULT); } catch (Exception e) { log.error("error in lghszz customer create index", e); } if (response != null && !response.hasFailures()) { BulkItemResponse[] items = response.getItems(); int opSize = items != null ? items.length : 0; if (opSize == addCount + delCount) { log.info("create or update lghszz customer index end! operational number = {}, add = {}, del = {}", opSize, addCount, delCount); } else { log.error("lghszz customer index operation number inconsistent,operational number = {}, add = {}, del = {}", opSize, addCount, delCount); } return opSize; } else { log.error("lghszz customer index operates fail"); } } else { log.error("lghszz customer index no operation!"); } return addCount; }

这也就是简单实现ES获取数据并写入更新到ES中的简单实现,可以正常执行写入数据。

如有披露或问题欢迎留言或者入群探讨

 

最新回复(0)