客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的Eclipse Paho,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:
1、新建android工程MQTTClient
2、MainActivity代码如下:
package ldw.mqttclient; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Message; import android.view.KeyEvent; import android.widget.TextView; import android.widget.Toast; public class MainActivity extends Activity { private TextView resultTv; private String host = "tcp://127.0.0.1:1883"; private String userName = "admin"; private String passWord = "password"; private Handler handler; private MqttClient client; private String myTopic = "test/topic"; private MqttConnectOptions options; private ScheduledExecutorService scheduler; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main); resultTv = (TextView) findViewById(R.id.result); init(); handler = new Handler() { @Override public void handleMessage(Message msg) { super.handleMessage(msg); if(msg.what == 1) { Toast.makeText(MainActivity.this, (String) msg.obj, Toast.LENGTH_SHORT).show(); System.out.println("-----------------------------"); } else if(msg.what == 2) { Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show(); try { client.subscribe(myTopic, 1); } catch (Exception e) { e.printStackTrace(); } } else if(msg.what == 3) { Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show(); } } }; startReconnect(); } private void startReconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if(!client.isConnected()) { connect(); } } }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); } private void init() { try { //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(host, "test", new MemoryPersistence()); //MQTT的连接设置 options = new MqttConnectOptions(); //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); //设置连接的用户名 options.setUserName(userName); //设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //设置回调 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { //连接丢失后,一般在这里面进行重连 System.out.println("connectionLost----------"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //publish后会执行到这里 System.out.println("deliveryComplete---------" + token.isComplete()); } @Override public void messageArrived(String topicName, MqttMessage message) throws Exception { //subscribe后得到的消息会执行到这里面 System.out.println("messageArrived----------"); Message msg = new Message(); msg.what = 1; msg.obj = topicName+"---"+message.toString(); handler.sendMessage(msg); } }); // connect(); } catch (Exception e) { e.printStackTrace(); } } private void connect() { new Thread(new Runnable() { @Override public void run() { try { client.connect(options); Message msg = new Message(); msg.what = 2; handler.sendMessage(msg); } catch (Exception e) { e.printStackTrace(); Message msg = new Message(); msg.what = 3; handler.sendMessage(msg); } } }).start(); } @Override public boolean onKeyDown(int keyCode, KeyEvent event) { if(client != null && keyCode == KeyEvent.KEYCODE_BACK) { try { client.disconnect(); } catch (Exception e) { e.printStackTrace(); } } return super.onKeyDown(keyCode, event); } @Override protected void onDestroy() { super.onDestroy(); try { scheduler.shutdown(); client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } }
由于项目需要,我用到了心跳重连。根据这里的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。
3、新建j2se工程MQTTServer
4、Server代码如下:
import java.awt.Container; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Server extends JFrame { private static final long serialVersionUID = 1L; private JPanel panel; private JButton button; private MqttClient client; private String host = "tcp://127.0.0.1:1883"; // private String host = "tcp://localhost:1883"; private String userName = "test"; private String passWord = "test"; private MqttTopic topic; private MqttMessage message; private String myTopic = "test/topic"; public Server() { try { client = new MqttClient(host, "Server", new MemoryPersistence()); connect(); } catch (Exception e) { e.printStackTrace(); } Container container = this.getContentPane(); panel = new JPanel(); button = new JButton("发布话题"); button.addActionListener(new ActionListener() { @Override public void actionPerformed(ActionEvent ae) { try { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println(token.isComplete()+"========"); } catch (Exception e) { e.printStackTrace(); } } }); panel.add(button); container.add(panel, "North"); } private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 设置超时时间 options.setConnectionTimeout(10); // 设置会话心跳时间 options.setKeepAliveInterval(20); try { client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("connectionLost-----------"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------"+token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage arg1) throws Exception { System.out.println("messageArrived----------"); } }); topic = client.getTopic(myTopic); message = new MqttMessage(); message.setQos(1); message.setRetained(true); System.out.println(message.isRetained()+"------ratained状态"); message.setPayload("eeeeeaaaaaawwwwww---".getBytes()); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server s = new Server(); s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); s.setSize(600, 370); s.setLocationRelativeTo(null); s.setVisible(true); } }
上面代码跟客户端的代码差不多,这里就不做解释了。
没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。
还可以做得更好,就是监听网络变化
我又有个问题了,在使用apollo,明明订阅者A连上了,但是为什么有时候收不到积累的durable消息? 这个时候,发布者B再发布消息出去,A也没收到。
原来是因为username是一样才会错乱。注意不是clientId,username是验证时候的名字。apollo需要保证连上的username都是不一样才不会错乱。看来apollo还不是很成熟的
没订阅成功,或者订阅的主题和发送的主题不一致导致的,原因很多哦。。。
下载了paho的源码,但不会生成jar,求方法
你有jar包了吗
jar包eclipse官网有的,把源码下下来然后导出个jar包就可以了,很久没弄了,我的是很老的jar包了
提醒下,如果连接不上,需要设置android app的internet访问权限
jar包下载地址:https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/
我为什么一直链接不上服务器 我进http://127.0.0.1:61680/console/index.html都可以 客户端和服务端都链接不上
那要怎么样才能增加用户?
我连接两个用户 服务端发送 随机发给其中一个 好蛋疼
问题解决了么朋友?
客户端的 myTopic 不是没有用到吗?
你好,我用了linux 搭建好了 apollo , 然后该怎么用了…
我博客上写的很清楚哦,这个我很长时间没弄过了
灰常感谢,我成功了
为什么连接在不断的丢失,导致不断的连接,不断的收到了重复的消息,这是怎么回事儿,是不是哪里设置的问题
我初次连接通知服务器都能正常工作,然后关闭服务器,再次启动之后,再次连接通知服务器就报错了at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:773)at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:110)
这是怎么回事?请指点下
一次只能发送一个主题消息吗?
能不能一次发多个主题消息
哈哈,我太有才了,问题解决啦
问题终于解决了
您好!demo能不能发我一份学习学习,非常谢谢!707316407@qq.com
你好,我写了client1用来publish,client2去durable订阅,就是cleanSession设成false。先启动client2这时publish来的信息都是能接收的。然后强制关闭client2造成crash的测试case,再启动client2,就再也不能接收到信息。我登录http://127.0.0.1:61680/里面显示client2确实有durable的信息积压在topic里。请问这是为什么重连之后不能继续接收呢?
android端代码太弱,这样会极不稳定,Activity推出的情况下肯定也是收不到消息的。应该写一个常驻服务Service,在这里实现逻辑。
你好,我也遇到了一样的问题,你是怎么解决的,求教
“开始我使用的是mqtt-client,使用过后发现问题百出”,请问楼主是什么问题?因为我们这边现在就是用的mqtt-client
都是去年的事情了,很久没用过了,记得之前的问题好像是收不到消息
LZ,在 http://127.0.0.1:61680/console/index.html 页面,怎么操作才能发送消息给android客户端,客户端已经连上服务端了,但index.html页面没有可编辑的文本框和发送按钮。就是说,推送消息如何操作。
你还没看懂哦,需要调用相关的方法才能接收到消息的,你看下我上面写的J2SE端 MqttDeliveryToken token = topic.publish(message);这段代码就是发送消息的。
你还没看懂哦,需要调用相关的方法才能接收到消息的,你看下我上面写的J2SE端 MqttDeliveryToken token = topic.publish(message);这段代码就是发送消息的。
LZ,调用相关方法是可以了,但有个问题想问一下,前几天什么安全公司发布了linux漏洞:美国时间1月27日,安全公司qualys发现名为 “幽灵”(GHOST)的高危漏洞。漏洞利用Linux GNU C Library (glibc) 2.18之前的版本中gethostbyname 函数处理漏洞。最严重可导致远程执行代码,控制受影响系统。glibc是GNU发布的libc库,即c运行库。glibc是linux系统中最底层的api,几乎其它任何运行库都会依赖于glibc。glibc除了封装linux操作系统所提供的系统服务外,它本身也提供了许多其它一些必要功能服务的实现。glibc 囊括了几乎所有的 UNIX 通行的标准。glibc库中的__nss_hostname_digits_dots()函数中发现了一个缓冲区溢出的漏洞,这个bug可以经过 gethostbyname*()函数被本地或者远程的触发。应用程序主要使用gethostbyname*()函数发起DNS请求,这个函数会将主机名称转换为ip地址。Qualys公司已测试可成功利用漏洞,针对名为Exim的邮件系统服务测试,通过发送特殊构造的邮件,即可获得该邮件服务器的权限,从而控制该服务器。我的服务器是ubuntu12.04,本来是可以发布消息的,昨天打了这个漏洞补丁之后,发现执行发布的java代码连接有问题了,估计ubuntu14.04也连接不上,我使用apollo 1.7,老版本应该没问题log如下:true——ratained状态 (32103) at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27) at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:63) at org.eclipse.paho.client.mqttv3.internal.ClientComms.connect(ClientComms.java:135) at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:339) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.connect(ApolloServerJFrame.java:104) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.(ApolloServerJFrame.java:39) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.main(ApolloServerJFrame.java:115)
true——ratained状态无法连接至服务器 (32103) – java.net.ConnectException: Connection refused: connect at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:603) at java.lang.Thread.run(Thread.java:744)Caused by: java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) … 2 more救命啊~~~~~
楼主,考验你的耐心呢0.run apollo1. Server 改 密码2.Server 改 61613
修改tcp的端口号,具体什么端口apollo.xml查看
请问你的问题解决了吗?我也遇到同样的问题了
怎么解决啊。兄弟
新手小白。。想学习。。
我启动了apollo服务器端, 用上面两个客户端都连接成功了. 我想写服务器端的业务逻辑, 应该写在哪里啊?
服务器端肯定负责发送消息,把j2se的逻辑代码嵌入到业务系统中就可以 配置一下主题应该就可以
求联系方式啊
您好 ,小白正在研究这个mqtt推送,代码给我发一下吧549372543@qq.com. 非常感谢啊
使用的是mqtt-client-0.4.0这个包,但提示java.lang.NoClassDefFoundError: org.eclipse.paho.client.mqttv3.MqttClient
请教下是什么问题???
设置cleansession=false后不能接收在线消息,设置cleansession=true不能接收离线消息,怎么能同时接收离线消息和在线消息呢?
离线消息和在线消息怎么同时接收呢?
你好,我也遇到相同的问题,你这个问题是怎么解决的,能分享下么?
我用的是apollo,但是在apollo重启后,客户端第一次连接到apollo,如果这个时候给这个客户端已订阅的topic发一条消息,客户端无法获取消息,但客户端在这个时候断开连接,然后第二次连接到apollo,就可以获取到先前发的这个topic,请教下这是什么问题?谢谢
新手小白,求Demo,754655668@qq.com
新手小白,我想请问下,是不是服务器作为发布,客户端作为订阅,然后可以用mosquitto或者其他来做代理服务器。还有就是这个你写的server是做什么用的类。。