Create swf_assets:manifests:load task to save all manifest files
Doing that sweet, sweet backfill!! It's not exactly *fast*, since there's about 570k records to work through, but it's pretty good all things considered! Thanks, surprisingly-reusable async code!
This commit is contained in:
parent
9a3b33ea2f
commit
992954ce89
2 changed files with 98 additions and 60 deletions
|
@ -117,10 +117,12 @@ class SwfAsset < ApplicationRecord
|
|||
end
|
||||
|
||||
def manifest
|
||||
raise "manifest_url is blank" if manifest_url.blank?
|
||||
NeopetsMediaArchive.load_json(manifest_url)
|
||||
end
|
||||
|
||||
def preload_manifest
|
||||
raise "manifest_url is blank" if manifest_url.blank?
|
||||
NeopetsMediaArchive.preload_file(manifest_url)
|
||||
end
|
||||
|
||||
|
@ -251,7 +253,14 @@ class SwfAsset < ApplicationRecord
|
|||
# NeopetsMediaArchive will share a pool of persistent connections for
|
||||
# them.
|
||||
swf_assets.map do |swf_asset|
|
||||
semaphore.async { swf_asset.preload_manifest }
|
||||
semaphore.async do
|
||||
begin
|
||||
swf_asset.preload_manifest
|
||||
rescue StandardError => error
|
||||
Rails.logger.error "Could not preload manifest for asset " +
|
||||
"#{swf_asset.id} (#{swf_asset.manifest_url}): #{error.message}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Wait until all tasks are done.
|
||||
|
|
|
@ -35,75 +35,104 @@ namespace :swf_assets do
|
|||
end
|
||||
end
|
||||
|
||||
desc "Backfill manifest_url for SwfAsset models"
|
||||
task manifests: [:environment] do
|
||||
timeout = ENV.fetch("TIMEOUT", "5").to_i
|
||||
namespace :manifests do
|
||||
desc "Save all known manifests to the Neopets Media Archive"
|
||||
task load: [:environment] do
|
||||
# Log errors to STDOUT, but we don't need the info messages about
|
||||
# successful saves.
|
||||
Rails.logger = Logger.new(STDOUT, level: :error)
|
||||
|
||||
assets = SwfAsset.where(manifest_url: nil)
|
||||
count = assets.count
|
||||
puts "Found #{count} assets without manifests"
|
||||
# Find all the manifests with known URLs. (We don't have a database
|
||||
# filter for "do we already have the manifest downloaded", but that's
|
||||
# okay, the preload method will quickly check that for us!)
|
||||
swf_assets = SwfAsset.where.not(manifest_url: nil)
|
||||
total_count = swf_assets.count
|
||||
puts "Found #{total_count} assets with manifests"
|
||||
|
||||
Async do
|
||||
# Share a pool of persistent connections, rather than reconnecting on
|
||||
# each request. (This library does that automatically!)
|
||||
internet = Async::HTTP::Internet.instance
|
||||
|
||||
# Load the assets in batches, then process each batch in two steps: first
|
||||
# inferring all manifest URLs in the batch, then saving all assets in the
|
||||
# batch. (This makes the update step more efficient, and it also avoids
|
||||
# simultaneous queries across the fibers, which ActiveRecord disallows!)
|
||||
#
|
||||
# We keep track of a shared index `i` here, but we only actually
|
||||
# increment it once each task is *done*, so that the numbers output in
|
||||
# the right order!
|
||||
i = 0
|
||||
assets.find_in_batches(batch_size: 1000) do |batch|
|
||||
# Create a barrier, to let us wait on all the tasks; then under it
|
||||
# create a semaphore, to limit how many tasks run at once.
|
||||
barrier = Async::Barrier.new
|
||||
semaphore = Async::Semaphore.new(100, parent: barrier)
|
||||
|
||||
batch.each do |asset|
|
||||
semaphore.async do |task|
|
||||
manifest_url = nil
|
||||
begin
|
||||
task.with_timeout(timeout) do
|
||||
manifest_url = infer_manifest_url(asset.url, internet)
|
||||
end
|
||||
rescue StandardError => error
|
||||
i += 1
|
||||
puts "[#{i}/#{count}] ⚠️ Skipping #{asset.id}: #{error.message}"
|
||||
next
|
||||
end
|
||||
|
||||
i += 1
|
||||
puts "[#{i}/#{count}] Manifest for #{asset.id}: #{manifest_url}"
|
||||
|
||||
# Write, but don't yet save, the manifest URL.
|
||||
asset.manifest_url = manifest_url
|
||||
end
|
||||
# For each batch of 1000 assets, load their manifests concurrently.
|
||||
# Wrap everything in a top-level sync, so keyboard interrupts will
|
||||
# propagate all the way up to here, instead of just cancelling the
|
||||
# current batch.
|
||||
Sync do
|
||||
saved_count = 0
|
||||
swf_assets.find_in_batches(batch_size: 1000) do |swf_assets|
|
||||
SwfAsset.preload_manifests(swf_assets)
|
||||
saved_count += swf_assets.size
|
||||
puts "Loaded #{saved_count} of #{total_count} manifests"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Wait for all the above tasks to finish. (Then, all of the assets that
|
||||
# succeeded should have an unsaved `manifest_url` change.)
|
||||
barrier.wait
|
||||
desc "Backfill manifest_url for SwfAsset models"
|
||||
task urls: [:environment] do
|
||||
timeout = ENV.fetch("TIMEOUT", "5").to_i
|
||||
|
||||
# Save all of the assets in the batch. (We do this in a transaction not
|
||||
# for the transactional semantics, but because it's notably faster than
|
||||
# doing a commit between each query, which is what sending the queries
|
||||
# individually would effectively do!)
|
||||
begin
|
||||
SwfAsset.transaction do
|
||||
batch.each do |asset|
|
||||
assets = SwfAsset.where(manifest_url: nil)
|
||||
count = assets.count
|
||||
puts "Found #{count} assets without manifests"
|
||||
|
||||
Sync do
|
||||
# Share a pool of persistent connections, rather than reconnecting on
|
||||
# each request. (This library does that automatically!)
|
||||
internet = Async::HTTP::Internet.instance
|
||||
|
||||
# Load the assets in batches, then process each batch in two steps: first
|
||||
# inferring all manifest URLs in the batch, then saving all assets in the
|
||||
# batch. (This makes the update step more efficient, and it also avoids
|
||||
# simultaneous queries across the fibers, which ActiveRecord disallows!)
|
||||
#
|
||||
# We keep track of a shared index `i` here, but we only actually
|
||||
# increment it once each task is *done*, so that the numbers output in
|
||||
# the right order!
|
||||
i = 0
|
||||
assets.find_in_batches(batch_size: 1000) do |batch|
|
||||
# Create a barrier, to let us wait on all the tasks; then under it
|
||||
# create a semaphore, to limit how many tasks run at once.
|
||||
barrier = Async::Barrier.new
|
||||
semaphore = Async::Semaphore.new(100, parent: barrier)
|
||||
|
||||
batch.each do |asset|
|
||||
semaphore.async do |task|
|
||||
manifest_url = nil
|
||||
begin
|
||||
asset.save!
|
||||
task.with_timeout(timeout) do
|
||||
manifest_url = infer_manifest_url(asset.url, internet)
|
||||
end
|
||||
rescue StandardError => error
|
||||
puts "⚠️ Saving asset #{asset.id} failed: #{error.full_message}"
|
||||
i += 1
|
||||
puts "[#{i}/#{count}] ⚠️ Skipping #{asset.id}: #{error.message}"
|
||||
next
|
||||
end
|
||||
|
||||
i += 1
|
||||
puts "[#{i}/#{count}] Manifest for #{asset.id}: #{manifest_url}"
|
||||
|
||||
# Write, but don't yet save, the manifest URL.
|
||||
asset.manifest_url = manifest_url
|
||||
end
|
||||
end
|
||||
rescue StandardError => error
|
||||
puts "⚠️ Saving this batch failed: #{error.full_message}"
|
||||
|
||||
# Wait for all the above tasks to finish. (Then, all of the assets that
|
||||
# succeeded should have an unsaved `manifest_url` change.)
|
||||
barrier.wait
|
||||
|
||||
# Save all of the assets in the batch. (We do this in a transaction not
|
||||
# for the transactional semantics, but because it's notably faster than
|
||||
# doing a commit between each query, which is what sending the queries
|
||||
# individually would effectively do!)
|
||||
begin
|
||||
SwfAsset.transaction do
|
||||
batch.each do |asset|
|
||||
begin
|
||||
asset.save!
|
||||
rescue StandardError => error
|
||||
puts "⚠️ Saving asset #{asset.id} failed: #{error.full_message}"
|
||||
end
|
||||
end
|
||||
end
|
||||
rescue StandardError => error
|
||||
puts "⚠️ Saving this batch failed: #{error.full_message}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue