Springboot基于Apache Curator框架的ZooKeeper使用详解

一 简介

Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:

  • 封装ZooKeeper client与ZooKeeper server之间的连接处理
  • 提供了一套Fluent风格的操作API
  • 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装

Curator主要从以下几个方面降低了zk使用的复杂性:

  • 重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几* 种标准的重试策略(比如指数补偿)
  • 连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
  • zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
  • 各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况

二、引入Maven

    <!-- zookeeper 框架  curator-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.7.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>5.2.0</version>
    </dependency>

三、设置Config

@Component
public class ZookeeperConfig {

    @Bean
    public CuratorFramework zookeeperClient() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                //.namespace("test")
                .build();
        client.start();
        return client;
    }
}

四、基于Curator的ZooKeeper基本用法

基本操作

  • orSetData()方法:如果节点存在则Curator将会使用给出的数据设置这个节点的值,相当于 setData() 方法

  • creatingParentContainersIfNeeded()方法:如果指定节点的父节点不存在,则Curator将会自动级联创建父节点

  • guaranteed()方法:如果服务端可能删除成功,但是client没有接收到删除成功的提示,Curator将会在后台持续尝试删除该节点

  • deletingChildrenIfNeeded()方法:如果待删除节点存在子节点,则Curator将会级联删除该节点的子节点

    //创建永久节点 client.create().forPath("/curator", "/curator data".getBytes());

    //创建永久有序节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential", "/curator_sequential data".getBytes());

    //创建临时节点 client.create().withMode(CreateMode.EPHEMERAL) .forPath("/curator/ephemeral", "/curator/ephemeral data".getBytes());

    //创建临时有序节点 client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path1", "/curator/ephemeral_path1 data".getBytes());

    client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path2", "/curator/ephemeral_path2 data".getBytes());

    //测试检查某个节点是否存在 Stat stat1 = client.checkExists().forPath("/curator"); Stat stat2 = client.checkExists().forPath("/curator2");

    System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false)); System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));

    //获取某个节点的所有子节点 System.out.println(client.getChildren().forPath("/"));

    //获取某个节点数据 System.out.println(new String(client.getData().forPath("/curator")));

    //设置某个节点数据 client.setData().forPath("/curator", "/curator modified data".getBytes());

    //创建测试节点 client.create().orSetData().creatingParentContainersIfNeeded() .forPath("/curator/del_key1", "/curator/del_key1 data".getBytes());

    client.create().orSetData().creatingParentContainersIfNeeded() .forPath("/curator/del_key2", "/curator/del_key2 data".getBytes());

    client.create().forPath("/curator/del_key2/test_key", "test_key data".getBytes());

    //删除该节点 client.delete().forPath("/curator/del_key1");

    //级联删除子节点 client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");

事务

//定义几个基本操作
CuratorOp createOp1 = client.transactionOp().create()
            .forPath("/curator1","父".getBytes());
CuratorOp createOp = client.transactionOp().create()
            .forPath("/curator1/one_path","儿子".getBytes());
//
CuratorOp setDataOp = client.transactionOp().setData()
            .forPath("/curator1","other data".getBytes());

CuratorOp deleteOp = client.transactionOp().delete()
            .forPath("/curator1");

//事务执行结果
List<CuratorTransactionResult> results = client.transaction()
            .forOperations(createOp1,createOp,setDataOp,deleteOp);

//遍历输出结果
for(CuratorTransactionResult result : results){
    System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());
}

监听

Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。

  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

    client.create().withMode(CreateMode.CONTAINER).forPath("/zk-huey"); client.create().withMode(CreateMode.PERSISTENT).forPath("/zk-huey/cnode","dd".getBytes(StandardCharsets.UTF_8)); /**

    • 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理 */ ExecutorService pool = Executors.newFixedThreadPool(2);

    /**

    • 监听数据节点的变化情况 */ final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false); nodeCache.start(true); nodeCache.getListenable().addListener( new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("Node data is changed, new data: " + new String(nodeCache.getCurrentData().getData())); } }, pool );

    /**

    • 监听子节点的变化情况 */ final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } }, pool );

    client.setData().forPath("/zk-huey/cnode", "world".getBytes());

    Thread.sleep(10 * 1000); pool.shutdown(); client.close();

分布式锁

分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。

下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。

private static final String ZK_LOCK_PATH = "/zktest";

@PostMapping(value = "/lock")
public void lock() throws Exception {
    client.create().withMode(CreateMode.CONTAINER).forPath(ZK_LOCK_PATH);

    Thread t1 = new Thread(() -> {
        doWithLock(client);
    }, "t1");
    Thread t2 = new Thread(() -> {
        doWithLock(client);
    }, "t2");

    t1.start();
    t2.start();
}

private static void doWithLock(CuratorFramework client) {
    InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
    try {
        if (lock.acquire(1, TimeUnit.SECONDS)) {
            System.out.println(Thread.currentThread().getName() + " hold lock");
            Thread.sleep(5000L);
            System.out.println(Thread.currentThread().getName() + " release lock");
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

已有 0 条评论

    欢迎您,新朋友,感谢参与互动!