JAVA SpringBoot整合并操作ES(ElasticSearch)数据
工作需要,在更改数据时需要维护es(ElasticSearch简称,以下均称es)数据,记录个人总结。以下只包含java对es数据操作,不包含es基础内容。
·
工作需要,在更改数据时需要维护es(ElasticSearch简称,以下均称es)数据,记录个人总结。以下只包含java对es数据操作,不包含es基础内容。
一、导入maven坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
二、es连接配置类。@ConfigurationProperties(prefix = "sc.elasticsearch") 对应下方配置文件中的前缀,成员变量名与配置项名称一致即可完成值注入
import lombok.Setter;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a {@link RestHighLevelClient} bean.
 *
 * <p>The connection settings are bound from properties under the
 * {@code sc.elasticsearch} prefix through the Lombok-generated setters.
 */
@Configuration
@ConfigurationProperties(prefix = "sc.elasticsearch")
@Setter
public class ElasticsearchConfig {
    /** Elasticsearch host. */
    private String host;
    /** Elasticsearch port. */
    private int port;
    /** User name used for basic authentication. */
    private String username;
    /** Password used for basic authentication. */
    private String password;

    /**
     * Builds the Elasticsearch client with basic-auth credentials.
     *
     * @return the client; Spring invokes {@code close()} on it when the bean is destroyed
     */
    @Bean(destroyMethod = "close")
    public RestHighLevelClient client() {
        // Account and password for the cluster (the original note says the default user is "elastic").
        final CredentialsProvider basicAuth = new BasicCredentialsProvider();
        basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        RestClientBuilder restBuilder = RestClient.builder(new HttpHost(host, port));
        restBuilder.setHttpClientConfigCallback(asyncHttp -> {
            asyncHttp.disableAuthCaching();
            return asyncHttp.setDefaultCredentialsProvider(basicAuth);
        });
        restBuilder.setMaxRetryTimeoutMillis(2000);

        return new RestHighLevelClient(restBuilder);
    }
}
三、以上配置类引入了yml的配置内容,下附yml配置
# Connection settings consumed by ElasticsearchConfig (prefix "sc.elasticsearch").
# The nesting below is required: without indentation these keys would be
# top-level entries and would not bind under the prefix.
sc:
  elasticsearch:
    host: 127.0.0.1
    port: 9200
    username: elastic
    # Placeholder value; quoted because a bare scalar starting with '*' is read as a YAML alias.
    password: "******"
四、测试
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
/** Smoke test that reads one document through the injected Elasticsearch client. */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyTest {
    @Autowired
    private RestHighLevelClient client;

    /**
     * Fetches a single document by index, type and id, then prints the id
     * reported by the response.
     *
     * @throws IOException if the request to Elasticsearch fails
     */
    @Test
    public void getLearn() throws IOException {
        final String indexName = "fn_rmsv2_supervise_log";
        final String docType = "_doc";
        final String docId = "336198460682276870";

        GetResponse fetched = client.get(new GetRequest(indexName, docType, docId));
        System.out.println(fetched.getId());
    }
}
五、根据业务逻辑编写service
1.接口
/** Operations used to keep Elasticsearch documents in sync with business data. */
public interface IElasticsearchService {

    /**
     * Creates a document or updates an existing one.
     *
     * @param indexName target index
     * @param type      mapping type of the document
     * @param id        document id
     * @param jsonStr   document body as a JSON string
     */
    void addData(String indexName, String type, String id, String jsonStr);

    /**
     * Looks up the Elasticsearch {@code _id} for a supervision record id.
     * The result is meant to be passed as the document id when creating or
     * updating the document.
     *
     * @param id    supervision record id to search for
     * @param index index to search in
     * @return the Elasticsearch {@code _id} to use for the record
     */
    String getEsId(String id, String index);
}
2.实现类
import cn.shencom.server.service.IElasticsearchService;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
 * {@link IElasticsearchService} implementation that talks to Elasticsearch
 * through the injected {@link RestHighLevelClient}.
 *
 * <p>Outcomes and failures are reported on the console; see each method for
 * which exceptions are handled locally.
 *
 * <p>NOTE(review): {@code bulkDate} uses {@code List}, {@code Map},
 * {@code StringUtils}, {@code BulkRequest}, {@code BulkResponse},
 * {@code BulkItemResponse}, {@code UpdateResponse} and {@code DeleteResponse},
 * none of which appear in the import list shown with this class - confirm the
 * imports (and which {@code StringUtils} is intended) before compiling.
 */
@Service
public class ElasticsearchServiceImpl implements IElasticsearchService {
    @Autowired
    private RestHighLevelClient client;

    /**
     * Indexes (creates or overwrites) one document from a JSON string.
     *
     * <p>Nothing is thrown to the caller: a version/create conflict prints a
     * conflict message, any other failure prints its stack trace.
     *
     * @param indexName target index
     * @param type      mapping type
     * @param id        document id
     * @param jsonStr   document body as JSON
     */
    @Override
    public void addData(String indexName, String type, String id, String jsonStr) {
        try {
            // Build the request: index, mapping type, document id, JSON body.
            IndexRequest request = new IndexRequest(indexName, type, id);
            request.source(jsonStr, XContentType.JSON);

            IndexResponse indexResponse = null;
            try {
                // Synchronous call.
                indexResponse = client.index(request);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    // Version conflict, or "create" issued for a document that already exists.
                    System.out.println("冲突了,请在此写冲突处理逻辑!" + e.getDetailedMessage());
                } else {
                    // Any other server-side failure used to vanish without a trace; surface it.
                    e.printStackTrace();
                }
            }

            if (indexResponse != null) {
                reportIndexResult(indexResponse);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints the outcome of a single index call, including shard failures. */
    private void reportIndexResult(IndexResponse indexResponse) {
        String index1 = indexResponse.getIndex();
        String type1 = indexResponse.getType();
        String id1 = indexResponse.getId();
        long version1 = indexResponse.getVersion();
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            System.out.println("新增文档成功!" + " index:" + index1 + " type:" + type1 + " id:" + id1 + " version:" + version1);
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            System.out.println("修改文档成功!");
        }

        // Shard-level bookkeeping: not every shard copy acknowledged the write.
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            System.out.println("分片处理信息.....");
        }
        // When shard copies failed, the individual reasons are available.
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                String reason = failure.reason();
                System.out.println("副本失败原因:" + reason);
            }
        }
    }

    /**
     * Finds the Elasticsearch {@code _id} of the document whose {@code id}
     * field matches the given supervision record id.
     *
     * @param id    supervision record id
     * @param index index to search
     * @return the {@code _id} of the first hit; {@code id} itself when nothing
     *         matches; {@code null} when the search fails with an
     *         {@link IOException} (the stack trace is printed)
     */
    @Override
    public String getEsId(String id, String index) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.from(0);
        // Only the first hit's _id is read, so one hit without its source is enough.
        sourceBuilder.size(1);
        sourceBuilder.fetchSource(false);

        // Filter on the supervision record id.
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.filter(QueryBuilders.matchQuery("id", id));
        sourceBuilder.query(boolQueryBuilder);

        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(sourceBuilder);
        try {
            SearchResponse response = client.search(searchRequest);
            // Fall back to the record id when no document exists yet.
            return response.getHits().getTotalHits() > 0 ? response.getHits().getAt(0).getId() : id;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Bulk-indexes a list of documents.
     *
     * <p>Rows that are {@code null} or have no value under {@code idName} are
     * skipped. Nothing is sent when the arguments are blank/empty or when no
     * row qualifies. Failed bulk items are reported individually.
     *
     * @param indexName target index
     * @param type      mapping type
     * @param idName    key in each row whose value becomes the document id
     * @param list      rows to index
     */
    public void bulkDate(String indexName, String type, String idName, List<Map<String, Object>> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        if (StringUtils.isBlank(indexName) || StringUtils.isBlank(idName) || StringUtils.isBlank(type)) {
            return;
        }

        BulkRequest request = new BulkRequest();
        for (Map<String, Object> map : list) {
            Object idValue = map == null ? null : map.get(idName);
            if (idValue != null) {
                request.add(new IndexRequest(indexName, type, String.valueOf(idValue))
                        .source(map, XContentType.JSON));
            }
        }
        // An empty bulk request is rejected by request validation; skip the call instead.
        if (request.numberOfActions() == 0) {
            return;
        }

        // Optional settings, left disabled as in the original:
        /*
        request.timeout("2m");
        request.setRefreshPolicy("wait_for");
        request.waitForActiveShards(2);
        */

        try {
            // Synchronous call.
            BulkResponse bulkResponse = client.bulk(request);
            if (bulkResponse == null) {
                return;
            }
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    // A failed item carries no response object; dereferencing it would throw.
                    System.out.println("批量操作失败," + bulkItemResponse.getFailureMessage());
                    continue;
                }
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                switch (bulkItemResponse.getOpType()) {
                    case INDEX:
                    case CREATE:
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        // TODO handle successful create
                        System.out.println("新增成功,{}" + indexResponse.toString());
                        break;
                    case UPDATE:
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        // TODO handle successful update
                        System.out.println("修改成功,{}" + updateResponse.toString());
                        break;
                    case DELETE:
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        // TODO handle successful delete
                        System.out.println("删除成功,{}" + deleteResponse.toString());
                        break;
                    default:
                        break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Disabled usage example kept from the original for reference.
    // public static void main(String ags[]){
    // Map<String,Object> map1 = new HashMap<String, Object>();
    // map1.put("id","2");
    // map1.put("user1","bbherbert1");
    // map1.put("postDate","2018-08-30");
    // map1.put("username","aa");
    // map1.put("message","message");
    // Map<String,Object> map2 = new HashMap<String, Object>();
    // map2.put("id","3");
    // map2.put("user2","bbherbert1");
    // map2.put("postDate","2018-08-30");
    // map2.put("username","aa");
    // map2.put("message","message");
    // Map<String,Object> map = new HashMap<String, Object>();
    // map.put("id","1");
    // map.put("user","bbherbert1");
    // map.put("postDate","2018-08-30");
    // map.put("username","aa");
    // map.put("message","message");
    //
    // List<Map<String,Object>> list = new ArrayList<Map<String, Object>>();
    // list.add(map);
    // list.add(map1);
    // list.add(map2);
    // ESUtil esUtil= new ESUtil();
    // esUtil.bulkDate("book15","boo","id",list);
    // Map<String,Object> map = new HashMap<String, Object>();
    // map.put("user","herbert1");
    // map.put("postDate","2018-08-30");
    // map.put("username","aa");
    // map.put("message","message");
    // String jsonString = JSON.toJSONString(map);
    // esUtil.addData("hh","d","4",jsonString);
    // esUtil.addData("hh","d","4","{" +
    // "\"user\":\"kimchy\"," +
    // "\"postDate\":\"2013-01-30\"," +
    // "\"username\":\"zhangsan\"," +
    // "\"message\":\"trying out Elasticsearch\"" +
    // "}");
    // }
}
六、使用,entity即需要新增或修改的对象,fn_rmsv2_supervise_log为es索引
// Index that holds the supervision log documents.
final String superviseLogIndex = "fn_rmsv2_supervise_log";
// Serialize the entity that is being created or modified.
String jsonStr = JSON.toJSONString(entity);
// Resolve the Elasticsearch _id that belongs to this entity.
String _id = iElasticsearchService.getEsId(entity.getId(), superviseLogIndex);
// Create the document, or overwrite it when it already exists.
iElasticsearchService.addData(superviseLogIndex, "_doc", _id, jsonStr);
七、至此,工作任务就完成了,但是学无止境,故收集了一些es的其他java操作知识点,下附:
1. es API 文档地址:http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/query-dsl-queries.html
package com.elasticsearch;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.IndicesQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
/**
 * Catalogue of example queries built with the {@link QueryBuilders} factory,
 * the Java counterpart of the Elasticsearch query DSL.
 *
 * <p>NOTE(review): several builders used here ({@code likeText},
 * {@code moreLikeThisFieldQuery}, {@code topChildrenQuery},
 * {@code indicesQuery}, {@code minimumMatch}, string-valued
 * {@code scoreMode}) look like they belong to an older Java API generation
 * than the {@code SearchSourceBuilder}/{@code SearchRequest} usage in
 * {@link #main(String[])} - verify against the client version on the classpath.
 */
public class QueryBuildersFactory {

    /** Match query on a single field. */
    protected static QueryBuilder matchQuery() {
        return QueryBuilders.matchQuery("name", "张三");
    }

    /** Multi-match query: one text matched against several fields. */
    protected static QueryBuilder multiMatchQuery() {
        // Looks for 张三 in both the current-name and the former-name field.
        return QueryBuilders.multiMatchQuery(
                "张三",              // text to look for
                "name", "old_name"   // fields to query
        );
    }

    /** Boolean query combining several must / mustNot / should clauses. */
    protected static QueryBuilder booleanQuery() {
        return QueryBuilders
                .boolQuery()
                .must(QueryBuilders.termQuery("name", "张三"))
                .mustNot(QueryBuilders.termQuery("using", false))
                .must(QueryBuilders.termQuery("sex", "男"))
                .should(QueryBuilders.termQuery("age", "20"));
    }

    /** Ids query: matches only the listed document ids, similar to SQL {@code IN}. */
    protected static QueryBuilder idsQuery() {
        return QueryBuilders.idsQuery().ids("101", "102", "103");
    }

    /** Fuzzy query on a single field. */
    protected static QueryBuilder fuzzyQuery() {
        return QueryBuilders.fuzzyQuery("name", "张三");
    }

    /**
     * Has-child / has-parent query: selects documents through their parent or
     * child documents.
     *
     * <p>TODO NotSolved (marker kept from the original).
     */
    protected static QueryBuilder hasChildQuery() {
        return // Has Child
                QueryBuilders.hasChildQuery("blog_tag",
                        QueryBuilders.termQuery("tag", "something"));
        // Has Parent
        /*return QueryBuilders.hasParentQuery("blog",
        QueryBuilders.termQuery("tag","something"));*/
    }

    /** Match-all query: matches every document. */
    protected static QueryBuilder matchAllQuery() {
        return QueryBuilders.matchAllQuery();
    }

    /**
     * More-like-this query (mlt and mlt_field variants).
     *
     * <p>TODO NotSolved (marker kept from the original).
     *
     * @return the single-field variant; the multi-field variant is built only
     *         for illustration and its result is discarded
     */
    protected static QueryBuilder moreLikeThisQuery() {
        // mlt query over several fields - illustration only, the builder is not returned.
        QueryBuilders.moreLikeThisQuery("name", "old_name") // fields
                .likeText("张")       // text to be similar to
                .minTermFreq(1)       // minimum term frequency
                .maxQueryTerms(12);   // maximum number of terms in the generated query

        // mlt_field query: same idea restricted to a single field.
        return QueryBuilders.moreLikeThisFieldQuery("name") // only on a single field
                .likeText("张")
                .minTermFreq(1)
                .maxQueryTerms(12);
    }

    /** Prefix query: matches documents whose field starts with the prefix, like {@code name = "张%"}. */
    protected static QueryBuilder prefixQuery() {
        return QueryBuilders.prefixQuery("name", "张");
    }

    /** Range query: matches documents whose field lies within a range. */
    protected static QueryBuilder rangeQuery() {
        return QueryBuilders
                .rangeQuery("id")
                .from("101")
                .to("199")
                .includeLower(true)    // lower bound is included
                .includeUpper(false);  // upper bound is excluded
    }

    /** Term query: matches documents that contain the given term. */
    protected static QueryBuilder termQuery() {
        return QueryBuilders.termQuery("name", "张三");
    }

    /** Terms query: matches documents against several values of one field. */
    protected static QueryBuilder termsQuery() {
        return QueryBuilders.termsQuery("name",   // field
                "张三", "李四", "王五")             // values
                .minimumMatch(1); // minimum number of values that must match (original note: defaults to 1)
    }

    /**
     * Top-children query: runs the query on child documents and scores the
     * parents of the matching children.
     *
     * <p>TODO NotSolved (marker kept from the original).
     */
    protected static QueryBuilder topChildrenQuery() {
        return QueryBuilders.topChildrenQuery(
                "blog_tag", // labelled "field" in the original; presumably the child type - confirm
                QueryBuilders.termQuery("name", "葫芦3812娃") // query
        )
                .score("max") // max, sum or avg
                .factor(5)
                .incrementalFactor(2);
    }

    /**
     * Wildcard query.
     *
     * <p>Supported wildcards are {@code *}, which matches any character
     * sequence (including the empty one), and {@code ?}, which matches any
     * single character. The query can be slow because it has to iterate over
     * many terms; to avoid extremely slow queries a pattern should not start
     * with {@code *} or {@code ?}.
     */
    protected static QueryBuilder wildcardQuery() {
        return QueryBuilders.wildcardQuery("name", "张*2三");
    }

    /**
     * Nested query.
     *
     * <p>TODO NotSolved (marker kept from the original).
     */
    protected static QueryBuilder nestedQuery() {
        return QueryBuilders.nestedQuery("location", // path
                QueryBuilders.boolQuery()            // query run against the nested path
                        .must(QueryBuilders.matchQuery("location.lat", 0.962590433140581))
                        .must(QueryBuilders.rangeQuery("location.lon").lt(0.00000000000000000003))
        )
                .scoreMode("total"); // max, total, avg or none
    }

    /**
     * Indices query: runs one query on the listed indices and a fallback on
     * all others.
     *
     * @return the variant whose fallback is given as the string {@code "none"};
     *         the variant with a query fallback is built only for illustration
     *         and its result is discarded
     */
    protected static IndicesQueryBuilder indicesQuery() {
        // Variant 1 - another query is used for indices that do not match. Illustration only.
        QueryBuilders.indicesQuery(
                QueryBuilders.termQuery("name", "张三"),
                Es_Utils.INDEX_DEMO_01, "index2"
        )
                .noMatchQuery(QueryBuilders.termQuery("age", "18"));

        // Variant 2 - fallback given by name: "all" (match all) or "none" (match no documents).
        return QueryBuilders.indicesQuery(
                QueryBuilders.termQuery("name", "张三"),
                Es_Utils.INDEX_DEMO_01, "index2"
        )
                .noMatchQuery("none");
    }

    /**
     * Runs the fuzzy example query and prints the raw response.
     *
     * <p>NOTE(review): {@code index} and {@code client} are not declared
     * anywhere in this class as shown, and {@code SearchSourceBuilder},
     * {@code Strings}, {@code SearchRequest} and {@code IOException} are not in
     * the visible import list - supply them before running this example.
     */
    public static void main(String[] args) {
        // Page of 10 hits starting at 0, with every source field included.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.from(0);
        sourceBuilder.size(10);
        sourceBuilder.fetchSource(new String[]{"*"}, Strings.EMPTY_ARRAY);
        // The original referenced a non-existent variable here; the factory method must be called.
        sourceBuilder.query(fuzzyQuery());

        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(sourceBuilder);
        try {
            SearchResponse response = client.search(searchRequest);
            System.out.println(response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
更多推荐



所有评论(0)