package org.eclipse.hono.client.amqp;

import com.github.benmanes.caffeine.cache.Cache;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-amqp-common-1.7.4.jar:org/eclipse/hono/client/amqp/AbstractRequestResponseServiceClient.class */
public abstract class AbstractRequestResponseServiceClient<T, R extends RequestResponseResult<T>> extends AbstractServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AbstractRequestResponseServiceClient.class);
    private static final int[] CACHEABLE_STATUS_CODES = {200, 203, 206, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, 301, 410};
    protected final CachingClientFactory<RequestResponseClient<R>> clientFactory;
    private final Cache<Object, R> responseCache;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRequestResponseServiceClient(HonoConnection honoConnection, SendMessageSampler.Factory factory, CachingClientFactory<RequestResponseClient<R>> cachingClientFactory, Cache<Object, R> cache) {
        super(honoConnection, factory);
        this.clientFactory = (CachingClientFactory) Objects.requireNonNull(cachingClientFactory);
        this.responseCache = (Cache) Optional.ofNullable(cache).orElse(null);
    }

    protected abstract String getKey(String str);

    /* JADX INFO: Access modifiers changed from: protected */
    public void removeClient(String str) {
        this.clientFactory.removeClient(getKey(str));
    }

    protected void handleTenantTimeout(Message<Object> message) {
        Optional ofNullable = Optional.ofNullable(message.body());
        Class<String> cls = String.class;
        Objects.requireNonNull(String.class);
        Optional<T> filter = ofNullable.filter(cls::isInstance);
        Class<String> cls2 = String.class;
        Objects.requireNonNull(String.class);
        filter.map(cls2::cast).map(this::getKey).ifPresent(str -> {
            this.clientFactory.removeClient(str, (v0) -> {
                v0.close();
            });
        });
    }

    @Override // org.eclipse.hono.client.amqp.AbstractServiceClient
    protected void onDisconnect() {
        this.clientFactory.clearState();
    }

    protected final boolean isSuccessResponse(int i, String str, Buffer buffer) {
        return StatusCodeMapper.isSuccessful(Integer.valueOf(i)) && buffer != null && "application/json".equalsIgnoreCase(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final R getRequestResponseResult(org.apache.qpid.proton.message.Message message) {
        Integer status = MessageHelper.getStatus(message);
        if (status == null) {
            LOG.debug("response message has no status code application property [reply-to: {}, correlation ID: {}]", message.getReplyTo(), message.getCorrelationId());
            return null;
        }
        return getResult(status.intValue(), message.getContentType(), MessageHelper.getPayload(message), CacheDirective.from(MessageHelper.getCacheDirective(message)), message.getApplicationProperties());
    }

    protected abstract R getResult(int i, String str, Buffer buffer, CacheDirective cacheDirective, ApplicationProperties applicationProperties);

    protected final long getResponseCacheDefaultTimeout() {
        if (this.connection.getConfig() instanceof RequestResponseClientConfigProperties) {
            return ((RequestResponseClientConfigProperties) this.connection.getConfig()).getResponseCacheDefaultTimeout();
        }
        return 600L;
    }

    protected final boolean isCachingEnabled() {
        return this.responseCache != null;
    }

    private boolean isCacheableStatusCode(int i) {
        return Arrays.binarySearch(CACHEABLE_STATUS_CODES, i) >= 0;
    }

    protected Future<R> getResponseFromCache(Object obj, Span span) {
        Objects.requireNonNull(obj);
        if (!isCachingEnabled()) {
            TracingHelper.TAG_CACHE_HIT.set(span, Boolean.FALSE);
            return Future.failedFuture(new IllegalStateException("no cache configured"));
        }
        R ifPresent = this.responseCache.getIfPresent(obj);
        TracingHelper.TAG_CACHE_HIT.set(span, Boolean.valueOf(ifPresent != null));
        return (Future) Optional.ofNullable(ifPresent).map((v0) -> {
            return Future.succeededFuture(v0);
        }).orElse(Future.failedFuture("cache miss"));
    }

    protected final void addToCache(Object obj, R r) {
        if (isCachingEnabled()) {
            Objects.requireNonNull(obj);
            Objects.requireNonNull(r);
            if (((Boolean) Optional.ofNullable(r.getCacheDirective()).map(cacheDirective -> {
                return Boolean.valueOf(cacheDirective.isCachingAllowed());
            }).orElse(Boolean.valueOf(isCacheableStatusCode(r.getStatus())))).booleanValue()) {
                this.responseCache.put(obj, r);
            }
        }
    }

    protected final Future<T> mapResultAndFinishSpan(Future<R> future, Function<R, T> function, Span span) {
        return future.recover(th -> {
            Tags.HTTP_STATUS.set(span, Integer.valueOf(ServiceInvocationException.extractStatusCode(th)));
            TracingHelper.logError(span, th);
            return Future.failedFuture(th);
        }).map(requestResponseResult -> {
            setTagsForResult(span, requestResponseResult);
            return function.apply(requestResponseResult);
        }).onComplete2(asyncResult -> {
            span.finish();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void setTagsForResult(Span span, R r) {
        Objects.requireNonNull(span);
        if (r == null) {
            Tags.HTTP_STATUS.set(span, (Integer) 202);
            return;
        }
        Tags.HTTP_STATUS.set(span, Integer.valueOf(r.getStatus()));
        if (r.isError()) {
            Tags.ERROR.set(span, Boolean.TRUE);
        }
    }

    protected final Map<String, Object> createDeviceIdProperties(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(MessageHelper.APP_PROPERTY_DEVICE_ID, str);
        return hashMap;
    }
}
