Mysql
 sql >> Datenbank >  >> RDS >> Mysql

Speichern der Apache Hadoop-Datenausgabe in der MySQL-Datenbank

Das großartige Beispiel wird in diesem Blog , ich habe es ausprobiert und es geht wirklich gut. Ich zitiere die wichtigsten Teile des Codes.

Zuerst müssen Sie eine Klasse erstellen, die Daten darstellt, die Sie speichern möchten. Die Klasse muss die DBWritable-Schnittstelle implementieren:

public class DBOutputWritable implements Writable, DBWritable
{
   private String name;
   private int count;

   public DBOutputWritable(String name, int count) {
     this.name = name;
     this.count = count;
   }

   public void readFields(DataInput in) throws IOException {   }

   public void readFields(ResultSet rs) throws SQLException {
     name = rs.getString(1);
     count = rs.getInt(2);
   }

   public void write(DataOutput out) throws IOException {    }

   public void write(PreparedStatement ps) throws SQLException {
     ps.setString(1, name);
     ps.setInt(2, count);
   }
}

Erstellen Sie Objekte der zuvor definierten Klasse in Ihrem Reducer:

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {

   protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
     int sum = 0;

     for(IntWritable value : values) {
       sum += value.get();
     }

     try {
       ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
     } catch(IOException e) {
       e.printStackTrace();
     } catch(InterruptedException e) {
       e.printStackTrace();
     }
   }
}

Schließlich müssen Sie eine Verbindung zu Ihrer DB konfigurieren (vergessen Sie nicht, Ihren DB-Konnektor zum Klassenpfad hinzuzufügen) und die Eingabe-/Ausgabedatentypen Ihres Mappers und Reducers registrieren.

public class Main
{
   public static void main(String[] args) throws Exception
   {
     Configuration conf = new Configuration();
     DBConfiguration.configureDB(conf,
     "com.mysql.jdbc.Driver",   // driver class
     "jdbc:mysql://localhost:3306/testDb", // db url
     "user",    // username
     "password"); //password

     Job job = new Job(conf);
     job.setJarByClass(Main.class);
     job.setMapperClass(Map.class); // your mapper - not shown in this example
     job.setReducerClass(Reduce.class);
     job.setMapOutputKeyClass(Text.class); // your mapper - not shown in this example
     job.setMapOutputValueClass(IntWritable.class); // your mapper - not shown in this example
     job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
     job.setOutputValueClass(NullWritable.class);   // reducer's VALUEOUT
     job.setInputFormatClass(...);
     job.setOutputFormatClass(DBOutputFormat.class);

     DBInputFormat.setInput(...);

     DBOutputFormat.setOutput(
     job,
     "output",    // output table name
     new String[] { "name", "count" }   //table columns
     );

     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}