Skip to content

Commit

Permalink
scylla configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Jan 16, 2025
1 parent d79767f commit 6743098
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 418 deletions.
578 changes: 240 additions & 338 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ license.workspace = true
[dependencies]
anyhow = "1.0.70"
dotenv = "0.15.0"
google-cloud-storage = "0.23.0"
lazy_static = "1.4.0"
regex = "1.10.2"
serde = "1.0.145"
Expand All @@ -23,6 +22,7 @@ opentelemetry-jaeger = { version = "0.18", features = [
"collector_client",
"isahc_collector_client",
], optional = true }
scylla = "0.15.1"
toml = "0.8.4"
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.15", features = [
Expand Down
72 changes: 55 additions & 17 deletions configuration/src/configs/tx_details_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,77 @@ use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic};

#[derive(Debug, Clone)]
pub struct TxDetailsStorageConfig {
pub bucket_name: String,
pub scylla_url: String,
pub scylla_user: Option<String>,
pub scylla_password: Option<String>,
pub scylla_preferred_dc: Option<String>,
pub scylla_keepalive_interval: u64,
}

impl TxDetailsStorageConfig {
pub async fn gcs_config(&self) -> google_cloud_storage::client::ClientConfig {
let default_config = google_cloud_storage::client::ClientConfig::default();
if std::env::var("STORAGE_EMULATOR_HOST").is_ok() {
// if we are running local, and we are using gcs emulator
// we need to use anonymous access with the emulator host
let mut config = default_config.anonymous();
config.storage_endpoint = std::env::var("STORAGE_EMULATOR_HOST").unwrap();
config
} else {
default_config.with_auth().await.unwrap()
pub async fn scylla_client(&self) -> scylla::Session {
let mut load_balancing_policy_builder =
scylla::transport::load_balancing::DefaultPolicy::builder();

if let Some(scylla_preferred_dc) = self.scylla_preferred_dc.clone() {
load_balancing_policy_builder =
load_balancing_policy_builder.prefer_datacenter(scylla_preferred_dc);
}
}

pub async fn storage_client(&self) -> google_cloud_storage::client::Client {
let gcs_config = self.gcs_config().await;
google_cloud_storage::client::Client::new(gcs_config)
let scylla_execution_profile_handle = scylla::transport::ExecutionProfile::builder()
.load_balancing_policy(load_balancing_policy_builder.build())
.build()
.into_handle();

let mut session: scylla::SessionBuilder = scylla::SessionBuilder::new()
.known_node(self.scylla_url.clone())
.keepalive_interval(std::time::Duration::from_secs(
self.scylla_keepalive_interval,
))
.default_execution_profile_handle(scylla_execution_profile_handle);

if let Some(user) = self.scylla_user.clone() {
if let Some(password) = self.scylla_password.clone() {
session = session.user(user, password);
}
}
session
.build()
.await
.expect("Failed to create scylla session")
}
}

#[derive(Deserialize, Debug, Clone, Default)]
pub struct CommonTxDetailStorageConfig {
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub bucket_name: Option<String>,
pub scylla_url: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub scylla_user: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub scylla_password: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub scylla_preferred_dc: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub scylla_keepalive_interval: Option<u64>,
}

impl CommonTxDetailStorageConfig {
    /// Fallback keepalive interval, in seconds, applied when
    /// `scylla_keepalive_interval` is absent from the configuration.
    pub fn default_scylla_keepalive_interval() -> u64 {
        // Named so the unit (seconds) is visible at the point of definition.
        const DEFAULT_KEEPALIVE_SECS: u64 = 60;
        DEFAULT_KEEPALIVE_SECS
    }
}

impl From<CommonTxDetailStorageConfig> for TxDetailsStorageConfig {
fn from(common_config: CommonTxDetailStorageConfig) -> Self {
Self {
bucket_name: required_value_or_panic("bucket_name", common_config.bucket_name),
scylla_url: required_value_or_panic("scylla_url", common_config.scylla_url),
scylla_user: common_config.scylla_user,
scylla_password: common_config.scylla_password,
scylla_preferred_dc: common_config.scylla_preferred_dc,
scylla_keepalive_interval: common_config
.scylla_keepalive_interval
.unwrap_or_else(CommonTxDetailStorageConfig::default_scylla_keepalive_interval),
}
}
}
26 changes: 24 additions & 2 deletions configuration/src/default_env_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,30 @@ tracked_changes = "${TRACKED_CHANGES}"
## Transaction details are stored in the Google Cloud Storage
[tx_details_storage]
## Storage Bucket Name
bucket_name = "${TX_BUCKET_NAME}"
## ScyllaDB database connection URL
## Example: "127.0.0.1:9042"
scylla_url = "${SCYLLA_URL}"
## Scylla user(login)
## Optional database user
scylla_user = "${SCYLLA_USER}"
## Scylla password
## Optional database password
scylla_password = "${SCYLLA_PASSWORD}"
## ScyllaDB preferred DataCenter
## Accepts the DC name of the ScyllaDB to filter the connection to that DC only (preferably).
## If you connect to a multi-DC cluster, you might experience high latencies while working with the DB.
## This is because the ScyllaDB driver tries to connect to any of the nodes in the cluster regardless of the location of the DC.
## This option allows you to filter the connection to the DC you need.
## Example: "DC1" where DC1 is located in the same region as the application.
## Default value is None
scylla_preferred_dc = "${SCYLLA_PREFERRED_DC}"
## Scylla keepalive interval
## By default we use 60 seconds
scylla_keepalive_interval = "${SCYLLA_KEEPALIVE_INTERVAL}"
## Database configuration
[database]
Expand Down
6 changes: 3 additions & 3 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ impl ServerContext {
);

let tx_details_storage = tx_details_storage::TxDetailsStorage::new(
rpc_server_config.tx_details_storage.storage_client().await,
rpc_server_config.tx_details_storage.bucket_name.clone(),
);
rpc_server_config.tx_details_storage.scylla_client().await,
)
.await?;

let tx_cache_storage =
cache_storage::TxIndexerCache::new(rpc_server_config.general.redis_url.to_string())
Expand Down
1 change: 0 additions & 1 deletion tx-details-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ license.workspace = true

[dependencies]
anyhow = "1.0.70"
google-cloud-storage = "0.23.0"
scylla = "0.15.1"
56 changes: 4 additions & 52 deletions tx-details-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use google_cloud_storage::http::objects::download::Range;
use google_cloud_storage::http::objects::get::GetObjectRequest;
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};

pub struct ScyllaTxDetailsStorage {
pub struct TxDetailsStorage {
add_transaction: scylla::prepared_statement::PreparedStatement,
get_transaction: scylla::prepared_statement::PreparedStatement,
scylla_session: scylla::Session,
}

impl ScyllaTxDetailsStorage {
impl TxDetailsStorage {
pub async fn new(scylla_session: scylla::Session) -> anyhow::Result<Self> {
Self::create_keyspace(&scylla_session).await?;
Self::create_table(&scylla_session).await?;
Expand All @@ -22,7 +18,7 @@ impl ScyllaTxDetailsStorage {
).await?,
get_transaction: Self::prepare_query(
&scylla_session,
"SELECT block_height, transaction_details FROM tx_details.transactions WHERE transaction_hash = ? LIMIT 1",
"SELECT transaction_details FROM tx_details.transactions WHERE transaction_hash = ? LIMIT 1",
scylla::frame::types::Consistency::LocalOne,
).await?,
scylla_session,
Expand Down Expand Up @@ -56,7 +52,7 @@ impl ScyllaTxDetailsStorage {
pub async fn create_table(scylla_session: &scylla::Session) -> anyhow::Result<()> {
scylla_session
.query_unpaged(
"CREATE TABLE IF NOT EXISTS transactions (
"CREATE TABLE IF NOT EXISTS tx_details.transactions (
transaction_hash varchar PRIMARY KEY,
transaction_details BLOB
)",
Expand All @@ -83,47 +79,3 @@ impl ScyllaTxDetailsStorage {
Ok(data)
}
}

pub struct TxDetailsStorage {
client: google_cloud_storage::client::Client,
bucket_name: String,
}

impl TxDetailsStorage {
/// Create a new instance of the `TxDetailsStorage` struct.
pub fn new(client: google_cloud_storage::client::Client, bucket_name: String) -> Self {
Self {
client,
bucket_name,
}
}

pub async fn store(&self, key: &str, data: Vec<u8>) -> anyhow::Result<()> {
self.client
.upload_object(
&UploadObjectRequest {
bucket: self.bucket_name.to_string(),
..Default::default()
},
data,
&UploadType::Simple(Media::new(key.to_string())),
)
.await?;
Ok(())
}

pub async fn retrieve(&self, key: &str) -> anyhow::Result<Vec<u8>> {
let data = self
.client
.download_object(
&GetObjectRequest {
bucket: self.bucket_name.to_string(),
object: key.to_string(),
..Default::default()
},
&Range::default(),
)
.await?;
Ok(data)
}
}
7 changes: 3 additions & 4 deletions tx-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ async fn main() -> anyhow::Result<()> {
);

tracing::info!(target: INDEXER, "Instantiating the tx_details storage client...");
let tx_details_storage = std::sync::Arc::new(TxDetailsStorage::new(
indexer_config.tx_details_storage.storage_client().await,
indexer_config.tx_details_storage.bucket_name.clone(),
));
let tx_details_storage = std::sync::Arc::new(
TxDetailsStorage::new(indexer_config.tx_details_storage.scylla_client().await).await?,
);

tracing::info!(target: INDEXER, "Instantiating the stream...",);
let (sender, stream) = near_lake_framework::streamer(lake_config);
Expand Down

0 comments on commit 6743098

Please sign in to comment.