/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.vertx.core.Future;
import io.vertx.proton.ProtonHelper;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.amqp.SenderCachingServiceClient;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommand;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InternalCommandSender} that forwards commands over AMQP 1.0 to the
 * {@code command_internal/<adapterInstanceId>} address of a protocol adapter instance.
 * <p>
 * The outcome reported by the downstream peer for the forwarded message is applied
 * to the {@link CommandContext} of the original command.
 */
public class ProtonBasedInternalCommandSender extends SenderCachingServiceClient implements InternalCommandSender {

    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedInternalCommandSender.class);

    /** Prefix of the address that commands are delegated to; the adapter instance id is appended. */
    private static final String INTERNAL_COMMAND_ADDRESS_PREFIX = "command_internal/";
    /** First segment of the address set on messages created from scratch. */
    private static final String COMMAND_ENDPOINT = "command";
    /** First segment of the reply-to address set on request/response commands. */
    private static final String COMMAND_RESPONSE_ENDPOINT = "command_response";
    /** Name of the application property that carries the gateway id of a gateway-targeted command. */
    private static final String APP_PROPERTY_CMD_VIA = "via";
    /** Pattern used for both command and command response addresses. */
    private static final String ADDRESS_PATTERN = "%s/%s/%s";

    /**
     * Creates a new sender for a connection.
     * <p>
     * The sender is created with a no-op {@link SendMessageSampler} factory.
     *
     * @param connection The connection to pass on to the super class.
     */
    public ProtonBasedInternalCommandSender(HonoConnection connection) {
        // NOTE(review): the meaning of the trailing 'false' flag is defined by
        // SenderCachingServiceClient and is not visible from this file.
        super(connection, SendMessageSampler.Factory.noop(), false);
    }

    /**
     * Sends the command of the given context to the given adapter instance and applies the
     * outcome reported by the downstream peer to the command context.
     * <p>
     * If the sender link cannot be obtained or sending fails, the command context is released
     * with the failure cause and the returned future is failed.
     *
     * @param commandContext The context of the command to send.
     * @param adapterInstanceId The id of the adapter instance to delegate the command to.
     * @return A future that succeeds once the downstream peer's outcome has been applied
     *         to the command context and fails otherwise.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override
    public Future<Void> sendCommand(CommandContext commandContext, String adapterInstanceId) {
        Objects.requireNonNull(commandContext);
        Objects.requireNonNull(adapterInstanceId);

        return this.getOrCreateSenderLink(getTargetAddress(adapterInstanceId))
                .recover(thr -> Future.failedFuture(StatusCodeMapper.toServerError(thr)))
                .compose(sender -> {
                    Span span = this.newChildSpan(commandContext.getTracingContext(), "delegate Command request");
                    Command command = commandContext.getCommand();
                    Message message = this.adoptOrCreateMessage(command);
                    TracingHelper.setDeviceTags(span, command.getTenant(), command.getDeviceId());
                    if (command.isTargetedAtGateway()) {
                        AmqpUtils.addProperty(message, APP_PROPERTY_CMD_VIA, command.getGatewayId());
                        TracingHelper.TAG_GATEWAY_ID.set(span, command.getGatewayId());
                    }
                    return sender.sendAndWaitForRawOutcome(message, span);
                })
                .map(delivery -> {
                    DeliveryState remoteState = delivery.getRemoteState();
                    LOG.trace("command [{}] sent to downstream peer; remote state of delivery: {}",
                            commandContext.getCommand(), remoteState);
                    applyRemoteOutcome(commandContext, remoteState);
                    // the cast is required so that the mapped future is inferred as Future<Void>
                    return (Void) null;
                })
                .onFailure(thr -> {
                    LOG.debug("failed to send command [{}] to downstream peer", commandContext.getCommand(), thr);
                    if (thr instanceof NoConsumerException) {
                        TracingHelper.logError(commandContext.getTracingSpan(), "no credit - target adapter instance '" + adapterInstanceId + "' may be offline in which case the device hasn't subscribed again yet");
                    }
                    commandContext.release(thr);
                });
    }

    /**
     * Applies the delivery state reported by the downstream peer to the command context.
     *
     * @param commandContext The context of the command that has been sent.
     * @param remoteState The delivery state reported by the peer, may be {@code null}.
     */
    private static void applyRemoteOutcome(CommandContext commandContext, DeliveryState remoteState) {
        if (remoteState instanceof Accepted) {
            commandContext.accept();
        } else if (remoteState instanceof Rejected) {
            ErrorCondition error = ((Rejected) remoteState).getError();
            String errorDescription = Optional.ofNullable(error)
                    .map(ErrorCondition::getDescription)
                    .orElse(null);
            commandContext.reject(errorDescription);
        } else if (remoteState instanceof Released) {
            commandContext.release();
        } else if (remoteState instanceof Modified) {
            Modified modified = (Modified) remoteState;
            // both flags are optional (boxed Boolean) and may be null; an absent flag is
            // treated as false instead of failing with a NullPointerException on unboxing
            boolean deliveryFailed = Boolean.TRUE.equals(modified.getDeliveryFailed());
            boolean undeliverableHere = Boolean.TRUE.equals(modified.getUndeliverableHere());
            commandContext.modify(deliveryFailed, undeliverableHere);
        } else {
            // no recognized outcome: do not leave the command context without any outcome,
            // release the command as it is done when sending fails
            LOG.warn("command [{}] sent to downstream peer but no known outcome was reported [remote state: {}], releasing command",
                    commandContext.getCommand(), remoteState);
            commandContext.release();
        }
    }

    /**
     * Gets the address that commands for the given adapter instance are sent to.
     *
     * @param adapterInstanceId The id of the adapter instance.
     * @return The address, i.e. {@code command_internal/} followed by the adapter instance id.
     * @throws NullPointerException if adapterInstanceId is {@code null}.
     */
    static String getTargetAddress(String adapterInstanceId) {
        return INTERNAL_COMMAND_ADDRESS_PREFIX + Objects.requireNonNull(adapterInstanceId);
    }

    /**
     * Creates a new message that refers to the same sections as the given message.
     * <p>
     * Only the properties section is copied into a new {@link Properties} instance. The delivery
     * annotations, message annotations, application properties, body and footer objects are
     * shared with the given message. The header section is not carried over.
     *
     * @param message The message to copy.
     * @return The copy.
     */
    private static Message getShallowCopy(Message message) {
        Message copy = ProtonHelper.message();
        copy.setDeliveryAnnotations(message.getDeliveryAnnotations());
        copy.setMessageAnnotations(message.getMessageAnnotations());
        if (message.getProperties() != null) {
            // a separate Properties instance, so that setting correlation-id/reply-to
            // on the copy does not alter the properties object of the given message
            copy.setProperties(new Properties(message.getProperties()));
        }
        copy.setApplicationProperties(message.getApplicationProperties());
        copy.setBody(message.getBody());
        copy.setFooter(message.getFooter());
        return copy;
    }

    /**
     * Gets the AMQP message to send for a command.
     * <p>
     * For a {@link ProtonBasedCommand} a shallow copy of the command's message is used.
     * Otherwise a new message is created from the command's payload, content type, name and
     * address. In both cases the command's correlation id (if any) and, for request/response
     * commands, the reply-to address are set on the message.
     *
     * @param command The command to get the message for.
     * @return The message to send.
     */
    private Message adoptOrCreateMessage(Command command) {
        Message msg;
        if (command instanceof ProtonBasedCommand) {
            msg = getShallowCopy(((ProtonBasedCommand) command).getMessage());
        } else {
            msg = this.createMessage(command);
        }
        if (command.getCorrelationId() != null) {
            msg.setCorrelationId(command.getCorrelationId());
        }
        if (!command.isOneWay()) {
            msg.setReplyTo(this.getReplyToAddress(command));
        }
        return msg;
    }

    /**
     * Creates a new message from a command's payload, content type, name and address.
     *
     * @param command The command to create the message for.
     * @return The newly created message.
     */
    private Message createMessage(Command command) {
        Message msg = ProtonHelper.message();
        byte[] payloadBytesOrNull = command.getPayload() != null ? command.getPayload().getBytes() : null;
        AmqpUtils.setPayload(msg, command.getContentType(), payloadBytesOrNull);
        msg.setAddress(this.getCommandMessageAddress(command));
        msg.setSubject(command.getName());
        if (command.getContentType() != null) {
            msg.setContentType(command.getContentType());
        }
        return msg;
    }

    /**
     * Gets the address to set on a newly created command message.
     *
     * @param command The command.
     * @return The address in the form {@code command/<tenant>/<deviceId>}.
     */
    private String getCommandMessageAddress(Command command) {
        return String.format(ADDRESS_PATTERN, COMMAND_ENDPOINT, command.getTenant(), command.getDeviceId());
    }

    /**
     * Gets the reply-to address for a command.
     *
     * @param command The command.
     * @return The address in the form {@code command_response/<tenant>/<replyToId>}
     *         or {@code null} if the command is a one-way command.
     */
    private String getReplyToAddress(Command command) {
        if (command.isOneWay()) {
            return null;
        }
        return String.format(ADDRESS_PATTERN, COMMAND_RESPONSE_ENDPOINT, command.getTenant(), command.getReplyToId());
    }

    @Override
    public String toString() {
        return ProtonBasedInternalCommandSender.class.getName() + " via AMQP 1.0 Messaging Network";
    }
}

