forked from OpenNeo/impress
Emi Matchu
992954ce89
Doing that sweet, sweet backfill!! It's not exactly *fast*, since there's about 570k records to work through, but it's pretty good all things considered! Thanks, surprisingly-reusable async code!
169 lines
6 KiB
Ruby
169 lines
6 KiB
Ruby
require 'async/barrier'
|
|
require 'async/http/internet/instance'
|
|
|
|
namespace :swf_assets do
|
|
# NOTE: I'm not sure how these duplicate records enter our database, probably
|
|
# a bug in the modeling code somewhere? For now, let's just remove them, and
|
|
# be ready to run it again if needed!
|
|
# NOTE: Run with DRY_RUN=1 to see what it would do first!
|
|
desc "Remove duplicate SwfAsset records"
|
|
task remove_duplicates: [:environment] do
|
|
duplicate_groups = SwfAsset.group(:type, :remote_id).
|
|
having("COUNT(*) > 1").
|
|
pluck(:type, :remote_id, Arel.sql("GROUP_CONCAT(id ORDER BY id ASC)"))
|
|
|
|
total = duplicate_groups.size
|
|
puts "Found #{total} groups of duplicate records"
|
|
|
|
SwfAsset.transaction do
|
|
duplicate_groups.each_with_index do |(type, remote_id, ids_str), index|
|
|
ids = ids_str.split(",")
|
|
duplicate_ids = ids[1..]
|
|
duplicate_records = SwfAsset.find(duplicate_ids)
|
|
|
|
if ENV["DRY_RUN"]
|
|
puts "[#{index + 1}/#{total}] #{type}/#{remote_id}: " +
|
|
"Would delete #{duplicate_records.size} records " +
|
|
"(#{duplicate_records.map(&:id).join(", ")})"
|
|
else
|
|
puts "[#{index + 1}/#{total}] #{type}/#{remote_id}: " +
|
|
"Deleting #{duplicate_records.size} records " +
|
|
"(#{duplicate_records.map(&:id).join(", ")})"
|
|
duplicate_records.each(&:destroy)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
namespace :manifests do
|
|
desc "Save all known manifests to the Neopets Media Archive"
|
|
task load: [:environment] do
|
|
# Log errors to STDOUT, but we don't need the info messages about
|
|
# successful saves.
|
|
Rails.logger = Logger.new(STDOUT, level: :error)
|
|
|
|
# Find all the manifests with known URLs. (We don't have a database
|
|
# filter for "do we already have the manifest downloaded", but that's
|
|
# okay, the preload method will quickly check that for us!)
|
|
swf_assets = SwfAsset.where.not(manifest_url: nil)
|
|
total_count = swf_assets.count
|
|
puts "Found #{total_count} assets with manifests"
|
|
|
|
# For each batch of 1000 assets, load their manifests concurrently.
|
|
# Wrap everything in a top-level sync, so keyboard interrupts will
|
|
# propagate all the way up to here, instead of just cancelling the
|
|
# current batch.
|
|
Sync do
|
|
saved_count = 0
|
|
swf_assets.find_in_batches(batch_size: 1000) do |swf_assets|
|
|
SwfAsset.preload_manifests(swf_assets)
|
|
saved_count += swf_assets.size
|
|
puts "Loaded #{saved_count} of #{total_count} manifests"
|
|
end
|
|
end
|
|
end
|
|
|
|
desc "Backfill manifest_url for SwfAsset models"
|
|
task urls: [:environment] do
|
|
timeout = ENV.fetch("TIMEOUT", "5").to_i
|
|
|
|
assets = SwfAsset.where(manifest_url: nil)
|
|
count = assets.count
|
|
puts "Found #{count} assets without manifests"
|
|
|
|
Sync do
|
|
# Share a pool of persistent connections, rather than reconnecting on
|
|
# each request. (This library does that automatically!)
|
|
internet = Async::HTTP::Internet.instance
|
|
|
|
# Load the assets in batches, then process each batch in two steps: first
|
|
# inferring all manifest URLs in the batch, then saving all assets in the
|
|
# batch. (This makes the update step more efficient, and it also avoids
|
|
# simultaneous queries across the fibers, which ActiveRecord disallows!)
|
|
#
|
|
# We keep track of a shared index `i` here, but we only actually
|
|
# increment it once each task is *done*, so that the numbers output in
|
|
# the right order!
|
|
i = 0
|
|
assets.find_in_batches(batch_size: 1000) do |batch|
|
|
# Create a barrier, to let us wait on all the tasks; then under it
|
|
# create a semaphore, to limit how many tasks run at once.
|
|
barrier = Async::Barrier.new
|
|
semaphore = Async::Semaphore.new(100, parent: barrier)
|
|
|
|
batch.each do |asset|
|
|
semaphore.async do |task|
|
|
manifest_url = nil
|
|
begin
|
|
task.with_timeout(timeout) do
|
|
manifest_url = infer_manifest_url(asset.url, internet)
|
|
end
|
|
rescue StandardError => error
|
|
i += 1
|
|
puts "[#{i}/#{count}] ⚠️ Skipping #{asset.id}: #{error.message}"
|
|
next
|
|
end
|
|
|
|
i += 1
|
|
puts "[#{i}/#{count}] Manifest for #{asset.id}: #{manifest_url}"
|
|
|
|
# Write, but don't yet save, the manifest URL.
|
|
asset.manifest_url = manifest_url
|
|
end
|
|
end
|
|
|
|
# Wait for all the above tasks to finish. (Then, all of the assets that
|
|
# succeeded should have an unsaved `manifest_url` change.)
|
|
barrier.wait
|
|
|
|
# Save all of the assets in the batch. (We do this in a transaction not
|
|
# for the transactional semantics, but because it's notably faster than
|
|
# doing a commit between each query, which is what sending the queries
|
|
# individually would effectively do!)
|
|
begin
|
|
SwfAsset.transaction do
|
|
batch.each do |asset|
|
|
begin
|
|
asset.save!
|
|
rescue StandardError => error
|
|
puts "⚠️ Saving asset #{asset.id} failed: #{error.full_message}"
|
|
end
|
|
end
|
|
end
|
|
rescue StandardError => error
|
|
puts "⚠️ Saving this batch failed: #{error.full_message}"
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
SWF_URL_PATTERN = %r{^(?:https?:)?//images\.neopets\.com/cp/(bio|items)/swf/(.+?)_([a-z0-9]+)\.swf$}
|
|
def infer_manifest_url(swf_url, internet)
|
|
url_match = swf_url.match(SWF_URL_PATTERN)
|
|
raise ArgumentError, "not a valid SWF URL: #{swf_url}" if url_match.nil?
|
|
|
|
# Build the potential manifest URLs, from the two structures we know of.
|
|
type, folders, hash_str = url_match.captures
|
|
potential_manifest_urls = [
|
|
"https://images.neopets.com/cp/#{type}/data/#{folders}/manifest.json",
|
|
"https://images.neopets.com/cp/#{type}/data/#{folders}_#{hash_str}/manifest.json",
|
|
]
|
|
|
|
# Send a HEAD request to test each manifest URL, without downloading its
|
|
# content. If it succeeds, we're done!
|
|
potential_manifest_urls.each do |potential_manifest_url|
|
|
res = internet.head potential_manifest_url
|
|
if res.ok?
|
|
return potential_manifest_url
|
|
elsif res.status == 404
|
|
next # Ok, this was not the manifest!
|
|
else
|
|
raise "unexpected manifest response code: #{res.status}"
|
|
end
|
|
end
|
|
|
|
# Otherwise, there's no valid manifest URL.
|
|
raise "all of the common manifest URL patterns returned HTTP 404"
|
|
end
|