🐝

【Apache Beam】Transform チートシート(Python)

2022/08/14に公開

はじめに

Apache Beam Python SDK で提供されている Transform をまとめる。
https://beam.apache.org/documentation/transforms/python/overview/

1. 要素ごとの処理(Element-wise)

1.1. Filter - 要素のフィルタリング

filter.py
import apache_beam as beam

# Create a PCollection of plant dicts, keep only the perennial ones, and print
# every element that survives the filter.
with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      # Filter passes through only the elements for which the predicate is true.
      | 'Filter perennials' >>
      beam.Filter(lambda plant: plant['duration'] == 'perennial')
      | beam.Map(print))
output
{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}

1.2. FlatMap - 要素に関数を適用(反復可)

flat_map.py
import apache_beam as beam

# Each input element is a list of strings; FlatMap emits the items of every
# list as separate output elements, which are then printed one per line.
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
          ['🍅Tomato', '🥔Potato'],
      ])
      # The identity lambda returns the list itself, so FlatMap unrolls it.
      | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
      | beam.Map(print))
output
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

1.3. Keys - 要素の Key を抽出

keys.py
import apache_beam as beam

# Build (icon, name) pairs and print only the first component of each pair.
with beam.Pipeline() as pipeline:
  icons = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      # Keys extracts the key (first item) of every key-value pair.
      | 'Keys' >> beam.Keys()
      | beam.Map(print))
output
🍓
🥕
🍆
🍅
🥔

1.4. KvSwap - 要素の Key と Value を交換

kvswap.py
import apache_beam as beam

# Build (icon, name) pairs and print them with key and value exchanged.
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      # KvSwap turns each (key, value) pair into (value, key).
      | 'Key-Value swap' >> beam.KvSwap()
      | beam.Map(print))
output
('Strawberry', '🍓')
('Carrot', '🥕')
('Eggplant', '🍆')
('Tomato', '🍅')
('Potato', '🥔')

1.5. Map - 要素に関数を適用

map.py
import apache_beam as beam

def strip_header_and_newline(text):
  """Return ``text`` with leading/trailing '#', space and newline removed.

  Only characters at either end of the string are stripped; the same
  characters appearing in the middle of ``text`` are left untouched.
  """
  unwanted_chars = '# \n'
  return text.strip(unwanted_chars)

# Create markdown-like header lines, clean each one with
# strip_header_and_newline, and print the cleaned strings.
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      # Map applies the function to every element, producing one output each.
      | 'Strip header' >> beam.Map(strip_header_and_newline)
      | beam.Map(print))
output
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

1.6. ParDo - DoFn の実行

par_do.py
import apache_beam as beam

class SplitWords(beam.DoFn):
  """DoFn that splits each input string on a delimiter and emits the pieces."""

  def __init__(self, delimiter=','):
    # Separator used by process(); a comma unless the caller overrides it.
    self.delimiter = delimiter

  def process(self, text):
    """Yield every substring of ``text`` separated by ``self.delimiter``."""
    pieces = text.split(self.delimiter)
    yield from pieces

# Create comma-separated strings, split them with the SplitWords DoFn, and
# print every resulting word as its own element.
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      # ParDo runs SplitWords.process on each element; everything it yields
      # becomes an output element.
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print))
output
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

1.7. Partition - 要素の分割

partition.py
import apache_beam as beam

# The position of a duration in this list is the partition number it maps to.
durations = ['annual', 'biennial', 'perennial']

def by_duration(plant, num_partitions):
  """Return the partition index for ``plant`` based on its 'duration' value.

  The index is the position of ``plant['duration']`` in ``durations``.
  ``num_partitions`` is part of the signature but is not used here.
  A duration that is not listed raises ValueError (from ``list.index``).
  """
  duration = plant['duration']
  return durations.index(duration)

# Split the plants into one PCollection per duration and print each element
# prefixed with the partition it ended up in.
with beam.Pipeline() as pipeline:
  # Partition calls by_duration for every element and produces
  # len(durations) PCollections, unpacked here in the order of `durations`.
  annuals, biennials, perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Partition' >> beam.Partition(by_duration, len(durations))
  )

  # Each partition gets its own labelled print step.
  annuals | 'Annuals' >> beam.Map(lambda x: print('annual: {}'.format(x)))
  biennials | 'Biennials' >> beam.Map(
      lambda x: print('biennial: {}'.format(x)))
  perennials | 'Perennials' >> beam.Map(
      lambda x: print('perennial: {}'.format(x)))
output
perennial: {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
biennial: {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
perennial: {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
annual: {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
perennial: {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}

1.8. WithTimestamps - 要素のタイムスタンプ化

with_timestamps.py
import apache_beam as beam

class GetTimestamp(beam.DoFn):
  """DoFn that renders each plant as '<UTC datetime> - <name>'."""

  def process(self, plant, timestamp=beam.DoFn.TimestampParam):
    """Yield one string built from the element's timestamp and its 'name'."""
    utc_datetime = timestamp.to_utc_datetime()
    yield '{} - {}'.format(utc_datetime, plant['name'])

# Attach each plant's 'season' value as its element timestamp, then format
# and print the timestamp together with the plant name.
with beam.Pipeline() as pipeline:
  plant_timestamps = (
      pipeline
      | 'Garden plants' >> beam.Create([
          {'name': 'Strawberry', 'season': 1585699200}, # April, 2020
          {'name': 'Carrot', 'season': 1590969600},     # June, 2020
          {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
          {'name': 'Tomato', 'season': 1588291200},     # May, 2020
          {'name': 'Potato', 'season': 1598918400},     # September, 2020
      ])
      # Wrap every element in a TimestampedValue; 'season' appears to be Unix
      # epoch seconds, judging by the month comments above.
      | 'With timestamps' >> beam.Map(
          lambda plant: beam.window.TimestampedValue(plant, plant['season']))
      # GetTimestamp reads the timestamp back through DoFn.TimestampParam.
      | 'Get timestamp' >> beam.ParDo(GetTimestamp())
      | beam.Map(print)
  )
output
2020-04-01 00:00:00 - Strawberry
2020-06-01 00:00:00 - Carrot
2020-03-01 00:00:00 - Artichoke
2020-05-01 00:00:00 - Tomato
2020-09-01 00:00:00 - Potato

1.9. Values - 要素の Value を抽出

values.py
import apache_beam as beam

# Build (icon, name) pairs and print only the second component of each pair.
with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Garden plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      # Values extracts the value (second item) of every key-value pair.
      | 'Values' >> beam.Values()
      | beam.Map(print))
output
Strawberry
Carrot
Eggplant
Tomato
Potato

2. 集約処理(Aggregation)

2.1. CoGroupByKey - 要素を Key で集約(複数の PCollection)

co_group_by_key.py
import apache_beam as beam

# Join two keyed PCollections (icons and durations) on the plant name.
with beam.Pipeline() as pipeline:
  icon_pairs = pipeline | 'Create icons' >> beam.Create([
      ('Apple', '🍎'),
      ('Apple', '🍏'),
      ('Eggplant', '🍆'),
      ('Tomato', '🍅'),
  ])

  duration_pairs = pipeline | 'Create durations' >> beam.Create([
      ('Apple', 'perennial'),
      ('Carrot', 'biennial'),
      ('Tomato', 'perennial'),
      ('Tomato', 'annual'),
  ])

  # The dict keys ('icons', 'durations') become the tags of the grouped
  # result: each output is (key, {'icons': [...], 'durations': [...]}), with
  # an empty list for a tag that has no value for that key.
  plants = (({
      'icons': icon_pairs, 'durations': duration_pairs
  })
            | 'Merge' >> beam.CoGroupByKey()
            | beam.Map(print))
output
('Apple', {'icons': ['🍎', '🍏'], 'durations': ['perennial']})
('Carrot', {'icons': [], 'durations': ['biennial']})
('Tomato', {'icons': ['🍅'], 'durations': ['perennial', 'annual']})
('Eggplant', {'icons': ['🍆'], 'durations': []})

2.2. CombineGlobally - 要素の結合

combine_globally.py
import apache_beam as beam

# Combine all input sets into the single set of items common to every one.
with beam.Pipeline() as pipeline:
  common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      # The combine function intersects the sets it is given; `sets or [set()]`
      # substitutes one empty set when `sets` is falsy so that
      # set.intersection always receives at least one argument.
      | 'Get common items' >>
      beam.CombineGlobally(lambda sets: set.intersection(*(sets or [set()])))
      | beam.Map(print))
output
{'🍅', '🥕'}

2.3. CombinePerKey - 要素を Key で集約

combine_per_key.py
import apache_beam as beam

def saturated_sum(values):
  """Return the sum of ``values``, capped at a maximum of 8."""
  cap = 8
  total = sum(values)
  # Same selection rule as min(total, cap): pick the cap only when it is
  # strictly smaller than the total.
  return cap if cap < total else total

# Combine the counts of each plant with saturated_sum and print the
# resulting (key, combined value) pairs.
with beam.Pipeline() as pipeline:
  saturated_total = (
      pipeline
      | 'Create plant counts' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      # CombinePerKey applies the function to the values sharing a key.
      | 'Saturated sum' >> beam.CombinePerKey(saturated_sum)
      | beam.Map(print))
output
('🥕', 5)
('🍆', 1)
('🍅', 8)

2.4. CombineValues - 要素の Value の集約

combine_values.py
import apache_beam as beam

# The values are already grouped into lists per key; sum each list and print
# the resulting (key, total) pairs.
with beam.Pipeline() as pipeline:
  total = (
      pipeline
      | 'Create produce counts' >> beam.Create([
          ('🥕', [3, 2]),
          ('🍆', [1]),
          ('🍅', [4, 5, 3]),
      ])
      # CombineValues applies `sum` to the iterable stored under each key.
      | 'Sum' >> beam.CombineValues(sum)
      | beam.Map(print))
output
('🥕', 5)
('🍆', 1)
('🍅', 12)

2.5. Count - 要素のカウント

count.py
import apache_beam as beam

# Count how many elements exist for each season key and print the
# (key, count) pairs.
with beam.Pipeline() as pipeline:
  total_elements_per_keys = (
      pipeline
      | 'Create plants' >> beam.Create([
          ('spring', '🍓'),
          ('spring', '🥕'),
          ('summer', '🥕'),
          ('fall', '🥕'),
          ('spring', '🍆'),
          ('winter', '🍆'),
          ('spring', '🍅'),
          ('summer', '🍅'),
          ('fall', '🍅'),
          ('summer', '🌽'),
      ])
      # Count.PerKey emits one (key, number of elements) pair per distinct key.
      | 'Count elements per key' >> beam.combiners.Count.PerKey()
      | beam.Map(print))
output
('spring', 4)
('summer', 3)
('fall', 2)
('winter', 1)

2.6. Distinct - 要素の重複削除

distinct.py
import apache_beam as beam

# Remove duplicate elements so that each distinct value is printed once.
with beam.Pipeline() as pipeline:
  unique_elements = (
      pipeline
      | 'Create produce' >> beam.Create([
          '🥕',
          '🥕',
          '🍆',
          '🍅',
          '🍅',
          '🍅',
      ])
      # Distinct keeps a single copy of every repeated element.
      | 'Deduplicate elements' >> beam.Distinct()
      | beam.Map(print))
output
🥕
🍆
🍅

2.7. GroupByKey - 要素を Key で集約

group_by_key.py
import apache_beam as beam

# Group the produce by season key and print each (key, values) pair.
with beam.Pipeline() as pipeline:
  produce_counts = (
      pipeline
      | 'Create produce counts' >> beam.Create([
          ('spring', '🍓'),
          ('spring', '🥕'),
          ('spring', '🍆'),
          ('spring', '🍅'),
          ('summer', '🥕'),
          ('summer', '🍅'),
          ('summer', '🌽'),
          ('fall', '🥕'),
          ('fall', '🍅'),
          ('winter', '🍆'),
      ])
      # GroupByKey collects all values that share a key into one iterable.
      | 'Group counts per produce' >> beam.GroupByKey()
      # Turn each group's values into a sorted list so they print as a list
      # in a stable order.
      | beam.MapTuple(lambda k, vs: (k, sorted(vs)))  # sort and format
      | beam.Map(print))
output
('spring', ['🍅', '🍆', '🍓', '🥕'])
('summer', ['🌽', '🍅', '🥕'])
('fall', ['🍅', '🥕'])
('winter', ['🍆'])

2.8. Max - 要素の最大値

max.py
import apache_beam as beam

# Find the largest value for each key and print the (key, max) pairs.
with beam.Pipeline() as pipeline:
  elements_with_max_value_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      # The built-in max is used as the per-key combine function.
      | 'Get max value per key' >> beam.CombinePerKey(max)
      | beam.Map(print))
output
('🥕', 3)
('🍆', 1)
('🍅', 5)

2.9. Mean - 要素の平均値

mean.py
import apache_beam as beam

# Compute the arithmetic mean of the values for each key and print the
# (key, mean) pairs; the means are floats.
with beam.Pipeline() as pipeline:
  elements_with_mean_value_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      # Mean.PerKey averages all values that share a key.
      | 'Get mean value per key' >> beam.combiners.Mean.PerKey()
      | beam.Map(print))
output
('🥕', 2.5)
('🍆', 1.0)
('🍅', 4.0)

2.10. Min - 要素の最小値

min.py
import apache_beam as beam

# Find the smallest value for each key and print the (key, min) pairs.
with beam.Pipeline() as pipeline:
  elements_with_min_value_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      # The built-in min is used as the per-key combine function.
      | 'Get min value per key' >> beam.CombinePerKey(min)
      | beam.Map(print))
output
('🥕', 2)
('🍆', 1)
('🍅', 3)

2.11. Sample - 要素のランダム抽出

sample.py
import apache_beam as beam

# Pick a random sample of three elements from the whole PCollection and
# print it as a single list; which elements are chosen varies between runs.
with beam.Pipeline() as pipeline:
  sample = (
      pipeline
      | 'Create produce' >> beam.Create([
          '🍓 Strawberry',
          '🥕 Carrot',
          '🍆 Eggplant',
          '🍅 Tomato',
          '🥔 Potato',
      ])
      # FixedSizeGlobally(3) emits one list holding the sampled elements.
      | 'Sample N elements' >> beam.combiners.Sample.FixedSizeGlobally(3)
      | beam.Map(print))
output
['🥕 Carrot', '🍆 Eggplant', '🍅 Tomato']

2.12. Sum - 要素の合計

sum.py
import apache_beam as beam

# Add up the values for each key and print the (key, total) pairs.
with beam.Pipeline() as pipeline:
  totals_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      # The built-in sum is used as the per-key combine function.
      | 'Sum values per key' >> beam.CombinePerKey(sum)
      | beam.Map(print))
output
('🥕', 5)
('🍆', 1)
('🍅', 12)

2.13. Top - 要素の最大(or最小)値の抽出

top.py
import apache_beam as beam

# For each key, keep at most the two largest values and print them as a
# list ordered from largest to smallest.
with beam.Pipeline() as pipeline:
  largest_elements_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      # A key with fewer than two values yields a shorter list.
      | 'Largest N values per key' >> beam.combiners.Top.LargestPerKey(2)
      | beam.Map(print))
output
('🥕', [3, 2])
('🍆', [1])
('🍅', [5, 4])

3. その他の処理(Others)

3.1. Flatten - 複数 PCollection の結合

flatten.py
import apache_beam as beam

# Merge two PCollections into a single one and print every element.
with beam.Pipeline() as pipeline:
  icon_pairs = pipeline | 'Create icons' >> beam.Create([
      ('Apple', '🍎'),
      ('Apple', '🍏'),
      ('Eggplant', '🍆'),
      ('Tomato', '🍅'),
  ])

  duration_pairs = pipeline | 'Create durations' >> beam.Create([
      ('Apple', 'perennial'),
      ('Carrot', 'biennial'),
      ('Tomato', 'perennial'),
      ('Tomato', 'annual'),
  ])

  # Flatten takes a tuple of PCollections and emits the elements of all of
  # them; unlike CoGroupByKey, nothing is grouped by key.
  merged = ((icon_pairs, duration_pairs)
            | beam.Flatten()
            | beam.Map(print))
output
('Apple', 'perennial')
('Carrot', 'biennial')
('Tomato', 'perennial')
('Tomato', 'annual')
('Apple', '🍎')
('Apple', '🍏')
('Eggplant', '🍆')
('Tomato', '🍅')

Discussion