본문 바로가기
블로그 이미지

방문해 주셔서 감사합니다! 항상 행복하세요!

  
   - 문의사항은 메일 또는 댓글로 언제든 연락주세요.
   - "해줘","답 내놔" 같은 질문은 답변드리지 않습니다.
   - 메일주소 : lts06069@naver.com


Java(자바)

Java Mqtt 연동(MqttClient, MqttCallback, org.eclipse.paho)

야근없는 행복한 삶을 위해 ~
by 마샤와 곰 2020. 3. 5.

 

워낙 유명한 org.eclipse.paho 라이브러리를 활용한 자바연동 방법이다.

메이븐에 가서 해당 라이브러리를 받기위해 검색하면 쉽게 받을 수 있다.

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

 

대부분의 기능은 MqttClient 클래스에 집중되어 있으며, 해당 클래스를 통해서 접속, 구독, 종료 등을 할 수 있다.

또한 MqttCallback 인터페이스를 상속받으면 콜백행위를 정의할 수 있다.

 

사용자가 사용할 클래스를 만들어주자.

클래스 이름은 MyMqttClient로 하였다.

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;


public class MyMqttClient implements MqttCallback{

	private MqttClient client;
	private MqttConnectOptions option;	
	

	public MyMqttClient init(String userName, String password, String serverURI, String clientId){
		option = new MqttConnectOptions();
		option.setCleanSession(true);
		option.setKeepAliveInterval(30);
		option.setUserName(userName);
		option.setPassword(password.toCharArray());  //옵션 객체에 접속하기위한 세팅끝!
		try {
			client = new MqttClient(serverURI, clientId);
			client.setCallback(this);
			client.connect(option);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return this;
	}
	


	
	@Override
	public void connectionLost(Throwable arg0) {

	}	
	
	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) {

	}


	@Override
	public void messageArrived(String arg0, MqttMessage arg1) throws Exception {

	}

}

 

MqttCallback 인터페이스를 상속받는다.

해당인터페이스를 상속받으면 3개의 메소드를 오버라이딩 할 수 있다.

메시지 도착, 커넥션종료, 메시지 전달완료 등 3가지 사항에 대해서 행위를 지정 할 수 있다.

 

가장 먼저 init이라는 메소드를 만들어서 서버에 접속하는 부분을 완성 하였다.

MqttClient를 글로벌 변수로 선언한 뒤에 객체를 생성 하였다.

init 메소드 내용이 직관적고 크게 어렵지 않으므로 이해하기 쉬울 것 이다.

 

Mqtt에서의 메시지를 받기 위해서는 topic을 구독해야한다.

이에 따른 메소드는 Mqttclient가 제공하는 subscribe 메소드를 사용하면 된다.

MyMqttClient 클래스에다가 아래 subscribe메소드를 추가하자.

	//구독 대상 전달
	public boolean subscribe(String... topics){
		try {
			if(topics != null){
				for(String topic : topics){
					client.subscribe(topic,0);  //구독할 주제, 숫자는 품질 값
				}
			}			
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
		return true;
	}
	

 

해당 메소드를 추가하였다.

그렇다면 다른 클래스에서 MyMqttClient 클래스를 한번 호출하여보자.

public static void main(String [] args){

    MyMqttClient client = new MyMqttClient();
		
    client.init("접속아이디", "접속비번", "tcp://주소:1883", "고유아이디")
          .subscribe(new String[]{"구독할주제1","구독할주제2"});
}

 

요렇게까지 완성하고나면 아이디와 비밀번호 그리고 주소와 포트번호가 틀리지 않는이상 아무런 문제없이 클래스가 동작하는 것을 볼 수 있다.

에러가 안나면 다음단계로 진행하자~

 

메시지를 보내는 메소드는 MqttClient 클래스에서 publish라는 메소드를 호출하면 된다.

아래 소스코드를 MyMqttClient 클래스에 추가하여보자.

	//전송
	public boolean sender(String topic, String msg) throws MqttPersistenceException, MqttException{
		MqttMessage message = new MqttMessage();
		message.setPayload(msg.getBytes());  //보낼 메시지
		client.publish(topic, message);  //토픽과 함께 보낸다.
		return false;
	}

 

여기까지 하고나면 전송하는 메소드도 만들어졌는데..이제 문제는 메시지를 받을 때 행위를 어떻게 할 것인지에 대한 정의이다.

MyMqttClient클래스에다가 직접 코딩을 하여도 되지만 그렇게 할 경우에 소스코드가 하드코딩되어 다른 클래스에서 사용하지 못하는 경우가 발생한다.

그러므로 여기서는 Java의 기본 함수인 Consumer를 통해서 콜백행위를 지정해 보도록 하였다.

 

먼저 메시지를 받을 때 동작하는 메소드는 MqttCallback 인터페이스를 상속받아서 오버라이딩한 메소드인 messageArrived 이다.

해당 메소드가 동작 할 때 사용자가 지정한 함수가 동작하도록 수정하면 다른 클래스에서도 사용이 가능한 생산적인 기능을 만들 수 있다.

 

테스트용 메인메소드가 존재하는 클래스에서 함수를 먼저 정의하여보자.

public static void main(String [] args){

    final Consumer<HashMap<Object, Object>> pdk = (arg)->{  //해쉬맵을 받는 함수를 정의하자.
        arg.forEach((key, value)->{
		    System.out.println( String.format("대상, 키 -> %s, 값 -> %s", key, value) );
        });			
    };


    MyMqttClient client = new MyMqttClient();
    client.init("접속아이디", "접속비번", "tcp://주소:1883", "고유아이디")
          .subscribe(new String[]{"구독할주제1","구독할주제2"});
}

 

해쉬맵을 받는 함수를 정의한 다음 해당 함수를 MyMqttClient 클래스의 생성자나 다른 메소드를 만들어 전달하여 주자.

여기서는 MyMqttClient 생성자에 넣도록 하였다.

public class MyMqttClient implements MqttCallback{

    private MqttClient client;
    private MqttConnectOptions option;
    private Consumer<HashMap<Object, Object>> FNC = null;  //메시지 도착 후 응답하는 함수

    public MyMqttClient (Consumer<HashMap<Object, Object>> fnc){  //생성자를 추가하자.
        this.FNC = fnc;
    }
    
    //...생략
}    

 

MyMqttClient가 오버라이딩 한 messageArrived 메소드를 수정하자.

    //메시지 도착
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        if(FNC != null){
            HashMap<Object, Object> result = new HashMap<>();
            result.put("topic", arg0);
            result.put("message", new String(arg1.getPayload(),"UTF-8"));
            FNC.accept(result);  //콜백행위 실행
        }
    }

 

위 방법과 동일하게 connectionLost 메소드와 deliveryComplete 메소드에도 동일하게 기능을 부여하면 된다.

최종 완성된 모습이다.

import java.util.HashMap;
import java.util.function.Consumer;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

public class MyMqttClient implements MqttCallback{

    private MqttClient client;
    private MqttConnectOptions option;
    private Consumer<HashMap<Object, Object>> FNC = null;  //메시지 도착 후 응답하는 함수
    private Consumer<HashMap<Object, Object>> FNC2 = null; //커넥션이 끊긴 후 응답하는 함수
    private Consumer<HashMap<Object, Object>> FNC3 = null; //전송이 완료된 이후 응답하는 함수.

	
    /**
     * 기본 생성자로 Predicate를 받습니다. 해당 Predicate는 메시지가 도착한 행위에 대한 콜백함수입니다.<br>
     * 해당 함수를 구현하지 않으면 클래스를 생성 할 수 없습니다.
     * **/
    public MyMqttClient (Consumer<HashMap<Object, Object>> fnc){
        this.FNC = fnc;
    }
	
    /**
     * 설정파일을 등록합니다.<br>
     * 파라미터는 총 4개가 필요합니다.<br>
     * 사용자이름, 비밀번호, 주소, 접속후에 사용할 아이디값 입니다.
     * */
    public MyMqttClient init(String userName, String password, String serverURI, String clientId){
        option = new MqttConnectOptions();
        option.setCleanSession(true);
        option.setKeepAliveInterval(30);
        option.setUserName(userName);
        option.setPassword(password.toCharArray());
        try {
            client = new MqttClient(serverURI, clientId);
            client.setCallback(this);
            client.connect(option);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return this;
    }
	
    /***
     * 전송 메소드입니다.
     *
    **/
    public boolean sender(String topic, String msg) throws MqttPersistenceException, MqttException{
        MqttMessage message = new MqttMessage();
        message.setPayload(msg.getBytes());
        client.publish(topic, message);
        return true;
    }
	
    /***
     * 구독 대상을 전달합니다.
     *
     * **/
    public boolean subscribe(String... topics){
        try {
            if(topics != null){
                for(String topic : topics){
                    client.subscribe(topic,0);
                }
            }			
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
	
    /**
     * 커넥션이 끊어진 이후의 콜백행위를 등록합니다.<br>
     * 해쉬맵 형태의 결과에 키는 result, 값은 Throwable 객체를 반환 합니다. 
     * **/
    public void initConnectionLost (Consumer<HashMap<Object, Object>> fnc){
        FNC2 = fnc;
    }
	

    /**
     * 커넥션이 끊어진 이후의 콜백행위를 등록합니다.<br>
     * 해쉬맵 형태의 결과에 키는 result, 값은 IMqttDeliveryToken 객체를 반환 합니다. 
     * **/
    public void initDeliveryComplete (Consumer<HashMap<Object, Object>> fnc){
        FNC3 = fnc;
    }
	
    /**
     * 종료메소드입니다.<br>
     * 클라이언트를 종료 합니다.
     * */
    public void close(){
        if(client != null){
            try {
                client.disconnect();
                client.close();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
	
    @Override
    public void connectionLost(Throwable arg0) {
        if(FNC2 != null){
            HashMap<Object, Object> result = new HashMap<>();
            result.put("result", arg0);
            FNC2.accept(result);
            arg0.printStackTrace();
        }
    }	
	
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        if(FNC3 != null){
            HashMap<Object, Object> result = new HashMap<>();
            try {
                result.put("result", arg0);
            } catch (Exception e) {
                e.printStackTrace();
                result.put("result", "ERROR");
                result.put("error", e.getMessage());
            }
            FNC3.accept(result);
        }
    }

    //메시지 도착
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        if(FNC != null){
            HashMap<Object, Object> result = new HashMap<>();
            result.put("topic", arg0);
            result.put("message", new String(arg1.getPayload(),"UTF-8"));
            FNC.accept(result);  //콜백행위 실행
        }
    }
}

 

MyMqttClient 클래스를 사용하는 테스트용 클래스 예제이다.

Consumer에 제네릭을 해쉬맵으로 사용하였지만 원하는 타입으로 바꾸어도 상관 없다.

import java.util.HashMap;
import java.util.function.Consumer;

public class TestMqttMainMethod {
    
    
    public static void main(String [] args){
        
        final Consumer<HashMap<Object, Object>> pdk = (arg)->{  //메시지를 받는 콜백 행위
            arg.forEach((key, value)->{
                System.out.println( String.format("메시지 도착 : 키 -> %s, 값 -> %s", key, value) );
            });            
        };

        MyMqttClient client = new MyMqttClient(pdk);  //해당 함수를 생성자로 넣어준다.

        client.init("접속아이디", "비밀번호", "tcp://주소:포트번호", "아이디")
              .subscribe(new String[]{"주제1","주제2"});  //subscribe 메소드는 구독할 대상


        client.initConnectionLost( (arg)->{  //콜백행위1, 서버와의 연결이 끊기면 동작
            arg.forEach((key, value)->{
                System.out.println( String.format("커넥션 끊김~! 키 -> %s, 값 -> %s", key, value) );
            });
        });

        client.initDeliveryComplete((arg)-> {  //콜백행위2, 메시지를 전송한 이후 동작
            arg.forEach((key, value)->{
                System.out.println( String.format("메시지 전달 완료~! 키 -> %s, 값 -> %s", key, value) );
            });
        });


        new Thread( ()->{
            try {
                Thread.sleep(9000);
                client.sender("new_topic", "Hello world! 한글한글!");  //이런식으로 보낸다.
                client.close();  //종료는 이렇게!
            } catch (Exception e) {
                e.printStackTrace();
            }
        } ).start();        
        
    }
}

 

아파치에서 제공하는 ActiveMq로 간단히 테스트해본 모습이다.

잘 되는군~

 

 

MqttClient는 사용자가 종료 메소드를 호출 할 때 까지 Mqtt서버와의 연결을 계속해서 유지한다.

그러므로 disconnect 메소드가 구현된 close 메소드를 호출해 주어야 동작을 멈추게 된다.

 

 

반응형
* 위 에니메이션은 Html의 캔버스(canvas)기반으로 동작하는 기능 입니다. Html 캔버스 튜토리얼 도 한번 살펴보세요~ :)
* 직접 만든 Html 캔버스 애니메이션 도 한번 살펴보세요~ :)

댓글