using System.Collections.Concurrent; using Microsoft.Extensions.Caching.Memory; using Newtonsoft.Json; using Nop.Core.Caching; using Nop.Core.Configuration; using StackExchange.Redis; namespace Nop.Services.Caching; /// /// Represents a local in-memory cache with distributed synchronization by Redis /// /// /// This class should be registered on IoC as singleton instance /// public partial class RedisSynchronizedMemoryCache : ISynchronizedMemoryCache { #region Fields protected readonly string _processId; protected bool _disposed; /// /// Holds the keys known by this nopCommerce instance /// protected readonly ICacheKeyManager _keyManager; protected readonly IMemoryCache _memoryCache; protected readonly IRedisConnectionWrapper _connection; protected readonly ConcurrentQueue _messageQueue = new(); protected readonly Timer _timer; #endregion #region Ctor public RedisSynchronizedMemoryCache(IMemoryCache memoryCache, IRedisConnectionWrapper connectionWrapper, ICacheKeyManager cacheKeyManager, AppSettings appSettings) { _processId = $"{Guid.NewGuid()}:{Environment.ProcessId}"; _memoryCache = memoryCache; _keyManager = cacheKeyManager; _connection = connectionWrapper; var publishIntervalMs = appSettings.Get().PublishIntervalMs; if (publishIntervalMs > 0) { var timeSpan = TimeSpan.FromMilliseconds(publishIntervalMs); _timer = new(_ => PublishQueuedChangeEvents(), null, timeSpan, timeSpan); } Subscribe(); } #endregion #region Utilities /// /// Subscribe to perform some operation when a message to the preferred/active node is broadcast /// protected void Subscribe() { var channel = ChannelPrefix; var subscriber = _connection.GetSubscriber(); subscriber.Subscribe(RedisChannel.Pattern(channel + "*"), (redisChannel, value) => { var publisher = ((string)redisChannel).Replace(channel, ""); if (publisher == _processId) return; var keys = JsonConvert.DeserializeObject(value); if (keys == null) return; foreach (var key in keys) { _memoryCache.Remove(key); _keyManager.RemoveKey(key); } }); } /// /// Unique channel prefix /// protected string ChannelPrefix => $"__change@{_connection.GetDatabase().Database}__{_connection.Instance}__:"; /// /// Enqueue or publish change event /// /// The evicted cache key to be published protected void PublishChangeEvent(object key) { var stringKey = key.ToString(); if (_timer == null) BatchPublishChangeEvents(stringKey); else _messageQueue.Enqueue(stringKey); } /// /// Publish accumulated change events on key channel /// protected void PublishQueuedChangeEvents() { IEnumerable getKeys() { while (_messageQueue.TryDequeue(out var key)) yield return key; } BatchPublishChangeEvents(getKeys().Distinct().ToArray()); } /// /// Publish change events on key channel /// The evicted entries to publish on the key channel. /// protected void BatchPublishChangeEvents(params string[] keys) { if (keys.Length == 0) return; var subscriber = _connection.GetSubscriber(); subscriber.Publish(RedisChannel.Pattern( $"{ChannelPrefix}{_processId}"), JsonConvert.SerializeObject(keys), CommandFlags.FireAndForget); } /// /// The callback method to run after the entry is evicted /// /// The key of the entry being evicted. /// The value of the entry being evicted. /// The . /// The information that was passed when registering the callback. protected void OnEviction(object key, object value, EvictionReason reason, object state) { switch (reason) { case EvictionReason.Replaced: case EvictionReason.TokenExpired: // e.g. clear cache event PublishChangeEvent(key); break; // don't publish here on removed, as it could be triggered by a redis event itself default: break; } } #endregion /// /// Create or overwrite an entry in the cache. /// /// An object identifying the entry. /// The newly created instance. public ICacheEntry CreateEntry(object key) { return _memoryCache.CreateEntry(key).RegisterPostEvictionCallback(OnEviction); } /// /// Removes the object associated with the given key. /// /// An object identifying the entry. public void Remove(object key) { _memoryCache.Remove(key); //publish event manually instead of through eviction callback to avoid feedback loops PublishChangeEvent(key); } /// /// Gets the item associated with this key if present. /// /// An object identifying the requested entry. /// The located value or null. /// True if the key was found. public bool TryGetValue(object key, out object value) { return _memoryCache.TryGetValue(key, out value); } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { if (!_disposed) { _timer?.Dispose(); var subscriber = _connection.GetSubscriber(); subscriber.Unsubscribe(RedisChannel.Pattern(ChannelPrefix + "*")); _disposed = true; } GC.SuppressFinalize(this); } }