泉州做媽祖雕像網(wǎng)站常用的網(wǎng)絡(luò)營(yíng)銷工具
背景:
我們經(jīng)常會(huì)使用到比如數(shù)據(jù)庫(kù)中的配置表信息,而我們不希望每次都去查詢db,那么我們就想定時(shí)把db配置表的數(shù)據(jù)定時(shí)加載到flink的本地內(nèi)存中,那么如何實(shí)現(xiàn)呢?
外部定時(shí)器定時(shí)加載實(shí)現(xiàn)
1.在open函數(shù)中進(jìn)行定時(shí)器的創(chuàng)建和定時(shí)加載,這個(gè)方法對(duì)于所有的RichFunction富函數(shù)都適用,包括RichMap,RichFilter,RichSink等,代碼如下所示
package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定時(shí)任務(wù)執(zhí)行器private transient ScheduledExecutorService scheduledExecutorService;// 本地變量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.從db查詢數(shù)據(jù)初始化本地變量
// threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定時(shí)任務(wù)更新本地內(nèi)存的配置信息以及更新本地變量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定時(shí)任務(wù)更新本地內(nèi)存配置項(xiàng)// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
// }// 2.2 更新本地變量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地緩存實(shí)現(xiàn)
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地緩存 ---定時(shí)同步DB配置表的數(shù)據(jù)*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 獲取本地緩存實(shí)例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 緩存內(nèi)存配置項(xiàng) */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地緩存數(shù)據(jù)*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地緩存數(shù)據(jù)*/public String getByKey(String key){return configCache.getIfPresent(key);}}
2.在靜態(tài)類中通過(guò)static語(yǔ)句塊創(chuàng)建定時(shí)器并定時(shí)加載,代碼如下
package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 靜態(tài)類定時(shí)加載DB配置表到本地內(nèi)存中*/
public class StaticLoadUtil {// 定時(shí)任務(wù)執(zhí)行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通過(guò)定時(shí)執(zhí)行器定時(shí)同步本地緩存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定時(shí)任務(wù)更新本地內(nèi)存配置項(xiàng)// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地變量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 獲取本地緩存*/public static Cache<String, String> getConfigCache() {return configCache;}}
總結(jié):
1.外部定時(shí)器可以通過(guò)在富函數(shù)的open中進(jìn)行初始化并開始定時(shí)執(zhí)行
2.外部定時(shí)器也可以通過(guò)創(chuàng)建一個(gè)單獨(dú)的靜態(tài)類,然后在static模塊中進(jìn)行初始化并開始定時(shí)執(zhí)行