File Systems
Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery. Flink supports some of the most popular file systems, including the local file system, Hadoop-compatible file systems, Amazon S3, MapR FS, OpenStack Swift FS, Aliyun OSS and Azure Blob Storage.
The file system used for a particular file is determined by its URI scheme. For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
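As a sketch of how scheme-based resolution looks from user code, the snippet below resolves a `FileSystem` for each path via Flink's `org.apache.flink.core.fs` API; the paths are placeholders, and the `hdfs://` resolution only succeeds when Hadoop support is available (see the Hadoop section below).

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SchemeResolution {
    public static void main(String[] args) throws Exception {
        // The URI scheme selects the implementation: "file" maps to the
        // built-in local file system.
        Path localPath = new Path("file:///home/user/text.txt");
        FileSystem localFs = localPath.getFileSystem();

        // "hdfs" resolves to the Hadoop file system, provided the Hadoop
        // libraries are on the classpath.
        Path hdfsPath = new Path("hdfs://namenode:50010/data/user/text.txt");
        FileSystem hdfs = hdfsPath.getFileSystem();

        System.out.println(localFs.getClass().getName());
        System.out.println(hdfs.getClass().getName());
    }
}
```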
File system instances are instantiated once per process and then cached/pooled, to avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits.
- Local File System
- Pluggable File Systems
- HDFS and Hadoop File System support
- Hadoop Configuration
- Adding a new pluggable File System implementation
Local File System
Flink has built-in support for the file system of the local machine, including any NFS or SAN drives mounted into that local file system. It can be used by default without additional configuration. Local files are referenced with the `file://` URI scheme.
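As a minimal sketch (the path is a placeholder), writing and reading a local file through Flink's `FileSystem` abstraction might look like this:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

public class LocalFsExample {
    public static void main(String[] args) throws Exception {
        // file:// resolves to Flink's built-in local file system.
        Path path = new Path("file:///tmp/flink-example.txt");
        FileSystem fs = path.getFileSystem();

        // Create (or overwrite) the file and write a few bytes.
        try (FSDataOutputStream out = fs.create(path, WriteMode.OVERWRITE)) {
            out.write("hello".getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("exists: " + fs.exists(path));
    }
}
```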
Pluggable File Systems
The Apache Flink project supports the following file systems:
- Amazon S3 object storage is supported by two alternative implementations: `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint.
- The MapR FS file system adapter is already supported in the main Flink distribution under the `maprfs://` URI scheme. You must provide the MapR libraries on the classpath (for example in the `lib` directory).
- OpenStack Swift FS is supported by `flink-swift-fs-hadoop` and registered under the `swift://` URI scheme. The implementation is based on the Hadoop Project but is self-contained with no dependency footprint. To use it when using Flink as a library, add the respective Maven dependency (`org.apache.flink:flink-swift-fs-hadoop:1.9.0`).
- Aliyun Object Storage Service is supported by `flink-oss-fs-hadoop` and registered under the `oss://` URI scheme. The implementation is based on the Hadoop Project but is self-contained with no dependency footprint.
- Azure Blob Storage is supported by `flink-azure-fs-hadoop` and registered under the `wasb(s)://` URI schemes. The implementation is based on the Hadoop Project but is self-contained with no dependency footprint.
All of these except MapR FS can be used as plugins.
To use a pluggable file system, copy the corresponding JAR file from the `opt` directory to a directory under the `plugins` directory of your Flink distribution before starting Flink, e.g.
```
mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.9.0.jar ./plugins/s3-fs-hadoop/
```
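Once the plugin is in place, paths with the corresponding scheme can be used directly in a job. A minimal sketch, assuming the `s3-fs-hadoop` plugin from the example above; the bucket name and key are hypothetical:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ReadExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The s3:// scheme is served by the plugin copied into
        // plugins/s3-fs-hadoop; "my-bucket" and the key are placeholders.
        env.readTextFile("s3://my-bucket/input/text.txt").print();

        env.execute("read from S3");
    }
}
```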
Attention The plugin mechanism for file systems was introduced in Flink version 1.9 to support dedicated Java class loaders per plugin and to move away from the class shading mechanism. You can still use the provided file systems (or your own implementations) via the old mechanism by copying the corresponding JAR file into the `lib` directory.
It’s encouraged to use the plugin-based loading mechanism for file systems that support it. Loading file system components from the `lib` directory may not be supported in future Flink versions.
HDFS and Hadoop File System support
For all schemes where Flink cannot find a directly supported file system, it falls back to Hadoop. All Hadoop file systems are automatically available when `flink-runtime` and the Hadoop libraries are on the classpath. This way, Flink seamlessly supports all Hadoop file systems and all Hadoop-compatible file systems (HCFS), for example:
- hdfs
- ftp
- s3n and s3a
- har
- …
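As a sketch, once the Hadoop libraries are on the classpath, an `hdfs://` path resolves through the same `FileSystem` API as any other scheme; the namenode address and directory below are placeholders:

```java
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class HdfsListing {
    public static void main(String[] args) throws Exception {
        // Resolves to the Hadoop fallback, because no built-in
        // implementation registers the hdfs:// scheme.
        Path dir = new Path("hdfs://namenode:50010/data/user");
        FileSystem fs = dir.getFileSystem();

        // List the directory through Flink's FileSystem abstraction.
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath());
        }
    }
}
```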
Hadoop Configuration
We recommend using Flink’s built-in file systems unless required otherwise. Using a Hadoop file system directly may be required, for example, when using that file system for YARN’s resource storage, via the `fs.defaultFS` configuration property in Hadoop’s `core-site.xml`.
Putting the Hadoop configuration on the same class path as the Hadoop libraries makes the Hadoop file systems pick up that configuration. You can reference another Hadoop configuration by setting the environment variable `HADOOP_CONF_DIR`, or by referencing it via the Flink configuration:
```
fs.hdfs.hadoopconf: /path/to/etc/hadoop
```
This registers `/path/to/etc/hadoop` as Hadoop’s configuration directory; this is where Flink will look for the `core-site.xml` and `hdfs-site.xml` files.
Adding a new pluggable File System implementation
File systems are represented by the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify files and objects in that file system.
To add a new file system:
- Add the file system implementation, which is a subclass of `org.apache.flink.core.fs.FileSystem`.
- Add a factory that instantiates that file system and declares the scheme under which the file system is registered. This must be a subclass of `org.apache.flink.core.fs.FileSystemFactory` (see the sketch after this list).
- Add a service entry. Create a file `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` which contains the class name of your file system factory class (see the Java Service Loader docs for more details).
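A minimal sketch of the factory, assuming a hypothetical `myfs://` scheme and a `MyFileSystem` subclass of `FileSystem` that you have implemented yourself:

```java
package com.example.fs;

import java.io.IOException;
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

// Hypothetical factory registering a custom "myfs" scheme.
public class MyFileSystemFactory implements FileSystemFactory {

    // Some Flink versions declare configure(Configuration) on the factory
    // interface; a no-op implementation is harmless either way.
    public void configure(Configuration config) {}

    @Override
    public String getScheme() {
        // The URI scheme under which this file system is registered.
        return "myfs";
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // MyFileSystem is your subclass of org.apache.flink.core.fs.FileSystem.
        return new MyFileSystem(fsUri);
    }
}
```

The matching service entry is then a plain text file `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` containing the single line `com.example.fs.MyFileSystemFactory`.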
During plugin discovery, the file system factory class is loaded by a dedicated Java class loader to avoid class conflicts with other plugins and Flink components. The same class loader should be used during file system instantiation and file system operation calls.

Warning In practice, this means you should avoid using the `Thread.currentThread().getContextClassLoader()` class loader in your implementation.
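As a sketch of the safer pattern, resolve classes and resources against the class loader that loaded your plugin's own classes rather than the thread context class loader; `com.example.fs.SomeHelper` is a hypothetical class bundled with the plugin:

```java
package com.example.fs;

public class ClassLoadingHint {

    Object loadHelper() throws ReflectiveOperationException {
        // Avoid Thread.currentThread().getContextClassLoader(): inside a
        // plugin it may not see the plugin's own classes.
        // Prefer the class loader that loaded this plugin class:
        ClassLoader pluginClassLoader = getClass().getClassLoader();

        // "com.example.fs.SomeHelper" is a hypothetical helper class
        // shipped inside the plugin JAR.
        Class<?> helperClass =
                Class.forName("com.example.fs.SomeHelper", true, pluginClassLoader);
        return helperClass.getDeclaredConstructor().newInstance();
    }
}
```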