워낙 유명한 org.eclipse.paho 라이브러리를 활용한 자바연동 방법이다.
메이븐에 가서 해당 라이브러리를 받기위해 검색하면 쉽게 받을 수 있다.
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
대부분의 기능은 MqttClient 클래스에 집중되어 있으며, 해당 클래스를 통해서 접속, 구독, 종료 등을 할 수 있다.
또한 MqttCallback 인터페이스를 상속받으면 콜백행위를 정의할 수 있다.
사용자가 사용할 클래스를 만들어주자.
클래스 이름은 MyMqttClient로 하였다.
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MyMqttClient implements MqttCallback{
private MqttClient client;
private MqttConnectOptions option;
public MyMqttClient init(String userName, String password, String serverURI, String clientId){
option = new MqttConnectOptions();
option.setCleanSession(true);
option.setKeepAliveInterval(30);
option.setUserName(userName);
option.setPassword(password.toCharArray()); //옵션 객체에 접속하기위한 세팅끝!
try {
client = new MqttClient(serverURI, clientId);
client.setCallback(this);
client.connect(option);
} catch (Exception e) {
e.printStackTrace();
}
return this;
}
@Override
public void connectionLost(Throwable arg0) {
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
}
}
MqttCallback 인터페이스를 상속받는다.
해당인터페이스를 상속받으면 3개의 메소드를 오버라이딩 할 수 있다.
메시지 도착, 커넥션종료, 메시지 전달완료 등 3가지 사항에 대해서 행위를 지정 할 수 있다.
가장 먼저 init이라는 메소드를 만들어서 서버에 접속하는 부분을 완성 하였다.
MqttClient를 글로벌 변수로 선언한 뒤에 객체를 생성 하였다.
init 메소드 내용이 직관적고 크게 어렵지 않으므로 이해하기 쉬울 것 이다.
Mqtt에서의 메시지를 받기 위해서는 topic을 구독해야한다.
이에 따른 메소드는 Mqttclient가 제공하는 subscribe 메소드를 사용하면 된다.
MyMqttClient 클래스에다가 아래 subscribe메소드를 추가하자.
//구독 대상 전달
public boolean subscribe(String... topics){
try {
if(topics != null){
for(String topic : topics){
client.subscribe(topic,0); //구독할 주제, 숫자는 품질 값
}
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
해당 메소드를 추가하였다.
그렇다면 다른 클래스에서 MyMqttClient 클래스를 한번 호출하여보자.
public static void main(String [] args){
MyMqttClient client = new MyMqttClient();
client.init("접속아이디", "접속비번", "tcp://주소:1883", "고유아이디")
.subscribe(new String[]{"구독할주제1","구독할주제2"});
}
요렇게까지 완성하고나면 아이디와 비밀번호 그리고 주소와 포트번호가 틀리지 않는이상 아무런 문제없이 클래스가 동작하는 것을 볼 수 있다.
에러가 안나면 다음단계로 진행하자~
메시지를 보내는 메소드는 MqttClient 클래스에서 publish라는 메소드를 호출하면 된다.
아래 소스코드를 MyMqttClient 클래스에 추가하여보자.
//전송
public boolean sender(String topic, String msg) throws MqttPersistenceException, MqttException{
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes()); //보낼 메시지
client.publish(topic, message); //토픽과 함께 보낸다.
return false;
}
여기까지 하고나면 전송하는 메소드도 만들어졌는데..이제 문제는 메시지를 받을 때 행위를 어떻게 할 것인지에 대한 정의이다.
MyMqttClient클래스에다가 직접 코딩을 하여도 되지만 그렇게 할 경우에 소스코드가 하드코딩되어 다른 클래스에서 사용하지 못하는 경우가 발생한다.
그러므로 여기서는 Java의 기본 함수인 Consumer를 통해서 콜백행위를 지정해 보도록 하였다.
먼저 메시지를 받을 때 동작하는 메소드는 MqttCallback 인터페이스를 상속받아서 오버라이딩한 메소드인 messageArrived 이다.
해당 메소드가 동작 할 때 사용자가 지정한 함수가 동작하도록 수정하면 다른 클래스에서도 사용이 가능한 생산적인 기능을 만들 수 있다.
테스트용 메인메소드가 존재하는 클래스에서 함수를 먼저 정의하여보자.
public static void main(String [] args){
final Consumer<HashMap<Object, Object>> pdk = (arg)->{ //해쉬맵을 받는 함수를 정의하자.
arg.forEach((key, value)->{
System.out.println( String.format("대상, 키 -> %s, 값 -> %s", key, value) );
});
};
MyMqttClient client = new MyMqttClient();
client.init("접속아이디", "접속비번", "tcp://주소:1883", "고유아이디")
.subscribe(new String[]{"구독할주제1","구독할주제2"});
}
해쉬맵을 받는 함수를 정의한 다음 해당 함수를 MyMqttClient 클래스의 생성자나 다른 메소드를 만들어 전달하여 주자.
여기서는 MyMqttClient 생성자에 넣도록 하였다.
public class MyMqttClient implements MqttCallback{
private MqttClient client;
private MqttConnectOptions option;
private Consumer<HashMap<Object, Object>> FNC = null; //메시지 도착 후 응답하는 함수
public MyMqttClient (Consumer<HashMap<Object, Object>> fnc){ //생성자를 추가하자.
this.FNC = fnc;
}
//...생략
}
MyMqttClient가 오버라이딩 한 messageArrived 메소드를 수정하자.
//메시지 도착
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
if(FNC != null){
HashMap<Object, Object> result = new HashMap<>();
result.put("topic", arg0);
result.put("message", new String(arg1.getPayload(),"UTF-8"));
FNC.accept(result); //콜백행위 실행
}
}
위 방법과 동일하게 connectionLost 메소드와 deliveryComplete 메소드에도 동일하게 기능을 부여하면 된다.
최종 완성된 모습이다.
import java.util.HashMap;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
public class MyMqttClient implements MqttCallback{
private MqttClient client;
private MqttConnectOptions option;
private Consumer<HashMap<Object, Object>> FNC = null; //메시지 도착 후 응답하는 함수
private Consumer<HashMap<Object, Object>> FNC2 = null; //커넥션이 끊긴 후 응답하는 함수
private Consumer<HashMap<Object, Object>> FNC3 = null; //전송이 완료된 이후 응답하는 함수.
/**
* 기본 생성자로 Predicate를 받습니다. 해당 Predicate는 메시지가 도착한 행위에 대한 콜백함수입니다.<br>
* 해당 함수를 구현하지 않으면 클래스를 생성 할 수 없습니다.
* **/
public MyMqttClient (Consumer<HashMap<Object, Object>> fnc){
this.FNC = fnc;
}
/**
* 설정파일을 등록합니다.<br>
* 파라미터는 총 4개가 필요합니다.<br>
* 사용자이름, 비밀번호, 주소, 접속후에 사용할 아이디값 입니다.
* */
public MyMqttClient init(String userName, String password, String serverURI, String clientId){
option = new MqttConnectOptions();
option.setCleanSession(true);
option.setKeepAliveInterval(30);
option.setUserName(userName);
option.setPassword(password.toCharArray());
try {
client = new MqttClient(serverURI, clientId);
client.setCallback(this);
client.connect(option);
} catch (Exception e) {
e.printStackTrace();
}
return this;
}
/***
* 전송 메소드입니다.
*
**/
public boolean sender(String topic, String msg) throws MqttPersistenceException, MqttException{
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
client.publish(topic, message);
return true;
}
/***
* 구독 대상을 전달합니다.
*
* **/
public boolean subscribe(String... topics){
try {
if(topics != null){
for(String topic : topics){
client.subscribe(topic,0);
}
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 커넥션이 끊어진 이후의 콜백행위를 등록합니다.<br>
* 해쉬맵 형태의 결과에 키는 result, 값은 Throwable 객체를 반환 합니다.
* **/
public void initConnectionLost (Consumer<HashMap<Object, Object>> fnc){
FNC2 = fnc;
}
/**
* 커넥션이 끊어진 이후의 콜백행위를 등록합니다.<br>
* 해쉬맵 형태의 결과에 키는 result, 값은 IMqttDeliveryToken 객체를 반환 합니다.
* **/
public void initDeliveryComplete (Consumer<HashMap<Object, Object>> fnc){
FNC3 = fnc;
}
/**
* 종료메소드입니다.<br>
* 클라이언트를 종료 합니다.
* */
public void close(){
if(client != null){
try {
client.disconnect();
client.close();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
@Override
public void connectionLost(Throwable arg0) {
if(FNC2 != null){
HashMap<Object, Object> result = new HashMap<>();
result.put("result", arg0);
FNC2.accept(result);
arg0.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
if(FNC3 != null){
HashMap<Object, Object> result = new HashMap<>();
try {
result.put("result", arg0);
} catch (Exception e) {
e.printStackTrace();
result.put("result", "ERROR");
result.put("error", e.getMessage());
}
FNC3.accept(result);
}
}
//메시지 도착
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
if(FNC != null){
HashMap<Object, Object> result = new HashMap<>();
result.put("topic", arg0);
result.put("message", new String(arg1.getPayload(),"UTF-8"));
FNC.accept(result); //콜백행위 실행
}
}
}
MyMqttClient 클래스를 사용하는 테스트용 클래스 예제이다.
Consumer에 제네릭을 해쉬맵으로 사용하였지만 원하는 타입으로 바꾸어도 상관 없다.
import java.util.HashMap;
import java.util.function.Consumer;
public class TestMqttMainMethod {
public static void main(String [] args){
final Consumer<HashMap<Object, Object>> pdk = (arg)->{ //메시지를 받는 콜백 행위
arg.forEach((key, value)->{
System.out.println( String.format("메시지 도착 : 키 -> %s, 값 -> %s", key, value) );
});
};
MyMqttClient client = new MyMqttClient(pdk); //해당 함수를 생성자로 넣어준다.
client.init("접속아이디", "비밀번호", "tcp://주소:포트번호", "아이디")
.subscribe(new String[]{"주제1","주제2"}); //subscribe 메소드는 구독할 대상
client.initConnectionLost( (arg)->{ //콜백행위1, 서버와의 연결이 끊기면 동작
arg.forEach((key, value)->{
System.out.println( String.format("커넥션 끊김~! 키 -> %s, 값 -> %s", key, value) );
});
});
client.initDeliveryComplete((arg)-> { //콜백행위2, 메시지를 전송한 이후 동작
arg.forEach((key, value)->{
System.out.println( String.format("메시지 전달 완료~! 키 -> %s, 값 -> %s", key, value) );
});
});
new Thread( ()->{
try {
Thread.sleep(9000);
client.sender("new_topic", "Hello world! 한글한글!"); //이런식으로 보낸다.
client.close(); //종료는 이렇게!
} catch (Exception e) {
e.printStackTrace();
}
} ).start();
}
}
아파치에서 제공하는 ActiveMq로 간단히 테스트해본 모습이다.
MqttClient는 사용자가 종료 메소드를 호출 할 때 까지 Mqtt서버와의 연결을 계속해서 유지한다.
그러므로 disconnect 메소드가 구현된 close 메소드를 호출해 주어야 동작을 멈추게 된다.
'Java(자바)' 카테고리의 다른 글
Java에서 Firebase 데이터 베이스 연동하기(안드로이드 없이, without Android) (8) | 2020.06.25 |
---|---|
Java quartz synchronize (quartz 동기화, quartz 순서) (0) | 2020.05.26 |
Java excel poi 메모리 누수 대비(memory leak), 자바 xlsx만들기, Java xlsx 만들기 (6) | 2019.12.19 |
Java 매우 쉬운 날짜 사용 (LocalDate, Java 1.8 date), Java 현재 날짜 (0) | 2019.12.16 |
JAVA sftp 파일전송, 자바 sftp 파일전송 (JSch 파일전송) (2) | 2019.12.13 |
댓글