Aufnahme von Musikinhalten in großem Maßstab mit DDEX: Teil Zwei

Analysieren von DDEX-ERN-Nachrichten (Electronic Resource Notification)

Rekapitulieren

Im letzten Beitrag habe ich Data Definition EXchange (DDEX) als Format eingeführt, das in der Musikindustrie zum digitalen Vertrieb von Produkten verwendet wird. Ich habe auch die Ziele und Vorgaben beschrieben, die wir beim Verbrauch von DDEX-Ressourcen hatten, um unseren Benutzern Audio-Inhalte zur Verfügung zu stellen.

In diesem Beitrag gehe ich auf die technischen Details der Implementierung ein.

Arbeitsablauf

Neue Nachricht empfangen

Unser Workflow beginnt, wenn wir eine ERN-Nachricht (Electronic Resource Notification) von einem Musikanbieter erhalten. Dies erfolgt über AWS S3. Jeder Anbieter verfügt über einen dedizierten Bucket, auf den nur sie zugreifen können.

Wir verwenden einen Trigger, um die entsprechende Lambda-Funktion auszulösen, wenn die Nachricht empfangen wird.

Hier ist die Triggerkonfiguration dargestellt, die zeigt, dass Dateien, die im Eingangsordner mit der Erweiterung .xml abgelegt werden, die Lambda-Funktion auslösen.

Zugriff auf die Nachricht

Beim Auslösen der Lambda-Funktion wird ein Parameter an die Funktion übergeben. Dieser Parameter enthält den Namen des S3-Buckets und den Schlüsselnamen (a.k.a. Dateiname) der Datei, die die Funktion ausgelöst hat. Wir verwenden diese Informationen, um die Datei in die Lambda-Funktion herunterzuladen, wo sie gelesen werden kann.

Def-Prozess (Ereignis, Kontext):
    "" "
    Lambda-Handler zur Verarbeitung neuer ERN-Nachrichten
    "" "
    # s3 Datei bekommen
    zur Aufzeichnung in Ereignis ['Aufzeichnungen']:
        bucket = record ['s3'] ['bucket'] ['name']
        key = record ['s3'] ['object'] ['key']
        file_path = '/tmp/{0}'.format(key.split("/")[-1])
        s3_client.download_file (Bucket, Schlüssel, Dateipfad)
        ''

Sie werden feststellen, dass wir die Datei in das Verzeichnis / tmp / herunterladen. Dies ist der einzige beschreibbare Speicherort innerhalb einer Lambda-Funktion, der Rest des Dateisystems ist schreibgeschützt. Es ist auch auf 500 MB begrenzt (Sie werden später sehen, warum dies wichtig ist).

Analysieren der Nachricht

Nachdem wir eine Nachricht erhalten haben, ist es an der Zeit, sie zu analysieren und nachzusehen, was sich darin befindet.

Es gibt drei Hauptbotschaften, die wir erwarten und auf die wir reagieren:

  • OriginalMessage: Eine brandneue Nachricht mit neuer Musik.
  • UpdateMessage: Zeigt an, dass eine Änderung an einem bereits vorhandenen Audio-Asset vorgenommen wurde. Beispielsweise ändert eine Datensatzbezeichnung den Namen oder die Verteilungsdetails haben sich geändert.
  • TakedownMessage: Wird verwendet, um uns zu benachrichtigen, dass wir dieses Musikstück oder diese Ressource nicht mehr verwenden können.

Zum Parsen der Nachricht haben wir ElementTree verwendet. Es ist einfach zu bedienen, reich an Funktionen und Teil von Kern-Python. Unsere Bedürfnisse waren recht einfach, sodass ich keinen Grund sah, weiter zu suchen.

Hier ist der Kicker: Jede Nachricht vom Absender enthält eine Menge Informationen. Eine typische Nachricht enthält Folgendes:

  • Nachrichtenkopf: Dazu gehören die Kennung des Absenders, die Kennung des vorgesehenen Empfängers und die eindeutige Nachrichten-ID.
  • Ressourcenliste: Jede in der Nachricht dargestellte Ressource wird hier aufgelistet. Für ein vollständiges Album gibt es ein XML-Element für das Album, für jeden Titel im Album und für das Cover oder die Bilder. Zu den einzelnen Elementen gehören die eindeutige Kennung für das Album oder den Titel, die Musiker und die von ihnen gespielte Rolle (Sänger, Gitarre, Bass usw.) sowie regionale Elemente, die den Titel des Titels angeben. Auf diese Weise kann dasselbe Lied in verschiedenen Teilen der Welt unterschiedliche Namen haben. Weitere hier enthaltene Informationen sind das Audioformat, die Dauer und der MD5-Hash für die Datei.
  • Versionsliste: Jede Version für jede Ressource ist hier aufgelistet, und es kann ziemlich kompliziert werden. Für ein bestimmtes Lied kann es als Single, als Teil eines Albums oder als Vorschau veröffentlicht werden. Es kann auch an verschiedenen Tagen in verschiedenen Teilen der Welt veröffentlicht werden, wobei für jede Instanz ein separates Freigabeelement erforderlich ist. Die Schemaspezifikation führt hier auch relationales XML ein, dh, die in dieser Version beschriebene Ressource wird durch ein eindeutiges Bezeichnerelement identifiziert, das auf das Ressourcenlistenelement in der Datei verweist.
  • Deal List: Endlich kommen wir zur Deal List. Der Deal beschreibt, wie und wann Sie zur Nutzung der Musik berechtigt sind. Die Musikbranche ist riesig, sodass Sie mit Geschäftsinformationen rechnen können, die beschreiben, wann ein Musikgeschäft die CD in seinem Regal ausstellen kann und wie hoch der Einzelhandelspreis ist. Bei einigen Angeboten können Sie die Musik zum Herunterladen bereitstellen, bei anderen können Sie die Audiodaten streamen. Wieder sehen wir hier das relationale XML, das auf eine einzelne Version derselben ERN-Nachricht verweist. Dieser Abschnitt ist wohl der wichtigste: Er beschreibt, wie Sie die Ressource verwenden dürfen, wenn Sie dies dürfen, und Sie müssen dem Absender monatliche Berichte vorlegen, in denen beschrieben wird, wie Sie sie verwendet haben. Dieser Bericht wird verwendet, um die Lizenzgebühren zu berechnen, die Sie dem Musikanbieter für die Wiedergabe ihrer Musik schulden.

Als Audio-Fitness-Unternehmen verwenden wir Musik, um Wiedergabelisten zu erstellen, die von unseren Trainern als Soundtrack für Ihr geführtes Training verwendet werden. Das heißt, wir werden wahrscheinlich nie ein Album spielen. Wir werden auch niemals eine physische CD (oder Kassette, Album, 8-Track oder Download) verkaufen. Mit anderen Worten, einige Angebote gelten nicht für uns.

Mit all dem neuen Wissen hatte ich das Gefühl, genug Informationen zu haben, um unseren Workflow auszubauen.

Implementierung

Bewahren Sie die Integrität des aufgenommenen Inhalts

Der erste Schritt besteht darin, die XML-Nachricht vollständig in ein verwendbares Format umzuwandeln. Dies stellt sicher, dass wir immer die vollständige Nachricht haben, die für die Fehlerbehebung oder Überprüfung erforderlich ist, dass wir die richtige Aktion basierend auf dem Nachrichteninhalt durchgeführt haben.

Ich habe SQLAlchemy verwendet, um ein Datenbankschema zu erstellen. Es gibt viele ORM-Hasser, und in einigen Fällen sind sie berechtigt. In diesem Fall muss die Nachricht nur in einer Datenbank gespeichert werden. Meiner Erfahrung nach ist das Problem, das die meisten Menschen mit ORMs haben, auf die Ausführung komplexer SQL-Abfragen zurückzuführen (die oft auf ein schlechtes Schema-Design zurückzuführen sind). Ich war zuversichtlich, dass wir das hier vermeiden können, und die Verwendung eines ORM gibt mir ein nettes Python-Klassenobjekt, das die ERN-Nachricht vollständig darstellt.

Im Nachhinein wünschte ich mir wirklich, ich hätte MongoDB anstelle von Postgres verwendet. Aus relationaler Sicht sieht das ziemlich einfach aus, und das ist es auch. Nach mehreren Integrationen habe ich jedoch erfahren, dass jeder Absender Felder hinzufügt, die für ihn eindeutig sind, oder ein anderes Datenformat verwendet.

Erraten Sie, was?

Wenn ich nicht dafür codiert habe, wird es nicht erfasst.

Wenn ich allerdings MongoDB verwendet hätte, hätte ich das XML einfach in JSON konvertieren und es in der Datenbank speichern können. Darüber hinaus verfügt Mongo über hervorragende Abfragemöglichkeiten und erledigt in den meisten Fällen außergewöhnliche Abfragen über spärliche Datensätze hinweg.

Lektion gelernt.

Überprüfen Sie die erforderlichen Daten

Einige XML-Elemente sind erforderlich, beispielsweise die MessageId. Dies ist eine für den Absender eindeutige ID, mit der wir auf eine einzelne ERN-Nachricht verweisen können. Es ist logisch anzunehmen, dass dieses Feld immer enthalten ist, aber jeder, der viel Zeit mit der Arbeit mit Daten von Drittanbietern verbracht hat, kann dies auch bestätigen.

wenn message_header.find ('MessageId') nicht None ist:
            ern.message_id = message_header.find ('MessageId'). text
        sonst:
            Ausnahme auslösen ("Missing MessageId")

Mit der .find () -Methode wird geprüft, ob das Element im XML-Code vorhanden ist, und ob der Wert mithilfe von .text abgerufen wird.

Ein ähnlicher Code wird für jedes erforderliche XML-Element ausgeführt. Sobald der Vorgang abgeschlossen ist, können wir die XML-Nachricht verarbeiten, da wir sicher sind, über die erforderlichen Daten für eine erfolgreiche Aufnahme zu verfügen.

Laden Sie die Ressourcen herunter und speichern Sie sie

Der nächste Schritt ist das Durchlaufen der ResourceList. Dadurch werden alle in dieser Nachricht enthaltenen Ressourcen (Audiotracks, Albumcover usw.) bereitgestellt. Jeder von diesen muss vom Absender heruntergeladen werden.

  T5 
  FLAC 
  2 
  44.1 
  16 
  false 
 
     https://s3.amazonaws.com/XXXXXX/XXXX.flac 
    
         014a089377bb23c80c693d10065f03a9 
         MD5 
    
  

Da wir AWS Lambda verwenden, stellten sich zwei einzigartige Probleme. Erstens hat jede Lambda-Funktion eine maximale Laufzeit von 5 Minuten. Zweitens arbeitet die Lambda-Funktion mit Ausnahme eines Verzeichnisses mit 500 MB / tmp in einem schreibgeschützten Dateisystem.

track = orders.get (url, stream = True)
    wenn track.ok:
        Dateiname = '/ tmp /' + url.split ('/') [- 1] .split ('?') [0]
        mit open (dateiname, 'wb') als f:
            für chunk in track.iter_content (chunk_size = 1024):
                wenn stück:
                    f.write (chunk)
        if get_hash (file_name) == hashsum:
            Rückgabe Dateiname
        sonst:
            Ausnahme auslösen ("Hash stimmt nicht mit heruntergeladener Datei überein")
    sonst:
        wenn track.status_code> = 400 und track.status_code <500:
            Ausnahmen auslösen.File400Exception ("Fehler beim Zugriff auf Datei")
        elif track.status_code> = 500:
            Ausnahmen auslösen.File500Exception ("HTTP 5XX beim Herunterladen der Datei aufgetreten")

Mithilfe der Python-Anforderungsbibliothek überprüfen wir zunächst, ob die Datei-URL eine HTTP 200-Antwort zurückgibt, wenn track.ok. Als Nächstes laden wir die Datei herunter und speichern sie im Ordner / tmp. Überprüfen Sie dann, ob der MD5-Hash mit dem in der XML-Nachricht angegebenen Wert übereinstimmt. Dies stellt sicher, dass die heruntergeladene Datei vollständig und ohne Beschädigung ist.

Nach dem Download und der Überprüfung laden wir die Datei mithilfe der boto3-Bibliothek in unseren eigenen S3-Bucket hoch.

res = upload.Bucket (UPLOAD_BUCKET) .put_object (Key = file_name.split ("/") [- 1], Body = data, ServerSideEncryption = 'aws: kms')

Beachten Sie, dass wir auch ServerSideEncryption angeben, damit die Dateien in unserem S3-Bucket verschlüsselt werden.

Während dies funktioniert, sind wir auf Skalenprobleme gestoßen. Einige XML-Nachrichten waren riesig und enthielten fast 100 Audiospuren. Jeder von ihnen musste heruntergeladen, überprüft und dann in unseren S3-Bucket hochgeladen werden. In einigen Fällen hat dies zu einem Timeout der Lambda-Funktion vor dem Abschluss geführt. Da die Funktion nicht erfolgreich beendet wurde, wird sie erneut ausgelöst und der Vorgang wird erneut gestartet.

bei Spur 1. ‍♂

Um dies zu überwinden, habe ich eine kurze Überprüfung hinzugefügt, um festzustellen, ob die heruntergeladene Datei bereits in unserem letzten S3-Bucket vorhanden ist:

client = boto3.client ('s3')
    Versuchen:
        response = client.head_object (Bucket = Bucket, Key = Key)
        return True
    außer ClientError als e:
        if int (e.response ['Error'] ['Code']) == 404:
            falsch zurückgeben

Dies bedeutete, dass zuvor heruntergeladene Dateien übersprungen werden konnten, sodass sich die Funktion auf die verbleibenden Dateien in der ERN-Nachricht konzentrieren konnte.

Als nächstes bin ich auf die zweite Frage gestoßen: die 500-MB-Grenze. Wenn jede ERN-Nachricht eine eigene Lambda-Funktion auslöst, würden Sie nicht glauben, dass veraltete Daten ein Problem darstellen (zumindest habe ich das nicht getan). Es stellt sich heraus, dass es ist. Wenn die Ausführung einer Lambda-Funktion beendet ist und ein anderer Trigger in einem bestimmten Fenster ausgelöst wird, wird diese Lambda-Funktion erneut verwendet.

Dies bedeutet, dass der / tmp-Bereich möglicherweise bereits Dateien enthält. Sobald Sie die Kapazität von 500 MB erreicht haben, schlagen die Funktionen aufgrund von Speicherplatzmangel fehl. Als Ergebnis habe ich hinzugefügt

os.remove (Dateiname)

Unmittelbar nach dem Hochladen der Datei in unseren S3-Bucket, um sicherzustellen, dass keine verbliebenen Dateien verbleiben.

Speichern Sie die analysierte Nachricht in der master-Datenbank

Zu diesem Zeitpunkt haben wir

  • validierte unsere erforderlichen Felder
  • hat die in der Nachricht angegebenen Ressourcen heruntergeladen
  • hat die Dateien in unserem verschlüsselten S3-Bucket gespeichert

Erst dann schreibe ich die Nachricht in die Datenbank. Dies stellt sicher, dass eine Nachricht vollständig und erfolgreich verarbeitet wird, wenn sie in die Datenbank geschrieben wird. Da ich SQLAlchemy als ORM verwendet und die ERN-Nachricht in ein Python-Objekt geparst habe, ist das Speichern so einfach wie

session.add (ern)
session.commit ()

Sagen Sie jemandem, dass Sie fertig sind

Nachdem wir den neuen Inhalt erfolgreich aufgenommen haben, müssen wir einige Personen benachrichtigen. Zunächst müssen wir den Absender darüber informieren, dass wir die Nachricht erfolgreich empfangen haben. Dazu senden wir eine Nachricht an einen vom Absender bereitgestellten Webdienst.

Antwort = Anfragen.post (WS_URL, Daten = Anfrage, Auth = (WS_USER, WS_PWD), Header = Header)

Die Webdienst-URL, der Benutzername und das Kennwort sind Umgebungsvariablen. Dies bedeutet, dass ich sie pro Funktion festlegen kann und niemals Anmeldeinformationen im Code speichern muss. Die Nutzlast für den Webdienst ist eine XML-Datei mit dem Element SuccessfullyIngestedByDistributor .

Denken Sie daran, wir haben viele Informationen zu jeder Audiospur erhalten. Einige davon interessieren uns nicht, zum Beispiel alle Mitwirkenden an einem bestimmten Audiotrack. Das Durchsuchen dieser Informationen über Millionen von Songs hinweg würde zu einer schlechten Benutzererfahrung führen. Stattdessen nehme ich die Teile der Daten, die für unser Audioteam relevant sind, und poste sie als Nachricht in einer SQS-Warteschlange. Dies löst eine weitere Lambda-Funktion aus, um die SQS-Nachricht in einer anderen Datenbank zu speichern, die von unserem Audioteam durchsucht werden kann.

Dies mag als unnötiger Schritt erscheinen, bietet uns jedoch eine Menge Flexibilität und Skalierbarkeit. Der Vorgang des Empfangens von ERN-Nachrichten ist von dem Vorgang des Hinzufügens neuer Musik zu der Datenbank, die von unserem Audio-Engineering-Team verwendet wird, abgekoppelt. Dies ermöglicht, dass ein Fehler in beiden Prozessen nur auf seine Funktion beschränkt wird, wodurch ein Szenario verhindert wird, in dem ein Fehler beim Aktualisieren der Audio Engineers-Datenbank eine Bestätigung an den ERN-Absender blockiert. Um die Skalierbarkeit zu gewährleisten, können beide Prozesse so schnell wie möglich arbeiten und die SQS-Warteschlange als Puffer dazwischen verwenden.

Als nächstes

Im nächsten Beitrag werden wir uns die verschiedenen implementierten Testarten ansehen, um sicherzustellen, dass die Dinge wie erwartet funktionieren, und uns die Möglichkeit geben, Änderungen sicher zu implementieren, ohne befürchten zu müssen, sie zu beschädigen.