搭建 Restful Web 服务

1. 理解 REST

  REST 全称是 Representational State Transfer,中文意思是表征性状态转移。它首次出现在2000年Roy Fielding的博士论文中,Roy Fielding是HTTP规范的主要编写者之一。值得注意的是REST并没有一个明确的标准,而更像是一种设计的风格。如果一个架构符合REST的约束条件和原则,我们就称它为RESTful架构。

  理论上REST架构风格并不是绑定在HTTP上,只不过目前HTTP是唯一与REST相关的实例。

1.1. REST 原则

  • 资源 可通过目录结构样式的 URIs 暴露

  • 表述 可以通过 JSON 或 XML 表达的数据对象或属性来传递

  • 消息 使用统一的 HTTP 方法(例如:GET、POST、PUT 和 DELETE)

  • 无状态 客户端与服务端之间的交互在请求之间是无状态的,从客户端到服务端的每个请求都必须包含理解请求所必需的信息

    1.2. HTTP 方法

      使用 HTTP 将 CRUD(create, retrieve, update, delete <创建、获取、更新、删除—增删改查>)操作映射为 HTTP 请求。如果按照HTTP方法的语义来暴露资源,那么接口将会拥有安全性和幂等性的特性,例如GET和HEAD请求都是安全的, 无论请求多少次,都不会改变服务器状态。而GET、HEAD、PUT和DELETE请求都是幂等的,无论对资源操作多少次, 结果总是一样的,后面的请求并不会产生比第一次更多的影响。

    1.2.1. GET

  • 安全且幂等

  • 获取信息

    1.2.2. POST

  • 不安全且不幂等

  • 使用请求中提供的实体执行操作,可用于创建资源或更新资源

    1.2.3. PUT

  • 不安全但幂等

  • 使用请求中提供的实体执行操作,可用于创建资源或更新资源

    1.2.4. DELETE

  • 不安全但幂等

  • 删除资源
      POST和PUT在创建资源的区别在于,所创建的资源的名称(URI)是否由客户端决定。 例如为我的博文增加一个java的分类,生成的路径就是分类名/categories/java,那么就可以采用PUT方法。不过很多人直接把POST、GET、PUT、DELETE直接对应上CRUD,例如在一个典型的rails实现的RESTful应用中就是这么做的。

    1.3. HTTP status codes

      状态码指示 HTTP 请求的结果:

  • 1XX:信息

  • 2XX:成功

  • 3XX:转发

  • 4XX:客户端错误

  • 5XX:服务端错误

    1.4. 媒体类型

      HTTP头中的 Accept 和 Content-Type 可用于描述HTTP请求中发送或请求的内容。如果客户端请求JSON响应,那么可以将 Accept 设为 application/json。相应地,如果发送的内容是XML,那么可以设置 Content-Type 为 application/xml 。

    2. REST API 设计最佳实践

      这里介绍一些设计 REST API 的最佳实践,大家先记住下面这句话:

    URL 是个句子,其中资源是名词、HTTP 方法是动词。

    2.1. 使用名词来表示资源

      下面是一些例子:

  • GET – /users:返回用户列表

  • GET – /users/100:返回一个特定用户

  • POST – /users:创建一个新用户

  • PUT – /users/200:更新一个特定用户

  • DELETE – /users/711:删除一个特定用户
      不要使用动词:

  • /getAllsers

  • /getUserById

  • /createNewUser

  • /updateUser

  • /deleteUser

    2.2 在 HTTP 头中使用适当的序列化格式

      客户端和服务端都需要知道通信所用的格式,这个格式要在 HTTP 头中指定:

  • Content-Type 定义请求格式

  • Accept 定义一个可接受的响应格式列表

    2.3 Get 方法和查询参数不应当改变状态

      使用 PUT, POST 和 DELETE 方法来改变状态,不要使用 GET 方法来改变状态:

  • GET /users/711?activate

  • GET /users/711/activate

    2.4. 使用子资源表示关联

      如果一个资源与另一个资源关联,使用子资源:

  • GET /cars/711/drivers/ 返回711号汽车的驾驶员列表

  • GET /cars/711/drivers/4 返回711号汽车的第4号驾驶员

    2.5. 使用适当的 HTTP 方法 (动词)

      再回顾一下这句话:

    URL 是个句子,其中资源是名词、HTTP 方法是动词。

  • GET:获取在URI资源中指定的表述,响应消息体包含所请求资源的细节。

  • POST:创建一个URI指定的新资源,请求消息体提供新资源的细节。注意,POST也可以触发一些操作,而不一定是要创建新资源。

  • PUT:创建或替代指定URI的资源。请求消息体指定要创建或更新的资源。

  • DELETE:移除指定URI的资源。

    2.6. HTTP 响应状态码

      当客户端通过API向服务端发起一个请求时,客户端应当知道反馈:是否失败、通过或者请求错误。HTTP 状态码是一批标准化代码,在不同的场景下有不同的解释。服务器应当总是返回正确的状态码。
      下面是重要的HTTP代码分类:

  • 2xx (成功分类):这些状态码代码请求动作被接收且被服务器成功处理。

    • 200:Ok 表示 GET、PUT 或 POST 请求的标准状态码。
    • 201:Created(已创建)表示实例已被创建,多用于 POST 方法。
    • 204:No Content(无内容)表示请求已被成功处理但没有返回任何内容。常用于 DELETE 方法返回。
  • 3xx (转发分类)

    • 304:Not Modified(无修改)表示客户端已经缓存此响应,无须再次传输相同内容。
  • 4xx (客户端错误分类):这些状态码代表客户端提交了一个错误请求。

    • 400:Bad Request(错误请求)表示客户端请求没被处理,因为服务端无法理解客户端请求。
    • 401:Unauthorized(无授权)表示客户端无权访问资源,应当加上所需的认证信息后再次请求。
    • 403:Forbidden(禁止访问)表示请求有效且客户端已获授权,但客户端无权访问该资源。
    • 404:Not Found(没发现)表示所请求的资源现在不可用。
    • 410:Gone(移除)表示所请求的资源已被移除。
  • 5xx (服务端错误分类)

    • 500:Internal Server Error(内部服务器错误)表示请求有效,但是服务端发生了异常。
    • 503:Service Unavailable(服务不可用)表示服务器关闭或不可用,通常是指服务器处于维护状态。

      2.7. 名称规约

        你可以遵循任何名称规约,只要保持跨应用一致性即可。如果请求体和响应体是 JSON 类型,那么请遵循驼峰名称规约。

      2.8. 搜索、排序、过滤与分页

        上面一些示例都是在一个数据集上的简单查询,对于复杂的数据,我们需要在 GET 方法 API 上加一些参数来处理。下面是一些示例:

  • 排序:这个例子中,客户想获取排序的公司列表,GET /companies 应当在查询时接受多种排序参数。譬如 GET /companies?sort=rank_asc 将以等级升序的方式对公司进行排序。

  • 过滤:要过滤数据集,我们可以通过查询参数传递不同的选项。比如 GET /companies?category=banking&location=india 将过滤分类为银行且位于印度的公司。

  • 搜索:在公司列表中搜索公司名的 API 端点应当是 GET /companies?search=Digital。

  • 分页:当数据集太大时,我们应当将数据集分割成小的数据块,这样有利于提升服务端性能,也方便客户端处理响应。如 GET /companies?page=23 意味着获取公司列表的第 23 页数据。

    2.9. Restful API 版本

      一般使用不带点的简单数字表示版本,数字前加字母v代表版本号,如下所示:

  • /blog/api/v1

  • http://api.yourservice.com/v1/companies/34/employees

    2.10. 处理 JSON 错误体

      API 错误处理机制是很重要的,而且要好好规划。极力推荐总是在返回字段中包含错误消息。一个 JSON 错误体应当为开发者提供一些有用的信息:错误消息、错误代码以及详细描述。下面是一个较好的示例:

    {
    "code": 1234,
    "message": "Something bad happened :(",
    "description": "More details about the error here"
    }

    2.11. 如何创建 Rest API URL

      推荐使用下面格式的 URL:

  • http(s)://{域名(:端口号)}/{表示REST API的值}/{API版本}/{识别资源的路径}

  • http(s)://{表示REST API的域名(:端口号)}/{API 版本}/{识别资源的路径}
      如下所示:

  • http://example.com/api/v1/members/M000000001

  • http://api.example.com/v1/members/M000000001

    3. 开发基于 Spring Boot 的 Restful Web 服务

      Spring Boot 提供了构建企业应用中 RESTful Web 服务的极佳支持。

    3.1. 引入依赖

      要构建 RESTful Web 服务,我们需要在构建配置文件中加上 Spring Boot Starter Web 依赖。
      对于 Maven 用户,使用以下的代码在 pom.xml 文件中加入依赖:

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>    
    </dependency>

      对于 Gradle 用户,使用以下的代码在 build.gradle 文件中加入依赖:

    compile('org.springframework.boot:spring-boot-starter-web')

    3.2. Rest 相关注解

      在继续构建 RESTful web 服务前,建议你先要熟悉下面的注解:

    Rest Controller

      @RestController 注解用于定义 RESTful web 服务。它提供 JSON、XML 和自定义响应。语法如下所示:

    @RestController
    public class ProductServiceController {
    }

    Request Mapping

      @RequestMapping 注解用于定义请求 URI 以访问 REST 端点。我们可以定义 Request 方法来消费 produce 对象。默认的请求方法是 GET:

    @RequestMapping(value = "/products")
    public ResponseEntity<Object> getProducts() { }
    Request Body
    @RequestBody 注解用于定义请求体内容类型。
    public ResponseEntity<Object> createProduct(@RequestBody Product product) {
    }

    Path Variable

      @PathVariable 注解被用于定义自定义或动态的请求 URI,Path variable 被放在请求 URI 中的大括号内,如下所示:

    public ResponseEntity<Object> updateProduct(@PathVariable("id") String id) {
    }

    Request Parameter

      @RequestParam 注解被用于从请求 URL 中读取请求参数。缺省情况下是必须的,也可以为请求参数设置默认值。如下所示:
    public ResponseEntity getProduct(
    @RequestParam(value = "name", required = false, defaultValue = "honey") String name) {
    }

    3.3. 编写 REST API

    GET API

      下面的示例代码定义了 HTTP GET 请求方法。在这个例子里,我们使用 HashMap 来在存储 Product。注意我们使用了 POJO 类来存储产品。
      在这里,请求 URI 是 /products,它会从 HashMap 仓储中返回产品列表。下面的控制器类文件包含了 GET 方法的 REST 端点:

    package com.tutorialspoint.demo.controller;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.tutorialspoint.demo.model.Product;
    
    @RestController
    public class ProductServiceController {
       private static Map<String, Product> productRepo = new HashMap<>();
       static {
          Product honey = new Product();
          honey.setId("1");
          honey.setName("Honey");
          productRepo.put(honey.getId(), honey);
    
          Product almond = new Product();
          almond.setId("2");
          almond.setName("Almond");
          productRepo.put(almond.getId(), almond);
       }
       @RequestMapping(value = "/products")
       public ResponseEntity<Object> getProduct() {
          return new ResponseEntity<>(productRepo.values(), HttpStatus.OK);
       }
    }

    POST API

      HTTP POST 请求用于创建资源。这个方法包含请求体。我们可以通过发送请求参数和路径变量来定义自定义或动态 URL。
      下面的示例代码定义了 HTTP POST 请求方法。在这个例子中,我们使用 HashMap 来存储 Product,这里产品是一个 POJO 类。
      这里,请求 URI 是 /products,在产品被存入 HashMap 仓储后,它会返回字符串。

    package com.tutorialspoint.demo.controller;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.tutorialspoint.demo.model.Product;
    
    @RestController
    public class ProductServiceController {
       private static Map<String, Product> productRepo = new HashMap<>();
    
       @RequestMapping(value = "/products", method = RequestMethod.POST)
       public ResponseEntity<Object> createProduct(@RequestBody Product product) {
          productRepo.put(product.getId(), product);
          return new ResponseEntity<>("Product is created successfully", HttpStatus.CREATED);
       }
    }

    PUT API

      HTTP PUT 请求用于更新已有的资源。这个方法包含请求体。我们可以通过发送请求参数和路径变量来定义自定义或动态 URL。
      下面的例子展示了如何定义 HTTP PUT 请求方法。在这个例子中,我们使用 HashMap 更新现存的产品。此处,产品是一个 POJO 类。
      这里,请求 URI 是 /products/{id},在产品被存入 HashMap 仓储后,它会返回字符串。注意我们使用路径变量 {id} 定义需要更新的产品 ID:

    package com.tutorialspoint.demo.controller;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    import com.tutorialspoint.demo.model.Product;
    
    @RestController
    public class ProductServiceController {
       private static Map<String, Product> productRepo = new HashMap<>();
    
       @RequestMapping(value = "/products/{id}", method = RequestMethod.PUT)
       public ResponseEntity<Object> updateProduct(@PathVariable("id") String id, @RequestBody Product product) {
          productRepo.remove(id);
          product.setId(id);
          productRepo.put(id, product);
          return new ResponseEntity<>("Product is updated successsfully", HttpStatus.OK);
       }   
    }

    DELETE API

      HTTP Delete 请求用于删除存在的资源。这个方法不包含任何请求体。我们可以通过发送请求参数和路径变量来定义自定义或动态 URL。
      下面的例子展示如何定义 HTTP DELETE 请求方法。这个例子中,我们使用 HashMap 来移除现存的产品,用 POJO 来表示。
      请求 URI 是 /products/{id} 在产品被从 HashMap 仓储中删除后,它会返回字符串。 我们使用路径变量 {id} 来定义要被删除的产品 ID。

    package com.tutorialspoint.demo.controller;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.tutorialspoint.demo.model.Product;
    
    @RestController
    public class ProductServiceController {
       private static Map<String, Product> productRepo = new HashMap<>();
    
       @RequestMapping(value = "/products/{id}", method = RequestMethod.DELETE)
       public ResponseEntity<Object> delete(@PathVariable("id") String id) {
          productRepo.remove(id);
          return new ResponseEntity<>("Product is deleted successsfully", HttpStatus.OK);
       }
    }

    Java 项目中使用 Resilience4j 框架实现隔断机制/断路器


    到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 Retry, RateLimiter, TimeLimiter, 和 Bulkhead 模块。在本文中,我们将探索 CircuitBreaker 模块。我们将了解何时以及如何使用它,并查看一些示例。

    代码示例

    本文附有 GitHub 上的工作代码示例。

    什么是 Resilience4j?

    请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

    什么是断路器?

    断路器的思想是,如果我们知道调用可能会失败或超时,则阻止对远程服务的调用。我们这样做是为了不会在我们的服务和远程服务中不必要地浪费关键资源。这样的退出也给了远程服务一些时间来恢复。

    我们怎么知道一个调用可能会失败? 通过跟踪对远程服务发出的先前请求的结果。例如,如果前 10 次调用中有 8 次导致失败或超时,则下一次调用也可能会失败。

    断路器通过包装对远程服务的调用来跟踪响应。在正常运行期间,当远程服务成功响应时,我们说断路器处于“闭合”状态。当处于关闭状态时,断路器正常将请求传递给远程服务。

    当远程服务返回错误或超时时,断路器会增加一个内部计数器。如果错误计数超过配置的阈值,断路器将切换到“断开”状态。当处于断开状态时,断路器立即向调用者返回错误,甚至无需尝试远程调用。

    经过一段配置的时间后,断路器从断开状态切换到“半开”状态。在这种状态下,它允许一些请求传递到远程服务以检查它是否仍然不可用或缓慢。 如果错误率或慢呼叫率高于配置的阈值,则切换回断开状态。但是,如果错误率或慢呼叫率低于配置的阈值,则切换到关闭状态以恢复正常操作。

    断路器的类型

    断路器可以基于计数或基于时间。如果最后 N 次调用失败或缓慢,则基于计数的断路器将状态从关闭切换为断开。如果最后 N 秒的响应失败或缓慢,则基于时间的断路器将切换到断开状态。在这两个断路器中,我们还可以指定失败或慢速调用的阈值。

    例如,如果最近 25 次调用中有 70% 失败或需要 2 秒以上才能完成,我们可以配置一个基于计数的断路器来“断开电路”。同样,如果过去 30 秒内 80% 的调用失败或耗时超过 5 秒,我们可以告诉基于时间的断路器断开电路。

    Resilience4j 的 CircuitBreaker 概念

    resilience4j-circuitbreaker 的工作原理与其他 Resilience4j 模块类似。我们提供想要作为函数构造执行的代码——一个进行远程调用的 lambda 表达式或一个从远程服务中检索到的某个值的 Supplier,等等——并且断路器用代码修饰它 如果需要,跟踪响应并切换状态。

    Resilience4j 同时支持基于计数和基于时间的断路器。

    我们使用 slidingWindowType() 配置指定断路器的类型。此配置可以采用两个值之一 –
    SlidingWindowType.COUNT_BASED
    SlidingWindowType.TIME_BASED

    failureRateThreshold()slowCallRateThreshold() 以百分比形式配置失败率阈值和慢速调用率。

    slowCallDurationThreshold() 以秒为单位配置调用被认为慢的时间。

    我们可以指定一个 minimumNumberOfCalls(),在断路器可以计算错误率或慢速调用率之前需要它。

    如前所述,断路器在一定时间后从断开状态切换到半断开状态,以检查远程服务的情况。waitDurationInOpenState() 指定断路器在切换到半开状态之前应等待的时间。

    permittedNumberOfCallsInHalfOpenState() 配置在半开状态下允许的调用次数,
    maxWaitDurationInHalfOpenState() 确定断路器在切换回开状态之前可以保持在半开状态的时间。

    此配置的默认值 0 意味着断路器将无限等待,直到所有
    permittedNumberOfCallsInHalfOpenState() 完成。

    默认情况下,断路器将任何异常视为失败。但是我们可以对此进行调整,以使用 recordExceptions() 配置指定应视为失败的异常列表和使用 ignoreExceptions() 配置忽略的异常列表。

    如果我们在确定异常应该被视为失败还是忽略时想要更精细的控制,我们可以提供 Predicate<Throwable> 作为 recordException()ignoreException() 配置。

    当断路器拒绝处于断开状态的呼叫时,它会抛出 CallNotPermittedException。我们可以使用 writablestacktraceEnabled() 配置控制 CallNotPermittedException 的堆栈跟踪中的信息量。

    使用 Resilience4j CircuitBreaker模块

    让我们看看如何使用
    resilience4j-circuitbreaker 模块中可用的各种功能。

    我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

    使用 Resilience4j 断路器时,CircuitBreakerRegistryCircuitBreakerConfigCircuitBreaker 是我们使用的主要抽象。

    CircuitBreakerRegistry 是用于创建和管理 CircuitBreaker 对象的工厂。

    CircuitBreakerConfig 封装了上一节中的所有配置。每个 CircuitBreaker 对象都与一个 CircuitBreakerConfig 相关联。

    第一步是创建一个 CircuitBreakerConfig

    CircuitBreakerConfig config = CircuitBreakerConfig.ofDefaults();

    这将创建一个具有以下默认值的 CircuitBreakerConfig:

    配置 默认值
    slidingWindowType COUNT_BASED
    failureRateThreshold 50%
    slowCallRateThreshold 100%
    slowCallDurationThreshold 60s
    minimumNumberOfCalls 100
    permittedNumberOfCallsInHalfOpenState 10
    maxWaitDurationInHalfOpenState 0s

    基于计数的断路器

    假设我们希望断路器在最近 10 次调用中有 70% 失败时断开:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.COUNT_BASED)
      .slidingWindowSize(10)
      .failureRateThreshold(70.0f)
      .build();

    然后我们用这个配置创建一个 CircuitBreaker

    CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
    CircuitBreaker circuitBreaker = registry.circuitBreaker("flightSearchService");

    现在让我们表达我们的代码以作为 Supplier 运行航班搜索并使用 circuitbreaker 装饰它:

    Supplier<List<Flight>> flightsSupplier =
      () -> service.searchFlights(request);
    Supplier<List<Flight>> decoratedFlightsSupplier =
      circuitBreaker.decorateSupplier(flightsSupplier);

    最后,让我们调用几次修饰操作来了解断路器的工作原理。我们可以使用 CompletableFuture 来模拟来自用户的并发航班搜索请求:

    for (int i=0; i<20; i++) {
      try {
        System.out.println(decoratedFlightsSupplier.get());
      }
      catch (...) {
        // Exception handling
      }
    }

    输出显示前几次飞行搜索成功,然后是 7 次飞行搜索失败。此时,断路器断开并为后续调用抛出 CallNotPermittedException

    Searching for flights; current time = 12:01:12 884
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    Searching for flights; current time = 12:01:12 954
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    Searching for flights; current time = 12:01:12 957
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    Searching for flights; current time = 12:01:12 958
    io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search
    ... stack trace omitted ...
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    ... other lines omitted ...
    io.reflectoring.resilience4j.circuitbreaker.Examples.countBasedSlidingWindow_FailedCalls(Examples.java:56)
      at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:229)

    现在,假设我们希望断路器在最后 10 个调用中有 70% 需要 2 秒或更长时间才能完成:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.COUNT_BASED)
      .slidingWindowSize(10)
      .slowCallRateThreshold(70.0f)
      .slowCallDurationThreshold(Duration.ofSeconds(2))
      .build();

    示例输出中的时间戳显示请求始终需要 2 秒才能完成。在 7 次缓慢响应后,断路器断开并且不允许进一步调用:

    Searching for flights; current time = 12:26:27 901
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    Searching for flights; current time = 12:26:29 953
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    Searching for flights; current time = 12:26:31 957
    Flight search successful
    ... other lines omitted ...
    Searching for flights; current time = 12:26:43 966
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    ... stack trace omitted ...
            at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:231)
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    ... stack trace omitted ...
            at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:231)

    通常我们会配置一个具有故障率和慢速调用率阈值的断路器:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.COUNT_BASED)
      .slidingWindowSize(10)
      .failureRateThreshold(70.0f)
      .slowCallRateThreshold(70.0f)
      .slowCallDurationThreshold(Duration.ofSeconds(2))
      .build();

    基于时间的断路器

    假设我们希望断路器在过去 10 秒内 70% 的请求失败时断开:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.COUNT_BASED)
      .slidingWindowSize(10)
      .failureRateThreshold(70.0f)
      .slowCallRateThreshold(70.0f)
      .slowCallDurationThreshold(Duration.ofSeconds(2))
      .build();

    我们创建了 CircuitBreaker,将航班搜索调用表示为 Supplier<List<Flight>> 并使用 CircuitBreaker 对其进行装饰,就像我们在上一节中所做的那样。

    以下是多次调用修饰操作后的示例输出:

    Start time: 18:51:01 552
    Searching for flights; current time = 18:51:01 582
    Flight search successful
    [Flight{flightNumber='XY 765', ... }]
    ... other lines omitted ...
    Searching for flights; current time = 18:51:01 631
    io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search
    ... stack trace omitted ...
    Searching for flights; current time = 18:51:01 632
    io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search
    ... stack trace omitted ...
    Searching for flights; current time = 18:51:01 633
    ... other lines omitted ...
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    ... other lines omitted ...

    前 3 个请求成功,接下来的 7 个请求失败。此时断路器断开,后续请求因抛出 CallNotPermittedException 而失败。

    现在,假设我们希望断路器在过去 10 秒内 70% 的调用需要 1 秒或更长时间才能完成:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.TIME_BASED)
      .minimumNumberOfCalls(10)
      .slidingWindowSize(10)
      .slowCallRateThreshold(70.0f)
      .slowCallDurationThreshold(Duration.ofSeconds(1))
      .build();

    示例输出中的时间戳显示请求始终需要 1 秒才能完成。在 10 个请求(minimumNumberOfCalls)之后,当断路器确定 70% 的先前请求花费了 1 秒或更长时间时,它断开电路:

    Start time: 19:06:37 957
    Searching for flights; current time = 19:06:37 979
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 19:06:39 066
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 19:06:40 070
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 19:06:41 070
    ... other lines omitted ...
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    ... stack trace omitted ...

    通常我们会配置一个具有故障率和慢速调用率阈值的基于时间的断路器:

    指定断开状态下的等待时间

    假设我们希望断路器处于断开状态时等待 10 秒,然后转换到半断开状态并让一些请求传递到远程服务:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.TIME_BASED)
      .slidingWindowSize(10)
      .minimumNumberOfCalls(10)
      .failureRateThreshold(70.0f)
      .slowCallRateThreshold(70.0f)
      .slowCallDurationThreshold(Duration.ofSeconds(2))
      .build();

    示例输出中的时间戳显示断路器最初转换为断开状态,在接下来的 10 秒内阻止一些调用,然后更改为半断开状态。后来,在半开状态时一致的成功响应导致它再次切换到关闭状态:

    Searching for flights; current time = 20:55:58 735
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 20:55:59 812
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 20:56:00 816
    ... other lines omitted ...
    io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Flight search failed
        at
    ... stack trace omitted ...
    2020-12-13T20:56:03.850115+05:30: CircuitBreaker 'flightSearchService' changed state from CLOSED to OPEN
    2020-12-13T20:56:04.851700+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
    2020-12-13T20:56:05.852220+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
    2020-12-13T20:56:06.855338+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
    ... other similar lines omitted ...
    2020-12-13T20:56:12.862362+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
    2020-12-13T20:56:13.865436+05:30: CircuitBreaker 'flightSearchService' changed state from OPEN to HALF_OPEN
    Searching for flights; current time = 20:56:13 865
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    ... other similar lines omitted ...
    2020-12-13T20:56:16.877230+05:30: CircuitBreaker 'flightSearchService' changed state from HALF_OPEN to CLOSED
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 20:56:17 879
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    ... other similar lines omitted ...

    指定回退方法

    使用断路器时的常见模式是指定在电路断开时要调用的回退方法。回退方法可以为不允许的远程调用提供一些默认值或行为

    我们可以使用 Decorators 实用程序类进行设置。Decorators 是来自 resilience4j-all 模块的构建器,具有 withCircuitBreaker()withRetry()withRateLimiter() 等方法,可帮助将多个 Resilience4j 装饰器应用于 SupplierFunction 等。

    当断路器断开并抛出 CallNotPermittedException 时,我们将使用它的 withFallback() 方法从本地缓存返回航班搜索结果:

    Supplier<List<Flight>> flightsSupplier = () -> service.searchFlights(request);
    Supplier<List<Flight>> decorated = Decorators
      .ofSupplier(flightsSupplier)
      .withCircuitBreaker(circuitBreaker)
      .withFallback(Arrays.asList(CallNotPermittedException.class),
                    e -> this.getFlightSearchResultsFromCache(request))
      .decorate();

    以下示例输出显示了断路器断开后从缓存中返回的搜索结果:

    Searching for flights; current time = 22:08:29 735
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 22:08:29 854
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 22:08:29 855
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Searching for flights; current time = 22:08:29 855
    2020-12-13T22:08:29.856277+05:30: CircuitBreaker 'flightSearchService' recorded an error: 'io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search'. Elapsed time: 0 ms
    Searching for flights; current time = 22:08:29 912
    ... other lines omitted ...
    2020-12-13T22:08:29.926691+05:30: CircuitBreaker 'flightSearchService' changed state from CLOSED to OPEN
    Returning flight search results from cache
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    Returning flight search results from cache
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
    ... other lines omitted ...

    减少 Stacktrace 中的信息

    每当断路器断开时,它就会抛出 CallNotPermittedException

    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
        at io.github.resilience4j.circuitbreaker.CallNotPermittedException.createCallNotPermittedException(CallNotPermittedException.java:48)
    ... other lines in stack trace omitted ...
    at io.reflectoring.resilience4j.circuitbreaker.Examples.timeBasedSlidingWindow_SlowCalls(Examples.java:169)
        at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:263)

    除了第一行,堆栈跟踪中的其他行没有增加太多价值。如果 CallNotPermittedException 发生多次,这些堆栈跟踪行将在我们的日志文件中重复。

    我们可以通过将 writablestacktraceEnabled() 配置设置为 false 来减少堆栈跟踪中生成的信息量:

    CircuitBreakerConfig config = CircuitBreakerConfig
      .custom()
      .slidingWindowType(SlidingWindowType.COUNT_BASED)
      .slidingWindowSize(10)
      .failureRateThreshold(70.0f)
      .writablestacktraceEnabled(false)
      .build();

    现在,当 CallNotPermittedException 发生时,堆栈跟踪中只存在一行:

    Searching for flights; current time = 20:29:24 476
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    Searching for flights; current time = 20:29:24 540
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
    ... other lines omitted ...
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    ...

    其他有用的方法

    Retry 模块类似,CircuitBreaker 也有像 ignoreExceptions()recordExceptions() 等方法,让我们可以指定 CircuitBreaker 在跟踪调用结果时应该忽略和考虑哪些异常。

    例如,我们可能不想忽略来自远程飞行服务的 SeatsUnavailableException – 在这种情况下,我们真的不想断开电路。

    与我们见过的其他 Resilience4j 模块类似,CircuitBreaker 还提供了额外的方法,如 decorateCheckedSupplier()decorateCompletionStage()decorateRunnable()decorateConsumer() 等,因此我们可以在 Supplier 之外的其他结构中提供我们的代码。

    断路器事件

    CircuitBreaker 有一个 EventPublisher 可以生成以下类型的事件:

    • CircuitBreakerOnSuccessEvent,
    • CircuitBreakerOnErrorEvent,
    • CircuitBreakerOnStateTransitionEvent,
    • CircuitBreakerOnResetEvent,
    • CircuitBreakerOnIgnoredErrorEvent,
    • CircuitBreakerOnCallNotPermittedEvent,
    • CircuitBreakerOnFailureRateExceededEvent 以及
    • CircuitBreakerOnSlowCallRateExceededEvent.

    我们可以监听这些事件并记录它们,例如:

    circuitBreaker.getEventPublisher()
      .onCallNotPermitted(e -> System.out.println(e.toString()));
    circuitBreaker.getEventPublisher()
      .onError(e -> System.out.println(e.toString()));
    circuitBreaker.getEventPublisher()
      .onFailureRateExceeded(e -> System.out.println(e.toString()));
    circuitBreaker.getEventPublisher().onStateTransition(e -> System.out.println(e.toString()));

    以下是示例的日志输出:

    2020-12-13T22:25:52.972943+05:30: CircuitBreaker 'flightSearchService' recorded an error: 'io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search'. Elapsed time: 0 ms
    Searching for flights; current time = 22:25:52 973
    ... other lines omitted ...
    2020-12-13T22:25:52.974448+05:30: CircuitBreaker 'flightSearchService' exceeded failure rate threshold. Current failure rate: 70.0
    2020-12-13T22:25:52.984300+05:30: CircuitBreaker 'flightSearchService' changed state from CLOSED to OPEN
    2020-12-13T22:25:52.985057+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
    ... other lines omitted ...

    CircuitBreaker指标

    CircuitBreake 暴露了许多指标,这些是一些重要的条目:

    • 成功、失败或忽略的调用总数 (resilience4j.circuitbreaker.calls)
    • 断路器状态 (resilience4j.circuitbreaker.state)
    • 断路器故障率 (resilience4j.circuitbreaker.failure.rate)
    • 未被允许的调用总数 (resilience4.circuitbreaker.not.permitted.calls)
    • 断路器的缓慢调用 (resilience4j.circuitbreaker.slow.call.rate)

    首先,我们像往常一样创建 CircuitBreakerConfigCircuitBreakerRegistryCircuitBreaker。然后,我们创建一个 MeterRegistry 并将 CircuitBreakerRegistry 绑定到它:

    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    TaggedCircuitBreakerMetrics.ofCircuitBreakerRegistry(registry)
      .bindTo(meterRegistry);

    运行几次断路器修饰操作后,我们显示捕获的指标。这是一些示例输出:

    The number of slow failed calls which were slower than a certain threshold - resilience4j.circuitbreaker.slow.calls: 0.0
    The states of the circuit breaker - resilience4j.circuitbreaker.state: 0.0, state: metrics_only
    Total number of not permitted calls - resilience4j.circuitbreakernot.permitted.calls: 0.0
    The slow call of the circuit breaker - resilience4j.circuitbreaker.slow.call.rate: -1.0
    The states of the circuit breaker - resilience4j.circuitbreaker.state: 0.0, state: half_open
    Total number of successful calls - resilience4j.circuitbreaker.calls: 0.0, kind: successful
    The failure rate of the circuit breaker - resilience4j.circuitbreaker.failure.rate: -1.0

    在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

    结论

    在本文中,我们学习了如何使用 Resilience4j 的 Circuitbreaker 模块在远程服务返回错误时暂停向其发出请求。我们了解了为什么这很重要,还看到了一些有关如何配置它的实际示例。

    您可以使用 GitHub 上的代码来演示一个完整的应用程序。


    本文译自:Implementing a Circuit Breaker with Resilience4j – Reflectoringhttps://reflectoring.io/circuitbreaker-with-resilience4j/)

    Java 项目中使用 Resilience4j 框架实现故障隔离

    Java 项目中使用 Resilience4j 框架实现故障隔离

    到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 Retry, RateLimiterTimeLimiter 模块。在本文中,我们将探讨 Bulkhead 模块。我们将了解它解决了什么问题,何时以及如何使用它,并查看一些示例。

    代码示例

    本文附有 GitHub 上的工作代码示例。

    什么是 Resilience4j?

    请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

    什么是故障隔离?

    几年前,我们遇到了一个生产问题,其中一台服务器停止响应健康检查,负载均衡器将服务器从池中取出。

    就在我们开始调查这个问题的时候,还有第二个警报——另一台服务器已经停止响应健康检查,也被从池中取出。

    几分钟后,每台服务器都停止响应健康探测,我们的服务完全关闭。

    我们使用 Redis 为应用程序支持的几个功能缓存一些数据。正如我们后来发现的那样,Redis 集群同时出现了一些问题,它已停止接受新连接。我们使用 Jedis 库连接到 Redis,该库的默认行为是无限期地阻塞调用线程,直到建立连接。

    我们的服务托管在 Tomcat 上,它的默认请求处理线程池大小为 200 个线程。因此,通过连接到 Redis 的代码路径的每个请求最终都会无限期地阻塞线程。

    几分钟之内,集群中的所有 2000 个线程都无限期地阻塞了——甚至没有空闲线程来响应负载均衡器的健康检查。

    该服务本身支持多项功能,并非所有功能都需要访问 Redis 缓存。但是当这一方面出现问题时,它最终影响了整个服务。

    这正是故障隔离要解决的问题——它可以防止某个服务区域的问题影响整个服务。

    虽然我们的服务发生的事情是一个极端的例子,但我们可以看到缓慢的上游依赖如何影响调用服务的不相关区域。

    如果我们在每个服务器实例上对 Redis 设置了 20 个并发请求的限制,那么当 Redis 连接问题发生时,只有这些线程会受到影响。剩余的请求处理线程可以继续为其他请求提供服务。

    故障隔离背后的想法是对我们对远程服务进行的并发调用数量设置限制。我们将对不同远程服务的调用视为不同的、隔离的池,并对可以同时进行的调用数量设置限制。

    术语舱壁本身来自它在船舶中的使用,其中船舶的底部被分成彼此分开的部分。如果有裂缝,并且水开始流入,则只有该部分会充满水。这可以防止整艘船沉没。

    Resilience4j 隔板概念

    resilience4j-bulkhead 的工作原理类似于其他 Resilience4j 模块。我们为它提供了我们想要作为函数构造执行的代码——一个进行远程调用的 lambda 表达式或一个从远程服务中检索到的某个值的 Supplier,等等——并且隔板用代码装饰它以控制并发调用数。

    Resilience4j 提供两种类型的隔板 – SemaphoreBulkhead ThreadPoolBulkhead

    SemaphoreBulkhead 内部使用
    java.util.concurrent.Semaphore 来控制并发调用的数量并在当前线程上执行我们的代码。

    ThreadPoolBulkhead 使用线程池中的一个线程来执行我们的代码。它内部使用
    java.util.concurrent.ArrayBlockingQueue
    java.util.concurrent.ThreadPoolExecutor 来控制并发调用的数量。

    SemaphoreBulkhead

    让我们看看与信号量隔板相关的配置及其含义。

    maxConcurrentCalls 确定我们可以对远程服务进行的最大并发调用数。我们可以将此值视为初始化信号量的许可数。

    任何尝试超过此限制调用远程服务的线程都可以立即获得 BulkheadFullException 或等待一段时间以等待另一个线程释放许可。这由 maxWaitDuration 值决定。

    当有多个线程在等待许可时,fairCallHandlingEnabled 配置确定等待的线程是否以先进先出的顺序获取许可。

    最后, writableStackTraceEnabled 配置让我们可以在 BulkheadFullException 发生时减少堆栈跟踪中的信息量。这很有用,因为如果没有它,当异常多次发生时,我们的日志可能会充满许多类似的信息。通常在读取日志时,只知道发生了 BulkheadFullException 就足够了。

    ThreadPoolBulkhead

    coreThreadPoolSizemaxThreadPoolSizekeepAliveDurationqueueCapacity 是与 ThreadPoolBulkhead 相关的主要配置。ThreadPoolBulkhead 内部使用这些配置来构造一个 ThreadPoolExecutor

    internalThreadPoolExecutor 使用可用的空闲线程之一执行传入的任务。 如果没有线程可以自由执行传入的任务,则该任务将排队等待线程可用时稍后执行。如果已达到 queueCapacity,则远程调用将被拒绝并返回 BulkheadFullException

    ThreadPoolBulkhead 也有 writableStackTraceEnabled 配置来控制 BulkheadFullException 的堆栈跟踪中的信息量。

    使用 Resilience4j 隔板模块

    让我们看看如何使用 resilience4j-bulkhead 模块中可用的各种功能。

    我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

    SemaphoreBulkhead

    使用基于信号量的隔板时,BulkheadRegistryBulkheadConfigBulkhead 是我们使用的主要抽象。

    BulkheadRegistry 是一个用于创建和管理 Bulkhead 对象的工厂。

    BulkheadConfig 封装了 maxConcurrentCallsmaxWaitDurationwritableStackTraceEnabledfairCallHandlingEnabled 配置。每个 Bulkhead 对象都与一个 BulkheadConfig 相关联。

    第一步是创建一个 BulkheadConfig

    BulkheadConfig config = BulkheadConfig.ofDefaults();

    这将创建一个 BulkheadConfig,其默认值为 maxConcurrentCalls(25)、maxWaitDuration(0s)、writableStackTraceEnabled(true) 和 fairCallHandlingEnabled(true)。

    假设我们希望将并发调用的数量限制为 2,并且我们愿意等待 2 秒让线程获得许可:

    BulkheadConfig config = BulkheadConfig.custom()
      .maxConcurrentCalls(2)
      .maxWaitDuration(Duration.ofSeconds(2))
      .build();

    然后我们创建一个 Bulkhead

    BulkheadRegistry registry = BulkheadRegistry.of(config);
    
    Bulkhead bulkhead = registry.bulkhead("flightSearchService");

    现在让我们表达我们的代码以作为 Supplier 运行航班搜索并使用 bulkhead 装饰它:

    BulkheadRegistry registry = BulkheadRegistry.of(config);
    Bulkhead bulkhead = registry.bulkhead("flightSearchService");

    最后,让我们调用几次装饰操作来了解隔板的工作原理。我们可以使用 CompletableFuture 来模拟来自用户的并发航班搜索请求:

    for (int i=0; i<4; i++) {
      CompletableFuture
        .supplyAsync(decoratedFlightsSupplier)
        .thenAccept(flights -> System.out.println("Received results"));
    }

    输出中的时间戳和线程名称显示,在 4 个并发请求中,前两个请求立即通过:

    Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-3
    Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-5
    Flight search successful at 11:42:13 226
    Flight search successful at 11:42:13 226
    Received results
    Received results
    Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-9
    Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-7
    Flight search successful at 11:42:14 239
    Flight search successful at 11:42:14 239
    Received results
    Received results

    第三个和第四个请求仅在 1 秒后就能够获得许可,在之前的请求完成之后。

    如果线程无法在我们指定的 2s maxWaitDuration 内获得许可,则会抛出 BulkheadFullException

    Caused by: io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
        at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:49)
        at io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead.acquirePermission(SemaphoreBulkhead.java:164)
        at io.github.resilience4j.bulkhead.Bulkhead.lambda$decorateSupplier$5(Bulkhead.java:194)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 6 more

    除了第一行,堆栈跟踪中的其他行没有增加太多价值。如果 BulkheadFullException 发生多次,这些堆栈跟踪行将在我们的日志文件中重复。

    我们可以通过将 writableStackTraceEnabled 配置设置为 false 来减少堆栈跟踪中生成的信息量:

    BulkheadConfig config = BulkheadConfig.custom()
        .maxConcurrentCalls(2)
        .maxWaitDuration(Duration.ofSeconds(1))
        .writableStackTraceEnabled(false)
    .build();

    现在,当 BulkheadFullException 发生时,堆栈跟踪中只存在一行:

    Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3
    Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5
    io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
    Flight search successful at 12:27:58 699
    Flight search successful at 12:27:58 699
    Received results
    Received results

    与我们见过的其他 Resilience4j 模块类似,Bulkhead 还提供了额外的方法,如 decorateCheckedSupplier()decorateCompletionStage()decorateRunnable()decorateConsumer() 等,因此我们可以在 Supplier 供应商之外的其他结构中提供我们的代码。

    ThreadPoolBulkhead

    当使用基于线程池的隔板时,
    ThreadPoolBulkheadRegistryThreadPoolBulkheadConfigThreadPoolBulkhead 是我们使用的主要抽象。

    ThreadPoolBulkheadRegistry 是用于创建和管理 ThreadPoolBulkhead 对象的工厂。

    ThreadPoolBulkheadConfig 封装了 coreThreadPoolSizemaxThreadPoolSizekeepAliveDurationqueueCapacity 配置。每个 ThreadPoolBulkhead 对象都与一个 ThreadPoolBulkheadConfig 相关联。

    第一步是创建一个 ThreadPoolBulkheadConfig

    ThreadPoolBulkheadConfig config =
      ThreadPoolBulkheadConfig.ofDefaults();

    这将创建一个 ThreadPoolBulkheadConfig,其默认值为 coreThreadPoolSize(可用处理器数量 -1)、maxThreadPoolSize(可用处理器最大数量)、keepAliveDuration(20ms)和 queueCapacity(100)。

    假设我们要将并发调用的数量限制为 2:

    ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
      .maxThreadPoolSize(2)
      .coreThreadPoolSize(1)
      .queueCapacity(1)
      .build();

    然后我们创建一个 ThreadPoolBulkhead

    ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
    ThreadPoolBulkhead bulkhead = registry.bulkhead("flightSearchService");

    现在让我们表达我们的代码以作为 Supplier 运行航班搜索并使用 bulkhead 装饰它:

    Supplier<List<Flight>> flightsSupplier =
      () -> service.searchFlightsTakingOneSecond(request);
    Supplier<CompletionStage<List<Flight>>> decoratedFlightsSupplier =
      ThreadPoolBulkhead.decorateSupplier(bulkhead, flightsSupplier);

    与返回一个 Supplier<List<Flight>>
    SemaphoreBulkhead.decorateSupplier() 不同,
    ThreadPoolBulkhead.decorateSupplier() 返回一个 Supplier<CompletionStage<List<Flight>>。这是因为 ThreadPoolBulkHead 不会在当前线程上同步执行代码。

    最后,让我们调用几次装饰操作来了解隔板的工作原理:

    for (int i=0; i<3; i++) {
      decoratedFlightsSupplier
        .get()
        .whenComplete((r,t) -> {
          if (r != null) {
            System.out.println("Received results");
          }
          if (t != null) {
            t.printStackTrace();
          }
        });
    }

    输出中的时间戳和线程名称显示,虽然前两个请求立即执行,但第三个请求已排队,稍后由释放的线程之一执行:

    Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-1
    Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-2
    Flight search successful at 16:15:00 136
    Flight search successful at 16:15:00 135
    Received results
    Received results
    Searching for flights; current time = 16:15:01 151; current thread = bulkhead-flightSearchService-2
    Flight search successful at 16:15:01 151
    Received results

    如果队列中没有空闲线程和容量,则抛出 BulkheadFullException

    Exception in thread "main" io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
     at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:64)
     at io.github.resilience4j.bulkhead.internal.FixedThreadPoolBulkhead.submit(FixedThreadPoolBulkhead.java:157)
    ... other lines omitted ...

    我们可以使用 writableStackTraceEnabled 配置来减少堆栈跟踪中生成的信息量:

    ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
      .maxThreadPoolSize(2)
      .coreThreadPoolSize(1)
      .queueCapacity(1)
      .writableStackTraceEnabled(false)
      .build();

    现在,当 BulkheadFullException 发生时,堆栈跟踪中只存在一行:

    Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3
    Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5
    io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
    Flight search successful at 12:27:58 699
    Flight search successful at 12:27:58 699
    Received results
    Received results

    上下文传播

    有时我们将数据存储在 ThreadLocal 变量中并在代码的不同区域中读取它。我们这样做是为了避免在方法链之间显式地将数据作为参数传递,尤其是当该值与我们正在实现的核心业务逻辑没有直接关系时。

    例如,我们可能希望将当前用户 ID 或事务 ID 或某个请求跟踪 ID 记录到每个日志语句中,以便更轻松地搜索日志。对于此类场景,使用 ThreadLocal 是一种有用的技术。

    使用 ThreadPoolBulkhead 时,由于我们的代码不在当前线程上执行,因此我们存储在 ThreadLocal 变量中的数据在其他线程中将不可用。

    让我们看一个例子来理解这个问题。首先我们定义一个 RequestTrackingIdHolder 类,一个围绕 ThreadLocal 的包装类:

    class RequestTrackingIdHolder {
      static ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
      static String getRequestTrackingId() {
        return threadLocal.get();
      }
    
      static void setRequestTrackingId(String id) {
        if (threadLocal.get() != null) {
          threadLocal.set(null);
          threadLocal.remove();
        }
        threadLocal.set(id);
      }
    
      static void clear() {
        threadLocal.set(null);
        threadLocal.remove();
      }
    }

    静态方法可以轻松设置和获取存储在 ThreadLocal 上的值。我们接下来在调用隔板装饰的航班搜索操作之前设置一个请求跟踪 ID:

    for (int i=0; i<2; i++) {
      String trackingId = UUID.randomUUID().toString();
      System.out.println("Setting trackingId " + trackingId + " on parent, main thread before calling flight search");
      RequestTrackingIdHolder.setRequestTrackingId(trackingId);
      decoratedFlightsSupplier
        .get()
        .whenComplete((r,t) -> {
            // other lines omitted
        });
    }

    示例输出显示此值在隔板管理的线程中不可用:

    Setting trackingId 98ff99df-466a-47f7-88f7-5e31fc8fcb6b on parent, main thread before calling flight search
    Setting trackingId 6b98d73c-a590-4a20-b19d-c85fea783caf on parent, main thread before calling flight search
    Searching for flights; current time = 19:53:53 799; current thread = bulkhead-flightSearchService-1; Request Tracking Id = null
    Flight search successful at 19:53:53 824
    Received results
    Searching for flights; current time = 19:53:54 836; current thread = bulkhead-flightSearchService-1; Request Tracking Id = null
    Flight search successful at 19:53:54 836
    Received results

    为了解决这个问题,ThreadPoolBulkhead 提供了一个 ContextPropagatorContextPropagator 是一种用于跨线程边界检索、复制和清理值的抽象。它定义了一个接口,其中包含从当前线程 (retrieve()) 获取值、将其复制到新的执行线程 (copy()) 并最终在执行线程 (clear()) 上进行清理的方法。

    让我们实现一个
    RequestTrackingIdPropagator

    class RequestTrackingIdPropagator implements ContextPropagator {
      @Override
      public Supplier<Optional> retrieve() {
        System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName());
        return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
      }
    
      @Override
      Consumer<Optional> copy() {
        return optional -> {
          System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
          optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
        };
      }
    
      @Override
      Consumer<Optional> clear() {
        return optional -> {
          System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
          optional.ifPresent(s -> RequestTrackingIdHolder.clear());
        };
      }
    }

    我们通过在 ThreadPoolBulkheadConfig 上的设置来为 ThreadPoolBulkhead 提供 ContextPropagator

    class RequestTrackingIdPropagator implements ContextPropagator {
      @Override
      public Supplier<Optional> retrieve() {
        System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName());
        return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
      }
    
      @Override
      Consumer<Optional> copy() {
        return optional -> {
          System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
          optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
        };
      }
    
      @Override
      Consumer<Optional> clear() {
        return optional -> {
          System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
          optional.ifPresent(s -> RequestTrackingIdHolder.clear());
        };
      }
    }

    现在,示例输出显示请求跟踪 ID 在隔板管理的线程中可用:

    Setting trackingId 71d44cb8-dab6-4222-8945-e7fd023528ba on parent, main thread before calling flight search
    Getting request tracking id from thread: main
    Setting trackingId 5f9dd084-f2cb-4a20-804b-038828abc161 on parent, main thread before calling flight search
    Getting request tracking id from thread: main
    Setting request tracking id 71d44cb8-dab6-4222-8945-e7fd023528ba on thread: bulkhead-flightSearchService-1
    Searching for flights; current time = 20:07:56 508; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 71d44cb8-dab6-4222-8945-e7fd023528ba
    Flight search successful at 20:07:56 538
    Clearing request tracking id on thread: bulkhead-flightSearchService-1
    Received results
    Setting request tracking id 5f9dd084-f2cb-4a20-804b-038828abc161 on thread: bulkhead-flightSearchService-1
    Searching for flights; current time = 20:07:57 542; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 5f9dd084-f2cb-4a20-804b-038828abc161
    Flight search successful at 20:07:57 542
    Clearing request tracking id on thread: bulkhead-flightSearchService-1
    Received results

    Bulkhead事件

    Bulkhead 和 ThreadPoolBulkhead 都有一个 EventPublisher 来生成以下类型的事件:

    • BulkheadOnCallPermittedEvent
    • BulkheadOnCallRejectedEvent 和
    • BulkheadOnCallFinishedEvent

    我们可以监听这些事件并记录它们,例如:

    Bulkhead bulkhead = registry.bulkhead("flightSearchService");
    bulkhead.getEventPublisher().onCallPermitted(e -> System.out.println(e.toString()));
    bulkhead.getEventPublisher().onCallFinished(e -> System.out.println(e.toString()));
    bulkhead.getEventPublisher().onCallRejected(e -> System.out.println(e.toString()));

    示例输出显示了记录的内容:

    2020-08-26T12:27:39.790435: Bulkhead 'flightSearch' permitted a call.
    ... other lines omitted ...
    2020-08-26T12:27:40.290987: Bulkhead 'flightSearch' rejected a call.
    ... other lines omitted ...
    2020-08-26T12:27:41.094866: Bulkhead 'flightSearch' has finished a call.

    Bulkhead 指标

    SemaphoreBulkhead

    Bulkhead 暴露了两个指标:

    • 可用权限的最大数量(resilience4j.bulkhead.max.allowed.concurrent.calls),和
    • 允许的并发调用数(resilience4j.bulkhead.available.concurrent.calls)。

    bulkhead.available 指标与我们在 BulkheadConfig 上配置的 maxConcurrentCalls 相同。

    首先,我们像前面一样创建 BulkheadConfigBulkheadRegistryBulkhead。然后,我们创建一个 MeterRegistry 并将 BulkheadRegistry 绑定到它:

    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    TaggedBulkheadMetrics.ofBulkheadRegistry(registry)
      .bindTo(meterRegistry);

    运行几次隔板装饰操作后,我们显示捕获的指标:

    Consumer<Meter> meterConsumer = meter -> {
      String desc = meter.getId().getDescription();
      String metricName = meter.getId().getName();
      Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
        .filter(m -> m.getStatistic().name().equals("VALUE"))
        .findFirst()
        .map(m -> m.getValue())
        .orElse(0.0);
      System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

    这是一些示例输出:

    The maximum number of available permissions - resilience4j.bulkhead.max.allowed.concurrent.calls: 8.0
    The number of available permissions - resilience4j.bulkhead.available.concurrent.calls: 3.0

    ThreadPoolBulkhead

    ThreadPoolBulkhead 暴露五个指标:

    • 队列的当前长度(resilience4j.bulkhead.queue.depth),
    • 当前线程池的大小(resilience4j.bulkhead.thread.pool.size),
    • 线程池的核心和最大容量(resilience4j.bulkhead.core.thread.pool.sizeresilience4j.bulkhead.max.thread.pool.size),以及
    • 队列的容量(resilience4j.bulkhead.queue.capacity)。

    首先,我们像前面一样创建 ThreadPoolBulkheadConfig
    ThreadPoolBulkheadRegistryThreadPoolBulkhead。然后,我们创建一个 MeterRegistry 并将
    ThreadPoolBulkheadRegistry 绑定到它:

    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    TaggedThreadPoolBulkheadMetrics.ofThreadPoolBulkheadRegistry(registry).bindTo(meterRegistry);

    运行几次隔板装饰操作后,我们将显示捕获的指标:

    The queue capacity - resilience4j.bulkhead.queue.capacity: 5.0
    The queue depth - resilience4j.bulkhead.queue.depth: 1.0
    The thread pool size - resilience4j.bulkhead.thread.pool.size: 5.0
    The maximum thread pool size - resilience4j.bulkhead.max.thread.pool.size: 5.0
    The core thread pool size - resilience4j.bulkhead.core.thread.pool.size: 3.0

    在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

    实施隔板时的陷阱和良好实践

    使隔板成为单例

    对给定远程服务的所有调用都应通过同一个 Bulkhead 实例。对于给定的远程服务,Bulkhead 必须是单例。

    如果我们不强制执行此操作,我们代码库的某些区域可能会绕过 Bulkhead 直接调用远程服务。为了防止这种情况,远程服务的实际调用应该在一个核心、内部层和其他区域应该使用内部层暴露的隔板装饰器。

    我们如何确保未来的新开发人员理解这一意图? 查看 Tom 的文章,该文章展示了解决此类问题的一种方法,即通过组织包结构来明确此类意图。此外,它还展示了如何通过在 ArchUnit 测试中编码意图来强制执行此操作。

    与其他 Resilience4j 模块结合

    将隔板与一个或多个其他 Resilience4j 模块(如重试和速率限制器)结合使用会更有效。例如,如果有 BulkheadFullException,我们可能希望在一些延迟后重试。

    结论

    在本文中,我们学习了如何使用 Resilience4j 的 Bulkhead 模块对我们对远程服务进行的并发调用设置限制。我们了解了为什么这很重要,还看到了一些有关如何配置它的实际示例。

    您可以使用 GitHub 上的代码演示一个完整的应用程序。


    本文译自: Implementing Bulkhead with Resilience4j – Reflectoring

    Java 项目中使用 Resilience4j 框架实现异步超时处理

    到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 RetryRateLimiter 模块。在本文中,我们将通过 TimeLimiter 继续探索 Resilience4j。我们将了解它解决了什么问题,何时以及如何使用它,并查看一些示例。

    代码示例

    本文附有 GitHub 上的工作代码示例。

    什么是 Resilience4j?

    请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

    什么是限时?

    对我们愿意等待操作完成的时间设置限制称为时间限制。如果操作没有在我们指定的时间内完成,我们希望通过超时错误收到通知。

    有时,这也称为“设定最后期限”。

    我们这样做的一个主要原因是确保我们不会让用户或客户无限期地等待。不提供任何反馈的缓慢服务可能会让用户感到沮丧。

    我们对操作设置时间限制的另一个原因是确保我们不会无限期地占用服务器资源。我们在使用 Spring 的 @Transactional 注解时指定的 timeout 值就是一个例子——在这种情况下,我们不想长时间占用数据库资源。

    什么时候使用 Resilience4j TimeLimiter?

    Resilience4j 的 TimeLimiter 可用于设置使用 CompleteableFutures 实现的异步操作的时间限制(超时)。

    Java 8 中引入的 CompletableFuture 类使异步、非阻塞编程变得更容易。可以在不同的线程上执行慢速方法,释放当前线程来处理其他任务。 我们可以提供一个当 slowMethod() 返回时执行的回调:

    int slowMethod() {
      // time-consuming computation or remote operation
    return 42;
    }
    
    CompletableFuture.supplyAsync(this::slowMethod)
    .thenAccept(System.out::println);

    这里的 slowMethod() 可以是一些计算或远程操作。通常,我们希望在进行这样的异步调用时设置时间限制。我们不想无限期地等待 slowMethod() 返回。例如,如果 slowMethod() 花费的时间超过一秒,我们可能想要返回先前计算的、缓存的值,甚至可能会出错。

    在 Java 8 的 CompletableFuture 中,没有简单的方法来设置异步操作的时间限制。CompletableFuture 实现了 Future 接口,Future 有一个重载的 get() 方法来指定我们可以等待多长时间:

    CompletableFuture<Integer> completableFuture = CompletableFuture
      .supplyAsync(this::slowMethod);
    Integer result = completableFuture.get(3000, TimeUnit.MILLISECONDS);
    System.out.println(result);

    但是这里有一个问题—— get() 方法是一个阻塞调用。所以它首先违背了使用 CompletableFuture 的目的,即释放当前线程。

    这是 Resilience4j 的 TimeLimiter 解决的问题——它让我们在异步操作上设置时间限制,同时保留在 Java 8 中使用 CompletableFuture 时非阻塞的好处。

    CompletableFuture 的这种限制已在 Java 9 中得到解决。我们可以在 Java 9 及更高版本中使用 CompletableFuture 上的 orTimeout()completeOnTimeout() 等方法直接设置时间限制。然而,凭借 Resilience4J指标事件,与普通的 Java 9 解决方案相比,它仍然提供了附加值。

    Resilience4j TimeLimiter 概念

    TimeLimiter支持 FutureCompletableFuture。但是将它与 Future 一起使用相当于 Future.get(long timeout, TimeUnit unit)。因此,我们将在本文的其余部分关注 CompletableFuture

    与其他 Resilience4j 模块一样,TimeLimiter 的工作方式是使用所需的功能装饰我们的代码 – 如果在这种情况下操作未在指定的 timeoutDuration 内完成,则返回 TimeoutException

    我们为 TimeLimiter 提供 timeoutDurationScheduledExecutorService 和异步操作本身,表示为 CompletionStageSupplier。它返回一个 CompletionStage 的装饰 Supplier

    在内部,它使用调度器来调度一个超时任务——通过抛出一个 TimeoutException 来完成 CompletableFuture 的任务。如果操作先完成,TimeLimiter 取消内部超时任务。

    除了 timeoutDuration 之外,还有另一个与 TimeLimiter 关联的配置 cancelRunningFuture。此配置仅适用于 Future 而不适用于 CompletableFuture。当超时发生时,它会在抛出 TimeoutException 之前取消正在运行的 Future

    使用 Resilience4j TimeLimiter 模块

    TimeLimiterRegistryTimeLimiterConfigTimeLimiterresilience4j-timelimiter 的主要抽象。

    TimeLimiterRegistry 是用于创建和管理 TimeLimiter 对象的工厂。

    TimeLimiterConfig 封装了 timeoutDurationcancelRunningFuture 配置。每个 TimeLimiter 对象都与一个 TimeLimiterConfig 相关联。

    TimeLimiter 提供辅助方法来为 FutureCompletableFuture Suppliers 创建或执行装饰器。

    让我们看看如何使用 TimeLimiter 模块中可用的各种功能。我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

    第一步是创建一个 TimeLimiterConfig

    TimeLimiterConfig config = TimeLimiterConfig.ofDefaults();

    这将创建一个 TimeLimiterConfig,其默认值为 timeoutDuration (1000ms) 和 cancelRunningFuture (true)。

    假设我们想将超时值设置为 2s 而不是默认值:

    TimeLimiterConfig config = TimeLimiterConfig.custom()
      .timeoutDuration(Duration.ofSeconds(2))
      .build();

    然后我们创建一个 TimeLimiter

    TimeLimiterRegistry registry = TimeLimiterRegistry.of(config);
    
    TimeLimiter limiter = registry.timeLimiter("flightSearch");

    我们想要异步调用
    FlightSearchService.searchFlights(),它返回一个 List<Flight>。让我们将其表示为 Supplier<CompletionStage<List<Flight>>>

    Supplier<List<Flight>> flightSupplier = () -> service.searchFlights(request);
    Supplier<CompletionStage<List<Flight>>> origCompletionStageSupplier =
    () -> CompletableFuture.supplyAsync(flightSupplier);

    然后我们可以使用 TimeLimiter 装饰 Supplier

    ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
    Supplier<CompletionStage<List<Flight>>> decoratedCompletionStageSupplier =  
      limiter.decorateCompletionStage(scheduler, origCompletionStageSupplier);

    最后,让我们调用装饰的异步操作:

    decoratedCompletionStageSupplier.get().whenComplete((result, ex) -> {
      if (ex != null) {
        System.out.println(ex.getMessage());
      }
      if (result != null) {
        System.out.println(result);
      }
    });

    以下是成功飞行搜索的示例输出,其耗时少于我们指定的 2 秒 timeoutDuration

    Searching for flights; current time = 19:25:09 783; current thread = ForkJoinPool.commonPool-worker-3
    
    Flight search successful
    
    [Flight{flightNumber='XY 765', flightDate='08/30/2020', from='NYC', to='LAX'}, Flight{flightNumber='XY 746', flightDate='08/30/2020', from='NYC', to='LAX'}] on thread ForkJoinPool.commonPool-worker-3

    这是超时的航班搜索的示例输出:

    Exception java.util.concurrent.TimeoutException: TimeLimiter 'flightSearch' recorded a timeout exception on thread pool-1-thread-1 at 19:38:16 963
    
    Searching for flights; current time = 19:38:18 448; current thread = ForkJoinPool.commonPool-worker-3
    
    Flight search successful at 19:38:18 461

    上面的时间戳和线程名称表明,即使异步操作稍后在另一个线程上完成,调用线程也会收到 TimeoutException。

    如果我们想创建一个装饰器并在代码库的不同位置重用它,我们将使用decorateCompletionStage()。如果我们想创建它并立即执行 Supplier<CompletionStage>,我们可以使用 executeCompletionStage() 实例方法代替:

    CompletionStage<List<Flight>> decoratedCompletionStage =  
      limiter.executeCompletionStage(scheduler, origCompletionStageSupplier);

    TimeLimiter 事件

    TimeLimiter 有一个 EventPublisher,它生成 TimeLimiterOnSuccessEventTimeLimiterOnErrorEventTimeLimiterOnTimeoutEvent 类型的事件。我们可以监听这些事件并记录它们,例如:

    TimeLimiter limiter = registry.timeLimiter("flightSearch");
    
    limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));
    
    limiter.getEventPublisher().onError(e -> System.out.println(e.toString()));
    
    limiter.getEventPublisher().onTimeout(e -> System.out.println(e.toString()));

    示例输出显示了记录的内容:

    2020-08-07T11:31:48.181944: TimeLimiter 'flightSearch' recorded a successful call.
    
    ... other lines omitted ...
    
    2020-08-07T11:31:48.582263: TimeLimiter 'flightSearch' recorded a timeout exception.

    TimeLimiter 指标

    TimeLimiter 跟踪成功、失败和超时的调用次数。

    首先,我们像往常一样创建 TimeLimiterConfigTimeLimiterRegistryTimeLimiter。然后,我们创建一个 MeterRegistry 并将 TimeLimiterRegistry 绑定到它:

    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    TaggedTimeLimiterMetrics.ofTimeLimiterRegistry(registry)
      .bindTo(meterRegistry);

    运行几次限时操作后,我们显示捕获的指标:

    Consumer<Meter> meterConsumer = meter -> {
      String desc = meter.getId().getDescription();
      String metricName = meter.getId().getName();
      String metricKind = meter.getId().getTag("kind");
      Double metricValue =
        StreamSupport.stream(meter.measure().spliterator(), false)
        .filter(m -> m.getStatistic().name().equals("COUNT"))
        .findFirst()
        .map(Measurement::getValue)
        .orElse(0.0);
      System.out.println(desc + " - " +
                         metricName +
                         "(" + metricKind + ")" +
                         ": " + metricValue);
    };
    meterRegistry.forEachMeter(meterConsumer);

    这是一些示例输出:

    The number of timed out calls - resilience4j.timelimiter.calls(timeout): 6.0
    
    The number of successful calls - resilience4j.timelimiter.calls(successful): 4.0
    
    The number of failed calls - resilience4j.timelimiter.calls(failed): 0.0

    在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

    实施时间限制时的陷阱和良好实践

    通常,我们处理两种操作 – 查询(或读取)和命令(或写入)。对查询进行时间限制是安全的,因为我们知道它们不会改变系统的状态。我们看到的 searchFlights() 操作是查询操作的一个例子。

    命令通常会改变系统的状态。bookFlights() 操作将是命令的一个示例。在对命令进行时间限制时,我们必须记住,当我们超时时,该命令很可能仍在运行。例如,bookFlights() 调用上的 TimeoutException 并不一定意味着命令失败。

    在这种情况下,我们需要管理用户体验——也许在超时时,我们可以通知用户操作花费的时间比我们预期的要长。然后我们可以查询上游以检查操作的状态并稍后通知用户。

    结论

    在本文中,我们学习了如何使用 Resilience4j 的 TimeLimiter 模块为异步、非阻塞操作设置时间限制。我们通过一些实际示例了解了何时使用它以及如何配置它。

    您可以使用 GitHub 上的代码演示一个完整的应用程序来说明这些想法。


    本文译自:
    https://reflectoring.io/time-limiting-with-resilience4j/

    Java 项目中使用 Resilience4j 框架实现客户端 API 调用的限速/节流机制


    在本系列的上一篇文章中,我们了解了 Resilience4j 以及如何使用其 Retry 模块。现在让我们了解 RateLimiter – 它是什么,何时以及如何使用它,以及在实施速率限制(或者也称为“节流”)时要注意什么。

    代码示例

    本文附有GitHub 上的工作代码示例。

    什么是 Resilience4j?

    请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

    什么是限速?

    我们可以从两个角度来看待速率限制——作为服务提供者和作为服务消费者。

    服务端限速

    作为服务提供商,我们实施速率限制以保护我们的资源免受过载和拒绝服务 (DoS) 攻击

    为了满足我们与所有消费者的服务水平协议 (SLA),我们希望确保一个导致流量激增的消费者不会影响我们对他人的服务质量。

    我们通过设置在给定时间单位内允许消费者发出多少请求的限制来做到这一点。我们通过适当的响应拒绝任何超出限制的请求,例如 HTTP 状态 429(请求过多)。这称为服务器端速率限制。

    速率限制以每秒请求数 (rps)、每分钟请求数 (rpm) 或类似形式指定。某些服务在不同的持续时间(例如 50 rpm 且不超过 2500 rph)和一天中的不同时间(例如,白天 100 rps 和晚上 150 rps)有多个速率限制。该限制可能适用于单个用户(由用户 ID、IP 地址、API 访问密钥等标识)或多租户应用程序中的租户。

    客户端限速

    作为服务的消费者,我们希望确保我们不会使服务提供者过载。此外,我们不想招致意外的成本——无论是金钱上的还是服务质量方面的。

    如果我们消费的服务是有弹性的,就会发生这种情况。服务提供商可能不会限制我们的请求,而是会因额外负载而向我们收取额外费用。有些甚至在短时间内禁止行为不端的客户。消费者为防止此类问题而实施的速率限制称为客户端速率限制。

    何时使用 RateLimiter?

    resilience4j-ratelimiter 用于客户端速率限制。

    服务器端速率限制需要诸如缓存和多个服务器实例之间的协调之类的东西,这是 resilience4j 不支持的。对于服务器端的速率限制,有 API 网关和 API 过滤器,例如 Kong API GatewayRepose API Filter。Resilience4j 的 RateLimiter 模块并不打算取代它们。

    Resilience4j RateLimiter 概念

    想要调用远程服务的线程首先向 RateLimiter 请求许可。如果 RateLimiter 允许,则线程继续。 否则,RateLimiter 会停放线程或将其置于等待状态。

    RateLimiter 定期创建新权限。当权限可用时,线程会收到通知,然后可以继续。

    一段时间内允许的调用次数称为 limitForPeriod。RateLimiter 刷新权限的频率由 limitRefreshPeriod 指定。timeoutDuration 指定线程可以等待多长时间获取权限。如果在等待时间结束时没有可用的权限,RateLimiter 将抛出 RequestNotPermitted 运行时异常。

    使用Resilience4j RateLimiter 模块

    RateLimiterRegistryRateLimiterConfigRateLimiterresilience4j-ratelimiter 的主要抽象。

    RateLimiterRegistry 是一个用于创建和管理 RateLimiter 对象的工厂。

    RateLimiterConfig 封装了 limitForPeriodlimitRefreshPeriodtimeoutDuration 配置。每个 RateLimiter 对象都与一个 RateLimiterConfig 相关联。

    RateLimiter 提供辅助方法来为包含远程调用的函数式接口或 lambda 表达式创建装饰器。

    让我们看看如何使用 RateLimiter 模块中可用的各种功能。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

    基本示例

    第一步是创建一个 RateLimiterConfig

    RateLimiterConfig config = RateLimiterConfig.ofDefaults();

    这将创建一个 RateLimiterConfig,其默认值为 limitForPeriod (50)、limitRefreshPeriod(500ns) 和 timeoutDuration (5s)。

    假设我们与航空公司服务的合同规定我们可以以 1 rps 调用他们的搜索 API。然后我们将像这样创建 RateLimiterConfig

    RateLimiterConfig config = RateLimiterConfig.custom()
      .limitForPeriod(1)
      .limitRefreshPeriod(Duration.ofSeconds(1))
      .timeoutDuration(Duration.ofSeconds(1))
      .build();

    如果线程无法在指定的 1 秒 timeoutDuration 内获取权限,则会出错。

    然后我们创建一个 RateLimiter 并装饰 searchFlights() 调用:

    RateLimiterRegistry registry = RateLimiterRegistry.of(config);
    RateLimiter limiter = registry.rateLimiter("flightSearchService");
    // FlightSearchService and SearchRequest creation omitted
    Supplier<List<Flight>> flightsSupplier =
      RateLimiter.decorateSupplier(limiter,
        () -> service.searchFlights(request));

    最后,我们多次使用装饰过的 Supplier<List<Flight>>

    for (int i=0; i<3; i++) {
      System.out.println(flightsSupplier.get());
    }

    示例输出中的时间戳显示每秒发出一个请求:

    Searching for flights; current time = 15:29:40 786
    ...
    [Flight{flightNumber='XY 765', ... }, ... ]
    Searching for flights; current time = 15:29:41 791
    ...
    [Flight{flightNumber='XY 765', ... }, ... ]

    如果超出限制,我们会收到 RequestNotPermitted 异常:

    Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)
    
     at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)
    
    ... other lines omitted ...

    装饰方法抛出已检异常

    假设我们正在调用
    FlightSearchService.searchFlightsThrowingException() ,它可以抛出一个已检 Exception。那么我们就不能使用
    RateLimiter.decorateSupplier()。我们将使用
    RateLimiter.decorateCheckedSupplier() 代替:

    CheckedFunction0<List<Flight>> flights =
      RateLimiter.decorateCheckedSupplier(limiter,
        () -> service.searchFlightsThrowingException(request));
    
    try {
      System.out.println(flights.apply());
    } catch (...) {
      // exception handling
    }

    RateLimiter.decorateCheckedSupplier() 返回一个 CheckedFunction0,它表示一个没有参数的函数。请注意对 CheckedFunction0 对象的 apply() 调用以调用远程操作。

    如果我们不想使用 SuppliersRateLimiter 提供了更多的辅助装饰器方法,如 decorateFunction()decorateCheckedFunction()decorateRunnable()decorateCallable() 等,以与其他语言结构一起使用。decorateChecked* 方法用于装饰抛出已检查异常的方法。

    应用多个速率限制

    假设航空公司的航班搜索有多个速率限制:2 rps 和 40 rpm。 我们可以通过创建多个 RateLimiters 在客户端应用多个限制:

    RateLimiterConfig rpsConfig = RateLimiterConfig.custom().
      limitForPeriod(2).
      limitRefreshPeriod(Duration.ofSeconds(1)).
      timeoutDuration(Duration.ofMillis(2000)).build();
    
    RateLimiterConfig rpmConfig = RateLimiterConfig.custom().
      limitForPeriod(40).
      limitRefreshPeriod(Duration.ofMinutes(1)).
      timeoutDuration(Duration.ofMillis(2000)).build();
    
    RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);
    RateLimiter rpsLimiter =
      registry.rateLimiter("flightSearchService_rps", rpsConfig);
    RateLimiter rpmLimiter =
      registry.rateLimiter("flightSearchService_rpm", rpmConfig);  
    然后我们使用两个 RateLimiters 装饰 searchFlights() 方法:
    
    Supplier<List<Flight>> rpsLimitedSupplier =
      RateLimiter.decorateSupplier(rpsLimiter,
        () -> service.searchFlights(request));
    
    Supplier<List<Flight>> flightsSupplier
      = RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);

    示例输出显示每秒发出 2 个请求,并且限制为 40 个请求:

    Searching for flights; current time = 15:13:21 246
    ...
    Searching for flights; current time = 15:13:21 249
    ...
    Searching for flights; current time = 15:13:22 212
    ...
    Searching for flights; current time = 15:13:40 215
    ...
    Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:
    RateLimiter 'flightSearchService_rpm' does not permit further calls
    at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)
    at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)

    在运行时更改限制

    如果需要,我们可以在运行时更改 limitForPeriodtimeoutDuration 的值:

    limiter.changeLimitForPeriod(2);
    limiter.changeTimeoutDuration(Duration.ofSeconds(2));

    例如,如果我们的速率限制根据一天中的时间而变化,则此功能很有用 – 我们可以有一个计划线程来更改这些值。新值不会影响当前正在等待权限的线程。

    RateLimiter和 Retry一起使用

    假设我们想在收到 RequestNotPermitted 异常时重试,因为它是一个暂时性错误。我们会像往常一样创建 RateLimiterRetry 对象。然后我们装饰一个 Supplier 的供应商并用 Retry 包装它:

    Supplier<List<Flight>> rateLimitedFlightsSupplier =
      RateLimiter.decorateSupplier(rateLimiter,
        () -> service.searchFlights(request));
    
    Supplier<List<Flight>> retryingFlightsSupplier =
      Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);

    示例输出显示为 RequestNotPermitted 异常重试请求:

    Searching for flights; current time = 15:29:39 847
    Flight search successful
    [Flight{flightNumber='XY 765', ... }, ... ]
    Searching for flights; current time = 17:10:09 218
    ...
    [Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]
    2020-07-27T17:10:09.484: Retry 'rateLimitedFlightSearch', waiting PT1S until attempt '1'. Last attempt failed with exception 'io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls'.
    Searching for flights; current time = 17:10:10 492
    ...
    2020-07-27T17:10:10.494: Retry 'rateLimitedFlightSearch' recorded a successful retry attempt...
    [Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]

    我们创建装饰器的顺序很重要。如果我们将 RetryRateLimiter 包装在一起,它将不起作用。

    RateLimiter 事件

    RateLimiter 有一个 EventPublisher,它在调用远程操作时生成 RateLimiterOnSuccessEventRateLimiterOnFailureEvent 类型的事件,以指示获取权限是否成功。我们可以监听这些事件并记录它们,例如:

    RateLimiter limiter = registry.rateLimiter("flightSearchService");
    limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));
    limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));

    日志输出示例如下:

    RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.127+05:30}
    ... other lines omitted ...
    RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.186+05:30}

    RateLimiter 指标

    假设在实施客户端节流后,我们发现 API 的响应时间增加了。这是可能的 – 正如我们所见,如果在线程调用远程操作时权限不可用,RateLimiter 会将线程置于等待状态。

    如果我们的请求处理线程经常等待获得许可,则可能意味着我们的 limitForPeriod 太低。也许我们需要与我们的服务提供商合作并首先获得额外的配额。

    监控 RateLimiter 指标可帮助我们识别此类容量问题,并确保我们在 RateLimiterConfig 上设置的值运行良好。

    RateLimiter 跟踪两个指标:可用权限的数量(
    resilience4j.ratelimiter.available.permissions)和等待权限的线程数量(
    resilience4j.ratelimiter.waiting.threads)。

    首先,我们像往常一样创建 RateLimiterConfigRateLimiterRegistryRateLimiter。然后,我们创建一个 MeterRegistry 并将 RateLimiterRegistry 绑定到它:

    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry)
      .bindTo(meterRegistry);

    运行几次限速操作后,我们显示捕获的指标:

    Consumer<Meter> meterConsumer = meter -> {
      String desc = meter.getId().getDescription();
      String metricName = meter.getId().getName();
      Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
        .filter(m -> m.getStatistic().name().equals("VALUE"))
        .findFirst()
        .map(m -> m.getValue())
        .orElse(0.0);
      System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

    这是一些示例输出:

    The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0
    The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0

    resilience4j.ratelimiter.available.permissions 的负值显示为请求线程保留的权限数。在实际应用中,我们会定期将数据导出到监控系统,并在仪表板上进行分析。

    实施客户端速率限制时的陷阱和良好实践

    使速率限制器成为单例

    对给定远程服务的所有调用都应通过相同的 RateLimiter 实例。对于给定的远程服务,RateLimiter 必须是单例。

    如果我们不强制执行此操作,我们代码库的某些区域可能会绕过 RateLimiter 直接调用远程服务。为了防止这种情况,对远程服务的实际调用应该在核心、内部层和其他区域应该使用内部层暴露的限速装饰器。

    我们如何确保未来的新开发人员理解这一意图?查看 Tom 的文章,其中揭示了一种解决此类问题的方法,即通过组织包结构来明确此类意图。此外,它还展示了如何通过在 ArchUnit 测试中编码意图来强制执行此操作。

    为多个服务器实例配置速率限制器

    为配置找出正确的值可能很棘手。如果我们在集群中运行多个服务实例,limitForPeriod 的值必须考虑到这一点

    例如,如果上游服务的速率限制为 100 rps,而我们的服务有 4 个实例,那么我们将配置 25 rps 作为每个实例的限制。

    然而,这假设我们每个实例上的负载大致相同。 如果情况并非如此,或者如果我们的服务本身具有弹性并且实例数量可能会有所不同,那么 Resilience4j 的 RateLimiter 可能不适合

    在这种情况下,我们需要一个速率限制器,将其数据保存在分布式缓存中,而不是像 Resilience4j RateLimiter 那样保存在内存中。但这会影响我们服务的响应时间。另一种选择是实现某种自适应速率限制。尽管 Resilience4j 可能会支持它,但尚不清楚何时可用。

    选择正确的超时时间

    对于 timeoutDuration 配置值,我们应该牢记 API 的预期响应时间

    如果我们将 timeoutDuration 设置得太高,响应时间和吞吐量就会受到影响。如果它太低,我们的错误率可能会增加。

    由于此处可能涉及一些反复试验,因此一个好的做法是将我们在 RateLimiterConfig 中使用的值(如 timeoutDurationlimitForPeriodlimitRefreshPeriod)作为我们服务之外的配置进行维护。然后我们可以在不更改代码的情况下更改它们。

    调优客户端和服务器端速率限制器

    实现客户端速率限制并不能保证我们永远不会受到上游服务的速率限制

    假设我们有来自上游服务的 2 rps 的限制,并且我们将 limitForPeriod 配置为 2,将 limitRefreshPeriod 配置为 1s。如果我们在第二秒的最后几毫秒发出两个请求,在此之前没有其他调用,RateLimiter 将允许它们。如果我们在下一秒的前几毫秒内再进行两次调用,RateLimiter 也会允许它们,因为有两个新权限可用。但是上游服务可能会拒绝这两个请求,因为服务器通常会实现基于滑动窗口的速率限制。

    为了保证我们永远不会从上游服务中获得超过速率,我们需要将客户端中的固定窗口配置为短于服务中的滑动窗口。因此,如果我们在前面的示例中将 limitForPeriod 配置为 1 并将 limitRefreshPeriod 配置为 500ms,我们就不会出现超出速率限制的错误。但是,第一个请求之后的所有三个请求都会等待,从而增加响应时间并降低吞吐量。

    结论

    在本文中,我们学习了如何使用 Resilience4j 的 RateLimiter 模块来实现客户端速率限制。 我们通过实际示例研究了配置它的不同方法。我们学习了一些在实施速率限制时要记住的良好做法和注意事项。

    您可以使用 GitHub 上的代码演示一个完整的应用程序来说明这些想法。


    本文译自: Implementing Rate Limiting with Resilience4j – Reflectoring

    使用 Resilience4j 框架实现重试机制


    在本文中,我们将从快速介绍 Resilience4j 开始,然后深入探讨其 Retry 模块。我们将了解何时、如何使用它,以及它提供的功能。在此过程中,我们还将学习实现重试时的一些良好实践。

    代码示例

    本文在 GitHu 上附有工作代码示例。

    什么是 Resilience4j?

    当应用程序通过网络进行通信时,会有很多出错的情况。由于连接断开、网络故障、上游服务不可用等,操作可能会超时或失败。应用程序可能会相互过载、无响应甚至崩溃。

    Resilience4j 是一个 Java 库,可以帮助我们构建弹性和容错的应用程序。它提供了一个框架,可编写代码以防止和处理此类问题

    Resilience4j 为 Java 8 及更高版本编写,适用于函数接口、lambda 表达式和方法引用等结构。

    Resilience4j 模块

    让我们快速浏览一下这些模块及其用途:

    模块 目的
    Retry 自动重试失败的远程操作
    RateLimiter 限制我们在一定时间内调用远程操作的次数
    TimeLimiter 调用远程操作时设置时间限制
    Circuit Breaker 当远程操作持续失败时,快速失败或执行默认操作
    Bulkhead 限制并发远程操作的数量
    Cache 存储昂贵的远程操作的结果

    使用范式

    虽然每个模块都有其抽象,但通常的使用范式如下:

    1. 创建一个 Resilience4j 配置对象
    2. 为此类配置创建一个 Registry 对象
    3. 从注册表创建或获取 Resilience4j 对象
    4. 将远程操作编码为 lambda 表达式或函数式接口或通常的 Java 方法
    5. 使用提供的辅助方法之一围绕第 4 步中的代码创建装饰器或包装器
    6. 调用装饰器方法来调用远程操作
      步骤 1-5 通常在应用程序启动时完成一次。让我们看看重试模块的这些步骤:
    RetryConfig config = RetryConfig.ofDefaults(); // ----> 1
    RetryRegistry registry = RetryRegistry.of(config); // ----> 2
    Retry retry = registry.retry("flightSearchService", config); // ----> 3
    
    FlightSearchService searchService = new FlightSearchService();
    SearchRequest request = new SearchRequest("NYC", "LAX", "07/21/2020");
    Supplier<List<Flight>> flightSearchSupplier =
      () -> searchService.searchFlights(request); // ----> 4
    
    Supplier<List<Flight>> retryingFlightSearch =
      Retry.decorateSupplier(retry, flightSearchSupplier); // ----> 5
    
    System.out.println(retryingFlightSearch.get()); // ----> 6

    什么时候使用重试?

    远程操作可以是通过网络发出的任何请求。通常,它是以下之一:

    1. 向 REST 端点发送 HTTP 请求
    2. 调用远程过程 (RPC) 或 Web 服务
    3. 从数据存储(SQL/NoSQL 数据库、对象存储等)读取和写入数据
    4. 向消息代理(RabbitMQ/ActiveMQ/Kafka 等)发送和接收消息

    当远程操作失败时,我们有两种选择——立即向我们的客户端返回错误,或者重试操作。如果重试成功,这对客户来说是件好事——他们甚至不必知道这是一个临时问题。

    选择哪个选项取决于错误类型(瞬时或永久)、操作(幂等或非幂等)、客户端(人或应用程序)和用例。

    暂时性错误是暂时的,通常,如果重试,操作很可能会成功。请求被上游服务限制、连接断开或由于某些服务暂时不可用而超时就是例子。

    来自 REST API 的硬件故障或 404(未找到)响应是永久性错误的示例,重试无济于事

    如果我们想应用重试,操作必须是幂等的。假设远程服务接收并处理了我们的请求,但在发送响应时出现问题。在这种情况下,当我们重试时,我们不希望服务将请求视为新请求或返回意外错误(想想银行转账)。

    重试会增加 API 的响应时间。如果客户端是另一个应用程序,如 cron 作业或守护进程,这可能不是问题。但是,如果是一个人,有时最好做出响应,快速失败并提供反馈,而不是在我们不断重试时让这个人等待。

    对于某些关键用例,可靠性可能比响应时间更重要,即使客户是个人,我们也可能需要实现重试。银行转账或旅行社预订航班和旅行酒店的转账就是很好的例子 – 用户期望可靠性,而不是对此类用例的即时响应。我们可以通过立即通知用户我们已接受他们的请求并在完成后通知他们来做出响应。

    使用 Resilience4j 重试模块

    RetryRegistryRetryConfigRetryresilience4j-retry 中的主要抽象。RetryRegistry 是用于创建和管理 Retry 对象的工厂。RetryConfig 封装了诸如应该尝试重试多少次、尝试之间等待多长时间等配置。每个 Retry 对象都与一个 RetryConfig 相关联。 Retry 提供了辅助方法来为包含远程调用的函数式接口或 lambda 表达式创建装饰器。

    让我们看看如何使用 retry 模块中可用的各种功能。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务通信。

    简单重试

    在简单重试中,如果在远程调用期间抛出 RuntimeException,则重试该操作。 我们可以配置尝试次数、尝试之间等待多长时间等:

    RetryConfig config = RetryConfig.custom()
      .maxAttempts(3)
      .waitDuration(Duration.of(2, SECONDS))
      .build();
    
    // Registry, Retry creation omitted
    
    FlightSearchService service = new FlightSearchService();
    SearchRequest request = new SearchRequest("NYC", "LAX", "07/31/2020");
    Supplier<List<Flight>> flightSearchSupplier =
      () -> service.searchFlights(request);
    
    Supplier<List<Flight>> retryingFlightSearch =
      Retry.decorateSupplier(retry, flightSearchSupplier);
    
    System.out.println(retryingFlightSearch.get());

    我们创建了一个 RetryConfig,指定我们最多要重试 3 次,并在两次尝试之间等待 2 秒。如果我们改用 RetryConfig.ofDefaults() 方法,则将使用 3 次尝试和 500 毫秒等待持续时间的默认值。

    我们将航班搜索调用表示为 lambda 表达式 – List<Flight>SupplierRetry.decorateSupplier() 方法使用重试功能装饰此 Supplier。最后,我们在装饰过的 Supplier 上调用 get() 方法来进行远程调用。

    如果我们想创建一个装饰器并在代码库的不同位置重用它,我们将使用 decorateSupplier()。如果我们想创建它并立即执行它,我们可以使用 executeSupplier() 实例方法代替:

    List<Flight> flights = retry.executeSupplier(
      () -> service.searchFlights(request));
    这是显示第一个请求失败然后第二次尝试成功的示例输出:
    
    Searching for flights; current time = 20:51:34 975
    Operation failed
    Searching for flights; current time = 20:51:36 985
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]

    在已检异常上重试

    现在,假设我们要重试已检查和未检查的异常。假设我们正在调用
    FlightSearchService.searchFlightsThrowingException(),它可以抛出一个已检查的 Exception。由于 Supplier 不能抛出已检查的异常,我们会在这一行得到编译器错误:

    Supplier<List<Flight>> flightSearchSupplier =
      () -> service.searchFlightsThrowingException(request);

    我们可能会尝试在 lambda 表达式中处理 Exception 并返回 Collections.emptyList(),但这看起来不太好。更重要的是,由于我们自己捕获 Exception,重试不再起作用:

    ExceptionSupplier<List<Flight>> flightSearchSupplier = () -> {
        try {      
          return service.searchFlightsThrowingException(request);
        } catch (Exception e) {
          // don't do this, this breaks the retry!
        }
        return Collections.emptyList();
      };

    那么当我们想要重试远程调用可能抛出的所有异常时,我们应该怎么做呢?我们可以使用
    Retry.decorateCheckedSupplier()(或 executeCheckedSupplier() 实例方法)代替 Retry.decorateSupplier()

    CheckedFunction0<List<Flight>> retryingFlightSearch =
      Retry.decorateCheckedSupplier(retry,
        () -> service.searchFlightsThrowingException(request));
    
    try {
      System.out.println(retryingFlightSearch.apply());
    } catch (...) {
      // handle exception that can occur after retries are exhausted
    }

    Retry.decorateCheckedSupplier() 返回一个 CheckedFunction0,它表示一个没有参数的函数。请注意对 CheckedFunction0 对象的 apply() 调用以调用远程操作。

    如果我们不想使用 SuppliersRetry 提供了更多的辅助装饰器方法,如 decorateFunction()decorateCheckedFunction()decorateRunnable()decorateCallable() 等,以与其他语言结构一起使用。decorate*decorateChecked* 版本之间的区别在于,decorate* 版本在 RuntimeExceptions 上重试,而 decorateChecked* 版本在 Exception 上重试。

    有条件重试

    上面的简单重试示例展示了如何在调用远程服务时遇到 RuntimeException 或已检查 Exception 时重试。在实际应用中,我们可能不想对所有异常都重试。 例如,如果我们得到一个
    AuthenticationFailedException 重试相同的请求将无济于事。当我们进行 HTTP 调用时,我们可能想要检查 HTTP 响应状态代码或在响应中查找特定的应用程序错误代码来决定是否应该重试。让我们看看如何实现这种有条件的重试。

    Predicate-based条件重试

    假设航空公司的航班服务定期初始化其数据库中的航班数据。对于给定日期的飞行数据,此内部操作需要几秒钟时间。 如果我们在初始化过程中调用当天的航班搜索,该服务将返回一个特定的错误代码 FS-167。航班搜索文档说这是一个临时错误,可以在几秒钟后重试该操作。

    让我们看看如何创建 RetryConfig

    RetryConfig config = RetryConfig.<SearchResponse>custom()
      .maxAttempts(3)
      .waitDuration(Duration.of(3, SECONDS))
      .retryOnResult(searchResponse -> searchResponse
        .getErrorCode()
        .equals("FS-167"))
      .build();

    我们使用 retryOnResult() 方法并传递执行此检查的 Predicate。这个 Predicate 中的逻辑可以像我们想要的那样复杂——它可以是对一组错误代码的检查,也可以是一些自定义逻辑来决定是否应该重试搜索。

    Exception-based条件重试

    假设我们有一个通用异常
    FlightServiceBaseException,当在与航空公司的航班服务交互期间发生任何意外时会抛出该异常。作为一般策略,我们希望在抛出此异常时重试。但是我们不想重试 SeatsUnavailableException 的一个子类 – 如果航班上没有可用座位,重试将无济于事。我们可以通过像这样创建 RetryConfig 来做到这一点:

    RetryConfig config = RetryConfig.custom()
      .maxAttempts(3)
      .waitDuration(Duration.of(3, SECONDS))
      .retryExceptions(FlightServiceBaseException.class)
      .ignoreExceptions(SeatsUnavailableException.class)
      .build();

    retryExceptions() 中,我们指定了一个异常列表。ignoreExceptions() 将重试与此列表中的异常匹配或继承的任何异常。我们把我们想忽略而不是重试的那些放入ignoreExceptions()。如果代码在运行时抛出一些其他异常,比如 IOException,它也不会被重试。

    假设即使对于给定的异常,我们也不希望在所有情况下都重试。也许我们只想在异常具有特定错误代码或异常消息中的特定文本时重试。在这种情况下,我们可以使用 retryOnException 方法:

    Predicate<Throwable> rateLimitPredicate = rle ->
      (rle instanceof  RateLimitExceededException) &&
      "RL-101".equals(((RateLimitExceededException) rle).getErrorCode());
    
    RetryConfig config = RetryConfig.custom()
      .maxAttempts(3)
      .waitDuration(Duration.of(1, SECONDS))
      .retryOnException(rateLimitPredicate)
      build();

    与 predicate-based (基于谓词)的条件重试一样,谓词内的检查可以根据需要复杂化。

    退避策略

    到目前为止,我们的示例有固定的重试等待时间。通常我们希望在每次尝试后增加等待时间——这是为了让远程服务有足够的时间在当前过载的情况下进行恢复。我们可以使用 IntervalFunction 来做到这一点。

    IntervalFunction 是一个函数式接口——它是一个以尝试次数为参数并以毫秒为单位返回等待时间的 Function

    随机间隔

    这里我们指定尝试之间的随机等待时间:

    RetryConfig config = RetryConfig.custom()
    .maxAttempts(4)
    .intervalFunction(IntervalFunction.ofRandomized(2000))
    .build();
    

    IntervalFunction.ofRandomized() 有一个关联的 randomizationFactor。我们可以将其设置为 ofRandomized() 的第二个参数。如果未设置,则采用默认值 0.5。这个 randomizationFactor 决定了随机值的分布范围。因此,对于上面的默认值 0.5,生成的等待时间将介于 1000 毫秒(2000 – 2000 0.5)和 3000 毫秒(2000 + 2000 0.5)之间。

    这种行为的示例输出如下:

    Searching for flights; current time = 20:27:08 729
    Operation failed
    Searching for flights; current time = 20:27:10 643
    Operation failed
    Searching for flights; current time = 20:27:13 204
    Operation failed
    Searching for flights; current time = 20:27:15 236
    Flight search successful
    [Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'},...]

    指数间隔

    对于指数退避,我们指定两个值 – 初始等待时间和乘数。在这种方法中,由于乘数,等待时间在尝试之间呈指数增长。例如,如果我们指定初始等待时间为 1 秒,乘数为 2,则重试将在 1 秒、2 秒、4 秒、8 秒、16 秒等之后进行。当客户端是后台作业或守护进程时,此方法是推荐的方法。

    以下是我们如何为指数退避创建 RetryConfig

    RetryConfig config = RetryConfig.custom()
    .maxAttempts(6)
    .intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
    .build();

    这种行为的示例输出如下:

    Searching for flights; current 
    time = 20:37:02 684
    
    Operation failed
    
    Searching for flights; current time = 20:37:03 727
    
    Operation failed
    
    Searching for flights; current time = 20:37:05 731
    
    Operation failed
    
    Searching for flights; current time = 20:37:09 731
    
    Operation failed
    
    Searching for flights; current time = 20:37:17 731

    IntervalFunction 还提供了一个 exponentialRandomBackoff() 方法,它结合了上述两种方法。我们还可以提供 IntervalFunction 的自定义实现。

    重试异步操作

    直到现在我们看到的例子都是同步调用。让我们看看如何重试异步操作。假设我们像这样异步搜索航班:

    CompletableFuture.supplyAsync(() -> service.searchFlights(request))
      .thenAccept(System.out::println);

    searchFlight() 调用发生在不同的线程上,当它返回时,返回的 List<Flight> 被传递给 thenAccept(),它只是打印它。

    我们可以使用 Retry 对象上的 executeCompletionStage() 方法对上述异步操作进行重试。 此方法采用两个参数 – 一个 ScheduledExecutorService 将在其上安排重试,以及一个 Supplier<CompletionStage> 将被装饰。它装饰并执行 CompletionStage,然后返回一个 CompletionStage,我们可以像以前一样调用 thenAccept

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    
    Supplier<CompletionStage<List<Flight>>> completionStageSupplier =
      () -> CompletableFuture.supplyAsync(() -> service.searchFlights(request));
    
    retry.executeCompletionStage(scheduler, completionStageSupplier)
    .thenAccept(System.out::println);

    在实际应用程序中,我们将使用共享线程池 (
    Executors.newScheduledThreadPool()) 来调度重试,而不是此处显示的单线程调度执行器。

    重试事件

    在所有这些例子中,装饰器都是一个黑盒子——我们不知道什么时候尝试失败了,框架代码正在尝试重试。假设对于给定的请求,我们想要记录一些详细信息,例如尝试计数或下一次尝试之前的等待时间。 我们可以使用在不同执行点发布的重试事件来做到这一点。Retry 有一个 EventPublisher,它具有 onRetry()onSuccess() 等方法。

    我们可以通过实现这些监听器方法来收集和记录详细信息:

    Retry.EventPublisher publisher = retry.getEventPublisher();
    
    publisher.onRetry(event -> System.out.println(event.toString()));
    
    publisher.onSuccess(event -> System.out.println(event.toString()));

    类似地,RetryRegistry 也有一个 EventPublisher,它在 Retry 对象被添加或从注册表中删除时发布事件。

    重试指标

    Retry 维护计数器以跟踪操作的次数

    1. 第一次尝试成功
    2. 重试后成功
    3. 没有重试就失败了
    4. 重试后仍失败

    每次执行装饰器时,它都会更新这些计数器。

    为什么要捕获指标?

    捕获并定期分析指标可以让我们深入了解上游服务的行为。它还可以帮助识别瓶颈和其他潜在问题

    例如,如果我们发现某个操作通常在第一次尝试时失败,我们可以调查其原因。如果我们发现我们的请求在建立连接时受到限制或超时,则可能表明远程服务需要额外的资源或容量。

    如何捕获指标?

    Resilience4j 使用 Micrometer 发布指标。Micrometer 为监控系统(如 Prometheus、Azure Monitor、New Relic 等)提供了仪表客户端的外观。因此我们可以将指标发布到这些系统中的任何一个或在它们之间切换,而无需更改我们的代码。

    首先,我们像往常一样创建 RetryConfigRetryRegistryRetry。然后,我们创建一个 MeterRegistry 并将 etryRegistry 绑定到它:

    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    
    TaggedRetryMetrics.ofRetryRegistry(retryRegistry).bindTo(meterRegistry);

    运行几次可重试操作后,我们显示捕获的指标:

    Consumer<Meter> meterConsumer = meter -> {
      String desc = meter.getId().getDescription();
      String metricName = meter.getId().getTag("kind");
      Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
        .filter(m -> m.getStatistic().name().equals("COUNT"))
        .findFirst()
        .map(m -> m.getValue())
        .orElse(0.0);
      System.out.println(desc + " - " + metricName + ": " + metricValue);
    };
    meterRegistry.forEachMeter(meterConsumer);

    一些示例输出如下:

    The number of successful calls without a retry attempt - successful_without_retry: 4.0
    
    The number of failed calls without a retry attempt - failed_without_retry: 0.0
    
    The number of failed calls after a retry attempt - failed_with_retry: 0.0
    
    The number of successful calls after a retry attempt - successful_with_retry: 6.0

    当然,在实际应用中,我们会将数据导出到监控系统并在仪表板上查看。

    重试时的注意事项和良好实践

    服务通常提供具有内置重试机制的客户端库或 SDK。对于云服务尤其如此。 例如,Azure CosmosDB 和 Azure 服务总线为客户端库提供内置重试工具。 它们允许应用程序设置重试策略来控制重试行为。

    在这种情况下,最好使用内置的重试而不是我们自己的编码。如果我们确实需要自己编写,我们应该禁用内置的默认重试策略 – 否则,它可能导致嵌套重试,其中应用程序的每次尝试都会导致客户端库的多次尝试

    一些云服务记录瞬时错误代码。例如,Azure SQL 提供了它期望数据库客户端重试的错误代码列表。在决定为特定操作添加重试之前,最好检查一下服务提供商是否有这样的列表。

    另一个好的做法是将我们在 RetryConfig 中使用的值(例如最大尝试次数、等待时间和可重试错误代码和异常)作为我们服务之外的配置进行维护。如果我们发现新的暂时性错误或者我们需要调整尝试之间的间隔,我们可以在不构建和重新部署服务的情况下进行更改。

    通常在重试时,框架代码中的某处可能会发生 Thread.sleep()。对于在重试之间有等待时间的同步重试就是这种情况。如果我们的代码在 Web 应用程序的上下文中运行,则 Thread 很可能是 Web 服务器的请求处理线程。因此,如果我们进行过多的重试,则会降低应用程序的吞吐量

    结论

    在本文中,我们了解了 Resilience4j 是什么,以及如何使用它的重试模块使我们的应用程序可以在应对临时错误具备弹性。我们研究了配置重试的不同方法,以及在不同方法之间做出决定的一些示例。我们学习了一些在实施重试时要遵循的良好实践,以及收集和分析重试指标的重要性。

    您可以使用 GitHub 上的代码尝试一个完整的应用程序来演示这些想法。


    本文译自: Implementing Retry with Resilience4j – Reflectoring

    在 Spring Boot 中使用搜索引擎 Elasticsearch


    Elasticsearch 建立在 Apache Lucene 之上,于 2010 年由 Elasticsearch NV(现为 Elastic)首次发布。据 Elastic 网站称,它是一个分布式开源搜索和分析引擎,适用于所有类型的数据,包括文本、数值 、地理空间、结构化和非结构化。Elasticsearch 操作通过 REST API 实现。主要功能是:

    • 将文档存储在索引中,
    • 使用强大的查询搜索索引以获取这些文档,以及
    • 对数据运行分析函数。

    Spring Data Elasticsearch 提供了一个简单的接口来在 Elasticsearch 上执行这些操作,作为直接使用 REST API 的替代方法
    在这里,我们将使用 Spring Data Elasticsearch 来演示 Elasticsearch 的索引和搜索功能,并在最后构建一个简单的搜索应用程序,用于在产品库存中搜索产品。

    代码示例

    本文附有 GitHub 上的工作代码示例。

    Elasticsearch 概念

    Elasticsearch 概念
    了解 Elasticsearch 概念的最简单方法是用数据库进行类比,如下表所示:

    Elasticsearch -> 数据库
    索引 ->
    文档 ->
    文档 ->

    我们要搜索或分析的任何数据都作为文档存储在索引中。在 Spring Data 中,我们以 POJO 的形式表示一个文档,并用注解对其进行修饰以定义到 Elasticsearch 文档的映射。

    与数据库不同,存储在 Elasticsearch 中的文本首先由各种分析器处理。默认分析器通过常用单词分隔符(如空格和标点符号)拆分文本,并删除常用英语单词。

    如果我们存储文本“The sky is blue”,分析器会将其存储为包含“术语”“sky”和“blue”的文档。我们将能够使用“blue sky”、“sky”或“blue”形式的文本搜索此文档,并将匹配程度作为分数。

    除了文本之外,Elasticsearch 还可以存储其他类型的数据,称为 Field Type(字段类型),如文档中 mapping-types (映射类型)部分所述。

    启动 Elasticsearch 实例

    在进一步讨论之前,让我们启动一个 Elasticsearch 实例,我们将使用它来运行我们的示例。有多种运行 Elasticsearch 实例的方法:

    • 使用托管服务
    • 使用来自 AWS 或 Azure 等云提供商的托管服务
    • 通过在虚拟机集群中自己安装 Elasticsearch
    • 运行 Docker 镜像
      我们将使用来自 Dockerhub 的 Docker 镜像,这对于我们的演示应用程序来说已经足够了。让我们通过运行 Docker run 命令来启动 Elasticsearch 实例:
    docker run -p 9200:9200 \
      -e "discovery.type=single-node" \
      docker.elastic.co/elasticsearch/elasticsearch:7.10.0

    执行此命令将启动一个 Elasticsearch 实例,侦听端口 9200。我们可以通过点击 URL http://localhost:9200 来验证实例状态,并在浏览器中检查结果输出:

    {
      "name" : "8c06d897d156",
      "cluster_name" : "docker-cluster",
      "cluster_uuid" : "Jkx..VyQ",
      "version" : {
      "number" : "7.10.0",
      ...
      },
      "tagline" : "You Know, for Search"
    }

    如果我们的 Elasticsearch 实例启动成功,应该看到上面的输出。

    使用 REST API 进行索引和搜索

    Elasticsearch 操作通过 REST API 访问。 有两种方法可以将文档添加到索引中:

    • 一次添加一个文档,或者
    • 批量添加文档。

    添加单个文档的 API 接受一个文档作为参数。

    对 Elasticsearch 实例的简单 PUT 请求用于存储文档如下所示:

    PUT /messages/_doc/1
    {
      "message": "The Sky is blue today"
    }

    这会将消息 – “The Sky is blue today”存储为“messages”的索引中的文档。

    我们可以使用发送到搜索 REST API 的搜索查询来获取此文档:

    GET /messages/search
    {
      "query":
      {
      "match": {"message": "blue sky"}
      }
    }

    这里我们发送一个 match 类型的查询来获取匹配字符串“blue sky”的文档。我们可以通过多种方式指定用于搜索文档的查询。Elasticsearch 提供了一个基于 JSON 的 查询 DSL(Domain Specific Language – 领域特定语言)来定义查询。

    对于批量添加,我们需要提供一个包含类似以下代码段的条目的 JSON 文档:

    POST /_bulk
    {"index":{"_index":"productindex"}}{"_class":"..Product","name":"Corgi Toys .. Car",..."manufacturer":"Hornby"}{"index":{"_index":"productindex"}}{"_class":"..Product","name":"CLASSIC TOY .. BATTERY"...,"manufacturer":"ccf"}

    使用 Spring Data 进行 Elasticsearch 操作

    我们有两种使用 Spring Data 访问 Elasticsearch 的方法,如下所示:

    • Repositories:我们在接口中定义方法,Elasticsearch 查询是在运行时根据方法名称生成的。
    • ElasticsearchRestTemplate:我们使用方法链和原生查询创建查询,以便在相对复杂的场景中更好地控制创建 Elasticsearch 查询。

    我们将在以下各节中更详细地研究这两种方式。

    创建应用程序并添加依赖项

    让我们首先通过包含 web、thymeleaf 和 lombok 的依赖项,使用 Spring Initializr 创建我们的应用程序。添加 thymeleaf 依赖项以便增加用户界面。

    在 Maven pom.xml 中添加 spring-data-elasticsearch 依赖项:

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
    </dependency>

    连接到 Elasticsearch 实例

    Spring Data Elasticsearch 使用 Java High Level REST Client (JHLC) 连接到 Elasticsearch 服务器。JHLC 是 Elasticsearch 的默认客户端。我们将创建一个 Spring Bean 配置来进行设置:

    @Configuration
    @EnableElasticsearch
    Repositories(basePackages
            = "io.pratik.elasticsearch.repositories")@ComponentScan(basePackages = { "io.pratik.elasticsearch" })
    public class ElasticsearchClientConfig extends
             AbstractElasticsearchConfiguration {
      @Override
      @Bean
      public RestHighLevelClient elasticsearchClient() {
    
      final ClientConfiguration clientConfiguration =
        ClientConfiguration
          .builder()
          .connectedTo("localhost:9200")
          .build();
    
      return RestClients.create(clientConfiguration).rest();
      }
    }

    在这里,我们连接到我们之前启动的 Elasticsearch 实例。我们可以通过添加更多属性(例如启用 ssl、设置超时等)来进一步自定义连接。

    为了调试和诊断,我们将在 logback-spring.xml 的日志配置中打开传输级别的请求/响应日志:

    public class Product {
      @Id
      private String id;
    
      @Field(type = FieldType.Text, name = "name")
      private String name;
    
      @Field(type = FieldType.Double, name = "price")
      private Double price;
    
      @Field(type = FieldType.Integer, name = "quantity")
      private Integer quantity;
    
      @Field(type = FieldType.Keyword, name = "category")
      private String category;
    
      @Field(type = FieldType.Text, name = "desc")
      private String description;
    
      @Field(type = FieldType.Keyword, name = "manufacturer")
      private String manufacturer;
    
      ...
    }

    表达文档

    在我们的示例中,我们将按名称、品牌、价格或描述搜索产品。因此,为了将产品作为文档存储在 Elasticsearch 中,我们将产品表示为 POJO,并加上 Field 注解以配置 Elasticsearch 的映射,如下所示:

    public class Product {
      @Id
      private String id;
    
      @Field(type = FieldType.Text, name = "name")
      private String name;
    
      @Field(type = FieldType.Double, name = "price")
      private Double price;
    
      @Field(type = FieldType.Integer, name = "quantity")
      private Integer quantity;
    
      @Field(type = FieldType.Keyword, name = "category")
      private String category;
    
      @Field(type = FieldType.Text, name = "desc")
      private String description;
    
      @Field(type = FieldType.Keyword, name = "manufacturer")
      private String manufacturer;
    
      ...
    }

    @Document 注解指定索引名称。

    @Id 注解使注解字段成为文档的 _id,作为此索引中的唯一标识符。id 字段有 512 个字符的限制。

    @Field 注解配置字段的类型。我们还可以将名称设置为不同的字段名称。

    在 Elasticsearch 中基于这些注解创建了名为 productindex 的索引。

    使用 Spring Data Repository 进行索引和搜索

    存储库提供了使用 finder 方法访问 Spring Data 中数据的最方便的方法。Elasticsearch 查询是根据方法名称创建的。但是,我们必须小心避免产生低效的查询并给集群带来高负载。

    让我们通过扩展 ElasticsearchRepository 接口来创建一个 Spring Data 存储库接口:

    public interface ProductRepository
        extends ElasticsearchRepository<Product, String> {
    
    }

    此处 ProductRepository 类继承了 ElasticsearchRepository 接口中包含的 save()saveAll()find()findAll() 等方法。

    索引

    我们现在将通过调用 save() 方法存储一个产品,调用 saveAll() 方法来批量索引,从而在索引中存储一些产品。在此之前,我们将存储库接口放在一个服务类中:

    @Service
    public class ProductSearchServiceWithRepo {
    
      private ProductRepository productRepository;
    
      public void createProductIndexBulk(final List<Product> products) {
        productRepository.saveAll(products);
      }
    
      public void createProductIndex(final Product product) {
        productRepository.save(product);
      }
    }

    当我们从 JUnit 调用这些方法时,我们可以在跟踪日志中看到 REST API 调用索引和批量索引。

    搜索

    为了满足我们的搜索要求,我们将向存储库接口添加 finder 方法:

    public interface ProductRepository
        extends ElasticsearchRepository<Product, String> {
      List<Product> findByName(String name);
    
      List<Product> findByNameContaining(String name);
      List<Product> findByManufacturerAndCategory
           (String manufacturer, String category);
    }

    在使用 JUnit 运行 findByName() 方法时,我们可以看到在发送到服务器之前在跟踪日志中生成的 Elasticsearch 查询:

    TRACE Sending request POST /productindex/_search? ..:
    Request body: {.."query":{"bool":{"must":[{"query_string":{"query":"apple","fields":["name^1.0"],..}

    类似地,通过运行
    findByManufacturerAndCategory() 方法,我们可以看到使用两个 query_string 参数对应两个字段——“manufacturer”和“category”生成的查询:

    TRACE .. Sending request POST /productindex/_search..:
    Request body: {.."query":{"bool":{"must":[{"query_string":{"query":"samsung","fields":["manufacturer^1.0"],..}},{"query_string":{"query":"laptop","fields":["category^1.0"],..}}],..}},"version":true}

    有多种方法命名模式可以生成各种 Elasticsearch 查询。

    使用 ElasticsearchRestTemplate进行索引和搜索

    当我们需要更多地控制我们设计查询的方式,或者团队已经掌握了 Elasticsearch 语法时,Spring Data 存储库可能就不再适合。

    在这种情况下,我们使用 ElasticsearchRestTemplate。它是 Elasticsearch 基于 HTTP 的新客户端,取代以前使用节点到节点二进制协议的 TransportClient。

    ElasticsearchRestTemplate 实现了接口 ElasticsearchOperations,该接口负责底层搜索和集群操的繁杂工作。

    索引

    该接口具有用于添加单个文档的方法 index() 和用于向索引添加多个文档的 bulkIndex() 方法。此处的代码片段显示了如何使用 bulkIndex() 将多个产品添加到索引“productindex”:

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
      private ElasticsearchOperations elasticsearchOperations;
    
      public List<String> createProductIndexBulk
                (final List<Product> products) {
    
          List<IndexQuery> queries = products.stream()
          .map(product->
            new IndexQueryBuilder()
            .withId(product.getId().toString())
            .withObject(product).build())
          .collect(Collectors.toList());;
    
          return elasticsearchOperations
          .bulkIndex(queries,IndexCoordinates.of(PRODUCT_INDEX));
      }
      ...
    }

    要存储的文档包含在 IndexQuery 对象中。bulkIndex() 方法将 IndexQuery 对象列表和包含在 IndexCoordinates 中的 Index 名称作为输入。当我们执行此方法时,我们会获得批量请求的 REST API 跟踪:

    Sending request POST /_bulk?timeout=1m with parameters:
    Request body: {"index":{"_index":"productindex","_id":"383..35"}}{"_class":"..Product","id":"383..35","name":"New Apple..phone",..manufacturer":"apple"}
    ..
    {"_class":"..Product","id":"d7a..34",.."manufacturer":"samsung"}

    接下来,我们使用 index() 方法添加单个文档:

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
    
      private ElasticsearchOperations elasticsearchOperations;
    
      public String createProductIndex(Product product) {
    
        IndexQuery indexQuery = new IndexQueryBuilder()
             .withId(product.getId().toString())
             .withObject(product).build();
    
        String documentId = elasticsearchOperations
         .index(indexQuery, IndexCoordinates.of(PRODUCT_INDEX));
    
        return documentId;
      }
    }

    跟踪相应地显示了用于添加单个文档的 REST API PUT 请求。

    Sending request PUT /productindex/_doc/59d..987..:
    Request body: {"_class":"..Product","id":"59d..87",..,"manufacturer":"dell"}

    搜索

    ElasticsearchRestTemplate 还具有 search() 方法,用于在索引中搜索文档。此搜索操作类似于 Elasticsearch 查询,是通过构造 Query 对象并将其传递给搜索方法来构建的。

    Query 对象具有三种变体 – NativeQueryyStringQueryCriteriaQuery,具体取决于我们如何构造查询。让我们构建一些用于搜索产品的查询。

    NativeQuery

    NativeQuery 为使用表示 Elasticsearch 构造(如聚合、过滤和排序)的对象构建查询提供了最大的灵活性。这是用于搜索与特定制造商匹配的产品的 NativeQuery

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
      private ElasticsearchOperations elasticsearchOperations;
    
      public void findProductsByBrand(final String brandName) {
    
        QueryBuilder queryBuilder =
          QueryBuilders
          .matchQuery("manufacturer", brandName);
    
        Query searchQuery = new NativeSearchQueryBuilder()
          .withQuery(queryBuilder)
          .build();
    
        SearchHits<Product> productHits =
          elasticsearchOperations
          .search(searchQuery,
              Product.class,
              IndexCoordinates.of(PRODUCT_INDEX));
      }
    }

    在这里,我们使用 NativeSearchQueryBuilder 构建查询,该查询使用 MatchQueryBuilder 指定包含字段“制造商”的匹配查询。

    StringQuery

    StringQuery 通过允许将原生 Elasticsearch 查询用作 JSON 字符串来提供完全控制,如下所示:

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
      private ElasticsearchOperations elasticsearchOperations;
    
      public void findByProductName(final String productName) {
        Query searchQuery = new StringQuery(
          "{\"match\":{\"name\":{\"query\":\""+ productName + "\"}}}\"");
    
        SearchHits<Product> products = elasticsearchOperations.search(
          searchQuery,
          Product.class,
          IndexCoordinates.of(PRODUCT_INDEX_NAME));
      ...     
       }
    }

    在此代码片段中,我们指定了一个简单的 match 查询,用于获取具有作为方法参数发送的特定名称的产品。

    CriteriaQuery

    使用 CriteriaQuery,我们可以在不了解 Elasticsearch 任何术语的情况下构建查询。查询是使用带有 Criteria 对象的方法链构建的。每个对象指定一些用于搜索文档的标准:

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
    
      private ElasticsearchOperations elasticsearchOperations;
    
      public void findByProductPrice(final String productPrice) {
        Criteria criteria = new Criteria("price")
                      .greaterThan(10.0)
                      .lessThan(100.0);
    
        Query searchQuery = new CriteriaQuery(criteria);
    
        SearchHits<Product> products = elasticsearchOperations
           .search(searchQuery,
               Product.class,
               IndexCoordinates.of(PRODUCT_INDEX_NAME));
      }
    }

    在此代码片段中,我们使用 CriteriaQuery 形成查询以获取价格大于 10.0 且小于 100.0 的产品。

    构建搜索应用程序

    我们现在将向我们的应用程序添加一个用户界面,以查看产品搜索的实际效果。用户界面将有一个搜索输入框,用于按名称或描述搜索产品。输入框将具有自动完成功能,以显示基于可用产品的建议列表,如下所示:

    我们将为用户的搜索输入创建自动完成建议。然后根据与用户输入的搜索文本密切匹配的名称或描述搜索产品。我们将构建两个搜索服务来实现这个用例:

    • 获取自动完成功能的搜索建议
    • 根据用户的搜索查询处理搜索产品的搜索
      服务类 ProductSearchService 将包含搜索和获取建议的方法。

    GitHub 存储库中提供了带有用户界面的成熟应用程序。

    建立产品搜索索引

    productindex 与我们之前用于运行 JUnit 测试的索引相同。我们将首先使用 Elasticsearch REST API 删除 productindex,以便在应用程序启动期间使用从我们的 50 个时尚系列产品的示例数据集中加载的产品创建新的 productindex

    curl -X DELETE http://localhost:9200/productindex

    如果删除操作成功,我们将收到消息 "acknowledged": true

    现在,让我们为库存中的产品创建一个索引。我们将使用包含 50 种产品的示例数据集来构建我们的索引。这些产品在 CSV 文件中被排列为单独的行。

    每行都有三个属性 – id、name 和 description。我们希望在应用程序启动期间创建索引。请注意,在实际生产环境中,索引创建应该是一个单独的过程。我们将读取 CSV 的每一行并将其添加到产品索引中:

    @SpringBootApplication
    @Slf4j
    public class ProductsearchappApplication {
      ...
      @PostConstruct
      public void buildIndex() {
        esOps.indexOps(Product.class).refresh();
        productRepo.saveAll(prepareDataset());
      }
    
      private Collection<Product> prepareDataset() {
        Resource resource = new ClassPathResource("fashion-products.csv");
        ...
        return productList;
      }
    }

    在这个片段中,我们通过从数据集中读取行并将这些行传递给存储库的 saveAll() 方法以将产品添加到索引中来进行一些预处理。在运行应用程序时,我们可以在应用程序启动中看到以下跟踪日志。

    ...Sending request POST /_bulk?timeout=1m with parameters:
    Request body: {"index":{"_index":"productindex"}}{"_class":"io.pratik.elasticsearch.productsearchapp.Product","name":"Hornby 2014 Catalogue","description":"Product Desc..talogue","manufacturer":"Hornby"}{"index":{"_index":"productindex"}}{"_class":"io.pratik.elasticsearch.productsearchapp.Product","name":"FunkyBuys..","description":"Size Name:Lar..& Smoke","manufacturer":"FunkyBuys"}{"index":{"_index":"productindex"}}.
    ...

    使用多字段和模糊搜索搜索产品

    下面是我们在方法 processSearch() 中提交搜索请求时如何处理搜索请求:

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
    
      private ElasticsearchOperations elasticsearchOperations;
    
      public List<Product> processSearch(final String query) {
      log.info("Search with query {}", query);
    
      // 1. Create query on multiple fields enabling fuzzy search
      QueryBuilder queryBuilder =
        QueryBuilders
        .multiMatchQuery(query, "name", "description")
        .fuzziness(Fuzziness.AUTO);
    
      Query searchQuery = new NativeSearchQueryBuilder()
                .withFilter(queryBuilder)
                .build();
    
      // 2. Execute search
      SearchHits<Product> productHits =
        elasticsearchOperations
        .search(searchQuery, Product.class,
        IndexCoordinates.of(PRODUCT_INDEX));
    
      // 3. Map searchHits to product list
      List<Product> productMatches = new ArrayList<Product>();
      productHits.forEach(searchHit->{
        productMatches.add(searchHit.getContent());
      });
      return productMatches;
      }...
    }

    在这里,我们对多个字段执行搜索 – 名称和描述。 我们还附加了 fuzziness() 来搜索紧密匹配的文本以解释拼写错误。

    使用通配符搜索获取建议

    接下来,我们为搜索文本框构建自动完成功能。 当我们在搜索文本字段中输入内容时,我们将通过使用搜索框中输入的字符执行通配符搜索来获取建议。

    我们在 fetchSuggestions() 方法中构建此函数,如下所示:

    @Service
    @Slf4j
    public class ProductSearchService {
    
      private static final String PRODUCT_INDEX = "productindex";
    
      public List<String> fetchSuggestions(String query) {
        QueryBuilder queryBuilder = QueryBuilders
          .wildcardQuery("name", query+"*");
    
        Query searchQuery = new NativeSearchQueryBuilder()
          .withFilter(queryBuilder)
          .withPageable(PageRequest.of(0, 5))
          .build();
    
        SearchHits<Product> searchSuggestions =
          elasticsearchOperations.search(searchQuery,
            Product.class,
          IndexCoordinates.of(PRODUCT_INDEX));
    
        List<String> suggestions = new ArrayList<String>();
    
        searchSuggestions.getSearchHits().forEach(searchHit->{
          suggestions.add(searchHit.getContent().getName());
        });
        return suggestions;
      }
    }

    我们以搜索输入文本的形式使用通配符查询,并附加 * 以便如果我们输入“red”,我们将获得以“red”开头的建议。我们使用 withPageable() 方法将建议的数量限制为 5。可以在此处看到正在运行的应用程序的搜索结果的一些屏幕截图:

    结论

    在本文中,我们介绍了 Elasticsearch 的主要操作——索引文档、批量索引和搜索——它们以 REST API 的形式提供。Query DSL 与不同分析器的结合使搜索变得非常强大。

    Spring Data Elasticsearch 通过使用 Spring Data Repositories 或 ElasticsearchRestTemplate 提供了方便的接口来访问应用程序中的这些操作。

    我们最终构建了一个应用程序,在其中我们看到了如何在接近现实生活的应用程序中使用 Elasticsearch 的批量索引和搜索功能。


    Java 设计模式 Monads 的美丽世界

    让我从免责声明开始。从函数式编程的角度来看,下面的解释绝不是精确的或绝对准确的。相反,我将重点解释的清晰和简单性上,以便让尽可能多的 Java 开发人员进入这个美丽的世界。

    几年前,当我开始深入研究函数式编程时,我很快发现有大量的信息,但对于几乎完全具有命令式背景的普通 Java 开发人员来说,几乎无法理解。如今,情况正在慢慢改变。例如,有很多文章解释了例如基本的 FP 概念(参考: 实用函数式 Java (PFJ)简介)以及它们如何适用于 Java。或解释如何正确使用 Java 流的文章。但是 Monads 仍然不在这些文章的重点之外。我不知道为什么会发生这种情况,但我会努力填补这个空白。

    那么,Monad 是什么?

    Monad 是……一种设计模式。就这么简单。这种设计模式由两部分组成:

    • Monad 是一个值的容器。对于每个 Monad,都有一些方法可以将值包装到 Monad 中。
    • Monad 为内部包含的值实现了“控制反转”。为了实现这一点,Monad 提供了接受函数的方法。这些函数接受与 Monad 中存储的类型相同的值,并返回转换后的值。转换后的值被包装到与源值相同的 Monad 中。
      为了理解模式的第二部分,我们可以看看 Monad 的接口:
    interface Monad<T> {
        <R> Monad<R> map(Function<T, R> mapper);
    
        <R> Monad<R> flatMap(Function<T, Monad<R>> mapper);
    }

    当然,特定的 Monad 通常有更丰富的接口,但这两个方法绝对应该存在。

    乍一看,接受函数而不是访问值并没有太大区别。事实上,这使 Monad 能够完全控制如何以及何时应用转换功能。当您调用 getter 时,您希望立即获得值。在 Monad 转换的情况下可以立即应用或根本不应用,或者它的应用可以延迟。缺乏对内部值的直接访问使 monad 能够表示甚至尚不可用的值!

    下面我将展示一些 Monad 的例子以及它们可以解决哪些问题。

    Monad 缺失值或 Optional/Maybe 的场景

    这个 Monad 有很多名字——Maybe、Option、Optional。最后一个听起来很熟悉,不是吗? 好吧,因为 Java 8 Optional 是 Java 平台的一部分。

    不幸的是,Java Optional 实现过于尊崇传统的命令式方法,这使得它的用处不大。特别是 Optional 允许应用程序使用 .get() 方法获取值。如果缺少值,甚至会抛出 NPE。因此,Optional 的用法通常仅限于表示返回潜在的缺失值,尽管这只是潜在用法的一小部分。

    也许 Monad 的目的是表示可能会丢失的值。传统上,Java 中的这个角色是为 null 保留的。不幸的是,这会导致许多不同的问题,包括著名的 NullPointerException

    例如,如果您期望某些参数或某些返回值可以为 null,则应该在使用前检查它:

    public UserProfileResponse getUserProfileHandler(final User.Id userId) {
        final User user = userService.findById(userId);
        if (user == null) {
        return UserProfileResponse.error(USER_NOT_FOUND);
        }
    
        final UserProfileDetails details = userProfileService.findById(userId);
    
        if (details == null) {
        return UserProfileResponse.of(user, UserProfileDetails.defaultDetails());
        }
    
        return UserProfileResponse.of(user, details);
    }

    看起来熟悉吗?当然了。

    让我们看看 Option Monad 如何改变这一点(为简洁起见,使用一个静态导入):

        public UserProfileResponse getUserProfileHandler(final User.Id userId) {
            return ofNullable(userService.findById(userId))
                    .map(user -> UserProfileResponse.of(user,
                            ofNullable(userProfileService.findById(userId)).orElseGet(UserProfileDetails::defaultDetails)))
                    .orElseGet(() -> UserProfileResponse.error(USER_NOT_FOUND));
        }

    请注意,代码更加简洁,对业务逻辑的“干扰”也更少。

    这个例子展示了 monadic 的“控制反转”是多么方便:转换不需要检查 null,只有当值实际可用时才会调用它们。

    “如果/当值可用时做某事”是开始方便地使用 Monads 的关键心态。

    请注意,上面的示例保留了原始 API 的完整内容。但是更广泛地使用该方法并更改 API 是有意义的,因此它们将返回 Optional 而不是 null

        public Optional<UserProfileResponse> getUserProfileHandler4(final User.Id userId) {
            return optionalUserService.findById(userId).flatMap(
                    user -> userProfileService.findById(userId).map(profile -> UserProfileResponse.of(user, profile)));
        }

    一些观察:

    • 代码更简洁,包含几乎零样板。
    • 所有类型都是自动派生的。虽然并非总是如此,但在绝大多数情况下,类型是由编译器派生的—尽管与 Scala 相比,Java 中的类型推断较弱。
    • 没有明确的错误处理,而是我们可以专注于“快乐日子场景”。
    • 所有转换都方便地组合和链接,不会中断或干扰主要业务逻辑。
      事实上,上面的属性对于所有的 Monad 都是通用的。

    抛还是不抛是个问题

    事情并不总是如我们所愿,我们的应用程序生活在现实世界中,充满痛苦、错误和失误。有时我们可以和他们一起做点什么,有时不能。如果我们不能做任何事情,我们至少希望通知调用者事情并不像我们预期的那样进行。

    在 Java 中,我们传统上有两种机制来通知调用者问题:

    • 返回特殊值(通常为空)
    • 抛出异常
      除了返回 null 我们还可以返回 Option Monad(见上文),但这通常是不够的,因为需要更多关于错误的详细信息。通常在这种情况下我们会抛出异常。

    但是这种方法有一个问题。事实上,甚至很少有问题。

    • 异常中断执行流程

    • 异常增加了很多心理开销
      异常引起的心理开销取决于异常的类型:

    • 检查异常迫使你要么在这里处理它们,要么在签名中声明它们并将麻烦转移到调用者身上

    • 未经检查的异常会导致相同级别的问题,但编译器不支持
      不知道哪个更差。

    Either Monad 来了

    让我们先分析一下这个问题。我们想要返回的是一些特殊值,它可以是两种可能的事情之一:结果值(成功时)或错误(失败时)。请注意,这些东西是相互排斥的——如果我们返回值,则不需要携带错误,反之亦然。

    以上是对Either Monad 的几乎准确描述:任何给定的实例都只包含一个值,并且该值具有两种可能类型之一。

    任何 Monad 的接口都可以这样描述:

    interface Either<L, R> {
        <T> Either<T, R> mapLeft(Function<L, T> mapper);
    
        <T> Either<T, R> flatMapLeft(Function<L, Either<T, R>> mapper);
    
        <T> Either<L, T> mapLeft(Function<T, R> mapper);
    
        <T> Either<L, T> flatMapLeft(Function<R, Either<L, T>> mapper);
    }

    该接口相当冗长,因为它在左右值方面是对称的。对于更窄的用例,当我们需要传递成功或错误时,这意味着我们需要就某种约定达成一致——哪种类型(第一种或第二种)将保存错误,哪种将保存值。

    在这种情况下,Either 的对称性质使其更容易出错,因为很容易无意中交换代码中的错误和成功值。

    虽然这个问题很可能会被编译器捕获,但最好为这个特定用例量身定制。如果我们修复其中一种类型,就可以做到这一点。显然,修复错误类型更方便,因为 Java 程序员已经习惯于从单个 Throwable 类型派生所有错误和异常。

    Result Monad — 专门用于错误处理和传播的 Either Monad

    所以,让我们假设所有错误都实现相同的接口,我们称之为失败。现在我们可以简化和减少接口:

    interface Result<T> {
        <R> Result<R> map(Function<T, R> mapper);
    
        <R> Result<R> flatMap(Function<T, Result<R>> mapper);
    }

    Result Monad API 看起来与 Maybe Monad 的 API 非常相似。

    使用这个 Monad,我们可以重写前面的例子:

        public Result<UserProfileResponse> getUserProfileHandler(final User.Id userId) {
            return resultUserService.findById(userId).flatMap(user -> resultUserProfileService.findById(userId)
                    .map(profile -> UserProfileResponse.of(user, profile)));
        }

    好吧,它与上面的示例基本相同,唯一的变化是 Monad — Result 而不是 Optional。与前面的例子不同,我们有关于错误的完整信息,所以我们可以在上层做一些事情。但是,尽管完整的错误处理代码仍然很简单并且专注于业务逻辑。

    “承诺是一个很重要的词。它要么成就了什么,要么破坏了什么。”

    我想展示的下一个 Monad 将是 Promise Monad。

    必须承认,对于 Promise 是否是 monad,我还没有找到权威的答案。不同的作者对此有不同的看法。我纯粹是从实用的角度来看它的:它的外观和行为与其他 monad 非常相似,所以我认为它们是一个 monad。

    Promise Monad 代表一个(可能还不可用的)值。从某种意义上说,它与 Maybe Monad 非常相似。

    Promise Monad 可用于表示譬如对外部服务或数据库的请求结果、文件读取或写入等。基本上它可以表示任何需要 I/O 和时间来执行它的东西。Promise 支持与我们在其他 Monad 中观察到的相同的思维方式——“如果/当价值可用时做某事”。

    请注意,由于无法预测操作是否成功,因此让 Promise 表示的不是 value 本身而是 Result 内部带有 value 是很方便的。

    要了解它是如何工作的,让我们看一下下面的示例:

    ...
    public interface ArticleService {
        // Returns list of articles for specified topics posted by specified users
        Promise<Collection<Article>> userFeed(final Collection<Topic.Id> topics, final Collection<User.Id> users);
    }
    ...
    public interface TopicService {
        // Returns list of topics created by user
        Promise<Collection<Topic>> topicsByUser(final User.Id userId, final Order order);
    }
    ...
    public class UserTopicHandler {
        private final ArticleService articleService;
        private final TopicService topicService;
    
        public UserTopicHandler(final ArticleService articleService, final TopicService topicService) {
            this.articleService = articleService;
            this.topicService = topicService;
        }
    
        public Promise<Collection<Article>> userTopicHandler(final User.Id userId) {
            return topicService.topicsByUser(userId, Order.ANY)
                    .flatMap(topicsList -> articleService.articlesByUserTopics(userId, topicsList.map(Topic::id)));
        }
    }

    为了提供整个上下文,我包含了两个必要的接口,但实际上有趣的部分是 userTopicHandler() 方法。尽管这种方法的简单性令人怀疑:

    • 调用 TopicService 并检索由提供的用户创建的主题列表
    • 成功获取主题列表后,该方法提取主题 ID,然后调用 ArticleService,获取用户为指定主题创建的文章列表
    • 执行端到端的错误处理

      后记

      Monads 是非常强大和方便的工具。使用“当价值可用时做”的思维方式编写代码需要一些时间来习惯,但是一旦你开始使用它,它将让你的生活变得更加简单。它允许将大量的心理开销卸载给编译器,并使许多错误在编译时而不是在运行时变得不可能或可检测到。


    本文译自:Beautiful World of Monads – DEV Community

    Java Spring Boot 项目中使用结构化日志节省时间

    【注】本文译自: Saving Time with Structured Logging – Reflectoring

    日志记录是调查事件和了解应用程序中发生的事情的终极资源。每个应用程序都有某种类型的日志。

    然而,这些日志通常很混乱,分析它们需要付出很多努力。在本文中,我们将研究如何利用结构化日志来大大增加日志的价值

    我们将通过一些非常实用的技巧来提高应用程序日志数据的价值,并使用 Logz.io 作为日志平台来查询日志。

    代码示例

    本文附有 GitHub 上的工作代码示例。

    什么是结构化日志?

    “正常”日志是非结构化的。它们通常包含一个消息字符串:

    2021-08-08 18:04:14.721 INFO 12402 --- [ main] i.r.s.StructuredLoggingApplication : Started StructuredLoggingApplication in 0.395 seconds (JVM running for 0.552)

    此消息包含我们在调查事件或分析问题时希望获得的所有信息:

    • 日志事件的日期
    • 创建日志事件的记录器的名称,以及
    • 日志消息本身。
      所有信息都在该日志消息中,但很难查询这些信息!由于所有信息都在一个字符串中,如果我们想从日志中获取特定信息,就必须解析和搜索这个字符串。

    例如,如果我们只想查看特定记录器的日志,则日志服务器必须解析所有日志消息,检查它们是否具有识别记录器的特定模式,然后根据所需的记录器过滤日志消息。

    结构化日志包含相同的信息,但采用结构化形式而不是非结构化字符串。通常,结构化日志以 JSON 格式呈现:

    {
        "timestamp": "2021-08-08 18:04:14.721",
        "level": "INFO",
        "logger": "io.reflectoring....StructuredLoggingApplication",
        "thread": "main",
        "message": "Started StructuredLoggingApplication ..."
    }

    这种 JSON 结构允许日志服务器有效地存储,更重要的是检索日志。

    例如,现在可以通过 timestamplogger 轻松过滤日志,而且搜索比解析特定模式的字符串更有效。

    但是结构化日志的价值并不止于此:我们可以根据需要向结构化日志事件中添加任何自定义字段! 我们可以添加上下文信息来帮助我们识别问题,或者我们可以向日志添加指标。

    凭借我们现在触手可及的所有数据,我们可以创建强大的日志查询和仪表板,即使我们刚在半夜醒来调查事件,我们也能找到所需的信息。

    现在让我们看几个用例,它们展示了结构化日志记录的强大功能。

    为所有日志事件添加代码路径

    我们首先要看的是代码路径。每个应用程序通常有几个不同的路径,传入请求可以通过应用程序。考虑这个图:

    Java Spring Boot 项目中使用结构化日志节省时间
    此示例具有(至少)三种不同的代码路径,传入请求可以采用这些路径:

    • 用户代码路径:用户正在从他们的浏览器使用应用程序。浏览器向 Web 控制器发送请求,控制器调用领域代码。
    • 第三方系统代码路径:应用程序的 HTTP API 也从第三方系统调用。在这个例子中,第三方系统调用与用户浏览器相同的 web 控制器。
    • 计时器代码路径:与许多应用程序一样,此应用程序有一些由计时器触发的计划任务。
      这些代码路径中的每一个都可以具有不同的特征。域服务涉及所有三个代码路径。在涉及域服务错误的事件期间,了解导致错误的代码路径将大有帮助!

    如果我们不知道代码路径,我们很容易在事件调查期间做出毫无结果的猜测。

    所以,我们应该将代码路径添加到日志中!以下是我们如何使用 Spring Boot 做到这一点。

    为传入的 Web 请求添加代码路径

    在 Java 中,SLF4J 日志库提供了 MDC 类(消息诊断上下文)。这个类允许我们向在同一线程中发出的所有日志事件添加自定义字段。

    要为每个传入的 Web 请求添加自定义字段,我们需要构建一个拦截器,在每个请求的开头添加 codePath 字段,甚至在我们的 Web 控制器代码执行之前。

    我们可以通过实现 HandlerInterceptor 接口来做到这一点:

    public class LoggingInterceptor implements HandlerInterceptor {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
                throws Exception {
    
            if (request.getHeader("X-CUSTOM-HEADER") != null) {
                MDC.put("codePath", "3rdParty");
            } else {
                MDC.put("codePath", "user");
            }
    
            return true;
        }
    
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
                ModelAndView modelAndView) {
            MDC.remove("codePath");
        }
    }

    在 preHandle() 方法中,我们调用 MDC.put() 将 codePath 字段添加到所有日志事件中。如果请求包含标识请求来自第三方方系统的标头,我们将代码路径设置为 3rdParty,否则,我们假设请求来自用户的浏览器。

    根据应用的不同,这里的逻辑可能会有很大的不同,当然,这只是一个例子。

    postHandle() 方法中,我们不应该忘记调用 MDC.remove() 再次删除所有先前设置的字段,否则线程仍会保留这些字段,即使它返回到线程池,以及下一个请求 由该线程提供服务的那些字段可能仍然设置为错误的值。

    要激活拦截器,我们需要将其添加到 InterceptorRegistry 中:

    @Componentpublic
    class WebConfigurer implements WebMvcConfigurer {
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(new LoggingInterceptor());
        }
    }

    就是这样。在传入日志事件的线程中发出的所有日志事件现在都具有 codePath 字段。

    如果任何请求创建并启动子线程,请确保在新线程生命周期开始时调用 MDC.put()

    在计划作业中添加代码路径

    在 Spring Boot 中,我们可以通过使用 @Scheduled@EnableScheduling 注解轻松创建计划作业。

    要将代码路径添加到日志中,我们需要确保调用 MDC.put() 作为调度方法中的第一件事:

    @Componentpublic
    class Timer {
    
        private final DomainService domainService;
    
        private static final Logger logger = LoggerFactory.getLogger(Timer.class);
    
        public Timer(DomainService domainService) {
            this.domainService = domainService;
        }
    
        @Scheduled(fixedDelay = 5000)
        void scheduledHello() {
            MDC.put("codePath", "timer");
            logger.info("log event from timer");
            // do some actual work
            MDC.remove("codePath");
        }
    
    }

    这样,从执行调度方法的线程发出的所有日志事件都将包含字段 codePath。我们也可以创建我们自己的 @Job 注解或类似的注解来为我们完成这项工作,但这超出了本文的范围。

    为了使预定作业的日志更有价值,我们可以添加其他字段:

    • job_status:指示作业是否成功的状态。
    • job_id:已执行作业的 ID。
    • job_records_processed:如果作业进行一些批处理,它可以记录处理的记录数。
    • ……
      通过日志中的这些字段,我们可以在日志服务器获取到很多有用的信息!

    将用户 ID 添加到用户启动的日志事件

    典型 Web 应用程序中的大部分工作是在来自用户浏览器的 Web 请求中完成的,这些请求会触发应用程序中的线程,为浏览器创建响应。

    想象一下发生了一些错误,日志中的堆栈跟踪显示它与特定的用户配置有关。但是我们不知道请求来自哪个用户!

    为了缓解这种情况,在用户触发的所有日志事件中包含某种用户 ID 是非常有帮助的

    由于我们知道传入的 Web 请求大多直接来自用户的浏览器,因此我们可以在创建的同一个 LoggingInterceptor 中添加 username 字段以添加 codePath 字段:

    public class LoggingInterceptor implements HandlerInterceptor {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
                throws Exception {
    
            Object principal = SecurityContextHolder.getContext().getAuthentication().getPrincipal();
    
            if (principal instanceof UserDetails) {
                String username = ((UserDetails) principal).getUsername();
                MDC.put("username", username);
            } else {
                String username = principal.toString();
                MDC.put("username", username);
            }
    
            return true;
        }
    
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
                ModelAndView modelAndView) {
            MDC.remove("username");
        }
    }

    这段代码假设我们使用 Spring Security 来管理对 Web 应用程序的访问。我们使用 SecurityContextHolder 来获取 Principal 并从中提取用户名以将其传递给 MDC.put()

    从服务请求的线程发出的每个日志事件现在都将包含用户名字段和用户名。

    有了这个字段,我们现在可以过滤特定用户请求的日志。如果用户报告了问题,我们可以根据他们的姓名过滤日志,并极大地减少我们必须查看的日志。

    根据规定,您可能希望记录更不透明的用户 ID 而不是用户名。

    为错误日志事件添加根本原因

    当我们的应用程序出现错误时,我们通常会记录堆栈跟踪。堆栈跟踪帮助我们确定错误的根本原因。如果没有堆栈跟踪,我们将不知道是哪个代码导致了错误!

    但是,如果我们想在应用程序中运行错误统计信息,堆栈跟踪是非常笨拙的。假设我们想知道我们的应用程序每天总共记录了多少错误,以及其中有多少是由哪个根本原因异常引起的。我们必须从日志中导出所有堆栈跟踪,并对它们进行一些手动过滤,才能得到该问题的答案!

    但是,如果我们将自定义字段 rootCause 添加到每个错误日志事件,我们可以通过该字段过滤日志事件,然后在日志服务器的 UI 中创建不同根本原因的直方图或饼图,甚至无需导出数据。

    在 Spring Boot 中执行此操作的一种方法是创建一个 @ExceptionHandle

    @ControllerAdvicepublic
    class WebExceptionHandler {
    
        private static final Logger logger = LoggerFactory.getLogger(WebExceptionHandler.class);
    
        @ExceptionHandler(Exception.class)
        @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
        public void internalServerError(Exception e) {
            MDC.put("rootCause", getRootCause(e).getClass().getName());
            logger.error("returning 500 (internal server error).", e);
            MDC.remove("rootCause");
        }
    
        private Throwable getRootCause(Exception e) {
            Throwable rootCause = e;
            while (e.getCause() != null && rootCause.getCause() != rootCause) {
                rootCause = e.getCause();
            }
            return rootCause;
        }
    
    }

    我们创建了一个用 @ControllerAdvice 注解的类,这意味着它在我们所有的 web 控制器中都是有效的。

    在类中,我们创建了一个用 @ExceptionHandler 注解的方法。对于任何 Web 控制器中出现的异常,都会调用此方法。它将 rootCause MDC 字段设置为导致错误的异常类的完全限定名称,然后记录异常的堆栈跟踪。

    就是这样。所有打印堆栈跟踪的日志事件现在都有一个字段 rootCause,我们可以通过这个字段进行过滤以了解我们应用程序中的错误分布。

    向所有日志事件添加跟踪 ID

    如果我们运行多个服务,例如在微服务环境中,分析错误时事情会很快变得复杂。一个服务调用另一个服务,另一个服务调用再一个服务,并且很难(如果可能的话)跟踪一个服务中的错误到另一个服务中的错误。

    跟踪 ID 有助于连接一个服务中的日志事件和另一个服务中的日志事件:

    在上面的示例图中,服务 1 被调用并生成跟踪 ID“1234”。然后它调用服务 2 和 3,将相同的跟踪 ID 传播给它们,以便它们可以将相同的跟踪 ID 添加到其日志事件中,从而可以通过搜索特定的跟踪 ID 来连接所有服务的日志事件。

    对于每个传出请求,服务 1 还会创建一个唯一的“跨度 ID”。虽然跟踪跨越服务 1 的整个请求/响应周期,但跨度仅跨越一个服务和另一个服务之间的请求/响应周期。

    我们可以自己实现这样的跟踪机制,但是有一些跟踪标准和工具可以使用这些标准集成到跟踪系统中,例如 Logz.io 的分布式跟踪功能

    我们还是使用标准工具吧。在 Spring Boot 世界中,这就是 Spring Cloud Sleuth,我们可以通过简单地将它添加到我们的 pom.xml,从而把该功能集成到我们的应用程序中:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-dependencies</artifactId>
          <version>2020.0.3</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement><dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
      </dependency>
    </dependencies>

    这会自动将跟踪和跨度 ID 添加到我们的日志中,并在使用支持的 HTTP 客户端时通过请求标头将它们从一个服务传播到下一个服务。您可以在“使用 Spring Cloud Sleuth 在分布式系统中进行跟踪”一文中阅读有关 Spring Cloud Sleuth 的更多信息。

    添加某些代码路径的持续时间

    我们的应用程序响应请求所需的总持续时间是一个重要的指标。如果速度太慢,用户会感到沮丧。

    通常,将请求持续时间作为指标公开并创建显示请求持续时间的直方图和百分位数的仪表板是一个好主意,这样我们就可以一目了然地了解应用程序的健康状况,甚至可能在违反某个阈值时收到警报。

    然而,我们并不是一直在查看仪表板,我们可能不仅对总请求持续时间感兴趣,而且对某些代码路径的持续时间感兴趣。在分析日志以调查问题时,了解代码中特定路径执行所需的时间可能是一个重要线索。

    在 Java 中,我们可能会这样做:

    void callThirdPartyService() throws InterruptedException {
        logger.info("log event from the domain service");
        Instant start=Instant.now();
        Thread.sleep(2000); // simulating an expensive operation
        Duration duration=Duration.between(start,Instant.now());
        MDC.put("thirdPartyCallDuration",String.valueOf(duration.getNano()));
        logger.info("call to third-party service successful!");
        MDC.remove("thirdPartyCallDuration");
    }

    假设我们正在调用第三方服务并希望将持续时间添加到日志中。使用 Instant.now()Duration.between(),我们计算持续时间,将其添加到 MDC,然后创建日志事件。

    这个日志事件现在将包含字段 thirdPartyCallDuration,我们可以在日志中过滤和搜索该字段。 例如,我们可能会搜索这个调用耗时过长的实例。 然后,我们可以使用用户 ID 或跟踪 ID,当这需要特别长的时间时,我们也可以将它们作为日志事件的字段来找出模式。

    在Logz.io中查询结构化日志

    如果我们按照关于 per-environment logging 的文章中的描述设置了日志记录到 Logz.io,我们现在可以在 Logz.io 提供的 Kibana UI 中查询日志。

    错误分布

    例如,我们可以查询在 rootCause 字段中具有值的所有日志事件:

    __exists__: "rootCause"

    这将显示具有根本原因的错误事件列表。

    我们还可以在 Logz.io UI 中创建一个可视化来显示给定时间范围内的错误分布:

    此图表显示几乎一半的错误是由 ThingyException 引起的,因此检查是否可以以某种方式避免此异常可能是个好主意。如果无法避免,我们应该将其记录在 WARN 而不是 ERROR 上,以保持错误日志的清洁。

    跨代码路径的错误分布

    例如,假设用户抱怨预定的作业没有正常工作。如果我们在调度方法代码中添加了一个 job_status 字段,我们可以通过那些失败的作业来过滤日志:

    job_status: "ERROR"

    为了获得更高级的视图,我们可以创建另一个饼图可视化,显示 job_statusrootCause 的分布:

    我们现在可以看到大部分预定的作业都失败了!我们应该为此添加一些警报! 我们还可以查看哪些异常是大多数计划作业的根本原因并开始调查。

    检查用户的错误

    或者,假设用户名为 “user” 的用户提出了一个支持请求,指定了它发生的大致日期和时间。我们可以使用查询 username: user 过滤日志以仅显示该用户的日志,并且可以快速将用户问题的原因归零。

    我们还可以扩展查询以仅显示具有 rootCause 的该用户的日志事件,以直接了解何时出了什么问题。

    username: "user" AND _exists_: "rootCause"

    结构化您的日志

    本文仅展示了几个示例,说明我们如何向日志事件添加结构并在查询日志时使用该结构。以后可以在日志中搜索的任何内容都应该是日志事件中的自定义字段。添加到日志事件中的字段在很大程度上取决于我们正在构建的应用程序,所以在编写代码时,一定要考虑哪些信息可以帮助您分析日志。

    您可以在 GitHub 上找到本文中讨论的代码示例。

    使用 Spring Boot 构可重用的 Mock 模块

    【译】本文译自: Building Reusable Mock Modules with Spring Boot – Reflectoring

    将代码库分割成松散耦合的模块,每个模块都有一组专门的职责,这不是很好吗?

    这意味着我们可以轻松找到代码库中的每个职责来添加或修改代码。也意味着代码库很容易掌握,因为我们一次只需要将一个模块加载到大脑的工作记忆中。

    而且,由于每个模块都有自己的 API,这意味着我们可以为每个模块创建一个可重用的模拟。在编写集成测试时,我们只需导入一个模拟模块并调用其 API 即可开始模拟。我们不再需要知道我们模拟的类的每一个细节。

    在本文中,我们将着眼于创建这样的模块,讨论为什么模拟整个模块比模拟单个 bean 更好,然后介绍一种简单但有效的模拟完整模块的方法,以便使用 Spring Boot 进行简单的测试设置。

    代码示例

    本文附有 GitHub 上的工作代码示例。

    什么是模块?

    当我在本文中谈论“模块”时,我的意思是:

    模块是一组高度内聚的类,这些类具有专用的 API 和一组相关的职责。

    我们可以将多个模块组合成更大的模块,最后组合成一个完整的应用程序。

    一个模块可以通过调用它的 API 来使用另一个模块。

    你也可以称它们为“组件”,但在本文中,我将坚持使用“模块”。

    如何构建模块?

    在构建应用程序时,我建议预先考虑如何模块化代码库。我们的代码库中的自然边界是什么?

    我们的应用程序是否需要与外部系统进行通信?这是一个自然的模块边界。我们可以构建一个模块,其职责是与外部系统对话!

    我们是否确定了属于一起的用例的功能“边界上下文”?这是另一个很好的模块边界。我们将构建一个模块来实现应用程序的这个功能部分中的用例!

    当然,有更多方法可以将应用程序拆分为模块,而且通常不容易找到它们之间的边界。他们甚至可能会随着时间的推移而改变!更重要的是在我们的代码库中有一个清晰的结构,这样我们就可以轻松地在模块之间移动概念!

    为了使模块在我们的代码库中显而易见,我建议使用以下包结构:

    • 每个模块都有自己的包

    • 每个模块包都有一个 api 子包,包含所有暴露给其他模块的类

    • 每个模块包都有一个内部子包 internal ,其中包含:

      • 实现 API 公开的功能的所有类
      • 一个 Spring 配置类,它将 bean 提供给实现该 API 所需的 Spring 应用程序上下文
    • 就像俄罗斯套娃一样,每个模块的 internal 子包可能包含带有子模块的包,每个子模块都有自己的 api 和 internal

    • 给定 internal 包中的类只能由该包中的类访问。

    这使得代码库非常清晰,易于导航。在我关于清晰架构边界 中阅读有关此代码结构的更多信息,或 示例代码中的一些代码。

    这是一个很好的包结构,但这与测试和模拟有什么关系呢?

    模拟单个 Bean 有什么问题?

    正如我在开始时所说的,我们想着眼于模拟整个模块而不是单个 bean。但是首先模拟单个 bean 有什么问题呢?

    让我们来看看使用 Spring Boot 创建集成测试的一种非常常见的方式。

    假设我们想为 REST 控制器编写一个集成测试,该控制器应该在 GitHub 上创建一个存储库,然后向用户发送电子邮件。

    集成测试可能如下所示:

    @WebMvcTest
    class RepositoryControllerTestWithoutModuleMocks {
    
        @Autowired
        private MockMvc mockMvc;
    
        @MockBean
        private GitHubMutations gitHubMutations;
    
        @MockBean
        private GitHubQueries gitHubQueries;
    
        @MockBean
        private EmailNotificationService emailNotificationService;
    
      @Test
      void givenRepositoryDoesNotExist_thenRepositoryIsCreatedSuccessfully()
          throws Exception {
        String repositoryUrl = "https://github.com/reflectoring/reflectoring";
    
        given(gitHubQueries.repositoryExists(...)).willReturn(false);
        given(gitHubMutations.createRepository(...)).willReturn(repositoryUrl);
    
        mockMvc.perform(post("/github/repository")
          .param("token", "123")
          .param("repositoryName", "foo")
          .param("organizationName", "bar"))
          .andExpect(status().is(200));
    
        verify(emailNotificationService).sendEmail(...);
        verify(gitHubMutations).createRepository(...);
      }
    
    }

    这个测试实际上看起来很整洁,我见过(并编写)了很多类似的测试。但正如人们所说,细节决定成败。

    我们使用 @WebMvcTest 注解来设置 Spring Boot 应用程序上下文以测试 Spring MVC 控制器。应用程序上下文将包含让控制器工作所需的所有 bean,仅此而已。

    但是我们的控制器在应用程序上下文中需要一些额外的 bean 才能工作,即 GitHubMutationsGitHubQueries、和 EmailNotificationService。因此,我们通过 @MockBean 注解将这些 bean 的模拟添加到应用程序上下文中。

    在测试方法中,我们在一对 given() 语句中定义这些模拟的状态,然后调用我们要测试的控制器端点,之后 verify() 在模拟上调用了某些方法。

    那么,这个测试有什么问题呢? 我想到了两件主要的事情:

    首先,要设置 given()verify() 部分,测试需要知道控制器正在调用模拟 bean 上的哪些方法。这种对实现细节的低级知识使测试容易被修改。每次实现细节发生变化时,我们也必须更新测试。这稀释了测试的价值,并使维护测试成为一件苦差事,而不是“有时是例行公事”。

    其次, @MockBean 注解将导致 Spring 为每个测试创建一个新的应用程序上下文(除非它们具有完全相同的字段)。在具有多个控制器的代码库中,这将显着增加测试运行时间。

    如果我们投入一点精力来构建上一节中概述的模块化代码库,我们可以通过构建可重用的模拟模块来解决这两个缺点。

    让我们通过看一个具体的例子来了解如何实现。

    模块化 Spring Boot 应用程序

    好,让我们看看如何使用 Spring Boots 实现可重用的模拟模块。

    这是示例应用程序的文件夹结构。如果你想跟随,你可以在 GitHub 上找到代码:

    ├── github
    |   ├── api
    |   |  ├── <I> GitHubMutations
    |   |  ├── <I> GitHubQueries
    |   |  └── <C> GitHubRepository
    |   └── internal
    |      ├── <C> GitHubModuleConfiguration
    |      └── <C> GitHubService
    ├── mail
    |   ├── api
    |   |  └── <I> EmailNotificationService
    |   └── internal
    |      ├── <C> EmailModuleConfiguration
    |      ├── <C> EmailNotificationServiceImpl
    |      └── <C> MailServer
    ├── rest
    |   └── internal
    |       └── <C> RepositoryController
    └── <C> DemoApplication

    该应用程序有 3 个模块:

    • github 模块提供了与 GitHub API 交互的接口,

    • mail 模块提供电子邮件功能,

    • rest 模块提供了一个 REST API 来与应用程序交互。

    让我们更详细地研究每个模块。

    GitHub 模块

    github 模块提供了两个接口(用 <I> 标记)作为其 API 的一部分:

    • GitHubMutations,提供了一些对 GitHub API 的写操作,

    • GitHubQueries,它提供了对 GitHub API 的一些读取操作。

    这是接口的样子:

    public interface GitHubMutations {
    
        String createRepository(String token, GitHubRepository repository);
    
    }
    
    public interface GitHubQueries {
    
        List<String> getOrganisations(String token);
    
        List<String> getRepositories(String token, String organisation);
    
        boolean repositoryExists(String token, String repositoryName, String organisation);
    
    }

    它还提供类 GitHubRepository,用于这些接口的签名。

    在内部, github 模块有类 GitHubService,它实现了两个接口,还有类 GitHubModuleConfiguration,它是一个 Spring 配置,为应用程序上下文贡献一个 GitHubService 实例:

    @Configuration
    class GitHubModuleConfiguration {
    
        @Bean
        GitHubService gitHubService() {
            return new GitHubService();
        }
    
    }

    由于 GitHubService 实现了 github 模块的整个 API,因此这个 bean 足以使该模块的 API 可用于同一 Spring Boot 应用程序中的其他模块。

    Mail 模块

    mail 模块的构建方式类似。它的 API 由单个接口 EmailNotificationService 组成:

    public interface EmailNotificationService {
    
        void sendEmail(String to, String subject, String text);
    
    }

    该接口由内部 beanEmailNotificationServiceImpl 实现。

    请注意,我在 mail 模块中使用的命名约定与在 github 模块中使用的命名约定不同。 github 模块有一个以 *Servicee 结尾的内部类,而 mail 模块有一个 *Service 类作为其 API 的一部分。虽然 github 模块不使用丑陋的 *Impl 后缀,但 mail 模块使用了。

    我故意这样做是为了使代码更现实一些。你有没有见过一个代码库(不是你自己写的)在所有地方都使用相同的命名约定?我没有。

    但是,如果您像我们在本文中所做的那样构建模块,那实际上并不重要。因为丑陋的 *Impl 类隐藏在模块的 API 后面。

    在内部, mail 模块具有 EmailModuleConfiguration 类,它为 Spring 应用程序上下文提供 API 实现:

    @Configuration
    class EmailModuleConfiguration {
    
        @Bean
        EmailNotificationService emailNotificationService() {
            return new EmailNotificationServiceImpl();
        }
    
    }

    REST 模块

    rest 模块由单个 REST 控制器组成:

    @RestController
    class RepositoryController {
    
        private final GitHubMutations gitHubMutations;
        private final GitHubQueries gitHubQueries;
        private final EmailNotificationService emailNotificationService;
    
        // constructor omitted
    
        @PostMapping("/github/repository")
        ResponseEntity<Void> createGitHubRepository(@RequestParam("token") String token,
                @RequestParam("repositoryName") String repoName, @RequestParam("organizationName") String orgName) {
    
            if (gitHubQueries.repositoryExists(token, repoName, orgName)) {
                return ResponseEntity.status(HttpStatus.BAD_REQUEST).build();
            }
            String repoUrl = gitHubMutations.createRepository(token, new GitHubRepository(repoName, orgName));
            emailNotificationService.sendEmail("user@mail.com", "Your new repository",
                    "Here's your new repository: " + repoUrl);
    
            return ResponseEntity.ok().build();
        }
    
    }

    控制器调用 github 模块的 API 来创建一个 GitHub 仓库,然后通过 mail 模块的 API 发送邮件,让用户知道新的仓库。

    模拟 GitHub 模块
    现在,让我们看看如何为 github 模块构建一个可重用的模拟。我们创建了一个 @TestConfiguration 类,它提供了模块 API 的所有 bean:

    @TestConfiguration
    public class GitHubModuleMock {
    
        private final GitHubService gitHubServiceMock = Mockito.mock(GitHubService.class);
    
        @Bean
        @Primary
        GitHubService gitHubServiceMock() {
            return gitHubServiceMock;
        }
    
        public void givenCreateRepositoryReturnsUrl(String url) {
            given(gitHubServiceMock.createRepository(any(), any())).willReturn(url);
        }
    
        public void givenRepositoryExists() {
            given(gitHubServiceMock.repositoryExists(anyString(), anyString(), anyString())).willReturn(true);
        }
    
        public void givenRepositoryDoesNotExist() {
            given(gitHubServiceMock.repositoryExists(anyString(), anyString(), anyString())).willReturn(false);
        }
    
        public void assertRepositoryCreated() {
            verify(gitHubServiceMock).createRepository(any(), any());
        }
    
        public void givenDefaultState(String defaultRepositoryUrl) {
            givenRepositoryDoesNotExist();
            givenCreateRepositoryReturnsUrl(defaultRepositoryUrl);
        }
    
        public void assertRepositoryNotCreated() {
            verify(gitHubServiceMock, never()).createRepository(any(), any());
        }
    
    }

    除了提供一个模拟的 GitHubService bean,我们还向这个类添加了一堆 given*()assert*() 方法。

    给定的 given*() 方法允许我们将模拟设置为所需的状态,而 verify*() 方法允许我们在运行测试后检查与模拟的交互是否发生。

    @Primary 注解确保如果模拟和真实 bean 都加载到应用程序上下文中,则模拟优先。

    模拟 Email 邮件模块

    我们为 mail 模块构建了一个非常相似的模拟配置:

    @TestConfiguration
    public class EmailModuleMock {
    
        private final EmailNotificationService emailNotificationServiceMock = Mockito.mock(EmailNotificationService.class);
    
        @Bean
        @Primary
        EmailNotificationService emailNotificationServiceMock() {
            return emailNotificationServiceMock;
        }
    
        public void givenSendMailSucceeds() {
            // nothing to do, the mock will simply return
        }
    
        public void givenSendMailThrowsError() {
            doThrow(new RuntimeException("error when sending mail")).when(emailNotificationServiceMock)
                    .sendEmail(anyString(), anyString(), anyString());
        }
    
        public void assertSentMailContains(String repositoryUrl) {
            verify(emailNotificationServiceMock).sendEmail(anyString(), anyString(), contains(repositoryUrl));
        }
    
        public void assertNoMailSent() {
            verify(emailNotificationServiceMock, never()).sendEmail(anyString(), anyString(), anyString());
        }
    
    }

    在测试中使用模拟模块

    现在,有了模拟模块,我们可以在控制器的集成测试中使用它们:

    @WebMvcTest
    @Import({ GitHubModuleMock.class, EmailModuleMock.class })
    class RepositoryControllerTest {
    
        @Autowired
        private MockMvc mockMvc;
    
        @Autowired
        private EmailModuleMock emailModuleMock;
    
        @Autowired
        private GitHubModuleMock gitHubModuleMock;
    
        @Test
        void givenRepositoryDoesNotExist_thenRepositoryIsCreatedSuccessfully() throws Exception {
    
            String repositoryUrl = "https://github.com/reflectoring/reflectoring.github.io";
    
            gitHubModuleMock.givenDefaultState(repositoryUrl);
            emailModuleMock.givenSendMailSucceeds();
    
            mockMvc.perform(post("/github/repository").param("token", "123").param("repositoryName", "foo")
                    .param("organizationName", "bar")).andExpect(status().is(200));
    
            emailModuleMock.assertSentMailContains(repositoryUrl);
            gitHubModuleMock.assertRepositoryCreated();
        }
    
        @Test
        void givenRepositoryExists_thenReturnsBadRequest() throws Exception {
    
            String repositoryUrl = "https://github.com/reflectoring/reflectoring.github.io";
    
            gitHubModuleMock.givenDefaultState(repositoryUrl);
            gitHubModuleMock.givenRepositoryExists();
            emailModuleMock.givenSendMailSucceeds();
    
            mockMvc.perform(post("/github/repository").param("token", "123").param("repositoryName", "foo")
                    .param("organizationName", "bar")).andExpect(status().is(400));
    
            emailModuleMock.assertNoMailSent();
            gitHubModuleMock.assertRepositoryNotCreated();
        }
    
    }

    我们使用 @Import 注解将模拟导入到应用程序上下文中。

    请注意, @WebMvcTest 注解也会导致将实际模块加载到应用程序上下文中。这就是我们在模拟上使用 @Primary 注解的原因,以便模拟优先。

    如何处理行为异常的模块?

    模块可能会在启动期间尝试连接到某些外部服务而行为异常。例如, mail 模块可能会在启动时创建一个 SMTP 连接池。当没有可用的 SMTP 服务器时,这自然会失败。这意味着当我们在集成测试中加载模块时,Spring 上下文的启动将失败。
    为了使模块在测试期间表现得更好,我们可以引入一个配置属性 mail.enabled。然后,我们使用 @ConditionalOnProperty 注解模块的配置类,以告诉 Spring 如果该属性设置为 false,则不要加载此配置。
    现在,在测试期间,只加载模拟模块。

    我们现在不是在测试中模拟特定的方法调用,而是在模拟模块上调用准备好的 given*() 方法。这意味着测试不再需要测试对象调用的类的内部知识

    执行代码后,我们可以使用准备好的 verify*() 方法来验证是否已创建存储库或已发送邮件。同样,不知道具体的底层方法调用。

    如果我们需要另一个控制器中的 githubmail 模块,我们可以在该控制器的测试中使用相同的模拟模块。

    如果我们稍后决定构建另一个使用某些模块的真实版本但使用其他模块的模拟版本的集成,则只需使用几个 @Import 注解来构建我们需要的应用程序上下文。

    这就是模块的全部思想:我们可以使用真正的模块 A 和模块 B 的模拟,我们仍然有一个可以运行测试的工作应用程序

    模拟模块是我们在该模块中模拟行为的中心位置。他们可以将诸如“确保可以创建存储库”之类的高级模拟期望转换为对 API bean 模拟的低级调用。

    结论

    通过有意识地了解什么是模块 API 的一部分,什么不是,我们可以构建一个适当的模块化代码库,几乎不会引入不需要的依赖项。

    由于我们知道什么是 API 的一部分,什么不是,我们可以为每个模块的 API 构建一个专用的模拟。我们不在乎内部,我们只是在模拟 API。

    模拟模块可以提供 API 来模拟某些状态并验证某些交互。通过使用模拟模块的 API 而不是模拟每个单独的方法调用,我们的集成测试变得更有弹性以适应变化。