網(wǎng)站建設(shè)尾款營(yíng)銷咨詢公司排名前十
概覽
技術(shù)方案:
- 日志采集服務(wù):通過利用Flume-ng對(duì)業(yè)務(wù)平臺(tái)中用戶對(duì)于電影的一次評(píng)分行為進(jìn)行采集,實(shí)時(shí)發(fā)送到Kafka集群。
- 消息緩沖服務(wù):項(xiàng)目采用Kafka作為流式數(shù)據(jù)的緩存組件,接受來自Flume的數(shù)據(jù)采集請(qǐng)求。并將數(shù)據(jù)推送到項(xiàng)目的實(shí)時(shí)推薦系統(tǒng)部分。
- 實(shí)時(shí)推薦服務(wù):項(xiàng)目采用Spark Streaming作為實(shí)時(shí)推薦系統(tǒng),通過接收Kafka中緩存的數(shù)據(jù),通過設(shè)計(jì)的推薦算法實(shí)現(xiàn)對(duì)實(shí)時(shí)推薦的數(shù)據(jù)處理,并將結(jié)構(gòu)合并更新到MongoDB數(shù)據(jù)庫。
1. 實(shí)現(xiàn)思路
我們應(yīng)該如何實(shí)現(xiàn)?
- 首先應(yīng)該redis安裝,這里存儲(chǔ)用戶的第K次評(píng)分(用戶評(píng)分存入redis中)
- 安裝zookeeper,安裝kafka,都是standlone模式
- 測(cè)試Kafka與Spark Streaming 聯(lián)調(diào)。Kafka生產(chǎn)一條數(shù)據(jù),Spark Streaming 可以消費(fèi)成功,并根據(jù)redis中的數(shù)據(jù)和MongoDB數(shù)據(jù)進(jìn)行推薦,存入MongoDB中
- 在業(yè)務(wù)系統(tǒng)寫埋點(diǎn)信息,測(cè)試時(shí)寫入本地文件,之后再遠(yuǎn)程測(cè)試寫入云服務(wù)器log文件中
- flume配置文件書寫,kafka創(chuàng)建兩個(gè)topic,對(duì)整個(gè)過程進(jìn)行測(cè)試
2 環(huán)境準(zhǔn)備
1.1 redis 安裝
- redis安裝redis安裝
- 密碼:123456
- 存入redis一些數(shù)據(jù) lpush uid:1 mid:score
- redis 教程:教程
1.2 zookeeper單機(jī)版安裝
- zookeeper安裝:zookeeper安裝
- 版本:3.7.1
- 遇到的坑:8080端口連接占用,我們需要在zoo.cpg文件中加上
admin.serverPort=8001
重新啟動(dòng)即可。
1.3 kafka單機(jī)安裝
- kafka安裝:官網(wǎng)下載地址
- 安裝使用的為:127.0.0.1
- 啟動(dòng)kafka:kafka教程
bin/kafka-server-start.sh config/server.properties
- 創(chuàng)建一個(gè)topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic recommender
- 生產(chǎn)一個(gè)消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic recommender
- 消費(fèi)一個(gè)消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic recommender --from-beginning
3 測(cè)試kafka與spark streaming聯(lián)調(diào)
- kafka版本:2.2.0
- spark版本:2.3.0
- 因此使用
spark-streaming-kafka-0-10
- 啟動(dòng)kafka,生產(chǎn)一條信息
- 書寫程序
// 定義kafka連接參數(shù)val kafkaParam = Map("bootstrap.servers" -> "服務(wù)器IP:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")// 通過kafka創(chuàng)建一個(gè)DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam ))// 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評(píng)分流// 1|31|4.5|val ratingStream = kafkaStream.map{msg =>val attr = msg.value().split("\\|")( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )}
- 若是kafka報(bào)錯(cuò),如果你同樣也是云服務(wù)器,請(qǐng)注意kafka的配置信息(很重要!)
(1)解決方法:修改kafka配置文件,設(shè)置為設(shè)置listeners為內(nèi)網(wǎng)ip,設(shè)置外網(wǎng)ip
- 解決方案修改內(nèi)網(wǎng)ip
(2)重新啟動(dòng),成功
- 內(nèi)網(wǎng)外網(wǎng)分流:內(nèi)網(wǎng)外網(wǎng)分流
- kafka入門教程:入門教程
- redis報(bào)錯(cuò):開啟保護(hù)模式了,需要修改conf文件
效果
在kafka生產(chǎn)一個(gè)數(shù)據(jù),可以在MongoDB中得到推薦的電影結(jié)果
4 后端埋點(diǎn)
前端進(jìn)行評(píng)分后,觸發(fā)click事件,后端進(jìn)行測(cè)試埋點(diǎn),利用log4j寫入本地文件中。
4.1 本地測(cè)試
- log4j配置文件
log4j.rootLogger=INFO, file, stdout# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n# write to file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.Threshold=INFO
log4j.appender.file.File=F:/demoparent/business/src/main/log/agent.txt
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
- 埋點(diǎn)實(shí)現(xiàn)
//埋點(diǎn)日志
import org.apache.log4j.Logger;// 關(guān)鍵代碼
Logger log = Logger.getLogger(MovieController.class.getName());
log.info(MOVIE_RATING_PREFIX + ":" + uid +"|"+ mid +"|"+ score +"|"+ System.currentTimeMillis()/1000)
4.2 寫入遠(yuǎn)程測(cè)試
- Linux安裝syslog服務(wù),進(jìn)行測(cè)試
- 主機(jī)log4j配置文件設(shè)置服務(wù)器ip
- log4j配置:寫入遠(yuǎn)程服務(wù)器
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost= 服務(wù)器IP
log4j.appender.syslog.Threshold=INFO
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%20t] %-130c:(line:%4L) : %m%n
5 flume配置
- flume對(duì)接kafka:flume對(duì)接文件
- flume設(shè)置source和sink,source為文件地址,sink為kafka的log
# log-kafka.properties
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
agent.sources.exectail.type = exec
agent.sources.exectail.command = tail -f /project/logs/agent.log agent.sources.exectail.interceptors=i1 agent.sources.exectail.interceptors.i1.type=regex_filter agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+ agent.sources.exectail.channels = memoryChannelagent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic = log agent.sinks.kafkasink.kafka.bootstrap.servers = 服務(wù)器地址:9092 agent.sinks.kafkasink.kafka.producer.acks = 1 agent.sinks.kafkasink.kafka.flumeBatchSize = 20 agent.sinks.kafkasink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
6 實(shí)時(shí)推薦
ratingStream.foreachRDD{rdds => rdds.foreach{case (uid, mid, score, timestamp) => {println("rating data coming! >>>>>>>>>>>>>>>>")println(uid+",mid:"+mid)// 1. 從redis里獲取當(dāng)前用戶最近的K次評(píng)分,保存成Array[(mid, score)]val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )println("用戶最近的K次評(píng)分:"+userRecentlyRatings)// 2. 從相似度矩陣中取出當(dāng)前電影最相似的N個(gè)電影,作為備選列表,Array[mid]val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )println("電影最相似的N個(gè)電影:"+candidateMovies)// 3. 對(duì)每個(gè)備選電影,計(jì)算推薦優(yōu)先級(jí),得到當(dāng)前用戶的實(shí)時(shí)推薦列表,Array[(mid, score)]val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )println("當(dāng)前用戶的實(shí)時(shí)推薦列表:"+streamRecs)// 4. 把推薦數(shù)據(jù)保存到mongodbsaveDataToMongoDB( uid, streamRecs )}}
}
def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={// 定義一個(gè)ArrayBuffer,用于保存每一個(gè)備選電影的基礎(chǔ)得分val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定義一個(gè)HashMap,保存每一個(gè)備選電影的增強(qiáng)減弱因子val increMap = scala.collection.mutable.HashMap[Int, Int]()val decreMap = scala.collection.mutable.HashMap[Int, Int]()for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){// 拿到備選電影和最近評(píng)分電影的相似度val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )if(simScore > 0.7){// 計(jì)算備選電影的基礎(chǔ)推薦得分scores += ( (candidateMovie, simScore * userRecentlyRating._2) )if( userRecentlyRating._2 > 3 ){increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1} else{decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1}}}// 根據(jù)備選電影的mid做groupby,根據(jù)公式去求最后的推薦評(píng)分scores.groupBy(_._1).map{// groupBy之后得到的數(shù)據(jù) Map( mid -> ArrayBuffer[(mid, score)] )case (mid, scoreList) =>( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )}.toArray.sortWith(_._2>_._2)
}
7 啟動(dòng)順序
- 啟動(dòng)hadoop、spark的容器
cd /docker
docker-compose up -d
docker-compose ps
- 啟動(dòng)mongodb和redis服務(wù)
netstat?-lanp?|?grep?"27017"
bin/redis-server etc/redis.conf
- 啟動(dòng)zookeeper、kafka服務(wù)
./zkServer.sh start
bin/kafka-server-start.sh config/server.properties
- 啟動(dòng)flume服務(wù)
bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent
實(shí)現(xiàn)效果
前端評(píng)分成功后寫入日志文件,flume對(duì)接log日志文件無問題,kafka對(duì)接flume無問題,spark streaming處理收到的一條數(shù)據(jù),進(jìn)行推薦,存入MongoDB中。
總結(jié)
由于時(shí)間匆忙,寫的有些匆忙,如果有需要前端設(shè)計(jì)代碼和后端的代碼可以評(píng)論我,我整理整理發(fā)到github上。
前端設(shè)計(jì)部分沒有時(shí)間去詳細(xì)做,后續(xù)再對(duì)前端頁面進(jìn)行美化。本科當(dāng)時(shí)整合了一個(gè)管理系統(tǒng),現(xiàn)在也沒有時(shí)間做,總之,一周多時(shí)間把當(dāng)時(shí)的系統(tǒng)快速復(fù)現(xiàn)了下,算是一個(gè)復(fù)習(xí)。
在進(jìn)行開發(fā)時(shí),遇到許多問題,版本問題、服務(wù)器內(nèi)網(wǎng)外網(wǎng)問題、docker容器相關(guān)問題、協(xié)同過濾算法設(shè)計(jì)問題,但幫著自己復(fù)習(xí)了下Vue和SpringBoot。
遇到問題時(shí)
- 遇到問題不應(yīng)該盲目解決,應(yīng)該靜下心看看報(bào)錯(cuò)原因,想想為何報(bào)錯(cuò)
- 版本尤其重要,因此最好在一個(gè)project的pom設(shè)定版本
- 使用服務(wù)器搭建docker-compose,利用該方法來搭建集群,快速簡(jiǎn)單,但涉及的端口轉(zhuǎn)發(fā)等一些網(wǎng)絡(luò)知識(shí)需要耐下心來看
- Vue-Cli+Element-ui搭配起來開發(fā)簡(jiǎn)單
- 寫程序時(shí),我們應(yīng)該提前約定好接口,否則后續(xù)會(huì)很混亂…
后續(xù)
- 后續(xù)將優(yōu)化下前端頁面,設(shè)計(jì)更多功能
- 改進(jìn)推薦算法
- 增加冷啟動(dòng)方案