package com.adxinfo.common.data.adxp.message.impl;

import com.adxinfo.common.data.adxp.common.MsgUtils;
import com.adxinfo.common.data.adxp.config.Configuration;
import com.adxinfo.common.data.adxp.message.IMsg;
import com.adxinfo.common.data.adxp.message.IMsgClient;
import com.adxinfo.common.data.adxp.message.IMsgReceiver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/adxinfo/common/data/adxp/message/impl/ActiveMQClient.class */
public class ActiveMQClient implements IMsgClient {
    private static Log log = LogFactory.getLog(ActiveMQClient.class);
    private PooledConnectionFactory connectionFactory;
    private Map<String, List<IMsgReceiver>> msgReceivers = Collections.synchronizedMap(new HashMap());

    @Override // com.adxinfo.common.data.adxp.message.IMsgClient
    public void init(Map<String, Object> map) throws JMSException {
        log.info("start init!!");
        this.connectionFactory = new PooledConnectionFactory(new ActiveMQConnectionFactory(String.valueOf(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_USERNAME.getValue())), String.valueOf(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_PASSWORD.getValue())), String.valueOf(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_BROKER.getValue()))));
        this.connectionFactory.setMaximumActiveSessionPerConnection(Integer.parseInt(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_MAX_ACTIVE.getValue()).toString()));
        this.connectionFactory.setIdleTimeout(Integer.parseInt(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_IDEL_TIMEOUT.getValue()).toString()));
        this.connectionFactory.setMaxConnections(Integer.parseInt(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_MAX_CONN.getValue()).toString()));
        this.connectionFactory.setBlockIfSessionPoolIsFull(Boolean.getBoolean(map.get(Configuration.PropKey.NACOS_BUS_MQ_ACTIVEMQ_IS_BLOCK.getValue()).toString()));
    }

    @Override // com.adxinfo.common.data.adxp.message.IMsgClient
    public void sendMsgToTopic(String str, IMsg iMsg) throws JMSException, IOException {
        TopicConnection createTopicConnection = this.connectionFactory.createTopicConnection();
        createTopicConnection.start();
        Session createSession = createTopicConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createTopic(str));
        createProducer.send(createSession.createTextMessage(MsgUtils.serialize(iMsg)));
        createProducer.close();
        createSession.close();
        createTopicConnection.close();
    }

    @Override // com.adxinfo.common.data.adxp.message.IMsgClient
    public void registTopicReceiver(String str, final IMsgReceiver iMsgReceiver) throws JMSException {
        List<IMsgReceiver> list = this.msgReceivers.get(str);
        if (list == null) {
            this.msgReceivers.put(str, new ArrayList<IMsgReceiver>() { // from class: com.adxinfo.common.data.adxp.message.impl.ActiveMQClient.1
                {
                    add(iMsgReceiver);
                }
            });
            listenTopic(str);
        } else if (list.contains(iMsgReceiver)) {
            list.add(iMsgReceiver);
        }
    }

    private void listenTopic(final String str) throws JMSException {
        TopicConnection createTopicConnection = this.connectionFactory.createTopicConnection();
        createTopicConnection.start();
        Session createSession = createTopicConnection.createSession(false, 1);
        createSession.createConsumer(createSession.createTopic(str)).setMessageListener(new MessageListener() { // from class: com.adxinfo.common.data.adxp.message.impl.ActiveMQClient.2
            public void onMessage(final Message message) {
                for (final IMsgReceiver iMsgReceiver : (List) ActiveMQClient.this.msgReceivers.get(str)) {
                    new Thread(new Runnable() { // from class: com.adxinfo.common.data.adxp.message.impl.ActiveMQClient.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            iMsgReceiver.receiveMsg(message);
                        }
                    }).start();
                }
            }
        });
    }
}
