Skip to content

Commit

Permalink
Add missing codecs
Browse files Browse the repository at this point in the history
  • Loading branch information
kornilova203 committed May 23, 2024
1 parent 9b1a861 commit 9a0b43f
Show file tree
Hide file tree
Showing 20 changed files with 807 additions and 98 deletions.
30 changes: 30 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/CassandraConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,25 @@
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
import com.ing.data.cassandra.jdbc.codec.BigintToBigDecimalCodec;
import com.ing.data.cassandra.jdbc.codec.BlobToByteArrayCodec;
import com.ing.data.cassandra.jdbc.codec.DateToDateCodec;
import com.ing.data.cassandra.jdbc.codec.DecimalToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.DecimalToLongCodec;
import com.ing.data.cassandra.jdbc.codec.DoubleToLongCodec;
import com.ing.data.cassandra.jdbc.codec.FloatToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.FloatToLongCodec;
import com.ing.data.cassandra.jdbc.codec.InetToStringCodec;
import com.ing.data.cassandra.jdbc.codec.IntToLongCodec;
import com.ing.data.cassandra.jdbc.codec.LongToIntCodec;
import com.ing.data.cassandra.jdbc.codec.SmallintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.SmallintToLongCodec;
import com.ing.data.cassandra.jdbc.codec.TimestampToLongCodec;
import com.ing.data.cassandra.jdbc.codec.TimeuuidToStringCodec;
import com.ing.data.cassandra.jdbc.codec.TinyintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.TinyintToLongCodec;
import com.ing.data.cassandra.jdbc.codec.UuidToStringCodec;
import com.ing.data.cassandra.jdbc.codec.VarintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.VarintToLongCodec;
import com.ing.data.cassandra.jdbc.optionset.Default;
import com.ing.data.cassandra.jdbc.optionset.OptionSet;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -193,6 +204,8 @@ public class CassandraConnection extends AbstractConnection implements Connectio
LOG.info("Node: {} runs Cassandra v.{}", entry.getValue().getEndPoint().resolve(), cassandraVersion);
}
});

registerCodecs(cSession);
}

/**
Expand Down Expand Up @@ -221,6 +234,10 @@ public CassandraConnection(final Session cSession, final String currentKeyspace,
this.metadata = cSession.getMetadata();
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.debugMode = debugMode;
registerCodecs(cSession);
}

private static void registerCodecs(Session cSession) {
final List<TypeCodec<?>> codecs = new ArrayList<>();
codecs.add(new TimestampToLongCodec());
codecs.add(new LongToIntCodec());
Expand All @@ -231,6 +248,19 @@ public CassandraConnection(final Session cSession, final String currentKeyspace,
codecs.add(new VarintToIntCodec());
codecs.add(new SmallintToIntCodec());
codecs.add(new TinyintToIntCodec());
codecs.add(new BlobToByteArrayCodec());
codecs.add(new DateToDateCodec());
codecs.add(new DecimalToLongCodec());
codecs.add(new DoubleToLongCodec());
// codecs.add(new DurationToStringCodec());
codecs.add(new InetToStringCodec());
// codecs.add(new TimeToTimeCodec());
codecs.add(new TimeuuidToStringCodec());
codecs.add(new UuidToStringCodec());
codecs.add(new VarintToLongCodec());
codecs.add(new FloatToLongCodec());
codecs.add(new SmallintToLongCodec());
codecs.add(new TinyintToLongCodec());

codecs.forEach(codec -> ((DefaultCodecRegistry) cSession.getContext().getCodecRegistry()).register(codec));
}
Expand Down
26 changes: 1 addition & 25 deletions src/main/java/com/ing/data/cassandra/jdbc/SessionHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,10 @@
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy;
import com.datastax.oss.driver.internal.core.ssl.DefaultSslEngineFactory;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.ing.data.cassandra.jdbc.codec.BigintToBigDecimalCodec;
import com.ing.data.cassandra.jdbc.codec.DecimalToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.FloatToDoubleCodec;
import com.ing.data.cassandra.jdbc.codec.IntToLongCodec;
import com.ing.data.cassandra.jdbc.codec.LongToIntCodec;
import com.ing.data.cassandra.jdbc.codec.SmallintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.TimestampToLongCodec;
import com.ing.data.cassandra.jdbc.codec.TinyintToIntCodec;
import com.ing.data.cassandra.jdbc.codec.VarintToIntCodec;
import com.ing.data.cassandra.jdbc.utils.ContactPoint;
import com.instaclustr.cassandra.driver.auth.KerberosAuthProviderBase;
import com.instaclustr.cassandra.driver.auth.ProgrammaticKerberosAuthProvider;
Expand All @@ -53,7 +43,6 @@
import java.sql.SQLNonTransientConnectionException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -67,7 +56,6 @@
import static com.ing.data.cassandra.jdbc.utils.DriverUtil.JSSE_TRUSTSTORE_PASSWORD_PROPERTY;
import static com.ing.data.cassandra.jdbc.utils.DriverUtil.JSSE_TRUSTSTORE_PROPERTY;
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.SSL_CONFIG_FAILED;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_USE_KERBEROS;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CLOUD_SECURE_CONNECT_BUNDLE;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONFIG_FILE;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_CONNECT_TIMEOUT;
Expand All @@ -88,6 +76,7 @@
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_SSL_HOSTNAME_VERIFICATION;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_TCP_NO_DELAY;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_USER;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.TAG_USE_KERBEROS;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.parseReconnectionPolicy;
import static com.ing.data.cassandra.jdbc.utils.JdbcUrlUtil.parseURL;

Expand Down Expand Up @@ -341,19 +330,6 @@ private Session createSession(final Properties properties) throws SQLException {
));
}

// Declare and register codecs.
final List<TypeCodec<?>> codecs = new ArrayList<>();
codecs.add(new TimestampToLongCodec());
codecs.add(new LongToIntCodec());
codecs.add(new IntToLongCodec());
codecs.add(new BigintToBigDecimalCodec());
codecs.add(new DecimalToDoubleCodec());
codecs.add(new FloatToDoubleCodec());
codecs.add(new VarintToIntCodec());
codecs.add(new SmallintToIntCodec());
codecs.add(new TinyintToIntCodec());
builder.addTypeCodecs(codecs.toArray(new TypeCodec[]{}));

builder.withKeyspace(keyspace);
builder.withConfigLoader(driverConfigLoaderBuilder.build());

Expand Down
56 changes: 56 additions & 0 deletions src/main/java/com/ing/data/cassandra/jdbc/codec/BaseLongCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.ing.data.cassandra.jdbc.codec;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.codec.PrimitiveLongCodec;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

import java.nio.ByteBuffer;

/**
* @author Liudmila Kornilova
**/
public abstract class BaseLongCodec implements PrimitiveLongCodec {
abstract int getNumberOfBytes();

abstract void serializeNoBoxingInner(long value, ByteBuffer bb);

abstract long deserializeNoBoxingInner(ByteBuffer bytes);

@NonNull
@Override
public GenericType<Long> getJavaType() {
return GenericType.LONG;
}

@Override
public Long parse(String value) {
return value == null || value.isEmpty() || value.equalsIgnoreCase("NULL") ? null : Long.parseLong(value);
}

@Override
@NonNull
public String format(@Nullable Long value) {
if (value == null) return "NULL";
return Long.toString(value);
}

@Override
public ByteBuffer encodePrimitive(long value, @NonNull ProtocolVersion protocolVersion) {
ByteBuffer bb = ByteBuffer.allocate(getNumberOfBytes());
serializeNoBoxingInner(value, bb);
bb.flip();
return bb;
}

@Override
public long decodePrimitive(ByteBuffer bytes, @NonNull ProtocolVersion protocolVersion) {
if (bytes == null || bytes.remaining() == 0) return 0;
if (bytes.remaining() != getNumberOfBytes()) {
throw new IllegalStateException("Invalid value, expecting " + getNumberOfBytes() + " bytes but got " + bytes.remaining());
}

return deserializeNoBoxingInner(bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.ing.data.cassandra.jdbc.codec;


import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.protocol.internal.util.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;

import java.nio.ByteBuffer;

/**
* @author Liudmila Kornilova
**/
public class BlobToByteArrayCodec implements TypeCodec<byte[]> {
@NonNull
@Override
public DataType getCqlType() {
return DataTypes.BLOB;
}

@NonNull
@Override
public GenericType<byte[]> getJavaType() {
return GenericType.of(byte[].class);
}

@Override
public ByteBuffer encode(byte[] value, @NonNull ProtocolVersion protocolVersion) {
return value == null ? null : ByteBuffer.wrap(value);
}

@Override
public byte[] decode(ByteBuffer bytes, @NonNull ProtocolVersion protocolVersion) {
return bytes == null ? null : Bytes.getArray(bytes);
}

@Override
public byte[] parse(String value) {
return value == null || value.isEmpty() || value.equalsIgnoreCase("NULL")
? null
: Bytes.getArray(Bytes.fromHexString(value));
}

@Override
@NonNull
public String format(byte[] value) {
if (value == null) return "NULL";
return Bytes.toHexString(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.ing.data.cassandra.jdbc.codec;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import edu.umd.cs.findbugs.annotations.NonNull;

import java.nio.ByteBuffer;
import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;

/**
* @author Liudmila Kornilova
**/
public class DateToDateCodec implements TypeCodec<Date> {
private final TypeCodec<LocalDate> dateCodec = CodecRegistry.DEFAULT.codecFor(DataTypes.DATE, LocalDate.class);

@NonNull
@Override
public DataType getCqlType() {
return DataTypes.DATE;
}

@NonNull
@Override
public GenericType<Date> getJavaType() {
return GenericType.of(Date.class);
}

@Override
public ByteBuffer encode(Date value, @NonNull ProtocolVersion protocolVersion) {
if (value == null) return null;
LocalDate localDate = LocalDate.from(Instant.ofEpochMilli(value.getTime()));
return dateCodec.encode(localDate, protocolVersion);
}

@Override
public Date decode(ByteBuffer bytes, @NonNull ProtocolVersion protocolVersion) {
if (bytes == null) return null;
LocalDate localDate = dateCodec.decode(bytes, protocolVersion);
return localDate == null ? null : new Date(localDate.toEpochDay() * 86400L);
}

@Override
public Date parse(String value) {
throw new RuntimeException("Not supported");
}

@Override
@NonNull
public String format(Date value) {
throw new RuntimeException("Not supported");
}
}
Original file line number Diff line number Diff line change
@@ -1,69 +1,46 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ing.data.cassandra.jdbc.codec;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.PrimitiveDoubleCodec;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.ing.data.cassandra.jdbc.utils.ByteBufferUtil;
import edu.umd.cs.findbugs.annotations.NonNull;

import javax.annotation.Nonnull;
import java.math.BigDecimal;
import java.nio.ByteBuffer;

/**
* Manages the two-way conversion between the CQL type {@link DataTypes#DECIMAL} and the Java type {@link Double}.
*/
public class DecimalToDoubleCodec extends AbstractCodec<Double> implements TypeCodec<Double> {

/**
* Constructor for {@code DecimalToDoubleCodec}.
*/
public DecimalToDoubleCodec() {
public class DecimalToDoubleCodec extends AbstractCodec<Double> implements PrimitiveDoubleCodec {
private final TypeCodec<BigDecimal> decimalCodec = CodecRegistry.DEFAULT.codecFor(DataTypes.DECIMAL, BigDecimal.class);

@NonNull
@Override
public DataType getCqlType() {
return DataTypes.DECIMAL;
}

@Nonnull
@NonNull
@Override
public GenericType<Double> getJavaType() {
return GenericType.DOUBLE;
}

@Nonnull
@Override
public DataType getCqlType() {
return DataTypes.DECIMAL;
public ByteBuffer encodePrimitive(double value, @NonNull ProtocolVersion protocolVersion) {
return decimalCodec.encode(BigDecimal.valueOf(value), protocolVersion);
}

@Override
public ByteBuffer encode(final Double value, @Nonnull final ProtocolVersion protocolVersion) {
if (value == null) {
return null;
}
return ByteBufferUtil.bytes(value);
public double decodePrimitive(ByteBuffer bytes, @NonNull ProtocolVersion protocolVersion) {
BigDecimal bigDecimal = decimalCodec.decode(bytes, protocolVersion);
if (bigDecimal == null) return 0;
return bigDecimal.doubleValue();
}

@Override
public Double decode(final ByteBuffer bytes, @Nonnull final ProtocolVersion protocolVersion) {
if (bytes == null) {
return null;
}
// always duplicate the ByteBuffer instance before consuming it!
return ByteBufferUtil.toDouble(bytes.duplicate());
}

@Override
Double parseNonNull(@Nonnull final String value) {
Expand All @@ -74,5 +51,4 @@ Double parseNonNull(@Nonnull final String value) {
String formatNonNull(@Nonnull final Double value) {
return String.valueOf(value);
}

}
Loading

0 comments on commit 9a0b43f

Please sign in to comment.