

  Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量控制、流量路由、熔断降级、系统自适应保护等多个维度来帮助用户保障微服务的稳定性
  在Spring容器启动的时候实例化了一个SentinelResourceAspect类,看这个类的命名就应该可以大概猜到这个类就是一个切面类,我们点击进入@Aspect public class SentinelResourceAspect extends AbstractSentinelAspectSupport {      @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")     public void sentinelResourceAnnotationPointcut() {     }      @Around("sentinelResourceAnnotationPointcut()")     public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {         Method originMethod = resolveMethod(pjp);          SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);         if (annotation == null) {             // Should not go through here.             throw new IllegalStateException("Wrong state for SentinelResource annotation");         }         String resourceName = getResourceName(annotation.value(), originMethod);         EntryType entryType = annotation.entryType();         int resourceType = annotation.resourceType();         Entry entry = null;         try {             entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());             return pjp.proceed();         } catch (BlockException ex) {             return handleBlockException(pjp, annotation, ex);         } catch (Throwable ex) {             Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();             // The ignore list will be checked first.             if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {                 throw ex;             }             if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {                 traceException(ex);                 return handleFallback(pjp, annotation, ex);             }              // No fallback function can handle the exception, so throw it out.             throw ex;         } finally {             if (entry != null) {                 entry.exit(1, pjp.getArgs());             }         }     } }
  一看就知道这个切面类,是利用的aop的技术,切点就是我们上面说的@SentinelResource注解,所以当我们的接口方法上面加了@SentinelResource注解,执行这个接口里面的业务逻辑前会先进入这个切面类的invokeResourceWithSentinel方法,我们看看这个切面是如何执行的,我们重点看SphU.entry(resourceName, resourceType, entryType, pjp.getArgs())这个方法 public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)     throws BlockException {     return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); }
  在这个Env的类的实例化的时候会执行static代码块的逻辑public class Env {      public static final Sph sph = new CtSph();      static {         // If init fails, the process will exit.         InitExecutor.doInit();     }  }
  我们继续看看这个InitExecutor.doInit()方法里面到底做了什么动作public static void doInit() {     if (!initialized.compareAndSet(false, true)) {         return;     }     try {         List initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();         List initList = new ArrayList();         for (InitFunc initFunc : initFuncs) {             RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());             insertSorted(initList, initFunc);         }         for (OrderWrapper w : initList) {             w.func.init();             RecordLog.info("[InitExecutor] Executing {} with order {}",                 w.func.getClass().getCanonicalName(), w.order);         }     } catch (Exception ex) {         RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);         ex.printStackTrace();     } catch (Error error) {         RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);         error.printStackTrace();     } }
  首先用cas做了并发安全处理,保证多线程情况下面只有一个线程执行成功,然后这里主要的逻辑是 SpiLoader.of(InitFunc.class).loadInstanceListSorted(),然后会调用到里面的load方法/**  * Load all Provider instances of the specified Service, sorted by order value in class"s {@link Spi} annotation  *  * @return Sorted Provider instances list  */ public List loadInstanceListSorted() {     load();      return createInstanceList(sortedClassList); }      /**      * Load the Provider class from Provider configuration file      */     public void load() {         if (!loaded.compareAndSet(false, true)) {             return;         }          String fullFileName = SPI_FILE_PREFIX + service.getName();         ClassLoader classLoader;         if (SentinelConfig.shouldUseContextClassloader()) {             classLoader = Thread.currentThread().getContextClassLoader();         } else {             classLoader = service.getClassLoader();         }         if (classLoader == null) {             classLoader = ClassLoader.getSystemClassLoader();         }         Enumeration urls = null;         try {             urls = classLoader.getResources(fullFileName);         } catch (IOException e) {             fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e);         }          if (urls == null || !urls.hasMoreElements()) {             RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);             return;         }          while (urls.hasMoreElements()) {             URL url = urls.nextElement();              InputStream in = null;             BufferedReader br = null;             try {                 in = url.openStream();                 br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));                 String line;                 while ((line = br.readLine()) != null) {                     if (StringUtil.isBlank(line)) {                         // Skip blank line                         continue;                     }                      line = line.trim();                     int commentIndex = line.indexOf("#");                     if (commentIndex == 0) {                         // Skip comment line                         continue;                     }                      if (commentIndex > 0) {                         line = line.substring(0, commentIndex);                     }                     line = line.trim();                      Class clazz = null;                     try {                         clazz = (Class) Class.forName(line, false, classLoader);                     } catch (ClassNotFoundException e) {                         fail("class " + line + " not found", e);                     }                      if (!service.isAssignableFrom(clazz)) {                         fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName);                     }                      classList.add(clazz);                     Spi spi = clazz.getAnnotation(Spi.class);                     String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();                     if (classMap.containsKey(aliasName)) {                         Class<? extends S> existClass = classMap.get(aliasName);                         fail("Found repeat alias name for " + clazz.getName() + " and "                                 + existClass.getName() + ",SPI configuration file=" + fullFileName);                     }                     classMap.put(aliasName, clazz);                      if (spi != null && spi.isDefault()) {                         if (defaultClass != null) {                             fail("Found more than one default Provider, SPI configuration file=" + fullFileName);                         }                         defaultClass = clazz;                     }                      RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"                             + ", isSingleton={}, isDefault={}, order={}",                         service.getName(), line, aliasName                             , spi == null ? true : spi.isSingleton()                             , spi == null ? false : spi.isDefault()                             , spi == null ? 0 : spi.order());                 }             } catch (IOException e) {                 fail("error reading SPI configuration file", e);             } finally {                 closeResources(in, br);             }         }          sortedClassList.addAll(classList);         Collections.sort(sortedClassList, new Comparator>() {             @Override             public int compare(Class<? extends S> o1, Class<? extends S> o2) {                 Spi spi1 = o1.getAnnotation(Spi.class);                 int order1 = spi1 == null ? 0 : spi1.order();                  Spi spi2 = o2.getAnnotation(Spi.class);                 int order2 = spi2 == null ? 0 : spi2.order();                  return Integer.compare(order1, order2);             }         });     }
  这里我们先返回进入到这个entryWithType方法里面去@Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)     throws BlockException {     return entryWithType(name, resourceType, entryType, count, false, args); }    @Override     public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,                                Object[] args) throws BlockException {         StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);         return entryWithPriority(resource, count, prioritized, args);     }    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)         throws BlockException {         Context context = ContextUtil.getContext();         if (context instanceof NullContext) {             // The {@link NullContext} indicates that the amount of context has exceeded the threshold,             // so here init the entry only. No rule checking will be done.             return new CtEntry(resourceWrapper, null, context);         }          if (context == null) {             // Using default context.             context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);         }          // Global switch is close, no rule checking will do.         if (!Constants.ON) {             return new CtEntry(resourceWrapper, null, context);         }          ProcessorSlot chain = lookProcessChain(resourceWrapper);          /*          * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},          * so no rule checking will be done.          */         if (chain == null) {             return new CtEntry(resourceWrapper, null, context);         }          Entry e = new CtEntry(resourceWrapper, chain, context);         try {             chain.entry(context, resourceWrapper, null, count, prioritized, args);         } catch (BlockException e1) {             e.exit(count, args);             throw e1;         } catch (Throwable e1) {             // This should not happen, unless there are errors existing in Sentinel internal.             RecordLog.info("Sentinel unexpected exception", e1);         }         return e;     }
  这里最后会调用到entryWithPriority这个方法里面,这里面用到了责任链的设计模式,首先会构造一个处理的链路,我们关注一下这个lookProcessChain(resourceWrapper)方法 ProcessorSlot lookProcessChain(ResourceWrapper resourceWrapper) {     ProcessorSlotChain chain = chainMap.get(resourceWrapper);     if (chain == null) {         synchronized (LOCK) {             chain = chainMap.get(resourceWrapper);             if (chain == null) {                 // Entry size limit.                 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {                     return null;                 }                  chain = SlotChainProvider.newSlotChain();                 Map newMap = new HashMap(                     chainMap.size() + 1);                 newMap.putAll(chainMap);                 newMap.put(resourceWrapper, chain);                 chainMap = newMap;             }         }     }     return chain; }
  首先会从一个缓存map里面去取,如果为空的话, 调用SlotChainProvider.newSlotChain()构造一个链路,并且放到map里面缓存public static ProcessorSlotChain newSlotChain() {     if (slotChainBuilder != null) {         return slotChainBuilder.build();     }      // Resolve the slot chain builder SPI.     slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();      if (slotChainBuilder == null) {         // Should not go through here.         RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");         slotChainBuilder = new DefaultSlotChainBuilder();     } else {         RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",             slotChainBuilder.getClass().getCanonicalName());     }     return slotChainBuilder.build(); }
  这里也是用到jdk的spi机制,会加载sentinel-core.jar里面META-INF下面的services下面配置的ProcessSlot的实现类# Sentinel default ProcessorSlots com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot com.alibaba.csp.sentinel.slots.logger.LogSlot com.alibaba.csp.sentinel.slots.statistic.StatisticSlot com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot com.alibaba.csp.sentinel.slots.system.SystemSlot com.alibaba.csp.sentinel.slots.block.flow.FlowSlot com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
  然后依次调用 chain.entry(context, resourceWrapper, null, count, prioritized, args)方法,依次调用图中的各个slot类,这里我们重点看看FlowSlot类和DegradeSlot这两个类的entry方法@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,                   boolean prioritized, Object... args) throws Throwable {     checkFlow(resourceWrapper, context, node, count, prioritized);      fireEntry(context, resourceWrapper, node, count, prioritized, args); }
  在这个FlowSlot的entry方法,首先会校验我们控制台配置的限流流控规则,最终会调用到checkFlow方法void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)     throws BlockException {     checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }  public void checkFlow(Function> ruleProvider, ResourceWrapper resource,                           Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {         if (ruleProvider == null || resource == null) {             return;         }         Collection rules = ruleProvider.apply(resource.getName());         if (rules != null) {             for (FlowRule rule : rules) {                 if (!canPassCheck(rule, context, node, count, prioritized)) {                     throw new FlowException(rule.getLimitApp(), rule);                 }             }         }     }
  通过Collection rules = ruleProvider.apply(resource.getName())拿到我们在sentinel控制台配置的各种限流规则,然后遍历进行校验,不通过就抛出FlowException异常,这个时候整个这个责任链的执行流程就结束了,但是在之前执行的StatisticSlot的entry方法里面用try catch包裹住了,在这个SentinelResource Aspect里面会捕获BlocakExption异常,进行处理@Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {     Method originMethod = resolveMethod(pjp);      SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);     if (annotation == null) {         // Should not go through here.         throw new IllegalStateException("Wrong state for SentinelResource annotation");     }     String resourceName = getResourceName(annotation.value(), originMethod);     EntryType entryType = annotation.entryType();     int resourceType = annotation.resourceType();     Entry entry = null;     try {         entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());         return pjp.proceed();     } catch (BlockException ex) {         return handleBlockException(pjp, annotation, ex);     } catch (Throwable ex) {         Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();         // The ignore list will be checked first.         if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {             throw ex;         }         if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {             traceException(ex);             return handleFallback(pjp, annotation, ex);         }          // No fallback function can handle the exception, so throw it out.         throw ex;     } finally {         if (entry != null) {             entry.exit(1, pjp.getArgs());         }     } }protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)     throws Throwable {      // Execute block handler if configured.     Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),         annotation.blockHandlerClass());     if (blockHandlerMethod != null) {         Object[] originArgs = pjp.getArgs();         // Construct args.         Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);         args[args.length - 1] = ex;         return invoke(pjp, blockHandlerMethod, args);     }      // If no block handler is present, then go to fallback.     return handleFallback(pjp, annotation, ex); }