《《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx》由会员分享,可在线阅读,更多相关《《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx(7页珍藏版)》请在优知文库上搜索。
1、课题读取电影评分数据创建DStream课时2课时(90min)教学目标知识技能目标:(1)熟悉基础数据源(2)熟悉高级数据源(3)掌握读取数据创建DStream的方法素质目标:培养自我学习和持续学习能力,能够及时掌握新技术和工具,并将其应用到实际项目中教学重难点教学重点:基础数据源、高级数据源教学难点:读取数据创建DS(ream教学方法案例分析法、问答法、讨论法、讲授法教学用具电脑、投影仪、多媒体课件、教材教学过程主要教学内容及步骤课前任务【教师】布置课前任务,和学生负责人取得联系,让其提醒同学通过APP或其他学习软件,完成课前任务请大家了解什么是数据源,什么是DSlream.【学生】完成课前
2、任务考勤【教师】使用APP进行签到【学生】班干部报请假人员及原因问题导入(5min)【教师】提出以下问题:什么是数据源?数据源可分为哪些类型?【学生】思考、举手回答传授新知【教师】通过学生的回答引入新知,介绍基础数据源和高级数据源的相关知识一、基础数据源【教师】介绍基础数据源的概念和类型在SparkStreaming中,基础数据源指的是可以用来读取实时数据并创建DStream的常见数据源。这些数据源已经被广泛使用和测试,并且被集成到了SparkStreaming框架中,用户只需调用相应的API即可读取数据。基础数据源包括文件流、套接字流和RDD队列流等。1.文件流在SparkStreaming
3、中,文件流(filestream)是一种可以从本地文件系统或分布式文件系统(如HDFS)中读取数据的输入流。它允许将一个目录视为一个数据源,并读取目录中实时生成或更新的文件。在SParkStreaming中,可以使用textFileStream()方法创建DStream定义一个输入流用于监视HadOOP兼容的文件系统中的新文件,并将其作为文本文件读取。文件必须通过同一文件系统中的另一个位置移动到监视目录中。该方法的基本格式如下。(extFileStream(directory)其中,参数directory表示指定的目录。读取不同文件流创建DStream的参考示例如下。ssc=SIreaming
4、COnIeXl(SC,10)#读取本地文件流dstream_(ext=ssc.IexiFileSlream(file:/spark_dstream)曦取HDFS文件流dstream-hdfs=ssc.textFileStream(hdfs:/spark_dstream)【教师】通过例子,帮助学生掌握文件流的应用【例4-1以读取HDFS文件为例,编写SparkStreaming应用程序实时监视HDFS文件目录,当发现新文件到达后,输出文件中的数据。打开第1个终端,执行以下命令,启动HDFS服务并创建spark_dstream”目录.hadoopbogon$Cdusrlocalhadoopsbin
5、hadoop(三)bogonsbin$./start-dfs.sh#在HDFS上新建一个Hspark_dstreamH目录hadoo(3)bogonsbin$Cdusrlocalhadoopbinhadoopbogonbin$hdfsdfs-mkdirspark-dstream在usrlocalsparkmycodeDSIream”目录下新建3个文件,分别为filel.txl、file2.lxt和file3.ixl,其内容如图4-9所示。耽3,Bfi,elttt保存三*HRio1,aME夕存二X11(O),B3保存三JM-IloveSpark10veHddOoPIloveDStrean1amI
6、earnllgSMra】nlearningHadoopXanlearningDStrcanSparkXSverySiNIeHadoopisverysinpleOStreamisverySinple女本8-3fi,215114人文本8我符3L8。第3行,第7列福人文本.帮表为贡度:8淤3行,第8外Ja入图4-93个文件的内容打开第2个终端,执行以下命令,进入PySpark交互式执行环境,编写代码,监视HDFS文件目录。SparkStreaming实时计算启动后,还未接收到数据时,终端显示的信息如图4-10所示。hadoopbogon$pysparkfrompyspak.streamingimpo
7、rtStreamingContext舱(J建StreamingContext对象,设置批处理时间间隔为20秒ssc=StreamingContext(sc,20)跄J建DStream,监视HDFS文件目录dstream=ssc.textFileStream(hdfs:/spark_dstream)# 打印监懒!1的瘫dstream.pprint()# 启动StreamingContext对象ssc.start()# 等待StreamingContexi对象终止ssc.awaitTermina(ion()图4-10未接收至媵煽时的终端显示信息在第1个终端上执行以下命令,将filel.txt.fi
8、le2.txt和file3.txt文件依次上传到HDFS的wspark-dstreamw目录下。hadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile1.txtspark-dstreamhadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile2.txtspark-dstreamhadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile3.txtspark-dstream在第2个终端可以监视到HDFS不断有数据流入,并
9、输出结果,如图4-11所示。hadoopbogon:X文件(F)嫡辑(E)查看M搜索终端(T)帮助(三)Time:2023-08-0314:02:40IloveSparkIamlearningSparkSparkisverysimpleTime:2023-08-0314:03:00IloveHadoopIamlearningHadoopHadoopisverysimpleTime:2023-08-0314:03:20IloveDStreamIamlearningDStreamDStreamisverysimple图4-11SparkStreaming监视HDFS并输出结果2.套接字流套接字流(
10、socketstream)是SParkStreaming中用于从网络套接字接收数据的输入流。它可以连接到指定的主机和端口,并实时接收通过套接字发送的数据。在SparkStreaming中,可以使用SOCkelTeXlStream()方法读取套接字流创建DStreame该方法的基本格式如下.socketTextStream(hostname,port,StorageLevel)其中,参数的含义如下.(1)hostname:表示要连接的主机名或IP地址。(2)port:表示要连接的端口号。(3)StOrageLeVeI(可选):表示流数据的存储级别,常见的存储级别包括MEMORY-ONLY.MEM
11、oRY_AND_DISK、MEMORY_ONLY_SER和MEMORY_AND_DISK_SER等,默认值为MEMORY_AND_DISK_SER_2e(详见教材)3RDD队列流RDD队列流由一个RDD队列构成,其中每个RDD包含作为输入源数据的批次内容。在SparkStreaming中,可以使用queueStream()方法创建基于RDD队列的DStreame该方法的基本格式如下。queueStream(rdds,OneAtATime,deult)其中,参数的含义如下.(1)rdds:RDD队列。(2)OneAtATime(可选):每次选取一RDD还是一次性选取所有RDDe默认值为True,
12、即每次只选取一个RDDe(3)default(可选):当队列为空时返回的默认值。如果队列中没有可用的RDD时,返回此默认值。(详见教材)【教师】通过例子,帮助学生掌握RDD队列流的应用【例4-2读取RDD队列流创建DStreame首先创建一个RDD队列作为数据源,然后使用queueStream()方法创建DStream定义一介输入流inputstream,SparkStreaming每两秒从RDD队列中获取一批数据,最后输出RDD队列流中的数据,如图4/2所示.hadoopbogon$pysparkfrompyspark.StreamingimportStreamingContextssc=S
13、treamingContext(sc,2)舱U建一个空的RDD队列rddQueue=|foriinrange(5):rddQueue.append(sc.parallelize(range(1.1001),10)跄J建DStream,定义输入流生成RDD队列流inputStream=ssc.queueStream(rddQueue)#打印RDD队列流中的数据inputStream.pprint()ssc.start()ssc.awaitTermination()inputstream.pprint()ssc.start()Time:2023-08-0314:35:5035678910Time:
14、2023-08-0314:35:52234567810图4/2输出RDD队列流中的嫡二、高级数据源【教师】介绍高级数据源的类型和应用除了文件流、套接字流f口RDD队列流等基础数据源外,SparkStreaming还支持KaHa和Kinesis等高级数据源。SparkSlreaming可以让高级雌源产生的数据发送给应用程序,应用程序再对接收到的数据进行实时处理,从而完成一个典型的实时计算过程。1 .KafkaKatla最初由LinkedIn开发,是一种高性能、分布式的消息传递系统。它支持水平伸缩,可以通过添加更多的代理服务器来增加处理能力。此外,Kaki还具有许多特性,使得它在实时数据处理和大数
15、据场景下的应用非常灵活.因此,KaRa被广泛应用于实时流处理、日志收集、大数据分析等领域.在Spark中,可以读取Kafka数据源,实现方法是先使用SparkSession对象的readStream属性返回DaIaSIreanIReader对象;然后使用该对象的formal。方法指定数据源类型为Kafka;接着使用OPtionO方法设置Kafka的相关选项,如Kafka服务器地址和端口(kafka.btstrap.servers),以及要订阅的主题名称(subscribe)等;最后使用IOadO方法加载流数据,并返回一个DataFramee参考示例如下。dstream=SparkSession.readStream.format(kafka)