package org.apache.kafka.connect.integration;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.class */
public class SessionedProtocolIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(SessionedProtocolIntegrationTest.class);
    private static final String CONNECTOR_NAME = "connector";
    private static final long CONNECTOR_SETUP_DURATION_MS = 60000;
    private EmbeddedConnectCluster connect;
    private ConnectorHandle connectorHandle;

    @BeforeEach
    public void setup() {
        HashMap hashMap = new HashMap();
        hashMap.put("connect.protocol", ConnectProtocolCompatibility.SESSIONED.protocol());
        this.connect = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(2).numBrokers(1).workerProps(hashMap).build();
        this.connect.start();
        this.connectorHandle = RuntimeHandles.get().connectorHandle("connector");
    }

    @AfterEach
    public void close() {
        this.connect.stop();
    }

    @Test
    public void ensureInternalEndpointIsSecured() throws Throwable {
        String endpointForResource = this.connect.endpointForResource(String.format("connectors/%s/tasks", "connector"));
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("X-Connect-Authorization", "S2Fma2Flc3F1ZQ==");
        hashMap2.put("X-Connect-Request-Signature-Algorithm", "HmacSHA256");
        log.info("Making a POST request to the {} endpoint with no connector started and no signature header; expecting 400 error response", endpointForResource);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), this.connect.requestPost(endpointForResource, "[]", hashMap).getStatus());
        log.info("Making a POST request to the {} endpoint with no connector started and an invalid signature header; expecting 403 error response", endpointForResource);
        Assertions.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), this.connect.requestPost(endpointForResource, "[]", hashMap2).getStatus());
        HashMap hashMap3 = new HashMap();
        hashMap3.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        hashMap3.put("tasks.max", String.valueOf(1));
        hashMap3.put("topics", "test-topic");
        hashMap3.put("key.converter", StringConverter.class.getName());
        hashMap3.put("value.converter", StringConverter.class.getName());
        log.info("Starting the {} connector", "connector");
        StartAndStopLatch expectedStarts = this.connectorHandle.expectedStarts(1);
        this.connect.configureConnector("connector", hashMap3);
        expectedStarts.await(60000L, TimeUnit.MILLISECONDS);
        log.info("Making a POST request to the {} endpoint with the connector started and no signature header; expecting 400 error response", endpointForResource);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), this.connect.requestPost(endpointForResource, "[]", hashMap).getStatus());
        log.info("Making a POST request to the {} endpoint with the connector started and an invalid signature header; expecting 403 error response", endpointForResource);
        Assertions.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), this.connect.requestPost(endpointForResource, "[]", hashMap2).getStatus());
    }
}
