일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
Tags
- 컴퓨터 구조
- Context Switching
- 스프링
- 데이터엔지니어
- Kafka
- POJO
- Data Engineering
- Pub/Sub
- 의존성주입
- 합격
- CNS
- spring boot
- fork()
- 스프링빈
- CPU
- 취뽀
- 시스템호출
- 카프카
- 도커
- 부팅
- 코테
- docker
- 자바빈
- OS
- 회고
- 커널
- 운영체제
- 제어의역전
- zookeeper
- PCB
Archives
- Today
- Total
모래성 말고 철옹성
카프카 도커에 띄워서 파이썬으로 Producer, Consumer 간단한 테스트 해보기 본문
파이썬으로 카프카 consumer, producer 만들기
필자는 유닉스 기반 PC를 사용중이 아니라 가상 컨테이너 도커로 카프카와 주키퍼를 만들어 테스트 해봤다.
1. docker-compose로 카프카, 주키퍼 컨테이너 생성
카프카와 주키퍼를 운영할 컨테이너를 한 번에 띄울 수 있게 docker-compose.yml 파일 생성한다.
카프카, 주키퍼 도커 이미지는 confluent, wurstmeister, bitnami 세 가지 정도의 이미지가 있었지만 wurstmeister/kafka, zookeeper 이미지가 가장 레퍼런스가 많았기 때문에 해당 이미지로 선정했다.
- docker-compose.yml 코드
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- 컨테이너 활성화
$ docker-compose up
2. 컨테이너 내부에서 토픽 생성
- 카프카 컨테이너 내부 쉘 접근
$ docker exec -it kafka bash
- 토픽 생성
$ cd opt/kafka
$ bin/kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
3. 파이썬 Producer, Consumer 파일 만들기
파이썬 가상환경 생성 후 kafka-python 라이브러리 설치
- 가상환경 생성 및 활성화
$ virtualenv venv $ venv\Script\activate (venv)
- 라이브러리 설치
(venv)$ pip install kafka-python
Producer, Consumer
- producer.py 파일 생성
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'))
for e in range(10):
data = {'number': e}
producer.send('test', value=data)
- consumer.py 파일 생성
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer('test',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
#print(consumer.bootstrap_connected())
for message in consumer:
print('Offset : {}, Value : {}'.format(message.offset, message.value))
4. 파이썬 파일 실행 결과
consumer producer
여러번 실행해서 offset이 높은 수를 나타내고 있다.
레퍼런스
- kafka-python api 공식문서
https://kafka-python.readthedocs.io/en/master/ - docker-compose.yml 생성 시 참고했던 블로그
https://akageun.github.io/2019/09/10/docker-compose-local-kafka.html
반응형
Comments