Kafka Service Based on Docker

Published: 2026/7/6 4:26:17
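Contents

- Note
- 1. Description
- 2. Server planning
- 3. docker-compose files: kafka{i}.yaml, kafka-ui.yaml
- 4. Configuring cluster monitoring in kafka-ui
- 5. Parameter table
- 6. Test scripts
  - Producer (asynchronous production): AsyncKafkaProducer1.py
  - Consumer (asynchronous consumption): AsyncKafkaConsumer1.py
- 7. References

Note

When I recently reinstalled the service based on bitnami/kafka, I found that the image is no longer free and open source, so I now use apache/kafka as the container image instead. The cluster files in section 3 still use bitnami/kafka:3.6.2. A single-node configuration is attached:

```yaml
services:
  kafka-single:
    image: apache/kafka:latest
    container_name: kafka-single
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://kafka-single:9092,CONTROLLER://kafka-single:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-single:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-single:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
```

1. Description

Set up the Kafka cluster needed for a local development environment: three nodes spread across 3 virtual machines, running as Docker containers that communicate with one another.

2. Server planning

| Host | Ports | Notes |
|---|---|---|
| host001.dev.sb | 9092, 9093, 9081 | kafka-ui access; kafka0 node |
| host002.dev.sb | 9092, 9093 | kafka1 node |
| host003.dev.sb | 9092, 9093 | kafka2 node |

Port 9092 is the EXTERNAL (client) listener, 9093 is the KRaft CONTROLLER listener, and 9081 on host001 is mapped to kafka-ui's container port 8080.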
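The per-node values in the kafka{i}.yaml file of section 3 follow mechanically from this plan: only the node ID and the advertised hostnames change, while the quorum voter list and the extra_hosts entries are identical on every node. The sketch below derives them in plain Python. It is an illustration under stated assumptions, not part of the original setup: the `SERVER_PLAN` mapping (node ID to container hostname and VM IP) is hypothetical and simply mirrors the server table and the extra_hosts entries, the helper names are made up, and the `CLIENT://:9095` listener is copied verbatim from the article's compose file. It does not contact any broker.

```python
# Hypothetical server plan mirroring section 2 and the extra_hosts entries:
# node id -> (container hostname, VM IP address)
SERVER_PLAN = {
    0: ("kafka0", "172.16.20.60"),
    1: ("kafka1", "172.16.20.61"),
    2: ("kafka2", "172.16.20.62"),
}
CONTROLLER_PORT = 9093  # CONTROLLER listener
EXTERNAL_PORT = 9092    # EXTERNAL (client) listener
INTERNAL_PORT = 9094    # INTERNAL (inter-broker) listener


def quorum_voters(plan: dict) -> str:
    """Build the id@host:port voter list shared by every node."""
    return ",".join(
        f"{node_id}@{host}:{CONTROLLER_PORT}"
        for node_id, (host, _ip) in sorted(plan.items())
    )


def node_env(node_id: int, plan: dict) -> dict:
    """Values that differ (or must agree) between kafka0, kafka1 and kafka2."""
    host, _ip = plan[node_id]
    return {
        "KAFKA_CFG_NODE_ID": str(node_id),
        "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": quorum_voters(plan),
        # CLIENT://:9095 is kept exactly as in the article's compose file
        "KAFKA_CFG_ADVERTISED_LISTENERS": (
            f"INTERNAL://{host}:{INTERNAL_PORT},CLIENT://:9095,"
            f"EXTERNAL://{host}:{EXTERNAL_PORT}"
        ),
    }


def extra_hosts(plan: dict) -> list:
    """host:ip entries for the compose extra_hosts section."""
    return [f"{host}:{ip}" for _id, (host, ip) in sorted(plan.items())]


if __name__ == "__main__":
    for node_id in SERVER_PLAN:
        for key, value in node_env(node_id, SERVER_PLAN).items():
            print(f"kafka{node_id}: {key}={value}")
    print(extra_hosts(SERVER_PLAN))
```

Keeping the voter list in one place avoids the most common KRaft mistake in hand-edited files: nodes that disagree about the quorum membership.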
3. docker-compose files

kafka{i}.yaml

- {i} stands for 0, 1 or 2; create one file per node.
- All usernames and passwords are configured directly in the file.

```yaml
services:
  kafka:
    image: bitnami/kafka:3.6.2
    container_name: kafka{i}
    hostname: kafka{i}
    restart: always
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID={i}
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
      # Advertise this node's own hostname (kafka0, kafka1 or kafka2)
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka{i}:9094,CLIENT://:9095,EXTERNAL://kafka{i}:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # Log
      - KAFKA_CFG_LOG_RETENTION_HOURS=72
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CONTROLLER_USER=kfkuser
      - KAFKA_CONTROLLER_PASSWORD=youknow
      - KAFKA_INTER_BROKER_USER=kfkuser
      - KAFKA_INTER_BROKER_PASSWORD=youknow
      - KAFKA_CLIENT_USERS=kfkuser
      - KAFKA_CLIENT_PASSWORDS=youknow
      # Others
      - TZ=Asia/Shanghai
    volumes:
      - /data0/Server/Db/kafka{i}:/bitnami/kafka
    extra_hosts:
      - kafka0:172.16.20.60
      - kafka1:172.16.20.61
      - kafka2:172.16.20.62
```

kafka-ui.yaml

```yaml
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    restart: always
    ports:
      - 9081:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - DYNAMIC_CONFIG_ENABLED=true
      - AUTH_TYPE=LOGIN_FORM
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=youknow
    extra_hosts:
      - kafka0:172.16.20.60
      - kafka1:172.16.20.61
      - kafka2:172.16.20.62
```

4. Configuring cluster monitoring in kafka-ui

With DYNAMIC_CONFIG_ENABLED=true, the connection to the cluster is configured from the kafka-ui web interface (see the kafka-ui configuration wizard documentation in the references).
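If you prefer a static setup over the configuration wizard, kafka-ui can also read a cluster definition from environment variables. The sketch below builds such variables for this cluster's SASL_PLAINTEXT + PLAIN listeners in plain Python. It assumes kafka-ui's `KAFKA_CLUSTERS_0_*` naming (the environment-variable form of `kafka.clusters[0].properties.*`); check the variable names against the kafka-ui documentation for the image version you run. The helper functions are hypothetical, and the account `kfkuser`/`youknow` and the EXTERNAL listener addresses (`kafka0:9092` etc., resolvable inside the kafka-ui container through its extra_hosts entries) are taken from the compose files above.

```python
def kafka_ui_cluster_env(name: str, bootstrap_servers: list,
                         username: str, password: str) -> dict:
    """Assumed kafka-ui settings for one SASL_PLAINTEXT + PLAIN cluster (index 0)."""
    # JAAS line for the PLAIN mechanism, as used by Kafka's Java clients
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "KAFKA_CLUSTERS_0_NAME": name,
        "KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS": ",".join(bootstrap_servers),
        "KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL": "SASL_PLAINTEXT",
        "KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM": "PLAIN",
        "KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG": jaas,
    }


def as_compose_lines(env: dict) -> list:
    """Render the settings in the '- KEY=VALUE' style used by kafka-ui.yaml."""
    return [f"- {key}={value}" for key, value in env.items()]


if __name__ == "__main__":
    env = kafka_ui_cluster_env(
        "local",
        ["kafka0:9092", "kafka1:9092", "kafka2:9092"],  # EXTERNAL listeners
        "kfkuser",
        "youknow",
    )
    print("\n".join(as_compose_lines(env)))
```

The printed lines could replace `DYNAMIC_CONFIG_ENABLED=true` in the environment list, which keeps the cluster definition in version control instead of in the UI's persisted state.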
5. Parameter table

| Parameter | Description |
|---|---|
| KAFKA_CFG_PROCESS_ROLES | Roles of the Kafka node (broker, controller). Example: KAFKA_CFG_PROCESS_ROLES=controller,broker |
| KAFKA_KRAFT_CLUSTER_ID | Cluster ID; must be identical on all nodes of the same cluster |
| KAFKA_CFG_CONTROLLER_QUORUM_VOTERS | List of controller quorum voters, in the form id@host:port |
| KAFKA_CFG_CONTROLLER_LISTENER_NAMES | Name of the listener used by the controller |
| KAFKA_CFG_NUM_PARTITIONS | Default number of partitions for new topics |
| KAFKA_CFG_LISTENERS | Addresses and ports the listeners bind to |
| KAFKA_CFG_ADVERTISED_LISTENERS | Addresses and ports the listeners advertise to clients and other brokers |
| KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP | Security protocol of each listener. Here SASL_PLAINTEXT means clients must authenticate, but nothing is encrypted in transit: with the PLAIN mechanism, both the credentials and the data travel in clear text |
| KAFKA_CLIENT_USERS | Client usernames for authentication |
| KAFKA_CLIENT_PASSWORDS | Client passwords for authentication |
| **Clustering** | |
| KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR | Replication factor of Kafka's internal __consumer_offsets topic, which stores consumer offsets |
| KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR | Replication factor of Kafka's internal __transaction_state topic, which stores the transaction log |
| KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR | Minimum number of ISR (In-Sync Replicas) for the internal __transaction_state topic; the ISR is the set of replicas that are in sync with the leader |
| **Log** | |
| KAFKA_CFG_LOG_DIRS | Log (data) directories |
| KAFKA_CFG_LOG_RETENTION_HOURS | Maximum time data is kept; older data is handled according to the policy set in log.cleanup.policy. Default: 168 hours (one week) |

6. Test scripts

After bootstrapping, Kafka clients connect to the addresses the brokers advertise (the EXTERNAL listeners kafka0:9092, kafka1:9092 and kafka2:9092), so the machine running these scripts must be able to resolve kafka0, kafka1 and kafka2, for example through /etc/hosts entries matching the extra_hosts in the compose files.

Producer (asynchronous production): AsyncKafkaProducer1.py

```python
import json

from confluent_kafka import Producer


def delivery_report(err, msg):
    """Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)
        # Asynchronously produce a message. The delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        producer.produce(
            topic,
            json.dumps(message).encode("utf-8"),
            callback=delivery_report,
        )
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "client.id": "PythonProducer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }
    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)
    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]
    # Produce messages
    produce_messages(async_producer, "zx001.msg.user", messages_to_send)
```

Consumer (asynchronous consumption): AsyncKafkaConsumer1.py

```python
import asyncio
import json
import logging
import os
from datetime import datetime

from confluent_kafka import Consumer, KafkaError, KafkaException

# Log format: %(...)s placeholders refer to log record attributes
log_format = "%(message)s"
os.makedirs("logs", exist_ok=True)  # basicConfig fails if the logs/ directory is missing
logging.basicConfig(
    filename="logs/kafka_messages1.log",
    format=log_format,
    level=logging.INFO,
)


async def consume_loop(consumer, topics):
    try:
        # Subscribe to the topics
        consumer.subscribe(topics)
        while True:
            # Poll for a message. Note: poll() is a blocking call, so it blocks
            # the event loop for up to 1 s.
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # _PARTITION_EOF is only emitted when enable.partition.eof is true
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                else:
                    raise KafkaException(msg.error())
            else:
                # Normal message
                raw_message = msg.value()
                # print(f"Raw message: {raw_message}")
                str_msg = raw_message.decode("utf-8")
                parsed_message = json.loads(str_msg)
                parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
                json_data = json.dumps(parsed_message, ensure_ascii=False)
                logging.info(json_data)
            await asyncio.sleep(0.01)  # Sleep briefly to yield control to the event loop
    finally:
        # Close the consumer
        consumer.close()


async def consume():
    # Consumer configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "group.id": "MsgGroup2",
        "auto.offset.reset": "earliest",
        "client.id": "PythonConsumer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }
    # Create the consumer
    consumer = Consumer(conf)
    await consume_loop(consumer, ["zx001.msg.user"])


if __name__ == "__main__":
    asyncio.run(consume())
```

7. References

- https://hub.docker.com/r/apache/kafka
- Apache Kafka® Quick Start - Local Install With Docker
- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub
- https://juejin.cn/post/7187301063832109112
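As a complement to the replication and retention settings in the section 5 parameter table, the values in kafka{i}.yaml can be sanity-checked with a little arithmetic. This is a hedged sketch in plain Python with hypothetical helper names; it assumes writes that require acknowledgement from all in-sync replicas (acks=all), the only case in which a minimum-ISR setting is enforced, and it uses the standard majority rule for the KRaft controller quorum.

```python
def quorum_fault_tolerance(voters: int) -> int:
    """KRaft controllers that may fail while a majority of voters survives."""
    majority = voters // 2 + 1
    return voters - majority


def isr_fault_tolerance(replication_factor: int, min_isr: int) -> int:
    """Replicas that may be lost while acks=all writes still succeed."""
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min ISR must lie between 1 and the replication factor")
    return replication_factor - min_isr


def retention_ms(hours: int) -> int:
    """Convert log.retention.hours into the equivalent log.retention.ms value."""
    return hours * 60 * 60 * 1000


if __name__ == "__main__":
    # Values from kafka{i}.yaml: 3 voters, RF=3, min ISR=2, 72 h retention
    print("controller failures tolerated:", quorum_fault_tolerance(3))
    print("__transaction_state replica losses tolerated:", isr_fault_tolerance(3, 2))
    print("retention in ms:", retention_ms(72))
```

With the article's values this prints 1, 1 and 259200000: the three-node cluster keeps its controller quorum and its transactional writes with one node down, but not with two.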