范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

ApacheCuratorFramework教程(一)

  Apache Curator Framework教程
  Apache Curator 是 Apache ZooKeeper(分布式协调服务)的 Java/JVM 客户端库。它包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加容易和可靠。
  依赖
  curator 有很多的依赖,比如如下是maven依赖官方说明
  一般情况下只要引入 curator-recipes  基本就够用。他包含了 client  和 framework  的依赖,会自动下载下来。
  创建项目并引入依赖
  pom文件 <?xml version="1.0" encoding="UTF-8"?>      4.0.0      com.itlab1024     curator-framework-tutorial     1.0-SNAPSHOT               17         17         UTF-8                                org.apache.curator             curator-recipes             5.4.0                               org.projectlombok             lombok             1.18.24             test                               org.slf4j             slf4j-api             2.0.6                               org.slf4j             slf4j-simple             2.0.6                               org.junit.jupiter             junit-jupiter             5.9.0             test               创建连接
  curator主要通过工厂类 CuratorFrameworkFactory  的 newClient  方法创建连接
  有三种多态方法。 public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig) public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
  参数说明: connectString:连接字符串,服务器访问地址例如localhost:2181(注意是IP(域名)+ 端口),如果是集群地址,则用逗号(,)隔开即可。 sessionTimeoutMs:会话超时时间,单位毫秒,如果不设置则先去属性中找 curator-default-session-timeout  的值,如果没设置,则默认是60 * 1000毫秒。 connectionTimeoutMs:连接超时时间,单位毫秒,如果不设置则先去属性中找 curator-default-connection-timeout  的值,如果没设置,则默认是15 * 1000毫秒。 RetryPolicy:重试策略,后面具体讲解。
  使用Java代码创建连接并创建一个节点 package cn.programtalk.connection;  import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  public class ConnectionTest {     /**      * 创建连接      */     @Test     public void TestConnection1() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("172.30.140.89:2181",  new ExponentialBackoffRetry(1000,3));         curatorFramework.start();         curatorFramework.create().forPath("/test");     } }
  运行完毕后查看结果:
  也可以使用其 builder()  建造者模式构建client。 @Test public void TestConnection2() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString("172.20.98.4:2181")         .retryPolicy(new ExponentialBackoffRetry(1000,3))         .sessionTimeoutMs(1000)         .connectionTimeoutMs(10000)         .build();     curatorFramework.start();     curatorFramework.create().forPath("/test"); }重试策略
  重试策略有几种实现,可以通过如下类图直观地展示出来。
  RetryForever
  该策略是永远尝试。
  new RetryForever(2000)  参数是毫秒,代表间隔多久进行重试! package cn.programtalk;  import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  public class RetryTest {     /**      * RetryForever      */     @Test     public void testRetryForever() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("unknownHost:2181",  new RetryForever(2000));         curatorFramework.start();     } }SessionFailedRetryPolicy
  session超时重试策略,其构造方法是 SessionFailedRetryPolicy(RetryPolicy delegatePolicy)  ,参数就是也是一个重试策略,其含义就是说会话超时的时候使用哪种具体的重试策略。 public void testSessionFailedRetryPolicy() throws Exception {         RetryPolicy sessionFailedRetryPolicy = new SessionFailedRetryPolicy(new RetryForever(1000));          CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()                 .connectString("localhost:2181")                 .retryPolicy(sessionFailedRetryPolicy)                 .build();         curatorFramework.start();         TimeUnit.DAYS.sleep(1);     }
  session超时后,会尝试重新连接。
  RetryNTimes
  重试N次策略: public RetryNTimes(int n, int sleepMsBetweenRetries)  ,第一个是重试次数,第二个参数是每次重试间隔多少毫秒。 @Test public void testRetryNTimes() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("unknownHost:2181", new RetryNTimes(5, 1000));     curatorFramework.start();     TimeUnit.DAYS.sleep(1); }
  上面的代码就是重试5次,重试间隔1000毫秒。 RetryOneTime
  重试一次,他是RetryNTime的特例,N=1的情况。 @Test public void testRetryOneTime() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("unknownHost:2181", new RetryOneTime(1000));     curatorFramework.start();     TimeUnit.DAYS.sleep(1); }RetryUntilElapsed
  public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
  一直重试直到达到规定的时间, maxElapsedTimeMs  :最大重试时间, sleepMsBetweenRetries  每次重试间隔时间。 @Test public void testRetryUntilElapsed() throws Exception {     RetryUntilElapsed retryPolicy = new RetryUntilElapsed(3000, 1000);     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("unknownHost:2181", retryPolicy);     curatorFramework.start();     TimeUnit.DAYS.sleep(1); }ExponentialBackoffRetry
  按照设定的次数重试,每次重试之间的睡眠时间都会增加。
  构造方法如下: public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) {     this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS); }
  baseSleepTimeMs  :重试间隔毫秒数
  maxRetries  :最大重试次数 @Test public void testExponentialBackoffRetry() throws Exception {     RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1000);     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("unknownHost:2181", retryPolicy);     curatorFramework.start();     TimeUnit.DAYS.sleep(1); }BoundedExponentialBackoffRetry
  重试策略,该策略重试设定的次数,重试之间的休眠时间增加(最多达到最大限制)
  BoundedExponentialBackoffRetry  继承 ExponentialBackoffRetry  ,相比与 ExponentialBackoffRetry  ,它增加了最大休眠时间的设置。
  构造方法如下: public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) {     super(baseSleepTimeMs, maxRetries);     this.maxSleepTimeMs = maxSleepTimeMs; }
  示例如下: @Test public void testBoundedExponentialBackoffRetry() throws Exception {     RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(3000, 6000, 1000);     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("unknownHost:2181", retryPolicy);     curatorFramework.start();     TimeUnit.DAYS.sleep(1); }名称空间(Namespace)
  curator中名称空间的含义,就是设置一个公共的父级path,之后的操作全部都是基于该path。 /**  * 名称空间  * @throws Exception  */ @Test public void testCreate6() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     CuratorFramework c2 = curatorFramework.usingNamespace("namespace1");     c2.create().forPath("/node1");     c2.create().forPath("/node2"); }
  查看运行结果:
  CRUD基础创建节点
  创建节点使用 create  方法,该方法返回一个 CreateBuilder  他是一个建造者模式的类。用于创建节点。 package cn.programtalk.connection;  import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  public class CreateNodeTest {     String connectString = "172.30.140.89:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);      /**      * 创建节点      */     @Test     public void testCreate1() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         curatorFramework.create().forPath("/test");     } }
  创建完毕后,通过命令行查看节点:
  看到值是 10.112.33.229  ,可实际上我并未给节点设置值,这个值是框架默认设置的,客户端的IP。 这个默认值可以修改,此时不能使用 newClient  方法,需要使用工厂的builder自己构建设置。示例代码如下: @Test public void testCreateDefaultData() throws Exception {     CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().defaultData("默认值".getBytes(StandardCharsets.UTF_8));     CuratorFramework client = builder.connectString(connectString).retryPolicy(retryPolicy).build();     client.start();     client.create().forPath("/defaultDataTest"); }
  运行结果:
  可以看到,默认值已经被修改为 默认值  。
  创建节点时如果节点存在,则会抛出 NodeExistsException  异常
  使用forPath设置节点的值
  forPath还接收第二个参数(节点的值,字节数组类型) @Test public void testCreate2() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     curatorFramework.create().forPath("/test2", "用户自己设置的值".getBytes(StandardCharsets.UTF_8)); }
  运行结果:
  可见正确设置了值。 节点模式设置
  可以通过 withMode  方法设置节点的类型,为显示指定的节点都是持久性节点。 /**  * 设置节点类型  * @throws Exception  */ @Test public void testCreate3() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/EPHEMERAL1");     // 临时节点,会话结束就会删除,线程睡眠用于延长会话时间     TimeUnit.SECONDS.sleep(30); }
  查看结果:
  可以看到临时节点,红色框内只有临时节点该属性才是非零。 TTL时长设置
  使用 withTtl  设置时长,单位毫秒。当模式为 CreateMode.PERSISTENT_WITH_TTL 或CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL时指定 TTL。必须大于 0 且小于或等于 EphemeralType.MAX_TTL。 /**  * 测试ttl  * @throws Exception  */ @Test public void testCreate5() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     curatorFramework.create().withTtl(10000).withMode(CreateMode.PERSISTENT_WITH_TTL).forPath("/ttl1"); }
  可能出现如下错误:
  这是因为TTL默认是关闭的,需要打开(zoo.cfg中设置 extendedTypesEnabled=true  )。 再次运行: [zk: localhost:2181(CONNECTED) 8] ls / [defaultDataTest, hiveserver2_zk, test, test2, ttl1, zookeeper] #等待10秒后再次查看,ttl1节点自动被删除。 [zk: localhost:2181(CONNECTED) 9] ls / [defaultDataTest, hiveserver2_zk, test, test2, zookeeper]ACL权限
  创建节点时设置ACL,主要通过 withACL  方法设置,接收一个 List  类型的参数。
  ACL  实例对象,通过该类的构造方法创建,类似 ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);  /**  * 测试acl  * @throws Exception  */ @Test public void testCreate7() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     List aclList = new ArrayList<>();     ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);     aclList.add(acl);     curatorFramework.create().withACL(aclList).forPath("/acl1"); }
  运行结果:
  运行完毕后,通过命令行查看权限,可以看到已经设置成功。
  如果不设置ACL,默认则是 new ACL(Perms.ALL, ANYONE_ID_UNSAFE)  。 查询值
  查询数据使用 getData  方法。 /**  * 查询节点的值  * @throws Exception  */ @Test public void testGetData() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     byte[] bytes = curatorFramework.getData().forPath("/test");     System.out.println("/test节点的值是:" + new String(bytes, StandardCharsets.UTF_8)); }
  结果:
  设置值
  使用 setData  ,配合 forpath  方法。 /**  * 设置节点的值  * @throws Exception  */ @Test public void testGetData2() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     byte[] bytes = curatorFramework.getData().forPath("/test");     System.out.println("/test节点的原始值是:" + new String(bytes, StandardCharsets.UTF_8));     curatorFramework.setData().forPath("/test", "updated".getBytes(StandardCharsets.UTF_8));     bytes = curatorFramework.getData().forPath("/test");     System.out.println("/test节点的新值是:" + new String(bytes, StandardCharsets.UTF_8)); }
  运行结果是:
  获取孩子节点/**  * 获取孩子节点  * @throws Exception  */ @Test public void testGetState() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     List children = curatorFramework.getChildren().forPath("/namespace1");     children.forEach(System.out::println); }
  运行结果:
  获取ACLpackage cn.programtalk.connection;  import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.ACL; import org.junit.jupiter.api.Test;  import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.TimeUnit;  public class ACLTest {     String connectString = "172.30.140.89:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);      /**      * 获取Acl列表      */     @Test     public void testAcl1() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         List acls = curatorFramework.getACL().forPath("/test");         acls.forEach(acl -> System.out.println(acl.getId() + " " + acl.getPerms()));     }  }
  运行结果:
  删除节点
  使用 delete  ,搭配 forPath  方法,删除指定的节点。 package cn.programtalk.connection;  import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  import java.util.List;  public class DeleteNodeTest {     String connectString = "172.30.140.89:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);      /**      * 删除节点      * @throws Exception      */     @Test     public void testDelete1() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         curatorFramework.delete().forPath("/test");     } }
  程序执行完毕后,通过命令行查询 /test  可知已经被删除。
  如果被删除的节点有孩子节点,则无法删除,抛出 NotEmptyException  。
  那么如何删除包含子节点的节点呢?需要使用 deletingChildrenIfNeeded  方法 /**  * 删除节点(包含子节点)  * @throws Exception  */ @Test public void testDelete2() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     curatorFramework.delete().deletingChildrenIfNeeded().forPath("/namespace1"); }
  运行后,查看该节点
  节点已经被删除。并且级联删除了子节点。 检查节点是否存在
  使用 checkExists()  搭配 forPath  来实现,返回一个 Stat  对象信息。 package cn.programtalk.connection;  import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.Test;  public class CheckExistsTest {     String connectString = "172.30.140.89:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);      /**      * 检查是否存在      * @throws Exception      */     @Test     public void testGetState() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         Stat stat = curatorFramework.checkExists().forPath("/namespace1");         System.out.println(stat);     } }
  运行结果:
  stat  的具体信息如下:
  查看会话状态
  使用 getState()  。 package cn.programtalk.connection;  import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  import java.util.List; import java.util.concurrent.TimeUnit;  public class GetStateTest {     String connectString = "172.30.140.89:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);      /**      * 查询客户端状态      * @throws Exception      */     @Test     public void testGetState() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         CuratorFrameworkState state = curatorFramework.getState();         System.out.println("状态是" + state); // 状态是LATENT         curatorFramework.start();         state = curatorFramework.getState();         System.out.println("状态是" + state); // 状态是STARTED         curatorFramework.close();         state = curatorFramework.getState();         System.out.println("状态是" + state); // 状态是STOPPED     } }事务package cn.programtalk.connection;  import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.GetConfigBuilder; import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransaction; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.junit.jupiter.api.Test;  import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.List;  public class TransactionTest{     String connectString = "172.30.140.89:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);      /**      * 查询客户端状态      * @throws Exception      */     @Test     public void testTransaction() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         CuratorOp createOp = curatorFramework.transactionOp().create().forPath("/transaction1");         CuratorOp setDataOp = curatorFramework.transactionOp().setData().forPath("/transaction2", "transaction2".getBytes(StandardCharsets.UTF_8));         CuratorOp deleteOp = curatorFramework.transactionOp().delete().forPath("/transaction3");          List result = curatorFramework.transaction().forOperations(createOp, setDataOp, deleteOp);         result.forEach(rt -> System.out.println(rt.getForPath() + "---" + rt.getType()));     } }
  运行程序前先看下zk节点情况
  可以看到没有 transaction1  和 transaction2  和 transaction3  。
  运行程序会出现如下异常。
  出现异常则事务应该回滚,也就是说 transaction1  节点不应该创建成功。
  通过上图可知确实没有创建成功。
  接下来我通过命令长创建 /transaction2  和 /transaction3  这两个节点。
  创建完毕,并且可以看到 /transaction2  节点的值是 null  。
  重新运行程序后,不会发生异常。
  通过命令行看下事务是否完全执行成功。
  可以看到 /transaction1  节点创建成功, /transaction2  节点的值修改成功。 /transaction3  节点被删除。说明事务是有效的!
  为了演示清晰,我先清理掉所有节点。
  监听节点
  本版本中 PathChildrenCache  、 NodeCache  、 TreeCache  都已经是过期的了,官方推荐使用 CuratorCache  。
  并且api风格也更改了,改为了流式风格。
  CuratorCacheListener  提供了多种监听器,比如 forInitialized  , forCreates  等。 package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.TimeUnit;  @Slf4j public class CacheTest {     String connectString = "localhost:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);       /**      *      * @throws Exception      */     @Test     public void testCache1() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/ns1").build();         CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()                 .forInitialized(() -> {                     log.info("forInitialized回调");                     log.info("--------");                 })                  .forCreates(childData -> {                     log.info("forCreates回调执行, path=[{}], data=[{}], stat=[{}]", childData.getPath(), Objects.isNull(childData.getData()) ? null : new String(childData.getData(), StandardCharsets.UTF_8), childData.getStat());                     log.info("--------");                 })                  .forNodeCache(() -> {                     log.info("forNodeCache回调");                     log.info("--------");                 })                  .forChanges((oldNode, node) -> {                     log.info("forChanges回调, oldNode.path=[{}], oldNode.data=[{}], oldNode.stat=[{}], node.path=[{}], node.data=[{}], node.stat=[{}]", oldNode.getPath(), Objects.isNull(oldNode.getData()) ? null : new String(oldNode.getData(), StandardCharsets.UTF_8), oldNode.getStat(), node.getPath(), Objects.isNull(node.getData()) ? null : new String(node.getData(), StandardCharsets.UTF_8), node.getStat());                     log.info("--------");                 })                  .forDeletes(childData -> {                     log.info("forDeletes回调执行, path=[{}], data=[{}], stat=[{}]", childData.getPath(), Objects.isNull(childData.getData()) ? null : new String(childData.getData(), StandardCharsets.UTF_8), childData.getStat());                     log.info("--------");                 })                  .forAll((type, oldNode, node) -> {                     log.info("forAll回调");                     log.info("type=[{}]", type);                     if (Objects.nonNull(oldNode)) {                         log.info("oldNode.path=[{}], oldNode.data=[{}], oldNode.stat=[{}]", oldNode.getPath(), Objects.isNull(oldNode.getData()) ? null : new String(oldNode.getData(), StandardCharsets.UTF_8), oldNode.getStat());                     }                     if (Objects.nonNull(node)) {                         log.info("node.path=[{}], node.data=[{}], node.stat=[{}]", node.getPath(), Objects.isNull(node.getData()) ? null : new String(node.getData(), StandardCharsets.UTF_8), node.getStat());                     }                     log.info("--------");                 })                  .forCreatesAndChanges((oldNode, node) -> {                     log.info("forCreatesAndChanges回调");                     if (Objects.nonNull(oldNode)) {                         log.info("oldNode.path=[{}], oldNode.data=[{}], oldNode.stat=[{}]", oldNode.getPath(), Objects.isNull(oldNode.getData()) ? null : new String(oldNode.getData(), StandardCharsets.UTF_8), oldNode.getStat());                     }                     if (Objects.nonNull(node)) {                         log.info("node.path=[{}], node.data=[{}], node.stat=[{}]", node.getPath(), Objects.isNull(node.getData()) ? null : new String(node.getData(), StandardCharsets.UTF_8), node.getStat());                     }                     log.info("--------");                 })                 .build();         // 获取监听器列表容器         Listenable listenable = curatorCache.listenable();         // 将监听器放入容器中         listenable.addListener(curatorCacheListener);         // curatorCache必须启动         curatorCache.start();         // 延时,以保证连接不关闭         TimeUnit.DAYS.sleep(10);         curatorCache.close();     } }
  上面的代码就是创建监听节点的核心代码。
  以前的监听类型是不同的类(过期的类)实现的。现在是通过不同的forXXX方法指定的(例如: forInitialized  )。
  在测试前我将zk中的数据清理掉 [zk: localhost:2181(CONNECTED) 5] ls / [zookeeper]
  可以看到完全清理掉了。 API说明
  在测试之前要简单地说明下API的基本使用方式。curator监听主要有如下几个主要的类。 CuratorFrameworkFactory  这是简单的静态工厂类,用于创建连接zk的客户端(client),里面提供了 newClient  的多态方法,也可以使用 builder  建造者模式类创建客户端。 String connectString = "localhost:2181";
  RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  // 使用newClient方法
  CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
  // 也可以使用静态builder()方法
  CuratorFramework curatorFramework2 = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(retryPolicy).build();  CuratorCache  类,该类也有提供builder方法 CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/ns1").build();  也提供了build方法,可以像下面这样使用。 CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/ns1").build();
  curatorCache= CuratorCache.build(curatorFramework, "/ns1", CuratorCache.Options.SINGLE_NODE_CACHE, CuratorCache.Options.COMPRESSED_DATA, CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE);  CuratorCacheListener  监听器类,里面可以定义各种监听器。 测试启动
  运行上面的示例,会打印如下内容:
  可见初始化回调被调用。 创建节点
  创建CuratorCache监听的节点 /ns1  ,需要注意的是此时节点并不存在。
  命令行操作如下:
  程序输出如下:
  我们看到当创建节点的时候有四个回调函数被执行。
  结论:  当创建节点的时候 forCreates  、 forAll  、 forCreatesAndChanges  被回调。
  那么如果再创建子节点情况会是什么样的呢?比如我创建 /ns1/sub1  。
  命令行:
  控制台:
  节点创建监听器,监听类型是 CuratorCacheListener.Type.NODE_CREATED  ,创建节点的时候会触发,当创建子节点的时候也会触发 。
  结论:  创建子节点依然会回调上述所说的四个监听器。 修改数据
  修改监听的根节点 /ns1  的值
  命令行修改值:
  控制台输出:
  当修改监听根节点 /ns1  的值的时候, forChanges  、 forAll  、 forCreatesAndChanges  四个监听器被触发。
  接下来再修改其子节点的值
  控制台输出如下:
  依然回调 forChanges  、 forAll  、 forCreatesAndChanges  四个监听器函数。
  结论:  修改监听节点以及其子节点都会触发 forChanges  、 forAll  、 forCreatesAndChanges  监听器。 ACL设置
  命令行:
  控制台没有打印回调:
  结论:  设置ACL不会触发监听器。 删除节点
  首先我先删除监听节点 /ns1  下的子节点
  命令行:
  控制台:
  删除子节点的时候会触发 forDeletes  、 forNodeCache  、 forAll  执行。
  接下来再删除监听根节点 /ns1  。
  命令行:
  控制台输出:
  跟上面子节点的删除触发的监听器回调一样!
  总结:  删除监听根节点以及其子节点会触发 forDeletes  、 forAll  监听器。
  那么如果我删除的是一个父级节点呢?会出现什么情况?
  因为我之前的实验,删除了 /ns1/sub1  所以重建,重建后使用 deleteall /ns1
  命令行:
  控制台:
  可以看到,级联删除,会多次触发 forDeletes  ,根节点和其子节点的删除都会触发。同理 forAll  也会多次触发。
  总结:  对于节点的删除,无论是单个删除还是级联删除,每个节点的删除都会触发 forDeletes  、 forAll  监听器。
  那么上面这些总结对吗?起码默认情况是对的!因为缓存我使用这样的方式创建的 CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/ns1").build();  CuratorCache配置
  上面的代码中我使用的 CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/ns1").build();  会导致子节点的操作也会触发监听器,这是因为默认就是如此,当然如果想值监听一个节点,可以使用如下方法(源码如下): static CuratorCache build(CuratorFramework client, String path, Options... options) {     return builder(client, path).withOptions(options).build(); }
  第三个参数Options就可以配置。
  比如我配置就监听一个节点,就可以按照如下方式创建 CuratorCache  : CuratorCache curatorCache = CuratorCache.build(curatorFramework, "/ns1", CuratorCache.Options.SINGLE_NODE_CACHE);
  这里我传递了第三个参数 CuratorCache.Options.SINGLE_NODE_CACHE  。也就实现了只监听 /ns1  节点的功能。
  CuratorCache.Options.SINGLE_NODE_CACHE :单节点缓存
  CuratorCache.Options.COMPRESSED_DATA :通过以下方式解压缩数据 org.apache.curator.framework.api.GetDataBuilder.decompressed()
  CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE :通常,当缓存通过 关闭 close()时,存储将通过 清除 CuratorCacheStorage.clear()。此选项可防止清除存储。
  使用 CuratorCache.builder(curatorFramework, "/ns1").build()  构建的时候, CuratorCache.Options.SINGLE_NODE_CACHE=FALSE  、 CuratorCache.Options.COMPRESSED_DATA=FALAW  、 CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE=true  。
  通过Debug可以看到对应的配置如下:
  说明
  上面我使用了命令行搭配代码的方式大致测试了下监听器的类型。接下来详细说明下各种监听器的作用。 forInitialized
  初始化完毕触发,也就是说 CuratorFramework  的 start  方法执行完毕后就会被触发。 forCreates
  触发条件:  CuratorCacheListener.Type.NODE_CREATED  ,也就是说被监听节点或者子节点创建就会被触发。 forChanges
  触发条件:  CuratorCacheListener.Type.NODE_CHANGED  ,也就是说被监听节点或者子节点值修改就会被触发。 forCreatesAndChanges
  触发条件:  CuratorCacheListener.Type.NODE_CREATED 和 CuratorCacheListener.Type.NODE_CHANGED  ,也就是说被监听节点或者子节点创建或者值修改就会被触发。 forDeletes
  触发条件:  CuratorCacheListener.Type.NODE_DELETED  ,也就是说被监听节点或者子节点删除就会被触发。 forAll
  触发条件:上面的 forCreates  、 forChanges  、 forCreatesAndChanges  、 forDeletes  触发的时候都会同时触发 forAll  。 forNodeCache、forTreeCache、forPathChildrenCache
  这三个是一种桥接监听器,它允许将旧式监听器 PathChildrenCache  、 NodeCache  、 TreeCache  与 CuratorCache  重用,不过我觉得上面的那些监听器已经能够满足需求,无需使用这三个了。
  如果读者有不一样的间接,欢迎留言!!! @Test public void testCache2() throws Exception {     curatorFramework.start();     CuratorCache curatorCache = CuratorCache.bridgeBuilder(curatorFramework, "/ns1").build();     CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()         .forNodeCache(() -> {             log.info("forNodeCache回调");             log.info("----------------------------------------");         })         .forTreeCache(curatorFramework, (client, event) -> {             log.info("forTreeCache回调");             log.info("type=[{}], data=[{}], oldData=[{}]", event.getType(), event.getData(), event.getOldData());             log.info("----------------------------------------");         })         .forPathChildrenCache("/test", curatorFramework, (client, event) -> {             log.info("forPathChildrenCache回调");             log.info("type=[{}], data=[{}], InitialData=[{}]", event.getType(), event.getData(), event.getInitialData());             log.info("----------------------------------------");         })         .build();     // 获取监听器列表容器     Listenable listenable = curatorCache.listenable();     // 将监听器放入容器中     listenable.addListener(curatorCacheListener);     // curatorCache必须启动     curatorCache.start();      TimeUnit.MILLISECONDS.sleep(500);     byte[] oldData = "A".getBytes(StandardCharsets.UTF_8);     byte[] newData = "B".getBytes(StandardCharsets.UTF_8);     // 创建根节点     curatorFramework.create().forPath("/ns1", oldData);     log.info("创建/ns1节点");     curatorFramework.create().forPath("/ns1/sub1", oldData);     log.info("创建/ns1/sub1节点");      // 修改根节点的值     curatorFramework.setData().forPath("/ns1", newData);     log.info("修改/ns1节点的值");     // 修改子节点的值     curatorFramework.setData().forPath("/ns1/sub1", newData);     log.info("修改/ns1/sub1节点的值");      // 删除子节点     curatorFramework.delete().forPath("/ns1/sub1");     log.info("删除/ns1/sub1节点");      // 删除根节点     curatorFramework.delete().forPath("/ns1");     log.info("删除/ns1节点");      curatorCache.close(); }
  运行日志如下: INFO forTreeCache回调 INFO type=[INITIALIZED], data=[null], oldData=[null] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO type=[INITIALIZED], data=[null], InitialData=[null] INFO ---------------------------------------- INFO 创建/ns1节点 INFO 创建/ns1/sub1节点 INFO forNodeCache回调 INFO ---------------------------------------- INFO forTreeCache回调 DEBUG Reading reply session id: 0x10001d2b6b70006, packet:: clientPath:/ns1/sub1 serverPath:/ns1/sub1 finished:false header:: 10,4  replyHeader:: 10,226,0  request:: "/ns1/sub1,F  response:: #41,s{226,226,1674566507592,1674566507592,0,0,0,0,1,0,226}  INFO type=[NODE_ADDED], data=[ChildData{path="/ns1", stat=225,225,1674566507586,1674566507586,0,1,0,0,1,1,226 , data=[65]}], oldData=[null] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO type=[CHILD_ADDED], data=[ChildData{path="/ns1", stat=225,225,1674566507586,1674566507586,0,1,0,0,1,1,226 , data=[65]}], InitialData=[null] INFO ---------------------------------------- INFO forNodeCache回调 INFO ---------------------------------------- INFO forTreeCache回调 INFO type=[NODE_ADDED], data=[ChildData{path="/ns1/sub1", stat=226,226,1674566507592,1674566507592,0,0,0,0,1,0,226 , data=[65]}], oldData=[null] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO type=[CHILD_ADDED], data=[ChildData{path="/ns1/sub1", stat=226,226,1674566507592,1674566507592,0,0,0,0,1,0,226 , data=[65]}], InitialData=[null] INFO ---------------------------------------- INFO 修改/ns1节点的值 INFO forNodeCache回调 INFO ---------------------------------------- INFO forTreeCache回调 INFO type=[NODE_UPDATED], data=[ChildData{path="/ns1", stat=225,227,1674566507586,1674566507601,1,1,0,0,1,1,226 , data=[66]}], oldData=[ChildData{path="/ns1", stat=225,225,1674566507586,1674566507586,0,1,0,0,1,1,226 , data=[65]}] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO type=[CHILD_UPDATED], data=[ChildData{path="/ns1", stat=225,227,1674566507586,1674566507601,1,1,0,0,1,1,226 , data=[66]}], InitialData=[null] INFO ---------------------------------------- INFO 修改/ns1/sub1节点的值 INFO forNodeCache回调 INFO ---------------------------------------- INFO forTreeCache回调 INFO type=[NODE_UPDATED], data=[ChildData{path="/ns1/sub1", stat=226,228,1674566507592,1674566507605,1,0,0,0,1,0,226 , data=[66]}], oldData=[ChildData{path="/ns1/sub1", stat=226,226,1674566507592,1674566507592,0,0,0,0,1,0,226 , data=[65]}] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO type=[CHILD_UPDATED], data=[ChildData{path="/ns1/sub1", stat=226,228,1674566507592,1674566507605,1,0,0,0,1,0,226 , data=[66]}], InitialData=[null] INFO ---------------------------------------- INFO 删除/ns1/sub1节点 INFO forNodeCache回调 INFO ---------------------------------------- INFO forTreeCache回调 INFO type=[NODE_REMOVED], data=[ChildData{path="/ns1/sub1", stat=226,228,1674566507592,1674566507605,1,0,0,0,1,0,226 , data=[66]}], oldData=[null] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO type=[CHILD_REMOVED], data=[ChildData{path="/ns1/sub1", stat=226,228,1674566507592,1674566507605,1,0,0,0,1,0,226 , data=[66]}], InitialData=[null] INFO ---------------------------------------- INFO forNodeCache回调 INFO ---------------------------------------- INFO forTreeCache回调 INFO type=[NODE_REMOVED], data=[ChildData{path="/ns1", stat=225,227,1674566507586,1674566507601,1,1,0,0,1,1,226 , data=[66]}], oldData=[null] INFO ---------------------------------------- INFO forPathChildrenCache回调 INFO 删除/ns1节点 INFO type=[CHILD_REMOVED], data=[ChildData{path="/ns1", stat=225,227,1674566507586,1674566507601,1,1,0,0,1,1,226 , data=[66]}], InitialData=[null] INFO ----------------------------------------
  这里有个问题, CuratorCache.bridgeBuilder(curatorFramework, "/ns1").build()  设置监听的是 /ns1  ,后面又通过 .forPathChildrenCache("/test", curatorFramework, (client, event) -> {  设置了监听 /test  ,那么到底监听哪个?从日志上看是监听了 /ns  ,那为什么要设置 /test  ,是API设计问题?还是我用错了?欢迎交流!!! 计数器Shared Counter
  ShareCount  是 curator  的一个共享计数器,所有监视同一路径的客户端都将具有共享整数的最新值(考虑到 ZK 的一致性保证)。
  主要涉及三个类 ShareCount  、 SharedCountReader  ,  SharedCountListener  。以下是基本使用方法 package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.SharedCountListener; import org.apache.curator.framework.recipes.shared.SharedCountReader; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;  @Slf4j public class ShareCountTest {     // 连接地址     public static final String CONNECT_STRING = "172.24.246.68:2181";      public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);      private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();      @Test     public void testShareCount() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_STRING, RETRY_POLICY);         curatorFramework.start();         SharedCount sharedCount = new SharedCount(curatorFramework, "/ShareCount", 0);         sharedCount.start();         sharedCount.addListener(new SharedCountListener() {             @Override             public void countHasChanged(SharedCountReader sharedCountReader, int newCount) throws Exception {                 log.info("countHasChanged callback");                 log.info("newCount={}", newCount);             }              @Override             public void stateChanged(CuratorFramework client, ConnectionState newState) {              }         }, EXECUTOR_SERVICE);         sharedCount.setCount(1);         TimeUnit.DAYS.sleep(1);         sharedCount.close();     } }
  运行结果:
  成功获取到了监听的值。 Distributed Atomic Long
  尝试原子增量的计数器。它首先尝试使用乐观锁定。如果失败,则采用可选的 InterProcessMutex。对于乐观和互斥锁,使用重试策略重试增量。
  有两个构造方法:
  public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy)  采用乐观模式。 package cn.programtalk;  import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  @Slf4j public class DistributedAtomicLongTest {     @SneakyThrows     @Test     public void testDistributedAtomicLong1() {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("172.24.246.68:2181", new RetryForever(1000));         curatorFramework.start();         DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(curatorFramework, "/DistributedAtomicLong", new RetryForever(1000));         AtomicValue longAtomicValue = distributedAtomicLong.get();         log.info("1. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());         // 设置初始值,如果节点已经存在,则会返回false.         boolean succeed = distributedAtomicLong.initialize(0L);         log.info("initialize succeed? {}", succeed);         longAtomicValue = distributedAtomicLong.get();         log.info("2. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());         // add 将增量添加到当前值并返回新值信息。请记住始终检查 AtomicValue.succeeded().         distributedAtomicLong.add(10L);         longAtomicValue = distributedAtomicLong.get();         log.info("3. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());         // subtract 从当前值中减去增量并返回新值信息。请记住始终检查 AtomicValue.succeeded().         distributedAtomicLong.subtract(1L);         longAtomicValue = distributedAtomicLong.get();         log.info("4. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());         // increment 加一         distributedAtomicLong.increment();         longAtomicValue = distributedAtomicLong.get();         log.info("5. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());         // decrement 减一         distributedAtomicLong.decrement();         longAtomicValue = distributedAtomicLong.get();         log.info("6. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());     } }
  运行结果: INFO 1. preValue=0, postValue=0, succeeded=true INFO initialize succeed? true INFO 2. preValue=0, postValue=0, succeeded=true INFO 3. preValue=10, postValue=10, succeeded=true INFO 4. preValue=9, postValue=9, succeeded=true INFO 5. preValue=10, postValue=10, succeeded=true INFO 6. preValue=9, postValue=9, succeeded=true
  另外一个构造方法,提供类锁的模式,在互斥升级模式下创建。将首先使用给定的重试策略尝试乐观锁定。如果增量不成功, InterProcessMutex 将使用自己的重试策略尝试。 @SneakyThrows @Test public void testDistributedAtomicLong2() {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("172.24.246.68:2181", new RetryForever(1000));     curatorFramework.start();     DistributedAtomicLong distributedAtomicLong;     distributedAtomicLong = new DistributedAtomicLong(curatorFramework, "/DistributedAtomicLong"                                                       , new RetryForever(1000)                                                       , PromotedToLock.builder().lockPath("/DistributedAtomicLongPromotedToLock").timeout(3000, TimeUnit.MILLISECONDS).retryPolicy(new RetryOneTime(1000)).build());     AtomicValue longAtomicValue = distributedAtomicLong.get();     log.info("1. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());     // 设置初始值,如果节点已经存在,则会返回false.     boolean succeed = distributedAtomicLong.initialize(0L);     log.info("initialize succeed? {}", succeed);     longAtomicValue = distributedAtomicLong.get();     log.info("2. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());     // add 将增量添加到当前值并返回新值信息。请记住始终检查 AtomicValue.succeeded().     distributedAtomicLong.add(10L);     longAtomicValue = distributedAtomicLong.get();     log.info("3. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());     // subtract 从当前值中减去增量并返回新值信息。请记住始终检查 AtomicValue.succeeded().     distributedAtomicLong.subtract(1L);     longAtomicValue = distributedAtomicLong.get();     log.info("4. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());     // increment 加一     distributedAtomicLong.increment();     longAtomicValue = distributedAtomicLong.get();     log.info("5. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded());     // decrement 减一     distributedAtomicLong.decrement();     longAtomicValue = distributedAtomicLong.get();     log.info("6. preValue={}, postValue={}, succeeded={}", longAtomicValue.preValue(), longAtomicValue.postValue(), longAtomicValue.succeeded()); }
  运行结果: INFO 1. preValue=9, postValue=9, succeeded=true INFO initialize succeed? false INFO 2. preValue=9, postValue=9, succeeded=true INFO 3. preValue=19, postValue=19, succeeded=true INFO 4. preValue=18, postValue=18, succeeded=true INFO 5. preValue=19, postValue=19, succeeded=true INFO 6. preValue=18, postValue=18, succeeded=true锁
  使用ZK可以实现分布式锁功能。 Shared Reentrant Lock(InterProcessMutex)基本说明
  全局同步的完全分布式锁,这意味着没有两个客户端可以同时持有相同的锁。
  其提供了如下构造方法 public InterProcessMutex(CuratorFramework client, String path) {     this(client, path, new StandardLockInternalsDriver()); }
  这里有两个参数 client  :CuratorFramework客户端, path  :zookeeper的node,抢锁路径,同一个锁path需一致。 public void testLock1() throws Exception {     curatorFramework.start();     // 定义锁     InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/program-talk-lock");     // 获取锁     lock.acquire();     log.info("此处是业务代码");     // 模拟业务执行30秒     TimeUnit.SECONDS.sleep(30);     // 释放锁     lock.release(); }
  某个时刻,查看Zk的节点,可以看到如下所示内容。
  当执行完毕的时候,如果正常释放锁,则会清理到对应的信息。
  JavaDoc文档中其实有说跨JVM的锁,那么同一个JVM中多线程使用这个锁可以吗,可以! package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;  import java.util.concurrent.TimeUnit;  @Slf4j public class InterProcessMutexThreadTest implements Runnable {     String connectString = "localhost:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);       @Override     public void run() {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         // 定义锁         InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/InterProcessMutex");         try {             lock.acquire();             String threadName = Thread.currentThread().getName();             log.info("{} ,执行业务代码开始", threadName);             TimeUnit.SECONDS.sleep(10);             log.info("{} ,执行业务代码完毕", threadName);         } catch (Exception e) {             e.printStackTrace();         } finally {             try {                 lock.release();             } catch (Exception e) {                 e.printStackTrace();             }         }     }      public static void main(String[] args) {         InterProcessMutexThreadTest task = new InterProcessMutexThreadTest();         Thread t1 = new Thread(task, "任务1");         Thread t2 = new Thread(task, "任务2");         t1.start();         t2.start();     } }
  运行结果如下: INFO 任务1 ,执行业务代码开始 DEBUG Reading reply session id: 0x100000022e20032, packet:: clientPath:null serverPath:null finished:false header:: 8,4  replyHeader:: 8,458,0  request:: "/InterProcessMutex/_c_4929b7d6-6c6b-4a9a-ae48-5315dc67523e-lock-0000000000,T  response:: #3139322e3136382e31302e31,s{457,457,1674654986411,1674654986411,0,0,0,72057594623164465,12,0,457}  INFO 任务1 ,执行业务代码完毕 DEBUG Got notification session id: 0x100000022e20032 DEBUG Reading reply session id: 0x100000022e20031, packet:: clientPath:null serverPath:null finished:false header:: 8,2  replyHeader:: 8,459,0  request:: "/InterProcessMutex/_c_4929b7d6-6c6b-4a9a-ae48-5315dc67523e-lock-0000000000,-1  response:: null DEBUG Got ping response for session id: 0x100000022e20031 after 5ms. DEBUG Got WatchedEvent state:SyncConnected type:NodeDeleted path:/InterProcessMutex/_c_4929b7d6-6c6b-4a9a-ae48-5315dc67523e-lock-0000000000 for session id 0x100000022e20032 DEBUG Got ping response for session id: 0x100000022e20032 after 2ms. DEBUG Reading reply session id: 0x100000022e20032, packet:: clientPath:null serverPath:null finished:false header:: 9,12  replyHeader:: 9,459,0  request:: "/InterProcessMutex,F  response:: v{"_c_284a2de1-37d1-42c4-b818-3d2206a50c34-lock-0000000001},s{455,455,1674654986408,1674654986408,0,3,0,0,0,1,459}  INFO 任务2 ,执行业务代码开始 INFO 任务2 ,执行业务代码完毕 DEBUG Reading reply session id: 0x100000022e20032, packet:: clientPath:null serverPath:null finished:false header:: 10,2  replyHeader:: 10,460,0  request:: "/InterProcessMutex/_c_284a2de1-37d1-42c4-b818-3d2206a50c34-lock-0000000001,-1  response:: null DEBUG Got ping response for session id: 0x100000022e20032 after 7ms. Disconnected from the target VM, address: "127.0.0.1:58751", transport: "socket"  Process finished with exit code 0
  可以看到两个任务是顺序执行的,不过单个JVM基本不使用分布式锁,JDK内置的锁即可! 定义锁
  正如上面所说的那样,通过构造方法去定义一个可重入排它锁, InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/program-talk-lock");  。 获取锁
  获取锁有两种方法,一种是使用上面所使用的 lock.acquire();  ,这是个无参函数,他会一直尝试获取锁,如果获取不到则会一直阻塞。
  另外一种是使用 public boolean acquire(long time, TimeUnit unit) throws Exception  ,不同于上面那个,这个不会一直阻塞, time  是时间参数, unit  是时间的单位。超过这个时间则会放弃获取锁。
  示例代码如下:  lock.acquire(10, TimeUnit.SECONDS);
  此代码的意思就是如果在10秒内能获取到锁则返回 true  ,超过10秒获取不到则返回 false  。不会一直阻塞。 释放锁
  当程序执行完毕后必须释放锁,释放锁使用 release()  方法。 可重入性package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  import java.util.concurrent.TimeUnit;  /**  *  InterProcessMutex 锁的可重入性测试  */ @Slf4j public class InterProcessMutexReentrantTest {     String connectString = "localhost:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/InterProcessMutexReentrantTest");     void a() throws Exception {         lock.acquire();         log.info("a方法执行");         b();         lock.release();     }     void b() throws Exception {         lock.acquire();         log.info("b方法执行");         lock.release();     }      @Test     public void test() throws Exception {         curatorFramework.start();         a();     } }
  上面的代码中,a函数调用b函数,并且a和b都是用了同一个锁。执行结果如下:
  程序正常执行,说明锁具备可重入性。 Shared Lock(InterProcessSemaphoreMutex)基本说明
  InterProcessSemaphoreMutex  也是一个排它锁,不同于 InterProcessMutex  的是, InterProcessSemaphoreMutex  不是一个可重入锁。
  使用方法(定义锁、获取锁、释放锁)跟 InterProcessMutex  没有太大区别。
  代码示例: @Test public void testLock3() throws Exception {     curatorFramework.start();     // 定义锁     InterProcessLock lock = new InterProcessSemaphoreMutex(curatorFramework, "/InterProcessSemaphoreMutex");     // 获取锁     try {         boolean got = lock.acquire(30, TimeUnit.SECONDS);         if (got) {             log.info("此处是业务代码");             // 模拟业务执行30秒             TimeUnit.SECONDS.sleep(30);         } else {             log.warn("未获取到锁");         }     }catch (Exception e) {         e.printStackTrace();     }     finally {         // 释放锁         lock.release();     } }
  某个时刻,查看Zk的节点,可以看到如下所示内容。
  可重入性package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.Test;  /**  *  InterProcessSemaphoreMutex 锁的可重入性测试  */ @Slf4j public class InterProcessSemaphoreMutexReentrantTest {     String connectString = "localhost:2181";     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curatorFramework, "/InterProcessSemaphoreMutex");     void a() throws Exception {         lock.acquire();         log.info("a方法执行");         b();         lock.release();     }     void b() throws Exception {         lock.acquire();         log.info("b方法执行");         lock.release();     }      @Test     public void test() throws Exception {         curatorFramework.start();         a();     } }
  运行效果如下图:
  不会正常执行完毕,会一直锁住,说明此锁不具备可重入性。 Shared Reentrant Read Write Lock(InterProcessReadWriteLock)基本说明
  InterProcessReadWriteLock  是类似JDK的 ReentrantReadWriteLock  . 一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。 此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。
  读锁和写锁有如下关系: 读写互斥 写写互斥 读读不互斥
  重入性
  读写锁是可以重入的,意味着你获取了一次读锁/写锁,那么你可以再次获取。但是要记得最后释放锁,获取了几次就得释放几次。 定义锁// 定义读锁 InterProcessReadWriteLock lock  = new InterProcessReadWriteLock(curatorFramework, "/InterProcessReadWriteLock");获取锁InterProcessReadWriteLock lock  = new InterProcessReadWriteLock(curatorFramework, "/InterProcessReadWriteLock"); // 获取读锁 InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); // 获取写锁 InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();释放锁
  同样使用 release()  释放锁 writeLock.release(); readLock.release();测试
  读写互斥
  读写互斥就是说,当写的时候(写锁没有释放的时候,无法读取)。 package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  import java.util.concurrent.TimeUnit;  @Slf4j public class InterProcessReadWriteLockTest {     String connectString = "172.24.246.68:2181";     RetryPolicy retryPolicy = new RetryForever(1000);      @Test     public void testRead() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         InterProcessReadWriteLock lock  = new InterProcessReadWriteLock(curatorFramework, "/InterProcessReadWriteLock");         InterProcessReadWriteLock.ReadLock readLock = lock.readLock();         readLock.acquire();         log.info("读成功");         readLock.release();     }      @Test     public void testWrite() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         InterProcessReadWriteLock lock  = new InterProcessReadWriteLock(curatorFramework, "/InterProcessReadWriteLock");         InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();         writeLock.acquire();         TimeUnit.SECONDS.sleep(30);         log.info("写成功");         writeLock.release();     } }
  testRead  方法是读, testWrite  方法是写, testWrite  我休眠了30秒,主要是为了锁释放慢一点,来测试是否可读。
  首先运行 testWrite  ,然后运行 testRead  (不到超过30后再运行,为了保证此时写锁并没有释放)。
  在读锁没有释放之前,运行效果图如下:
  可以看到读也阻塞着,等待一段时间后,写锁释放,读也就不会继续阻塞,运行完毕。
  写写互斥
  运行两次 testWrite  方法,要保证多实例运行。idea需要设置。按照下图设置。
  接下来运行 testWrite  方法。
  第一个没运行完,第二个也会阻塞。
  读读不互斥
  我就不具体测试了,道理一样。
  可重入性 public void testWrite() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     InterProcessReadWriteLock lock  = new InterProcessReadWriteLock(curatorFramework, "/InterProcessReadWriteLock");     InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();     writeLock.acquire();     writeLock.acquire();     log.info("写成功");     writeLock.release();     writeLock.release(); }
  程序能够正常执行完毕,说明具备可重入性。 使用场景
  分布式读写锁适用于读多写少的情况。 Multi Shared Lock(InterProcessMultiLock)基本说明
  InterProcessMultiLock  是多锁的意思,相当于一个容器,包含了多个锁。统一管理,一起获取锁,一起释放锁。
  定义锁
  他有两个构造方法。
  InterProcessMultiLock(CuratorFramework, List)  创造的是一个 InterProcessMutex  的锁。 package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMultiLock; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  import java.util.List;  @Slf4j public class InterProcessMultiLockTest {     String connectString = "172.24.246.68:2181";     RetryPolicy retryPolicy = new RetryForever(1000);      @Test     public void testInterProcessMultiLock1() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         InterProcessMultiLock lock  = new InterProcessMultiLock(curatorFramework, List.of("/InterProcessMultiLock1", "/InterProcessMultiLock2"));         lock.acquire();         log.info("读成功");         lock.release();     } }
  运行后,从命令行看:
  创建了两个节点。
  两外一个构造方法是 InterProcessMultiLock(List locks)  ,它则允许任何实现 InterProcessLock  的锁。 @Test public void testInterProcessMultiLock2() throws Exception {     CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);     curatorFramework.start();     List mutexes = Lists.newArrayList();     InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/InterProcessMultiLock3");     mutexes.add(interProcessMutex);     InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(curatorFramework, "/InterProcessMultiLock4");     mutexes.add(interProcessSemaphoreMutex);      InterProcessMultiLock lock  = new InterProcessMultiLock(mutexes);     lock.acquire();     log.info("读成功");     lock.release(); }
  运行结果:命令行查看。
  Shared Semaphore(InterProcessSemaphoreV2)基本说明
  InterProcessSemaphoreV2  是一个信号量,跨JVM工作,多个客户端使用通过一个path则会统一受到进程间租约限制。这个信号量是公平的,会按照顺序获得租约。
  直白点说: InterProcessSemaphoreV2  就类似JDK中的 Semaphore  , Semaphore  用于控制进入临界区的线程数,而 InterProcessSemaphoreV2  是跨JVM的而已。
  有两个构造方法: // 最大租约是由给定路径的用户维护的约定。 public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) // SharedCountReader 用作给定路径的信号量的方法,以确定最大租约。 public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count)
  第一个我们就叫做 INT类型构造方法  ,第二个叫做 SharedCountReader类型构造方法  。他们是有区别的,接下来我通过图片加描述的方式来说明下:
  这两个构造方法的主要区别在第三个参数上,前者是 int  类型,后者是 SharedCountReader  类型,也就是说前者不是分布式共享的数,后者是共享的。
  INT类型构造方法
  INT类型构造方法  的 maxLeases  参数是用户传递进入构造方法中的,也就是说在JVM中直接设置,那么就有可能出现在JVM1中设置的是2,在JVM2中设置的是3,并且Curator明确说明不会检查,这就可能出现,两个JVM中最大规约不一致导致出现问题。所以使用者必须保证设置相同。
  SharedCountReader类型构造方法
  SharedCountReader类型构造方法  不是直接设置,而是区Zookeeper中去获取(相当多个JVM有相同的租约存储地址),然后加载设置到JVM中。并且该种方式会有 SharedCount  的监听器。
  两者实现的功能是一样的,也都跨JVM!!! 代码示例package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2; import org.apache.curator.framework.recipes.locks.Lease; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  @Slf4j public class InterProcessSemaphoreV2Test {     static String connectString = "172.24.246.68:2181";     static RetryPolicy retryPolicy = new RetryForever(10000);      @Test     public void testInterProcessSemaphoreV21() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         InterProcessSemaphoreV2 interProcessSemaphoreV2 = new InterProcessSemaphoreV2(curatorFramework, "/InterProcessSemaphoreV21", 3);         Lease lease = null;         try {             lease = interProcessSemaphoreV2.acquire();             log.info("{} 获取到租约", Thread.currentThread().getName());         } catch (Exception e) {             e.printStackTrace();         } finally {             // 为了测试租约等待情况,我不释放租约             //interProcessSemaphoreV2.returnLease(lease);             //log.info("{} 释放掉租约", Thread.currentThread().getName());         }         while (true) {          }     }      @Test     public void testInterProcessSemaphoreV22() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);         curatorFramework.start();         SharedCount sharedCount = new SharedCount(curatorFramework, "/InterProcessSemaphoreV22_SharedCount", 3);         sharedCount.start();         InterProcessSemaphoreV2 interProcessSemaphoreV2 = new InterProcessSemaphoreV2(curatorFramework, "/InterProcessSemaphoreV22", sharedCount);         Lease lease = null;         try {             lease = interProcessSemaphoreV2.acquire();             log.info("{} 获取到租约", Thread.currentThread().getName());         } catch (Exception e) {             e.printStackTrace();         } finally {             // 为了测试租约等待情况,我不释放租约             //interProcessSemaphoreV2.returnLease(lease);             //log.info("{} 释放掉租约", Thread.currentThread().getName());         }         while (true) {          }     } }
  testInterProcessSemaphoreV21  方法,使用的是 INT类型构造方法  , testInterProcessSemaphoreV22  使用的是 SharedCountReader类型构造方法  。
  因为两者功能一样,我就使用 testInterProcessSemaphoreV22  进行测试。
  使用IDEA运行 testInterProcessSemaphoreV22  (多实例运行)四次。
  截图如下:
  第一次:
  第二次:
  第三次:
  第四次:
  前三次都能够正常执行(正常打印),第四次次一直在等待获取租约,没有问题,因为我信号量设置的最大租约就是3!。 屏障(Barriers)Barrier
  DistributedBarrier  分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。
  创建屏障
  DistributedBarrier  提供了一个构造方法。
  使用者通过构造方法直接 new  即可。
  设置屏障
  解除屏障
  代码示例package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;  @Slf4j public class DistributedBarrierTest {     @Test     public void testDistributedBarrier() throws Exception {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("172.24.246.68:2181", new RetryForever(1000));         curatorFramework.start();         // 创建DistributedBarrier         DistributedBarrier distributedBarrier = new DistributedBarrier(curatorFramework, "/DistributedBarrier");         // setBarrier的功能是创建path         distributedBarrier.setBarrier();         ExecutorService executorService = Executors.newCachedThreadPool();         for (int i = 0; i < 10; i++) {             executorService.submit(() -> {                 try {                     String threadName = Thread.currentThread().getName();                     log.info("{}线程设置屏障", threadName);                     distributedBarrier.waitOnBarrier();                     log.info("屏障被移除,{}线程继续执行", threadName);                 } catch (Exception e) {                     throw new RuntimeException(e);                 }             });         }         TimeUnit.SECONDS.sleep(5);         log.info(">>移除屏障<<");         distributedBarrier.removeBarrier();         while (true){          }     } }
  运行结果:  2023-01-31 14:49:05 [pool-4-thread-7] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-7线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-5] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-5线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-1] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-1线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-8] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-8线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-10] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-10线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-9] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-9线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-3] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-3线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-2] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-2线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-6] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-6线程设置屏障 2023-01-31 14:49:05 [pool-4-thread-4] INFO cn.programtalk.DistributedBarrierTest - pool-4-thread-4线程设置屏障 2023-01-31 14:49:10 [main] INFO cn.programtalk.DistributedBarrierTest - >>移除屏障<< 2023-01-31 14:49:19 [pool-4-thread-7] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-7线程继续执行 2023-01-31 14:49:20 [pool-4-thread-5] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-5线程继续执行 2023-01-31 14:49:20 [pool-4-thread-1] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-1线程继续执行 2023-01-31 14:49:21 [pool-4-thread-8] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-8线程继续执行 2023-01-31 14:49:21 [pool-4-thread-10] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-10线程继续执行 2023-01-31 14:49:21 [pool-4-thread-9] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-9线程继续执行 2023-01-31 14:49:21 [pool-4-thread-3] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-3线程继续执行 2023-01-31 14:49:21 [pool-4-thread-2] INFO cn.programtalk.DistributedBarrierTest - 屏障被移除,pool-4-thread-2线程继续执行
  线程任务中设置了屏障,主线程等了5秒,之后解除了屏障,屏障解除后,所有线程继续执行后面的代码。 DistributedDoubleBarrier
  DistributedDoubleBarrier  双重屏障能够让客户端在任务的开始和结束阶段更好的同步控制。 当有足够的任务已经进入到屏障后,一起开始,一旦任务完成则离开屏障。
  不同于 DistributedBarrier  , DistributedDoubleBarrier  允许设置一个阈值数量(只是个阈值,不是个限制。),只有数目 大于等于 设置的这个阈值后才会继续执行,特别强调是 大于等于!!! 。 创建屏障
  进入屏障
  离开屏障
  代码示例package cn.programtalk;  import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier; import org.apache.curator.retry.RetryForever; import org.junit.jupiter.api.Test;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;  @Slf4j public class DistributedDoubleBarrierTest {     @Test     public void testDistributedDoubleBarrier() {         CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("172.24.246.68:2181", new RetryForever(1000));         curatorFramework.start();         ExecutorService executorService = Executors.newCachedThreadPool();         for (int i = 0; i < 10; i++) {             executorService.submit(() -> {                 try {                     // 创建distributedDoubleBarrier                     DistributedDoubleBarrier distributedDoubleBarrier = new DistributedDoubleBarrier(curatorFramework, "/DistributedDoubleBarrier", 2);                     distributedDoubleBarrier.enter();                     String threadName = Thread.currentThread().getName();                  ...

民间故事女子丧夫,上坟时她救下蟒蛇,蟒蛇说你的丈夫没有死宋朝时有个叫三水村的地方,这里有位叫阿梅的女子。在阿梅小时候,村子里遭遇大旱,所有村民都束手无策。一天晚上,阿梅做梦,梦里有人告诉她山上有龙骨,你们要好好将它安葬。她将事情与大家说女子家中闯进大笨鸟,单翅扶墙摆造型,人称牢底坐穿兽2021年9月的一天晚上,广西北海一女子正在家里睡大觉。突然,阳台上传来一声巨大的异响,有什么东西破窗而入。女子被惊醒,心里带着慌,手里拿着棍,拧开隔壁门,打开床头灯。开灯的一霎那为什么五家大型国企同时宣布退出美国资本市场8月12日,中国石油中国石化中国人寿中国铝业上海石油化工股份有限公司发布公告,宣布将美国存托股从纽交所退市。在现在中美关系的非常时期,五家大型国企同时宣布退市的背后说明了什么?一对与中国唱反调?欧洲两国宣布退出171合作机制,美国表示支持这段时间,佩洛西窜访台湾成为国际上备受关注的事件,中美两国也因此展开了激烈交锋,在中方发出严厉警告之后,以美方为首的西方国家却无视中方警告,继续在台海问题上挑衅,近期又有一些国家跟强盗美国!偷窃叙利亚石油,侵占阿富汗资产,几十名经济学家抗议文吴军捷工作室李不言美国原本就是强盗流氓起家,骨子里的劣根性是改不了的。虽然美国的历史与中国相比实在短暂,仅仅只有两百多年,但在这两百多年间,仅仅只有16年是没有打仗,足以说明这个美国芯片法案通过,遏制中国意图明显,违背规律,美国胜算有多大根据相关媒体报道,2022年8月9日,美国2022年芯片与科学法案经拜登总统签署,正式生效。这个法分成两大部分。第1部分,针对在美国本土制造芯片的产业进行补贴。这个补贴也分成两大块中国女富豪何巧女有钱到美国捐96亿,无钱偿还国内的200亿钱款2017年,一则浙江女首富何巧女赴美豪捐15亿美元的消息火遍网络,在庞大的网民帮助下,何巧云的信息很快便被公布出来。她是一家上市公司的老总,其身价高达300亿人民币左右,而这次豪捐蔡英文是美国遏制中国崛起最后的棋子中国的崛起,让美国感到害怕和震惊。在上面的文章中,提到了美国把不听话的安倍干掉以后,接下来就是扶持蔡英文,鼓动台海形势紧张。时间很紧迫。因为美国拖不起,时间的窗口在我们这里,中国的中方宣布停止禁毒合作,美国大呼不能接受,佩洛西还为大麻开绿灯据环球时报报道,由于美国众议院议长佩洛西执意窜访台湾,中方态度强硬,发布了8项针对美国的反制措施,而其中一项就是停止了中美的禁毒合作,包括打击非法芬太尼的走私。中方的这项反制措施对历史总会重演,一战前后把美国按地摩擦的英国,现在是美国的小弟一战前后,作为全球第一大国,日不落帝国,英国对美国的崛起,采取了从军事到经济,从技术到民生的全面遏制。一,企图拆分美国美国南北战争时期,英国扶植南方,源源不断提供各种武器物资,英军丑闻彻底曝光!佩洛西的儿子竟然持有某中国科技公司70万股份特朗普在推特上曝光佩洛西只会捞钱的时候,人们还不知道怎么回事。突然之间,佩洛西的儿子冒到台前,佩洛西53岁的儿子保罗竟然成了最耀眼的捞金客不仅仅持有中国某科技公司的巨额股份而且是以
黄果树推进打造世界旅游目的地打造世界级旅游目的地,黄果树旅游区正在紧锣密鼓地推进。2月以来,旅游区管委会一方面联系中科院规划院专家文旅基金专家开展头脑风暴一方面对照先进地区学经验,形成世界级旅游目的地参考资料原来,女人彻底动情后,是控制不住的文森屿鹿林感情里,真正的动情,是难以被控制的。心里住进了某个人,动了情,入了心,便无法再压制住对你的心动感,被你所吸引,忍不住将你靠近。或许,在遇见你之前,她一个人自由自在,并没有不回你信息的女人图片来自网络,图文无关如果一个女人不回你的信息,你就真的没必要去打扰她了。你去找她,反而显得你卑微,显得你掉价。一个女人,一天不回你信息,有可能是因为忙,一周不回你信息,一个月不回散文人生路上,懂得珍惜和感恩,迎面而来的都是花开的芬芳作者子墨又是一年春好处,春风又绿江南岸,春风十里,为花开作序,春雨绵绵,润泽了春色,温柔了时光,人间的美好,也在这个季节安家落户。晴好的春日里,独自静坐窗前,身边有一卷书一盏茶陪伴人和人相处,就是一个字!人,最怕的是激情后的陌生,信任后的欺骗付出后的心寒,掏心后的背叛温柔后的冷漠,陪伴后的厌倦深情后的无视,转身后的凉薄人,最怕的是,情深义重,换来虚情假意辛苦付出,换来毫不在乎竭尽全短评合力托举冰雪梦20222023赛季全国冬季两项锦标赛冠军赛短评合力托举冰雪梦彭雯2月19日,20222023赛季全国冬季两项冠军赛在甘肃白银国家雪上项目训练基地开赛。时隔九天,继锦标赛后,精彩在塞外明珠乌梁素海迎来今春首批北归候鸟记者从内蒙古巴彦淖尔市林草局等部门获悉,随着天气转暖,乌梁素海迎来今年开春以来首批北归候鸟。乌梁素海迎来今春首批北归候鸟。新华社记者李云平摄乌梁素海位于巴彦淖尔市乌拉特前旗境内,是人工智能大数据鸟类智慧监测为迁徙候鸟提供技术保障如何进一步保护湿地,给候鸟更好的栖息环境?鸟类智慧监测等先进技术怎样实际应用?2月18日至19日,洞庭湖候鸟保护国际论坛暨国家林业和草原局候鸟动态监测保护国家创新联盟年会在岳阳举行相遇相知相恋相守之地溱水湿地文化公园游览杂记初春乍寒,丝丝寒气弥漫空中,融融阳光把大地涂抹成灰黄色,凋零的树木还没萌芽,显的孤独萧条。值此和好友骑行三十多公里游玩溱水湿地公园。溱水湿地公园位于淮河支流历史文化溱水河畔,北起张本周周初以晴冷天气为主,周后期气温略有回升,还有一次降水,冬春交换时节,谨防感冒摘要本周上海以过程性天气为主,周初晴冷当道,周三白天起气温略有回升,冬春交替,谨防感冒!本周上海主要以过程性天气为主,周初受到冷空气影响,晴冷为主,气温一路下跌,周二气温仅在47,白天鹅宾馆,改革开放的旗帜,专属引桥却与广州沙面景区不协调白天鹅宾馆(网上图片)白天鹅宾馆开放的旗帜,引桥却与沙面不协调庞力行白天鹅宾馆坐落于广州市沙面的白鹅潭旁,坐北向南,凭栏观望珠江白鹅潭风景,可谓得天独厚!宾馆由香港霍英东先生与广东