Flume 整合kafka SASL安全认证

服务端kafka开启SASL加密

# ------------------- define data source ----------------------
# source alias
agent.sources = source_from_kafka  
# channels alias
agent.channels = mem_channel  
# sink alias
agent.sinks = hdfs_sink


# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource  
agent.sources.source_from_kafka.channels = mem_channel  
agent.sources.source_from_kafka.batchSize = 5000

#kafka 服务端配置了SASL加密
agent.sources.source_from_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="dsuser" password="dsBc31";
agent.sources.source_from_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent.sources.source_from_kafka.kafka.consumer.sasl.mechanism = SCRAM-SHA-256

# set kafka broker address  金山
agent.sources.source_from_kafka.kafka.bootstrap.servers = 172.25.186.219:9092,172.25.186.220:9092,172.25.186.221:9092

# set kafka topic
agent.sources.source_from_kafka.kafka.topics = loganTopic_KFC_APP_100

# set kafka groupid
agent.sources.source_from_kafka.kafka.consumer.group.id = external_ds_td_new

# defind hdfs sink
agent.sinks.hdfs_sink.type = hdfs 

# specify the channel the sink should use  
agent.sinks.hdfs_sink.channel = mem_channel

# set store hdfs path
agent.sinks.hdfs_sink.hdfs.path = /flume/data/%Y%m%d/js/loganTopic_KFC_APP_100

# set file size to trigger roll
agent.sinks.hdfs_sink.hdfs.rollSize = 0  
agent.sinks.hdfs_sink.hdfs.rollCount = 0  
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600  
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType=DataStream    
agent.sinks.hdfs_sink.hdfs.writeFormat=Text    

# define channel from kafka source to hdfs sink 
agent.channels.mem_channel.type = memory  


# channel store size
agent.channels.mem_channel.capacity = 100000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000

启动

bin/flume-ng agent -n agent -f conf/kafka2hdfs.properties -property "java.security.auth.login.config=conf/kafka_client_jaas.conf"

CDH配置

bin/flume-ng agent -n agent -f conf/kafka2hdfs.properties


Flume 整合kafka SASL安全认证
https://www.hechunyu.com/archives/1698222898377
作者
chunyu
发布于
2021年11月25日
许可协议