You may be asking me at this point: why is our application so slow? Minutes is a long time to wait to insert some data into a database. Why are we only addressing performance now?
The first answer is that the performance of this data insertion doesn't matter all that much: we only run it once against the production database.
The "real" answer is that we're awaiting each request inside a loop, so we only send one at a time. Requests across the internet are quite slow, and while the actual queries we're making against PlanetScale can finish in single-digit milliseconds, the round trip across the internet cannot complete that fast.
The second answer has a bit more to it. Async/await and zero-cost futures were first stabilized in late 2019. That has given the ecosystem some time to adopt async Rust, such that async runtimes like tokio are now 1.0, and libraries such as sqlx and web frameworks like Rocket have had time to integrate the 1.0 runtimes.
However, this stabilization only included the core pieces on which the even higher-level async tools are built. Futures are stable, async/await syntax is stable, but `Stream`s are not... and we're about to use `Stream`s.
This is the point at which I tell you this lesson is optional and you can choose to skip it, just watch it for context, or work through it.
First we need to add a new crate: `futures`. Yes, the crate has the same name as `Future`s. This is because futures development was done in this crate before stabilization: when the `Future` trait stabilized, some code was moved out of the `futures` crate into Rust core, and some code that added extra functionality was left in the `futures` crate.
cargo add -p upload-pokemon-data futures
The next step is to work with the `FuturesUnordered` type. This type is something like `Promise.all` from JavaScript: `FuturesUnordered` allows us to build up a set of futures that we then await all together, allowing them to run in any order (hence the name `FuturesUnordered`).
`FuturesUnordered` is a collection of futures that can run in any order.
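Before wiring it into our program, here's a minimal, self-contained sketch of the pattern (not our application code): push futures in, then pull results out in whatever order they finish.

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut tasks = FuturesUnordered::new();
    for i in 0..3u32 {
        // any futures with the same output type can be pushed in
        tasks.push(async move { i * 2 });
    }
    // results arrive in completion order, not insertion order
    while let Some(n) = tasks.next().await {
        println!("{}", n);
    }
}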
Bring `FuturesUnordered` into scope.
use futures::stream::FuturesUnordered;
We can create a `FuturesUnordered` with `new`.
let tasks = FuturesUnordered::new();
for record in pokemon.clone().into_iter().progress() {
    ...
}
We're then going to change our `insert_pokemon` call. Instead of awaiting the result right away, we're going to tell tokio "here's a thing we'll want to run later" by using `tokio::spawn`.
`tokio::spawn` returns a type called a `JoinHandle`, which as it happens is also a future. By using `spawn` we've told tokio to accept the work we want to do, and in return tokio has given us a way to tell it when we want to await that work.
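As a tiny standalone illustration (hypothetical code, not from our project), awaiting a `JoinHandle` directly gives us back the spawned task's result:

let handle = tokio::spawn(async { 40 + 2 });
// awaiting the JoinHandle yields Result<T, JoinError>
let answer = handle.await.expect("task panicked");
assert_eq!(answer, 42);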
Since the return value of `tokio::spawn` is a future, we can push it into `tasks`, which will let us await it alongside all the other tasks later.
tasks.push(tokio::spawn(insert_pokemon(
    &pool,
    &pokemon_row,
)));
This code tells us that `&pool` and `&pokemon_row` don't live long enough. This is because we can't pass references into spawned async functions: the spawned future has to be able to live for the rest of the program (`'static`), but those references are only guaranteed to live for a limited time, and we've effectively said that we're going to await the future... eventually. There's no guarantee these references will live that long, so we need to change our `insert_pokemon` function to not use references.
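As a quick hypothetical illustration of the constraint: borrowing a local in a spawned task is rejected, while moving ownership in compiles fine.

let data = String::from("bulbasaur");
// does not compile: the future borrows `data`, and tokio can't
// guarantee `data` outlives the spawned task
// tokio::spawn(async { println!("{}", &data) });

// compiles: the future takes ownership of `data`
tokio::spawn(async move { println!("{}", data) });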
In `src/db.rs`, remove the `&` from the arguments.
pub async fn insert_pokemon(
    pool: MySqlPool,
    PokemonTableRow {...}: PokemonTableRow,
)
We also need to change the `.execute` call in that same function to take a reference now.
).execute(&pool).await
Then back in our spawn code, remove the `&`s as well. We also need to clone the pool, or it will be moved. Luckily, if we peek behind the scenes of the `Pool` type, the authors of sqlx already anticipated this problem and backed the type with an `Arc`. `Arc` is an "atomically reference counted" type and is very cheap to clone.
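To see why that clone is cheap, here's a small standalone sketch: cloning an `Arc` only bumps an atomic reference count; the data behind it is never copied.

use std::sync::Arc;

let connections = Arc::new(vec!["conn1", "conn2"]);
let handle = connections.clone();
// both point at the same allocation; only a counter changed
assert_eq!(Arc::strong_count(&handle), 2);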
We also clone `pokemon_row` because we use it in all of the later SQL queries. We're basically giving the task a copy of `pokemon_row` that it can own and move around however it wants, while we keep ours to use later.
tasks.push(tokio::spawn(insert_pokemon(
    pool.clone(),
    pokemon_row.clone(),
)));
To do this we need to derive `Clone` for `PokemonTableRow` and `PokemonId` in `src/db.rs`.
#[derive(Debug, Clone)]
pub struct PokemonTableRow {
#[derive(Clone)]
pub struct PokemonId(Ksuid);
This will clear up the `insert_pokemon` tasks, but we still have to do the same work for the other tasks.
For `abilities` we don't have a separate function that can capture the variables we're passing in so that they live as long as the async work does, so we need to use an async block.
We've created local variables (`pool`, `pokemon_id`, and `ability`) and then given them to the future using the `async move` syntax. This allows the variables to outlive their original scope, but it also means the local variables can't be used by anything else, which is why we're cloning new copies of them to move into the future.
The `move` keyword allows the block to take ownership of the local variables.
for ability in record.abilities.iter() {
    let pool = pool.clone();
    let pokemon_id = pokemon_row.id.clone();
    let ability = ability.clone();
    tasks.push(tokio::spawn(async move {
        sqlx::query!(
            r#"
            INSERT INTO abilities (
                id, pokemon_id, ability
            ) VALUES (?, ?, ?)"#,
            PokemonId::new(),
            pokemon_id,
            ability,
        )
        .execute(&pool)
        .await
    }));
}
Rinse and repeat for the other two SQL query loops.
for egg_group in record.egg_groups.iter() {
    let pool = pool.clone();
    let pokemon_id = pokemon_row.id.clone();
    let egg_group = egg_group.clone();
    tasks.push(tokio::spawn(async move {
        sqlx::query!(
            r#"
            INSERT INTO egg_groups (
                id, pokemon_id, egg_group
            ) VALUES (?, ?, ?)"#,
            PokemonId::new(),
            pokemon_id,
            egg_group,
        )
        .execute(&pool)
        .await
    }));
}
for typing in record.typing.iter() {
    let pool = pool.clone();
    let pokemon_id = pokemon_row.id.clone();
    let typing = typing.clone();
    tasks.push(tokio::spawn(async move {
        sqlx::query!(
            r#"
            INSERT INTO typing (
                id, pokemon_id, typing
            ) VALUES (?, ?, ?)"#,
            PokemonId::new(),
            pokemon_id,
            typing,
        )
        .execute(&pool)
        .await
    }));
}
And again for the evolutions queries. Every time we're repeating the same concepts: prep the variables, move the new variables into an async block, and push the spawned future onto `tasks` for later.
for pokemon in pokemon
    .into_iter()
    .progress()
    .filter(|pokemon| pokemon.evolves_from.is_some())
{
    let name = pokemon.evolves_from.expect(
        "Expected a value here since we just checked",
    );
    let pokemon_id =
        pokemon_map.get(&pokemon.name).unwrap().clone();
    let evolves_from_id =
        pokemon_map.get(&name).unwrap().clone();
    let pool = pool.clone();
    tasks.push(tokio::spawn(async move {
        sqlx::query!(
            r#"
            INSERT INTO evolutions (
                id, pokemon_id, evolves_from
            ) VALUES (?, ?, ?)"#,
            PokemonId::new(),
            pokemon_id,
            evolves_from_id,
        )
        .execute(&pool)
        .await
    }));
}
awaiting a Stream
Now that we have all of our futures set up in a `FuturesUnordered`, we can await the `Stream`.
We're going to need to bring `StreamExt` and `ProgressBar` into scope.
use futures::{stream::FuturesUnordered, StreamExt};
use indicatif::{ProgressBar, ProgressIterator};
To be able to call `.next()` on `tasks`, `tasks` needs to be mutable, so head back up and add `mut` to the `tasks` declaration.
let mut tasks = FuturesUnordered::new();
We're going to create our own `ProgressBar` instead of attaching it to an iterator this time.
We have to use `while let` syntax to operate on the `Stream`; a for loop will not work here. One day we might have something like `async for`, but we do not today.
We loop while `tasks.next().await` still produces `Some(item)`. `tasks.next()` is an async function that gives us the result of whichever future finishes next, so when we `await` it we either get `Some(item)`, which means there are still items left in the stream, or `None`, which means the stream is done.
`item`, then, is the result of the `JoinHandle` future.
In this case, that means we have nested `Result`s: one for the `JoinHandle` and one for our own return value.
Result<Result<MySqlQueryResult, sqlx::Error>, JoinError>
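In the final code below we flatten both layers with two `into_diagnostic()?` calls. As an illustration of what's inside, handling each layer explicitly would look something like this sketch:

while let Some(item) = tasks.next().await {
    match item {
        // the task ran and the query succeeded
        Ok(Ok(_query_result)) => {}
        // the task ran, but the sql query itself failed
        Ok(Err(sql_error)) => eprintln!("query failed: {}", sql_error),
        // the task panicked or was cancelled before completing
        Err(join_error) => eprintln!("task failed: {}", join_error),
    }
}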
`ProgressBar` already has its own `Arc`, so we don't need to worry about it and can call `.inc(1)` to tick the progress bar once for each completed task. Finally, we `.finish()` the progress bar when we're done processing.
let pb = ProgressBar::new(tasks.len() as u64);
while let Some(item) = tasks.next().await {
    item.into_diagnostic()?.into_diagnostic()?;
    pb.inc(1);
}
pb.finish();
We run into an error fairly quickly though:
pool timed out while waiting for an open connection
We're now in debugging territory. Good ideas might be to search the sqlx GitHub issues or the Discord history. Either way, we'd end up finding out that when using a large number of futures like we are now, we either need to increase the timeout or make the futures complete faster, so that enough MySQL connections are freed up and returned to the pool for all the futures to finish within the timeout.
Our `max_connections` for the pool is 5 right now, which is pretty low for us. We could crank that up to 50 and PlanetScale wouldn't really care: unlike with some other SQL providers, connections are cheap for PlanetScale. This is one of the reasons we can use it from serverless functions.
The default connection timeout is 30 seconds though, so we either need to complete our script in 30 seconds or we get the boot.
We'll need `std::time::Duration`.
use std::{collections::HashMap, env, time::Duration};
Since we're not using sqlx in a server context right now, we kind of just want it to wait on a connection for a while; no user is waiting for a request to complete.
With 50 max connections and a 5 minute timeout (5 * 60 seconds), our pool initialization now looks like this.
let pool = MySqlPoolOptions::new()
    .max_connections(50)
    .acquire_timeout(Duration::from_secs(60 * 5))
    .connect(&database_url)
    .await
    .map_err(|e| {
        miette!(
            help = "database urls must be in the form `mysql://username:password@host:port/database`",
            "{e}"
        )
    })?;
With these new settings and the added concurrency of `FuturesUnordered`, it took me 40 seconds to complete all ~7000 requests to PlanetScale.