// RabbitMQConfig.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
 * Shared RabbitMQ settings and channel factory used for cache-invalidation messaging.
 */
public class RabbitMQConfig {
    /** Name of the queue that carries cache-invalidation messages. */
    public static final String QUEUE_NAME = "cache_invalidation_queue";

    /**
     * Opens a new connection to the RabbitMQ broker on {@code localhost} and
     * creates a channel on it.
     *
     * <p>Every call opens its own connection. The connection is intentionally
     * left open on success because the returned channel depends on it; it is
     * closed here only when the channel could not be created.
     *
     * @return a newly created channel, never {@code null}
     * @throws IllegalStateException if the connection has no channel available
     * @throws Exception if connecting to the broker or creating the channel fails
     */
    public static Channel createChannel() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // RabbitMQ server address
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            if (channel == null) {
                // The client returns null instead of throwing when no channel is available.
                throw new IllegalStateException(
                        "No RabbitMQ channel available on the connection");
            }
            return channel;
        } catch (Exception e) {
            // Do not leak the freshly opened connection when no channel is handed out.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
// CacheService.java
/**
 * Updates data in the database and publishes the affected cache key to the
 * cache-invalidation queue.
 */
public class CacheService {
    // NOTE(review): localCache, redisCache and database are never assigned in the
    // visible code, so database.update(...) would throw NullPointerException unless
    // they are populated elsewhere (e.g. by injection) — confirm.
    private LocalCache localCache;
    private RedisCache redisCache;
    private Database database;
    private Channel channel;

    /**
     * Obtains a channel from {@link RabbitMQConfig} and declares the invalidation
     * queue (non-durable, non-exclusive, not auto-deleted, no extra arguments).
     *
     * @throws Exception if the channel cannot be created or the queue cannot be declared
     */
    public CacheService() throws Exception {
        this.channel = RabbitMQConfig.createChannel();
        channel.queueDeclare(RabbitMQConfig.QUEUE_NAME, false, false, false, null);
    }

    /**
     * Writes {@code data} to the database, then publishes its cache key
     * ({@code "data:" + id}) so that consumers can evict the stale entry.
     *
     * @param data the record to persist
     * @throws Exception if the database update or the publish fails
     */
    public void updateData(Data data) throws Exception {
        // Update the database first so the invalidation refers to committed data.
        database.update(data);

        // Build the cache key.
        String cacheKey = "data:" + data.getId();

        // Publish the invalidation message via the default exchange, routed by queue name.
        // Encode explicitly as UTF-8: the consumer decodes with UTF-8, and the
        // platform default charset is not guaranteed to match.
        channel.basicPublish(
                "",
                RabbitMQConfig.QUEUE_NAME,
                null,
                cacheKey.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
// CacheInvalidationListener.java
import com.rabbitmq.client.*;
/**
 * Consumes cache keys from the cache-invalidation queue and deletes them from
 * the local cache and the Redis cache.
 */
public class CacheInvalidationListener {
    // NOTE(review): localCache and redisCache are never assigned in the visible
    // code, so the delete calls below would throw NullPointerException unless
    // they are populated elsewhere — confirm.
    private LocalCache localCache;
    private RedisCache redisCache;
    private Channel channel;

    /**
     * Obtains a channel from {@link RabbitMQConfig} and declares the invalidation
     * queue (non-durable, non-exclusive, not auto-deleted, no extra arguments).
     *
     * @throws Exception if the channel cannot be created or the queue cannot be declared
     */
    public CacheInvalidationListener() throws Exception {
        this.channel = RabbitMQConfig.createChannel();
        channel.queueDeclare(RabbitMQConfig.QUEUE_NAME, false, false, false, null);
    }

    /**
     * Registers a consumer on the invalidation queue. Each message body is
     * decoded as a UTF-8 cache key, which is removed from both caches.
     *
     * <p>Messages are acknowledged manually, only after both deletions have
     * completed, so a failed invalidation is not silently discarded.
     *
     * @throws Exception if the consumer cannot be registered
     */
    public void startListener() throws Exception {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(
                    String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,
                    byte[] body) throws java.io.IOException {
                String cacheKey = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                localCache.delete(cacheKey);
                redisCache.delete(cacheKey);
                // Acknowledge this single delivery only after both caches were cleared.
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck=false: acknowledgement happens in handleDelivery after successful eviction.
        channel.basicConsume(RabbitMQConfig.QUEUE_NAME, false, consumer);
    }

    /**
     * Entry point: creates a listener and starts consuming invalidation messages.
     *
     * @param args unused
     * @throws Exception if the listener cannot be created or started
     */
    public static void main(String[] args) throws Exception {
        CacheInvalidationListener listener = new CacheInvalidationListener();
        listener.startListener();
    }
}