我在 HDFS 上有一个文件夹,其中包含 10 个 CSV 文件。每个 CSV 文件包含 10000 行和 17 列。

目标

响应式读取 HDFS 上的文件夹。

如果文件夹中包含文件,则从文件夹中一次读取一个文件(从旧到新)。

在 Shiny 中绘制一些参数。

当新文件添加到文件夹或从文件夹中读取时更新绘图。

状态 目前,借助 SparklyR,我能够一次响应式(Reactive)读取所有文件并生成包含 100000 个点的绘图 (ggplot)。如果我在启动应用程序后添加第 11 个文件(包含 10000 行),绘图将更新为 110000 个点。

library(sparklyr) 
 
conf = spark_config() 
conf$spark.driver.memory="50g" 
sc <- spark_connect(master = "local[*]", config = conf) 
read_folder <- stream_read_csv(sc, "hdfs://localhost:9000/nik_ml/") 
 
ui <- function(){ 
  plotOutput("plot") 
} 
 
server <- function(input, output, session){ 
 
  ps <- reactiveSpark(read_folder, intervalMillis = 10) 
  output$plot <- renderPlot({ 
    df2 = ps() 
    # str(df2) 
    ggplot(data = df2, aes(x=Time, y=outletN2)) + geom_point() + ggtitle(nrow(df2)) + theme_bw() 
  }) 
} 
shinyApp(ui, server) 
 
 
SessionInfo() 
 
# R version 3.5.1 (2018-07-02) 
# Platform: x86_64-w64-mingw32/x64 (64-bit) 
# Running under: Windows Server >= 2012 x64 (build 9200) 
#  
# Matrix products: default 
#  
# locale: 
# [1] LC_COLLATE=English_United States.1252  LC_CTYPE=English_United States.1252    
# [3] LC_MONETARY=English_United States.1252 LC_NUMERIC=C                           
# [5] LC_TIME=English_United States.1252     
#  
# attached base packages: 
#   [1] stats     graphics  grDevices utils     datasets  methods   base      
#  
# other attached packages: 
#   [1] shinyFiles_0.7.2    bindrcpp_0.2.2      dplyr_0.7.8         shiny_1.2.0         ggplot2_3.1.0       
# [6] future_1.10.0       sparklyr_0.9.3.9000 
#  
# loaded via a namespace (and not attached): 
#   [1] tidyselect_0.2.5 forge_0.1.9002   purrr_0.2.5      listenv_0.7.0    lattice_0.20-38  colorspace_1.3-2 
# [7] generics_0.0.2   htmltools_0.3.6  yaml_2.2.0       base64enc_0.1-3  rlang_0.3.0.1    later_0.7.5      
# [13] pillar_1.3.0     glue_1.3.0       withr_2.1.2      DBI_1.0.0        dbplyr_1.2.2     bindr_0.1.1      
# [19] plyr_1.8.4       munsell_0.5.0    gtable_0.2.0     htmlwidgets_1.3  codetools_0.2-15 labeling_0.3     
# [25] httpuv_1.4.5     parallel_3.5.1   broom_0.5.1      r2d3_0.2.2       Rcpp_1.0.0       xtable_1.8-3     
# [31] openssl_1.1      promises_1.0.1   backports_1.1.2  scales_1.0.0     jsonlite_1.6     config_0.3       
# [37] fs_1.2.6         mime_0.6         digest_0.6.18    grid_3.5.1       rprojroot_1.3-2  tools_3.5.1      
# [43] magrittr_1.5     lazyeval_0.2.1   tibble_1.4.2     crayon_1.3.4     tidyr_0.8.2      pkgconfig_2.0.2  
# [49] rsconnect_0.8.12 assertthat_0.2.0 httr_1.4.0       rstudioapi_0.8   R6_2.3.0         globals_0.12.4   
# [55] nlme_3.1-137     compiler_3.5.1   

但我真正想要的是响应式地一次读取一个文件并制作一个 ggplot。这类似于 Spark Streaming,但 Spark Streaming(据我了解)将所有文本文件读入单个 RDD。从 Spark 的文档中,Python 中存在一个名为 SparkContext.wholeTextFiles 的函数,它可以让您读取包含多个小文本文件的目录,并以(文件名,内容)对的形式返回每个文件(link) .我还没有测试它,因为我现在想将所有内容都保留在 R 中。我查看了 shinyFiles 但找不到执行此操作的任何函数 ( https://github.com/thomasp85/shinyFiles )。

R/Sparklyr 中有类似的东西吗?我想做的事听起来很傻吗?如果您认为在 R 中有更有效的方法来实现它,我洗耳恭听!

谢谢。

请您参考如下方法:

我在我的一个项目中遇到了你的问题。我最终使用的是 reactivePoll 函数来更新我的情节。

所以你有两个选择,要么每 x 秒更新一次绘图,而不知道是否有新文件。在这个例子中 120 秒所以两分钟: 您在应用程序代码的开头初始化累加器 b。

b <- 0 
 
IsThereNew = function(){ 
  b <<- b+1 
  b 
} 
 
ReadHdfsData=function(){ # A function that calculates the underlying value 
  path <- paste0("/your/path/to/data.json") 
  df <- sc %>% 
    spark_read_json("name", path) %>% 
    collect() 
  return(df) 
} 
 
df <- reactivePoll(120 * 1000, session, IsThereNew, ReadHdfsData) 

所以在这种情况下,即使没有新数据,您也会以一种愚蠢的方式每 2 分钟更新一次绘图。

您可以做的另一种方法是列出 hdfs 目录中的文件数,每 x 秒一次,如果修改了列表计数,则绘图将更新 因此,您必须定义一个返回文件数量的函数 listNumberOfFiles 并替换 isThereNew 函数。


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!