🐝
【Apache Beam】Transform チートシート(Python)
はじめに
Apache Beam Python SDK で提供されている Transform をまとめる。
1. 要素ごとの処理(Element-wise)
1.1. Filter - 要素のフィルタリング
filter.py
import apache_beam as beam
# Filter example: create a few plant records in memory, keep only those whose
# 'duration' is 'perennial', and print every element that passes the filter.
with beam.Pipeline() as pipeline:
    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        # The predicate returns True for elements that should be kept.
        | 'Filter perennials' >> beam.Filter(
            lambda plant: plant['duration'] == 'perennial')
        | beam.Map(print)
    )
output
{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
1.2. FlatMap - 要素に関数を適用(反復可)
flat_map.py
import apache_beam as beam
# FlatMap example: each input element is a list of plant names; the identity
# lambda returns that list and FlatMap emits its items one by one.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
            ['🍅Tomato', '🥔Potato'],
        ])
        | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
        | beam.Map(print)
    )
output
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
1.3. Keys - 要素の Key を抽出
keys.py
import apache_beam as beam
# Keys example: from (icon, name) pairs, keep only the first component of
# each pair and print it.
with beam.Pipeline() as pipeline:
    icons = (
        pipeline
        | 'Garden plants' >> beam.Create([
            ('🍓', 'Strawberry'),
            ('🥕', 'Carrot'),
            ('🍆', 'Eggplant'),
            ('🍅', 'Tomato'),
            ('🥔', 'Potato'),
        ])
        | 'Keys' >> beam.Keys()
        | beam.Map(print)
    )
output
🍓
🥕
🍆
🍅
🥔
1.4. KvSwap - 要素の Key と Value を交換
kvswap.py
import apache_beam as beam
# KvSwap example: turn every (icon, name) pair into (name, icon) and print it.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Garden plants' >> beam.Create([
            ('🍓', 'Strawberry'),
            ('🥕', 'Carrot'),
            ('🍆', 'Eggplant'),
            ('🍅', 'Tomato'),
            ('🥔', 'Potato'),
        ])
        | 'Key-Value swap' >> beam.KvSwap()
        | beam.Map(print)
    )
output
('Strawberry', '🍓')
('Carrot', '🥕')
('Eggplant', '🍆')
('Tomato', '🍅')
('Potato', '🥔')
1.5. Map - 要素に関数を適用
map.py
import apache_beam as beam
def strip_header_and_newline(text):
    """Return *text* without leading/trailing '#', space and newline characters.

    ``str.strip`` treats its argument as a set of characters, so any run of
    '#', ' ' or '\\n' at either end of the string is removed; characters in
    the middle of the string are left untouched.
    """
    return text.strip('# \n')
# Map example: apply strip_header_and_newline to every element (exactly one
# output per input) and print the cleaned-up strings.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            '# 🍓Strawberry\n',
            '# 🥕Carrot\n',
            '# 🍆Eggplant\n',
            '# 🍅Tomato\n',
            '# 🥔Potato\n',
        ])
        | 'Strip header' >> beam.Map(strip_header_and_newline)
        | beam.Map(print)
    )
output
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
1.6. ParDo - DoFn の実行
par_do.py
import apache_beam as beam
class SplitWords(beam.DoFn):
    """DoFn that splits each input string on a delimiter and emits the pieces."""

    def __init__(self, delimiter=','):
        """Remember the delimiter used by ``process`` (defaults to ',')."""
        # Let the DoFn base classes run their own initialisation as well.
        super().__init__()
        self.delimiter = delimiter

    def process(self, text):
        """Yield every substring of *text* separated by ``self.delimiter``."""
        yield from text.split(self.delimiter)
# ParDo example: run the SplitWords DoFn over comma-separated strings so that
# every word becomes its own element, then print the words.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            '🍓Strawberry,🥕Carrot,🍆Eggplant',
            '🍅Tomato,🥔Potato',
        ])
        | 'Split words' >> beam.ParDo(SplitWords(','))
        | beam.Map(print)
    )
output
🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato
1.7. Partition - 要素の分割
partition.py
import apache_beam as beam
# Known plant durations; the position in this list is the partition index.
durations = ['annual', 'biennial', 'perennial']


def by_duration(plant, num_partitions):
    """Return the partition index for *plant* based on its 'duration' value.

    Args:
        plant: Mapping with a 'duration' key whose value is one of the
            entries of ``durations``.
        num_partitions: Accepted to match the two-argument callback shape
            used with ``beam.Partition``; not used by this function.

    Raises:
        ValueError: If ``plant['duration']`` is not listed in ``durations``
            (raised by ``list.index``).
    """
    return durations.index(plant['duration'])
# Partition example: split the plant records into one PCollection per entry
# of `durations`, then print each partition with a label prefix.
with beam.Pipeline() as pipeline:
    # beam.Partition yields len(durations) PCollections, unpacked in the same
    # order as the `durations` list.
    annuals, biennials, perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        | 'Partition' >> beam.Partition(by_duration, len(durations))
    )

    # These branches must be attached inside the `with` block, i.e. before the
    # pipeline is run when the context manager exits.
    annuals | 'Annuals' >> beam.Map(
        lambda x: print('annual: {}'.format(x)))
    biennials | 'Biennials' >> beam.Map(
        lambda x: print('biennial: {}'.format(x)))
    perennials | 'Perennials' >> beam.Map(
        lambda x: print('perennial: {}'.format(x)))
output
perennial: {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
biennial: {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
perennial: {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
annual: {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
perennial: {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
1.8. WithTimestamps - 要素のタイムスタンプ化
with_timestamps.py
import apache_beam as beam
class GetTimestamp(beam.DoFn):
    """DoFn that formats each plant together with its element timestamp."""

    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
        """Yield '<UTC datetime> - <plant name>' for the given element.

        The ``beam.DoFn.TimestampParam`` default asks Beam to pass in the
        timestamp attached to the element being processed.
        """
        yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])
# WithTimestamps example: attach each plant's 'season' value (Unix seconds)
# as the element timestamp, then read it back with GetTimestamp and print it.
with beam.Pipeline() as pipeline:
    plant_timestamps = (
        pipeline
        | 'Garden plants' >> beam.Create([
            {'name': 'Strawberry', 'season': 1585699200},  # April, 2020
            {'name': 'Carrot', 'season': 1590969600},  # June, 2020
            {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
            {'name': 'Tomato', 'season': 1588291200},  # May, 2020
            {'name': 'Potato', 'season': 1598918400},  # September, 2020
        ])
        # Wrapping the element in TimestampedValue sets its event timestamp.
        | 'With timestamps' >> beam.Map(
            lambda plant: beam.window.TimestampedValue(
                plant, plant['season']))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
output
2020-04-01 00:00:00 - Strawberry
2020-06-01 00:00:00 - Carrot
2020-03-01 00:00:00 - Artichoke
2020-05-01 00:00:00 - Tomato
2020-09-01 00:00:00 - Potato
1.9. Values - 要素の Value を抽出
values.py
import apache_beam as beam
# Values example: from (icon, name) pairs, keep only the second component of
# each pair and print it.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Garden plants' >> beam.Create([
            ('🍓', 'Strawberry'),
            ('🥕', 'Carrot'),
            ('🍆', 'Eggplant'),
            ('🍅', 'Tomato'),
            ('🥔', 'Potato'),
        ])
        | 'Values' >> beam.Values()
        | beam.Map(print)
    )
output
Strawberry
Carrot
Eggplant
Tomato
Potato
2. 集約処理(Aggregation)
2.1. CoGroupByKey - 要素を Key で集約(複数の PCollection)
co_group_by_key.py
import apache_beam as beam
# CoGroupByKey example: join two keyed PCollections by plant name. The dict
# keys ('icons', 'durations') become the tags under which each input's values
# are listed in the result.
with beam.Pipeline() as pipeline:
    icon_pairs = pipeline | 'Create icons' >> beam.Create([
        ('Apple', '🍎'),
        ('Apple', '🍏'),
        ('Eggplant', '🍆'),
        ('Tomato', '🍅'),
    ])

    duration_pairs = pipeline | 'Create durations' >> beam.Create([
        ('Apple', 'perennial'),
        ('Carrot', 'biennial'),
        ('Tomato', 'perennial'),
        ('Tomato', 'annual'),
    ])

    plants = (
        ({'icons': icon_pairs, 'durations': duration_pairs})
        | 'Merge' >> beam.CoGroupByKey()
        | beam.Map(print)
    )
output
('Apple', {'icons': ['🍎', '🍏'], 'durations': ['perennial']})
('Carrot', {'icons': [], 'durations': ['biennial']})
('Tomato', {'icons': ['🍅'], 'durations': ['perennial', 'annual']})
('Eggplant', {'icons': ['🍆'], 'durations': []})
2.2. CombineGlobally - 要素の結合
combine_globally.py
import apache_beam as beam
# CombineGlobally example: reduce all input sets to the single set of items
# that appear in every one of them, and print that set.
with beam.Pipeline() as pipeline:
    common_items = (
        pipeline
        | 'Create produce' >> beam.Create([
            {'🍓', '🥕', '🍌', '🍅', '🌶️'},
            {'🍇', '🥕', '🥝', '🍅', '🥔'},
            {'🍉', '🥕', '🍆', '🍅', '🍍'},
            {'🥑', '🥕', '🌽', '🍅', '🥥'},
        ])
        # `sets or [set()]` substitutes one empty set when `sets` is empty so
        # that set.intersection always receives at least one argument.
        | 'Get common items' >> beam.CombineGlobally(
            lambda sets: set.intersection(*(sets or [set()])))
        | beam.Map(print)
    )
output
{'🍅', '🥕'}
2.3. CombinePerKey - 要素を Key で集約
combine_per_key.py
import apache_beam as beam
def saturated_sum(values):
    """Return the sum of *values*, capped at a maximum of 8."""
    max_value = 8
    return min(sum(values), max_value)
# CombinePerKey example: combine the counts of each icon with saturated_sum
# and print one (icon, combined value) pair per key.
with beam.Pipeline() as pipeline:
    saturated_total = (
        pipeline
        | 'Create plant counts' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Saturated sum' >> beam.CombinePerKey(saturated_sum)
        | beam.Map(print)
    )
output
('🥕', 5)
('🍆', 1)
('🍅', 8)
2.4. CombineValues - 要素の Value の集約
combine_values.py
import apache_beam as beam
# CombineValues example: every element is already (key, list of counts);
# apply the built-in sum to each list and print (key, total).
with beam.Pipeline() as pipeline:
    total = (
        pipeline
        | 'Create produce counts' >> beam.Create([
            ('🥕', [3, 2]),
            ('🍆', [1]),
            ('🍅', [4, 5, 3]),
        ])
        | 'Sum' >> beam.CombineValues(sum)
        | beam.Map(print)
    )
output
('🥕', 5)
('🍆', 1)
('🍅', 12)
2.5. Count - 要素のカウント
count.py
import apache_beam as beam
# Count example: count how many elements share each season key and print one
# (season, count) pair per key.
with beam.Pipeline() as pipeline:
    total_elements_per_keys = (
        pipeline
        | 'Create plants' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('summer', '🥕'),
            ('fall', '🥕'),
            ('spring', '🍆'),
            ('winter', '🍆'),
            ('spring', '🍅'),
            ('summer', '🍅'),
            ('fall', '🍅'),
            ('summer', '🌽'),
        ])
        | 'Count elements per key' >> beam.combiners.Count.PerKey()
        | beam.Map(print)
    )
output
('spring', 4)
('summer', 3)
('fall', 2)
('winter', 1)
2.6. Distinct - 要素の重複削除
distinct.py
import apache_beam as beam
# Distinct example: drop duplicate elements so each icon is printed once.
with beam.Pipeline() as pipeline:
    unique_elements = (
        pipeline
        | 'Create produce' >> beam.Create([
            '🥕',
            '🥕',
            '🍆',
            '🍅',
            '🍅',
            '🍅',
        ])
        | 'Deduplicate elements' >> beam.Distinct()
        | beam.Map(print)
    )
output
🥕
🍆
🍅
2.7. GroupByKey - 要素を Key でグループ化
group_by_key.py
import apache_beam as beam
# GroupByKey example: collect all icons that share a season key, turn each
# group into a sorted list, and print one (season, icons) pair per key.
with beam.Pipeline() as pipeline:
    produce_counts = (
        pipeline
        | 'Create produce counts' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('spring', '🍆'),
            ('spring', '🍅'),
            ('summer', '🥕'),
            ('summer', '🍅'),
            ('summer', '🌽'),
            ('fall', '🥕'),
            ('fall', '🍅'),
            ('winter', '🍆'),
        ])
        | 'Group counts per produce' >> beam.GroupByKey()
        # sorted() materialises the grouped values as a list ordered by
        # string comparison (code point order for these emoji).
        | beam.MapTuple(lambda k, vs: (k, sorted(vs)))
        | beam.Map(print)
    )
output
('spring', ['🍅', '🍆', '🍓', '🥕'])
('summer', ['🌽', '🍅', '🥕'])
('fall', ['🍅', '🥕'])
('winter', ['🍆'])
2.8. Max - 要素の最大値
max.py
import apache_beam as beam
# Max example: combine the values of each key with the built-in max and print
# one (icon, largest value) pair per key.
with beam.Pipeline() as pipeline:
    elements_with_max_value_per_key = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Get max value per key' >> beam.CombinePerKey(max)
        | beam.Map(print)
    )
output
('🥕', 3)
('🍆', 1)
('🍅', 5)
2.9. Mean - 要素の平均値
mean.py
import apache_beam as beam
# Mean example: compute the arithmetic mean of the values of each key and
# print one (icon, mean) pair per key.
with beam.Pipeline() as pipeline:
    elements_with_mean_value_per_key = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Get mean value per key' >> beam.combiners.Mean.PerKey()
        | beam.Map(print)
    )
output
('🥕', 2.5)
('🍆', 1.0)
('🍅', 4.0)
2.10. Min - 要素の最小値
min.py
import apache_beam as beam
# Min example: combine the values of each key with the built-in min and print
# one (icon, smallest value) pair per key.
with beam.Pipeline() as pipeline:
    elements_with_min_value_per_key = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Get min value per key' >> beam.CombinePerKey(min)
        | beam.Map(print)
    )
output
('🥕', 2)
('🍆', 1)
('🍅', 3)
2.11. Sample - 要素のランダム抽出
sample.py
import apache_beam as beam
# Sample example: pick 3 elements from the whole PCollection and print them
# as a single list. The selection is random, so the printed elements can
# differ from run to run.
with beam.Pipeline() as pipeline:
    sample = (
        pipeline
        | 'Create produce' >> beam.Create([
            '🍓 Strawberry',
            '🥕 Carrot',
            '🍆 Eggplant',
            '🍅 Tomato',
            '🥔 Potato',
        ])
        | 'Sample N elements' >> beam.combiners.Sample.FixedSizeGlobally(3)
        | beam.Map(print)
    )
output
['🥕 Carrot', '🍆 Eggplant', '🍅 Tomato']
2.12. Sum - 要素の合計
sum.py
import apache_beam as beam
# Sum example: combine the values of each key with the built-in sum and print
# one (icon, total) pair per key.
with beam.Pipeline() as pipeline:
    totals_per_key = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Sum values per key' >> beam.CombinePerKey(sum)
        | beam.Map(print)
    )
output
('🥕', 5)
('🍆', 1)
('🍅', 12)
2.13. Top - 要素の最大(or最小)値の抽出
top.py
import apache_beam as beam
# Top example: for each key keep at most the 2 largest values and print one
# (icon, list of values) pair per key.
with beam.Pipeline() as pipeline:
    largest_elements_per_key = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Largest N values per key' >> beam.combiners.Top.LargestPerKey(2)
        | beam.Map(print)
    )
output
('🥕', [3, 2])
('🍆', [1])
('🍅', [5, 4])
3. その他の処理(Others)
3.1. Flatten - 複数 PCollection の結合
flatten.py
import apache_beam as beam
# Flatten example: merge two PCollections into a single PCollection that
# contains the elements of both, and print every element.
with beam.Pipeline() as pipeline:
    icon_pairs = pipeline | 'Create icons' >> beam.Create([
        ('Apple', '🍎'),
        ('Apple', '🍏'),
        ('Eggplant', '🍆'),
        ('Tomato', '🍅'),
    ])

    duration_pairs = pipeline | 'Create durations' >> beam.Create([
        ('Apple', 'perennial'),
        ('Carrot', 'biennial'),
        ('Tomato', 'perennial'),
        ('Tomato', 'annual'),
    ])

    # Flatten takes a tuple of PCollections as its input.
    merged = (
        (icon_pairs, duration_pairs)
        | beam.Flatten()
        | beam.Map(print)
    )
output
('Apple', 'perennial')
('Carrot', 'biennial')
('Tomato', 'perennial')
('Tomato', 'annual')
('Apple', '🍎')
('Apple', '🍏')
('Eggplant', '🍆')
('Tomato', '🍅')
Discussion