webentwicklung-frage-antwort-db.com.de

Iterieren Sie das HDFS-Verzeichnis

Ich habe ein Verzeichnis von Verzeichnissen auf HDFS und möchte die Verzeichnisse durchlaufen. Gibt es eine einfache Möglichkeit, dies mit Spark zu tun, indem Sie das SparkContext-Objekt verwenden?

25
Jon

Sie können org.Apache.hadoop.fs.FileSystem verwenden. Insbesondere FileSystem.listFiles([path], true)

Und mit Spark ...

FileSystem.get(sc.hadoopConfiguration()).listFiles(..., true)

Bearbeiten 

Es ist erwähnenswert, dass eine bewährte Methode die FileSystem erhält, die dem Schema der Path zugeordnet ist.

path.getFileSystem(sc.hadoopConfiguration).listFiles(path, true)
41
Mike Park

Hier ist die PySpark-Version, wenn jemand interessiert ist:

hadoop = sc._jvm.org.Apache.hadoop

fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration() 
path = hadoop.fs.Path('/hivewarehouse/disc_mrt.db/unified_fact/')

for f in fs.get(conf).listStatus(path):
    print f.getPath()

In diesem speziellen Fall erhalte ich eine Liste aller Dateien, aus denen die disc_mrt.unified_fact-Hive-Tabelle besteht.

Andere Methoden des FileStatus-Objekts, wie zum Beispiel getLen (), um die Dateigröße zu erhalten, werden hier beschrieben:

Klasse FileStatus

15
Tagar
import  org.Apache.hadoop.fs.{FileSystem,Path}

FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path("hdfs:///tmp")).foreach( x => println(x.getPath ))

Das hat bei mir funktioniert.

Spark-Version 1.5.0-cdh5.5.2

11
ozw1z5rd

das hat die Arbeit für mich erledigt

FileSystem.get(new URI("hdfs://HAservice:9000"), sc.hadoopConfiguration).listStatus( new Path("/tmp/")).foreach( x => println(x.getPath ))
1
Vincent Claes

@Tagar hat nicht gesagt, wie man Remote-HDFS verbindet, aber diese Antwort tat:

URI           = sc._gateway.jvm.Java.net.URI
Path          = sc._gateway.jvm.org.Apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.Apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.Apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())
0
Mithril

Sie können es auch mit dem Status von globStatus versuchen

val listStatus = org.Apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration).globStatus(new org.Apache.hadoop.fs.Path(url))

      for (urlStatus <- listStatus) {
        println("urlStatus get Path:"+urlStatus.getPath())
}
0
Nitin