MQTT实现消息推送

本贴最后更新于 3870 天前,其中的信息可能已经时移世易

MQTT 实现消息接收(接收消息需实现 MqttSimpleCallback 接口并实现它的 publishArrived 方法):必须先注册接收消息的回调方法

[java]

mqttClient.registerSimpleHandler(simpleCallbackHandler);
// 注册接收消息方法

[/java]

并订阅主题

[java]
mqttClient.subscribe(TOPICS, QOS_VALUES);

// 订阅主题
[/java]

服务端:

[java]
package com.gmcc.kuchuan.business;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttSimpleCallback;

/**
 * Server-side MQTT helper that publishes messages to clients and prints the
 * messages it receives on the topics it subscribes to.
 *
 * <p>The broker connection is opened lazily by
 * {@link #sendMessage(String, String)} and re-opened there whenever the client
 * reports that it is no longer connected.
 *
 * @author Join
 */
public class MqttBroker {
    private final static Log logger = LogFactory.getLog(MqttBroker.class);

    // Connection parameters
    private final static String CONNECTION_STRING = "tcp://localhost:9901";
    private final static boolean CLEAN_START = true;
    // Low-power network that still needs timely data: keep-alive value of 30
    private final static short KEEP_ALIVE = 30;
    // Client identifier sent to the broker
    private final static String CLIENT_ID = "master";
    // QoS level for each entry of TOPICS, matched by index
    private final static int[] QOS_VALUES = { 0, 0, 2, 0 };
    private final static String[] TOPICS = { "Test/TestTopics/Topic1",
            "Test/TestTopics/Topic2", "Test/TestTopics/Topic3",
            "client/keepalive" };
    private static MqttBroker instance = new MqttBroker();

    private MqttClient mqttClient;

    /**
     * Returns the shared instance.
     *
     * @return the instance created when the class is loaded
     */
    public static MqttBroker getInstance() {
        return instance;
    }

    /**
     * Creates a new client, registers the receive callback, connects, subscribes
     * to {@code TOPICS} and publishes a retained "keepalive" message.
     *
     * @throws MqttException if any of the client calls fails
     */
    private void connect() throws MqttException {
        logger.info("connect to mqtt broker.");
        mqttClient = new MqttClient(CONNECTION_STRING);
        logger.info("***********register Simple Handler***********");
        SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
        // The handler must be registered for publishArrived to be called.
        mqttClient.registerSimpleHandler(simpleCallbackHandler);
        mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
        logger.info("***********subscribe receiver topics***********");
        mqttClient.subscribe(TOPICS, QOS_VALUES);

        logger.info("***********CLIENT_ID:" + CLIENT_ID);
        // Once subscribed, publish a heartbeat message (own messages could be
        // published here as well).
        mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0],
                true);
    }

    /**
     * Publishes {@code message} to the topic {@code "GMCC/client/" + clientId},
     * connecting first if there is no live connection. Failures are logged and
     * not propagated.
     *
     * @param clientId suffix of the target topic
     * @param message  text to publish; encoded with the platform default charset
     */
    public void sendMessage(String clientId, String message) {
        try {
            if (mqttClient == null || !mqttClient.isConnected()) {
                connect();
            }

            logger.info("send message to " + clientId + ", message is "
                    + message);
            mqttClient.publish("GMCC/client/" + clientId, message.getBytes(),
                    0, false);
        } catch (MqttException e) {
            // Log the exception itself: getCause() is frequently null and
            // carries no stack trace.
            logger.error("failed to send message to " + clientId, e);
        }
    }

    /**
     * Simple callback that prints the topic messages received by the server.
     *
     * @author Join
     */
    class SimpleCallbackHandler implements MqttSimpleCallback {

        /**
         * Called when the client and the broker are disconnected unexpectedly;
         * re-subscription could be handled here. Currently only prints a line.
         */
        @Override
        public void connectionLost() throws Exception {
            System.out.println("客户机和broker已经断开");
        }

        /**
         * Called for every message that arrives on a subscribed topic; prints
         * its topic, payload, QoS and retained flag.
         */
        @Override
        public void publishArrived(String topicName, byte[] payload, int Qos,
                boolean retained) throws Exception {
            System.out.println("订阅主题: " + topicName);
            System.out.println("消息数据: " + new String(payload));
            System.out.println("消息级别(0,1,2): " + Qos);
            System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
                    + retained);
        }

    }

    public static void main(String[] args) {
        new MqttBroker().sendMessage("client", "message");
    }
}
[/java]

Android客户端:

核心代码:MQTTConnection内部类

[java]

<pre class="java" name="code">import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;

import android.app.AlarmManager;
import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.SharedPreferences;
import android.database.Cursor;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Binder;
import android.os.Bundle;
import android.os.IBinder;
import android.provider.ContactsContract;
import android.util.Log;
//此部分项目导包已被删除</pre><pre class="java" name="code">import com.ibm.mqtt.IMqttClient;
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttPersistence;
import com.ibm.mqtt.MqttPersistenceException;
import com.ibm.mqtt.MqttSimpleCallback;

/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
*/
public class PushService extends Service {
// Binder instance returned from onBind.
private MyBinder mBinder = new MyBinder();
// this is the log tag
public static final String TAG = "PushService";

// the IP address, where your MQTT broker is running.
private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174";//
// the port at which the broker is running.
private static int MQTT_BROKER_PORT_NUM = 9901;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a
// clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
// this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 60 * 15;
// Set quality of services to 0 (at most once delivery), since we don't want
// push notifications
// arrive more than once. However, this means that some messages might get
// lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;

// MQTT client ID, which is given the broker. In this example, I also use
// this for the topic header.
// You can use this to run push notifications for multiple apps with one
// MQTT broker.
public static String MQTT_CLIENT_ID = "client";

// These are the actions for the service (name are descriptive enough)
public static final String ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
+ ".KEEP_ALIVE";
private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
+ ".RECONNECT";

// Connection log for the push service. Good for debugging.
private ConnectionLog mLog;

// Connectivity manager to determining, when the phone loses connection
private ConnectivityManager mConnMan;
// Notification manager to displaying arrived push notifications
private NotificationManager mNotifMan;

// Whether or not the service has been started.
private boolean mStarted;

// This the application level keep-alive interval, that is used by the
// AlarmManager
// to keep the connection active, even when the device goes to sleep.
private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;

// Retry intervals, when the connection is lost.
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;

// Preferences instance
private SharedPreferences mPrefs;
// We store in the preferences, whether or not the service has been started
public static final String PREF_STARTED = "isStarted";
// We also store the deviceID (target)
public static final String PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String PREF_RETRY = "retryInterval";

// Notification title
public static String NOTIF_TITLE = "client";
// Notification id
private static final int NOTIF_CONNECTED = 0;

// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
// Time in milliseconds (System.currentTimeMillis()) of service creation or of
// the last successful connection.
private long mStartTime;
boolean mShowFlag = true;// whether to show notifications
// Context handed to actionStart.
// NOTE(review): a Context kept in a static field can outlive its owner —
// confirm callers pass an application context.
public static Context ctx;
private boolean mRunFlag = true;// whether to send heartbeats to the server
// Timer that runs the periodic heartbeat task scheduled in start().
Timer mTimer = new Timer();

/**
 * Asks the system to start {@link PushService} with {@link #ACTION_START} and
 * then stores the given context in the static {@code ctx} field.
 */
public static void actionStart(Context ctx) {
    Intent startIntent = new Intent(ctx, PushService.class);
    startIntent.setAction(ACTION_START);
    ctx.startService(startIntent);
    PushService.ctx = ctx;
}

/** Sends the stop action to {@link PushService} through {@code startService}. */
public static void actionStop(Context ctx) {
    Intent stopIntent = new Intent(ctx, PushService.class);
    stopIntent.setAction(ACTION_STOP);
    ctx.startService(stopIntent);
}

/** Sends the keep-alive action to {@link PushService} through {@code startService}. */
public static void actionPing(Context ctx) {
    Intent pingIntent = new Intent(ctx, PushService.class);
    pingIntent.setAction(ACTION_KEEPALIVE);
    ctx.startService(pingIntent);
}

@Override
public void onCreate() {
    super.onCreate();

    log("Creating service");
    mStartTime = System.currentTimeMillis();

    // Open the connection log; a failure is reported to logcat and the service
    // carries on without it.
    try {
        mLog = new ConnectionLog();
        Log.i(TAG, "Opened log at " + mLog.getPath());
    } catch (IOException e) {
        Log.e(TAG, "Failed to open log", e);
    }

    // Look up the system services and the preferences used by the service.
    mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
    mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);

    // If the process was reaped by the system, only onCreate is called again;
    // the persisted "started" flag is used to restore the state here.
    handleCrashedService();
}

/**
 * Restarts the service when the persisted "started" flag says it was running
 * before: cancels the scheduled keep-alives and performs a clean start.
 */
private void handleCrashedService() {
    if (!wasStarted()) {
        return;
    }
    log("Handling crashed service...");
    // Drop any keep-alive alarm that is still scheduled.
    stopKeepAlives();
    // Then start from scratch.
    start();
}

/**
 * Stops the connection if it is running, cancels the heartbeat timer and
 * closes the connection log.
 */
@Override
public void onDestroy() {
    log("Service destroyed (started=" + mStarted + ")");

    // Stop the service if it has been started.
    if (mStarted) {
        stop();
    }

    // Without this the timer thread keeps running (and keeps referencing this
    // service) after the service has been destroyed.
    mTimer.cancel();

    try {
        if (mLog != null) {
            mLog.close();
        }
    } catch (IOException e) {
        // Best effort: nothing more can be done, but leave a trace.
        Log.w(TAG, "Failed to close connection log", e);
    }

    super.onDestroy();
}

/**
 * Dispatches on the intent action: stop, start, keep-alive or reconnect.
 * Intents that are null or carry no (or an unknown) action are ignored.
 */
@Override
public void onStart(Intent intent, int startId) {
    super.onStart(intent, startId);
    log("Service started with intent=" + intent);
    if (intent == null) {
        return;
    }

    // getAction() may return null; comparing from the constant side avoids a
    // NullPointerException for action-less intents.
    String action = intent.getAction();
    if (ACTION_STOP.equals(action)) {
        stop();
        stopSelf();
    } else if (ACTION_START.equals(action)) {
        start();
    } else if (ACTION_KEEPALIVE.equals(action)) {
        keepAlive();
    } else if (ACTION_RECONNECT.equals(action)) {
        if (isNetworkAvailable()) {
            reconnectIfNecessary();
        }
    }
}

/** Binder that gives bound clients access to the enclosing service instance. */
public class MyBinder extends Binder {
    /** Returns the enclosing {@link PushService}. */
    public PushService getService() {
        return PushService.this;
    }
}

/** Returns the service's {@link MyBinder} for every bind request. */
@Override
public IBinder onBind(Intent intent) {
    return mBinder;
}

/** Logs {@code message} at info level and appends it to the connection log. */
private void log(String message) {
    log(message, null);
}

/**
 * Logs {@code message} to logcat (error level when {@code e} is given, info
 * level otherwise) and appends the message to the connection log if one is
 * open.
 *
 * @param message text to log
 * @param e       optional throwable; only passed to logcat
 */
private void log(String message, Throwable e) {
    if (e != null) {
        Log.e(TAG, message, e);
    } else {
        Log.i(TAG, message);
    }

    if (mLog != null) {
        try {
            mLog.println(message);
        } catch (IOException ex) {
            // Report to logcat only; calling log() here would recurse.
            Log.w(TAG, "Failed to write to connection log", ex);
        }
    }
}

/** Reads the persisted "started" flag; {@code false} when it was never stored. */
private boolean wasStarted() {
    final boolean persisted = mPrefs.getBoolean(PREF_STARTED, false);
    return persisted;
}

/** Persists the "started" flag in the preferences and mirrors it in {@code mStarted}. */
private void setStarted(boolean started) {
    SharedPreferences.Editor editor = mPrefs.edit();
    editor.putBoolean(PREF_STARTED, started);
    editor.commit();
    mStarted = started;
}

/**
 * Starts the service unless it is already started: connects to the broker,
 * schedules the one-minute HTTP heartbeat and registers the connectivity
 * receiver.
 */
private synchronized void start() {
    log("Starting service...");

    // Nothing to do when the service is already running.
    if (mStarted) {
        Log.w(TAG, "Attempt to start connection that is already active");
        return;
    }

    // Establish the MQTT connection.
    connect();

    // Heartbeat to the server, fired immediately and then once per minute.
    mRunFlag = true;
    TimerTask heartbeat = new TimerTask() {
        @Override
        public void run() {
            if (!mRunFlag) {
                return;
            }
            System.out.println("run");
            try {
                if (isNetworkAvailable()) {
                    SharedPreferences clientPrefs = getSharedPreferences(
                            "client", 0);
                    String mobileNum = clientPrefs.getString("MOBILE_NUM", "");
                    HttpUtil.post(Constants.KEEPALIVE + "&mobile="
                            + mobileNum + "&online_flag=1");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    mTimer.schedule(heartbeat, 0, 60 * 1000);

    // Listen for connectivity changes.
    IntentFilter connectivityFilter = new IntentFilter(
            ConnectivityManager.CONNECTIVITY_ACTION);
    registerReceiver(mConnectivityChanged, connectivityFilter);
}

/**
 * Stops the service if it is running: persists the stopped state, silences the
 * heartbeat task, unregisters the connectivity receiver, cancels any pending
 * reconnect and tears down the MQTT connection.
 */
private synchronized void stop() {
    // Do nothing, if the service is not running.
    if (!mStarted) {
        Log.w(TAG, "Attempt to stop connection not active.");
        return;
    }

    // Save stopped state in the preferences
    setStarted(false);

    // The heartbeat task checks this flag; without clearing it the task keeps
    // posting online_flag=1 after the service has been stopped.
    mRunFlag = false;

    // Remove the connectivity receiver
    unregisterReceiver(mConnectivityChanged);
    // Any existing reconnect alarm should be removed, since we are explicitly
    // stopping the service.
    cancelReconnect();

    // Destroy the MQTT connection if there is one
    if (mConnection != null) {
        mConnection.disconnect();
        mConnection = null;
    }
}

//
/**
 * Opens the MQTT connection for the stored device ID and marks the service as
 * started with heartbeats enabled.
 *
 * <p>When no device ID is stored, no connection is created; the service is
 * still marked as started so that a later {@code reconnectIfNecessary()} can
 * try again. When connecting fails with an {@link MqttException} and the
 * network is available, a reconnect is scheduled.
 */
private synchronized void connect() {
    log("Connecting...");

    // Fetch the device ID from the preferences.
    String storedDeviceId = mPrefs.getString(PREF_DEVICE_ID, null);

    // Create a new connection only if the device ID is not null; otherwise the
    // client would subscribe to the meaningless topic "GMCC/client/null".
    if (storedDeviceId == null) {
        log("Device ID not found.");
    } else {
        String deviceID = "GMCC/client/" + storedDeviceId;
        try {
            mConnection = new MQTTConnection(MQTT_HOST, deviceID);
        } catch (MqttException e) {
            // Schedule a reconnect, if we failed to connect
            log("MqttException: "
                    + (e.getMessage() != null ? e.getMessage() : "NULL"));
            if (isNetworkAvailable()) {
                scheduleReconnect(mStartTime);
            }
        }
    }
    setStarted(true);

    // Re-enable the periodic heartbeat to the server.
    mRunFlag = true;
}

/**
 * Sends a keep-alive over the current connection when the service is started
 * and connected. On failure the connection is dropped and any pending
 * reconnect alarm is cancelled.
 */
private synchronized void keepAlive() {
    try {
        if (!mStarted || mConnection == null) {
            // No live connection, nothing to ping.
            return;
        }
        mConnection.sendKeepAlive();
    } catch (MqttException e) {
        String detail = e.getMessage() != null ? e.getMessage() : "NULL";
        log("MqttException: " + detail, e);

        mConnection.disconnect();
        mConnection = null;
        cancelReconnect();
    }
}

/**
 * Schedules a repeating wake-up alarm that delivers the keep-alive action to
 * this service every {@code KEEP_ALIVE_INTERVAL} milliseconds.
 */
private void startKeepAlives() {
    Intent keepAliveIntent = new Intent(this, PushService.class);
    keepAliveIntent.setAction(ACTION_KEEPALIVE);
    PendingIntent operation = PendingIntent.getService(this, 0,
            keepAliveIntent, 0);

    long firstTrigger = System.currentTimeMillis() + KEEP_ALIVE_INTERVAL;
    AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.setRepeating(AlarmManager.RTC_WAKEUP, firstTrigger,
            KEEP_ALIVE_INTERVAL, operation);
}

/** Cancels the alarm that delivers the keep-alive action to this service. */
private void stopKeepAlives() {
    Intent keepAliveIntent = new Intent(this, PushService.class);
    keepAliveIntent.setAction(ACTION_KEEPALIVE);
    PendingIntent operation = PendingIntent.getService(this, 0,
            keepAliveIntent, 0);

    AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.cancel(operation);
}

/**
 * Schedules a reconnect alarm. The delay is the stored retry interval times
 * four (capped at {@code MAXIMUM_RETRY_INTERVAL}) when less than that interval
 * has elapsed since {@code startTime}; otherwise it is reset to
 * {@code INITIAL_RETRY_INTERVAL}. The chosen delay is stored for next time.
 *
 * @param startTime reference time in milliseconds since the epoch
 */
public void scheduleReconnect(long startTime) {
    // Interval used for the previous retry.
    long previousInterval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);

    long now = System.currentTimeMillis();
    long elapsed = now - startTime;

    long interval = (elapsed < previousInterval)
            ? Math.min(previousInterval * 4, MAXIMUM_RETRY_INTERVAL)
            : INITIAL_RETRY_INTERVAL;

    log("Rescheduling connection in " + interval + "ms.");

    // Remember the new interval.
    SharedPreferences.Editor editor = mPrefs.edit();
    editor.putLong(PREF_RETRY, interval);
    editor.commit();

    // Arm a one-shot wake-up alarm carrying the reconnect action.
    Intent reconnectIntent = new Intent(this, PushService.class);
    reconnectIntent.setAction(ACTION_RECONNECT);
    PendingIntent operation = PendingIntent.getService(this, 0,
            reconnectIntent, 0);
    AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.set(AlarmManager.RTC_WAKEUP, now + interval, operation);
}

/** Cancels the alarm that delivers the reconnect action to this service. */
public void cancelReconnect() {
    Intent reconnectIntent = new Intent(PushService.this, PushService.class);
    reconnectIntent.setAction(ACTION_RECONNECT);
    PendingIntent operation = PendingIntent.getService(PushService.this, 0,
            reconnectIntent, 0);

    AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.cancel(operation);
}

/** Reconnects when the service is started but currently has no connection. */
private synchronized void reconnectIfNecessary() {
    log("mStarted" + mStarted);
    log("mConnection" + mConnection);
    boolean needsConnection = mStarted && mConnection == null;
    if (!needsConnection) {
        return;
    }
    log("Reconnecting...");
    connect();
}

// Receiver for connectivity changes: reconnects when the network is back and
// tears the MQTT connection down when it is gone.
private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
    @Override
    public void onReceive(Context context, final Intent intent) {
        // Network info attached to the broadcast.
        NetworkInfo info = (NetworkInfo) intent
                .getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);
        boolean hasConnectivity = info != null && info.isConnected();

        log("Connectivity changed: connected=" + hasConnectivity);

        if (hasConnectivity) {
            reconnectIfNecessary();
            return;
        }

        // No connectivity: make sure the MQTT connection is destroyed.
        if (mConnection != null) {
            log("cancelReconnect");
            mConnection.disconnect();
            mConnection = null;
            log("cancelReconnect" + mConnection);
            cancelReconnect();
        }
    }
};

/**
 * Posts a status-bar notification with the given text. Tapping it opens
 * {@code MainActivity} with the request and {@code currentTab = "1"} as extras.
 *
 * @param text    notification body
 * @param request request stored under the "request" extra
 */
private void showNotification(String text, Request request) {
    Notification notification = new Notification();
    notification.flags |= Notification.FLAG_SHOW_LIGHTS
            | Notification.FLAG_AUTO_CANCEL;
    notification.defaults = Notification.DEFAULT_ALL;
    notification.icon = R.drawable.ico;
    notification.when = System.currentTimeMillis();

    // Extras handed to the activity.
    Bundle extras = new Bundle();
    extras.putSerializable("request", request);
    extras.putString("currentTab", "1");

    // Launcher-style intent for the main activity.
    Intent openMain = new Intent(this, MainActivity.class);
    openMain.putExtras(extras);
    openMain.setAction(Intent.ACTION_MAIN);
    openMain.addCategory(Intent.CATEGORY_LAUNCHER);
    openMain.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK
            | Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED);
    PendingIntent contentIntent = PendingIntent.getActivity(this, 0,
            openMain, 0);

    // The notification title comes from NOTIF_TITLE.
    notification.setLatestEventInfo(this, NOTIF_TITLE, text, contentIntent);
    mNotifMan.notify(NOTIF_CONNECTED, notification);
}

/** Returns whether there is an active network and it reports being connected. */
private boolean isNetworkAvailable() {
    NetworkInfo active = mConnMan.getActiveNetworkInfo();
    return active != null && active.isConnected();
}

<span style="BACKGROUND-COLOR: #ccffff"> // This inner class is a wrapper on top of MQTT client.
private class MQTTConnection implements MqttSimpleCallback {
    IMqttClient mqttClient = null;

    /**
     * Creates the client, connects to the broker, registers this object as the
     * message handler, subscribes to {@code initTopic} and starts the
     * keep-alive alarm.
     *
     * @param brokerHostName broker host
     * @param initTopic      topic to subscribe to after connecting
     * @throws MqttException if creating, connecting or subscribing fails
     */
    public MQTTConnection(String brokerHostName, String initTopic)
            throws MqttException {
        // Create connection spec
        String mqttConnSpec = "tcp://" + brokerHostName + "@"
                + MQTT_BROKER_PORT_NUM;
        // Create the client and connect
        mqttClient = MqttClient.createMqttClient(mqttConnSpec,
                MQTT_PERSISTENCE);
        String clientID = MQTT_CLIENT_ID + "/"
                + mPrefs.getString(PREF_DEVICE_ID, "");
        Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + " clientID:"
                + clientID);
        mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

        // Register this object to receive messages
        mqttClient.registerSimpleHandler(this);

        // Subscribe to the initial topic
        subscribeToTopic(initTopic);

        log("Connection established to " + brokerHostName + " on topic "
                + initTopic);

        // Save start time
        mStartTime = System.currentTimeMillis();
        // Start the keep-alives
        startKeepAlives();
    }

    /**
     * Stops the keep-alive alarm and disconnects the client on a separate
     * thread.
     */
    public void disconnect() {
        stopKeepAlives();
        log("stopKeepAlives");
        Thread t = new Thread() {
            @Override
            public void run() {
                try {
                    mqttClient.disconnect();
                    log("mqttClient.disconnect();");
                } catch (MqttPersistenceException e) {
                    log("MqttException"
                            + (e.getMessage() != null ? e.getMessage()
                                    : " NULL"), e);
                }
            }
        };
        t.start();
    }

    /*
     * Send a request to the message broker to be sent messages published
     * with the specified topic name. Wildcards are allowed.
     */
    private void subscribeToTopic(String topicName) throws MqttException {
        if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
            // quick sanity check - don't try and subscribe if we don't have
            // a connection
            log("Connection error" + "No connection");
        } else {
            String[] topics = { topicName };
            mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
        }
    }

    /*
     * Sends a message to the message broker, requesting that it be
     * published to the specified topic.
     */
    private void publishToTopic(String topicName, String message)
            throws MqttException {
        if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
            // quick sanity check - don't try and publish if we don't have
            // a connection
            log("No connection to public to");
        } else {
            mqttClient.publish(topicName, message.getBytes(),
                    MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
        }
    }

    /*
     * Called if the application loses its connection to the message broker.
     * Stops the keep-alive alarm and the HTTP heartbeat, clears the service's
     * connection reference and reconnects if the network is available.
     */
    public void connectionLost() throws Exception {
        log("Loss of connection" + "connection downed");
        stopKeepAlives();
        // Stop the periodic heartbeat
        mRunFlag = false;
        // null itself
        mConnection = null;
        if (isNetworkAvailable() == true) {
            reconnectIfNecessary();
        }
    }

    /*
     * Called when we receive a message from the message broker. Parses the
     * payload into a Request, starts its download, shows a notification and
     * publishes an acknowledgement message. Payloads that cannot be parsed
     * are logged and ignored.
     */
    public void publishArrived(String topicName, byte[] payload, int qos,
            boolean retained) throws MqttException {
        String s = new String(payload);
        Request request = null;
        try {
            // Parse the raw payload pushed by the server; no String round-trip
            // is needed to build the stream.
            request = XmlPaserTool.getMessage(new ByteArrayInputStream(payload));
        } catch (Exception e) {
            log("Failed to parse pushed message", e);
        }
        if (request == null) {
            // Previously a null request was passed on and dereferenced below.
            Log.w(PushService.TAG, "Ignoring unparseable push message: " + s);
            return;
        }

        DownloadInfo down = new DownloadInfo(request);
        down.setDownLoad(down);
        downloadInfos.add(down);
        sendUpdateBroast();
        down.start();
        showNotification("您有一条新的消息!", request);
        Log.d(PushService.TAG, s);
        // String.valueOf: Log.d must not be handed a null message.
        Log.d(PushService.TAG, String.valueOf(request.getMessageId()));

        // Push a message back to the server.
        // NOTE(review): AdvancedCallbackHandler opens a second client with the
        // same client ID as this connection and is never disconnected —
        // confirm the broker tolerates this, or publish through this
        // connection instead.
        new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID
                + "/keepalive", "***********send message**********");
    }

    /** Publishes the stored device ID to the keep-alive topic. */
    public void sendKeepAlive() throws MqttException {
        log("Sending keep alive");
        // publish to a keep-alive topic
        publishToTopic(MQTT_CLIENT_ID + "/keepalive",
                mPrefs.getString(PREF_DEVICE_ID, ""));
    }
}

/**
 * Helper with its own MQTT client, used to publish messages to the
 * {@code MQTT_CLIENT_ID + "/keepalive"} topic.
 */
class AdvancedCallbackHandler {
    IMqttClient mqttClient = null;
    // QoS levels; only the first entry is used in this class
    public final int[] QOS_VALUES = { 0, 0, 2, 0 };

    /**
     * Creates a client, connects it and publishes a retained "keepalive"
     * message.
     *
     * @throws MqttException if creating, connecting or publishing fails
     */
    private void connect() throws MqttException {
        String mqttConnSpec = "tcp://" + MQTT_HOST + "@"
                + MQTT_BROKER_PORT_NUM;
        // Create the client and connect
        mqttClient = MqttClient.createMqttClient(mqttConnSpec,
                MQTT_PERSISTENCE);
        String clientID = MQTT_CLIENT_ID + "/"
                + mPrefs.getString(PREF_DEVICE_ID, "");
        mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
        Log.d(TAG, "连接服务器,推送消息");
        Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + " clientID:"
                + clientID);
        Log.d(TAG, MQTT_CLIENT_ID + "/keepalive");
        // Heartbeat message to keep the link alive
        mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
                "keepalive".getBytes(), QOS_VALUES[0], true);
    }

    /**
     * Publishes {@code message} to the keep-alive topic, connecting first if
     * there is no live connection. Failures are logged and not propagated.
     *
     * @param clientId only written to the log; the target topic is always
     *                 {@code MQTT_CLIENT_ID + "/keepalive"}
     * @param message  text to publish
     */
    public void sendMessage(String clientId, String message) {
        try {
            if (mqttClient == null || !mqttClient.isConnected()) {
                connect();
            }

            Log.d(TAG, "send message to " + clientId + ", message is "
                    + message);
            mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
                    message.getBytes(), 0, false);
        } catch (MqttException e) {
            // Log the exception itself at error level; getCause() alone is
            // often null and hides the stack trace.
            Log.e(TAG, "Failed to send message to " + clientId, e);
        }
    }
}
</span>
/**
 * Looks up the contact display name for a phone number.
 *
 * @param phone_number number to look up; may be null
 * @return the display name of the first matching contact, {@code ""} when the
 *         query returns no cursor, or {@code phone_number} itself when it is
 *         null or no contact matches
 */
public String getPeople(String phone_number) {
    Log.d(TAG, "getPeople ---------");
    if (phone_number == null) {
        // A null bind argument is rejected by the query; nothing can match.
        return null;
    }

    String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME,
            ContactsContract.CommonDataKinds.Phone.NUMBER };
    // Bind the number as an argument instead of concatenating it into the
    // WHERE clause, so quotes in the value cannot break or alter the query.
    Cursor cursor = this.getContentResolver().query(
            ContactsContract.CommonDataKinds.Phone.CONTENT_URI,
            projection,
            ContactsContract.CommonDataKinds.Phone.NUMBER + " = ?",
            new String[] { phone_number },
            null);

    if (cursor == null) {
        Log.d(TAG, "getPeople null");
        return "";
    }
    try {
        Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount());
        if (cursor.moveToFirst()) {
            // Read the contact's display name
            int nameFieldColumnIndex = cursor
                    .getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME);
            String name = cursor.getString(nameFieldColumnIndex);
            Log.i("Contacts", "" + name + " .... " + nameFieldColumnIndex);
            System.out.println("联系人姓名:" + name);
            return name;
        }
        return phone_number;
    } finally {
        // The cursor was previously leaked on every path.
        cursor.close();
    }
}

/** Broadcasts an intent with the action {@code "update"}. */
public void sendUpdateBroast() {
    Intent update = new Intent("update");
    sendBroadcast(update);
}

/** Broadcasts an intent with the action {@code "updateFinishList"}. */
public void sendUpdateFinishBroast() {
    Intent finished = new Intent("updateFinishList");
    sendBroadcast(finished);
}

/**
 * Worker thread that turns a pushed {@code Request} into a {@code MessageBean},
 * downloads the attached file when the message type has one, and stores the
 * bean through {@code MessageDetailDao}.
 */
public class DownloadInfo extends Thread {
    boolean runflag = true;
    Request mRequest;
    // Download progress in percent, updated while the file is being read
    public float progress;
    public MessageBean bean = null;
    // Entry to remove from PushService.downloadInfos when this thread is done
    DownloadInfo download = null;
    MessageDetailDao dao = new MessageDetailDao(
            PushService.this.getApplicationContext());

    /** Requests that a running download stops at the next chunk. */
    public synchronized void stopthread() {
        runflag = false;
    }

    public synchronized boolean getrunflag() {
        return runflag;
    }

    DownloadInfo(Request mRequest) {
        this.mRequest = mRequest;
    }

    public void setDownLoad(DownloadInfo download) {
        this.download = download;
    }

    @Override
    public void run() {
        try {
            File dir = new File(Constants.DOWNLOAD_PATH);
            if (!dir.exists()) {
                dir.mkdirs();
            }
            String filePath = Constants.DOWNLOAD_PATH
                    + mRequest.getMessageId() + "." + mRequest.getExt();
            bean = new MessageBean();
            bean.setPath(filePath);
            bean.setStatus(0);
            bean.setDate(mRequest.getTimestamp());
            bean.setLayoutID(R.layout.list_say_he_item);
            bean.setPhotoID(R.drawable.receive_ico);
            bean.setMessage_key(mRequest.getMessageId());
            bean.setPhone_number(mRequest.getReceiver());
            bean.setAction(1);
            String name = getPeople(mRequest.getSender());
            bean.setName(name);
            bean.setFileType(Integer.parseInt(mRequest.getCommand()));
            if (mRequest.getCommand().equals(Request.TYPE_MUSIC)) {
                bean.setMsgIco(R.drawable.music_temp);
                bean.setText(name + "给你发送了音乐");
                mRequest.setBody(Base64.encodeToString(bean.getText()
                        .getBytes(), Base64.DEFAULT));
            } else if (mRequest.getCommand().equals(Request.TYPE_CARD)) {
                bean.setMsgIco(R.drawable.card_temp);
                bean.setText(new String(Base64.decode(mRequest.getBody(),
                        Base64.DEFAULT)));
                mRequest.setBody(Base64.encodeToString(bean.getText()
                        .getBytes(), Base64.DEFAULT));
            } else if (mRequest.getCommand().equals(Request.TYPE_LBS)) {
                bean.setMsgIco(R.drawable.address_temp);
                bean.setText(new String(Base64.decode(mRequest.getBody(),
                        Base64.DEFAULT)));
                mRequest.setBody(Base64.encodeToString(bean.getText()
                        .getBytes(), Base64.DEFAULT));
            } else if (mRequest.getCommand().equals(Request.TYPE_PHOTO)) {
                bean.setText(name + "向你发送了照片");
                bean.setMsgIco(-1);
            } else if (mRequest.getCommand().equals(Request.TYPE_PIC)) {
                bean.setText(name + "向你发送了图片");
                bean.setMsgIco(-1);
            } else if (mRequest.getCommand().equals(Request.TYPE_SMS)) {
                bean.setFileType(0);
            }

            // Card and SMS messages carry no file; everything else is fetched.
            boolean hasFile = !mRequest.getCommand().equals(Request.TYPE_CARD)
                    && !mRequest.getCommand().equals(Request.TYPE_SMS);
            if (hasFile && !downloadFile(filePath)) {
                // Download was stopped: drop the entry without saving the bean.
                PushService.downloadInfos.remove(download);
                sendUpdateBroast();
                return;
            }

            PushService.downloadInfos.remove(download);
            sendUpdateBroast();
            // Store the message/file information in the database
            dao.create(bean);
            sendUpdateFinishBroast();

        } catch (Exception e) {
            PushService.downloadInfos.remove(download);
            sendUpdateBroast();
            e.printStackTrace();
        }
    }

    /**
     * Downloads the file for the current request to {@code filePath},
     * updating {@code progress} and broadcasting after every chunk. The HTTP
     * connection, the input stream and the output file are always released.
     *
     * @return {@code true} when the stream was read to its end, {@code false}
     *         when the download was stopped early (an incomplete file is
     *         deleted in that case)
     * @throws IOException if connecting, reading or writing fails
     */
    private boolean downloadFile(String filePath) throws IOException {
        URL url = new URL(Constants.FILE_DOWNLOAD_URL
                + mRequest.getMessageId());
        HttpURLConnection hurlconn = (HttpURLConnection) url.openConnection();
        InputStream instream = null;
        RandomAccessFile rasf = null;
        boolean completed = true;
        long fileSize = -1;
        int number = 0;
        try {
            hurlconn.setConnectTimeout(5000);// connect timeout: 5s
            hurlconn.setRequestMethod("GET");
            hurlconn.connect();
            fileSize = hurlconn.getContentLength();
            instream = hurlconn.getInputStream();
            rasf = new RandomAccessFile(filePath, "rwd");
            byte[] buffer = new byte[1024];
            int len;
            while ((len = instream.read(buffer)) != -1) {
                if (!(getrunflag() && progress < 100)) {
                    completed = false;
                    break;
                }
                // Writes are sequential, so no explicit seek is required.
                rasf.write(buffer, 0, len);
                number += len;
                progress = (((float) number) / fileSize) * 100;
                // Broadcast so that the progress bar is refreshed
                sendUpdateBroast();
            }
        } finally {
            try {
                if (rasf != null) {
                    rasf.close();
                }
            } finally {
                try {
                    if (instream != null) {
                        instream.close();
                    }
                } finally {
                    hurlconn.disconnect();
                }
            }
        }

        if (!completed && number != fileSize) {
            // Stopped before the whole file arrived: remove the partial file.
            File file = new File(filePath);
            if (file.exists()) {
                file.delete();
            }
        }
        return completed;
    }
}

// Downloads currently tracked by the service; shared by all instances because
// the field is static. DownloadInfo threads remove themselves when they end.
public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>();

// Returns the shared static list itself, not a copy.
public ArrayList<DownloadInfo> getDownloadInfos() {
return PushService.downloadInfos;
}

// Replaces the shared static list with the given one.
public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) {
PushService.downloadInfos = downloadInfos;
}
}</pre>

[/java]

ps:接收者必须订阅发送者的 TOPICS 才能收到消息。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • GitLab

    GitLab 是利用 Ruby 开发的一个开源的版本管理系统,实现一个自托管的 Git 项目仓库,可通过 Web 界面操作公开或私有项目。

    46 引用 • 72 回帖
  • Thymeleaf

    Thymeleaf 是一款用于渲染 XML/XHTML/HTML5 内容的模板引擎。类似 Velocity、 FreeMarker 等,它也可以轻易的与 Spring 等 Web 框架进行集成作为 Web 应用的模板引擎。与其它模板引擎相比,Thymeleaf 最大的特点是能够直接在浏览器中打开并正确显示模板页面,而不需要启动整个 Web 应用。

    11 引用 • 19 回帖 • 319 关注
  • Scala

    Scala 是一门多范式的编程语言,集成面向对象编程和函数式编程的各种特性。

    13 引用 • 11 回帖 • 108 关注
  • 倾城之链
    23 引用 • 66 回帖 • 102 关注
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    53 引用 • 85 回帖
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 45 关注
  • Swagger

    Swagger 是一款非常流行的 API 开发工具,它遵循 OpenAPI Specification(这是一种通用的、和编程语言无关的 API 描述规范)。Swagger 贯穿整个 API 生命周期,如 API 的设计、编写文档、测试和部署。

    26 引用 • 35 回帖 • 13 关注
  • FreeMarker

    FreeMarker 是一款好用且功能强大的 Java 模版引擎。

    23 引用 • 20 回帖 • 428 关注
  • CAP

    CAP 指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得。

    11 引用 • 5 回帖 • 565 关注
  • 书籍

    宋真宗赵恒曾经说过:“书中自有黄金屋,书中自有颜如玉。”

    76 引用 • 390 回帖
  • React

    React 是 Facebook 开源的一个用于构建 UI 的 JavaScript 库。

    192 引用 • 291 回帖 • 443 关注
  • 房星科技

    房星网,我们不和没有钱的程序员谈理想,我们要让程序员又有理想又有钱。我们有雄厚的房地产行业线下资源,遍布昆明全城的 100 家门店、四千地产经纪人是我们坚实的后盾。

    6 引用 • 141 回帖 • 559 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 5 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    16 引用 • 53 回帖 • 123 关注
  • MySQL

    MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一。

    675 引用 • 535 回帖
  • etcd

    etcd 是一个分布式、高可用的 key-value 数据存储,专门用于在分布式系统中保存关键数据。

    5 引用 • 26 回帖 • 492 关注
  • gRpc
    10 引用 • 8 回帖 • 54 关注
  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    21 引用 • 140 回帖 • 28 关注
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 6 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 3 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 23 关注
  • C

    C 语言是一门通用计算机编程语言,应用广泛。C 语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。

    83 引用 • 165 回帖 • 44 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 47 关注
  • Pipe

    Pipe 是一款小而美的开源博客平台。Pipe 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    131 引用 • 1114 回帖 • 151 关注
  • 阿里云

    阿里云是阿里巴巴集团旗下公司,是全球领先的云计算及人工智能科技公司。提供云服务器、云数据库、云安全等云计算服务,以及大数据、人工智能服务、精准定制基于场景的行业解决方案。

    89 引用 • 345 回帖
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 23 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 549 关注