IT虾米网

linux之PySpark 速度 Ubuntu 与 Windows

zhengyun_ustc 2023年11月10日 编程语言 221 0

我有一个 PySpark 示例作业,它是 PageRank 算法的一个版本。 代码如下:

from __future__ import print_function 
from operator import add 
import timeit 
from pyspark.sql import SparkSession 
 
# Normalize a list of pairs(url, rank) to 1 
def normalize(ranks): 
    norm = sum([rank for u, rank in ranks]) 
    ranks = [(u, rank / norm) for (u, rank) in ranks ] 
    return sorted(ranks, key=lambda x: x[1], reverse=True) 
 
def pagerank_2(edgeList, n, niter): 
    # Loads all URLs from input file and initialize their neighbors. 
    m = edgeList.groupByKey().cache() 
    s = 0.85 
 
    # Loads all URLs with other URL(s) link to from input file  
    # and initialize ranks of them to one. 
    q = spark.sparkContext.range(n).map(lambda x: (x, 1.0)).cache() 
    r = spark.sparkContext.range(n).map(lambda x: (x, 0.0)).cache() 
 
    # Calculates and updates URL ranks continuously  
    # using PageRank algorithm. 
    for iteration in range(niter): 
        # Calculates URL contributions to the rank of other URLs. 
        # Add URL ranks based on neighbor contributions. 
        # Do not forget to add missing values in q and set to 0.0 
        q = q.fullOuterJoin(m)\ 
             .flatMap(lambda x: (x[1][1] and [(u, x[1][0]/len(x[1][1])) for u in x[1][1]]) or [])\ 
             .reduceByKey(add)\ 
             .rightOuterJoin(r)\ 
             .mapValues(lambda x: (x[0] or 0)*s + (1-s)) 
        print("iteration = ", iteration) 
 
    # Collects all URL ranks and dump them to console after normalization 
    ranks = normalize(q.collect()) 
    print(ranks[0:10]) 
 
 
if __name__ == "__main__": 
 
    spark = SparkSession\ 
            .builder\ 
            .master('local[*]')\ 
            .appName("SparkPageRank")\ 
            .config('spark.driver.allowMultipleContexts', 'true')\ 
            .config('spark.sql.warehouse.dir', 'file:///C:/Home/Org/BigData/python/BE4/') \ 
            .config('spark.sql.shuffle.partitions', '10')\ 
            .getOrCreate() 
 
    spark.sparkContext.setLogLevel('WARN') 
 
    g = [(0, 1), (0, 5), (1, 2), (1, 3), (2, 3), 
         (2, 4), (2, 5), (3, 0), (5, 0), (5, 2)] 
    n = 6 
    edgeList = spark.sparkContext.parallelize(g) 
    print(timeit.timeit('pagerank_2(edgeList, 6, 10)', number=1, globals=globals())) 

节点从 0 到 n-1 编号。 edgeList 参数是一个 RDD,其中包含节点对(也称为边)列表。

我在 Windows 10(Anaconda、Spark 2.1.0、winutils)上以本地模式运行它。 这项工作被分配为 2896 个任务,这些任务都很轻。

我的问题是运行时间。 以上面的例子为例:

  • Windows 10:>4000 万!
  • 适用于 Linux 的 Windows 子系统 (Ubuntu 14.04):30 秒

该计算机是笔记本电脑核心 i7-4702HQ、16Gb 内存、512Gb SSD。 在启动过程中,Windows 比 Linux 慢,但慢了 50 倍?肯定有办法缩小这种差距?

我已为所有相关文件禁用 Windows Defender:java 目录、python 目录等。 关于要看什么还有其他想法吗?

感谢任何线索。

请您参考如下方法:

也许关键是 local[*] 这意味着

Run Spark locally with as many worker threads as logical cores on your machine.

尝试使用诸如local[10]


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!