How Sentinel's flow control takes effect
1. Introduction to Sentinel
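Sentinel is a traffic governance component for distributed, multi-language, heterogeneous service architectures. Taking traffic as its entry point, it helps users keep microservices stable along several dimensions: flow control, traffic routing, circuit breaking and degradation, and adaptive system protection.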
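As microservices have become widespread, stability between services matters more and more. As a company's user base and traffic grow, the pressure on the database keeps increasing. Sentinel addresses this from the traffic side, through flow control, circuit breaking and degradation, and system load protection.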
2. Source code entry point
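There are two ways to use Sentinel. One is to add the @SentinelResource annotation to the method of the resource that needs flow control and degradation; the other is to protect resources through an interceptor. Both approaches end up executing the same core code. This article follows the annotation (AOP aspect) path to see how the Sentinel source implements flow control and degradation.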
The mechanism rests on Spring Boot auto-configuration: the spring.factories file in spring-cloud-starter-alibaba-sentinel.jar lists the SentinelAutoConfiguration class. Let's look at the source of this class.
When the Spring container starts, it instantiates a SentinelResourceAspect. The name already suggests that this is an aspect class. Opening it shows:

    @Aspect
    public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

        @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
        public void sentinelResourceAnnotationPointcut() {
        }

        @Around("sentinelResourceAnnotationPointcut()")
        public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
            Method originMethod = resolveMethod(pjp);

            SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
            if (annotation == null) {
                // Should not go through here.
                throw new IllegalStateException("Wrong state for SentinelResource annotation");
            }
            String resourceName = getResourceName(annotation.value(), originMethod);
            EntryType entryType = annotation.entryType();
            int resourceType = annotation.resourceType();
            Entry entry = null;
            try {
                entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
                return pjp.proceed();
            } catch (BlockException ex) {
                return handleBlockException(pjp, annotation, ex);
            } catch (Throwable ex) {
                Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
                // The ignore list will be checked first.
                if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                    throw ex;
                }
                if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                    traceException(ex);
                    return handleFallback(pjp, annotation, ex);
                }

                // No fallback function can handle the exception, so throw it out.
                throw ex;
            } finally {
                if (entry != null) {
                    entry.exit(1, pjp.getArgs());
                }
            }
        }
    }
This aspect clearly relies on AOP, and its pointcut is the @SentinelResource annotation mentioned above. So when an interface method carries @SentinelResource, the aspect's invokeResourceWithSentinel method runs before the method's business logic. To see how the aspect executes, focus on the call SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()):

    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
        throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }
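The order of operations in invokeResourceWithSentinel (enter, run the business method, divert to a handler when blocked, always exit) can be sketched without Sentinel on the classpath. Everything below is a hypothetical stand-in: ToyGuard, ToyBlockedException and ToyAspect are invented names, and the "at most N in flight" check is only a placeholder for Sentinel's real rule checking, which this sketch does not try to reproduce.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for Sentinel's BlockException. */
class ToyBlockedException extends Exception {
    ToyBlockedException(String resource) {
        super("blocked: " + resource);
    }
}

/** Hypothetical guard that admits at most `limit` concurrent entries (an assumption, not Sentinel's algorithm). */
class ToyGuard {
    private final int limit;
    private final AtomicInteger inFlight = new AtomicInteger();

    ToyGuard(int limit) {
        this.limit = limit;
    }

    /** Plays the role of SphU.entry: either admits the call or throws. */
    void enter(String resource) throws ToyBlockedException {
        if (inFlight.incrementAndGet() > limit) {
            inFlight.decrementAndGet();
            throw new ToyBlockedException(resource);
        }
    }

    /** Plays the role of entry.exit. */
    void exit() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }
}

class ToyAspect {
    /** Same try / catch / finally shape as the aspect's around advice. */
    static <T> T invoke(ToyGuard guard, String resource, Supplier<T> body,
                        Function<ToyBlockedException, T> blockHandler) {
        boolean entered = false;
        try {
            guard.enter(resource);
            entered = true;
            return body.get();               // corresponds to pjp.proceed()
        } catch (ToyBlockedException ex) {
            return blockHandler.apply(ex);   // corresponds to handleBlockException(...)
        } finally {
            if (entered) {
                guard.exit();                // only exit an entry that was actually acquired
            }
        }
    }
}
```

With a limit of 1, a single call returns the business result and leaves no entry in flight. With a limit of 0, the body is never run and the block handler's value is returned instead.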
When the Env class is initialized, which happens the first time Env.sph is accessed, its static block runs:

    public class Env {

        public static final Sph sph = new CtSph();

        static {
            // If init fails, the process will exit.
            InitExecutor.doInit();
        }

    }
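The timing of that static block follows from standard Java class initialization rather than anything specific to Sentinel. A minimal sketch, using the invented names InitLog, DemoEnv and StaticInitDemo, shows that the block runs on the first read of a non-constant static field and does not run again on later reads:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event log kept outside DemoEnv so that reading it does not initialize DemoEnv. */
final class InitLog {
    static final List<String> EVENTS = new ArrayList<>();
}

/** Hypothetical stand-in for Env: one static field plus a static initializer. */
final class DemoEnv {
    // Not a compile-time constant, so reading it triggers class initialization.
    static final Object SPH = new Object();

    static {
        InitLog.EVENTS.add("DemoEnv initialized");
    }
}

class StaticInitDemo {
    /** Records how many initialization events exist before and after touching DemoEnv.SPH. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        seen.add("events before access: " + InitLog.EVENTS.size());
        Object first = DemoEnv.SPH;   // first access: static block runs here
        Object second = DemoEnv.SPH;  // second access: already initialized
        seen.add("events after access: " + InitLog.EVENTS.size());
        seen.add("same instance: " + (first == second));
        return seen;
    }
}
```

The JVM also guarantees that class initialization happens at most once per class loader, even when several threads touch the class at the same moment.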
Next, let's see what InitExecutor.doInit() actually does:

    public static void doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        try {
            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : initFuncs) {
                RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
                insertSorted(initList, initFunc);
            }
            for (OrderWrapper w : initList) {
                w.func.init();
                RecordLog.info("[InitExecutor] Executing {} with order {}",
                    w.func.getClass().getCanonicalName(), w.order);
            }
        } catch (Exception ex) {
            RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
            ex.printStackTrace();
        } catch (Error error) {
            RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
            error.printStackTrace();
        }
    }
The method first uses CAS for concurrency safety, so that only one thread succeeds when several call it at once. The main logic is SpiLoader.of(InitFunc.class).loadInstanceListSorted(), which in turn calls the load method:

    /**
     * Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation
     *
     * @return Sorted Provider instances list
     */
    public List<S> loadInstanceListSorted() {
        load();

        return createInstanceList(sortedClassList);
    }

    /**
     * Load the Provider class from Provider configuration file
     */
    public void load() {
        if (!loaded.compareAndSet(false, true)) {
            return;
        }

        String fullFileName = SPI_FILE_PREFIX + service.getName();
        ClassLoader classLoader;
        if (SentinelConfig.shouldUseContextClassloader()) {
            classLoader = Thread.currentThread().getContextClassLoader();
        } else {
            classLoader = service.getClassLoader();
        }
        if (classLoader == null) {
            classLoader = ClassLoader.getSystemClassLoader();
        }
        Enumeration<URL> urls = null;
        try {
            urls = classLoader.getResources(fullFileName);
        } catch (IOException e) {
            fail("Error locating SPI configuration file, filename=" + fullFileName
                + ", classloader=" + classLoader, e);
        }

        if (urls == null || !urls.hasMoreElements()) {
            RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);
            return;
        }

        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();

            InputStream in = null;
            BufferedReader br = null;
            try {
                in = url.openStream();
                br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String line;
                while ((line = br.readLine()) != null) {
                    if (StringUtil.isBlank(line)) {
                        // Skip blank line
                        continue;
                    }

                    line = line.trim();
                    int commentIndex = line.indexOf("#");
                    if (commentIndex == 0) {
                        // Skip comment line
                        continue;
                    }

                    if (commentIndex > 0) {
                        line = line.substring(0, commentIndex);
                    }
                    line = line.trim();

                    Class<S> clazz = null;
                    try {
                        clazz = (Class<S>) Class.forName(line, false, classLoader);
                    } catch (ClassNotFoundException e) {
                        fail("class " + line + " not found", e);
                    }

                    if (!service.isAssignableFrom(clazz)) {
                        fail("class " + clazz.getName() + "is not subtype of " + service.getName()
                            + ",SPI configuration file=" + fullFileName);
                    }

                    classList.add(clazz);
                    Spi spi = clazz.getAnnotation(Spi.class);
                    String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();
                    if (classMap.containsKey(aliasName)) {
                        Class<? extends S> existClass = classMap.get(aliasName);
                        fail("Found repeat alias name for " + clazz.getName() + " and "
                            + existClass.getName() + ",SPI configuration file=" + fullFileName);
                    }
                    classMap.put(aliasName, clazz);

                    if (spi != null && spi.isDefault()) {
                        if (defaultClass != null) {
                            fail("Found more than one default Provider, SPI configuration file=" + fullFileName);
                        }
                        defaultClass = clazz;
                    }

                    RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"
                            + ", isSingleton={}, isDefault={}, order={}",
                        service.getName(), line, aliasName
                        , spi == null ? true : spi.isSingleton()
                        , spi == null ? false : spi.isDefault()
                        , spi == null ? 0 : spi.order());
                }
            } catch (IOException e) {
                fail("error reading SPI configuration file", e);
            } finally {
                closeResources(in, br);
            }
        }

        sortedClassList.addAll(classList);
        Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
            @Override
            public int compare(Class<? extends S> o1, Class<? extends S> o2) {
                Spi spi1 = o1.getAnnotation(Spi.class);
                int order1 = spi1 == null ? 0 : spi1.order();

                Spi spi2 = o2.getAnnotation(Spi.class);
                int order2 = spi2 == null ? 0 : spi2.order();

                return Integer.compare(order1, order2);
            }
        });
    }
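Both doInit and load open with the same compareAndSet guard. A minimal standalone sketch of that guard, using only java.util.concurrent.atomic and the invented class name OnceInitializer, shows that the guarded body runs once no matter how many threads race to call it:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for a "run at most once" initialization step. */
class OnceInitializer {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    /** Returns true only for the single caller that wins the CAS and performs the work. */
    boolean doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return false; // another thread already flipped the flag
        }
        runs.incrementAndGet(); // stands in for the real initialization work
        return true;
    }

    int runs() {
        return runs.get();
    }

    /** Starts threadCount threads that all call doInit and reports how often the body ran. */
    static int race(int threadCount) {
        OnceInitializer init = new OnceInitializer();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(init::doInit);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return init.runs();
    }
}
```

One property of this pattern is worth keeping in mind: a thread that loses the CAS returns immediately and does not wait for the winner to finish, so in this sketch the flag only says that initialization has started, not that it has completed.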
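The load method reads the provider configuration file for the service from the META-INF/services directory (for InitFunc, the one shipped in sentinel-core.jar, plus any matching file the class loader finds elsewhere on the classpath) and loads every implementation class listed there. The classes are then instantiated through reflection by createInstanceList.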
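The two pieces of bookkeeping inside load, parsing the lines of the configuration file and ordering the discovered classes, can be tried in isolation. The sketch below is an approximation written for illustration, not Sentinel's code: SpiFileSketch, ToyOrder and the three demo classes are invented, and ToyOrder merely imitates the order attribute of the @Spi annotation.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical annotation imitating the order attribute of @Spi. */
@Retention(RetentionPolicy.RUNTIME)
@interface ToyOrder {
    int value() default 0;
}

@ToyOrder(-10)
class EarlyInit { }

class DefaultInit { } // no annotation, treated as order 0

@ToyOrder(5)
class LateInit { }

class SpiFileSketch {
    /** Extracts provider class names: skips blank and comment lines, strips trailing comments. */
    static List<String> parse(List<String> lines) {
        List<String> names = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // blank line
            }
            int commentIndex = line.indexOf('#');
            if (commentIndex == 0) {
                continue; // whole line is a comment
            }
            if (commentIndex > 0) {
                line = line.substring(0, commentIndex).trim();
            }
            names.add(line);
        }
        return names;
    }

    /** Order value of a class; classes without the annotation default to 0. */
    static int orderOf(Class<?> type) {
        ToyOrder order = type.getAnnotation(ToyOrder.class);
        return order == null ? 0 : order.value();
    }

    /** Returns a copy sorted by ascending order value (stable for equal values). */
    static List<Class<?>> sortByOrder(List<Class<?>> classes) {
        List<Class<?>> sorted = new ArrayList<>(classes);
        sorted.sort(Comparator.comparingInt(SpiFileSketch::orderOf));
        return sorted;
    }
}
```

Smaller order values sort first, so a provider with a negative order is placed ahead of unannotated ones.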
For now, let's go back and step into the entryWithType method:

    @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
        throws BlockException {
        return entryWithType(name, resourceType, entryType, count, false, args);
    }

    @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                               Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return entryWithPriority(resource, count, prioritized, args);
    }

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot