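Flink Basic Concepts
First of all, Flink is an open-source distributed computing framework for both stream processing and batch processing.
What kind of data does each mode handle? This brings in two concepts: unbounded streams and bounded streams.
Unbounded Streams vs. Bounded Streams
Any kind of data can form a data stream, for example user interaction records, sensor data, and event logs.
Apache Flink excels at processing both unbounded and bounded data sets. Precise control of time and stateful computation enable Flink to run any kind of application on unbounded streams.
Stream data is divided into unbounded streams and bounded streams.
- 1) Unbounded stream: has a defined start but no defined end and keeps producing data. Unbounded streams are handled with stream processing.
- 2) Bounded stream: has a defined start and a defined end, so the computation runs after all the data has been ingested. Bounded streams are handled with batch processing.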
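The difference between the two modes can be sketched in plain Java without any Flink dependency. This is only an illustration under assumed names (`BoundedVsUnbounded`, `batchSum`, `runningSums` are made up for this example): the bounded input yields one final result once all data is read, while the unbounded input yields an updated result per element and is cut off artificially so the demo terminates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class BoundedVsUnbounded {
    // Bounded input: all data is available, so a single final result is computed at the end.
    static int batchSum(List<Integer> bounded) {
        int sum = 0;
        for (int value : bounded) {
            sum += value;
        }
        return sum;
    }

    // Unbounded input: the source never ends, so an updated result is emitted per element.
    // The `take` limit exists only so that this demo stops.
    static List<Integer> runningSums(Iterator<Integer> unbounded, int take) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int i = 0; i < take && unbounded.hasNext(); i++) {
            sum += unbounded.next();
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(batchSum(Arrays.asList(1, 2, 3, 4)));   // 10
        Iterator<Integer> endless = Stream.iterate(1, n -> n + 1).iterator();
        System.out.println(runningSums(endless, 4));               // [1, 3, 6, 10]
    }
}
```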
Component Structure
DataSet is generally used to process bounded stream data.
DataStream is generally used to process unbounded stream data.
Basic Flink Demo
1. Basic environment setup
Core pom.xml configuration
- <properties>
-   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-   <flink.version>1.11.2</flink.version>
-   <java.version>1.8</java.version>
-   <scala.binary.version>2.11</scala.binary.version>
-   <maven.compiler.source>${java.version}</maven.compiler.source>
-   <maven.compiler.target>${java.version}</maven.compiler.target>
-   <log4j.version>2.12.1</log4j.version>
-   <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
-   <mysql.jdbc.version>5.1.47</mysql.jdbc.version>
- </properties>
-
- <dependencies>
-
-   <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
-     <version>${flink.version}</version>
-   </dependency>
-
-   <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-     <version>${flink.version}</version>
-   </dependency>
-
-   <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-clients_${scala.binary.version}</artifactId>
-     <version>${flink.version}</version>
-   </dependency>
-
-
-   <dependency>
-     <groupId>org.projectlombok</groupId>
-     <artifactId>lombok</artifactId>
-     <version>1.18.8</version>
-   </dependency>
- </dependencies>
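2. Batch processing demo
This demo uses batch processing to count the exceptions in a log file. In practice it counts the log entries at each level (INFO, WARN, ERROR).
Prepare the file order_info.log with the following content: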
- 2019-08-25 16:32:55,626 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e1b53a9c] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
- 2019-08-25 16:32:56,918 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
- 2019-08-25 16:32:57,829 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
- 2019-08-25 16:32:57,834 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
- 2019-08-25 16:32:57,847 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 5ms. Found 0 repository interfaces.
- 2019-08-25 16:32:57,858 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
- 2019-08-25 16:32:57,859 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
- 2019-08-25 16:32:57,870 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
- 2019-08-25 16:32:57,908 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
- 2019-08-25 16:32:57,928 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
- 2019-08-25 16:32:58,144 [main] INFO [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
- 2019-08-25 16:32:58,155 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
- 2019-08-25 16:32:58,162 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
- 2019-08-25 16:32:58,176 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
- 2019-08-25 16:32:58,212 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
- 2019-08-25 16:32:58,267 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
- 2019-08-25 16:32:58,269 [main] WARN [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
- 2019-08-25 16:32:58,280 [main] INFO [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -
-
- Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
- 2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
- org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
- at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
- at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
- at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
- at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
- at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
- at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
- at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
- at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
- at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
- at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
- at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
- at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
- at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
- Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
- at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
- at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
- ... 19 common frames omitted
- Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
- at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
- at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
- at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
- at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
- at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
- at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
- at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.CGLIB$globalTransactionScanner$0(<generated>)
- at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba$$FastClassBySpringCGLIB$$ec5dcab5.invoke(<generated>)
- at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
- at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
- at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.globalTransactionScanner(<generated>)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
- ... 20 common frames omitted
- Caused by: java.lang.IllegalArgumentException: illegal type:null
- at io.seata.config.ConfigType.getType(ConfigType.java:62)
- at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
- ... 35 common frames omitted
- 2019-08-25 16:36:05,248 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$ed668c12] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
- 2019-08-25 16:36:06,559 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
- 2019-08-25 16:36:07,554 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
- 2019-08-25 16:36:07,555 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
- 2019-08-25 16:36:07,568 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 6ms. Found 0 repository interfaces.
- 2019-08-25 16:36:07,581 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
- 2019-08-25 16:36:07,583 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
- 2019-08-25 16:36:07,595 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
- 2019-08-25 16:36:07,639 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
- 2019-08-25 16:36:07,661 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
- 2019-08-25 16:36:07,919 [main] INFO [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
- 2019-08-25 16:36:07,930 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
- 2019-08-25 16:36:07,937 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
- 2019-08-25 16:36:07,944 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
- 2019-08-25 16:36:07,987 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
- 2019-08-25 16:36:10,247 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
- 2019-08-25 16:36:14,137 [main] WARN [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
- 2019-08-25 16:36:14,150 [main] INFO [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -
-
- Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
- 2019-08-25 16:36:14,161 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
- org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
- at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
- at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
- at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
- at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
- at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
- at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
- at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
- at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
- at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
- at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
- at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
- at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
- at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
- at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
- Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
- at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
- at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
- ... 19 common frames omitted
- Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
- at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
- at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
- at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
- at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
- at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
- at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
- at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.CGLIB$globalTransactionScanner$0(<generated>)
- at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830$$FastClassBySpringCGLIB$$691b278a.invoke(<generated>)
- at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
- at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
- at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.globalTransactionScanner(<generated>)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
- ... 20 common frames omitted
- Caused by: java.lang.IllegalArgumentException: illegal type:null
- at io.seata.config.ConfigType.getType(ConfigType.java:62)
- at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
- ... 35 common frames omitted
- 2019-08-25 16:36:21,421 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean
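Code implementation
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
-
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
-
- public class BatchProcessorApplication {
-   public static void main(String[] args) throws Exception {
-     // 1. Create the Flink execution environment
-     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     // 2. Read the data source (the log file)
-     DataSource<String> logData = env.readTextFile("./data/order_info.log");
-
-     // 3. Clean and transform the data
-     logData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
-       @Override
-       public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
-         // 1) Extract the log level of each line with a regular expression
-         Pattern pattern = Pattern.compile("\\[main\\](.*?)\\[");
-         Matcher matcher = pattern.matcher(value);
-         if (matcher.find()) {
-           // 2) On a match, emit a (level, 1) tuple
-           collector.collect(new Tuple2<>(matcher.group(1).trim(), 1));
-         }
-       }
-     // groupBy(0) groups the tuples by their first field (the level); sum(1) sums the Integer in the second field
-     }).groupBy(0).sum(1).print();
-   }
- }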
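To see what the regular expression and the groupBy(0).sum(1) step produce, the same logic can be tried in plain Java without Flink. This is a sketch, not the Flink job itself: the class name `LogLevelCount` and the method `countLevels` are made up for illustration, and a `TreeMap` stands in for Flink's grouping and summing. Note that the pattern only matches lines logged by the `[main]` thread, so stack-trace lines and entries from other threads are skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LogLevelCount {
    // Same pattern as in the batch demo: the text between "[main]" and the next "[".
    private static final Pattern LEVEL = Pattern.compile("\\[main\\](.*?)\\[");

    // Counts lines per log level; lines that do not match the pattern are ignored.
    static Map<String, Integer> countLevels(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher matcher = LEVEL.matcher(line);
            if (matcher.find()) {
                counts.merge(matcher.group(1).trim(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "2019-08-25 16:32:56,918 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev",
            "2019-08-25 16:32:58,267 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.",
            "2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed",
            "at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)");
        System.out.println(countLevels(sample)); // {ERROR=1, INFO=1}
    }
}
```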
3. Stream processing demo
Simulate socket input locally (for example, with a tool such as netcat listening on port 9999)
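Code implementation
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
-
- public class StreamProcessorApplication {
-
-   public static void main(String[] args) throws Exception {
-     // 1. Create the Flink stream execution environment
-     StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-     // 2. Read real-time data from the socket stream
-     DataStreamSource<String> dataStreamSource = streamEnv.socketTextStream("127.0.0.1", 9999);
-
-     dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
-       @Override
-       public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
-         // Each input line is tab-separated; the first field is used as the key
-         String[] split = line.split("\t");
-         collector.collect(new Tuple2<>(split[0], 1));
-       }
-     // setParallelism(1) sets the parallelism of the print sink to 1 (a single parallel task)
-     }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
-
-     streamEnv.execute();
-   }
-
- }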
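The keyBy(0).timeWindow(Time.seconds(5)).sum(1) chain counts each key inside fixed, non-overlapping 5-second windows. The plain-Java sketch below imitates that bucketing so the result is easier to picture. It is not Flink code: `TumblingWindowCount`, `windowStart`, and `count` are hypothetical names, and the timestamps are made-up values supplied by hand, whereas the Flink job assigns elements to windows by its own clock.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowCount {
    // Start of the fixed-size window that contains the timestamp (non-negative timestamps assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts elements per (window start, key); the map key has the form "windowStart|key".
    static Map<String, Integer> count(long[] timestampsMs, String[] keys, long sizeMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String bucket = windowStart(timestampsMs[i], sizeMs) + "|" + keys[i];
            counts.merge(bucket, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 4999L, 5000L, 7000L};
        String[] words = {"a", "a", "a", "b"};
        // The first two "a" elements fall into window [0, 5000), the rest into [5000, 10000).
        System.out.println(count(timestamps, words, 5000L)); // {0|a=2, 5000|a=1, 5000|b=1}
    }
}
```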
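Flink Deployment, Installation, and Configuration
First, make sure a JDK is already installed on your Linux system.
Extract the Flink installation package
tar -zxvf flink-1.11.2-bin-scala_2.11.tgz
Enter the Flink configuration directory (conf)
The masters file specifies the address of the master node (JobManager) of the Flink cluster. The cluster start scripts read it to decide where to start the JobManager.
The flink-conf.yaml file is Flink's main configuration file and holds the parameters of the Flink cluster. It offers a wide range of options, from basic cluster settings to advanced performance tuning.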
Common options:
- jobmanager.rpc.address:
  - Description: The hostname or IP address of the JobManager.
  - Example: jobmanager.rpc.address: localhost
- jobmanager.rpc.port:
  - Description: The RPC port of the JobManager.
  - Example: jobmanager.rpc.port: 6123
- jobmanager.memory.process.size:
  - Description: The total memory size of the JobManager process.
  - Example: jobmanager.memory.process.size: 1600m
- taskmanager.memory.process.size:
  - Description: The total memory size of the TaskManager process.
  - Example: taskmanager.memory.process.size: 1600m
- taskmanager.numberOfTaskSlots:
  - Description: The number of task slots per TaskManager.
  - Example: taskmanager.numberOfTaskSlots: 4
- parallelism.default:
  - Description: The default parallelism.
  - Example: parallelism.default: 4
- state.backend:
  - Description: The state backend; choose MemoryStateBackend, FsStateBackend, or RocksDBStateBackend.
  - Example: state.backend: rocksdb
- rest.address:
  - Description: The hostname or IP address of the REST API.
  - Example: rest.address: localhost
- rest.port:
  - Description: The port of the REST API.
  - Example: rest.port: 8081
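taskmanager.numberOfTaskSlots and parallelism.default interact: with Flink's default slot sharing, a job needs at least as many free slots as its highest operator parallelism. The small Java sketch below only illustrates that arithmetic. The class and method names are hypothetical, and the number of TaskManagers is an assumed input, since it is not part of the options listed above.

```java
class SlotCheck {
    // Total slots offered by the cluster: number of TaskManagers times slots per TaskManager.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Assumes default slot sharing: the job fits if its parallelism does not exceed the total slots.
    static boolean fits(int parallelism, int taskManagers, int slotsPerTaskManager) {
        return parallelism <= totalSlots(taskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // One TaskManager with 4 slots, as in the example values above.
        System.out.println(fits(4, 1, 4)); // true
        System.out.println(fits(8, 1, 4)); // false
    }
}
```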
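Start Flink: enter the bin directory and run the start script (start-cluster.sh in a standalone setup)
Open port 8081 in a browser to reach the Flink web UI
Flink is now installed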
Submitting Flink Jobs
Configure the packaging plugins in pom.xml
- <build>
-   <plugins>
-     <plugin>
-       <groupId>org.apache.maven.plugins</groupId>
-       <artifactId>maven-compiler-plugin</artifactId>
-       <version>3.5.1</version>
-       <configuration>
-         <source>1.8</source>
-         <target>1.8</target>
-
-       </configuration>
-     </plugin>
-
-     <plugin>
-       <groupId>org.apache.maven.plugins</groupId>
-       <artifactId>maven-shade-plugin</artifactId>
-       <version>2.3</version>
-       <executions>
-         <execution>
-           <phase>package</phase>
-           <goals>
-             <goal>shade</goal>
-           </goals>
-           <configuration>
-             <filters>
-               <filter>
-                 <artifact>*:*</artifact>
-                 <excludes>
-
-                   <exclude>META-INF/*.SF</exclude>
-                   <exclude>META-INF/*.DSA</exclude>
-                   <exclude>META-INF/*.RSA</exclude>
-                 </excludes>
-               </filter>
-             </filters>
-             <transformers>
-               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-
-                 <mainClass>com.demo.flink.usage.stream.StreamProcessorApplication</mainClass>
-               </transformer>
-             </transformers>
-           </configuration>
-         </execution>
-       </executions>
-     </plugin>
-   </plugins>
- </build>
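Run the Maven package goal (for example, click package in the IDE) to build the jar
Submitting through the web UI
Upload the jar, check that the information is correct, and click Submit
The job's output can then be viewed in the web UI
Submitting from the command line
Upload the jar to the Linux host
Enter Flink's bin directory
Run the command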
./flink run -c com.demo.flink.usage.stream.StreamProcessorApplication /usr/local/flink-usage-1.0-SNAPSHOT.jar