/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.Histogram;
import org.apache.kafka.trogdor.workload.RecordProcessor;
import org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShareConsumeBenchWorker
implements TaskWorker {
    /** Logger shared by this worker and its nested task classes. */
    private static final Logger log = LoggerFactory.getLogger(ShareConsumeBenchWorker.class);
    /** Throttle period in milliseconds; matches the period ConsumeMessages passes to its Throttle. */
    private static final int THROTTLE_PERIOD_MS = 100;
    /** Identifier of this worker; appears in log messages and in generated consumer client ids. */
    private final String id;
    /** Benchmark specification this worker executes. */
    private final ShareConsumeBenchSpec spec;
    /** True between a successful start() and the matching stop(); guards against double start/stop. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Thread pool created in start() and shut down in stop(); null while the worker is not running. */
    private ScheduledExecutorService executor;
    /** Status tracker handed to start(); receives the aggregated per-consumer statuses. */
    private WorkerStatusTracker workerStatus;
    /** Aggregator for per-consumer statuses; created in start(). */
    private StatusUpdater statusUpdater;
    /** Handle of the periodically scheduled statusUpdater, used to cancel it. */
    private Future<?> statusUpdaterFuture;
    /** Future handed to start(); completed with "" on normal finish or stop. */
    private KafkaFutureImpl<String> doneFuture;

    /**
     * Creates a worker; nothing is started until {@code start} is called.
     *
     * @param id   identifier of this worker, used in log output and consumer client ids
     * @param spec benchmark specification to execute
     */
    public ShareConsumeBenchWorker(String id, ShareConsumeBenchSpec spec) {
        this.spec = spec;
        this.id = id;
    }

    /**
     * Activates the worker: creates the thread pool, schedules the aggregated status updater
     * once a minute, and submits a {@link Prepare} task that sets up the consumers.
     *
     * @param platform   the platform this worker runs on (not used here)
     * @param status     tracker that receives the aggregated worker status
     * @param doneFuture future completed when the worker finishes or fails
     * @throws IllegalStateException if the worker is already running
     */
    @Override
    public void start(Platform platform,
                      WorkerStatusTracker status,
                      KafkaFutureImpl<String> doneFuture) throws Exception {
        boolean activated = running.compareAndSet(false, true);
        if (!activated) {
            throw new IllegalStateException("ShareConsumeBenchWorker is already running.");
        }
        log.info("{}: Activating ShareConsumeBenchWorker with {}", id, spec);
        statusUpdater = new StatusUpdater();
        // Two threads beyond the consumer count; presumably headroom for the
        // status-reporting tasks - confirm before changing.
        int poolSize = spec.threadsPerWorker() + 2;
        executor = Executors.newScheduledThreadPool(
            poolSize,
            ThreadUtils.createThreadFactory("ShareConsumeBenchWorkerThread%d", false));
        statusUpdaterFuture = executor.scheduleAtFixedRate(statusUpdater, 1L, 1L, TimeUnit.MINUTES);
        workerStatus = status;
        this.doneFuture = doneFuture;
        executor.submit(new Prepare());
    }

    /**
     * Deactivates the worker: completes the done-future with an empty string, interrupts all
     * pool threads, waits (up to one day) for them to terminate and clears the per-run state.
     *
     * @param platform the platform this worker runs on (not used here)
     * @throws IllegalStateException if the worker is not running
     * @throws InterruptedException  if interrupted while waiting for the pool to terminate
     */
    @Override
    public void stop(Platform platform) throws Exception {
        if (!running.compareAndSet(true, false)) {
            throw new IllegalStateException("ShareConsumeBenchWorker is not running.");
        }
        log.info("{}: Deactivating ShareConsumeBenchWorker.", id);
        // doneFuture is a KafkaFutureImpl<String>; the value must be passed as a String.
        doneFuture.complete("");
        executor.shutdownNow();
        executor.awaitTermination(1L, TimeUnit.DAYS);
        // Drop references to per-run state so a stopped worker holds nothing alive.
        executor = null;
        statusUpdater = null;
        statusUpdaterFuture = null;
        workerStatus = null;
        doneFuture = null;
    }

    /**
     * Keeps the latest status of every consumer, keyed by client id, and publishes the
     * combined map to the worker status tracker. Both mutators are synchronized on this
     * instance.
     */
    class StatusUpdater implements Runnable {
        /** Latest JSON status per consumer client id. */
        final Map<String, JsonNode> statuses;

        StatusUpdater() {
            statuses = new HashMap<>();
        }

        /** Publishes the current statuses; any exception is routed to {@code WorkerUtils.abort}. */
        @Override
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
            }
        }

        /** Serializes the whole status map and hands it to the worker status tracker. */
        synchronized void update() {
            JsonNode combined = JsonUtil.JSON_SERDE.valueToTree(statuses);
            workerStatus.update(combined);
        }

        /**
         * Records (or replaces) the status of a single consumer.
         *
         * @param clientId client id of the reporting consumer
         * @param status   that consumer's current status
         */
        synchronized void updateConsumeStatus(String clientId, StatusData status) {
            JsonNode statusNode = JsonUtil.JSON_SERDE.valueToTree(status);
            statuses.put(clientId, statusNode);
        }
    }

    /**
     * Bootstrap task: creates one share consumer per configured thread, submits a
     * {@link ConsumeMessages} task for each and then submits a {@link CloseStatusUpdater}
     * that watches the submitted tasks.
     */
    public class Prepare implements Runnable {
        /** Submits all consume tasks; any failure is routed to {@code WorkerUtils.abort}. */
        @Override
        public void run() {
            try {
                List<Future<Void>> submitted = new ArrayList<>();
                for (ConsumeMessages task : consumeTasks()) {
                    Future<Void> handle = executor.submit(task);
                    submitted.add(handle);
                }
                executor.submit(new CloseStatusUpdater(submitted));
            } catch (Throwable e) {
                WorkerUtils.abort(log, "Prepare", e, doneFuture);
            }
        }

        /**
         * Builds one consume task per configured thread; every task gets its own consumer
         * and subscribes to the same expanded topic set.
         */
        private List<ConsumeMessages> consumeTasks() {
            List<ConsumeMessages> result = new ArrayList<>();
            String group = shareGroup();
            int numConsumers = spec.threadsPerWorker();
            Set<String> topicNames = new HashSet<>(spec.expandTopicNames());
            int index = 0;
            while (index < numConsumers) {
                String consumerClientId = clientId(index);
                ThreadSafeShareConsumer shareConsumer = consumer(group, consumerClientId);
                result.add(new ConsumeMessages(shareConsumer, spec.recordProcessor(), topicNames));
                index++;
            }
            return result;
        }

        /** Returns the client id for the consumer at position {@code idx}. */
        private String clientId(int idx) {
            return String.format("consumer.%s-%d", id, idx);
        }

        /**
         * Creates a byte-array share consumer wrapped for locked access.
         *
         * @param shareGroup share group id the consumer joins
         * @param clientId   client id assigned to the consumer
         */
        private ThreadSafeShareConsumer consumer(String shareGroup, String clientId) {
            Properties config = new Properties();
            config.put("bootstrap.servers", spec.bootstrapServers());
            config.put("client.id", clientId);
            config.put("group.id", shareGroup);
            config.put("max.poll.interval.ms", 100000);
            // Applied after the defaults above, so spec-provided configs are added last.
            WorkerUtils.addConfigsToProperties(config, spec.commonClientConf(), spec.consumerConf());
            KafkaShareConsumer<byte[], byte[]> kafkaConsumer =
                new KafkaShareConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            return new ThreadSafeShareConsumer(kafkaConsumer, clientId);
        }

        /** Returns the share group configured in the spec. */
        private String shareGroup() {
            return spec.shareGroup();
        }
    }

    /**
     * Wraps a {@link KafkaShareConsumer} so that poll, subscribe, subscription and close are
     * serialized through a single {@link ReentrantLock}.
     */
    private static class ThreadSafeShareConsumer {
        private final KafkaShareConsumer<byte[], byte[]> consumer;
        private final String clientId;
        private final ReentrantLock consumerLock;
        /** Set once the consumer has been closed; read and written only under consumerLock. */
        private boolean closed = false;

        /**
         * @param consumer the consumer to guard
         * @param clientId client id the consumer was configured with
         */
        ThreadSafeShareConsumer(KafkaShareConsumer<byte[], byte[]> consumer, String clientId) {
            this.consumer = consumer;
            this.clientId = clientId;
            this.consumerLock = new ReentrantLock();
        }

        /** Polls the consumer with a 50 ms timeout while holding the lock. */
        ConsumerRecords<byte[], byte[]> poll() {
            consumerLock.lock();
            try {
                return consumer.poll(Duration.ofMillis(50L));
            } finally {
                consumerLock.unlock();
            }
        }

        /**
         * Unsubscribes and closes the consumer; later calls are no-ops.
         *
         * The closed flag is checked under the lock so two concurrent callers cannot both
         * close the consumer, and the consumer is closed even when unsubscribe() throws
         * (the exception from unsubscribe() still propagates).
         */
        void close() {
            consumerLock.lock();
            try {
                if (closed) {
                    return;
                }
                try {
                    consumer.unsubscribe();
                } finally {
                    Utils.closeQuietly(consumer, "consumer");
                    closed = true;
                }
            } finally {
                consumerLock.unlock();
            }
        }

        /** Subscribes the consumer to {@code topics} while holding the lock. */
        void subscribe(Set<String> topics) {
            consumerLock.lock();
            try {
                consumer.subscribe(topics);
            } finally {
                consumerLock.unlock();
            }
        }

        /** Returns the consumer's current subscription, read while holding the lock. */
        Set<String> subscription() {
            consumerLock.lock();
            try {
                return consumer.subscription();
            } finally {
                consumerLock.unlock();
            }
        }

        /** Returns the client id given at construction. */
        String clientId() {
            return clientId;
        }

        /** Returns the wrapped consumer; calls made on it directly bypass the lock. */
        KafkaShareConsumer<byte[], byte[]> consumer() {
            return consumer;
        }
    }

    /**
     * Status of a single consumer as reported through the worker status; created and
     * serialized via Jackson using the property names declared on the constructor.
     */
    public static class StatusData {
        /** Percentiles requested from the histograms: p50, p95 and p99, in that order. */
        static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};

        private final long totalMessagesReceived;
        private final Set<String> subscription;
        private final long totalBytesReceived;
        private final long averageMessageSizeBytes;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;
        private final Optional<JsonNode> recordProcessorStatus;

        /**
         * @param subscription            topics the consumer is subscribed to
         * @param totalMessagesReceived   number of messages received so far
         * @param totalBytesReceived      number of bytes received so far
         * @param averageMessageSizeBytes average message size in bytes
         * @param averageLatencyMs        average latency in milliseconds
         * @param p50latencyMs            50th percentile latency in milliseconds
         * @param p95latencyMs            95th percentile latency in milliseconds
         * @param p99latencyMs            99th percentile latency in milliseconds
         * @param recordProcessorStatus   status of the record processor, if one is configured
         */
        @JsonCreator
        StatusData(@JsonProperty("subscription") Set<String> subscription,
                   @JsonProperty("totalMessagesReceived") long totalMessagesReceived,
                   @JsonProperty("totalBytesReceived") long totalBytesReceived,
                   @JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes,
                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
                   @JsonProperty("p50LatencyMs") int p50latencyMs,
                   @JsonProperty("p95LatencyMs") int p95latencyMs,
                   @JsonProperty("p99LatencyMs") int p99latencyMs,
                   @JsonProperty("recordProcessorStatus") Optional<JsonNode> recordProcessorStatus) {
            this.subscription = subscription;
            this.totalMessagesReceived = totalMessagesReceived;
            this.totalBytesReceived = totalBytesReceived;
            this.averageMessageSizeBytes = averageMessageSizeBytes;
            this.averageLatencyMs = averageLatencyMs;
            this.p50LatencyMs = p50latencyMs;
            this.p95LatencyMs = p95latencyMs;
            this.p99LatencyMs = p99latencyMs;
            this.recordProcessorStatus = recordProcessorStatus;
        }

        /** @return topics the consumer is subscribed to */
        @JsonProperty
        public Set<String> subscription() {
            return subscription;
        }

        /** @return number of messages received so far */
        @JsonProperty
        public long totalMessagesReceived() {
            return totalMessagesReceived;
        }

        /** @return number of bytes received so far */
        @JsonProperty
        public long totalBytesReceived() {
            return totalBytesReceived;
        }

        /** @return average message size in bytes */
        @JsonProperty
        public long averageMessageSizeBytes() {
            return averageMessageSizeBytes;
        }

        /** @return average latency in milliseconds */
        @JsonProperty
        public float averageLatencyMs() {
            return averageLatencyMs;
        }

        /** @return 50th percentile latency in milliseconds */
        @JsonProperty
        public int p50LatencyMs() {
            return p50LatencyMs;
        }

        /** @return 95th percentile latency in milliseconds */
        @JsonProperty
        public int p95LatencyMs() {
            return p95LatencyMs;
        }

        /** @return 99th percentile latency in milliseconds */
        @JsonProperty
        public int p99LatencyMs() {
            return p99LatencyMs;
        }

        /** @return the record processor status, or {@code null} when none is present */
        @JsonProperty
        public JsonNode recordProcessorStatus() {
            return recordProcessorStatus.orElse(null);
        }
    }

    /**
     * Summarizes one consumer's latency and message-size histograms into a
     * {@link StatusData}, registers it with the worker-wide status updater and logs it.
     */
    public class ConsumeStatusUpdater implements Runnable {
        private final Histogram latencyHistogram;
        private final Histogram messageSizeHistogram;
        private final ThreadSafeShareConsumer consumer;
        private final Optional<RecordProcessor> recordProcessor;

        /**
         * @param latencyHistogram     histogram of per-record latencies
         * @param messageSizeHistogram histogram of per-record sizes
         * @param consumer             consumer whose status is reported
         * @param recordProcessor      optional processor whose status is attached to the report
         */
        ConsumeStatusUpdater(Histogram latencyHistogram,
                             Histogram messageSizeHistogram,
                             ThreadSafeShareConsumer consumer,
                             Optional<RecordProcessor> recordProcessor) {
            this.latencyHistogram = latencyHistogram;
            this.messageSizeHistogram = messageSizeHistogram;
            this.consumer = consumer;
            this.recordProcessor = recordProcessor;
        }

        /** Runs one update; any exception is routed to {@code WorkerUtils.abort}. */
        @Override
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
            }
        }

        /**
         * Builds the current status from both histograms, stores it under this consumer's
         * client id and logs it as JSON.
         *
         * @return the status that was just published
         */
        StatusData update() {
            Histogram.Summary latency = latencyHistogram.summarize(StatusData.PERCENTILES);
            Histogram.Summary sizes = messageSizeHistogram.summarize(StatusData.PERCENTILES);

            Optional<JsonNode> processorStatus = recordProcessor.isPresent()
                ? Optional.of(recordProcessor.get().processorStatus())
                : Optional.empty();

            Set<String> subscribedTopics = consumer.subscription();
            long messageCount = latency.numSamples();
            // Total bytes is reconstructed from the sample count and the average size.
            long totalBytes = (long) ((float) sizes.numSamples() * sizes.average());
            long averageSize = (long) sizes.average();
            float averageLatency = latency.average();

            StatusData current = new StatusData(
                subscribedTopics,
                messageCount,
                totalBytes,
                averageSize,
                averageLatency,
                latency.percentiles().get(0).value(),
                latency.percentiles().get(1).value(),
                latency.percentiles().get(2).value(),
                processorStatus);

            statusUpdater.updateConsumeStatus(consumer.clientId(), current);
            log.info("Status={}", JsonUtil.toJsonString(current));
            return current;
        }
    }

    /**
     * Waits until every consume task is done (or this thread is interrupted), then cancels
     * the periodic status updater, publishes one last aggregated status and completes the
     * worker's done-future with an empty string.
     */
    public class CloseStatusUpdater implements Runnable {
        private final List<Future<Void>> consumeTasks;

        /**
         * @param consumeTasks futures of the submitted consume tasks to wait for
         */
        CloseStatusUpdater(List<Future<Void>> consumeTasks) {
            this.consumeTasks = consumeTasks;
        }

        @Override
        public void run() {
            // Re-check once a minute whether all consume tasks have finished.
            while (!consumeTasks.stream().allMatch(Future::isDone)) {
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException e) {
                    // Interrupted (e.g. by the executor being shut down): stop waiting and
                    // fall through to the final status update.
                    log.debug("{} was interrupted. Closing...", this.getClass().getName());
                    break;
                }
            }
            statusUpdaterFuture.cancel(false);
            statusUpdater.update();
            // doneFuture is a KafkaFutureImpl<String>; the value must be passed as a String.
            doneFuture.complete("");
        }
    }

    /**
     * Consume task for a single share consumer: polls until the spec's message limit is
     * reached, records per-record latency and size, and publishes a final status when done.
     */
    public class ConsumeMessages implements Callable<Void> {
        // Constructor arguments are histogram sizing values; presumably the largest tracked
        // latency (ms) and message size (bytes) - confirm against Histogram.
        private final Histogram latencyHistogram = new Histogram(10000);
        private final Histogram messageSizeHistogram = new Histogram(2 * 1024 * 1024);
        /** Handle of this task's once-a-minute ConsumeStatusUpdater. */
        private final Future<?> statusUpdaterFuture;
        private final Throttle throttle;
        private final String clientId;
        private final ThreadSafeShareConsumer consumer;
        private final Optional<RecordProcessor> recordProcessor;

        /**
         * Schedules the periodic status updater for this consumer and sets up the throttle.
         * A non-positive target rate in the spec means "unthrottled".
         */
        private ConsumeMessages(ThreadSafeShareConsumer consumer, Optional<RecordProcessor> recordProcessor) {
            this.clientId = consumer.clientId();
            this.statusUpdaterFuture = executor.scheduleAtFixedRate(
                new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer, recordProcessor),
                1L, 1L, TimeUnit.MINUTES);
            int perPeriod = spec.targetMessagesPerSec() <= 0
                ? Integer.MAX_VALUE
                : WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
            this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
            this.consumer = consumer;
            this.recordProcessor = recordProcessor;
        }

        /**
         * Creates the task and subscribes the consumer to {@code topics}.
         *
         * @param consumer        consumer this task polls
         * @param recordProcessor optional processor invoked with every non-empty batch
         * @param topics          topics to subscribe to
         */
        ConsumeMessages(ThreadSafeShareConsumer consumer, Optional<RecordProcessor> recordProcessor, Set<String> topics) {
            this(consumer, recordProcessor);
            log.info("Will consume from topics {}.", topics);
            this.consumer.subscribe(topics);
        }

        /**
         * Polls until {@code spec.maxMessages()} records have been consumed.
         *
         * Whatever way the loop ends (normally, with an exception, or with an error), the
         * periodic status updater is cancelled and exactly one final status is published and
         * logged. The consumer is closed only when the method completes normally.
         *
         * @return always {@code null}
         * @throws Exception anything raised by {@code WorkerUtils.abort} or the final status update
         */
        @Override
        public Void call() throws Exception {
            long messagesConsumed = 0L;
            long bytesConsumed = 0L;
            long startTimeMs = Time.SYSTEM.milliseconds();
            long startBatchMs = startTimeMs;
            long maxMessages = spec.maxMessages();
            try {
                while (messagesConsumed < maxMessages) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll();
                    if (records.isEmpty()) {
                        continue;
                    }
                    long endBatchMs = Time.SYSTEM.milliseconds();
                    long elapsedBatchMs = endBatchMs - startBatchMs;

                    // Hand the batch to the processor before the per-record accounting below.
                    recordProcessor.ifPresent(processor -> processor.processRecords(records));

                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        messagesConsumed++;
                        long messageBytes = 0L;
                        if (record.key() != null) {
                            messageBytes += record.serializedKeySize();
                        }
                        if (record.value() != null) {
                            messageBytes += record.serializedValueSize();
                        }
                        // Every record in a batch is charged the batch's elapsed time.
                        latencyHistogram.add(elapsedBatchMs);
                        messageSizeHistogram.add(messageBytes);
                        bytesConsumed += messageBytes;
                        if (messagesConsumed >= maxMessages) {
                            break;
                        }
                        throttle.increment();
                    }
                    startBatchMs = Time.SYSTEM.milliseconds();
                }
            } catch (Exception e) {
                WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture);
            } finally {
                // Runs for every exit path, so the final report is produced exactly once.
                statusUpdaterFuture.cancel(false);
                StatusData statusData = new ConsumeStatusUpdater(
                    latencyHistogram, messageSizeHistogram, consumer, spec.recordProcessor()).update();
                long curTimeMs = Time.SYSTEM.milliseconds();
                log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}",
                    clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
            }
            // NOTE(review): reached only when nothing propagated out of the block above; if
            // WorkerUtils.abort rethrows, the consumer stays open on failure - confirm intended.
            consumer.close();
            return null;
        }
    }
}

