上篇文章写完了 ES 流程启动的一部分,main 方法都入口,以及创建 Elasticsearch 运行的必须环境以及相关配置,接着就是创建该环境的节点了。
Node 的创建看下新建节点的代码:(代码比较多,这里是比较关键的地方,我就把注释直接写在代码上面了,实在不好拆开这段代码,300 多行代码)
1public Node(Environment environment) {
2 this(environment, Collections.emptyList()); //执行下面的代码
3 }
4
5protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
6 final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
7 boolean success = false;
8 {
9// use temp logger just to say we are starting. we can't use it later on because the node name might not be set
10 Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
11 logger.info("initializing ...");
12 }
13 try {
14 originalSettings = environment.settings();
15 Settings tmpSettings = Settings.builder().put(environment.settings())
16 .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
17
18// create the node environment as soon as possible, to recover the node id and enable logging
19 try {
20 nodeEnvironment = new NodeEnvironment(tmpSettings, environment); //1、创建节点环境,比如节点名称,节点ID,分片信息,存储元,以及分配内存准备给节点使用
21 resourcesToClose.add(nodeEnvironment);
22 } catch (IOException ex) {
23 throw new IllegalStateException("Failed to create node environment", ex);
24 }
25 final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
26 final String nodeId = nodeEnvironment.nodeId();
27 tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
28 final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
29// this must be captured after the node name is possibly added to the settings
30 final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
31 if (hadPredefinedNodeName == false) {
32 logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
33 } else {
34 logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
35 }
36
37 //2、打印出JVM相关信息
38 final JvmInfo jvmInfo = JvmInfo.jvmInfo();
39 logger.info(
40"version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
41
47 logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
48 //检查当前版本是不是 pre-release 版本(Snapshot),
49 warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
50 。。。
51 this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins); //3、利用PluginsService加载相应的模块和插件
52 this.settings = pluginsService.updatedSettings();
53 localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
54
55// create the environment based on the finalized (processed) view of the settings
56// this is just to makes sure that people get the same settings, no matter where they ask them from
57 this.environment = new Environment(this.settings, environment.configFile());
58 Environment.assertEquivalent(environment, this.environment);
59
60 final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings); //线程池
61
62 final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
63 resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
64 // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
65 DeprecationLogger.setThreadContext(threadPool.getThreadContext());
66 resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
67
68 final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings()); //额外配置
69 final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
70 for (final ExecutorBuilder<?> builder : threadPool.builders()) {
71 //4、加载一些额外配置
72 additionalSettings.addAll(builder.getRegisteredSettings());
73 }
74 client = new NodeClient(settings, threadPool);//5、创建一个节点客户端
75
76 //6、缓存一系列模块,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule
77 final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
78 final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
79 AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
80 // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool so we might be late here already
81 final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
82scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
83
94 final UsageService usageService = new UsageService(settings);
95
96 ModulesBuilder modules = new ModulesBuilder();
97// plugin modules must be added here, before others or we can get crazy injection errors...
98 for (Module pluginModule : pluginsService.createGuiceModules()) {
99 modules.add(pluginModule);
100 }
101 final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
102 ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
103 modules.add(clusterModule);
104 IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
105 modules.add(indicesModule);
106
107 SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
108 CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
109 settingsModule.getClusterSettings());
110 resourcesToClose.add(circuitBreakerService);
111 modules.add(new GatewayModule());
112
113 PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
114 BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
115 resourcesTo���,̩��Close.add(bigArrays);
116 modules.add(settingsModule);
117
133 modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
134 final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
135 final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
136analysisModule.getAnalysisRegistry(), clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),client, metaStateService);
137
138 Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
139 .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,namedWriteableRegistry).stream())
140.collect(Collectors.toList());
141
142 ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
143 settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
144 modules.add(actionModule);
145
146 //7、获取RestController,用于处理各种Elasticsearch的rest命令,如_cat,_all,_cat/health,_clusters等rest命令(Elasticsearch称之为action)
147 final RestController restController = actionModule.getRestController();
148 SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
173 final Consumer<Binder> httpBind;
174 final HttpServerTransport httpServerTransport;
175 if (networkModule.isHttpEnabled()) {
176 httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
177 httpBind = b -> {
178b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
179 };
180 } else {
181 httpBind = b -> {
182 b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
183 };
184 httpServerTransport = null;
185 }
186
187 final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),clusterModule.getAllocationService());
188
189 this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,searchTransportService);
190
191 final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),responseCollectorService);
192
193 final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
194 .filterPlugins(PersistentTaskPlugin.class).stream()
195 .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
196 .flatMap(List::stream)
197 .collect(toList());
198
199 final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
200 final PersistentTasksClusterService persistentTasksClusterService =
201 new PersistentTasksClusterService(settings, registry, clusterService);
202 final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
203
204//8、绑定处理各种服务的实例,这里是最核心的地方,也是Elasticsearch能处理各种服务的核心.
205 modules.add(b -> {
206 b.bind(Node.class).toInstance(this);
207 b.bind(NodeService.class).toInstance(nodeService);
208 b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
209
249
250 // TODO hack around circular dependencies problems in AllocationService
251clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));
252
253 List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
254 .filter(p -> p instanceof LifecycleComponent)
255 .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
256
257 //9、利用Guice将各种模块以及服务(xxxService)注入到Elasticsearch环境中
258pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream() .map(injector::getInstance).collect(Collectors.toList()));
259 resourcesToClose.addAll(pluginLifecycleComponents);
260 this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
261 client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
262
263 if (NetworkModule.HTTP_ENABLED.get(settings)) { //如果elasticsearch.yml文件中配置了http.enabled参数(默认为true),则会初始化RestHandlers
264 logger.debug("initializing HTTP handlers ...");
265 actionModule.initRestHandlers(() -> clusterService.state().nodes()); //初始化RestHandlers, 解析集群命令,如_cat/,_cat/health
266 }
267 //10、初始化工作完成
268 logger.info("initialized");
269
270 success = true;
271 } catch (IOException ex) {
272 throw new ElasticsearchException("failed to bind service", ex);
273 } finally {
274 if (!success) {
275 IOUtils.closeWhileHandlingException(resourcesToClose);
276 }
277 }
278}
上面代码真的很多,这里再说下上面这么多代码主要干了什么吧:(具体是哪行代码执行的如下流程,上面代码中也标记了)
1、创建节点环境,比如节点名称,节点 ID,分片信息,存储元,以及分配内存准备给节点使用
2、打印出 JVM 相关信息
3、利用 PluginsService 加载相应的模块和插件,具体哪些模块可以去 modules 目录下查看
4、加载一些额外的配置参数
5、创建一个节点客户端
6、缓存一系列模块,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule
7、获取 RestController,用于处理各种 Elasticsearch 的 rest 命令,如 _cat, _all, _cat/health, _clusters 等 rest命令
8、绑定处理各种服务的实例
9、利用 Guice 将各种模块以及服务(xxxService)注入到 Elasticsearch 环境中
10、初始化工作完成(打印日志)
JarHell 报错解释前一篇阅读源码环境搭建的文章写过用 JDK 1.8 编译 ES 源码是会遇到如下异常:
1org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: jar hell!
这里说下就是 setup 方法中的如下代码导致的
1try {
2 // look for jar hell
3 final Logger logger = ESLoggerFactory.getLogger(JarHell.class);
4 JarHell.checkJarHell(logger::debug);
5} catch (IOException | URISyntaxException e) {
6 throw new BootstrapException(e);
7}
所以你如果是用 JDK 1.8 编译的,那么就需要把所有的有这块的代码给注释掉就可以编译成功的。
我自己试过用 JDK 10 编译是没有出现这里报错的。
正式启动 ES 节点回到上面 Bootstrap 中的静态 init 方法中,接下来就是正式启动 elasticsearch 节点了:
1INSTANCE.start(); //调用下面的 start 方法
2
3private void start() throws NodeValidationException {
4 node.start(); //正式启动 Elasticsearch 节点
5 keepAliveThread.start();
6}
接下来看看这个 start 方法里面的 node.start()
方法源码:
1public Node start() throws NodeValidationException {
2 if (!lifecycle.moveToStarted()) {
3 return this;
4 }
5
6 Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
7 logger.info("starting ...");
8 pluginLifecycleComponents.forEach(LifecycleComponent::start);
9
10 //1、利用Guice获取上述注册的各种模块以及服务
11 //Node 的启动其实就是 node 里每个组件的启动,同样的,分别调用不同的实例的 start 方法来启动这个组件, 如下:
12 injector.getInstance(MappingUpdatedAction.class).setClient(client);
13 injector.getInstance(IndicesService.class).start();
20
21 final ClusterService clusterService = injector.getInstance(ClusterService.class);
22
23 final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
24 nodeConnectionsService.start();
25 clusterService.setNodeConnectionsService(nodeConnectionsService);
26
27 injector.getInstance(ResourceWatcherService.class).start();
28 injector.getInstance(GatewayService.class).start();
29 Discovery discovery = injector.getInstance(Discovery.class);
30 clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
31
32 // Start the transport service now so the publish address will be added to the local disco node in ClusterService
33 TransportService transportService = injector.getInstance(TransportService.class);
34 transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
35 transportService.start();
36 assert localNodeFactory.getNode() != null;
37 assert transportService.getLocalNode().equals(localNodeFactory.getNode())
38 : "transportService has a different local node than the factory provided";
39 final MetaData onDiskMetadata;
40 try {
41 // we load the global state here (the persistent part of the cluster state stored on disk) to
42 // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
43 if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {//根据配置文件看当前节点是master还是data节点
44 onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
45 } else {
46 onDiskMetadata = MetaData.EMPTY_META_DATA;
47 }
48 assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
49 } catch (IOException e) {
50 throw new UncheckedIOException(e);
51 }
52 validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
53 .filterPlugins(Plugin
54 .class)
55 .stream()
56 .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
57
58 //2、将当前节点加入到一个集群簇中去,并启动当前节点
59 clusterService.addStateApplier(transportService.getTaskManager());
60 // start after transport service so the local disco is known
61 discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
62 clusterService.start();
63 assert clusterService.localNode().equals(localNodeFactory.getNode())
64 : "clusterService has a different local node than the factory provided";
65 transportService.acceptIncomingRequests();
66 discovery.startInitialJoin();
67 // tribe nodes don't have a master so we shouldn't register an observer s
68 final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
69 if (initialStateTimeout.millis() > 0) {
70 final ThreadPool thread = injector.getInstance(ThreadPool.class);
71 ClusterState clusterState = clusterService.state();
72 ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
73 if (clusterState.nodes().getMasterNodeId() == null) {
74
93 try {
94 latch.await();
95 } catch (InterruptedException e) {
96 throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
97 }
98 }
99 }
100
101
102 if (NetworkModule.HTTP_ENABLED.get(settings)) {
103 injector.getInstance(HttpServerTransport.class).start();
104 }
105
106 if (WRITE_PORTS_FILE_SETTING.get(settings)) {
107 if (NetworkModule.HTTP_ENABLED.get(settings)) {
108 HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
109 writePortsFile("http", http.boundAddress());
110 }
111 TransportService transport = injector.getInstance(TransportService.class);
112 writePortsFile("transport", transport.boundAddress());
113 }
114
115 logger.info("started");
116
117 pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
118
119 return this;
120}
上面代码主要是:
1、利用 Guice 获取上述注册的各种模块以及服务,然后启动 node 里每个组件(分别调用不同的实例的 start 方法来启动)
2、打印日志(启动节点完成)
总结这篇文章主要把大概启动流程串通了,讲了下 node 节点的创建和正式启动 ES 节点了。因为篇幅较多所以拆开成两篇,先不扣细节了,后面流程启动文章写完后我们再单一的扣细节。
关注我zhisheng
转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/12/es-code03/
相关文章1、渣渣菜鸡为什么要看 ElasticSearch 源码?
2、渣渣菜鸡的 ElasticSearch 源码解析 —— 环境搭建
3、Elasticsearch 默认分词器
4、Elasticsearch 可用中分分词器
5、Elasticsearch 自定义分词器
6、全文搜索引擎 Elasticsearch 集群搭建入门教程
7、Elasticsearch 系列文章(三):ElasticSearch 集群监控
8、Elasticsearch 系列文章(四):ElasticSearch 单个节点监控
9、Elasticsearch 系列文章(五):ELK 实时日志分析平台环境搭建
10、教你如何在 IDEA 远程 Debug ElasticSearch