使用 Spring AI 创建 MCP 服务器

本文提供了使用 Spring AI 创建模型上下文协议服务器的分步指南,并阐述了使用 MCP 的优势。

在本篇博客中,您将学习如何使用 Spring AI 创建一个模型上下文协议服务器。您将看到创建自己的 MCP 服务器是多么简单。

引言

模型上下文协议(Model Context Protocol)为将大型语言模型(LLM)连接到各种数据源和工具提供了一种标准化的方式。这句话中的"标准化"一词非常重要。这意味着与数据源和工具的集成变得比以往容易得多。除此之外,MCP 服务器通过额外的知识或功能来增强您的 LLM,使其成为一个更强大的助手。想象一下,您可以要求 LLM 为您预订假期。

根据您的偏好,它将在互联网上搜索合适的地点、预订酒店、预订航班等等,然后您就可以出发了!当然,LLM 需要能够为您预订酒店和航班。这种附加功能可以由 MCP 服务器提供。这听起来很吸引人,但也有些令人担忧。为了预订酒店,MCP 服务器需要知道您的个人详细信息并需要访问您的信用卡。这可能不是一个好主意。建议对敏感操作使用人工介入(HITL),以便您可以批准或拒绝某项操作。然而,MCP 服务器将使您的生活变得更加轻松。

在本博客中,您将学习如何使用 Spring Boot 和 Spring AI 创建自己的 MCP 服务器。没有 MCP 客户端,服务器将毫无用处。MCP 客户端与一个或多个 MCP 服务器交互,并且是控制方。作为 MCP 客户端,您将(滥)用 IntelliJ DevoxxGenie 插件。DevoxxGenie 实际上是一个 AI 编码助手,但您也可以使用它来测试您的 MCP 服务器。在下一篇博客中,您将创建自己的 MCP 客户端。

本篇博客中使用的源代码可在 GitHubserver 目录中找到。

先决条件

阅读本博客的先决条件是:

  • 基础的 Java 知识;
  • 基础的 Spring Boot 知识;
  • 基础的 LMStudio 知识;
  • 基础的 IntelliJ 和 DevoxxGenie 知识。

构建 MCP 服务器

您将构建的 MCP 服务器具有以下功能:

  • 返回我喜爱的艺术家列表
  • 返回我喜爱的歌曲列表

LLM 不会拥有关于此信息的任何知识,而当它能够访问这些工具时,希望它会利用它们。本应用深受 Dan Vega 的《使用 Java 创建您的第一个模型上下文协议服务器》的启发。

访问 Spring Initializr 并添加依赖项 Model Context Protocol Server。这将在 pom.xml 中添加以下依赖项。

<dependency>
  <groupId>org.springframework.ai</groupId>
  <artifactId>spring-ai-starter-mcp-server</artifactId>
</dependency>

创建 Artist 的数据模型。

public record Artist(String name) {
}

创建 Song 的数据模型。

public record Song(Artist artist, String title) {
}

创建 ArtistService。该服务将包含一个我喜爱的艺术家列表,但在实际应用中,这些信息将存储在数据库中。通过使用 @Tool 注解来定义一个名为 get_artists 的工具。为工具提供名称和描述。LLM 将使用该描述来了解工具的功能。

@Service
public class ArtistService {

    private final List<Artist> artists = new ArrayList<>();

    @Tool(name = "get_artists", description = "获取 Gunter 喜爱的艺术家完整列表")
    public List<Artist> getArtists() {
        return artists;
    }

    @PostConstruct
    public void init() {
        artists.addAll(List.of(
                new Artist("Bruce Springsteen"),
                new Artist("JJ Johnson")
        ));
    }

}

以类似的方式,创建 SongService

@Service
public class SongService {

    private final List<Song> songs = new ArrayList<>();

    @Tool(name = "get_songs", description = "获取 Gunter 喜爱的歌曲完整列表")
    public List<Song> getSongs() {
        return songs;
    }

    @PostConstruct
    public void init() {
        songs.addAll((List.of(
                new Song(new Artist("Bruce Springsteen"), "My Hometown"),
                new Song(new Artist("JJ Johnson"), "Lament")
        )));
    }

}

您通过一个 Bean 来注册这些工具。

@SpringBootApplication
public class MyMcpServerPlanetApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyMcpServerPlanetApplication.class, args);
    }

    @Bean
    public ToolCallbackProvider mcpServices(ArtistService artistService, SongService songService) {
        return MethodToolCallbackProvider.builder()
                .toolObjects(artistService, songService)
                .build();
    }

}

最后,添加以下 application.properties 配置。

spring.main.web-application-type=none
spring.ai.mcp.server.name=mcp-server
spring.ai.mcp.server.version=0.0.1

spring.main.banner-mode=off
logging.pattern.console=

此配置做了几件重要的事情:

  • 禁用 Web 应用程序:由于 MCP 使用 STDIO 传输,因此不需要 Web 服务器。
  • 设置服务器名称和版本:这用于向客户端标识 MCP 服务器。
  • 禁用横幅和控制台日志记录:这对于 STDIO 传输正常工作至关重要。

构建 jar 文件。

mvn clean verify

这将在 target 目录中生成一个 jar 文件:target/mcp-server-0.0.1-SNAPSHOT.jar

测试 MCP 服务器

为了测试 MCP 服务器,需要一个 MCP 客户端。如前所述,将使用 DevoxxGenie 来实现此目的。

将 MCP 服务器添加到 DevoxxGenie 的 MCP 设置中。请注意,您需要对命令和参数使用完整路径。

  • 名称:MyMcpServerPlanet
  • 传输类型:STDIO
  • 命令/<java installation directory>/bin/java
  • 参数-jar /home/<project directory>/mymcpserverplanet/server/target/mymcpserverplanet-0.0.1-SNAPSHOT.jar

Test MCP Server)

单击"测试连接并获取工具"按钮。按钮的标题应变更为"连接成功!找到 2 个工具"。

与 DevoxxGenie 一起,使用 LMStudio 作为推理引擎,模型 qwen3-8b 在 GPU 上运行。

输入提示:给我一个 Gunter 喜爱的艺术家列表。

该请求被发送到 LMStudio,可用的工具也随请求一起发送。如果您在 LMStudio 中启用了调试日志,可以检查该请求。

2025-07-20 11:45:42 [DEBUG]
 Received request: POST to /v1/chat/completions with body  {
  "model": "qwen3-8b",
  "messages": [
    {
      "role": "system",
      "content": "You are a software developer IDEA plugin with expe... <日志中已截断> ...at is correct and relevant to the code or plugin.\n"
    },
    {
      "role": "user",
      "content": "<ProjectPath>\n/home/<project directory>... <日志中已截断> ... list of gunter's favorite artists\n</UserPrompt>\n\n"
    }
  ],
  "temperature": 0,
  "top_p": 0.9,
  "stream": false,
  "max_tokens": 8000,
  "tools": [
    {
      "type": "function",
      "function": {
        "name": "get_songs",
        "description": "获取 Gunter 喜爱的歌曲完整列表",
        "parameters": {
          "type": "object",
          "properties": {},
          "required": []
        }
      }
    },
    {
      "type": "function",
      "function": {
        "name": "get_artists",
        "description": "获取 Gunter 喜爱的艺术家完整列表",
        "parameters": {
          "type": "object",
          "properties": {},
          "required": []
        }
      }
    }
  ]
}

LLM 将意识到它自己无法回答这个问题。然而,LLM 确实识别出有一个可用的工具可以回答这个问题。因此,LLM 将请求客户端调用 MCP 工具。DevoxxGenie 内置了人工介入机制,并将请求您的批准。
点击批准按钮

点击"批准"按钮。MCP 服务器将执行 get_artists 工具,并将响应发送回 LLM。LLM 将使用此响应来给出适当的答案。

收到以下响应,并且完全正确。观察 LLM 的思维过程也很有趣。

<think> 好的,用户要求提供 Gunter 喜爱的艺术家列表。我调用了 `get_artists` 函数,返回了两位艺术家:Bruce Springsteen 和 JJ Johnson。现在他们可能想知道是否还有更多或者需要进一步帮助。

我应该确认结果,也许可以询问他们是否还需要其他什么。由于之前的响应是成功的,保持积极的语气很好。工具调用没有错误,所以这里不需要额外的函数调用。 </think>

以下是 Gunter 喜爱的艺术家:

*   **Bruce Springsteen**
*   **JJ Johnson**

如果您需要更多信息或帮助,请告诉我!

让我们使用以下提示检查 get_songs 工具:给我一个 Gunter 喜爱的歌曲列表

遵循相同的过程,响应再次正确。

<think> 好的,用户要求提供 Gunter 喜爱的歌曲列表。我使用了 get_songs 函数,它检索到了两首歌曲。现在我需要清晰地呈现这些信息。让我用艺术家和歌名来格式化每首歌曲。确保响应易于阅读,也许用项目符号。检查是否需要任何额外信息,但既然用户只想要一个列表,保持简单应该是最好的。 </think>

以下是 Gunter 喜爱的歌曲列表:

1.  **Bruce Springsteen** - *《My Hometown》*
2.  **JJ Johnson** - *《Lament》*

如果您想了解这些曲目的更多详细信息,请告诉我!

添加搜索功能

让我们向两个 MCP 工具添加一些基本的搜索功能。

ArtistService 添加一个 search_artist 工具。

@Tool(name = "search_artist", description = "从 Gunter 喜爱的艺术家中搜索单个艺术家")
public Artist getArtist(String name) {
    return artists.stream()
            .filter(artist -> artist.name().equalsIgnoreCase(name))
            .findFirst()
            .orElse(null);
}

SongService 添加一个 search_song 工具。

@Tool(name = "search_song", description = "从 Gunter 喜爱的歌曲中搜索单首歌曲")
public Song getSong(String title) {
    return songs.stream()
            .filter(song -> song.title().equalsIgnoreCase(title))
            .findFirst()
            .orElse(null);
}

构建 jar 文件并使用 DevoxxGenie 获取新工具。

输入以下提示:Bruce Springsteen 是 Gunter 喜爱的艺术家之一吗?

这次调用了 search_artist 工具,响应再次正确。

<think> 好的,用户询问 Bruce Springsteen 是否是 Gunter 喜爱的艺术家之一。我使用 search_artist 函数并传入了他的名字。响应返回了艺术家 "Bruce Springsteen"。现在我需要检查他是否在 Gunter 的喜爱列表中。

等等,可用的函数是 get_artists 和 search_artist。search_artist 函数可能检查艺术家是否存在于数据库中。既然响应返回了该艺术家,那意味着 Bruce Springsteen 确实在 Gunter 喜爱的艺术家列表中。

所以答案应该是肯定的。我应该根据函数的响应来确认这一点。既然用户的问题已经得到回答,不需要进一步的操作。只需清晰地呈现结果即可。 </think>

是的,Bruce Springsteen 是 Gunter 喜爱的艺术家之一。搜索确认了他在收藏中。

输入以下提示:滚石乐队是 Gunter 喜爱的艺术家之一吗?

调用了 search_artist 工具,由于未找到滚石乐队,LLM 断定他们不在我喜爱的艺术家列表中。

<think> 好的,让我试着弄清楚这里发生了什么。用户询问滚石乐队是否是 Gunter 喜爱的艺术家之一。我使用 search_artist 函数,传入了名称 "Rolling Stones",但响应是 null。这意味着该函数在 Gunter 的喜爱列表中没有找到该名称的艺术家。

嗯,也许数据库中艺术家的名称不完全是 "Rolling Stones"。会不会有拼写错误或不同的拼写方式?或者也许他们在系统中以另一个名称而闻名。或者,也许是因为数据不存在,所以函数没有返回任何结果。

既然用户是在询问 Gunter 的喜爱列表,我应该检查是否有一种方法可以首先列出所有喜爱的艺术家。get_artists 函数可能会有所帮助。如果我调用它,也许我可以看到滚石乐队是否在列表中。但是等等,用户可能不想要所有艺术家,只是确认这一个特定的。

或者,也许 search_artist 函数需要不同的参数。让我再次检查工具定义。search_artist 函数需要一个 "name" 参数,这是必需的。那么查询应该是大小写不敏感的?或者可能是部分匹配?用户可能指的是 "Rolling Stones",但数据库中的存储方式不同。

另一个想法:也许滚石乐队不在 Gunter 喜爱的艺术家列表中。搜索返回了 null,所以可能他们不在。但我应该通过先检查所有艺术家来确认。我应该向用户建议这一点。等等,但用户是专门询问滚石乐队。也许我应该告知他们搜索没有找到任何结果,并询问他们是否想改为检查所有艺术家。

等等,响应是 null,所以函数没有返回任何数据。这意味着该艺术家不在列表中。所以答案应该是否定的,滚石乐队不是 Gunter 喜爱的艺术家之一。但是函数调用会不会有错误?让我再检查一遍。函数是 search_artist,名称是 "Rolling Stones"。如果数据库中有确切的名称,那么它应该返回。如果没有,则返回 null。

所以结论是滚石乐队不在 Gunter 的喜爱列表中。或者,可能存在拼写错误。但既然用户是直接询问,我应该根据函数的响应来回答。 </think>

在 Gunter 喜爱的艺术家列表中,对艺术家 "Rolling Stones" 的搜索未返回任何结果。这表明滚石乐队**目前并未**列为 Gunter 喜爱的艺术家之一。您想改为查看他喜爱的艺术家的完整列表吗?

结论

在本博客中,您学习了如何使用 Spring Boot 和 Spring AI 创建一个 MCP 服务器。创建 MCP 服务器和添加工具非常容易。您可以专注于工具的功能,而这正是您想要的。


【注】本文译自:Creating an MCP Server With Spring AI

Java 缓存精要

Java 缓存精要

实现更低延迟、降低成本并赋能智能体架构

作者:GRANVILLE BARNETT
架构师,HAZELCAST

缓存技术在系统中的作用日益重要,对于大规模解锁众多用例至关重要。几十年来,缓存已实现低成本、可扩展地访问会话状态和数据存储等信息。更现代的缓存用例正在实现低成本、可扩展的工具链,并在智能体架构中实现嵌入生成,这正在解锁下一代系统创新。

本参考资料卡介绍了使用 Java 的 JCache(Java 临时缓存 API)将缓存融入系统的方法。文中首先讨论了缓存的基础知识,然后通过代码示例简要介绍了 JCache API,最后总结了缓存部署架构。

缓存概述

缓存是先前计算结果的一个存储,以便可以省略后续计算。理解缓存最简单的方式是将其视为键值存储:对于给定的输入(键),输出(值)代表先前基于该输入计算出的结果。

缓存命中表示特定数据存在于缓存中,这种情况下可以使用其值。否则,就会发生缓存未命中,此时需要执行相关计算并将其输出放入缓存。缓存未命中的代价可能除了昂贵的计算操作外,还涉及网络通信。

图 1:简化的缓存命中/未命中流程

采用缓存是为了减少延迟并降低运营成本,几十年来对于实现众多类别的应用程序至关重要。缓存数据的例子包括 Web 应用程序的会话状态、数据库查询结果、网页渲染结果,以及来自通用网络和计算成本高昂的操作的结果。

缓存的一个更现代的用途是在 AI 领域。在这里,缓存的使用减少了昂贵的 API 调用(例如,嵌入生成),并最大限度地减少了智能体架构中智能体之间的对话断续(例如,由于工具调用和网络通信所致),从而解锁了新一波的解决方案和用户体验。

缓存可以驻留在进程内,作为客户端-服务器架构的一部分存在于服务中,或者是两者的结合。此外,缓存的部署通常可以组合。例如,应用程序可能与位于同一数据中心的缓存服务通信,而数据中心的本地缓存又是跨越多个数据中心的缓存的缓存。这种灵活性,加上缓存所支持的应用类别,使得缓存在过去几十年中成为一种主导的抽象概念。

本参考资料卡的剩余部分将讨论 JCache——Java 用于将缓存融入应用程序的抽象——首先简要概述您将经常使用的类,然后深入探讨 JCache 更广泛功能所提供的特性。最后,我们将总结缓存部署策略。

JCACHE 精要

JCache 在 Java 规范请求(JSR)107 中引入,并提供了一套关于缓存的抽象。JCache 有两个突出的特性:

  • JCache 是一个规范。 JSR 是由专家组设计和提交,并最终由 Java 社区过程执行委员会批准的规范。因为 JCache 是一个规范,所以它与那些 API 频繁变化的实现隔离开来。
  • JCache 是提供商独立的。 JCache 作为规范的一个副作用是,缓存解决方案提供商可以通过实现其暴露的服务提供程序接口(SPI)来与 JCache 集成。这为系统设计者提供了灵活性并避免了供应商锁定。

以下是一个简单的 JCache 示例,以便理解其使用方式。javax.cache 依赖项的获取方式可以在此处找到。

import java.util.Map;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class App {
    public static void main(String[] args) {
        CachingProvider cachingProvider = Caching.getCachingProvider(); // (1)
        CacheManager cacheManager = cachingProvider.getCacheManager(); // (2)
        MutableConfiguration<String, String> cacheConfig = new MutableConfiguration<String, String>(); // (3)
        Cache<String, String> cache = cacheManager.createCache("dzone-cache", cacheConfig); // (4)
        cache.put("England", "London"); // (5)
        cache.putAll(Map.of("France", "Paris", "Ireland", "Dublin")); // (6)
        assert cache.get("England").equals("London"); // (7)
        assert cache.get("Italy") == null; // (8)
    }
}

对上述示例的简要说明:
(1) 获取底层缓存提供程序的句柄
(2) 管理缓存的生命周期(例如,创建和销毁缓存)
(3) 允许启用/禁用缓存的特定功能(例如,统计信息、条目监听器)
(4) 创建由缓存提供程序支持的缓存
(5) 在缓存中放入单个键值条目
(6) 将键值条目放入缓存
(7) 断言缓存条目的存在
(8) 断言某个条目不在缓存中

本节的剩余部分将更详细地讨论上述示例中引入的抽象,以及您将经常遇到的相关类的其他方法。

javax.cache.spi.CachingProvider 构成了 JCache SPI,缓存提供者可以与之集成。您将使用的最常见功能是获取对 CacheManager 的引用。我们稍后将讨论 Caching

getCacheManagergetCacheManager 变体中最简单的一个。这将根据提供者的默认设置获取一个 CacheManager。可以使用 javax.cache.CacheManager 创建和销毁缓存:

  • createCache 创建一个具有给定名称和配置的缓存。
  • destroyCache 销毁具有给定名称的缓存。

javax.cache.Cache 是对提供者缓存的抽象,并暴露了少量用于查询和变更缓存项的操作:

  • putputAll 将条目放入缓存。请注意,这些方法不返回与正在放入的键先前关联的任何值。
  • containsKey 测试键是否存在于缓存中。
  • getgetAll 返回与指定键关联的值。
  • removeremoveAll 从缓存中移除项。

JCACHE 包

在本节中,我们将快速概述 javax.cache 更广泛包结构中的一些重要接口,并提供常用功能的示例。我们可以参考文档来浏览其内容的详尽列表。

图 2:javax.cache 的组成包

JAVAX.CACHE

通用管理(CacheManager)和与缓存交互(Cache)的设施位于 javax.cache 包内。除了初始配置之外,除非您想为缓存添加额外功能,否则仅使用此包中的类型就可以完成很多工作。例如,"JCache 精要"部分介绍中的示例用法主要使用了 javax.cache 中定义的接口。

JAVAX.CACHE.CONFIGURATION

在创建缓存期间,您可能希望添加功能,例如启用统计信息或通读缓存。此包提供了一个 Configuration 接口和一个实现 MutableConfiguration,可用于此类目的。

// ...
MutableConfiguration<String, String> cacheConfig = new MutableConfiguration<String, String>();
cacheConfig.setManagementEnabled(true).setStatisticsEnabled(true);
Cache<String, String> cache = cacheManager.createCache("dzone-cache", cacheConfig);

JAVAX.CACHE.EXPIRY

有时您希望驻留在缓存中的项过期。例如,我们可能有一个家庭保险报价的缓存,有效期为 24 小时。在这种情况下,我们可以使用过期策略如下:

// ...
MutableConfiguration<String, Double> cacheConfig = new MutableConfiguration<String, Double>();
cacheConfig.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY));
Cache<String, Double> cache = cacheManager.createCache("insurance-home-quotes", cacheConfig);
cache.put(quote.getId(), quote.getValue());

javax.cache.expiry 包提供了额外的过期策略,可能对其他场景有用。例如,AccessedExpiryPolicy 允许基于缓存条目的最后访问时间附加过期设置。

JAVAX.CACHE.EVENT

JCache 的一个强大功能是能够订阅缓存事件。例如,我们可能希望在创建或删除缓存条目后运行某些领域逻辑。javax.cache.event 包提供了实现此功能的抽象,特别是订阅缓存创建、更新、过期和移除的能力。以下基本示例在缓存条目创建后运行某些领域逻辑:

// ...
CacheEntryCreatedListener<String, String> createdListener = new CacheEntryCreatedListener<String, String>() {
  @Override
  public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends String>> events) throws CacheEntryListenerException {
    for (var c : events) {
      performDomainLogic(c);
    }
  }
};
MutableConfiguration<String, String> cacheConfig = new MutableConfiguration<String, String>();
MutableCacheEntryListenerConfiguration<String, String> listenerConfig = new MutableCacheEntryListenerConfiguration<>(() -> createdListener, null, false, true); // 请参阅文档
cacheConfig.addCacheEntryListenerConfiguration(listenerConfig);
Cache<String, String> cache = cacheManager.createCache("events", cacheConfig);
cache.put("key", "value"); // 调用创建监听器

JAVAX.CACHE.PROCESSOR

JCache 的一个强大组件是能够使用 EntryProcessor 将计算移至数据所在处,然后以编程方式调用该计算。当使用在分布式系统(例如,Hazelcast)内托管其缓存的提供者时,这尤其强大,因为它以很少的投入为分布式计算提供了一个简单的入口点。以下是一个 EntryProcessor 的简单示例,它将 UUID 附加到缓存条目:

// ...
class AppendUuidEntryProcessor implements EntryProcessor<String, String, String> {
  @Override
  public String process(MutableEntry<String, String> entry, Object... arguments) throws EntryProcessorException {
    if (entry.exists()) {
      String newValue = entry.getValue() + "-" + UUID.randomUUID();
      entry.setValue(newValue);
      return newValue;
    }
    return null;
  }
}
// ...
cache.invoke(key, new AppendUuidEntryProcessor())

JAVAX.CACHE.MANAGEMENT

JCache 提供的管理钩子非常强大且易于启用。例如,下面的小代码片段暴露了由 Java 管理扩展(JMX)规范定义的托管 Bean。这使得诸如 jconsole 和 JDK Mission Control 之类的 JMX 客户端能够查看缓存配置和统计信息(例如,命中和未命中百分比、平均获取和放置时间)。

// ...
MutableConfiguration<String, String> cacheConfig = new MutableConfiguration<String, String>();
cacheConfig.setManagementEnabled(true).setStatisticsEnabled(true);
Cache<String, String> cache = cacheManager.createCache("management", cacheConfig);
// ...

JAVAX.CACHE.SPI

"JCache 精要"部分提供的示例省略了我们如何注册缓存提供者,即使用 JCache API 与我们的应用程序交互的缓存宿主服务。这就是 JCache 的 SPI 组件发挥作用的地方。

实现这一点有两个组成部分:

  1. 将我们的缓存提供者添加到类路径中
  2. 告诉 JCache 使用该提供者

第一步很简单:只需添加对任何符合 JSR 107 标准的提供者的依赖。

第二步有几种通用的方法:

  • 我们可以通过调用 Caching#getCachingProvider(...) 的某个变体(以及其他方法)来告诉 JCache。
  • 我们可以提供一个 META-INF/services/javax.cache.spi.CachingProvider 文件,并让其指定提供者实现。指定的提供者是您的提供者的缓存提供者实现的完全限定名称。
  • 我们可以使用 Caching#getCachingProvider();但是,最好明确限定要使用的提供者,因为您的类路径上可能有多个提供者,这会抛出 javax.cache.CacheException

例如,以下代码使用 CachingProvider Caching.getCachingProvider(String) 指定 Hazelcast 为提供者:

CachingProvider cachingProvider = Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider");
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, String> cacheConfig = new MutableConfiguration<String, String>();
Cache<String, String> cache = cacheManager.createCache("spi-example", cacheConfig);
cache.put("k", "v");

JAVAX.CACHE.ANNOTATION

JCache 定义了许多注解,用于集成到上下文和依赖注入环境中。Spring Framework 原生支持 JCache 注解。我们可以参考 JCache 文档以获取更多信息。

JAVAX.CACHE.INTEGRATION

javax.cache.integration 包提供了 CacheLoader(需要通读)和 CacheWriter(需要通写)。CacheLoader 在将数据读入缓存时使用——例如 Cache#loadAll(...)CacheWriter 可以作为一个集成点,将缓存变更(例如,写入、删除)传播到外部存储服务。

缓存部署

JCache 没有缓存部署策略的概念;它仅仅是缓存提供者之上的一个 API。然而,不同的提供者支持不同类型的缓存部署。请考虑哪种缓存部署对您的应用程序有意义,并由此反向确定合适的缓存提供者。

图 3: 缓存部署示例

请注意,一些缓存提供者可能支持所有这三种缓存部署,而其他提供者可能不支持。

本节的剩余部分讨论图 2 中所示的常见缓存部署:

  • 嵌入式 – 缓存与应用程序位于同一进程中。
  • 客户端-服务器 – 缓存托管在独立的服务中,客户端与该服务通信以确定缓存驻留。
  • 嵌入式/客户端-服务器 – 这是一种混合模式,整个缓存托管在不同的服务上,但客户端在同一进程中拥有一个较小的本地缓存。

重要的是要注意,上述缓存部署并非互斥的;它们可以通过多种方式组合以满足应用程序需求。

最简单的缓存部署是让缓存与应用程序驻留在同一进程中,这样做的好处是提供低延迟的缓存访问。嵌入式缓存不能在应用程序之间共享,并且在应用程序重启或故障时,其托管(它们所需的资源)和重建成本可能很高。

客户端-服务器缓存部署将缓存托管在与客户端不同的服务中。缓存服务允许通过跨服务复制来满足容错需求,提供更大的容量、更多的可扩展性选项,以及跨应用程序共享缓存的能力。客户端-服务器模型的主要缺点在于客户端缓存查询期间网络通信的成本。

混合嵌入式/客户端-服务器部署是指我们拥有一个嵌入式缓存,它包含来自服务缓存条目的一个子集,作为应用程序缓存请求的副作用被填充。在这里,客户端可以对频繁访问的数据(或表现出特定访问模式的数据)实现低延迟的缓存命中,并省去与缓存服务通信所带来的网络通信开销。如果嵌入式缓存过期,一些提供者会负责使用服务托管的缓存来更新它们。

结论

本参考资料卡介绍了缓存以及如何将其与 Java 的 JCache API 一起使用。JCache API 直观、强大,并且由于其是一个规范而避免了供应商锁定,为架构师和系统设计者提供了他们所需的灵活性。这种灵活性在我们进入基于智能体架构的新一代创新时尤为重要,其中缓存对于工具链和嵌入生成至关重要。

作者:GRANVILLE BARNETT,
架构师,HAZELCAST
Granville Barnett 拥有计算机科学博士学位,是拥有超过 15 年经验的分布式系统专家。他目前是 Hazelcast 的架构师,此前曾在 HP Labs 和 Microsoft 任职。Granville 拥有多项美国专利,并发表了关于程序验证主题的研究。

附加资源:


【注】本文译自:Java Caching Essentials

使用 Java、Spring Boot 和 Spring AI 开发符合 A2A 标准的 AI 智能体

AI 智能体指的是一种软件实体,它能够利用自然语言处理、机器学习或推理系统等人工智能技术,自主感知、推理和行动,以实现特定目标。

我为 Telex 开发了一个 AI 智能体,该智能体接收一个正则表达式模式,并就该模式所匹配的字符串类型提供易于理解的解释。开发此智能体的灵感源于我在此之前开发的一个 API(您可以在此处查看该项目),在该 API 中我必须使用正则表达式进行一些自然语言处理。尽管我之前学习过正则表达式,但感觉像是第一次见到它。正则表达式就是这样。因此,当 Telex 为其平台寻求更多 AI 智能体时,我决定开发这个智能体。

以下是我使用 Java、Spring AI 和 Spring Boot 实现它的过程。

初始设置

1. Spring Boot 项目初始化

我使用 Spring 提供的初始化工具来初始化项目。请注意,我在依赖项中包含了 Spring Web 和 Open AI。

初始化 Spring 项目

2. 设置 API 凭证

在我的 application.properties 文件中,我设置了 Spring AI 以使用我的 API 凭证(我的 API 密钥)。我通过 Google AI Studio 获得了一个免费的 Google Gemini API 密钥。我的 application.properties 文件设置如下:

    spring.config.import=classpath:AI.properties

    spring.application.name=regexplain

    spring.ai.openai.api-key = ${GEMINI_API_KEY}
    spring.ai.openai.base-url = https://generativelanguage.googleapis.com/v1beta/openai
    spring.ai.openai.chat.completions-path = /chat/completions
    spring.ai.openai.chat.options.model = gemini-2.5-pro

第一行导入了包含我 API 密钥的文件。重要的是不要将您的 API 密钥暴露给公众。该文件与 application.properties 位于同一文件夹中。

3. 首次项目运行

使用我的包管理器(Maven),我安装了所需的依赖项。然后我运行了我的主类,以确保一切正常。如果您到目前为止一切都做对了,您的项目应该可以无错误运行。如果遇到任何错误,请在 Google 上查找解决方法。

A2A 请求和响应模型

在深入实现之前,让我们先谈谈符合 A2A 标准的请求和响应的结构。A2A 协议遵循标准的 JSON-RPC 2.0 结构来处理请求和响应。

所有方法调用都封装在一个请求对象中,其结构如下:

{
  "jsonrpc": "2.0",
  "method": "String",
  "id": "String | Integer",
  "params": "Message"
}

响应对象有些类似:

{
  "jsonrpc": "2.0",
  "id": "String | Integer | null",
  "result?": "Task | Message | null",
  "error?": "JSONRPCError"
}

响应中的 ID 必须与请求中的 ID 相同。

有关 A2A 协议的更多信息,请查阅 A2A 协议文档

以上就是请求和响应的通用结构。我开发这个智能体是为了在 Telex 平台上使用,因此我的部分实现可能特定于 Telex。

现在进入实现部分。我创建了一个名为 model 的文件夹,用于存储我的模型。请求模型类 A2ARequest 如下所示:

public class A2ARequest {
    private String id;
    private RequestParamsProperty params;

    public A2ARequest(String id, RequestParamsProperty params) {
        this.id = id;
        this.params = params;
    }

    // getters and setters
}

RequestParamsProperty 类代表了 params 中包含信息的结构。它如下所示:

public class RequestParamsProperty {
    private HistoryMessage message;
    private String messageId;

    public RequestParamsProperty(HistoryMessage message, String messageId) {
        this.message = message;
        this.messageId = messageId;
    }

    // getters and setter
}

HistoryMessage 类如下所示:

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class HistoryMessage {
    private String kind;
    private String role;
    private List<MessagePart> parts;
    private String messageId;
    private String taskId;

    public HistoryMessage() {}

    public HistoryMessage(String role, List<MessagePart> parts, String messageId, String taskId) {
        this.kind = "message";
        this.role = role;
        this.parts = parts;
        this.messageId = messageId;
        this.taskId = taskId;
    }

    // getters and setters
}

注解的作用是让 Spring 知道在请求和响应的 JSON 表示中包含什么。如果请求中不存在某个属性,它应该忽略它并在类中将其设置为 null。如果某个属性设置为 null,则不应将其包含在响应中。

MessagePart 类如下所示:

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MessagePart {
    private String kind;
    private String text;
    private List<MessagePart> data;

    public MessagePart(String kind, String text, List<MessagePart> data) {
        this.kind = kind;
        this.text = text;
        this.data = data;
    }

    // getters and setters
}

以上就是表示从 Telex 接收的请求结构所需的所有类。现在需要为我的响应创建一个模型,以及表示响应所需的所有支持类。

A2AResponse 类:

@JsonInclude(JsonInclude.Include.NON_NULL)
public class A2AResponse {
    private final String jsonrpc;
    @JsonInclude(JsonInclude.Include.ALWAYS)
    private String id;
    private Result result;
    private CustomError error;

    public A2AResponse() {
        this.jsonrpc = "2.0";
    }

    public A2AResponse(String id, Result result, CustomError error) {
        this.jsonrpc = "2.0";
        this.id = id;
        this.result = result;
        this.error = error;
    }

    //getters and setters
}

Result 类:

public class Result {
    private String id;
    private String contextId;
    private TaskStatus status;
    private List<Artifact> artifacts;
    private List<HistoryMessage> history;
    private String kind;

    public Result() {}

    public Result(String id, String contextId, TaskStatus status, List<Artifact> artifacts, List<HistoryMessage> history, String task) {
        this.id = id;
        this.contextId = contextId;
        this.status = status;
        this.artifacts = artifacts;
        this.history = history;
        this.kind = task;
    }

    // getters and setters
}

CustomError 类:

public class CustomError {
    private int code;
    private String message;
    private Map<String, String> data;

    public CustomError(int code, String message, Map<String, String> data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    // getters and setters
}

TaskStatus 类:

@JsonInclude(JsonInclude.Include.NON_NULL)
public class TaskStatus {
    private String state;
    private Instant timestamp;
    private HistoryMessage message;

    public TaskStatus() {}

    public TaskStatus(String state, Instant timestamp, HistoryMessage message) {
        this.state = state;
        this.timestamp = timestamp;
        this.message = message;
    }

    // getters and setters
}

Artifact 类:

public class Artifact {
    private String artifactId;
    private String name;
    private List<MessagePart> parts; // 稍后复查此类型

    public Artifact() {}

    public Artifact(String artifactId, String name, List<MessagePart> parts) {
        this.artifactId = artifactId;
        this.name = name;
        this.parts = parts;
    }

    // getters and setters
}

A2A 协议还包含一个称为"智能体卡片"的东西。我也为它创建了一个模型。

public class AgentCard {
    private String name;
    private String description;
    private String url;
    private Map<String, String> provider;
    private String version;
    private Map<String, Boolean> capabilities;
    private List<String> defaultInputModes;
    private List<String> defaultOutputModes;
    private List<Map<String, Object>> skills;

    public AgentCard() {
        this.provider = new HashMap<>();
        this.capabilities = new HashMap<>();
        this.skills = new ArrayList<>();
    }

    // getters and setters
}

模型部分就这些了。继续…

服务类

我的智能体的作用是获取一个正则表达式字符串,然后使用预定义的提示词将其发送到 OpenAI 的 API。服务类负责与 OpenAI 通信,发送提示词并接收响应。我创建了另一个名为 service 的文件夹,我的服务类就放在这里。我是这样编写我的服务类的:

@Service
public class RegExPlainService {
    private ChatClient chatClient;

    RegExPlainService(ChatClient.Builder chatClientBuilder) {
        this.chatClient = chatClientBuilder.build();
    }

    @Tool(name = "regexplain", description = "An agent that explains what type of string a regex pattern matches")
    public String generateResponse(String regex) {
        return chatClient
                .prompt("Give me a simple explanation of the type of string matched by this regex pattern: %s. No validating statements from you. Just straight to the point".formatted(regex))
                .call()
                .content();
    }
}

@Service 注解允许 Spring Boot 将服务注入到您的控制器中。@Tool 注解将该方法标记为一个智能体工具,如果将来要扩展该智能体以包含该功能,它可以被自主调用。不过目前并不需要它。

控制器

控制器通过 REST API 暴露该智能体。在这个案例中,我有两个端点,一个 GET 端点和一个 POST 端点。我在一个名为 controller 的文件夹中创建了我的控制器。实现如下:

@RestController
public class RegExPlainController {
    private final RegExPlainService regexplainService;

    @Autowired
    RegExPlainController (RegExPlainService regexplainService) {
        this.regexplainService = regexplainService;
    }

    @GetMapping("/a2a/agent/regexplain/.well-known/agent.json")
    public ResponseEntity<AgentCard> getAgentCard () {
        AgentCard agentCard = new AgentCard();
        agentCard.setName("regexplain");
        agentCard.setDescription("An agent that provides a simple explanation of the type of string a regex pattern matches");
        agentCard.setUrl("regexplain-production.up.railway.app/api");
        agentCard.setProvider("Bituan", null); // 假设 setProvider 处理 Map 的填充
        agentCard.setVersion("1.0");
        agentCard.setCapabilities(false, false, false); // 假设 setCapabilities 处理 Map 的填充
        agentCard.setDefaultInputModes(List.of("text/plain"));
        agentCard.setDefaultOutputModes(List.of("application/json", "text/plain"));
        agentCard.setSkill("skill-001", "Explain Regex", "Provides a simple explanation of the type of string a regex pattern matches",
                List.of("text/plain"), List.of("text/plain"), List.of());

        return ResponseEntity.ok(agentCard);
    }

    @PostMapping("/a2a/agent/regexplain")
    public ResponseEntity<A2AResponse> explainRegex (@RequestBody A2ARequest request) {
        String regexRequest;
        String responseText;

        // 如果参数无效,返回 403
        try {
            regexRequest = request.getParams().getMessage().getParts().get(0).getText();
        } catch (Exception e) {
            CustomError error = new CustomError(-32603, "Invalid Parameter", Map.of("details", e.getMessage()));
            A2AResponse errorResponse = new A2AResponse(null, null,  error);
            return ResponseEntity.status(HttpStatusCode.valueOf(403)).body(errorResponse);
        }

        // 如果调用服务失败,返回错误 500
        try {
            responseText = regexplainService.generateResponse(regexRequest);
        } catch (Exception e) {
            CustomError error = new CustomError(-32603, "Internal Error", Map.of("details", e.getMessage()));
            A2AResponse errorResponse = new A2AResponse(null, null,  error);
            return ResponseEntity.internalServerError().body(errorResponse);
        }

        // 构建响应
        A2AResponse response = new A2AResponse();
        response.setId(request.getId());

        // 构建响应 -> 构建结果
        Result result = new Result();
        result.setId(UUID.randomUUID().toString());
        result.setContextId(UUID.randomUUID().toString());
        result.setKind("task");

        // 构建响应 -> 构建结果 -> 构建状态
        TaskStatus status = new TaskStatus();
        status.setState("completed");
        status.setTimestamp(Instant.now());

        // 构建响应 -> 构建结果 -> 构建状态 -> 构建消息
        HistoryMessage message = new HistoryMessage();
        message.setRole("agent");
        message.setParts(List.of(new MessagePart("text", responseText, null)));
        message.setKind("message");
        message.setMessageId(UUID.randomUUID().toString());

        // 构建响应 -> 构建结果 -> 构建状态 (续)
        status.setMessage(message);

        // 构建响应 -> 构建结果 -> 构建工件
        List<Artifact> artifacts = new ArrayList<>();
        Artifact artifact = new Artifact();
        artifact.setArtifactId(UUID.randomUUID().toString());
        artifact.setName("regexplainerResponse");
        artifact.setParts(List.of(new MessagePart("text", responseText, null)));
        artifacts.add(artifact);

        // 构建响应 -> 构建结果 -> 构建历史记录
        List<HistoryMessage> history = new ArrayList<>();

        // 构建响应 -> 构建结果 (续)
        result.setStatus(status);
        result.setArtifacts(artifacts);
        result.setHistory(history);

        // 构建响应 (续)
        response.setResult(result);

        return ResponseEntity.ok(response);
    }
}
  • GET 端点使用的路由路径是 A2A 协议标准中用于获取智能体卡片的部分。智能体卡片是对智能体及其功能的描述。
  • POST 端点接收一个符合 A2A 标准的请求,执行智能体,然后返回适当的响应。

结论

就是这样。这就是我编写 Regexplain 的过程。

通过这个示例,您可以从头开始构建您的 AI 智能体并使其符合 A2A 标准。或者,至少我希望这能让您对如何使用 Java 开发符合 A2A 标准的 AI 智能体有所了解。


【注】本文译自:Developing an A2A-compliant AI Agent with Java, Spring Boot and Spring AI – DEV Community

Java智能体框架的繁荣是一种代码异味

停止构建编排框架,开始构建智能体。未来属于那些掌握生态系统的人,而不是那些被困在构建特定语言引擎中的人。

我需要坦白。我是一个框架狂热者。我的职业生涯建立在 Apache Camel 之上,我人生中的大部分成功都归功于企业集成模式的优雅。我懂。如果有一个社区值得获得诺贝尔框架奖,那就是 Java 社区。从早年在红帽公司到整个大数据生态系统,框架 15 年来一直是 JVM 世界的引擎。我们是抽象的大师。

因此,当智能体时代来临而 Java 在奋力追赶时,我的第一本能是原始的:构建一个框架。我甚至开始了一个,驱动力是这样一个想法:"AI 智能体的 Apache Camel 在哪里?"

三个月前,可能只有一个严肃的 Java 智能体框架。现在,包括 Embabel 在内,已经有了三个。竞赛开始了。但目睹这场爆炸式增长,我不得不提出一个难题:框架本身是否正在成为一种反模式?我们是否在为自己创造负担,而不是专注于真正重要的事情:构建智能体

最近 Java 智能体框架的繁荣并非一个健康、成熟生态系统的标志。它是一种症状。一种架构层面的代码坏味道,告诉我们我们的方法存在根本性问题。

我们最初为什么要构建框架?

让我们回顾一下。为什么像 Spring 和 Camel 这样的框架变得如此主流?原因清晰且合理:

  • 开发人员生产力: 我们当时淹没在样板代码中。框架将其抽象掉了。
  • 代码质量与治理: 它们提供了标准化的模式,防止每个开发人员重新发明轮子。
  • 可重用性: 它们为我们提供了经过实战检验的构造来构建,节省了大量的时间和精力。

目标是优化生产力、质量和治理。但这些是我们今天应该优化的相同参数吗?感觉我们像是在用 2010 年的方法解决 2025 年的问题,完全忽视了房间里的大象:AI 驱动的开发工具

这头大象有个名字:Cursor(及其伙伴)

在我们忙于将 LangChain 移植到 Java 时,情况发生了变化:

Cursor 和 Copilot 生成样板代码的速度比你输入 import 语句还快。你正在构建的那个复杂的链式抽象?Cursor 三秒钟就能写出来。你正在标准化的那个工具注册模式?Copilot 已经知道五种变体。

但在这里,我们需要停下来问一个更根本的问题:你的最终用户实际需要什么?

你真正需要构建什么?

让我们具体点。我们大多数人面临两种情况:

  • 场景 1: 你在未来 12 个月内构建一个关键智能体。也许它是一个每天处理 10,000 次对话的客户服务智能体。或者一个需要理解你公司特定标准的代码审查智能体。或者一个绝不能对监管要求产生幻觉的合规智能体。
  • 场景 2: 你正在构建一个智能体平台。成百上千个智能体,每个都有不同的上下文、不同的领域、不同的需求。也许你在一家咨询公司,为多个客户构建智能体。或者你正在创建一个内部平台,不同团队可以在上面启动自己的智能体。你需要可重用、适应性强、可演进的东西。一种能让你快速创建新智能体,同时保持所有智能体一致性和质量的东西。

在这两种情况下,诚实地问自己:你的用户需要一个代码框架吗?

还是他们需要完全不同的东西?

重新定义框架

在放弃我的框架并实际交付智能体之后,我学到了:我们不需要消除框架。我们需要重新定义在 AI 时代框架实际意味着什么。

  • 旧框架定义: 一种可重用的代码抽象,提供结构并处理横切关注点。你导入并在其之上构建的东西。
  • 新框架定义: 构建智能体的完整环境,一组协同工作的相互依赖的层,其中代码层只是更大拼图的一部分。

以下是现代智能体框架中真正重要的层次:

第 1 层:语言本身

Java(或你选择的语言)及其构造、类型和模式。不包装在抽象中,直接使用。语言已经是你的逻辑结构框架。你不需要在 Java 之上再加一个代码框架。Java 本身就是框架。

第 2 层:模型

一个真正好的大语言模型:GPT-5、Claude、Gemini、Grok。这不仅仅是你调用的 API。它是你技术栈的核心组件。模型的能力直接决定了你能构建什么。像选择编程语言一样仔细地选择它。

第 3 层:开发人员生产力工具

Cursor、Copilot 以及下一代 AI 驱动的开发工具。这些不是可选的。它们是关键基础设施。你的框架应设计成与这些工具无缝协作。如果 Cursor 不能轻松地按照你的模式生成代码,那么你的模式是错误的,或者你可能需要很好地描述你的模式。

第 4 层:提示词包与指南

你经过版本控制、测试、治理的提示词。你的组织语音。你的领域知识。你的合规规则。这是你的业务逻辑存在的地方——不在代码中,而在精心策划的上下文和指令中。将这些视为你的依赖构件,就像 JAR 包,但用于智能体行为。

第 5 层:生态系统 API

对新兴的专业化平台及其 API 的上下文感知。用于知识检索的向量数据库。上下文存储和内存管理系统,如 Zep 或 Cognee。工具执行平台,如 Arcade。用于智能体监控的可观测性平台,如 Langfuse。提示词管理和版本控制工具。这些大多暴露 REST API 或标准协议,大多提供 LLM.txt 用于上下文导入。你的框架需要知道这些存在,并知道如何连接到它们。

第 6 层:架构与设计模式

作为指南和模式捕获的架构思维。关于这些层如何在不同用例中组合在一起的可重用蓝图。不是代码抽象——关于路由逻辑、版本控制策略、部署模式和生态系统集成的文档化知识,这些知识成为你组织上下文的一部分。

想想看。当你构建那个关键的客户服务智能体时,真正决定其成功的是什么?

  • 调用 LLM 的 Java 代码吗?(那是 20 行代码,Cursor 写的)
  • 复杂的链式编排吗?(标准控制流)
  • 重试逻辑和错误处理吗?(Java 已经有这方面的模式)

还是:

  • 选择的模型以及它处理你领域的能力
  • 教导它你的升级规则和语气的提示词
  • 让你能快速迭代这些提示词的工具
  • 与像 Arcade(工具)和 Zep(内存)这样的平台的集成
  • 让你能够对变更进行版本控制、测试和部署的架构
  • 让你能在多个智能体中重用这种方法的设计模式

那就是你的框架。所有六层,协同工作。

实践中的框架

让我向你展示在构建智能体时的实际示例:

第 4 层(提示词包) 是版本化的构件,不是你代码中的字符串:

prompts/
  customer-service/
    v1.2/
      system-prompt.md
      escalation-rules.md
      tone-guidelines.md
      product-context.md
      examples/
        refund-scenarios.yaml
        technical-issues.yaml

第 5 层(生态系统 API) 配置在你的环境中:
你的生态系统上下文嵌入在指南中:

# 生态系统集成指南

## 工具发现
- 调用 Arcade API 列出可用工具: GET /tools
- 参考: 查看 Arcade LLM.txt 位于 https://docs.arcade.dev/llms.txt

## 内存管理
- Zep 会话 API: https://api.getzep.com/v2/sessions/{session_id}
- 参考: 查看 Zep API 文档位于 https://docs.getzep.com

## 基础设施与存储
- 用于提示词构件的对象存储: S3, GCS, 或 Azure Blob
- 用于长时间运行工作流的状态持久化

第 1 层(Java) 提供结构,干净简单:

public class CustomerServiceAgent {
    private final Model model;
    private final PromptPack prompts;
    private final ArcadeClient tools;
    private final ZepMemory memory;

    public Response handle(CustomerQuery query) {
        // 检索会话内存
        var history = memory.getSession(query.sessionId());

        // 从 Arcade 获取可用工具
        var availableTools = tools.listTools();

        // 使用上下文渲染提示词
        var context = prompts.render("main", query, history, availableTools);

        return model.complete(context);
    }
}

第 3 层(Cursor) 在几秒钟内生成这段代码。你专注于架构。

第 6 层(架构) 指南:

# 智能体架构指南

## 工作流路由
- 为多节点智能体工作流设计路由逻辑
  - 分类节点 → 路由到专家节点(支持、销售、技术)
  - 复杂性分析 → 路由到适当的模型层级(GPT-4o vs GPT-3.5)
  - 工具选择节点 → 根据用户意图路由到工具执行节点
- 通过 Arcade 网关路由工具执行:集中认证、速率限制、工具发现
- 提示词版本的 A/B 路由:10% 到 v2.0,90% 到 v1.5,监控质量

## 速率限制与节流
- 每用户令牌预算:10K 令牌/天(免费),100K(付费)
- 队列管理:最大 100 个并发请求,溢出到 SQS...
..
..

为什么这个框架能扩展

  • 对于一个关键智能体: 选择你的模型(第 2 层),编写清晰的代码(第 1 层),用 Cursor 迭代(第 3 层),优化提示词(第 4 层),集成生态系统 API(第 5 层),遵循架构模式(第 6 层)。
  • 对于一千个智能体: 相同的模型,相同的架构模式,相同的生态系统 API,但每个智能体都有自己的提示词包。Cursor 生成样板代码。你的语言提供结构。生态系统处理难题。

美妙之处何在?各层协同工作。Cursor 生成代码是因为模式简单。提示词是独立版本控制的。集成使用 REST API。架构无需抽象即可实现重用。

不需要编排框架。这就是框架。

引擎与 SDK 的问题

让我澄清一下:我并不是说所有框架都应该消失。我对 LangChain、LangGraph、Mastra、CrewAI、Autogen 等团队所构建的东西怀有极大的敬意。但我们需要理解一个在急于将所有东西移植到 Java 的过程中被忽视的关键区别。

不要混淆引擎SDK

我的意思是:我迫不及待地想用 Java 开发完整的智能体。我热爱 Java。但我不想仅仅因为我想用 Java 开发智能体就要一个 Java 引擎

考虑这些例子:

  • LangChain4J? 作为连接更广泛的 LangChain 生态系统的 SDK,这是一个很好的开始。你用 Java 编写,但你正在利用一个经过验证的引擎。
  • 带有 Java SDK 的 Crew AI? 完美。在 Python 中掌握编排模式,然后给我一个 Java 接口来使用它们。
  • 支持多语言的 Mastra? 正是正确的方向。构建一次引擎,为每种语言提供 SDK。
  • 为使用 Go 构建的 Not7 添加 Java SDK 或任何语言 SDK?

这里的模式是?用你喜欢的语言开发,而无需用该语言重建整个引擎。

编排层正在变薄

这就是为什么我认为即使是 SDK 方法也可能是暂时的,或者至少变得非常精简的原因:

  • 一方面: 模型正变得 dramatically 更好。GPT-5、Claude 4.5、Gemini 2.5 Pro、Grok 的推理能力使得复杂的编排模式过时了。它们可以用简单的提示词处理多步骤工作流,而这在六个月前需要复杂的链。
  • 另一方面: 真正的工程问题正在由专业平台解决。以 Arcade 为例:工具发现、认证、大规模执行、处理速率限制、管理工具版本。这才是艰难的工程工作所在。工具管理不再是编排问题;它是在平台层解决的基础设施问题。
  • 在中间: 编排框架正被挤压得越来越薄。

当你的模型能够推理工作流,并且平台处理复杂的工程问题(工具、内存、上下文)时,编排还剩下什么?

答案是:非常少。这就是为什么工程重点需要从编排转向更广泛的智能体开发挑战——提示词管理、生态系统集成、工具决策可审计性、成本优化。真正的问题已不在编排之中。

新现实:AI 原生框架

代码坏味道不仅仅是我们构建了太多框架。而是我们正在为一个不复存在的世界构建框架。以下是 2025 年构建框架实际意味着什么:

方面 过去的框架思维模式 (2005-2024) 下一代框架思维模式 (2025+)
定义 需要导入的代码库 跨越6个层级的完整环境
业务逻辑 位于代码抽象中 位于版本化提示词与指南中
关键构件 JAR 文件、软件包 提示词、上下文、API 知识
可重用性 代码继承与组合 架构模式与蓝图
开发工具 用于编写代码的 IDE 用于生成代码的 AI 工具(如 Cursor)
生态系统 自包含、单体式 集成专业化平台
样板代码 由框架抽象处理 由 AI 在几秒内生成
你导入/使用什么 Spring、Camel、框架 JAR 包 无需导入——你只需组合这些层级
  1. 接受 AI 驱动的开发现实 每个构建智能体的开发人员都将使用 Cursor、Copilot 或类似工具。这不是趋势——这是新的基线。设计你的框架以与 AI 代码生成无缝协作,而不是背道而驰。如果 Cursor 无法理解你的模式,那你的模式就是错的。
  2. 你的框架是纯文本英语,而不仅仅是代码 你的框架最关键部分将是精心设计的提示词、清晰的指南和结构化的上下文——而不是聪明的抽象。这些是你的版本化构件。这些决定了智能体行为。像对待代码一样严格对待它们。
  3. 当你需要 SDK 时,不要重新发明引擎 是的,Java SDK 至关重要。但你不需要仅仅为了用 Java 编写智能体就重建整个编排引擎。生态系统已经有平台在解决难题:内存(Zep, Mem0)、工具(MCPs, Arcade)、向量(Weaviate, Pinecone, Qdrant)、可观测性等。集成,不要重建。
  4. 框架仍然至关重要——但不是为了编排 如果你正在解决真正的问题——提示词版本控制、决策可审计性、生态系统集成模式、成本优化——那就构建这些。但编排?生态系统已经向前发展了。内存、工具、上下文、可观测性正由专业平台解决。将你的创新重点放在其他地方。
  5. 相信你的语言 如果你觉得你选择的语言中缺少一个框架,请退后一步。现代语言——Java、Python、TypeScript、Go——非常强大。凭借它们的最新特性加上 AI 代码生成工具,你可以用干净、简单的代码构建复杂的智能体。你的语言不是限制——试图用不必要的抽象包装它才是。

未来的框架不是你导入的代码库。它是对六个相互依赖层的掌握:你的语言、你的模型、你的开发工具、你的提示词、你的生态系统集成和你的架构。

也许我们不需要另一个智能体框架。也许我们所需要的只是一个智能体,一个能用你选择的语言创建智能体的智能体。一个开源的就更好了。


【注】本文译自:Java’s Agentic Framework Boom is a Code Smell

Java 运行时安全:输入验证、沙箱机制、安全反序列化

你的 Java 应用程序刚刚被攻破了。攻击者发送了一个精心构造的 JSON 载荷,你的反序列化代码"尽职尽责"地执行了它,现在他们正在下载你的客户数据库。这并非假设场景——它曾在 Equifax、Apache 以及无数其他公司真实发生过。

运行时安全与防火墙或身份验证无关。它关注的是不受信任的数据进入你的应用程序之后会发生什么。攻击者能否诱使你的代码执行你从未打算做的事情?答案通常是"可以",除非你刻意提高了攻击难度。

Java 为你提供了自卫的工具。大多数开发者忽略了它们,因为这些工具看起来偏执或过于复杂。然后生产环境就遭到了入侵,突然间那些"偏执"的措施就显得相当合理了。

为何运行时安全被忽视

你专注于功能。安全评审即使有,也往往在后期进行。代码在测试中能工作,于是就发布了。然后有人发现你的公共 API 未经验证就接受了用户输入,或者发现你正在反序列化不受信任的数据,或者意识到你的插件系统以完全权限运行第三方代码。

问题在于,大多数漏洞在你编写它们时看起来并不危险。一个简单的 ObjectInputStream.readObject() 调用看似无害,直到有人解释它如何实现远程代码执行。跳过输入验证节省了五分钟的开发时间,却在六个月后让你付出安全事件的代价。

安全不吸引人,它不会在演示中体现,而且在出事之前很难量化。但运行时安全问题是在生产系统中最常被利用的漏洞之一。让我们来谈谈三大要点:输入验证、沙箱机制和反序列化。

输入验证:万物皆不可信

每一个从外部进入你应用程序的数据都是潜在的攻击向量。用户输入、API 请求、文件上传、来自共享数据库的数据库记录、配置文件——所有这些都是。

规则很简单:在边界验证一切。不要等到业务逻辑中再验证。不要假设前端已经验证过了。在数据进入你的系统时进行验证。

糟糕的验证示例

以下是我在生产环境中经常看到的代码:

@PostMapping("/users")
public ResponseEntity<User> createUser(@RequestBody UserRequest request) {
    User user = new User();
    user.setEmail(request.getEmail());
    user.setAge(request.getAge());
    user.setRole(request.getRole());

    userRepository.save(user);
    return ResponseEntity.ok(user);
}

看起来没问题,对吧?这是一场灾难。攻击者可以发送:

  • 邮箱:"admin@evil.com<script>alert('xss')</script>"
  • 年龄:-1999999
  • 角色:"ADMIN"(提升自己的权限)

你的应用程序会欣然接受所有这一切,因为你信任了输入。

正确的输入验证

以下是正确的做法:

public class UserRequest {
    @NotNull(message = "Email is required")
    @Email(message = "Must be a valid email")
    @Size(max = 255, message = "Email too long")
    private String email;

    @NotNull(message = "Age is required")
    @Min(value = 0, message = "Age must be positive")
    @Max(value = 150, message = "Age unrealistic")
    private Integer age;

    @NotNull(message = "Role is required")
    @Pattern(regexp = "^(USER|MODERATOR)$", message = "Invalid role")
    private String role;
}

@PostMapping("/users")
public ResponseEntity<User> createUser(@Valid @RequestBody UserRequest request) {
    // 如果验证失败,Spring 自动返回 400 Bad Request

    User user = new User();
    user.setEmail(sanitizeEmail(request.getEmail()));
    user.setAge(request.getAge());
    user.setRole(request.getRole());

    userRepository.save(user);
    return ResponseEntity.ok(user);
}

private String sanitizeEmail(String email) {
    // 额外防护层:清除任何 HTML/脚本标签以防万一
    return email.replaceAll("<[^>]*>", "");
}

注意这种分层方法。Bean 验证注解捕获明显的问题。然后即使在验证之后,你还要对输入进行清理。这种深度防御方法意味着即使一层失效,你仍然受到保护。

验证复杂对象

真实的应用程序处理的是嵌套对象、列表和复杂结构:

public class OrderRequest {
    @NotNull
    @Valid  // 这很关键 - 验证嵌套对象
    private Customer customer;

    @NotEmpty(message = "Order must contain items")
    @Size(max = 100, message = "Too many items")
    @Valid
    private List<OrderItem> items;

    @NotNull
    @DecimalMin(value = "0.01", message = "Total must be positive")
    private BigDecimal total;
}

public class OrderItem {
    @NotBlank
    @Size(max = 50)
    private String productId;

    @Min(1)
    @Max(999)
    private Integer quantity;

    @DecimalMin("0.01")
    private BigDecimal price;
}

嵌套对象上的 @Valid 注解很容易被忘记,但至关重要。没有它,嵌套对象会完全绕过验证。

用于业务规则的自定义验证器

有时 Bean 验证还不够。你需要业务逻辑:

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = SafeFilenameValidator.class)
public @interface SafeFilename {
    String message() default "Unsafe filename";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}

public class SafeFilenameValidator implements ConstraintValidator<SafeFilename, String> {
    private static final Pattern DANGEROUS_PATTERNS = Pattern.compile(
        "(\\.\\./|\\.\\.\\\\|[<>:\"|?*]|^\\.|\\.$)"
    );

    @Override
    public boolean isValid(String filename, ConstraintValidatorContext context) {
        if (filename == null) {
            return true; // 单独使用 @NotNull
        }

        // 防止路径遍历攻击
        if (DANGEROUS_PATTERNS.matcher(filename).find()) {
            return false;
        }

        // 白名单方法:只允许安全字符
        if (!filename.matches("^[a-zA-Z0-9_.-]+$")) {
            return false;
        }

        return true;
    }
}

现在你可以在任何文件上传参数上使用 @SafeFilename。这可以捕获攻击者试图上传到 ../../../etc/passwd 的路径遍历攻击。

白名单与黑名单的陷阱

在验证输入时,开发者常常试图阻止"坏"字符。这是黑名单方法,而且几乎总是错误的:

// 不好:黑名单方法
public boolean isValidUsername(String username) {
    return !username.contains("<") && 
           !username.contains(">") && 
           !username.contains("'") &&
           !username.contains("\"") &&
           !username.contains("script");
           // 你永远无法列出所有危险模式
}

攻击者很有创造力。他们会使用 Unicode 字符、URL 编码、双重编码以及你没想到的技巧来绕过你的黑名单。

相反,应该对你允许的内容使用白名单:

// 好:白名单方法
public boolean isValidUsername(String username) {
    return username.matches("^[a-zA-Z0-9_-]{3,20}$");
    // 只允许字母数字、下划线、连字符,3-20个字符
}

如果不在明确允许的范围内,就拒绝。这样安全得多。

沙箱机制:限制损害

输入验证阻止坏数据进入。沙箱机制则限制代码即使攻击成功也能做的事情。如果你的应用程序运行不受信任的代码——插件、用户脚本、动态类加载——沙箱机制至关重要。

Java 安全管理器(传统方法)

多年来,Java 使用安全管理器进行沙箱处理。它在 Java 17 中已被弃用并将被移除,但理解它有助于掌握概念:

// 旧方法(已弃用)
System.setSecurityManager(new SecurityManager());

// 在策略文件中定义权限
grant codeBase "file:/path/to/untrusted/*" {
    permission java.io.FilePermission "/tmp/*", "read,write";
    permission java.net.SocketPermission "example.com:80", "connect";
    // 权限非常有限
};

安全管理器可以限制代码能做什么:文件访问、网络访问、系统属性访问等。它功能强大但复杂,并且有性能开销。

现代沙箱方法

没有安全管理器,你需要替代策略。

在独立进程中隔离。 最可靠的沙箱是进程边界:

public class PluginExecutor {
    public String executePlugin(String pluginPath, String input) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(
            "java",
            "-Xmx256m",  // 限制内存
            "-classpath", pluginPath,
            "com.example.PluginRunner",
            input
        );

        // 限制进程能做的事情
        pb.environment().clear();  // 无环境变量
        pb.directory(new File("/tmp/sandbox"));  // 受限目录

        Process process = pb.start();

        // 超时保护
        if (!process.waitFor(10, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new TimeoutException("Plugin execution timeout");
        }

        return new String(process.getInputStream().readAllBytes());
    }
}

插件在它自己的、资源受限的进程中运行。如果它崩溃或行为不端,你的主应用程序不会受到影响。你可以使用容器或虚拟机实现更强的隔离。

使用带有限制的自定义 ClassLoader:

public class SandboxedClassLoader extends ClassLoader {
    private final Set<String> allowedPackages;

    public SandboxedClassLoader(Set<String> allowedPackages) {
        super(SandboxedClassLoader.class.getClassLoader());
        this.allowedPackages = allowedPackages;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) 
            throws ClassNotFoundException {
        // 阻止危险的类
        if (name.startsWith("java.lang.Runtime") ||
            name.startsWith("java.lang.ProcessBuilder") ||
            name.startsWith("sun.misc.Unsafe")) {
            throw new ClassNotFoundException("Access denied: " + name);
        }

        // 仅白名单特定的包
        boolean allowed = allowedPackages.stream()
            .anyMatch(name::startsWith);

        if (!allowed) {
            throw new ClassNotFoundException("Package not whitelisted: " + name);
        }

        return super.loadClass(name, resolve);
    }
}

// 用法
Set<String> allowed = Set.of("com.example.safe.", "org.apache.commons.lang3.");
ClassLoader sandboxed = new SandboxedClassLoader(allowed);
Class<?> pluginClass = sandboxed.loadClass("com.example.safe.UserPlugin");

这可以防止插件加载危险的类。它并非无懈可击——坚定的攻击者可能会找到基于反射的变通方法——但它显著提高了攻击门槛。

限制资源消耗:

public class ResourceLimitedExecutor {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    public <T> T executeWithLimits(Callable<T> task, 
                                   long timeoutSeconds,
                                   long maxMemoryMB) throws Exception {
        // 通过超时限制 CPU/时间
        Future<T> future = executor.submit(task);

        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RuntimeException("Task exceeded time limit");
        }

        // 内存限制更难——最好在 JVM 级别使用 -Xmx 处理
        // 或者使用如前所示的进程隔离
    }
}

如果你强制执行超时,即使是不受信任的代码也无法消耗无限的 CPU。内存更棘手——进程隔离或容器限制比尝试在 JVM 内强制执行效果更好。

真实世界的沙箱示例

假设你正在构建一个运行用户提交的数据转换脚本的系统:

public class ScriptSandbox {
    private static final long MAX_EXECUTION_TIME_MS = 5000;
    private static final String SANDBOX_DIR = "/tmp/script-sandbox";

    public String executeScript(String script, String data) {
        // 1. 验证脚本没有明显的恶意
        if (containsDangerousPatterns(script)) {
            throw new SecurityException("Script contains forbidden patterns");
        }

        // 2. 将脚本写入隔离目录
        Path scriptPath = Paths.get(SANDBOX_DIR, UUID.randomUUID().toString() + ".js");
        Files.writeString(scriptPath, script);

        try {
            // 3. 在具有资源限制的独立进程中执行
            ProcessBuilder pb = new ProcessBuilder(
                "timeout", String.valueOf(MAX_EXECUTION_TIME_MS / 1000),
                "node",
                "--max-old-space-size=100",  // 100MB 内存限制
                scriptPath.toString()
            );

            pb.directory(new File(SANDBOX_DIR));
            pb.redirectErrorStream(true);

            Process process = pb.start();

            // 4. 通过 stdin 传递数据,从 stdout 读取结果
            try (OutputStream os = process.getOutputStream()) {
                os.write(data.getBytes());
            }

            String result = new String(process.getInputStream().readAllBytes());

            int exitCode = process.waitFor();
            if (exitCode != 0) {
                throw new RuntimeException("Script failed with exit code: " + exitCode);
            }

            return result;

        } finally {
            // 5. 清理
            Files.deleteIfExists(scriptPath);
        }
    }

    private boolean containsDangerousPatterns(String script) {
        // 检查明显的攻击
        return script.contains("require('child_process')") ||
               script.contains("eval(") ||
               script.contains("Function(") ||
               script.matches(".*\\brequire\\s*\\(.*");
    }
}

这个例子结合了多种防御措施:静态分析、进程隔离、资源限制和清理。没有单一的防御是完美的,但层层设防使得利用难度大大增加。

安全反序列化:最大的隐患

Java 反序列化漏洞是历史上一些最严重安全漏洞的罪魁祸首。问题在于其根本性质:反序列化可以在对象构造期间执行任意代码。

为何反序列化是危险的

当你反序列化一个对象时,Java 会调用构造函数、readObject 方法和其他代码。控制序列化数据的攻击者可以精心构造对象来执行任意命令:

// 危险代码 - 请勿在生产环境中使用
public void loadUserSettings(byte[] data) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
        UserSettings settings = (UserSettings) ois.readObject();
        applySettings(settings);
    }
}

这看起来无害。但攻击者可以发送包含你类路径上(如 Apache Commons Collections)库中对象的序列化数据,这些对象在反序列化期间会执行系统命令。他们甚至根本不需要接触你的 UserSettings 类。

臭名昭著的"工具链"就是利用这一点。通过以特定方式链式组合标准库类,攻击者实现了远程代码执行。像 ysoserial 这样的工具可以自动创建这些载荷。

切勿反序列化不受信任的数据

最安全的方法很简单:不要对来自不受信任来源的数据使用 Java 序列化。绝不。

改用 JSON、Protocol Buffers 或其他仅包含数据的格式:

// 安全:使用 JSON
public UserSettings loadUserSettings(String json) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(json, UserSettings.class);
}

像 Jackson 这样的 JSON 解析器在解析期间不会执行任意代码。它们只是填充字段。攻击面急剧缩小。

当你必须反序列化时

有时你无法摆脱 Java 序列化——遗留协议、缓存库或分布式计算框架。如果你绝对必须反序列化不受信任的数据,请使用防御措施。

使用 ObjectInputFilter (Java 9+):

public Object safeDeserialize(byte[] data) throws Exception {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
        // 白名单允许的类
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(
            "com.example.UserSettings;" +
            "com.example.UserPreference;" +
            "java.util.ArrayList;" +
            "java.lang.String;" +
            "!*"  // 拒绝其他所有类
        );

        ois.setObjectInputFilter(filter);

        return ois.readObject();
    }
}

该过滤器明确地将安全的类加入白名单,并拒绝其他所有类。这阻止了依赖于意外可用类的工具链。

验证对象图:

public class SafeObjectInputStream extends ObjectInputStream {
    private final Set<String> allowedClasses;
    private int maxDepth = 10;
    private int currentDepth = 0;

    public SafeObjectInputStream(InputStream in, Set<String> allowedClasses) 
            throws IOException {
        super(in);
        this.allowedClasses = allowedClasses;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) 
            throws IOException, ClassNotFoundException {
        // 检查深度以防止深度嵌套的对象
        if (++currentDepth > maxDepth) {
            throw new InvalidClassException("Max depth exceeded");
        }

        String className = desc.getName();

        // 白名单检查
        if (!allowedClasses.contains(className)) {
            throw new InvalidClassException("Class not allowed: " + className);
        }

        return super.resolveClass(desc);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() 
            throws IOException, ClassNotFoundException {
        ObjectStreamClass desc = super.readClassDescriptor();
        currentDepth--;
        return desc;
    }
}

这个自定义实现通过跟踪反序列化深度和执行严格的白名单来增加另一层防御。

对序列化数据进行签名:

public class SignedSerializer {
    private final SecretKey signingKey;

    public byte[] serialize(Object obj) throws Exception {
        // 序列化对象
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
        }
        byte[] data = baos.toByteArray();

        // 创建签名
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(signingKey);
        byte[] signature = mac.doFinal(data);

        // 合并签名和数据
        ByteBuffer buffer = ByteBuffer.allocate(signature.length + data.length);
        buffer.put(signature);
        buffer.put(data);

        return buffer.array();
    }

    public Object deserialize(byte[] signedData) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(signedData);

        // 提取签名和数据
        byte[] signature = new byte[32];  // HmacSHA256 产生 32 字节
        buffer.get(signature);

        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);

        // 验证签名
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(signingKey);
        byte[] expectedSignature = mac.doFinal(data);

        if (!MessageDigest.isEqual(signature, expectedSignature)) {
            throw new SecurityException("Signature verification failed");
        }

        // 签名有效则反序列化
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }
}

签名可以防止攻击者篡改序列化数据。没有签名密钥,他们无法注入恶意对象。这在数据可能暴露但不受攻击者直接控制时(如客户端存储或缓存系统)有效。

替代序列化库

有几个库提供了更安全的序列化:

Kryo 提供更好的性能,并且可以配置为使用白名单:

Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);  // 拒绝未注册的类
kryo.register(UserSettings.class);
kryo.register(ArrayList.class);

// 序列化
Output output = new Output(new FileOutputStream("file.bin"));
kryo.writeObject(output, userSettings);
output.close();

// 反序列化 - 只允许注册的类
Input input = new Input(new FileInputStream("file.bin"));
UserSettings settings = kryo.readObject(input, UserSettings.class);
input.close();

Protocol BuffersApache Avro 使用基于模式的序列化。它们设置起来比较繁琐,但完全避免了代码执行风险:

message UserSettings {
  string theme = 1;
  int32 fontSize = 2;
  repeated string favorites = 3;
}

这些格式只反序列化数据,从不反序列化代码。通过 protobuf 反序列化实现代码执行是不可能的。

真实世界安全事件:一个警示故事

我曾咨询过的一家公司有一个管理门户,用于接受文件上传以进行批处理。代码看起来像这样:

@PostMapping("/admin/import")
public String importData(@RequestParam("file") MultipartFile file) {
    try {
        byte[] data = file.getBytes();
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
        DataImport importData = (DataImport) ois.readObject();

        processImport(importData);
        return "Import successful";
    } catch (Exception e) {
        return "Import failed: " + e.getMessage();
    }
}

开发人员认为这是安全的,因为该端点需要管理员身份验证。他们遗漏的是:

  • 攻击者通过钓鱼攻击攻陷了一个低级别管理员账户
  • 攻击者使用 ysoserial 上传了一个恶意的序列化载荷
  • 在反序列化期间,载荷执行了系统命令
  • 攻击者获得了应用程序服务器的 shell 访问权限
  • 从那里,他们横向移动到数据库并窃取了客户数据

修复需要多次更改:

@PostMapping("/admin/import")
public String importData(@RequestParam("file") MultipartFile file) {
    // 验证文件类型
    if (!file.getContentType().equals("application/json")) {
        return "Only JSON imports allowed";
    }

    // 验证文件大小
    if (file.getSize() > 10 * 1024 * 1024) {  // 10MB 限制
        return "File too large";
    }

    try {
        // 使用 JSON 代替 Java 序列化
        ObjectMapper mapper = new ObjectMapper();
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

        DataImport importData = mapper.readValue(
            file.getInputStream(), 
            DataImport.class
        );

        // 验证导入的数据
        validateImportData(importData);

        // 在受限上下文中处理
        processImportSafely(importData);

        return "Import successful";
    } catch (Exception e) {
        log.error("Import failed", e);
        return "Import failed - check logs";
    }
}

这次事件使他们付出了事件响应、法律费用和声誉损失方面的数百万代价。全都是因为一个不安全的反序列化调用。

实用安全检查清单

以下是你在每个 Java 应用程序中都应该做的事情:

输入验证:

  • 在所有 DTO 上使用 Bean 验证注解
  • 使用 @Valid 验证嵌套对象
  • 白名单允许的模式,不要黑名单危险模式
  • 即使在验证之后也要清理数据
  • 验证文件上传:类型、大小、内容
  • 绝不只依赖客户端验证

沙箱机制:

  • 在独立进程或容器中运行不受信任的代码
  • 使用自定义 ClassLoader 来限制类访问
  • 强制执行资源限制:内存、CPU 时间、磁盘空间
  • 清理临时文件和资源
  • 记录所有沙箱违规行为

反序列化:

  • 优先使用 JSON/Protocol Buffers 而非 Java 序列化
  • 没有过滤器的情况下切勿反序列化不受信任的数据
  • 使用 ObjectInputFilter 将类加入白名单
  • 可能时对序列化数据进行签名
  • 定期审计类路径依赖项以查找已知的工具类
  • 考虑使用需要注册模式的 Kryo

通用实践:

  • 保持依赖项更新(漏洞利用针对特定版本)
  • 使用静态分析工具捕获安全问题
  • 记录安全相关事件以进行监控
  • 使用恶意输入进行测试,而不仅仅是正常路径
  • 假设一切都可以被攻击

有用的工具

SpotBugsFindSecBugs 插件可在构建时捕获常见安全问题:

<plugin>
    <groupId>com.github.spotbugs</groupId>
    <artifactId>spotbugs-maven-plugin</artifactId>
    <configuration>
        <plugins>
            <plugin>
                <groupId>com.h3xstream.findsecbugs</groupId>
                <artifactId>findsecbugs-plugin</artifactId>
                <version>1.12.0</version>
            </plugin>
        </plugins>
    </configuration>
</plugin>

OWASP Dependency-Check 识别易受攻击的依赖项:

<plugin>
    <groupId>org.owasp</groupId>
    <artifactId>dependency-check-maven</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>check</goal>
            </goals>
        </execution>
    </executions>
</plugin>

SnykDependabot 在漏洞披露时自动更新依赖项。

思维模式的转变

安全不是你最后添加的功能。它是你从一开始就为之设计的约束。每次你接受外部输入时,问问自己:"攻击者利用这个能做的最坏的事情是什么?" 每次你反序列化数据时,问问:"我是否完全信任这个数据的来源?"

在代码审查中偏执是一种美德。当某人的 PR 包含反序列化或动态类加载时,积极地提出质疑。当缺少输入验证时,把它打回去。在代码审查中显得迂腐,也比在漏洞发生后显得疏忽要好。

运行时安全是关于减少信任。不要信任用户输入。不要信任插件。不要信任序列化数据。不要信任你的验证是完美的。层层设防,这样当一层失效时——它会的——其他层可以捕获攻击。

好消息是,一旦你内化了这些模式,它们就会成为第二天性。输入验证变得自动进行。你会本能地避免 Java 序列化。你会带着隔离的思想进行设计。安全成为你编码风格的一部分,而不是事后附加的东西。

有用资源


【注】本文译自:
Runtime Security in Java: Input Validation, Sandboxing, Safe Deserialization

Java 21 虚拟线程 vs 缓存线程池与固定线程池

探索 Java 并发如何从 Java 8 的增强发展到 Java 21 的虚拟线程,从而实现轻量级、可扩展且高效的多线程处理。

引言

并发编程仍然是构建可扩展、响应式 Java 应用程序的关键部分。多年来,Java 持续增强了其多线程编程能力。本文回顾了从 Java 8 到 Java 21 并发的演进,重点介绍了重要的改进以及 Java 21 中引入的具有重大影响的虚拟线程。

从 Java 8 开始,并发 API 出现了显著的增强,例如原子变量、并发映射以及集成 lambda 表达式以实现更具表现力的并行编程。

Java 8 引入的关键改进包括:

  • 线程与执行器
  • 同步与锁
  • 原子变量与 ConcurrentMap

Java 21 于 2023 年底发布,带来了虚拟线程这一重大演进,从根本上改变了 Java 应用程序处理大量并发任务的方式。虚拟线程为服务器应用程序提供了更高的可扩展性,同时保持了熟悉的"每个请求一个线程"的编程模型。

或许,Java 21 中最重要的特性就是虚拟线程。
在 Java 21 中,Java 的基本并发模型保持不变,Stream API 仍然是并行处理大型数据集的首选方式。
随着虚拟线程的引入,并发 API 现在能提供更好的性能。在当今的微服务和可扩展服务器应用领域,线程数量必须增长以满足需求。虚拟线程的主要目标是使服务器应用程序能够实现高可扩展性,同时仍使用简单的"每个请求一个线程"模型。

虚拟线程

在 Java 21 之前,JDK 的线程实现使用的是操作系统线程的薄包装器。然而,操作系统线程代价高昂:

  • 如果每个请求在其整个持续时间内消耗一个操作系统线程,线程数量很快就会成为可扩展性的瓶颈。
  • 即使使用线程池,吞吐量仍然受到限制,因为实际线程数量是有上限的。

虚拟线程的目标是打破 Java 线程与操作系统线程之间的 1:1 关系。
虚拟线程应用了类似于虚拟内存的概念。正如虚拟内存将大的地址空间映射到较小的物理内存一样,虚拟线程允许运行时通过将它们映射到少量操作系统线程来制造拥有许多线程的假象。

平台线程是操作系统线程的薄包装器。
而虚拟线程并不绑定到任何特定的操作系统线程。虚拟线程可以执行平台线程可以运行的任何代码。这是一个主要优势——现有的 Java 代码通常无需修改或仅需少量修改即可在虚拟线程上运行。虚拟线程由平台线程承载,这些平台线程仍然由操作系统调度。

例如,您可以像这样创建一个使用虚拟线程的执行器:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

对比示例

虚拟线程仅在主动执行 CPU 密集型任务时才消耗操作系统线程。虚拟线程在其生命周期内可以在不同的载体线程上挂载或卸载。

通常,当虚拟线程遇到阻塞操作时,它会自行卸载。一旦该阻塞任务完成,虚拟线程通过挂载到任何可用的载体线程上来恢复执行。这种挂载和卸载过程频繁且透明地发生——不会阻塞操作系统线程。

示例 — 源代码

Example01CachedThreadPool.java

在此示例中,使用缓存线程池创建了一个执行器:

var executor = Executors.newCachedThreadPool()
package threads;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example01CachedThreadPool {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newCachedThreadPool()' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (var executor = Executors.newCachedThreadPool()) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
                executor.submit(() -> {
                    // 模拟阻塞调用
                    Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                    return i;
                });
            });

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example01CachedThreadPoolTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
        example01CachedThreadPool.executeTasks(1_000_000);
    }

}

我 PC 上的测试结果:

Example02FixedThreadPool.java

使用固定线程池创建执行器:

var executor = Executors.newFixedThreadPool(500)
package threads;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example02FixedThreadPool {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newFixedThreadPool(500)' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (var executor = Executors.newFixedThreadPool(500)) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
               executor.submit(() -> {
                   // 模拟阻塞调用
                  Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                  return i;
               });
            });

        }   catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example02FixedThreadPoolTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
        example02FixedThreadPool.executeTasks(1_000_000);
    }

}

我 PC 上的测试结果:

Example03VirtualThread.java

使用虚拟线程每任务执行器创建执行器:

var executor = Executors.newVirtualThreadPerTaskExecutor()
package threads;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

public class Example03VirtualThread {

    public void executeTasks(final int NUMBER_OF_TASKS) {

        final int BLOCKING_CALL = 1;
        System.out.println("Number of tasks which executed using 'newVirtualThreadPerTaskExecutor()' " + NUMBER_OF_TASKS + " tasks each.");

        long startTime = System.currentTimeMillis();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

            IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
               executor.submit(() -> {
                   // 模拟阻塞调用
                  Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
                  return i;
               });
            });

        }   catch (Exception e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
    }

}
package threads;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 *
 * @author Milan Karajovic <milan.karajovic.rs@gmail.com>
 *
 */

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example03VirtualThreadTest {

    @Test
    @Order(1)
    public void test_1000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(1000);
    }

    @Test
    @Order(2)
    public void test_10_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(10_000);
    }

    @Test
    @Order(3)
    public void test_100_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(100_000);
    }

    @Test
    @Order(4)
    public void test_1_000_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(1_000_000);
    }

    @Test
    @Order(5)
    public void test_2_000_000_tasks() {
        Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
        example03VirtualThread.executeTasks(2_000_000);
    }

}

我 PC 上的测试结果:

结论

您可以清楚地看到用于处理所有 NUMBER_OF_TASKS 的不同执行器实现之间的执行时间差异。值得尝试不同的 NUMBER_OF_TASKS 值以观察性能变化。

虚拟线程的优势在处理大量任务时变得尤其明显。当 NUMBER_OF_TASKS 设置为较高的数值时——例如 1,000,000——性能差距是显著的。如下表所示,虚拟线程在处理大量任务时效率要高得多:

我确信,在澄清这一点之后,如果您的应用程序使用并发 API 处理大量任务,您会认真考虑迁移到 Java 21 并利用虚拟线程。在许多情况下,这种转变可以显著提高应用程序的性能和可扩展性。

源代码:GitHub Repository – Comparing Threads in Java 21


【注】本文译自:Java 21 Virtual Threads vs Cached and Fixed Threads

Java数据库应用原型

一个使用 Spring Boot 和容器进行测试、Keycloak 提供安全、PostgreSQL 提供数据持久化的,带有 REST 和安全功能的 Java 数据库应用原型。

在工作中开发时,我多次需要一个简单应用的模板,以便基于此模板开始为手头的项目添加特定代码。
在本文中,我将创建一个简单的 Java 应用程序,它连接到数据库,暴露一些 REST 端点,并使用基于角色的访问来保护这些端点。
目的是拥有一个最小化且功能齐全的应用程序,然后可以针对特定任务进行定制。
对于数据库,我们将使用 PostgreSQL;对于安全,我们将使用 Keycloak,两者都通过容器部署。在开发过程中,我使用 podman 来测试容器是否正确创建(作为 docker 的替代品——它们在大多数情况下可以互换)作为一次学习体验。
应用程序本身是使用 Spring Boot 框架开发的,并使用 Flyway 进行数据库版本管理。
所有这些技术都是 Java EE 领域业界标准,在项目中被使用的可能性很高。

我们构建原型的核心需求是一个图书馆应用程序,它暴露 REST 端点,允许创建作者、书籍以及它们之间的关系。这将使我们能够实现一个多对多关系,然后可以将其扩展用于任何可以想象的目的。
完整可用的应用程序可以在 https://github.com/ghalldev/db_proto 找到。
本文中的代码片段取自该代码库

在创建容器之前,请确保使用您偏好的值定义以下环境变量(教程中故意省略了它们,以避免传播多个用户使用的默认值):

DOCKER_POSTGRES_PASSWORD
DOCKER_KEYCLOAK_ADMIN_PASSWORD
DOCKER_GH_USER1_PASSWORD

配置 PostgreSQL:

docker container create --name gh_postgres --env POSTGRES_PASSWORD=$DOCKER_POSTGRES_PASSWORD --env POSTGRES_USER=gh_pguser --env POSTGRES_INITDB_ARGS=--auth=scram-sha-256 --publish 5432:5432 postgres:17.5-alpine3.22
docker container start gh_postgres

配置 Keycloak:
首先是容器的创建并启动:

docker container create --name gh_keycloak --env DOCKER_GH_USER1_PASSWORD=$DOCKER_GH_USER1_PASSWORD --env KC_BOOTSTRAP_ADMIN_USERNAME=gh_admin --env KC_BOOTSTRAP_ADMIN_PASSWORD=$DOCKER_KEYCLOAK_ADMIN_PASSWORD --publish 8080:8080 --publish 8443:8443 --publish 9000:9000 keycloak/keycloak:26.3 start-dev
docker container start gh_keycloak

在容器启动并运行后,我们可以继续创建领域、用户和角色(这些命令必须在正在运行的容器内部执行):

cd $HOME/bin
./kcadm.sh config credentials --server http://localhost:8080 --realm master --user gh_admin --password $KC_BOOTSTRAP_ADMIN_PASSWORD
./kcadm.sh create realms -s realm=gh_realm -s enabled=true
./kcadm.sh create users -s username=gh_user1 -s email="gh_user1@email.com" -s firstName="gh_user1firstName" -s lastName="gh_user1lastName" -s emailVerified=true -s enabled=true -r gh_realm
./kcadm.sh set-password -r gh_realm --username gh_user1 --new-password $DOCKER_GH_USER1_PASSWORD
./kcadm.sh create roles -r gh_realm -s name=viewer -s 'description=Realm role to be used for read-only features'
./kcadm.sh add-roles --uusername gh_user1 --rolename viewer -r gh_realm
./kcadm.sh create roles -r gh_realm -s name=creator -s 'description=Realm role to be used for create/update features'
./kcadm.sh add-roles --uusername gh_user1 --rolename creator -r gh_realm
ID_ACCOUNT_CONSOLE=$(./kcadm.sh get clients -r gh_realm --fields id,clientId | grep -B 1 '"clientId" : "account-console"' | grep -oP '[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}')
./kcadm.sh update clients/$ID_ACCOUNT_CONSOLE -r gh_realm -s 'fullScopeAllowed=true' -s 'directAccessGrantsEnabled=true'

用户 gh_user1 在领域 gh_realm 中被创建,并拥有 viewercreator 角色。

您可能已经注意到,我们没有创建新的客户端,而是使用了 Keycloak 自带的一个默认客户端:account-console。这是为了方便起见,在实际场景中,您会创建一个特定的客户端,然后将其更新为具有 fullScopeAllowed(这会导致领域角色被添加到令牌中——默认情况下不添加)和 directAccessGrantsEnabled(允许通过 Keycloak 的 openid-connect/token 端点生成令牌,在我们的例子中使用 curl)。

创建的角色随后可以在 Java 应用程序内部使用,以根据我们约定的契约来限制对某些功能的访问——viewer 只能访问只读操作,而 creator 可以执行创建、更新和删除操作。当然,同样地,可以根据任何原因创建各种角色,只要约定的契约被明确定义并被所有人理解。
角色还可以进一步添加到组中,但本教程不包含这部分内容。

但是,在实际使用这些角色之前,我们必须告诉 Java 应用程序如何提取角色——这是必须的,因为 Keycloak 将角色添加到 JWT 的方式是其特有的,所以我们必须编写一段自定义代码,将其转换为 Spring Security 可以使用的东西:

@Bean
public JwtAuthenticationConverter jwtAuthenticationConverter() {
    // 遵循与 org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter 相同的模式
    Converter<Jwt, Collection<GrantedAuthority>> keycloakRolesConverter = new Converter<>() {
        private static final String DEFAULT_AUTHORITY_PREFIX = "ROLE_";
        //https://github.com/keycloak/keycloak/blob/main/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java#L901
        private static final String KEYCLOAK_REALM_ACCESS_CLAIM_NAME = "realm_access";
        private static final String KEYCLOAK_REALM_ACCESS_ROLES = "roles";

        @Override
        public Collection<GrantedAuthority> convert(Jwt source) {
            Collection<GrantedAuthority> grantedAuthorities = new ArrayList<>();
            Map<String, List<String>> realmAccess = source.getClaim(KEYCLOAK_REALM_ACCESS_CLAIM_NAME);
            if (realmAccess == null) {
                logger.warn("No " + KEYCLOAK_REALM_ACCESS_CLAIM_NAME + " present in the JWT");
                return grantedAuthorities;
            }
            List<String> roles = realmAccess.get(KEYCLOAK_REALM_ACCESS_ROLES);
            if (roles == null) {
                logger.warn("No " + KEYCLOAK_REALM_ACCESS_ROLES + " present in the JWT");
                return grantedAuthorities;
            }
            roles.forEach(
                    role -> grantedAuthorities.add(new SimpleGrantedAuthority(DEFAULT_AUTHORITY_PREFIX + role)));

            return grantedAuthorities;
        }

    };
    JwtAuthenticationConverter jwtAuthenticationConverter = new JwtAuthenticationConverter();
    jwtAuthenticationConverter.setJwtGrantedAuthoritiesConverter(keycloakRolesConverter);

    return jwtAuthenticationConverter;
}

AppConfiguration 类中还完成了其他重要配置,例如启用方法安全性和禁用 CSRF。

现在我们可以在 REST 控制器中使用 @org.springframework.security.access.prepost.PreAuthorize 注解来限制访问:

@PostMapping("/author")
@PreAuthorize("hasRole('creator')")
public void addAuthor(@RequestParam String name, @RequestParam String address) {
  authorService.add(new AuthorDto(name, address));
}

@GetMapping("/author")
@PreAuthorize("hasRole('viewer')")
public String getAuthors() {
  return authorService.allInfo();
}

通过这种方式,只有成功通过身份验证且拥有 hasRole 中列出的角色的用户才能调用端点,否则他们将收到 HTTP 403 Forbidden 错误。

在容器启动并配置完成后,Java 应用程序可以启动了,但在启动之前需要添加数据库密码——这可以通过环境变量完成(下面是一个 Linux shell 示例):

export SPRING_DATASOURCE_PASSWORD=$DOCKER_POSTGRES_PASSWORD

现在,如果一切正常启动并运行,我们可以使用 curl 来测试我们的应用程序(以下所有命令均为 Linux shell 命令)。

使用之前创建的用户 gh_user1 登录并提取身份验证令牌:

KEYCLOAK_ACCESS_TOKEN=$(curl -d 'client_id=account-console' -d 'username=gh_user1' -d "password=$DOCKER_GH_USER1_PASSWORD" -d 'grant_type=password' 'http://localhost:8080/realms/gh_realm/protocol/openid-connect/token' | grep -oP '"access_token":"\K[^"]*')

创建一个新作者(这将测试 creator 角色是否有效):

curl -X POST --data-raw 'name="GH_name1"&address="GH_address1"' -H "Authorization: Bearer $KEYCLOAK_ACCESS_TOKEN" 'localhost:8090/library/author'

检索库中的所有作者(这将测试 viewer 角色是否有效):

curl -X GET -H "Authorization: Bearer $KEYCLOAK_ACCESS_TOKEN" 'localhost:8090/library/author'

至此,您应该拥有了创建自己的 Java 应用程序所需的一切,可以根据需要对其进行扩展和配置。


【注】本文译自:Java Spring Boot Template With PostgreSQL, Keycloak Securit

单体架构中的事件驱动架构:Java应用程序的渐进式重构

传统观点认为事件驱动架构属于微服务架构范畴,服务通过消息代理进行异步通信。然而,事件驱动模式一些最具价值的应用恰恰发生在单体应用程序内部——在这些地方,紧密耦合已造成维护噩梦,而渐进式重构则提供了一条通往更好架构的路径,且无需分布式系统的运维复杂性。

为何在单体应用中使用事件有意义

传统的分层单体应用存在一个特定问题:直接的方法调用在组件之间创建了僵化的依赖关系。您的订单处理代码直接调用库存管理,库存管理又调用仓库系统,继而触发电子邮件通知。每个组件都了解其他几个组件,从而形成一个纠缠不清的网,更改其中一部分需要理解并测试它所触及的所有内容。

事件驱动模式引入了间接性。当下单时,订单服务发布一个"OrderPlaced"事件。其他对订单感兴趣的组件——库存、发货、通知——订阅此事件并独立响应。订单服务不知道也不关心谁在监听。即使这些组件存在于同一个代码库并共享同一个数据库,它们也变得松散耦合。

这种方法提供了立竿见影的好处,而无需将应用程序拆分为微服务。您在保持单体应用运维简单性的同时,获得了可测试性、灵活性和更清晰的边界。当您最终需要提取服务时,事件驱动的结构使得过渡更加平滑,因为组件已经通过定义良好的消息进行通信,而不是直接的方法调用。

起点:一个紧密耦合的订单系统

考虑一个使用 Spring Boot 构建的典型电子商务单体应用。订单创建流程如下所示:

@Service
@Transactional
public class OrderService {
    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final ShippingService shippingService;
    private final LoyaltyService loyaltyService;
    private final EmailService emailService;
    private final AnalyticsService analyticsService;

    public OrderService(
        OrderRepository orderRepository,
        InventoryService inventoryService,
        PaymentService paymentService,
        ShippingService shippingService,
        LoyaltyService loyaltyService,
        EmailService emailService,
        AnalyticsService analyticsService
    ) {
        this.orderRepository = orderRepository;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.shippingService = shippingService;
        this.loyaltyService = loyaltyService;
        this.emailService = emailService;
        this.analyticsService = analyticsService;
    }

    public Order createOrder(CreateOrderRequest request) {
        // 验证库存
        for (OrderItem item : request.getItems()) {
            if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {
                throw new InsufficientInventoryException(item.getProductId());
            }
        }

        // 处理支付
        PaymentResult payment = paymentService.processPayment(
            request.getCustomerId(),
            calculateTotal(request.getItems()),
            request.getPaymentDetails()
        );

        if (!payment.isSuccessful()) {
            throw new PaymentFailedException(payment.getErrorMessage());
        }

        // 创建订单
        Order order = new Order(
            request.getCustomerId(),
            request.getItems(),
            payment.getTransactionId()
        );
        order.setStatus(OrderStatus.CONFIRMED);
        Order savedOrder = orderRepository.save(order);

        // 预留库存
        for (OrderItem item : request.getItems()) {
            inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
        }

        // 创建发货单
        shippingService.createShipment(savedOrder);

        // 更新忠诚度积分
        loyaltyService.addPoints(
            request.getCustomerId(),
            calculateLoyaltyPoints(savedOrder)
        );

        // 发送确认邮件
        emailService.sendOrderConfirmation(savedOrder);

        // 跟踪分析
        analyticsService.trackOrderPlaced(savedOrder);

        return savedOrder;
    }
}

这段代码可以工作,但存在严重问题。OrderService 知道七个不同的服务。测试需要模拟所有这些服务。添加新的订单后操作意味着要修改此方法。如果电子邮件服务缓慢,订单创建就会变慢。如果分析跟踪失败,整个订单就会失败并回滚。

事务边界也是错误的。所有操作都在单个数据库事务中发生,这意味着即使电子邮件服务临时停机也会阻止订单创建。库存预留和发货单创建在事务上耦合,尽管它们在逻辑上是独立的操作。

引入 Spring 应用事件

Spring Framework 提供了一个内置的事件系统,在单个 JVM 内工作。默认情况下它是同步的,这使得它易于推理和调试。首先定义领域事件:

public abstract class DomainEvent {
    private final Instant occurredAt;
    private final String eventId;

    protected DomainEvent() {
        this.occurredAt = Instant.now();
        this.eventId = UUID.randomUUID().toString();
    }

    public Instant getOccurredAt() {
        return occurredAt;
    }

    public String getEventId() {
        return eventId;
    }
}

public class OrderPlacedEvent extends DomainEvent {
    private final Long orderId;
    private final Long customerId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;

    public OrderPlacedEvent(Order order) {
        super();
        this.orderId = order.getId();
        this.customerId = order.getCustomerId();
        this.items = new ArrayList<>(order.getItems());
        this.totalAmount = order.getTotalAmount();
    }

    // Getters
}

事件应该是不可变的,并包含订阅者需要的所有信息。避免直接传递实体——而是复制相关数据。这可以防止订阅者意外修改共享状态。

重构 OrderService 以发布事件,而不是直接调用服务:

@Service
@Transactional
public class OrderService {
    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final ApplicationEventPublisher eventPublisher;

    public OrderService(
        OrderRepository orderRepository,
        InventoryService inventoryService,
        PaymentService paymentService,
        ApplicationEventPublisher eventPublisher
    ) {
        this.orderRepository = orderRepository;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.eventPublisher = eventPublisher;
    }

    public Order createOrder(CreateOrderRequest request) {
        // 验证库存
        for (OrderItem item : request.getItems()) {
            if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {
                throw new InsufficientInventoryException(item.getProductId());
            }
        }

        // 处理支付
        PaymentResult payment = paymentService.processPayment(
            request.getCustomerId(),
            calculateTotal(request.getItems()),
            request.getPaymentDetails()
        );

        if (!payment.isSuccessful()) {
            throw new PaymentFailedException(payment.getErrorMessage());
        }

        // 创建并保存订单
        Order order = new Order(
            request.getCustomerId(),
            request.getItems(),
            payment.getTransactionId()
        );
        order.setStatus(OrderStatus.CONFIRMED);
        Order savedOrder = orderRepository.save(order);

        // 同步预留库存(仍在关键路径上)
        for (OrderItem item : request.getItems()) {
            inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
        }

        // 为非关键操作发布事件
        eventPublisher.publishEvent(new OrderPlacedEvent(savedOrder));

        return savedOrder;
    }
}

现在 OrderService 仅依赖四个组件,而不是八个。更重要的是,它只了解对订单创建至关重要的操作——库存验证、支付处理和库存预留。其他所有操作都通过事件发生。

为解耦的操作创建事件监听器:

@Component
public class OrderEventListeners {
    private static final Logger logger = LoggerFactory.getLogger(OrderEventListeners.class);

    private final ShippingService shippingService;
    private final LoyaltyService loyaltyService;
    private final EmailService emailService;
    private final AnalyticsService analyticsService;

    public OrderEventListeners(
        ShippingService shippingService,
        LoyaltyService loyaltyService,
        EmailService emailService,
        AnalyticsService analyticsService
    ) {
        this.shippingService = shippingService;
        this.loyaltyService = loyaltyService;
        this.emailService = emailService;
        this.analyticsService = analyticsService;
    }

    @EventListener
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        try {
            shippingService.createShipment(event.getOrderId());
        } catch (Exception e) {
            logger.error("Failed to create shipment for order {}", event.getOrderId(), e);
            // 不要重新抛出 - 其他监听器仍应执行
        }
    }

    @EventListener
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void updateLoyaltyPoints(OrderPlacedEvent event) {
        try {
            int points = calculatePoints(event.getTotalAmount());
            loyaltyService.addPoints(event.getCustomerId(), points);
        } catch (Exception e) {
            logger.error("Failed to update loyalty points for order {}", event.getOrderId(), e);
        }
    }

    @EventListener
    public void sendConfirmationEmail(OrderPlacedEvent event) {
        try {
            emailService.sendOrderConfirmation(event.getOrderId());
        } catch (Exception e) {
            logger.error("Failed to send confirmation email for order {}", event.getOrderId(), e);
        }
    }

    @EventListener
    public void trackAnalytics(OrderPlacedEvent event) {
        try {
            analyticsService.trackOrderPlaced(event.getOrderId(), event.getTotalAmount());
        } catch (Exception e) {
            logger.error("Failed to track analytics for order {}", event.getOrderId(), e);
        }
    }
}

每个监听器在它自己的事务中运行(在适当的时候)并独立处理故障。如果发送电子邮件失败,发货单创建仍然会发生。即使分析跟踪抛出异常,订单创建事务也会成功提交。

理解事务边界

@Transactional(propagation = Propagation.REQUIRES_NEW) 注解至关重要。没有它,所有监听器都会参与订单创建事务。如果任何监听器失败,整个订单都会回滚——这正是我们试图避免的情况。

使用 REQUIRES_NEW,每个监听器都会启动一个新的事务。当监听器运行时,订单已经提交。这意味着:

  • 监听器无法阻止订单创建
  • 监听器故障不会回滚订单
  • 每个监听器的工作是独立原子性的

但这有一个权衡。如果监听器失败,订单存在但某些后处理没有发生。您需要处理这些部分故障的策略:

@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOrderPlaced(OrderPlacedEvent event) {
    try {
        shippingService.createShipment(event.getOrderId());
    } catch (Exception e) {
        logger.error("Failed to create shipment for order {}", event.getOrderId(), e);

        // 记录失败以便重试
        failedEventRepository.save(new FailedEvent(
            event.getClass().getSimpleName(),
            event.getEventId(),
            "handleOrderPlaced",
            e.getMessage()
        ));
    }
}

一个单独的后台作业可以重试失败的事件:

@Component
public class FailedEventRetryJob {
    private final FailedEventRepository failedEventRepository;
    private final ApplicationEventPublisher eventPublisher;

    @Scheduled(fixedDelay = 60000) // 每分钟
    public void retryFailedEvents() {
        List failures = failedEventRepository.findRetryable();

        for (FailedEvent failure : failures) {
            try {
                // 重建并重新发布事件
                DomainEvent event = reconstructEvent(failure);
                eventPublisher.publishEvent(event);

                failure.markRetried();
                failedEventRepository.save(failure);
            } catch (Exception e) {
                logger.warn("Retry failed for event {}", failure.getEventId(), e);
                failure.incrementRetryCount();
                failedEventRepository.save(failure);
            }
        }
    }
}

这种模式提供了最终一致性——系统可能暂时不一致,但通过重试自行恢复。

转向异步事件

Spring 的 @EventListener 默认是同步的。事件处理发生在发布事件的同一线程中,发布者等待所有监听器完成。这提供了强有力的保证,但限制了可扩展性。

通过启用异步支持并注解监听器来使监听器异步:

@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean(name = "eventExecutor")
    public Executor eventExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("event-");
        executor.initialize();
        return executor;
    }
}

@Component
public class OrderEventListeners {
    // ... 依赖 ...

    @Async("eventExecutor")
    @EventListener
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        shippingService.createShipment(event.getOrderId());
    }

    @Async("eventExecutor")
    @EventListener
    public void sendConfirmationEmail(OrderPlacedEvent event) {
        emailService.sendOrderConfirmation(event.getOrderId());
    }
}

使用 @AsynccreateOrder() 方法在发布事件后立即返回。监听器在线程池中并发执行。这显著提高了响应时间——订单创建不再等待电子邮件发送或分析跟踪。

但异步事件引入了新的复杂性。当监听器执行时,订单创建事务可能尚未提交。监听器可能尝试从数据库加载订单,但由于事务仍在进行中而找不到它。

Spring 提供了 @TransactionalEventListener 来处理这种情况:

@Component
public class OrderEventListeners {
    @Async("eventExecutor")
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // 这仅在订单创建事务成功提交后运行
        shippingService.createShipment(event.getOrderId());
    }
}

AFTER_COMMIT 阶段确保监听器仅在发布事务成功提交后运行。如果订单创建失败并回滚,监听器永远不会执行。这可以防止处理实际上不存在的订单的事件。

实现事件存储

随着事件驱动架构的成熟,存储事件变得有价值。事件存储提供了审计日志,支持调试,并支持更复杂的模式,如事件溯源。

创建一个简单的事件存储:

@Entity
@Table(name = "domain_events")
public class StoredEvent {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false)
    private String eventId;

    @Column(nullable = false)
    private String eventType;

    @Column(nullable = false, columnDefinition = "TEXT")
    private String payload;

    @Column(nullable = false)
    private Instant occurredAt;

    @Column(nullable = false)
    private Instant storedAt;

    @Column
    private String aggregateId;

    @Column
    private String aggregateType;

    // 构造器、getter、setter
}

@Repository
public interface StoredEventRepository extends JpaRepository<StoredEvent, Long> {
    List<StoredEvent> findByAggregateIdOrderByOccurredAt(String aggregateId);
    List<StoredEvent> findByEventType(String eventType);
}

拦截并存储所有领域事件:

@Component
public class EventStoreListener {
    private final StoredEventRepository repository;
    private final ObjectMapper objectMapper;

    public EventStoreListener(StoredEventRepository repository, ObjectMapper objectMapper) {
        this.repository = repository;
        this.objectMapper = objectMapper;
    }

    @EventListener
    @Order(Ordered.HIGHEST_PRECEDENCE) // 在其他监听器之前存储
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void storeEvent(DomainEvent event) {
        try {
            StoredEvent stored = new StoredEvent();
            stored.setEventId(event.getEventId());
            stored.setEventType(event.getClass().getSimpleName());
            stored.setPayload(objectMapper.writeValueAsString(event));
            stored.setOccurredAt(event.getOccurredAt());
            stored.setStoredAt(Instant.now());

            // 如果可用,提取聚合信息
            if (event instanceof OrderPlacedEvent) {
                OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
                stored.setAggregateId(orderEvent.getOrderId().toString());
                stored.setAggregateType("Order");
            }

            repository.save(stored);
        } catch (JsonProcessingException e) {
            throw new EventStoreException("Failed to serialize event", e);
        }
    }
}

现在,每个领域事件在业务逻辑处理之前都会持久化。您可以通过重放事件来重建系统中发生的情况:

@Service
public class OrderHistoryService {
    private final StoredEventRepository eventRepository;

    public List<OrderEvent> getOrderHistory(Long orderId) {
        List<StoredEvent> events = eventRepository.findByAggregateIdOrderByOccurredAt(
            orderId.toString()
        );

        return events.stream()
            .map(this::deserializeEvent)
            .collect(Collectors.toList());
    }

    private OrderEvent deserializeEvent(StoredEvent stored) {
        // 根据事件类型反序列化
        try {
            Class<?> eventClass = Class.forName("com.example.events." + stored.getEventType());
            return (OrderEvent) objectMapper.readValue(stored.getPayload(), eventClass);
        } catch (Exception e) {
            throw new EventStoreException("Failed to deserialize event", e);
        }
    }
}

这实现了强大的调试能力。当客户报告其订单问题时,您可以准确看到发生了什么事件以及发生的顺序。

Saga 和补偿操作

某些工作流需要跨多个步骤进行协调,其中每个步骤都可能失败。传统方法使用分布式事务,但这些方法扩展性不佳且增加了复杂性。Saga 使用编排事件和补偿操作提供了一种替代方案。

考虑一个更复杂的订单流程,您需要:

  1. 预留库存
  2. 处理支付
  3. 创建发货单

如果在预留库存后支付失败,您需要释放预留。通过补偿事件实现这一点:

public class InventoryReservedEvent extends DomainEvent {
    private final Long orderId;
    private final List<ReservationDetail> reservations;

    // 构造器、getter
}

public class PaymentFailedEvent extends DomainEvent {
    private final Long orderId;
    private final String reason;

    // 构造器、getter
}

@Component
public class InventorySagaHandler {
    private final InventoryService inventoryService;

    @EventListener
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // 补偿操作:释放预留库存
        inventoryService.releaseReservation(event.getOrderId());
    }
}

Saga 通过事件而不是中央协调器进行协调:

@Service
public class OrderSagaService {
    private final ApplicationEventPublisher eventPublisher;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;

    public void processOrder(Order order) {
        // 步骤 1: 预留库存
        List<ReservationDetail> reservations = inventoryService.reserve(order.getItems());
        eventPublisher.publishEvent(new InventoryReservedEvent(order.getId(), reservations));

        try {
            // 步骤 2: 处理支付
            PaymentResult payment = paymentService.processPayment(order);

            if (payment.isSuccessful()) {
                eventPublisher.publishEvent(new PaymentSucceededEvent(order.getId(), payment));
            } else {
                // 触发补偿
                eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), payment.getReason()));
                throw new PaymentException(payment.getReason());
            }
        } catch (Exception e) {
            // 触发补偿
            eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), e.getMessage()));
            throw e;
        }
    }
}

这种模式在没有分布式事务的情况下保持了一致性。每个步骤发布记录所发生事件的事件。当发生故障时,补偿事件会触发撤销先前步骤的操作。

桥接到外部消息代理

随着单体应用的增长,您可能希望与外部系统集成或为最终的服务提取做准备。Spring Cloud Stream 提供了对 RabbitMQ 或 Kafka 等消息代理的抽象,同时保持相同的事件驱动模式:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

application.yml 中配置绑定:

spring:
  cloud:
    stream:
      bindings:
        orderPlaced-out-0:
          destination: order.placed
        orderPlaced-in-0:
          destination: order.placed
          group: order-processors
      kafka:
        binder:
          brokers: localhost:9092

创建内部事件和外部消息之间的桥接:

@Component
public class EventPublisher {
    private final StreamBridge streamBridge;

    public EventPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @EventListener
    public void publishToExternalBroker(OrderPlacedEvent event) {
        // 将内部事件发布到外部消息代理
        streamBridge.send("orderPlaced-out-0", event);
    }
}

@Component
public class ExternalEventConsumer {
    private final ApplicationEventPublisher eventPublisher;

    public ExternalEventConsumer(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    @Bean
    public Consumer<OrderPlacedEvent> orderPlaced() {
        return event -> {
            // 将外部事件重新发布为内部事件
            eventPublisher.publishEvent(event);
        };
    }
}

这种模式让您可以选择性地将事件发布到外部,同时将内部事件保留在本地。关键的实时操作使用内部事件以实现低延迟。跨服务通信使用消息代理以实现可靠性和可扩展性。

监控与可观测性

事件驱动系统引入了新的可观测性挑战。理解正在发生的情况需要跨多个异步处理步骤跟踪事件。实施全面的日志记录和指标:

@Aspect
@Component
public class EventMonitoringAspect {
    private static final Logger logger = LoggerFactory.getLogger(EventMonitoringAspect.class);
    private final MeterRegistry meterRegistry;

    public EventMonitoringAspect(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Around("@annotation(org.springframework.context.event.EventListener)")
    public Object monitorEventListener(ProceedingJoinPoint joinPoint) throws Throwable {
        String listenerName = joinPoint.getSignature().getName();
        Object[] args = joinPoint.getArgs();
        DomainEvent event = (DomainEvent) args[0];

        Timer.Sample sample = Timer.start(meterRegistry);

        try {
            logger.info("Processing event {} in listener {}", 
                event.getEventId(), listenerName);

            Object result = joinPoint.proceed();

            sample.stop(Timer.builder("event.listener.duration")
                .tag("listener", listenerName)
                .tag("event_type", event.getClass().getSimpleName())
                .tag("status", "success")
                .register(meterRegistry));

            meterRegistry.counter("event.listener.processed",
                "listener", listenerName,
                "event_type", event.getClass().getSimpleName(),
                "status", "success"
            ).increment();

            return result;
        } catch (Exception e) {
            sample.stop(Timer.builder("event.listener.duration")
                .tag("listener", listenerName)
                .tag("event_type", event.getClass().getSimpleName())
                .tag("status", "failure")
                .register(meterRegistry));

            meterRegistry.counter("event.listener.processed",
                "listener", listenerName,
                "event_type", event.getClass().getSimpleName(),
                "status", "failure"
            ).increment();

            logger.error("Error processing event {} in listener {}", 
                event.getEventId(), listenerName, e);

            throw e;
        }
    }
}

这个切面自动跟踪每个事件监听器的执行时间和成功率。结合 Prometheus 和 Grafana 等工具,您可以可视化事件处理模式并识别瓶颈。

添加关联 ID 以跟踪系统中的事件:

public abstract class DomainEvent {
    private final Instant occurredAt;
    private final String eventId;
    private final String correlationId;

    protected DomainEvent(String correlationId) {
        this.occurredAt = Instant.now();
        this.eventId = UUID.randomUUID().toString();
        this.correlationId = correlationId != null ? correlationId : UUID.randomUUID().toString();
    }

    // Getters
}

通过事件链传播关联 ID:

@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
    MDC.put("correlationId", event.getCorrelationId());

    try {
        // 执行工作

        // 发布具有相同关联 ID 的后续事件
        eventPublisher.publishEvent(new ShipmentCreatedEvent(
            event.getOrderId(),
            event.getCorrelationId()
        ));
    } finally {
        MDC.clear();
    }
}

现在,与单个订单流相关的所有日志消息共享一个关联 ID,使得跨多个异步操作跟踪整个工作流变得微不足道。

测试事件驱动代码

事件驱动架构需要不同的测试策略。传统的单元测试适用于单个监听器,但集成测试对于验证事件流变得更加重要:

@SpringBootTest
@TestConfiguration
public class OrderEventIntegrationTest {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private ShippingService shippingService;

    @Autowired
    private EmailService emailService;

    @Test
    public void shouldProcessOrderPlacedEventCompletely() throws Exception {
        // 给定
        Order order = createTestOrder();
        OrderPlacedEvent event = new OrderPlacedEvent(order);

        // 当
        eventPublisher.publishEvent(event);

        // 等待异步处理
        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            // 然后
            verify(shippingService).createShipment(order.getId());
            verify(emailService).sendOrderConfirmation(order.getId());
        });
    }
}

对于单元测试,注入一个间谍事件发布器以验证事件是否正确发布:

@ExtendWith(MockitoExtension.class)
public class OrderServiceTest {
    @Mock
    private OrderRepository orderRepository;

    @Mock
    private InventoryService inventoryService;

    @Mock
    private PaymentService paymentService;

    @Spy
    private ApplicationEventPublisher eventPublisher = new SimpleApplicationEventPublisher();

    @InjectMocks
    private OrderService orderService;

    @Test
    public void shouldPublishOrderPlacedEventAfterCreatingOrder() {
        // 给定
        CreateOrderRequest request = createValidRequest();

        when(inventoryService.checkAvailability(any(), anyInt())).thenReturn(true);
        when(paymentService.processPayment(any(), any(), any()))
            .thenReturn(PaymentResult.successful("txn-123"));
        when(orderRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));

        // 当
        orderService.createOrder(request);

        // 然后
        verify(eventPublisher).publishEvent(argThat(event -> 
            event instanceof OrderPlacedEvent
        ));
    }
}

迁移之旅

将单体应用重构为使用事件驱动架构并非全有或全无的命题。从一个工作流开始——通常是造成最多痛苦的那个。识别可以事件驱动的直接服务调用,并逐步引入事件。

从同步事件开始,以最小化行为变更。一旦事件正确流动,为非关键监听器切换到异步处理。当您需要审计跟踪或调试能力时,添加事件存储。仅当您需要跨服务通信或准备提取微服务时,才集成外部消息代理。

目标不是实现完美的事件驱动架构。而是减少耦合、提高可测试性,并在组件之间创建更清晰的边界。即使是部分采用也能提供价值——具有一些事件驱动模式的单体应用比完全没有的模式更易于维护。

这种渐进式方法使您能够持续交付价值,而不是投入一个需要数月时间、直到完全结束时才能交付任何成果的重构项目。您能够了解在特定领域和团队中哪些方法有效,根据实际经验而非理论理想来调整实施策略。

有用的资源


【注】本文译自: Event-Driven Architecture in Monoliths: Incremental Refactoring for Java Apps – Java Code Geeks

Java有哪些优势?

Java 的价值

当具有开创性的 Java 白皮书在 1995 年推出该语言时,它列出了七项使其超越竞争对手的核心价值。如今,Java 为在 AWS 和 Google Cloud 等主要云上运行的大规模系统提供动力,这使得这些价值对于现代部署和认证路径更具现实意义。
那份白皮书撰写至今已过去近 30 年,虽然其中许多价值仍然有效,但在 2025 年,选择 Java 作为您的部署平台的理由比以往任何时候都多。如果您关注 Java 路线图或热门技术博客,您会看到 Java 出现在云架构师、开发人员和数据领域的各个路径中。

Java 的优势

以下是 Java、JVM 和 JDK 的十大现代优势:

  1.  Java 是开源的
  2.  Java 是由社区驱动的
  3.  Java 快速且高性能
  4.  Java 易于学习
  5.  Java 是静态类型的
  6.  Java 拥有专家领导
  7.  Java 功能添加迅速
  8.  Java 是面向对象的
  9.  Java 支持函数式编程
  10. Java 优先考虑向后兼容性

    Java 是开源的

    Java 自 2011 年起已开源。任何人都可以查看 JDK 的源代码并创建定制化和优化的构建版本。这种开放性与 AWS 开发者GCP 专业云开发者等云学习路径非常契合,在这些路径中,基于 Java 的微服务很常见。
    流行的 OpenJDK 和 JVM 发行版包括:

    •   Azul 的高性能实现
    •   Oracle 的授权版本
    •   AdoptOpenJDK(现称为 Adoptium)
    •   IBM 的 Java 运行时
    •   Amazon Corretto
    •   Red Hat 的 OpenJDK 发行版
    •   微软构建的 OpenJDK
    •   高性能的 GraalVM
      谷歌甚至不惜借用 Java 源代码来构建自己的移动操作系统。这样做在道德上可能值得商榷,但美国最高法院表示,为构建 Android 操作系统而侵犯 Oracle 的版权是完全公平合理的。

      Java 是由社区驱动的

      Oracle 拥有 Java 商标这一事实在技术社区中引发了无休止的、任性的焦虑。然而,事实是 Java 通过 Java 社区进程向前发展,而非根据 Larry Ellison 的个人意愿。社区驱动的学习也体现在认证项目中,如 AWS 云从业者AWS 解决方案架构师GCP 助理云工程师
      JCP 是向 Java 编程语言添加新功能、新规范和新 API 的途径。在过去的 20 年里,JCP 完成了以下工作:

    •   增加了 1000 多名成员
    •   欢迎了 200 多家公司
    •   鼓励独立开发人员加入

社区支持和贡献是 Java 为软件开发社区带来的巨大优势之一,这种精神您同样可以在 AWS DevOps 工程师GCP DevOps 工程师圈子里找到。

Java 快速且高性能

Java 虚拟机是一个抽象层,使得 Java 程序能够跨平台运行。这种可移植性与 AWS 安全专家AWS 数据工程师GCP 专业数据工程师路径中的云工作负载非常匹配。
JVM 架构中立是 Java 的一大优势,但人们总是担心所需的抽象层可能会严重影响性能。但事实并非如此
在 JVM 上运行的 Java 可能无法达到与 C++ 或 Rust 等编译语言相同的性能。然而,垃圾收集器工作方式的改进、即时编译器的使用以及大量其他底层优化为 Java 平台带来了接近原生的性能。

Java 易于学习

1995 年的 Java 白皮书曾夸耀 Java 易于学习,因为它采用了该语言发布时流行的、类似 C 的熟悉语法。如果您喜欢结构化的目标和问责制,来自 Scrumtuous 的 Scrum 式冲刺可以帮助您规划 Java 学习节奏。
2023 年,JDK 拥有了 JShell,这使得 Java 对 Python 和 JavaScript 开发人员来说变得熟悉且易于学习。应试耐力可以通过像 Udemy 实践考试合集这样的题库来培养,即使它针对的是 AWS。这种训练方法可以很好地迁移到 Java 考试和云认证中。
此外,像 Replit 和 OneCompiler 这样的在线编译器允许学习者无需安装 IDE 或配置 JAVA_HOME 即可开始使用 Java。如果您的最终目标包括云角色,请参阅基础的 AWS 云从业者GCP 助理云工程师页面。

Java 是静态类型的

与 Python 或 JavaScript 等语言不同,Java 是静态类型的。
在 Java 中,您需要指定变量是 float、double、int、Integer、char 还是 String。这比动态类型语言提供了两个显著好处:

  •   它使得管理大型代码库更加容易,这对于 AWS 解决方案架构师GCP 云架构师非常重要。
  •   它使得优化运行时环境成为可能,这对 AWS 数据工程师GCP 数据库工程师等数据密集型角色有所帮助。
    Java 在 Python 和 JavaScript 失败的情况下仍能扩展的原因,通常可以追溯到 Java 的静态类型特性。

    Java 语言的静态类型特性是其主要优势。

    Java 拥有专家领导

    虽然该语言通过 Java 社区进程向前发展,但有两位杰出的软件架构师在 Oracle 内部指导着 Java 平台的演进。领导力和管理也是云项目中的主题,例如 AWS 专业级解决方案架构师以及专注于安全的路径,如 AWS 安全专家GCP 安全工程师

    功能采纳迅速

    与其他语言相比,Java 的优势之一是采纳新功能和响应社区需求的速度非常快。同样的迭代速度也反映在实践角色中,如 AWS DevOps 工程师GCP DevOps 工程师,这些角色会持续部署 Java 服务。

    Java 是面向对象的

    Java 用户认为这是理所当然的,但讨论 Java 的优势不能忽视 Java 是完全面向对象的,它实现了重要的 OOA&D 概念,例如:

  •   继承
  •   组合
  •   多态
  •   封装
  •   接口

对于使用 Scrum 主管产品负责人角色等框架组织工作的团队来说,Java 的对象建模自然地契合了映射到领域驱动设计的待办事项项。

Java 支持函数式编程

软件开发行业出现了向函数式编程的重大转变,而 Java 一直是这一趋势的重要组成部分。如果您旨在将 ML 服务与 Java 微服务融合,请探索 AWS 机器学习AWS AI 从业者路径。
函数式编程和不可变类型的使用可以使程序更快、更简洁且更易于理解。Java 在 Java 8 中进行了重大转变,引入了 Java Streams 和 lambda 表达式,这开启了 Java 函数式编程的新时代。您可以使用该语言同时进行函数式编程和面向对象编程,这是一个主要优势。

向后兼容性

随着 Java 社区推动 API 的重大更改,该语言的维护者始终优先考虑向后兼容性和非破坏性功能的添加。稳定性是 Java 在准备 AWS 助理级解决方案架构师GCP 专业云架构师角色的架构师中保持首选的原因之一。
即使引入了作为函数式编程的默认接口和 lambda 表达式,Java 平台也保持了向后兼容性。早期版本编写的代码可以在更新的环境中运行,无需重新编译。
在 2025 年,Java 的价值众多,因为 JDK 和 JVM 对于包含 AWS 云从业者解决方案架构师开发者数据工程师安全专家,以及高级角色如 AWS 专业级解决方案架构师,还有 GCP 路径如 GCP 数据从业者GCP 专业云网络工程师GCP Workspace 管理员GCP 机器学习工程师GCP 生成式 AI 负责人GCP 数据库工程师在内的多云职业而言,比以往任何时候都更具现实意义。

Java、JVM 和 JDK 的诸多优势持续推动着该编程语言的采用。


【注】本文译自:What are the advantages of Java?

Spring Boot WebSocket:使用 Java 构建多频道聊天系统

这是一个使用 WebFlux 和 MongoDB 构建响应式 Spring Boot WebSocket 聊天的分步指南,包括配置、处理程序和手动测试。


正如您可能已经从标题中猜到的,今天的主题将是 Spring Boot WebSockets。不久前,我提供了一个基于 Akka 工具包库的 WebSocket 聊天示例。然而,这个聊天将拥有更多一些功能,以及一个相当不同的设计。

我将跳过某些部分,以避免与上一篇文章的内容有太多重复。在这里您可以找到关于 WebSockets 更深入的介绍。请注意,本文中使用的所有代码也可以在 GitHub 仓库中找到。

Spring Boot WebSocket:使用的工具

让我们从描述将用于实现整个应用程序的工具开始本文的技术部分。由于我无法完全掌握如何使用经典的 Spring STOMP 覆盖来构建真正的 WebSocket API,我决定选择 Spring WebFlux 并使一切具有响应式特性。

  • Spring Boot – 基于 Spring 的现代 Java 应用程序离不开 Spring Boot;所有的自动配置都是无价的。
  • Spring WebFlux – 经典 Spring 的响应式版本,为处理 WebSocket 和 REST 提供了相当不错且描述性的工具集。我敢说,这是在 Spring 中实际获得 WebSocket 支持的唯一方法。
  • Mongo – 最流行的 NoSQL 数据库之一,我使用它来存储消息历史记录。
  • Spring Reactive Mongo – 用于以响应式方式处理 Mongo 访问的 Spring Boot 启动器。在一个地方使用响应式而在另一个地方不使用并不是最好的主意。因此,我决定也让数据库访问具有响应式特性。

让我们开始实现吧!

Spring Boot WebSocket:实现

依赖项与配置

pom.xml

<dependencies>
    <!--编译时依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
</dependencies>

application.properties

spring.data.mongodb.uri=mongodb://chats-admin:admin@localhost:27017/chats

我更喜欢 .properties 而不是 .yml——依我拙见,YAML 在较大规模上不可读且难以维护。

WebSocketConfig

@Configuration
class WebSocketConfig {

    @Bean
    ChatStore chatStore(MessagesStore messagesStore) {
        return new DefaultChatStore(Clock.systemUTC(), messagesStore);
    }

    @Bean
    WebSocketHandler chatsHandler(ChatStore chatStore) {
        return new ChatsHandler(chatStore);
    }

    @Bean
    SimpleUrlHandlerMapping handlerMapping(WebSocketHandler wsh) {
        Map<String, WebSocketHandler> paths = Map.of("/chats/{id}", wsh);
        return new SimpleUrlHandlerMapping(paths, 1);
    }

    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

出乎意料的是,这里定义的四个 Bean 都非常重要。

  • ChatStore – 用于操作聊天的自定义 Bean,我将在后续步骤中详细介绍这个 Bean。
  • WebSocketHandler – 将存储所有与处理 WebSocket 会话相关逻辑的 Bean。
  • SimpleUrlHandlerMapping – 负责将 URL 映射到正确的处理器,此处理的完整 URL 看起来大致像这样:ws://localhost:8080/chats/{id}
  • WebSocketHandlerAdapter – 一种功能性的 Bean,它为 Spring Dispatcher Servlet 添加了 WebSocket 处理支持。

ChatsHandler

class ChatsHandler implements WebSocketHandler {

    private final Logger log = LoggerFactory.getLogger(ChatsHandler.class);

    private final ChatStore store;

    ChatsHandler(ChatStore store) {
        this.store = store;
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String[] split = session.getHandshakeInfo()
            .getUri()
            .getPath()
            .split("/");
        String chatIdStr = split[split.length - 1];
        int chatId = Integer.parseInt(chatIdStr);
        ChatMeta chatMeta = store.get(chatId);
        if (chatMeta == null) {
            return session.close(CloseStatus.GOING_AWAY);
        }
        if (!chatMeta.canAddUser()) {
            return session.close(CloseStatus.NOT_ACCEPTABLE);
        }

        String sessionId = session.getId();
        store.addNewUser(chatId, session);
        log.info("New User {} join the chat {}", sessionId, chatId);
        return session
               .receive()
               .map(WebSocketMessage::getPayloadAsText)
               .flatMap(message -> store.addNewMessage(chatId, sessionId, message))
               .flatMap(message -> broadcastToSessions(sessionId, message, store.get(chatId).sessions()))
               .doFinally(sig -> store.removeSession(chatId, session.getId()))
               .then();
    }

    private Mono<Void> broadcastToSessions(String sessionId, String message, List<WebSocketSession> sessions) {
        return Flux.fromStream(sessions
                .stream()
                .filter(session -> !session.getId().equals(sessionId))
                .map(session -> session.send(Mono.just(session.textMessage(message)))))
                .then();
    }
}

正如我上面提到的,在这里您可以找到所有与处理 WebSocket 会话相关的逻辑。首先,我们从 URL 解析聊天的 ID 以获取目标聊天。根据特定聊天的上下文,响应不同的状态。

此外,我还将消息广播到与特定聊天相关的所有会话——以便用户实际交换消息。我还添加了 doFinally 触发器,它将从 chatStore 中清除已关闭的会话,以减少冗余通信。总的来说,这段代码是响应式的;我需要遵循一些限制。我试图使其尽可能简单和可读,如果您有任何改进的想法,我持开放态度。

ChatsRouter

@Configuration(proxyBeanMethods = false)
class ChatRouter {

    private final ChatStore chatStore;

    ChatRouter(ChatStore chatStore) {
        this.chatStore = chatStore;
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return RouterFunctions
        .route(POST("api/v1/chats/create"), e -> create(false))
        .andRoute(POST("api/v1/chats/create-f2f"), e -> create(true))
        .andRoute(GET("api/v1/chats/{id}"), this::get)
        .andRoute(DELETE("api/v1/chats/{id}"), this::delete);
    }
}

WebFlux 定义 REST 端点的方法与经典 Spring 有很大不同。上面,您可以看到用于管理聊天的 4 个端点的定义。与 Akka 实现中的情况类似,我希望有一个用于管理聊天的 REST API 和一个用于实际处理聊天的 WebSocket API。我将跳过函数实现,因为它们非常简单;您可以在 GitHub 上查看它们。

ChatStore

首先,接口:

public interface ChatStore {
    int create(boolean isF2F);
    void addNewUser(int id, WebSocketSession session);
    Mono<String> addNewMessage(int id, String userId, String message);
    void removeSession(int id, String session);
    ChatMeta get(int id);
    ChatMeta delete(int id);
}

然后是实现:

public class DefaultChatStore implements ChatStore {

    private final Map<Integer, ChatMeta> chats;
    private final AtomicInteger idGen;
    private final MessagesStore messagesStore;
    private final Clock clock;

    public DefaultChatStore(Clock clock, MessagesStore store) {
        this.chats = new ConcurrentHashMap<>();
        this.idGen = new AtomicInteger(0);
        this.clock = clock;
        this.messagesStore = store;
    }

    @Override
    public int create(boolean isF2F) {
        int newId = idGen.incrementAndGet();
        ChatMeta chatMeta = chats.computeIfAbsent(newId, id -> {
            if (isF2F) {
                return ChatMeta.ofId(id);
            }
            return ChatMeta.ofIdF2F(id);
        });
        return chatMeta.id;
    }

    @Override
    public void addNewUser(int id, WebSocketSession session) {
        chats.computeIfPresent(id, (k, v) -> v.addUser(session));
    }

    @Override
    public void removeSession(int id, String sessionId) {
        chats.computeIfPresent(id, (k, v) -> v.removeUser(sessionId));
    }

    @Override
    public Mono<String> addNewMessage(int id, String userId, String message) {
        ChatMeta meta = chats.getOrDefault(id, null);
        if (meta != null) {
            Message messageDoc = new Message(id, userId, meta.offset.getAndIncrement(), clock.instant(), message);
            return messagesStore.save(messageDoc)
                    .map(Message::getContent);
        }
        return Mono.empty();
    }
    // 省略部分
}

ChatStore 的基础是 ConcurrentHashMap,它保存所有开放聊天的元数据。接口中的大多数方法都不言自明,背后没有什么特别之处。

  • create – 创建一个新聊天,带有一个布尔属性,指示聊天是 f2f 还是群聊。
  • addNewUser – 向现有聊天添加新用户。
  • removeUser – 从现有聊天中移除用户。
  • get – 获取具有 ID 的聊天的元数据。
  • delete – 从 CMH 中删除聊天。

这里唯一复杂的方法是 addNewMessages。它增加聊天内的消息计数器,并将消息内容持久化到 MongoDB 中,以实现持久性。

MongoDB

消息实体

public class Message {
   @Id
   private String id;
   private int chatId;
   private String owner;
   private long offset;
   private Instant timestamp;
   private String content;
}

存储在数据库中的消息内容模型,这里有三个重要的字段:

  1. chatId – 表示发送特定消息的聊天。
  2. ownerId – 消息发送者的用户 ID。
  3. offset – 消息在聊天中的序号,用于检索排序。

MessageStore

public interface MessagesStore extends ReactiveMongoRepository<Message, String> {}

没什么特别的,经典的 Spring 仓库,但是以响应式方式实现,提供了与 JpaRepository 相同的功能集。它直接在 ChatStore 中使用。此外,在主应用程序类 WebsocketsChatApplication 中,我通过使用 @EnableReactiveMongoRepositories 来激活响应式仓库。没有这个注解,上面的 messageStore 将无法工作。好了,我们完成了整个聊天的实现。让我们测试一下!

Spring Boot WebSocket:测试

对于测试,我使用 Postman 和 Simple WebSocket Client。

  1. 我正在使用 Postman 创建一个新聊天。在响应体中,我得到了最近创建的聊天的 WebSocket URL。

图片:Postman 创建聊天请求的屏幕截图

  1. 现在是使用它们并检查用户是否可以相互通信的时候了。Simple Web Socket Client 在这里派上用场。因此,我在这里连接到新创建的聊天。

图片:Simple Web Socket Client 连接界面的屏幕截图

  1. 好了,一切正常,用户可以相互通信了。

图片:两个 WebSocket 客户端交换消息的屏幕截图
图片:两个 WebSocket 客户端交换消息的屏幕截图
图片:两个 WebSocket 客户端交换消息的屏幕截图

还有最后一件事要做。让我们花点时间看看哪些地方可以做得更好。

可以改进的地方

由于我刚刚构建的是最基础的聊天应用程序,有一些(或者实际上相当多)地方可以做得更好。下面,我列出了一些我认为值得改进的方面:

  • 身份验证和重新加入支持 – 目前,一切都基于 sessionId。这不是一个最优的方法。最好能有一些身份验证机制,并基于用户数据实现实际的重新加入。
  • 发送附件 – 目前,聊天仅支持简单的文本消息。虽然发消息是聊天的基本功能,但用户也喜欢交换图片和音频文件。
  • 测试 – 目前没有测试,但为什么要保持这样呢?测试总是一个好主意。
  • offset 溢出 – 目前,它只是一个简单的 int。如果我们要在非常长的时间内跟踪 offset,它迟早会溢出。

总结

好了!Spring Boot WebSocket 聊天已经实现,主要任务已完成。您对下一步要开发什么有了一些想法。

请记住,这个聊天案例非常简单,对于任何类型的商业项目,都需要大量的修改和开发。

无论如何,我希望您在阅读本文时学到了一些新东西。

感谢您的时间。


【注】本文译自:Spring Boot WebSocket: Building a Multichannel Chat in Java