十个案例学习Flume

在上一篇文章中,已经知道了Flume的架构、概述、与安装,现在我们来用十个案例去学习flume的使用。

在使用之前,提供一个大致思想,使用Flume的过程是确定scource类型,channel类型和sink类型,编写conf文件并开启服务,在数据捕获端进行传入数据流入到目的地。

我们可以在官方文档找到案例的使用方法:

官方网址:Flume 1.11.0 User Guide — Apache Flumeicon-default.png?t=N7T8https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html

案例使用

image-20240426202629292

案例一、从控制台打入数据,在控制台显示

1、确定scource类型,channel类型和sink类型

     确定的使用类型分别是,netcat source, memory channel, logger sink.

2、创建并编写conf文件

 vim netcat2logger.conf

#a代表agent的名称,r1代表source的名称。c1代表channel名称,k1代表的是sink的名称
#声明各个组件
a.sources=r1
a.channels=c1
a.sinks=k1

                                                                                                         

#定义source类型,这里是试用netcat的类型
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.254.100
a.sources.r1.port=8888
#定义source发送的下游channel
a.sources.r1.channels=c1

#定义channel
a.channels.c1.type=memory
#缓存的数据条数
a.channels.c1.capacity=1000
#事务数据量
a.channels.c1.transactionCapacity=1000


#定义sink的类型,确定上游channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger

3、开启服务,我们重新开启一个master客户端进行开启服务

     命令: 注意 -n 后面跟着的是你在conf文件中定义好的,-f 后面跟着的是编写conf文件的路径

 flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./netcat2logger.conf -Dflume.root.logger=DEBUG,console

4.在原本客户端输入命令:

    (1  yum install -y telnet

   (2)telnet master 11223

此时在该端口可以输入一些数据测试,可以在启一个master客户端去监控flume.log(该文件产生在启动服务的同一个目录下)

案例二、从本地指定路径中打入数据到HDFS/hive中

HDFS:

1、确定scource类型,channel类型和sink类型

我们确定使用的类型分别是,spooldir source, memory channel, hdfs sink

2、编写conf文件

a1.sources = r1
a1.channels = c1
a1.sinks = k1


#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata29/flumedata2     --此目录需要自己指定创建
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000

#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata29/flumeout2/log_s/dt=%Y-%m-%d
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0


#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、 开启服务

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./spooldir2hdfs.conf -Dflume.root.logger=DEBUG,console

4.将数据导入/usr/local/soft/bigdata29/flumedata2文件,并监控日志查看情况

再到hdfs中查看数据:

hive表

1.编写conf文件

vim spooldir2hive.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1


#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata29/flumedata3
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hive/warehouse/bigdata29.db/students_flume
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = students_test
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 1000
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
#每次从channel中取出的条数
a1.sinks.k1.hdfs.batchSize=1000 

#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000


 #组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动hiveserver2服务并创建students_flume表

建表语句:

create table students_flume(id string,name string,age string,gender string,clazz st
ring)row format delimited fields terminated by ',';

3.开启服务

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./spooldir2hive.conf -Dflume.root.logger=DEBUG,console

4.将数据导入到 flumedata3文件中:

案例三、从java代码中进行捕获打入到HDFS

1、确定的三个组件的类型是,avro source, memory channel, hdfs sink

2、打开maven项目,添加依赖

            <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flume.flume-ng-clients</groupId>
                <artifactId>flume-ng-log4jappender</artifactId>
                <version>1.11.0</version>
            </dependency>

3、创建log4j配置文件,并加入以下内容

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n


log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.19.100
log4j.appender.flume.Port = 12345
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout 
log4j.appender.flume.layout.ConversionPattern=%m%n

4、编写Java代码

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Logger;

public class LoggerToFlume {
    public static void main(String[] args) throws InterruptedException {
        //创建一个logger对象
        Logger logger = Logger.getLogger(LoggerToFlume.class.getName());

        //创建一个日期格式化对象
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        
        //写一个死循环打入日志
        while (true){
            //打入日志信息
            logger.info("dataToBigdata29:"+simpleDateFormat.format(new Date()));
            //时间设置为每0.1秒打入一次
            Thread.sleep(100);
        }
    }
}

5、编写conf文件

vim avro2hdfs.conf

#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1

#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.19.100
a.sources.r1.port = 12345

#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 1000

#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = hdfs://master:9000/bigdata29/flumeout4/jd_goods
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.rollSize =0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0

#组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

6、开启服务,命令:

flume-ng agent -n a -c usr/local/soft/flume1.11/conf  -f ./avro2hdfs2.conf -Dflume.root.logger=DEBUG,console

7、运行java代码

案例四、监控HBase日志到Hbase表中

4.1监控HBase日志到Hbase表中

1、提前建好表

create 'foo_table','bar_cf'

2、编写conf文件

vim exec2base.conf

# a表示给agent命名为a
# 给source组件命名为r1
a.sources = r1
# 给sink组件命名为k1
a.sinks = k1 
# 给channel组件命名为c1
a.channels = c1


#指定spooldir的属性
a.sources.r1.type = exec 
a.sources.r1.command = tail -F /usr/local/soft/hbase-2.2.7/logs/hbase-root-master-master.log

#指定sink的类型
a.sinks.k1.type = hbase2
a.sinks.k1.table = foo_table
a.sinks.k1.columnFamily = bar_cf
a.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 10000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

3、启动服务并监控hbase-root-master-master.log

启动服务命令:

flume-ng agent -n a -c /usr/local/soft/flume-1.11/conf -f ./exec2base.conf
 -Dflume.root.logger=DEBUG,console

4、启动hive服务随意操作

4.2将文件数据打入到到Hive表中

1、开启hive服务并创建表

create table exec2hive
(
    id string not null,
    name string not null,
    price string not null,
    comments string not null,
    shop string not null,
    tags string not null
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

2.创建配置文件:

vim exec2hive.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/bigdata29/wendang/jd_goods.txt

#具体定义sink
a1.sinks.k1.type = hdfs

#路径要与hive中表的路径一致
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hive/warehouse/bigdata29.db/exec2hive
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount = 1000
a1.sinks.k1.hdfs.rollSize = 0
#每隔N s将临时文件滚动成一个目标文件
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0
a1.sinks.k1.hdfs.batchSize=100

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、打入数据:

案例五、flume监控Http source

1、确定的三个组件的类型是,http source, memory channel, logger sink

2、编写conf文件

vim http2logger.conf

a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
 
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100

3、启动服务

flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./http2logger.conf -Dflume.root.logger=DEBUG,console

4、另开一个窗口进行打数据

案例六、多路复制

图解:

1、将flume复制到node1,node2

 scp -r flume-1.11 node1:`pwd`

 scp -r flume-1.11 node2:`pwd`

2、将master的环境配置文件复制到node1,node2并source使其生效

scp /etc/profile node1:`pwd`
scp /etc/profile node2:`pwd`

3、在node1节点新建配置文件:

     vim avro2logger.conf

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.19.110
a1.sources.r1.port = 11111

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

4、在node2节点新建配置文件:

     vim avro2logger.conf

a2.sources = r2
a2.channels = c2
a2.sources.r2.type = avro
a2.sources.r2.channels = c2
a2.sources.r2.bind = 192.168.19.120
a2.sources.r2.port = 22222

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

a2.sinks = k2
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2

5、在master节点新建配置文件:  

vim netcat2avro.conf

m.sources = r1
m.channels = c1 c2
m.sinks = k1 k2

# Describe/configure the source
m.sources.r1.type = netcat
m.sources.r1.bind = 192.168.19.100
m.sources.r1.port = 12345


# Use a channel which buffers events in memory
m.channels.c1.type = memory
m.channels.c1.capacity = 1000
m.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
m.channels.c2.type = memory
m.channels.c2.capacity = 1000
m.channels.c2.transactionCapacity = 100

# Describe the sink
m.sinks.k1.type = avro
m.sinks.k1.hostname = 192.168.19.110
m.sinks.k1.port = 11111

m.sinks.k2.type = avro
m.sinks.k2.hostname = 192.168.19.120
m.sinks.k2.port = 22222

# Bind the source and sink to the channel
m.sources.r1.channels = c1 c2
m.sinks.k1.channel = c1
m.sinks.k2.channel = c2

6、先启动node1和node2节点的服务端,在启动master的

7、在master开一个端口数据

telnet master 12345

观察node1,node2的日志:

案例七、故障转移

       效果上与hadoop中的高可用集群很相似,但是hadoop选举的机制是靠zookeeper,而flume是通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作。

       与hadoop的高可用不同,如果当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink。

1、master:vim netcat2failover.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 12345

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 11111

#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 22222

#使用sink processor来控制channel的数据流向
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2  
a1.sinkgroups.g1.processor.type = failover
#设置优先级(分数)
a1.sinkgroups.g1.processor.priority.k1 = 1
a1.sinkgroups.g1.processor.priority.k2 = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

2、 node1: vim avro2logger.conf

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.19.110
a1.sources.r1.port = 11111

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

3、 node2: vim avro2logger.conf

a2.sources = r2
a2.channels = c2
a2.sources.r2.type = avro
a2.sources.r2.channels = c2
a2.sources.r2.bind = 192.168.19.120
a2.sources.r2.port = 22222

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

a2.sinks = k2
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2

4、先启动node1,node2服务,在启动master服务

5、开启端口号测试

因为设置的node1的优先级为1,node2的优先级为100,则数据打在了node2上

此时关闭node2服务数据便会打在node2上

如果再将node1服务开启,那么后续的数据还是会打在node1上,与hadoop的高可用集群不同,在此就不过多演示了

案例八、负载均衡

使用负载均衡以后,channel会轮训分配任务(随机),减少机器负荷

1、master上的配置文件:vim netcat2balance.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 12345

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#将数据写到另一台Flume服务器上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 11111

#将数据写到另一台Flume服务器上
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 22222

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

2、node1和node2上配置与案例七相同即可

3、先启动node1,node2服务,在启动master服务

node1:flume-ng agent -n a1 -c /usr/local/soft/flume-1.11
onf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console

node2:flume-ng agent -n a1 -c /usr/local/soft/flume-1.11
onf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console

master:flume-ng agent -n a1 -c /usr/local/soft/flume-1.
/conf -f ./netcat2balance.conf -Dflume.root.logger=DEBUG,console

4、启用54321端口观察node1和node2的日志文件:

案例九、聚合

node1、node2两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log 现在要求:

把node1、node2机器中的access.log、nginx.log、web.log 采集汇总到master机器上然后统一收集到hdfs中。 但是在hdfs中要求的目录为:

图解:

1、node1和node2创建都创建3个文件用来模拟数据源

mkdir -p /usr/local/soft/bigdata29/wendang/juhe

touch /usr/local/soft/bigdata29/wendang/juhe/access.log
touch /usr/local/soft/bigdata29/wendang/juhe/nginx.log
touch /usr/local/soft/bigdata29/wendang/juhe/web.log

2、node1和node2上配置文件

 vim juhetohdfs.conf

# Name the components on this agent 
a1.sources = r1 r2 r3 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /usr/local/soft/bigdata29/wendang/juhe/access.log 
# static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = static 
a1.sources.r1.interceptors.i1.key = type 
a1.sources.r1.interceptors.i1.value = access 
# Event: { headers:{type=access} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D             hello world. }

a1.sources.r2.type = exec 
a1.sources.r2.command = tail -F /usr/local/soft/bigdata29/wendang/juhe/nginx.log 
a1.sources.r2.interceptors = i2 
a1.sources.r2.interceptors.i2.type = static 
a1.sources.r2.interceptors.i2.key = type 
a1.sources.r2.interceptors.i2.value = nginx 

a1.sources.r3.type = exec 
a1.sources.r3.command = tail -F /usr/local/soft/bigdata29/wendang/juhe/web.log 
a1.sources.r3.interceptors = i3 
a1.sources.r3.interceptors.i3.type = static 
a1.sources.r3.interceptors.i3.key = type 
a1.sources.r3.interceptors.i3.value = web 

# Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = master 
a1.sinks.k1.port = 12345 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sources.r2.channels = c1 
a1.sources.r3.channels = c1 
a1.sinks.k1.channel = c1

3、master: vim juhetohdfs.conf

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# 定义source 
a1.sources.r1.type = avro 
a1.sources.r1.bind = master 
a1.sources.r1.port = 12345 
# 添加时间拦截器 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = timestamp

# 定义channels 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 

# 定义sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path=hdfs://master:9000/bigdata29/flumelogs/%{type}/%Y%m%d 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
# 时间类型 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的文件不按条数生成 
a1.sinks.k1.hdfs.rollCount = 0 
# 生成的文件按时间生成 
a1.sinks.k1.hdfs.rollInterval = 30 
# 生成的文件按大小生成 
a1.sinks.k1.hdfs.rollSize = 10485760 
# 批量写入hdfs的个数 
a1.sinks.k1.hdfs.batchSize = 10000 
# flume操作hdfs的线程数(包括新建,写入等) 
a1.sinks.k1.hdfs.threadsPoolSize=10 
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000 


# 组装source、channel、sink 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

4、采集端文件生成脚本,在node1与node2上面编写shell脚本,模拟数据生成 

vim createdata.sh

# !/bin/bash 

num=1
while true 
    do
    num=$((num+1))
    echo "${num}_access" >> /usr/local/soft/bigdata17/scrips/taillogs/access.log; 
    echo "${num}_web" >> /usr/local/soft/bigdata17/scrips/taillogs/web.log; 
    echo "${num}_nginx" >> /usr/local/soft/bigdata17/scrips/taillogs/nginx.log; 
    sleep 0.5; 
done

5、先启动服务master服务,再启动node1和node2的服务

6、启动脚本模拟数据产生并观察日志文件

     node1:sh createdata.sh

     node2:sh createdata.sh

 

案例十、自定义Interceptor(拦截器)

使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

1、引入依赖

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <!--  打包出来的带依赖jar包名称 -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly-->
            <executions>
                <execution>
                    <id>make-assemble</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2、实现自定义的java代码

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 *  1. 如何自定义拦截器?
 *   flume的自定义拦截器需要实现Flume提供的Interceptor接口.
 *
 *  实现抽象方法:
 *      initialize: 完成一些初始化工作.
 *      close: 完成一些善后的工作
 *      intercept:拦截器的核心处理方法.  拦截的逻辑.
 *          intercept(Event event) : 单个event的拦截处理
 *          intercept(List<Event> events): 批次event的拦截处理
 *
 *  2. 拦截器的对象如何实例化?
 *    在拦截器中定义一个static的内部类,实现Flume提供的Builder接口
 *
 *   实现抽象方法:
 *      build : 用于构建拦截器对象
 *      configure:用于读取配置信息(xxxx.conf)
 *
 *
 *
 */
public class ChannelSelector implements Interceptor {
    @Override
    public void initialize() {
        //不需要结合其他的组件,这里什么都不写
    }
        /*
        将接收到的event数据进行解析,重新设置headers或者数据,返回一个新的event
        headers的主要作用是为了后面判断给哪一个sink处理的依据
        判断接收到的数据中是否包含shujia,如果包含,就设置一个键值对在headers中type=sj
        否则设置为type=nsj
     */

    @Override
    public Event intercept(Event event) {
        //获取headers的body内容,body就是我们所监控到的数据
        String info = new String(event.getBody());
        //获取headers,默认情况下,event中的headers是{}
        Map<String, String> headers = event.getHeaders();
        if(info.contains("caocao")){
            headers.put("type","wei");
            event.setHeaders(headers);
        }else {
            headers.put("type","dongwu");
            event.setHeaders(headers);
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> events = new ArrayList<>();
        for (Event event : events) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
        //因为没有额外的初始化连接,也不需要关闭

    }

    public static class MyBuilder implements Builder{

        @Override
        public Interceptor build() {
            return new ChannelSelector();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

 

3、将代码打成jar包(带依赖),将jar包放在flume的lib目录下

4、配置node1文件

vim custom.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 5555

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type =logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5、配置node2文件 

vim custom.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1 

a3.sources.r1.type = avro
a3.sources.r1.bind = node2
a3.sources.r1.port = 6666

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 

6、配置master文件 

vim custom.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 11111

#将选择器类型改为multiplexing分发
a1.sources.r1.selector.type = multiplexing
#检测每个event里head的title key
a1.sources.r1.selector.header = type
#如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c2里
a1.sources.r1.selector.mapping.wei = c1
a1.sources.r1.selector.mapping.dongwu = c2
a1.sources.r1.selector.default=c2
#给拦截器命名i1
a1.sources.r1.interceptors = i1
#这里写自定义类的全类名
a1.sources.r1.interceptors.i1.type = com.shujia.jinjie.ChannelSelector$MyBuilder
# 组装channel与source
a1.sources.r1.channels = c1 c2 

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666


a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/577548.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

零基础HTML教程(30)--迈入HTML5新时代

文章目录 1. 从H4时代到H5时代2. 属性值可以不用引号3. 标签使用大小写均可4. 部分属性值可以省略5. 浏览器支持情况6. 小结 1. 从H4时代到H5时代 之前讲的29篇HTML教程&#xff0c;内容基本都是H4时代就有的。 随着时代的发展&#xff0c;H4多少有点不够用&#xff0c;所以H…

Kotlin基础​​

数据类型 定义变量 var表示定义变量&#xff0c;可以自动推导变量类型&#xff0c;所以Int可以不用写。 定义常量 条件语句 if表达式可以返回值&#xff0c;该值一般写在if里的最后一行 类似switch的用法 区间 循环 a是标签&#xff0c;可以直接break到标签的位置&#xf…

【八大排序(二)】选择排序与堆排序

❣博主主页: 33的博客❣ ▶️文章专栏分类:八大排序◀️ &#x1f69a;我的代码仓库: 33的代码仓库&#x1f69a; &#x1faf5;&#x1faf5;&#x1faf5;关注我带你了解更多排序知识 目录 1.前言2.选择排序2.1基本思想2.2画图理解2.3单向选择排序代码实现2.4双向选择排序代码…

从零入门区块链和比特币(第一期)

欢迎来到我的区块链与比特币入门指南&#xff01;如果你对区块链和比特币感兴趣&#xff0c;但不知道从何开始&#xff0c;那么你来对地方了。本博客将为你提供一个简明扼要的介绍&#xff0c;帮助你了解这个领域的基础知识&#xff0c;并引导你进一步探索这个激动人心的领域。…

swagger xss漏洞复现

swagger xss漏洞复现 文章目录 swagger xss漏洞复现漏洞介绍影响版本实现原理漏洞复现修复建议: 漏洞介绍 Swagger UI 有一个有趣的功能&#xff0c;允许您提供 API 规范的 URL - 一个 yaml 或 json 文件&#xff0c;将被获取并显示给用户 根本原因非常简单 - 一个过时的库Dom…

预见预判|AIRIOT智慧交通管理解决方案

随着机动车保有量的逐步增加&#xff0c;城市交通压力日益增大。同时&#xff0c;新能源车辆的快速发展虽然带来了环保效益&#xff0c;但也因不限号政策而进一步加剧了道路拥堵问题。此外&#xff0c;各类赛事和重大活动的交通管制措施也时常导致交通状况复杂多变。面对这些挑…

Java 基础常见面试题整理

目录 1、java的基本数据类型有哪些&#xff1f;2、java为什么要有包装类型&#xff1f;3、String a "123" 和 String a new String("123") 区别&#xff1f;4、String、StringBuilder和StringBuffer的区别&#xff1f;5、如何理解面向对象和面向过程&…

MySQL常见问题与解决方案详述

MySQL&#xff1a;常见问题与解决方案详述 作为一款广泛使用的开源关系型数据库管理系统&#xff0c;MySQL对于初学者来说既充满吸引力又充满挑战。本文将列举初学者在使用MySQL过程中可能遇到的一些典型问题&#xff0c;并提供详细的解决方案&#xff0c;配以图片辅助说明&am…

修改后门ctime | Linux 后门系列

0x00 前情提要 在 alias 后门 &#xff5c; Linux 后门系列一文中&#xff0c;我们为了让后门完美一些&#xff0c;修改了后门文件的 atime、mtime&#xff0c;但是 ctime 一直没有办法修改&#xff0c;今天我们来把这一块补齐&#xff0c;让后门更加完美 atime -> access t…

Chrome 网络调试程序 谷歌网络调试 network

目录 1.网络面板总览2.概况了解3.Waterfall接口排队等待时间4.关注请求接口的Size,可能是占据内存溢出的接口5.过滤器一栏 fetch/xhr 什么意思6. Stalled 什么意思7.Queueing 什么意思8.Queueing和Stalled之间什么关系9.为什么会有阻塞状态10.Time列是pending 什么意思 1.网络面…

Vue入门篇:生命周期,钩子函数,工程化开发Vue(脚手架安装),组件化开发(全局注册,局部注册)

目录 1.Vue生命周期和生命周期的四个阶段2.Vue生命周期函数&#xff08;钩子函数)3.工程化开发&脚手架Vue CLI1.在powershell管理员权限下打开命令行安装脚手架&#xff1a;2.查看vue版本&#xff1a;3.创建项目架子4.运行项目 4.组件化开发&根组件1.App.vue文件&#…

解决双击PDF文件出现打印的问题【Adobe DC】

问题描述 电脑安装Adobe Acrobat DC之后&#xff0c;双击PDF文件就会出现打印&#xff0c;而无法直接打开。 右键PDF文件就会发现&#xff0c;第一栏出现的不是用Adobe打开&#xff0c;而是打印。 重装软件多次仍然无法解决。 原因 右键菜单被改写了。双击其实是执行右键菜…

计算机网络—— book

文章目录 一、概述1.1互联网的核心部分1&#xff0e;电路交换的主要特点2&#xff0e;分组交换的主要特点 1.2.计算机网络的性能1&#xff0e;速率2&#xff0e;带宽3&#xff0e;吞吐量4&#xff0e;时延5&#xff0e;利用率 1.3.计算机网络体系结构协议与划分层次具有五层协议…

Git如何配合Github使用

1.安装Git https://git-scm.com/ ##2.配置 Git 安装完成后&#xff0c;你需要设置 Git 的用户名和邮箱地址&#xff0c;这样在提交代码时就能知道是谁提交的。你可以在命令行中输入以下命令来配置&#xff1a; git config --global user.name "Your Name" git con…

JavaScript创建和填充数组的更多方法

空数组fill()方法创建并填充数组 ● 我们之前创建数组的方式都是手动去创建去一个数据&#xff0c;例如 console.log([1, 2, 3, 4, 5, 6, 7]);● 当然我们也可以使用Array对象来构造数组 console.log([1, 2, 3, 4, 5, 6, 7]); console.log(new Array(1, 2, 3, 4, 5, 6, 7));…

惊爆:Apple重启OpenAI谈判为iphone引入其技术

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

用宝塔部署一套自己的漏洞扫描OpenVAS

一、OpenVAS简单说明 OpenVAS是一个开源且功能开放的网络安全漏洞评估系统&#xff0c;它集成了多种相关工具&#xff0c;构成了一套全面的网络扫描解决方案。因此&#xff0c;OpenVAS能够免费提供给用户部署和使用。在其最新版本中&#xff0c;仅需安装一个基于浏览器/服务器架…

【OceanBase诊断调优 】—— 如何快速定位SQL问题

作者简介&#xff1a; 花名&#xff1a;洪波&#xff0c;OceanBase 数据库解决方案架构师&#xff0c;目前负责 OceanBase 数据库在各大型互联网公司及企事业单位的落地与技术指导&#xff0c;曾就职于互联网大厂和金融科技公司&#xff0c;主导过多项数据库升级、迁移、国产化…

论文解读-面向高效生成大语言模型服务:从算法到系统综述

一、简要介绍 在快速发展的人工智能&#xff08;AI&#xff09;领域中&#xff0c;生成式大型语言模型&#xff08;llm&#xff09;站在了最前沿&#xff0c;彻底改变了论文与数据交互的方式。然而&#xff0c;部署这些模型的计算强度和内存消耗在服务效率方面带来了重大挑战&a…

BUUCTF-Misc22

[WUSTCTF2020]爬1 1.打开附件 第一个文件 2.foremost 用binwalk 文件名 查看文件是否包含其他文件 foremost 文件名 分离文件 打开分离的文件&#xff0c;看到PDF文件夹下有一个PDF的文本文档 打开提示被图片覆盖住了 3.WPS 用WPS打开PDF文件&#xff0c;点击编辑即可将图…
最新文章