package de.unijena.bioinf.ms.amqp.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import de.unijena.bioinf.ChemistryBase.jobs.SiriusJobs;
import de.unijena.bioinf.ChemistryBase.utils.IOFunctions;
import de.unijena.bioinf.ChemistryBase.utils.NetUtils;
import de.unijena.bioinf.fingerid.connection_pooling.PooledConnection;
import de.unijena.bioinf.jjobs.BasicJJob;
import de.unijena.bioinf.ms.amqp.client.jobs.AmqpWebJJob;
import de.unijena.bioinf.ms.amqp.client.jobs.JobMessage;
import de.unijena.bioinf.ms.properties.PropertyManager;
import de.unijena.bioinf.rabbitmq.RabbitMqChannelPool;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/unijena/bioinf/ms/amqp/client/AmqpClient.class */
/**
 * Client side of an AMQP (RabbitMQ) based job protocol.
 *
 * <p>The client registers itself at the server by publishing an empty message with the
 * {@link #registerRKey} routing key, consumes replies from its callback queue
 * {@link #consumerQ} and publishes job requests to {@link #CLIENT_EXCHANGE}. Every published
 * job is stored in {@link #messageJobs} under its message ID so that incoming
 * {@link JobMessage}s can be routed back to the waiting {@link AmqpWebJJob}.
 */
public class AmqpClient {
    /** Shared JSON mapper used for serializing requests and deserializing replies. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Maximum time in milliseconds to wait for the broker to confirm a published message. */
    private static final long CONFIRM_TIMEOUT_MILLIS = 5000L;

    /** Counter that is appended to the routing prefix to build message/job IDs. */
    protected static AtomicLong MESSAGE_COUNTER = new AtomicLong(0L);

    /** Job timeout read from {@code de.unijena.bioinf.fingerid.web.job.timeout} (default 3600000). */
    public static long JOB_TIME_OUT = PropertyManager.getLong("de.unijena.bioinf.fingerid.web.job.timeout", 3600000L).longValue();

    /** First segment of the routing key that is used to register this client. */
    protected static final String REGISTER_PREFIX = PropertyManager.getProperty("de.unijena.bioinf.ms.sirius.amqp.prefix.register", (String) null, "register");

    /**
     * Exchange all client messages are published to.
     * NOTE(review): the property key is the empty string, so presumably only the default
     * {@code "sirius.client.in"} is ever effective - confirm the intended key.
     */
    protected static final String CLIENT_EXCHANGE = PropertyManager.getProperty("", (String) null, "sirius.client.in");

    /** Client type segment used in the queue name and the register routing key. */
    protected static final String CLIENT_TYPE = PropertyManager.getProperty("de.unijena.bioinf.ms.sirius.amqp.client", (String) null, "sirius");

    protected final String clientID;
    protected final String userID;
    protected final RabbitMqChannelPool channelPool;

    /** Name of the callback queue: {@code CLIENT_TYPE.userID.clientID}. */
    protected final String consumerQ;

    /** Routing key of the register message: {@code REGISTER_PREFIX.CLIENT_TYPE.userID.clientID}. */
    protected final String registerRKey;

    protected final int threads;

    /** Consumer tags returned by the broker for every consumer started by {@link #startConsuming(long)}. */
    protected final List<String> consumerThreads = new ArrayList<>();

    /** Published jobs by message ID; looked up when a reply arrives. */
    protected final Map<String, AmqpWebJJob<?, ?, ?>> messageJobs = new ConcurrentHashMap<>();

    /**
     * Job that parses a delivered message body into a {@link JobMessage} and forwards it to the
     * {@link AmqpWebJJob} registered under the message ID.
     */
    public class AMPQCallbackJJob extends BasicJJob<JobMessage<?>> {
        private final AMQP.BasicProperties properties;
        private final byte[] body;
        private final String consumerTag;

        /**
         * @param str             consumer tag of the delivery
         * @param basicProperties AMQP properties of the delivered message
         * @param bArr            raw (JSON) message body
         */
        public AMPQCallbackJJob(String str, AMQP.BasicProperties basicProperties, byte[] bArr) {
            this.properties = basicProperties;
            this.body = bArr;
            this.consumerTag = str;
        }

        /**
         * Deserializes the message body and updates the job the message belongs to.
         *
         * @return the deserialized message
         * @throws Exception if the body cannot be parsed or the job update fails
         */
        @Override
        protected JobMessage<?> compute() throws Exception {
            // Deserialize into JobMessage directly; an Object target would yield a generic map
            // that cannot be cast to JobMessage.
            final JobMessage<?> message = MAPPER.readValue(this.body, new TypeReference<JobMessage<?>>() {
            });
            final AmqpWebJJob<?, ?, ?> job = AmqpClient.this.messageJobs.get(message.getID());
            if (job == null) {
                // No job is registered under this ID, so there is nothing to update.
                LoggerFactory.getLogger(getClass()).warn("Received message for unknown job '" + message.getID() + "'. Message will be ignored.");
                return message;
            }
            assert job.getJobId().equals(message.getID());
            job.update(message);
            return message;
        }

        /**
         * Alias kept for backward compatibility with the previous (decompiler generated) method name.
         *
         * @return result of {@link #compute()}
         * @throws Exception see {@link #compute()}
         * @deprecated use the job API, which invokes {@link #compute()}
         */
        @Deprecated
        public JobMessage<?> m2compute() throws Exception {
            return compute();
        }
    }

    /**
     * @param rabbitMqChannelPool pool the channels for publishing and consuming are ordered from
     * @param str                 user ID
     * @param str2                client ID
     * @param i                   number of threads (only stored in {@link #threads})
     */
    public AmqpClient(@NotNull RabbitMqChannelPool rabbitMqChannelPool, @NotNull String str, @NotNull String str2, int i) {
        this.channelPool = rabbitMqChannelPool;
        this.userID = str;
        this.clientID = str2;
        this.threads = i;
        final String clientSuffix = CLIENT_TYPE + "." + str + "." + str2;
        this.consumerQ = clientSuffix;
        this.registerRKey = REGISTER_PREFIX + "." + clientSuffix;
    }

    /**
     * Registers this client at the server, verifies that the callback queue exists and starts
     * a consumer on it. Each step is executed via {@code NetUtils.tryAndWaitAsJJob}.
     *
     * @param j timeout value handed to {@code NetUtils.tryAndWaitAsJJob} for each step
     * @throws IllegalArgumentException if the server reports a queue name other than {@link #consumerQ}
     */
    public void startConsuming(long j) {
        // Step 1: publish the (empty) register message.
        NetUtils.tryAndWaitAsJJob(() -> {
            try (PooledConnection<Channel> pooled = this.channelPool.orderConnection()) {
                final Channel channel = pooled.connection;
                channel.basicPublish(CLIENT_EXCHANGE, this.registerRKey, defaultProps().build(), new byte[0]);
                channel.waitForConfirms(CONFIRM_TIMEOUT_MILLIS);
            }
        }, j);

        // Step 2: passive declare fails if the queue does not exist.
        final AMQP.Queue.DeclareOk declared = NetUtils.tryAndWaitAsJJob(() -> {
            try (PooledConnection<Channel> pooled = this.channelPool.orderConnection()) {
                return pooled.connection.queueDeclarePassive(this.consumerQ);
            }
        }, j);
        if (!declared.getQueue().equals(this.consumerQ)) {
            throw new IllegalArgumentException("Illegal q name returned from Server");
        }
        LoggerFactory.getLogger(getClass()).info("Successfully created callback queue!");

        // Step 3: start the consumer. The pooled connection is deliberately not closed here
        // because the consumer keeps using its channel.
        final String consumerTag = NetUtils.tryAndWaitAsJJob(() -> {
            final PooledConnection<Channel> pooled = this.channelPool.orderConnection();
            final Channel channel = pooled.connection;
            return channel.basicConsume(this.consumerQ, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    final long deliveryTag = envelope.getDeliveryTag();
                    SiriusJobs.getGlobalJobManager().submitJob(new AMPQCallbackJJob(str, basicProperties, bArr));
                    // Acknowledge only after the callback job has been submitted.
                    channel.basicAck(deliveryTag, false);
                }
            });
        }, j);
        this.consumerThreads.add(consumerTag);
    }

    /**
     * Publishes {@code t} serialized as JSON.
     *
     * @param str      routing prefix, also used as prefix of the job ID
     * @param t        payload to serialize
     * @param function creates the job for the generated job ID
     * @return the job created by {@code function}
     * @throws IOException if serialization or publishing fails
     */
    public <T, I, O, R> AmqpWebJJob<I, O, R> publish(@NotNull String str, T t, @NotNull Function<String, AmqpWebJJob<I, O, R>> function) throws IOException {
        return publish(str, t, MAPPER::writeValueAsString, function);
    }

    /**
     * Publishes {@code t} serialized by {@code iOFunction}.
     *
     * @param str        routing prefix, also used as prefix of the job ID
     * @param t          payload to serialize
     * @param iOFunction serializer converting the payload to a string
     * @param function   creates the job for the generated job ID
     * @return the job created by {@code function}
     * @throws IOException if serialization or publishing fails
     */
    public <T, I, O, R> AmqpWebJJob<I, O, R> publish(@NotNull String str, T t, @NotNull IOFunctions.IOFunction<T, String> iOFunction, @NotNull Function<String, AmqpWebJJob<I, O, R>> function) throws IOException {
        return publish(str, iOFunction.apply(t), function);
    }

    /**
     * Publishes a string payload encoded as UTF-8.
     *
     * @param str      routing prefix, also used as prefix of the job ID
     * @param str2     message body
     * @param function creates the job for the generated job ID
     * @return the job created by {@code function}
     * @throws IOException if publishing fails
     */
    public <I, O, R> AmqpWebJJob<I, O, R> publish(@NotNull String str, String str2, @NotNull Function<String, AmqpWebJJob<I, O, R>> function) throws IOException {
        return publish(str, str2.getBytes(StandardCharsets.UTF_8), function);
    }

    /**
     * Creates a job with a fresh ID ({@code str + "." + counter}), registers it in
     * {@link #messageJobs} and publishes {@code bArr} to {@link #CLIENT_EXCHANGE}.
     * A missing publisher confirm is only logged. If publishing itself fails, the job is
     * removed from {@link #messageJobs} again before the exception is propagated.
     *
     * @param str      routing prefix, also used as prefix of the job ID
     * @param bArr     message body
     * @param function creates the job for the generated job ID
     * @return the job created by {@code function}
     * @throws IOException if publishing fails or the thread is interrupted while ordering a connection
     */
    public <I, O, R> AmqpWebJJob<I, O, R> publish(@NotNull String str, byte[] bArr, @NotNull Function<String, AmqpWebJJob<I, O, R>> function) throws IOException {
        final String jobId = str + "." + MESSAGE_COUNTER.incrementAndGet();
        final AmqpWebJJob<I, O, R> job = function.apply(jobId);
        assert !this.messageJobs.containsKey(jobId);
        this.messageJobs.put(jobId, job);

        boolean published = false;
        try (PooledConnection<Channel> pooled = this.channelPool.orderConnection()) {
            final Channel channel = pooled.connection;
            channel.basicPublish(CLIENT_EXCHANGE, decorateRoutingPrefix(str), defaultProps().messageId((String) job.getJobId()).build(), bArr);
            published = true;
            try {
                if (!channel.waitForConfirms(CONFIRM_TIMEOUT_MILLIS)) {
                    LoggerFactory.getLogger(getClass()).warn("Could not confirm publication of Job '" + jobId + "' Jobs might not be delivered an is likely to timeout.");
                }
            } catch (InterruptedException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                LoggerFactory.getLogger(getClass()).warn("Could not confirm publication of Job: " + jobId + System.lineSeparator() + e.getMessage());
            }
            return job;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            if (!published) {
                // Nothing was sent, so no reply can ever arrive for this job.
                this.messageJobs.remove(jobId, job);
            }
        }
    }

    /** Appends the user and client ID to the given routing prefix. */
    private String decorateRoutingPrefix(String str) {
        return str + "." + this.userID + "." + this.clientID;
    }

    /** Builder with the properties shared by all messages of this client (UTF-8 JSON, user ID, app ID). */
    private AMQP.BasicProperties.Builder defaultProps() {
        return new AMQP.BasicProperties.Builder().contentEncoding(StandardCharsets.UTF_8.name()).contentType("application/json").userId(this.userID).appId("SIRIUS");
    }

    /**
     * @return {@code true} if at least one consumer has been started via {@link #startConsuming(long)}
     */
    public boolean isConnected() {
        return !this.consumerThreads.isEmpty();
    }
}
