博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Scala的foreachRDD
阅读量:6033 次
发布时间:2019-06-20

本文共 2793 字,大约阅读时间需要 9 分钟。

hot3.png

顾名思义是遍历RDD用的,这个函数在DStream包中的InputStream类里,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit): Unit或者def foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): UnitApply a function to each RDD in this DStream.This is an output operator, so 'this' DStream will be registered as an output stream and therefore materialized.

所以要掌握它,对它要有深入了解。

下面转载一下常见的理解错误

经常写数据到外部系统需要创建一个连接的object handle(eg:根据TCP协议连接到远程的服务器,我们连接外部数据库需要自己的句柄)和发送数据到远程的系统。程序员可能会想当然地在spark上创建一个connection对象, 然后在spark线程里用这个对象来存RDD:

dstream.foreachRDD { rdd =>  val connection = createNewConnection()  // executed at the driver  rdd.foreach { record =>    connection.send(record) // executed at the worker  }}

这个代码会产生执行错误, 因为rdd是分布式存储的,它是一个数据结构,它是一组指向集群数据的指针, rdd.foreach会在集群里的不同机器上创建spark工作线程, 而connection对象则不会在集群里的各个机器之间传递, 所以有些spark工作线程就会产生connection对象没有被初始化的执行错误。 解决的办法可以是在spark worker里为每一个worker创建一个connection对象, 但是如果你这么做, 程序要为每一条record创建一次connection,显然效率和性能都非常差,即

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

另一种改进方法是为每个spark分区创建一个connection对象,同时维护一个全局的静态的连接池对象, 这样就可以最好的复用connection。 另外需要注意: 虽然有多个connection对象, 但在同一时间只有一个connection.send(record)执行, 因为在同一个时间里, 只有 一个微批次的RDD产生出来。

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    // ConnectionPool is a static, lazily initialized pool of connections    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  }}

有人问了个问题,为什么foreachRDD里有两层嵌套的foreach? 为什么dstream.foreachRDD里还要再套一层rdd.foreach

可以这么理解, DStream.foreachRDD 是一个输出操作符,它返回的不是RDD里的一行数据, 而是输出DStream后面的RDD, 在一个时间间隔里, 只返回一个RDD的“微批次”, 为了访问这个“微批次”RDD里的数据, 我们还需要在RDD数据对象上做进一步操作。这也印证了文首的那段英文说明。 参考下面的代码实例, 更容易理解。

给定一个 RDD [Security, Prices]数据结构     dstream.foreachRDD { pricesRDD =>  // Loop over RDD       val x= pricesRDD.count       if (x > 0)  // RDD has data       {         for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD         {           var index = line._2.split(',').view(0).toInt   // That is the index           var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source           var security =  line._2.split(',').view(2).toString // This is the name of the security           var price = line._2.split(',').view(3).toFloat  // This is the price of the security           if (price.toFloat > 90.0)           {            // Do something here            // Sent notification, write to HDFS etc           }         }       }     }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载于:https://my.oschina.net/hunglish/blog/1519292

你可能感兴趣的文章
浅尝异步IO
查看>>
C - Train Problem II——(HDU 1023 Catalan 数)
查看>>
Speak loudly
查看>>
10个完整的Android开源项目,值得大家学习借鉴
查看>>
JavaScript 内功心法——变量提升及函数提升
查看>>
吴颖二:12.19 年关将在翻仓已“迫不及待”你准备好了吗
查看>>
Android类似微信图片选择器
查看>>
【漫画】为什么说O(n)复杂度的基数排序没有快速排序快?
查看>>
以太坊智能合约开发第六篇:truffle开发框架
查看>>
C++17 并行排序初体验
查看>>
提交表单且不刷新页面
查看>>
Java入门 | 如何傻瓜式的安装JDK和配置环境变量?
查看>>
关于JS引擎优化的理解
查看>>
KVO本质的推导
查看>>
Linux下Python3.6的安装及避坑指南
查看>>
手把手教你安装Linux性能监控工具——pydash
查看>>
margin-top 外边距合并
查看>>
组件化页面:封装el-form
查看>>
LinkedList源码阅读分析
查看>>
iOS__上传应用到AppStore出现Authenticating with the iTunes store
查看>>