回目录 《分布式框架zookeeper》
# QuickStart:
# Install & usage
https://cwiki.apache.org/confluence/display/ZOOKEEPER/Index https://zookeeper.apache.org/doc/current/index.html
https://docs.confluent.io/platform/current/zookeeper/deployment.html
tar zxvf zookeeper-3.4.8.tar.gz
mkdir -p /opt/dependency/zookeeper-3.4.8/zkdata
mkdir -p /opt/dependency/zookeeper-3.4.8/logs
集群配置:
This example is for a 3 node ensemble:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/dependency/zookeeper-3.4.8/zkdata
dataLogDir=/opt/dependency/zookeeper-3.4.8/logs
# the port at which the clients will connect
clientPort=2181
server.1=1.1.1.1:2888:3888
server.2=1.1.1.2:2888:3888
server.3=1.1.1.3:2888:3888
SERVER_JVMFLAGS=-Xmx1024m'
然后在各个节点上分别写入 1 2 3等到myid:
$ echo "1" > /zookeeper/zkdata/myid
好像还有个 zookeeper_server.pid?
?Failed to start and no log Resolved: yum install glibc.i686
./bin/zkServer.sh start-foreground
https://zookeeper.apache.org/doc/r3.3.3/zookeeperStarted.html
bin/zkCli.sh -server 127.0.0.1:2181
[zk: localhost:2181(CONNECTED) 0] help
[zk: localhost:2181(CONNECTED) 0] ls /
[zk: localhost:2181(CONNECTED) 0] get /test
[zk: localhost:2181(CONNECTED) 0] ls /test mywatcher
[zk: localhost:2181(CONNECTED) 0] delete /test
[zk: localhost:2181(CONNECTED) 0] rmr /test
[zk: localhost:2181(CONNECTED) 0] stat /brokers/topics/T-MEMBER
# 编译源码
https://www.cnblogs.com/MangoCai/p/10846187.html
安装ant,设置%ANT_HOME%\bin Copy ivy.jar到ant lib 根节点 ant eclipse Eclipse导入普通java project,修改build path jdk和compliance 通过VerGen.java生成Info.java(/org/apache/zookeeper/version) Copy到VerGen.java的上一层package里面
然后两种方式生成jar, 命令行和eclipse导出 根目录下执行 ant jar compile-test 或者
Copy conf/zoo.cfg修改 dataDir=C:\Workspace\Repository\zookeeper-release-3.5.6\dataDir
把jar放到根目录下,
运行
/bin/zkServer.cmd /bin/zkCli.cmd
# 管理脚本
readonly PROGNAME=$(basename $0)
readonly PROGDIR=$(readlink -m $(dirname $0))
# source env
L_INVOCATION_DIR="$(pwd)"
L_CMD_DIR="/opt/scripts"
if [ "${L_INVOCATION_DIR}" != "${L_CMD_DIR}" ]; then
pushd ${L_CMD_DIR} &> /dev/null
fi
#source ../set_env.sh
#--------------- Function Definition ---------------#
showUsage() {
echo "Usage:"
echo "$0 zookeeper start|kill"
echo ""
echo "--start or -b: Start zookeeper"
echo "--kill or -k: Stop zookeeper"
echo "--status or -s: Check zookeeper status"
}
#--------------- Main ---------------#
# Parse arguments
while [ "${1:0:1}" == "-" ]; do
case $1 in
--start)
L_FLAG="B"
;;
-b)
L_FLAG="B"
;;
--status)
L_FLAG="S"
;;
-s)
L_FLAG="S"
;;
--kill)
L_FLAG="K"
;;
-k)
L_FLAG="K"
;;
--kafka)
L_FLAG="KAFKA"
;;
*)
echo "Unknown option: $1"
echo ""
showUsage
echo ""
exit 1
;;
esac
shift
done
L_RETURN_FLAG=0 # 0 for success while 99 for failure
BIN_HOME=/opt/zookeeper-3.4.8/bin
pushd ${BIN_HOME} &>/dev/null
if [ "$L_FLAG" == "B" ]; then
echo "Starting zookeeper service..."
./zkServer.sh start
elif [ "$L_FLAG" == "K" ]; then
echo "Stopping zookeeper service..."
./zkServer.sh stop
elif [ "$L_FLAG" == "KAFKA" ]; then
./zkCli.sh ls /brokers/ids
else
echo "Checking zookeeper status..."
./zkServer.sh status
fi
exit $L_RETURN_FLAG
# 自动启动
[Unit]
Description=The Zookeeper Daemon
Wants=syslog.target
Requires=network.target
After=network.target
[Service]
Environment=PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/jdk/bin
Type=forking
User=root
ExecStart=/bin/zkCli.sh start
[Install]
WantedBy=multi-user.target
# admin
New in 3.5.0: The following options are used to configure the AdminServer (opens new window).
admin.enableServer
(Java system property: zookeeper.admin.enableServer) Set to "false" to disable the AdminServer. By default the AdminServer is enabled.
admin.serverAddress
(Java system property: zookeeper.admin.serverAddress) The address the embedded Jetty server listens on. Defaults to 0.0.0.0.
admin.serverPort
(Java system property: zookeeper.admin.serverPort) The port the embedded Jetty server listens on. Defaults to 8080.
admin.idleTimeout
(Java system property: zookeeper.admin.idleTimeout) Set the maximum idle time in milliseconds that a connection can wait before sending or receiving data. Defaults to 30000 ms.
admin.commandURL
(Java system property: zookeeper.admin.commandURL) The URL for listing and issuing commands relative to the root URL. Defaults to "/commands".
# Things to Avoid
Here are some common problems you can avoid by configuring ZooKeeper correctly:
inconsistent lists of servers
The list of ZooKeeper servers used by the clients must match the list of ZooKeeper servers that each ZooKeeper server has. Things work okay if the client list is a subset of the real list, but things will really act strange if clients have a list of ZooKeeper servers that are in different ZooKeeper clusters. Also, the server lists in each Zookeeper server configuration file should be consistent with one another.
incorrect placement of transaction log
The most performance critical part of ZooKeeper is the transaction log. ZooKeeper syncs transactions to media before it returns a response. A dedicated transaction log device is key to consistent good performance. Putting the log on a busy device will adversely effect performance. If you only have one storage device, put trace files on NFS and increase the snapshotCount; it doesn't eliminate the problem, but it should mitigate it.
incorrect Java heap size
You should take special care to set your Java max heap size correctly. In particular, you should not create a situation in which ZooKeeper swaps to disk. The disk is death to ZooKeeper. Everything is ordered, so if processing one request swaps the disk, all other queued requests will probably do the same. the disk. DON'T SWAP. Be conservative in your estimates: if you have 4G of RAM, do not set the Java max heap size to 6G or even 4G. For example, it is more likely you would use a 3G heap for a 4G machine, as the operating system and the cache also need memory. The best and only recommend practice for estimating the heap size your system needs is to run load tests, and then make sure you are well below the usage limit that would cause the system to swap.
Publicly accessible deployment
A ZooKeeper ensemble is expected to operate in a trusted computing environment. It is thus recommended to deploy ZooKeeper behind a firewall.
# 工具/日志排查
./bin/zkServer.sh start-foreground
https://zookeeper.apache.org/doc/r3.3.2/zookeeperAdmin.html
工具集合:
https://github.com/apache/zookeeper/blob/master/zookeeper-docs/src/main/resources/markdown/zookeeperTools.md
# Application Log
zkServer.sh start的log默认是在:
/bin/zookeeper.out
使用/conf/log4j.properties 设置为 zookeeper.log
https://stackoverflow.com/questions/28691341/zookeeper-log-file-not-created-inside-logs-directory
https://stackoverflow.com/questions/26612908/why-does-zookeeper-not-use-my-log4j-properties-file-log-directory
实例:
发现某个节点
netstat -anp|grep :2181 以及
netstat -anp|grep :2888 (ensemble节点互通端口)
出现大量的time_wait tcp连接,来自于各个节点,
最后通过查看
tail -f /bin/zookeeper.out 发现:
2021-04-06 20:58:23,958 [myid:1] - WARN [QuorumPeer[myid=1]/0.0.0.0:2181:Follower@89] - Exception when following the leader
java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at org.apache.zookeeper.server.quorum.Learner.syncWithLeader(Learner.java:436)
at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:82)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:846)
2021-04-06 20:58:23,959 [myid:1] - INFO [QuorumPeer[myid=1]/0.0.0.0:2181:Follower@166] - shutdown called java.lang.Exception: shutdown Follower
at org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:166)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:850)
2021-04-06 20:58:23,959 [myid:1] - INFO [QuorumPeer[myid=1]/0.0.0.0:2181:FollowerZooKeeperServer@140] - Shutting down
然后通过permission denied判断是权限问题,因为启动的时候是用的普通user,发现
find zookeeper/ -group root
发现了zkdata下面的文件都是root的,修改权限即可
https://stackoverflow.com/questions/54087210/kafka-create-too-many-time-wait-tcp-connection/66969946#66969946
其他类似案例:https://www.cnblogs.com/duanxz/p/3768454.html
# Data Log
https://stackoverflow.com/questions/17894808/how-do-one-read-the-zookeeper-transaction-log
--- 3.6 之前版本
具体log4j版本可以在zookeeper目录下/lib里面去看
java -cp /zookeeper-3.4.8/zookeeper-3.4.8.jar:lib/log4j-1.2.16.jar:lib/slf4j-log4j12-1.6.1.jar:lib/slf4j-api-1.6.1.jar org.apache.zookeeper.server.LogFormatter /zookeeper-3.4.8/logs/version-2/log.100000001 > /home/test/zookeeper
--- 3.6后版本
For transaction log:
bin/zkTxnLogToolkit.sh --dump /datalog/version-2/log.f3aa6
For snapshots:
./zkSnapShotToolkit.sh -d /data/zkdata/version-2/snapshot.fa01000186d
Admin https://zookeeper.apache.org/doc/current/zookeeperAdmin.html
troubleshooting https://cwiki.apache.org/confluence/display/ZOOKEEPER/Troubleshooting
somoktest latency https://cwiki.apache.org/confluence/display/ZOOKEEPER/ServiceLatencyOverview
Jmx https://zookeeper.apache.org/doc/current/zookeeperJMX.html#
vim bin/zkServer.sh
找到ZOODAMIN="" 添加 -Djava.rmi.server.hostname=$JMXHOSTNAME
到conf下面添加java.env:
JMXHOSTNAME=''
JMXPORT=2182
?Failed to resolve hostname, malformed url
edit /etc/hosts,add 127.0.0.1
jconsole Four letter words https://zookeeper.apache.org/doc/r3.4.13/zookeeperAdmin.html#sc_zkCommands
echo dump | nc localhost 2181
Baseline https://cwiki.apache.org/confluence/display/ZOOKEEPER/ServiceLatencyOverview
# 开发 programming
https://cwiki.apache.org/confluence/display/ZOOKEEPER/ErrorHandling https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html
# 原理解读
Semaphores Queues Leader election Group membership Barriers Configuration
ZooKeeper has a built-in sanity check of 1M, to prevent it from being used as a large data store, but in general it is used to store much smaller pieces of data. zxid (ZooKeeper Transaction Id). Each update will have a unique zxid 分布式协调,通过观察者模式,rpc通知客户端节点变化,默认的客户端有: 自带的zkCli.sh以及org.apache.zookeeper 客户端lib jar包 还有curator高级开发API zookeeper事件触发是通过推拉的方式,先通知watcher客户端节点变动,然后客户端再去pull下来新增或变化的节点信息; Watcher机制/观察者模式 https://www.jianshu.com/p/4c071e963f18
# 源码解读
Apache ZooKeeper Watcher 机制源码解释 https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html Zookeeper源码分析之客户端源码解密 https://www.jianshu.com/p/06e859181cc0 Zookeeper源码分析之curator客户端 https://www.jianshu.com/p/bc5e24e4f614
# server端代码
server下面的FinalRequestProcessor是终极请求处理类,所有server收到的请求最终都是流向这里,下面看看如果triggerWatcher的流程: 然后进去看watcher 的process
可以看到zookeeper server“调用客户端”的watch通知是通过跟客户端的socket连接直接返回response,然后客户端收到请求后会反序列化调用自己实现的watch接口方法 zookeeper/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java 这里的NIOServerCnxn间接继承自Watcherzookeeper/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java public abstract class ServerCnxn implements Stats, Watcher { 记住这里的ServerCnxn,后面会讲解client端对应的ClientCnxn
# Zookeeper client端代码
Client由三个主要模块组成:Zookeeper, WatcherManager, ClientCnxn Zookeeper是ZK Client端的接口, WatcherManager,管理ZK Client绑定的所有Watcher。 ClientCnxn是管理所有网络IO的模块,所有和ZK Server交互的信息和数据都经过这个模块
zookeeper/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java clientWatchManager用于存储管理客户端连过来的watcher
首先是zookeeper server自带的cli工具: zookeeper/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java
然后是zookeeper client端代码: 客户端收到消息后,会调用 ClientCnxn 的 SendThread.readResponse 方法来进行统一处理,如果响应头 replyHdr 中标识的 Xid 为 02,表示是 ping,如果为-4,表示是验证包,如果是-1,表示这是一个通知类型的响应,然后进行反序列化、处理 chrootPath、还原 WatchedEvent、回调 Watcher 等步骤,其中回调 Watcher 步骤将 WacthedEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调; 补充:ClientCnxnSocket会读取socket连接,发现connected信息也将交由SendThread的onconnected处理 zookeeper/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
SendThread 接收到服务端的通知事件后,会通过调用 EventThread 类的 queueEvent 方法将事件传给 EventThread 线程,queueEvent 方法根据该通知事件,从 ZKWatchManager 中取出所有相关的 Watcher 客户端在识别出事件类型 EventType 之后,会从相应的 Watcher 存储中删除对应的 Watcher,获取到相关的 Watcher 之后,会将其放入 waitingEvents 队列,该队列从字面上就能理解是一个待处理队列,线程的 run 方法会不断对该该队列进行处理,这就是一种异步处理思维的实现
# Curator api封装代码
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class LeaderElectionExample {
private static final String ZK_CONNECTION_STRING = "localhost:2181";
private static final String LEADER_PATH = "/leader";
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
ZK_CONNECTION_STRING,
new ExponentialBackoffRetry(1000, 3)
);
curatorFramework.start();
LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, LEADER_PATH,
new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("Node " + curatorFramework.getClientId() + " has become the leader.");
// Implement logic for when this node becomes the leader
Thread.sleep(3000); // Example: Perform leader activities
System.out.println("Node " + curatorFramework.getClientId() + " is relinquishing leadership.");
}
@Override
public void stateChanged(CuratorFramework curatorFramework, org.apache.curator.framework.state.ConnectionState connectionState) {
if (connectionState == org.apache.curator.framework.state.ConnectionState.SUSPENDED) {
System.out.println("Node " + curatorFramework.getClientId() + " lost connection to ZooKeeper");
} else if (connectionState == org.apache.curator.framework.state.ConnectionState.RECONNECTED) {
System.out.println("Node " + curatorFramework.getClientId() + " reconnected to ZooKeeper");
} else if (connectionState == org.apache.curator.framework.state.ConnectionState.LOST) {
System.out.println("Node " + curatorFramework.getClientId() + " lost connection and session with ZooKeeper");
// Implement logic for handling loss of session, e.g., shutdown gracefully
}
}
});
leaderSelector.autoRequeue(); // Ensure requeueing of participants upon relinquishing leadership
leaderSelector.start();
// Example: Keep the application running indefinitely to observe leader changes
Thread.sleep(Long.MAX_VALUE);
leaderSelector.close();
curatorFramework.close();
}
}
七张图彻底讲清楚ZooKeeper分布式锁的实现原理 https://juejin.im/post/5c01532ef265da61362232ed Zookeeper使用之curator https://leokongwq.github.io/2018/06/17/zookeeper-curator.html Curator分布式锁 https://blog.csdn.net/xuefeng0707/article/details/80588855 Zookeeper Curator 事件监听 - 秒懂 https://www.cnblogs.com/crazymakercircle/p/10228385.html
# curator leader election代码解析
图示上半部分为跟zookeeper断线之后的逻辑,
断线处理很简单,LeaderSelectorListener继承了ConnectionStateListener, 然后这个listener注册到了CuratorFrameworkImpl维护的listener容器, 当断线时回调这个listener wrapper,wrapper再回调statechange,然后可以在statechange里面抛出异常从而触发 leaderSelector.interruptLeadership,其内部就是获取“执行takeleadership的doWorkLoop所在的线程executorService”这个task,然后调用其cancel(boolean java.util.concurrent.Future.cancel(boolean mayInterruptIfRunning))方法中断操作
额外指出,从架构设计上欣赏的curator的监听器管理,CuratorFrameworkImpl将这些ConnectionStateListener交由ConnectionStateManager来统一管理,当监测到状态变化, ConnectionStateManager调用注册好的listener的StateChange来通知变化,这是典型的一个抽象出来的状态机管理实现;
下半部分是选举的分布式锁机制,主要的逻辑基石就是zookeeper保证了sequence ephemeral节点的生成,然后后一个节点监听前一个节点,只有前一个节点删除后才会通过watcher去唤醒后面一个等待的节点
curator封装的InterProcessMutex注释:
/**
* A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that
* use the same lock path will achieve an inter-process critical section. Further, this mutex is
* "fair" - each user will get the mutex in the order requested (from ZK's point of view)
*/
public class InterProcessMutex implements InterProcessLock, Revocable
- 首先还是要考虑同一个application,即同一个jvm进程中多线程的竞争问题,所以引入了大量的synchronized方法和synchronized(this)方法块,
- 跨JVMs主要就是利用了StandardLockInternalsDriver的getsTheLock,利用ephemeral sequential先获取当前leader下面注册了多少child,然后当前的node就去pathToWatch前面一个node(children.get(ourIndex - maxLeases),maxLeases=1)
# PathChildrenCache解析: zookeeper推拉消息
如果我们直接使用zookeeper client开发,只能自定义各种watcher,然后向zookeeper也就是zookeeper client端接口注册,然后最终注册到zookeeper服务端, 然后服务端的变化通知watcher,由于watcher是一次性的,所以每次getChildren或getData时都要设置参数项watcher为true,才可以继续监听zknode变化。
而使用Curator封装好的NodeCache PathChildrenCache等类,我们只需要创建实现一个继承了curator相关listener接口的listener,即可实现‘实时’监听zknode变化,不需要再写繁杂的watcher代码;
看看封装好的这个PathChildrenCache,
我们自定义了listener,并向PathChildrenCache提供的listenerContainer注册,然后启动start
启动后,实际上PathChildrenCache并没有将我们自定义的listener注册到curatorFramework client(实际类为CuratorFrameworkImpl),而是自己注册了一个自己的ConnectionStateListener, 不过这个主要是管理跟zookeeper服务端的连接情况, 然后OfferOperation会调用refresh(), 先向curatorFramework注册一个childWatcher,然后第一次比如节点创建后回调callback方法processResult调用processChildren,然后调用getDataAndStat, 这里又会注册一个dataWatcher, 当然这两个watcher都是最终是层层转接注册到前面说的zookeeper服务端的,下面我们就先看看到底是怎么层层转到zookeeper client接口的;
上面说了watcher的注册,watcher的作用就是接收zookeeper服务端zknode节点及数据变化,接收的参数WatchedEvent,只是会知道变化的类型: public enum EventType { None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4); 然后具体变化还要主动去拉取, 所以粗略的逻辑是,curatorFramework把收到的watcher注册请求都注册给zookeeper,zookeeper服务端通知客户端watcher,然后curator在PathChildrenCache的watcher再去拉取数据;
这两个watcher注册的地方都是创建者模式builder pattern的fluent API流畅连写: client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path); client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
下面就以dataWatcher这个来说明:
注意这里的getData()是调用的curatorFramework的,所以实际是拿到dataBuilder,对应的实现类是getDataBuilderImpl,然后最后是落脚在forPath,实际就是调用了 getDataBuilderImpl的forPath方法,我们就看下这个的forPath方法:
可以看到,有前台和后台执行两种模式,前台执行很直接,调用zookeeper的getdata, 继续看后台执行的模式,而后台跑则调用curatorFramework的processBackgroundOperation: 此时CuratorEvent==null,则isInitialExecution==true,所以只会调用performBackgroundOperation,然后最终是调用接口BackgroundOperation.performBackgroundOperation, 即最终调用回该接口的引用getDataBuilderImpl的performBackgroundOperation,
if ( watching.isWatched() )
{
client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext());
}
else
{
client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
}
真香,可以看到最终肯定还是调用zookeeper client的getdata,然后注意到其定义的callback里面组装了一个CuratorEvent
CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_DATA, rc, path, null, ctx, stat, data, null, null, null);
client.processBackgroundOperation(operationAndData, event);
然后看到其callback里面再次调用curatorFramework的processBackgroundOperation,这一次event!=null了,所以走到下面的
if ( operationAndData.getCallback() != null )
{
sendToBackgroundCallback(operationAndData, event);
break;
}
processEvent(event);
这个callback是啥,回过头查一下就知道这个是一开始在PathChildrenCache里面注册的:
void getDataAndStat(final String fullPath) throws Exception
{
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null);
}
};
if ( USE_EXISTS && !cacheData )
{
client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
else
{
// always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
if ( dataIsCompressed && cacheData )
{
client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
else
{
client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
}
}
回到前面,所以整个意思是调用zookeeper client接口获取的data要封装成curatorEvent再调用回PathChildrenCache的callback,callback调用applyNewData:
if ( previousData == null ) // i.e. new
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
}
else if ( previousData.getStat().getVersion() != stat.getVersion() )
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
}
可以看到这里封装了PathChidrenCacheEvent,再最终回调一开始开头注册好的监听方法 listener.childEvent(client, event); 然后看到前面CuratorFramework还调用了processEvent,继续查到里面是调用CuratorListener, 我们事先并没有注册任何CuratorListener,而是调用的PathChildrenCacheListener,所以这个processEvent这里没有起到任何作用;
# 深入解读
https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeperPresentations Sometimes developers mistakenly assume one other guarantee that ZooKeeper does not in fact make. This is: * Simultaneously Consistent Cross-Client Views* : ZooKeeper does not guarantee that at every instance in time, two different clients will have identical views of ZooKeeper data. Due to factors like network delays, one client may perform an update before another client gets notified of the change. Consider the scenario of two clients, A and B. If client A sets the value of a znode /a from 0 to 1, then tells client B to read /a, client B may read the old value of 0, depending on which server it is connected to. If it is important that Client A and Client B read the same value, Client B should should call the sync() method from the ZooKeeper API method before it performs its read. So, ZooKeeper by itself doesn't guarantee that changes occur synchronously across all servers, but ZooKeeper primitives can be used to construct higher level functions that provide useful client synchronization. (For more information, see the ZooKeeper Recipes. [tbd:..]).
CAP理论,zookeeper是CP模型,保证sequential consistency (opens new window), 比eventual consistency强一些,但是不是strong consistency,因为我们前面说了,zookeeper是采用推拉模式, 所以每个客户端看的的view都可能由于网络的原因不一样, Zookeeper is not A, and can't drop P. So it's called CP apparently. In terms of CAP theorem, "C" actually means linearizability.But, Zookeeper has Sequential Consistency - Updates from a client will be applied in the order that they were sent. (opens new window)
然后又由于zookeeper是基于CP模型,所以有人提出: zookeeper 的 CP 模型不适合注册中心 https://segmentfault.com/a/1190000021356988
丢失watcher的原因:
//连续修改会‘丢失’nodeChanged event的原因是,每次获取到nodechanged之后curator的nodecache实际上还要再次重设watch,因为watch是一次性的; //客户端set 111 , 服务端处理set //客户端注册watch,服务端注册watch //客户端set 222, 服务端处理set,服务端刚才的watch刚注册好 //客户端注册watch,服务端通知client端变化 //客户端set 333, 服务端处理set //客户端收到变化通知打印,然后还要去服务端拉取,此时最新值是333 //但是实际上setDAta应该都是成功的,因为zookeeper是顺序处理 client.setData().forPath(nodePath, "111".getBytes()); client.setData().forPath(nodePath, "222".getBytes());
所以要用对zookeeper还是要学习下其他产品,比如: kafka 中 zookeeper 具体是做什么的? (opens new window)
ref: https://www.cnblogs.com/duanxz/p/3783266.html
Zookeeper集群"脑裂"问题 - 运维总结 https://www.cnblogs.com/kevingrace/p/12433503.html