Java源码示例:org.apache.kafka.common.acl.AclBinding

示例1
public Map<String, Collection<AclBinding>> fetchAclsList() {
  Map<String, Collection<AclBinding>> acls = new HashMap<>();

  try {
    Collection<AclBinding> list = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
    list.forEach(
        aclBinding -> {
          String name = aclBinding.pattern().name();
          if (acls.get(name) == null) {
            acls.put(name, new ArrayList<>());
          }
          Collection<AclBinding> updatedList = acls.get(name);
          updatedList.add(aclBinding);
          acls.put(name, updatedList);
        });
  } catch (Exception e) {
    return new HashMap<>();
  }
  return acls;
}
 
示例2
public List<AclBinding> setAclsForStreamsApp(
    String principal, String topicPrefix, List<String> readTopics, List<String> writeTopics)
    throws IOException {

  List<AclBinding> acls = new ArrayList<>();

  readTopics.forEach(
      topic -> {
        acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ));
      });

  writeTopics.forEach(
      topic -> {
        acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE));
      });

  acls.add(buildTopicLevelAcl(principal, topicPrefix, PatternType.PREFIXED, AclOperation.ALL));
  createAcls(acls);
  return acls;
}
 
示例3
private void verifyControlCenterAcls(Platform platform)
    throws ExecutionException, InterruptedException {

  List<ControlCenter> c3List = platform.getControlCenter();

  for (ControlCenter c3 : c3List) {
    ResourcePatternFilter resourceFilter =
        new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY);

    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            c3.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);

    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(16, acls.size());
  }
}
 
示例4
@Test
public void testFromCrdToKafkaAclBinding()   {
    AclRule rule = new AclRuleBuilder()
        .withType(AclRuleType.ALLOW)
        .withResource(aclRuleTopicResource)
        .withHost("127.0.0.1")
        .withOperation(AclOperation.READ)
        .build();

    AclBinding expectedKafkaAclBinding = new AclBinding(
            kafkaResourcePattern,
            new AccessControlEntry(kafkaPrincipal.toString(), "127.0.0.1",
                    org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW)
    );

    assertThat(SimpleAclRule.fromCrd(rule).toKafkaAclBinding(kafkaPrincipal), is(expectedKafkaAclBinding));
}
 
示例5
public TopologyAclBinding(AclBinding binding) {

    AccessControlEntry entry = binding.entry();
    ResourcePattern pattern = binding.pattern();

    this.resourceType = pattern.resourceType();
    this.resourceName = pattern.name();
    this.principal = entry.principal();
    this.operation = entry.operation().name();
    this.pattern = pattern.patternType().name();
    this.host = entry.host();
  }
 
示例6
@Override
public List<TopologyAclBinding> setAclsForConsumers(Collection<String> principals, String topic) {
  return principals.stream()
      .flatMap(
          principal -> {
            List<AclBinding> acls = new ArrayList<>();
            try {
              acls = adminClient.setAclsForConsumer(principal, topic);
            } catch (IOException e) {
              LOGGER.error(e);
            }
            return acls.stream().map(aclBinding -> new TopologyAclBinding(aclBinding));
          })
      .collect(Collectors.toList());
}
 
示例7
@Override
public List<TopologyAclBinding> setAclsForProducers(Collection<String> principals, String topic) {
  return principals.stream()
      .flatMap(
          principal -> {
            List<AclBinding> acls = new ArrayList<>();
            try {
              acls = adminClient.setAclsForProducer(principal, topic);
            } catch (IOException e) {
              LOGGER.error(e);
            }
            return acls.stream().map(aclBinding -> new TopologyAclBinding(aclBinding));
          })
      .collect(Collectors.toList());
}
 
示例8
public List<AclBinding> setAclsForProducer(String principal, String topic) throws IOException {
  List<AclBinding> acls = new ArrayList<>();
  acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.DESCRIBE));
  acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE));
  createAcls(acls);
  return acls;
}
 
示例9
public List<AclBinding> setAclsForConsumer(String principal, String topic) throws IOException {

    List<AclBinding> acls = new ArrayList<>();
    acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.DESCRIBE));
    acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ));
    acls.add(buildGroupLevelAcl(principal, "*", PatternType.LITERAL, AclOperation.READ));
    createAcls(acls);
    return acls;
  }
 
示例10
public List<AclBinding> setAclForSchemaRegistry(String principal) throws IOException {
  List<AclBinding> bindings =
      Arrays.asList(AclOperation.DESCRIBE_CONFIGS, AclOperation.WRITE, AclOperation.READ).stream()
          .map(
              aclOperation -> {
                return buildTopicLevelAcl(
                    principal, "_schemas", PatternType.LITERAL, aclOperation);
              })
          .collect(Collectors.toList());
  createAcls(bindings);
  return bindings;
}
 
示例11
public List<AclBinding> setAclsForControlCenter(String principal, String appId)
    throws IOException {
  List<AclBinding> bindings = new ArrayList<>();

  bindings.add(buildGroupLevelAcl(principal, appId, PatternType.PREFIXED, AclOperation.READ));
  bindings.add(
      buildGroupLevelAcl(principal, appId + "-command", PatternType.PREFIXED, AclOperation.READ));

  Arrays.asList("_confluent-monitoring", "_confluent-command", " _confluent-metrics")
      .forEach(
          topic ->
              Stream.of(
                      AclOperation.WRITE,
                      AclOperation.READ,
                      AclOperation.CREATE,
                      AclOperation.DESCRIBE)
                  .map(
                      aclOperation ->
                          buildTopicLevelAcl(principal, topic, PatternType.LITERAL, aclOperation))
                  .forEach(aclBinding -> bindings.add(aclBinding)));

  Stream.of(AclOperation.WRITE, AclOperation.READ, AclOperation.CREATE, AclOperation.DESCRIBE)
      .map(
          aclOperation ->
              buildTopicLevelAcl(principal, appId, PatternType.PREFIXED, aclOperation))
      .forEach(aclBinding -> bindings.add(aclBinding));

  ResourcePattern resourcePattern =
      new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
  AccessControlEntry entry =
      new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
  bindings.add(new AclBinding(resourcePattern, entry));

  entry =
      new AccessControlEntry(
          principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW);
  bindings.add(new AclBinding(resourcePattern, entry));
  createAcls(bindings);
  return bindings;
}
 
示例12
public List<AclBinding> setAclsForConnect(
    String principal, String topicPrefix, List<String> readTopics, List<String> writeTopics)
    throws IOException {

  List<AclBinding> acls = new ArrayList<>();

  List<String> topics = Arrays.asList("connect-status", "connect-offsets", "connect-configs");
  for (String topic : topics) {
    acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ));
    acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE));
  }

  ResourcePattern resourcePattern =
      new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
  AccessControlEntry entry =
      new AccessControlEntry(principal, "*", AclOperation.CREATE, AclPermissionType.ALLOW);
  acls.add(new AclBinding(resourcePattern, entry));

  resourcePattern = new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL);
  entry = new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW);
  acls.add(new AclBinding(resourcePattern, entry));

  if (readTopics != null) {
    readTopics.forEach(
        topic -> {
          acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ));
        });
  }

  if (writeTopics != null) {
    writeTopics.forEach(
        topic -> {
          acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE));
        });
  }

  createAcls(acls);
  return acls;
}
 
示例13
private void createAcls(Collection<AclBinding> acls) throws IOException {
  try {
    adminClient.createAcls(acls).all().get();
  } catch (ExecutionException | InterruptedException e) {
    LOGGER.error(e);
    throw new IOException(e);
  }
}
 
示例14
private AclBinding buildTopicLevelAcl(
    String principal, String topic, PatternType patternType, AclOperation op) {
  return new AclBuilder(principal)
      .addResource(ResourceType.TOPIC, topic, patternType)
      .addControlEntry("*", op, AclPermissionType.ALLOW)
      .build();
}
 
示例15
private AclBinding buildGroupLevelAcl(
    String principal, String group, PatternType patternType, AclOperation op) {
  return new AclBuilder(principal)
      .addResource(ResourceType.GROUP, group, patternType)
      .addControlEntry("*", op, AclPermissionType.ALLOW)
      .build();
}
 
示例16
private void verifyProducerAcls(List<Producer> producers, String topic)
    throws InterruptedException, ExecutionException {

  for (Producer producer : producers) {
    ResourcePatternFilter resourceFilter = ResourcePatternFilter.ANY;
    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            producer.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);
    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(2, acls.size());

    List<ResourceType> types =
        acls.stream()
            .map(aclBinding -> aclBinding.pattern().resourceType())
            .collect(Collectors.toList());

    Assert.assertTrue(types.contains(ResourceType.TOPIC));

    List<AclOperation> ops =
        acls.stream()
            .map(aclsBinding -> aclsBinding.entry().operation())
            .collect(Collectors.toList());

    Assert.assertTrue(ops.contains(AclOperation.DESCRIBE));
    Assert.assertTrue(ops.contains(AclOperation.WRITE));
  }
}
 
示例17
private void verifyConsumerAcls(List<Consumer> consumers, String topic)
    throws InterruptedException, ExecutionException {

  for (Consumer consumer : consumers) {
    ResourcePatternFilter resourceFilter = ResourcePatternFilter.ANY;
    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            consumer.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);
    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(3, acls.size());

    List<ResourceType> types =
        acls.stream()
            .map(aclBinding -> aclBinding.pattern().resourceType())
            .collect(Collectors.toList());

    Assert.assertTrue(types.contains(ResourceType.GROUP));
    Assert.assertTrue(types.contains(ResourceType.TOPIC));

    List<AclOperation> ops =
        acls.stream()
            .map(aclsBinding -> aclsBinding.entry().operation())
            .collect(Collectors.toList());

    Assert.assertTrue(ops.contains(AclOperation.DESCRIBE));
    Assert.assertTrue(ops.contains(AclOperation.READ));
  }
}
 
示例18
Collection<AclBinding> listAcls() {
  final Collection<AclBinding> aclsBindings;
  try {
    aclsBindings = adminClient.describeAcls(new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY))
        .values().get();
  } catch (InterruptedException | ExecutionException e) {
    if (e.getCause() instanceof SecurityDisabledException) {
      return Collections.emptyList();
    } else {
      throw new KafkaAdminClientException(e);
    }
  }
  return aclsBindings;
}
 
示例19
/**
 * Create all ACLs for given user
 */
protected Future<ReconcileResult<Set<SimpleAclRule>>> internalCreate(String username, Set<SimpleAclRule> desired) {
    try {
        Collection<AclBinding> aclBindings = getAclBindings(username, desired);
        adminClient.createAcls(aclBindings).all().get();
    } catch (Exception e) {
        log.error("Adding Acl rules for user {} failed", username, e);
        return Future.failedFuture(e);
    }

    return Future.succeededFuture(ReconcileResult.created(desired));
}
 
示例20
private Collection<AclBinding> getAclBindings(String username, Set<SimpleAclRule> aclRules) {
    KafkaPrincipal principal = new KafkaPrincipal("User", username);
    Collection<AclBinding> aclBindings = new ArrayList<>();
    for (SimpleAclRule rule: aclRules) {
        aclBindings.add(rule.toKafkaAclBinding(principal));
    }
    return aclBindings;
}
 
示例21
/**
 * Returns Set of ACLs applying to single user.
 *
 * @param username  Name of the user.
 * @return The Set of ACLs applying to single user.
 */
public Set<SimpleAclRule> getAcls(String username)   {
    log.debug("Searching for ACL rules of user {}", username);
    Set<SimpleAclRule> result = new HashSet<>();
    KafkaPrincipal principal = new KafkaPrincipal("User", username);

    AclBindingFilter aclBindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY,
        new AccessControlEntryFilter(principal.toString(), null, AclOperation.ANY, AclPermissionType.ANY));

    Collection<AclBinding> aclBindings = null;
    try {
        aclBindings = adminClient.describeAcls(aclBindingFilter).values().get();
    } catch (InterruptedException | ExecutionException e) {
        // Admin Client API needs authorizer enabled on the Kafka brokers
        if (e.getCause() instanceof SecurityDisabledException) {
            throw new InvalidResourceException("Authorization needs to be enabled in the Kafka custom resource", e.getCause());
        } else if (e.getCause() instanceof UnknownServerException && e.getMessage().contains("Simple ACL delegation not enabled")) {
            throw new InvalidResourceException("Simple ACL delegation needs to be enabled in the Kafka custom resource", e.getCause());
        }
    }

    if (aclBindings != null) {
        log.debug("ACL rules for user {}", username);
        for (AclBinding aclBinding : aclBindings) {
            log.debug("{}", aclBinding);
            result.add(SimpleAclRule.fromAclBinding(aclBinding));
        }
    }

    return result;
}
 
示例22
/**
 * Returns set with all usernames which have some ACLs.
 *
 * @return The set with all usernames which have some ACLs.
 */
public Set<String> getUsersWithAcls()   {
    Set<String> result = new HashSet<>();
    Set<String> ignored = new HashSet<>(IGNORED_USERS.size());

    log.debug("Searching for Users with any ACL rules");

    Collection<AclBinding> aclBindings;
    try {
        aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
    } catch (InterruptedException | ExecutionException e) {
        return result;
    }

    for (AclBinding aclBinding : aclBindings) {
        KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(aclBinding.entry().principal());

        if (KafkaPrincipal.USER_TYPE.equals(principal.getPrincipalType()))  {
            // Username in ACL might keep different format (for example based on user's subject) and need to be decoded
            String username = KafkaUserModel.decodeUsername(principal.getName());

            if (IGNORED_USERS.contains(username))   {
                if (!ignored.contains(username)) {
                    // This info message is loged only once per reocnciliation even if there are multiple rules
                    log.info("Existing ACLs for user '{}' will be ignored.", username);
                    ignored.add(username);
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Adding user {} to Set of users with ACLs", username);
                }

                result.add(username);
            }
        }
    }

    return result;
}
 
示例23
/**
 * Create Kafka's AclBinding instance from current SimpleAclRule instance for the provided principal
 *
 * @param principal KafkaPrincipal instance for the current SimpleAclRule
 * @return Kafka AclBinding instance
 */
public AclBinding toKafkaAclBinding(KafkaPrincipal principal) {
    ResourcePattern resourcePattern = resource.toKafkaResourcePattern();
    AclPermissionType kafkaType = toKafkaAclPermissionType(type);
    org.apache.kafka.common.acl.AclOperation kafkaOperation = toKafkaAclOperation(operation);
    return new AclBinding(resourcePattern, new AccessControlEntry(principal.toString(), getHost(), kafkaOperation, kafkaType));
}
 
示例24
/**
 * Creates SimpleAclRule instance based on Kafka's AclBinding instance containing the resource the rule should apply to.
 *
 * @param aclBinding the AclBinding instance which should be used to create the rule
 * @return the SimpleAclRule instance
 */
public static SimpleAclRule fromAclBinding(AclBinding aclBinding) {
    SimpleAclRuleResource resource = SimpleAclRuleResource.fromKafkaResourcePattern(aclBinding.pattern());
    AclRuleType type = fromKafkaAclPermissionType(aclBinding.entry().permissionType());
    AclOperation operation = fromKafkaAclOperation(aclBinding.entry().operation());
    return new SimpleAclRule(type, resource, aclBinding.entry().host(), operation);
}
 
示例25
@Test
public void testGetUsersFromAcls(VertxTestContext context)  {
    Admin mockAdminClient = mock(AdminClient.class);
    SimpleAclOperator aclOp = new SimpleAclOperator(vertx, mockAdminClient);

    ResourcePattern res1 = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);
    ResourcePattern res2 = new ResourcePattern(ResourceType.GROUP, "my-group", PatternType.LITERAL);

    KafkaPrincipal foo = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "CN=foo");
    AclBinding fooAclBinding = new AclBinding(res1, new AccessControlEntry(foo.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal bar = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "CN=bar");
    AclBinding barAclBinding = new AclBinding(res1, new AccessControlEntry(bar.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal baz = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "baz");
    AclBinding bazAclBinding = new AclBinding(res2, new AccessControlEntry(baz.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal all = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
    AclBinding allAclBinding = new AclBinding(res1, new AccessControlEntry(all.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal anonymous = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
    AclBinding anonymousAclBinding = new AclBinding(res2, new AccessControlEntry(anonymous.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));

    Collection<AclBinding> aclBindings =
            asList(fooAclBinding, barAclBinding, bazAclBinding, allAclBinding, anonymousAclBinding);

    assertDoesNotThrow(() -> mockDescribeAcls(mockAdminClient, AclBindingFilter.ANY, aclBindings));
    assertThat(aclOp.getUsersWithAcls(), is(new HashSet<>(asList("foo", "bar", "baz"))));
    context.completeNow();
}
 
示例26
@Test
public void testReconcileInternalDelete(VertxTestContext context) {
    Admin mockAdminClient = mock(AdminClient.class);
    SimpleAclOperator aclOp = new SimpleAclOperator(vertx, mockAdminClient);

    ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);

    KafkaPrincipal foo = new KafkaPrincipal("User", "CN=foo");
    AclBinding readAclBinding = new AclBinding(resource, new AccessControlEntry(foo.toString(), "*", org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));

    ArgumentCaptor<Collection<AclBindingFilter>> aclBindingFiltersCaptor = ArgumentCaptor.forClass(Collection.class);
    assertDoesNotThrow(() -> {
        mockDescribeAcls(mockAdminClient, null, Collections.singleton(readAclBinding));
        mockDeleteAcls(mockAdminClient, Collections.singleton(readAclBinding), aclBindingFiltersCaptor);
    });

    Checkpoint async = context.checkpoint();
    aclOp.reconcile("CN=foo", null)
            .onComplete(context.succeeding(rr -> context.verify(() -> {

                Collection<AclBindingFilter> capturedAclBindingFilters = aclBindingFiltersCaptor.getValue();
                assertThat(capturedAclBindingFilters, hasSize(1));
                assertThat(capturedAclBindingFilters, hasItem(readAclBinding.toFilter()));

                Set<ResourcePatternFilter> capturedResourcePatternFilters =
                        capturedAclBindingFilters.stream().map(AclBindingFilter::patternFilter).collect(Collectors.toSet());
                assertThat(capturedResourcePatternFilters, hasSize(1));
                assertThat(capturedResourcePatternFilters, hasItem(resource.toFilter()));

                async.flag();
            })));
}
 
示例27
private void mockDescribeAcls(Admin mockAdminClient, AclBindingFilter aclBindingFilter, Collection<AclBinding> aclBindings)
        throws InterruptedException, ExecutionException {
    DescribeAclsResult result = mock(DescribeAclsResult.class);
    KafkaFuture<Collection<AclBinding>> future = mock(KafkaFuture.class);
    when(future.get()).thenReturn(aclBindings);
    when(result.values()).thenReturn(future);
    when(mockAdminClient.describeAcls(aclBindingFilter != null ? aclBindingFilter : any())).thenReturn(result);
}
 
示例28
private void mockCreateAcls(Admin mockAdminClient, ArgumentCaptor<Collection<AclBinding>> aclBindingsCaptor)
        throws InterruptedException, ExecutionException {
    CreateAclsResult result = mock(CreateAclsResult.class);
    KafkaFuture<Void> future = mock(KafkaFuture.class);
    when(future.get()).thenReturn(null);
    when(result.all()).thenReturn(future);
    when(mockAdminClient.createAcls(aclBindingsCaptor.capture())).thenReturn(result);
}
 
示例29
private void mockDeleteAcls(Admin mockAdminClient, Collection<AclBinding> aclBindings, ArgumentCaptor<Collection<AclBindingFilter>> aclBindingFiltersCaptor)
        throws InterruptedException, ExecutionException {
    DeleteAclsResult result = mock(DeleteAclsResult.class);
    KafkaFuture<Collection<AclBinding>> future = mock(KafkaFuture.class);
    when(future.get()).thenReturn(aclBindings);
    when(result.all()).thenReturn(future);
    when(mockAdminClient.deleteAcls(aclBindingFiltersCaptor.capture())).thenReturn(result);
}
 
示例30
@Test
public void testToKafkaAclBindingForSpecifiedKafkaPrincipalReturnsKafkaAclBindingForKafkaPrincipal() {
    SimpleAclRule kafkaTopicSimpleAclRule = new SimpleAclRule(AclRuleType.ALLOW, resource, "127.0.0.1", AclOperation.READ);
    AclBinding expectedAclBinding = new AclBinding(
            kafkaResourcePattern,
            new AccessControlEntry(kafkaPrincipal.toString(), "127.0.0.1",
                    org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW)
    );
    assertThat(kafkaTopicSimpleAclRule.toKafkaAclBinding(kafkaPrincipal), is(expectedAclBinding));
}