@@ -678,8 +678,8 @@ else if (listener instanceof MessageListener) {
678678 this .logger .info (this .toString ());
679679 }
680680 Map <String , Object > props = KafkaMessageListenerContainer .this .consumerFactory .getConfigurationProperties ();
681- this .checkNullKeyForExceptions = checkDeserializer (findDeserializerClass (props , false ));
682- this .checkNullValueForExceptions = checkDeserializer (findDeserializerClass (props , true ));
681+ this .checkNullKeyForExceptions = checkDeserializer (findDeserializerClass (props , consumerProperties , false ));
682+ this .checkNullValueForExceptions = checkDeserializer (findDeserializerClass (props , consumerProperties , true ));
683683 this .syncCommitTimeout = determineSyncCommitTimeout ();
684684 if (this .containerProperties .getSyncCommitTimeout () == null ) {
685685 // update the property so we can use it directly from code elsewhere
@@ -844,14 +844,21 @@ else if (timeout instanceof String) {
844844 }
845845 }
846846
847- private Object findDeserializerClass (Map <String , Object > props , boolean isValue ) {
847+ @ Nullable
848+ private Object findDeserializerClass (Map <String , Object > props , Properties consumerOverrides , boolean isValue ) {
848849 Object configuredDeserializer = isValue
849850 ? KafkaMessageListenerContainer .this .consumerFactory .getValueDeserializer ()
850851 : KafkaMessageListenerContainer .this .consumerFactory .getKeyDeserializer ();
851852 if (configuredDeserializer == null ) {
852- return props .get (isValue
853+ Object deser = consumerOverrides .get (isValue
853854 ? ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG
854855 : ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG );
856+ if (deser == null ) {
857+ deser = props .get (isValue
858+ ? ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG
859+ : ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG );
860+ }
861+ return deser ;
855862 }
856863 else {
857864 return configuredDeserializer .getClass ();
@@ -883,10 +890,23 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
883890 }
884891 }
885892
886- private boolean checkDeserializer (Object deser ) {
887- return deser instanceof Class
888- ? ErrorHandlingDeserializer2 .class .isAssignableFrom ((Class <?>) deser )
889- : deser instanceof String && deser .equals (ErrorHandlingDeserializer2 .class .getName ());
893+ private boolean checkDeserializer (@ Nullable Object deser ) {
894+ Class <?> deserializer = null ;
895+ if (deser instanceof Class ) {
896+ deserializer = (Class <?>) deser ;
897+ }
898+ else if (deser instanceof String ) {
899+ try {
900+ deserializer = ClassUtils .forName ((String ) deser , getApplicationContext ().getClassLoader ());
901+ }
902+ catch (ClassNotFoundException | LinkageError e ) {
903+ throw new IllegalStateException (e );
904+ }
905+ }
906+ else if (deser != null ) {
907+ throw new IllegalStateException ("Deserializer must be a class or class name, not a " + deser .getClass ());
908+ }
909+ return deserializer == null ? false : ErrorHandlingDeserializer2 .class .isAssignableFrom (deserializer );
890910 }
891911
892912 protected void checkConsumer () {
0 commit comments