一.环境准备
当前环境:centos7.3三台
软件版本:kafka_2.12
部署目录:/usr/local/kafka
启动端口:9092
配置文件:/usr/local/kafka/config/server.properties
yum依赖(3台同时操作)
yum install java-1.8.0-openjdk
需要部署zookeeper集群
二.安装
1.下载kafka包(3台节点都执行)
wget http:
//mirrors.hust.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
2.解压并移动,然后创建日志目录(3台节点都执行)
tar zxvf kafka_2.12-
2.1.
1.tgz
mv kafka_2.12-
2.1.
1 /usr/local/
kafka
mkdir /usr/local/kafka/log
3.修改配置文件(3台同时操作,需要更改的地方不一样)
vim /usr/local/kafka/config/server.properties
#此为第一台,第二台为2 第三台为3
broker.id=
1
# Switch to enable topic deletion or not, default value
is falsedelete.topic.enable=
true
#本机开启端口和监听端口
advertised.host.name=
192.168.
1.189
# The number of threads handling network requests
num.network.threads=
3
# The number of threads doing disk I/
O
num.io.threads=
8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=
102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=
102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=
104857600
#日志目录
log.dirs=/usr/local/kafka/
log
#开启10个分区
num.partitions=
10
#kafka保存消息的副本数default.replication.factor=
3
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.# This value
is recommended to be increased
for installations with data dirs located
in RAID array.
num.recovery.threads.per.data.dir=
1
#持久化时间
log.retention.hours=
48
# The maximum size of a log segment file. When this size
is reached a
new log segment will be created.
log.segment.bytes=
1073741824
# to the retention policies
log.retention.check.interval.ms=
300000
#连接zookeeper地址端口
zookeeper.connect=
192.168.
1.189:
2181,
192.168.
1.190:
2181,
192.168.
1.191:
2181
# Timeout in ms
for connecting to zookeeper
zookeeper.connection.timeout.ms=
6000
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 #
this work
for additional information regarding copyright ownership.
4 # The ASF licenses
this file to You under the Apache License, Version
2.0
5 # (the
"License"); you may not use
this file except
in compliance with
6 # the License. You may obtain a copy of the License at
7 #
8 # http:
//www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to
in writing, software
11 # distributed under the License
is distributed on an
"AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License
for the specific language governing permissions and
14 # limitations under the License.
15
16 # see kafka.server.KafkaConfig
for additional details and defaults
17
18 ############################# Server Basics #############################
19
20 # The id of the broker. This must be
set to a unique integer
for each broker.
21 broker.id=
3
22
23 ############################# Socket Server Settings #############################
24
25 # The address the socket server listens on. It will
get the value returned
from
26 # java.net.InetAddress.getCanonicalHostName()
if not configured.
27 # FORMAT:
28 # listeners = listener_name:
//host_name:port
29 # EXAMPLE:
30 # listeners = PLAINTEXT:
//your.host.name:9092
31 listeners=PLAINTEXT:
//10.0.0.49:9092
32
33 # Hostname and port the broker will advertise to producers and consumers. If not
set,
34 # it uses the value
for "listeners" if configured. Otherwise, it will use the value
35 # returned
from java.net.InetAddress.getCanonicalHostName().
36 advertised.listeners=PLAINTEXT:
//10.0.0.49:9092
37 advertised.host.name=
10.0.
0.49
38 advertised.host.port=
9092
39 host.port=
9092
40 host.name=
10.0.
0.49
41 # Maps listener names to security protocols, the
default is for them to be the same. See the config documentation
for more details
42 #listener.security.protocol.map=
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
43
44 # The number of threads that the server uses
for receiving requests
from the network and sending responses to the network
45 num.network.threads=
3
46
47 # The number of threads that the server uses
for processing requests, which may include disk I/
O
48 num.io.threads=
8
49
50 # The send buffer (SO_SNDBUF) used by the socket server
51 socket.send.buffer.bytes=
102400
52
53 # The receive buffer (SO_RCVBUF) used by the socket server
54 socket.receive.buffer.bytes=
102400
55
56 # The maximum size of a request that the socket server will accept (protection against OOM)
57 socket.request.max.bytes=
104857600
58
59
60 ############################# Log Basics #############################
61
62 # A comma separated list of directories under which to store log files
63 log.dirs=/opt/kafka/
kafkalogs
64
65 # The
default number of log partitions per topic. More partitions allow greater
66 # parallelism
for consumption, but
this will also result
in more files across
67 # the brokers.
68 num.partitions=
3
69
70 # The number of threads per data directory to be used
for log recovery at startup and flushing at shutdown.
71 # This value
is recommended to be increased
for installations with data dirs located
in RAID array.
72 num.recovery.threads.per.data.dir=
1
73
74 ############################# Internal Topic Settings #############################
75 # The replication factor
for the group metadata
internal topics
"__consumer_offsets" and
"__transaction_state"
76 # For anything other than development testing, a value greater than
1 is recommended
for to ensure availability such
as 3.
77 offsets.topic.replication.factor=
2
78 transaction.state.log.replication.factor=
1
79 transaction.state.log.min.isr=
1
80
81 ############################# Log Flush Policy #############################
82
83 # Messages are immediately written to the filesystem but by
default we only fsync() to sync
84 # the OS cache lazily. The following configurations control the flush of data to disk.
85 # There are a few important trade-
offs here:
86 #
1. Durability: Unflushed data may be lost
if you are not
using replication.
87 #
2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur
as there will be a lot of data to flush.
88 #
3. Throughput: The flush
is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
89 # The settings below allow one to configure the flush policy to flush data after a period of time or
90 # every N messages (or both). This can be done globally and overridden on a per-
topic basis.
91
92 # The number of messages to accept before forcing a flush of data to disk
93 #log.flush.interval.messages=
10000
94
95 # The maximum amount of time a message can sit
in a log before we force a flush
96 #log.flush.interval.ms=
1000
97
98 ############################# Log Retention Policy #############################
99
100 # The following configurations control the disposal of log segments. The policy can
101 # be
set to delete segments after a period of time, or after a given size has accumulated.
102 # A segment will be deleted whenever *either*
of these criteria are met. Deletion always happens
103 #
from the end of the log.
104
105 # The minimum age of a log file to be eligible
for deletion due to age
106 log.retention.hours=
168
107
108 # A size-based retention policy
for logs. Segments are pruned
from the log unless the remaining
109 # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
110 #log.retention.bytes=
1073741824
111
112 # The maximum size of a log segment file. When
this size
is reached a
new log segment will be created.
113 log.segment.bytes=
1073741824
114
115 # The interval at which log segments are
checked to see
if they can be deleted according
116 # to the retention policies
117 log.retention.check.interval.ms=
300000
118
119 ############################# Zookeeper #############################
120
121 # Zookeeper connection
string (see zookeeper docs
for details).
122 # This
is a comma separated host:port pairs, each corresponding to a zk
123 # server. e.g.
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
124 # You can also append an optional chroot
string to the urls to specify the
125 # root directory
for all kafka znodes.
126 zookeeper.connect=
10.0.
0.5:
2181,
10.0.
0.43:
2181,
10.0.
0.49:
2181
127
128 # Timeout
in ms
for connecting to zookeeper
129 zookeeper.connection.timeout.ms=
6000
130
131 delete.topic.enble=
true
132
133 ############################# Group Coordinator Settings #############################
134
135 # The following configuration specifies the time,
in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
136 # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms
as new members join the group, up to a maximum of max.poll.interval.ms.
137 # The
default value
for this is 3 seconds.
138 # We
override this to
0 here
as it makes
for a better
out-of-the-box experience
for development and testing.
139 # However,
in production environments the
default value of
3 seconds
is more suitable
as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
140 group.initial.rebalance.delay.ms=
0
生产配置文件案例
三.使用验证
启动(3台都需要启动)
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
检查
netstat -unltp | grep
9092
转载于:https://www.cnblogs.com/xiangjun555/articles/11193941.html