使用Java API操作对ES执行增删改查操作

在数据量比较大的时候,传统的关系型数据库按照复杂条件搜索数据效率会比较底下,这时候需要使用一些搜索引擎的技术栈,而ElasticSearch就是使用纯Java开发的一款文档搜索引擎,他提供了多种客户端操作方式,通过客户端,可以大大简化开发者的开发效率和复杂度,下面将会主要结合Spring-Data的ElasticSearchTemplate客户端工具介绍一下使用ElasTicSearch的Java API基本操作。

一、常用的ES客户端

1.使用原生的ES客户端TransportClient

// 配置Maven包依赖

<dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>transport</artifactId>
   <!-- 根据ES的安装版本配置 -->
   <version>version</version>
</dependency>

// 构建客户端对象:使用Settings构建客户端配置对象

Settings settings = Settings.builder()
  .put("cluster.name", "myApplication")
  .put("client.transport.sniff", "true").build();

// 构建客户端对象:使用TransportClient构建ES客户端操作对象

TransportClient client = new PreBuiltTransportClient(settings)
   .addTransportAddress(new TransportAddress(
      InetAddress.getByName("192.168.199.1"), 9300));
2.使用Spring提供的ElasticsearchTemplate

// 配置Spring的Maven配置文件

<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-elasticsearch</artifactId>
  <!-- 根据ES的安装版本配置 -->
  <version>version</version>
</dependency>

// Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:elasticsearch="http://www.springframework.org/schema/data/elasticsearch"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
  http://www.springframework.org/schema/data/elasticsearch
  http://www.springframework.org/schema/data/elasticsearch/spring-elasticsearch-1.0.xsd">
  <!-- 扫描包路径 -->
  <elasticsearch:repositories base-package="com.wb.test.elasticsearch.*" />
  <!-- 声明elasticsearch客户端 -->
  <elasticsearch:transport-client id="client" cluster-nodes="192.168.199.1:9300" 
     cluster-name="myApplication" />

  <!-- 声明elasticsearchTemplate -->
  <bean name="esTemplate"
class="org.springframework.data.elasticsearch.core.ElasticsearchTemplate">
    <constructor-arg name="client" ref="client" />
  </bean>
</beans>

二、API基本操作

本文主要讲解使用Spring-data完成ES的API操作,如下:

public class EsBaseDao {
  // 索引名称,可以使用配置的方式写到配置文件中
  private static final String indexName = "user_index";
  // 索引类型,可以配置在配置文件中
  private static final String indexType = "userType";
  // 配置客户端连接ES的超时时间
  private long defaultTimeout;
  @Resource
  private ElasticSearchTemplate esTemplate;
  @Resource
  private Client client;

  // 在Spring容器启动的时候,执行一些初始化操作
  @PostConstruct
  public void init() {
    client = esTemplate.getClient();
  }
  // 增删改查方法
}

假设索引名称为user_index,索引类型为userType,对应的索引字段实体类如下:

public class User {
  // 主键ID
  private Long id;
  private String userName;
  private Integer age;
  private String address;

  // 省略setter和getter方法
}
1.增加数据
public void save(User user) {
  IndexRequestBuilder res = client.prepareIndex()
    .setIndex(indexName) // 索引名称
    .setType(indexType) // 索引类型
    .setId(user.getId()) // 主键ID,如果不设置将会自动生成一个字符串类型的ID
    .setSource(JSON.toJSONString(user)); // 需要保存的数据,为一个JSON格式的字符串
  res.execute().actionGet(defaultTimeout); // 按照指定的超时时间发送请求
}
2.删除数据
public void realDelete(Long id, User user) {
  UpdateRequestBuilder res = client.prepareDelete()
    .setIndex(indexName)
    .setType(indexType)
    .setId(id + "")
    .setDoc(JSON.toJSONString(user));
  res.execute().actionGet(defaultTimeout);
  refresh(); // 执行删除之后,如果需要立刻生效,需要执行refresh操作,实现如下
}

// 执行刷新,将多余的无用数据删除
// 如果对性能要求比较高,频繁使用该种方式会影响ES性能

// 执行ES索引刷新操作
public boolean refresh(Class<?> clazz) {
  try {
    esTemplate.refresh(clazz);
    return true;
  } catch (Exception e ){
    return false;
  }
}
3.修改数据

// 根据主键ID和入参实体类更新文档数据

public boolean update(Long id,User user) 
  throws IllegalAccessException {
  if (null == user) {
    return false;
  }
  UpdateQuery updateQuery = new UpdateQuery();
  updateQuery.setIndexName(indexName);
  updateQuery.setType(indexType);
  updateQuery.setId(id + "");
  updateQuery.setUpdateRequest(new UpdateRequest()
     .doc(JSON.toJSONString(user)));
  return !esTemplate.update(updateQuery).isCreated();
}

// 根据map参数更新文档数据,入参类型为key-value类型

public boolean updateByMapParams(Long id, Map<String,Object> params) {
  if(null == id || MapUtils.isEmpty(params)) {
    return false;
  }
  UpdateQuery updateQuery = new UpdateQuery();
  updateQuery.setIndexName(indexName);
  updateQuery.setType(indexType);
  updateQuery.setId(id + "");
  updateQuery.setUpdateRequest(new UpdateRequest().doc(params));
  return !esTemplate.update(updateQuery).isCreated();
}
4.查询数据
4.1 基础查询

// 根据主键ID查询文档数据

public Object getById(Long id, Class<?> clazz) {
  Object obj = null;
  SearchResponse searchResponse = client.prepareSearch(indexName)
    .setTypes(indexType)
    .setQuery(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("id", id)))
    .setSearchType(SearchType.QUERY_THEN_FETCH)
    .execute().actionGet(defaultTimeout);
  SearchHits searchHits = searchResponse.getHits();
  if (null != searchHits && searchHits.getHits().length > 0) {
    obj = JSON.parseObject(searchHits.getHits()[0].getSourceAsString(), clazz);
  }
  return obj;
}

// 根据条件查询文档列表,根据UserQuery条件查询

public List<User> search(UserQuery userQuery) {
  BoolQueryBuilder qb = new BoolQueryBuilder();
  if (StringUtils.isNotBlank(userQuery.getName())) {
    qb.must(QueryBuilders.termQuery("name", userQuery.getName()));
  }
  if (null != userQuery.getAge()) {
    qb.must(QueryBuilders.termQuery("age", userQuery.getAge()));
  }
  SearchRequestBuilder srq = client.prepareSearch(indexName)
    .setTypes(indexType)
    .setQuery(qb);
  SearchHits hits = srq.execute().actionGet(defaultTimeout).getHits();
  List<User> list = new ArrayList<>();
  if (null != hits && hits.getHits().length > 0) {
    hits.forEach(hit -> list.add(JSON.parseObject(hit.getSourceAsString(), User.class)));
  }
  return list;
}

UserQuery类如下:

public class UserQuery extends User {
  // 排序字段
  private String sortKey;
  // 每页查询数据条数
  private Integer pageSize;
  // 页码
  private Integer pageNo;
  // setter和getter方法略
}

// 根据条件查询数据条数

public Long getTotalNum(UserQuery userQuery){
  SearchRequestBuilder srq = client.prepareSearch(indexName)
    .setTypes(typeName);
  BoolQueryBuilder qb = new BoolQueryBuilder();
  if (StringUtils.isNotBlank(userQuery.getName())) {
    qb.must(QueryBuilders.termQuery("name", userQuery.getName()));
  }
  if (null != userQuery.getAge()) {
    qb.must(QueryBuilders.termQuery("age", userQuery.getAge()));
  }
  srq.setPostFilter(qb)
    .setSearchType(SearchType.COUNT)
    .setSize(0); // 表示查询之后,不返回数据内容,只返回条数,可以提高效率
  return srq.execute().actionGet().getHits().totalHits();
}
4.2 按照查询提交分页查询
public List<User> searchByPage(UserQuery userQuery) {
  BoolQueryBuilder qb = new BoolQueryBuilder();
  if (StringUtils.isNotBlank(userQuery.getName())) {
    qb.must(QueryBuilders.termQuery("name", userQuery.getName()));
  }
  if (null != userQuery.getAge()) {
    qb.must(QueryBuilders.termQuery("age", userQuery.getAge()));
  }
  SearchRequestBuilder srq = client.prepareSearch(indexName)
    .setTypes(indexType)
    .setQuery(qb)
    .addSort(SortBuilders.fieldSort(userQuery.getSortKey()).order(SortOrder.ASC)) // 按照某一个字段升序排列
    .setSize(userQuery.getPageSize()) // 设置每页查询数量
    .setFrom((userQuery.getPageNo() - 1) * userQuery.getPageSize()); // 设置查询偏移量
  SearchHits hits = srq.execute().actionGet(defaultTimeout).getHits();
  List<User> list = new ArrayList<>();
  if (null != hits && hits.getHits().length > 0) {
    hits.forEach(hit -> list.add(JSON.parseObject(hit.getSourceAsString(), User.class)));
  }
  return list;
}
4.3 聚合查询

// 根据年龄分组,统计每个年龄的用户数量
// 返回的Map中,key是年龄,value是年龄对应的用户数量

public Map<Long, Long> getCountByAge(Integer age) {
  Map<Long, Long> userMap = new HashMap<>();
  SearchRequestBuilder sbuilder = client.prepareSearch(indexName)
    .setTypes(indexType);
  TermsBuilder teamAgg = AggregationBuilders
    .terms("user_count").field("age");
  sbuilder.addAggregation(teamAgg);
  SearchResponse response = sbuilder.execute().actionGet(defaultTimeout);
  Map<String, Aggregation> aggMap = response.getAggregations().asMap();
  LongTerms teamAggRes = (LongTerms) aggMap.get("user_count");
  Iterator<Terms.Bucket> bucket = teamAggRes.getBuckets().iterator();
  while (bucket.hasNext()) {
    Terms.Bucket buck = bucket.next();
    userMap.put((Long)buck.getKey(),buck.getDocCount());
  }
  return userMap;
}
5.ES路由

ES路由,是针对于数据量特别大的时候,提高查询效率使用的,但是加了路由不一定是好事。对于数据量比较小,而且查询量比较大的时候,可能会造成ES分片出现热点,导致某一个节点的CPU或者内存瞬时升高。所以在增加路由之前需要仔细考虑。

5.1 路由的使用方法

在创建的时候,需要指定路由字段,即:使用哪一个文档中的字段去做路由,如果没有显示指定路由,将会默认使用主键ID做路由,在修改或者删除的时候,也需要指定路由字段,否则可能提示文档不存在类似的异常。

5.2 路由的使用示例

假设有20万活跃用户的网站浏览记录,当需要使用用户ID去做大批量的查询时,如果数据量特别大,比如上亿级别,可以在用户ID列建立路由,使用用户的ID去做路由,如下:

// 保存操作中指定路由字段

public void save(User user) throws IllegalAccessException {
  IndexRequestBuilder res = client.prepareIndex()
    .setIndex(indexName)
    .setType(indexType)
    .setId(user.getId())
    .setSource(JSON.toJSONString(user));
  // 如果用户ID不为空,可以在保存的时候指定使用userId作为路由字段
  if (StringUtils.isNotBlank(user.getUserId())) {
    res.setRouting(user.getUserId()); // 假设用户ID唯一,而且为字符串类型
  }
  res.execute().actionGet(defaultTimeout);
}

// 删除条件中设置路由字段

DeleteRequestBuilder res = client.prepareDelete()
  .setIndex(index)
  .setType(type)
  .setRouting("userId") // 构建删除对象时,需要指定路由字段
  .setId(id); // 根据主键删除

// 查询条件中设置路由字段

BoolQueryBuilder qb = new BoolQueryBuilder();
qb.must(QueryBuilders.termQuery("name", name)); // 根据name查询
qb.must(QueryBuilders.rangeQuery("createTime").gt(queryTime); // 根据创建时间查询,创建时间大于给定的时间
srq.setQuery(qb) // 设置查询条件
.setRouting(gwRedEsQuery.getVenderId() + ""); // 设置路由字段

// 更新

public boolean update(Long id,User user,String routingField) 
  throws IllegalAccessException {
  UpdateQuery updateQuery = new UpdateQuery();
  updateQuery.setIndexName(indexName);
  updateQuery.setType(indexType);
  updateQuery.setId(id + "");
  if(StringUtils.isNotBlank(routingField)) {
    updateQuery.setUpdateRequest(new UpdateRequest().routing(routingField).doc(JSON.toJSONString(user)));
  }
  return !esTemplate.update(updateQuery).isCreated();
}

至此,使用Java API实现ElasticSearch的基本增删改查操作介绍完毕,下片文章将会介绍ElasticSearch中的一些常见问题,包括深分页及一些Java查询API中的坑!欢迎评论转发!

本文章属于作者原创,如果转发请标注文章来源:个人小站【www.jinnianshizhunian.vip

另外提供一些优秀的Java架构师及IT开发视频,书籍资料,免费下载地址:https://www.592xuexi.com

发表评论

电子邮件地址不会被公开。