use SSL - Neethahiremath/Wiki GitHub Wiki

if you want to apply ssl to kafka


place the certs in application under resources add command to copy those files to /app folder from docker


COPY src/main/resources/ssl/env/* /app/ssl/env/
COPY src/main/resources/ssl/env/* /app/ssl/env/

env can be dev,stage and prod

add config in kafka config while producing and consuming


private Properties getDefaultProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
    props.put(ProducerConfig.ACKS_CONFIG, acks);
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClasses);
    setupSsl(props);
    return props;
  }


  private void setupSsl(final Properties props) {
    if (sslEnabled) {
      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, sslSecurityProtocol);
      props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, location);
      props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
      props.put("ssl.endpoint.identification.algorithm", StringUtils.EMPTY);
      props.put("confluent.monitoring.interceptor.security.protocol", sslSecurityProtocol);
      props.put("confluent.monitoring.interceptor.ssl.truststore.location", location);
      props.put("confluent.monitoring.interceptor.ssl.truststore.password", password);
    }
  }

in yml add those new ssl related fields


kafka:
    producer:
        topicname: topicname
        bootstrap-servers: localhost:9092
        acks: all
        batch-size: 16384
        buffer-memory: 33554432
        key-serializer:  org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        linger-ms: 1
        retries: 0
        interceptor-classes: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
        ssl-enabled: true
        ssl-security-protocol: SSL
        ssl-truststore-location: src/main/resources/ssl/env/truststore.jks
        ssl-truststore-password: ****
    consumer:
        bootstrap-servers: localhost:9092
        topic: topic
        auto-commit-interval-ms: 500
        auto-offset-reset: earliest
        enable-auto-commit: true
        session-timeout-ms: 10000
        max-poll-records: 200
        max-poll-interval: 3000000
        group-id: group-id
        concurrency: 1
        ssl-enabled: false
        ssl-security-protocol: SSL
        ssl-truststore-location: src/main/resources/ssl/env/truststore.jks
        ssl-truststore-password: ****

for cassandra :

non ssl port will be 9042 and with ssl port will be diff, u can use that port

files can be placed in application or in vault if you place in vault, u need to place it base64 encoded string and read that string from vault path. convert that encoded string back to jks.file

while creating session use ssl and in json files you can add the ssl engine factory



@Bean(name = "cassandraSessionBean")
	public DseSession session() throws NoSuchAlgorithmException, KeyManagementException {
		SSLContext sslcontext = SSLContext.getInstance("TLS");
		sslcontext.init(null, null, null);
		decodeFileFromBase64(SSLEncodedStr());

		return DseSession.builder()
				.withSslContext(sslcontext)
				.withAuthCredentials(username, password)
				.withConfigLoader(driverConfigLoader()).build();
	}


private static void decodeBase64(String string) {
		try {
			byte[] decodedBytes;
			FileOutputStream fop;
			decodedBytes = Base64.getDecoder().decode(string);
			File file = new File("/app/cassandra/ssl/truststore.jks");

			if (file.createNewFile()) {
				log.debug("JKS File is created!");
			} else {
				log.debug("JKS file already exists.");
			}

			fop = new FileOutputStream(file);
			fop.write(decodedBytes);
			fop.flush();
			fop.close();

			if (file.exists()) {
				log.debug("JKS file created and decoded base64 value");
			} else {
				log.debug("File Doesn't exist");
			}
		} catch (Exception e) {
			log.error("Error while writing SSL certificate to pod as .jks file {} ",
					e.getMessage());
		}
	}




@Bean
	public DriverConfigLoader driverConfigLoader() {
		File file = null;
		boolean fileReadable = true;
		try {
			file = ResourceUtils.getFile(filePath);
			fileReadable = file.canRead();
			log.info("Cassandra Json config file readable {} from file system {}", fileReadable,file);
		} catch (FileNotFoundException ex) {
			log.info("File doesNot exists:", file, ex);
		}
		if (fileReadable && file!= null) {
			return DseDriverConfigLoader.fromFile(file);
		} else {
			return DseDriverConfigLoader.fromClasspath(filePath);
		}
	}

filepath is where u have cassandra json file

sample cassandra.json can be //use port with ssl or without ssl

truststore-path - where u have the trust store placed


{
	"datastax-java-driver": {
		"basic": {
			"contact-points": [
				"localhost:9042"
			],
			"session-keyspace": "keyspace",
			"request": {
				"timeout": "5 seconds",
				"consistency": "LOCAL_QUORUM",
				"page-size": 5000
			},
			"load-balancing-policy": {
				"local-datacenter": "Cassandra"
			}
		},
		"advanced": {
			"auth-provider": {
				"class": "DsePlainTextAuthProvider",
				"username": "username",
				"password": "******"
			},
			"ssl-engine-factory": {
				"class": "DefaultSslEngineFactory",
				"hostname-validation": true,
				"truststore-path": "/app/ssl/truststore.jks",
				"truststore-password": "******"
			},
			"connection": {
        "init-query-timeout": "60 seconds",
        "set-keyspace-timeout": "60 seconds",
				"max-requests-per-connection": 1024,
				"pool": {
					"local": {
						"size": 4
					},
					"remote": {
						"size": 1
					}
				}
			},
			"control-connection": {
        "timeout": "60 seconds"
			},
			"continuous-paging": {
				"timeout": {
          "first-page": "60 seconds",
          "other-pages": "30 seconds"
				}
      },
      "speculative-execution-policy": {
        "class": "ConstantSpeculativeExecutionPolicy",
        "max-executions": 2,
        "delay": "1000 milliseconds"
			}
		}
	}
}

another way to implement SSL :

CassandraConfig:



  private Session session;
  private final CassandraProperties connectorProps;
  private final Environment environment;

  @Autowired
  public CassandraConfig(CassandraProperties connectorProps, Environment environment) {
    this.connectorProps = connectorProps;
    this.environment = environment;
  }

  @Bean(name = "cassandraSessionBean")
  public Session session() {
    connect();
    return session;
  }

  @Bean
  public MappingManager mappingManager(Session session) {
    final PropertyMapper propertyMapper =
        new DefaultPropertyMapper()
            .setNamingStrategy(
                new DefaultNamingStrategy(
                    NamingConventions.LOWER_CAMEL_CASE, NamingConventions.LOWER_SNAKE_CASE));

    final MappingConfiguration configuration =
        MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
    return new MappingManager(session, configuration);
  }

  public void connect() {
    SocketOptions socketOptions = new SocketOptions();
    socketOptions.setReadTimeoutMillis(3000);
    socketOptions.setConnectTimeoutMillis(3000);

    QueryOptions queryOptions =
        new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

    DCAwareRoundRobinPolicy lbPolicy =
        DCAwareRoundRobinPolicy.builder().withLocalDc(connectorProps.getDatacenter()).build();
    CodecRegistry codecRegistry = CodecRegistry.DEFAULT_INSTANCE;

    // Default load balancing policy is datacenter-aware, token-aware policy
    Cluster.Builder clusterBuilder =
        Cluster.builder()
            .addContactPoints(connectorProps.getContactPoints())
            .withPort(connectorProps.getPort())
            .withoutJMXReporting() // To ignore java.lang.NoClassDefFoundError
            .withQueryOptions(queryOptions)
            .withSocketOptions(socketOptions)
            .withLoadBalancingPolicy(lbPolicy)
            .withAuthProvider(
                new PlainTextAuthProvider(
                    connectorProps.getUserName(), connectorProps.getPassword()))
            .withCodecRegistry(codecRegistry);

    boolean isSSLEnabledForCassandra =
        Arrays.stream(environment.getActiveProfiles())
            .anyMatch(Constant.SSL_CASSANDRA::equalsIgnoreCase);
    if (isSSLEnabledForCassandra) {
      clusterBuilder.withSSL(createSslOptions());
    }
    Cluster cluster = clusterBuilder.build();

    Metadata metadata = cluster.getMetadata();
    log.info("Cluster name: {}", metadata.getClusterName());

    for (Host host : metadata.getAllHosts()) {
      log.info(
          "Datacenter: {}, Host: {}, Rack: {}",
          host.getDatacenter(),
          host.getAddress(),
          host.getRack());
    }

    session = cluster.connect();
    session.execute("use " + connectorProps.getKeyspaceName() + ";");
  }

  private SSLOptions createSslOptions() {
    return RemoteEndpointAwareJdkSSLOptions.builder()
        .withSSLContext(createSslContextWithTruststore())
        .build();
  }

  private SSLContext createSslContextWithTruststore() {
    try {
      return new SSLContextBuilder()
          .loadTrustMaterial(
              new File(connectorProps.getTrustStorePath()),
              connectorProps.getTrustStorePassword().toCharArray())
          .build();
    } catch (NoSuchAlgorithmException
        | KeyManagementException
        | KeyStoreException
        | CertificateException
        | IOException e) {
      log.error("Error while creating SslContextWithTruststore", e);
      throw new SecurityException("Could not create SSL context", e);
    }
  }



cassandra:
      contact-points: localhost
      port: 9042
      keyspace-name: table_name
      user-name: *****
      password:  *****
      datacenter: dc1
      trustStorePassword: *****


---
spring:
    profiles: sslcassandra

cassandra:
    port: 9142
    trustStorePath: /src/resource/truststore.jks