Deploying Kafka on Windows

Notes on deploying Kafka on Windows.

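I. Environment
Windows 7, 64-bit
JDK 1.7
Kafka: 2.10-0.10.0.1 (Kafka 0.10.0.1 built for Scala 2.10)
After downloading, extract the archive to a directory of your choice, for example: D:\home\kafka_2.10-0.10.0.1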

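II. Edit the configuration files
1. In kafka_2.10-0.10.0.1\config\log4j.properties, set kafka.logs.dir to /tmp. On Windows this path resolves against the current drive, so with Kafka installed on D: the logs go to D:\tmp.
2. Copy kafka_2.10-0.10.0.1\config\server.properties to server-9093.properties and edit the copy. Three settings matter: broker.id, listeners, and log.dirs. Set them as follows: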

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-9093

3. Copy kafka_2.10-0.10.0.1\config\server.properties to server-9094.properties and edit the copy. The same three settings matter: broker.id, listeners, and log.dirs. Set them as follows:

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-9094
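
Copying and editing the two files by hand is easy to get wrong, so here is a minimal Python sketch that derives both broker files from server.properties. The helper names (override_properties, write_broker_configs) and the BROKERS table are made up for this illustration; only the three keys and their values come from the steps above. Keys that are missing or commented out in the base file are appended at the end.

```python
from pathlib import Path

# Per-broker overrides taken from steps 2 and 3 above.
BROKERS = {
    "server-9093.properties": {
        "broker.id": "1",
        "listeners": "PLAINTEXT://:9093",
        "log.dirs": "/tmp/kafka-logs-9093",
    },
    "server-9094.properties": {
        "broker.id": "2",
        "listeners": "PLAINTEXT://:9094",
        "log.dirs": "/tmp/kafka-logs-9094",
    },
}


def override_properties(text, overrides):
    """Return properties text with the given keys replaced, or appended if absent."""
    remaining = dict(overrides)
    out = []
    for line in text.splitlines():
        stripped = line.strip()
        # Only active "key=value" lines are touched; comments and blanks pass through.
        if stripped and not stripped.startswith("#") and "=" in stripped:
            key = stripped.split("=", 1)[0].strip()
            if key in remaining:
                line = f"{key}={remaining.pop(key)}"
        out.append(line)
    # Keys that were missing (or only present as comments) are appended.
    out.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(out) + "\n"


def write_broker_configs(config_dir):
    """Hypothetical helper: write one properties file per entry in BROKERS."""
    config_dir = Path(config_dir)
    base = (config_dir / "server.properties").read_text()
    for name, overrides in BROKERS.items():
        (config_dir / name).write_text(override_properties(base, overrides))
```

Calling write_broker_configs(r"D:\home\kafka_2.10-0.10.0.1\config") would then produce both files, assuming the install path used in these notes.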

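III. Edit the startup scripts
Delete the .sh files in kafka_2.10-0.10.0.1\bin, copy the contents of kafka_2.10-0.10.0.1\bin\windows into kafka_2.10-0.10.0.1\bin, and then make the following changes:
1. In zookeeper-server-start.bat, change %~dp0../../ to %~dp0../ (%~dp0 already ends with a backslash, so no extra separator is needed). Note that the listing below ends with the wmic ... delete line, which is the body of zookeeper-server-stop.bat rather than the start script; the start script needs only the %~dp0 change described here.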

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
wmic process where (commandline like "%%zookeeper%%" and not name="wmic.exe") delete

2. In kafka-server-start.bat, change %~dp0../../ to %~dp0../, as in the listing below.

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
IF [%1] EQU [] (
echo USAGE: %0 server.properties
EXIT /B 1
)
SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
)
%~dp0kafka-run-class.bat kafka.Kafka %*
EndLocal
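
The path edits in steps 1 and 2 are needed because %~dp0 expands to the directory of the running script, with a trailing backslash. Once the scripts live in bin instead of bin\windows, going up two levels overshoots the Kafka home. The short Python sketch below illustrates this with ntpath, which applies Windows path rules on any OS; the resolve helper is a made-up name, and the install path is the one assumed throughout these notes.

```python
import ntpath  # Windows path semantics, importable on any platform

# What %~dp0 expands to once the scripts have been moved into bin\
BIN_DIR = "D:\\home\\kafka_2.10-0.10.0.1\\bin\\"


def resolve(script_dir, relative):
    """Join the script directory with a relative path and collapse the '..' parts."""
    return ntpath.normpath(script_dir + relative)


# After the edit: one level up lands in the Kafka home.
print(resolve(BIN_DIR, "../config/log4j.properties"))
# Before the edit: two levels up escapes the Kafka home.
print(resolve(BIN_DIR, "../../config/log4j.properties"))
```

The first call resolves to D:\home\kafka_2.10-0.10.0.1\config\log4j.properties, while the second resolves to D:\home\config\log4j.properties, which does not exist.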

3. In kafka-run-class.bat, change pushd %~dp0..\.. to pushd %~dp0..\
Then change the for loop that adds jars to the classpath to:

rem Classpath addition for kafka-core dependencies
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i
)

The file after these changes:

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
setlocal enabledelayedexpansion
IF [%1] EQU [] (
echo USAGE: %0 classname [opts]
EXIT /B 1
)
rem Using pushd popd to set BASE_DIR to the absolute path
pushd %~dp0..\
set BASE_DIR=%CD%
popd
echo %BASE_DIR%
IF ["%SCALA_VERSION%"] EQU [""] (
set SCALA_VERSION=2.10.6
)
IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
set SCALA_BINARY_VERSION=2.10
)
rem Classpath addition for kafka-core dependencies
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i
)
rem JMX settings
IF ["%KAFKA_JMX_OPTS%"] EQU [""] (
set KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
)
rem JMX port to use
IF ["%JMX_PORT%"] NEQ [""] (
set KAFKA_JMX_OPTS=%KAFKA_JMX_OPTS% -Dcom.sun.management.jmxremote.port=%JMX_PORT%
)
rem Log directory to use
IF ["%LOG_DIR%"] EQU [""] (
set LOG_DIR=%BASE_DIR%/logs
)
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
) ELSE (
rem create logs directory
IF not exist %LOG_DIR% (
mkdir %LOG_DIR%
)
)
set KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=%LOG_DIR% %KAFKA_LOG4J_OPTS%
rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (
set KAFKA_OPTS=
)
set DEFAULT_JAVA_DEBUG_PORT=5005
set DEFAULT_DEBUG_SUSPEND_FLAG=n
rem Set Debug options if enabled
IF ["%KAFKA_DEBUG%"] NEQ [""] (
IF ["%JAVA_DEBUG_PORT%"] EQU [""] (
set JAVA_DEBUG_PORT=%DEFAULT_JAVA_DEBUG_PORT%
)
IF ["%DEBUG_SUSPEND_FLAG%"] EQU [""] (
set DEBUG_SUSPEND_FLAG=%DEFAULT_DEBUG_SUSPEND_FLAG%
)
set DEFAULT_JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=!DEBUG_SUSPEND_FLAG!,address=!JAVA_DEBUG_PORT!
IF ["%JAVA_DEBUG_OPTS%"] EQU [""] (
set JAVA_DEBUG_OPTS=!DEFAULT_JAVA_DEBUG_OPTS!
)
echo Enabling Java debug options: !JAVA_DEBUG_OPTS!
set KAFKA_OPTS=!JAVA_DEBUG_OPTS! !KAFKA_OPTS!
)
rem Which java to use
IF ["%JAVA_HOME%"] EQU [""] (
set JAVA=java
) ELSE (
set JAVA="%JAVA_HOME%/bin/java"
)
rem Memory options
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx256M
)
rem JVM performance options
IF ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] (
set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
)
IF ["%CLASSPATH%"] EQU [""] (
echo Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'
EXIT /B 2
)
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*
rem echo.
rem echo %COMMAND%
rem echo.
%COMMAND%
goto :eof
:concat
IF ["%CLASSPATH%"] EQU [""] (
set CLASSPATH="%1"
) ELSE (
set CLASSPATH=%CLASSPATH%;"%1"
)
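
To make the classpath logic easier to follow, here is a rough Python equivalent of the for loop and the :concat subroutine. It is only an illustration under the assumption that CLASSPATH starts out empty; build_classpath is a made-up name and the jar paths in the example are placeholders.

```python
def build_classpath(jar_paths, initial=""):
    """Mirror :concat: quote each jar path and join the entries with ';'."""
    classpath = initial
    for jar in jar_paths:
        if classpath == "":
            # First entry: no separator in front.
            classpath = f'"{jar}"'
        else:
            # Later entries: append after a semicolon, the Windows separator.
            classpath = f'{classpath};"{jar}"'
    return classpath


# Placeholder jar names, standing in for the contents of %BASE_DIR%\libs
print(build_classpath(["libs\\a.jar", "libs\\b.jar"]))
```

If the loop finds no jars (for example because BASE_DIR points at the wrong directory), the result stays empty and the script exits with the "Classpath is empty" message shown in the listing.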

IV. Start ZooKeeper
Open a Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

zookeeper-server-start.bat ..\config\zookeeper.properties

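The console output should show that ZooKeeper started successfully. If an error appears, fix the setup as the message indicates and retry until it runs.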

V. Start Kafka
Open a Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

kafka-server-start.bat ..\config\server-9093.properties

Open another Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

kafka-server-start.bat ..\config\server-9094.properties

This starts two Kafka broker nodes.
If an error appears, fix the setup as the message indicates.
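
A quick way to confirm that all three processes are up is to check whether their ports accept TCP connections. The sketch below uses only the Python standard library. It assumes ZooKeeper listens on 2181 (the port used by the commands in these notes) and the brokers on 9093 and 9094; the function names and the ENDPOINTS table are invented for this example.

```python
import socket

# Assumed endpoints: ZooKeeper plus the two brokers configured above.
ENDPOINTS = {
    "zookeeper": ("localhost", 2181),
    "broker-1": ("localhost", 9093),
    "broker-2": ("localhost", 9094),
}


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_cluster(endpoints):
    """Map each endpoint name to True/False depending on whether its port is open."""
    return {name: is_listening(host, port) for name, (host, port) in endpoints.items()}
```

Running print(check_cluster(ENDPOINTS)) after starting everything should report True for each entry. An open port only shows that the process is listening, not that the broker has registered with ZooKeeper.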

VI. Create a topic
Open a Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-test-topic
D:\home\kafka_2.10-0.10.0.1
Created topic "my-test-topic".

The output confirms that the topic was created.

VII. View topic information
Open a Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

D:\home\kafka_2.10-0.10.0.1\bin>kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-test-topic
D:\home\kafka_2.10-0.10.0.1
Topic:my-test-topic PartitionCount:3 ReplicationFactor:2
Configs:
Topic: my-test-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2
Topic: my-test-topic Partition: 1 Leader: 2 Replicas: 0,2 Isr: 2
Topic: my-test-topic Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2
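
In the output above, each partition lists two replicas but only broker 2 under Isr, so one replica per partition is not in sync. A small Python sketch like the following can flag that automatically. It assumes the per-partition line format shown above; the regular expression and the function names are written for this example and are not part of Kafka.

```python
import re

# Matches per-partition lines such as:
# "Topic: my-test-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2"
PARTITION_RE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def _ids(text):
    """Turn '2,0' into [2, 0]; an empty string gives an empty list."""
    return [int(part) for part in text.split(",") if part]


def parse_partitions(output):
    """Extract one dict per partition line; summary and Configs lines are skipped."""
    rows = []
    for line in output.splitlines():
        match = PARTITION_RE.search(line)
        if match:
            rows.append({
                "topic": match.group("topic"),
                "partition": int(match.group("partition")),
                "leader": int(match.group("leader")),
                "replicas": _ids(match.group("replicas")),
                "isr": _ids(match.group("isr")),
            })
    return rows


def under_replicated(rows):
    """Return partition numbers whose in-sync set differs from the replica set."""
    return [row["partition"] for row in rows if set(row["isr"]) != set(row["replicas"])]
```

Fed the describe output above, under_replicated(parse_partitions(text)) returns all three partitions, which suggests checking that both brokers are running and registered.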

VIII. Produce test messages with the console script
Open a Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

D:\home\kafka_2.10-0.10.0.1\bin>kafka-console-producer.bat --broker-list localhost:9093 --topic my-test-topic
D:\home\kafka_2.10-0.10.0.1
Type the message to send here, then press Enter

IX. Consume the topic's messages
Open a Windows Command Prompt, change to D:\home\kafka_2.10-0.10.0.1\bin, and run the following command:

D:\home\kafka_2.10-0.10.0.1\bin>kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic my-test-topic
D:\home\kafka_2.10-0.10.0.1
The messages sent by the producer appear here.

