Importing files from HDFS into MongoDB with Spark SQL
王晓成 · 2018-07-20 · via cnblogs (博客园) - 王晓成

Purpose: use Spark SQL to import a file stored in HDFS into MongoDB.

 Required jars: mongo-spark-connector_2.11-2.1.2.jar and mongo-java-driver-3.8.0.jar
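If the job jar is built with sbt rather than assembled by hand, the two jars above correspond to the dependency coordinates below. This is a minimal sketch: the Scala patch version and marking Spark as "provided" (because spark-submit supplies it) are assumptions, not details from the post. Note that a plain `sbt package` does not bundle dependencies, so the connector and driver jars still have to reach the classpath, for example through spark-submit's --jars flag or a fat-jar plugin.

```scala
// build.sbt (sketch; scalaVersion patch level is an assumption)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Supplied by the Spark installation at runtime
  "org.apache.spark" %% "spark-sql" % "2.1.1" % "provided",
  // %% appends the Scala binary suffix: mongo-spark-connector_2.11
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.2",
  "org.mongodb" % "mongo-java-driver" % "3.8.0"
)
```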

 The Scala code:

import org.apache.spark.sql.SparkSession

object Exec {
  def main(args: Array[String]): Unit = {

    if (args.length < 6) {
      System.err.println("Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>")
      System.exit(1)
    }
    val hdfsServer = args(0)      // e.g. "hdfs://master"
    val logPath = args(1)         // e.g. "/user/hdfs/log/"
    val fileName = args(2)        // e.g. "2017-05-04.txt"
    val mongoHost = args(3)       // e.g. "10.15.22.22:27017"
    val mongoDB = args(4)         // target MongoDB database
    val mongoCollection = args(5) // target MongoDB collection

    // No .master("local") here: a master set in code takes precedence over the
    // --master flag of spark-submit, so the job would never run on the cluster.
    val spark = SparkSession
      .builder()
      .appName("SparkImportDataToMongo")
      .config("spark.debug.maxToStringFields", 500)
      .getOrCreate()

    try {
      // spark.read.json expects one JSON object per line (JSON Lines)
      val df = spark.read.json(hdfsServer + logPath + "/" + fileName)
      df.printSchema()
      // Append every row as a document to <mongoDB>.<mongoCollection>
      df.write
        .mode("append")
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.output.uri", "mongodb://" + mongoHost + "/" + mongoDB + "." + mongoCollection)
        .save()
    } catch {
      case ex: Exception =>
        // printf would treat any '%' in the message as a format specifier
        ex.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}
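The argument check above only counts arguments, so an empty string (for example an unset shell variable) still gets through. A minimal sketch of stricter parsing is shown below; the `ImportArgs` case class and its `parse` method are hypothetical names introduced for illustration, not part of the original program or of any library.

```scala
// Hypothetical helper: validates the six positional arguments of Exec up front.
final case class ImportArgs(
    hdfsServer: String,
    logPath: String,
    fileName: String,
    mongoHost: String,
    mongoDB: String,
    mongoCollection: String)

object ImportArgs {
  val Usage: String =
    "Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>"

  /** Returns the parsed arguments, or the usage text if any of the six is missing or blank. */
  def parse(args: Array[String]): Either[String, ImportArgs] =
    if (args.length < 6 || args.take(6).exists(_.trim.isEmpty)) Left(Usage)
    else Right(ImportArgs(args(0), args(1), args(2), args(3), args(4), args(5)))
}
```

In `main`, the result could be matched with `case Right(a) => ...` to continue and `case Left(usage) => System.err.println(usage); sys.exit(1)` to stop, keeping the usage message in one place.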

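Run the following command from the Spark installation directory. The two jars listed above must also be on the job's classpath, for example passed with --jars or copied into Spark's jars/ directory: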

./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test
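With these arguments, the program reads hdfs://master/user/hdfs/log//2017-05-04.txt (note the doubled slash, which HDFS usually tolerates) and writes to mongodb://10.15.22.22:27017/mydb.data_default_test. The sketch below makes that mapping explicit and joins the path parts with exactly one slash; `ImportTargets`, `inputPath` and `outputUri` are hypothetical names used only for illustration.

```scala
// Hypothetical helpers mirroring how Exec turns its arguments into locations.
object ImportTargets {

  /** Joins server, directory and file name with exactly one '/' between the parts. */
  def inputPath(hdfsServer: String, logPath: String, fileName: String): String = {
    val server = hdfsServer.stripSuffix("/")
    val dir = logPath.stripPrefix("/").stripSuffix("/")
    if (dir.isEmpty) s"$server/$fileName" else s"$server/$dir/$fileName"
  }

  /** Value for spark.mongodb.output.uri: mongodb://host[:port]/database.collection */
  def outputUri(mongoHost: String, mongoDB: String, mongoCollection: String): String =
    s"mongodb://$mongoHost/$mongoDB.$mongoCollection"
}
```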

Output of the run:

[root@master spark-2.1.1-bin-hadoop2.6]#   ./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test

18/07/20 23:41:13 INFO spark.SparkContext: Running Spark version 2.1.1
18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls to: root
18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls to: root
18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls groups to: 
18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls groups to: 
18/07/20 23:41:14 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/07/20 23:41:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 24073.
18/07/20 23:41:14 INFO spark.SparkEnv: Registering MapOutputTracker
18/07/20 23:41:14 INFO spark.SparkEnv: Registering BlockManagerMaster
18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/07/20 23:41:14 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9c42a710-559b-4c97-b92a-58208a77afeb
18/07/20 23:41:14 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
18/07/20 23:41:14 INFO spark.SparkEnv: Registering OutputCommitCoordinator
18/07/20 23:41:14 INFO util.log: Logging initialized @1777ms
18/07/20 23:41:14 INFO server.Server: jetty-9.2.z-SNAPSHOT
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c65a5ef{/jobs,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6b5176f2{/jobs/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b672aa8{/jobs/job,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2fab4aff{/jobs/job/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@ec0c838{/stages,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6e46d9f4{/stages/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5cc69cfe{/stages/stage,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@29cfd92b{/stages/stage/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@21c64522{/stages/pool,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7997b197{/stages/pool/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11dee337{/storage,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@460f76a6{/storage/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@55f3c410{/storage/rdd,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11acdc30{/storage/rdd/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@770d4269{/environment,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4a8ab068{/environment/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1922e6d{/executors,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@76a82f33{/executors/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6bab2585{/executors/threadDump,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74bdc168{/executors/threadDump/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@644c78d4{/static,null,AVAILABLE,@Spark}