[python, 인터넷] [카프카] Python으로 Kafka에 전송(Producer)하고 가져오기(consumer) 

카프카(Kafka)에서는 다양한 언어로 데이터를 주고 받는 기능을 제공하는데 본 포스팅은 파이썬(Python)으로 구현하는 프로듀서(producer)/컨슈머(consumer) 즉 데이터를 보내고 받는 방법을 설명한다.

 

파이썬으로 카프카를 호출하는 방법이 대표적으로 2가지 방법이 존재하는 것 같다. 하나는 카프카를 만든 제이 크렙스(Jay Kreps)가 만든 회사인 confluent가 제공하는 라이브러리이고, 다른 하나는 kafka-python이라는 라이브러리를 사용하는 방법이다. 후자인 kafka-python을 범용적으로 많이 사용하는데 성능은 컨플루언트의 라이브러리가 더 좋다.

 

 

 

 

후자를 사용하는 이유는 단하나 confluent는 c로 만든 라이브러리를 호출하여 사용하는 방식이라 별도의 설치과정이 존재하기 때문이다. 즉 성능을 중시 여긴다면 c로 만든 confluent를 사용하면 될 것이고, 쉽게 사용하는 것을 목적으로 한다면 후자를 사용해도 상관이 없을 것이다.

 

kafka-pyhon 라이브러리 설치

pip install kafka-python

 

kafka-python 설치 과정

 

 

Producer 구현

라이브러리를 설치했으면 아래와 같은 코드를 작성하고 실행해본다.

 

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

start = time.time()
for i in range(10000):
    data = {'str' : 'result'+str(i)}
    producer.send('test', value=data)
    producer.flush()
print("elapsed :", time.time() - start)

위 코드는 json 형태로 data를 생성하는데 "result"라는 문자열에 + i값을 만번까지 증가시켜서 test라는 토픽으로 보내는 내용이다. 실행을 하게 되면 최종적으로는 만번 전달하는데 걸린 시간이 출력이 되는 것인데 필자는 다음과 같은 속도가 나왔다.

 

프로듀서 옵션

KafkaProducer를 선언할 때 acks가 0으로 확인 없이 전송하며, compression_type이 gzip으로 gzip형태로 압축하여 전송을 한다. 그리고 카프카 서버가 여러대일 경우 bootstrap_servers에 브로커 리스트를 배열 값으로 지정을 하면 되며 자세한 옵션 정보는 아래와 같다.

 

 

경축! 아무것도 안하여 에스천사게임즈가 새로운 모습으로 재오픈 하였습니다.
어린이용이며, 설치가 필요없는 브라우저 게임입니다.
https://s1004games.com

프로듀서 옵션 정보

 

 

Produce 결과

 

console로 확인하는 producer 실시간 전송내역들

 

C:\Users\user\anaconda3\python.exe D:/rainbow/application/producer.py
elapsed : 5.04697060585022

Process finished with exit code 0

만번 전송하는데 5.04초가 걸렸으니 초당 약 2천건 정도 보내졌다는 것을 알 수 있다.

Consumer 구현

이제 데이터를 전송했으면 받는 부분인 consumer를 구현해보도록 한다.

 

from kafka import KafkaConsumer
from json import loads

# topic, broker list
consumer = KafkaConsumer(
    'test',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')),
     consumer_timeout_ms=1000
)

# consumer list를 가져온다
print('[begin] get consumer list')
for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
        message.topic, message.partition, message.offset, message.key, message.value
    ))
print('[end] get consumer list')

Consumer를 선언할 때, test라는 topic을 기준으로 카프카 서버는 localhost를 지정하였다. 

 

Consumer 옵션

옵션 설명
bootstrap_servers 카프카 클러스터들의 호스트와 포트 정보 리스트
auto_offset_reset earliest : 가장 초기 오프셋값
latest : 가장 마지막 오프셋값
none : 이전 오프셋값을 찾지 못할 경우 에러
enable_auto_commit 주기적으로 offset을 auto commit
group_id 컨슈머 그룹을 식별하기 위한 용도
value_deserializer producer에서 value를 serializer를 했기 때문에 사용
consumer_timeout_ms 이 설정을 넣지 않으면 데이터가 없어도 오랜기간 connection한 상태가 된다. 데이터가 없을 때 빠르게 종료시키려면 timeout 설정을 넣는다.

 

Consumer 결과

C:\Users\user\anaconda3\python.exe D:/rainbow/application/consumer.py
[begin] get consumer list
Topic: test, Partition: 0, Offset: 0, Key: None, Value: {'str': 'result0'}
Topic: test, Partition: 0, Offset: 1, Key: None, Value: {'str': 'result1'}
Topic: test, Partition: 0, Offset: 2, Key: None, Value: {'str': 'result2'}
Topic: test, Partition: 0, Offset: 3, Key: None, Value: {'str': 'result3'}
Topic: test, Partition: 0, Offset: 4, Key: None, Value: {'str': 'result4'}
...
Topic: test, Partition: 0, Offset: 9996, Key: None, Value: {'str': 'result9996'}
Topic: test, Partition: 0, Offset: 9997, Key: None, Value: {'str': 'result9997'}
Topic: test, Partition: 0, Offset: 9998, Key: None, Value: {'str': 'result9998'}
Topic: test, Partition: 0, Offset: 9999, Key: None, Value: {'str': 'result9999'}
[end] get consumer list

Process finished with exit code 0

 

위 예제는 간단히 확인해보는 예제로 로그성 데이터가 아니라 중요한 데이터를 가져와야 되는 경우 설정이 중요해질 수 있기 때문에 github에 있는 예제들을 확인하는 것이 좋을 것이다.

 

python-kafka github

https://github.com/dpkp/kafka-python

 



출처: https://needjarvis.tistory.com/607 [자비스가 필요해]



출처: https://needjarvis.tistory.com/607 [자비스가 필요해]

 

[출처] https://needjarvis.tistory.com/607

본 웹사이트는 광고를 포함하고 있습니다.
광고 클릭에서 발생하는 수익금은 모두 웹사이트 서버의 유지 및 관리, 그리고 기술 콘텐츠 향상을 위해 쓰여집니다.
번호 제목 글쓴이 날짜 조회 수
352 [scrapy] https://pypi.org/project/scrapy-save-as-pdf/ 졸리운_곰 2021.07.03 75
351 Pipeline to Download PDF or Save page as PDF for scrapy item 졸리운_곰 2021.06.26 16
» [python, 인터넷] [카프카] Python으로 Kafka에 전송(Producer)하고 가져오기(consumer) file 졸리운_곰 2021.06.19 19
349 [Python, 인터넷] 네이버 뉴스 기사 크롤링 졸리운_곰 2021.05.23 128
348 [Python, GUI tool] GUI drag & drop style GUI Builder for Python Tkinter file 졸리운_곰 2021.05.17 150
347 [python] 파이썬 기초 문법 정리 졸리운_곰 2021.05.17 49
346 [python][flask] webpage-scraper file 졸리운_곰 2021.04.28 46
345 [python][자동화] python으로 카카오톡 자동 메시지 전송 졸리운_곰 2021.04.27 51
344 [python 파이썬 2d 그래픽스] The Interesting Python Graphics Libraries for Python Programmers file 졸리운_곰 2021.04.27 512
343 [python] [GPU]GPU 사용 Python 코드 실행 졸리운_곰 2021.04.21 132
342 [python][ip 추적] 영화와 같은 ip 위치 추적 python 소스 IP Radar 2 file 졸리운_곰 2021.04.15 68
341 [웹서버] Flask + REST API + Swagger file 졸리운_곰 2021.04.04 50
340 Python Flask 로 간단한 REST API 작성하기 file 졸리운_곰 2021.04.04 317
339 [python][jupyter notebook][JSON API] Building a JSON API Using Jupyter Notebooks in Under 5 Minutes file 졸리운_곰 2021.03.28 59
338 Python Flask 프레임워크 이해하기 file 졸리운_곰 2021.03.21 29
337 Python Flask 로 간단한 REST API 작성하기 file 졸리운_곰 2021.03.21 32
336 [python][인공지능] FLASK를 이용하여 PYTHON에서 PYTORCH를 REST API로 배포하기 졸리운_곰 2021.03.20 75
335 [python] Apache Airflow PythonOperator 실습하기(중급) file 졸리운_곰 2021.03.18 17
334 [python] Apache Airflow 소개 및 실습하기(기초) file 졸리운_곰 2021.03.18 17
333 [python internet, MSA] REST API Development with Flask file 졸리운_곰 2021.03.14 56
대표 김성준 주소 : 경기 용인 분당수지 U타워 등록번호 : 142-07-27414
통신판매업 신고 : 제2012-용인수지-0185호 출판업 신고 : 수지구청 제 123호 개인정보보호최고책임자 : 김성준 sjkim70@stechstar.com
대표전화 : 010-4589-2193 [fax] 02-6280-1294 COPYRIGHT(C) stechstar.com ALL RIGHTS RESERVED