ZooKeeper

1. Running the service


java -cp zookeeper-3.4.8.jar:lib/slf4j-api-1.6.1.jar:lib/slf4j-log4j12-1.6.1.jar:lib/log4j-1.2.15.jar:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg

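This is another way to start the server. When ZooKeeper starts, JMX is started along with it, which makes it convenient to monitor and operate the server from a JMX management console.

1.1 Executable scripts

zkCleanup: cleans up ZooKeeper's historical data, including transaction log files and snapshot data files

1.2 Common exceptions

1.2.1 No space left on disk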


java.io.IOException: No space left on device

1.2.2 The myid file cannot be found


ERROR [main:QuorumPeerMain@85] Invalid config, exiting abnormally

Caused by: java.lang.IllegalArgumentException: /tmp/zookeeper/myid file is missing

Fix: create a myid file in the data directory
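
The fix above can be sketched in Java as follows. This is only an illustration: the temporary directory stands in for the real data directory (the dataDir value in zoo.cfg), and the server id 1 is a placeholder that should match the number used for this host in the server.N entries of the configuration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class MyIdWriter {
    // Writes the server id as a single line of text into <dataDir>/myid.
    static Path writeMyId(Path dataDir, int serverId) throws IOException {
        Files.createDirectories(dataDir);
        Path myid = dataDir.resolve("myid");
        Files.write(myid, (serverId + "\n").getBytes(StandardCharsets.US_ASCII));
        return myid;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical data directory; in practice use the dataDir from zoo.cfg.
        Path dataDir = Files.createTempDirectory("zookeeper-data");
        Path myid = writeMyId(dataDir, 1);
        System.out.println("Wrote " + myid);
    }
}
```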

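1.2.3 The Leader election port is not open on other machines in the cluster


WARN Cannot open channel to 2 at election address /ip:port

Fix: the warning goes away once all machines in the cluster have started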

2. Client scripts

2.1 Create


create [-s] [-e] path data acl

-s and -e select the node type: sequential or ephemeral respectively. By default a persistent node is created.
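
To show what the -s flag changes, here is a small sketch of how a sequential node name is formed: ZooKeeper appends a zero-padded 10-digit counter to the requested path. The helper name and the counter value 3 are made up for illustration; the real counter is assigned by the server.

```java
class SequentialName {
    // Mimics the naming of a sequential znode:
    // requested path + 10-digit, zero-padded counter.
    static String withSequence(String requestedPath, int counter) {
        return requestedPath + String.format("%010d", counter);
    }

    public static void main(String[] args) {
        // prints /zk-test-ephemeral-0000000003
        System.out.println(withSequence("/zk-test-ephemeral-", 3));
    }
}
```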

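2.2 Update


set path data [version]

If version is omitted, no version check is performed. Every successful update increments the node's data version by one.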
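
The version semantics can be pictured with a toy in-memory model. This is not ZooKeeper code: VersionedNode and its methods are invented for this sketch, and -1 is used to mean "skip the check", mirroring the convention of the Java API's setData.

```java
class VersionedNode {
    private byte[] data = new byte[0];
    private int version = 0;

    // Updates the data and returns the new version.
    // expectedVersion == -1 skips the check; any other value must match.
    int set(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + ", actual " + version);
        }
        data = newData.clone();
        version++;
        return version;
    }

    byte[] data() { return data.clone(); }

    int version() { return version; }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();
        System.out.println(node.set("a".getBytes(), -1)); // unconditional update
        System.out.println(node.set("b".getBytes(), 1));  // matches the current version
        try {
            node.set("c".getBytes(), 0);                  // stale version, rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```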

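3. Using the Java client API

3.1 Creating a session

A client connects to the server by creating a ZooKeeper instance.


ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly);

connectString: the list of ZooKeeper servers, each in host:port form and separated by ASCII commas when there is more than one. A root path can also be appended, e.g. 192.168.1.1:2181/zk-haha, in which case every operation the client performs on the server is relative to that root path.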
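
As a rough illustration of that format, the sketch below splits a connect string into its server list and optional root path, and shows how a client path maps under the root. ConnectStringSketch is a simplified stand-in written for this note, not the parser the ZooKeeper client actually uses, and the second host in the example is invented.

```java
import java.util.ArrayList;
import java.util.List;

class ConnectStringSketch {
    final List<String> servers = new ArrayList<>();
    final String chroot; // null when no root path is given

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is treated as the root path.
        int slash = connectString.indexOf('/');
        String hostPart = slash >= 0 ? connectString.substring(0, slash) : connectString;
        chroot = slash >= 0 ? connectString.substring(slash) : null;
        for (String s : hostPart.split(",")) {
            if (!s.trim().isEmpty()) {
                servers.add(s.trim());
            }
        }
    }

    // Maps a path as seen by the client to the absolute path on the server.
    String resolve(String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        return "/".equals(clientPath) ? chroot : chroot + clientPath;
    }

    public static void main(String[] args) {
        ConnectStringSketch cs =
                new ConnectStringSketch("192.168.1.1:2181,192.168.1.2:2181/zk-haha");
        System.out.println(cs.servers);          // the two host:port entries
        System.out.println(cs.chroot);           // /zk-haha
        System.out.println(cs.resolve("/app"));  // /zk-haha/app
    }
}
```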

The most basic way to create a session

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

// Chapter: 5.3.1 Java API -> Creating a connection -> Creating the most basic ZooKeeper instance
public class ZooKeeper_Constructor_Usage_Simple implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // The constructor returns immediately; the session is established asynchronously.
        ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",
                5000, // session timeout in milliseconds
                new ZooKeeper_Constructor_Usage_Simple());
        System.out.println(zookeeper.getState());
        try {
            // Block until the watcher below reports SyncConnected.
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("ZooKeeper session established.");
    }

    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

Creating a ZooKeeper instance by reusing sessionId and sessionPasswd

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

// Chapter: 5.3.1 Java API -> Creating a connection -> Creating a ZooKeeper instance that reuses sessionId and sessionPasswd
public class ZooKeeper_Constructor_Usage_With_SID_PASSWD implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_Constructor_Usage_With_SID_PASSWD());
        connectedSemaphore.await();
        // Remember the credentials of the session that was just established.
        long sessionId = zookeeper.getSessionId();
        byte[] passwd = zookeeper.getSessionPasswd();

        // Use illegal sessionId and sessionPasswd
        zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_Constructor_Usage_With_SID_PASSWD(),
                1L,
                "test".getBytes());

        // Use correct sessionId and sessionPasswd
        zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_Constructor_Usage_With_SID_PASSWD(),
                sessionId,
                passwd);

        // Keep the process alive so the watcher can print incoming events.
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

3.2 Creating a node

The following two methods create a node synchronously and asynchronously, respectively.


String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

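Parameter descriptions

  • path: the path of the node to create

  • data[]: a byte array holding the node's initial content after creation

  • acl: the node's ACL policy

  • createMode: the node type, one of persistent (PERSISTENT), persistent sequential (PERSISTENT_SEQUENTIAL), ephemeral (EPHEMERAL), or ephemeral sequential (EPHEMERAL_SEQUENTIAL)

  • cb: registers an asynchronous callback. Developers implement the StringCallback interface, mainly by overriding this method:

    void processResult(int rc, String path, Object ctx, String name)

    Once the node has been created on the server, the ZooKeeper client calls this method automatically, so the related business logic can be handled there.

  • ctx: an object passed through to the callback for use when it runs, usually carrying context information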

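A few points to note: neither the synchronous nor the asynchronous method supports recursive creation, so a child node cannot be created while its parent does not exist. In addition, creating a node whose name already exists throws a NodeExistsException.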
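
Because creation is not recursive, a caller that wants a deep path has to create each ancestor first. The helper below, written purely for illustration (AncestorPaths is not part of any ZooKeeper API), lists the paths in the order they would need to be created.

```java
import java.util.ArrayList;
import java.util.List;

class AncestorPaths {
    // "/a/b/c" -> ["/a", "/a/b", "/a/b/c"]: parents first, target last.
    static List<String> pathsToCreate(String path) {
        if (!path.startsWith("/") || path.length() == 1 || path.endsWith("/")) {
            throw new IllegalArgumentException("expected an absolute, non-root path: " + path);
        }
        List<String> result = new ArrayList<>();
        int idx = path.indexOf('/', 1);
        while (idx != -1) {
            result.add(path.substring(0, idx));
            idx = path.indexOf('/', idx + 1);
        }
        result.add(path);
        return result;
    }

    public static void main(String[] args) {
        // prints [/zk-book, /zk-book/c1]
        System.out.println(pathsToCreate("/zk-book/c1"));
    }
}
```

A real client would call create on each entry in turn and treat a NodeExistsException on the ancestors as harmless.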

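Currently, node content in ZooKeeper can only be a byte array (byte[]). In other words, ZooKeeper does not serialize node content for you; developers have to serialize and deserialize it themselves with a serialization tool. For a string, simply calling "string".getBytes() produces a byte array; for other, more complex objects, a dedicated serialization tool such as Hessian or Kryo can be used.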
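
The round trip to and from byte[] can be sketched with the JDK alone. Note the substitution: the text recommends Hessian or Kryo for complex objects, while this example uses built-in Java serialization only so that it has no dependencies. The class and method names are invented for the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;

class NodePayloads {
    // Strings: name the charset explicitly so both sides agree on the encoding.
    static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Complex objects: JDK serialization as a stand-in for Hessian or Kryo.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(decode(encode("123")));
        ArrayList<String> hosts = new ArrayList<>(Arrays.asList("host1", "host2"));
        System.out.println(deserialize(serialize(hosts)));
    }
}
```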

Creating a node synchronously

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Creating nodes with the ZooKeeper API, using the synchronous (sync) interface.
public class ZooKeeper_Create_API_Sync_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_Create_API_Sync_Usage());
        connectedSemaphore.await();

        // Ephemeral node: the returned path equals the requested path.
        String path1 = zookeeper.create("/zk-test-ephemeral-",
                "".getBytes(),
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        System.out.println("Success create znode: " + path1);

        // Ephemeral sequential node: the server appends a sequence number to the path.
        String path2 = zookeeper.create("/zk-test-ephemeral-",
                "".getBytes(),
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Success create znode: " + path2);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

Creating a node asynchronously

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Creating nodes with the ZooKeeper API, using the asynchronous (async) interface.
public class ZooKeeper_Create_API_ASync_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_Create_API_ASync_Usage());
        connectedSemaphore.await();

        zookeeper.create("/zk-test-ephemeral-", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context.");

        // Same path again: the failure is reported through rc in the callback, not as an exception.
        zookeeper.create("/zk-test-ephemeral-", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context.");

        zookeeper.create("/zk-test-ephemeral-", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallback(), "I am context.");

        // Keep the process alive so the callbacks have time to run.
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

class IStringCallback implements AsyncCallback.StringCallback {
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("Create path result: [" + rc + ", " + path + ", "
                + ctx + ", real path name: " + name);
    }
}

3.4 Reading data

3.4.1 getChildren


List<String> getChildren(final String path, Watcher watcher)

List<String> getChildren(String path, boolean watch)

void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)

void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)

List<String> getChildren(final String path, Watcher watcher, Stat stat)

List<String> getChildren(String path, boolean watch, Stat stat)

void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)

void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)

Getting the child node list synchronously

The child node list returned by getChildren contains relative paths (node names only). A Watcher is one-shot: once it has delivered a notification it is no longer valid, so the client has to register a Watcher again each time.


import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Getting the child node list with the ZooKeeper API, using the synchronous (sync) interface.
public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        zk = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_GetChildren_API_Sync_Usage());
        connectedSemaphore.await();

        zk.create(path, "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create(path + "/c1", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // watch = true registers the default watcher (this class) on the child list.
        List<String> childrenList = zk.getChildren(path, true);
        System.out.println(childrenList);

        // Adding a child triggers NodeChildrenChanged on the watcher.
        zk.create(path + "/c2", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    // Fetch the list again and re-register the one-shot watcher.
                    System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
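
Since getChildren returns node names rather than full paths, code that wants to read each child usually has to join the names back onto the parent path. A minimal sketch of that step follows; ChildPaths is a helper invented for this note and is not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChildPaths {
    // Joins each child name onto the parent path, taking care of the root "/".
    static List<String> toAbsolute(String parent, List<String> children) {
        String prefix = parent.endsWith("/") ? parent : parent + "/";
        List<String> result = new ArrayList<>();
        for (String child : children) {
            result.add(prefix + child);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [/zk-book/c1, /zk-book/c2]
        System.out.println(toAbsolute("/zk-book", Arrays.asList("c1", "c2")));
    }
}
```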

Getting the child node list asynchronously

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Getting the child node list with the ZooKeeper API, using the asynchronous (async) interface.
public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        zk = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new ZooKeeper_GetChildren_API_ASync_Usage());
        connectedSemaphore.await();

        zk.create(path, "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create(path + "/c1", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // The result is delivered to the callback instead of being returned.
        zk.getChildren(path, true, new IChildren2Callback(), null);

        zk.create(path + "/c2", "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    // Fetch the list again and re-register the one-shot watcher.
                    System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class IChildren2Callback implements AsyncCallback.Children2Callback {
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path
                + ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat);
    }
}

getData


byte[] getData(final String path, Watcher watcher, Stat stat)

byte[] getData(String path, boolean watch, Stat stat)

void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

void getData(String path, boolean watch, DataCallback cb, Object ctx)

Getting data synchronously

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Getting node data with the ZooKeeper API, using the synchronous (sync) interface.
public class GetData_API_Sync_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;
    private static Stat stat = new Stat();

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        zk = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new GetData_API_Sync_Usage());
        connectedSemaphore.await();

        zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // getData fills the passed-in Stat with the node's current metadata.
        System.out.println(new String(zk.getData(path, true, stat)));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());

        // Version -1 skips the version check; the update triggers NodeDataChanged.
        zk.setData(path, "123".getBytes(), -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeDataChanged) {
                try {
                    // Read the data again and re-register the one-shot watcher.
                    System.out.println(new String(zk.getData(event.getPath(), true, stat)));
                    System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Getting data asynchronously

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Getting node data with the ZooKeeper API, using the asynchronous (async) interface.
public class GetData_API_ASync_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        zk = new ZooKeeper("domain1.book.zookeeper:2181",
                5000,
                new GetData_API_ASync_Usage());
        connectedSemaphore.await();

        zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // The data and Stat are delivered to the callback instead of being returned.
        zk.getData(path, true, new IDataCallback(), null);

        zk.setData(path, "123".getBytes(), -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeDataChanged) {
                try {
                    // Read the data again and re-register the one-shot watcher.
                    zk.getData(event.getPath(), true, new IDataCallback(), null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class IDataCallback implements AsyncCallback.DataCallback {
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println(rc + ", " + path + ", " + new String(data));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
    }
}

Access control

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Accessing a node that carries ACL information from sessions with correct and with wrong credentials
public class AuthSample_Get2 {

    final static String PATH = "/zk-book-auth_test";

    public static void main(String[] args) throws Exception {
        // Session 1 authenticates and creates a node that only its creator may access.
        ZooKeeper zookeeper1 = new ZooKeeper("domain1.book.zookeeper:2181", 5000, null);
        zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
        zookeeper1.create(PATH, "init".getBytes(),
                Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);

        // Session 2 uses the same credentials, so the read succeeds.
        ZooKeeper zookeeper2 = new ZooKeeper("domain1.book.zookeeper:2181", 50000, null);
        zookeeper2.addAuthInfo("digest", "foo:true".getBytes());
        System.out.println(new String(zookeeper2.getData(PATH, false, null)));

        // Session 3 uses wrong credentials, so the read is expected to fail.
        ZooKeeper zookeeper3 = new ZooKeeper("domain1.book.zookeeper:2181", 50000, null);
        zookeeper3.addAuthInfo("digest", "foo:false".getBytes());
        zookeeper3.getData(PATH, false, null);
    }
}
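
The sample above passes the credentials as plain user:password text. To my understanding, what the digest scheme stores in the node's ACL is not that text but the user name followed by a Base64-encoded SHA-1 hash of the whole user:password string. The sketch below reproduces that derivation with the JDK only; DigestIdSketch is a name invented here, and the exact format should be checked against the ZooKeeper version in use.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class DigestIdSketch {
    // "user:password" -> "user:" + base64(sha1("user:password"))
    // Assumption: UTF-8 is used here to keep the result platform independent.
    static String digestId(String idPassword) throws NoSuchAlgorithmException {
        String user = idPassword.split(":", 2)[0];
        byte[] sha1 = MessageDigest.getInstance("SHA-1")
                .digest(idPassword.getBytes(StandardCharsets.UTF_8));
        return user + ":" + Base64.getEncoder().encodeToString(sha1);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Same user, different passwords: the derived ids differ, so the ACL check fails.
        System.out.println(digestId("foo:true"));
        System.out.println(digestId("foo:false"));
    }
}
```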

ZKClient


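<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>${zookeeper.version}</version>
</dependency>
<dependency>
    <groupId>com.github.sgroschupf</groupId>
    <artifactId>zkclient</artifactId>
    <version>RELEASE</version>
</dependency>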

Creating a session

import java.io.IOException;

import org.I0Itec.zkclient.ZkClient;

// Creating a ZooKeeper client with ZkClient
public class Create_Session_Sample {

    public static void main(String[] args) throws IOException, InterruptedException {
        // Unlike the native API, this constructor returns only after the session is established.
        ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181", 5000);
        System.out.println("ZooKeeper session established.");
    }
}

Creating a node

import org.I0Itec.zkclient.ZkClient;

// Creating a node with ZkClient
public class Create_Node_Sample {

    public static void main(String[] args) throws Exception {
        ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181", 5000);
        String path = "/zk-book/c1";
        // The second argument (createParents = true) also creates missing parent nodes.
        zkClient.createPersistent(path, true);
    }
}

Getting the child node list

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

// Getting the child node list with ZkClient.
public class Get_Children_Sample {

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181", 5000);

        // The listener can be registered before the node exists.
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
            }
        });

        zkClient.createPersistent(path);
        Thread.sleep(1000);
        System.out.println(zkClient.getChildren(path));
        Thread.sleep(1000);
        zkClient.createPersistent(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Getting data

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

// Getting node data with ZkClient
public class Get_Data_Sample {

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181", 5000);
        zkClient.createEphemeral(path, "123");

        // Listen for data changes and deletion of the node.
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Node " + dataPath + " deleted.");
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Node " + dataPath + " changed, new data: " + data);
            }
        });

        System.out.println(zkClient.readData(path));
        zkClient.writeData(path, "456");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Curator


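<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
</dependency>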
