diff --git a/app/models/swf_asset.rb b/app/models/swf_asset.rb index 2a56c5c3..7a88a55b 100644 --- a/app/models/swf_asset.rb +++ b/app/models/swf_asset.rb @@ -117,10 +117,12 @@ class SwfAsset < ApplicationRecord end def manifest + raise "manifest_url is blank" if manifest_url.blank? NeopetsMediaArchive.load_json(manifest_url) end def preload_manifest + raise "manifest_url is blank" if manifest_url.blank? NeopetsMediaArchive.preload_file(manifest_url) end @@ -251,7 +253,14 @@ class SwfAsset < ApplicationRecord # NeopetsMediaArchive will share a pool of persistent connections for # them. swf_assets.map do |swf_asset| - semaphore.async { swf_asset.preload_manifest } + semaphore.async do + begin + swf_asset.preload_manifest + rescue StandardError => error + Rails.logger.error "Could not preload manifest for asset " + + "#{swf_asset.id} (#{swf_asset.manifest_url}): #{error.message}" + end + end end # Wait until all tasks are done. diff --git a/lib/tasks/swf_assets.rake b/lib/tasks/swf_assets.rake index a5a39029..a5bb4b66 100644 --- a/lib/tasks/swf_assets.rake +++ b/lib/tasks/swf_assets.rake @@ -35,75 +35,104 @@ namespace :swf_assets do end end - desc "Backfill manifest_url for SwfAsset models" - task manifests: [:environment] do - timeout = ENV.fetch("TIMEOUT", "5").to_i + namespace :manifests do + desc "Save all known manifests to the Neopets Media Archive" + task load: [:environment] do + # Log errors to STDOUT, but we don't need the info messages about + # successful saves. + Rails.logger = Logger.new(STDOUT, level: :error) - assets = SwfAsset.where(manifest_url: nil) - count = assets.count - puts "Found #{count} assets without manifests" + # Find all the manifests with known URLs. (We don't have a database + # filter for "do we already have the manifest downloaded", but that's + # okay, the preload method will quickly check that for us!) + swf_assets = SwfAsset.where.not(manifest_url: nil) + total_count = swf_assets.count + puts "Found #{total_count} assets with manifests" - Async do - # Share a pool of persistent connections, rather than reconnecting on - # each request. (This library does that automatically!) - internet = Async::HTTP::Internet.instance - - # Load the assets in batches, then process each batch in two steps: first - # inferring all manifest URLs in the batch, then saving all assets in the - # batch. (This makes the update step more efficient, and it also avoids - # simultaneous queries across the fibers, which ActiveRecord disallows!) - # - # We keep track of a shared index `i` here, but we only actually - # increment it once each task is *done*, so that the numbers output in - # the right order! - i = 0 - assets.find_in_batches(batch_size: 1000) do |batch| - # Create a barrier, to let us wait on all the tasks; then under it - # create a semaphore, to limit how many tasks run at once. - barrier = Async::Barrier.new - semaphore = Async::Semaphore.new(100, parent: barrier) - - batch.each do |asset| - semaphore.async do |task| - manifest_url = nil - begin - task.with_timeout(timeout) do - manifest_url = infer_manifest_url(asset.url, internet) - end - rescue StandardError => error - i += 1 - puts "[#{i}/#{count}] ⚠️ Skipping #{asset.id}: #{error.message}" - next - end - - i += 1 - puts "[#{i}/#{count}] Manifest for #{asset.id}: #{manifest_url}" - - # Write, but don't yet save, the manifest URL. - asset.manifest_url = manifest_url - end + # For each batch of 1000 assets, load their manifests concurrently. + # Wrap everything in a top-level sync, so keyboard interrupts will + # propagate all the way up to here, instead of just cancelling the + # current batch. + Sync do + saved_count = 0 + swf_assets.find_in_batches(batch_size: 1000) do |swf_assets| + SwfAsset.preload_manifests(swf_assets) + saved_count += swf_assets.size + puts "Loaded #{saved_count} of #{total_count} manifests" end + end + end - # Wait for all the above tasks to finish. (Then, all of the assets that - # succeeded should have an unsaved `manifest_url` change.) - barrier.wait + desc "Backfill manifest_url for SwfAsset models" + task urls: [:environment] do + timeout = ENV.fetch("TIMEOUT", "5").to_i - # Save all of the assets in the batch. (We do this in a transaction not - # for the transactional semantics, but because it's notably faster than - # doing a commit between each query, which is what sending the queries - # individually would effectively do!) - begin - SwfAsset.transaction do - batch.each do |asset| + assets = SwfAsset.where(manifest_url: nil) + count = assets.count + puts "Found #{count} assets without manifests" + + Sync do + # Share a pool of persistent connections, rather than reconnecting on + # each request. (This library does that automatically!) + internet = Async::HTTP::Internet.instance + + # Load the assets in batches, then process each batch in two steps: first + # inferring all manifest URLs in the batch, then saving all assets in the + # batch. (This makes the update step more efficient, and it also avoids + # simultaneous queries across the fibers, which ActiveRecord disallows!) + # + # We keep track of a shared index `i` here, but we only actually + # increment it once each task is *done*, so that the numbers output in + # the right order! + i = 0 + assets.find_in_batches(batch_size: 1000) do |batch| + # Create a barrier, to let us wait on all the tasks; then under it + # create a semaphore, to limit how many tasks run at once. + barrier = Async::Barrier.new + semaphore = Async::Semaphore.new(100, parent: barrier) + + batch.each do |asset| + semaphore.async do |task| + manifest_url = nil begin - asset.save! + task.with_timeout(timeout) do + manifest_url = infer_manifest_url(asset.url, internet) + end rescue StandardError => error - puts "⚠️ Saving asset #{asset.id} failed: #{error.full_message}" + i += 1 + puts "[#{i}/#{count}] ⚠️ Skipping #{asset.id}: #{error.message}" + next end + + i += 1 + puts "[#{i}/#{count}] Manifest for #{asset.id}: #{manifest_url}" + + # Write, but don't yet save, the manifest URL. + asset.manifest_url = manifest_url end end - rescue StandardError => error - puts "⚠️ Saving this batch failed: #{error.full_message}" + + # Wait for all the above tasks to finish. (Then, all of the assets that + # succeeded should have an unsaved `manifest_url` change.) + barrier.wait + + # Save all of the assets in the batch. (We do this in a transaction not + # for the transactional semantics, but because it's notably faster than + # doing a commit between each query, which is what sending the queries + # individually would effectively do!) + begin + SwfAsset.transaction do + batch.each do |asset| + begin + asset.save! + rescue StandardError => error + puts "⚠️ Saving asset #{asset.id} failed: #{error.full_message}" + end + end + end + rescue StandardError => error + puts "⚠️ Saving this batch failed: #{error.full_message}" + end end end end