Flume整体流程是怎样的

67次阅读
没有评论

共计 9492 个字符,预计需要花费 24 分钟才能阅读完成。

本篇内容介绍了“Flume 整体流程是怎样的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

整体流程

不管是 Source 还是 Sink 都依赖 Channel,那么启动时应该先启动 Channel 然后再启动 Source 或 Sink 即可。

Flume 有两种启动方式:使用 EmbeddedAgent 内嵌在 Java 应用中或使用 Application 单独启动一个进程,此处我们已 Application 分析为主。

首先进入 org.apache.flume.node.Application 的 main 方法启动:

//1、设置默认值启动参数、参数是否必须的
Options options = new Options();
Option option = new Option( n ,  name , true,  the name of this agent 
option.setRequired(true);
options.addOption(option);
option = new Option( f ,  conf-file , true,
 specify a config file (required if -z missing) 
option.setRequired(false);
options.addOption(option);
//2、接着解析命令行参数
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
String agentName = commandLine.getOptionValue( n 
boolean reload = !commandLine.hasOption( no-reload-conf 
if (commandLine.hasOption( z) || commandLine.hasOption(zkConnString)) {
 isZkConfigured = true;
if (isZkConfigured) { //3、如果是通过 ZooKeeper 配置,则使用 ZooKeeper 参数启动,此处忽略,我们以配置文件讲解} else {
 //4、打开配置文件,如果不存在则快速失败
 File configurationFile = new File(commandLine.getOptionValue( f 
 if (!configurationFile.exists()) {
 throw new ParseException(  The specified configuration file does not exist:   + path);
 }
 List LifecycleAware  components = Lists.newArrayList();
 if (reload) { //5、如果需要定期 reload 配置文件,则走如下方式
 //5.1、此处使用 Guava 提供的事件总线
 EventBus eventBus = new EventBus(agentName +  -event-bus 
 //5.2、读取配置文件,使用定期轮训拉起策略,默认 30s 拉取一次
 PollingPropertiesFileConfigurationProvider configurationProvider =
 new PollingPropertiesFileConfigurationProvider( agentName, configurationFile, eventBus, 30);
 components.add(configurationProvider);
 application = new Application(components); //5.3、向 Application 注册组件
 //5.4、向事件总线注册本应用,EventBus 会自动注册 Application 中使用 @Subscribe 声明的方法
 eventBus.register(application);
 } else { //5、配置文件不支持定期 reload
 PropertiesFileConfigurationProvider configurationProvider =
 new PropertiesFileConfigurationProvider( agentName, configurationFile);
 application = new Application();
 //6.2、直接使用配置文件初始化 Flume 组件
 application.handleConfigurationEvent(configurationProvider
 .getConfiguration());
 }
//7、启动 Flume 应用
application.start();
//8、注册虚拟机关闭钩子,当虚拟机关闭时调用 Application 的 stop 方法进行终止
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread( agent-shutdown-hook) {
 @Override
 public void run() { appReference.stop();
 }
});

以上流程只提取了核心代码中的一部分,比如 ZK 的实现直接忽略了,而 Flume 启动大体流程如下:

1、读取命令行参数;

2、读取配置文件;

3、根据是否需要 reload 使用不同的策略初始化 Flume;如果需要 reload,则使用 Guava 的事件总线实现,Application 的 handleConfigurationEvent 是事件订阅者,PollingPropertiesFileConfigurationProvider 是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而 handleConfigurationEvent 会收到该配置变更重新进行初始化;

4、启动 Application,并注册虚拟机关闭钩子。

handleConfigurationEvent 方法比较简单,首先调用了 stopAllComponents 停止所有组件,接着调用 startAllComponents 使用配置文件初始化所有组件: 

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents();
 startAllComponents(conf);
}

MaterializedConfiguration 存储 Flume 运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner 等,其是通过 ConfigurationProvider 进行初始化获取,比如 PollingPropertiesFileConfigurationProvider 会读取配置文件然后进行组件的初始化。

对于 startAllComponents 实现大体如下: 

//1、首先启动 Channel
supervisor.supervise(Channels,
 new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//2、确保所有 Channel 是否都已启动
for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START
   !supervisor.isComponentInErrorState(ch)){
 try { Thread.sleep(500);
 } catch (InterruptedException e) { Throwables.propagate(e);
 }
 }
//3、启动 SinkRunner
supervisor.supervise(SinkRunners, 
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//4、启动 SourceRunner
supervisor.supervise(SourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//5、初始化监控服务
this.loadMonitoring();

从如下代码中可以看到,首先要准备好 Channel,因为 Source 和 Sink 会操作它,对于 Channel 如果初始化失败则整个流程是失败的;然后启动 SinkRunner,先准备好消费者;接着启动 SourceRunner 开始进行采集日志。此处我们发现有两个单独的组件 LifecycleSupervisor 和 MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。

对于 stopAllComponents 实现大体如下:

//1、首先停止 SourceRunner
supervisor.unsupervise(SourceRunners);
//2、接着停止 SinkRunner
supervisor.unsupervise(SinkRunners);
//3、然后停止 Channel
supervisor.unsupervise(Channels);
//4、最后停止 MonitorService
monitorServer.stop();

此处可以看出,停止的顺序是 Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。

Application 中的 start 方法代码实现如下:

public synchronized void start() { for(LifecycleAware component : components) {
 supervisor.supervise(component,
 new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
 }
}

其循环 Application 注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持 reload 配置文件,则之前启动 Application 时注册过 PollingPropertiesFileConfigurationProvider 组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。

而 Application 关闭执行了如下动作: 

public synchronized void stop() { supervisor.stop();
 if(monitorServer != null) { monitorServer.stop();
 }
}

即关闭守护哨兵和监控服务。

到此基本的 Application 分析结束了,我们还有很多疑问,守护哨兵怎么实现的。 

整体流程可以总结为:

1、首先初始化命令行配置;

2、接着读取配置文件;

3、根据是否需要 reload 初始化配置文件中的组件;如果需要 reload 会使用 Guava 事件总线进行发布订阅变化;

4、接着创建 Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;

5、如果配置文件需要定期 reload,则需要注册 Polling***ConfigurationProvider 到守护哨兵;

6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。

轮训实现的 SourceRunner  和 SinkRunner 会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。

首先创建 LifecycleSupervisor:

//1、用于存放被守护的组件
 supervisedProcesses = new HashMap LifecycleAware, Supervisoree 
 //2、用于存放正在被监控的组件
 monitorFutures = new HashMap LifecycleAware, ScheduledFuture ? ();
 //3、创建监控服务线程池
 monitorService = new ScheduledThreadPoolExecutor(10,
 new ThreadFactoryBuilder().setNameFormat(  lifecycleSupervisor-  + Thread.currentThread().getId() +  -%d)
 .build());
 monitorService.setMaximumPoolSize(20);
 monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
 //4、定期清理被取消的组件
 purger = new Purger();
 //4.1、默认不进行清理
 needToPurge = false;

LifecycleSupervisor 启动时会进行如下操作:

public synchronized void start() { monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
 lifecycleState = LifecycleState.START;
}

首先每隔两个小时执行清理组件,然后改变状态为启动。而 LifecycleSupervisor 停止时直接停止了监控服务,然后更新守护组件状态为 STOP:

//1、首先停止守护监控服务
 if (monitorService != null) { monitorService.shutdown();
 try { monitorService.awaitTermination(10, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
 logger.error( Interrupted while waiting for monitor service to stop 
 }
 }
 //2、更新所有守护组件状态为 STOP,并调用组件的 stop 方法进行停止
 for (final Entry LifecycleAware, Supervisoree  entry : supervisedProcesses.entrySet()) { if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) { entry.getValue().status.desiredState = LifecycleState.STOP;
 entry.getKey().stop();
 }
 }
 //3、更新本组件状态
 if (lifecycleState.equals(LifecycleState.START)) {
 lifecycleState = LifecycleState.STOP;
 }
 //4、最后的清理
 supervisedProcesses.clear();
 monitorFutures.clear();

接下来就是调用 supervise 进行组件守护了:

if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
 || this.monitorService.isTerminating()){
 //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护
 }
 //2、初始化守护组件
 Supervisoree process = new Supervisoree();
 process.status = new Status();
 //2.1、默认策略是失败重启
 process.policy = policy;
 //2.2、初始化组件默认状态,大多数组件默认为 START
 process.status.desiredState = desiredState;
 process.status.error = false;
 //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件
 MonitorRunnable monitorRunnable = new MonitorRunnable();
 monitorRunnable.lifecycleAware = lifecycleAware;
 monitorRunnable.supervisoree = process;
 monitorRunnable.monitorService = monitorService;
 supervisedProcesses.put(lifecycleAware, process);
 //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件
 ScheduledFuture ?  future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS);
 monitorFutures.put(lifecycleAware, future);
}

如果不需要守护了,则需要调用 unsupervise:

public synchronized void unsupervise(LifecycleAware lifecycleAware) { synchronized (lifecycleAware) { Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
 //1.1、设置守护组件的状态为被丢弃
 supervisoree.status.discard = true;
 //1.2、设置组件盼望的最新生命周期状态为 STOP
 this.setDesiredState(lifecycleAware, LifecycleState.STOP);
 //1.3、停止组件
 lifecycleAware.stop();
 }
 //2、从守护组件中移除
 supervisedProcesses.remove(lifecycleAware);
 //3、取消定时监控组件服务
 monitorFutures.get(lifecycleAware).cancel(false);
 //3.1、通知 Purger 需要进行清理,Purger 会定期的移除 cancel 的组件
 needToPurge = true;
 monitorFutures.remove(lifecycleAware);
}

接下来我们再看下 MonitorRunnable 的实现,其负责进行组件状态迁移或组件故障恢复:

public void run() { long now = System.currentTimeMillis();
 try { if (supervisoree.status.firstSeen == null) {
 supervisoree.status.firstSeen = now; //1、记录第一次状态查看时间
 }
 supervisoree.status.lastSeen = now; //2、记录最后一次状态查看时间
 synchronized (lifecycleAware) {
 //3、如果守护组件被丢弃或出错了,则直接返回
 if (supervisoree.status.discard || supervisoree.status.error) {
 return;
 }
 //4、更新最后一次查看到的状态
 supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
 //5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
 if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { switch (supervisoree.status.desiredState) { 
 case START: //6、如果是启动状态,则启动组件
 try { lifecycleAware.start();
 } catch (Throwable e) { if (e instanceof Error) {
 supervisoree.status.desiredState = LifecycleState.STOP;
 try { lifecycleAware.stop();
 } catch (Throwable e1) {
 supervisoree.status.error = true;
 if (e1 instanceof Error) { throw (Error) e1;
 }
 }
 }
 supervisoree.status.failures++;
 }
 break;
 case STOP: //7、如果是停止状态,则停止组件
 try { lifecycleAware.stop();
 } catch (Throwable e) { if (e instanceof Error) { throw (Error) e;
 }
 supervisoree.status.failures++;
 }
 break;
 default:
 }
 } catch(Throwable t) { }
 }
}

如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。 

“Flume 整体流程是怎样的”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计9492字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)