package com.adxinfo.adsp.ability.eventcenter.bus;

import com.adxinfo.adsp.ability.eventcenter.bus.constants.GlobalConstantsKafka;
import com.adxinfo.adsp.common.utils.RequestUtils;
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:com/adxinfo/adsp/ability/eventcenter/bus/GreenChanel.class */
/**
 * Kafka-backed "green channel" for publishing events and registering event listeners.
 *
 * <p>Every physical topic name is the logical topic followed by {@code "_"} and the active
 * Spring profile. Listeners are tracked in {@link #mapConsumers}, keyed by the prefixed
 * logical topic concatenated with the application name.
 *
 * <p>Threading: each listener is polled on its own worker thread. A {@link KafkaConsumer}
 * must only be used from one thread (with the exception of {@code wakeup()}), so the worker
 * thread owns its consumer for its whole life and is the only one that closes it. All access
 * to {@link #mapConsumers} is guarded by a lock on the map itself.
 */
public class GreenChanel {
    private static final Logger log = LoggerFactory.getLogger(GreenChanel.class);

    @Autowired
    private KafkaConfig kafkaConfig;

    @Value("${spring.profiles}")
    private String springProfiles;

    // NOTE(review): the ":null" default presumably resolves to the literal string "null"
    // rather than a null reference, in which case getProjectId() returns "null" when the
    // property is missing -- confirm whether that is intended.
    @Value("${platform.server.projectId:null}")
    private String serverProject;

    @Value("${spring.application.name}")
    private String applicationName;

    /** Active consumers keyed by prefixed topic + application name; guarded by {@code synchronized (mapConsumers)}. */
    HashMap<String, KafkaConsumer> mapConsumers = new HashMap<>();

    /**
     * Sends one JSON string to the profile-specific topic and waits for the broker result.
     *
     * <p>A producer is created for the call and always closed afterwards so that its
     * background resources are released. A failed send is logged, not rethrown
     * (best-effort delivery, as before).
     *
     * @param str  logical topic name (without the profile suffix)
     * @param str2 message payload
     */
    private void sendToTopic(String str, String str2) {
        final String topic = str + "_" + this.springProfiles;
        log.info("发送消息开始：topic:{},发送json数据:{}", str, str2);
        try (KafkaProducer<String, String> producer = newProducer()) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, str2);
            // Explicit charset: header bytes must not depend on the platform default.
            record.headers().add("projectId", getProjectId().getBytes(StandardCharsets.UTF_8));
            log.info("发送到topic：{}", topic);
            Future<RecordMetadata> pending = producer.send(record);
            producer.flush();
            try {
                log.info("Produce ok:{}", pending.get());
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller's thread.
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting for send result, topic:{}", topic, e);
            } catch (ExecutionException e) {
                log.error("Failed to send message to topic:{}", topic, e);
            }
        }
    }

    /**
     * Serializes the JSON object and sends it to the given logical topic.
     *
     * @param str        logical topic name (without the profile suffix)
     * @param jSONObject payload; must not be null
     */
    public void sendToTopic(String str, JSONObject jSONObject) {
        sendToTopic(str, jSONObject.toJSONString());
    }

    /**
     * Wraps event content in an envelope with {@code eventTypeCode}, {@code content} and
     * {@code publisher} fields and sends it to the topic {@code TOPIC_PREFIX + str}.
     *
     * @param str        event type code
     * @param jSONObject event content
     * @param str2       publisher identifier
     */
    public void sendEventData(String str, JSONObject jSONObject, String str2) {
        JSONObject envelope = new JSONObject();
        envelope.put("eventTypeCode", str);
        envelope.put("content", jSONObject);
        envelope.put("publisher", str2);
        final String topic = GlobalConstantsKafka.TOPIC_PREFIX + str;
        log.info("发送消息：发送至{}的topic,发送json数据:{}", topic, envelope);
        sendToTopic(topic, envelope);
    }

    /**
     * Subscribes a listener to an event topic and starts a background thread that polls the
     * topic and hands each record (parsed as JSON) to the listener.
     *
     * @param str                   event type or topic name; {@code TOPIC_PREFIX} is prepended
     *                              when the name does not already contain it; must not be null
     * @param abstractEventListener callback invoked for every received record
     * @throws RuntimeException if this topic is already subscribed in this instance
     */
    public void addEventListener(String str, final AbstractEventListener abstractEventListener) {
        final String topic = normalizeTopic(str);
        final String fullTopic = topic + "_" + this.springProfiles;
        final String consumerKey = topic + this.applicationName;
        log.info(">>>>绿色通道准备监听：{} bootstrapServers:{}", fullTopic, this.kafkaConfig.getBootstrapServers());

        final KafkaConsumer<String, String> kafkaConsumer;
        // Check and registration happen under one lock so two callers cannot both subscribe.
        synchronized (this.mapConsumers) {
            if (this.mapConsumers.containsKey(consumerKey)) {
                throw new RuntimeException("事件类型在该项目中已经订阅!");
            }
            kafkaConsumer = newConsumer();
            try {
                kafkaConsumer.subscribe(Collections.singletonList(fullTopic));
            } catch (RuntimeException e) {
                kafkaConsumer.close();
                throw e;
            }
            log.info(">>>>>>>>>>>>{}正在监听：{}", this.applicationName, fullTopic);
            this.mapConsumers.put(consumerKey, kafkaConsumer);
        }

        try {
            startPoller(consumerKey, fullTopic, kafkaConsumer, abstractEventListener);
        } catch (RuntimeException e) {
            // No poller owns the consumer, so undo the registration and release it here.
            synchronized (this.mapConsumers) {
                this.mapConsumers.remove(consumerKey);
            }
            kafkaConsumer.close();
            throw e;
        }
    }

    /**
     * Stops the listener registered for the given topic, if any.
     *
     * <p>The consumer is unregistered and woken up; the polling thread then notices it is no
     * longer registered and closes the consumer itself, so the close may complete shortly
     * after this method returns.
     *
     * @param str event type or topic name, with or without {@code TOPIC_PREFIX}
     */
    public void removeEventListener(String str) {
        log.info(">>>>>>>>>>>>{}移除监听：{}_{}", this.applicationName, str, this.springProfiles);
        if (str == null) {
            // addEventListener cannot register a null topic, so there is nothing to remove.
            return;
        }
        final String consumerKey = normalizeTopic(str) + this.applicationName;
        final KafkaConsumer kafkaConsumer;
        synchronized (this.mapConsumers) {
            kafkaConsumer = this.mapConsumers.remove(consumerKey);
        }
        if (null != kafkaConsumer) {
            // wakeup() is the only consumer call that is safe from a foreign thread; it
            // interrupts a blocking poll so the poller can shut down promptly.
            kafkaConsumer.wakeup();
        }
    }

    /**
     * Resolves the project id attached to outgoing messages.
     *
     * <p>Uses the configured server project unless it is empty or {@code "ability"}, in which
     * case the project id of the current user is used.
     * NOTE(review): {@code RequestUtils.currentUser()} is dereferenced without a null check;
     * confirm it cannot return null when no user is bound to the calling thread.
     *
     * @return the project id, or an empty string when none is available
     */
    public String getProjectId() {
        String projectId = this.serverProject;
        if (StringUtils.isEmpty(this.serverProject) || "ability".equals(this.serverProject)) {
            projectId = RequestUtils.currentUser().getProjectId();
        }
        return projectId == null ? "" : projectId;
    }

    /** Prepends {@code TOPIC_PREFIX} unless the name already contains it. */
    private String normalizeTopic(String str) {
        return str.contains(GlobalConstantsKafka.TOPIC_PREFIX) ? str : GlobalConstantsKafka.TOPIC_PREFIX + str;
    }

    /** Returns true while {@code consumer} is the instance currently registered under {@code consumerKey}. */
    private boolean isRegistered(String consumerKey, KafkaConsumer consumer) {
        synchronized (this.mapConsumers) {
            return this.mapConsumers.get(consumerKey) == consumer;
        }
    }

    /** Creates a producer from the project configuration; raw construction because the config type is declared elsewhere. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private KafkaProducer<String, String> newProducer() {
        return new KafkaProducer(this.kafkaConfig.getProductProp());
    }

    /** Creates a consumer from the project configuration; raw construction because the config type is declared elsewhere. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private KafkaConsumer<String, String> newConsumer() {
        return new KafkaConsumer(this.kafkaConfig.getConsumerProp(this.applicationName));
    }

    /** Runs the poll loop for one consumer on a dedicated worker that terminates when the loop ends. */
    private void startPoller(final String consumerKey, final String fullTopic,
            final KafkaConsumer<String, String> kafkaConsumer, final AbstractEventListener abstractEventListener) {
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                GlobalConstantsKafka.POOL_SIZE.intValue(), GlobalConstantsKafka.MAX_POOL_SIZE.intValue(),
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Identity check: stop even if the same key was re-registered with a new consumer.
                        while (isRegistered(consumerKey, kafkaConsumer)) {
                            try {
                                for (ConsumerRecord<String, String> record : kafkaConsumer.poll(1000L)) {
                                    log.debug(">>>>>>>>>>>>>>>{}", record.value());
                                    try {
                                        abstractEventListener.invoke(JSONObject.parseObject(record.value()));
                                    } catch (Exception e) {
                                        // One bad record or listener failure must not stop the loop.
                                        log.error("Event listener failed, topic:{}", fullTopic, e);
                                    }
                                }
                            } catch (Exception e) {
                                if (!isRegistered(consumerKey, kafkaConsumer)) {
                                    // Expected: poll was interrupted by wakeup() during removal.
                                    break;
                                }
                                log.error("线程执行异常:", e);
                            }
                        }
                    } finally {
                        try {
                            kafkaConsumer.close();
                        } catch (Exception e) {
                            log.warn("Failed to close consumer, topic:{}", fullTopic, e);
                        }
                        log.debug(">>>>>>>>>>>>{}监听停止：{}", applicationName, fullTopic);
                    }
                }
            });
        } finally {
            // The submitted task keeps running; shutdown only lets the worker thread exit
            // once the poll loop ends instead of idling forever.
            executor.shutdown();
        }
    }
}
