데이터 엔지니어링/Kafka
카프카 도커에 띄워서 파이썬으로 Producer, Consumer 간단한 테스트 해보기
JDhyeok
2022. 2. 22. 01:04
파이썬으로 카프카 consumer, producer 만들기
필자는 유닉스 기반 PC를 사용중이 아니라 가상 컨테이너 도커로 카프카와 주키퍼를 만들어 테스트 해봤다.
1. docker-compose로 카프카, 주키퍼 컨테이너 생성
카프카와 주키퍼를 운영할 컨테이너를 한 번에 띄울 수 있게 docker-compose.yml 파일 생성한다.
카프카, 주키퍼 도커 이미지는 confluent, wurstmeister, bitnami 세 가지 정도의 이미지가 있었지만 wurstmeister/kafka, zookeeper 이미지가 가장 레퍼런스가 많았기 때문에 해당 이미지로 선정했다.
- docker-compose.yml 코드
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- 컨테이너 활성화
$ docker-compose up
2. 컨테이너 내부에서 토픽 생성
- 카프카 컨테이너 내부 쉘 접근
$ docker exec -it kafka bash
- 토픽 생성
$ cd opt/kafka
$ bin/kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
3. 파이썬 Producer, Consumer 파일 만들기
파이썬 가상환경 생성 후 kafka-python 라이브러리 설치
- 가상환경 생성 및 활성화
$ virtualenv venv $ venv\Script\activate (venv)
- 라이브러리 설치
(venv)$ pip install kafka-python
Producer, Consumer
- producer.py 파일 생성
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'))
for e in range(10):
data = {'number': e}
producer.send('test', value=data)
- consumer.py 파일 생성
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer('test',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
#print(consumer.bootstrap_connected())
for message in consumer:
print('Offset : {}, Value : {}'.format(message.offset, message.value))
4. 파이썬 파일 실행 결과
consumer producer
여러번 실행해서 offset이 높은 수를 나타내고 있다.
레퍼런스
- kafka-python api 공식문서
https://kafka-python.readthedocs.io/en/master/ - docker-compose.yml 생성 시 참고했던 블로그
https://akageun.github.io/2019/09/10/docker-compose-local-kafka.html
반응형