1. 简述

  • Elasticsearch 是基于 Lucene 开发的一个分布式全文检索框架,向 Elasticsearch 中存储和从 Elasticsearch 中查询,格式是json。

  • Elasticsearch 中存储数据,其实就是向 es 中的 index 下面的 type 中存储 json 类型的数据。

  • elasticsearch 提供了很多语言的客户端用于操作 elasticsearch 服务,例如: javapython.netJavaScriptPHP 等。本文主要介绍如何使用 java 语言来操作 elasticsearch 服务。在 elasticsearch 的官网上提供了两种 java 语言的 API ,一种是 **Java Transport Client**,一种是 **Java REST Client**。

Java Transport Client** 是基于 TCP 协议交互的,**在 elasticsearch 7.0+ 版本后官方不再赞成使用,在Elasticsearch 8.0的版本中完全移除 TransportClient

** Java REST Client 是基于 HTTP 协议交互,**而 Java REST Client 又分为 Java Low Level REST ClientJava High Level REST Client

  • Java High Level REST Client 是在 Java Low Level REST Client 的基础上做了封装,使其以更加面向对象和操作更加便利的方式调用 elasticsearch 服务。

官方推荐使用 Java High Level REST Client ,因为在实际使用中, Java Transport Client 在大并发的情况下会出现连接不稳定的情况。
那接下来我们就来看看 elasticsearch 提供的 Java High Level REST Client (以下简称高级REST客户端)的一些基础的操作,跟多的操作大家自行阅读elasticsearch的官方文档: [https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html)



2. 准备

  • 环境:

    • Windows 10

    • elasticsearch 7.91

    • IDEA

    • Maven

    • Java 8

高级客户端需要 Java 1.8 并依赖于 Elasticsearch core 项目

  • 依赖:
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.9.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.9.1</version>
</dependency>

3. 初始化

1
2
3
4
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
  • 高级客户端将在内部创建用于基于提供的生成器执行请求的低级客户端。该低级客户端维护一个连接池并启动一些线程,因此,当您很好地完成高级客户端时,您应该关闭该高级客户端,然后关闭内部低级客户端以释放这些资源。这可以通过 以下时间完成: close
1
client.close();

在有关 Java 高级客户端的本文档的其余部分中,实例将引用为 。 RestHighLevelClient client


案例: 

  • 查询 index 代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args)  {
RestClientBuilder builder = RestClient.builder(
new HttpHost(
"127.0.0.1", //es主机 IP
9200 // es 端口http
)
);
RestHighLevelClient client = new RestHighLevelClient(builder);
GetRequest request = new GetRequest(
"blog1", //索引
"1" //文档ID
);

//当针对不存在的索引执行获取请求时,响应404状态码,将引发IOException,需要按如下方式处理:
GetResponse documentFields = null;
try {
documentFields = client.get(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
////处理因为索引不存在而抛出的异常情况
}
System.out.println(documentFields);
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}

}
  • 查询结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"_index": "blog1",
"_type": "_doc",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"age": 1,
"country": "fuzhou",
"date": "2020-09-10",
"name": "ngitvusercancel"
}
}

上述是一个案例的展示,让我们初步了解通过 Java 的高级 restful 客户端来访问, 下面我们将进行相关 Api 的介绍

4. 索引 API (Index Api)

4.1 创建索引(Create Index API)

**

4.1.1 案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
   /*
* 创建索引.
* url:https://i-code.online/
*/
public static void main(String[] args) {
//创建链接信息
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("127.0.0.1",9200)));

//创建索引请求 索引名称 student
CreateIndexRequest createIndexRequest = new CreateIndexRequest("student-1");

//创建索引时可以设置与之相关的 特定配置
createIndexRequest.settings(Settings.builder()
.put("index.number_of_shards",3) //分片数
.put("index.number_of_replicas",2) //备份数
);
//创建文档类型映射
createIndexRequest.mapping("{\n" +
" \"properties\": {\n" +
" \"id\": {\n" +
" \"type\": \"long\",\n" +
" \"store\": true\n" +
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"index\": true,\n" +
" \"analyzer\": \"ik_max_word\"\n" +
" },\n" +
" \"content\": {\n" +
" \"type\": \"text\",\n" +
" \"index\": true,\n" +
" \"analyzer\": \"ik_max_word\"\n" +
" }\n" +
" }\n" +
"}",
XContentType.JSON //类型映射,需要的是一个JSON字符串
);
//可选参数
//超时,等待所有节点被确认(使用TimeValue方式)
createIndexRequest.setTimeout(TimeValue.timeValueMinutes(1));

try {
//同步执行
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
//返回的CreateIndexResponse允许检索有关执行的操作的信息,如下所示:
boolean acknowledged = createIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
System.out.println("acknowledged:"+acknowledged);
System.out.println("shardsAcknowledged:"+shardsAcknowledged);
System.out.println(createIndexResponse.index());
} catch (IOException e) {
e.printStackTrace();
}
try {
//关闭客户端链接
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}

上述是一个 index 创建的过程,具体的细节操作 api 下面详解

4.1.2 创建索引请求

  • 需要参数: CreateIndexRequestindex
1
CreateIndexRequest request = new CreateIndexRequest("twitter");//<1>

<1>要创建索引

4.1.3 索引设置

  • 创建的每个索引都可以具有与其关联的特定设置。
1
2
3
4
5
//此索引的设置
request.settings(Settings.builder()
.put("index.number_of_shards", 3) //分片数
.put("index.number_of_replicas", 2)//备份数
);

4.1.4 索引映射

  • 可以创建索引,并创建其文档类型的映射
1
2
3
4
5
6
7
8
9
request.mapping(
"{\n" +
" "properties": {\n" +
" "message": {\n" +
" "type": "text"\n" +
" }\n" +
" }\n" +
"}", //<1> 要定义的类型
XContentType.JSON); //<2> 此类型的映射,作为 JSON 字符串提供

<1>要定义的类型
<2>此类型的映射,作为 JSON 字符串提供

  • 除了上面显示的示例之外,还可以以不同的方式提供映射源: String
1
2
3
4
5
6
7
8
9
10
Map<String, Object> message = new HashMap<>();
message.put("type", "text");

Map<String, Object> properties = new HashMap<>();
properties.put("message", message);

Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);

request.mapping(mapping); //接受map的映射集合,自动转为 json

提供自动转换为 JSON 格式的映射源 Map

这种方式多层嵌套,在使用过程中注意嵌套,上面标签嵌套: properties -> message -> type

1
2
3
4
5
6
7
8
9
10
11
12
13
14
XContentBuilder builder = XContentFactory.jsonBuilder(); // 使用XContentBuilder内容生成器
builder.startObject();
{
builder.startObject("properties");
{
builder.startObject("message");
{
builder.field("type", "text");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();

映射作为对象提供的源,弹性搜索内置帮助器,用于生成 JSON 内容 XContentBuilder

4.1.5 索引别名

  • 可以在索引创建时设置别名
1
request.alias(new Alias("twitter_alias").filter(QueryBuilders.termQuery("user", "kimchy"))); //要定义的别名

4.1.6 提供整个源

  • 前面我们都是一步一步的设置的,其实也可以提供整个源,包括其所有部分(映射、设置和别名):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
request.source("{\n" +
" \"settings\" : {\n" +
" \"number_of_shards\" : 1,\n" +
" \"number_of_replicas\" : 0\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"message\" : { \"type\" : \"text\" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : {\n" +
" \"twitter_alias\" : {}\n" +
" }\n" +
"}", XContentType.JSON);

作为 JSON 字符串提供的源。它也可以作为 或 提供。MapXContentBuilder

4.1.7 可选参数

  • 可以选择提供以下参数:
1
request.setTimeout(TimeValue.timeValueMinutes(2));

超时以等待所有节点将索引创建确认为 TimeValue

1
request.setMasterTimeout(TimeValue.timeValueMinutes(1));

以作为 TimeValue

1
2
request.waitForActiveShards(ActiveShardCount.from(2));
request.waitForActiveShards(ActiveShardCount.DEFAULT);

在创建索引 API 返回响应之前等待的活动分片副本数,作为 in t
在创建索引 API 返回响应之前等待的活动分片副本数,作为 ActiveShardCount

4.1.8 同步执行

  • 以下列方式执行 时,客户端将等待 返回 ,然后再继续执行代码: CreateIndexRequest CreateIndexResponse
1
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);

同步调用可能会引发 在高级 REST 客户端中无法解析 REST 响应、请求会发出时间或类似情况下没有从服务器返回的响应的情况下。 IOException

在服务器返回 或 错误代码的情况下,高级客户端尝试分析响应正文错误详细信息,然后引发泛型,并将原始代码添加为抑制异常。 4xx 5xx ElasticsearchExceptionResponseException

4.1.9 异步执行

  • 也可以以异步方式执行 ,以便客户端可以直接返回。用户需要指定如何通过将请求和侦听器传递到异步创建索引方法来处理响应或潜在故障: CreateIndexRequest
1
client.indices().createAsync(request, RequestOptions.DEFAULT, listener);

执行完成时要执行和要使用的 CreateIndexRequest ActionListener

  • 异步方法不会阻止并立即返回。完成后,如果执行成功完成,则使用 onResponse 方法调用 ,如果执行失败,则使用 onFailure 该方法。失败方案和预期异常与同步执行案例相同。 ActionListener


典型的侦听器如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
ActionListener<CreateIndexResponse> listener =
new ActionListener<CreateIndexResponse>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
//成功执行时调用。
}

@Override
public void onFailure(Exception e) {
//当整个失败时调用
}
};

4.1.10 创建索引响应

  • 返回的允许检索有关执行操作的信息,如下所示: CreateIndexResponse
1
2
boolean acknowledged = createIndexResponse.isAcknowledged(); // <1>
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); // <2>

<1> 指示所有节点是否都已确认请求

<2> 指示在计时之前是否为索引中的每个分片启动所需的分片副本数

4.2 删除索引(Delete Index Api)

4.2.1 案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 删除索引.
* url:https://i-code.online/
* @param args
*/
public static void main(String[] args) {
//1. 创建客户端
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("127.0.0.1",9200)));
//2. 创建DeleteIndexRequest 接受 index(索引名) 参数
DeleteIndexRequest request = new DeleteIndexRequest("student");
//超时以等待所有节点确认索引删除 参数为 TimeValue 类型
request.timeout(TimeValue.timeValueMinutes(1));
//连接master节点的超时时间(使用TimeValue方式)
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
try {
// 调用delete
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
System.out.printf("isAcknowledged:%s", response.isAcknowledged());
} catch (IOException e) {
e.printStackTrace();
}
}

4.2.2 删除索引请求

  • 需要参数: DeleteIndexRequestindex
1
DeleteIndexRequest request = new DeleteIndexRequest("posts");//<1> <1> 索引(index)名

<1> 索引(index)名

4.2.3 可选参数

  • 可以选择提供以下参数:
1
2
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");

超时以等待所有节点确认索引删除为 TimeValue 类型

超时以等待所有节点确认索引删除为 String 类型

1
2
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));//连接master节点的超时时间(使用TimeValue方式)
request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)

连接master节点的超时时间(使用TimeValue方式)
连接master节点的超时时间(使用字符串方式)

1
request.indicesOptions(IndicesOptions.lenientExpandOpen());

设置控制如何解析不可用的索引以及如何展开通配符表达式IndicesOptions

4.2.4 同步执行

  • 以下列方式执行 DeleteIndexRequest 时,客户端将等待 DeleteIndexResponse 返回 ,然后再继续执行代码: DeleteIndexRequest DeleteIndexResponse
1
AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);

同步调用可能会引发 在高级 REST 客户端中无法解析 REST 响应、请求会发出时间或类似情况下没有从服务器返回的响应的情况下。 IOException

在服务器返回 或 错误代码的情况下,高级客户端尝试分析响应正文错误详细信息,然后引发泛型,并将原始代码添加为抑制异常。4xx 5xx ElasticsearchExceptionResponseException

4.2.5 异步执行

  • 也可以以异步方式执行 ,以便客户端可以直接返回。用户需要指定如何通过将请求和侦听器传递到异步删除索引方法来处理响应或潜在故障: DeleteIndexRequest
1
client.indices().deleteAsync(request, RequestOptions.DEFAULT, listener); //<1>

<1> 执行完成时要执行和要使用的DeleteIndexRequest ActionListener


异步方法不会阻止并立即返回。完成后,如果执行成功完成,则使用 ActionListener#onResponse 方法调用 ,如果执行失败,则使用 ActionListener# onFailure 该方法。失败方案和预期异常与同步执行案例相同。


典型的侦听器如下所示:delete-index

1
2
3
4
5
6
7
8
9
10
11
12
ActionListener<AcknowledgedResponse> listener =
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse deleteIndexResponse) {
//成功执行时调用。
}

@Override
public void onFailure(Exception e) {
//当整个失败时调用。DeleteIndexRequest
}
};

4.2.6 删除索引响应

  • 返回的允许检索有关执行操作的信息,如下所示: DeleteIndexResponse
1
boolean acknowledged = deleteIndexResponse.isAcknowledged(); //<1> 指示所有节点是否都已确认请求

<1> 指示所有节点是否都已确认请求

  • 如果未找到索引,将引发 : ElasticsearchException
1
2
3
4
5
6
7
8
try {
DeleteIndexRequest request = new DeleteIndexRequest("does_not_exist");
client.indices().delete(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) {
//如果未找到要删除的索引,则进行""
}
}

如果未找到要删除的索引,则进行””

4.3 索引存在(Index Exists Api)

4.3.1 案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 索引是否存在Api
* url:www.i-code.online
* @param args
*/
public static void main(String[] args) {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("127.0.0.1",9200)));
//创建请求
GetIndexRequest request = new GetIndexRequest("student");

//<1> 是返回本地信息还是从主节点检索状态
request.local(false);
//<2> 返回结果为适合人类的格式
request.humanReadable(true);
try {
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
System.out.println(exists);
} catch (IOException e) {
e.printStackTrace();
}
}

4.3.2 索引存在请求

  • 高级 REST 客户端使用 “Index Exists API”。索引名称是必需的。 GetIndexRequest
1
GetIndexRequest request = new GetIndexRequest("twitter"); //<1> index 名称

<1> index 名称

4.3.3 可选参数

  • 索引存在 API 还接受以下可选参数,通过 : GetIndexRequest
1
2
3
4
request.local(false);//<1> 是返回本地信息还是从主节点检索状态
request.humanReadable(true); //<2> 返回结果为适合人类的格式
request.includeDefaults(false); //<3> 是否返回每个索引的所有默认设置
request.indicesOptions(indicesOptions); //<4> 控制如何解析不可用的索引以及如何展开通配符表达式

<1> 是返回本地信息还是从主节点检索状态
<2> 返回结果为适合人类的格式
<3> 是否返回每个索引的所有默认设置
<4> 控制如何解析不可用的索引以及如何展开通配符表达式

4.3.4 同步执行

  • 以下列方式执行 时,客户端将等待 返回 ,然后再继续执行代码: GetIndexRequest boolean
1
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);

与其他同步的相同

4.3.5 异步执行

-

  • 也可以以异步方式执行 ,以便客户端可以直接返回。用户需要指定如何通过将请求和侦听器传递到异步索引存在的方法来处理响应或潜在故障: GetIndexRequest
1
client.indices().existsAsync(request, RequestOptions.DEFAULT, listener);//<1>执行完成时要执行和要使用的 GetIndexRequest  ActionListener

<1>执行完成时要执行和要使用的 GetIndexRequest ActionListener

异步的处理逻辑与其他异步的相同,都是实现 ActionListener 的方法

4.3.6 响应

  • 响应是一个值,指示索引(或索引)是否存在。 boolean

5. 文档 Api (Document APIs)

5.1 索引 API (Index Api)

5.1.1 案例:

  • 添加记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void test02() {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("127.0.0.1",9200)));
//创建请求, 参数index名称
IndexRequest request = new IndexRequest("student");
//请求的模式,CREATE: 创建模式,如果已经存在则报错 Index:存在则不再创建,也不报错
request.opType(DocWriteRequest.OpType.INDEX);
String json = "{\n" +
" \"id\": 12,\n" +
" \"name\": \"admin\",\n" +
" \"content\": \"步的处理逻辑与其他异步的相同,都是实现ActionListener 的方法\"\n" +
"}";
request.id("1").source(
json,
XContentType.JSON
);
IndexResponse indexResponse = null;
try {
//调用 index 方法
indexResponse = client.index(request, RequestOptions.DEFAULT);
System.out.println(indexResponse.getVersion());
System.out.println(indexResponse.getIndex());
System.out.println(indexResponse.getId());
System.out.println(indexResponse.status());
} catch (ElasticsearchStatusException | IOException e) {
e.printStackTrace();
}
}

5.1.2 索引请求

  • 需要以下参数: IndexRequest
1
2
3
4
5
6
7
8
9
10
11
12
13
//创建请求, 参数index名称
IndexRequest request = new IndexRequest("student"); //<1> index 名称
String json = "{\n" +
" \"id\": 12,\n" +
" \"name\": \"admin\",\n" +
" \"content\": \"步的处理逻辑与其他异步的相同,都是实现ActionListener 的方法\"\n" +
"}";
request
.id("1") // <2> 指定文档 ID
.source(
json,
XContentType.JSON // <3> 指定参数类型,json
);

<1> index 名称指数

<2> 请求的文档 ID

<3> 指定参数类型,json


提供文档源

  • 除了上面显示的示例之外,还可以以不同的方式提供文档源: String
1
2
3
4
5
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("id", 1);
jsonMap.put("name", "Admin);
jsonMap.put("content", "步的处理逻辑与其他异步的相同");
IndexRequest indexRequest = new IndexRequest("student").id("1").source(jsonMap);

文档源作为 自动转换为 JSON 格式的 Map

1
2
3
4
5
6
7
8
9
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("id", 1);
builder.field("name", "admin);
builder.field("content", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("student").id("1").source(builder);

文档源作为对象提供,弹性搜索内置帮助器生成 JSON 内容 XContentBuilder

1
2
3
4
5
IndexRequest indexRequest = new IndexRequest("student")
.id("1")
.source("id", 1,
"name", "admin",
"content", "trying out Elasticsearch");

作为密钥对提供的文档源,该源将转换为 JSON 格式Object

5.1.3 可选参数

  • 可以选择提供以下参数:
1
request.routing("routing"); //<1>

<1> 路由值

1
2
request.timeout(TimeValue.timeValueSeconds(1)); //<1>
request.timeout("1s"); // <2>

<1> 超时以等待主分片 作为 TimeValue
<2> 超时以等待主分片 作为 String

1
2
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); //<1>
request.setRefreshPolicy("wait_for"); //<2>

<1> 将策略刷新 为实例 WriteRequest.RefreshPolicy

<2> 将策略刷新 为 String

1
request.version(2);

版本

1
request.versionType(VersionType.EXTERNAL); //版本类型

版本类型

1
2
request.opType(DocWriteRequest.OpType.CREATE);//<1>
request.opType("create");//<2>

<1> 作为值提供的操作类型 DocWriteRequest.OpType

<2> 提供的操作类型可以是 或(默认) String create index

1
request.setPipeline("pipeline");

在索引文档之前要执行的包含管道的名称

5.1.4 同步执行

  • 以下列方式执行 时,客户端将等待 返回 ,然后再继续执行代码: IndexRequest IndexResponse
1
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
  • 同步调用可能会引发 在高级 REST 客户端中无法解析 REST 响应、请求会发出时间或类似情况下没有从服务器返回的响应的情况下。 IOException
  • 在服务器返回 或 错误代码的情况下,高级客户端尝试分析响应正文错误详细信息,然后引发泛型,并将原始代码添加为抑制异常。 4xx 5xx ElasticsearchExceptionResponseException

5.1.5 异步执行

  • 也可以以异步方式执行 ,以便客户端可以直接返回。用户需要指定如何通过将请求和侦听器传递到异步索引方法来处理响应或潜在故障: IndexRequest
1
client.indexAsync(request, RequestOptions.DEFAULT, listener); //<1>

<1> 执行完成时要执行和要使用的 IndexRequest ActionListener

  • 异步方法不会阻止并立即返回。完成后,如果执行成功完成,则使用 方法调用 ,如果执行失败,则使用 该方法。失败方案和预期异常与同步执行案例相同。 ActionListener onResponse onFailure
  • 典型的侦听器如下所示:index
1
2
3
4
5
6
7
8
9
10
11
listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
//<1> 成功执行时调用。
}

@Override
public void onFailure(Exception e) {
//<2> 当整个失败时调用。IndexRequest
}
};

<1> 成功执行时调用。

<2> 当整个失败时调用。> IndexRequest

5.1.6 索引响应

  • 返回的允许检索有关执行操作的信息,如下所示: IndexResponse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String index = indexResponse.getIndex();
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
//<1>
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
//<2>
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// <3>
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason(); //<4>
}
}

<1> 处理(如果需要)首次创建文档的情况

<2> 处理(如果需要)文档被重写的情况,因为它已经存在

<3> 处理成功分片数少于总分片的情况

<4> 处理潜在的故障

  • 如果存在版本冲突,将引发 : ElasticsearchException
1
2
3
4
5
6
7
8
9
10
11
12
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
//<1>
}
}

<1> 引发异常指示返回版本冲突错误

  • 在设置为且已存在具有相同索引和 ID 的文档的情况下,将发生相同的情况: opTypecreate
1
2
3
4
5
6
7
8
9
10
11
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
//<1>
}
}

<1>引发异常指示返回版本冲突错误


5.2 获取Api (Get API)

5.2.1 案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static void test01(){
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("127.0.0.1",9200)));
GetRequest request = new GetRequest("student");
// 为特定字段 配置 源包含
String[] includs = {"name","id","content"};
String[] excluds = {"id"};
FetchSourceContext context = new FetchSourceContext(true,includs,excluds);

request.id("1").version(2).fetchSourceContext(context);
try {
GetResponse documentFields = client.get(request, RequestOptions.DEFAULT);
if (documentFields.isExists()) {
//检索名称
System.out.println(documentFields.getIndex());
// 获取文档源的 Map 结果
System.out.println(documentFields.getSource());
// 获取源作为 Map
System.out.println(documentFields.getSourceAsMap());
// 获取源作为 bytes
System.out.println(documentFields.getSourceAsBytes());
}else {
System.out.println("不错在该数据");
}

} catch (IOException e) {
e.printStackTrace();
}
}

5.2.2 获取请求

  • 需要以下参数:GetRequest
1
2
3
GetRequest getRequest = new GetRequest(
"posts", //<1>
"1"); //<1>

<1> 索引名称

<2> 文档 ID

5.2.3 可选参数

  • 可以选择提供以下参数:
1
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

禁用源检索,默认情况下启用

1
2
3
4
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

为特定字段 配置 源包含

includes : 检索结果所包含的字段
excludes : 检索结果排除的字段

1
2
3
4
5
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

为特定字段配置源排除

1
2
3
request.storedFields("message");
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
String message = getResponse.getField("message").getValue();

为特定存储字段配置检索(要求字段单独存储在映射中)

检索存储的字段(要求该字段单独存储在映射中)message

1
request.routing("routing");

路由值

1
request.preference("preference");

首选项值

1
request.realtime(false);

将实时标志设置为(默认情况下)falsetrue

1
request.refresh(true);

在检索文档之前执行刷新(默认情况下)false

1
request.version(2);

版本

1
request.versionType(VersionType.EXTERNAL);

版本类型

5.2.4 同步执行

  • 以下列方式执行 时,客户端将等待 返回 ,然后再继续执行代码: GetRequest GetResponse
1
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
  • 同步调用可能会引发 在高级 REST 客户端中无法解析 REST 响应、请求会发出时间或类似情况下没有从服务器返回的响应的情况下。IOException

  • 在服务器返回 或 错误代码的情况下,高级客户端尝试分析响应正文错误详细信息,然后引发泛型,并将原始代码添加为抑制异常。4xx5xxElasticsearchExceptionResponseException

5.2.5 异步执行

  • 也可以以异步方式执行 ,以便客户端可以直接返回。用户需要指定如何通过将请求和侦听器传递到异步获取方法来处理响应或潜在故障: GetRequest
1
client.getAsync(request, RequestOptions.DEFAULT, listener);

执行完成时要执行和要使用的 GetRequest ActionListener

  • 异步方法不会阻止并立即返回。完成后,如果执行成功完成,则使用 方法调用 ,如果执行失败,则使用 该方法。失败方案和预期异常与同步执行案例相同。ActionListeneronResponseonFailure

  • 典型的侦听器如下所示: get

1
2
3
4
5
6
7
8
9
10
11
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
//成功执行时调用
}

@Override
public void onFailure(Exception e) {
//当整个失败时调用。GetRequest
}
};

5.2.6 获取响应

  • 返回的允许检索请求的文档及其元数据和最终存储的字段。 GetResponse
1
2
3
4
5
6
7
8
9
10
String index = getResponse.getIndex();
String id = getResponse.getId();
if (getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString(); // <1>
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // <2>
byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // <3>
} else {
// <4>
}

<1> 将文档检索为 String

<2> 将文档检索为 Map<String, Object>

<3> 将文档检索为 byte[]

<4> 处理找不到文档的方案。请注意,尽管返回的响应具有状态代码,但返回的是有效的,而不是引发异常。此类响应不保存任何源文档,其方法将返回。 404 GetResponseisExistsfalse

  • 当对不存在的索引执行 get 请求时,响应具有状态代码,即需要按如下方式处理的已引发请求: 404 ElasticsearchException
1
2
3
4
5
6
7
8
GetRequest request = new GetRequest("does_not_exist", "1");
try {
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
//<1> 处理引发异常,因为索引不存在
}
}

<1> 处理引发异常,因为索引不存在

  • 如果请求了特定的文档版本,并且现有文档具有不同的版本号,则引发版本冲突:
1
2
3
4
5
6
7
8
try {
GetRequest request = new GetRequest("posts", "1").version(2);
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// <1>
}
}

<1> 引发异常指示返回版本冲突错误


6. 结语

其实很多 Api 的使用都是类似相同的,这里我们不再对其他 Api 进行解析,需要了解的完全可以去光网文档查看,文档地址在问上涨上面有。


微信公众号


欢迎关注公众号“云栖简码”