存储PLCNext社区PLCNext. on LinkedInPLCNext. on Instagram  PLCNext. on YouTube github plcnext社区存储PLCNext社区

 

 如何创建博客条目

制造商博客
Hits: 647

通过MQTT将PLCNext控件连接到Apache Kafka

技术背景

Kafka.

Apache Kafka是一种数据摄取,存储,处理和再分配的框架。如今,它广泛部署在世界各地的公司。 Kafka.的官方网站 提供有关其想法以及如何部署的更多信息。其中一个关键功能是其他应用程序和MQTT等通信协议的大量现有连接器。

MQTT.

MQTT.是一种基于轻型TCP的消息传递协议,通常用于IOT通信由于其鲁棒性和小的占地面积。有关OASIS标准MQTT的详细信息可以在其上找到 网站.

这里 您可以找到关于如何交叉编译Mosquitto for Plcnext的制造商博客文章,这是来自Eclipse的MQTT实现。或者, PLCNext. Store 提供准备好MQTT应用程序。

要求

  • PLCNext上的MQTT客户端(请参阅上一节以实现提示)
  • 控制器连接到PC / VM 
  • PC / VM上的MQTT Broker(例如, Mosquitto.)
  • PC / VM上的Kafka实例(参见 Kafka.的QuickStart指南)

设置

下图显示了我们将实现从PLCNext控件到Kafka的数据的设置概述。虽然可以使用 Confluent的MQTT Proxy 对于他们的Kafka(2)版本,我们将专注于更通用的解决方案(1)。它包括一个MQTT代理,客户端连接到并发布订阅代理主题的消息和连接器,处理消息并将其转发给Kafka。 

PLCNext. to Kafka 2

 

创建连接器

在本教程中,我们的连接器基础 viokly / kafka-connect-mqtt 从GitHub的存储库,在MIT许可证下许可(详细的许可证信息). First, we download and extract the repository. Since the latest repository version is of the end of 2016 we update the build.gradle file, by replacing old dependencies with their new versions:

ext { kafkaVersion = '2.6.0' }
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.13' 
    compile "org.apache.kafka:connect-api:$kafkaVersion"
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' 
    compile 'org.bouncycastle:bcprov-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpkix-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpg-jdk15on:1.67' 
    compile 'commons-io:commons-io:2.8.0' 
    compile 'org.slf4j:slf4j-api:1.7.30' 
    testCompile 'org.slf4j:slf4j-simple:1.7.30'
}

In this example we will send plain String messages to Kafka. Therefore we have to edit the Java class DumbProcessor.java in the folder /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt, which is the default message processor:

@Override
public SourceRecord[] getRecords(String kafkaTopic) {

    return new SourceRecord[]{new SourceRecord(null, //sourcePartition
                   null,                //sourceOffset
                   kafkaTopic,          //topic
                   null,                //partition
                   null,                //keySchema
                   mTopic,              //key
                   null,                //valueSchema
                   mMessage.toString(), //value
                   new Long(123L))};    //long timestamp
}

Thereafter, we build a Java Archive File (JAR) that contains the dependencies: ./gradlew clean jar. We copy the output JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar that could be found in the folder /kafka-connect-mqtt-master/build/libs to the libs directory of Kafka.

Furthermore, we have to create a configuration file for the connector mqtt.properties in Kafka's config folder. The file has the following content:

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

kafka.topic=test_in                     # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123

mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

mqtt.server_uris=tcp://172.17.0.1:1883  # address of the MQTT broker
mqtt.topic=test/#                       # MQTT topic where the messages should be collected

#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor

本地测试

现在我们可以在本地测试我们的连接器。转到Kafka的目录并启动一个ZooKeeper和Broker实例:

# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# start Kafka:
bin/kafka-server-start.sh config/server.properties

# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto 

# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties

# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true

# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123

该消息显示在控制台消费者中。