BigData-storm(2)
storm 命令的使用
bin/strom nimbus
命令的执行过程
执行 storm shell 脚本
1
exec "$PYTHON" "${STORM_BIN_DIR}/storm.py" "$@"
调用 storm.py 脚本
1
java org.apache.storm.daemon.nimbus ## other args
执行 org.apache.storm.daemon.nimbus 类的 main 方法
nimbus 的启动
nimbus 的核心实现在 src/clj/org/apache/storm/daemon/nimbus.clj
实现 org.apache.storm.scheduler.INimbus 接口
1 | (defn standalone-nimbus [] |
读取配置文件
读取 defaults.yaml, storm.yaml 及 storm-cluster-auth.yaml 配置文件,返回一个Map对象 conf
1 | (defn -launch [nimbus] |
read-storm-config
读取 defaults.yaml 和 storm.yaml 文件的配置,并使用 storm.yaml 覆盖 defaults.yaml 的配置。返回一个 Map 对象。
read-yaml-config
读取 storm-cluster-auth.yaml 配置文件。
启动 Server
1 | (defn launch-server! [conf nimbus] |
上面的 ThriftServer 对象的过程使用 Java 代码实现就是。
1 | // 实现 iface 接口。 |
TServer 的创建可以参考 org.apache.storm.security.auth.SimpleTransportPlugin.getServer 方法。
创建一个在 TNonblockingServerSocket 对象。
这个对象持有 nimbus.thrift.port , 当 Server 启动后,会在这个端口上进行监听。
创建一个 ThreadPoolExecutor
1
2
3
4// numWorkerThreads 为 nimbus.thrift.threads 默认 64
// queueSize 为 nimbus.queue.size 默认 100000
new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize))
由此可知,ThriftServer 的 serve 方法将在 nimbus.thrift.port 端口进行监听,一旦有请求过来。则 submit 到线程池 ThreadPoolExecutor 中进行执行。
也就是说,随着请求的到来,TServer 会创建 64 个线程,当 64 个线程都在运行的时候,新的请求将被添加到队列当前。直到有空闲线程来处理这个请求。
Nimbus 最终所对外提供的服务就是由 nimbus.clj 中的 service-handler 函数中实现的 Nimbus$Iface 接口。这个接口,最终将以 ThriftServer 的形式向外提供服务。
Nimbus 对外提供的服务
nimbus 对外提供的服务定义在 storm.thrift 文件中。
1 | service Nimbus { |
org.apache.storm.cluster.StormClusterState 接口
org/apache/storm/cluster.clj 文件中定义了一个接口 StormClusterState 这个接口是用来完成 storm 集群状态维护的核心。
其实现,由 cluster.clj 中的 mk-storm-cluster-state 函数。
mk-storm-cluster-state 的实现则是借助于 org.apache.storm.cluster.ClusterState
1 | (defnk mk-storm-cluster-state |
mk-storm-cluster-state 函数的主要功能。
创建 storm 根目录
创建 storm.zookeeper.root: “/storm” 目录.
创建下面的状态目录
1
2
3(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
LOGCONFIG-SUBTREE BACKPRESSURE-SUBTREE]]
(.mkdirs cluster-state p acls))创建 cluster-state 对象
service-handler 的实现
当启动一个 Nimbus 的时候,会调用 nimbus.clj 的 service-handler 函数,这个函数除了返回一个 实现 IFace 接口的对象。还会执行以下操作.
将当前启动的这个 Nimbus 添加到 nimbus 集群中
创建 Storm 相关的状态对象。将 NimbusSummary 序列化到字节数组中。将这些信息保存到 “/nimbuses/
1 | ;add to nimbuses |
addToLeaderLockQueue
将当前正在启动的这个 Nimbus 添加到 Leader 队列中。这会导致 Leader 选举。选出新的 nimbus 的 Leader。
注册 blobstore 回调
当 zookeeper 的 “/blobstore” 目录发生变化的时候。将新的 blob 同步下来。并且为当前所有的 blob 在 zookeeper 设置状态信息。
如果 Nimbus 是 Leader 的话,激活所有的 storms
1 | ;; 当前 nimbus 如果是 Leader 的话。 |
启动定时调度任务
do-cleanup
nimbus.monitor.freq.secs (默认 10 秒)
clean-inbox
Deletes jar files in dir older than seconds.
以 nimbus.cleanup.inbox.freq.secs 为周期(默认 10 分钟),清理已经过期 nimbus.inbox.jar.expiration.secs (默认 1 个小时)的 jar 包。
获得 storm.home/{storm.local.dir}/nimbus/inbox 目录。
这个目录下面存储着 jar 文件。进行定时删除。
blob-sync
在 不是 Leader 的 Nimbus 上执行 每隔 nimbus.code.sync.freq.secs: 120 (2分钟) 将 Leader Nimbus 上的代码同步下来。
每一个 Nimbus 本地都存储着一份 blob. 同步 对于 Leader nimbus 来说,新的 topology 提交之后,会在 zookeeper 的路径上维护这些信息。
然后 非 Leader 的 nimbus, 每隔 2 分钟,就比较一下本地的存储和 zookeeper 上的存储。然后将需要同步的 jar 下载下来。
这里同步的文件是 “storm.home/{storm.local.dir}/blobs” 目录下面的文件。
clean-topology-history
Schedule topology history cleaner。
Deletes topologies from history older than minutes.
启动状态报告线程。
1
(start-metrics-reporters conf)
可以由下面的属性进行配置。
storm.daemon.metrics.reporter.plugins:
org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter
总结
当我们说,我们启动了一个 nimbus 的时候,我们在说什么?
启动一个 nimbus, 首先创建一个 CuratorFramework 的 zookeeper 连接。并尝试去创建 storm 所需要的 zookeeper 相关路径。
然后,会将当前正在启动的这个 nimbus 添加到 nimbus 集群队列中,此时就会进行 Leader 的选举。如果有多个 nimbus,则此后,当前 nimbus 是 leader 还是 flower 已经确定。
启动相关定时任务。例如: 定时 清理 inbox 目录。定时,将 blobs 目录下面的文件从 Leader 上下载下来。
启动 ThriftServer 对外发布服务。