国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

如何做網(wǎng)站與網(wǎng)頁微信營銷軟件

如何做網(wǎng)站與網(wǎng)頁,微信營銷軟件,網(wǎng)站開發(fā)實訓日記,優(yōu)設網(wǎng)網(wǎng)址一、 分布式遠程服務(Remote Service) 基于Redis的Java分布式遠程服務,可以用來通過共享接口執(zhí)行存在于另一個Redisson實例里的對象方法。換句話說就是通過Redis實現(xiàn)了Java的遠程過程調(diào)用(RPC)。分布式遠程服務基于可…

一、?分布式遠程服務(Remote Service)

基于Redis的Java分布式遠程服務,可以用來通過共享接口執(zhí)行存在于另一個Redisson實例里的對象方法。換句話說就是通過Redis實現(xiàn)了Java的遠程過程調(diào)用(RPC)。分布式遠程服務基于可以用POJO對象,方法的參數(shù)和返回類不受限制,可以是任何類型。

分布式遠程服務(Remote Service)提供了兩種類型的RRemoteService實例:

  • 服務端(遠端)實例 - 用來執(zhí)行遠程方法(工作者實例即worker instance). 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();// 在調(diào)用遠程方法以前,應該首先注冊遠程服務
// 只注冊了一個服務端工作者實例,只能同時執(zhí)行一個并發(fā)調(diào)用
remoteService.register(SomeServiceInterface.class, someServiceImpl);// 注冊了12個服務端工作者實例,可以同時執(zhí)行12個并發(fā)調(diào)用
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
  • 客戶端(本地)實例 - 用來請求遠程方法. 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);String result = service.doSomeStuff(1L, "secondParam", new AnyParam());

客戶端和服務端必須使用一樣的共享接口,生成兩者的Redisson實例必須采用相同的連接配置??蛻舳撕头斩藢嵗梢赃\行在同一個JVM里,也可以是不同的??蛻舳撕头斩说臄?shù)量不收限制。(注意:盡管Redisson不做任何限制,但是Redis的限制仍然有效。)

在服務端工作者可用實例數(shù)量 大于1 的時候,將并行執(zhí)行并發(fā)調(diào)用的遠程方法。

并行執(zhí)行工作者數(shù)量計算方法如下: T = R * N

T - 并行執(zhí)行工作者總數(shù) R - Redisson服務端數(shù)量 N - 注冊服務端時指定的執(zhí)行工作者數(shù)量

超過該數(shù)量的并發(fā)請求將在列隊中等候執(zhí)行。

在服務端工作者實例可用數(shù)量為 1 時,遠程過程調(diào)用將會按 順序執(zhí)行。這種情況下,每次只有一個請求將會被執(zhí)行,其他請求將在列隊中等候執(zhí)行。

1. 分布式遠程服務工作流程

分布式遠程服務為每個注冊接口建立了兩個列隊。一個列隊用于請求,由服務端監(jiān)聽,另一個列隊用于應答回執(zhí)和結(jié)果回復,由客戶端監(jiān)聽。應答回執(zhí)用于判定該請求是否已經(jīng)被接受。如果在指定的超時時間內(nèi)沒有被執(zhí)行工作者執(zhí)行將會拋出RemoteServiceAckTimeoutException錯誤。

下圖描述了每次發(fā)起遠程過程調(diào)用請求的工作流程。

2. 發(fā)送即不管(Fire-and-Forget)模式和應答回執(zhí)(Ack-Response)模式

分布式遠程服務通過org.redisson.core.RemoteInvocationOptions類,為每個遠程過程調(diào)用提供了一些可配置選項。這些選項可以用來指定和修改請求超時和選擇跳過應答回執(zhí)或結(jié)果的發(fā)送模式。例如:

// 應答回執(zhí)超時1秒鐘,遠程執(zhí)行超時30秒鐘
RemoteInvocationOptions options = RemoteInvocationOptions.defaults();// 無需應答回執(zhí),遠程執(zhí)行超時30秒鐘
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();// 應答回執(zhí)超時1秒鐘,不等待執(zhí)行結(jié)果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();// 應答回執(zhí)超時1分鐘,不等待執(zhí)行結(jié)果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();// 發(fā)送即不管(Fire-and-Forget)模式,無需應答回執(zhí),不等待結(jié)果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();RRemoteService remoteService = redisson.getRemoteService();
YourService service = remoteService.get(YourService.class, options);

3. 異步調(diào)用

遠程過程調(diào)用也可以采用異步的方式執(zhí)行。異步調(diào)用需要單獨提交一個帶有@RRemoteAsync注解(annotation)的異步接口類。異步接口方法簽名必須與遠程接口的方法簽名相符。異步接口的返回類必須是org.redisson.api.RFuture對象或其子對象。在調(diào)用RRemoteService.get方法時將對異步接口的方法進行驗證。異步接口無須包含所有的遠程接口里的方法,只需要包含要求異步執(zhí)行的方法即可。

// 遠程接口
public interface RemoteInterface {Long someMethod1(Long param1, String param2);void someMethod2(MyObject param);MyObject someMethod3();}// 匹配遠程接口的異步接口
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {RFuture<Long> someMethod1(Long param1, String param2);RFuture<Void> someMethod2(MyObject param);}RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);

4. 取消異步調(diào)用

通過調(diào)用Future.cancel()方法可以非常方便的取消一個異步調(diào)用。分布式遠程服務允許在三個階段中任何一個階段取消異步調(diào)用:

  1. 遠程調(diào)用請求在列隊中排隊階段
  2. 遠程調(diào)用請求已經(jīng)被分布式遠程服務接受,還未發(fā)送應答回執(zhí),執(zhí)行尚未開始。
  3. 遠程調(diào)用請求已經(jīng)在執(zhí)行階段

想要正確的處理第三個階段,在服務端代碼里應該檢查Thread.currentThread().isInterrupted()的返回狀態(tài)。范例如下:

// 遠程接口
public interface MyRemoteInterface {Long myBusyMethod(Long param1, String param2);}// 匹配遠程接口的異步接口
@RRemoteAsync(MyRemoteInterface.class)
public interface MyRemoteInterfaceAsync {RFuture<Long> myBusyMethod(Long param1, String param2);}// 遠程接口的實現(xiàn)
public class MyRemoteServiceImpl implements MyRemoteInterface {public Long myBusyMethod(Long param1, String param2) {for (long i = 0; i < Long.MAX_VALUE; i++) {iterations.incrementAndGet();if (Thread.currentThread().isInterrupted()) {System.out.println("interrupted! " + i);return;}}}}RRemoteService remoteService = redisson.getRemoteService();
ExecutorService executor = Executors.newFixedThreadPool(5);
// 注冊遠程服務的服務端的同時,通過單獨指定的ExecutorService來配置執(zhí)行線程池
MyRemoteInterface serviceImpl = new MyRemoteServiceImpl();
remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor);// 異步調(diào)用方法
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam");
// 取消異步調(diào)用
future.cancel(true);

二、分布式實時對象(Live Object)服務

1. 介紹

一個 分布式實時對象(Live Object) 可以被理解為一個功能強化后的Java對象。該對象不僅可以被一個JVM里的各個線程相引用,還可以被多個位于不同JVM里的線程同時引用。Wikipedia對這種特殊對象的概述是:

Live distributed object (also abbreviated as live object) refers to a running instance of a distributed multi-party (or peer-to-peer) protocol, viewed from the object-oriented perspective, as an entity that has a distinct identity, may encapsulate internal state and threads of execution, and that exhibits a well-defined externally visible behavior.

Redisson分布式實時對象(Redisson Live Object,簡稱RLO)運用即時生成的代理類(Proxy),將一個指定的普通Java類里的所有字段,以及針對這些字段的操作全部映射到一個Redis Hash的數(shù)據(jù)結(jié)構,實現(xiàn)這種理念。每個字段的get和set方法最終被轉(zhuǎn)譯為針對同一個Redis Hash的hget和hset命令,從而使所有連接到同一個Redis節(jié)點的所有可以客戶端同時對一個指定的對象進行操作。眾所周知,一個對象的狀態(tài)是由其內(nèi)部的字段所賦的值來體現(xiàn)的,通過將這些值保存在一個像Redis這樣的遠程共享的空間的過程,把這個對象強化成了一個分布式對象。這個分布式對象就叫做Redisson分布式實時對象(Redisson Live Object,簡稱RLO)。

通過使用RLO,運行在不同服務器里的多個程序之間,共享一個對象實例變得和在單機程序里共享一個對象實例一樣了。同時還避免了針對任何一個字段操作都需要將整個對象序列化和反序列化的繁瑣,進而降低了程序開發(fā)的復雜性和其數(shù)據(jù)模型的復雜性:從任何一個客戶端修改一個字段的值,處在其他服務器上的客戶端(幾乎^)即刻便能查看到。而且實現(xiàn)代碼與單機程序代碼無異。(^連接到從節(jié)點的客戶端仍然受Redis的最終一致性的特性限制)

鑒于Redis是一個單線程的程序,針對實時對象的所有的字段操作可以理解為全部是原子性操作,也就是說在讀取一個字段的過程不會擔心被其他線程所修改。

通過使用RLO,可以把Redis當作一個允許被多個JVM同時操作且不受GC影響的共享堆(Heap Space)。

2. 使用方法

Redisson為分布式實時對象提供了一系列不同功能的注解,其中@REntity和@RId兩個注解是分布式實時對象的必要條件。

@REntity
public class MyObject {@RIdprivate String id;@RIndexprivate String value;private MyObject parent;public MyObject(String id) {this.id = id;}public MyObject() {}// getters and setters}

在開始使用分布式實時對象以前,需要先通過Redisson服務將指定的對象連接(attach),合并(merge)或持久化(persist)到Redis里。

RLiveObjectService service = redisson.getLiveObjectService();
MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 將myObject對象當前的狀態(tài)持久化到Redis里并與之保持同步。
myObject = service.persist(myObject);MyLiveObject myObject = new MyLiveObject("1");
// 拋棄myObject對象當前的狀態(tài),并與Redis里的數(shù)據(jù)建立連接并保持同步。
myObject = service.attach(myObject);MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 將myObject對象當前的狀態(tài)與Redis里的數(shù)據(jù)合并之后與之保持同步。
myObject = service.merge(myObject);
myObject.setValue("somevalue");// 通過ID獲取分布式實時對象
MyLiveObject myObject = service.get(MyLiveObject.class, "1");// 通過索引查找分布式實時對象
Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.in("value", "somevalue", "somevalue2"));Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.and(Conditions.in("value", "somevalue", "somevalue2"), Conditions.eq("secondfield", "test")));

“parent”字段中包含了指向到另一個分布式實時對象的引用,它可以與包含類是同一類型也可以不同。Redisson內(nèi)部采用了與Java的引用類似的方式保存這個關系,而非將全部對象序列化,可視為與普通的引用同等效果。

//RLO對象:
MyObject myObject = service.get(MyObject.class, "1");
MyObject myParentObject = service.get(MyObject.class, "2");
myObject.setValue(myParentObject);

RLO的字段類型基本上無限制,可以是任何類型。比如Java util包里的集合類,Map類等,也可以是自定義的對象。只要指定的編碼解碼器能夠?qū)ζ溥M行編碼和解碼操作便可。關于編碼解碼器的詳細信息請查閱高級使用方法章節(jié)。

盡管RLO的字段類型基本上無限制,個別類型還是受限。注解了RId的字段類型不能是數(shù)組類(Array),比如int[],long[],double[],byte[]等等。更多關于限制有關的介紹和原理解釋請查閱使用限制 章節(jié)。

為了保證RLO的用法和普通Java對象的用法盡可能一直,Redisson分布式實時對象服務自動將以下普通Java對象轉(zhuǎn)換成與之匹配的Redisson分布式對象RObject。

普通Java類

轉(zhuǎn)換后的Redisson類

SortedSet.class

RedissonSortedSet.class

Set.class

RedissonSet.class

ConcurrentMap.class

RedissonMap.class

Map.class

RedissonMap.class

BlockingDeque.class

RedissonBlockingDeque.class

Deque.class

RedissonDeque.class

BlockingQueue.class

RedissonBlockingQueue.class

Queue.class

RedissonQueue.class

List.class

RedissonList.class

類型轉(zhuǎn)換將按照從上至下的順序匹配類型,例如LinkedList類同時實現(xiàn)了Deque,List和Queue,由于Deque排在靠上的位置,因此它將會被轉(zhuǎn)換成一個RedissonDeque類型。

Redisson的分布式對象也采用類似的方式,將自身的狀態(tài)儲存于Redis當中,(幾乎^)所有的狀態(tài)改變都直接映射到Redis里,不在本地JVM中保留任何賦值。(^本地緩存對象除外,比如RLocalCachedMap)

3. 高級使用方法

正如上述介紹,RLO類其實都是按需實時生成的代理(Proxy)類。生成的代理類和原類都一同緩存Redisson實例里。這個過程會消耗一些時間,在對耗時比較敏感的情況下,建議通過RedissonLiveObjectService提前注冊所有的RLO類。這個服務也可以用來注銷不再需要的RLO類,也可以用來查詢一個類是否已經(jīng)注冊了。

RLiveObjectService service = redisson.getLiveObjectService();
service.registerClass(MyClass.class);
service.unregisterClass(MyClass.class);
Boolean registered = service.isClassRegistered(MyClass.class);

4. 注解(Annotation)使用方法

@REntity

僅適用于類。通過指定@REntity的各個參數(shù),可以詳細的對每個RLO類實現(xiàn)特殊定制,以達到改變RLO對象的行為。

  • namingScheme - 命名方案。命名方案規(guī)定了每個實例在Redis中對應key的名稱。它不僅被用來與已存在的RLO建立關聯(lián),還被用來儲存新建的RLO實例。默認采用Redisson自帶的DefaultNamingScheme對象。
  • codec - 編碼解碼器。在運行當中,Redisson用編碼解碼器來對RLO中的每個字段進行編碼解碼。Redisson內(nèi)部采用了實例池管理不同類型的編碼解碼器實例。Redisson提供了多種不同的編碼解碼器,默認使用JsonJacksonCodec。
  • fieldTransformation - 字段轉(zhuǎn)換模式。如上所述,為了盡可能的保證RLO的用法和普通Java對象一致,Redisson會自動將常用的普通Java對象轉(zhuǎn)換成與其匹配的Redisson分布式對象。這是由于字段轉(zhuǎn)換模式的默認值是ANNOTATION_BASED,修改為IMPLEMENTATION_BASED就可以不轉(zhuǎn)換。

@RId

僅適用于字段。@RId注解只能用在具備區(qū)分實例的字段上,這類字段可以理解為一個類的id字段或主鍵字段。這個字段的值將被命名方案namingScheme用來與事先存在的RLO建立引用。加了該注解的字段是唯一在本地JVM里同時保存賦值的字段。一個類只能有一個字段包含@RId注解。

可以通過指定一個生成器generator策略來實現(xiàn)自動生成這個字段的值。默認不提供生成器。

@RIndex

僅適用于字段。用來指定可用于搜索的字段??梢酝ㄟ^RLiveObjectService.find方法來根據(jù)條件精細查找分布式實時對象。查詢條件可以是含(IN),或(OR),和(AND)或相等(EQ)以及它們的任意組合。

使用范例如下:

public class MyObject {@RIndexString field1;@RIndexString field2;@RIndexString field3;
}Collection<MyObject> objects = RLiveObjectService.find(MyObject.class, Conditions.or(Conditions.and(Conditions.eq("field1", "value"), Conditions.eq("field2", "value")), Conditions.in("field3", "value1", "value2"));

@RObjectField

僅適用于字段。允許通過該注解中的namingScheme或codec來改變該字段的命名或編碼方式,用來區(qū)別于@REntity中指定的預設方式。

@RCascade

僅適用于字段。用來指定包含于分布式實時對象字段內(nèi)其它對象的級聯(lián)操作方式。

可選的級聯(lián)操作方式為如下:

RCascadeType.ALL - 執(zhí)行所有級聯(lián)操作
RCascadeType.PERSIST - 僅在執(zhí)行RLiveObjectService.persist()方法時進行級聯(lián)操作 RCascadeType.DETACH - 僅在執(zhí)行RLiveObjectService.detach()方法時進行級聯(lián)操作 RCascadeType.MERGE - 僅在執(zhí)行RLiveObjectService.merge()方法時進行級聯(lián)操作 RCascadeType.DELETE - 僅在執(zhí)行RLiveObjectService.delete()方法時進行級聯(lián)操作

5. 使用限制

如上所述,帶有RId注解字段的類型不能使數(shù)組類,這是因為目前默認的命名方案類DefaultNamingScheme還不能正確地將數(shù)組類序列化和反序列化。在改善了DefaultNamingScheme類的不足以后會考慮取消這個限制。另外由于帶有RId注解的字段是用來指定Redis中映射的key的名稱,因此組建一個只含有唯一一個字段的RLO類是毫無意義的。選用RBucket會更適合這樣的場景。

三、分布式執(zhí)行服務(Executor Service)

1. 分布式執(zhí)行服務概述

Redisson的分布式執(zhí)行服務實現(xiàn)了java.util.concurrent.ExecutorService接口,支持在不同的獨立節(jié)點里執(zhí)行基于java.util.concurrent.Callable接口或java.lang.Runnable接口或Lambda的任務。這樣的任務也可以通過使用Redisson實例,實現(xiàn)對儲存在Redis里的數(shù)據(jù)進行操作。Redisson分布式執(zhí)行服務是最快速和有效執(zhí)行分布式運算的方法。

2. 任務

Redisson獨立節(jié)點不要求任務的類在類路徑里。他們會自動被Redisson獨立節(jié)點的ClassLoader加載。因此每次執(zhí)行一個新任務時,不需要重啟Redisson獨立節(jié)點。

采用Callable任務的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

采用Runnable任務的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param = param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}

在創(chuàng)建ExecutorService時可以配置以下參數(shù):

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新嘗試執(zhí)行任務的時間間隔。
// ExecutorService的工作節(jié)點將等待10分鐘后重新嘗試執(zhí)行任務
//
// 設定為0則不進行重試
//
// 默認值為5分鐘
options.taskRetryInterval(10, TimeUnit.MINUTES);
RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
executorService.submit(new RunnableTask(123));RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit(new CallableTask());
Long result = future.get();

使用Lambda任務的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit((Callable & Serializable)() -> {System.out.println("task has been executed!");
});
Long result = future.get();

可以通過@RInject注解來為任務實時注入Redisson實例依賴。

3. 取消任務

通過Future.cancel()方法可以很方便的取消所有已提交的任務。通過對Thread.currentThread().isInterrupted()方法的調(diào)用可以在已經(jīng)處于運行狀態(tài)的任務里實現(xiàn)任務中斷:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了許多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任務被取消了return null;}result += value;}return result;}}RExecutorService executorService = redisson.getExecutorService("myExecutor");
Future<Long> future = executorService.submit(new CallableTask());
// 或
RFuture<Long> future = executorService.submitAsync(new CallableTask());
// ...
future.cancel(true);

四、 分布式調(diào)度任務服務(Scheduler Service)

1. 分布式調(diào)度任務服務概述

Redisson的分布式調(diào)度任務服務實現(xiàn)了java.util.concurrent.ScheduledExecutorService接口,支持在不同的獨立節(jié)點里執(zhí)行基于java.util.concurrent.Callable接口或java.lang.Runnable接口的任務。Redisson獨立節(jié)點按順序運行Redis列隊里的任務。調(diào)度任務是一種需要在未來某個指定時間運行一次或多次的特殊任務。

2. 設定任務計劃

Redisson獨立節(jié)點不要求任務的類在類路徑里。他們會自動被Redisson獨立節(jié)點的ClassLoader加載。因此每次執(zhí)行一個新任務時,不需要重啟Redisson獨立節(jié)點。

采用Callable任務的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

在創(chuàng)建ExecutorService時可以配置以下參數(shù):

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新嘗試執(zhí)行任務的時間間隔。
// ExecutorService的工作節(jié)點將等待10分鐘后重新嘗試執(zhí)行任務
//
// 設定為0則不進行重試
//
// 默認值為5分鐘
options.taskRetryInterval(10, TimeUnit.MINUTES);
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<Long> future = executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);
Long result = future.get();

使用Lambda任務的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
ScheduledFuture<Long> future = executorService.schedule((Callable & Serializable)() -> {System.out.println("task has been executed!");
}, 10, TimeUnit.MINUTES);
Long result = future.get();

采用Runnable任務的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param= param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<?> future1 = executorService.schedule(new RunnableTask(123), 10, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future2 = executorService.scheduleAtFixedRate(new RunnableTask(123), 10, 25, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future3 = executorService.scheduleWithFixedDelay(new RunnableTask(123), 5, 10, TimeUnit.HOURS);

3. 通過CRON表達式設定任務計劃

在分布式調(diào)度任務中,可以通過CRON表達式來為任務設定一個更復雜的計劃。表達式與Quartz的CRON格式完全兼容。

例如:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

4. 取消計劃任務

分布式調(diào)度任務服務提供了兩張取消任務的方式:通過調(diào)用ScheduledFuture.cancel()方法或調(diào)用RScheduledExecutorService.cancelScheduledTask方法。通過對Thread.currentThread().isInterrupted()方法的調(diào)用可以在已經(jīng)處于運行狀態(tài)的任務里實現(xiàn)任務中斷:

public class RunnableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了許多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任務被取消了return null;}result += value;}return result;}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
RScheduledFuture<Long> future = executorService.scheduleAsync(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
future.cancel(true);
// 或
String taskId = future.getTaskId();
// ...
executorService.cancelScheduledTask(taskId);

五、分布式映射歸納服務(MapReduce)

1.?介紹

Redisson提供了通過映射歸納(MapReduce)編程模式來處理儲存在Redis環(huán)境里的大量數(shù)據(jù)的服務。這個想法來至于其他的類似實現(xiàn)方式和谷歌發(fā)表的研究。所有 映射(Map)歸納(Reduce) 階段中的任務都是被分配到各個獨立節(jié)點(Redisson Node)里并行執(zhí)行的。以下所有接口均支持映射歸納(MapReduce)功能: RMap、 RMapCache、 RLocalCachedMap、 RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue 和 RPriorityDeque

映射歸納(MapReduce)的功能是通過RMapper、 RCollectionMapper、 RReducer 和 RCollator 這幾個接口實現(xiàn)的。

1.RMapper映射器接口適用于映射(Map)類,它用來把映射(Map)中的每個元素轉(zhuǎn)換為另一個作為歸納(Reduce)處理用的鍵值對。

public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable {void map(KIn key, VIn value, RCollector<KOut, VOut> collector);}

2.RCollectionMapper 映射器接口僅適用于集合(Collection)類型的對象,它用來把集合(Collection)中的元素轉(zhuǎn)換成一組作為歸納(Reduce)處理用的鍵值對。

public interface RCollectionMapper<VIn, KOut, VOut> extends Serializable {void map(VIn value, RCollector<KOut, VOut> collector);}

3.RReducer歸納器接口用來將上面這些,由映射器生成的鍵值對列表進行歸納整理。

public interface RReducer<K, V> extends Serializable {V reduce(K reducedKey, Iterator<V> values);}

4.RCollator收集器接口用來把歸納整理以后的結(jié)果化簡為單一一個對象。

public interface RCollator<K, V, R> extends Serializable {R collate(Map<K, V> resultMap);}

以上每個階段的任務都可以用@RInject注解的方式來獲取RedissonClient實例:

    public class WordMapper implements RMapper<String, String, String, Integer> {@RInjectprivate RedissonClient redissonClient;@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {// ...redissonClient.getAtomicLong("mapInvocations").incrementAndGet();}}

2. 映射(Map)類型的使用范例

Redisson提供的RMap、 RMapCache和RLocalCachedMap這三種映射(Map)類型的對象均可以使用這種分布式映射歸納(MapReduce)服務。

以下是在映射(Map)類型的基礎上采用映射歸納(MapReduce)來實現(xiàn)字數(shù)統(tǒng)計的范例:

  1. Mapper對象將每個映射的值用空格且分開。
    public class WordMapper implements RMapper<String, String, String, Integer> {@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
  1. Reducer對象計算統(tǒng)計所有單詞的使用情況。
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
  1. Collator對象統(tǒng)計所有單詞的使用情況。
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
  1. 把上面的各個對象串起來使用:
    RMap<String, String> map = redisson.getMap("wordsMap");map.put("line1", "Alice was beginning to get very tired");map.put("line2", "of sitting by her sister on the bank and");map.put("line3", "of having nothing to do once or twice she");map.put("line4", "had peeped into the book her sister was reading");map.put("line5", "but it had no pictures or conversations in it");map.put("line6", "and what is the use of a book");map.put("line7", "thought Alice without pictures or conversation");RMapReduce<String, String, String, Integer> mapReduce= map.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 統(tǒng)計詞頻Map<String, Integer> mapToNumber = mapReduce.execute();// 統(tǒng)計字數(shù)Integer totalWordsAmount = mapReduce.execute(new WordCollator());

3. 集合(Collection)類型的使用范例

Redisson提供的RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue和RPriorityDeque這幾種集合(Collection)類型的對象均可以使用這種分布式映射歸納(MapReduce)服務。

以下是在集合(Collection)類型的基礎上采用映射歸納(MapReduce)來實現(xiàn)字數(shù)統(tǒng)計的范例:

    public class WordMapper implements RCollectionMapper<String, String, Integer> {@Overridepublic void map(String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
    RList<String> list = redisson.getList("myList");list.add("Alice was beginning to get very tired");list.add("of sitting by her sister on the bank and");list.add("of having nothing to do once or twice she");list.add("had peeped into the book her sister was reading");list.add("but it had no pictures or conversations in it");list.add("and what is the use of a book");list.add("thought Alice without pictures or conversation");RCollectionMapReduce<String, String, Integer> mapReduce= list.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 統(tǒng)計詞頻Map<String, Integer> mapToNumber = mapReduce.execute();// 統(tǒng)計字數(shù)Integer totalWordsAmount = mapReduce.execute(new WordCollator());

http://m.aloenet.com.cn/news/929.html

相關文章:

  • 陜西省建設監(jiān)理協(xié)會證書查詢網(wǎng)站寧波網(wǎng)站推廣怎么做
  • 管理咨詢公司稅收優(yōu)惠青島seo優(yōu)化
  • 鎮(zhèn)江網(wǎng)站推廣南寧seo外包要求
  • 網(wǎng)站有了訂單郵箱提醒代碼營銷策略模板
  • 這么做國外網(wǎng)站的國內(nèi)鏡像站甘肅搜索引擎網(wǎng)絡優(yōu)化
  • 西安網(wǎng)絡公司網(wǎng)站建設小紅書推廣平臺
  • 網(wǎng)站開發(fā)基于什么平臺自己代理一款手游需要多少錢
  • go語言怎么搭建網(wǎng)頁東莞網(wǎng)站優(yōu)化關鍵詞排名
  • 用jsp做網(wǎng)站的難點云優(yōu)化
  • index.html網(wǎng)站怎么做重慶seo優(yōu)化公司
  • 免費下載建設銀行官方網(wǎng)站濟南優(yōu)化網(wǎng)站的哪家好
  • 岳陽網(wǎng)站開發(fā)商城網(wǎng)絡推廣項目計劃書
  • 校園網(wǎng)站建設模板上海網(wǎng)站排名seo公司哪家好
  • 做網(wǎng)站賭博代理違法嗎品牌推廣是做什么的
  • 做營銷網(wǎng)站制作seo綜合查詢是什么意思
  • 企業(yè)網(wǎng)站怎么做畢業(yè)設計網(wǎng)站怎么營銷推廣
  • b2b平臺有哪些類別網(wǎng)絡營銷優(yōu)化
  • 做免費網(wǎng)站教程國vs百度seo排名優(yōu)化軟件化
  • 萊蕪二手房網(wǎng)湖南seo優(yōu)化報價
  • wordpress顯示輪播圖深圳市seo上詞多少錢
  • 網(wǎng)絡投注網(wǎng)站是怎么建設簡述網(wǎng)絡營銷的概念
  • 可以免費開店的平臺windows11優(yōu)化大師
  • 網(wǎng)站動態(tài)小圖標青島網(wǎng)絡seo公司
  • 日本中古手表網(wǎng)站申請網(wǎng)站怎么申請
  • 襄陽做網(wǎng)站哪家好b2b平臺有哪些
  • 電子商務網(wǎng)站開發(fā)的基本流程軟文營銷的特點有哪些
  • 深圳 建設銀行國際互聯(lián)網(wǎng)站網(wǎng)絡推廣公司排名
  • 這幾年做網(wǎng)站怎么樣百度搜索風云榜排行榜
  • 東莞網(wǎng)站設計報價天津百度網(wǎng)站排名優(yōu)化
  • 有哪些網(wǎng)站可以做淘寶客搜索seo怎么優(yōu)化