[一日30分 인생승리의 학습법] [GCP] Apache Beam 사용하기

대량의 데이터를 전처리하는 건 많은 시간이 소요된다. Cloud 상에서 가용한 자원들을 동적으로 할당해서 좀 더 빠르게 해줄 수 있으면 좋은데, 병렬처리라는 게 또 공부하려면 만만찮아서인지 GCP에는 Cloud Dataflow라는 서비스가 이를 대신해 준다. Cloud Dataflow는 병렬로 실행시키는 일종의 흐름을 제어하는 서비스이고, 어떤 형태로 병렬 처리할지 그 구조를 Pipeline 형태로 잡아 놓는 것이 Google이 기증한 ‘Apache Beam’이 된다. Apache Beam으로 형성된 Pipeline은 Local에서 ‘DirectRunner’를 통해 수행(병렬처리는 안됨)하거나, Apache Flink 혹은 Spark로도 실행할 수 있다.

Beam의 예제로 자주 등장하는 단락 내에 단어 수 카운팅 하는 것을 예를 들어 사용해 보기로 한다.

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptionsos.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/ryu_gcloud2/dataflow_test/test-projector.json"

Google Cloud Dataflow에서 실행결과를 보려면 Application에 대한 Credential을 미리 만들어 두어야 하는데, 아래 조대협님 게시글에서 서비스키를 만드는 법을 참조(게시물의 중간쯤)한다. 만든 키는 GCP의 특정 디렉토리 옮겨두고 위에 그 경로를 적어두면 된다.

pipeline_options = PipelineOptions(None)DEST_DIR = "gs://tf-bucket-ksryu/"options = {
    'staging_location': DEST_DIR + 'staging',
    'temp_location': DEST_DIR + 'tmp',
    'job_name': job_name,
    'project': 'brave-cursor-255906', #project id
    'zone' : 'us-central1-a',
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True ,  
    'save_main_session': False
}opts = beam.pipeline.PipelineOptions(flags=[], **options)

Pipeline Option을 위와 같이 만들어주면 된다. 결과를 GCP에 저장해 둘 것이므로 GCS에 Bucket하나를 만들고, 단계별 결과물이 저장될 디렉토리를 지정해둔다. save_main_session 부분은 마지막에 run한 결과물을 저장해두는 것 같은데, 모듈의 반복 수행시 error를 일으키므로 개발단계에서는 False로 두었다. (정확한 의미는 사실 잘모름 ^^;;)

Apache Beam의 예제보다는 아주 살짝 단순하게, King_lear 단락과 king john에 대한 단락 두 개를 읽어서 처리하도록 하였다. 실제 파이프라인의 구조를 잡는 부분은 아래가 전부다. Beam의 PCollection이라는 개체를 인자로 받아 PTransform이라는 모듈이 변형하고 다시 Output으로 PCollection을 내보내는 구조다. 즉 아래의 함수들이 Transform하는 것이 PCollection의 데이터들이 Transform을 거치는 동안 다른 형태의 PCollection이 되는 것이다.

Apache Beam의 구조: 조대협님 블로그에서 발췌
DATA_FILE1 = 'gs://dataflow-samples/shakespeare/kinglear.txt'
DATA_FILE2 = 'gs://dataflow-samples/shakespeare/kingjohn.txt'p = beam.Pipeline('DataflowRunner', options=opts)for step in ['lear', 'john']:
    if step == 'lear':
        DATA_FILE = DATA_FILE1
    else:
        DATA_FILE = DATA_FILE2
        
    (p 
        | '{}_read'.format(step) >> ReadFromText(DATA_FILE)
        | '{}_split'.format(step) >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
        | '{}_pair_with_one'.format(step) >> beam.Map(lambda x: (x, 1))
        | '{}_group'.format(step) >> beam.GroupByKey()
        | '{}_count'.format(step) >> beam.Map(count_ones)
        | '{}_format'.format(step) >> beam.Map(format_result)
        | '{}_write'.format(step) >> WriteToText(DEST_DIR + 'output')
    )

위가 전체적인 구조로, Pipeline은 Cloud Dataflow가 후에 실제적인 작업을 진행할것으로 지정(DataflowRunner, Local로 수행하려면 DirectRunner)했다. 중간의 각 함수들이 실제로 Transform하는 모듈로 이 부분을 원하는 대로 구현해 주어야 한다. 참고로, ParDo가 필요시 리소스를 늘려서 병렬처리할 수 있도록 해 주는 부분인데, 본 예제에서는 데이터 처리량이 그리 크지 않아서 CPU를 1개 이상쓰진 않는다. ParDo가 쓰인 부분은 전체 문장에서 단어들을 분해하하여 돌려주는 모듈이다.

경축! 아무것도 안하여 에스천사게임즈가 새로운 모습으로 재오픈 하였습니다.
어린이용이며, 설치가 필요없는 브라우저 게임입니다.
https://s1004games.com

class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super(WordExtractingDoFn, self).__init__()
    beam.DoFn.__init__(self)
    self.words_counter = Metrics.counter(self.__class__, 'words')
    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
    self.word_lengths_dist = Metrics.distribution(
        self.__class__, 'word_len_dist')
    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')def process(self, element):
    """Returns an iterator over the words of this element.
    The element is a line of text.  If the line is blank, note that, too.
    Args:
      element: the element being processed
    Returns:
      The processed element.
    """
    text_line = element.strip()
    if not text_line:
      self.empty_line_counter.inc(1)
    words = re.findall(r'[\w\']+', text_line, re.UNICODE)
    for w in words:
      self.words_counter.inc()
      self.word_lengths_counter.inc(len(w))
      self.word_lengths_dist.update(len(w))
    return words

beam.DoFn Class를 상속하여 쓰면 되는데 process 함수를 overriding하여 필요한 코드를 작성하면 된다. 위의 예에서는 re.findall로 매 라인의 단어들을 분리하여 리턴하고 있다.

파이프라인에서 나머지 처리 모듈은 단어들을 grouping하여 수를 세고, 특정 format으로 만들어 출력하는 부분이다.

def count_ones(word_ones):
    (word, ones) = word_ones
    return (word, sum(ones))def format_result(word_count):
    (word, count) = word_count
    return '%s: %d' % (word, count)

마지막으로 생성된 파이프라인을 Dataflow로 수행하도록 설정하면 Cloud Dataflow가 업무(?)를 시작한다.

result = p.run()
result.wait_until_finish()

처리가 끝나면 ‘Done’메시지가 출력되고, GCP의 Cloud Dataflow는 그 경과를 보여주며 GCS의 Bucket에는 결과물과 중간 단계의 파일들이 저장된다.

bow: 3
scurvy: 1
Importune: 1
forked: 1
embossed: 1
import: 1
day: 7
never: 26
profess: 3
covering: 1
Stain: 1
rare: 1
sing: 2
spring: 2
Ay: 17
ACT: 26
giant: 1
Methinks: 7
cure: 2
troops: 1
bolds: 1
sung: 1
wert: 2
wine: 1
enemies': 1
From: 11
V: 6
'Tis: 24
relieve: 1
resume: 1
beat: 3
Cost: 1
thing: 21
fish: 1
reading: 1
arrives: 1
....

 

[출처] https://medium.com/@kiseon_twt/gcp-apache-beam-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0-8737122b276b

 

본 웹사이트는 광고를 포함하고 있습니다.
광고 클릭에서 발생하는 수익금은 모두 웹사이트 서버의 유지 및 관리, 그리고 기술 콘텐츠 향상을 위해 쓰여집니다.
번호 제목 글쓴이 날짜 조회 수
1035 [一日30分 인생승리의 학습법] [Tensorflow] Serving이용하여 REST API 만들기 file 졸리운_곰 2021.08.07 13
1034 [一日30分 인생승리의 학습법] Amazon SageMaker는 처음이지? 누워서 머신러닝 학습부터 배포까지 file 졸리운_곰 2021.08.05 31
1033 [一日30分 인생승리의 학습법] Apache Beam (Dataflow)를 이용하여, 이미지 파일을 tfrecord로 컨버팅 하기 file 졸리운_곰 2021.08.04 4
» [一日30分 인생승리의 학습법] [GCP] Apache Beam 사용하기 file 졸리운_곰 2021.08.04 5
1031 [一日30分 인생승리의 학습법] Apache Beam, Apache Airflow, Apache Atlas 설명 졸리운_곰 2021.08.04 8
1030 [一日30分 인생승리의 학습법] 블레이저 웹어셈블리 (web assembly, WASM_ Awesome Blazor ) 놀라운 오픈소스 리스트 (open source) file 졸리운_곰 2021.08.04 4938
1029 MarkDown 사용법 총정리 file 졸리운_곰 2021.07.29 23
1028 [一日30分 인생승리의 학습법] [JWT/JSON Web Token] 로그인 / 인증에서 Token 사용하기 file 졸리운_곰 2021.07.24 8
1027 [一日30分 인생승리의 학습법] JWT(JSON Web Token)을 이용한 API 인증 - #1 개념 소개 file 졸리운_곰 2021.07.24 10
1026 [一日30分 인생승리의 학습법] XPath 란 무엇입니까? file 졸리운_곰 2021.07.24 5
1025 [一日30分 인생승리의 학습법] XPATH 사용법·작성법 file 졸리운_곰 2021.07.24 8
1024 [Git] Github에 잘못 올라간 파일 삭제하기 졸리운_곰 2021.07.13 16
1023 [一日30分 인생승리의 학습법]AI 한국어 자연어 처리 데이터셋 목록 file 졸리운_곰 2021.07.10 23
1022 [一日30分 인생승리의 학습법] 기계독해, MRC란 무엇일까 file 졸리운_곰 2021.07.10 37
1021 [一日30分 인생승리의 학습법] 웹어셈블리를 활용한 유망한 프로그래밍 언어 프로젝트 10가지 졸리운_곰 2021.07.02 13
1020 [一日30分 인생승리의 학습법] Windows 10에 Minikube 설치하기 file 졸리운_곰 2021.06.27 26
1019 [一日30分 인생승리의 학습법] 윈도우2016 서버에 Docker 설치하며 겪은 시행착오 기록 file 졸리운_곰 2021.06.26 11
1018 [초보용] Git 되돌리기( Reset, Revert ) file 졸리운_곰 2021.06.22 15
1017 [DOCKER] Windows Server 2016에서 도커 설치하기 졸리운_곰 2021.06.13 39
1016 [docker, windows server 2016] Installing Docker onto Windows Server 2016 file 졸리운_곰 2021.06.13 6
대표 김성준 주소 : 경기 용인 분당수지 U타워 등록번호 : 142-07-27414
통신판매업 신고 : 제2012-용인수지-0185호 출판업 신고 : 수지구청 제 123호 개인정보보호최고책임자 : 김성준 sjkim70@stechstar.com
대표전화 : 010-4589-2193 [fax] 02-6280-1294 COPYRIGHT(C) stechstar.com ALL RIGHTS RESERVED