Java Kafka Producer, Consumer 예제 구현

Java를 이용하여 Kafka Producer와 Kakfa Consumer를 구현해보자.

 

Java Kafka Producer, Consumer 예제 구현

Java를 이용하여 Kafka Producer와 Kakfa Consumer를 구현해보자.

Getting Started With Kafka!

Kafka Producer와 Consumer를 자바로 직접 구현하는 것은 생각보다 간단합니다. 하지만 코드를 실행하여 결과까지 확인하기 위해서는 아래와 같이 Kafka 설치 과정이 필요합니다. kafka는 zookeeper와 같이 움직입니다. 그렇기 때문에 zookeeper 설치도 필요합니다.

kafka & zookeeper 설치하기

brew install kafka
brew install zookeeper
zkServer start # zookeeper running
kafka-server-start /usr/local/etc/kafka/server.properties # kafka running
Bash

kafka topic 생성하기

# replication-factor : 복제본 개수(1)
# partitions : 파티션 개수(1)
# topic : 토픽명(taeng)
$ kafka-topics --create --zookeeper localhost:2181 \ 
      --replication-factor 1 --partitions 1 --topic taeng
Bash

일단 카프카를 코드로 구현할 준비는 모두 끝났습니다. 다만 간단한 예제를 살펴보기 위한 최소한의 준비입니다. 실제 실무에서는 지금과 같이 단일 시스템으로 사용하지는 않습니다. 그리고 본격적인 구현에 앞서! 이번 포스팅에서는 kafka나 zookeeper에 대한 이론적인 부분은 자세히 다루지 않습니다.

 

구현해보자! Kafka Producer

예제는 간단합니다. 단순하게 키보드 입력을 받아 전송하는 것이지요. 실무에서는 kafka 전송에 있어서 연결을 끊거나 유지하는 것이 엄격하게 관리되겠지만 지금은 간단한 예제인만큼 단순하게 특정 메시지를 입력하면 연결이 종료되도록 합시다.

/**
 * Kafka Producer.
 * keyboard input을 통해 메시지를 전송한다.
 *
 * @author Kimtaeng
 * Created on 2018. 9. 10.
 */
public class MyKafkaProducer {
    private static final String TOPIC_NAME = "taeng";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        while(true) {
            Scanner sc = new Scanner(System.in);
            System.out.print("Input > ");
            String message = sc.nextLine();

            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        // some exception
                    }
                });

            } catch (Exception e) {
                // exception
            } finally {
                producer.flush();
            }

            if(StringUtils.equals(message, FIN_MESSAGE)) {
                producer.close();
                break;
            }
        }
    }
}
Java

 

구현해보자! Kafka Consumer

Consumer의 경우는 구독(subscribe)을 시작한 후 poll을 통해 레코드를 처리합니다. topic의 경우에 List로 설정 가능합니다. 단일 topic이 아니라는 것이지요. poll 메서드의 파라미터는 레코드를 기다릴 최대 블럭 시간입니다. 그리고 앞서 살펴본 Producer와 동일하게 특정 메시지를 받으면 종료하게 됩니다.

/**
 * Kafka Consumer.
 * Producer가 전송한 메시지를 받는다.
 *
 * @author Kimtaeng
 * Created on 2018. 9. 10.
 */
public class MyKafkaConsumer {
    private static final String TOPIC_NAME = "taeng";
    private static final String FIN_MESSAGE = "exit";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        String message = null;
        try {
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));

                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
            } while (!StringUtils.equals(message, FIN_MESSAGE));
        } catch(Exception e) {
            // exception
        } finally {
            consumer.close();
        }
    }
}
Java

 

경축! 아무것도 안하여 에스천사게임즈가 새로운 모습으로 재오픈 하였습니다.
어린이용이며, 설치가 필요없는 브라우저 게임입니다.
https://s1004games.com

실행해보자!

먼저 Consumer를 미리 실행한 후에 Producer를 실행해야 합니다. Producer를 실행한 후 키보드 입력으로 메시지를 입력하면 Consumer의 콘솔 화면에서 전송된 메시지를 확인할 수 있습니다.

kafka producer

kafka consumer

자바로 Kafka Producer와 Consumer를 간단하게 구현하고 실행해보았습니다. 위의 예제 코드에서 살펴본 것 외에도 더 많은 옵션 설정이 있는데요. 이 설정들은 이어지는 글을 통해서 알아봅시다.

[출처] https://madplay.github.io/post/java-kafka-example

 

 

본 웹사이트는 광고를 포함하고 있습니다.
광고 클릭에서 발생하는 수익금은 모두 웹사이트 서버의 유지 및 관리, 그리고 기술 콘텐츠 향상을 위해 쓰여집니다.
번호 제목 글쓴이 날짜 조회 수
공지 오라클 기본 샘플 데이터베이스 졸리운_곰 2014.01.02 25084
공지 [SQL컨셉] 서적 "SQL컨셉"의 샘플 데이타 베이스 SAMPLE DATABASE of ORACLE 가을의 곰을... 2013.02.10 24563
공지 [G_SQL] Sample Database 가을의 곰을... 2012.05.20 25942
962 [MySQL] MySQL ROLLUP , summary, 부분합 구하기 file 졸리운_곰 2021.09.01 25
961 [python][tensorflow - gpu] [파이썬] 텐서플로(TensorFlow) 설치하는 방법, 딥러닝 환경 구축하기 file 졸리운_곰 2021.08.17 46
960 [tensorflow 설치] windows에서 tensorflow-gpu 1.x 버전 설치, python - 이전 버전의 Tensorflow GPU 설치 졸리운_곰 2021.08.17 20
959 [한글 처리][tensorflow] 한글 자연어처리를 위한 도구들, 자료들, 정보들을 정리해 보았습니다. 졸리운_곰 2021.08.11 84
958 카프카 설치 시 가장 중요한 설정 4가지 졸리운_곰 2021.07.13 46
957 [데이터분석][파이썬][python] Awesome Dash Awesome file 졸리운_곰 2021.07.10 47
956 [데이터분석][파이썬][python] ???? Introducing Dash ???? file 졸리운_곰 2021.07.10 110
955 [딥러닝] [텐서플로우][SSAC X AIFFEL] 작사가 인공지능 만들기 file 졸리운_곰 2021.07.10 45
954 [Kafka] Kafka 한번 살펴보자... Quickstart file 졸리운_곰 2021.06.18 27
» Java Kafka Producer, Consumer 예제 구현 Java를 이용하여 Kafka Producer와 Kakfa Consumer를 구현해보자. file 졸리운_곰 2021.06.18 105
952 Beginner’s Guide to Understand Kafka file 졸리운_곰 2021.06.18 23
951 [Kafka] Kafka 설치/실행 및 테스트 file 졸리운_곰 2021.06.18 36
950 [java] [kafka] [Kafka] 개념 및 기본예제 file 졸리운_곰 2021.06.16 115
949 [Oracle, 오라클 dbms] [ORACLE] 오라클 테이블 & 컬럼 조회 하는 방법 졸리운_곰 2021.05.17 90
948 [dataset] (한글) 욕설 감지 데이터셋 file 졸리운_곰 2021.05.12 199
947 [java dbms][database] [컴] Apache Derby 사용하기 - 4 - in-memory DB 졸리운_곰 2021.04.15 76
946 [java dbms][database] [컴] Apache Derby 사용하기 - 3 - Apache Derby Network Server 졸리운_곰 2021.04.15 17
945 [java dbms][database] [컴] Apache Derby 사용하기 - 2 - sql script tool ij 사용하기 졸리운_곰 2021.04.15 123
944 [java dbms][database] [컴] Apache Derby 사용하기 - 1 - Derby 설치 file 졸리운_곰 2021.04.15 110
943 [spark][sparksql][odbc][jdbc] JDBC and ODBC drivers and configuration parameters file 졸리운_곰 2021.04.14 44
대표 김성준 주소 : 경기 용인 분당수지 U타워 등록번호 : 142-07-27414
통신판매업 신고 : 제2012-용인수지-0185호 출판업 신고 : 수지구청 제 123호 개인정보보호최고책임자 : 김성준 sjkim70@stechstar.com
대표전화 : 010-4589-2193 [fax] 02-6280-1294 COPYRIGHT(C) stechstar.com ALL RIGHTS RESERVED