學生做網(wǎng)站期末作業(yè)新產(chǎn)品推廣方案怎么寫
文章目錄
系列文章索引
MongoDB基礎(chǔ)入門到深入(一)安裝、文檔操作
MongoDB基礎(chǔ)入門到深入(二)聚合高級操作
MongoDB基礎(chǔ)入門到深入(三)索引高級操作
MongoDB基礎(chǔ)入門到深入(四)復制(副本)集
MongoDB基礎(chǔ)入門到深入(五)分片集群
MongoDB基礎(chǔ)入門到深入(六)多文檔事務(wù)
MongoDB基礎(chǔ)入門到深入(七)建模、調(diào)優(yōu)
MongoDB基礎(chǔ)入門到深入(八)MongoDB整合SpringBoot、Chang Streams
十五、MongoDB整合SpringBoot
1、環(huán)境準備
1、引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
2、配置yml
spring:data:mongodb:uri: mongodb://root:root@192.168.56.10:27017/test?authSource=admin#uri等同于下面的配置#database: test#host: 192.168.56.10#port: 27017#username: root#password: root#authentication-database: admin#復制集 https://docs.mongodb.com/manual/reference/connection-string/#uri: mongodb://root:root@192.168.56.10:28017,192.168.56.10:28018,192.168.56.10:28019/test?authSource=admin&replicaSet=rs0
連接配置參考文檔:https://www.mongodb.com/docs/manual/reference/connection-string/
3、使用時注入mongoTemplate
@Autowired
MongoTemplate mongoTemplate;
4、demo
//連接單點
//MongoClient mongoClient = MongoClients.create("mongodb://192.168.56.10:27017");
//連接副本集
MongoClient mongoClient = MongoClients.create("mongodb://root:root@192.168.56.10:28017,192.168.56.10:28018,192.168.56.10:28019/test?authSource=admin&replicaSet=rs0");
//連接分片集群 節(jié)點:mongos
//MongoClient mongoClient = MongoClients.create("mongodb://192.168.56.10:27017,192.168.56.11:27017,192.168.56.12:27017");//獲得數(shù)據(jù)庫對象
MongoDatabase database = mongoClient.getDatabase("test");
//獲得集合
MongoCollection<Document> collection = database.getCollection("emp");System.out.println("emp文檔數(shù):"+collection.countDocuments());
2、集合操作
@Test
public void testCollection() {boolean exists = mongoTemplate.collectionExists("emp");if (exists) {//刪除集合mongoTemplate.dropCollection("emp");}//創(chuàng)建集合mongoTemplate.createCollection("emp");
}
3、文檔操作
(1)相關(guān)注解
@Document
修飾范圍: 用在類上
作用: 用來映射這個類的一個對象為mongo中一條文檔數(shù)據(jù)。
屬性:( value 、collection )用來指定操作的集合名稱
@Id
修飾范圍: 用在成員變量、方法上
作用: 用來將成員變量的值映射為文檔的_id的值
@Field
修飾范圍: 用在成員變量、方法上
作用: 用來將成員變量及其值映射為文檔中一個key:value對。
屬性:( name , value )用來指定在文檔中 key的名稱,默認為成員變量名
@Transient
修飾范圍:用在成員變量、方法上
作用:用來指定此成員變量不參與文檔的序列化
(2)創(chuàng)建實體
import java.util.Date;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Document("emp") //對應emp集合中的一個文檔
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Employee {@Id //映射文檔中的_idprivate Integer id;@Field("username")private String name;@Fieldprivate int age;@Fieldprivate Double salary;@Fieldprivate Date birthday;
}
(3)添加文檔
insert方法返回值是新增的Document對象,里面包含了新增后_id的值。如果集合不存在會自動創(chuàng)建集合。通過Spring Data MongoDB還會給集合中多加一個_class的屬性,存儲新增時Document對應Java中類的全限定路徑。這么做為了查詢時能把Document轉(zhuǎn)換為Java類型。
@Test
public void testInsert() {Employee employee = new Employee(1, "小明", 30, 10000.00, new Date());//添加文檔// sava: _id存在時更新數(shù)據(jù)//mongoTemplate.save(employee);// insert: _id存在拋出異常 支持批量操作mongoTemplate.insert(employee);List<Employee> list = Arrays.asList(new Employee(2, "張三", 21, 5000.00, new Date()),new Employee(3, "李四", 26, 8000.00, new Date()),new Employee(4, "王五", 22, 8000.00, new Date()),new Employee(5, "張龍", 28, 6000.00, new Date()),new Employee(6, "趙虎", 24, 7000.00, new Date()),new Employee(7, "趙六", 28, 12000.00, new Date()));//插入多條數(shù)據(jù)mongoTemplate.insert(list, Employee.class);
}
插入重復數(shù)據(jù)時: insert報 DuplicateKeyException提示主鍵重復; save對已存在的數(shù)據(jù)進行更新。
批處理操作時: insert可以一次性插入所有數(shù)據(jù),效率較高;save需遍歷所有數(shù)據(jù),一次插入或更新,效率較低。
(4)查詢文檔
Criteria是標準查詢的接口,可以引用靜態(tài)的Criteria.where的把多個條件組合在一起,就可以輕松地將多個方法標準和查詢連接起來,方便我們操作查詢語句。
@Test
public void testFind() {System.out.println("==========查詢所有文檔===========");//查詢所有文檔List<Employee> list = mongoTemplate.findAll(Employee.class);list.forEach(System.out::println);System.out.println("==========根據(jù)_id查詢===========");//根據(jù)_id查詢Employee e = mongoTemplate.findById(1, Employee.class);System.out.println(e);System.out.println("==========findOne返回第一個文檔===========");//如果查詢結(jié)果是多個,返回其中第一個文檔對象Employee one = mongoTemplate.findOne(new Query(), Employee.class);System.out.println(one);System.out.println("==========條件查詢===========");//new Query() 表示沒有條件//查詢薪資大于等于8000的員工//Query query = new Query(Criteria.where("salary").gte(8000));//查詢薪資大于4000小于10000的員工//Query query = new Query(Criteria.where("salary").gt(4000).lt(10000));//正則查詢(模糊查詢) java中正則不需要有////Query query = new Query(Criteria.where("name").regex("張"));//and or 多條件查詢Criteria criteria = new Criteria();//and 查詢年齡大于25&薪資大于8000的員工//criteria.andOperator(Criteria.where("age").gt(25),Criteria.where("salary").gt(8000));//or 查詢姓名是張三或者薪資大于8000的員工criteria.orOperator(Criteria.where("name").is("張三"), Criteria.where("salary").gt(5000));Query query = new Query(criteria);//sort排序//query.with(Sort.by(Sort.Order.desc("salary")));//skip limit 分頁 skip用于指定跳過記錄數(shù),limit則用于限定返回結(jié)果數(shù)量。query.with(Sort.by(Sort.Order.desc("salary"))).skip(0) //指定跳過記錄數(shù).limit(4); //每頁顯示記錄數(shù)//查詢結(jié)果List<Employee> employees = mongoTemplate.find(query, Employee.class);employees.forEach(System.out::println);
}
@Test
public void testFindByJson() {//使用json字符串方式查詢//等值查詢//String json = "{name:'張三'}";//多條件查詢String json = "{$or:[{age:{$gt:25}},{salary:{$gte:8000}}]}";Query query = new BasicQuery(json);//查詢結(jié)果List<Employee> employees = mongoTemplate.find(query, Employee.class);employees.forEach(System.out::println);
}
(5)更新文檔
在Mongodb中無論是使用客戶端API還是使用Spring Data,更新返回結(jié)果一定是受行數(shù)影響。如果更新后的結(jié)果和更新前的結(jié)果是相同,返回0。
updateFirst() 只更新滿足條件的第一條記錄
updateMulti() 更新所有滿足條件的記錄
upsert() 沒有符合條件的記錄則插入數(shù)據(jù)
@Test
public void testUpdate() {//query設(shè)置查詢條件Query query = new Query(Criteria.where("salary").gte(15000));System.out.println("==========更新前===========");List<Employee> employees = mongoTemplate.find(query, Employee.class);employees.forEach(System.out::println);Update update = new Update();//設(shè)置更新屬性update.set("salary", 13000);//updateFirst() 只更新滿足條件的第一條記錄//UpdateResult updateResult = mongoTemplate.updateFirst(query, update, Employee.class);//updateMulti() 更新所有滿足條件的記錄//UpdateResult updateResult = mongoTemplate.updateMulti(query, update, Employee.class);//upsert() 沒有符合條件的記錄則插入數(shù)據(jù)//update.setOnInsert("id",11); //指定_idUpdateResult updateResult = mongoTemplate.upsert(query, update, Employee.class);//返回修改的記錄數(shù)System.out.println(updateResult.getModifiedCount());//返回匹配的記錄數(shù)System.out.println(updateResult.getMatchedCount());System.out.println("==========更新后===========");employees = mongoTemplate.find(query, Employee.class);employees.forEach(System.out::println);
}
(6)刪除文檔
@Test
public void testDelete() {//刪除所有文檔//mongoTemplate.remove(new Query(),Employee.class);//條件刪除Query query = new Query(Criteria.where("salary").gte(10000));mongoTemplate.remove(query, Employee.class);}
(7)聚合操作
MongoTemplate提供了aggregate方法來實現(xiàn)對數(shù)據(jù)的聚合操作。
基于聚合管道m(xù)ongodb提供的可操作的內(nèi)容:
基于聚合操作Aggregation.group,mongodb提供可選的表達式:
// 以聚合管道示例2為例
// 返回人口超過1000萬的州
db.zips.aggregate( [{ $group: { _id: "$state", totalPop: { $sum: "$pop" } } },{ $match: { totalPop: { $gt: 10*1000*1000 } } }
] )@Test
public void test(){//$groupGroupOperation groupOperation = Aggregation.group("state").sum("pop").as("totalPop");//$matchMatchOperation matchOperation = Aggregation.match(Criteria.where("totalPop").gte(10*1000*1000));// 按順序組合每一個聚合步驟TypedAggregation<Zips> typedAggregation = Aggregation.newAggregation(Zips.class, groupOperation, matchOperation);//執(zhí)行聚合操作,如果不使用 Map,也可以使用自定義的實體類來接收數(shù)據(jù)AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(typedAggregation, Map.class);// 取出最終結(jié)果List<Map> mappedResults = aggregationResults.getMappedResults();for(Map map:mappedResults){System.out.println(map);}
}
// 返回各州平均城市人口
db.zips.aggregate( [{ $group: { _id: { state: "$state", city: "$city" }, cityPop: { $sum: "$pop" } } },{ $group: { _id: "$_id.state", avgCityPop: { $avg: "$cityPop" } } },{ $sort:{avgCityPop:-1}}
] )@Test
public void test2(){//$groupGroupOperation groupOperation = Aggregation.group("state","city").sum("pop").as("cityPop");//$groupGroupOperation groupOperation2 = Aggregation.group("_id.state").avg("cityPop").as("avgCityPop");//$sortSortOperation sortOperation = Aggregation.sort(Sort.Direction.DESC,"avgCityPop");// 按順序組合每一個聚合步驟TypedAggregation<Zips> typedAggregation = Aggregation.newAggregation(Zips.class, groupOperation, groupOperation2,sortOperation);//執(zhí)行聚合操作,如果不使用 Map,也可以使用自定義的實體類來接收數(shù)據(jù)AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(typedAggregation, Map.class);// 取出最終結(jié)果List<Map> mappedResults = aggregationResults.getMappedResults();for(Map map:mappedResults){System.out.println(map);}
}
// 按州返回最大和最小的城市
db.zips.aggregate( [{ $group:{_id: { state: "$state", city: "$city" },pop: { $sum: "$pop" }}},{ $sort: { pop: 1 } },{ $group:{_id : "$_id.state",biggestCity: { $last: "$_id.city" },biggestPop: { $last: "$pop" },smallestCity: { $first: "$_id.city" },smallestPop: { $first: "$pop" }}},{ $project:{ _id: 0,state: "$_id",biggestCity: { name: "$biggestCity", pop: "$biggestPop" },smallestCity: { name: "$smallestCity", pop: "$smallestPop" }}},{ $sort: { state: 1 } }
] )@Test
public void test3(){//$groupGroupOperation groupOperation = Aggregation.group("state","city").sum("pop").as("pop");//$sortSortOperation sortOperation = Aggregation.sort(Sort.Direction.ASC,"pop");//$groupGroupOperation groupOperation2 = Aggregation.group("_id.state").last("_id.city").as("biggestCity").last("pop").as("biggestPop").first("_id.city").as("smallestCity").first("pop").as("smallestPop");//$projectProjectionOperation projectionOperation = Aggregation.project("state","biggestCity","smallestCity").and("_id").as("state").andExpression("{ name: \"$biggestCity\", pop: \"$biggestPop\" }").as("biggestCity").andExpression("{ name: \"$smallestCity\", pop: \"$smallestPop\" }").as("smallestCity").andExclude("_id");//$sortSortOperation sortOperation2 = Aggregation.sort(Sort.Direction.ASC,"state");// 按順序組合每一個聚合步驟TypedAggregation<Zips> typedAggregation = Aggregation.newAggregation(Zips.class, groupOperation, sortOperation, groupOperation2,projectionOperation,sortOperation2);//執(zhí)行聚合操作,如果不使用 Map,也可以使用自定義的實體類來接收數(shù)據(jù)AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(typedAggregation, Map.class);// 取出最終結(jié)果List<Map> mappedResults = aggregationResults.getMappedResults();for(Map map:mappedResults){System.out.println(map);}}
(8)小技巧:如何去掉_class屬性
@Configuration
public class MongoConfig {/*** 定制TypeMapper去掉_class屬性* @param mongoDatabaseFactory* @param context* @param conversions* @return*/@BeanMappingMongoConverter mappingMongoConverter(MongoDatabaseFactory mongoDatabaseFactory,MongoMappingContext context, MongoCustomConversions conversions){DbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDatabaseFactory);MappingMongoConverter mappingMongoConverter =new MappingMongoConverter(dbRefResolver,context);mappingMongoConverter.setCustomConversions(conversions);//構(gòu)造DefaultMongoTypeMapper,將typeKey設(shè)置為空值mappingMongoConverter.setTypeMapper(new DefaultMongoTypeMapper(null));return mappingMongoConverter;}}
4、事務(wù)操作
官方文檔:https://www.mongodb.com/docs/upcoming/core/transactions/
(1)編程式事務(wù)
/*** 事務(wù)操作API* https://docs.mongodb.com/upcoming/core/transactions/*/
@Test
public void updateEmployeeInfo() {//連接復制集MongoClient client = MongoClients.create("mongodb://root:root@192.168.56.10:28017,192.168.56.10:28018,192.168.56.10:28019/test?authSource=admin&replicaSet=rs0");MongoCollection<Document> emp = client.getDatabase("test").getCollection("emp");MongoCollection<Document> events = client.getDatabase("test").getCollection("events");//事務(wù)操作配置TransactionOptions txnOptions = TransactionOptions.builder().readPreference(ReadPreference.primary()).readConcern(ReadConcern.MAJORITY).writeConcern(WriteConcern.MAJORITY).build();try (ClientSession clientSession = client.startSession()) {//開啟事務(wù)clientSession.startTransaction(txnOptions);try {emp.updateOne(clientSession,Filters.eq("username", "張三"),Updates.set("status", "inactive"));int i=1/0;events.insertOne(clientSession,new Document("username", "張三").append("status", new Document("new", "inactive").append("old", "Active")));//提交事務(wù)clientSession.commitTransaction();}catch (Exception e){e.printStackTrace();//回滾事務(wù)clientSession.abortTransaction();}}
}
(2)聲明式事務(wù)
1、配置事務(wù)管理器
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory factory){//事務(wù)操作配置
TransactionOptions txnOptions = TransactionOptions.builder().readPreference(ReadPreference.primary()).readConcern(ReadConcern.MAJORITY).writeConcern(WriteConcern.MAJORITY).build();return new MongoTransactionManager(factory);
}
2、編程測試service
@Service
public class EmployeeService {@AutowiredMongoTemplate mongoTemplate;@Transactionalpublic void addEmployee(){Employee employee = new Employee(100,"張三", 21,15000.00, new Date());Employee employee2 = new Employee(101,"趙六", 28,10000.00, new Date());mongoTemplate.save(employee);//int i=1/0;mongoTemplate.save(employee2);}
}
十六、Chang Streams
1、什么是 Chang Streams
Change Stream指數(shù)據(jù)的變化事件流,MongoDB從3.6版本開始提供訂閱數(shù)據(jù)變更的功能。
Change Stream 是 MongoDB 用于實現(xiàn)變更追蹤的解決方案,類似于關(guān)系數(shù)據(jù)庫的觸發(fā)器,但原理不完全相同:
2、Change Stream 的實現(xiàn)原理
Change Stream 是基于 oplog 實現(xiàn)的,提供推送實時增量的推送功能
。它在 oplog 上開啟一個 tailable cursor 來追蹤所有復制集上的變更操作,最終調(diào)用應用中定義的回調(diào)函數(shù)。
被追蹤的變更事件主要包括:
insert/update/delete:插入、更新、刪除;
drop:集合被刪除;
rename:集合被重命名;
dropDatabase:數(shù)據(jù)庫被刪除;
invalidate:drop/rename/dropDatabase 將導致 invalidate 被觸發(fā), 并關(guān)閉 change stream;
如果只對某些類型的變更事件感興趣,可以使用使用聚合管道的過濾步驟過濾事件:
var cs = db.user.watch([{$match:{operationType:{$in:["insert","delete"]}}
}])
Change Stream會采用 "readConcern:majority"這樣的一致性級別,保證寫入的變更不會被回滾
。因此:
未開啟 majority readConcern 的集群無法使用 Change Stream;
當集群無法滿足 {w: “majority”}
時,不會觸發(fā) Change Stream(例如 PSA 架構(gòu) 中的 S 因故障宕機)。
3、MongoShell測試
# 窗口1
db.user.watch([],{maxAwaitTimeMS:1000000}).pretty()# 窗口2
db.user.insert({name:"xxxx"})
變更事件字段說明
4、Change Stream 故障恢復
假設(shè)在一系列寫入操作的過程中,訂閱 Change Stream 的應用在接收到“寫3”之后 于 t0 時刻崩潰,重啟后后續(xù)的變更怎么辦?
想要從上次中斷的地方繼續(xù)獲取變更流,只需要保留上次變更通知中的 _id 即可。
Change Stream 回調(diào)所返回的的數(shù)據(jù)帶有 _id,這個 _id 可以用于斷點恢復。例如:
var cs = db.collection.watch([], {resumeAfter: <_id>})
即可從上一條通知中斷處繼續(xù)獲取后續(xù)的變更通知。
5、使用場景
跨集群的變更復制——在源集群中訂閱 Change Stream,一旦得到任何變更立即寫 入目標集群。
微服務(wù)聯(lián)動——當一個微服務(wù)變更數(shù)據(jù)庫時,其他微服務(wù)得到通知并做出相應的變更。
其他任何需要系統(tǒng)聯(lián)動的場景。
案例 1.監(jiān)控
用戶需要及時獲取變更信息(例如賬戶相關(guān)的表),ChangeStreams 可以提供監(jiān)控功能,一旦相關(guān)的表信息發(fā)生變更,就會將變更的消息實時推送出去。
案例 2.分析平臺
例如需要基于增量去分析用戶的一些行為,可以基于 ChangeStreams 把數(shù)據(jù)拉出來,推到下游的計算平臺, 比如 類似 Flink、Spark 等計算平臺等等。
案例 3.數(shù)據(jù)同步
基于 ChangeStreams,用戶可以搭建額外的 MongoDB 集群,這個集群是從原端的 MongoDB 拉取過來的, 那么這個集群可以做一個熱備份,假如源端集群發(fā)生 網(wǎng)絡(luò)不通等等之類的變故,備集群就可以接管服務(wù)。 還可以做一個冷備份,如用戶基于 ChangeStreams 把數(shù)據(jù)同步到文件,萬一源端數(shù)據(jù)庫發(fā)生不可服務(wù), 就可以從文件里恢復出完整的 MongoDB 數(shù)據(jù)庫, 繼續(xù)提供服務(wù)。(當然,此處還需要借助定期全量備份來一同完成恢復) 另外數(shù)據(jù)同步它不僅僅局限于同一地域,可以跨地域,從北京到上海甚至從中國到美國等等。
案例 4.消息推送
假如用戶想實時了解公交車的信息,那么公交車的位置每次變動,都實時推送變更的信息給想了解的用 戶,用戶能夠?qū)崟r收到公交車變更的數(shù)據(jù),非常便捷實用。
注意事項
Change Stream 依賴于 oplog,因此中斷時間不可超過 oplog 回收的最大時間窗;
在執(zhí)行 update 操作時,如果只更新了部分數(shù)據(jù),那么 Change Stream 通知的也是增量部分;
刪除數(shù)據(jù)時通知的僅是刪除數(shù)據(jù)的 _id。
6、Chang Stream整合Spring Boot
1、引入依賴
<!--spring data mongodb-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
2、配置yml
spring:data:mongodb:uri: mongodb://root:root@192.168.56.10:27017/test?authSource=admin#uri等同于下面的配置#database: test#host: 192.168.56.10#port: 27017#username: root#password: root#authentication-database: admin#復制集 https://docs.mongodb.com/manual/reference/connection-string/#uri: mongodb://root:root@192.168.56.10:28017,192.168.56.10:28018,192.168.56.10:28019/test?authSource=admin&replicaSet=rs0
3、配置 mongo監(jiān)聽器的容器MessageListenerContainer,spring啟動時會自動啟動監(jiān)聽的任務(wù)用于接收changestream
@Configuration
public class MongodbConfig {@BeanMessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {Executor executor = Executors.newFixedThreadPool(5);MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documentMessageListener).collection("user") //需要監(jiān)聽的集合名//過濾需要監(jiān)聽的操作類型,可以根據(jù)需求指定過濾條件.filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))))//不設(shè)置時,文檔更新時,只會發(fā)送變更字段的信息,設(shè)置UPDATE_LOOKUP會返回文檔的全部信息.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();messageListenerContainer.register(request, Document.class);return messageListenerContainer;}
}
4、配置mongo監(jiān)聽器,用于接收數(shù)據(jù)庫的變更信息
@Component
public class DocumentMessageListener<S, T> implements MessageListener<S, T> {@Overridepublic void onMessage(Message<S, T> message) {System.out.println(String.format("Received Message in collection %s.\n\trawsource: %s\n\tconverted: %s",message.getProperties().getCollectionName(), message.getRaw(), message.getBody()));}}
5、測試
mongo shell插入一條文檔
6、控制臺輸出