|
|
@@ -12,6 +12,7 @@ import redis.clients.jedis.Jedis; |
|
|
|
import java.util.*; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.atomic.AtomicReference; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static net.sf.cglib.core.CollectionUtils.transform; |
|
|
|
import static org.cobbzilla.util.daemon.ZillaRuntime.*; |
|
|
@@ -178,6 +179,9 @@ public class RedisService { |
|
|
|
public Long sadd(String key, String value) { return sadd(key, new String[]{value}); } |
|
|
|
public Long sadd(String key, String[] values) { return __sadd(key, values, 0, MAX_RETRIES); } |
|
|
|
|
|
|
|
public Long sadd_plaintext(String key, String value) { return sadd_plaintext(key, new String[]{value}); } |
|
|
|
public Long sadd_plaintext(String key, String[] values) { return __sadd(key, values, 0, MAX_RETRIES, false); } |
|
|
|
|
|
|
|
public Long srem(String key, String value) { return srem(key, new String[]{value}); } |
|
|
|
public Long srem(String key, String[] values) { return __srem(key, values, 0, MAX_RETRIES); } |
|
|
|
|
|
|
@@ -198,6 +202,11 @@ public class RedisService { |
|
|
|
|
|
|
|
public long scard(String key) { return __scard(key, 0, MAX_RETRIES); } |
|
|
|
|
|
|
|
public Set<String> sunion(Collection<String> keys) { return __sunion(keys, 0, MAX_RETRIES); } |
|
|
|
public Set<String> sunion_plaintext(Collection<String> keys) { return __sunion(keys, 0, MAX_RETRIES, false); } |
|
|
|
|
|
|
|
public Long sunionstore(String destKey, Collection<String> keys) { return __sunionstore(destKey, keys, 0, MAX_RETRIES); } |
|
|
|
|
|
|
|
public Long incr(String key) { return __incrBy(key, 1, 0, MAX_RETRIES); } |
|
|
|
public Long counterValue(String key) { |
|
|
|
final String value = get_plaintext(key); |
|
|
@@ -211,6 +220,8 @@ public class RedisService { |
|
|
|
|
|
|
|
public Collection<String> keys(String key) { return __keys(key, 0, MAX_RETRIES); } |
|
|
|
|
|
|
|
public String rename(String key, String newKey) { return __rename(key, newKey, 0, MAX_RETRIES); } |
|
|
|
|
|
|
|
public static final String LOCK_SUFFIX = "._lock"; |
|
|
|
|
|
|
|
public boolean confirmLock(String key, String lock) { |
|
|
@@ -540,9 +551,13 @@ public class RedisService { |
|
|
|
} |
|
|
|
|
|
|
|
private Long __sadd(String key, String[] members, int attempt, int maxRetries) { |
|
|
|
return __sadd(key, members, attempt, maxRetries, true); |
|
|
|
} |
|
|
|
|
|
|
|
private Long __sadd(String key, String[] members, int attempt, int maxRetries, boolean crypt) { |
|
|
|
try { |
|
|
|
synchronized (redis) { |
|
|
|
return getRedis().sadd(prefix(key), encrypt(members)); |
|
|
|
return getRedis().sadd(prefix(key), crypt ? encrypt(members) : members); |
|
|
|
} |
|
|
|
} catch (RuntimeException e) { |
|
|
|
if (attempt > maxRetries) throw e; |
|
|
@@ -623,6 +638,36 @@ public class RedisService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Set<String> __sunion(Collection<String> keys, int attempt, int maxRetries) { |
|
|
|
return __sunion(keys, attempt, maxRetries, true); |
|
|
|
} |
|
|
|
private Set<String> __sunion(Collection<String> keys, int attempt, int maxRetries, boolean crypt) { |
|
|
|
try { |
|
|
|
String[] prefixedKeys = keys.stream().map(this::prefix).toArray(String[]::new); |
|
|
|
synchronized (redis) { |
|
|
|
final Set<String> values = getRedis().sunion(prefixedKeys); |
|
|
|
return crypt ? values.stream().map(this::decrypt).collect(Collectors.toSet()) : values; |
|
|
|
} |
|
|
|
} catch (RuntimeException e) { |
|
|
|
if (attempt > maxRetries) throw e; |
|
|
|
resetForRetry(attempt, "retrying RedisService.__sunion"); |
|
|
|
return __sunion(keys, attempt+1, maxRetries); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Long __sunionstore(String destKey, Collection<String> keys, int attempt, int maxRetries) { |
|
|
|
try { |
|
|
|
String[] prefixedKeys = keys.stream().map(this::prefix).toArray(String[]::new); |
|
|
|
synchronized (redis) { |
|
|
|
return getRedis().sunionstore(prefix(destKey), prefixedKeys); |
|
|
|
} |
|
|
|
} catch (RuntimeException e) { |
|
|
|
if (attempt > maxRetries) throw e; |
|
|
|
resetForRetry(attempt, "retrying RedisService.__sunionstore"); |
|
|
|
return __sunionstore(destKey, keys, attempt+1, maxRetries); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Long __incrBy(String key, long value, int attempt, int maxRetries) { |
|
|
|
try { |
|
|
|
synchronized (redis) { |
|
|
@@ -693,6 +738,21 @@ public class RedisService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private String __rename(String key, String newKey, int attempt, int maxRetries) { |
|
|
|
try { |
|
|
|
final String rval; |
|
|
|
synchronized (redis) { |
|
|
|
rval = getRedis().rename(prefix(key), newKey); |
|
|
|
} |
|
|
|
return rval; |
|
|
|
|
|
|
|
} catch (RuntimeException e) { |
|
|
|
if (attempt > maxRetries) throw e; |
|
|
|
resetForRetry(attempt, "retrying RedisService.__rename"); |
|
|
|
return __rename(key, newKey, attempt + 1, maxRetries); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void flush() { keys(ALL_KEYS).forEach(this::del_withPrefix); } |
|
|
|
|
|
|
|
} |