在Hadoop中处理输入的CSV文件

本贴最后更新于 2812 天前,其中的信息可能已经东海扬尘

在Hadoop中,InputFormat类用来生成可供Mapper处理的<key, value>键值对。当数据传送给Mapper时,Mapper会将输入分片传送到InputFormat上,InputFormat调用getRecordReader()方法生成RecordReader,RecordReader再创建可供map函数处理的键值对<K1, V1>。

Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的键值对。比如,TextInputFormat,Hadoop中默认的输入方法,会将每行数据生成一条记录,其中key值为每条记录在分片中的字节偏移量,value则为每行的内容。

在Hadoop预定义的InputFormat中,并没有处理CSV文件的方法。CSV文件的本质其实是用逗号分隔开的文本文件。一种很直观的处理方法是:将CSV文件作为文本文件处理,使用TextInputFormat将文件按行传入map函数,在map函数中再按照CSV文件的格式进行处理。但这样很容易将数据格式的处理逻辑与业务处理逻辑混淆在一起,并且出现很多copy-and-pasted的代码。

实际上,可以写一个自己的InputFormat以及RecordReader类,专门用来处理CSV文件的输入,直接传递给map函数解析后的数据。

1 数据结构

我们传递给map函数一个ArrayWritable(A Writable for arrays containing instances of a class),元素类型为Text,即CSV文件每一行各个字段的数据。数据结构如下:

代码1:TextArrayWritable.java

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }
public TextArrayWritable(Text[] strings) {
    super(Text.class, strings);
}

}

2 CSVInputFormat

FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类。它提供了两个功能:一是定义哪些文件包含在一个作业的输入中,另一个是为输入文件生成分片(Input Splits)。而把分片分割成记录的事情交由其子类来完成。所以CSVInputFormat类的实现上,同样是继承InputFormat类,并只需要简单的重写createRecordReader和isSplitable即可。

代码2:CSVInputFormat.java

public class CSVInputFormat extends FileInputFormat<LongWritable, TextArrayWritable> {
    public static final String CSV_TOKEN_SEPARATOR_CONFIG = "csvinputformat.token.delimiter";
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);

    return codec == null;
}

@Override
public RecordReader&lt;LongWritable, TextArrayWritable&gt; createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    String csvDelimiter = context.getConfiguration()
                                 .get(CSV_TOKEN_SEPARATOR_CONFIG);
    Character separator = null;

    if ((csvDelimiter != null) &amp;&amp; (csvDelimiter.length() == 1)) {
        separator = csvDelimiter.charAt(0);
    }

    return new CSVRecordReader(separator);
}

}

其中csvinputformat.token.delimiter是可在配置文件中配置的CSV输入文件分隔符,createRecordReader完成的工作只是从配置文件中得到分隔符,调用真正对CSV文件分片进行处理,并生成键值对的CSVRecordReader函数,并返回RecordReader对象。

3 CSVRecordReader

对于CSVRecordReader,要实现的功能无非就是将CSV文件中每一行的各字段提取出来,并将各字段作为TextArrayWritable类型的数据结构传递给map函数。

在Hadoop中有一个LineRecordReader类,它将文本文件每一行的内容作为值返回,类型为Text。所以可以直接在CSVRecordReader中使用LineRecordReader,将LineRecordReader返回的每一行再次进行处理。在CSV文件的处理上,这里用到了OpenCSV对CSV文件的每一行进行解析,具体可参见这里。

下面是CSVRecordReader的实现代码。除了CSV文件的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接调用LineRecordReader实例的相应方法。毕竟我们是踩在巨人的肩膀上继续前进嘛。O(∩_∩)O~

代码3:CSVRecordReader.java

public class CSVRecordReader extends RecordReader<LongWritable, TextArrayWritable> {
    private LineRecordReader lineReader;
    private TextArrayWritable value;
    private CSVParser parser;
// 新建CSVParser实例,用来解析每一行CSV文件的每一行
public CSVRecordReader(Character delimiter) {
    this.lineReader = new LineRecordReader();

    if (delimiter == null) {
        this.parser = new CSVParser();
    } else {
        this.parser = new CSVParser(delimiter);
    }
}

// 调用LineRecordReader的初始化方法,寻找分片的开始位置
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
    lineReader.initialize(split, context);
}

// 使用LineRecordReader来得到下一条记录(即下一行)。
// 如果到了分片(Input Split)的尾部,nextKeyValue将返回NULL
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (lineReader.nextKeyValue()) {
        //如果有新记录,则进行处理
        loadCSV();

        return true;
    } else {
        value = null;

        return false;
    }
}

@Override
public LongWritable getCurrentKey()
    throws IOException, InterruptedException {
    return lineReader.getCurrentKey();
}

@Override
public TextArrayWritable getCurrentValue()
    throws IOException, InterruptedException {
    return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return lineReader.getProgress();
}

@Override
public void close() throws IOException {
    lineReader.close();
}

// 对CSV文件的每一行进行处理
private void loadCSV() throws IOException {
    String line = lineReader.getCurrentValue().toString();

    // 通过OpenCSV将解析每一行的各字段
    String[] tokens = parser.parseLine(line);
    value = new TextArrayWritable(convert(tokens));
}

// 将字符串数组批量处理为Text数组
private Text[] convert(String[] tokens) {
    Text[] t = new Text[tokens.length];

    for (int i = 0; i &lt; t.length; i++) {
        t[i] = new Text(tokens[i]);
    }

    return t;
}

}

4 简单的应用

用于处理CSV文件输入的InputFormat已经写完了,现在构造一个简单的应用场景,来试验下这个CSVInputFormat。

假设有这样一些数据,每一列第一个字段为一个标识,后面为随机产生的数字,标识各不相同,求每一行标识后的数字之和并输出,输出格式为:每一行为标识和数字和。

 

由于标识没有重复,并且逻辑比较简单,这里只写一个Mapper即可,不需要Reducer。

代码4:CSVMapper.java

public class CSVMapper extends Mapper<LongWritable, TextArrayWritable, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, TextArrayWritable value,
        Context context) throws IOException, InterruptedException {
        String[] values = value.toStrings();
        int sum = 0;
        Text resultKey = new Text(values[0]);
    for (int i = 1; i &lt; values.length; i++) {
        sum = sum + Integer.valueOf(values[i].trim());
    }

    IntWritable resultValue = new IntWritable(sum);
    context.write(resultKey, resultValue);
}

}

在作业的提交部分,由于没有Reducer,所以将ReduceTask设置为了0

代码5:JustRun.java

public class JustRun extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJobName("CSVTest");
    job.setJarByClass(JustRun.class);

    job.setMapperClass(CSVMapper.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(CSVInputFormat.class);

    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new JustRun(), args);
    System.exit(ret);
}

}

执行完毕后,输出如下,跟预想是一致的。

好了,这就是利用InputFormat对CSV文件的处理过程。除了CSV文件,还可根据处理数据的类型,写出更多的InputFormat。同时,我们还可以利用OutputFormat输出需要的格式。

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    82 引用 • 122 回帖 • 614 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    69 引用 • 190 回帖 • 492 关注
  • Gitea

    Gitea 是一个开源社区驱动的轻量级代码托管解决方案,后端采用 Go 编写,采用 MIT 许可证。

    4 引用 • 16 回帖 • 3 关注
  • JRebel

    JRebel 是一款 Java 虚拟机插件,它使得 Java 程序员能在不进行重部署的情况下,即时看到代码的改变对一个应用程序带来的影响。

    26 引用 • 78 回帖 • 620 关注
  • 30Seconds

    📙 前端知识精选集,包含 HTML、CSS、JavaScript、React、Node、安全等方面,每天仅需 30 秒。

    • 精选常见面试题,帮助您准备下一次面试
    • 精选常见交互,帮助您拥有简洁酷炫的站点
    • 精选有用的 React 片段,帮助你获取最佳实践
    • 精选常见代码集,帮助您提高打码效率
    • 整理前端界的最新资讯,邀您一同探索新世界
    488 引用 • 383 回帖 • 3 关注
  • HHKB

    HHKB 是富士通的 Happy Hacking 系列电容键盘。电容键盘即无接点静电电容式键盘(Capacitive Keyboard)。

    5 引用 • 74 回帖 • 404 关注
  • 负能量

    上帝为你关上了一扇门,然后就去睡觉了....努力不一定能成功,但不努力一定很轻松 (° ー °〃)

    85 引用 • 1201 回帖 • 455 关注
  • RIP

    愿逝者安息!

    8 引用 • 92 回帖 • 293 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖
  • 思源笔记

    思源笔记是一款隐私优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。

    融合块、大纲和双向链接,重构你的思维。

    18600 引用 • 69242 回帖 • 1 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    139 引用 • 441 回帖
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    75 引用 • 258 回帖 • 629 关注
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    261 引用 • 662 回帖
  • wolai

    我来 wolai:不仅仅是未来的云端笔记!

    1 引用 • 11 回帖 • 1 关注
  • Ngui

    Ngui 是一个 GUI 的排版显示引擎和跨平台的 GUI 应用程序开发框架,基于
    Node.js / OpenGL。目标是在此基础上开发 GUI 应用程序可拥有开发 WEB 应用般简单与速度同时兼顾 Native 应用程序的性能与体验。

    7 引用 • 9 回帖 • 346 关注
  • CSS

    CSS(Cascading Style Sheet)“层叠样式表”是用于控制网页样式并允许将样式信息与网页内容分离的一种标记性语言。

    180 引用 • 447 回帖 • 1 关注
  • JWT

    JWT(JSON Web Token)是一种用于双方之间传递信息的简洁的、安全的表述性声明规范。JWT 作为一个开放的标准(RFC 7519),定义了一种简洁的,自包含的方法用于通信双方之间以 JSON 的形式安全的传递信息。

    20 引用 • 15 回帖 • 18 关注
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    53 引用 • 85 回帖
  • 反馈

    Communication channel for makers and users.

    123 引用 • 906 回帖 • 191 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    189 引用 • 1056 回帖
  • Chrome

    Chrome 又称 Google 浏览器,是一个由谷歌公司开发的网页浏览器。该浏览器是基于其他开源软件所编写,包括 WebKit,目标是提升稳定性、速度和安全性,并创造出简单且有效率的使用者界面。

    60 引用 • 287 回帖 • 2 关注
  • 微软

    微软是一家美国跨国科技公司,也是世界 PC 软件开发的先导,由比尔·盖茨与保罗·艾伦创办于 1975 年,公司总部设立在华盛顿州的雷德蒙德(Redmond,邻近西雅图)。以研发、制造、授权和提供广泛的电脑软件服务业务为主。

    8 引用 • 44 回帖
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 9 关注
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 457 关注
  • ngrok

    ngrok 是一个反向代理,通过在公共的端点和本地运行的 Web 服务器之间建立一个安全的通道。

    7 引用 • 63 回帖 • 598 关注
  • frp

    frp 是一个可用于内网穿透的高性能的反向代理应用,支持 TCP、UDP、 HTTP 和 HTTPS 协议。

    15 引用 • 7 回帖 • 9 关注
  • PHP

    PHP(Hypertext Preprocessor)是一种开源脚本语言。语法吸收了 C 语言、 Java 和 Perl 的特点,主要适用于 Web 开发领域,据说是世界上最好的编程语言。

    164 引用 • 407 回帖 • 526 关注
  • uTools

    uTools 是一个极简、插件化、跨平台的现代桌面软件。通过自由选配丰富的插件,打造你得心应手的工具集合。

    5 引用 • 13 回帖