上千star的分布式ID生产黑科技,开箱即用,附源码
sequence介绍
sequence是一个基于雪花算法(Snowflake)实现的64位自增ID算法,实现语言是JAVA。
其在雪花算法的基础上,做了一些优化,解决了原生算法的一些痛点:支持用户自定义允许时间回拨的范围;解决了跨毫秒时起始值从0开始增长的问题;解决了高并发场景中获取时间戳性能的问题;
雪花算法的介绍可以参考博主上篇文章:还在用数据库自增ID做主键?建议了解一下雪花算法生成的分布式ID性能
从官方的性能测试数据来看,sequence每秒可以生成最多418万个有序的ID,即TPS=400w/s,已经基本满足绝大部分的业务场景。
源代码及使用
核心代码一共两个类,分别是Sequence.java和SystemClock.java。
Sequence是产生分布式ID的核心,SystemClock主要解决了高并发场景下System.currentTimeMills()的性能问题。
大家也可根据自己的需要,修改其中的部署属性值,比如自定义机器标识,起始时间戳等。
Sequence.java :public class Sequence { private static final Logger log = LoggerFactory.getLogger(Sequence.class); /** * 时间起始标记点,作为基准,一般取系统的最近时间 * 一旦确定不能变动,确定后改变此值可能造成id重复 */ private final long twepoch = 1519740777809L; //5位的机房id private final long datacenterIdBits = 5L; //5位的机器id private final long workerIdBits = 5L; //每毫秒内产生的id数: 2的12次方个 private final long sequenceBits = 12L; protected final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); protected final long maxWorkerId = -1L ^ (-1L << workerIdBits); private final long workerIdShift = sequenceBits; private final long datacenterIdShift = sequenceBits + workerIdBits; private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; private final long sequenceMask = -1L ^ (-1L << sequenceBits); //所属机房id private final long datacenterId; //所属机器id private final long workerId; //并发控制序列 private long sequence = 0L; //上次生产 ID 时间戳 private long lastTimestamp = -1L; //获取IP private static volatile InetAddress LOCAL_ADDRESS = null; private static final Pattern IP_PATTERN = Pattern.compile("d{1,3}(.d{1,3}){3,5}#34;); //默认的无参构造 public Sequence() { this.datacenterId = getDatacenterId(); this.workerId = getMaxWorkerId(datacenterId); } //有参构造器,我们可以自定义机器id和机房id public Sequence(long workerId, long datacenterId) { if (workerId > maxWorkerId || workerId < 0) { throw new IllegalArgumentException(String.format("Worker Id can"t be greater than %d or less than 0", maxWorkerId)); } if (datacenterId > maxDatacenterId || datacenterId < 0) { throw new IllegalArgumentException(String.format("Datacenter Id can"t be greater than %d or less than 0", maxDatacenterId)); } this.workerId = workerId; this.datacenterId = datacenterId; } //基于网卡MAC地址计算余数作为数据中心,如果不想用网卡可以自定义 protected long getDatacenterId() { long id = 0L; try { NetworkInterface network = NetworkInterface.getByInetAddress(getLocalAddress()); if (null == network) { id = 1L; } else { byte[] mac = network.getHardwareAddress(); if (null != mac) { id = ((0x000000FF & (long) mac[mac.length - 2]) | (0x0000FF00 & (((long) mac[mac.length - 1]) << 8))) >> 6; id = id % (maxDatacenterId + 1); } } } catch (Exception e) { log.warn(" getDatacenterId: " + e.getMessage()); } return id; } //基于 MAC + PID 的 hashcode 获取16个低位 protected long getMaxWorkerId(long datacenterId) { StringBuilder mpId = new StringBuilder(); mpId.append(datacenterId); String name = ManagementFactory.getRuntimeMXBean().getName(); if (name != null && name.length() > 0) { // GET jvmPid mpId.append(name.split("@")[0]); } // MAC + PID 的 hashcode 获取16个低位 return (mpId.toString().hashCode() & 0xffff) % (maxWorkerId + 1); } //获取下一个 ID public synchronized long nextId() { //获取当前时间戳,这里通过SystemClock优化获取性能 long timestamp = timeGen(); // 时间回拨了 if (timestamp < lastTimestamp) { long offset = lastTimestamp - timestamp; if (offset <= 5) { try { // 休眠双倍差值后重新获取,再次校验 wait(offset << 1); timestamp = timeGen(); if (timestamp < lastTimestamp) { throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset)); } } catch (Exception e) { throw new RuntimeException(e); } } else { throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset)); } } if (lastTimestamp == timestamp) { // 相同毫秒内,序列号自增 sequence = (sequence + 1) & sequenceMask; if (sequence == 0) { // 同一毫秒的序列数已经达到最大 timestamp = tilNextMillis(lastTimestamp); } } else { // 不同毫秒内,序列号置为 1 - 3 随机数 sequence = ThreadLocalRandom.current().nextLong(1, 3); } lastTimestamp = timestamp; // 时间戳部分 | 数据中心部分 | 机器标识部分 | 序列号部分 return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence; } protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } protected long timeGen() { return SystemClock.INSTANCE.currentTimeMillis(); } /** * Find first valid IP from local network card * * @return first valid local IP */ public static InetAddress getLocalAddress() { if (LOCAL_ADDRESS != null) { return LOCAL_ADDRESS; } LOCAL_ADDRESS = getLocalAddress0(); return LOCAL_ADDRESS; } private static InetAddress getLocalAddress0() { InetAddress localAddress = null; try { localAddress = InetAddress.getLocalHost(); if (isValidAddress(localAddress)) { return localAddress; } } catch (Throwable e) { log.warn("Failed to retrieving ip address, " + e.getMessage(), e); } try { Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); if (interfaces != null) { while (interfaces.hasMoreElements()) { try { NetworkInterface network = interfaces.nextElement(); Enumeration addresses = network.getInetAddresses(); while (addresses.hasMoreElements()) { try { InetAddress address = addresses.nextElement(); if (isValidAddress(address)) { return address; } } catch (Throwable e) { log.warn("Failed to retrieving ip address, " + e.getMessage(), e); } } } catch (Throwable e) { log.warn("Failed to retrieving ip address, " + e.getMessage(), e); } } } } catch (Throwable e) { log.warn("Failed to retrieving ip address, " + e.getMessage(), e); } log.error("Could not get local host ip address, will use 127.0.0.1 instead."); return localAddress; } private static boolean isValidAddress(InetAddress address) { if (address == null || address.isLoopbackAddress()) { return false; } String name = address.getHostAddress(); return (name != null && !"0.0.0.0".equals(name) && !"127.0.0.1".equals(name) && IP_PATTERN.matcher(name).matches()); } }
SystemClock.java://利用ScheduledExecutorService实现高并发场景下 //System.curentTimeMillis()的性能问题的优化. public enum SystemClock { INSTANCE(1); private final long period; private final AtomicLong nowTime; private boolean started = false; private ScheduledExecutorService executorService; SystemClock(long period) { this.period = period; this.nowTime = new AtomicLong(System.currentTimeMillis()); } public void initialize() { if (started) { return; } this.executorService = new ScheduledThreadPoolExecutor(1, r -> { Thread thread = new Thread(r, "system-clock"); thread.setDaemon(true); return thread; }); executorService.scheduleAtFixedRate(() -> nowTime.set(System.currentTimeMillis()), this.period, this.period, TimeUnit.MILLISECONDS); Runtime.getRuntime().addShutdownHook(new Thread(this::destroy)); started = true; } public long currentTimeMillis() { return started ? nowTime.get() : System.currentTimeMillis(); } public String currentTime() { return new Timestamp(currentTimeMillis()).toString(); } public void destroy() { if (executorService != null) { executorService.shutdown(); } } }最后
sequence虽然大幅提升了性能,但是在某些情况下仍然可能出现重复的情况,比如机器标识重复、起始时间戳被修改重置等,这些问题需要我们特别注意。
总得来说,sequence被称为分布式高效ID生产黑科技并不为过,著名的ORM框架mybatis plus用的也是这个组件,大家有兴趣也可以去了解一下。
#头条创作挑战赛#
学习技术,分享技术,期待与大家共同进步,也感谢您的点赞与关注。