Java开发中不要使用受检异常

简介

Java是唯一(主流)实现了受检异常概念的编程语言。一开始,受检异常就是争议的焦点。在当时被视为一种创新概念(Java于1996年推出),如今却被视不良实践。

本文要讨论Java中非受检异常和受检异常的动机以及它们优缺点。与大多数关注这个主题的人不同,我希望提供一个平衡的观点,而不仅仅是对受检异常概念的批评。

我们先深入探讨Java中受检异常和非受检异常的动机。Java之父詹姆斯·高斯林对这个话题有何看法?接下来,我们要看一下Java中异常的工作原理以及受检异常存在的问题。我们还将讨论在何时应该使用哪种类型的异常。最后,我们将提供一些常见的解决方法,例如使用Lombok的@SneakyThrows注解。

Java和其他编程语言中异常的历史

在软件开发中,异常处理可以追溯到20世纪60年代LISP的引入。通过异常,我们可以解决在程序错误处理过程中可能遇到的几个问题。

异常的主要思想是将正常的控制流与错误处理分离。让我们看一个不使用异常的例子:

public void handleBookingWithoutExceptions(String customer, String hotel) {

 if (isValidHotel(hotel)) {

  int hotelId = getHotelId(hotel);

  if (sendBookingToHotel(customer, hotelId)) {

   int bookingId = updateDatabase(customer, hotel);

   if (bookingId > 0) {

    if (sendConfirmationMail(customer, hotel, bookingId)) {

     logger.log(Level.INFO, "Booking confirmed");

    } else {

     logger.log(Level.INFO, "Mail failed");

    }

   } else {

    logger.log(Level.INFO, "Database couldn't be updated");

   }

  } else {

   logger.log(Level.INFO, "Request to hotel failed");

  }

 } else {

  logger.log(Level.INFO, "Invalid data");

 }

}

程序的逻辑只占据了大约5行代码,其余的代码则是用于错误处理。这样,代码不再关注主要的流程,而是被错误检查所淹没。

如果我们的编程语言没有异常机制,我们只能依赖函数的返回值。让我们使用异常来重写我们的函数:

public void handleBookingWithExceptions(String customer, String hotel) {

 try {

  validateHotel(hotel);

  sendBookingToHotel(customer, getHotelId(hotel));

  int bookingId = updateDatabase(customer, hotel);

  sendConfirmationMail(customer, hotel, bookingId);

  logger.log(Level.INFO, "Booking confirmed");

 } catch(Exception e) {

  logger.log(Level.INFO, e.getMessage());

 }

}

采用这种方法,我们不需要检查返回值,而是将控制流转移到catch块中。这样的代码更易读。我们有两个独立的流程: 正常流程和错误处理流程。

除了可读性之外,异常还解决了"半谓词问题"(semipredicate problem)。简而言之,半谓词问题发生在表示错误(或不存在值)的返回值成为有效返回值的情况下。让我们看几个示例来说明这个问题:

示例:

int index = "Hello World".indexOf("World");

int value = Integer.parseInt("123");

int freeSeats = getNumberOfAvailableSeatsOfFlight();

indexOf() 方法如果未找到子字符串,将返回 -1。当然,-1 绝对不可能是一个有效的索引,所以这里没有问题。然而,parseInt() 方法的所有可能返回值都是有效的整数。这意味着我们没有一个特殊的返回值来表示错误。最后一个方法 getNumberOfAvailableSeatsOfFlight() 可能会导致隐藏的问题。我们可以将 -1 定义为错误或没有可用信息的返回值。乍看起来这似乎是合理的。然而,后来可能发现负数表示等待名单上的人数。异常机制能更优雅地解决这个问题。

Java中异常的工作方式

在讨论是否使用受检异常之前,让我们简要回顾一下Java中异常的工作方式。下图显示了异常的类层次结构:

java-exception

RuntimeException继承自Exception,而Error继承自Throwable。RuntimeException和Error被称为非受检异常,意味着它们不需要由调用代码处理(即它们不需要被“检查”)。所有其他继承自Throwable(通常通过Exception)的类都是受检异常,这意味着编译器期望调用代码处理它们(即它们必须被“检查”)。

所有继承自Throwable的异常,无论是受检的还是非受检的,都可以在catch块中捕获。

最后,值得注意的是,受检异常和非受检异常的概念是Java编译器的特性。JVM本身并不知道这个区别,所有的异常都是非受检的。这就是为什么其他JVM语言不需要实现这个特性的原因。

在我们开始讨论是否使用受检异常之前,让我们简要回顾一下这两种异常类型之间的区别。

受检异常

受检异常需要被try-catch块包围,或者调用方法需要在其签名中声明异常。由于Scanner类的构造函数抛出一个FileNotFoundException异常,这是一个受检异常,所以下面的代码无法编译:

public void readFile(String filename) {

 Scanner scanner = new Scanner(new File(filename));

}

我们会得到一个编译错误:

Unhandled exception: java.io.FileNotFoundException

我们有两种选项来解决这个问题。我们可以将异常添加到方法的签名中:

public void readFile(String filename) throws FileNotFoundException {

 Scanner scanner = new Scanner(new File(filename));

}

或者我们可以使用try-catch块在现场处理异常:

public void readFile(String filename) {

 try {

  Scanner scanner = new Scanner(new File(filename));

 } catch (FileNotFoundException e) {

  // handle exception

 }

}

非受检异常

对于非受检异常,我们不需要做任何处理。由Integer.parseInt引发的NumberFormatException是一个运行时异常,所以下面的代码可以编译通过:

public int readNumber(String number) {

 return Integer.parseInt(callEndpoint(number));

}

然而,我们仍然可以选择处理异常,因此以下代码也可以编译通过:

public int readNumber(String number) {

 try {

  return Integer.parseInt(callEndpoint(number));

 } catch (NumberFormatException e) {

  // handle exception

  return 0;

 }

}

为什么我们要使用受检异常?

如果我们想了解受检异常背后的动机,我们需要看一下Java的历史。该语言的创建是以强调健壮性和网络功能为重点的。

最好用Java创始人詹姆斯·高斯林(James Gosling)自己的一句话来表达:“你不能无意地说,‘我不在乎。’你必须明确地说,‘我不在乎。’”这句话摘自一篇与詹姆斯·高斯林进行的有趣的采访,在采访中他详细讨论了受检异常。

在《编程之父》这本书中,詹姆斯也谈到了异常。他说:“人们往往忽略了检查返回代码。”

这再次强调了受检异常的动机。通常情况下,当错误是由于编程错误或错误的输入时,应该使用非受检异常。如果在编写代码时程序员无法做任何处理,应该使用受检异常。后一种情况的一个很好的例子是网络问题。开发人员无法解决这个问题,但程序应该适当地处理这种情况,可以是终止程序、重试操作或简单地显示错误消息。

受检异常存在的问题

了解了受检异常和非受检异常背后的动机,我们再来看看受异常在代码库中可能引入的一些问题。

受检异常不适应规模化

一个主要反对受异常的观点是代码的可扩展性和可维护性。当一个方法的异常列表发生变化时,会打破调用链中从调用方法开始一直到最终实现try-catch来处理异常的方法的所有方法调用。举个例子,假设我们调用一个名为libraryMethod()的方法,它是外部库的一部分:

public void retrieveContent() throws IOException {

 libraryMethod();

}

在这里,方法libraryMethod()本身来自一个依赖项,例如,一个处理对外部系统进行REST调用的库。其实现可能如下所示:

public void libraryMethod() throws IOException {

 // some code

}

在将来,我们决定使用库的新版本,甚至用另一个库替换它。尽管功能相似,但新库中的方法会抛出两个异常:

public void otherSdkCall() throws IOException, MalformedURLException {

 // call method from SDK

}

由于有两个受检异常,我们的方法声明也需要更改:

public void retrieveContent() throws IOException, MalformedURLException {

 sdkCall();

}

对于小型代码库来说,这可能不是一个大问题,但对于大型代码库来说,这将需要进行相当多的重构。当然,我们也可以直接在方法内部处理异常:

public void retrieveContent() throws IOException {

 try {

  otherSdkCall();

 } catch (MalformedURLException e) {

  // do something with the exception

 }

}

使用这种方法,我们在代码库中引入了一种不一致性,因为我们立即处理了一个异常,而推迟了另一个异常的处理。

异常传播

一个与可扩展性非常相似的论点是受检异常如何在调用堆栈中传播。如果我们遵循“尽早抛出,尽晚捕获”的原则,我们需要在每个调用方法上添加一个throws子句(a):

异常传播

相反,非受检异常(b)只需要在实际发生异常的地方声明一次,并在我们希望处理异常的地方再次声明。它们会在调用堆栈中自动传播,直到达到实际处理异常的位置。

不必要的依赖关系

受检异常还会引入与非受检异常不必要的依赖关系。让我们再次看看在场景(a)中我们在三个不同的位置添加了IOException。如果methodA()、methodB()和methodC()位于不同的类中,那么所有相关类都将对异常类有一个依赖关系。如果我们使用了非受检异常,我们只需要在methodA()和methodC()中有这个依赖关系。甚至methodB()所在的类或模块都不需要知道异常的存在。

让我们用一个例子来说明这个想法。假设你从度假回家。你在酒店前台退房,乘坐公共汽车去火车站,然后换乘一次火车,在回到家乡后,你又乘坐另一辆公共汽车从车站回家。回到家后,你意识到你把手机忘在了酒店里。在你开始整理行李之前,你进入了“异常”流程,乘坐公共汽车和火车回到酒店取手机。在这种情况下,你按照之前相反的顺序做了所有的事情(就像在Java中发生异常时向上移动堆栈跟踪一样),直到你到达酒店。显然,公共汽车司机和火车操作员不需要知道“异常”,他们只需要按照他们的工作进行。只有在前台,也就是“回家”流程的起点,我们需要询问是否有人找到了手机。

糟糕的编码实践

当然,作为专业的软件开发人员,我们绝不能在良好的编码实践上选择方便。然而,当涉及到受检异常时,往往会诱使我们快速引入以下三种模式。通常的想法是以后再处理。我们都知道这样的结果。另一个常见的说法是“我想为正常流程编写代码,不想被异常打扰”。我经常见到以下三种模式。

第一种模式是捕获所有异常(catch-all exception):

public void retrieveInteger(String endpoint) {

 try {

  URL url = new URL(endpoint);

  int result = Integer.parseInt(callEndpoint(endpoint));

 } catch (Exception e) {

  // do something with the exception

 }

}

我们只是捕获所有可能的异常,而不是单独处理不同的异常:

public void retrieveInteger(String endpoint) {

 try {

  URL url = new URL(endpoint);

  int result = Integer.parseInt(callEndpoint(endpoint));

 } catch (MalformedURLException e) {

  // do something with the exception

 } catch (NumberFormatException e) {

  // do something with the exception

 }

}

当然,在一般情况下,这并不一定是一种糟糕的实践。如果我们只想记录异常,或者在Spring Boot的@ExceptionHandler中作为最后的安全机制,这是一种适当的做法。

第二种模式是空的catch块:

public void myMethod() {

 try {

  URL url = new URL("malformed url");

 } catch (MalformedURLException e) {}

}

这种方法显然绕过了受检异常的整个概念。它完全隐藏了异常,使我们的程序在没有提供任何关于发生了什么的信息的情况下继续执行。

第三种模式是简单地打印堆栈跟踪并继续执行,就好像什么都没有发生一样:

public void consumeAndForgetAllExceptions(){

 try {

  // some code that can throw an exception

 } catch (Exception ex){

  ex.printStacktrace();

 }

}

为了满足方法签名而添加额外的代码

有时我们可以确定除非出现编程错误,否则不会抛出异常。让我们考虑以下示例:

public void readFromUrl(String endpoint) {

 try {

  URL url = new URL(endpoint);

 } catch (MalformedURLException e) {

  // do something with the exception

 }

}

MalformedURLException是一个受检异常,当给定的字符串不符合有效的URL格式时,会抛出该异常。需要注意的重要事项是,如果URL格式不正确,就会抛出异常,这并不意味着URL实际上存在并且可以访问。

即使我们在之前验证了格式:

public void readFromUrl(@ValidUrl String endpoint)

或者我们已经将其硬编码:

public static final String endpoint = "http://www.example.com";

编译器仍然强制我们处理异常。我们需要写两行“无用”的代码,只是因为有一个受检异常。

如果我们无法编写代码来触发某个异常的抛出,就无法对其进行测试,因此测试覆盖率将会降低。

有趣的是,当我们想将字符串解析为整数时,并不强制我们处理异常:

Integer.parseInt("123");

parseInt方法在提供的字符串不是有效整数时会抛出NumberFormatException,这是一个非受检异常。

Lambda表达式和异常

受检异常并不总是与Lambda表达式很好地配合使用。让我们来看一个例子:

public class CheckedExceptions {

 public static String readFirstLine(String filename) throws FileNotFoundException {

  Scanner scanner = new Scanner(new File(filename));

  return scanner.next();

 }

 public void readFile() {

  List<String> fileNames = new ArrayList<>();

  List<String> lines = fileNames.stream().map(CheckedExceptions::readFirstLine).toList();

 }

}

由于我们的readFirstLine()方法抛出了一个受检异常,所以会导致编译错误:

Unhandled exception: java.io.FileNotFoundException in line 8.

如果我们尝试使用try-catch块来修正代码:

public void readFile() {

 List<String> fileNames = new ArrayList<>();

 try {

  List<String> lines = fileNames.stream()

    .map(CheckedExceptions::readFirstLine)

    .toList();

 } catch (FileNotFoundException e) {

   // handle exception

 }

}

我们仍然会得到一个编译错误,因为我们无法在lambda内部将受检异常传播到外部。我们必须在lambda表达式内部处理异常并抛出一个运行时异常:

public void readFile() {

 List<String> lines = fileNames.stream()

  .map(filename -> {

   try{

    return readFirstLine(filename);

   } catch(FileNotFoundException e) {

    throw new RuntimeException("File not found", e);

   }

  }).toList();

}

不幸的是,如果静态方法引用抛出受检异常,这种方式将变得不可行。或者,我们可以让lambda表达式返回一个错误消息,然后将其添加到结果中:

public void readFile() {

 List<String> lines = fileNames.stream()

  .map(filename -> {

   try{

    return readFirstLine(filename);

   } catch(FileNotFoundException e) {

    return "default value";

   }

  }).toList();

}

然而,代码看起来仍然有些杂乱。

我们可以在lambda内部传递一个非受检异常,并在调用方法中捕获它:

public class UncheckedExceptions {

 public static int parseValue(String input) throws NumberFormatException {

  return Integer.parseInt(input);

 }

 public void readNumber() {

  try {

   List<String> values = new ArrayList<>();

   List<Integers> numbers = values.stream()

       .map(UncheckedExceptions::parseValue)

       .toList();

  } catch(NumberFormatException e) {

   // handle exception

  }

 }

}

在这里,我们需要注意之前使用受检异常和使用非受检异常的例子之间的一个关键区别。对于非受检异常,流的处理将继续到下一个元素,而对于受检异常,处理将结束,并且不会处理更多的元素。显然,我们想要哪种行为取决于我们的用例。

处理受检异常的替代方法

将受检异常包装为非受检异常

我们可以通过将受检异常包装为非受检异常来避免在调用堆栈中的所有方法中添加throws子句。而不是让我们的方法抛出一个受检异常:

public void myMethod() throws IOException{}

我们可以将其包装在一个非受检异常中:

public void myMethod(){

 try {

  // some logic

 } catch(IOException e) {

  throw new MyUnchckedException("A problem occurred", e);

 }

}

理想情况下,我们应用异常链。这样可以确保原始异常不会被隐藏。我们可以在第5行看到异常链的应用,原始异常作为参数传递给新的异常。这种技术在早期版本的Java中几乎适用于所有核心Java异常。

异常链是许多流行框架(如Spring或Hibernate)中常见的一种方法。这两个框架从受检异常转向非受检异常,并将不属于框架的受检异常包装在自己的运行时异常中。一个很好的例子是Spring的JDBC模板,它将所有与JDBC相关的异常转换为Spring框架的非受检异常。

Lombok @SneakyThrows

Project Lombok为我们提供了一个注解,可以消除异常链的需要。而不是在我们的方法中添加throws子句:

public void beSneaky() throws MalformedURLException {

 URL url = new URL("http://test.example.org");

}

我们可以添加@SneakyThrows 注解,这样我们的代码就可以编译通过:

@SneakyThrows

public void beSneaky() {

 URL url = new URL("http://test.example.org");

}

然而,重要的是要理解,@SneakyThrows并不会使MalformedURLException的行为完全像运行时异常一样。我们将无法再捕获它,并且以下代码将无法编译:

public void callSneaky() {

 try {

  beSneaky();

 } catch (MalformedURLException e) {

  // handle exception

 }

}

由于@SneakyThrows移除了异常,而MalformedURLException仍然被视为受检异常,因此我们将在第4行得到编译器错误:

Exception 'java.net.MalformedURLException' is never thrown in the corresponding try block

性能

在我的研究过程中,我遇到了一些关于异常性能的讨论。在受检异常和非受检异常之间是否存在性能差异?实际上,它们之间没有性能差异。这是一个在编译时决定的特性。

然而,是否在异常中包含完整的堆栈跟踪会导致显着的性能差异:

public class MyException extends RuntimeException {

 public MyException(String message, boolean includeStacktrace) {

  super(message, null, !includeStacktrace, includeStacktrace);

 }

}

在这里,我们在自定义异常的构造函数中添加了一个标志。该标志指定是否要包含完整的堆栈跟踪。在抛出异常的情况下,构建堆栈跟踪会导致程序变慢。因此,如果性能至关重要,则应排除堆栈跟踪。

一些指南

如何处理软件中的异常是我们工作的一部分,它高度依赖于具体的用例。在我们结束讨论之前,这里有三个高级指南,我相信它们(几乎)总是正确的。

  • 如果不是编程错误,或者程序可以执行一些有用的恢复操作,请使用受检异常。
  • 如果是编程错误,或者程序无法进行任何恢复操作,请使用运行时异常。
  • 避免空的catch块。

结论

本文深入探讨了Java中的异常。我们讲了为什么要引入异常到语言中,何时应该使用受检异常和非受检异常。我们还讨论了受检异常的缺点以及为什么它们现在被认为是不良实践 – 尽管也有一些例外情况。


【注】本文译自: Don’t Use Checked Exceptions (reflectoring.io)

JUnit 5 参数化测试

img

JUnit 5参数化测试

目录

  • 设置

  • 我们的第一个参数化测试

  • 参数来源

    • @ValueSource
    • @NullSource & @EmptySource
    • @MethodSource
    • @CsvSource
    • @CsvFileSource
    • @EnumSource
    • @ArgumentsSource
    • 参数转换
    • 参数聚合
  • 奖励

  • 总结

如果您正在阅读这篇文章,说明您已经熟悉了JUnit。让我为您概括一下JUnit——在软件开发中,我们开发人员编写的代码可能是设计一个人的个人资料这样简单,也可能是在银行系统中进行付款这样复杂。在开发这些功能时,我们倾向于编写单元测试。顾名思义,单元测试的主要目的是确保代码的小、单独部分按预期功能工作。如果单元测试执行失败,这意味着该功能无法按预期工作。编写单元测试的一种工具是JUnit。这些单元测试程序很小,但是非常强大,并且可以快速执行。如果您想了解更多关于JUnit 5(也称为JUnit Jupiter)的信息,请查看这篇JUnit5的文章。现在我们已经了解了JUnit,接下来让我们聚焦于JUnit 5中的参数化测试。参数化测试可以解决在为任何新/旧功能开发测试框架时遇到的最常见问题。

  • 编写针对每个可能输入的测试用例变得更加容易。
  • 单个测试用例可以接受多个输入来测试源代码,有助于减少代码重复。
  • 通过使用多个输入运行单个测试用例,我们可以确信已涵盖所有可能的场景,并维护更好的代码覆盖率。

开发团队通过利用方法和类来创建可重用且松散耦合的源代码。传递给代码的参数会影响其功能。例如,计算器类中的sum方法可以处理整数和浮点数值。JUnit 5引入了执行参数化测试的能力,可以使用单个测试用例测试源代码,该测试用例可以接受不同的输入。这样可以更有效地进行测试,因为在旧版本的JUnit中,必须为每种输入类型创建单独的测试用例,从而导致大量的代码重复。

示例代码

本文附带有在 GitHub上 的一个可工作的示例代码。

设置

就像疯狂泰坦灭霸喜欢访问力量一样,您可以使用以下Maven依赖项来访问JUnit5中参数化测试的力量:

<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-params</artifactId>
    <version>5.9.2</version>
    <scope>test</scope>
</dependency>

让我们来写些代码,好吗?

我们的第一个参数化测试

现在,我想向您介绍一个新的注解 @ParameterizedTest。顾名思义,它告诉JUnit引擎使用不同的输入值运行此测试。

import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class ValueSourceTest {

    @ParameterizedTest
    @ValueSource(ints = { 2, 4 })
    void checkEvenNumber(int number) {
        assertEquals(0, number % 2,
         "Supplied number is not an even number");
    }
}

在上面的示例中,注解@ValueSource为 checkEvenNumber() 方法提供了多个输入。假设我们使用JUnit4编写相同的代码,即使它们的结果(断言)完全相同,我们也必须编写2个测试用例来覆盖输入2和4。

当我们执行 ValueSourceTest 时,我们会看到什么:

ValueSourceTest

|_ checkEvenNumber

|_ [1] 2

|_ [2] 4

这意味着 checkEvenNumber() 方法将使用2个输入值执行。

在下一节中,让我们学习一下JUnit5框架提供的各种参数来源。

参数来源

JUnit5提供了许多参数来源注释。下面的章节将简要概述其中一些注释并提供示例。

@ValueSource

@ValueSource是一个简单的参数源,可以接受单个字面值数组。@ValueSource支持的字面值类型有short、byte、int、long、float、double、char、boolean、String和Class。

@ParameterizedTest
@ValueSource(strings = { "a1", "b2" })
void checkAlphanumeric(String word) {
    assertTrue(StringUtils.isAlphanumeric(word),
             "Supplied word is not alpha-numeric");
}

@NullSource & @EmptySource

假设我们需要验证用户是否已经提供了所有必填字段(例如在登录函数中需要提供用户名和密码)。我们使用注解来检查提供的字段是否为 null,空字符串或空格。

  • 在单元测试中使用 @NullSource 和 @EmptySource 可以帮助我们提供带有 null、空字符串和空格的数据源,并验证源代码的行为。
@ParameterizedTest
@NullSource
void checkNull(String value) {
    assertEquals(null, value);
}

@ParameterizedTest
@EmptySource
void checkEmpty(String value) {
    assertEquals("", value);
}
  • 我们还可以使用 @NullAndEmptySource 注解来组合传递 null 和空输入。
@ParameterizedTest
@NullAndEmptySource
void checkNullAndEmpty(String value) {
    assertTrue(value == null || value.isEmpty());
}
  • 另一个传递 null、空字符串和空格输入值的技巧是结合使用 @NullAndEmptySource 注解,以覆盖所有可能的负面情况。该注解允许我们从一个或多个测试类的工厂方法中加载输入,并生成一个参数流。
@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = { " ", " " })
void checkNullEmptyAndBlank(String value) {
    assertTrue(value == null || value.isBlank());
}

@MethodSource

该注解允许我们从一个或多个测试类的工厂方法中加载输入,并生成一个参数流。

  • 显式方法源 – 测试将尝试加载提供的方法。
// Note: The test will try to load the supplied method
@ParameterizedTest
@MethodSource("checkExplicitMethodSourceArgs")
void checkExplicitMethodSource(String word) {
assertTrue(StringUtils.isAlphanumeric(word),
"Supplied word is not alpha-numeric");
}

static Stream<String> checkExplicitMethodSourceArgs() {
return Stream.of("a1",
"b2");
}
  • 隐式方法源 – 测试将搜索与测试类匹配的源方法。
// Note: The test will search for the source method
// that matches the test-case method name
@ParameterizedTest
@MethodSource
void checkImplicitMethodSource(String word) {
    assertTrue(StringUtils.isAlphanumeric(word),
"Supplied word is not alpha-numeric");
}

static Stream<String> checkImplicitMethodSource() {
return Stream.of("a1",
"b2");
}
  • 多参数方法源 – 我们必须将输入作为参数流传递。测试将按照索引顺序加载参数。
// Note: The test will automatically map arguments based on the index
@ParameterizedTest
@MethodSource
void checkMultiArgumentsMethodSource(int number, String expected) {
    assertEquals(StringUtils.equals(expected, "even") ? 0 : 1, number % 2);
}

static Stream<Arguments> checkMultiArgumentsMethodSource() {
    return Stream.of(Arguments.of(2, "even"),
     Arguments.of(3, "odd"));
}
  • 外部方法源 – 测试将尝试加载外部方法。
// Note: The test will try to load the external method
@ParameterizedTest
@MethodSource(
"source.method.ExternalMethodSource#checkExternalMethodSourceArgs")
void checkExternalMethodSource(String word) {
    assertTrue(StringUtils.isAlphanumeric(word),
"Supplied word is not alpha-numeric");
}
// Note: The test will try to load the external method@ParameterizedTest@MethodSource("source.method.ExternalMethodSource#checkExternalMethodSourceArgs")void checkExternalMethodSource(String word) {    assertTrue(StringUtils.isAlphanumeric(word),"Supplied word is not alpha-numeric");}

package source.method;
import java.util.stream.Stream;

public class ExternalMethodSource {
    static Stream<String> checkExternalMethodSourceArgs() {
        return Stream.of("a1",
         "b2");
    }
}

@CsvSource

该注解允许我们将参数列表作为逗号分隔的值(即 CSV 字符串字面量)传递,每个 CSV 记录都会导致执行一次参数化测试。它还支持使用 useHeadersInDisplayName属性跳过 CSV 标头。

@ParameterizedTest
@CsvSource({ "2, even",
"3, odd"})
void checkCsvSource(int number, String expected) {
    assertEquals(StringUtils.equals(expected, "even")
     ? 0 : 1, number % 2);
}

@CsvFileSource

该注解允许我们使用类路径或本地文件系统中的逗号分隔值(CSV)文件。与 @CsvSource 类似,每个 CSV 记录都会导致执行一次参数化测试。它还支持各种其他属性 -numLinesToSkip、useHeadersInDisplayName、lineSeparator、delimiterString等。

示例 1: 基本实现

@ParameterizedTest
@CsvFileSource(
files = "src/test/resources/csv-file-source.csv",
numLinesToSkip = 1)
void checkCsvFileSource(int number, String expected) {
    assertEquals(StringUtils.equals(expected, "even")
                 ? 0 : 1, number % 2);
}

src/test/resources/csv-file-source.csv

NUMBER, ODD_EVEN

2, even

3, odd

示例2:使用属性

@ParameterizedTest
@CsvFileSource(
    files = "src/test/resources/csv-file-source_attributes.csv",
    delimiterString = "|",
    lineSeparator = "||",
    numLinesToSkip = 1)
void checkCsvFileSourceAttributes(int number, String expected) {
    assertEquals(StringUtils.equals(expected, "even")
? 0 : 1, number % 2);
}

src/test/resources/csv-file-source_attributes.csv

|| NUMBER | ODD_EVEN ||

|| 2 | even ||

|| 3 | odd ||

@EnumSource

该注解提供了一种方便的方法来使用枚举常量作为测试用例参数。支持的属性包括:

  • value – 枚举类类型,例如 ChronoUnit.class
package java.time.temporal;

public enum ChronoUnit implements TemporalUnit {
    SECONDS("Seconds", Duration.ofSeconds(1)),
    MINUTES("Minutes", Duration.ofSeconds(60)),
HOURS("Hours", Duration.ofSeconds(3600)),
    DAYS("Days", Duration.ofSeconds(86400)),
    //12 other units
}

ChronoUnit 是一个包含标准日期周期单位的枚举类型。

@ParameterizedTest
@EnumSource(ChronoUnit.class)
void checkEnumSourceValue(ChronoUnit unit) {
assertNotNull(unit);
}

在此示例中,@EnumSource 将传递所有16个 ChronoUnit 枚举值作为参数。

  • names – 枚举常量的名称或选择名称的正则表达式,例如 DAYS 或 ^.*DAYS$
@ParameterizedTest
@EnumSource(names = { "DAYS", "HOURS" })
void checkEnumSourceNames(ChronoUnit unit) {
    assertNotNull(unit);
}

@ArgumentsSource

该注解提供了一个自定义的可重用ArgumentsProvider。ArgumentsProvider的实现必须是外部类或静态嵌套类。

  • 外部参数提供程序
public class ArgumentsSourceTest {

    @ParameterizedTest
    @ArgumentsSource(ExternalArgumentsProvider.class)
    void checkExternalArgumentsSource(int number, String expected) {
        assertEquals(StringUtils.equals(expected, "even")
                    ? 0 : 1, number % 2,
                    "Supplied number " + number +
                    " is not an " + expected + " number");
    }
}

public class ExternalArgumentsProvider implements ArgumentsProvider {

    @Override
    public Stream<? extends Arguments> provideArguments(
        ExtensionContext context) throws Exception {

        return Stream.of(Arguments.of(2, "even"),
             Arguments.of(3, "odd"));
    }
}
  • 静态嵌套参数提供程序
public class ArgumentsSourceTest {

    @ParameterizedTest
    @ArgumentsSource(NestedArgumentsProvider.class)
    void checkNestedArgumentsSource(int number, String expected) {
        assertEquals(StringUtils.equals(expected, "even")
? 0 : 1, number % 2,
                 "Supplied number " + number +
                    " is not an " + expected + " number");
    }

    static class NestedArgumentsProvider implements ArgumentsProvider {

        @Override
        public Stream<? extends Arguments> provideArguments(
            ExtensionContext context) throws Exception {

            return Stream.of(Arguments.of(2, "even"),
     Arguments.of(3, "odd"));
        }
    }
}

参数转换

首先,想象一下如果没有参数转换,我们将不得不自己处理参数数据类型的问题。

源方法: Calculator 类

public int sum(int a, int b) {
    return a + b;
}

测试用例:

@ParameterizedTest
@CsvSource({ "10, 5, 15" })
void calculateSum(String num1, String num2, String expected) {
    int actual = calculator.sum(Integer.parseInt(num1),
                                Integer.parseInt(num2));
    assertEquals(Integer.parseInt(expected), actual);
}

如果我们有String参数,而我们正在测试的源方法接受Integers,则在调用源方法之前,我们需要负责进行此转换。

JUnit5 提供了不同的参数转换方式

  • 扩展原始类型转换
@ParameterizedTest
@ValueSource(ints = { 2, 4 })
void checkWideningArgumentConversion(long number) {
    assertEquals(0, number % 2);
}

使用 @ValueSource(ints = { 1, 2, 3 }) 进行参数化测试时,可以声明接受 int、long、float 或 double 类型的参数。

  • 隐式转换
@ParameterizedTest
@ValueSource(strings = "DAYS")
void checkImplicitArgumentConversion(ChronoUnit argument) {
    assertNotNull(argument.name());
}

JUnit5提供了几个内置的隐式类型转换器。转换取决于声明的方法参数类型。例如,用@ValueSource(strings = "DAYS")注释的参数化测试会隐式转换为类型ChronoUnit的参数。

  • 回退字符串到对象的转换
@ParameterizedTest
@ValueSource(strings = { "Name1", "Name2" })
void checkImplicitFallbackArgumentConversion(Person person) {
    assertNotNull(person.getName());
}

public class Person {
    private String name;
    public Person(String name) {
        this.name = name;
    }
    //Getters & Setters
}

JUnit5提供了一个回退机制,用于自动将字符串转换为给定目标类型,如果目标类型声明了一个适用的工厂方法或工厂构造函数。例如,用@ValueSource(strings = { "Name1", "Name2" })注释的参数化测试可以声明接受一个类型为Person的参数,其中包含一个名为name且类型为string的单个字段。

  • 显式转换
@ParameterizedTest
@ValueSource(ints = { 100 })
void checkExplicitArgumentConversion(
    @ConvertWith(StringSimpleArgumentConverter.class) String argument) {
    assertEquals("100", argument);
}

public class StringSimpleArgumentConverter extends SimpleArgumentConverter {

    @Override
    protected Object convert(Object source, Class<?> targetType)
        throws ArgumentConversionException {
        return String.valueOf(source);
    }
}

如果由于某种原因,您不想使用隐式参数转换,则可以使用@ConvertWith注释来定义自己的参数转换器。例如,用@ValueSource(ints = { 100 })注释的参数化测试可以声明接受一个类型为String的参数,使用
StringSimpleArgumentConverter.class将整数转换为字符串类型。

参数聚合

@ArgumentsAccessor

默认情况下,提供给@ParameterizedTest方法的每个参数对应于一个方法参数。因此,当提供大量参数的参数源可以导致大型方法签名时,我们可以使用ArgumentsAccessor而不是声明多个参数。类型转换支持如上面的隐式转换所述。

@ParameterizedTest
@CsvSource({ "John, 20",
         "Harry, 30" })
void checkArgumentsAccessor(ArgumentsAccessor arguments) {
    Person person = new Person(arguments.getString(0),
                             arguments.getInteger(1));
    assertTrue(person.getAge() > 19, person.getName() + " is a teenager");
}

自定义聚合器

我们看到ArgumentsAccessor可以直接访问@ParameterizedTest方法的参数。如果我们想在多个测试中声明相同的ArgumentsAccessor怎么办?JUnit5通过提供自定义可重用的聚合器来解决此问题。

  • @AggregateWith
@ParameterizedTest
@CsvSource({ "John, 20",
             "Harry, 30" })
void checkArgumentsAggregator(
    @AggregateWith(PersonArgumentsAggregator.class) Person person) {
    assertTrue(person.getAge() > 19, person.getName() + " is a teenager");
}

public class PersonArgumentsAggregator implements ArgumentsAggregator {

    @Override
    public Object aggregateArguments(ArgumentsAccessor arguments,
        ParameterContext context) throws ArgumentsAggregationException {

        return new Person(arguments.getString(0),
arguments.getInteger(1));
    }
}

实现ArgumentsAggregator接口并通过@AggregateWith注释在@ParameterizedTest方法中注册它。当我们执行测试时,它会将聚合结果作为对应测试的参数提供。ArgumentsAggregator的实现可以是外部类或静态嵌套类。

额外福利

由于您已经阅读完文章,我想给您一个额外的福利 – 如果您正在使用像Fluent assertions for java这样的断言框架,则可以将
java.util.function.Consumer作为参数传递,其中包含断言本身。

@ParameterizedTest
@MethodSource("checkNumberArgs")
void checkNumber(int number, Consumer<Integer> consumer) {
    consumer.accept(number);    
}

static Stream<Arguments> checkNumberArgs() {    
    Consumer<Integer> evenConsumer =
            i -> Assertions.assertThat(i % 2).isZero();
    Consumer<Integer> oddConsumer =
            i -> Assertions.assertThat(i % 2).isEqualTo(1);

    return Stream.of(Arguments.of(2, evenConsumer),
         Arguments.of(3, oddConsumer));
}

总结

JUnit5的参数化测试功能通过消除重复测试用例的需要,提供多次使用不同输入运行相同测试的能力,实现了高效的测试。这不仅为开发团队节省了时间和精力,而且还增加了测试过程的覆盖范围和有效性。此外,该功能允许对源代码进行更全面的测试,因为可以使用更广泛的输入进行测试,从而增加了识别任何潜在的错误或问题的机会。总体而言,JUnit5的参数化测试是提高代码质量和可靠性的有价值的工具。


【注】本文译自: JUnit 5 Parameterized Tests (reflectoring.io)

Java最佳实践

img

计算机编程中,最佳实践是许多开发人员遵循的一组非正式规则,以提高软件质量、可读性和可维护性。在应用程序长时间保持使用的情况下,最佳实践尤其有益,这样它最初是由一个团队开发的,然后由不同的人组成的维护团队进行维护。

本教程将提供Java最佳实践的概述,以及每个条目的解释,包括Java编程的顶级最佳实践列表中的每一项。

Java编程最佳实践概览

虽然Java最佳实践的完整列表可能很长,但对于那些正在努力提高代码质量的编码人员来说,有几个被认为是一个很好的起点,包括使用适当的命名规范、使类成员私有化、避免使用空的catch块、避免内存泄漏以及正确地注释代码块:

  • 使用适当的命名规范
  • 类成员设置为私有
  • 在长数字文字中使用下划线
  • 避免空的catch
  • 使用StringBuilderStringBuffer进行字符串连接
  • 避免冗余初始化
  • 使用增强型for循环代替带计数器的for循环
  • 正确处理空指针异常
  • FloatDouble:哪一个是正确的选择?
  • 使用单引号和双引号
  • 避免内存泄漏
  • 返回空集合而不是返回Null元素
  • 高效使用字符串
  • 避免创建不必要的对象
  • 正确注释代码

Java中的类成员应该是私有的

在Java中,类的成员越不可访问,越好!第一步是使用private访问修饰符。目标是促进理想的封装,这是面向对象编程(OOP)的基本概念之一。太多时候,新的开发人员没有正确地为类分配访问修饰符,或者倾向于将它们设置为public以使事情更容易。

考虑以下字段被设置为public的类:

public class BandMember {
  public String name;
  public String instrument;
}

在这里,类的封装性被破坏了,因为任何人都可以直接更改这些值,如下所示:

BandMember billy = new BandMember();
billy.name = "George";
billy.instrument = "drums";

使用private访问修饰符与类成员一起可以将字段隐藏起来,防止用户通过setter方法之外的方式更改数据:

public class BandMember {
  private String name;
  private String instrument;

  public void setName(String name) {
    this.name = name;
  }
  public void setInstrument(String instrument)
    this.instrument = instrument;
  }
}

setter方法中也是放置验证代码和/或管理任务(如增加计数器)的理想位置。

在长数字文字中使用下划线

得益于Java 7的更新,开发人员现在可以在长数字字面量中使用下划线(_),以提高可读性。以下是在允许下划线之前一些长数字字面量的示例:

int minUploadSize = 05437326;
long debitBalance = 5000000000000000L;
float pi = 3.141592653589F;

我想您会同意下划线使值更易读:

int minUploadSize = 05_437_326;
long debitBalance = 5_000_000_000_000_000L;
float pi = 3.141_592_653_589F;

避免空的Catch块

在Java中,把catch块留空是非常不好的习惯,有两个原因:一是它可能导致程序默默地失败,二是程序可能会继续运行而不会发生任何异常。这两种结果都会使调试变得非常困难

考虑以下程序,它从命令行参数中计算两个数字的和:

public class Sum {
  public static void main(String[] args) {
    int a = 0;
    int b = 0;

    try {
      a = Integer.parseInt(args[0]);
      b = Integer.parseInt(args[1]);
    } catch (NumberFormatException ex) {
    }

    int sum = a + b;

    System.out.println(a + " + " + b + " = " + sum);
  }
}

Java的parseInt()方法会抛出NumberFormatException异常,需要开发人员在其调用周围使用try/catch块来捕获异常。不幸的是,这位开发人员选择忽略抛出的异常!因此,传入无效参数,例如“45y”,将导致关联的变量被赋为其类型的默认值,对于int类型来说是0

img

通常,在捕获异常时,程序员应采取以下三个行动中的一个或多个:

  1. 开发人员最起码应该通知用户异常情况,要么让他们重新输入无效的值,要么让他们知道程序必须提前终止。
  2. 使用 JDK LoggingLog4J 记录异常日志。
  3. 将异常封装并作为一个新的、更适合应用程序的异常重新抛出。

以下是重新编写后的Sum应用程序,用于通知用户输入无效并因此终止程序:

public class Sum {
  public static void main(String[] args) {
    int a = 0;
    int b = 0;

    try {
      a = Integer.parseInt(args[0]);
    } catch (NumberFormatException ex) {
      System.out.println(args[0] + " is not a number. Aborting...");
      return;
    }

    try {
      b = Integer.parseInt(args[1]);
    } catch (NumberFormatException ex) {
      System.out.println(args[1] + " is not a number. Aborting...");
      return;
    }

    int sum = a + b;

    System.out.println(a + " + " + b + " = " + sum);
  }
}

以下是我们观察到的结果:

img

使用StringBuilder或StringBuffer进行字符串拼接

“+” 运算符是在 Java 中快速和简便地组合字符串的方法。在 Hibernate 和 JPA 时代之前,对象是手动持久化的,通过从头开始构建 SQL INSERT 语句来实现!以下是一个存储一些用户数据的示例:

String sql = "Insert Into Users (name, age)";
       sql += " values ('" + user.getName();
       sql += "', '" + user.getage();
       sql += "')";

很遗憾,当像上面那样连接多个字符串时,Java编译器必须创建多个中间字符串对象,然后将它们合并成最终连接的字符串。

相反,我们应该使用StringBuilderStringBuffer类。两者都包含函数,可以连接字符串而无需创建中间String对象,从而节省处理时间和不必要的内存使用。

以前的代码可以使用 StringBuilder 重写,如下所示:

StringBuilder sqlSb = new StringBuilder("Insert Into Users (name, age)");
sqlSb.append(" values ('").append(user.getName());
sqlSb.append("', '").append(user.getage());
sqlSb.append("')");
String sqlSb = sqlSb.toString();

这对开发者来说可能要费点事儿,但是这样做非常值得!

StringBuffer 与 StringBuilder

虽然StringBufferStringBuilder类都比“+”运算符更可取,但它们并不相同。StringBuilderStringBuffer更快,但不是线程安全的。因此,在非多线程环境中进行字符串操作时,应使用StringBuilder;否则,请使用StringBuffer类。

避免冗余初始化

尽管某些语言如TypeScript强烈建议在声明时初始化变量,但在Java中并非总是必要的,因为它在声明时将默认初始化值(如0falsenull)分配给变量。

因此,Java的最佳实践是要知道成员变量的默认初始化值,除非您想将它们设置为除默认值以外的其他值,否则不要显式初始化变量。

以下是一个计算从11000的自然数之和的短程序。请注意,只有部分变量被初始化:

class VariableInitializationExample {
  public static void main(String[] args) {

    // automatically set to 0
    int sum;
    final numberOfIterations = 1000;

    // Set the loop counter to 1
    for (int i = 1; i &= numberOfIterations; ++i) {
      sum += i;
    }

    System.out.println("Sum = " + sum);
  }
}

使用增强型for循环代替需要计数器的for循环

尽管 for 循环在某些情况下很有用,但是计数器变量可能会引起错误。例如,计数器变量可能会在稍后的代码中被无意中更改。即使从 1 而不是从 0 开始索引,也可能导致意外行为。出于这些原因,for-each 循环(也称为增强型 for 循环)可能是更好的选择。

考虑以下代码:

String[] names = {"Rob", "John", "George", "Steve"};
for (int i = 0; i < names.length; i++) {
  System.out.println(names[i]);
}

在这里,变量i既是循环计数器,又是数组names的索引。尽管这个循环只是打印每个名称,但如果下面有修改i的代码,就会变得棘手。我们可以通过使用下面所示的for-each循环轻松避开整个问题:

for (String name : names) {
  System.out.println(name);
}

使用增强的for循环,出错的机会要少得多!

合理处理空指针异常

空指针异常在Java中是一个非常常见的问题,可能是由于其面向对象的设计所致。当您试图在 Null 对象引用上调用方法时,就会发生 Null Pointer 异常。这通常发生在在类实例化之前调用实例方法的情况下,如下例所示:

Office office;

// later in the code...
Employee[] employees = office.getEmployees();

虽然你无法完全消除 Null Pointer Exceptions,但有方法可以将其最小化。一种方法是在调用对象的方法之前检查对象是否为 Null。以下是使用三元运算符的示例:

Office office;

// later in the code...
Employee[] employees = office == null ? 0 : office.getEmployees();

你可能还想抛出自己的异常:

Office office;
Employee[] employees;

// later in the code...
if (office == null) {
  throw new CustomApplicationException("Office can't be null!");
} else {
  employees = office.getEmployees();
}

Float或Double:应该使用哪个?

浮点数和双精度数是相似的类型,因此许多开发人员不确定该选择哪种类型。两者都处理浮点数,但具有非常不同的特性。例如,float的大小为32位,而double分配了64位的内存空间,因此double可以处理比float大得多的小数。然后有一个精度问题:float只能容纳7位精度。极小的指数大小意味着一些位是不可避免的丢失的。相比之下,double为指数分配了更多的位数,允许它处理高达15位精度。

因此,当速度比准确性更重要时,通常建议使用float。尽管大多数程序不涉及大量计算,但在数学密集型应用中,精度差异可能非常显著。当需要的小数位数已知时,float也是一个不错的选择。当精度非常重要时,double应该是你的首选。只需记住,Java强制使用double作为处理浮点数的默认数据类型,因此您可能需要附加字母"f"来明确表示float,例如,1.2f

单引号和双引号在字符串连接中的使用

在Java中,双引号(“)用于保存字符串,单引号用于字符(由char类型表示)。当我们尝试使用+连接运算符连接字符时,可能会出现问题。问题在于使用+连接字符会将char的值转换为ascii,从而产生数字输出。以下是一些示例代码,以说明这一点:

char a, b, c;
a = 'a';
b = 'b';
c = 'c';

str = a + b + c; // not "abc", but 294!

就像字符串连接最好使用StringBuilderStringBuffer类一样,字符连接也是如此!在上面的代码中,变量abc可以通过以下方式组合成一个字符串:

new StringBuilder().append(a).append(b).append(c).toString()

我们可以在下面观察到期望的结果:

img

避免Java中的内存泄漏

在Java中,开发人员并没有太多关于内存管理的控制权,因为Java通过垃圾回收自动地进行内存管理。尽管如此,有一些Java最佳实践可以帮助开发人员避免内存泄漏,例如:

  • 避免创建不必要的对象。
  • 避免使用"+"运算符进行字符串连接。
  • 避免在会话中存储大量数据。
  • 在会话不再使用时,及时让会话超时。
  • 避免使用静态对象,因为它们在整个应用程序的生命周期内存在。
  • 在与数据库交互时,不要忘记在finally块中关闭ResultSetStatementsConnection对象。

返回空集合而不是null引用。

你知道吗,null引用经常被称为软件开发中最严重的错误吗?1965年,Tony Hoare在设计面向对象语言(OOP)的引用的第一个全面类型系统时发明了null引用。后来在2009年的一次会议上,Hoare为自己的发明道歉,承认他的创造“导致了无数的错误、漏洞和系统崩溃,在过去四十年中可能造成了数十亿美元的痛苦和损失。”

在Java中,通常最好返回空值而不是null,特别是当返回集合、可枚举对象或对象时更为重要。尽管你自己的代码可能会处理返回的null值,但其他开发人员可能会忘记编写空值检查,甚至没有意识到null是可能的返回值!

以下是一些Java代码,它以ArrayList的形式获取库存中的书籍列表。但是,如果列表为空,则返回一个空列表:

private final List<Book> booksInStock = ...

public List<Book> getInStockBooks() {
  return booksInStock.isEmpty() ? Collections.emptyList() : new ArrayList<>(booksInStock);
}

这使得方法的调用者可以在不必首先检查null引用的情况下迭代列表:

(Book book: getInStockBooks()) {
 // do something with books
}

Java 中字符串的高效使用

我们已经讨论了使用 + 连接操作符可能产生的副作用,但还有其他一些方法可以更有效地使用字符串,以避免浪费内存和处理器周期。例如,在实例化 String 对象时,通常最好直接创建 String 而不是使用构造函数。原因是什么?使用直接创建 String 比使用构造函数更快(更不用说更少的代码!)。

这里是在Java中创建字符串的两种等效方式:直接创建和使用构造函数:

// directly
String str = "abc";
// using a constructor
char data[] = {'a', 'b', 'c'};
String str = new String(data);

虽然两种方法都是等效的,但直接创建字符串的方式更好。

Java中不必要的对象创建

你知道吗?在Java中,对象的创建是消耗内存最多的操作之一。因此,在没有充分理由的情况下,最好避免创建对象,并仅在绝对必要时才这样做。

那么,如何将这个实践起来呢?具有讽刺意味的是,像我们上面看到的直接创建字符串就是避免不必要创建对象的一种方式!

以下是一个更复杂的示例:

以下是一个Person类的例子,它包括一个isBabyBoomer()方法,用于判断此人是否属于“婴儿潮”年龄段,出生于1946年至1964年之间:

public class Person {
  private final Date birthDate;

  public boolean isBabyBoomer() {
    // Unnecessary allocation of expensive object!
    Calendar gmtCal =
        Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    gmtCal.set(1946, Calendar.JANUARY, 1, 0, 0, 0);
    Date boomStart = gmtCal.getTime();
    gmtCal.set(1965, Calendar.JANUARY, 1, 0, 0, 0);
    Date boomEnd = gmtCal.getTime();

    return birthDate.compareTo(boomStart) >= 0 &&
           birthDate.compareTo(boomEnd)   <  0;
  }
}

isBabyBoomer()方法每次被调用都会创建一个新的CalendarTimeZone和两个Date实例,这是不必要的。纠正这种低效的方法之一是使用静态初始化器,以便只在初始化时创建CalendarTimeZoneDate对象,而不是每次调用isBabyBoomer()方法。

class Person {
  private final Date birthDate;

  // The starting and ending dates of the baby boom.
  private static final Date BOOM_START;
  private static final Date BOOM_END;

  static {
    Calendar gmtCal =
      Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    gmtCal.set(1946, Calendar.JANUARY, 1, 0, 0, 0);
    BOOM_START = gmtCal.getTime();
    gmtCal.set(1965, Calendar.JANUARY, 1, 0, 0, 0);
    BOOM_END = gmtCal.getTime();
  }

  public boolean isBabyBoomer() {
    return birthDate.compareTo(BOOM_START) >= 0 &&
       birthDate.compareTo(BOOM_END)       <  0;
  }
}

Java中适当的注释

清晰简洁的注释在阅读其他开发人员的代码时非常有用。以下是写出高质量注释的几个指南:

  1. 注释不应该重复代码。
  2. 好的注释不能弥补代码不清晰的问题。
  3. 如果您无法编写清晰的注释,则代码可能存在问题。
  4. 在注释中解释不符合惯用方式的代码。
  5. 在最有用的地方包含指向外部参考文献的链接。
  6. 在修复错误时添加注释。
  7. 使用注释标记未完成的实现,通常使用标记“TODO:”开头。

总结

在本文中,我们了解了15个Java最佳实践,并探讨了类成员封装、在冗长的数字字面值中使用下划线、避免空catch块、正确完成字符串连接、如何避免冗余初始化以及使用增强的for循环。


【注】本文译自: Java Best Practices | Developer.com

重新学习Java线程原语

Synchronized曾经是一个革命性的技术,在当前仍然有重要的用途。但是,现在是时候转向更新的Java线程原语,同时重新考虑我们的核心逻辑。

自从Java第一个测试版以来,我就一直在使用它。从那时起,线程就是我最喜欢的特性之一。Java是第一种在编程语言本身中引入线程支持的语言。那是一个具有争议的决定。在过去的十年中,每种编程语言都竞相引入async/await,甚至Java也有一些第三方支持……但是Java选择了引入更优越的虚拟线程(Loom项目)。本文并不讨论这个问题。

我觉得这很好,证明了Java的核心实力。Java不仅仅是一种语言,还是一种文化。这种文化注重深思熟虑的变革,而不是盲目跟随时尚潮流。

在本文中,我想重新探讨Java中的线程编程旧方法。我习惯使用synchronized、wait、notify等技术。但是, “然而,这些方法已经不再是Java中线程处理的最佳方式。 我也是问题的一部分。我还是习惯于使用这些技术,发现很难适应自Java 5以来就存在的一些API。这是一种习惯的力量。 虽然可以讨论许多处理线程的出色API,但我想在这里专注讨论锁,因为它们是基础但极为重要的。

Synchronized 与 ReentrantLock

我犹豫放弃使用 synchronized 的原因是,并没有更好的替代方案。现在弃用 synchronized 的主要原因是,它可能会在 Loom 中触发线程固定,这并不理想。JDK 21 可能会修复这个问题(当 Loom 正式发布时),但还有一些理由弃用它。

synchronized 的直接替代品是 ReentrantLock。不幸的是,ReentrantLock 相比 synchronized 很少有优势,因此迁移的好处最多是存疑的。事实上,它有一个主要的缺点。为了了解这一点,让我们看一个例子。下面是我们如何使用 synchronized:

synchronized(LOCK) {
    // safe code
}

LOCK.lock();
try {
    // safe code
} finally {
    LOCK.unlock();
}

ReentrantLock 的第一个缺点是冗长。我们需要try块,因为如果在块内部发生异常,锁将保持。而 synchronized 则会自动处理异常。

有些人会使用 AutoClosable 对锁进行封装,大概是这样的:

public class ClosableLock implements AutoCloseable {
   private final ReentrantLock lock;

   public ClosableLock() {
       this.lock = new ReentrantLock();
   }

   public ClosableLock(boolean fair) {
       this.lock = new ReentrantLock(fair);
   }

   @Override
   public void close() throws Exception {
       lock.unlock();
   }

   public ClosableLock lock() {
       lock.lock();
       return this;
   }

   public ClosableLock lockInterruptibly() throws InterruptedException {
       lock.lock();
       return this;
   }

   public void unlock() {
       lock.unlock();
   }
}

注意,我没有实现 Lock 接口,这本来是最理想的。这是因为 lock 方法返回了可自动关闭的实现,而不是 void。

一旦我们这样做了,我们就可以编写更简洁的代码,比如这样:

try(LOCK.lock()) {
 // safe code
}

我喜欢代码更简洁的写法,但是这个方法存在一些问题,因为 try-with-resource 语句是用于清理资源的,而我们正在重复使用锁对象。虽然调用了 close 方法,但是我们会再次在同一个对象上调用它。我认为,将 try-with-resource 语法扩展到支持锁接口可能是个好主意。但在此之前,这个技巧可能不值得采用。

ReentrantLock 的优势

使用ReentrantLock的最大原因是Loom支持。其他的优点也不错,但没有一个是“杀手级功能”。

我们可以在方法之间使用它,而不是在一个连续的代码块中使用。但是这可能不是一个好主意,因为你希望尽量减少锁定区域,并且失败可能会成为一个问题。我不认为这个特性是一个优点。

ReentrantLock提供了公平锁(fairness)的选项。这意味着它会先服务于最先停在锁上的线程。我试图想到一个现实而简单的使用案例,但却无从下手。如果您正在编写一个复杂的调度程序,并且有许多线程不断地排队等待资源,您可能会发现一个线程由于其他线程不断到来而被“饥饿”。但是,这种情况可能更适合使用并发包中的其他选项。也许我漏掉了什么……

lockInterruptibly() 方法允许我们在线程等待锁时中断它。这是一个有趣的特性,但是很难找到一个真正实际应用场景。如果你编写的代码需要非常快速响应中断,你需要使用 lockInterruptibly() API 来获得这种能力。但是,你通常在 lock()方法内部花费多长时间呢?

这种情况可能只在极端情况下才会有影响,大多数人在编写高级多线程代码时可能不会遇到这种情况。

ReadWriteReentrantLock

更好的方法是使用ReadWriteReentrantLock。大多数资源都遵循频繁读取、少量写入的原则。由于读取变量是线程安全的,除非正在写入变量,否则没有必要加锁。这意味着我们可以将读取操作进行极致优化,同时稍微降低写操作的速度。

假设这是你的使用情况,你可以创建更快的代码。使用读写锁时,我们有两个锁,一个读锁,如下图所示。它允许多个线程通过,实际上是“自由竞争”的。

img

一旦我们需要写入变量,我们需要获得写锁,如下图所示。我们尝试请求写锁,但仍有线程从变量中读取,因此我们必须等待。

img

一旦所有线程完成读取,所有读取操作都会阻塞,写入操作只能由一个线程执行,如下图所示。一旦释放写锁,我们将回到第一张图中的“自由竞争”状态。

img

这是一种强大的模式,我们可以利用它使集合变得更快。一个典型的同步列表非常慢。它同步所有的操作,包括读和写。我们有一个CopyOnWriteArrayList,它对于读取操作非常快,但是任何写入操作都很慢。

如果可以避免从方法中返回迭代器,你可以封装列表操作并使用这个API。例如,在以下代码中,我们将名字列表暴露为只读,但是当需要添加名字时,我们使用写锁。这可以轻松超过synchronized列表的性能:

private final ReadWriteLock LOCK = new ReentrantReadWriteLock();
private Collection<String> listOfNames = new ArrayList<>();
public void addName(String name) {
   LOCK.writeLock().lock();
   try {
       listOfNames.add(name);
   } finally {
       LOCK.writeLock().unlock();
   }
}
public boolean isInList(String name) {
   LOCK.readLock().lock();
   try {
       return listOfNames.contains(name);
   } finally {
       LOCK.readLock().unlock();
   }
}

这个方案可行,因为synchronized是可重入的。我们已经持有锁,所以从methodA()进入methodB()不会阻塞。这在使用ReentrantLock时也同样适用,只要我们使用相同的锁或相同的synchronized对象。

StampedLock返回一个戳记(stamp),我们用它来释放锁。因此,它有一些限制,但它仍然非常快和强大。它也包括一个读写戳记,我们可以用它来保护共享资源。但ReadWriteReentrantLock不同的是,它允许我们升级锁。为什么需要这样做呢?

看一下之前的addName()方法…如果我用"Shai"两次调用它会怎样?

是的,我可以使用Set…但是为了这个练习的目的,让我们假设我们需要一个列表…我可以使用ReadWriteReentrantLock编写那个逻辑:

public void addName(String name) {
   LOCK.writeLock().lock();
   try {
       if(!listOfNames.contains(name)) {
           listOfNames.add(name);
       }
   } finally {
       LOCK.writeLock().unlock();
   }
}

这很糟糕。我“付出”写锁只是为了在某些情况下检查contains()(假设有很多重复项)。我们可以在获取写锁之前调用isInList(name)。然后我们会:

  • 获取读锁
  • 释放读锁
  • 获取写锁
  • 释放写锁

在两种情况下,我们可能会排队, 这样可能会增加额外的麻烦,不一定值得。

有了StampedLock,我们可以将读锁更新为写锁,并在需要的情况下立即进行更改,例如:

public void addName(String name) {
   long stamp = LOCK.readLock();
   try {
       if(!listOfNames.contains(name)) {
           long writeLock = LOCK.tryConvertToWriteLock(stamp);
           if(writeLock == 0) {
               throw new IllegalStateException();
           }
           listOfNames.add(name);
       }
   } finally {
       LOCK.unlock(stamp);
   }
}

这是针对这些情况的一个强大的优化。

终论

我经常不假思索地使用 synchronized 集合,这有时可能是合理的,但对于大多数情况来说,这可能是次优的。通过花费一点时间研究与线程相关的原语,我们可以显著提高性能。特别是在处理 Loom 时,其中底层争用更为敏感。想象一下在 100 万并发线程上扩展读取操作的情况…在这些情况下,减少锁争用的重要性要大得多。

你可能会想,为什么 synchronized 集合不能使用 ReadWriteReentrantLock 或者是 StampedLock 呢?

这是一个问题,因为API的可见接口范围非常大,很难针对通用用例进行优化。这就是控制低级原语的地方,可以使高吞吐量和阻塞代码之间的差异。


【注】本文译自: Relearning Java Thread Primitives – DZone

关键的Java JVM选项和参数

1. 关键的Java JVM选项和参数

让我们来看看在Java环境中可以配置的21个最重要的JVM选项和参数。

  1. -Xms:将设置JVM的初始堆大小。
  2. -Xmx:将设置JVM的最大堆大小。
  3. -Xss:将设置每个线程的内部使用的线程堆栈的大小。
  4. -XX:+UseCompressedOops:启用使用压缩对象指针以减少内存使用的功能。
  5. -XX:+UseThreadPriorities:将指示JVM使用本机线程优先级。
  6. -XX:PermSize:将设置垃圾收集器永久生成空间的初始大小。
  7. -XX:MaxPermSize:将设置垃圾收集器永久生成空间的最大大小。
  8. -XX:NewSize:设置年轻代空间的初始大小。
  9. -XX:MaxNewSize:设置年轻代空间的最大大小。
  10. -XX:SurvivorRatio:设置伊甸园空间与幸存者空间的比例。
  11. -XX:MaxTenuringThreshold:设置幸存者空间中对象的最大年龄。
  12. -XX:+UseParNewGC:指示JVM使用新的并行生成垃圾收集器。
  13. -XX:+UseSerialGC:指示JVM使用串行垃圾收集器。
  14. -XX:+UseG1GC:指示JVM使用Garbage First(G1)垃圾收集器。
  15. -XX:+UseZGC:指示JVM使用ZGC垃圾收集器。
  16. -XX:+HeapDumpOnOutOfMemoryError:告诉JVM在发生OutOfMemoryError时创建堆转储文件。
  17. -XX:HeapDumpPath:为JVM提供自定义路径,在堆转储期间写入堆的内容。
  18. -Djava.library.path:允许您指定在运行时需要的本机库的路径。
  19. -Duser.timezone:允许您为JVM设置自定义时区。
  20. -XX:+PrintGCDetails:指示JVM打印详细的垃圾回收日志,以帮助您进行GC优化。
  21. -XX:+PrintFlagsFinal-version:将打印在JVM上设置的所有当前配置的标志和选项。

2. 如何使用Java JVM选项

所有这些JVM选项都可以通过将它们作为文本附加到Java运行时命令后来简单地使用。

例如,以下命令将使用六个不同的参数运行名为Go的应用程序,以优化内存分配和垃圾回收:

java Go -XX:MaxPermSize=128m -XX:MaxNewSize=256m -Xms768m -Xmx768m -XX:SurvivorRatio=128 -XX:MaxTenuringThreshold=0

img

Java JVM选项可用于管理内存和优化GC性能

3. 最常用的JVM参数

在列举的所有 JVM 选项中,最常用的是 Xms 和 Xmx,分别设置最小堆大小和最大堆大小。

下面的示例将最小堆大小设置为 768 MB,最大堆大小设置为 2 GB。

-Xms768m
-Xmx2048

4. GC 选择 JVM 选项

Java的一个优点是它为开发者执行垃圾回收,这使得应用程序更加健壮,更不容易发生内存泄漏问题。

有许多垃圾回收器可用,具有各种暂停行为和停顿时间。

在启动运行时,您只能使用以下 Java JVM 选项之一选择一个垃圾收集器:

-XX:+UseSerialGC
-XX:+UseParallelGC
-XX:+USeParNewGC
-XX:+UseG1GC
-XX:+UseZGC

5. 垃圾回收调优选项

VM实现了一种分代垃圾回收算法,它积极监控新对象,而很少检查旧对象。JVM管理eden空间、tenured空间甚至PermGen空间的方式可以通过JVM选项进行配置,如下:

-XX:MaxPermSize
-XX:PermSize
-XX:NewSize
-XX:MaxNewSize
-XX:SurvivorRatio
-XX:MaxTenuringThreshold

6. 用于检查的JVM打印选项

JVM还提供了一些打印方法,允许您查看Java运行时的状态。有用的JVM打印选项包括:

-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-XX:+PrintHeapAtGC
-XX:+PrintCommandLineFlags
-XX:+PrintFlagsFinal

PrintFlagsFinal是一项有趣的JVM选项,它将显示所有JVM标志设置的详细信息,输出内容有超过500行。本文介绍的Java JVM选项已经全部讲解完毕,如果你对PrintFlagsFinal JVM标志的详细输出感兴趣,可以查看上文提到的输出内容。

$ java -XX:+PrintFlagsFinal -version
[Global flags]
int ActiveProcessorCount = -1
uintx AdaptiveSizeDecrementScaleFactor = 4
uintx AdaptiveSizeMajorGCDecayTimeScale = 10
uintx AdaptiveSizePolicyCollectionCostMargin = 50
uintx AdaptiveSizePolicyInitializingSteps = 20
uintx AdaptiveSizePolicyOutputInterval = 0
uintx AdaptiveSizePolicyWeight = 10
uintx AdaptiveSizeThroughPutPolicy = 0
uintx AdaptiveTimeWeight = 25
bool AggressiveHeap = false
intx AliasLevel = 3
bool AlignVector = false
ccstr AllocateHeapAt =
intx AllocateInstancePrefetchLines = 1
intx AllocatePrefetchDistance = 256
intx AllocatePrefetchInstr = 0
intx AllocatePrefetchLines = 3
intx AllocatePrefetchStepSize = 64
intx AllocatePrefetchStyle = 1
bool AllowParallelDefineClass = false
bool AllowRedefinitionToAddDeleteMethods = false
bool AllowUserSignalHandlers = false
bool AllowVectorizeOnDemand = true
bool AlwaysActAsServerClassMachine = false
bool AlwaysCompileLoopMethods = false
bool AlwaysLockClassLoader = false
bool AlwaysPreTouch = false
bool AlwaysRestoreFPU = false
bool AlwaysTenure = false
ccstr ArchiveClassesAtExit =
intx ArrayCopyLoadStoreMaxElem = 8
size_t AsyncLogBufferSize = 2097152
intx AutoBoxCacheMax = 128
intx BCEATraceLevel = 0
bool BackgroundCompilation = true
size_t BaseFootPrintEstimate = 268435456
intx BiasedLockingBulkRebiasThreshold = 20
intx BiasedLockingBulkRevokeThreshold = 40
intx BiasedLockingDecayTime = 25000
intx BiasedLockingStartupDelay = 0
bool BlockLayoutByFrequency = true
intx BlockLayoutMinDiamondPercentage = 20
bool BlockLayoutRotateLoops = true
intx C1InlineStackLimit = 5
intx C1MaxInlineLevel = 9
intx C1MaxInlineSize = 35
intx C1MaxRecursiveInlineLevel = 1
intx C1MaxTrivialSize = 6
bool C1OptimizeVirtualCallProfiling = true
bool C1ProfileBranches = true
bool C1ProfileCalls = true
bool C1ProfileCheckcasts = true
bool C1ProfileInlinedCalls = true
bool C1ProfileVirtualCalls = true
bool C1UpdateMethodData = true
intx CICompilerCount = 4
bool CICompilerCountPerCPU = true
bool CITime = false
bool CheckJNICalls = false
bool ClassUnloading = true
bool ClassUnloadingWithConcurrentMark = true
bool ClipInlining = true
uintx CodeCacheExpansionSize = 65536
bool CompactStrings = true
ccstr CompilationMode = default
ccstrlist CompileCommand =
ccstr CompileCommandFile =
ccstrlist CompileOnly =
intx CompileThreshold = 10000
double CompileThresholdScaling = 1.000000
intx CompilerThreadPriority = -1
intx CompilerThreadStackSize = 0
size_t CompressedClassSpaceSize = 1073741824
uint ConcGCThreads = 3
intx ConditionalMoveLimit = 3
intx ContendedPaddingWidth = 128
bool CrashOnOutOfMemoryError = false
bool CreateCoredumpOnCrash = true
bool CriticalJNINatives = false
bool DTraceAllocProbes = false
bool DTraceMethodProbes = false
bool DTraceMonitorProbes = false
bool DisableAttachMechanism = false
bool DisableExplicitGC = false
bool DisplayVMOutputToStderr = false
bool DisplayVMOutputToStdout = false
bool DoEscapeAnalysis = true
bool DoReserveCopyInSuperWord = true
bool DontCompileHugeMethods = true
bool DontYieldALot = false
ccstr DumpLoadedClassList =
bool DumpReplayDataOnError = true
bool DumpSharedSpaces = false
bool DynamicDumpSharedSpaces = false
bool EagerXrunInit = false
intx EliminateAllocationArraySizeLimit = 64
bool EliminateAllocations = true
bool EliminateAutoBox = true
bool EliminateLocks = true
bool EliminateNestedLocks = true
bool EnableContended = true
bool EnableDynamicAgentLoading = true
size_t ErgoHeapSizeLimit = 0
ccstr ErrorFile =
bool ErrorFileToStderr = false
bool ErrorFileToStdout = false
uint64_t ErrorLogTimeout = 120
double EscapeAnalysisTimeout = 20.000000
bool EstimateArgEscape = true
bool ExecutingUnitTests = false
bool ExitOnOutOfMemoryError = false
bool ExplicitGCInvokesConcurrent = false
bool ExtendedDTraceProbes = false
bool ExtensiveErrorReports = false
ccstr ExtraSharedClassListFile =
bool FilterSpuriousWakeups = true
bool FlightRecorder = false
ccstr FlightRecorderOptions =
bool ForceTimeHighResolution = false
intx FreqInlineSize = 325
double G1ConcMarkStepDurationMillis = 10.000000
uintx G1ConcRSHotCardLimit = 4
size_t G1ConcRSLogCacheSize = 10
size_t G1ConcRefinementGreenZone = 0
size_t G1ConcRefinementRedZone = 0
uintx G1ConcRefinementServiceIntervalMillis = 300
uint G1ConcRefinementThreads = 10
size_t G1ConcRefinementThresholdStep = 2
size_t G1ConcRefinementYellowZone = 0
uintx G1ConfidencePercent = 50
size_t G1HeapRegionSize = 2097152
uintx G1HeapWastePercent = 5
uintx G1MixedGCCountTarget = 8
uintx G1PeriodicGCInterval = 0
bool G1PeriodicGCInvokesConcurrent = true
double G1PeriodicGCSystemLoadThreshold = 0.000000
intx G1RSetRegionEntries = 512
intx G1RSetSparseRegionEntries = 16
intx G1RSetUpdatingPauseTimePercent = 10
uint G1RefProcDrainInterval = 1000
uintx G1ReservePercent = 10
uintx G1SATBBufferEnqueueingThresholdPercent = 60
size_t G1SATBBufferSize = 1024
size_t G1UpdateBufferSize = 256
bool G1UseAdaptiveConcRefinement = true
bool G1UseAdaptiveIHOP = true
uintx GCDrainStackTargetSize = 64
uintx GCHeapFreeLimit = 2
uintx GCLockerEdenExpansionPercent = 5
uintx GCPauseIntervalMillis = 201
uintx GCTimeLimit = 98
uintx GCTimeRatio = 12
size_t HeapBaseMinAddress = 2147483648
bool HeapDumpAfterFullGC = false
bool HeapDumpBeforeFullGC = false
intx HeapDumpGzipLevel = 0
bool HeapDumpOnOutOfMemoryError = false
ccstr HeapDumpPath =
uintx HeapFirstMaximumCompactionCount = 3
uintx HeapMaximumCompactionInterval = 20
uintx HeapSearchSteps = 3
size_t HeapSizePerGCThread = 43620760
bool IgnoreEmptyClassPaths = false
bool IgnoreUnrecognizedVMOptions = false
uintx IncreaseFirstTierCompileThresholdAt = 50
bool IncrementalInline = true
uintx InitialCodeCacheSize = 2555904
size_t InitialHeapSize = 268435456
uintx InitialRAMFraction = 64
double InitialRAMPercentage = 1.562500
uintx InitialSurvivorRatio = 8
uintx InitialTenuringThreshold = 7
uintx InitiatingHeapOccupancyPercent = 45
bool Inline = true
ccstr InlineDataFile =
intx InlineSmallCode = 2500
bool InlineSynchronizedMethods = true
intx InteriorEntryAlignment = 16
intx InterpreterProfilePercentage = 33
bool JavaMonitorsInStackTrace = true
intx JavaPriority10_To_OSPriority = -1
intx JavaPriority1_To_OSPriority = -1
intx JavaPriority2_To_OSPriority = -1
intx JavaPriority3_To_OSPriority = -1
intx JavaPriority4_To_OSPriority = -1
intx JavaPriority5_To_OSPriority = -1
intx JavaPriority6_To_OSPriority = -1
intx JavaPriority7_To_OSPriority = -1
intx JavaPriority8_To_OSPriority = -1
intx JavaPriority9_To_OSPriority = -1
size_t LargePageHeapSizeThreshold = 134217728
size_t LargePageSizeInBytes = 0
intx LiveNodeCountInliningCutoff = 40000
intx LoopMaxUnroll = 16
intx LoopOptsCount = 43
intx LoopPercentProfileLimit = 10
uintx LoopStripMiningIter = 1000
uintx LoopStripMiningIterShortLoop = 100
intx LoopUnrollLimit = 60
intx LoopUnrollMin = 4
bool LoopUnswitching = true
bool ManagementServer = false
size_t MarkStackSize = 4194304
size_t MarkStackSizeMax = 536870912
uint MarkSweepAlwaysCompactCount = 4
uintx MarkSweepDeadRatio = 5
intx MaxBCEAEstimateLevel = 5
intx MaxBCEAEstimateSize = 150
uint64_t MaxDirectMemorySize = 0
bool MaxFDLimit = true
uintx MaxGCMinorPauseMillis = 18446744073709551615
uintx MaxGCPauseMillis = 200
uintx MaxHeapFreeRatio = 70
size_t MaxHeapSize = 4282384384
intx MaxInlineLevel = 15
intx MaxInlineSize = 35
intx MaxJNILocalCapacity = 65536
intx MaxJavaStackTraceDepth = 1024
intx MaxJumpTableSize = 65000
intx MaxJumpTableSparseness = 5
intx MaxLabelRootDepth = 1100
intx MaxLoopPad = 15
size_t MaxMetaspaceExpansion = 5439488
uintx MaxMetaspaceFreeRatio = 70
size_t MaxMetaspaceSize = 18446744073709551615
size_t MaxNewSize = 2569011200
intx MaxNodeLimit = 80000
uint64_t MaxRAM = 137438953472
uintx MaxRAMFraction = 4
double MaxRAMPercentage = 25.000000
intx MaxRecursiveInlineLevel = 1
uintx MaxTenuringThreshold = 15
intx MaxTrivialSize = 6
intx MaxVectorSize = 32
ccstr MetaspaceReclaimPolicy = balanced
size_t MetaspaceSize = 22020096
bool MethodFlushing = true
size_t MinHeapDeltaBytes = 2097152
uintx MinHeapFreeRatio = 40
size_t MinHeapSize = 8388608
intx MinInliningThreshold = 250
intx MinJumpTableSize = 10
size_t MinMetaspaceExpansion = 327680
uintx MinMetaspaceFreeRatio = 40
uintx MinRAMFraction = 2
double MinRAMPercentage = 50.000000
uintx MinSurvivorRatio = 3
size_t MinTLABSize = 2048
intx MultiArrayExpandLimit = 6
uintx NUMAChunkResizeWeight = 20
size_t NUMAInterleaveGranularity = 2097152
uintx NUMAPageScanRate = 256
size_t NUMASpaceResizeRate = 1073741824
bool NUMAStats = false
ccstr NativeMemoryTracking = off
bool NeverActAsServerClassMachine = false
bool NeverTenure = false
uintx NewRatio = 2
size_t NewSize = 1363144
size_t NewSizeThreadIncrease = 5320
intx NmethodSweepActivity = 10
intx NodeLimitFudgeFactor = 2000
uintx NonNMethodCodeHeapSize = 5839372
uintx NonProfiledCodeHeapSize = 122909434
intx NumberOfLoopInstrToAlign = 4
intx ObjectAlignmentInBytes = 8 {
size_t OldPLABSize = 1024
size_t OldSize = 5452592
bool OmitStackTraceInFastThrow = true
ccstrlist OnError =
ccstrlist OnOutOfMemoryError =
intx OnStackReplacePercentage = 140
bool OptimizeFill = false
bool OptimizePtrCompare = true
bool OptimizeStringConcat = true
bool OptoBundling = false
intx OptoLoopAlignment = 16
bool OptoRegScheduling = true
bool OptoScheduling = false
uintx PLABWeight = 75
bool PSChunkLargeArrays = true
int ParGCArrayScanChunk = 50
uintx ParallelGCBufferWastePct = 10
uint ParallelGCThreads = 10
size_t ParallelOldDeadWoodLimiterMean = 50
size_t ParallelOldDeadWoodLimiterStdDev = 80
bool ParallelRefProcBalancingEnabled = true
bool ParallelRefProcEnabled = true
bool PartialPeelAtUnsignedTests = true
bool PartialPeelLoop = true
intx PartialPeelNewPhiDelta = 0
uintx PausePadding = 1
intx PerBytecodeRecompilationCutoff = 200
intx PerBytecodeTrapLimit = 4
intx PerMethodRecompilationCutoff = 400
intx PerMethodTrapLimit = 100
bool PerfAllowAtExitRegistration = false
bool PerfBypassFileSystemCheck = false
intx PerfDataMemorySize = 32768
intx PerfDataSamplingInterval = 50
ccstr PerfDataSaveFile =
bool PerfDataSaveToFile = false
bool PerfDisableSharedMem = false
intx PerfMaxStringConstLength = 1024
size_t PreTouchParallelChunkSize = 1073741824
bool PreferInterpreterNativeStubs = false
intx PrefetchCopyIntervalInBytes = 576
intx PrefetchFieldsAhead = 1
intx PrefetchScanIntervalInBytes = 576
bool PreserveAllAnnotations = false
bool PreserveFramePointer = false
size_t PretenureSizeThreshold = 0
bool PrintClassHistogram = false
bool PrintCodeCache = false
bool PrintCodeCacheOnCompilation = false
bool PrintCommandLineFlags = false
bool PrintCompilation = false
bool PrintConcurrentLocks = false
bool PrintExtendedThreadInfo = false
bool PrintFlagsFinal = true
bool PrintFlagsInitial = false
bool PrintFlagsRanges = false
bool PrintGC = false
bool PrintGCDetails = false
bool PrintHeapAtSIGBREAK = true
bool PrintSharedArchiveAndExit = false
bool PrintSharedDictionary = false
bool PrintStringTableStatistics = false
bool PrintTieredEvents = false
bool PrintVMOptions = false
bool PrintWarnings = true
uintx ProcessDistributionStride = 4
bool ProfileInterpreter = true
intx ProfileMaturityPercentage = 20
uintx ProfiledCodeHeapSize = 122909434
uintx PromotedPadding = 3
uintx QueuedAllocationWarningCount = 0
int RTMRetryCount = 5
bool RangeCheckElimination = true
bool ReassociateInvariants = true
bool RecordDynamicDumpInfo = false
bool ReduceBulkZeroing = true
bool ReduceFieldZeroing = true
bool ReduceInitialCardMarks = true
bool ReduceSignalUsage = false
intx RefDiscoveryPolicy = 0
bool RegisterFinalizersAtInit = true
bool RelaxAccessControlCheck = false
ccstr ReplayDataFile =
bool RequireSharedSpaces = false
uintx ReservedCodeCacheSize = 251658240
bool ResizePLAB = true
bool ResizeTLAB = true
bool RestoreMXCSROnJNICalls = false
bool RestrictContended = true
bool RestrictReservedStack = true
bool RewriteBytecodes = true
bool RewriteFrequentPairs = true
bool SafepointTimeout = false
intx SafepointTimeoutDelay = 10000
bool ScavengeBeforeFullGC = false
bool SegmentedCodeCache = true
intx SelfDestructTimer = 0
ccstr SharedArchiveConfigFile =
ccstr SharedArchiveFile =
size_t SharedBaseAddress = 34359738368
ccstr SharedClassListFile =
uintx SharedSymbolTableBucketSize = 4
ccstr ShenandoahGCHeuristics = adaptive
ccstr ShenandoahGCMode = satb
bool ShowCodeDetailsInExceptionMessages = true
bool ShowMessageBoxOnError = false
bool ShrinkHeapInSteps = true
size_t SoftMaxHeapSize = 4282384384
intx SoftRefLRUPolicyMSPerMB = 1000
bool SplitIfBlocks = true
intx StackRedPages = 1
intx StackReservedPages = 0
intx StackShadowPages = 7
bool StackTraceInThrowable = true
intx StackYellowPages = 3
uintx StartAggressiveSweepingAt = 10
bool StartAttachListener = false
ccstr StartFlightRecording =
uint StringDeduplicationAgeThreshold = 3
uintx StringTableSize = 65536
bool SuperWordLoopUnrollAnalysis = true
bool SuperWordReductions = true
bool SuppressFatalErrorMessage = false
uintx SurvivorPadding = 3
uintx SurvivorRatio = 8
double SweeperThreshold = 0.500000
uintx TLABAllocationWeight = 35
uintx TLABRefillWasteFraction = 64
size_t TLABSize = 0
bool TLABStats = true
uintx TLABWasteIncrement = 4
uintx TLABWasteTargetPercent = 1
uintx TargetPLABWastePct = 10
uintx TargetSurvivorRatio = 50
uintx TenuredGenerationSizeIncrement = 20
uintx TenuredGenerationSizeSupplement = 80
uintx TenuredGenerationSizeSupplementDecay = 2
intx ThreadPriorityPolicy = 0
bool ThreadPriorityVerbose = false
intx ThreadStackSize = 0
uintx ThresholdTolerance = 10
intx Tier0BackedgeNotifyFreqLog = 10
intx Tier0InvokeNotifyFreqLog = 7
intx Tier0ProfilingStartPercentage = 200
intx Tier23InlineeNotifyFreqLog = 20
intx Tier2BackEdgeThreshold = 0
intx Tier2BackedgeNotifyFreqLog = 14
intx Tier2CompileThreshold = 0
intx Tier2InvokeNotifyFreqLog = 11
intx Tier3BackEdgeThreshold = 60000
intx Tier3BackedgeNotifyFreqLog = 13
intx Tier3CompileThreshold = 2000
intx Tier3DelayOff = 2
intx Tier3DelayOn = 5
intx Tier3InvocationThreshold = 200
intx Tier3InvokeNotifyFreqLog = 10
intx Tier3LoadFeedback = 5
intx Tier3MinInvocationThreshold = 100
intx Tier4BackEdgeThreshold = 40000
intx Tier4CompileThreshold = 15000
intx Tier4InvocationThreshold = 5000
intx Tier4LoadFeedback = 3
intx Tier4MinInvocationThreshold = 600
bool TieredCompilation = true
intx TieredCompileTaskTimeout = 50
intx TieredRateUpdateMaxTime = 25
intx TieredRateUpdateMinTime = 1
intx TieredStopAtLevel = 4
bool TimeLinearScan = false
ccstr TraceJVMTI =
intx TrackedInitializationLimit = 50
bool TrapBasedNullChecks = false
bool TrapBasedRangeChecks = false
intx TypeProfileArgsLimit = 2
uintx TypeProfileLevel = 111
intx TypeProfileMajorReceiverPercent = 90
intx TypeProfileParmsLimit = 2
intx TypeProfileWidth = 2
intx UnguardOnExecutionViolation = 0
bool UseAES = true
intx UseAVX = 2
bool UseAdaptiveGenerationSizePolicyAtMajorCollection = true
bool UseAdaptiveGenerationSizePolicyAtMinorCollection = true
bool UseAdaptiveNUMAChunkSizing = true
bool UseAdaptiveSizeDecayMajorGCCost = true
bool UseAdaptiveSizePolicy = true
bool UseAdaptiveSizePolicyFootprintGoal = true
bool UseAdaptiveSizePolicyWithSystemGC = false
bool UseAddressNop = true
bool UseBASE64Intrinsics = false
bool UseBMI1Instructions = true
bool UseBMI2Instructions = true
bool UseBiasedLocking = false
bool UseBimorphicInlining = true
bool UseCLMUL = true
bool UseCMoveUnconditionally = false
bool UseCodeAging = true
bool UseCodeCacheFlushing = true
bool UseCompiler = true
bool UseCompressedClassPointers = true {
bool UseCompressedOops = true {
bool UseCondCardMark = false
bool UseCountLeadingZerosInstruction = true
bool UseCountTrailingZerosInstruction = true
bool UseCountedLoopSafepoints = true
bool UseCounterDecay = true
bool UseDivMod = true
bool UseDynamicNumberOfCompilerThreads = true
bool UseDynamicNumberOfGCThreads = true
bool UseEmptySlotsInSupers = true
bool UseFMA = true
bool UseFPUForSpilling = true
bool UseFastJNIAccessors = true
bool UseFastStosb = false
bool UseG1GC = true
bool UseGCOverheadLimit = true
bool UseHeavyMonitors = false
bool UseInlineCaches = true
bool UseInterpreter = true
bool UseJumpTables = true
bool UseLargePages = false
bool UseLargePagesIndividualAllocation = false
bool UseLoopCounter = true
bool UseLoopInvariantCodeMotion = true
bool UseLoopPredicate = true
bool UseMaximumCompactionOnSystemGC = true
bool UseNUMA = false
bool UseNUMAInterleaving = false
bool UseNewLongLShift = true
bool UseNotificationThread = true
bool UseOSErrorReporting = false
bool UseOnStackReplacement = true
bool UseOnlyInlinedBimorphic = true
bool UseOptoBiasInlining = false
bool UsePSAdaptiveSurvivorSizePolicy = true
bool UseParallelGC = false
bool UsePerfData = true
bool UsePopCountInstruction = true
bool UseProfiledLoopPredicate = true
bool UseRTMDeopt = false
bool UseRTMLocking = false
bool UseSHA = true
intx UseSSE = 4
bool UseSSE42Intrinsics = true
bool UseSerialGC = false
bool UseSharedSpaces = true
bool UseShenandoahGC = false
bool UseSignalChaining = true
bool UseStoreImmI16 = true
bool UseStringDeduplication = false
bool UseSubwordForMaxVector = true
bool UseSuperWord = true
bool UseTLAB = true
bool UseThreadPriorities = true
bool UseTypeProfile = true
bool UseTypeSpeculation = true
bool UseUnalignedLoadStores = true
bool UseVectorCmov = false
bool UseXMMForArrayCopy = true
bool UseXMMForObjInit = true
bool UseXmmI2D = true
bool UseXmmI2F = true
bool UseXmmLoadAndClearUpper = true
bool UseXmmRegToRegMoveAll = true
bool UseZGC = false
intx VMThreadPriority = -1
intx VMThreadStackSize = 0
intx ValueMapInitialSize = 11
intx ValueMapMaxLoopSize = 8
intx ValueSearchLimit = 1000
bool VerifySharedSpaces = false
uintx YoungGenerationSizeIncrement = 20
uintx YoungGenerationSizeSupplement = 80
uintx YoungGenerationSizeSupplementDecay = 8
size_t YoungPLABSize = 4096
double ZAllocationSpikeTolerance = 2.000000
double ZCollectionInterval = 0.000000
double ZFragmentationLimit = 25.000000
size_t ZMarkStackSpaceLimit = 8589934592
bool ZProactive = true
bool ZUncommit = true
uintx ZUncommitDelay = 300
bool ZeroTLAB = false
openjdk version "17.0.6" 2023-01-17
OpenJDK Runtime Environment Temurin-17.0.6+10 (build 17.0.6+10)
OpenJDK 64-Bit Server VM Temurin-17.0.6+10 (build 17.0.6+10, mixed mode, sharing)

Java 项目中使用 Resilience4j 框架实现隔断机制/断路器


到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 Retry, RateLimiter, TimeLimiter, 和 Bulkhead 模块。在本文中,我们将探索 CircuitBreaker 模块。我们将了解何时以及如何使用它,并查看一些示例。

代码示例

本文附有 GitHub 上的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

什么是断路器?

断路器的思想是,如果我们知道调用可能会失败或超时,则阻止对远程服务的调用。我们这样做是为了不会在我们的服务和远程服务中不必要地浪费关键资源。这样的退出也给了远程服务一些时间来恢复。

我们怎么知道一个调用可能会失败? 通过跟踪对远程服务发出的先前请求的结果。例如,如果前 10 次调用中有 8 次导致失败或超时,则下一次调用也可能会失败。

断路器通过包装对远程服务的调用来跟踪响应。在正常运行期间,当远程服务成功响应时,我们说断路器处于“闭合”状态。当处于关闭状态时,断路器正常将请求传递给远程服务。

当远程服务返回错误或超时时,断路器会增加一个内部计数器。如果错误计数超过配置的阈值,断路器将切换到“断开”状态。当处于断开状态时,断路器立即向调用者返回错误,甚至无需尝试远程调用。

经过一段配置的时间后,断路器从断开状态切换到“半开”状态。在这种状态下,它允许一些请求传递到远程服务以检查它是否仍然不可用或缓慢。 如果错误率或慢呼叫率高于配置的阈值,则切换回断开状态。但是,如果错误率或慢呼叫率低于配置的阈值,则切换到关闭状态以恢复正常操作。

断路器的类型

断路器可以基于计数或基于时间。如果最后 N 次调用失败或缓慢,则基于计数的断路器将状态从关闭切换为断开。如果最后 N 秒的响应失败或缓慢,则基于时间的断路器将切换到断开状态。在这两个断路器中,我们还可以指定失败或慢速调用的阈值。

例如,如果最近 25 次调用中有 70% 失败或需要 2 秒以上才能完成,我们可以配置一个基于计数的断路器来“断开电路”。同样,如果过去 30 秒内 80% 的调用失败或耗时超过 5 秒,我们可以告诉基于时间的断路器断开电路。

Resilience4j 的 CircuitBreaker 概念

resilience4j-circuitbreaker 的工作原理与其他 Resilience4j 模块类似。我们提供想要作为函数构造执行的代码——一个进行远程调用的 lambda 表达式或一个从远程服务中检索到的某个值的 Supplier,等等——并且断路器用代码修饰它 如果需要,跟踪响应并切换状态。

Resilience4j 同时支持基于计数和基于时间的断路器。

我们使用 slidingWindowType() 配置指定断路器的类型。此配置可以采用两个值之一 –
SlidingWindowType.COUNT_BASED
SlidingWindowType.TIME_BASED

failureRateThreshold()slowCallRateThreshold() 以百分比形式配置失败率阈值和慢速调用率。

slowCallDurationThreshold() 以秒为单位配置调用被认为慢的时间。

我们可以指定一个 minimumNumberOfCalls(),在断路器可以计算错误率或慢速调用率之前需要它。

如前所述,断路器在一定时间后从断开状态切换到半断开状态,以检查远程服务的情况。waitDurationInOpenState() 指定断路器在切换到半开状态之前应等待的时间。

permittedNumberOfCallsInHalfOpenState() 配置在半开状态下允许的调用次数,
maxWaitDurationInHalfOpenState() 确定断路器在切换回开状态之前可以保持在半开状态的时间。

此配置的默认值 0 意味着断路器将无限等待,直到所有
permittedNumberOfCallsInHalfOpenState() 完成。

默认情况下,断路器将任何异常视为失败。但是我们可以对此进行调整,以使用 recordExceptions() 配置指定应视为失败的异常列表和使用 ignoreExceptions() 配置忽略的异常列表。

如果我们在确定异常应该被视为失败还是忽略时想要更精细的控制,我们可以提供 Predicate<Throwable> 作为 recordException()ignoreException() 配置。

当断路器拒绝处于断开状态的呼叫时,它会抛出 CallNotPermittedException。我们可以使用 writablestacktraceEnabled() 配置控制 CallNotPermittedException 的堆栈跟踪中的信息量。

使用 Resilience4j CircuitBreaker模块

让我们看看如何使用
resilience4j-circuitbreaker 模块中可用的各种功能。

我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

使用 Resilience4j 断路器时,CircuitBreakerRegistryCircuitBreakerConfigCircuitBreaker 是我们使用的主要抽象。

CircuitBreakerRegistry 是用于创建和管理 CircuitBreaker 对象的工厂。

CircuitBreakerConfig 封装了上一节中的所有配置。每个 CircuitBreaker 对象都与一个 CircuitBreakerConfig 相关联。

第一步是创建一个 CircuitBreakerConfig

CircuitBreakerConfig config = CircuitBreakerConfig.ofDefaults();

这将创建一个具有以下默认值的 CircuitBreakerConfig:

配置 默认值
slidingWindowType COUNT_BASED
failureRateThreshold 50%
slowCallRateThreshold 100%
slowCallDurationThreshold 60s
minimumNumberOfCalls 100
permittedNumberOfCallsInHalfOpenState 10
maxWaitDurationInHalfOpenState 0s

基于计数的断路器

假设我们希望断路器在最近 10 次调用中有 70% 失败时断开:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.COUNT_BASED)
  .slidingWindowSize(10)
  .failureRateThreshold(70.0f)
  .build();

然后我们用这个配置创建一个 CircuitBreaker

CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
CircuitBreaker circuitBreaker = registry.circuitBreaker("flightSearchService");

现在让我们表达我们的代码以作为 Supplier 运行航班搜索并使用 circuitbreaker 装饰它:

Supplier<List<Flight>> flightsSupplier =
  () -> service.searchFlights(request);
Supplier<List<Flight>> decoratedFlightsSupplier =
  circuitBreaker.decorateSupplier(flightsSupplier);

最后,让我们调用几次修饰操作来了解断路器的工作原理。我们可以使用 CompletableFuture 来模拟来自用户的并发航班搜索请求:

for (int i=0; i<20; i++) {
  try {
    System.out.println(decoratedFlightsSupplier.get());
  }
  catch (...) {
    // Exception handling
  }
}

输出显示前几次飞行搜索成功,然后是 7 次飞行搜索失败。此时,断路器断开并为后续调用抛出 CallNotPermittedException

Searching for flights; current time = 12:01:12 884
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
Searching for flights; current time = 12:01:12 954
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
Searching for flights; current time = 12:01:12 957
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
Searching for flights; current time = 12:01:12 958
io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search
... stack trace omitted ...
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
... other lines omitted ...
io.reflectoring.resilience4j.circuitbreaker.Examples.countBasedSlidingWindow_FailedCalls(Examples.java:56)
  at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:229)

现在,假设我们希望断路器在最后 10 个调用中有 70% 需要 2 秒或更长时间才能完成:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.COUNT_BASED)
  .slidingWindowSize(10)
  .slowCallRateThreshold(70.0f)
  .slowCallDurationThreshold(Duration.ofSeconds(2))
  .build();

示例输出中的时间戳显示请求始终需要 2 秒才能完成。在 7 次缓慢响应后,断路器断开并且不允许进一步调用:

Searching for flights; current time = 12:26:27 901
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
Searching for flights; current time = 12:26:29 953
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
Searching for flights; current time = 12:26:31 957
Flight search successful
... other lines omitted ...
Searching for flights; current time = 12:26:43 966
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
... stack trace omitted ...
        at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:231)
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
... stack trace omitted ...
        at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:231)

通常我们会配置一个具有故障率和慢速调用率阈值的断路器:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.COUNT_BASED)
  .slidingWindowSize(10)
  .failureRateThreshold(70.0f)
  .slowCallRateThreshold(70.0f)
  .slowCallDurationThreshold(Duration.ofSeconds(2))
  .build();

基于时间的断路器

假设我们希望断路器在过去 10 秒内 70% 的请求失败时断开:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.COUNT_BASED)
  .slidingWindowSize(10)
  .failureRateThreshold(70.0f)
  .slowCallRateThreshold(70.0f)
  .slowCallDurationThreshold(Duration.ofSeconds(2))
  .build();

我们创建了 CircuitBreaker,将航班搜索调用表示为 Supplier<List<Flight>> 并使用 CircuitBreaker 对其进行装饰,就像我们在上一节中所做的那样。

以下是多次调用修饰操作后的示例输出:

Start time: 18:51:01 552
Searching for flights; current time = 18:51:01 582
Flight search successful
[Flight{flightNumber='XY 765', ... }]
... other lines omitted ...
Searching for flights; current time = 18:51:01 631
io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search
... stack trace omitted ...
Searching for flights; current time = 18:51:01 632
io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search
... stack trace omitted ...
Searching for flights; current time = 18:51:01 633
... other lines omitted ...
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
... other lines omitted ...

前 3 个请求成功,接下来的 7 个请求失败。此时断路器断开,后续请求因抛出 CallNotPermittedException 而失败。

现在,假设我们希望断路器在过去 10 秒内 70% 的调用需要 1 秒或更长时间才能完成:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.TIME_BASED)
  .minimumNumberOfCalls(10)
  .slidingWindowSize(10)
  .slowCallRateThreshold(70.0f)
  .slowCallDurationThreshold(Duration.ofSeconds(1))
  .build();

示例输出中的时间戳显示请求始终需要 1 秒才能完成。在 10 个请求(minimumNumberOfCalls)之后,当断路器确定 70% 的先前请求花费了 1 秒或更长时间时,它断开电路:

Start time: 19:06:37 957
Searching for flights; current time = 19:06:37 979
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 19:06:39 066
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 19:06:40 070
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 19:06:41 070
... other lines omitted ...
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
... stack trace omitted ...

通常我们会配置一个具有故障率和慢速调用率阈值的基于时间的断路器:

指定断开状态下的等待时间

假设我们希望断路器处于断开状态时等待 10 秒,然后转换到半断开状态并让一些请求传递到远程服务:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.TIME_BASED)
  .slidingWindowSize(10)
  .minimumNumberOfCalls(10)
  .failureRateThreshold(70.0f)
  .slowCallRateThreshold(70.0f)
  .slowCallDurationThreshold(Duration.ofSeconds(2))
  .build();

示例输出中的时间戳显示断路器最初转换为断开状态,在接下来的 10 秒内阻止一些调用,然后更改为半断开状态。后来,在半开状态时一致的成功响应导致它再次切换到关闭状态:

Searching for flights; current time = 20:55:58 735
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 20:55:59 812
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 20:56:00 816
... other lines omitted ...
io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Flight search failed
    at
... stack trace omitted ...
2020-12-13T20:56:03.850115+05:30: CircuitBreaker 'flightSearchService' changed state from CLOSED to OPEN
2020-12-13T20:56:04.851700+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
2020-12-13T20:56:05.852220+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
2020-12-13T20:56:06.855338+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
... other similar lines omitted ...
2020-12-13T20:56:12.862362+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
2020-12-13T20:56:13.865436+05:30: CircuitBreaker 'flightSearchService' changed state from OPEN to HALF_OPEN
Searching for flights; current time = 20:56:13 865
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
... other similar lines omitted ...
2020-12-13T20:56:16.877230+05:30: CircuitBreaker 'flightSearchService' changed state from HALF_OPEN to CLOSED
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 20:56:17 879
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
... other similar lines omitted ...

指定回退方法

使用断路器时的常见模式是指定在电路断开时要调用的回退方法。回退方法可以为不允许的远程调用提供一些默认值或行为

我们可以使用 Decorators 实用程序类进行设置。Decorators 是来自 resilience4j-all 模块的构建器,具有 withCircuitBreaker()withRetry()withRateLimiter() 等方法,可帮助将多个 Resilience4j 装饰器应用于 SupplierFunction 等。

当断路器断开并抛出 CallNotPermittedException 时,我们将使用它的 withFallback() 方法从本地缓存返回航班搜索结果:

Supplier<List<Flight>> flightsSupplier = () -> service.searchFlights(request);
Supplier<List<Flight>> decorated = Decorators
  .ofSupplier(flightsSupplier)
  .withCircuitBreaker(circuitBreaker)
  .withFallback(Arrays.asList(CallNotPermittedException.class),
                e -> this.getFlightSearchResultsFromCache(request))
  .decorate();

以下示例输出显示了断路器断开后从缓存中返回的搜索结果:

Searching for flights; current time = 22:08:29 735
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 22:08:29 854
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 22:08:29 855
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Searching for flights; current time = 22:08:29 855
2020-12-13T22:08:29.856277+05:30: CircuitBreaker 'flightSearchService' recorded an error: 'io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search'. Elapsed time: 0 ms
Searching for flights; current time = 22:08:29 912
... other lines omitted ...
2020-12-13T22:08:29.926691+05:30: CircuitBreaker 'flightSearchService' changed state from CLOSED to OPEN
Returning flight search results from cache
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
Returning flight search results from cache
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... }]
... other lines omitted ...

减少 Stacktrace 中的信息

每当断路器断开时,它就会抛出 CallNotPermittedException

io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
    at io.github.resilience4j.circuitbreaker.CallNotPermittedException.createCallNotPermittedException(CallNotPermittedException.java:48)
... other lines in stack trace omitted ...
at io.reflectoring.resilience4j.circuitbreaker.Examples.timeBasedSlidingWindow_SlowCalls(Examples.java:169)
    at io.reflectoring.resilience4j.circuitbreaker.Examples.main(Examples.java:263)

除了第一行,堆栈跟踪中的其他行没有增加太多价值。如果 CallNotPermittedException 发生多次,这些堆栈跟踪行将在我们的日志文件中重复。

我们可以通过将 writablestacktraceEnabled() 配置设置为 false 来减少堆栈跟踪中生成的信息量:

CircuitBreakerConfig config = CircuitBreakerConfig
  .custom()
  .slidingWindowType(SlidingWindowType.COUNT_BASED)
  .slidingWindowSize(10)
  .failureRateThreshold(70.0f)
  .writablestacktraceEnabled(false)
  .build();

现在,当 CallNotPermittedException 发生时,堆栈跟踪中只存在一行:

Searching for flights; current time = 20:29:24 476
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
Searching for flights; current time = 20:29:24 540
Flight search successful
[Flight{flightNumber='XY 765', flightDate='12/31/2020', from='NYC', to='LAX'}, ... ]
... other lines omitted ...
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'flightSearchService' is OPEN and does not permit further calls
...

其他有用的方法

Retry 模块类似,CircuitBreaker 也有像 ignoreExceptions()recordExceptions() 等方法,让我们可以指定 CircuitBreaker 在跟踪调用结果时应该忽略和考虑哪些异常。

例如,我们可能不想忽略来自远程飞行服务的 SeatsUnavailableException – 在这种情况下,我们真的不想断开电路。

与我们见过的其他 Resilience4j 模块类似,CircuitBreaker 还提供了额外的方法,如 decorateCheckedSupplier()decorateCompletionStage()decorateRunnable()decorateConsumer() 等,因此我们可以在 Supplier 之外的其他结构中提供我们的代码。

断路器事件

CircuitBreaker 有一个 EventPublisher 可以生成以下类型的事件:

  • CircuitBreakerOnSuccessEvent,
  • CircuitBreakerOnErrorEvent,
  • CircuitBreakerOnStateTransitionEvent,
  • CircuitBreakerOnResetEvent,
  • CircuitBreakerOnIgnoredErrorEvent,
  • CircuitBreakerOnCallNotPermittedEvent,
  • CircuitBreakerOnFailureRateExceededEvent 以及
  • CircuitBreakerOnSlowCallRateExceededEvent.

我们可以监听这些事件并记录它们,例如:

circuitBreaker.getEventPublisher()
  .onCallNotPermitted(e -> System.out.println(e.toString()));
circuitBreaker.getEventPublisher()
  .onError(e -> System.out.println(e.toString()));
circuitBreaker.getEventPublisher()
  .onFailureRateExceeded(e -> System.out.println(e.toString()));
circuitBreaker.getEventPublisher().onStateTransition(e -> System.out.println(e.toString()));

以下是示例的日志输出:

2020-12-13T22:25:52.972943+05:30: CircuitBreaker 'flightSearchService' recorded an error: 'io.reflectoring.resilience4j.circuitbreaker.exceptions.FlightServiceException: Error occurred during flight search'. Elapsed time: 0 ms
Searching for flights; current time = 22:25:52 973
... other lines omitted ...
2020-12-13T22:25:52.974448+05:30: CircuitBreaker 'flightSearchService' exceeded failure rate threshold. Current failure rate: 70.0
2020-12-13T22:25:52.984300+05:30: CircuitBreaker 'flightSearchService' changed state from CLOSED to OPEN
2020-12-13T22:25:52.985057+05:30: CircuitBreaker 'flightSearchService' recorded a call which was not permitted.
... other lines omitted ...

CircuitBreaker指标

CircuitBreake 暴露了许多指标,这些是一些重要的条目:

  • 成功、失败或忽略的调用总数 (resilience4j.circuitbreaker.calls)
  • 断路器状态 (resilience4j.circuitbreaker.state)
  • 断路器故障率 (resilience4j.circuitbreaker.failure.rate)
  • 未被允许的调用总数 (resilience4.circuitbreaker.not.permitted.calls)
  • 断路器的缓慢调用 (resilience4j.circuitbreaker.slow.call.rate)

首先,我们像往常一样创建 CircuitBreakerConfigCircuitBreakerRegistryCircuitBreaker。然后,我们创建一个 MeterRegistry 并将 CircuitBreakerRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedCircuitBreakerMetrics.ofCircuitBreakerRegistry(registry)
  .bindTo(meterRegistry);

运行几次断路器修饰操作后,我们显示捕获的指标。这是一些示例输出:

The number of slow failed calls which were slower than a certain threshold - resilience4j.circuitbreaker.slow.calls: 0.0
The states of the circuit breaker - resilience4j.circuitbreaker.state: 0.0, state: metrics_only
Total number of not permitted calls - resilience4j.circuitbreakernot.permitted.calls: 0.0
The slow call of the circuit breaker - resilience4j.circuitbreaker.slow.call.rate: -1.0
The states of the circuit breaker - resilience4j.circuitbreaker.state: 0.0, state: half_open
Total number of successful calls - resilience4j.circuitbreaker.calls: 0.0, kind: successful
The failure rate of the circuit breaker - resilience4j.circuitbreaker.failure.rate: -1.0

在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

结论

在本文中,我们学习了如何使用 Resilience4j 的 Circuitbreaker 模块在远程服务返回错误时暂停向其发出请求。我们了解了为什么这很重要,还看到了一些有关如何配置它的实际示例。

您可以使用 GitHub 上的代码来演示一个完整的应用程序。


本文译自:Implementing a Circuit Breaker with Resilience4j – Reflectoring

Java 项目中使用 Resilience4j 框架实现故障隔离

Java 项目中使用 Resilience4j 框架实现故障隔离

到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 Retry, RateLimiterTimeLimiter 模块。在本文中,我们将探讨 Bulkhead 模块。我们将了解它解决了什么问题,何时以及如何使用它,并查看一些示例。

代码示例

本文附有 GitHub 上的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

什么是故障隔离?

几年前,我们遇到了一个生产问题,其中一台服务器停止响应健康检查,负载均衡器将服务器从池中取出。

就在我们开始调查这个问题的时候,还有第二个警报——另一台服务器已经停止响应健康检查,也被从池中取出。

几分钟后,每台服务器都停止响应健康探测,我们的服务完全关闭。

我们使用 Redis 为应用程序支持的几个功能缓存一些数据。正如我们后来发现的那样,Redis 集群同时出现了一些问题,它已停止接受新连接。我们使用 Jedis 库连接到 Redis,该库的默认行为是无限期地阻塞调用线程,直到建立连接。

我们的服务托管在 Tomcat 上,它的默认请求处理线程池大小为 200 个线程。因此,通过连接到 Redis 的代码路径的每个请求最终都会无限期地阻塞线程。

几分钟之内,集群中的所有 2000 个线程都无限期地阻塞了——甚至没有空闲线程来响应负载均衡器的健康检查。

该服务本身支持多项功能,并非所有功能都需要访问 Redis 缓存。但是当这一方面出现问题时,它最终影响了整个服务。

这正是故障隔离要解决的问题——它可以防止某个服务区域的问题影响整个服务。

虽然我们的服务发生的事情是一个极端的例子,但我们可以看到缓慢的上游依赖如何影响调用服务的不相关区域。

如果我们在每个服务器实例上对 Redis 设置了 20 个并发请求的限制,那么当 Redis 连接问题发生时,只有这些线程会受到影响。剩余的请求处理线程可以继续为其他请求提供服务。

故障隔离背后的想法是对我们对远程服务进行的并发调用数量设置限制。我们将对不同远程服务的调用视为不同的、隔离的池,并对可以同时进行的调用数量设置限制。

术语舱壁本身来自它在船舶中的使用,其中船舶的底部被分成彼此分开的部分。如果有裂缝,并且水开始流入,则只有该部分会充满水。这可以防止整艘船沉没。

Resilience4j 隔板概念

resilience4j-bulkhead 的工作原理类似于其他 Resilience4j 模块。我们为它提供了我们想要作为函数构造执行的代码——一个进行远程调用的 lambda 表达式或一个从远程服务中检索到的某个值的 Supplier,等等——并且隔板用代码装饰它以控制并发调用数。

Resilience4j 提供两种类型的隔板 – SemaphoreBulkhead ThreadPoolBulkhead

SemaphoreBulkhead 内部使用
java.util.concurrent.Semaphore 来控制并发调用的数量并在当前线程上执行我们的代码。

ThreadPoolBulkhead 使用线程池中的一个线程来执行我们的代码。它内部使用
java.util.concurrent.ArrayBlockingQueue
java.util.concurrent.ThreadPoolExecutor 来控制并发调用的数量。

SemaphoreBulkhead

让我们看看与信号量隔板相关的配置及其含义。

maxConcurrentCalls 确定我们可以对远程服务进行的最大并发调用数。我们可以将此值视为初始化信号量的许可数。

任何尝试超过此限制调用远程服务的线程都可以立即获得 BulkheadFullException 或等待一段时间以等待另一个线程释放许可。这由 maxWaitDuration 值决定。

当有多个线程在等待许可时,fairCallHandlingEnabled 配置确定等待的线程是否以先进先出的顺序获取许可。

最后, writableStackTraceEnabled 配置让我们可以在 BulkheadFullException 发生时减少堆栈跟踪中的信息量。这很有用,因为如果没有它,当异常多次发生时,我们的日志可能会充满许多类似的信息。通常在读取日志时,只知道发生了 BulkheadFullException 就足够了。

ThreadPoolBulkhead

coreThreadPoolSizemaxThreadPoolSizekeepAliveDurationqueueCapacity 是与 ThreadPoolBulkhead 相关的主要配置。ThreadPoolBulkhead 内部使用这些配置来构造一个 ThreadPoolExecutor

internalThreadPoolExecutor 使用可用的空闲线程之一执行传入的任务。 如果没有线程可以自由执行传入的任务,则该任务将排队等待线程可用时稍后执行。如果已达到 queueCapacity,则远程调用将被拒绝并返回 BulkheadFullException

ThreadPoolBulkhead 也有 writableStackTraceEnabled 配置来控制 BulkheadFullException 的堆栈跟踪中的信息量。

使用 Resilience4j 隔板模块

让我们看看如何使用 resilience4j-bulkhead 模块中可用的各种功能。

我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

SemaphoreBulkhead

使用基于信号量的隔板时,BulkheadRegistryBulkheadConfigBulkhead 是我们使用的主要抽象。

BulkheadRegistry 是一个用于创建和管理 Bulkhead 对象的工厂。

BulkheadConfig 封装了 maxConcurrentCallsmaxWaitDurationwritableStackTraceEnabledfairCallHandlingEnabled 配置。每个 Bulkhead 对象都与一个 BulkheadConfig 相关联。

第一步是创建一个 BulkheadConfig

BulkheadConfig config = BulkheadConfig.ofDefaults();

这将创建一个 BulkheadConfig,其默认值为 maxConcurrentCalls(25)、maxWaitDuration(0s)、writableStackTraceEnabled(true) 和 fairCallHandlingEnabled(true)。

假设我们希望将并发调用的数量限制为 2,并且我们愿意等待 2 秒让线程获得许可:

BulkheadConfig config = BulkheadConfig.custom()
  .maxConcurrentCalls(2)
  .maxWaitDuration(Duration.ofSeconds(2))
  .build();

然后我们创建一个 Bulkhead

BulkheadRegistry registry = BulkheadRegistry.of(config);

Bulkhead bulkhead = registry.bulkhead("flightSearchService");

现在让我们表达我们的代码以作为 Supplier 运行航班搜索并使用 bulkhead 装饰它:

BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead("flightSearchService");

最后,让我们调用几次装饰操作来了解隔板的工作原理。我们可以使用 CompletableFuture 来模拟来自用户的并发航班搜索请求:

for (int i=0; i<4; i++) {
  CompletableFuture
    .supplyAsync(decoratedFlightsSupplier)
    .thenAccept(flights -> System.out.println("Received results"));
}

输出中的时间戳和线程名称显示,在 4 个并发请求中,前两个请求立即通过:

Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-3
Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-5
Flight search successful at 11:42:13 226
Flight search successful at 11:42:13 226
Received results
Received results
Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-9
Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-7
Flight search successful at 11:42:14 239
Flight search successful at 11:42:14 239
Received results
Received results

第三个和第四个请求仅在 1 秒后就能够获得许可,在之前的请求完成之后。

如果线程无法在我们指定的 2s maxWaitDuration 内获得许可,则会抛出 BulkheadFullException

Caused by: io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
    at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:49)
    at io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead.acquirePermission(SemaphoreBulkhead.java:164)
    at io.github.resilience4j.bulkhead.Bulkhead.lambda$decorateSupplier$5(Bulkhead.java:194)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 6 more

除了第一行,堆栈跟踪中的其他行没有增加太多价值。如果 BulkheadFullException 发生多次,这些堆栈跟踪行将在我们的日志文件中重复。

我们可以通过将 writableStackTraceEnabled 配置设置为 false 来减少堆栈跟踪中生成的信息量:

BulkheadConfig config = BulkheadConfig.custom()
    .maxConcurrentCalls(2)
    .maxWaitDuration(Duration.ofSeconds(1))
    .writableStackTraceEnabled(false)
.build();

现在,当 BulkheadFullException 发生时,堆栈跟踪中只存在一行:

Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5
io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
Flight search successful at 12:27:58 699
Flight search successful at 12:27:58 699
Received results
Received results

与我们见过的其他 Resilience4j 模块类似,Bulkhead 还提供了额外的方法,如 decorateCheckedSupplier()decorateCompletionStage()decorateRunnable()decorateConsumer() 等,因此我们可以在 Supplier 供应商之外的其他结构中提供我们的代码。

ThreadPoolBulkhead

当使用基于线程池的隔板时,
ThreadPoolBulkheadRegistryThreadPoolBulkheadConfigThreadPoolBulkhead 是我们使用的主要抽象。

ThreadPoolBulkheadRegistry 是用于创建和管理 ThreadPoolBulkhead 对象的工厂。

ThreadPoolBulkheadConfig 封装了 coreThreadPoolSizemaxThreadPoolSizekeepAliveDurationqueueCapacity 配置。每个 ThreadPoolBulkhead 对象都与一个 ThreadPoolBulkheadConfig 相关联。

第一步是创建一个 ThreadPoolBulkheadConfig

ThreadPoolBulkheadConfig config =
  ThreadPoolBulkheadConfig.ofDefaults();

这将创建一个 ThreadPoolBulkheadConfig,其默认值为 coreThreadPoolSize(可用处理器数量 -1)、maxThreadPoolSize(可用处理器最大数量)、keepAliveDuration(20ms)和 queueCapacity(100)。

假设我们要将并发调用的数量限制为 2:

ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(2)
  .coreThreadPoolSize(1)
  .queueCapacity(1)
  .build();

然后我们创建一个 ThreadPoolBulkhead

ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
ThreadPoolBulkhead bulkhead = registry.bulkhead("flightSearchService");

现在让我们表达我们的代码以作为 Supplier 运行航班搜索并使用 bulkhead 装饰它:

Supplier<List<Flight>> flightsSupplier =
  () -> service.searchFlightsTakingOneSecond(request);
Supplier<CompletionStage<List<Flight>>> decoratedFlightsSupplier =
  ThreadPoolBulkhead.decorateSupplier(bulkhead, flightsSupplier);

与返回一个 Supplier<List<Flight>>
SemaphoreBulkhead.decorateSupplier() 不同,
ThreadPoolBulkhead.decorateSupplier() 返回一个 Supplier<CompletionStage<List<Flight>>。这是因为 ThreadPoolBulkHead 不会在当前线程上同步执行代码。

最后,让我们调用几次装饰操作来了解隔板的工作原理:

for (int i=0; i<3; i++) {
  decoratedFlightsSupplier
    .get()
    .whenComplete((r,t) -> {
      if (r != null) {
        System.out.println("Received results");
      }
      if (t != null) {
        t.printStackTrace();
      }
    });
}

输出中的时间戳和线程名称显示,虽然前两个请求立即执行,但第三个请求已排队,稍后由释放的线程之一执行:

Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-1
Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-2
Flight search successful at 16:15:00 136
Flight search successful at 16:15:00 135
Received results
Received results
Searching for flights; current time = 16:15:01 151; current thread = bulkhead-flightSearchService-2
Flight search successful at 16:15:01 151
Received results

如果队列中没有空闲线程和容量,则抛出 BulkheadFullException

Exception in thread "main" io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
 at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:64)
 at io.github.resilience4j.bulkhead.internal.FixedThreadPoolBulkhead.submit(FixedThreadPoolBulkhead.java:157)
... other lines omitted ...

我们可以使用 writableStackTraceEnabled 配置来减少堆栈跟踪中生成的信息量:

ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(2)
  .coreThreadPoolSize(1)
  .queueCapacity(1)
  .writableStackTraceEnabled(false)
  .build();

现在,当 BulkheadFullException 发生时,堆栈跟踪中只存在一行:

Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5
io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
Flight search successful at 12:27:58 699
Flight search successful at 12:27:58 699
Received results
Received results

上下文传播

有时我们将数据存储在 ThreadLocal 变量中并在代码的不同区域中读取它。我们这样做是为了避免在方法链之间显式地将数据作为参数传递,尤其是当该值与我们正在实现的核心业务逻辑没有直接关系时。

例如,我们可能希望将当前用户 ID 或事务 ID 或某个请求跟踪 ID 记录到每个日志语句中,以便更轻松地搜索日志。对于此类场景,使用 ThreadLocal 是一种有用的技术。

使用 ThreadPoolBulkhead 时,由于我们的代码不在当前线程上执行,因此我们存储在 ThreadLocal 变量中的数据在其他线程中将不可用。

让我们看一个例子来理解这个问题。首先我们定义一个 RequestTrackingIdHolder 类,一个围绕 ThreadLocal 的包装类:

class RequestTrackingIdHolder {
  static ThreadLocal<String> threadLocal = new ThreadLocal<>();

  static String getRequestTrackingId() {
    return threadLocal.get();
  }

  static void setRequestTrackingId(String id) {
    if (threadLocal.get() != null) {
      threadLocal.set(null);
      threadLocal.remove();
    }
    threadLocal.set(id);
  }

  static void clear() {
    threadLocal.set(null);
    threadLocal.remove();
  }
}

静态方法可以轻松设置和获取存储在 ThreadLocal 上的值。我们接下来在调用隔板装饰的航班搜索操作之前设置一个请求跟踪 ID:

for (int i=0; i<2; i++) {
  String trackingId = UUID.randomUUID().toString();
  System.out.println("Setting trackingId " + trackingId + " on parent, main thread before calling flight search");
  RequestTrackingIdHolder.setRequestTrackingId(trackingId);
  decoratedFlightsSupplier
    .get()
    .whenComplete((r,t) -> {
        // other lines omitted
    });
}

示例输出显示此值在隔板管理的线程中不可用:

Setting trackingId 98ff99df-466a-47f7-88f7-5e31fc8fcb6b on parent, main thread before calling flight search
Setting trackingId 6b98d73c-a590-4a20-b19d-c85fea783caf on parent, main thread before calling flight search
Searching for flights; current time = 19:53:53 799; current thread = bulkhead-flightSearchService-1; Request Tracking Id = null
Flight search successful at 19:53:53 824
Received results
Searching for flights; current time = 19:53:54 836; current thread = bulkhead-flightSearchService-1; Request Tracking Id = null
Flight search successful at 19:53:54 836
Received results

为了解决这个问题,ThreadPoolBulkhead 提供了一个 ContextPropagatorContextPropagator 是一种用于跨线程边界检索、复制和清理值的抽象。它定义了一个接口,其中包含从当前线程 (retrieve()) 获取值、将其复制到新的执行线程 (copy()) 并最终在执行线程 (clear()) 上进行清理的方法。

让我们实现一个
RequestTrackingIdPropagator

class RequestTrackingIdPropagator implements ContextPropagator {
  @Override
  public Supplier<Optional> retrieve() {
    System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName());
    return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
  }

  @Override
  Consumer<Optional> copy() {
    return optional -> {
      System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
      optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
    };
  }

  @Override
  Consumer<Optional> clear() {
    return optional -> {
      System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
      optional.ifPresent(s -> RequestTrackingIdHolder.clear());
    };
  }
}

我们通过在 ThreadPoolBulkheadConfig 上的设置来为 ThreadPoolBulkhead 提供 ContextPropagator

class RequestTrackingIdPropagator implements ContextPropagator {
  @Override
  public Supplier<Optional> retrieve() {
    System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName());
    return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
  }

  @Override
  Consumer<Optional> copy() {
    return optional -> {
      System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
      optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
    };
  }

  @Override
  Consumer<Optional> clear() {
    return optional -> {
      System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
      optional.ifPresent(s -> RequestTrackingIdHolder.clear());
    };
  }
}

现在,示例输出显示请求跟踪 ID 在隔板管理的线程中可用:

Setting trackingId 71d44cb8-dab6-4222-8945-e7fd023528ba on parent, main thread before calling flight search
Getting request tracking id from thread: main
Setting trackingId 5f9dd084-f2cb-4a20-804b-038828abc161 on parent, main thread before calling flight search
Getting request tracking id from thread: main
Setting request tracking id 71d44cb8-dab6-4222-8945-e7fd023528ba on thread: bulkhead-flightSearchService-1
Searching for flights; current time = 20:07:56 508; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 71d44cb8-dab6-4222-8945-e7fd023528ba
Flight search successful at 20:07:56 538
Clearing request tracking id on thread: bulkhead-flightSearchService-1
Received results
Setting request tracking id 5f9dd084-f2cb-4a20-804b-038828abc161 on thread: bulkhead-flightSearchService-1
Searching for flights; current time = 20:07:57 542; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 5f9dd084-f2cb-4a20-804b-038828abc161
Flight search successful at 20:07:57 542
Clearing request tracking id on thread: bulkhead-flightSearchService-1
Received results

Bulkhead事件

Bulkhead 和 ThreadPoolBulkhead 都有一个 EventPublisher 来生成以下类型的事件:

  • BulkheadOnCallPermittedEvent
  • BulkheadOnCallRejectedEvent 和
  • BulkheadOnCallFinishedEvent

我们可以监听这些事件并记录它们,例如:

Bulkhead bulkhead = registry.bulkhead("flightSearchService");
bulkhead.getEventPublisher().onCallPermitted(e -> System.out.println(e.toString()));
bulkhead.getEventPublisher().onCallFinished(e -> System.out.println(e.toString()));
bulkhead.getEventPublisher().onCallRejected(e -> System.out.println(e.toString()));

示例输出显示了记录的内容:

2020-08-26T12:27:39.790435: Bulkhead 'flightSearch' permitted a call.
... other lines omitted ...
2020-08-26T12:27:40.290987: Bulkhead 'flightSearch' rejected a call.
... other lines omitted ...
2020-08-26T12:27:41.094866: Bulkhead 'flightSearch' has finished a call.

Bulkhead 指标

SemaphoreBulkhead

Bulkhead 暴露了两个指标:

  • 可用权限的最大数量(resilience4j.bulkhead.max.allowed.concurrent.calls),和
  • 允许的并发调用数(resilience4j.bulkhead.available.concurrent.calls)。

bulkhead.available 指标与我们在 BulkheadConfig 上配置的 maxConcurrentCalls 相同。

首先,我们像前面一样创建 BulkheadConfigBulkheadRegistryBulkhead。然后,我们创建一个 MeterRegistry 并将 BulkheadRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedBulkheadMetrics.ofBulkheadRegistry(registry)
  .bindTo(meterRegistry);

运行几次隔板装饰操作后,我们显示捕获的指标:

Consumer<Meter> meterConsumer = meter -> {
  String desc = meter.getId().getDescription();
  String metricName = meter.getId().getName();
  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("VALUE"))
    .findFirst()
    .map(m -> m.getValue())
    .orElse(0.0);
  System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

这是一些示例输出:

The maximum number of available permissions - resilience4j.bulkhead.max.allowed.concurrent.calls: 8.0
The number of available permissions - resilience4j.bulkhead.available.concurrent.calls: 3.0

ThreadPoolBulkhead

ThreadPoolBulkhead 暴露五个指标:

  • 队列的当前长度(resilience4j.bulkhead.queue.depth),
  • 当前线程池的大小(resilience4j.bulkhead.thread.pool.size),
  • 线程池的核心和最大容量(resilience4j.bulkhead.core.thread.pool.sizeresilience4j.bulkhead.max.thread.pool.size),以及
  • 队列的容量(resilience4j.bulkhead.queue.capacity)。

首先,我们像前面一样创建 ThreadPoolBulkheadConfig
ThreadPoolBulkheadRegistryThreadPoolBulkhead。然后,我们创建一个 MeterRegistry 并将
ThreadPoolBulkheadRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedThreadPoolBulkheadMetrics.ofThreadPoolBulkheadRegistry(registry).bindTo(meterRegistry);

运行几次隔板装饰操作后,我们将显示捕获的指标:

The queue capacity - resilience4j.bulkhead.queue.capacity: 5.0
The queue depth - resilience4j.bulkhead.queue.depth: 1.0
The thread pool size - resilience4j.bulkhead.thread.pool.size: 5.0
The maximum thread pool size - resilience4j.bulkhead.max.thread.pool.size: 5.0
The core thread pool size - resilience4j.bulkhead.core.thread.pool.size: 3.0

在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

实施隔板时的陷阱和良好实践

使隔板成为单例

对给定远程服务的所有调用都应通过同一个 Bulkhead 实例。对于给定的远程服务,Bulkhead 必须是单例。

如果我们不强制执行此操作,我们代码库的某些区域可能会绕过 Bulkhead 直接调用远程服务。为了防止这种情况,远程服务的实际调用应该在一个核心、内部层和其他区域应该使用内部层暴露的隔板装饰器。

我们如何确保未来的新开发人员理解这一意图? 查看 Tom 的文章,该文章展示了解决此类问题的一种方法,即通过组织包结构来明确此类意图。此外,它还展示了如何通过在 ArchUnit 测试中编码意图来强制执行此操作。

与其他 Resilience4j 模块结合

将隔板与一个或多个其他 Resilience4j 模块(如重试和速率限制器)结合使用会更有效。例如,如果有 BulkheadFullException,我们可能希望在一些延迟后重试。

结论

在本文中,我们学习了如何使用 Resilience4j 的 Bulkhead 模块对我们对远程服务进行的并发调用设置限制。我们了解了为什么这很重要,还看到了一些有关如何配置它的实际示例。

您可以使用 GitHub 上的代码演示一个完整的应用程序。


本文译自: Implementing Bulkhead with Resilience4j – Reflectoring

Java 项目中使用 Resilience4j 框架实现异步超时处理

到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 RetryRateLimiter 模块。在本文中,我们将通过 TimeLimiter 继续探索 Resilience4j。我们将了解它解决了什么问题,何时以及如何使用它,并查看一些示例。

代码示例

本文附有 GitHub 上的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

什么是限时?

对我们愿意等待操作完成的时间设置限制称为时间限制。如果操作没有在我们指定的时间内完成,我们希望通过超时错误收到通知。

有时,这也称为“设定最后期限”。

我们这样做的一个主要原因是确保我们不会让用户或客户无限期地等待。不提供任何反馈的缓慢服务可能会让用户感到沮丧。

我们对操作设置时间限制的另一个原因是确保我们不会无限期地占用服务器资源。我们在使用 Spring 的 @Transactional 注解时指定的 timeout 值就是一个例子——在这种情况下,我们不想长时间占用数据库资源。

什么时候使用 Resilience4j TimeLimiter?

Resilience4j 的 TimeLimiter 可用于设置使用 CompleteableFutures 实现的异步操作的时间限制(超时)。

Java 8 中引入的 CompletableFuture 类使异步、非阻塞编程变得更容易。可以在不同的线程上执行慢速方法,释放当前线程来处理其他任务。 我们可以提供一个当 slowMethod() 返回时执行的回调:

int slowMethod() {
  // time-consuming computation or remote operation
return 42;
}

CompletableFuture.supplyAsync(this::slowMethod)
.thenAccept(System.out::println);

这里的 slowMethod() 可以是一些计算或远程操作。通常,我们希望在进行这样的异步调用时设置时间限制。我们不想无限期地等待 slowMethod() 返回。例如,如果 slowMethod() 花费的时间超过一秒,我们可能想要返回先前计算的、缓存的值,甚至可能会出错。

在 Java 8 的 CompletableFuture 中,没有简单的方法来设置异步操作的时间限制。CompletableFuture 实现了 Future 接口,Future 有一个重载的 get() 方法来指定我们可以等待多长时间:

CompletableFuture<Integer> completableFuture = CompletableFuture
  .supplyAsync(this::slowMethod);
Integer result = completableFuture.get(3000, TimeUnit.MILLISECONDS);
System.out.println(result);

但是这里有一个问题—— get() 方法是一个阻塞调用。所以它首先违背了使用 CompletableFuture 的目的,即释放当前线程。

这是 Resilience4j 的 TimeLimiter 解决的问题——它让我们在异步操作上设置时间限制,同时保留在 Java 8 中使用 CompletableFuture 时非阻塞的好处。

CompletableFuture 的这种限制已在 Java 9 中得到解决。我们可以在 Java 9 及更高版本中使用 CompletableFuture 上的 orTimeout()completeOnTimeout() 等方法直接设置时间限制。然而,凭借 Resilience4J指标事件,与普通的 Java 9 解决方案相比,它仍然提供了附加值。

Resilience4j TimeLimiter 概念

TimeLimiter支持 FutureCompletableFuture。但是将它与 Future 一起使用相当于 Future.get(long timeout, TimeUnit unit)。因此,我们将在本文的其余部分关注 CompletableFuture

与其他 Resilience4j 模块一样,TimeLimiter 的工作方式是使用所需的功能装饰我们的代码 – 如果在这种情况下操作未在指定的 timeoutDuration 内完成,则返回 TimeoutException

我们为 TimeLimiter 提供 timeoutDurationScheduledExecutorService 和异步操作本身,表示为 CompletionStageSupplier。它返回一个 CompletionStage 的装饰 Supplier

在内部,它使用调度器来调度一个超时任务——通过抛出一个 TimeoutException 来完成 CompletableFuture 的任务。如果操作先完成,TimeLimiter 取消内部超时任务。

除了 timeoutDuration 之外,还有另一个与 TimeLimiter 关联的配置 cancelRunningFuture。此配置仅适用于 Future 而不适用于 CompletableFuture。当超时发生时,它会在抛出 TimeoutException 之前取消正在运行的 Future

使用 Resilience4j TimeLimiter 模块

TimeLimiterRegistryTimeLimiterConfigTimeLimiterresilience4j-timelimiter 的主要抽象。

TimeLimiterRegistry 是用于创建和管理 TimeLimiter 对象的工厂。

TimeLimiterConfig 封装了 timeoutDurationcancelRunningFuture 配置。每个 TimeLimiter 对象都与一个 TimeLimiterConfig 相关联。

TimeLimiter 提供辅助方法来为 FutureCompletableFuture Suppliers 创建或执行装饰器。

让我们看看如何使用 TimeLimiter 模块中可用的各种功能。我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

第一步是创建一个 TimeLimiterConfig

TimeLimiterConfig config = TimeLimiterConfig.ofDefaults();

这将创建一个 TimeLimiterConfig,其默认值为 timeoutDuration (1000ms) 和 cancelRunningFuture (true)。

假设我们想将超时值设置为 2s 而不是默认值:

TimeLimiterConfig config = TimeLimiterConfig.custom()
  .timeoutDuration(Duration.ofSeconds(2))
  .build();

然后我们创建一个 TimeLimiter

TimeLimiterRegistry registry = TimeLimiterRegistry.of(config);

TimeLimiter limiter = registry.timeLimiter("flightSearch");

我们想要异步调用
FlightSearchService.searchFlights(),它返回一个 List<Flight>。让我们将其表示为 Supplier<CompletionStage<List<Flight>>>

Supplier<List<Flight>> flightSupplier = () -> service.searchFlights(request);
Supplier<CompletionStage<List<Flight>>> origCompletionStageSupplier =
() -> CompletableFuture.supplyAsync(flightSupplier);

然后我们可以使用 TimeLimiter 装饰 Supplier

ScheduledExecutorService scheduler =
  Executors.newSingleThreadScheduledExecutor();
Supplier<CompletionStage<List<Flight>>> decoratedCompletionStageSupplier =  
  limiter.decorateCompletionStage(scheduler, origCompletionStageSupplier);

最后,让我们调用装饰的异步操作:

decoratedCompletionStageSupplier.get().whenComplete((result, ex) -> {
  if (ex != null) {
    System.out.println(ex.getMessage());
  }
  if (result != null) {
    System.out.println(result);
  }
});

以下是成功飞行搜索的示例输出,其耗时少于我们指定的 2 秒 timeoutDuration

Searching for flights; current time = 19:25:09 783; current thread = ForkJoinPool.commonPool-worker-3

Flight search successful

[Flight{flightNumber='XY 765', flightDate='08/30/2020', from='NYC', to='LAX'}, Flight{flightNumber='XY 746', flightDate='08/30/2020', from='NYC', to='LAX'}] on thread ForkJoinPool.commonPool-worker-3

这是超时的航班搜索的示例输出:

Exception java.util.concurrent.TimeoutException: TimeLimiter 'flightSearch' recorded a timeout exception on thread pool-1-thread-1 at 19:38:16 963

Searching for flights; current time = 19:38:18 448; current thread = ForkJoinPool.commonPool-worker-3

Flight search successful at 19:38:18 461

上面的时间戳和线程名称表明,即使异步操作稍后在另一个线程上完成,调用线程也会收到 TimeoutException。

如果我们想创建一个装饰器并在代码库的不同位置重用它,我们将使用decorateCompletionStage()。如果我们想创建它并立即执行 Supplier<CompletionStage>,我们可以使用 executeCompletionStage() 实例方法代替:

CompletionStage<List<Flight>> decoratedCompletionStage =  
  limiter.executeCompletionStage(scheduler, origCompletionStageSupplier);

TimeLimiter 事件

TimeLimiter 有一个 EventPublisher,它生成 TimeLimiterOnSuccessEventTimeLimiterOnErrorEventTimeLimiterOnTimeoutEvent 类型的事件。我们可以监听这些事件并记录它们,例如:

TimeLimiter limiter = registry.timeLimiter("flightSearch");

limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));

limiter.getEventPublisher().onError(e -> System.out.println(e.toString()));

limiter.getEventPublisher().onTimeout(e -> System.out.println(e.toString()));

示例输出显示了记录的内容:

2020-08-07T11:31:48.181944: TimeLimiter 'flightSearch' recorded a successful call.

... other lines omitted ...

2020-08-07T11:31:48.582263: TimeLimiter 'flightSearch' recorded a timeout exception.

TimeLimiter 指标

TimeLimiter 跟踪成功、失败和超时的调用次数。

首先,我们像往常一样创建 TimeLimiterConfigTimeLimiterRegistryTimeLimiter。然后,我们创建一个 MeterRegistry 并将 TimeLimiterRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedTimeLimiterMetrics.ofTimeLimiterRegistry(registry)
  .bindTo(meterRegistry);

运行几次限时操作后,我们显示捕获的指标:

Consumer<Meter> meterConsumer = meter -> {
  String desc = meter.getId().getDescription();
  String metricName = meter.getId().getName();
  String metricKind = meter.getId().getTag("kind");
  Double metricValue =
    StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("COUNT"))
    .findFirst()
    .map(Measurement::getValue)
    .orElse(0.0);
  System.out.println(desc + " - " +
                     metricName +
                     "(" + metricKind + ")" +
                     ": " + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);

这是一些示例输出:

The number of timed out calls - resilience4j.timelimiter.calls(timeout): 6.0

The number of successful calls - resilience4j.timelimiter.calls(successful): 4.0

The number of failed calls - resilience4j.timelimiter.calls(failed): 0.0

在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

实施时间限制时的陷阱和良好实践

通常,我们处理两种操作 – 查询(或读取)和命令(或写入)。对查询进行时间限制是安全的,因为我们知道它们不会改变系统的状态。我们看到的 searchFlights() 操作是查询操作的一个例子。

命令通常会改变系统的状态。bookFlights() 操作将是命令的一个示例。在对命令进行时间限制时,我们必须记住,当我们超时时,该命令很可能仍在运行。例如,bookFlights() 调用上的 TimeoutException 并不一定意味着命令失败。

在这种情况下,我们需要管理用户体验——也许在超时时,我们可以通知用户操作花费的时间比我们预期的要长。然后我们可以查询上游以检查操作的状态并稍后通知用户。

结论

在本文中,我们学习了如何使用 Resilience4j 的 TimeLimiter 模块为异步、非阻塞操作设置时间限制。我们通过一些实际示例了解了何时使用它以及如何配置它。

您可以使用 GitHub 上的代码演示一个完整的应用程序来说明这些想法。


本文译自:
https://reflectoring.io/time-limiting-with-resilience4j/

Java 项目中使用 Resilience4j 框架实现客户端 API 调用的限速/节流机制


在本系列的上一篇文章中,我们了解了 Resilience4j 以及如何使用其 Retry 模块。现在让我们了解 RateLimiter – 它是什么,何时以及如何使用它,以及在实施速率限制(或者也称为“节流”)时要注意什么。

代码示例

本文附有GitHub 上的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

什么是限速?

我们可以从两个角度来看待速率限制——作为服务提供者和作为服务消费者。

服务端限速

作为服务提供商,我们实施速率限制以保护我们的资源免受过载和拒绝服务 (DoS) 攻击

为了满足我们与所有消费者的服务水平协议 (SLA),我们希望确保一个导致流量激增的消费者不会影响我们对他人的服务质量。

我们通过设置在给定时间单位内允许消费者发出多少请求的限制来做到这一点。我们通过适当的响应拒绝任何超出限制的请求,例如 HTTP 状态 429(请求过多)。这称为服务器端速率限制。

速率限制以每秒请求数 (rps)、每分钟请求数 (rpm) 或类似形式指定。某些服务在不同的持续时间(例如 50 rpm 且不超过 2500 rph)和一天中的不同时间(例如,白天 100 rps 和晚上 150 rps)有多个速率限制。该限制可能适用于单个用户(由用户 ID、IP 地址、API 访问密钥等标识)或多租户应用程序中的租户。

客户端限速

作为服务的消费者,我们希望确保我们不会使服务提供者过载。此外,我们不想招致意外的成本——无论是金钱上的还是服务质量方面的。

如果我们消费的服务是有弹性的,就会发生这种情况。服务提供商可能不会限制我们的请求,而是会因额外负载而向我们收取额外费用。有些甚至在短时间内禁止行为不端的客户。消费者为防止此类问题而实施的速率限制称为客户端速率限制。

何时使用 RateLimiter?

resilience4j-ratelimiter 用于客户端速率限制。

服务器端速率限制需要诸如缓存和多个服务器实例之间的协调之类的东西,这是 resilience4j 不支持的。对于服务器端的速率限制,有 API 网关和 API 过滤器,例如 Kong API GatewayRepose API Filter。Resilience4j 的 RateLimiter 模块并不打算取代它们。

Resilience4j RateLimiter 概念

想要调用远程服务的线程首先向 RateLimiter 请求许可。如果 RateLimiter 允许,则线程继续。 否则,RateLimiter 会停放线程或将其置于等待状态。

RateLimiter 定期创建新权限。当权限可用时,线程会收到通知,然后可以继续。

一段时间内允许的调用次数称为 limitForPeriod。RateLimiter 刷新权限的频率由 limitRefreshPeriod 指定。timeoutDuration 指定线程可以等待多长时间获取权限。如果在等待时间结束时没有可用的权限,RateLimiter 将抛出 RequestNotPermitted 运行时异常。

使用Resilience4j RateLimiter 模块

RateLimiterRegistryRateLimiterConfigRateLimiterresilience4j-ratelimiter 的主要抽象。

RateLimiterRegistry 是一个用于创建和管理 RateLimiter 对象的工厂。

RateLimiterConfig 封装了 limitForPeriodlimitRefreshPeriodtimeoutDuration 配置。每个 RateLimiter 对象都与一个 RateLimiterConfig 相关联。

RateLimiter 提供辅助方法来为包含远程调用的函数式接口或 lambda 表达式创建装饰器。

让我们看看如何使用 RateLimiter 模块中可用的各种功能。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

基本示例

第一步是创建一个 RateLimiterConfig

RateLimiterConfig config = RateLimiterConfig.ofDefaults();

这将创建一个 RateLimiterConfig,其默认值为 limitForPeriod (50)、limitRefreshPeriod(500ns) 和 timeoutDuration (5s)。

假设我们与航空公司服务的合同规定我们可以以 1 rps 调用他们的搜索 API。然后我们将像这样创建 RateLimiterConfig

RateLimiterConfig config = RateLimiterConfig.custom()
  .limitForPeriod(1)
  .limitRefreshPeriod(Duration.ofSeconds(1))
  .timeoutDuration(Duration.ofSeconds(1))
  .build();

如果线程无法在指定的 1 秒 timeoutDuration 内获取权限,则会出错。

然后我们创建一个 RateLimiter 并装饰 searchFlights() 调用:

RateLimiterRegistry registry = RateLimiterRegistry.of(config);
RateLimiter limiter = registry.rateLimiter("flightSearchService");
// FlightSearchService and SearchRequest creation omitted
Supplier<List<Flight>> flightsSupplier =
  RateLimiter.decorateSupplier(limiter,
    () -> service.searchFlights(request));

最后,我们多次使用装饰过的 Supplier<List<Flight>>

for (int i=0; i<3; i++) {
  System.out.println(flightsSupplier.get());
}

示例输出中的时间戳显示每秒发出一个请求:

Searching for flights; current time = 15:29:40 786
...
[Flight{flightNumber='XY 765', ... }, ... ]
Searching for flights; current time = 15:29:41 791
...
[Flight{flightNumber='XY 765', ... }, ... ]

如果超出限制,我们会收到 RequestNotPermitted 异常:

Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)

 at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)

... other lines omitted ...

装饰方法抛出已检异常

假设我们正在调用
FlightSearchService.searchFlightsThrowingException() ,它可以抛出一个已检 Exception。那么我们就不能使用
RateLimiter.decorateSupplier()。我们将使用
RateLimiter.decorateCheckedSupplier() 代替:

CheckedFunction0<List<Flight>> flights =
  RateLimiter.decorateCheckedSupplier(limiter,
    () -> service.searchFlightsThrowingException(request));

try {
  System.out.println(flights.apply());
} catch (...) {
  // exception handling
}

RateLimiter.decorateCheckedSupplier() 返回一个 CheckedFunction0,它表示一个没有参数的函数。请注意对 CheckedFunction0 对象的 apply() 调用以调用远程操作。

如果我们不想使用 SuppliersRateLimiter 提供了更多的辅助装饰器方法,如 decorateFunction()decorateCheckedFunction()decorateRunnable()decorateCallable() 等,以与其他语言结构一起使用。decorateChecked* 方法用于装饰抛出已检查异常的方法。

应用多个速率限制

假设航空公司的航班搜索有多个速率限制:2 rps 和 40 rpm。 我们可以通过创建多个 RateLimiters 在客户端应用多个限制:

RateLimiterConfig rpsConfig = RateLimiterConfig.custom().
  limitForPeriod(2).
  limitRefreshPeriod(Duration.ofSeconds(1)).
  timeoutDuration(Duration.ofMillis(2000)).build();

RateLimiterConfig rpmConfig = RateLimiterConfig.custom().
  limitForPeriod(40).
  limitRefreshPeriod(Duration.ofMinutes(1)).
  timeoutDuration(Duration.ofMillis(2000)).build();

RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);
RateLimiter rpsLimiter =
  registry.rateLimiter("flightSearchService_rps", rpsConfig);
RateLimiter rpmLimiter =
  registry.rateLimiter("flightSearchService_rpm", rpmConfig);  
然后我们使用两个 RateLimiters 装饰 searchFlights() 方法:

Supplier<List<Flight>> rpsLimitedSupplier =
  RateLimiter.decorateSupplier(rpsLimiter,
    () -> service.searchFlights(request));

Supplier<List<Flight>> flightsSupplier
  = RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);

示例输出显示每秒发出 2 个请求,并且限制为 40 个请求:

Searching for flights; current time = 15:13:21 246
...
Searching for flights; current time = 15:13:21 249
...
Searching for flights; current time = 15:13:22 212
...
Searching for flights; current time = 15:13:40 215
...
Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:
RateLimiter 'flightSearchService_rpm' does not permit further calls
at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)
at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)

在运行时更改限制

如果需要,我们可以在运行时更改 limitForPeriodtimeoutDuration 的值:

limiter.changeLimitForPeriod(2);
limiter.changeTimeoutDuration(Duration.ofSeconds(2));

例如,如果我们的速率限制根据一天中的时间而变化,则此功能很有用 – 我们可以有一个计划线程来更改这些值。新值不会影响当前正在等待权限的线程。

RateLimiter和 Retry一起使用

假设我们想在收到 RequestNotPermitted 异常时重试,因为它是一个暂时性错误。我们会像往常一样创建 RateLimiterRetry 对象。然后我们装饰一个 Supplier 的供应商并用 Retry 包装它:

Supplier<List<Flight>> rateLimitedFlightsSupplier =
  RateLimiter.decorateSupplier(rateLimiter,
    () -> service.searchFlights(request));

Supplier<List<Flight>> retryingFlightsSupplier =
  Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);

示例输出显示为 RequestNotPermitted 异常重试请求:

Searching for flights; current time = 15:29:39 847
Flight search successful
[Flight{flightNumber='XY 765', ... }, ... ]
Searching for flights; current time = 17:10:09 218
...
[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]
2020-07-27T17:10:09.484: Retry 'rateLimitedFlightSearch', waiting PT1S until attempt '1'. Last attempt failed with exception 'io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls'.
Searching for flights; current time = 17:10:10 492
...
2020-07-27T17:10:10.494: Retry 'rateLimitedFlightSearch' recorded a successful retry attempt...
[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]

我们创建装饰器的顺序很重要。如果我们将 RetryRateLimiter 包装在一起,它将不起作用。

RateLimiter 事件

RateLimiter 有一个 EventPublisher,它在调用远程操作时生成 RateLimiterOnSuccessEventRateLimiterOnFailureEvent 类型的事件,以指示获取权限是否成功。我们可以监听这些事件并记录它们,例如:

RateLimiter limiter = registry.rateLimiter("flightSearchService");
limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));
limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));

日志输出示例如下:

RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.127+05:30}
... other lines omitted ...
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.186+05:30}

RateLimiter 指标

假设在实施客户端节流后,我们发现 API 的响应时间增加了。这是可能的 – 正如我们所见,如果在线程调用远程操作时权限不可用,RateLimiter 会将线程置于等待状态。

如果我们的请求处理线程经常等待获得许可,则可能意味着我们的 limitForPeriod 太低。也许我们需要与我们的服务提供商合作并首先获得额外的配额。

监控 RateLimiter 指标可帮助我们识别此类容量问题,并确保我们在 RateLimiterConfig 上设置的值运行良好。

RateLimiter 跟踪两个指标:可用权限的数量(
resilience4j.ratelimiter.available.permissions)和等待权限的线程数量(
resilience4j.ratelimiter.waiting.threads)。

首先,我们像往常一样创建 RateLimiterConfigRateLimiterRegistryRateLimiter。然后,我们创建一个 MeterRegistry 并将 RateLimiterRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry)
  .bindTo(meterRegistry);

运行几次限速操作后,我们显示捕获的指标:

Consumer<Meter> meterConsumer = meter -> {
  String desc = meter.getId().getDescription();
  String metricName = meter.getId().getName();
  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("VALUE"))
    .findFirst()
    .map(m -> m.getValue())
    .orElse(0.0);
  System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

这是一些示例输出:

The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0
The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0

resilience4j.ratelimiter.available.permissions 的负值显示为请求线程保留的权限数。在实际应用中,我们会定期将数据导出到监控系统,并在仪表板上进行分析。

实施客户端速率限制时的陷阱和良好实践

使速率限制器成为单例

对给定远程服务的所有调用都应通过相同的 RateLimiter 实例。对于给定的远程服务,RateLimiter 必须是单例。

如果我们不强制执行此操作,我们代码库的某些区域可能会绕过 RateLimiter 直接调用远程服务。为了防止这种情况,对远程服务的实际调用应该在核心、内部层和其他区域应该使用内部层暴露的限速装饰器。

我们如何确保未来的新开发人员理解这一意图?查看 Tom 的文章,其中揭示了一种解决此类问题的方法,即通过组织包结构来明确此类意图。此外,它还展示了如何通过在 ArchUnit 测试中编码意图来强制执行此操作。

为多个服务器实例配置速率限制器

为配置找出正确的值可能很棘手。如果我们在集群中运行多个服务实例,limitForPeriod 的值必须考虑到这一点

例如,如果上游服务的速率限制为 100 rps,而我们的服务有 4 个实例,那么我们将配置 25 rps 作为每个实例的限制。

然而,这假设我们每个实例上的负载大致相同。 如果情况并非如此,或者如果我们的服务本身具有弹性并且实例数量可能会有所不同,那么 Resilience4j 的 RateLimiter 可能不适合

在这种情况下,我们需要一个速率限制器,将其数据保存在分布式缓存中,而不是像 Resilience4j RateLimiter 那样保存在内存中。但这会影响我们服务的响应时间。另一种选择是实现某种自适应速率限制。尽管 Resilience4j 可能会支持它,但尚不清楚何时可用。

选择正确的超时时间

对于 timeoutDuration 配置值,我们应该牢记 API 的预期响应时间

如果我们将 timeoutDuration 设置得太高,响应时间和吞吐量就会受到影响。如果它太低,我们的错误率可能会增加。

由于此处可能涉及一些反复试验,因此一个好的做法是将我们在 RateLimiterConfig 中使用的值(如 timeoutDurationlimitForPeriodlimitRefreshPeriod)作为我们服务之外的配置进行维护。然后我们可以在不更改代码的情况下更改它们。

调优客户端和服务器端速率限制器

实现客户端速率限制并不能保证我们永远不会受到上游服务的速率限制

假设我们有来自上游服务的 2 rps 的限制,并且我们将 limitForPeriod 配置为 2,将 limitRefreshPeriod 配置为 1s。如果我们在第二秒的最后几毫秒发出两个请求,在此之前没有其他调用,RateLimiter 将允许它们。如果我们在下一秒的前几毫秒内再进行两次调用,RateLimiter 也会允许它们,因为有两个新权限可用。但是上游服务可能会拒绝这两个请求,因为服务器通常会实现基于滑动窗口的速率限制。

为了保证我们永远不会从上游服务中获得超过速率,我们需要将客户端中的固定窗口配置为短于服务中的滑动窗口。因此,如果我们在前面的示例中将 limitForPeriod 配置为 1 并将 limitRefreshPeriod 配置为 500ms,我们就不会出现超出速率限制的错误。但是,第一个请求之后的所有三个请求都会等待,从而增加响应时间并降低吞吐量。

结论

在本文中,我们学习了如何使用 Resilience4j 的 RateLimiter 模块来实现客户端速率限制。 我们通过实际示例研究了配置它的不同方法。我们学习了一些在实施速率限制时要记住的良好做法和注意事项。

您可以使用 GitHub 上的代码演示一个完整的应用程序来说明这些想法。


本文译自: Implementing Rate Limiting with Resilience4j – Reflectoring

使用 Resilience4j 框架实现重试机制


在本文中,我们将从快速介绍 Resilience4j 开始,然后深入探讨其 Retry 模块。我们将了解何时、如何使用它,以及它提供的功能。在此过程中,我们还将学习实现重试时的一些良好实践。

代码示例

本文在 GitHub 上附有工作代码示例。

什么是 Resilience4j?

当应用程序通过网络进行通信时,会有很多出错的情况。由于连接断开、网络故障、上游服务不可用等,操作可能会超时或失败。应用程序可能会相互过载、无响应甚至崩溃。

Resilience4j 是一个 Java 库,可以帮助我们构建弹性和容错的应用程序。它提供了一个框架,可编写代码以防止和处理此类问题

Resilience4j 为 Java 8 及更高版本编写,适用于函数接口、lambda 表达式和方法引用等结构。

Resilience4j 模块

让我们快速浏览一下这些模块及其用途:

模块 目的
Retry 自动重试失败的远程操作
RateLimiter 限制我们在一定时间内调用远程操作的次数
TimeLimiter 调用远程操作时设置时间限制
Circuit Breaker 当远程操作持续失败时,快速失败或执行默认操作
Bulkhead 限制并发远程操作的数量
Cache 存储昂贵的远程操作的结果

使用范式

虽然每个模块都有其抽象,但通常的使用范式如下:

  1. 创建一个 Resilience4j 配置对象
  2. 为此类配置创建一个 Registry 对象
  3. 从注册表创建或获取 Resilience4j 对象
  4. 将远程操作编码为 lambda 表达式或函数式接口或通常的 Java 方法
  5. 使用提供的辅助方法之一围绕第 4 步中的代码创建装饰器或包装器
  6. 调用装饰器方法来调用远程操作
    步骤 1-5 通常在应用程序启动时完成一次。让我们看看重试模块的这些步骤:
RetryConfig config = RetryConfig.ofDefaults(); // ----> 1
RetryRegistry registry = RetryRegistry.of(config); // ----> 2
Retry retry = registry.retry("flightSearchService", config); // ----> 3

FlightSearchService searchService = new FlightSearchService();
SearchRequest request = new SearchRequest("NYC", "LAX", "07/21/2020");
Supplier<List<Flight>> flightSearchSupplier =
  () -> searchService.searchFlights(request); // ----> 4

Supplier<List<Flight>> retryingFlightSearch =
  Retry.decorateSupplier(retry, flightSearchSupplier); // ----> 5

System.out.println(retryingFlightSearch.get()); // ----> 6

什么时候使用重试?

远程操作可以是通过网络发出的任何请求。通常,它是以下之一:

  1. 向 REST 端点发送 HTTP 请求
  2. 调用远程过程 (RPC) 或 Web 服务
  3. 从数据存储(SQL/NoSQL 数据库、对象存储等)读取和写入数据
  4. 向消息代理(RabbitMQ/ActiveMQ/Kafka 等)发送和接收消息

当远程操作失败时,我们有两种选择——立即向我们的客户端返回错误,或者重试操作。如果重试成功,这对客户来说是件好事——他们甚至不必知道这是一个临时问题。

选择哪个选项取决于错误类型(瞬时或永久)、操作(幂等或非幂等)、客户端(人或应用程序)和用例。

暂时性错误是暂时的,通常,如果重试,操作很可能会成功。请求被上游服务限制、连接断开或由于某些服务暂时不可用而超时就是例子。

来自 REST API 的硬件故障或 404(未找到)响应是永久性错误的示例,重试无济于事

如果我们想应用重试,操作必须是幂等的。假设远程服务接收并处理了我们的请求,但在发送响应时出现问题。在这种情况下,当我们重试时,我们不希望服务将请求视为新请求或返回意外错误(想想银行转账)。

重试会增加 API 的响应时间。如果客户端是另一个应用程序,如 cron 作业或守护进程,这可能不是问题。但是,如果是一个人,有时最好做出响应,快速失败并提供反馈,而不是在我们不断重试时让这个人等待。

对于某些关键用例,可靠性可能比响应时间更重要,即使客户是个人,我们也可能需要实现重试。银行转账或旅行社预订航班和旅行酒店的转账就是很好的例子 – 用户期望可靠性,而不是对此类用例的即时响应。我们可以通过立即通知用户我们已接受他们的请求并在完成后通知他们来做出响应。

使用 Resilience4j 重试模块

RetryRegistryRetryConfigRetryresilience4j-retry 中的主要抽象。RetryRegistry 是用于创建和管理 Retry 对象的工厂。RetryConfig 封装了诸如应该尝试重试多少次、尝试之间等待多长时间等配置。每个 Retry 对象都与一个 RetryConfig 相关联。 Retry 提供了辅助方法来为包含远程调用的函数式接口或 lambda 表达式创建装饰器。

让我们看看如何使用 retry 模块中可用的各种功能。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务通信。

简单重试

在简单重试中,如果在远程调用期间抛出 RuntimeException,则重试该操作。 我们可以配置尝试次数、尝试之间等待多长时间等:

RetryConfig config = RetryConfig.custom()
  .maxAttempts(3)
  .waitDuration(Duration.of(2, SECONDS))
  .build();

// Registry, Retry creation omitted

FlightSearchService service = new FlightSearchService();
SearchRequest request = new SearchRequest("NYC", "LAX", "07/31/2020");
Supplier<List<Flight>> flightSearchSupplier =
  () -> service.searchFlights(request);

Supplier<List<Flight>> retryingFlightSearch =
  Retry.decorateSupplier(retry, flightSearchSupplier);

System.out.println(retryingFlightSearch.get());

我们创建了一个 RetryConfig,指定我们最多要重试 3 次,并在两次尝试之间等待 2 秒。如果我们改用 RetryConfig.ofDefaults() 方法,则将使用 3 次尝试和 500 毫秒等待持续时间的默认值。

我们将航班搜索调用表示为 lambda 表达式 – List<Flight>SupplierRetry.decorateSupplier() 方法使用重试功能装饰此 Supplier。最后,我们在装饰过的 Supplier 上调用 get() 方法来进行远程调用。

如果我们想创建一个装饰器并在代码库的不同位置重用它,我们将使用 decorateSupplier()。如果我们想创建它并立即执行它,我们可以使用 executeSupplier() 实例方法代替:

List<Flight> flights = retry.executeSupplier(
  () -> service.searchFlights(request));
这是显示第一个请求失败然后第二次尝试成功的示例输出:

Searching for flights; current time = 20:51:34 975
Operation failed
Searching for flights; current time = 20:51:36 985
Flight search successful
[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]

在已检异常上重试

现在,假设我们要重试已检查和未检查的异常。假设我们正在调用
FlightSearchService.searchFlightsThrowingException(),它可以抛出一个已检查的 Exception。由于 Supplier 不能抛出已检查的异常,我们会在这一行得到编译器错误:

Supplier<List<Flight>> flightSearchSupplier =
  () -> service.searchFlightsThrowingException(request);

我们可能会尝试在 lambda 表达式中处理 Exception 并返回 Collections.emptyList(),但这看起来不太好。更重要的是,由于我们自己捕获 Exception,重试不再起作用:

ExceptionSupplier<List<Flight>> flightSearchSupplier = () -> {
    try {      
      return service.searchFlightsThrowingException(request);
    } catch (Exception e) {
      // don't do this, this breaks the retry!
    }
    return Collections.emptyList();
  };

那么当我们想要重试远程调用可能抛出的所有异常时,我们应该怎么做呢?我们可以使用
Retry.decorateCheckedSupplier()(或 executeCheckedSupplier() 实例方法)代替 Retry.decorateSupplier()

CheckedFunction0<List<Flight>> retryingFlightSearch =
  Retry.decorateCheckedSupplier(retry,
    () -> service.searchFlightsThrowingException(request));

try {
  System.out.println(retryingFlightSearch.apply());
} catch (...) {
  // handle exception that can occur after retries are exhausted
}

Retry.decorateCheckedSupplier() 返回一个 CheckedFunction0,它表示一个没有参数的函数。请注意对 CheckedFunction0 对象的 apply() 调用以调用远程操作。

如果我们不想使用 SuppliersRetry 提供了更多的辅助装饰器方法,如 decorateFunction()decorateCheckedFunction()decorateRunnable()decorateCallable() 等,以与其他语言结构一起使用。decorate*decorateChecked* 版本之间的区别在于,decorate* 版本在 RuntimeExceptions 上重试,而 decorateChecked* 版本在 Exception 上重试。

有条件重试

上面的简单重试示例展示了如何在调用远程服务时遇到 RuntimeException 或已检查 Exception 时重试。在实际应用中,我们可能不想对所有异常都重试。 例如,如果我们得到一个
AuthenticationFailedException 重试相同的请求将无济于事。当我们进行 HTTP 调用时,我们可能想要检查 HTTP 响应状态代码或在响应中查找特定的应用程序错误代码来决定是否应该重试。让我们看看如何实现这种有条件的重试。

Predicate-based条件重试

假设航空公司的航班服务定期初始化其数据库中的航班数据。对于给定日期的飞行数据,此内部操作需要几秒钟时间。 如果我们在初始化过程中调用当天的航班搜索,该服务将返回一个特定的错误代码 FS-167。航班搜索文档说这是一个临时错误,可以在几秒钟后重试该操作。

让我们看看如何创建 RetryConfig

RetryConfig config = RetryConfig.<SearchResponse>custom()
  .maxAttempts(3)
  .waitDuration(Duration.of(3, SECONDS))
  .retryOnResult(searchResponse -> searchResponse
    .getErrorCode()
    .equals("FS-167"))
  .build();

我们使用 retryOnResult() 方法并传递执行此检查的 Predicate。这个 Predicate 中的逻辑可以像我们想要的那样复杂——它可以是对一组错误代码的检查,也可以是一些自定义逻辑来决定是否应该重试搜索。

Exception-based条件重试

假设我们有一个通用异常
FlightServiceBaseException,当在与航空公司的航班服务交互期间发生任何意外时会抛出该异常。作为一般策略,我们希望在抛出此异常时重试。但是我们不想重试 SeatsUnavailableException 的一个子类 – 如果航班上没有可用座位,重试将无济于事。我们可以通过像这样创建 RetryConfig 来做到这一点:

RetryConfig config = RetryConfig.custom()
  .maxAttempts(3)
  .waitDuration(Duration.of(3, SECONDS))
  .retryExceptions(FlightServiceBaseException.class)
  .ignoreExceptions(SeatsUnavailableException.class)
  .build();

retryExceptions() 中,我们指定了一个异常列表。ignoreExceptions() 将重试与此列表中的异常匹配或继承的任何异常。我们把我们想忽略而不是重试的那些放入ignoreExceptions()。如果代码在运行时抛出一些其他异常,比如 IOException,它也不会被重试。

假设即使对于给定的异常,我们也不希望在所有情况下都重试。也许我们只想在异常具有特定错误代码或异常消息中的特定文本时重试。在这种情况下,我们可以使用 retryOnException 方法:

Predicate<Throwable> rateLimitPredicate = rle ->
  (rle instanceof  RateLimitExceededException) &&
  "RL-101".equals(((RateLimitExceededException) rle).getErrorCode());

RetryConfig config = RetryConfig.custom()
  .maxAttempts(3)
  .waitDuration(Duration.of(1, SECONDS))
  .retryOnException(rateLimitPredicate)
  build();

与 predicate-based (基于谓词)的条件重试一样,谓词内的检查可以根据需要复杂化。

退避策略

到目前为止,我们的示例有固定的重试等待时间。通常我们希望在每次尝试后增加等待时间——这是为了让远程服务有足够的时间在当前过载的情况下进行恢复。我们可以使用 IntervalFunction 来做到这一点。

IntervalFunction 是一个函数式接口——它是一个以尝试次数为参数并以毫秒为单位返回等待时间的 Function

随机间隔

这里我们指定尝试之间的随机等待时间:

RetryConfig config = RetryConfig.custom()
.maxAttempts(4)
.intervalFunction(IntervalFunction.ofRandomized(2000))
.build();

IntervalFunction.ofRandomized() 有一个关联的 randomizationFactor。我们可以将其设置为 ofRandomized() 的第二个参数。如果未设置,则采用默认值 0.5。这个 randomizationFactor 决定了随机值的分布范围。因此,对于上面的默认值 0.5,生成的等待时间将介于 1000 毫秒(2000 – 2000 0.5)和 3000 毫秒(2000 + 2000 0.5)之间。

这种行为的示例输出如下:

Searching for flights; current time = 20:27:08 729
Operation failed
Searching for flights; current time = 20:27:10 643
Operation failed
Searching for flights; current time = 20:27:13 204
Operation failed
Searching for flights; current time = 20:27:15 236
Flight search successful
[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'},...]

指数间隔

对于指数退避,我们指定两个值 – 初始等待时间和乘数。在这种方法中,由于乘数,等待时间在尝试之间呈指数增长。例如,如果我们指定初始等待时间为 1 秒,乘数为 2,则重试将在 1 秒、2 秒、4 秒、8 秒、16 秒等之后进行。当客户端是后台作业或守护进程时,此方法是推荐的方法。

以下是我们如何为指数退避创建 RetryConfig

RetryConfig config = RetryConfig.custom()
.maxAttempts(6)
.intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
.build();

这种行为的示例输出如下:

Searching for flights; current 
time = 20:37:02 684

Operation failed

Searching for flights; current time = 20:37:03 727

Operation failed

Searching for flights; current time = 20:37:05 731

Operation failed

Searching for flights; current time = 20:37:09 731

Operation failed

Searching for flights; current time = 20:37:17 731

IntervalFunction 还提供了一个 exponentialRandomBackoff() 方法,它结合了上述两种方法。我们还可以提供 IntervalFunction 的自定义实现。

重试异步操作

直到现在我们看到的例子都是同步调用。让我们看看如何重试异步操作。假设我们像这样异步搜索航班:

CompletableFuture.supplyAsync(() -> service.searchFlights(request))
  .thenAccept(System.out::println);

searchFlight() 调用发生在不同的线程上,当它返回时,返回的 List<Flight> 被传递给 thenAccept(),它只是打印它。

我们可以使用 Retry 对象上的 executeCompletionStage() 方法对上述异步操作进行重试。 此方法采用两个参数 – 一个 ScheduledExecutorService 将在其上安排重试,以及一个 Supplier<CompletionStage> 将被装饰。它装饰并执行 CompletionStage,然后返回一个 CompletionStage,我们可以像以前一样调用 thenAccept

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

Supplier<CompletionStage<List<Flight>>> completionStageSupplier =
  () -> CompletableFuture.supplyAsync(() -> service.searchFlights(request));

retry.executeCompletionStage(scheduler, completionStageSupplier)
.thenAccept(System.out::println);

在实际应用程序中,我们将使用共享线程池 (
Executors.newScheduledThreadPool()) 来调度重试,而不是此处显示的单线程调度执行器。

重试事件

在所有这些例子中,装饰器都是一个黑盒子——我们不知道什么时候尝试失败了,框架代码正在尝试重试。假设对于给定的请求,我们想要记录一些详细信息,例如尝试计数或下一次尝试之前的等待时间。 我们可以使用在不同执行点发布的重试事件来做到这一点。Retry 有一个 EventPublisher,它具有 onRetry()onSuccess() 等方法。

我们可以通过实现这些监听器方法来收集和记录详细信息:

Retry.EventPublisher publisher = retry.getEventPublisher();

publisher.onRetry(event -> System.out.println(event.toString()));

publisher.onSuccess(event -> System.out.println(event.toString()));

类似地,RetryRegistry 也有一个 EventPublisher,它在 Retry 对象被添加或从注册表中删除时发布事件。

重试指标

Retry 维护计数器以跟踪操作的次数

  1. 第一次尝试成功
  2. 重试后成功
  3. 没有重试就失败了
  4. 重试后仍失败

每次执行装饰器时,它都会更新这些计数器。

为什么要捕获指标?

捕获并定期分析指标可以让我们深入了解上游服务的行为。它还可以帮助识别瓶颈和其他潜在问题

例如,如果我们发现某个操作通常在第一次尝试时失败,我们可以调查其原因。如果我们发现我们的请求在建立连接时受到限制或超时,则可能表明远程服务需要额外的资源或容量。

如何捕获指标?

Resilience4j 使用 Micrometer 发布指标。Micrometer 为监控系统(如 Prometheus、Azure Monitor、New Relic 等)提供了仪表客户端的外观。因此我们可以将指标发布到这些系统中的任何一个或在它们之间切换,而无需更改我们的代码。

首先,我们像往常一样创建 RetryConfigRetryRegistryRetry。然后,我们创建一个 MeterRegistry 并将 etryRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();

TaggedRetryMetrics.ofRetryRegistry(retryRegistry).bindTo(meterRegistry);

运行几次可重试操作后,我们显示捕获的指标:

Consumer<Meter> meterConsumer = meter -> {
  String desc = meter.getId().getDescription();
  String metricName = meter.getId().getTag("kind");
  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("COUNT"))
    .findFirst()
    .map(m -> m.getValue())
    .orElse(0.0);
  System.out.println(desc + " - " + metricName + ": " + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);

一些示例输出如下:

The number of successful calls without a retry attempt - successful_without_retry: 4.0

The number of failed calls without a retry attempt - failed_without_retry: 0.0

The number of failed calls after a retry attempt - failed_with_retry: 0.0

The number of successful calls after a retry attempt - successful_with_retry: 6.0

当然,在实际应用中,我们会将数据导出到监控系统并在仪表板上查看。

重试时的注意事项和良好实践

服务通常提供具有内置重试机制的客户端库或 SDK。对于云服务尤其如此。 例如,Azure CosmosDB 和 Azure 服务总线为客户端库提供内置重试工具。 它们允许应用程序设置重试策略来控制重试行为。

在这种情况下,最好使用内置的重试而不是我们自己的编码。如果我们确实需要自己编写,我们应该禁用内置的默认重试策略 – 否则,它可能导致嵌套重试,其中应用程序的每次尝试都会导致客户端库的多次尝试

一些云服务记录瞬时错误代码。例如,Azure SQL 提供了它期望数据库客户端重试的错误代码列表。在决定为特定操作添加重试之前,最好检查一下服务提供商是否有这样的列表。

另一个好的做法是将我们在 RetryConfig 中使用的值(例如最大尝试次数、等待时间和可重试错误代码和异常)作为我们服务之外的配置进行维护。如果我们发现新的暂时性错误或者我们需要调整尝试之间的间隔,我们可以在不构建和重新部署服务的情况下进行更改。

通常在重试时,框架代码中的某处可能会发生 Thread.sleep()。对于在重试之间有等待时间的同步重试就是这种情况。如果我们的代码在 Web 应用程序的上下文中运行,则 Thread 很可能是 Web 服务器的请求处理线程。因此,如果我们进行过多的重试,则会降低应用程序的吞吐量

结论

在本文中,我们了解了 Resilience4j 是什么,以及如何使用它的重试模块使我们的应用程序可以在应对临时错误具备弹性。我们研究了配置重试的不同方法,以及在不同方法之间做出决定的一些示例。我们学习了一些在实施重试时要遵循的良好实践,以及收集和分析重试指标的重要性。

您可以使用 GitHub 上的代码尝试一个完整的应用程序来演示这些想法。


本文译自: Implementing Retry with Resilience4j – Reflectoring