AxonFramework扩展
除了默认的 Axon Server 之外,Spring Cloud 是分发命令总线(命令)的另一种方法。
Spring Cloud Extension 使用 Spring Cloud 描述的服务注册和发现机制来分发命令总线。 因此,您可以选择在发现分发命令的路由时使用哪个 Spring Cloud 实现。 Netflix 的 Eureka Discovery/Eureka Server 组合或 HashiCorp 的 Consul 就是一个例子。
要使用 Axon 的 Spring Cloud 组件,请确保 axon-springcloud 模块在类路径中可用。 最简单的方法是将此扩展中的 Spring Cloud starter (axon-springcloud-spring-boot-starter ) 包含到您的项目中。
给出每个 Spring Cloud 实现的描述会使这个参考指南走得太远。 有关其他 Spring Cloud 实现选项的信息,请参阅它们各自的文档。
Spring Cloud 连接器设置是 SpringCloudCommandRouter 和 SpringHttpCommandBusConnector 的组合。 前者是 CommandRouter ,后者是 CommandBusConnector ,两者都被 DistributedCommandBus 用来启用命令分发。发现命令路由
SpringCloudCommandRouter 使用 Spring Cloud 的发现机制来查找集群中的其他节点。 为此,它使用 Spring Cloud 的 DiscoveryClient 和 Registration 。 这些分别用于收集远程命令路由信息和维护本地信息。 检索两者最直接的方法是使用 @EnableDiscoveryClient 注解您的应用程序。
收集和存储命令路由信息围绕着 Spring Cloud 的 ServiceInstances 。 Registration 只是本地的 ServiceInstance ,而 DiscoveryClient 提供了 API 来查找远程 ServiceInstance 。 此外,正是 ServiceInstance 为我们提供了检索节点功能所需的信息(例如 URI)。
Note Spring Cloud 的心跳需求
使用 SpringCloudCommandRouter 时,请确保您的 Spring 应用程序启用了心跳事件。 Spring Cloud 应用程序发布的心跳事件是检查 DiscoveryClient 中的 ServiceInstances 集是否已更改的触发器。 此外,它还用于验证已知节点的命令路由功能是否已更改。
因此,如果心跳事件被禁用,您的实例将不再使用当前的命令路由功能进行更新。 如果是这样,这将在命令路由期间引起问题。
存储本地功能和发现 ServiceInstance 的远程功能的逻辑在 CapabilityDiscoveryMode 中维护。 因此,CapabilityDiscoveryMode 为我们提供了实际检索它可以处理的 ServiceInstance 的命令集(如果有的话)的方法。 CapabilityDiscoveryMode 提供的唯一完整实现是 RestCapabilityDiscoveryMode ,使用 RestTemplate 和 ServiceInstance URI 来调用可配置的端点。 该端点通向 MemberCapabilitiesController ,后者又在该实例的 RestCapabilityDiscoveryMode 上公开 MemberCapabilities 。
CapabilityDiscoveryMode 存在装饰器,提供两个附加功能:IgnoreListingDiscoveryMode - 一个 CapabilityDiscoveryMode 装饰器,它在检索 MemberCapabilities 失败时会将给定的 ServiceInstance 放在一个列表中,以供将来验证时忽略。 因此,它有效地从集合中删除了可发现的 ServiceInstances 。AcceptAllCommandsDiscoveryMode - 一个 CapabilityDiscoveryMode 装饰器,无论此实例可以作为命令处理什么,都表明它可以处理任何事情。 如果系统中的节点是同质的(也就是说,每个人都可以处理相同的命令集),这个装饰器就会派上用场。
Registration 、DiscoveryClient 和 CapabilityDiscoveryMode 可以说是 SpringCloudCommandRouter 的核心。 但是,您可以为此路由器配置一些额外的东西,如下所示:RoutingStrategy - 负责决定哪个节点一致地接收命令的组件。 默认情况下,使用 AnnotationRoutingStrategy (有关更多信息,请参阅分发命令总线)。ServiceInstance 过滤器 - 此 Predicate 用于过滤掉通过 DiscoveryClient 检索到的 ServiceInstance 。 例如,它允许删除已知不处理任何命令消息的实例。 如果您在 Spring Cloud Discovery 服务中设置了多个服务,这可能很有用,而您不想在命令处理中考虑这些服务。ConsistentHashChangeListener - 如果新节点已添加到已知命令处理程序集中,添加一致的哈希更改侦听器可为您提供执行特定任务的机会。
Note 每个节点的不同命令能力
不需要所有节点都具有相同的命令处理程序集。 您可以将不同的段完全用于不同的命令类型。 分布式命令总线将始终选择一个节点将命令分派给支持该特定类型命令的节点。 在节点之间发送命令
CommandBusConnector 负责根据给定的路由从一个节点到另一个节点发送命令。 这个扩展提供了 SpringHttpCommandBusConnector ,它使用普通的 REST 来发送命令。
创建此服务时有三个硬性要求和一个可选配置: 本地 CommandBus - 这个 "local segment" 是将命令分发到本地 JVM 的命令总线。 因此,当 SpringHttpCommandBusConnector 接收到来自外部的命令时,或者如果它接收到对自己有意义的命令时,就会调用它。RestOperations - 用于将命令消息发布到另一个实例的服务。 在大多数情况下,RestTemplate 用于此目的。Serializer - Serializer 用于在发送命令消息之前对其进行序列化,并在接收到命令消息时对其进行反序列化。Executor (可选) - Executor 用于处理传入的命令和调度命令。 默认为 DirectExecutor 实例。配置此扩展
如果您也在使用 Spring Cloud,那么您将使用 Spring Boot 的可能性很高。 随着配置的进行,这将选择使用 axon-springcloud-spring-boot-starter 依赖项来自动检索所有必需的 bean。 在任何一种情况下,您的应用程序都应该被标记为通过 Spring Cloud 将其启用为可发现的服务。 例如,这可以通过使用 @EnableDiscoveryClient 注解主类来完成。
仍然有很多可自定义的组件。 有关一些建议,请查看以下示例:
自定义 bean 配置 : // Custom Spring Boot app, enabling a "DiscoveryClient" and "Registration" through `@EnableDiscoveryClient` @EnableDiscoveryClient @SpringBootApplication public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @Bean public CapabilityDiscoveryMode capabilityDiscoveryMode(RestTemplate restTemplate, Serializer serializer) { return RestCapabilityDiscoveryMode.builder() .restTemplate(restTemplate) .serializer(serializer) // Allows changing the endpoint used to find member capabilities .messageCapabilitiesEndpoint(/* custom message information endpoint */) .build(); } @Bean public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient, Registration localServiceInstance, CapabilityDiscoveryMode capabilityDiscoveryMode) { return SpringCloudCommandRouter.builder() .discoveryClient(discoveryClient) .routingStrategy(new AnnotationRoutingStrategy()) .localServiceInstance(localServiceInstance) .capabilityDiscoveryMode(capabilityDiscoveryMode) .serviceInstanceFilter(/* custom ServiceInstance filter */) .consistentHashChangeListener(/* ConsistentHash change listener */) .build(); } // Only required if Axon Spring Boot Starter is not used @Bean @Qualifier("localSegment") public CommandBus localSegment() { return SimpleCommandBus.builder().build(); } @Bean public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment, RestOperations restOperations, Serializer serializer) { return SpringHttpCommandBusConnector.builder() .localCommandBus(localSegment) .restOperations(restOperations) .serializer(serializer) .executor(/* custom Executor */) .build(); } @Bean @Primary public DistributedCommandBus distributedCommandBus(CommandRouter commandRouter, CommandBusConnector commandBusConnector) { return DistributedCommandBus.builder() .commandRouter(commandRouter) .connector(commandBusConnector) .build(); } }
Spring Boot AutoConfiguration :# 需要启用 DistributedCommandBus axon.distributed.enabled=true # 定义用于此 segment 的负载系数。 默认为 100 axon.distributed.load-factor=100 # 定义使用的 CapabilityDiscoveryMode。 默认为 REST axon.distributed.spring-cloud.mode=rest # 定义用于从中检索成员功能的端点。 默认为 "/member-capabilities" axon.distributed.spring-cloud.rest-mode-url="/my-custom-endpoint" # 定义是否应该修饰 CapabilityDiscoveryMode 以忽略有故障的 ServiceInstances axon.distributed.spring-cloud.enable-ignore-listing=true # 定义是否应该修饰 CapabilityDiscoveryMode 以接受所有类型的命令 axon.distributed.spring-cloud.enable-accept-all-commands=true