Java利用Redis实现消息队列的示例代码
本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:
应用场景
为什么要用redis?
二进制存储、java序列化传输、IO连接数高、连接频繁
一、序列化
这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象;主要是用到了ByteArrayOutputStream和ByteArrayInputStream;注意:每个需要序列化的对象都要实现Serializable接口;
其代码如下:
packageUtils;
importjava.io.*;
/**
*CreatedbyKinglfon2016/10/17.
*/
publicclassObjectUtil{
/**
*对象转byte[]
*@paramobj
*@return
*@throwsIOException
*/
publicstaticbyte[]object2Bytes(Objectobj)throwsIOException{
ByteArrayOutputStreambo=newByteArrayOutputStream();
ObjectOutputStreamoo=newObjectOutputStream(bo);
oo.writeObject(obj);
byte[]bytes=bo.toByteArray();
bo.close();
oo.close();
returnbytes;
}
/**
*byte[]转对象
*@parambytes
*@return
*@throwsException
*/
publicstaticObjectbytes2Object(byte[]bytes)throwsException{
ByteArrayInputStreamin=newByteArrayInputStream(bytes);
ObjectInputStreamsIn=newObjectInputStream(in);
returnsIn.readObject();
}
}
二、消息类(实现Serializable接口)
packageModel;
importjava.io.Serializable;
/**
*CreatedbyKinglfon2016/10/17.
*/
publicclassMessageimplementsSerializable{
privatestaticfinallongserialVersionUID=-389326121047047723L;
privateintid;
privateStringcontent;
publicMessage(intid,Stringcontent){
this.id=id;
this.content=content;
}
publicintgetId(){
returnid;
}
publicvoidsetId(intid){
this.id=id;
}
publicStringgetContent(){
returncontent;
}
publicvoidsetContent(Stringcontent){
this.content=content;
}
}
三、Redis的操作
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:
只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或pop的对象仅需要转换成byte[]即可
java采用Jedis进行Redis的存储和Redis的连接池设置
上代码:
packageUtils;
importredis.clients.jedis.Jedis;
importredis.clients.jedis.JedisPool;
importredis.clients.jedis.JedisPoolConfig;
importjava.util.List;
importjava.util.Map;
importjava.util.Set;
/**
*CreatedbyKinglfon2016/10/17.
*/
publicclassJedisUtil{
privatestaticStringJEDIS_IP;
privatestaticintJEDIS_PORT;
privatestaticStringJEDIS_PASSWORD;
privatestaticJedisPooljedisPool;
static{
//Configuration自行写的配置文件解析类,继承自Properties
Configurationconf=Configuration.getInstance();
JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
JEDIS_PORT=conf.getInt("jedis.port",6379);
JEDIS_PASSWORD=conf.getString("jedis.password",null);
JedisPoolConfigconfig=newJedisPoolConfig();
config.setMaxActive(5000);
config.setMaxIdle(256);
config.setMaxWait(5000L);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setMinEvictableIdleTimeMillis(60000L);
config.setTimeBetweenEvictionRunsMillis(3000L);
config.setNumTestsPerEvictionRun(-1);
jedisPool=newJedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
}
/**
*获取数据
*@paramkey
*@return
*/
publicstaticStringget(Stringkey){
Stringvalue=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
value=jedis.get(key);
}catch(Exceptione){
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
close(jedis);
}
returnvalue;
}
privatestaticvoidclose(Jedisjedis){
try{
jedisPool.returnResource(jedis);
}catch(Exceptione){
if(jedis.isConnected()){
jedis.quit();
jedis.disconnect();
}
}
}
publicstaticbyte[]get(byte[]key){
byte[]value=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
value=jedis.get(key);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnvalue;
}
publicstaticvoidset(byte[]key,byte[]value){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.set(key,value);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
publicstaticvoidset(byte[]key,byte[]value,inttime){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.set(key,value);
jedis.expire(key,time);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
publicstaticvoidhset(byte[]key,byte[]field,byte[]value){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.hset(key,field,value);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
publicstaticvoidhset(Stringkey,Stringfield,Stringvalue){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.hset(key,field,value);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
/**
*获取数据
*
*@paramkey
*@return
*/
publicstaticStringhget(Stringkey,Stringfield){
Stringvalue=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
value=jedis.hget(key,field);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnvalue;
}
/**
*获取数据
*
*@paramkey
*@return
*/
publicstaticbyte[]hget(byte[]key,byte[]field){
byte[]value=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
value=jedis.hget(key,field);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnvalue;
}
publicstaticvoidhdel(byte[]key,byte[]field){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.hdel(key,field);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
/**
*存储REDIS队列顺序存储
*@paramkeyreids键名
*@paramvalue键值
*/
publicstaticvoidlpush(byte[]key,byte[]value){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.lpush(key,value);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
/**
*存储REDIS队列反向存储
*@paramkeyreids键名
*@paramvalue键值
*/
publicstaticvoidrpush(byte[]key,byte[]value){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.rpush(key,value);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
/**
*将列表source中的最后一个元素(尾元素)弹出,并返回给客户端
*@paramkeyreids键名
*@paramdestination键值
*/
publicstaticvoidrpoplpush(byte[]key,byte[]destination){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.rpoplpush(key,destination);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
/**
*获取队列数据
*@paramkey键名
*@return
*/
publicstaticListlpopList(byte[]key){
Listlist=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
list=jedis.lrange(key,0,-1);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnlist;
}
/**
*获取队列数据
*@paramkey键名
*@return
*/
publicstaticbyte[]rpop(byte[]key){
byte[]bytes=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
bytes=jedis.rpop(key);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnbytes;
}
publicstaticvoidhmset(Objectkey,Maphash){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.hmset(key.toString(),hash);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
publicstaticvoidhmset(Objectkey,Maphash,inttime){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.hmset(key.toString(),hash);
jedis.expire(key.toString(),time);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
publicstaticListhmget(Objectkey,String...fields){
Listresult=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
result=jedis.hmget(key.toString(),fields);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnresult;
}
publicstaticSethkeys(Stringkey){
Setresult=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
result=jedis.hkeys(key);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnresult;
}
publicstaticListlrange(byte[]key,intfrom,intto){
Listresult=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
result=jedis.lrange(key,from,to);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnresult;
}
publicstaticMaphgetAll(byte[]key){
Mapresult=null;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
result=jedis.hgetAll(key);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnresult;
}
publicstaticvoiddel(byte[]key){
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.del(key);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
}
publicstaticlongllen(byte[]key){
longlen=0;
Jedisjedis=null;
try{
jedis=jedisPool.getResource();
jedis.llen(key);
}catch(Exceptione){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnlen;
}
}
四、Configuration主要用于读取Redis的配置信息
packageUtils;
importjava.io.IOException;
importjava.io.InputStream;
importjava.util.Properties;
/**
*CreatedbyKinglfon2016/10/17.
*/
publicclassConfigurationextendsProperties{
privatestaticfinallongserialVersionUID=-2296275030489943706L;
privatestaticConfigurationinstance=null;
publicstaticsynchronizedConfigurationgetInstance(){
if(instance==null){
instance=newConfiguration();
}
returninstance;
}
publicStringgetProperty(Stringkey,StringdefaultValue){
Stringval=getProperty(key);
return(val==null||val.isEmpty())?defaultValue:val;
}
publicStringgetString(Stringname,StringdefaultValue){
returnthis.getProperty(name,defaultValue);
}
publicintgetInt(Stringname,intdefaultValue){
Stringval=this.getProperty(name);
return(val==null||val.isEmpty())?defaultValue:Integer.parseInt(val);
}
publiclonggetLong(Stringname,longdefaultValue){
Stringval=this.getProperty(name);
return(val==null||val.isEmpty())?defaultValue:Integer.parseInt(val);
}
publicfloatgetFloat(Stringname,floatdefaultValue){
Stringval=this.getProperty(name);
return(val==null||val.isEmpty())?defaultValue:Float.parseFloat(val);
}
publicdoublegetDouble(Stringname,doubledefaultValue){
Stringval=this.getProperty(name);
return(val==null||val.isEmpty())?defaultValue:Double.parseDouble(val);
}
publicbytegetByte(Stringname,bytedefaultValue){
Stringval=this.getProperty(name);
return(val==null||val.isEmpty())?defaultValue:Byte.parseByte(val);
}
publicConfiguration(){
InputStreamin=ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
try{
this.loadFromXML(in);
in.close();
}catch(IOExceptionioe){
}
}
}
五、测试
importModel.Message;
importUtils.JedisUtil;
importUtils.ObjectUtil;
importredis.clients.jedis.Jedis;
importjava.io.IOException;
/**
*CreatedbyKinglfon2016/10/17.
*/
publicclassTestRedisQueue{
publicstaticbyte[]redisKey="key".getBytes();
static{
try{
init();
}catch(IOExceptione){
e.printStackTrace();
}
}
privatestaticvoidinit()throwsIOException{
for(inti=0;i<1000000;i++){
Messagemessage=newMessage(i,"这是第"+i+"个内容");
JedisUtil.lpush(redisKey,ObjectUtil.object2Bytes(message));
}
}
publicstaticvoidmain(String[]args){
try{
pop();
}catch(Exceptione){
e.printStackTrace();
}
}
privatestaticvoidpop()throwsException{
byte[]bytes=JedisUtil.rpop(redisKey);
Messagemsg=(Message)ObjectUtil.bytes2Object(bytes);
if(msg!=null){
System.out.println(msg.getId()+"----"+msg.getContent());
}
}
}
每执行一次pop()方法,结果如下:
1----这是第1个内容
2----这是第2个内容
3----这是第3个内容
4----这是第4个内容
总结
至此,整个Redis消息队列的生产者和消费者代码已经完成
1.Message需要传送的实体类(需实现Serializable接口)
2.ConfigurationRedis的配置读取类,继承自Properties
3.ObjectUtil将对象和byte数组双向转换的工具类
4.Jedis通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。