/*
 * Decompiled with CFR 0.152.
 */
package kafka.api;

import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\t\rb\u0001\u0002\u0017.\u0001IBQ!\u000f\u0001\u0005\u0002iBq!\u0010\u0001C\u0002\u0013\u0005a\b\u0003\u0004F\u0001\u0001\u0006Ia\u0010\u0005\b\r\u0002\u0011\r\u0011\"\u0001?\u0011\u00199\u0005\u0001)A\u0005\u007f!9\u0001\n\u0001b\u0001\n\u0003q\u0004BB%\u0001A\u0003%q\bC\u0004K\u0001\t\u0007I\u0011\u0001 \t\r-\u0003\u0001\u0015!\u0003@\u0011\u001da\u0005A1A\u0005\u00025CaA\u0016\u0001!\u0002\u0013q\u0005bB,\u0001\u0005\u0004%\t!\u0014\u0005\u00071\u0002\u0001\u000b\u0011\u0002(\t\u000fe\u0003!\u0019!C\u00015\"1a\u000f\u0001Q\u0001\nmCqa\u001e\u0001C\u0002\u0013\u0005\u0001\u0010C\u0004\u0002\u0002\u0001\u0001\u000b\u0011B=\t\u0011\u0005\r\u0001A1A\u0005\u0002aDq!!\u0002\u0001A\u0003%\u0011\u0010C\u0004\u0002\b\u0001!\t%!\u0003\t\u000f\u0005=\u0002\u0001\"\u0011\u00022!9\u0011q\t\u0001\u0005B\u0005E\u0002bBA)\u0001\u0011\u0005\u0011\u0011\u0007\u0005\b\u00037\u0002A\u0011AA\u0019\u0011\u001d\ty\u0006\u0001C\u0001\u0003cAq!a\u0019\u0001\t\u0003\t\t\u0004C\u0004\u0002h\u0001!\t!!\r\t\u000f\u0005-\u0004\u0001\"\u0001\u00022!9\u0011q\u000e\u0001\u0005\u0002\u0005E\u0002bBA:\u0001\u0011\u0005\u0011\u0011\u0007\u0005\b\u0003o\u0002A\u0011AA\u0019\u0011\u001d\tY\b\u0001C\u0001\u0003cAq!a 
\u0001\t\u0003\t\t\u0004C\u0004\u0002\u0004\u0002!\t!!\r\t\u000f\u0005]\u0005\u0001\"\u0003\u0002\u001a\"9\u0011\u0011\u0019\u0001\u0005\n\u0005\r\u0007bBAi\u0001\u0011%\u00111\u001b\u0005\n\u0003C\u0004\u0011\u0013!C\u0005\u0003GD\u0011\"!?\u0001#\u0003%I!a?\t\u0013\u0005}\b!%A\u0005\n\t\u0005\u0001b\u0002B\u0003\u0001\u0011%!q\u0001\u0005\b\u0005\u0017\u0001A\u0011\u0002B\u0007\u0011%\u0011i\u0002AI\u0001\n\u0013\u0011yB\u0001\tUe\u0006t7/Y2uS>t7\u000fV3ti*\u0011afL\u0001\u0004CBL'\"\u0001\u0019\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001a\r\t\u0003i]j\u0011!\u000e\u0006\u0003m=\n1\"\u001b8uK\u001e\u0014\u0018\r^5p]&\u0011\u0001(\u000e\u0002\u0017\u0017\u000647.Y*feZ,'\u000fV3ti\"\u000b'O\\3tg\u00061A(\u001b8jiz\"\u0012a\u000f\t\u0003y\u0001i\u0011!L\u0001\u000b]Vl7+\u001a:wKJ\u001cX#A \u0011\u0005\u0001\u001bU\"A!\u000b\u0003\t\u000bQa]2bY\u0006L!\u0001R!\u0003\u0007%sG/A\u0006ok6\u001cVM\u001d<feN\u0004\u0013A\u0007;sC:\u001c\u0018m\u0019;j_:\fG\u000e\u0015:pIV\u001cWM]\"pk:$\u0018a\u0007;sC:\u001c\u0018m\u0019;j_:\fG\u000e\u0015:pIV\u001cWM]\"pk:$\b%\u0001\u000eue\u0006t7/Y2uS>t\u0017\r\\\"p]N,X.\u001a:D_VtG/A\u000eue\u0006t7/Y2uS>t\u0017\r\\\"p]N,X.\u001a:D_VtG\u000fI\u0001\u001e]>tGK]1og\u0006\u001cG/[8oC2\u001cuN\\:v[\u0016\u00148i\\;oi\u0006qbn\u001c8Ue\u0006t7/Y2uS>t\u0017\r\\\"p]N,X.\u001a:D_VtG\u000fI\u0001\u0007i>\u0004\u0018nY\u0019\u0016\u00039\u0003\"a\u0014+\u000e\u0003AS!!\u0015*\u0002\t1\fgn\u001a\u0006\u0002'\u0006!!.\u0019<b\u0013\t)\u0006K\u0001\u0004TiJLgnZ\u0001\bi>\u0004\u0018nY\u0019!\u0003\u0019!x\u000e]5de\u00059Ao\u001c9jGJ\u0002\u0013A\u0006;sC:\u001c\u0018m\u0019;j_:\fG\u000e\u0015:pIV\u001cWM]:\u0016\u0003m\u00032\u0001X1d\u001b\u0005i&B\u00010`\u0003\u001diW\u000f^1cY\u0016T!\u0001Y!\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002c;\n1!)\u001e4gKJ\u0004B\u0001\u001a8qa6\tQM\u0003\u0002gO\u0006A\u0001O]8ek\u000e,'O\u0003\u0002iS\u000691\r\\5f]R\u001c(B\u0001\u0019k\u0015\tYG.\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002[\u0006\u0019q
N]4\n\u0005=,'!D&bM.\f\u0007K]8ek\u000e,'\u000fE\u0002AcNL!A]!\u0003\u000b\u0005\u0013(/Y=\u0011\u0005\u0001#\u0018BA;B\u0005\u0011\u0011\u0015\u0010^3\u0002/Q\u0014\u0018M\\:bGRLwN\\1m!J|G-^2feN\u0004\u0013A\u0006;sC:\u001c\u0018m\u0019;j_:\fGnQ8ogVlWM]:\u0016\u0003e\u00042\u0001X1{!\u0011Yh\u0010\u001d9\u000e\u0003qT!!`4\u0002\u0011\r|gn];nKJL!a ?\u0003\u001b-\u000bgm[1D_:\u001cX/\\3s\u0003]!(/\u00198tC\u000e$\u0018n\u001c8bY\u000e{gn];nKJ\u001c\b%A\ro_:$&/\u00198tC\u000e$\u0018n\u001c8bY\u000e{gn];nKJ\u001c\u0018A\u00078p]R\u0013\u0018M\\:bGRLwN\\1m\u0007>t7/^7feN\u0004\u0013aD4f]\u0016\u0014\u0018\r^3D_:4\u0017nZ:\u0016\u0005\u0005-\u0001CBA\u0007\u0003;\t\u0019C\u0004\u0003\u0002\u0010\u0005ea\u0002BA\t\u0003/i!!a\u0005\u000b\u0007\u0005U\u0011'\u0001\u0004=e>|GOP\u0005\u0002\u0005&\u0019\u00111D!\u0002\u000fA\f7m[1hK&!\u0011qDA\u0011\u0005\r\u0019V-\u001d\u0006\u0004\u00037\t\u0005\u0003BA\u0013\u0003Wi!!a\n\u000b\u0007\u0005%r&\u0001\u0004tKJ4XM]\u0005\u0005\u0003[\t9CA\u0006LC\u001a\\\u0017mQ8oM&<\u0017!B:fiV\u0003HCAA\u001a!\r\u0001\u0015QG\u0005\u0004\u0003o\t%\u0001B+oSRD3!FA\u001e!\u0011\ti$a\u0011\u000e\u0005\u0005}\"bAA!Y\u0006)!.\u001e8ji&!\u0011QIA \u0005\u0019\u0011UMZ8sK\u0006AA/Z1s\t><h\u000eK\u0002\u0017\u0003\u0017\u0002B!!\u0010\u0002N%!\u0011qJA 
\u0005\u0015\te\r^3s\u0003U!Xm\u001d;CCNL7\r\u0016:b]N\f7\r^5p]ND3aFA+!\u0011\ti$a\u0016\n\t\u0005e\u0013q\b\u0002\u0005)\u0016\u001cH/\u0001\u001auKN$(+Z1e\u0007>lW.\u001b;uK\u0012\u001cuN\\:v[\u0016\u00148\u000b[8vY\u0012tu\u000e^*fKVsG-Z2jI\u0016$G)\u0019;bQ\rA\u0012QK\u0001+i\u0016\u001cH\u000fR3mCf,GMR3uG\"Len\u00197vI\u0016\u001c\u0018IY8si\u0016$GK]1og\u0006\u001cG/[8oQ\rI\u0012QK\u0001\u0010i\u0016\u001cHoU3oI>3gm]3ug\"\u001a!$!\u0016\u0002'Q,7\u000f\u001e$f]\u000eLgnZ(o\u0007>lW.\u001b;)\u0007m\t)&\u0001\ruKN$h)\u001a8dS:<wJ\\*f]\u0012|eMZ:fiND3\u0001HA+\u00031\"Xm\u001d;PM\u001a\u001cX\r^'fi\u0006$\u0017\r^1J]N+g\u000eZ(gMN,Go\u001d+p)J\fgn]1di&|g\u000eK\u0002\u001e\u0003+\n\u0011\u0003^3ti\u001a+gnY5oO>s7+\u001a8eQ\rq\u0012QK\u0001\u001bi\u0016\u001cHOR3oG&twm\u00148BI\u0012\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0004?\u0005U\u0013A\t;fgR4UM\\2j]\u001e|e\u000e\u0016:b]N\f7\r^5p]\u0016C\b/\u001b:bi&|g\u000eK\u0002!\u0003+\nA\u0004^3ti6+H\u000e^5qY\u0016l\u0015M]6feN|e.\u001a'fC\u0012,'\u000fK\u0002\"\u0003+\nA\u0005^3ti\u000e{gn]3dkRLg/\u001a7z%Vt\u0017J\\5u)J\fgn]1di&|gn\u001d\u0015\bE\u0005U\u0013qQAE\u0003!)\u0007\u0010]3di\u0016$7EAAF!\u0011\ti)a%\u000e\u0005\u0005=%bAAIS\u000611m\\7n_:LA!!&\u0002\u0010\nq1*\u00194lC\u0016C8-\u001a9uS>t\u0017aJ:f]\u0012$&/\u00198tC\u000e$\u0018n\u001c8bY6+7o]1hKN<\u0016\u000e\u001e5WC2,XMU1oO\u0016$B\"a\r\u0002\u001c\u0006u\u0015qVAZ\u0003oCQAZ\u0012A\u0002\rDq!a($\u0001\u0004\t\t+A\u0003u_BL7\r\u0005\u0003\u0002$\u0006-f\u0002BAS\u0003O\u00032!!\u0005B\u0013\r\tI+Q\u0001\u0007!J,G-\u001a4\n\u0007U\u000biKC\u0002\u0002*\u0006Ca!!-$\u0001\u0004y\u0014!B:uCJ$\bBBA[G\u0001\u0007q(A\u0002f]\u0012Dq!!/$\u0001\u0004\tY,A\bxS2d')Z\"p[6LG\u000f^3e!\r\u0001\u0015QX\u0005\u0004\u0003\u007f\u000b%a\u0002\"p_2,\u0017M\\\u0001\fg\u0016\u0014h/\u001a:Qe>\u00048\u000f\u0006\u0002\u0002FB!\u0011qYAg\u001b\t\tIMC\u0002\u0002LJ\u000bA!\u001e;jY&!\u0011qZAe\u0005)\u0001&o\u001c9feRLWm]\u0001\u001cGJ,\u0017\r^3SK\u0006$7i\\7nSR$X\rZ\"p]N,
X.\u001a:\u0015\u000fi\f).!7\u0002^\"I\u0011q[\u0013\u0011\u0002\u0003\u0007\u0011\u0011U\u0001\u0006OJ|W\u000f\u001d\u0005\t\u00037,\u0003\u0013!a\u0001\u007f\u0005qQ.\u0019=Q_2d'+Z2pe\u0012\u001c\b\"CApKA\u0005\t\u0019AAc\u0003\u0015\u0001(o\u001c9t\u0003\u0015\u001a'/Z1uKJ+\u0017\rZ\"p[6LG\u000f^3e\u0007>t7/^7fe\u0012\"WMZ1vYR$\u0013'\u0006\u0002\u0002f*\"\u0011\u0011UAtW\t\tI\u000f\u0005\u0003\u0002l\u0006UXBAAw\u0015\u0011\ty/!=\u0002\u0013Ut7\r[3dW\u0016$'bAAz\u0003\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005]\u0018Q\u001e\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017!J2sK\u0006$XMU3bI\u000e{W.\\5ui\u0016$7i\u001c8tk6,'\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\tiPK\u0002@\u0003O\fQe\u0019:fCR,'+Z1e\u0007>lW.\u001b;uK\u0012\u001cuN\\:v[\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001a\u0016\u0005\t\r!\u0006BAc\u0003O\fQd\u0019:fCR,'+Z1e+:\u001cw.\\7jiR,GmQ8ogVlWM\u001d\u000b\u0004u\n%\u0001bBAlS\u0001\u0007\u0011\u0011U\u0001\u001cGJ,\u0017\r^3Ue\u0006t7/Y2uS>t\u0017\r\u001c)s_\u0012,8-\u001a:\u0015\u000b\r\u0014yAa\u0005\t\u000f\tE!\u00061\u0001\u0002\"\u0006yAO]1og\u0006\u001cG/[8oC2LE\rC\u0005\u0003\u0016)\u0002\n\u00111\u0001\u0003\u0018\u0005!BO]1og\u0006\u001cG/[8o)&lWm\\;u\u001bN\u00042\u0001\u0011B\r\u0013\r\u0011Y\"\u0011\u0002\u0005\u0019>tw-A\u0013de\u0016\fG/\u001a+sC:\u001c\u0018m\u0019;j_:\fG\u000e\u0015:pIV\u001cWM\u001d\u0013eK\u001a\fW\u000f\u001c;%eU\u0011!\u0011\u0005\u0016\u0005\u0005/\t9\u000f")
public class TransactionsTest
extends KafkaServerTestHarness {
    // Broker count; passed to createBrokerConfigs() and createTopic(). Assigned outside this view (presumably the constructor).
    private final int numServers;
    // How many transactional producers setUp() creates.
    private final int transactionalProducerCount;
    // How many read-committed consumers setUp() creates.
    private final int transactionalConsumerCount;
    // How many read-uncommitted consumers setUp() creates.
    private final int nonTransactionalConsumerCount;
    // Names of the two topics created in setUp().
    private final String topic1;
    private final String topic2;
    // Client registries, initialised empty here and closed element-by-element in tearDown().
    // NOTE(review): presumably filled by the create*Producer/create*Consumer helpers, which are not visible here — confirm.
    private final Buffer<KafkaProducer<byte[], byte[]>> transactionalProducers = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);
    private final Buffer<KafkaConsumer<byte[], byte[]>> transactionalConsumers = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);
    private final Buffer<KafkaConsumer<byte[], byte[]>> nonTransactionalConsumers = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);

    /**
     * Accessor for the broker count used when generating broker configs and creating topics.
     *
     * @return the value of the {@code numServers} field
     */
    public int numServers() {
        return numServers;
    }

    /**
     * Accessor for the number of transactional producers that {@code setUp()} creates.
     *
     * @return the value of the {@code transactionalProducerCount} field
     */
    public int transactionalProducerCount() {
        return transactionalProducerCount;
    }

    /**
     * Accessor for the number of read-committed consumers that {@code setUp()} creates.
     *
     * @return the value of the {@code transactionalConsumerCount} field
     */
    public int transactionalConsumerCount() {
        return transactionalConsumerCount;
    }

    /**
     * Accessor for the number of read-uncommitted consumers that {@code setUp()} creates.
     *
     * @return the value of the {@code nonTransactionalConsumerCount} field
     */
    public int nonTransactionalConsumerCount() {
        return nonTransactionalConsumerCount;
    }

    /**
     * Accessor for the name of the first topic created in {@code setUp()}.
     *
     * @return the value of the {@code topic1} field
     */
    public String topic1() {
        return topic1;
    }

    /**
     * Accessor for the name of the second topic created in {@code setUp()}.
     *
     * @return the value of the {@code topic2} field
     */
    public String topic2() {
        return topic2;
    }

    /**
     * Accessor for the buffer of transactional producers; every element is closed in {@code tearDown()}.
     *
     * @return the {@code transactionalProducers} buffer itself (not a copy)
     */
    public Buffer<KafkaProducer<byte[], byte[]>> transactionalProducers() {
        return transactionalProducers;
    }

    /**
     * Accessor for the buffer of read-committed consumers; every element is closed in {@code tearDown()}.
     *
     * @return the {@code transactionalConsumers} buffer itself (not a copy)
     */
    public Buffer<KafkaConsumer<byte[], byte[]>> transactionalConsumers() {
        return transactionalConsumers;
    }

    /**
     * Accessor for the buffer of read-uncommitted consumers; every element is closed in {@code tearDown()}.
     *
     * @return the {@code nonTransactionalConsumers} buffer itself (not a copy)
     */
    public Buffer<KafkaConsumer<byte[], byte[]>> nonTransactionalConsumers() {
        return nonTransactionalConsumers;
    }

    /**
     * Builds the broker configurations for the test cluster.
     *
     * <p>Asks {@code TestUtils.createBrokerConfigs} for {@code numServers()} property sets
     * (every optional argument left at its Scala default) and turns each one into a
     * {@link KafkaConfig} by combining it with {@code serverProps()} through
     * {@code KafkaConfig.fromProps}.
     *
     * @return one {@link KafkaConfig} per broker
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        return (Seq)testUtils.createBrokerConfigs(
                this.numServers(),
                this.zkConnect(),
                testUtils.createBrokerConfigs$default$3(),
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(),
                testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13(),
                testUtils.createBrokerConfigs$default$14()
        ).map((Function1 & Serializable & scala.Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, this.serverProps()), Seq$.MODULE$.canBuildFrom());
    }

    @Override
    @Before
    public void setUp() {
        super.setUp();
        int numPartitions = 4;
        Properties topicConfig = new Properties();
        topicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), ((Object)BoxesRunTime.boxToInteger((int)2)).toString());
        this.createTopic(this.topic1(), numPartitions, this.numServers(), topicConfig);
        this.createTopic(this.topic2(), numPartitions, this.numServers(), topicConfig);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.transactionalProducerCount()).foreach((Function1 & Serializable & scala.Serializable)_ -> this.createTransactionalProducer("transactional-producer", this.createTransactionalProducer$default$2()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.transactionalConsumerCount()).foreach((Function1 & Serializable & scala.Serializable)_ -> this.createReadCommittedConsumer("transactional-group", this.createReadCommittedConsumer$default$2(), this.createReadCommittedConsumer$default$3()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.nonTransactionalConsumerCount()).foreach((Function1 & Serializable & scala.Serializable)_ -> this.createReadUncommittedConsumer("non-transactional-group"));
    }

    /**
     * Closes every registered client, then shuts the harness down.
     *
     * <p>Producers are closed first, then read-committed consumers, then read-uncommitted
     * consumers; {@code super.tearDown()} runs last.
     */
    @Override
    @After
    public void tearDown() {
        Buffer<KafkaProducer<byte[], byte[]>> producers = this.transactionalProducers();
        producers.foreach((Function1 & Serializable & scala.Serializable)producer -> {
            producer.close();
            return BoxedUnit.UNIT;
        });

        Buffer<KafkaConsumer<byte[], byte[]>> readCommitted = this.transactionalConsumers();
        readCommitted.foreach((Function1 & Serializable & scala.Serializable)consumer -> {
            consumer.close();
            return BoxedUnit.UNIT;
        });

        Buffer<KafkaConsumer<byte[], byte[]>> readUncommitted = this.nonTransactionalConsumers();
        readUncommitted.foreach((Function1 & Serializable & scala.Serializable)consumer -> {
            consumer.close();
            return BoxedUnit.UNIT;
        });

        super.tearDown();
    }

    @Test
    public void testBasicTransactions() {
        KafkaProducer producer = (KafkaProducer)this.transactionalProducers().head();
        KafkaConsumer consumer = (KafkaConsumer)this.transactionalConsumers().head();
        KafkaConsumer unCommittedConsumer = (KafkaConsumer)this.nonTransactionalConsumers().head();
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "2", "2", false));
        producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "4", "4", false));
        producer.flush();
        producer.abortTransaction();
        producer.beginTransaction();
        producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "1", true));
        producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "3", "3", true));
        producer.commitTransaction();
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$))).asJava());
        unCommittedConsumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$))).asJava());
        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        records.foreach((Function1 & Serializable & scala.Serializable)record -> TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record));
        Seq allRecords = TestUtils$.MODULE$.consumeRecords(unCommittedConsumer, 4, TestUtils$.MODULE$.consumeRecords$default$3());
        scala.collection.immutable.Set expectedValues = new .colon.colon((Object)"1", (List)new .colon.colon((Object)"2", (List)new .colon.colon((Object)"3", (List)new .colon.colon((Object)"4", (List)Nil$.MODULE$)))).toSet();
        allRecords.foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsTest.$anonfun$testBasicTransactions$2(expectedValues, record);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Checks what each isolation level can see while one transaction is still undecided.
     *
     * <p>Two producers open transactions and interleave writes to partition 0 of both topics.
     * {@code producer2} commits; {@code producer1} neither commits nor aborts inside this method.
     * The read-uncommitted consumer must be able to consume all 8 records and resolve
     * {@code offsetsForTimes} for the later timestamp on both partitions. The read-committed
     * consumer must consume 2 records, and {@code offsetsForTimes} for the later timestamp must
     * yield {@code null} for both partitions.
     */
    @Test
    public void testReadCommittedConsumerShouldNotSeeUndecidedData() {
        KafkaProducer producer1 = (KafkaProducer)this.transactionalProducers().head();
        KafkaProducer<byte[], byte[]> producer2 = this.createTransactionalProducer("other", this.createTransactionalProducer$default$2());
        KafkaConsumer readCommittedConsumer = (KafkaConsumer)this.transactionalConsumers().head();
        KafkaConsumer readUncommittedConsumer = (KafkaConsumer)this.nonTransactionalConsumers().head();
        producer1.initTransactions();
        producer2.initTransactions();
        // Both transactions are open at the same time from here on.
        producer1.beginTransaction();
        producer2.beginTransaction();
        long latestVisibleTimestamp = System.currentTimeMillis();
        // producer2 writes first (key "x", value "1") to partition 0 of each topic, at the earlier timestamp.
        producer2.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestVisibleTimestamp), (Object)"x".getBytes(), (Object)"1".getBytes()));
        producer2.send(new ProducerRecord(this.topic2(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestVisibleTimestamp), (Object)"x".getBytes(), (Object)"1".getBytes()));
        producer2.flush();
        // Everything written from here on carries a timestamp one millisecond later.
        long latestWrittenTimestamp = latestVisibleTimestamp + 1L;
        // producer1 writes two records per topic; its transaction stays open for the rest of the method.
        producer1.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestWrittenTimestamp), (Object)"a".getBytes(), (Object)"1".getBytes()));
        producer1.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestWrittenTimestamp), (Object)"b".getBytes(), (Object)"2".getBytes()));
        producer1.send(new ProducerRecord(this.topic2(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestWrittenTimestamp), (Object)"c".getBytes(), (Object)"3".getBytes()));
        producer1.send(new ProducerRecord(this.topic2(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestWrittenTimestamp), (Object)"d".getBytes(), (Object)"4".getBytes()));
        producer1.flush();
        // producer2 writes again after producer1's records, then commits its transaction.
        producer2.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestWrittenTimestamp), (Object)"x".getBytes(), (Object)"2".getBytes()));
        producer2.send(new ProducerRecord(this.topic2(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(latestWrittenTimestamp), (Object)"x".getBytes(), (Object)"2".getBytes()));
        producer2.commitTransaction();
        TopicPartition tp1 = new TopicPartition(this.topic1(), 0);
        TopicPartition tp2 = new TopicPartition(this.topic2(), 0);
        // Read-uncommitted view: all 8 records (4 per partition) must be consumable.
        readUncommittedConsumer.assign((Collection)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{tp1, tp2}))).asJava());
        TestUtils$.MODULE$.consumeRecords(readUncommittedConsumer, 8, TestUtils$.MODULE$.consumeRecords$default$3());
        // Looking up the later timestamp must return an entry with that timestamp for both partitions.
        java.util.Map readUncommittedOffsetsForTimes = readUncommittedConsumer.offsetsForTimes((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp1), (Object)Predef$.MODULE$.long2Long(latestWrittenTimestamp)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp2), (Object)Predef$.MODULE$.long2Long(latestWrittenTimestamp))}))).asJava());
        Assert.assertEquals((long)2L, (long)readUncommittedOffsetsForTimes.size());
        Assert.assertEquals((long)latestWrittenTimestamp, (long)((OffsetAndTimestamp)readUncommittedOffsetsForTimes.get(tp1)).timestamp());
        Assert.assertEquals((long)latestWrittenTimestamp, (long)((OffsetAndTimestamp)readUncommittedOffsetsForTimes.get(tp2)).timestamp());
        readUncommittedConsumer.unsubscribe();
        // Read-committed view: only 2 records are requested from the same two partitions.
        readCommittedConsumer.assign((Collection)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{tp1, tp2}))).asJava());
        Seq records = TestUtils$.MODULE$.consumeRecords(readCommittedConsumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        // NOTE(review): the per-record check lives in a synthetic helper outside this view — presumably asserts key/value; confirm.
        records.foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsTest.$anonfun$testReadCommittedConsumerShouldNotSeeUndecidedData$1(record);
            return BoxedUnit.UNIT;
        });
        Assert.assertEquals((long)2L, (long)readCommittedConsumer.assignment().size());
        readCommittedConsumer.seekToEnd((Collection)readCommittedConsumer.assignment());
        // NOTE(review): the per-partition check after seekToEnd is also in a synthetic helper not shown here — presumably a position assertion; confirm.
        ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(readCommittedConsumer.assignment()).asScala()).foreach((Function1 & Serializable & scala.Serializable)tp -> {
            TransactionsTest.$anonfun$testReadCommittedConsumerShouldNotSeeUndecidedData$2(readCommittedConsumer, tp);
            return BoxedUnit.UNIT;
        });
        // For the read-committed consumer the later timestamp must not resolve to any offset.
        java.util.Map readCommittedOffsetsForTimes = readCommittedConsumer.offsetsForTimes((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp1), (Object)Predef$.MODULE$.long2Long(latestWrittenTimestamp)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp2), (Object)Predef$.MODULE$.long2Long(latestWrittenTimestamp))}))).asJava());
        Assert.assertNull(readCommittedOffsetsForTimes.get(tp1));
        Assert.assertNull(readCommittedOffsetsForTimes.get(tp2));
    }

    /**
     * Verifies that a read-committed fetch skips records of an aborted transaction that sit
     * between committed records in the same partition.
     *
     * <p>Two producers interleave writes to partition 0 of {@code topic1}: the committing
     * producer writes x/1, the aborting producer writes y/1 and y/2, then the committing
     * producer writes x/2. A read-committed consumer configured with
     * {@code fetch.min.bytes=100000} and {@code fetch.max.wait.ms=100} must return exactly
     * x/1 at offset 0 and x/2 at offset 3.
     */
    @Test
    public void testDelayedFetchIncludesAbortedTransaction() {
        KafkaProducer abortingProducer = (KafkaProducer)this.transactionalProducers().head();
        KafkaProducer<byte[], byte[]> committingProducer = this.createTransactionalProducer("other", this.createTransactionalProducer$default$2());

        abortingProducer.initTransactions();
        committingProducer.initTransactions();
        abortingProducer.beginTransaction();
        committingProducer.beginTransaction();

        // Interleave the two transactions; each flush pins the order in which records are sent.
        committingProducer.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), (Object)"x".getBytes(), (Object)"1".getBytes()));
        committingProducer.flush();
        abortingProducer.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), (Object)"y".getBytes(), (Object)"1".getBytes()));
        abortingProducer.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), (Object)"y".getBytes(), (Object)"2".getBytes()));
        abortingProducer.flush();
        committingProducer.send(new ProducerRecord(this.topic1(), Predef$.MODULE$.int2Integer(0), (Object)"x".getBytes(), (Object)"2".getBytes()));
        committingProducer.flush();

        abortingProducer.abortTransaction();
        committingProducer.commitTransaction();

        // Consumer settings passed as the third argument; group and max-poll-records keep their defaults.
        Properties fetchProps = new Properties();
        fetchProps.put("fetch.min.bytes", "100000");
        fetchProps.put("fetch.max.wait.ms", "100");
        KafkaConsumer<byte[], byte[]> readCommittedConsumer = this.createReadCommittedConsumer(this.createReadCommittedConsumer$default$1(), this.createReadCommittedConsumer$default$2(), fetchProps);

        TopicPartition partitionZero = new TopicPartition(this.topic1(), 0);
        readCommittedConsumer.assign((Collection)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{partitionZero}))).asJava());

        Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.consumeRecords(readCommittedConsumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        Assert.assertEquals((long)2L, (long)records.size());

        // First committed record.
        ConsumerRecord firstRecord = (ConsumerRecord)records.head();
        Assert.assertEquals((Object)"x", (Object)new String((byte[])firstRecord.key()));
        Assert.assertEquals((Object)"1", (Object)new String((byte[])firstRecord.value()));
        Assert.assertEquals((long)0L, (long)firstRecord.offset());

        // Second committed record; the two aborted records were sent between the committed ones.
        ConsumerRecord lastRecord = (ConsumerRecord)records.last();
        Assert.assertEquals((Object)"x", (Object)new String((byte[])lastRecord.key()));
        Assert.assertEquals((Object)"2", (Object)new String((byte[])lastRecord.value()));
        Assert.assertEquals((long)3L, (long)lastRecord.offset());
    }

    @Test
    public void testSendOffsets() {
        String consumerGroupId = "foobar-consumer-group";
        int numSeedMessages = 500;
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.topic1(), numSeedMessages, (Seq<KafkaServer>)this.servers());
        KafkaProducer producer = (KafkaProducer)this.transactionalProducers().apply(0);
        KafkaConsumer<byte[], byte[]> consumer = this.createReadCommittedConsumer(consumerGroupId, numSeedMessages / 4, this.createReadCommittedConsumer$default$3());
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)Nil$.MODULE$)).asJava());
        producer.initTransactions();
        BooleanRef shouldCommit = BooleanRef.create((boolean)false);
        IntRef recordsProcessed = IntRef.create((int)0);
        try {
            while (recordsProcessed.elem < numSeedMessages) {
                Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed.elem), TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                producer.beginTransaction();
                shouldCommit.elem = !shouldCommit.elem;
                records.foreach((Function1 & Serializable & scala.Serializable)record -> {
                    String key = new String((byte[])record.key(), "UTF-8");
                    String value = new String((byte[])record.value(), "UTF-8");
                    return producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), key, value, shouldCommit$1.elem));
                });
                producer.sendOffsetsToTransaction((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions(consumer)).asJava(), consumerGroupId);
                if (shouldCommit.elem) {
                    producer.commitTransaction();
                    recordsProcessed.elem += records.size();
                    this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("committed transaction.. Last committed record: ").append(new String((byte[])((ConsumerRecord)records.last()).value(), "UTF-8")).append(". Num ").append(new StringBuilder(21).append("records written to ").append(this.topic2()).append(": ").append(recordsProcessed$1.elem).toString()).toString());
                    continue;
                }
                producer.abortTransaction();
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("aborted transaction Last committed record: ").append(new String((byte[])((ConsumerRecord)records.last()).value(), "UTF-8")).append(". Num ").append(new StringBuilder(21).append("records written to ").append(this.topic2()).append(": ").append(recordsProcessed$1.elem).toString()).toString());
                TestUtils$.MODULE$.resetToCommittedPositions(consumer);
            }
        }
        finally {
            consumer.close();
        }
        KafkaConsumer verifyingConsumer = (KafkaConsumer)this.transactionalConsumers().apply(0);
        verifyingConsumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$)).asJava());
        Seq valueSeq = (Seq)TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).map((Function1 & Serializable & scala.Serializable)record -> BoxesRunTime.boxToInteger((int)TransactionsTest.$anonfun$testSendOffsets$4(record)), Seq$.MODULE$.canBuildFrom());
        scala.collection.immutable.Set valueSet = valueSeq.toSet();
        Assert.assertEquals((String)new StringBuilder(21).append("Expected ").append(numSeedMessages).append(" values in ").append(this.topic2()).append(".").toString(), (long)numSeedMessages, (long)valueSeq.size());
        Assert.assertEquals((String)new StringBuilder(30).append("Expected ").append(valueSeq.size()).append(" unique messages in ").append(this.topic2()).append(".").toString(), (long)valueSeq.size(), (long)valueSet.size());
    }

    @Test
    public void testFencingOnCommit() {
        KafkaProducer producer1 = (KafkaProducer)this.transactionalProducers().apply(0);
        KafkaProducer producer2 = (KafkaProducer)this.transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer)this.transactionalConsumers().apply(0);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$))).asJava());
        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "1", false));
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "3", "3", false));
        producer2.initTransactions();
        producer2.beginTransaction();
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "2", "4", true));
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "2", "4", true));
        try {
            producer1.commitTransaction();
            throw this.fail("Should not be able to commit transactions from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 313));
        }
        catch (ProducerFencedException producerFencedException) {
        }
        catch (Exception e) {
            throw this.fail("Got an unexpected exception from a fenced producer.", e, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 318));
        }
        producer2.commitTransaction();
        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        records.foreach((Function1 & Serializable & scala.Serializable)record -> TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record));
    }

    /**
     * Checks that a producer which is expected to be fenced cannot add offsets to a transaction.
     *
     * <p>Sequence: producer1 initializes, begins a transaction and sends one record to each of
     * topic1 and topic2 (flagged as not expected to be committed). producer2 then initializes,
     * begins its own transaction and sends one record to each topic (flagged as expected to be
     * committed). producer1 then calls {@code sendOffsetsToTransaction}, which must throw
     * {@link ProducerFencedException}. Finally producer2 commits and the consumer must read
     * two records that pass {@code assertCommittedAndGetValue}.
     *
     * <p>NOTE(review): the fencing presumably relies on both producers sharing a transactional id
     * configured where {@code transactionalProducers} is populated - confirm in the setup code.
     */
    @Test
    public void testFencingOnSendOffsets() {
        KafkaProducer producer1 = (KafkaProducer)this.transactionalProducers().apply(0);
        KafkaProducer producer2 = (KafkaProducer)this.transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer)this.transactionalConsumers().apply(0);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$))).asJava());
        // First producer opens a transaction that is never committed.
        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "1", false));
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "3", "3", false));
        // Second producer initializes after the first one; from here on producer1 is expected to be fenced.
        producer2.initTransactions();
        producer2.beginTransaction();
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "2", "4", true));
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "2", "4", true));
        try {
            // The topic/group names used here are arbitrary; the call is expected to be rejected.
            producer1.sendOffsetsToTransaction((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("foobartopic", 0)), (Object)new OffsetAndMetadata(110L))}))).asJava(), "foobarGroup");
            throw this.fail("Should not be able to send offsets from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 351));
        }
        catch (ProducerFencedException producerFencedException) {
            // Expected outcome; nothing else to verify for producer1.
        }
        catch (Exception e) {
            // NOTE(review): if fail(...) above signals failure with an Exception subtype, it would be
            // caught here and re-reported under this "unexpected exception" message - confirm.
            throw this.fail("Got an unexpected exception from a fenced producer.", e, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 356));
        }
        // Only producer2's two records should be readable by the consumer.
        producer2.commitTransaction();
        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        records.foreach((Function1 & Serializable & scala.Serializable)record -> TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record));
    }

    /**
     * Checks that an {@link OffsetAndMetadata} sent through {@code sendOffsetsToTransaction}
     * is returned unchanged by {@code consumer.committed(tp)} after the transaction commits.
     *
     * <p>The offset is 110 with an {@code Optional} holding 15 and the metadata string
     * "some metadata", sent for partition 0 of topic1 under group id "group".
     */
    @Test
    public void testOffsetMetadataInSendOffsetsToTransaction() {
        TopicPartition tp = new TopicPartition(this.topic1(), 0);
        String groupId = "group";
        KafkaProducer producer = (KafkaProducer)this.transactionalProducers().head();
        KafkaConsumer<byte[], byte[]> consumer = this.createReadCommittedConsumer(groupId, this.createReadCommittedConsumer$default$2(), this.createReadCommittedConsumer$default$3());
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)Nil$.MODULE$)).asJava());
        producer.initTransactions();
        producer.beginTransaction();
        // NOTE(review): the Optional argument is presumably the leader epoch - confirm against the OffsetAndMetadata API.
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(110L, Optional.of(Predef$.MODULE$.int2Integer(15)), "some metadata");
        producer.sendOffsetsToTransaction((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)offsetAndMetadata)}))).asJava(), groupId);
        producer.commitTransaction();
        // NOTE(review): initializing a second producer before reading the committed offset is presumably
        // a way to wait until the first transaction has fully completed - confirm intent.
        KafkaProducer producer2 = (KafkaProducer)this.transactionalProducers().apply(1);
        producer2.initTransactions();
        Assert.assertEquals((Object)offsetAndMetadata, (Object)consumer.committed(tp));
    }

    /**
     * Checks that a producer which is expected to be fenced cannot send further records.
     *
     * <p>Sequence: producer1 begins a transaction and sends two records; producer2 then
     * initializes, begins a transaction and sends two records, waiting for each send to complete.
     * A further send from producer1 must fail, either with {@link ProducerFencedException} thrown
     * directly or with an {@link ExecutionException} whose cause is a
     * {@code ProducerFencedException}. producer2 then commits and the consumer must read two
     * records that pass {@code assertCommittedAndGetValue}.
     *
     * <p>If the send unexpectedly succeeds, the target topic-partition and the first live log
     * directory of every server are logged via {@code error(...)} before the test fails.
     */
    @Test
    public void testFencingOnSend() {
        KafkaProducer producer1 = (KafkaProducer)this.transactionalProducers().apply(0);
        KafkaProducer producer2 = (KafkaProducer)this.transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer)this.transactionalConsumers().apply(0);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$))).asJava());
        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "1", false));
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "3", "3", false));
        producer2.initTransactions();
        producer2.beginTransaction();
        // Block on each send so producer2's writes have completed before producer1 retries.
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "2", "4", true)).get();
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "2", "4", true)).get();
        try {
            Future result = producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "5", false));
            RecordMetadata recordMetadata = (RecordMetadata)result.get();
            // Reaching this point means the send was accepted: log diagnostics, then fail.
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(69).append("Missed a producer fenced exception when writing to ").append(recordMetadata.topic()).append("-").append(recordMetadata.partition()).append(". Grab the logs!!").toString());
            this.servers().foreach((Function1 & Serializable & scala.Serializable)server -> {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(10).append("log dirs: ").append(((IterableLike)server.logManager().liveLogDirs().map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.getAbsolutePath(), Seq$.MODULE$.canBuildFrom())).head()).toString());
                return BoxedUnit.UNIT;
            });
            throw this.fail("Should not be able to send messages from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 418));
        }
        catch (ProducerFencedException producerFencedException) {
            // Expected when the exception is raised synchronously; the producer is closed in this branch only.
            producer1.close();
        }
        catch (ExecutionException e) {
            // Expected when the failure surfaces through the returned Future.
            Assert.assertTrue((boolean)(e.getCause() instanceof ProducerFencedException));
        }
        catch (Exception e) {
            throw this.fail("Got an unexpected exception from a fenced producer.", e, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 425));
        }
        producer2.commitTransaction();
        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        records.foreach((Function1 & Serializable & scala.Serializable)record -> TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record));
    }

    /**
     * Checks that a producer which is expected to be fenced cannot start writing in a new transaction.
     *
     * <p>Unlike {@code testFencingOnSend}, producer1 aborts its first transaction before producer2
     * initializes. producer2 then begins a transaction and sends two records, waiting up to
     * 20 seconds for each. producer1 then begins a fresh transaction and sends a record; that
     * must fail with {@link ProducerFencedException}, either thrown directly or as the cause of an
     * {@link ExecutionException}. producer2 then commits and the consumer must read two records
     * that pass {@code assertCommittedAndGetValue}.
     */
    @Test
    public void testFencingOnAddPartitions() {
        KafkaProducer producer1 = (KafkaProducer)this.transactionalProducers().apply(0);
        KafkaProducer producer2 = (KafkaProducer)this.transactionalProducers().apply(1);
        KafkaConsumer consumer = (KafkaConsumer)this.transactionalConsumers().apply(0);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$))).asJava());
        producer1.initTransactions();
        producer1.beginTransaction();
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "1", false));
        producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "3", "3", false));
        // producer1 ends its transaction cleanly, so its next write has to begin a new one.
        producer1.abortTransaction();
        producer2.initTransactions();
        producer2.beginTransaction();
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "2", "4", true)).get(20L, TimeUnit.SECONDS);
        producer2.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "2", "4", true)).get(20L, TimeUnit.SECONDS);
        try {
            producer1.beginTransaction();
            Future result = producer1.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "5", false));
            RecordMetadata recordMetadata = (RecordMetadata)result.get();
            // Reaching this point means the send was accepted: log diagnostics, then fail.
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(69).append("Missed a producer fenced exception when writing to ").append(recordMetadata.topic()).append("-").append(recordMetadata.partition()).append(". Grab the logs!!").toString());
            this.servers().foreach((Function1 & Serializable & scala.Serializable)server -> {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(10).append("log dirs: ").append(((IterableLike)server.logManager().liveLogDirs().map((Function1 & Serializable & scala.Serializable)x$6 -> x$6.getAbsolutePath(), Seq$.MODULE$.canBuildFrom())).head()).toString());
                return BoxedUnit.UNIT;
            });
            throw this.fail("Should not be able to send messages from a fenced producer.", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 465));
        }
        catch (ProducerFencedException producerFencedException) {
            // Expected outcome when the exception is raised synchronously.
        }
        catch (ExecutionException e) {
            // Expected outcome when the failure surfaces through the returned Future.
            Assert.assertTrue((boolean)(e.getCause() instanceof ProducerFencedException));
        }
        catch (Exception e) {
            throw this.fail("Got an unexpected exception from a fenced producer.", e, new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 471));
        }
        producer2.commitTransaction();
        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 2, TestUtils$.MODULE$.consumeRecords$default$3());
        records.foreach((Function1 & Serializable & scala.Serializable)record -> TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record));
    }

    /**
     * Checks that a producer whose transaction has timed out can no longer send.
     *
     * <p>A producer is created with a transaction timeout of 100 ms, sends one record to topic1
     * and waits for it to be acknowledged. The test then sleeps 600 ms and sends to topic2; that
     * send must fail with {@link ProducerFencedException}, thrown directly or as the cause of an
     * {@link ExecutionException}. Afterwards the non-transactional consumer must see exactly one
     * record with value "1" on topic1, and the transactional consumer must see none.
     *
     * <p>The 600 ms sleep is timing-dependent: it has to outlast both the 100 ms transaction
     * timeout and the broker's timed-out-transaction cleanup interval ("200" in
     * {@code serverProps}).
     */
    @Test
    public void testFencingOnTransactionExpiration() {
        KafkaProducer<byte[], byte[]> producer = this.createTransactionalProducer("expiringProducer", 100L);
        producer.initTransactions();
        producer.beginTransaction();
        RecordMetadata firstMessageResult = (RecordMetadata)producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic1(), "1", "1", false)).get();
        Assert.assertTrue((boolean)firstMessageResult.hasOffset());
        // Give the open transaction time to expire.
        Thread.sleep(600L);
        try {
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.topic2(), "2", "2", false)).get();
            throw this.fail("should have raised a ProducerFencedException since the transaction has expired", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 499));
        }
        catch (ProducerFencedException producerFencedException) {
            // Expected outcome when the exception is raised synchronously.
        }
        catch (ExecutionException e) {
            // Expected outcome when the failure surfaces through the returned Future.
            Assert.assertTrue((boolean)(e.getCause() instanceof ProducerFencedException));
        }
        // The non-transactional consumer is expected to see the single record written before expiry.
        KafkaConsumer nonTransactionalConsumer = (KafkaConsumer)this.nonTransactionalConsumers().apply(0);
        nonTransactionalConsumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)Nil$.MODULE$)).asJava());
        Seq records = TestUtils$.MODULE$.consumeRecordsFor(nonTransactionalConsumer, 1000L);
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((Object)"1", (Object)TestUtils$.MODULE$.recordValueAsString((ConsumerRecord<byte[], byte[]>)((ConsumerRecord)records.head())));
        // The transactional consumer is expected to see nothing on the same topic.
        KafkaConsumer transactionalConsumer = (KafkaConsumer)this.transactionalConsumers().head();
        transactionalConsumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)this.topic1(), (List)Nil$.MODULE$)).asJava());
        Seq transactionalRecords = TestUtils$.MODULE$.consumeRecordsFor(transactionalConsumer, 1000L);
        Assert.assertTrue((boolean)transactionalRecords.isEmpty());
    }

    /**
     * Exercises one aborted and one committed transaction across two 10-partition topics.
     *
     * <p>Two topics are created: "largeTopic" (10 partitions, replication factor
     * {@code numServers()}, min in-sync replicas "2") and "largeTopicOneReplica" (10 partitions,
     * replication factor 1). The first transaction sends values 0..4999 to the former and
     * 5000..9999 to the latter, then aborts. The second transaction sends values 10000..10999 to
     * "largeTopic" and commits.
     *
     * <p>The transactional consumer must then read 1000 records that all pass
     * {@code assertCommittedAndGetValue}; the non-transactional consumer must read 11000 records
     * whose values all lie in the string forms of 0..10999.
     */
    @Test
    public void testMultipleMarkersOneLeader() {
        KafkaProducer firstProducer = (KafkaProducer)this.transactionalProducers().head();
        KafkaConsumer consumer = (KafkaConsumer)this.transactionalConsumers().head();
        KafkaConsumer unCommittedConsumer = (KafkaConsumer)this.nonTransactionalConsumers().head();
        String topicWith10Partitions = "largeTopic";
        String topicWith10PartitionsAndOneReplica = "largeTopicOneReplica";
        Properties topicConfig = new Properties();
        topicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), ((Object)BoxesRunTime.boxToInteger((int)2)).toString());
        this.createTopic(topicWith10Partitions, 10, this.numServers(), topicConfig);
        this.createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties());
        firstProducer.initTransactions();
        // Transaction 1: 10000 records across both topics, then aborted.
        firstProducer.beginTransaction();
        this.sendTransactionalMessagesWithValueRange((KafkaProducer<byte[], byte[]>)firstProducer, topicWith10Partitions, 0, 5000, false);
        this.sendTransactionalMessagesWithValueRange((KafkaProducer<byte[], byte[]>)firstProducer, topicWith10PartitionsAndOneReplica, 5000, 10000, false);
        firstProducer.abortTransaction();
        // Transaction 2: 1000 records to the replicated topic, then committed.
        firstProducer.beginTransaction();
        this.sendTransactionalMessagesWithValueRange((KafkaProducer<byte[], byte[]>)firstProducer, topicWith10Partitions, 10000, 11000, true);
        firstProducer.commitTransaction();
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)topicWith10PartitionsAndOneReplica, (List)new .colon.colon((Object)topicWith10Partitions, (List)Nil$.MODULE$))).asJava());
        unCommittedConsumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)topicWith10PartitionsAndOneReplica, (List)new .colon.colon((Object)topicWith10Partitions, (List)Nil$.MODULE$))).asJava());
        // The transactional consumer is expected to see only the 1000 committed records.
        Seq records = TestUtils$.MODULE$.consumeRecords(consumer, 1000, TestUtils$.MODULE$.consumeRecords$default$3());
        records.foreach((Function1 & Serializable & scala.Serializable)record -> TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record));
        // The non-transactional consumer is expected to see all 11000 records, aborted ones included.
        Seq allRecords = TestUtils$.MODULE$.consumeRecords(unCommittedConsumer, 11000, TestUtils$.MODULE$.consumeRecords$default$3());
        scala.collection.immutable.Set expectedValues = ((TraversableOnce)package$.MODULE$.Range().apply(0, 11000).map((Function1 & Serializable & scala.Serializable)x$7 -> TransactionsTest.$anonfun$testMultipleMarkersOneLeader$2(BoxesRunTime.unboxToInt((Object)x$7)), IndexedSeq$.MODULE$.canBuildFrom())).toSet();
        allRecords.foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsTest.$anonfun$testMultipleMarkersOneLeader$3(expectedValues, record);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Calls {@code initTransactions()} twice in a row on a fresh transactional producer and
     * expects the test to end with a {@link KafkaException}.
     *
     * <p>The producer is closed on every exit path before the exception propagates.
     */
    @Test(expected=KafkaException.class)
    public void testConsecutivelyRunInitTransactions() {
        KafkaProducer<byte[], byte[]> normalProducer = this.createTransactionalProducer("normalProducer", this.createTransactionalProducer$default$2());
        try {
            normalProducer.initTransactions();
            normalProducer.initTransactions();
            // Only reached if the second call was accepted.
            throw this.fail("Should have raised a KafkaException", new Position("TransactionsTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 566));
        }
        finally {
            normalProducer.close();
        }
    }

    /**
     * Sends one record per integer in {@code [start, end)} to {@code topic}, using the decimal
     * string of the integer as both arguments passed to
     * {@code producerRecordWithExpectedTransactionStatus}, then flushes the producer.
     *
     * @param producer        producer used for every send
     * @param topic           destination topic
     * @param start           first value, inclusive
     * @param end             last value, exclusive
     * @param willBeCommitted forwarded unchanged as the expected-transaction-status flag
     */
    private void sendTransactionalMessagesWithValueRange(KafkaProducer<byte[], byte[]> producer, String topic, int start, int end, boolean willBeCommitted) {
        for (int value = start; value < end; value++) {
            String text = Integer.toString(value);
            producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(topic, text, text, willBeCommitted));
        }
        producer.flush();
    }

    /*
     * WARNING - void declaration
     */
    private Properties serverProps() {
        void var1_1;
        Properties serverProps = new Properties();
        serverProps.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString());
        serverProps.put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), ((Object)BoxesRunTime.boxToInteger((int)1)).toString());
        serverProps.put(KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), ((Object)BoxesRunTime.boxToInteger((int)3)).toString());
        serverProps.put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), ((Object)BoxesRunTime.boxToInteger((int)2)).toString());
        serverProps.put(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), ((Object)BoxesRunTime.boxToInteger((int)2)).toString());
        serverProps.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)true)).toString());
        serverProps.put(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString());
        serverProps.put(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString());
        serverProps.put(KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), "0");
        serverProps.put(KafkaConfig$.MODULE$.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp(), "200");
        return var1_1;
    }

    /**
     * Creates a consumer through {@code TestUtils.createConsumer}, appends it to
     * {@code transactionalConsumers()} and returns it.
     *
     * <p>The broker list is derived from {@code servers()}. The fourth and fifth arguments of
     * {@code createConsumer} are fixed to {@code false} and {@code true}; presumably these are
     * the auto-commit and read-committed switches - confirm against
     * {@code TestUtils.createConsumer}. All remaining arguments use that method's defaults.
     *
     * @param group          consumer group id
     * @param maxPollRecords forwarded as the sixth argument of {@code createConsumer}
     * @param props          not read by this method
     * @return the newly created consumer
     */
    private KafkaConsumer<byte[], byte[]> createReadCommittedConsumer(String group, int maxPollRecords, Properties props) {
        String brokerList = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        KafkaConsumer readCommittedConsumer = TestUtils$.MODULE$.createConsumer(
                brokerList,
                group,
                TestUtils$.MODULE$.createConsumer$default$3(),
                false,
                true,
                maxPollRecords,
                TestUtils$.MODULE$.createConsumer$default$7(),
                TestUtils$.MODULE$.createConsumer$default$8(),
                TestUtils$.MODULE$.createConsumer$default$9(),
                TestUtils$.MODULE$.createConsumer$default$10(),
                TestUtils$.MODULE$.createConsumer$default$11());
        this.transactionalConsumers().$plus$eq(readCommittedConsumer);
        return readCommittedConsumer;
    }

    /**
     * Returns the constant "group"; presumably the compiler-generated default for the first
     * argument of {@code createReadCommittedConsumer}.
     */
    private String createReadCommittedConsumer$default$1() {
        final String defaultGroup = "group";
        return defaultGroup;
    }

    /**
     * Returns 500, the value callers in this class pass as the {@code maxPollRecords} argument
     * of {@code createReadCommittedConsumer} when they do not choose one themselves.
     */
    private int createReadCommittedConsumer$default$2() {
        final int defaultMaxPollRecords = 500;
        return defaultMaxPollRecords;
    }

    /**
     * Returns a fresh, empty {@link Properties}, the value callers in this class pass as the
     * {@code props} argument of {@code createReadCommittedConsumer} when they do not supply one.
     */
    private Properties createReadCommittedConsumer$default$3() {
        Properties emptyProps = new Properties();
        return emptyProps;
    }

    /*
     * WARNING - void declaration
     */
    private KafkaConsumer<byte[], byte[]> createReadUncommittedConsumer(String group) {
        void var2_13;
        String x$1 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        String x$2 = group;
        boolean x$3 = false;
        String x$4 = TestUtils$.MODULE$.createConsumer$default$3();
        boolean x$5 = TestUtils$.MODULE$.createConsumer$default$5();
        int x$6 = TestUtils$.MODULE$.createConsumer$default$6();
        SecurityProtocol x$7 = TestUtils$.MODULE$.createConsumer$default$7();
        Option<File> x$8 = TestUtils$.MODULE$.createConsumer$default$8();
        Option<Properties> x$9 = TestUtils$.MODULE$.createConsumer$default$9();
        ByteArrayDeserializer x$10 = TestUtils$.MODULE$.createConsumer$default$10();
        ByteArrayDeserializer x$11 = TestUtils$.MODULE$.createConsumer$default$11();
        KafkaConsumer consumer = TestUtils$.MODULE$.createConsumer(x$1, x$2, x$4, x$3, x$5, x$6, x$7, x$8, x$9, x$10, x$11);
        this.nonTransactionalConsumers().$plus$eq(consumer);
        return var2_13;
    }

    /**
     * Creates a transactional producer through {@code TestUtils.createTransactionalProducer},
     * appends it to {@code transactionalProducers()} and returns it.
     *
     * @param transactionalId      transactional id handed to the producer factory
     * @param transactionTimeoutMs forwarded as the last argument of the factory call
     * @return the newly created producer
     */
    private KafkaProducer<byte[], byte[]> createTransactionalProducer(String transactionalId, long transactionTimeoutMs) {
        Buffer<KafkaServer> brokers = this.servers();
        int factoryDefaultThirdArg = TestUtils$.MODULE$.createTransactionalProducer$default$3();
        KafkaProducer<byte[], byte[]> created = TestUtils$.MODULE$.createTransactionalProducer(transactionalId, (Seq<KafkaServer>)brokers, factoryDefaultThirdArg, transactionTimeoutMs);
        this.transactionalProducers().$plus$eq(created);
        return created;
    }

    /**
     * Returns 60000, the value callers in this class pass as the {@code transactionTimeoutMs}
     * argument of {@code createTransactionalProducer} when they do not choose one themselves.
     */
    private long createTransactionalProducer$default$2() {
        final long defaultTransactionTimeoutMs = 60000L;
        return defaultTransactionTimeoutMs;
    }

    /**
     * Lambda body: asserts that the record's value, as rendered by
     * {@code TestUtils.recordValueAsString}, is contained in {@code expectedValues$1}.
     */
    public static final /* synthetic */ void $anonfun$testBasicTransactions$2(scala.collection.immutable.Set expectedValues$1, ConsumerRecord record) {
        Object actualValue = TestUtils$.MODULE$.recordValueAsString((ConsumerRecord<byte[], byte[]>)record);
        boolean wasExpected = expectedValues$1.contains(actualValue);
        Assert.assertTrue(wasExpected);
    }

    /**
     * Lambda body: asserts that the record's key decodes to "x" and its value decodes to "1".
     * Both byte arrays are decoded with {@code new String(byte[])}.
     */
    public static final /* synthetic */ void $anonfun$testReadCommittedConsumerShouldNotSeeUndecidedData$1(ConsumerRecord record) {
        // Key is decoded and checked first; the value is only touched once the key matched.
        String decodedKey = new String((byte[])record.key());
        Assert.assertEquals((Object)"x", (Object)decodedKey);
        String decodedValue = new String((byte[])record.value());
        Assert.assertEquals((Object)"1", (Object)decodedValue);
    }

    /**
     * Lambda body: asserts that the consumer's position for {@code tp} is exactly 1.
     */
    public static final /* synthetic */ void $anonfun$testReadCommittedConsumerShouldNotSeeUndecidedData$2(KafkaConsumer readCommittedConsumer$1, TopicPartition tp) {
        long currentPosition = (long)readCommittedConsumer$1.position(tp);
        Assert.assertEquals(1L, currentPosition);
    }

    /**
     * Lambda body: runs {@code TestUtils.assertCommittedAndGetValue} on the record and converts
     * the returned string to an int via Scala's {@code StringOps.toInt}.
     */
    public static final /* synthetic */ int $anonfun$testSendOffsets$4(ConsumerRecord record) {
        String committedValue = TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record);
        StringOps valueOps = new StringOps(Predef$.MODULE$.augmentString(committedValue));
        return valueOps.toInt();
    }

    /**
     * Lambda body: returns the decimal string form of {@code x$7}.
     */
    public static final /* synthetic */ String $anonfun$testMultipleMarkersOneLeader$2(int x$7) {
        return Integer.toString(x$7);
    }

    /**
     * Lambda body: asserts that the record's value, as rendered by
     * {@code TestUtils.recordValueAsString}, is contained in {@code expectedValues$2}.
     */
    public static final /* synthetic */ void $anonfun$testMultipleMarkersOneLeader$3(scala.collection.immutable.Set expectedValues$2, ConsumerRecord record) {
        Object actualValue = TestUtils$.MODULE$.recordValueAsString((ConsumerRecord<byte[], byte[]>)record);
        boolean wasExpected = expectedValues$2.contains(actualValue);
        Assert.assertTrue(wasExpected);
    }

    /**
     * Sets the fixed parameters of this test class: three servers, two transactional producers,
     * one transactional consumer, one non-transactional consumer, and the topic names
     * "topic1" and "topic2".
     *
     * NOTE(review): how these counts are consumed (e.g. by the setup method) is outside this
     * view - confirm there.
     */
    public TransactionsTest() {
        this.numServers = 3;
        this.transactionalProducerCount = 2;
        this.transactionalConsumerCount = 1;
        this.nonTransactionalConsumerCount = 1;
        this.topic1 = "topic1";
        this.topic2 = "topic2";
    }
}

