IT虾米网

Map-Reduce和Spark详解

luoye 2018年06月26日 大数据 187 0

Overview

  1. 回顾python中的函数式编程
  2. python中的map和reduce函数
  3. 用map写并行代码
  4. Map-Reduce编程模型
  5. 用python写spark程序

Reading

  • Introduction to Parallel Computing, Blaise Barney, Lawrence Livermore National Laboratory.

  • Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.

  • Spark Programming Guide

  • Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.


Functional programming

考虑以下代码:
def double_everything_in(data): 
    result = [] 
    for i in data: 
        result.append(2 * i) 
    return result 
 
def quadruple_everything_in(data): 
    result = [] 
    for i in data: 
        result.append(4 * i) 
    return result 
 
double_everything_in([1, 2, 3, 4, 5]) 
 
[2, 4, 6, 8, 10] 
 
quadruple_everything_in([1, 2, 3, 4, 5]) 
 
[4, 8, 12, 16, 20]
  • 上述代码没有很好的践行软件工程中“不要重复自己”的原则。
  • 应该如何避免重复呢?
def multiply_by_x_everything_in(x, data): 
    result = [] 
    for i in data: 
        result.append(x * i) 
    return result 
 
multiply_by_x_everything_in(2, [1, 2, 3, 4, 5]) 
[2, 4, 6, 8, 10] 
multiply_by_x_everything_in(4, [1, 2, 3, 4, 5]) 
[4, 8, 12, 16, 20]
  • 再考虑下面的代码
def squared(x): 
    return x*x 
 
def double(x): 
    return x*2 
 
def square_everything_in(data): 
    result = [] 
    for i in data: 
        result.append(squared(i)) 
    return result 
 
def double_everything_in(data): 
    result = [] 
    for i in data: 
        result.append(double(i)) 
    return result 
 
square_everything_in([1, 2, 3, 4, 5]) 
[1, 4, 9, 16, 25] 
double_everything_in([1, 2, 3, 4, 5]) 
[2, 4, 6, 8, 10]
  • 应该如何避免重复呢
把函数作为值
def apply_f_to_everything_in(f, data): 
    result = [] 
    for x in data: 
        result.append(f(x)) 
    return result 
apply_f_to_everything_in(squared, [1, 2, 3, 4, 5]) 
[1, 4, 9, 16, 25] 
apply_f_to_everything_in(double, [1, 2, 3, 4, 5]) 
[2, 4, 6, 8, 10]
  • Lambda表达式:每次想用map的时候又不得不定义一个函数的时候可以用匿名函数。
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])
 
  

  

Python's map function

python中有一个内置的mpo函数比我们自己写的快很多。
map(lambda x: x*x, [1, 2, 3, 4, 5]) 
[1, 4, 9, 16, 25]

Implementing reduce



  
  • reduce函数有一个fold的例子
  • 有好几种实现fold的方法
  • 下面的方式叫做left fold
def foldl(f, data, z): 
    if (len(data) == 0): 
        print z 
        return z 
    else: 
        head = data[0] 
        tail = data[1:] 
        print "Folding", head, "with", tail, "using", z 
        partial_result = f(z, data[0]) 
        print "Partial result is", partial_result 
        return foldl(f, tail, partial_result)  
 
def add(x, y): 
    return x + y 
 
foldl(add, [1, 2, 3, 4, 5], 0) 
 
 
Folding 1 with [2, 3, 4, 5] using 0 
Partial result is 1 
Folding 2 with [3, 4, 5] using 1 
Partial result is 3 
Folding 3 with [4, 5] using 3 
Partial result is 6 
Folding 4 with [5] using 6 
Partial result is 10 
Folding 5 with [] using 10 
Partial result is 15 
15
  • 用lambda表达式也一样
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0) 
 
Folding 1 with [2, 3, 4, 5] using 0 
Partial result is 1 
Folding 2 with [3, 4, 5] using 1 
Partial result is 3 
Folding 3 with [4, 5] using 3 
Partial result is 6 
Folding 4 with [5] using 6 
Partial result is 10 
Folding 5 with [] using 10 
Partial result is 15 
15

Python's reduce function.

python内部的reduce函数是left fold
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5]) 
15 
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0) 
-15 

Functional programming and parallelism


  • 函数式编程用在并行编程里
  • map函数可以通过数据级的并行化轻松实现并行计算
  •     把函数作为参数传递进去可以避免一些副作用
def perform_computation(f, result, data, i): 
    print "Computing the ", i, "th result..." 
    # This could be scheduled on a different CPU 
    result[i] = f(data[i]) 
 
def my_map(f, data): 
    result = [None] * len(data) 
    for i in range(len(data)): 
        perform_computation(f, result, data, i) 
    # Wait for other CPUs to finish, and then.. 
    return result 
 
my_map(lambda x: x * x, [1, 2, 3, 4, 5]) 
 
Computing the  0 th result... 
Computing the  1 th result... 
Computing the  2 th result... 
Computing the  3 th result... 
Computing the  4 th result... 
[1, 4, 9, 16, 25]

A multi-threaded map function

from threading import Thread 
 
def schedule_computation_threaded(f, result, data, threads, i):     
    # Each function evaluation is scheduled on a different core. 
    def my_job():  
        print "Processing data:", data[i], "... " 
        result[i] = f(data[i]) 
        print "Finished job #", i     
        print "Result was", result[i]         
    threads[i] = Thread(target=my_job) 
     
def my_map_multithreaded(f, data): 
    n = len(data) 
    result = [None] * n 
    threads = [None] * n 
    print "Scheduling jobs.. " 
    for i in range(n): 
        schedule_computation_threaded(f, result, data, threads, i) 
    print "Starting jobs.. " 
    for i in range(n): 
        threads[i].start() 
    print "Waiting for jobs to finish.. " 
    for i in range(n): 
        threads[i].join() 
    print "All done." 
    return result 
 
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5]) 
 
Scheduling jobs..  
Starting jobs..  
Processing data: 1 ...  
Finished job # 0 
Result was 1 
Processing data: 2 ...  
Finished job # 1 
Result was 4 
Processing data: 3 ...  
Finished job # 2 
Result was 9 
Processing data: 4 ...  
Finished job # 3 
Result was 16 
Processing data: 5 ...  
Finished job # 4 
Result was 25 
Waiting for jobs to finish..  
All done. 
[1, 4, 9, 16, 25]

from numpy.random import uniform 
from time import sleep 
 
def a_function_which_takes_a_long_time(x): 
    sleep(uniform(2, 10))  # Simulate some long computation 
    return x*x 
 
my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5]) 
 
Scheduling jobs..  
Starting jobs..  
Processing data: 1 ...  
Processing data: 2 ...  
Processing data: 3 ...  
Processing data: 4 ...  
Processing data: 5 ...  
Waiting for jobs to finish..  
Finished job # 4 
Result was 25 
Finished job # 0 
Result was 1 
Finished job # 3 
Result was 16 
Finished job # 2 
Result was 9 
Finished job # 1 
Result was 4 
All done. 
Out[31]: 
[1, 4, 9, 16, 25]

Map Reduce

  • map reduce是一种大规模并行处理的编程模型
  • 大规模意味着它可以借助大量的计算集群来处理大数据
  • 有很多实现:hadoop和spark
  • 我们可以用任何语言实现map-reduce:hadoop里用java,spark用scala,但也有python接口
  • python或者scala非常适合map-reduce模型,但我们不必函数式编程
  • mapreduce的实现中关注了底层的功能操作,我们不必担心。

Typical steps in a Map Reduce Computation


  1. ETL一个数据集
  2. Map操作:每一行提取你关心的信息
  3. "Shuffle and Sort":task/node allocation
  4. Reduce操作:aggregate、summaries、filter or transform
  5. 保存结果

Callbacks for Map Reduce


  • 数据集和每一步的计算状态,都以键值对的形式表现
  • map(k,v)k,v
  • reduce(k,k,v)k,v
  • *指的是值的collection
  • colletions并不是有序的

Resilient Distributed Data

  • 在map-reduce计算中这些collections被称作RDDs:
    • 数据在分布在节点之间
    • 单个节点的损坏不会导致数据丢失
    • 数据一般存在HBase或者HDFS中
  • map和reduce函数可以不同的keys、elements实现并行化

Word Count Example


  • 在这个例子里,输入是一系列URL,每一个记录篇一篇文档
  • 问题:在数据集中每个单词出现了多少次

Word Count: Map

  •  输入数据进行map:
    • Key: URL
    • Value: 文档内容
document1,tobeornottobe
  • 我们需要用map处理给定的URL 
    • Key: word
    • Value: 1
  • 我们原始的数据集会被转化成:
to,1   be,1   or,1   not,1   to,1   be,1

Word Count: Reduce

  • reduce操作按照key来对values进行分组,然后执行在每个key上执行reduce。
  • mapreduce会折叠计算数算来最小化数据复制的操作。
  • 不同分区的数据单独进行reduce
  • 算子的选择是很重要的,需要累加和结合。
  • 在这个例子是函数是+运算符
be,2
not,1
or,1
to,2

MiniMapReduce

  • 为了理解map-reduce编程模型是如何运作的,我们在python里实现一个自己的map-reduce框架
  • 但这并不是hadoop或者spark的实际实现方式
########################################################## 
# 
#   MiniMapReduce 
# 
# A non-parallel, non-scalable Map-Reduce implementation 
########################################################## 
 
def groupByKey(data): 
    result = dict() 
    for key, value in data: 
        if key in result: 
            result[key].append(value) 
        else: 
            result[key] = [value] 
    return result 
         
def reduceByKey(f, data): 
    key_values = groupByKey(data) 
    return map(lambda key:  
                   (key, reduce(f, key_values[key])),  
                       key_values)

Word-count using MiniMapReduce

data = map(lambda x: (x, 1), "to be or not to be".split()) 
data 
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)] 
groupByKey(data) 
{'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]} 
reduceByKey(lambda x, y: x + y, data) 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

Parallelising MiniMapReduce

我们可以轻松的把刚才的map-reduce实现改成并行框架,利用刚才的my_map_mutithreaded函数就可以。
def reduceByKey_multithreaded(f, data): 
    key_values = groupByKey(data) 
    return my_map_multithreaded( 
        lambda key: (key, reduce(f, key_values[key])), key_values.keys()) 
reduceByKey_multithreaded(lambda x, y: x + y, data) 
 
Scheduling jobs..  
Starting jobs..  
Processing data: not ...  
Finished job # 0 
Result was ('not', 1) 
Processing data: to ...  
Finished job # 1 
Result was ('to', 2) 
Processing data: or ...  
Finished job # 2 
Result was ('or', 1) 
Processing data: be ...  
Finished job # 3 
Result was ('be', 2) 
Waiting for jobs to finish..  
All done. 
 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)] 

Parallelising the reduce step

  1. 假设我们的算子事累加和结合的,我们也可以并行reduce操作
  2. 把数据大概分成相等的子集
  3. 在单独的计算核心上独立reduce每一个子集
  4. 最后把各个结果组合起来

Partitioning the data

def split_data(data, split_points): 
    partitions = [] 
    n = 0 
    for i in split_points: 
        partitions.append(data[n:i]) 
        n = i 
    partitions.append(data[n:]) 
    return partitions 
 
data = ['a', 'b', 'c', 'd', 'e', 'f', 'g'] 
partitioned_data = split_data(data, [3]) 
partitioned_data

Reducing across partitions in parallel

from threading import Thread 
 
def parallel_reduce(f, partitions): 
 
    n = len(partitions) 
    results = [None] * n 
    threads = [None] * n 
     
    def job(i): 
        results[i] = reduce(f, partitions[i]) 
 
    for i in range(n): 
        threads[i] = Thread(target = lambda: job(i)) 
        threads[i].start() 
     
    for i in range(n): 
        threads[i].join() 
     
    return reduce(f, results) 
 
parallel_reduce(lambda x, y: x + y, partitioned_data)

Apache Spark and Map-Reduce


  • 我们可以用高层函数map一个RDDs到一个新的RDDs.
  • 每个RDD的实例都至少有两个相应的MapRuduce工作流:map , reduceByKey
  • 这些方法和我们之间定义的标准python collections工作原理是相同的
  • 在Apache Spark API中还有额外的RDD方法

Word-count in Apache Spark


words = "to be or not to be".split() 

The SparkContext class

  • 当我们用Spark的时候我们需要初始化一个SparkContext.
  • paralleled 方法在SparkContext中可以把任何python collection转成RDD
    • 通常情况下我们通过一个大文件或者HBase表中创建RDD
words_rdd = sc.parallelize(words) 
words_rdd 
 
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

Mapping an RDD

现在当我们在my_rdd上面执行map或者reduceByKey操作的时候可以在集群上建立一个并行计算的任务。
word_tuples_rdd = words_rdd.map(lambda x: (x, 1)) 
word_tuples_rdd 
PythonRDD[1] at RDD at PythonRDD.scala:43
  • 注意我们现在还没有产生结果
  • 计算操作在我们请求最终结果被collect之间是不会执行的
  • 通过collect()方法来激活这个计算
word_tuples_rdd.collect() 
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

Reducing an RDD

  • 但是,我们需要额外处理
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y) 
word_counts_rdd 
PythonRDD[6] at RDD at PythonRDD.scala:43
  • 现在来请求最终的结果
word_counts = word_counts_rdd.collect() 
word_counts 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

Lazy evaluation


  • 只有当我们进行collect()的时候集群才会进行计算
  • collect() 会同时激活map和reduceByKey操作
  • 如果结果collection非常大,那么这个操作开销是很大的

The head of an RDD

  •  take方法和collect类似,但是只返回前n个元素。
  •  take在测试的时候非常有用
word_counts_rdd.take(2) 
[('not', 1), ('to', 2)]

The complete word-count example

text = "to be or not to be".split() 
rdd = sc.parallelize(text) 
counts = rdd.map(lambda word: (word, 1)) \ 
             .reduceByKey(lambda x, y: x + y) 
counts.collect() 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

Additional RDD transformations

spark提供了很多额外的collections上的操作
  • Sorting: sortByKey, sortBy, takeOrdered
  • Mapping: flatMap
  • Filtering: filter
  • Counting: count
  • Set-theoretic: intersection, union

Creating an RDD from a text file

  • 上面的例子例,我们从collection对象中创建了一个RDD
  • 这不是典型的处理大数据的方法
  • 更常用的方式直接从HDFS文件或者HBase表中创建RDD
  • 下面的例子将会从一个纯ext4文件系统中创建RDD
  • 每一个RDD对应了文本中的一行
genome = sc.textFile('/tmp/genome.txt')

Genome example


  • 我们将会中这个RDD进行计算,并且根据词频来进行排序
  • 首先我们定义一个函数,把序列切成指定大小
def group_characters(line, n=5): 
    result = '' 
    i = 0 
    for ch in line: 
        result = result + ch 
        i = i + 1 
        if (i % n) == 0: 
            yield result 
            result = '' 
 
def group_and_split(line): 
    return [sequence for sequence in group_characters(line)] 
 
group_and_split('abcdefghijklmno') 
['abcde', 'fghij', 'klmno']
  • 现在我们要把原始的RDD转换成键值对的形式,key是这个序列,value是1
  • 注意如果我们简单的把每一行进行map,会得到一个高维的数据
genome.map(group_and_split).take(2) 
[[u'CAGGG', 
  u'GCACA', 
  u'GTCTC', 
  u'GGCTC', 
  u'ACTTC', 
  u'GACCT', 
  u'CTGCC', 
  u'TCCCC', 
  u'AGTTC', 
  u'AAGTG', 
  u'ATTCT', 
  u'CCTGC', 
  u'CTCAG', 
  u'TCTCC'], 
 [u'TGAGT', 
  u'AGCTG', 
  u'GGATG', 
  u'ACAGG', 
  u'AGTGG', 
  u'AGCAT', 
  u'GCCTA', 
  u'GCTAA', 
  u'TCTTT', 
  u'GTATT', 
  u'TCTAG', 
  u'TAGAG', 
  u'ATGCG', 
  u'GTTTT']]

Flattening an RDD using flatMap

  • 我们需要把数据转成序列形式的,所以使用flatMap方法
sequences = genome.flatMap(group_and_split) 
sequences.take(3) 
[u'CAGGG', u'GCACA', u'GTCTC']
counts = \ 
    sequences.map( 
        lambda w: (w, 1)).reduceByKey(lambda x, y: x + y) 
counts.take(10) 
[(u'TGTCA', 1), 
 (u'GCCCA', 3), 
 (u'CCAAG', 5), 
 (u'GCCCC', 4), 
 (u'CATGT', 1), 
 (u'AGATT', 1), 
 (u'TGTTT', 1), 
 (u'CCTAT', 4), 
 (u'TCAGT', 1), 
 (u'CAGCG', 2)] 
  • 我们根据计数队序列进行排序
  • 因此key(第一个元素)应该是计数值
  • 我们需要颠倒一下tuples的顺序
def reverse_tuple(key_value_pair): 
    return (key_value_pair[1], key_value_pair[0]) 
 
sequences = counts.map(reverse_tuple) 
sequences.take(10) 
 
[(1, u'TGTCA'), 
 (3, u'GCCCA'), 
 (5, u'CCAAG'), 
 (4, u'GCCCC'), 
 (1, u'CATGT'), 
 (1, u'AGATT'), 
 (1, u'TGTTT'), 
 (4, u'CCTAT'), 
 (1, u'TCAGT'), 
 (2, u'CAGCG')]

Sorting an RDD

  • 现在我们可以降序的方法对key进行排序
sequences_sorted = sequences.sortByKey(False) 
top_ten_sequences = sequences_sorted.take(10) 
top_ten_sequences 
[(15, u'AAAAA'), 
 (9, u'GCAGG'), 
 (8, u'ACAAA'), 
 (7, u'GGCCA'), 
 (7, u'AATTA'), 
 (7, u'AGGTT'), 
 (7, u'AGGGA'), 
 (7, u'CCAGG'), 
 (7, u'GAGCC'), 
 (7, u'AAAAC')]




发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

Spark原理总结详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。