好湿?好紧?好多水好爽自慰,久久久噜久噜久久综合,成人做爰A片免费看黄冈,机机对机机30分钟无遮挡

主頁 > 知識庫 > Redis實現分布式鎖和等待序列的方法示例

Redis實現分布式鎖和等待序列的方法示例

熱門標簽:地圖標注工廠入駐 一個地圖標注多少錢 四川穩定外呼系統軟件 臺灣電銷 高碑店市地圖標注app b2b外呼系統 廊坊外呼系統在哪買 400電話辦理的口碑 南京手機外呼系統廠家

在集群下,經常會因為同時處理發生資源爭搶和并發問題,但是我們都知道同步鎖 synchronized 、 cas 、 ReentrankLock 這些鎖的作用范圍都是 JVM ,說白了在集群下沒啥用。這時我們就需要能在多臺 JVM 之間決定執行順序的鎖了,現在分布式鎖主要有 redis 、 Zookeeper 實現的,還有數據庫的方式,不過性能太差,也就是需要一個第三方的監管。

背景

最近在做一個消費 Kafka 消息的時候發現,由于線上的消費者過多,經常會遇到,多個機器同時處理一個主鍵類型的數據的情況發生,如果最后是執行更新操作的話,也就是一個更新順序的問題,但是如果恰好都需要插入數據的時候,會出現主鍵重復的問題。這是生產上不被允許的(因為公司有異常監管的機制,扣分啥的),這是就需要個分布式鎖了,斟酌后用了 Redis 的實現方式(因為網上例子多)

分析

redis 實現的分布式鎖,實現原理是 set 方法,因為多個線程同時請求的時候,只有一個線程可以成功并返回結果,還可以設置有效期,來避免死鎖的發生,一切都是這么的完美,不過有個問題,在 set 的時候,會直接返回結果,成功或者失敗,不具有阻塞效果,需要我們自己對失敗的線程進程處理,有兩種方式

  • 丟棄
  • 等待重試 由于我們的系統需要這些數據,那么只能重新嘗試獲取。這里使用 redis 的 List 類型實現等待序列的作用

代碼

直接上代碼 其實直接redis的工具類就可以解決了

package com.test
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc redis隊列實現方式
 * @anthor 
 * @date 
 **/
public class RedisUcUitl {

  private static final String LOCK_SUCCESS = "OK";
  private static final String SET_IF_NOT_EXIST = "NX";
  private static final String SET_WITH_EXPIRE_TIME = "PX";

  private static final Long RELEASE_SUCCESS = 1L;

  private RedisUcUitl() {

  }
  /**
   * logger
   **/

  /**
   * 存儲redis隊列順序存儲 在隊列首部存入
   *
   * @param key  字節類型
   * @param value 字節類型
   */
  public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {

    return jedis.lpush(key, value);
  
  }

  /**
   * 移除列表中最后一個元素 并將改元素添加入另一個列表中 ,當列表為空時 將阻塞連接 直到等待超時
   *
   * @param srckey
   * @param dstkey
   * @param timeout 0 表示永不超時
   * @return
   */
  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {

    return jedis.brpoplpush(srckey, dstkey, timeout);

  }

  /**
   * 返回制定的key,起始位置的redis數據
   * @param redisKey
   * @param start
   * @param end -1 表示到最后
   * @return
   */
  public static Listbyte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
    
    return jedis.lrange(redisKey, start, end);
  }

  /**
   * 刪除key
   * @param redisKey
   */
  public static void delete(Jedis jedis, final byte[] redisKey) {
    
     return jedis.del(redisKey);
  }

  /**
   * 嘗試加鎖
   * @param lockKey key名稱
   * @param requestId 身份標識
   * @param expireTime 過期時間
   * @return
   */
  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    return LOCK_SUCCESS.equals(result);

  }

  /**
   * 釋放鎖
   * @param lockKey key名稱
   * @param requestId 身份標識
   * @return
   */
  public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
    final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

    return RELEASE_SUCCESS.equals(result);

  }
}

業務邏輯主要代碼如下

1.先消耗隊列中的

while(true){
  // 消費隊列
  try{
    // 被放入redis隊列的數據 序列化后的
    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
    if(bytes == null || bytes.isEmpty()){
      // 隊列中沒數據時退出
      break;
    }
    // 反序列化對象
    MapString, Object> singleMap = (MapString, Object>) ObjectSerialUtil.bytesToObject(bytes);
    // 塞入唯一的值 防止被其他線程誤解鎖
    String requestId = UUID.randomUUID().toString();
    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
    if(lockGetFlag){
      // 成功獲取鎖 進行業務處理
      //TODO
      // 處理完畢釋放鎖 
      boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);

    }else{
      // 未能獲得鎖放入等待隊列
     RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
  
    }
    
  }catch(Exception e){
    break;
  }
  
}

2.處理最新接到的數據

同樣是走嘗試獲取鎖,獲取不到放入隊列的流程

一般序列化用 fastJson 之列的就可以了,這里用的是 JDK 自帶的,工具類如下

public class ObjectSerialUtil {

  private ObjectSerialUtil() {
//    工具類
  }

  /**
   * 將Object對象序列化為byte[]
   *
   * @param obj 對象
   * @return byte數組
   * @throws Exception
   */
  public static byte[] objectToBytes(Object obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(obj);
    byte[] bytes = bos.toByteArray();
    bos.close();
    oos.close();
    return bytes;
  }


  /**
   * 將bytes數組還原為對象
   *
   * @param bytes
   * @return
   * @throws Exception
   */
  public static Object bytesToObject(byte[] bytes) {
    try {
      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
      ObjectInputStream ois = new ObjectInputStream(bin);
      return ois.readObject();
    } catch (Exception e) {
      throw new BaseException("反序列化出錯!", e);
    }
  }
}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • Redis分布式鎖的正確實現方法總結
  • Redis分布式鎖的實現方式(redis面試題)
  • SpringBoot使用Redisson實現分布式鎖(秒殺系統)
  • SpringBoot集成Redisson實現分布式鎖的方法示例
  • Java Redis分布式鎖的正確實現方式詳解
  • redis分布式鎖的問題與解決方法
  • 淺談Redis分布式鎖的正確實現方式
  • 單機redis分布式鎖實現原理解析

標簽:畢節 南寧 伊春 泰州 河源 拉薩 定州 甘南

巨人網絡通訊聲明:本文標題《Redis實現分布式鎖和等待序列的方法示例》,本文關鍵詞  Redis,實現,分布式,鎖,和,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《Redis實現分布式鎖和等待序列的方法示例》相關的同類信息!
  • 本頁收集關于Redis實現分布式鎖和等待序列的方法示例的相關信息資訊供網民參考!
  • 推薦文章
    主站蜘蛛池模板: 色情版巜罗拉情放荡id剧情 | 网禁??拗女稀缺1366| 深夜噪音| 免费播放A片视频在线观看| 91人妻人人爽精品破学生处| 波野多结衣家庭教师| 日本生活中的玛丽| 熟女人妻?人妻の视频| 在线免费观看污片| 日韩欧美视频一区二区| 一人之下下拉式漫画免费| 久久精品露脸对白国产| 好爽?好紧?宝贝别夹大巴视频 | 91国偷自产一区二区三区女王 | 狠狠操狠狠色| 国产精品27页| 欧美熟妇另类久久久久久牛牛影视| 不戴套玩弄新婚人妻视频| 极品国产一区二区三区| 午夜香港三级a三级三点| 亚洲欧美曰韩国产综合图片| 乱系列高H全肉| 刺激仑乱视频| 半夜农村土炕性事| 免费观看影片| 国产精品秘?视频免费观看| 24小时日本电影免费看| 久久丁香| 又粗又硬又大又爽免费视频播放 | 扒开双腿疯狂进出爽爽爽文字| 91最新在线| 99riAV1国产精品视频| 刘亦菲一级婬片A片AAA毛| 色婷婷天天综合在线| 色视频网站人成免费| 欧美精产国品一二三产品区别大吗| 中国一级特黄毛片大片| 99热最新网址获取| 美国一级黄色片| 国产香蕉精品视频在| 亚洲中文无码亚洲人成人二区|