Java 项目中使用 Resilience4j 框架实现异步超时处理

内容纲要

到目前为止,在本系列中,我们已经了解了 Resilience4j 及其 RetryRateLimiter 模块。在本文中,我们将通过 TimeLimiter 继续探索 Resilience4j。我们将了解它解决了什么问题,何时以及如何使用它,并查看一些示例。

代码示例

本文附有 GitHub 上的工作代码示例。

什么是 Resilience4j?

请参阅上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理

什么是限时?

对我们愿意等待操作完成的时间设置限制称为时间限制。如果操作没有在我们指定的时间内完成,我们希望通过超时错误收到通知。

有时,这也称为“设定最后期限”。

我们这样做的一个主要原因是确保我们不会让用户或客户无限期地等待。不提供任何反馈的缓慢服务可能会让用户感到沮丧。

我们对操作设置时间限制的另一个原因是确保我们不会无限期地占用服务器资源。我们在使用 Spring 的 @Transactional 注解时指定的 timeout 值就是一个例子——在这种情况下,我们不想长时间占用数据库资源。

什么时候使用 Resilience4j TimeLimiter?

Resilience4j 的 TimeLimiter 可用于设置使用 CompleteableFutures 实现的异步操作的时间限制(超时)。

Java 8 中引入的 CompletableFuture 类使异步、非阻塞编程变得更容易。可以在不同的线程上执行慢速方法,释放当前线程来处理其他任务。 我们可以提供一个当 slowMethod() 返回时执行的回调:

int slowMethod() {
  // time-consuming computation or remote operation
return 42;
}

CompletableFuture.supplyAsync(this::slowMethod)
.thenAccept(System.out::println);

这里的 slowMethod() 可以是一些计算或远程操作。通常,我们希望在进行这样的异步调用时设置时间限制。我们不想无限期地等待 slowMethod() 返回。例如,如果 slowMethod() 花费的时间超过一秒,我们可能想要返回先前计算的、缓存的值,甚至可能会出错。

在 Java 8 的 CompletableFuture 中,没有简单的方法来设置异步操作的时间限制。CompletableFuture 实现了 Future 接口,Future 有一个重载的 get() 方法来指定我们可以等待多长时间:

CompletableFuture<Integer> completableFuture = CompletableFuture
  .supplyAsync(this::slowMethod);
Integer result = completableFuture.get(3000, TimeUnit.MILLISECONDS);
System.out.println(result);

但是这里有一个问题—— get() 方法是一个阻塞调用。所以它首先违背了使用 CompletableFuture 的目的,即释放当前线程。

这是 Resilience4j 的 TimeLimiter 解决的问题——它让我们在异步操作上设置时间限制,同时保留在 Java 8 中使用 CompletableFuture 时非阻塞的好处。

CompletableFuture 的这种限制已在 Java 9 中得到解决。我们可以在 Java 9 及更高版本中使用 CompletableFuture 上的 orTimeout()completeOnTimeout() 等方法直接设置时间限制。然而,凭借 Resilience4J指标事件,与普通的 Java 9 解决方案相比,它仍然提供了附加值。

Resilience4j TimeLimiter 概念

TimeLimiter支持 FutureCompletableFuture。但是将它与 Future 一起使用相当于 Future.get(long timeout, TimeUnit unit)。因此,我们将在本文的其余部分关注 CompletableFuture

与其他 Resilience4j 模块一样,TimeLimiter 的工作方式是使用所需的功能装饰我们的代码 – 如果在这种情况下操作未在指定的 timeoutDuration 内完成,则返回 TimeoutException

我们为 TimeLimiter 提供 timeoutDurationScheduledExecutorService 和异步操作本身,表示为 CompletionStageSupplier。它返回一个 CompletionStage 的装饰 Supplier

在内部,它使用调度器来调度一个超时任务——通过抛出一个 TimeoutException 来完成 CompletableFuture 的任务。如果操作先完成,TimeLimiter 取消内部超时任务。

除了 timeoutDuration 之外,还有另一个与 TimeLimiter 关联的配置 cancelRunningFuture。此配置仅适用于 Future 而不适用于 CompletableFuture。当超时发生时,它会在抛出 TimeoutException 之前取消正在运行的 Future

使用 Resilience4j TimeLimiter 模块

TimeLimiterRegistryTimeLimiterConfigTimeLimiterresilience4j-timelimiter 的主要抽象。

TimeLimiterRegistry 是用于创建和管理 TimeLimiter 对象的工厂。

TimeLimiterConfig 封装了 timeoutDurationcancelRunningFuture 配置。每个 TimeLimiter 对象都与一个 TimeLimiterConfig 相关联。

TimeLimiter 提供辅助方法来为 FutureCompletableFuture Suppliers 创建或执行装饰器。

让我们看看如何使用 TimeLimiter 模块中可用的各种功能。我们将使用与本系列前几篇文章相同的示例。假设我们正在为一家航空公司建立一个网站,以允许其客户搜索和预订航班。我们的服务与 FlightSearchService 类封装的远程服务对话。

第一步是创建一个 TimeLimiterConfig

TimeLimiterConfig config = TimeLimiterConfig.ofDefaults();

这将创建一个 TimeLimiterConfig,其默认值为 timeoutDuration (1000ms) 和 cancelRunningFuture (true)。

假设我们想将超时值设置为 2s 而不是默认值:

TimeLimiterConfig config = TimeLimiterConfig.custom()
  .timeoutDuration(Duration.ofSeconds(2))
  .build();

然后我们创建一个 TimeLimiter

TimeLimiterRegistry registry = TimeLimiterRegistry.of(config);

TimeLimiter limiter = registry.timeLimiter("flightSearch");

我们想要异步调用
FlightSearchService.searchFlights(),它返回一个 List<Flight>。让我们将其表示为 Supplier<CompletionStage<List<Flight>>>

Supplier<List<Flight>> flightSupplier = () -> service.searchFlights(request);
Supplier<CompletionStage<List<Flight>>> origCompletionStageSupplier =
() -> CompletableFuture.supplyAsync(flightSupplier);

然后我们可以使用 TimeLimiter 装饰 Supplier

ScheduledExecutorService scheduler =
  Executors.newSingleThreadScheduledExecutor();
Supplier<CompletionStage<List<Flight>>> decoratedCompletionStageSupplier =  
  limiter.decorateCompletionStage(scheduler, origCompletionStageSupplier);

最后,让我们调用装饰的异步操作:

decoratedCompletionStageSupplier.get().whenComplete((result, ex) -> {
  if (ex != null) {
    System.out.println(ex.getMessage());
  }
  if (result != null) {
    System.out.println(result);
  }
});

以下是成功飞行搜索的示例输出,其耗时少于我们指定的 2 秒 timeoutDuration

Searching for flights; current time = 19:25:09 783; current thread = ForkJoinPool.commonPool-worker-3

Flight search successful

[Flight{flightNumber='XY 765', flightDate='08/30/2020', from='NYC', to='LAX'}, Flight{flightNumber='XY 746', flightDate='08/30/2020', from='NYC', to='LAX'}] on thread ForkJoinPool.commonPool-worker-3

这是超时的航班搜索的示例输出:

Exception java.util.concurrent.TimeoutException: TimeLimiter 'flightSearch' recorded a timeout exception on thread pool-1-thread-1 at 19:38:16 963

Searching for flights; current time = 19:38:18 448; current thread = ForkJoinPool.commonPool-worker-3

Flight search successful at 19:38:18 461

上面的时间戳和线程名称表明,即使异步操作稍后在另一个线程上完成,调用线程也会收到 TimeoutException。

如果我们想创建一个装饰器并在代码库的不同位置重用它,我们将使用decorateCompletionStage()。如果我们想创建它并立即执行 Supplier<CompletionStage>,我们可以使用 executeCompletionStage() 实例方法代替:

CompletionStage<List<Flight>> decoratedCompletionStage =  
  limiter.executeCompletionStage(scheduler, origCompletionStageSupplier);

TimeLimiter 事件

TimeLimiter 有一个 EventPublisher,它生成 TimeLimiterOnSuccessEventTimeLimiterOnErrorEventTimeLimiterOnTimeoutEvent 类型的事件。我们可以监听这些事件并记录它们,例如:

TimeLimiter limiter = registry.timeLimiter("flightSearch");

limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));

limiter.getEventPublisher().onError(e -> System.out.println(e.toString()));

limiter.getEventPublisher().onTimeout(e -> System.out.println(e.toString()));

示例输出显示了记录的内容:

2020-08-07T11:31:48.181944: TimeLimiter 'flightSearch' recorded a successful call.

... other lines omitted ...

2020-08-07T11:31:48.582263: TimeLimiter 'flightSearch' recorded a timeout exception.

TimeLimiter 指标

TimeLimiter 跟踪成功、失败和超时的调用次数。

首先,我们像往常一样创建 TimeLimiterConfigTimeLimiterRegistryTimeLimiter。然后,我们创建一个 MeterRegistry 并将 TimeLimiterRegistry 绑定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedTimeLimiterMetrics.ofTimeLimiterRegistry(registry)
  .bindTo(meterRegistry);

运行几次限时操作后,我们显示捕获的指标:

Consumer<Meter> meterConsumer = meter -> {
  String desc = meter.getId().getDescription();
  String metricName = meter.getId().getName();
  String metricKind = meter.getId().getTag("kind");
  Double metricValue =
    StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("COUNT"))
    .findFirst()
    .map(Measurement::getValue)
    .orElse(0.0);
  System.out.println(desc + " - " +
                     metricName +
                     "(" + metricKind + ")" +
                     ": " + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);

这是一些示例输出:

The number of timed out calls - resilience4j.timelimiter.calls(timeout): 6.0

The number of successful calls - resilience4j.timelimiter.calls(successful): 4.0

The number of failed calls - resilience4j.timelimiter.calls(failed): 0.0

在实际应用中,我们会定期将数据导出到监控系统并在仪表板上进行分析。

实施时间限制时的陷阱和良好实践

通常,我们处理两种操作 – 查询(或读取)和命令(或写入)。对查询进行时间限制是安全的,因为我们知道它们不会改变系统的状态。我们看到的 searchFlights() 操作是查询操作的一个例子。

命令通常会改变系统的状态。bookFlights() 操作将是命令的一个示例。在对命令进行时间限制时,我们必须记住,当我们超时时,该命令很可能仍在运行。例如,bookFlights() 调用上的 TimeoutException 并不一定意味着命令失败。

在这种情况下,我们需要管理用户体验——也许在超时时,我们可以通知用户操作花费的时间比我们预期的要长。然后我们可以查询上游以检查操作的状态并稍后通知用户。

结论

在本文中,我们学习了如何使用 Resilience4j 的 TimeLimiter 模块为异步、非阻塞操作设置时间限制。我们通过一些实际示例了解了何时使用它以及如何配置它。

您可以使用 GitHub 上的代码演示一个完整的应用程序来说明这些想法。


本文译自:
https://reflectoring.io/time-limiting-with-resilience4j/

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注