Add custom serializer and deserializer including the message dto based in lombok
This commit is contained in:
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.kafka.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class MessageDto {
|
||||
private String message;
|
||||
private String version;
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.baeldung.kafka.serdes;
|
||||
|
||||
import com.baeldung.kafka.dto.MessageDto;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.kafka.common.errors.SerializationException;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class CustomDeserializer implements Deserializer<MessageDto> {
|
||||
|
||||
private ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageDto deserialize(String topic, byte[] data) {
|
||||
try {
|
||||
if (data == null){
|
||||
System.out.println("Null received at deserializing");
|
||||
return null;
|
||||
}
|
||||
System.out.println("Deserializing...");
|
||||
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
|
||||
} catch (Exception e) {
|
||||
throw new SerializationException("Error when deserializing byte[] to MessageDto");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.baeldung.kafka.serdes;
|
||||
|
||||
import com.baeldung.kafka.dto.MessageDto;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.kafka.common.errors.SerializationException;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class CustomSerializer implements Serializer<MessageDto> {
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String topic, MessageDto data) {
|
||||
try {
|
||||
if (data == null){
|
||||
System.out.println("Null received at serializing");
|
||||
return null;
|
||||
}
|
||||
System.out.println("Serializing...");
|
||||
return objectMapper.writeValueAsBytes(data);
|
||||
} catch (Exception e) {
|
||||
throw new SerializationException("Error when serializing MessageDto to byte[]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user