# Kafka 高级部分之自定义Encoder实现Class级别的数据传送以及解析
# 前言
# Class级别信息Send的原理
简单地说,就是先将一个Class的对象序列化成一个byte[],然后再将byte[]反序列化还原成该Class的对象;前提是这个Class必须实现java.io.Serializable这个接口。是不是非常easy!
然后再自定义一个Encoder即可。以下是一个参考案例,使用一个User类。
# 自定义Encoder实现Class级别的producer和consumer
在这里我们使用一个User类的对象作为producer发送(send)的消息,详细请看下面的源码。
# 自定义Partition实现HashCode Partition
详细请看下面的源码。
# 执行 UserProducer,以下是执行结果(Eclipse下执行)
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
User [addr=addr000, age=age0, id=id000,name=name000, sex=sex0]
encoder ---> User [addr=addr000,age=age0, id=id000, name=name000, sex=sex0]
encoder ---> User [addr=addr000,age=age0, id=id000, name=name000, sex=sex0]
hash partition ---> User [addr=addr000,age=age0, id=id000, name=name000, sex=sex0]
User [addr=addr001, age=age1, id=id001,name=name001, sex=sex1]
encoder ---> User [addr=addr001,age=age1, id=id001, name=name001, sex=sex1]
encoder ---> User [addr=addr001,age=age1, id=id001, name=name001, sex=sex1]
hash partition ---> User [addr=addr001,age=age1, id=id001, name=name001, sex=sex1]
User [addr=addr002, age=age2, id=id002,name=name002, sex=sex0]
encoder ---> User [addr=addr002,age=age2, id=id002, name=name002, sex=sex0]
encoder ---> User [addr=addr002,age=age2, id=id002, name=name002, sex=sex0]
hash partition ---> User [addr=addr002,age=age2, id=id002, name=name002, sex=sex0]
User [addr=addr003, age=age3, id=id003,name=name003, sex=sex1]
encoder ---> User [addr=addr003,age=age3, id=id003, name=name003, sex=sex1]
encoder ---> User [addr=addr003,age=age3, id=id003, name=name003, sex=sex1]
hash partition ---> User [addr=addr003,age=age3, id=id003, name=name003, sex=sex1]
User [addr=addr004, age=age4, id=id004,name=name004, sex=sex0]
encoder ---> User [addr=addr004,age=age4, id=id004, name=name004, sex=sex0]
encoder ---> User [addr=addr004,age=age4, id=id004, name=name004, sex=sex0]
hash partition ---> User [addr=addr004,age=age4, id=id004, name=name004, sex=sex0]
User [addr=addr005, age=age5, id=id005,name=name005, sex=sex1]
encoder ---> User [addr=addr005,age=age5, id=id005, name=name005, sex=sex1]
encoder ---> User [addr=addr005,age=age5, id=id005, name=name005, sex=sex1]
hash partition ---> User [addr=addr005,age=age5, id=id005, name=name005, sex=sex1]
User [addr=addr006, age=age6, id=id006,name=name006, sex=sex0]
encoder ---> User [addr=addr006,age=age6, id=id006, name=name006, sex=sex0]
encoder ---> User [addr=addr006,age=age6, id=id006, name=name006, sex=sex0]
hash partition ---> User [addr=addr006,age=age6, id=id006, name=name006, sex=sex0]
User [addr=addr007, age=age7, id=id007,name=name007, sex=sex1]
encoder ---> User [addr=addr007,age=age7, id=id007, name=name007, sex=sex1]
encoder ---> User [addr=addr007,age=age7, id=id007, name=name007, sex=sex1]
hash partition ---> User [addr=addr007,age=age7, id=id007, name=name007, sex=sex1]
User [addr=addr008, age=age8, id=id008,name=name008, sex=sex0]
encoder ---> User [addr=addr008,age=age8, id=id008, name=name008, sex=sex0]
encoder ---> User [addr=addr008,age=age8, id=id008, name=name008, sex=sex0]
hash partition ---> User [addr=addr008,age=age8, id=id008, name=name008, sex=sex0]
User [addr=addr009, age=age9, id=id009,name=name009, sex=sex1]
encoder ---> User [addr=addr009,age=age9, id=id009, name=name009, sex=sex1]
encoder ---> User [addr=addr009,age=age9, id=id009, name=name009, sex=sex1]
hash partition ---> User [addr=addr009,age=age9, id=id009, name=name009, sex=sex1]
producer is successful .
这里可以看到我们的UserProducer已经将User类的数据传送到Kafka了,现在就等Consumer从Kafka中取出数据了。
# 执行 UserSimpleConsumer,以下是执行结果(Eclipse下执行)
# partition 0的执行结果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
log4j:WARN Please initialize the log4j system properly.
0: User [addr=addr000, age=age0, id=id000,name=name000, sex=sex0]
1: User [addr=addr002, age=age2, id=id002,name=name002, sex=sex0]
2: User [addr=addr006, age=age6, id=id006,name=name006, sex=sex0]
3: User [addr=addr009, age=age9, id=id009, name=name009,sex=sex1]
0~3,一共4条记录
# partition 1的执行结果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
log4j:WARN Please initialize the log4j system properly.
0: User [addr=addr001, age=age1, id=id001,name=name001, sex=sex1]
1: User [addr=addr003, age=age3, id=id003,name=name003, sex=sex1]
2: User [addr=addr004, age=age4, id=id004,name=name004, sex=sex0]
3: User [addr=addr005, age=age5, id=id005,name=name005, sex=sex1]
4: User [addr=addr007, age=age7, id=id007,name=name007, sex=sex1]
5: User [addr=addr008, age=age8, id=id008, name=name008,sex=sex0]
0~5,一共6条记录
两个分区加起来刚好10条记录
序列化跟反序列化都成功了,OK
# 这里是源码
# User.java
package com.yting.cloud.kafka.entity;
import java.io.Serializable;
/**
* User entity
*
* @Author 王扬庭
* @Time 2014-07-18
*
*/
public class Userimplements Serializable{
private static final longserialVersionUID= 6345666479504626985L;
private String id;
private String name;
private String sex;
private String age;
private String addr;
public User() {
}
public User(String id, String name, String sex, Stringage, String addr) {
this.id = id;
this.name = name;
this.sex = sex;
this.age = age;
this.addr = addr;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
@Override
public String toString() {
return "User [addr=" + addr + ",age=" + age + ", id=" + id + ", name="
+ name + ", sex=" + sex + "]";
}
}
# HashSimplePartitioner.java
package com.yting.cloud.kafka.partition;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
* Kafka官网给的案例SimplePartitioner,官网给的是0.8.0的版本号,跟0.8.1的版本号不一样,所以改了下。你懂的!
*
* @Author 王扬庭
* @Time2014-07-18
*
*/
public class HashSimplePartitioner implementsPartitioner {
publicHashSimplePartitioner(VerifiableProperties props) {
}
@Override
publicint partition(Object key, int numPartitions) {
System.out.println("hashpartition ---> " + key);
returnkey.hashCode() % numPartitions;
}
}
# UserEncoder.java
package com.yting.cloud.kafka.encoder;
import com.yting.cloud.kafka.entity.User;
import com.yting.cloud.kafka.util.BeanUtils;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
/** * UserEncoder * * @Author 王扬庭 * @Time 2014-07-18 * */
public class UserEncoderimplementsEncoder<User>{
publicUserEncoder(VerifiableProperties props) {
}
@Override
public byte[] toBytes(User user) {
System.out.println("encoder ---> " +user);
return BeanUtils.object2Bytes(user);
}
}
# UserProducer.java
package com.yting.cloud.kafka.producer;
import java.util.*;
import com.yting.cloud.kafka.entity.User;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Producer example adapted from the Kafka website. It was modified so it can
 * be run from Eclipse against a remote server for testing.
 *
 * <p>Sends ten {@link User} objects to the topic {@code test-user-001}, using
 * each user as both the message key and the message value.
 *
 * @author 王扬庭
 * @since 2014-07-18
 */
public class UserProducer {

    public static void main(String[] args) {
        long events = 10;

        Properties props = new Properties();
        // props.put("metadata.broker.list","broker1:9092,broker2:9092");
        // When running from Eclipse, "rs229" must be resolvable through the local
        // hosts file; alternatively use the broker's IP address.
        props.put("metadata.broker.list", "rs229:9092");
        // Must be adapted: the encoder used to serialize messages.
        // NOTE(review): no "key.serializer.class" is set; the sample run prints the
        // encoder line twice per message, presumably once for the key and once for
        // the value — confirm against the producer configuration docs.
        props.put("serializer.class", "com.yting.cloud.kafka.encoder.UserEncoder");
        props.put("partitioner.class", "com.yting.cloud.kafka.partition.HashSimplePartitioner");
        // Must be adapted to the local ZooKeeper ensemble.
        // NOTE(review): the producer locates brokers via metadata.broker.list; this
        // property may be unused by the producer — confirm before removing.
        props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<User, User> producer = new Producer<User, User>(config);
        try {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                User msg = new User("id00" + nEvents, "name00" + nEvents, "sex" + nEvents % 2,
                        "age" + nEvents, "addr00" + nEvents);
                System.out.println(msg);
                // The same User instance is used as key (for partitioning) and as value.
                KeyedMessage<User, User> data = new KeyedMessage<User, User>("test-user-001", msg, msg);
                producer.send(data);
            }
        } finally {
            // Release the producer even when a send fails.
            producer.close();
        }
        System.out.println("producer is successful .");
    }
}
# UserSimpleConsumer.java
package com.yting.cloud.kafka.consumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.yting.cloud.kafka.entity.User;
import com.yting.cloud.kafka.util.BeanUtils;
/**
* Kafka官网给的案例 SimpleConsumer。饿在Eclipse本地连接server測试,所以改动了一些代码
*
* @Author 王扬庭
* @Time 2014-07-18
*
*/
public class UserSimpleConsumer {
public static void main(Stringargs[]) {
UserSimpleConsumer example =new UserSimpleConsumer();
long maxReads = 100;
String topic ="test-user-001";
int partition = 0; //
// int partition = 1; //
List<String> seeds = newArrayList<String>();
seeds.add("rs229");
seeds.add("rs227");
seeds.add("rs226");
seeds.add("rs198");
seeds.add("rs197");
int port =Integer.parseInt("9092");
try {
example.run(maxReads,topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:"+ e);
e.printStackTrace();
}
}
private List<String>m_replicaBrokers = new ArrayList<String>();
public UserSimpleConsumer() {
m_replicaBrokers = newArrayList<String>();
}
public void run(long a_maxReads,String a_topic, int a_partition, List<String> a_seedBrokers, int a_port)throws Exception {
// find the meta data aboutthe topic and partition we are interested in
//
PartitionMetadata metadata =findLeader(a_seedBrokers, a_port, a_topic,
a_partition);
if (metadata == null) {
System.out
.println("Can'tfind metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null){
System.out
.println("Can'tfind Leader for Topic and Partition. Exiting");
return;
}
String leadBroker =metadata.leader().host();
String clientName ="Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = newSimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
long readOffset =getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.EarliestTime(),clientName);
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = newSimpleConsumer(leadBroker, a_port, 100000,
64 * 1024,clientName);
}
FetchRequest req = newFetchRequestBuilder().clientId(clientName)
.addFetch(a_topic,a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might needto be increased if large batches are written to Kafka
.build();
FetchResponsefetchResponse = consumer.fetch(req);
if(fetchResponse.hasError()) {
numErrors++;
// Something wentwrong!
short code =fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Errorfetching data from the Broker:"
+ leadBroker +" Reason: " + code);
if (numErrors > 5)
break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for aninvalid offset. For simple case ask for
// the last elementto reset
readOffset =getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(),clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker =findNewLeader(leadBroker, a_topic, a_partition,
a_port);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffsetmessageAndOffset : fetchResponse.messageSet(
a_topic,a_partition)) {
long currentOffset =messageAndOffset.offset();
if (currentOffset <readOffset) {
System.out.println("Foundan old offset: " + currentOffset
+ " Expecting: " + readOffset);
continue;
}
readOffset =messageAndOffset.nextOffset();
ByteBuffer payload =messageAndOffset.message().payload();
byte[] bytes = newbyte[payload.limit()];
payload.get(bytes);
// ===这里就是反序列化=======================================================
User user = (User)BeanUtils.bytes2Object(bytes);
System.out.println(String.valueOf(messageAndOffset.offset())+ ": " + user);
//=========================================================================
numRead++;
a_maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch(InterruptedException ie) {
}
}
}
if (consumer != null)
consumer.close();
}
public static longgetLastOffset(SimpleConsumer consumer, String topic,
int partition, longwhichTime, String clientName) {
TopicAndPartitiontopicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition,PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition,PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition,new PartitionOffsetRequestInfo(
whichTime, 1));
kafka.javaapi.OffsetRequestrequest = new kafka.javaapi.OffsetRequest(
requestInfo,kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response =consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out
.println("Errorfetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic,partition));
return 0;
}
long[] offsets =response.offsets(topic, partition);
return offsets[0];
}
private StringfindNewLeader(String a_oldLeader, String a_topic,
int a_partition, inta_port) throws Exception {
for (int i = 0; i < 3; i++){
boolean goToSleep = false;
PartitionMetadata metadata= findLeader(m_replicaBrokers, a_port,
a_topic,a_partition);
if (metadata == null) {
goToSleep = true;
} else if(metadata.leader() == null) {
goToSleep = true;
} else if(a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& i == 0){
// first time throughif the leader hasn't changed give
// ZooKeeper a secondto recover
// second time, assumethe broker did recover before failover,
// or it was anon-Broker issue
//
goToSleep = true;
} else {
returnmetadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch(InterruptedException ie) {
}
}
}
System.out
.println("Unableto find new leader after Broker failure. Exiting");
throw new Exception(
"Unable to findnew leader after Broker failure. Exiting");
}
private PartitionMetadatafindLeader(List<String> a_seedBrokers,
int a_port, Stringa_topic, int a_partition) {
PartitionMetadatareturnMetaData = null;
loop: for (String seed :a_seedBrokers) {
SimpleConsumer consumer =null;
try {
consumer = newSimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String>topics = Collections.singletonList(a_topic);
TopicMetadataRequestreq = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponseresp = consumer.send(req);
List<TopicMetadata>metaData = resp.topicsMetadata();
for (TopicMetadata item: metaData) {
for(PartitionMetadata part : item.partitionsMetadata()) {
if(part.partitionId() == a_partition) {
returnMetaData= part;
break loop;
}
}
}
} catch (Exception e) {
System.out.println("Errorcommunicating with Broker [" + seed
+ "] tofind Leader for [" + a_topic + ", "
+ a_partition +"] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.Brokerreplica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
}
# 结束感言
# Time 2014-07-18 11:08:22
版权声明:本文为博主的原创博客文章,未经同意不得转载。