/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.ManagedKeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBTimestampedStore
extends RocksDBStore
implements TimestampedBytesStore {
    /** Class-wide SLF4J logger; used below to report which mode the store is opened in. */
    private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
    /**
     * UTF-8 encoded name of the RocksDB column family that {@code openRocksDB} opens
     * next to the default column family and uses for values stored with a timestamp.
     */
    private static final byte[] TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME = "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);

    /**
     * Creates a timestamped store. Both arguments are forwarded unchanged to the
     * {@link RocksDBStore} constructor; no additional state is initialised here.
     *
     * @param name         store name, passed through to the superclass
     * @param metricsScope metrics scope, passed through to the superclass
     */
    public RocksDBTimestampedStore(String name, String metricsScope) {
        super(name, metricsScope);
    }

    /**
     * Package-private constructor that forwards all three arguments unchanged to the
     * matching {@link RocksDBStore} constructor.
     *
     * @param name            store name, passed through to the superclass
     * @param parentDir       parent directory, passed through to the superclass
     * @param metricsRecorder metrics recorder, passed through to the superclass
     */
    RocksDBTimestampedStore(String name, String parentDir, RocksDBMetricsRecorder metricsRecorder) {
        super(name, parentDir, metricsRecorder);
    }

    /**
     * Opens the database with two column families (the default one and the
     * timestamped one) and selects the column-family accessor to use.
     *
     * <p>If the default column family contains at least one entry, the store is
     * opened in "upgrade mode" with a {@link DualColumnFamilyAccessor} spanning both
     * families. Otherwise only the timestamped column family is used and the handle
     * of the default column family is closed right away.
     *
     * @param dbOptions           options used to open the database
     * @param columnFamilyOptions options applied to both column family descriptors
     */
    @Override
    void openRocksDB(DBOptions dbOptions, ColumnFamilyOptions columnFamilyOptions) {
        List<ColumnFamilyHandle> columnFamilies = this.openRocksDB(
            dbOptions,
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
            new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions));
        ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
        ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);

        // Probe the default column family for any entry. try-with-resources guarantees
        // the native iterator is released even if the probe throws, and that it is
        // released before the column family handle it was created on may be closed.
        boolean hasPlainEntries;
        try (RocksIterator noTimestampsIter = this.db.newIterator(noTimestampColumnFamily)) {
            noTimestampsIter.seekToFirst();
            hasPlainEntries = noTimestampsIter.isValid();
        }

        if (hasPlainEntries) {
            log.info("Opening store {} in upgrade mode", this.name);
            this.cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
        } else {
            log.info("Opening store {} in regular mode", this.name);
            this.cfAccessor = new RocksDBStore.SingleColumnFamilyAccessor(withTimestampColumnFamily);
            // The default column family is empty and will not be used any further.
            noTimestampColumnFamily.close();
        }
    }

    /**
     * Accessor used in "upgrade mode", when entries may exist in both the old
     * (no-timestamp) column family and the new (with-timestamp) column family.
     *
     * <p>Reads consult the new column family first and fall back to the old one,
     * converting the stored value with
     * {@link TimestampedBytesStore#convertToTimestampedFormat(byte[])}. Writes always
     * remove the key from the old column family and store the value only in the new one.
     */
    private class DualColumnFamilyAccessor
    implements RocksDBStore.ColumnFamilyAccessor {
        private final ColumnFamilyHandle oldColumnFamily;
        private final ColumnFamilyHandle newColumnFamily;

        private DualColumnFamilyAccessor(ColumnFamilyHandle oldColumnFamily, ColumnFamilyHandle newColumnFamily) {
            this.oldColumnFamily = oldColumnFamily;
            this.newColumnFamily = newColumnFamily;
        }

        /**
         * Writes or deletes a single key while holding the store's position lock.
         *
         * <p>The key is always removed from the old column family. A {@code null}
         * value additionally removes it from the new column family; a non-null value
         * is written to the new column family and the store position is updated.
         *
         * @param accessor           database accessor that performs the operations
         * @param key                raw key
         * @param valueWithTimestamp value to store, or {@code null} to delete the key
         * @throws ProcessorStateException if any underlying RocksDB operation fails
         */
        @Override
        public void put(RocksDBStore.DBAccessor accessor, byte[] key, byte[] valueWithTimestamp) {
            synchronized (RocksDBTimestampedStore.this.position) {
                try {
                    accessor.delete(this.oldColumnFamily, key);
                }
                catch (RocksDBException e) {
                    throw new ProcessorStateException("Error while removing key from store " + RocksDBTimestampedStore.this.name, e);
                }
                if (valueWithTimestamp == null) {
                    // Pure delete: must not fall through to the put below, which
                    // would otherwise be invoked with a null value.
                    try {
                        accessor.delete(this.newColumnFamily, key);
                    }
                    catch (RocksDBException e) {
                        throw new ProcessorStateException("Error while removing key from store " + RocksDBTimestampedStore.this.name, e);
                    }
                } else {
                    try {
                        accessor.put(this.newColumnFamily, key, valueWithTimestamp);
                        StoreQueryUtils.updatePosition(RocksDBTimestampedStore.this.position, RocksDBTimestampedStore.this.context);
                    }
                    catch (RocksDBException e) {
                        throw new ProcessorStateException("Error while putting key/value into store " + RocksDBTimestampedStore.this.name, e);
                    }
                }
            }
        }

        /**
         * Adds every entry to the batch via {@link #addToBatch}.
         *
         * @throws NullPointerException if an entry has a {@code null} key
         * @throws RocksDBException     if adding to the batch fails
         */
        @Override
        public void prepareBatch(List<KeyValue<Bytes, byte[]>> entries, WriteBatchInterface batch) throws RocksDBException {
            for (KeyValue<Bytes, byte[]> entry : entries) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                this.addToBatch(entry.key.get(), entry.value, batch);
            }
        }

        /** Looks up {@code key} without explicit read options; see the private three-argument overload. */
        @Override
        public byte[] get(RocksDBStore.DBAccessor accessor, byte[] key) throws RocksDBException {
            return this.get(accessor, key, Optional.empty());
        }

        /** Looks up {@code key} using the given read options; see the private three-argument overload. */
        @Override
        public byte[] get(RocksDBStore.DBAccessor accessor, byte[] key, ReadOptions readOptions) throws RocksDBException {
            return this.get(accessor, key, Optional.of(readOptions));
        }

        /**
         * Shared lookup: returns the value from the new column family if present.
         * Otherwise a value found in the old column family is converted to the
         * timestamped format, written back through {@link #put} (which moves the key
         * to the new column family), and returned.
         *
         * @return the timestamped value, or {@code null} if the key is in neither column family
         */
        private byte[] get(RocksDBStore.DBAccessor accessor, byte[] key, Optional<ReadOptions> readOptions) throws RocksDBException {
            byte[] valueWithTimestamp = this.read(accessor, this.newColumnFamily, key, readOptions);
            if (valueWithTimestamp != null) {
                return valueWithTimestamp;
            }
            byte[] plainValue = this.read(accessor, this.oldColumnFamily, key, readOptions);
            if (plainValue == null) {
                return null;
            }
            byte[] valueWithUnknownTimestamp = TimestampedBytesStore.convertToTimestampedFormat(plainValue);
            this.put(accessor, key, valueWithUnknownTimestamp);
            return valueWithUnknownTimestamp;
        }

        /** Reads {@code key} from one column family, passing the read options along only when present. */
        private byte[] read(RocksDBStore.DBAccessor accessor, ColumnFamilyHandle columnFamily, byte[] key, Optional<ReadOptions> readOptions) throws RocksDBException {
            return readOptions.isPresent()
                ? accessor.get(columnFamily, readOptions.get(), key)
                : accessor.get(columnFamily, key);
        }

        /**
         * Read-only lookup: like {@link #get(RocksDBStore.DBAccessor, byte[])} but a
         * value found in the old column family is only converted, not written back.
         *
         * @return the timestamped value, or {@code null} if the key is in neither column family
         */
        @Override
        public byte[] getOnly(RocksDBStore.DBAccessor accessor, byte[] key) throws RocksDBException {
            byte[] valueWithTimestamp = accessor.get(this.newColumnFamily, key);
            if (valueWithTimestamp != null) {
                return valueWithTimestamp;
            }
            byte[] plainValue = accessor.get(this.oldColumnFamily, key);
            return plainValue == null ? null : TimestampedBytesStore.convertToTimestampedFormat(plainValue);
        }

        /** Returns an iterator over both column families bounded by {@code from}/{@code to}, with the {@code to} bound treated as inclusive. */
        @Override
        public ManagedKeyValueIterator<Bytes, byte[]> range(RocksDBStore.DBAccessor accessor, Bytes from, Bytes to, boolean forward) {
            return new RocksDBDualCFRangeIterator(RocksDBTimestampedStore.this.name, accessor.newIterator(this.newColumnFamily), accessor.newIterator(this.oldColumnFamily), from, to, forward, true);
        }

        /**
         * Deletes the range {@code from}..{@code to} from the old column family and
         * then from the new one.
         *
         * @throws ProcessorStateException if either range deletion fails
         */
        @Override
        public void deleteRange(RocksDBStore.DBAccessor accessor, byte[] from, byte[] to) {
            try {
                accessor.deleteRange(this.oldColumnFamily, from, to);
                accessor.deleteRange(this.newColumnFamily, from, to);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error while removing key from store " + RocksDBTimestampedStore.this.name, e);
            }
        }

        /** Returns an iterator over all entries of both column families, positioned at the first (forward) or last (backward) entry. */
        @Override
        public ManagedKeyValueIterator<Bytes, byte[]> all(RocksDBStore.DBAccessor accessor, boolean forward) {
            RocksIterator innerIterWithTimestamp = accessor.newIterator(this.newColumnFamily);
            RocksIterator innerIterNoTimestamp = accessor.newIterator(this.oldColumnFamily);
            if (forward) {
                innerIterWithTimestamp.seekToFirst();
                innerIterNoTimestamp.seekToFirst();
            } else {
                innerIterWithTimestamp.seekToLast();
                innerIterNoTimestamp.seekToLast();
            }
            return new RocksDBDualCFIterator(RocksDBTimestampedStore.this.name, innerIterWithTimestamp, innerIterNoTimestamp, forward);
        }

        /** Returns a forward iterator from {@code prefix} up to, but excluding, the key produced by {@code RocksDBStore.incrementWithoutOverflow(prefix)}. */
        @Override
        public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(RocksDBStore.DBAccessor accessor, Bytes prefix) {
            Bytes to = RocksDBStore.incrementWithoutOverflow(prefix);
            return new RocksDBDualCFRangeIterator(RocksDBTimestampedStore.this.name, accessor.newIterator(this.newColumnFamily), accessor.newIterator(this.oldColumnFamily), prefix, to, true, false);
        }

        /** Returns the sum of the approximate entry counts of both column families. */
        @Override
        public long approximateNumEntries(RocksDBStore.DBAccessor accessor) throws RocksDBException {
            return accessor.approximateNumEntries(this.oldColumnFamily) + accessor.approximateNumEntries(this.newColumnFamily);
        }

        /** Flushes both column families through the accessor. */
        @Override
        public void flush(RocksDBStore.DBAccessor accessor) throws RocksDBException {
            accessor.flush(this.oldColumnFamily, this.newColumnFamily);
        }

        /**
         * Batch counterpart of {@link #put}: always deletes the key from the old
         * column family, then deletes it from (null value) or puts it into (non-null
         * value) the new column family.
         */
        @Override
        public void addToBatch(byte[] key, byte[] value, WriteBatchInterface batch) throws RocksDBException {
            batch.delete(this.oldColumnFamily, key);
            if (value == null) {
                batch.delete(this.newColumnFamily, key);
            } else {
                batch.put(this.newColumnFamily, key, value);
            }
        }

        /** Closes both column family handles, old one first. */
        @Override
        public void close() {
            this.oldColumnFamily.close();
            this.newColumnFamily.close();
        }
    }

    /**
     * Dual-column-family iterator restricted to a key range.
     *
     * <p>The constructor positions both underlying iterators at the start of the
     * range for the requested direction and remembers the key at which iteration
     * must stop ({@code to} when iterating forward, {@code from} when iterating
     * backward). A {@code null} bound means unbounded on that side.
     */
    private class RocksDBDualCFRangeIterator
    extends RocksDBDualCFIterator {
        private final Comparator<byte[]> comparator;
        /** Raw key at which iteration ends, or {@code null} if that end is unbounded. */
        private final byte[] rawLastKey;
        private final boolean forward;
        /** Whether a forward iteration includes an entry whose key equals {@code rawLastKey}. */
        private final boolean toInclusive;

        RocksDBDualCFRangeIterator(String storeName, RocksIterator iterWithTimestamp, RocksIterator iterNoTimestamp, Bytes from, Bytes to, boolean forward, boolean toInclusive) {
            super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
            this.comparator = Bytes.BYTES_LEXICO_COMPARATOR;
            this.forward = forward;
            this.toInclusive = toInclusive;
            if (forward) {
                if (from == null) {
                    iterWithTimestamp.seekToFirst();
                    iterNoTimestamp.seekToFirst();
                } else {
                    iterWithTimestamp.seek(from.get());
                    iterNoTimestamp.seek(from.get());
                }
                this.rawLastKey = to == null ? null : to.get();
            } else {
                if (to == null) {
                    iterWithTimestamp.seekToLast();
                    iterNoTimestamp.seekToLast();
                } else {
                    iterWithTimestamp.seekForPrev(to.get());
                    iterNoTimestamp.seekForPrev(to.get());
                }
                this.rawLastKey = from == null ? null : from.get();
            }
        }

        /**
         * Returns the next merged entry from the parent iterator as long as it lies
         * within the range, otherwise signals the end of iteration.
         *
         * <p>Forward: keys strictly below {@code rawLastKey} are returned; a key equal
         * to it is returned only when {@code toInclusive} is set. Backward: keys
         * greater than or equal to {@code rawLastKey} are returned.
         */
        @Override
        protected KeyValue<Bytes, byte[]> makeNext() {
            KeyValue<Bytes, byte[]> candidate = super.makeNext();
            if (candidate == null) {
                return this.allDone();
            }
            if (this.rawLastKey == null) {
                return candidate;
            }
            // Compare once and derive the in-range decision for the current direction.
            int order = this.comparator.compare(candidate.key.get(), this.rawLastKey);
            boolean withinRange = this.forward
                ? order < 0 || (order == 0 && this.toInclusive)
                : order >= 0;
            return withinRange ? candidate : this.allDone();
        }
    }

    /**
     * Iterator that merges two RocksDB iterators, one over the column family with
     * timestamps and one over the column family without, into a single key-ordered
     * stream. Values read from the no-timestamp iterator are passed through
     * {@link TimestampedBytesStore#convertToTimestampedFormat(byte[])} before being
     * returned.
     *
     * <p>A close callback must be registered via {@link #onClose(Runnable)} before
     * {@link #close()} is called.
     */
    private class RocksDBDualCFIterator
    extends AbstractIterator<KeyValue<Bytes, byte[]>>
    implements ManagedKeyValueIterator<Bytes, byte[]> {
        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
        private final String storeName;
        private final RocksIterator iterWithTimestamp;
        private final RocksIterator iterNoTimestamp;
        private final boolean forward;
        private volatile boolean open = true;
        /** Key buffered from {@code iterWithTimestamp} but not yet emitted; {@code null} if none. */
        private byte[] nextWithTimestamp;
        /** Key buffered from {@code iterNoTimestamp} but not yet emitted; {@code null} if none. */
        private byte[] nextNoTimestamp;
        /** Most recently produced entry; read by {@link #peekNextKey()}. */
        private KeyValue<Bytes, byte[]> next;
        private Runnable closeCallback = null;

        RocksDBDualCFIterator(String storeName, RocksIterator iterWithTimestamp, RocksIterator iterNoTimestamp, boolean forward) {
            this.storeName = storeName;
            this.forward = forward;
            this.iterWithTimestamp = iterWithTimestamp;
            this.iterNoTimestamp = iterNoTimestamp;
        }

        /**
         * @throws InvalidStateStoreException if this iterator has already been closed
         */
        @Override
        public synchronized boolean hasNext() {
            if (this.open) {
                return super.hasNext();
            }
            throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", this.storeName));
        }

        @Override
        public synchronized KeyValue<Bytes, byte[]> next() {
            return super.next();
        }

        /**
         * Produces the next entry of the merged stream. When both iterators have a
         * pending key, the smaller key is emitted when iterating forward and the
         * larger key when iterating backward; on equal keys the no-timestamp entry
         * is emitted first.
         */
        @Override
        protected KeyValue<Bytes, byte[]> makeNext() {
            // Buffer the current key of each iterator if nothing is buffered yet.
            if (this.nextNoTimestamp == null && this.iterNoTimestamp.isValid()) {
                this.nextNoTimestamp = this.iterNoTimestamp.key();
            }
            if (this.nextWithTimestamp == null && this.iterWithTimestamp.isValid()) {
                this.nextWithTimestamp = this.iterWithTimestamp.key();
            }

            if (this.nextNoTimestamp == null && !this.iterNoTimestamp.isValid()) {
                // No-timestamp side is exhausted.
                if (this.nextWithTimestamp == null && !this.iterWithTimestamp.isValid()) {
                    return this.allDone();
                }
                return this.emitWithTimestamp();
            }
            if (this.nextWithTimestamp == null) {
                return this.emitNoTimestamp();
            }

            // Both sides have a pending key: pick according to iteration direction.
            int order = this.comparator.compare(this.nextNoTimestamp, this.nextWithTimestamp);
            boolean noTimestampFirst = this.forward ? order <= 0 : order >= 0;
            return noTimestampFirst ? this.emitNoTimestamp() : this.emitWithTimestamp();
        }

        /** Emits the buffered with-timestamp entry as-is and advances that iterator. */
        private KeyValue<Bytes, byte[]> emitWithTimestamp() {
            this.next = KeyValue.pair(new Bytes(this.nextWithTimestamp), this.iterWithTimestamp.value());
            this.nextWithTimestamp = null;
            this.advance(this.iterWithTimestamp);
            return this.next;
        }

        /** Emits the buffered no-timestamp entry, converted to the timestamped format, and advances that iterator. */
        private KeyValue<Bytes, byte[]> emitNoTimestamp() {
            this.next = KeyValue.pair(new Bytes(this.nextNoTimestamp), TimestampedBytesStore.convertToTimestampedFormat(this.iterNoTimestamp.value()));
            this.nextNoTimestamp = null;
            this.advance(this.iterNoTimestamp);
            return this.next;
        }

        /** Moves the given iterator one step in the iteration direction. */
        private void advance(RocksIterator iterator) {
            if (this.forward) {
                iterator.next();
            } else {
                iterator.prev();
            }
        }

        /**
         * Runs the registered close callback, closes both underlying iterators and
         * marks this iterator as closed.
         *
         * @throws IllegalStateException if no close callback has been registered
         */
        @Override
        public synchronized void close() {
            Runnable callback = this.closeCallback;
            if (callback == null) {
                throw new IllegalStateException("RocksDBDualCFIterator expects close callback to be set immediately upon creation");
            }
            callback.run();
            this.iterNoTimestamp.close();
            this.iterWithTimestamp.close();
            this.open = false;
        }

        /**
         * @throws NoSuchElementException if there is no further entry
         */
        @Override
        public Bytes peekNextKey() {
            if (this.hasNext()) {
                return this.next.key;
            }
            throw new NoSuchElementException();
        }

        @Override
        public void onClose(Runnable closeCallback) {
            this.closeCallback = closeCallback;
        }
    }
}

