我们的Kafka一直是没有安全机制地裸奔,最近需要给它升个级,加上认证和授权机制,认证机制是SASL/PLAIN,授权是Kafka自带的ACL授权。
SASL/PLAIN认证机制开启方法:
修改Kafka配置文件 server.properties 或者其它名字(CDH和Ambari的发行版可能文件名有所差异) sasl.enabled.mechanisms = PLAIN sasl.mechanism.inter.broker.protocol = PLAIN security.inter.broker.protocol = SASL_PLAINTEXT listeners = SASL_PLAINTEXT://localhost:9092 编辑 kafka_jaas.conf: KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required # broker之间互相认证的凭据 username="admin" password="admin-secret" # 以下是用户信息 user_admin="admin-secret" # 用户admin的密码是admin-secret user_alice="alice-secret"; # 用户alice的密码是alice-secret };启动kafka的时候把 kafka_jaas.conf 的路径当作命令行参数传入:
-Djava.security.auth.login.config=/path/to/kafka_jaas.confACL授权机制开启方法:
编辑Kafka配置文件,添加如下内容:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer以上的配置方案除了没有使用SSL加密之外,还存在一个严重的缺陷:用户信息是通过静态配置文件的方式存储的,当对用户信息进行添加、删除和修改的时候都需要重启Kafka集群,而我们知道,作为消息中间件,Kafka的上下游与众多组件相连,重启可能造成数据丢失或重复,Kafka应当尽量避免重启。
还好,Kafka允许用户为SASL/PLAIN认证机制提供了自定义的回调函数,根据KIP-86,如果不希望采用静态配置文件存储用户认证信息的话,只需要编写一个实现了 AuthenticateCallbackHandler 接口的类,然后在配置文件中指明这个类即可,指明的方法为在Kafka配置文件中添加如下内容:
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.MyCallbackHandler先看下 AuthenticateCallbackHandler 这个接口的默认实现,可以在github上看到:
public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { private static final String JAAS_USER_PREFIX = "user_"; private List<AppConfigurationEntry> jaasConfigEntries; @Override public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { this.jaasConfigEntries = jaasConfigEntries; } @Override public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { String username = null; for (Callback callback: callbacks) { if (callback instanceof NameCallback) username = ((NameCallback) callback).getDefaultName(); else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; boolean authenticated = authenticate(username, plainCallback.password()); plainCallback.authenticated(authenticated); } else throw new UnsupportedCallbackException(callback); } } protected boolean authenticate(String username, char[] password) throws IOException { if (username == null) return false; else { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); } } @Override public void close() throws KafkaException { } }这个接口有三个方法需要实现,分别是configure()、handle()和close(),configure() 进行初始化配置,close() 关闭打开的相关资源,handle() 处理用户的认证请求。我采用MySQL来存储用户信息,因此可以满足动态增删用户的需求,大概的实现为:
public class MySQLAuthCallback implements AuthenticateCallbackHandler { private static final Logger log = LoggerFactory.getLogger("plugin"); private DruidDataSource dataSource = null; public MySQLAuthCallback() { this.dataSource = new DruidDataSource(); this.dataSource.setDriverClassName("com.mysql.jdbc.Driver"); this.dataSource.setUrl("jdbc:mysql://localhost:3306/kafka_user_info"); this.dataSource.setUsername("kafka"); this.dataSource.setPassword("kafka"); this.dataSource.setInitialSize(5); this.dataSource.setMinIdle(1); this.dataSource.setMaxActive(10); this.dataSource.setPoolPreparedStatements(false); } public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { } public void handle(Callback[] callbacks) throws UnsupportedCallbackException { String username = null; for (Callback callback: callbacks) { if (callback instanceof NameCallback) { username = ((NameCallback) callback).getDefaultName(); } else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; boolean authenticated = authenticate(username, plainCallback.password()); plainCallback.authenticated(authenticated); } else throw new UnsupportedCallbackException(callback); } } protected boolean authenticate(String username, char[] password) { return userExists(username, new String(password)); } private boolean userExists(String username, String password) { Connection conn = null; PreparedStatement statement = null; ResultSet resultSet = null; try { conn = getConnection(); statement = conn.prepareStatement("select * from users where username=? and password=?"); statement.setString(1, username); statement.setString(2, password); resultSet = statement.executeQuery(); while (resultSet.next()) { boolean b = username.equals(resultSet.getString("username")) && password.equals(resultSet.getString("password")); log.info("user {} authentication status: {}.", username, b); return b; } } catch (SQLException e) { log.info("sql exception occurred: {}", e.getMessage()); } finally { // 按照打开顺序逆序关闭打开的资源 } } return false; } private Connection getConnection() throws SQLException { // 通过datasource获取connection,省略了try/catch synchronized(dataSource) { return dataSource.getConnection(); } } public void close() throws KafkaException { } }自定义类编写完成后后,将jar包拷贝到每个broker的CLASSPATH下,比如kafka的libs目录下。在MySQL中插入几条用户信息,然后尝试以这些用户的身份来连接Kafka(为方便起见,测试阶段可以先不要开启ACL),我们会发现,MySQL中的用户可以连接,而写在配置文件中的用户无法连接,说明这个插件达到了预期的效果,增删用户不需要重启了。
在编写这个插件以及生产中使用这个插件的过程中,可能需要输出日志来帮助调试,因此还需要调整相关配置。
首先,在代码中创建Logger对象:
private static final Logger log = LoggerFactory.getLogger("plugin");其次,修改Kafka的 log4j.properties 配置文件:
log4j.logger.plugin=DEBUG, pluginAppender log4j.additivity.plugin=false log4j.appender.pluginAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.pluginAppender.DatePattern='.'yyyy-MM-dd-HH log4j.appender.pluginAppender.File=/path/to/plugin.log log4j.appender.pluginAppender.layout=org.apache.log4j.PatternLayout log4j.appender.pluginAppender.layout.ConversionPattern=[%d{ISO8601}] %p %m (%c)%n log4j.appender.pluginAppender.MaxFileSize = 256MB log4j.appender.pluginAppender.MaxBackupIndex = 20注意代码和配置文件的联系在于getLogger的参数和log4j.logger后面的字符串需要相同。
PS:如果你和我一样对Hadoop/Spark/Kafka/Zeppelin/Flink等技术感兴趣,可以来【大数据学徒】一起交流讨论,共同学习。
